Storm的CEP框架 flowmix

Flowmix - A Flexible Event Processing Engine for Apache Storm: http://github.com/zqhxuyuan/bigdata/tree/master/storm/storm-flowmix

Introduce

One of the solutions Flowmix offers to the resource and sliding window orchestration problem is having a single topology
deployed with a generic “stream” of domain-agnostic objects that can be routed around in different ways,
applying different operations to the events on their way through the bolts of the topology.
The streams can be split and joined together, bridged to other streams, and passed through a standard pluggable output bolt.
Events can be passed through relational operations like partitioning, aggregating, collecting, sorting, filtering, selection, and joining.

Other non-relational operations like switches and governors can also be applied to orchestrate the flow of a stream of data.
Generic functions can be applied to each event as it passes through a stream.

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class AggregatorExample implements FlowProvider {
@Override
public List<Flow> getFlows() {
Flow flow = new FlowBuilder()
.id("flow1")
.flowDefs()
.stream("stream1")
.select().fields("key3").end()
.aggregate().aggregator(CountAggregator.class)
.config("operatedField", "key3")
.evict(Policy.COUNT, 50000)
.trigger(Policy.TIME, 5)
.windowEvictMillis(3600000) //60min window
.clearOnTrigger().end()
.endStream()
.endDefs()
.createFlow();
return asList(new Flow[]{flow});
}
public static void main(String args[]) {
new ExampleRunner(new AggregatorExample()).run();
}
}

public class ExampleRunner {
FlowProvider provider;

public ExampleRunner(FlowProvider provider) {
this.provider = provider;
}

public void run() {
StormTopology topology = new FlowmixBuilder()
.setFlowLoader(new SimpleFlowLoaderSpout(provider.getFlows(), 60000))
.setEventsLoader(new MockOneEventGeneratorSpout(5000))
.setOutputBolt(new PrinterBolt())
.setParallelismHint(6)
.create()
.createTopology();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("example-topology", new Config(), topology);
}
}

等价于:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class ExampleRunner {
public static List<Flow> getFlows() {
Flow flow = new FlowBuilder()
.id("flow1")
.flowDefs() //调用者:FlowBuilder,返回值:FlowDefsBuilder
.stream("stream1") //调用者:FlowDefsBuilder,返回值:StreamBuilder
.select().fields("key3").end()
.aggregate().aggregator(CountAggregator.class)
.config("operatedField", "key3")
.evict(Policy.COUNT, 50000)
.trigger(Policy.TIME, 5)
.windowEvictMillis(3600000)
.clearOnTrigger().end()
.endStream() //调用者:StreamBuilder,返回值:FlowDefsBuilder
.endDefs() //调用者:FlowDefsBuilder,返回值:FlowBuilder
.createFlow(); //调用者:FlowBuilder,返回值:Flow
return asList(new Flow[]{flow});
}

public static void main(String args[]) {
StormTopology topology = new FlowmixBuilder()
.setFlowLoader(new SimpleFlowLoaderSpout(getFlows(), 60000))
.setEventsLoader(new MockOneEventGeneratorSpout(5000))
.setOutputBolt(new PrinterBolt())
.setParallelismHint(6)
.create()
.createTopology();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("example-topology", new Config(), topology);
}
}

通过FlowBuilder构造的Flow作为Spout构造函数的参数,允许Spout接收多个Flow。

Topology组件

Flowmix在构建Topology时,各个组件的关系:

  1. 一个Topology可以有多个Flow
  2. 一个Flow只能有一个FlowBuilder,Flow通过FlowBuilder生成
  3. 一个FlowBuilder只有一个FlowDefsBuilder
  4. 一个FlowDefsBuilder调用一次stream,就会生成一个StreamBuilder
  5. 因此一个FlowDefsBuilder可以有多个StreamBuilder
  6. 在每个StreamBuilder上,调用不同的算子方法(比如select,aggregate),都会在StreamBuilder上添加一个对应的FlowOp(比如select生成SelectOp,aggregate生成AggregateOp)
  7. 最终在StreamBuilder上调用endStream时,会将上一步所有算子产生的FlowOp列表,封装成StreamDef
  8. 因此一个StreamBuilder只有一个StreamDef
  9. 由于一个FlowDefsBuilder可以有多个StreamBuilder,而每个StreamBuilder都有一个StreamDef,所以一个FlowDefsBuilder可以有多个StreamDef
  10. 一个Flow对应一个FlowDefsBuilder,所以FlowDefsBuilder的StreamDef列表会被设置到Flow中,作为Flow的成员变量

步骤4中:在一个FlowDefsBuilder上可以调用多次stream方法,从而构建多个Stream,示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Flow flow = new FlowBuilder()
.id("flow1")
.flowDefs() //FlowDefsBuilder
.stream("stream1") //StreamBuilder 1
.select().fields("key1").end() //FlowOp 1
.aggregate().....end() //FlowOp 2
.endStream() //StreamDef 1
.stream("stream2") //StreamBuilder 2
.select().fields("key1").end() //FlowOp 1
.aggregate().....end() //FlowOp 2
.endStream() //StreamDef 2
.endDefs()
.createFlow();
return asList(new Flow[]{flow});

下图上面左图都定义了一个Flow,不同的是左边只有一个Stream,右边定义了两个Stream。

flowmix1

Flow实际上需要的是FlowOp(集合,每种算子都是一个FlowOp),通过Builder构建。

Builder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
public class FlowBuilder {
Flow flow = new Flow();
FlowDefsBuilder flowOpsBuilder=new FlowDefsBuilder(this);

public FlowDefsBuilder flowDefs() {
return flowOpsBuilder;
}
public Flow createFlow() {
flow.setStreams(flowOpsBuilder.getStreamList());
return flow;
}
}

public class FlowDefsBuilder {
private FlowBuilder flowBuilder;

public FlowDefsBuilder(FlowBuilder flowBuilder) {
this.flowBuilder = flowBuilder;
}
public FlowBuilder endDefs() {
return flowBuilder;
}
public StreamBuilder stream(String name) {
return new StreamBuilder(this, name, true);
}

private List<StreamDef> streamList = new ArrayList();

public List<StreamDef> getStreamList() {
return streamList;
}
protected void addStream(StreamDef stream) {
streamList.add(stream);
}
}

public class StreamBuilder {
private List<FlowOp> flowOpList = new ArrayList<FlowOp>();
private FlowDefsBuilder flowOpsBuilder;

public StreamBuilder(FlowDefsBuilder flowOpsBuilder) {
this.flowOpsBuilder = flowOpsBuilder;
}
public SelectBuilder select(){
return new SelectBuilder(this);
}
public AggregateBuilder aggregate(){
return new AggregateBuilder(this);
}
public FlowDefsBuilder endStream(String... outputs) {
StreamDef def=new StreamDef(name,flowOpList,stdInput,stdOutput,outputs);
flowOpsBuilder.addStream(def);
return flowOpsBuilder;
}
protected void addFlowOp(FlowOp flowOp) {
flowOpList.add(flowOp);
}
}

public class SelectBuilder {
private StreamBuilder streamBuilder;
public SelectBuilder(StreamBuilder streamBuilder) {
this.streamBuilder = streamBuilder;
}
public StreamBuilder end() {
streamBuilder.addFlowOp(new SelectOp(...));
return streamBuilder;
}
}
public class AggregateBuilder extends AbstractOpBuilder {
private StreamBuilder streamBuilder;
public AggregateBuilder(StreamBuilder streamBuilder) {
this.streamBuilder = streamBuilder;
}
public StreamBuilder end() {
streamBuilder.addFlowOp(new AggregateOp(...));
return streamBuilder;
}
}

下图是各个Builder之间的关系图,绿色部分是Builder需要保存的数据

flowmix2

下图从最后的FlowOp到StreamDef,最后传递给Flow

flowmix3

下图是各个Builder的调用顺序

flowmix4

算子Builder

各种不同算子的Builder和算子的关系是:以链式方式调用Builder的各种set/with方法,同时每个set方法都有对应的成员变量,
最后在结束build的时候,把每个set方法产生的变量,创建出算子:

伪代码(scala)如下:

1
2
3
4
5
6
val flowOp: MyFlowOp = MyBuilder.withMethod1("value1")
.withMethod2("value2")
.withMethod3("value3")
.build();

case class MyFlowOP(param1, param2, param3)

以AggregateBuilder为例(其他类型的Builder类似):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class AggregateBuilder extends AbstractOpBuilder {
private Class<? extends Aggregator> aggregatorClass;
private Policy triggerPolicy; //触发策略.比如每隔10分钟,或每隔100条数据,统计一次
private long triggerThreshold = -1;
private Policy evictionPolicy; //失效策略.当事件进来10分钟后,或者事件总数到达1000条后,移除
private long evictionThreshold = -1;
private boolean clearOnTrigger = false;
private long windowEvictMillis = 3600000; // 60 minutes by default
private Map<String,String> config = new HashMap<String,String>();

public AggregateBuilder aggregator(Class<? extends Aggregator> aggregatorClass) {
this.aggregatorClass = aggregatorClass;
return this;
}
public AggregateBuilder trigger(Policy policy, long threshold) {
this.triggerPolicy = policy;
this.triggerThreshold = threshold;
return this;
}
//其他方法类似,都set方式,设置成员变量

public StreamBuilder end() { //实际上是build方法,一定要到最后才能执行
AggregateOp flowOp = new AggregateOp(aggregatorClass,
triggerPolicy, triggerThreshold, evictionPolicy,
evictionThreshold, config, clearOnTrigger, windowEvictMillis));
getStreamBuilder().addFlowOp(flowOp);
return getStreamBuilder();
}
}

FlowOpBuilder用来构建FlowOp,FlowOp算子可以理解为Storm中Spout/Bolt组件的元数据,
比如SelectOp选择算子需要知道要select哪些字段,
FilterOp过滤算子需要知道过滤方式,AggregateOp聚合算子需要知道聚合的方式(聚合算法,比如Count/Sum等)。

flowmix5

在Storm中,Topology由Spout/Bolt组件组成一个DAG,Spout/Bolt会从数据源/上游接收Tuple,然后处理Tuple,最后向下游Bolt发送Tuple。
Tuple即数据是流动的,而Spout/Bolt组件是不动的,它们只负责接收数据、处理数据、发送数据。下图中DAG的边是Tuple数据,顶点则是Spout/Bolt计算组件。

storm-topo1

而这里的算子可以认为是Spout/Bolt组件的元数据,但它们本身不是Spout/Bolt,因为它们并没有处理数据。
比如AggregeOp算子只是一个Bean Object,定义了聚合算子的元数据!

正如FlowOp算子是Spout/Bolt组件的元数据,Flow也可以看做是Topology的元数据。
元数据不负责接收数据,不负责计算数据,不负责产生数据。

管理者 组件
Topology Spout、Bolt
Flow FlowOp

目前为止,我们看到在StreamBuilder上依次调用select(),aggregate(),filter()等产生的算子组成的flowOpList是一个ArrayList,数组链表是有序的!
这也确保了Flow的各个算子是有序的。比如调用.select().filter().aggregate()或者.filter().aggregate().select()都是不一样的!

StreamBuilder invoke flow List
select().aggregate().filter() ArrayList(SelectOp,AggregateOp,FilterOp)
select().filter().aggregate() ArrayList(SelectOp,FilterOp,AggregateOp)
filter().aggregate().select() ArrayList(FilterOp,AggregateOp,SelectOp)

在StreamBuilder中,创建StreamDef时,有几个比较重要的变量:stdInput, stdOutput, outputs。
这里主要是用来在不同Stream之间进行数据的交互。我们上面看到的示例定义的Stream都是单独的一个Stream,或者即使定义了多个Stream,也没有关联关系。
但是如果想要关联两个Stream,比如进行Join操作,就需要在这里做文章。这部分比较复杂,后面再分析。

FlowmixBuilder

下面是客户端应用程序创建StormTopology的示例:

1
2
3
4
5
6
7
StormTopology topology = new FlowmixBuilder()
.setFlowLoader(new SimpleFlowLoaderSpout(provider.getFlows(), 60000)) //Flow Loader
.setEventsLoader(new MockOneEventGeneratorSpout(5000)) //Event Loader
.setOutputBolt(new PrinterBolt()) //Final Output
.setParallelismHint(6)
.create()
.createTopology();

Storm的Topology只由Spout/Bolt组成,那么上面Flow定义的算子怎么和Topology的组件结合上呢?
其实我们的目的是希望算子能和对应的Bolt对应上来,比如SelectOp对应SelectBolt,FilterOp对应FilterBolt,
由于客户端应用程序并没有builder.setBolt这样的代码,而只是定义了FlowOp组成的Flow,
那么Flow中的FlowOp必须要能够转换为Bolt才可以形成Storm所需的Topology
并且FlowOp组成的Flow是一个DAG(有序的),所以FlowOp对应的Bolt也是一个DAG(同样的顺序)。

在一般的Topology中,Spout负责读取输入源数据,Spout后的第一个Bolt读取Spout发送的数据,
然后发送给后续的Bolt,后续的Bolt会读取上一个Bolt发射的数据,经过计算后发送新数据给下一个Bolt。

这里除了负责读取输入源/事件数据的EventSpout外,还有另外两个Spout:

  1. Event事件流:EventSpout,负责读取事件数据
  2. Flow算子流:FlowLoaderSpout,负责加载应用程序配置的Flow
  3. Tick时钟流:TickSpout,实现窗口计算的关键组件

这三个Spout对应发射的stream-id,发射字段和发射数据如下表:

Component emit stream-id emit fields emit values
MockOneEventGeneratorSpout default event Event
SimpleFlowLoaderSpout FLOW_LOADER_STREAM flows List
TickSpout tick null null

紧接着几个Spout后的第一个Bolt是FlowInitializerBolt,它的职责除了接收事件数据外,还要接收客户端定义的List
并且FlowInitBolt接收上游EventSpout和FlowLoaderSpout的分组策略是不同的:

  1. EventSpout分组方式:shuffleGrouping
  2. FlowLoaderSpout分组方式:allGrouping
1
2
3
4
5
6
7
8
9
10
public TopologyBuilder create() {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(EVENT, (IRichSpout) eventsComponent, eventLoaderParallelism);
builder.setSpout(FLOW_LOADER_STREAM, (IRichSpout) flowLoaderSpout, 1);
builder.setSpout("tick", new TickSpout(1000), 1);

builder.setBolt(INITIALIZER, new FlowInitializerBolt(), parallelismHint) // kicks off a flow determining where to start
.localOrShuffleGrouping(EVENT) //事件流可以通过shuffle负载到Bolt的Tasks上
.allGrouping(FLOW_LOADER_STREAM, FLOW_LOADER_STREAM); //Flow流通过All Grouping策略发射给Bolt的每个Task
}

下面假设FloInitBolt设置的并行度为3,即存在3个Task,其中FlowLoaderSpout发射出来的数据会发送到每个Task,
而EventSpout发射的事件则是shuffle方式(类似于RoundRobin,假设有三个事件,则每个Task只会收到一个事件)。

flowmix-init bolt

Flow流是静态的,必须每个Task都要有这份数据,而且每个Task收到的这份数据(即flows)都是一致的。
事件流是动态的,EventSpout发送出去的事件,经过shuffle落到各个FlowInitBolt的Task上是不同的。

Event Spout

EventSpout在发射时没有指定stream-id。所以上面设置FlowInitBolt的localOrShuffleGrouping只有一个参数EVENT,而没有stream-id第二个参数。
EventSpout虽然没有指定stream-id,但是它下游接收的Bolt只可能是FlowInitBolt,不可能是其他Bolt(比如我们自定义的SelectBolt等)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class MockOneEventGeneratorSpout extends EventsLoaderBaseSpout {
private int sleepBetweenEvents = 1000;
private SpoutOutputCollector collector;

public MockOneEventGeneratorSpout(int sleepBetweenEvents) {
this.sleepBetweenEvents = sleepBetweenEvents;
}
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.collector = spoutOutputCollector;
}
public void nextTuple() {
Event event = MockEvent.mockOneEvent();
collector.emit(new Values(singleton(event)));
Thread.sleep(sleepBetweenEvents);
}
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("event"));
}
}

Spout没有定义输出的stream-id,下游接收该Spout的Bolt(比如这里是FlowInitBolt)在grouping策略中也不需要定义stream-id,默认没有指定时为default。

FlowLoader Spout

FlowLoaderSpout构造函数接收List参数,Flow集合是在应用程序中创建的(比如ExampleRunner的getFlows方法)。
通常Storm的Spout负责读取一些外部数据源,比如KafkaSpout会读取Kafka的消息流,或者RandomWordSpout会随机产生一些单词。
Spout的职责是将从Kafka读取的消息流,或者随机产生的单词发送出去。至于要发送到下游哪个Bolt,则有下游Bolt自己在Group中指定。

并不是在Spout中指定要发送到下游具体的Bolt,而是由下游自己指定要读取哪个上游的Spout/Bolt。
有点像消息系统中push和pull的区别,push是主动将消息推给消费者,push是消费者主动拉取消息。
这种方式的好处是:下游消费者可以动态增加,动态订阅任意的消息输入源。如果用push方式,Broker必须要
指定推送消息给哪些消费者,如果要增加新的消费者,Broker代码需要变更才能重新使用。
同样的Spout并不指定下游组件,而是由下游组件自己定义要读取哪些数据源。如果由Spout推送消息给下游组件,
但是下游组件失败了怎么办,或者某个Bolt后来不需要Spout的消息,或者有新的Bolt需要让Spout推送给它,都不容易实现。
flowmix6

举个现实的例子:老家的房子都是有屋檐的,下雨的时候,屋顶下形成的一排排的水柱看起来非常壮观,
当然水柱的数量和屋顶上瓦片形成的缝隙一样(假设有10个),两个瓦片之间形成一道水柱(自行脑补下雨的场景)。
但是雨水形成的每个水柱的水量是不同的,我们会在水量比较大的底下放上水盆盛水,水量小的就不放水盆了。
这里假设Spout的输入源是天上下的雨,Spout可以并行处理多个任务,屋顶每两排瓦片之间形成的水流都是一个Task,
所以每个Task读取的输入源数据都是不一样的(同一滴雨只会流入一个水流,不会同时出现在两个水流里)。
正常来说每个Task输出的stream-id可以都是默认一样的(default),不过这里到下游组件时,我们假设Spout的水流形成
的水柱的stream-id是不一样的,也就是瓦片1-瓦片2之间的水柱的stream-id=”stream-id-1”,
瓦片2和瓦片3之间水柱的stream-id=”stream-id2”,其他水柱都是类似的,最后Spout一共发射了10个不同的stream-id。
如果在某个水柱下面没有放水盆,那么这条水柱的水不会被收集,也就是会被丢弃掉。对应Spout,如果发射的某个stream,
但是下游的Bolt组件中没有一个人订阅了这个stream,那么那条stream上的Tuple消息全部被丢弃。

FlowLoaderSpout发送flows数据给下游组件时,指定了stream-id=FLOW_LOADER_STREAM。
那么下游组件只有订阅了当前这个SimpleFlowLoaderSpout(Grouping的第一个参数),
而且第二个参数stream-id=FLOW_LOADER_STREAM时,才会接收到SimpleFlowLoaderSpout发射的flows数据。
比如上面的FlowInitBolt就满足了这个特点:allGrouping(FLOW_LOADER_STREAM, FLOW_LOADER_STREAM);

这里虽然两个参数都是FLOW_LOADER_STREAM,但是含义是不同的:第一个参数是component-id,第二个是stream-id。
这句话的意思是当前Bolt(即FlowInitBolt),会读取Topology中component-id=FLOW_LOADER_STREAM的组件
(比如这里SimpleFlowLoaderSpout的component-id就是FLOW_LOADER_STREAM:
builder.setSpout(FLOW_LOADER_STREAM, (IRichSpout) flowLoaderSpout, 1)),
而且只会读取这个组件发射出stream-id=FLOW_LOADER_STREAM。
恰好下面SimpleFlowLoaderSpout发射的stream-id就是FLOW_LOADER_STREAM。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class SimpleFlowLoaderSpout extends BaseRichSpout {
Collection<Flow> flows; //Flow的Spout可以有多个,比如两个Flow进行join
long pauseBetweenLoads = 1000; //1秒
SpoutOutputCollector collector;

//构造函数由Application调用
public SimpleFlowLoaderSpout(Collection<Flow> flows, long pauseBetweenLoads) {
this.flows = flows;
this.pauseBetweenLoads = pauseBetweenLoads;
}
//open方法的初始化由Storm框架调用
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.collector = spoutOutputCollector;
}
//数据源每当收到一条数据,就往Topology中发射
public void nextTuple() {
collector.emit(FLOW_LOADER_STREAM, new Values(flows)); //flow是传给Spout的流程定义对象.Flow中定义了FlowOp操作符.
Thread.sleep(pauseBetweenLoads);
}
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
//outputFieldsDeclarer.declare(new Fields("flows"));
//指定stream-id,好让topology中当前spout的下一个bolt得到这个stream-id
outputFieldsDeclarer.declareStream(FLOW_LOADER_STREAM, new Fields("flows"));
}
}

spout/bolt可以使用emit(streamId, tuple)把元组分发到多个流,参数streamId是一个用来标识流的字符串。
然后,你可以在TopologyBuilder决定由哪个流订阅它emit的3个参数:
发送到的streamid, anchors(来源tuples), tuple(values list)

Tick Spout

FlowInitBolt似乎只关心EventSpout和FlowLoaderSpout,并不会读取TickSpout的数据(虽然TickSpout没有发射任何消息)。
TickSpout和EventLoaderSpout类似,都定义了stream-id。既然FlowInitBolt不会读取TickSpout,那么谁会扛起这个重任呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class TickSpout extends BaseRichSpout{
private long sleepMillis; //tick时钟的时间间隔
private SpoutOutputCollector collector;

public TickSpout(long sleepMillis) {
this.sleepMillis = sleepMillis;
}
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declareStream("tick", new Fields());
}
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.collector = spoutOutputCollector;
}
public void nextTuple() {
collector.emit("tick", new Values());
Thread.sleep(sleepMillis); //这里用来模拟定时发送tick
}
}

FlowInitializerBolt

前面我们知道FlowInitBolt会接收EventSpout和FlowLoaderSpout发射的数据,前者是动态的事件流,后者是静态的Flow流。

1
2
3
builder.setBolt(INITIALIZER, new FlowInitializerBolt(), parallelismHint)
.localOrShuffleGrouping(EVENT) //FlowInitBolt订阅了EVENT这个component-id,即EventSpout
.allGrouping(FLOW_LOADER_STREAM, FLOW_LOADER_STREAM); //FlowInitBolt订阅了FLOW_LOADER_STREAM,即FlowLoaderSpout

FlowInitBolt如果收到FlowLoaderSpout,只需要存储flows,实际上FlowInitBolt只是初始化工作,并没有能力处理Flow!
如果收到的是EventSpout发射的事件,必须确保flows是存在的,如果没有存在flows,而先收到了Event,这些Event是会被丢弃的。
因为Event需要通过Flow处理才能真正起作用,如果没有Flow,即使来了事件也是白搭。好比我都没准备好要处理,有事件来了也没办法处理。

在有flows的前提下处理EventSpout发射的Event比较关键(Tuple的sourceStream不等于tick,因为FlowInit并不会接收TickSpout):
下面有三个For循环,第一个For循环是事件,因为EventSpout可能会一次发射多条事件(这里实际上只有一条事件包装成single元素的Collection),
第二层for循环是Flow,前面的应用程序中我们知道getFlows返回的是List,不过大部分例子都只有一个Flow。
最后一层for循环是StreamDef,针对每个Flow,如果应用程序定义了多个stream,则每个Stream(第一个算子)都会接收到Tuple中的每一条事件!

虽然会循环每个Stream,而Stream是由FlowOp组成的,那么FlowOp应该能够和Bolt对应上来,这样Event通过FlowOp就能发送到指定的Bolt。
StreamDef.getFlowOps()返回的是StreamBuilder中的List,即应用程序中每个stream后调用的各个算子组成的数组链表。
以前面的.select().aggregate().filter()为例,数组链表第一个元素是SelectOp,对应的stream-id=select,
如果stream的调用链是.filter().select.aggregate(),则第一个元素FilterOp对应的stream-id=filter,其他都是类似的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Flow flow = new FlowBuilder()
.id("flow1")
.flowDefs() //FlowDefsBuilder
.stream("stream1") //StreamBuilder 1
.select().fields("key1").end() //FlowOp 1
.aggregate().....end() //FlowOp 2
.filter()....end //FlowOp 3
.endStream() //StreamDef 1
.stream("stream2") //StreamBuilder 2
.filter().....end() //FlowOp 1
.select().....end //FlowOp 2
.aggregate().....end() //FlowOp 3
.endStream() //StreamDef 2
.endDefs()
.createFlow();
return asList(new Flow[]{flow});

上面一个Flow定义了两个Stream,每个Stream的第一个FlowOp(第一个Stream的SelectOp,第二个Stream的FilterOp)都会收到EventSpout发射的事件。
等等,这么说有点问题!前面我们说过不管是Spout或者Bolt都只是负责发送Tuple而已,并不会指定Tuple会发送给下游哪些Bolt!
所以对于每个Stream的第一个FlowOp对应的Bolt要想订阅FlowInitBolt发射的消息,
它们的分组策略第一个参数必须是FlowInitializerBolt对应的component-id(这里是INITIALIZER),
关键是下游Bolt分组策略的第二个参数stream-id需要和这里FlowInitBolt发射出去的stream-id一致

注:下面代码中的注释是第一次读代码时写的,现在重新理解,有些地方还不是很严谨,这里保留的目的是看下当初自己是怎么读的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class FlowInitializerBolt extends BaseRichBolt {
Map<String,Flow> flows; //flow-id -> Flow
OutputCollector collector;

public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.collector = outputCollector;
flows = new HashMap<String, Flow>();
}
public void execute(Tuple tuple) {
//Spout的emit方法的stream-id=FLOW_LOADER_STREAM,tuple=flows
if(FLOW_LOADER_STREAM.equals(tuple.getSourceStreamId())) {
//因为只有一个字段:new Values(flows),所以bolt通过getValue(0)可以获取到flows
for(Flow flow : (Collection<Flow>)tuple.getValue(0))
flows.put(flow.getId(), flow);
} else if(!"tick".equals(tuple.getSourceStreamId())){
//因为有flows和events两个Spout发送tuple(实际上还有一个tick spout).下面是接收到事件tuple的处理
if(flows.size() > 0) {
//一个flowLoaderSpout会有多个flows
for(Flow flow : flows.values()) {
Collection<Event> events = (Collection<Event>) tuple.getValue(0);
for(Event event : events) {
//一个Flow会有多个Stream.初始Bolt是发射给一个Flow中所有Stream的第一个FlowOp.注意这些Stream是平等的(非标准输入的Stream除外).
//即初始Bolt发射的Tuple会往所有Stream发射. 而不是说Stream之间是链接的,只发给第一个Stream,第一个Stream完成后将发给第二个Stream.
//这实际上是AllGrouping的策略.即相同的数据发给所有其他的Bolt.
for(StreamDef stream : flow.getStreams()) {
//取Flow的stream的第一个component(FlowOp).因为这是初始化Bolt.
String streamid = stream.getFlowOps().get(0).getComponentName();
String streamName = stream.getName();

//JoinExample中stream3:.stream("stream3", false).因为它不是从源数据过来的,而是从其他Stream发射数据给它使用.所以不是标准的输入.
//标准的输入即InitBolt发射的Tuple不会发射给非标准输入的Stream.因为非标准输入的Stream不是来自于InitBolt,而是来自其他Stream.
if(stream.isStdInput())
//tuple还是events,不会发生变化. 第二个参数tuple是anchor锚点.在最后进行ack.
//FLOW_OP_IDX=-1,接收到的Bolt会根据这些字段构造FlowInfo,并且idx+1.所以下一个Bolt的FlowInfo实际IDX=0
collector.emit(streamid, tuple, new Values(flow.getId(), event, -1, streamName, streamName));
}
}
}
}
collector.ack(tuple);
}
}

public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
FlowmixBuilder.declareOutputStreams(outputFieldsDeclarer, fields);
}
}

Storm中GeneralTopologyContext的componentToStreamToFields表示: component包含哪些streams, 每个stream包含哪些fields
上游组件通过declareStream定义了stream-id -> Fields,下游组件通过分组策略定义了component-id -> stream-id

由于每个Stream的List的第一个FlowOp不是固定的,所以需要在declareOutputFields定义所有可能用到的算子。
即使有些算子用不到,也都要全部定义,毕竟你不知道你的应用程序具体的第一个FlowOp是什么,那么全部定义也不会怀孕的。

假设Bolt的declareOutputFields定义了10个输出stream,而实际emit时可能只会选择其中的一个stream-id,也不会有问题。

在Bolt中发射的Fileds和Values的字段数量必须一致,下面的Values中如果是在一个Flow的一个Stream中,第一个参数和最后两个参数都不会变化。
当然同一个Flow不同的Stream,不同的是最后两个参数,以前面定义的两个Stream为例:

1
2
3
4
collector.emit(streamid, tuple, new Values(flow.getId(), event, -1, streamName, streamName));

collector.emit("select", tuple, new Values("flow1", event, -1, "stream1", "stream1")); //第一个Stream第一个FlowOp是SelectOp
collector.emit("filter", tuple, new Values("flow1", event, -1, "stream2", "stream2")); //第二个Stream第一个FlowOp是FilterOp

下表是目前为止所有FlowOp实现类对应的component-name,注意这些名称实际上会作为对应聚合Bolt的component-name。

FlowOp component-name 对应的Bolt
SelectOp select SelectBolt
FilterOp filter FilterBolt
AggregateOp aggregate AggregateBolt
EachOp each EachBolt
JoinOp join JoinBolt
PartitionOp partition PartitionBolt
SortOp sort SortBolt
SplitOp split SplitBolt
Switch stopGate SwitchBolt

虽然Topology由Spout和Bolt组成,而且这里有三个Spout(实际上EventSpout其实没有定义stream-id),
不过下面的declareOutputStreams并不需要定义Spout的steam-id,因为stream-id的作用是指向下游Bolt,
而Spout不可能会去接收Bolt的数据!Spout只可能发送数据给下游Bolt,不要强人所难!人家只发不接的!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static Fields fields = new Fields(FLOW_ID, EVENT, FLOW_OP_IDX, STREAM_NAME, LAST_STREAM);
public static Fields partitionFields = new Fields(FLOW_ID, EVENT, FLOW_OP_IDX, STREAM_NAME, PARTITION, LAST_STREAM);

public static void declareOutputStreams(OutputFieldsDeclarer declarer, Fields fields) {
//由于设置Grouping策略时使用了component-id,stream-id. 所以声明输出字段时,也要加上stream-id
declarer.declareStream(PARTITION, fields);
declarer.declareStream(FILTER, fields);
declarer.declareStream(SELECT, fields);
declarer.declareStream(AGGREGATE, fields);
declarer.declareStream(SWITCH, fields);
declarer.declareStream(SORT, fields);
declarer.declareStream(JOIN, fields);
declarer.declareStream(SPLIT, fields);
declarer.declareStream(EACH, fields);
declarer.declareStream(OUTPUT, fields);
}

前面的分析我们知道,虽然FlowInitBolt发射时指定了stream-id是Stream中第一个FlowOp的名称(上面几种算子中的某一个)。
但是下游Bolt要想获得FlowInitBolt发射的消息,必须在分组策略时做文章:
确保订阅了FlowInitBolt的component-id,以及FlowInitBolt发射的stream-id。

Dynamic FlowOp’s Bolt Definition

stream-id和在Topology中定义Bolt时指定分组策略的第二个参数息息相关。
下面我们就回到FlowmixBuilder这个类中接着FlowInitBolt后面定义算子类型的Bolt部分:

1
2
3
4
5
6
7
8
9
10
11
//CEP Bolt Definition
declarebolt(builder, FILTER, new FilterBolt(), parallelismHint, true);
declarebolt(builder, SELECT, new SelectorBolt(), parallelismHint, true);
declarebolt(builder, PARTITION, new PartitionBolt(), parallelismHint, true);
declarebolt(builder, SWITCH, new SwitchBolt(), parallelismHint, true);
declarebolt(builder, AGGREGATE, new AggregatorBolt(), parallelismHint, true);
declarebolt(builder, JOIN, new JoinBolt(), parallelismHint, true);
declarebolt(builder, EACH, new EachBolt(), parallelismHint, true);
declarebolt(builder, SORT, new SortBolt(), parallelismHint, true);
declarebolt(builder, SPLIT, new SplitBolt(), parallelismHint, true);
declarebolt(builder, OUTPUT, outputBolt, parallelismHint, false);

第一次看到这样的代码,我的内心其实是很纠结的,在对stream-id做了实验之后,参考:http://zqhxuyuan.github.io/2016/06/30/Hello-Storm/
我终于明白了一个道理:定义这么多Group其实没有关系,实际上真正用到的并不多!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private static void declarebolt(TopologyBuilder builder, String boltName, IRichBolt bolt, int parallelism, boolean control) {
BoltDeclarer declarer = builder.setBolt(boltName, bolt, parallelism)
//XXXGrouping的两个参数: component-id, stream-id
.allGrouping(FLOW_LOADER_STREAM, FLOW_LOADER_STREAM)
.allGrouping("tick", "tick")
.localOrShuffleGrouping(INITIALIZER, boltName)

.localOrShuffleGrouping(FILTER, boltName)
.fieldsGrouping(PARTITION, boltName, new Fields(FLOW_ID, PARTITION)) // guaranteed partitions will always group the same flow for flows that have joins with default partitions.
.localOrShuffleGrouping(AGGREGATE, boltName)
.localOrShuffleGrouping(SELECT, boltName)
.localOrShuffleGrouping(EACH, boltName)
.localOrShuffleGrouping(SORT, boltName)
.localOrShuffleGrouping(SWITCH, boltName)
.localOrShuffleGrouping(SPLIT, boltName)
.localOrShuffleGrouping(JOIN, boltName);
}

以declarebolt(“SELECT”)和declarebolt(“FILTER”)为例(其他Bolt同理):
看起来SelectBolt或者FilterBolt订阅的数据源包括了所有的算子,以及FLOW_LOADER_STREAM,还有tick,当然少不了INITIALIZER!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
BoltDeclarer declarer = builder.setBolt("SELECT", new SelectBolt(), parallelism)
.allGrouping(FLOW_LOADER_STREAM, FLOW_LOADER_STREAM)
.allGrouping("tick", "tick")
.localOrShuffleGrouping(INITIALIZER, "SELECT") //⬅
.localOrShuffleGrouping(FILTER, "SELECT")
.fieldsGrouping(PARTITION, "SELECT", new Fields(FLOW_ID, PARTITION))
.localOrShuffleGrouping(AGGREGATE, "SELECT")
.localOrShuffleGrouping(SELECT, "SELECT")
.localOrShuffleGrouping(EACH, "SELECT")
.localOrShuffleGrouping(SORT, "SELECT")
.localOrShuffleGrouping(SWITCH, "SELECT")
.localOrShuffleGrouping(SPLIT, "SELECT")
.localOrShuffleGrouping(JOIN, "SELECT");

BoltDeclarer declarer = builder.setBolt("FILTER", new FilterBolt(), parallelism)
.allGrouping(FLOW_LOADER_STREAM, FLOW_LOADER_STREAM)
.allGrouping("tick", "tick")
.localOrShuffleGrouping(INITIALIZER, "FILTER") //⬅
.localOrShuffleGrouping(FILTER, "FILTER")
.fieldsGrouping(PARTITION, "FILTER", new Fields(FLOW_ID, PARTITION))
.localOrShuffleGrouping(AGGREGATE, "FILTER")
.localOrShuffleGrouping(SELECT, "FILTER")
.localOrShuffleGrouping(EACH, "FILTER")
.localOrShuffleGrouping(SORT, "FILTER")
.localOrShuffleGrouping(SWITCH, "FILTER")
.localOrShuffleGrouping(SPLIT, "FILTER")
.localOrShuffleGrouping(JOIN, "FILTER");

为什么没有看到EVENT对应的EventSpout呢?注意只有FlowInitBolt才会订阅EventSpout,
FlowInitBolt是我们这里构建的Topology的第一个Bolt组件(前面还有三个Spout),
其他Bolt都是算子类型的Bolt,是会对事件进行计算的,这些算子Bolt不需要接收原始事件数据!

算子类型的Bolt虽然没有Event,但是比FlowInitBolt多了tick:FlowInitBolt并没有订阅TickSpout!

Bolt类型 订阅的Spout
FlowInitBolt FlowLoaderSpout,EventSpout
算子Bolt FlowLoaderSpout,TickSpout

FlowInitBolt下一个Bolt

因为客户端应用程序自定义Stream的第一个FlowOp是不固定的,你无法强制用户第一个算子就是什么,
所以紧接着FlowInitBolt的下一个Bolt为了订阅FlowInitBolt发射出来的不固定的消息流,
第二个参数必须能够和FlowInitBolt在emit时的第一个参数是一样的。

举例用户在一个Flow中定义了两个Stream(或者两个Flow定义不同的Stream等等,都是可以的)

1
2
3
4
5
6
7
new FlowBuilder()...
stream("stream1")
.select().end.().aggregate()....end()
.endStream()
.stream("stream2")
.filter().end().aggregate().....end()
.endStream()

FlowInitBolt作为入口,会将每一条事件发射到每个Stream的第一个FlowOp对应的Bolt
(FlowOp仅仅是算子,只有Bolt才可以接收数据并计算),最终发射的消息流和数据:

1
2
collector.emit("SELECT", tuple, new Values("flow1", event, -1, "stream1", "stream1")); //第一个Stream第一个FlowOp是SelectOp
collector.emit("FILTER", tuple, new Values("flow1", event, -1, "stream2", "stream2")); //第二个Stream第一个FlowOp是FilterOp

虽然现在我们还不知道FlowOp和FlowBolt的对应关系是怎么建立的,比如SelectOp对应SelectBolt,FilterOp对应FilterBolt。
但是我们至少相信SelectOp是会服务于SelectBolt的,从名称上来看SelectOp肯定不会对应到FilterBolt或者其他Bolt上的。
那么FlowInitBolt发射的stream-id=”SELECT”时,它其实是想要发送给SelectBolt的,同样stream-id=”FILTER”时,它也是想要发送给FilterBolt的。
(虽然并不是由发送者/上游组件来决定下游组件,而应该由下游组件主动订阅上游组件,但是上游组件其实心里也是有分寸的,它也不是漫无目的的)。

SelectBolt为了订阅FlowInitBolt的stream-id=”SELECT”的消息流,FilterBolt为了订阅FlowInitBolt的”FILTER”消息流,需要这么做:

1
2
3
4
5
BoltDeclarer declarer = builder.setBolt("SELECT", new SelectBolt(), parallelism)
.localOrShuffleGrouping(INITIALIZER, "SELECT"); //⬅

BoltDeclarer declarer = builder.setBolt("FILTER", new FilterBolt(), parallelism)
.localOrShuffleGrouping(INITIALIZER, "FILTER"); //⬅

其他组件也是类似,都要定义以INITIALIZER为component-id,以自己component-id为stream-id作为分组策略的第二个参数。
上游组件可以发射多个消息流,下游组件应该有目的地去选择其中(自己感兴趣的)一个消息流!

另外一种理解方式,我们举例只定义了一个Stream,SelectBolt和FilterBolt都各自订阅了INITIALIZER中不同的消息流。
注意:除了订阅源INITIALIZER外,还必须指定消息流!但是FlowInitBolt实际上只发射了一个消息流,所以下面
只有SelectBolt才会从FlowInitBolt的SELECT消息流中读取到消息,而FilterBolt因为订阅的是FlowInitBolt的FILTER
消息流,但是FlowInitBolt并没有产生FILTER这个消息流(或者说即使存在FILTER消息流,但是这个消息流没有任何数据),
所以这里的FilterBolt是不会读取到FlowInitBolt发射的任何消息。

1
2
3
4
5
6
7
8
//FlowInitBolt只发射了一个消息流,stream-id=SELECT
collector.emit("SELECT", tuple, new Values("flow1", event, -1, "stream1", "stream1"));

BoltDeclarer declarer = builder.setBolt("SELECT", new SelectBolt(), parallelism)
.localOrShuffleGrouping(INITIALIZER, "SELECT"); //⬅

BoltDeclarer declarer = builder.setBolt("FILTER", new FilterBolt(), parallelism)
.localOrShuffleGrouping(INITIALIZER, "FILTER"); //×

那么为什么还要这么定义FilterBolt,前面说了,你怎么知道应用程序第一个FlowOp是什么?
(FlowInitBolt发射到的stream-id是第一个FlowOp对应的名称)。有可能如果Stream的
第一个FlowOp是”Filter”的话,那么下面示例中FilterBolt能收到数据,而SelectBolt就收不到了。

1
2
3
4
5
6
7
8
//FlowInitBolt只发射了一个消息流,stream-id=FILTER
collector.emit("FILTER", tuple, new Values("flow1", event, -1, "stream1", "stream1"));

BoltDeclarer declarer = builder.setBolt("SELECT", new SelectBolt(), parallelism)
.localOrShuffleGrouping(INITIALIZER, "SELECT"); //×

BoltDeclarer declarer = builder.setBolt("FILTER", new FilterBolt(), parallelism)
.localOrShuffleGrouping(INITIALIZER, "FILTER"); //⬅

所以为了构建动态的消息流,最好的办法是事先把所有可能的消息流都定义好,
然后在上游组件中通过指定stream-id的方式来保证把消息流发送给下游指定的Bolt组件。比如事先会给所有算子
对应的Bolt(SelectBolt、FilterBolt、AggregateBolt、EachBolt等)都订阅了INITIALIZER消息源,
第二个参数是当前Bolt对应的名称(SELECT、FILTER、AGGREGATE、EACH)。
然后在上游组件中,如果只是想发送给SelectBolt,在emit时,第一个参数stream-id=”SELECT”。
如果不想发送给SelectBolt了而是想发送给FilterBolt,在emit时设置第一个参数为”FILTER”。

这种做法看起来好像是由上游组件来决定要发送消息给哪个下游组件[方案1],跟前面我们探讨的让下游组件主动订阅上游组件[约束条件]
貌似有点矛盾啊!实际上这种方案并没有违反约束条件,下游组件确实也是有主动订阅上游组件的,只不过每个算子对应的Bolt都订阅了
同一个上游组件(比如这里的FlowInitBolt),但是实际上对于一个Stream而言,FlowInitBolt只需要把数据发送给Stream的第一个算子
对应的Bolt,其他Bolt虽然都订阅了FlowInitBolt,但是它们实际上不需要/不应该收到FlowInitBolt发射的消息的。

所以最好的解决办法是:如果能够在上游组件中只发射一个消息流,那么下游组件只可能有一个Bolt有机会得到这个消息流。
这里说的是一个Stream的情况,如果说上游组件要发射到多个Stream,而且每个Stream的第一个FlowOp是不一样的,
那么上游组件仍然可能产生多个Stream,当然同一个Stream也只会有第一个Bolt会接收到对应的消息流。

以前面的两个Stream为例,SelectBolt会收到第一个Stream的消息,FilterBolt会收到第二个Stream的消息。
这里并不会说SelectBolt也会收到第二个Stream的消息,因为第二个Stream发射的是FILTER消息流,
而SelecdBolt并没有订阅FlowInitBolt的FILTER消息流(实际上也不应该让SelectdBolt去订阅其他类型的消息流)。

collector.emit(“SELECT”, tuple, new Values(“flow1”, event, -1, “stream1”, “stream1”)); //第一个Stream第一个FlowOp是SelectOp
collector.emit(“FILTER”, tuple, new Values(“flow1”, event, -1, “stream2”, “stream2”)); //第二个Stream第一个FlowOp是FilterOp

flowmix7
我感觉自己好啰(能)嗦(说)啊!

算子Bolt之间的上下游依赖关系

回到下游组件定义订阅数据源的地方,除了FlowLoaderSpout,TickSpout,和第一个FlowInitBolt外,还定义了其他算子类型的输入源。

就像SelectBolt虽然订阅了INITIALIZER->SELECT消息流,但并不一定会获取到FlowInitBolt发射的消息流,
因为如果FlowInitBolt没有发射SELECT消息流的话,SelectBolt就不会真正读取到消息。
但是如果不订阅的话,则肯定是有问题的:FlowInitBolt发射的消息流,必须确保会有下游组件接收,否则消息就会丢失。

所以每个算子类型的Bolt都要订阅其他所有算子的输入源(以SelectBolt为例):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
BoltDeclarer declarer = builder
.setBolt("SELECT", new SelectBolt(), parallelism)
.allGrouping(FLOW_LOADER_STREAM, FLOW_LOADER_STREAM)
.allGrouping("tick", "tick")
.localOrShuffleGrouping(INITIALIZER, "SELECT")
.localOrShuffleGrouping(FILTER, "SELECT")
.fieldsGrouping(PARTITION,"SELECT",new Fields(FLOW_ID,PARTITION))
.localOrShuffleGrouping(AGGREGATE, "SELECT")
.localOrShuffleGrouping(SELECT, "SELECT")
.localOrShuffleGrouping(EACH, "SELECT")
.localOrShuffleGrouping(SORT, "SELECT")
.localOrShuffleGrouping(SWITCH, "SELECT")
.localOrShuffleGrouping(SPLIT, "SELECT")
.localOrShuffleGrouping(JOIN, "SELECT");

假设应用程序产生Flow的过程是:filter().select().aggregate().select().filter().each(),
Flow的Stream中各个FlowOp的顺序是:List(FilterOp, SelectOp, AggregateOp, SelectOp, FilterOp, EachOp)。
注意这里允许多次调用同一种类型的FlowOp,而且数组链表也支持相同类型的元素存在。

第一个FlowOp=FilterOp,对应的FilterBolt会订阅FlowInitBolt的INITIALIZER->”FILTER”消息流,
其他Bolt虽然也订阅了INITIALIZER输入源,但是由于FlowInitBolt只发射了FILTER,所以它们订阅的INITIALIZER不会有数据进来。

下面详细分析了Stream中各个FlowOp对应的FlowBolt的分组策略:

filter().select().aggregate().select().filter().each()
FilterOp, SelectOp, AggregateOp, SelectOp, FilterOp, EachOp
FilterBolt, SelectBolt, AggregateBolt, SelectBolt, FilterBolt, EachBolt

group1

filter().select().aggregate().select().filter().each()
FilterOp, SelectOp, AggregateOp, SelectOp, FilterOp, EachOp
FilterBolt, SelectBolt, AggregateBolt, SelectBolt, FilterBolt, EachBolt

group2

filter().select().aggregate().select().filter().each()
FilterOp, SelectOp, AggregateOp, SelectOp, FilterOp, EachOp
FilterBolt, SelectBolt, AggregateBolt, SelectBolt, FilterBolt, EachBolt

group3

filter().select().aggregate().select().filter().each()
FilterOp, SelectOp, AggregateOp, SelectOp, FilterOp, EachOp
FilterBolt, SelectBolt, AggregateBolt, SelectBolt, FilterBolt, EachBolt

group4

filter().select().aggregate().select().filter().each()
FilterOp, SelectOp, AggregateOp, SelectOp, FilterOp, EachOp
FilterBolt, SelectBolt, AggregateBolt, SelectBolt, FilterBolt, EachBolt

group5

filter().select().aggregate().select().filter().each()
FilterOp, SelectOp, AggregateOp, SelectOp, FilterOp, EachOp
FilterBolt, SelectBolt, AggregateBolt, SelectBolt, FilterBolt, EachBolt

group6

现在FlowmixBuilder类基本上分析完毕了,好家伙,这里面水真的不浅啊

算子Bolt

再来思考下FlowInitBolt需要EventSpout而不需要TickSpout,而算子Bolt需要TickSpout不需要EventSpout的原因。

Bolt类型 订阅的Spout 订阅的Bolt
FlowInitBolt FlowLoaderSpout,EventSpout null
算子Bolt FlowLoaderSpout,TickSpout 上游Bolt

EventSpout产生的事件只需要流入FlowInitBolt即可,后续FlowInitBolt下发给Stream的第一个Bolt,
并且按照Stream中FlowOp的顺序依次将每个Bolt中计算好的数据下发给下游的Bolt。
所以Stream中的Bolt并不直接解除原始EventSpout发射的事件。

如果EventSpout要发射事件给Stream中的Bolt,那么它就要跟FlowInitBolt那样,选择一个确定的stream-id,
并且也只发送事件给Stream中第一个Bolt:FlowInitBolt就是这么干的。
抽取出初始化Bolt的主要目的就是统一把事件发射到这里,
然后再由FlowInitBolt决定要把收到的事件转发给下游Stream的第一个Bolt。

所以说FlowInitBolt是EventSpout和Stream第一个Bolt的中间介质。如果不使用FlowInitBolt,
而让EventSpout直接发射事件给Stream的第一个Bolt,可以想想会有什么问题的。
答案:EventSpout并不是把事件发射给Stream所有的Bolt,那么它就要知道Stream中第一个Bolt的stream-id是什么,
但是EventSpout本身是数据源,不会接收其他消息,就没办法知道Stream到底长什么样。
使用FlowInitBolt,它不仅可以接收FlowLoaderSpout发射的flows:这里包含Stream的List信息,
同时还会收到EventSpout发射的数据,FlowInitBolt会首先计算出Stream的第一个FlowOp的stream-id,
然后把从EventSpout收到的事件原封不动地发射到这个stream-id上。
关键是EventSpout和Stream第一个FlowOp中间必须有flows数据,才能让EventSpout决定目标stream-id。
当然你可以像构造FlowLoaderSpout那样,把客户端应用程序创建的flows也传递给EventSpout,
这样就可以把FlowInitBolt去掉了,不过通常我们不希望EventSpout和flows绑定的太紧。

算子Bolt和FlowInitBolt(这两种类型的Bolt是FlowmixBuilder中Topology的所有Bolt,再没有其他的Bolt了)都
需要接收FlowLoaderSpout发射的flows,而且算子Bolt还会接收TickSpout发射的tick tock tick tock空/心跳消息。

SelectBolt

同FlowInitBolt一样,SelectBolt也会接收FlowLoaderSpout发射的FLOW_LOADER_STREAM消息流(其他Bolt也都会),
必须确保先接收FlowLoaderSpout的消息,处理方式是使用一个Map来保存flows,key是Flow的ID,value是Flow对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class SelectorBolt extends BaseRichBolt {
Map<String,Flow> flows;
OutputCollector collector;

public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.collector = outputCollector;
flows = new HashMap<String, Flow>();
}
public void execute(Tuple tuple) {
if(FLOW_LOADER_STREAM.equals(tuple.getSourceStreamId())) {
for(Flow flow : (Collection<Flow>)tuple.getValue(0))
flows.put(flow.getId(), flow);
} else if(!"tick".equals(tuple.getSourceStreamId())) {
FlowInfo flowInfo = new FlowInfo(tuple);
Flow flow = flows.get(flowInfo.getFlowId());
if (flow != null) {
//....
}
}
}
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
FlowmixBuilder.declareOutputStreams(outputFieldsDeclarer, fields);
}
}

SelectBolt以及其他算子类型的Bolt和FlowInitBolt的declareOutputFields方法都是一样的,定义了所有的输出stream。

上面我们说过算子Bolt是有接收TickSpout的,但是SelectBolt的execute方法中看起来并不想处理TickSpout消息流。
其实算子Bolt除了接收FlowLoaderSpout和TickSpout外,还有一种情况是接收上游Bolt经过处理后的消息流。
假设SelectBolt是紧接着FlowInitBolt的下游Bolt,那么它也会接收FlowInitBolt发射的事件。
(FlowInitBolt的事件其实来自于EventSpout,它并没有对事件做处理,而是起到转发的作用)。

同样在SelectBolt之后假设是FilterBolt,那么FilterBolt接收的数据源有三个:
FlowLoaderSpout发射的flows,TickSpout发射的心跳,SelectBolt处理过的消息流。

为什么SelectBolt不处理TickSpout呢?实际上Tick的目的是定时器,而Select算子并不需要定时!

FlowInitBolt原封不动地发射event给Stream的第一个FlowOp,它的Values是经过设计的,最主要的是第二个和第三个参数:
第二个参数表示要发射的数据,第三个参数表示当前Bolt在Stream的FlowOp数组链表对应的索引位置,这个索引位置从-1开始,
每次经过一个Bolt的处理,index值就加1。FlowOp对应的Bolt都处理完了,index也到了数组链表的最后一个位置。

1
2
String streamid = stream.getFlowOps().get(0).getComponentName();
collector.emit(streamid, tuple, new Values(flow.getId(), event, -1, streamName, streamName));

假设FlowInitBolt的下一个Bolt是SelectBolt(其他Bolt也是类似的),tuple就是上面FlowInitBolt发射的Values整个数据。

1
2
3
FlowInfo flowInfo = new FlowInfo(tuple);
Flow flow = flows.get(flowInfo.getFlowId());
SelectOp selectOp = getFlowOpFromStream(flow, flowInfo.getStreamName(), flowInfo.getIdx());

Bolt接收的每个Tuple都会被封装为FlowInfo:除了事件数据外,还有其他和Flow相关的元数据:flowId,streamName这些就不用说了。
previousStream主要用于多个Stream之间的合并操作比如join等,如果多个Stream没有关联关系,通常previousStream=streamName。
partition这个字段用于聚合,比如前面Bolt的分组策略fieldsGrouping就会使用PARTITION。
idx这个变量决定了当前Bolt接收到Tuple后,如何从Steram的List中获取出对应位置的FlowOp。

1
2
3
4
5
6
7
8
9
public FlowInfo(Tuple tuple) {
flowId = tuple.getStringByField(FLOW_ID);
event = (Event) tuple.getValueByField(EVENT);
idx = tuple.getIntegerByField(FLOW_OP_IDX);
idx++;
streamName = tuple.getStringByField(STREAM_NAME);
previousStream = tuple.getStringByField(LAST_STREAM);
if(tuple.contains(PARTITION)) partition = tuple.getStringByField(PARTITION);
}

以SelectBolt为例,它收到的FlowInitBolt发射的index=-1,所以它对-1+1=0,得到的index=0。
getFlowOpFromStream方法根据传入的idx参数值=0,从Stream的List得到的FlowOp=SelectOp!
因为我们前面假设了SelectBolt就是紧接着FlowInitBolt后面的Bolt,所以SelectBolt得到的一定是SelectOp!

会不会说假设ArrayList=[FilterOp,SelectOp,EachOp,FilterOp…]而导致SelectBolt选择出了FilterOp?
不会出现这种情况的!因为FlowInitBolt和其他所有Bolt使用的ArrayList都是一致的数据结构。
如果Stream的ArrayList真如上面的,那么FlowInitBolt发送的stream-id=Stream的第一个FlowOp=”FILTER”
这样只会有FilterBolt能够收到这个消息流,也就是说FlowInitBolt的下一个Bolt是FilterBolt,
那么FilterBolt得到的index=0后,从List数组链表取得的FlowOp还是FilterOp!

ArrayList FlowInitBolt.emit(stream-id) next bolt index getFlowOpFromStream
SelectOp,FilterOp,EachOp SELECT SelectBolt 0 SelectOp
FilterOp,SelectOp,EachOp FILTER FilterBolt 0 FilterOp
EachOp,SelectOp,FilterOp EACH EachBolt 0 EachOp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
//获取Flow的Stream的第idx个FlowOp
public static <T extends FlowOp> T getFlowOpFromStream(Flow flow, String stream, int idx) {
return (T) flow.getStream(stream).getFlowOps().get(idx);
}

//Flow的Stream的FlowOps集合的下一个FlowOp. 因为组成Stream的flowOps是有序的,即根据定义顺序依次加入
//假设FlowOpsSize=5,idx从0开始.0,1,2,3.第四个(idx=3)的nextStream为最后一个(idx=4)Op的name.
//当处理的是最后一个时,idx+1=5 !< 5,所以最后一个FlowOp的nextStream=OUTPUT.
public static String getNextStreamFromFlowInfo(Flow flow, String streamName, int idx) {
return idx + 1 < flow.getStreamFlowOpsSize(streamName) ?
//如果当前FlowOp在Stream中还有下一个FlowOp,则nextStream为下一个FlowOp的name.
//如果当前FlowOp在Stream中是最后一个FlowOp,则nextStream=OUTPUT
flow.getFlowOp(streamName,idx+1).getComponentName() : OUTPUT;
}

就像FlowInitBolt发射事件时,要指定stream-id,从而将事件转交给下一个Bolt处理。假设当前Bolt是SelectBolt,
它也要将自己处理过的事件发射给下游Bolt,也要指定下游Bolt的stream-id,同样这个信息要从List获取。

FlowInitBolt发射的stream-id是List的第一个FlowOp的名称,SelectBolt则要找第二个FlowOp的名称。
因为SelectBolt是List中的第一个FlowOp!当然由于实际应用中SelectBolt不一定是第一个FlowOp,所以
我们应该使用动态的方式。因为接收上游发射的Tuple的index时,加上1为当前的FlowOp的索引位置,即FlowInfo的idx,
那么在当前FlowInfo.idx再加上1,就是当前FlowOp的下一个FlowOp–>作为当前Bolt发射时的stream-id。Perfect!!!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
FlowInfo flowInfo = new FlowInfo(tuple);
Flow flow = flows.get(flowInfo.getFlowId());
if (flow != null) {
SelectOp selectOp = getFlowOpFromStream(flow, flowInfo.getStreamName(), flowInfo.getIdx());
String nextStream = Utils.getNextStreamFromFlowInfo(flow, flowInfo.getStreamName(), flowInfo.getIdx());

//映射操作会对事件产生影响,只有包含对应字段的事件,才会进入下一轮
Event newEvent = new BaseEvent(flowInfo.getEvent().getId(), flowInfo.getEvent().getTimestamp());
//Event中的tuples类似于一个Map.
for(org.calrissian.mango.domain.Tuple eventTuple : flowInfo.getEvent().getTuples()) {
if(selectOp.getFields().contains(eventTuple.getKey()))
newEvent.put(eventTuple);
}

//If no selected tuples existed, event will not be emitted
if(hasNextOutput(flow, flowInfo.getStreamName(), nextStream)) {
if (newEvent.getTuples().size() > 0)
collector.emit(nextStream, tuple, new Values(flowInfo.getFlowId(), newEvent, flowInfo.getIdx(), flowInfo.getStreamName(), flowInfo.getPreviousStream()));
}

// send directly to any non std output streams
if(exportsToOtherStreams(flow, flowInfo.getStreamName(), nextStream)) {
for (String output : flow.getStream(flowInfo.getStreamName()).getOutputs()) {
String outputStream = flow.getStream(output).getFlowOps().get(0).getComponentName();
collector.emit(outputStream, tuple, new Values(flowInfo.getFlowId(), flowInfo.getEvent(), -1, output, flowInfo.getStreamName()));
}
}
}

exportsToOtherStreams这部分逻辑是多个流的处理,暂时放在一边

假设ArrayList=[SelectOp,FilterOp,EachOp,SelectOp,FilterOp],那么各个Bolt的emit(stream-id)如下:

Bolt FlowInfo(idx) FlowOp idx+1 nextStream id new Values(event, flowInfo.idx)
FlowInitBolt SELECT new Values(event,-1)
SelectBolt -1+1=0 SelectOp 0+1=1 FILTER new Values(event1,0)
FilterBolt 0+1=1 FilterOp 1+1=2 EACH new Values(event2,1)
EachBolt 1+1=2 EachOp 2+1=3 SELECT new Values(event3,2)
SelectBolt 2+1=3 SelectOp 3+1=4 FILTER new Values(event4,3)
FilterBolt 3+1=4 OUTPUT new Values(event5,4)

这里主要流程如下:

  1. 收到上游组件的Tuple,构造FlowInfo,其中idx为上游组件的idx+1
  2. getFlowOpFromStream获取indx对应的FlowOp,通常Bolt和FlowOp的算子类型是对应的
  3. nextStream=getNextStreamFromFlowInfo
  4. 根据当前Bolt特有的计算逻辑,对收到的Event数据进行计算,并生成最新的事件数据Event’
  5. emit(nextStream, new Values(Event’, FlowInfo.idx))

按照这样的处理流程,基本的处理框架如下:

1
2
3
4
5
6
7
8
FlowInfo flowInfo = new FlowInfo(tuple);
Flow flow = flows.get(flowInfo.getFlowId());
if(flow != null) {
FlowOp myFlowOp = getFlowOpFromStream(flow, flowInfo.getStreamName(), flowInfo.getIdx());
String nextStream = Utils.getNextStreamFromFlowInfo(flow, flowInfo.getStreamName(), flowInfo.getIdx());
Event newEvent = processEvent(flowInfo.getEvent());
collector.emit(nextStream, tuple, new Values(flowInfo.getFlowId(), newEvent, flowInfo.getIdx(), flowInfo.getStreamName(), flowInfo.getPreviousStream()));
}

flowmix13

flowmix14

flowmix15

flowmix16

flowmix17

那么看接下来几种类型的Bolt就非常简单了(不同的是processEvent这部分逻辑):

  1. SelectBolt:只从事件中获取出指定的Fields字段,假设Event有[key1,key2,key3]三个字段,
    而SelectOp只配置了key1,那么这条事件最后只有key1这一个字段
  2. FilterBolt:假设Filter的accept方法是:equals(“key1”),那么最后也只有key1一个字段
  3. EachBolt类似于scala的map函数

问题:如果一个Stream中有相同的FlowOp,会有什么问题吗?比如.select().aggregate().select().aggregate()。
flowmix-same-flowop

FilterBolt

1
2
3
4
5
6
7
8
9
10
11
12
FilterOp filterOp = getFlowOpFromStream(flow, flowInfo.getStreamName(), flowInfo.getIdx());

//将事件运用到Flow的FilterOp上,验证事件是否符合过滤器.
if(filterOp.getFilter().accept(flowInfo.getEvent())) {
//下一个FlowOp的stream-id
String nextStream = getNextStreamFromFlowInfo(flow, flowInfo.getStreamName(), flowInfo.getIdx());

//还有下一个Component,则继续将自己处理完的tuple发射给下一个Component(Bolt)
if(hasNextOutput(flow, flowInfo.getStreamName(), nextStream))
//和InitBolt发射的Values一样,是构造FlowInfo的所有字段.
collector.emit(nextStream, tuple, new Values(flow.getId(), flowInfo.getEvent(), flowInfo.getIdx(), flowInfo.getStreamName(), flowInfo.getPreviousStream()));
}

EachBolt

1
2
3
4
5
6
7
8
9
10
11
12
EachOp functionOp =  getFlowOpFromStream(flow, flowInfo.getStreamName(), flowInfo.getIdx());
String nextStream = getNextStreamFromFlowInfo(flow, flowInfo.getStreamName(), flowInfo.getIdx());

//Function可以看做是map操作,将事件转换为新的事件
List<Event> events = functionOp.getFunction().execute(flowInfo.getEvent());

if(hasNextOutput(flow, flowInfo.getStreamName(), nextStream)) {
if (events != null) {
for (Event newEvent : events)
collector.emit(nextStream, tuple, new Values(flowInfo.getFlowId(), newEvent, flowInfo.getIdx(), flowInfo.getStreamName(), flowInfo.getPreviousStream()));
}
}

PartitionBolt

1
2
3
4
5
6
7
8
PartitionOp partitionOp = getFlowOpFromStream(flow, flowInfo.getStreamName(), flowInfo.getIdx());
String nextStream = Utils.getNextStreamFromFlowInfo(flow, flowInfo.getStreamName(), flowInfo.getIdx());

//Partition和Select一样,定义了一些字段,表示group by的字段
String hash = buildKeyIndexForEvent(flowInfo.getFlowId(), flowInfo.getEvent(), partitionOp.getFields());

if(hasNextOutput(flow, flowInfo.getStreamName(), nextStream))
collector.emit(nextStream, tuple, new Values(flowInfo.getFlowId(), flowInfo.getEvent(), flowInfo.getIdx(), flowInfo.getStreamName(), hash, flowInfo.getPreviousStream()));

PartitionBolt的declareOutputFields partitionFields比普通的fields多了一个PARTITION字段:

1
2
3
4
5
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
declareOutputStreams(outputFieldsDeclarer, partitionFields);
}

public static Fields partitionFields = new Fields(FLOW_ID, EVENT, FLOW_OP_IDX, STREAM_NAME, PARTITION, LAST_STREAM);

FlowmixBuilder中订阅了PARTITION数据源的分组策略定义(以SelectBolt为例,其他Bolt同理):

1
2
bolt.setBolt("SELECT", new SelectBolt())
.fieldsGrouping(PARTITION, "SELECT", new Fields(FLOW_ID,PARTITION));

上面的示例中,Topology拓扑图是:PartitionBolt跟着SelectBolt,因为SelectBolt订阅了PartitionBolt数据源,
比如客户端应用程序的调用顺序是:stream(“stream1”).partition().select()…
而且PartitionBolt发射的stream-id=”SELECT”,PartitionBolt不会发射其他stream-id(否则下一个Bolt就不是SelectBolt)!

PartitionOp和SelectOp类似,可以指定多个fields字段,SelectOp表示的是选择这几个字段作为事件的输出字段,
PartitionOp表示根据这几个字段进行分区/分组。

fields… SQL
stream(“stream1”).select().fields(“key1”, “key2”) select key1, key2 from stream1
stream(“stream1”).partition().fields(“key1”, “key2”) select * from stream1 group by key1,key2

buildKeyIndexForEvent从一条事件中根据groupBy字段构建keyIndex,最后会作为发射时的hash即Fields的PARTITION字段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public static String buildKeyIndexForEvent(Event event, List<String> groupBy) {
StringBuffer stringBuffer = new StringBuffer();
if (groupBy == null || groupBy.size() == 0)
return stringBuffer.toString(); // default partition when no groupBy fields are specified.

//groupBy可以定义多个字段.
for (String groupField : groupBy) {
//一条事件Event可以有多个Tuples.
Collection<Tuple> tuples = event.getAll(groupField); //一个字段也允许多个Tuple!
SortedSet<String> values = new TreeSet<String>();

if (tuples == null) {
values.add("");
} else {
for (Tuple tuple : tuples)
values.add(registry.encode(tuple.getValue()));
}
stringBuffer.append(groupField + join(values, "") + "|");
}
return stringBuffer.toString();
}

//flowId|groupField1.xxxxx|groupField2.yyyyy
public static String buildKeyIndexForEvent(String flowId, Event event, List<String> groupBy) {
return flowId + buildKeyIndexForEvent(event, groupBy);
}

Event

通常每一条事件至少在某个字段上要保证唯一性,单单用时间撮是无法保证唯一性的,因为有可能在同一个时刻会产生多条事件。
如果业务意义上需要隔离的两条事件,因为时间撮相同而放在同一个Event中显然是有问题的!比下面的两条事件:

事件1:[时间撮:1468220424,合作方:A,事件:登陆,账户:张三,IP:192.168.0.1,设备:AAAAAA]
事件2:[时间撮:1468220424,合作方:B,事件:付款,账户:张三,IP:192.168.0.1,设备:AAAAAA]

虽然这两条事件的账户、IP、设备信息都是一样的,但是因为合作方不同,事件也不同,不能看做是同一条事件!
通常业务系统会自己设计一个全局唯一的序号(不管用哪种方式实现都要保证不同事件有不同的序号),比如sequence_id:

事件1:[sequence_id: 1468220424001,时间撮:1468220424,合作方:A,事件:登陆,账户:张三,IP:192.168.0.1,设备:AAAAAA]
事件2:[sequence_id: 1468220424002,时间撮:1468220424,合作方:B,事件:付款,账户:张三,IP:192.168.0.1,设备:AAAAAA]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static Collection<Event> getMockEvents() {
Collection<Event> eventCollection = new ArrayList<Event>();
//创建一条事件,只有UUID和时间撮
Event event = new BaseEvent(UUID.randomUUID().toString(), System.currentTimeMillis());

//往事件中填充信息. Key-Value键值对, event看起来像Map. 或者如果用JSON表示:
//{id:f171edb8-a393-4eaf-bbc0-8fece8f1a22b, timestamp:1441637681000, {key1:val1, key2:val2, key3:val3}}
event.put(new Tuple("key1", "val1"));
event.put(new Tuple("key2", "val2"));
event.put(new Tuple("key3", "val3"));
event.put(new Tuple("key4", "val4"));
event.put(new Tuple("key5", "val5"));

//在一条事件中相同key有不同的value.而我们实际要的效果可能是在不同的event中相同key有不同value.用上面的方式模拟.
//注意key1的值没有变化.而key3的值变化. 表示key1这个ip=val1,在不同的事件中,它的账户(key3)分别是val3和val-3
/*
event.put(new Tuple("key1", "val1"));
event.put(new Tuple("key3", "val-3"));
*/
eventCollection.add(event);
return eventCollection;
}

下面举例了多种模拟事件:

//事件序号=1,登陆事件,用户:张三,IP:192.168.0.1,设备:AAAAAAAA
event1.put(new Tuple("sequence_id", "1"));
event1.put(new Tuple("type", "login"));
event1.put(new Tuple("ip", "192.168.0.1"));
event1.put(new Tuple("account", "张三"));
event1.put(new Tuple("device", "AAAAAAAA"));

//事件序号=2,登陆事件,用户:李四,IP:192.168.0.1,设备:AAAAAAAA
event2.put(new Tuple("sequence_id", "2"));
event2.put(new Tuple("type", "login"));
event2.put(new Tuple("ip", "192.168.0.1"));
event2.put(new Tuple("account", "李四"));
event2.put(new Tuple("device", "AAAAAAAA"));

//事件序号=3,登陆事件,用户:张三,IP:192.168.0.1,设备:AAAAAAAA
event3.put(new Tuple("sequence_id", "3"));
event3.put(new Tuple("type", "login"));
event3.put(new Tuple("ip", "192.168.0.1"));
event3.put(new Tuple("account", "张三"));
event3.put(new Tuple("device", "AAAAAAAA"));    

//事件序号=4,登陆事件,用户:李四,IP:192.168.0.1,设备:BBBBBBBB
event4.put(new Tuple("sequence_id", "4"));
event4.put(new Tuple("type", "login"));
event4.put(new Tuple("ip", "192.168.0.1"));
event4.put(new Tuple("account", "李四"));
event4.put(new Tuple("device", "BBBBBBBB")); 

//事件序号=5,登陆事件,用户:李四,IP:192.168.0.2,设备:BBBBBBBB
event5.put(new Tuple("sequence_id", "5"));
event5.put(new Tuple("type", "login"));
event5.put(new Tuple("ip", "192.168.0.2"));
event5.put(new Tuple("account", "李四"));
event5.put(new Tuple("device", "BBBBBBBB")); 

//事件序号=6,登陆事件,用户:张三/李四,IP:192.168.0.2,设备:BBBBBBBB(同一台设备在一个IP地址上同时登陆两个账户)
event6.put(new Tuple("sequence_id", "6"));
event6.put(new Tuple("type", "login"));
event6.put(new Tuple("ip", "192.168.0.2"));
event6.put(new Tuple("account", "张三"));
event6.put(new Tuple("account", "李四"));    
event6.put(new Tuple("device", "BBBBBBBB")); 

注意到事件6的不同,如果按照业务逻辑可能并不符合实际情况,但是Event支持这种在同一个事件里允许存在相同key不同value的场景。

AggregatorBolt

聚合算子和前面的SelectBolt,FilterBolt,EachBolt只处理一个事件不同的是,聚合算子要聚合多个事件
同时还要处理TickSpout发射的心跳,普通的Bolt虽然也配置了TickSpout输入源,但是并没有处理心跳!

除此之外,还有一个不同点是双层Map变量:windows,第二层的Cache实际上也是一个Map。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class AggregatorBolt extends BaseRichBolt {
Map<String,Flow> flows;
OutputCollector collector;

//聚合比其他Bolt多了个聚合窗口. key=flowId.streamName.aggFlowOpsIndex. Cache.key=hash partition
Map<String, Cache<String, AggregatorWindow>> windows;
SimpleDateFormat sdf = new SimpleDateFormat("", Locale.SIMPLIFIED_CHINESE);

public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.collector = outputCollector;
flows = new HashMap<String, Flow>();
windows = new HashMap<String, Cache<String, AggregatorWindow>>();
sdf.applyPattern("yyyy-MM-dd HH:mm:ss");
}
public void execute(Tuple tuple) {
if(FLOW_LOADER_STREAM.equals(tuple.getSourceStreamId())) { //FlowLoaderSpout发射的flows
for(Flow flow : (Collection<Flow>)tuple.getValue(0))
flows.put(flow.getId(), flow);
} else if("tick".equals(tuple.getSourceStreamId())) { //TickSpout发射的心跳事件

} else if(!"tick".equals(tuple.getSourceStreamId())){ //上游Bolt发射的事件

}
}
}

问题:AggregatorBolt一定要有window吗?window表示窗口,如果我只想要单纯的聚合,不想要窗口的功能呢?

处理Event事件

处理Event事件时,Event一定来自于上游组件发射的tuple,根据tuple的idx,+1后,从Stream中获取的当前FlowOp=AggregateOp。

假设Stream=[SelectOp,FilterOp,AggregateOp],Topology拓扑图的构建过程如下:
flowmix8

由于AggregateBolt的主要目的是要聚合Event,所以它不像普通的SelectBolt/FilterBolt收到输入tuple后经过计算后直接发射出去。
它会使用windows对事件进行暂存,windows的key是:flowId|streamName|flowOpIndex。最后的flowOpIndex表示当前FlowOp的索引位置。

在一个确定的AggregateBolt中,idx是确定的,那么windows的key只可能有一个元素!
Stream可以有不止一个AggregateOp,比如[SelectOp,AggregateOp,FilterOp,AggregateOp]存在两个AggregateOp。
对应的Bolt分别是:[SelectBolt,AggregateBolt,FilterBolt,AggregateBolt]。
思考问题:假设当前AggregateBolt是第二个,为什么windows还需要flowInfo.getIdx来区分呢?

windows的数据结构如下(构造这个双层Map的数据都来自于FlowInfo,FlowInfo又都是来自于上游的Tuple):

flowmix-windows map

针对双层Map,往里面放入数据时,必须确保每一层的Map不为空,由于内层的value也是一个对象:AggregateWindow,所以要
保证windows.get(idx)得到的windowCache,以及windowCache.get(partiton)得到的AggregatorWindow都不能为空
时才能最终将事件(tuple->FlowInfo->getEvent)放入AggregatorWindow中(AggregatorWindow才是Event的最终归宿)。

  1. AggregatorWindow.add(Event)
  2. windowCache.put(Partition, AggregatorWindow)
  3. windows.put(flow-stream-idx, windowCache)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
//在tick spout每秒都会发射tick tuple时.其他Bolt结合事件会发射到这里.在没有达到触发条件时,Aggregate负责收集事件到窗口内.在满足触发条件时,统计窗口内收集到的所有事件.
FlowInfo flowInfo = new FlowInfo(tuple);
//如果select一个字段,则打印出的都是1,说明调用一次execute,只有一条事件进来.尽管一秒钟(tick中)会有很多条事件.
//如果select了多个字段,则一个event因为有多个Tuple,size!=1
//System.out.println("events:" + flowInfo.getEvent().getTuples().size());
//System.out.println("events:" + flowInfo.getEvent());
Flow flow = flows.get(flowInfo.getFlowId());
if(flow != null) {
AggregateOp op = getFlowOpFromStream(flow, flowInfo.getStreamName(), flowInfo.getIdx());
Cache<String, AggregatorWindow> windowCache = windows.get(flowInfo.getFlowId() + "\0" + flowInfo.getStreamName() + "\0" + flowInfo.getIdx());

AggregatorWindow window = null;
if(windowCache != null) {
window = windowCache.getIfPresent(flowInfo.getPartition());
// if we have a window already constructed, proces it
if(window != null) {
//If we need to evict any buffered items, let's do that here 这里只有失效策略移除事件,没有触发策略增加计数器,计数器是在tick中
if(op.getEvictionPolicy() == Policy.TIME)
window.timeEvict(op.getEvictionThreshold());
} else {
//may be exist window but expire after windowEvictMillis, or non exist window which is a new partition.
window = buildWindow(op, flowInfo.getStreamName(), flowInfo.getIdx(), flowInfo.getPartition(), flowInfo.getFlowId(), windowCache);
}
} else {
/**
* Cache不存在,先创建这样的Cache,让后往Cache中put Key Value.
* TODO: default windowEvict=60min, what about long time window such as 1day,1week,even 1month,3month.after window evict, does data lost too?
* after window expire, windowCache!=null, but window=null. see EventTupleTest.testCacheExpire. so buildWindow again.
* window是针对partition的. 当这个partition的window在1个小时之内都没有任何事件再进来, 这个窗口就会被关闭!
* 假设我们要统计过去2个小时的数据, 假设key3=val3有两条事件发生的时间是: 1min和65min. 在第一条事件发生的一个小时之后即61min时,window被关闭.
* 当第二条事件进来时,windowCache!=null & window=null. 创建了一个新的window. 然后往这个新的window添加了第二条事件.
* 注意:还要考虑tick操作,因为是TIME-Based统计,所以在61min之后,在windowCache中不会存在key2=val3这个Window了.
* 这样第二条事件只会放入新的Window中, 而旧的Window已经不复存在了. 所以旧的Window中的第一条事件信息也会丢失. 那么统计就不准确了.
* 一个解决办法是如果TIME=2hour,则设置windowEvictMillis=2hour! 保证窗口不会过期. 但是如果时间跨度很大,window在内存中就存活地更长了.
*/
windowCache = CacheBuilder.newBuilder().expireAfterWrite(op.getWindowEvictMillis(), TimeUnit.MILLISECONDS).build();
//先往windowCache中put partitionKey -> 新build的AggregatorWindow
window = buildWindow(op, flowInfo.getStreamName(), flowInfo.getIdx(), flowInfo.getPartition(), flowInfo.getFlowId(), windowCache);
}

//AggregatorWindow中put事件. 在窗口还没关闭时,一直往这个窗口内添加数据
window.add(flowInfo.getEvent(), flowInfo.getPreviousStream());
//window eviction is on writes, so we need to write to the window to reset our expiration.
//如果Aggregator前一个不是Partition,则partition=null.
//NOTICE: 通常设置partition字段相当于SQL中的group by.相同partition的事件会进入同一个Window被聚合函数处理.
windowCache.put(flowInfo.getPartition(), window);

//Perform count-based trigger if necessary 基于次数:当窗口内的事件数量达到阈值时,开始聚合操作
//emitAggregate动作有两处:基于时间在tick中, 基于次数在非tick中. 因为tick一秒钟可能有多条事件,所以用tick计数器每秒+1
//非tick的tuple进来时,execute方法一次只可能接收一条事件,所以基于次数的统计,可以在这里增加计数器
//因为一个AggregateOp只可能定义要么时间要么次数的策略,所以可以共用triggerTicks计数器,不会说既在tick中也在这里都往同一个计数器+1
if(op.getTriggerPolicy() == Policy.COUNT) {
window.incrTriggerTicks();
if(window.getTriggerTicks() == op.getTriggerThreshold()){
System.out.println("COUNT emitAggregate..");
emitAggregate(flow, op, flowInfo.getStreamName(), flowInfo.getIdx(), window);
}
}
}

假设客户端定义的Stream的FlowOp数组链表顺序是:[SelectOp,PartitionOp,AggregateOp,FilterOp,AggregateOp],
对于第二个AggregateBolt,前面存在一个PartitionBolt,假设它定义的字段为”key1”,上游组件发射了如下的Event Tuple,

Event Event Tuple
event1 {“key1”: “val10”, “key2”: “val20”, “key3”: “val30”}
event2 {“key1”: “val11”, “key2”: “val21”, “key3”: “val31”}
event3 {“key1”: “val12”, “key2”: “val22”, “key3”: “val32”}
event4 {“key1”: “val10”, “key2”: “val23”, “key3”: “val33”}
event5 {“key1”: “val11”, “key2”: “val24”, “key3”: “val34”}

注意Storm的Bolt处理的都是记录级别的,即一条事件进来都会执行一次Bolt的execute方法(不像batch方式一次处理多条事件)。
下面是AggregateBolt处理每一条(Event Tuple)记录的过程,假设Flow和Stream名称是”flow1”和”stream1”:

第一条事件,windowCache=windows.get(“flow1.stream1.2”)=null,2表示这是第二个FlowOp
new一个windowCache(内层Map),以及buildWindow创建内层Map中的AggregatorWindow=window
这个AggregatorWindow对应的hash=”val10”,因为Partition字段=”key1”,tuple.get(“key1”)=”val10”=hash
将第一条事件添加到创建刚刚创建的AggregatorWindow中,window1.add(event1)
往windowCache中添加事件的hash->AggregatorWindow,即”val10”->AggregatorWindow(event1,”val10”)

第二条事件:windowCache!=null,因为第一条事件已经创建了windowCache,所以不需要再创建了,
第二条事件的partition=tuple.get(“key1”)=”val11”,window=windowCache.get(“val11”)=null不存在,
所以buildWindow()创建第二个AggregatorWindow(event2,”val11”),
将第二条事件添加到创建刚刚创建的AggregatorWindow中,window2.add(event2)
往windowCache中添加事件的hash->AggregatorWindow,即”val11”->AggregatorWindow(event2,”val11”)

第三条事件和第二条的处理类似,最终:”val12”->AggregatorWindow(event2,”val12”)

第四条事件,windowCache不需要创建,partition=tuple.get(“key1”)=”val10”,
window=windowCache.get(“val10”)在第一条事件中创建过了,所以不需要再buildWindow了,直接
将第四条事件添加到已经创建好的AggregatorWindow中,window1.add(event2)

第五条事件,windowCache不需要创建,partition=tuple.get(“key1”)=”val11”,
window=windowCache.get(“val11”)在第二条事件中创建过了,所以不需要再buildWindow了,直接
将第五条事件添加到已经创建好的AggregatorWindow中,window2.add(event2)

最终AggregatorBolt处理了上面的五条事件后,windows的数据结构如下:

flowmix-windows example

上面除了windows数据结构的添加外,如果window不为空,而且evict策略为TIME的话,调用timeEvict,
并且会在最后添加事件到AggregatorWindow以及windowCache.put(partition,window)后,如果
触发策略为COUNT,会增加计数器(基于COUNT的聚合,处理一条记录计数器+1),如果计数器等于触发阈值,
会调用emitAggregate(window),将AggregatorWindow收集到的所有事件做最终的计算并发射出去。
具体的聚合计算方式跟Aggregator的实现类有关(COUNT,SUM等)。

AggregatorBolt在处理每条事件的时候,如果聚合的窗口还没有到(不管是基于次数还是基于时间),
就应该把每条收到的事件暂存在AggregatorWindow中,这样最后窗口达到阈值时,就可以取出这个窗口内的
所有事件,并运用具体的Aggregator算法,对所有时间做计算,得到结果后,发射计算结果给下游组件。

store event

这种如果直接把原始的Event数据简单粗暴地存储在窗口中,如果一个窗口的内事件很多,而且在窗口还没触发执行时,
windows因为要保存所有Partition的窗口,如果Partition数量也很多,windows占用的内存将会非常可观(不容乐观)。

many event

那么有什么解决办法呢?在添加事件到AggregatorWindow的时候,根据Aggregator算法,只存储对最终计算结果有用的数据!
比如CountAggregator,在窗口中保存一个计数器,每条事件进来时,只需要增加计数器的值即可,不需要保存原始的事件!

同样其他类似的Aggregator也可以采用类似的思路,比如SumAggregator/MaxAggregator等等,同样只需要一个变量,
当然和Count不需要关心现有状态(Count是无状态的),Sum/Max操作需要获取已有的值和当前事件的值进行比较(有状态)。

现在每条事件进入窗口时,由原先存储原始事件改为只存储中间结果,下面是第一条事件:

flowmix9

第二条事件以及接下来的事件:

flowmix10

flowmix11

flowmix12

目前的实现中,AggregatorWindow包含Aggregator和Window,Aggregator保存的是确实是每次事件的计算结果,
但是Window还是有保存原始事件的!既然add操作和evict操作都可以只更新Aggregator,为什么还需要Window的原始事件?
flowmix-aggregator window

AggregateOp

从Stream中获得的AggregateOp,会被用来构建AggregatorWindow。应用程序可以定义AggregateOp的各种属性,
比如触发策略(Policy.TIME),失效策略(Policy.COUNT),窗口大小(windowEvictMillis),比如下面的示例:

1
2
3
4
5
6
7
8
private Class<? extends Aggregator> aggregatorClass;
private Policy triggerPolicy; //触发策略.比如每隔10分钟,或每隔100条数据,统计一次
private Policy evictionPolicy; //失效策略.当事件进来10分钟后,或者事件总数到达1000条后,移除
private long triggerThreshold = -1;
private long evictionThreshold = -1;
private boolean clearOnTrigger = false;
private long windowEvictMillis = 3600000; // 60 minutes by default
private Map<String,String> config = new HashMap<String,String>();

Policy分为三种:COUNT, TIME, TIME_DELTA_LT

有必要再解释下AggregateOp各个属性代表的含义,这对AggregateBolt的处理至关重要:

  1. 触发策略/触发阈值:满足触发阈值时,执行emitAggregator方法
  2. 删除策略/删除阈值:窗口时间饱和/事件数量饱和时,删除最旧的事件,确保内存空间,不一定会执行触发策略
  3. 窗口大小:
  4. 自定义配置:定义Partition用于分区/分组的字段等
AggregateWindow

AggregatorWindow是Aggregator和Window的组合。groupedIndex作为分组的unique key。
比如前面示例中Partition的字段=”key1”,那么key1对应的不同值作为分组条件。

1
2
3
4
5
6
7
8
9
10
11
public class AggregatorWindow extends Window {
//聚合动作,计算策略
private Aggregator aggregator;

public AggregatorWindow(Aggregator aggregator, String groupedIndex, long size) {
//基于容量的双端队列
events = new AggregatorLimitingDeque(size, aggregator);
this.groupedIndex = groupedIndex;
this.aggregator = aggregator;
}
}

AggregatorWindow支持事件的添加和删除,注意这里只是针对一个事件。

  • 当往Window中添加事件,还要调用Aggregator的added方法添加一个WindowItem,
  • 从Window中删除事件时,也要调用Aggregator的evicted方法删除WindowItem。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public WindowItem add(Event event, String previousStream) {
WindowItem item = super.add(event, previousStream); //往窗口中添加一条事件
aggregator.added(item); //更新聚合里的变量
return item;
}
public WindowItem expire() {
WindowItem item = super.expire(); //从窗口中删除一条事件(可能是窗口过期了,可能是事件数量太多了)
aggregator.evicted(item); //更新聚合里的变量
return item;
}

public void clear() {
while(size() > 0) expire(); //清除所有WindowItem
}

Window

一个Window作为事件的存储队列,为了更方便地支持队列元素的增加和删除,采用双端队列:

  1. 添加事件时,加入到队列尾部(add等价于addLast)
  2. 删除事件时,从队列尾部移除(这样会有问题吗?)

AggregatorWindow继承了Window,而且AggregatorWindow最终是作为AggregatorBolt的windows缓存的最内层存储对象。
虽然我们保证了Aggregator添加事件/删除事件时更新了成员变量,但是Window本身还是要存储原始事件的!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 窗口, 一个窗口内有很多事件, 用双端队列events保存这个窗口内的所有事件
*/
public class Window {
protected String groupedIndex; // a unique key given to the groupBy field/value combinations in the window buffer
protected Deque<WindowItem> events; // using standard array list for proof of concept. Circular buffer needs to be used after concept is proven
protected int triggerTicks = 0; //计数器:在基于时间的触发策略中,在tick中增加计数器,表示增加了一秒; 在基于次数的触发策略中,在非tick中增加计数器,表示增加了一条事件

//A progressive window buffer which automatically evicts by count
//渐进的窗口缓冲区, 根据数量进行失效. 即窗口内的事件数量有个阈值, 当达到阈值时, 移除最早加入窗口内的事件.
public Window(String groupedIndex, long size) {
this.groupedIndex = groupedIndex;
events = new LimitingDeque<WindowItem>(size);
}
//大小无限制的队列
public Window(String groupedIndex) {
this.groupedIndex = groupedIndex;
events = new LinkedBlockingDeque<WindowItem>();
}

//添加一个事件到窗口队列中,并返回这个事件
public WindowItem add(Event event, String previousStream) {
WindowItem item = new WindowItem(event, currentTimeMillis(), previousStream);
events.add(item);
return item;
}
//Used for count-based expiration 基于个数的失效
public WindowItem expire() {
return events.removeLast();
}
}

WindowItem除了原始的事件记录外,当前时间撮(进入Bolt时的时间撮,不是事件的时间撮),以及前一个Stream的名称。

通常在StreamProcessing中,事件会存在两种时间撮:事件本身的时间撮,事件进入到Bolt里的时间撮。
其实由于事件进入窗口后,没有立即被处理,应该还存在一个事件被处理的时间撮(这里等同于进入Bolt的时间撮)。
flowmix-event time

1
2
3
4
5
public class WindowItem {
Event event;
long timestamp;
String previousStream;
}

time evict/count emit

在收到事件时,需要处理基于时间的失效策略(EvictionPolicy=TIME)和基于次数的触发策略(TriggerPolicy=COUNT)。

flowmix-policy

为什么没有基于时间的触发策略,或者基于次数的失效策略?先来看下基于次数的触发和失效策略:

  1. 触发策略跟收到的Tuple类型有关,这里是Event,Bolt处理一条Event算一次,跟时间无关,所以触发策略是COUNT
  2. 基于次数的失效策略,是内置在Window的add操作中,因为Window是有界队列,如果事件数量超过队列大小,会自动移除事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Cache<String, AggregatorWindow> windowCache = windows.get(flowInfo.getFlowId() + "\0" + flowInfo.getStreamName() + "\0" + flowInfo.getIdx());
AggregatorWindow window = null;
if(windowCache != null) {
window = windowCache.getIfPresent(flowInfo.getPartition());
if(window != null) {
if(op.getEvictionPolicy() == Policy.TIME)
window.timeEvict(op.getEvictionThreshold());
} else {
window = buildWindow(op, flowInfo.getStreamName(), flowInfo.getIdx(), flowInfo.getPartition(), flowInfo.getFlowId(), windowCache);
}
}else{
}
//AggregatorWindow包括了Aggregator和Window,添加事件时不仅添加到Window中,也会更新Aggregator的变量
window.add(flowInfo.getEvent(), flowInfo.getPreviousStream());
windowCache.put(flowInfo.getPartition(), window);
//构建完AggregatorWindow,将事件放入窗口内
//由于Bolt处理的记录级别,所以需要知道什么时候触发执行
//基于COUNT的窗口一般通过计数器,处理一条记录,计数器+1,当计数器值等于阈值时,触发执行
if(op.getTriggerPolicy() == Policy.COUNT) {
window.incrTriggerTicks(); //Bolt每次处理一条记录,计数器+1
if(window.getTriggerTicks() == op.getTriggerThreshold()){ //计数器达到客户端设置的阈值,kick off
emitAggregate(flow, op, flowInfo.getStreamName(), flowInfo.getIdx(), window);
}
}

基于Count的emit,如果计数器值等于阈值,就可以发射:

flowmix-count-emitAggregate

基于时间的失效策略参数thresholdInSeconds表示只保存这个时间段内的事件,thresholdInSeconds=60s表示只保存最近一分钟的。

AggregatorWindow.timeEvict()基于时间的evict操作,和基于Count的expire()删除策略类似,都要操作events双端队列
以及调用aggregator.evicted(item)更新保存在Aggregator的临时变量(这个变量最终会被用于计算聚合结果)。

由于事件添加到队列中按照时间顺序(排除事件时间撮out of order的场景),基于时间的删除做法是:找出队列第一个元素(peek),
如果第一条事件的时间撮还没有超过thresholdInSeconds,说明队列中其他所有事件都还没有超时,表示不需要删除任何事件。

如果第一条事件的时间撮超过thresholdInSeconds。比如阈值是一分钟,当前时间是09:15:30,而第一条事件的时间撮=09:14:00,
当前时间-第一条事件时间撮=09:15:30-09:14:00=01:30>1min,说明该事件已经超时,需要移除(即poll)。
基于时间的删除,不止检查一个事件(对比基于次数,一次只删除一个,因为进来一个事件,如果队列满了,就删除一个事件),
如果队列中还有超时的,都要全部移除出去。比如队列中每条事件的时间撮分别是[09:14:00,09:14:01,09:14:10,09:14:35,…]
当前时间=09:15:30,前三个事件都超时了,都会被移除,检查到第四条事件=09:14:35,间隔<1min,才不会继续检查,退出循环。

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Used for age-based expiration(time-based)
* 基于时间的失效策略: events.peek()选择队列中的第一个元素,因为基于时间的事件是按照时间顺序加入到队列中.
* 如果第一个事件过期了,则从队列中弹出该事件,并调用aggregator.evicted使该事件失效,并继续判断下一个事件.
* 如果第一个事件没有过期,则后面的事件也一定不会过期.
*/
public void timeEvict(long thresholdInSeconds) {
while(events != null && events.peek() != null &&
(System.currentTimeMillis() - events.peek().getTimestamp()) >= (thresholdInSeconds * 1000)) {
WindowItem item = events.poll();
aggregator.evicted(item);
}
}

问题:处理记录级别–对应的是COUNT,还跟基于时间的失效策略有关?(基于时间的触发策略我们知道不能在接收事件这里处理)
答案:Bolt处理每条事件时,对不必要的事件进行过滤是最恰当的时机,如果不需要,根本就不需要存储到窗口(能删除尽早删除)。

这里假设当前处理的事件总是落在thresholdInSeconds范围内,如果当前事件已经超时了,不应该添加到窗口中

buildWindow

buildWindow构建AggregatorWindow,如果EvictPolicy为Count时,第三个参数用来作为队列的大小。
如果没有第三个参数,那么队列是无界的,比如EvictPolicy为Time时,不限制窗口中事件的数量。

当然并不是说失效策略为Time时,就不允许EvictionThreshold没有值,基于Time的失效策略,
设置EvictionThreshold表示窗口内的事件的时间撮和当前时间相比,不能超过阈值。

1
2
3
4
5
6
//客户端的失效策略基于次数Count,需要第三个参数(失效阈值)作为队列大小
window=new AggregatorWindow(agg, hash, op.getEvictionThreshold());

//尽管这里处理的是事件Tuple,也要处理失效策略为TIME的timeEvict删除事件
if(op.getEvictionPolicy() == Policy.TIME)
window.timeEvict(op.getEvictionThreshold());

通常聚合算子前一个是Partition算子,并更新aggConfig,假设Partition指定的fields=”key1”,”key2”,
配置为:config.put(“GROUP_BY”,”key1.key2”),用SQL表示就是group by key1,key2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//window=null或者windowCache=null,都会创建新的聚合窗口. 并添加到windows map中. 而windows内层的KV中,因为value是新的聚合窗口,所以windowCache也要放一份.
private AggregatorWindow buildWindow(AggregateOp op, String stream, int idx, String hash, String flowId, Cache<String, AggregatorWindow> windowCache) {
Aggregator agg = op.getAggregatorClass().newInstance();
AggregatorWindow window = op.getEvictionPolicy() == Policy.TIME || op.getEvictionPolicy() == Policy.TIME_DELTA_LT ?
new AggregatorWindow(agg, hash) :
//如果是Count,后台会使用带有容量限制的双端队列. 它的事件失效策略由队列自己控制:比如添加事件进来时,如果队列满了,自动删除队列头事件.所以不需要代码中手动控制.
new AggregatorWindow(agg, hash, op.getEvictionThreshold());

//上一个是PartitionOp,则根据PartitionOp的字段进行GroupBy
FlowOp flowOp = getFlowOpFromStream(flows, flowId, stream, idx-1);
if(flowOp instanceof PartitionOp) {
PartitionOp partitionOp = (PartitionOp)flowOp;
Map<String,String> aggConfig = op.getConfig();
aggConfig.put(GROUP_BY, join(partitionOp.getFields(), GROUP_BY_DELIM));
//手动调用更新配置
agg.configure(aggConfig);
}

//新创建的聚合窗口AggregatorWindow,里面还没有填充数据,比如将事件加入到窗口中. 所以最后返回这个聚合窗口, 在execute中往window中填充事件.
windowCache.put(hash, window);
//windows被看做是key -> (key -> value)的结构. 内层的key->value即windowCache. 由于是创建新的聚合窗口,说明这是第一次加入到windows中.
//idx是AggregatorBolt在streams中的位置,注意key并没有和hash(partition)相关. idx更外层,hash为内层map的key.
windows.put(flowId + "\0" + stream + "\0" + idx, windowCache);
return window;
}

处理Tick心跳

tick tuple background

参考:http://stackoverflow.com/questions/12603920/twitter-storms-window-on-aggregation
和:http://www.michael-noll.com/blog/2013/01/18/implementing-real-time-trending-topics-in-storm/

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public Map<String, Object> getComponentConfiguration() {
Config conf = new Config();
int tickFrequencyInSeconds = 10;
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickFrequencyInSeconds);
return conf;
}
public void execute(Tuple tuple) {
if (isTickTuple(tuple)) {
// now you can trigger e.g. a periodic activity
}
else {
// do something with the normal tuple
}
}
private static boolean isTickTuple(Tuple tuple) {
return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
&& tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
}

问题:和处理事件方式获取FlowOp不同的是,这里要遍历Stream的所有FlowOp,为什么不根据索引位置来查询呢?
如果FlowOp数组链表中有多个AggregateOp,这样遍历会不会有问题,因为会找出所有的AggregateOp?
答案:Topology中只有TickSpout会发射心跳,并且会发射心跳给除了FlowInitBolt外的其他所有Bolt。
Bolt接收的心跳只能来自于TickSpout,Bolt不会往下游Bolt发射心跳。而TickSpout没有发射索引位置!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
for(Flow flow : flows.values()) {
for(StreamDef curStream : flow.getStreams()) {
//一个Stream中会有多个AggOp. 从0开始找这样的AggOp. 即使不是AggOp,idx++.这样找到的AggOp,此时的idx表示AggOp在flowOps中第几个.
int idx = 0;
for(FlowOp curFlowOp : curStream.getFlowOps()) {
if(curFlowOp instanceof AggregateOp) {
AggregateOp op = (AggregateOp)curFlowOp;
//...
}
//FlowOp的索引
idx++;
}
}
}

tick时必须保证windowCache有数据,就好像Bolt处理Event数据时,必须保证flows有数据。

  1. Bolt收到Tick心跳,但是没有收到Event事件,没有事件,有心跳也是白搭,心跳是无效的
  2. Bolt收到Event数据,但是没有flows,就不会有FlowOp,不会有算子对事件进行计算,即使收到事件是无效的

所以对于AggregateBolt而言,接收各种事件的顺序为:flows -> Event -> Tick
因为Event事件先接收,而在处理Event事件时会构造windows双层Map,所以tick时,windowCache一定有数据。

flowmix-tick

如果一个Stream中存在相同的FlowOp,目前的实现中这部分貌似没有做太多的逻辑判断。

由于Stream可能有多个AggregatorBolt,tick方式没有上下文位置信息,所以只能获取所有的AggregateOp。
不过由于event方式得到的windows的限制,如果找到的不属于当前AggregatorBolt对应的AggregateOp,
是不会处理那样的AggregateOp的。举例假设Stream的FlowOp=[Select,Aggregate,Select,Aggregate]。
如果当前的AggregateBolt是第二个,在处理事件时,windows的key只可能是”flow-stream-2”,
尽管处理tick时,会找到第二个和第四个AggregateOp,但是windows.get(“flow-stream-4”)得到的windowCache一定为空。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//只有聚合操作符才需要统计动作,即触发窗口内的所有事件的聚合动作(当然不一定是立即计算)
//对于窗口内的所有事件,每当tick一次,基于时间的失效策略需要移除过期的事件.
AggregateOp op = (AggregateOp)curFlowOp;
//If we need to trigger any time-based policies, let's do that here 基于时间的策略(触发策略和失效策略)的动作
if(op.getTriggerPolicy() == Policy.TIME || op.getEvictionPolicy() == Policy.TIME) {
Cache<String, AggregatorWindow> windowCache = windows.get(flow.getId() + "\0" + curStream.getName() + "\0" + idx);
//Cache中必须有,如果Cache为空,说明这个时间段内都没有事件进来.没有事件就没有Cache,就不需要移除事件的失效动作和统计结果的发射动作.
if(windowCache != null) {
//windowCache怎么存放数据在非tick中:<flowInfo.getPartition(), AggregatorWindow>.
//如果没有partition,则只有一个AggregatorWindow. 则在满足ticks后,调用一次emitAggregate
//如果有多个partition,则每个partition都有一个Window,并且每个partition在满足ticks后都会调用emitAggregate
//是否达到ticks是由每个AggregatorWindow的计数器确定的.
for(AggregatorWindow window : windowCache.asMap().values()) {
//失效策略,每tick一秒,移除过期的事件
if(op.getEvictionPolicy() == Policy.TIME)
window.timeEvict(op.getEvictionThreshold());

//触发策略,每tick一秒,增加时间tick计数器. 每隔多长时间触发一次统计动作是由这个计数器引起的!
if(op.getTriggerPolicy() == Policy.TIME)
window.incrTriggerTicks();

//因为每秒都会增加一次tick计数器. 最后计数器的值会达到阈值,触发聚合动作
//假设Stream设置trigger(Policy.TIME, 5)即每5秒触发一次统计(单位是秒,所以计数器每秒+1,加到阈值后,刚好满足触发条件).
//则tick spout发送5次tick tuple后.AggregatorBolt会统计5秒内的事件并发射给下一个Bolt
if(window.getTriggerTicks() == op.getTriggerThreshold()){
System.out.println("TIME emitAggregate..");
emitAggregate(flow, op, curStream.getName(), idx, window);
}
}
}
}

这里针对tick tuple,有多种操作:

每次收到tick tuple,说明时间过去了interval,那么AggregatorWindow中的事件(Window)和计算结果临时变量(Aggregator)都要更新。
因为处理的不是事件(Bolt接收的但是event tuple,表示进来一条新的事件),所以只会进行evict操作,而不会有add操作。

tuple AggregatorWindow.add AggregatorWindow.evict
event 接收新的事件,添加事件到窗口,并更新Aggregator的临时变量 接收新事件,由于窗口事件数量的限制可能需要移除事件
tick - 接收新的心跳,如果窗口队列的事件超时,应该删除

Bolt接收event tuple,触发策略为COUNT时,会在event tuple的处理过程中增加计数器,当窗口事件数量累积到阈值时,会计算最终结果并发射到下游组件。
Bolt接收tick tuple,触发策略为TIME时,也会增加计数器(计数器可以用来表示COUNT和TIME),同样计数器达到阈值时,也会执行emitAggregate。

tick和event不同的是,处理event时,因为每条事件对应的Window是确定的,计数器只会+1,而tick需要操作所有Window!

flowmix-time1

flowmix-time2

flowmix-time3

flowmix-time4

flowmix-time5

flowmix-time6

思考问题:基于Count的evict,如果计数器一直没有到达,那么是不是永远不会处置emit,

emitAggregate

TriggerPolicy为TIME或者COUNT,在满足条件时(达到触发阈值)都会执行emitAggregate。
对于COUNT,由于在处理事件时触发,所以一次只操作一个Window,对于TIMER,一次会操作多个Window。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private void emitAggregate(Flow flow, AggregateOp op, String stream, int idx, AggregatorWindow window) {
//从窗口获取到的也是事件,这样可以继续将聚合结果下发给下一个组件.
//while(window.getEvents().iterator().hasNext()){
// System.out.println("AggregateEvents:"+window.getEvents().iterator().next().getEvent());
//}
System.out.println(sdf.format(System.currentTimeMillis())+" Begin Aggregate....[aggOpIndex:"+idx+", groupIndex:" + window.getGroupedIndex()+"]");

Collection<AggregatedEvent> eventsToEmit = window.getAggregate();
String nextStream = idx+1 < flow.getStream(stream).getFlowOps().size() ? getFlowOpFromStream(flow, stream, idx + 1).getComponentName() : "output";

if(hasNextOutput(flow, stream, nextStream)) {
for(AggregatedEvent event : eventsToEmit) {
String previousStream = event.getPreviousStream() != null ? event.getPreviousStream() : stream;
// Note: If aggregated event isn't keeping the previous stream, it's possible it could be lost
collector.emit(nextStream, new Values(flow.getId(), event.getEvent(), idx, stream, previousStream));
}
}

// send to any other streams that are configured (aside from output)
if(exportsToOtherStreams(flow, stream, nextStream)) {
for(String output : flow.getStream(stream).getOutputs()) {
for(AggregatedEvent event : eventsToEmit) {
String outputComponent = flow.getStream(output).getFlowOps().get(0).getComponentName();
collector.emit(outputComponent, new Values(flow.getId(), event.getEvent(), -1, output, stream));
}
}
}

//计数器达到阈值后,触发emitAggregate调用,上面对窗口内的事件统计完毕后,清空计数器.使得下一个窗口继续使用新的计数器.
if(op.isClearOnTrigger()) window.clear();
window.resetTriggerTicks();
}

再论AggregatorWindow:

由于AggregatorWindow包括了Aggregator和Window,当Tuple事件进来时,不仅要将原始事件加入到Window中暂存,
还要更新Aggregator的相关临时变量的值,最后在emitAggregate时,获取Aggregator最新的结果发射出去。也就是说
AggregatorWindow.add()和evict()方法会被调用多次(其中add每次都会调用,但是evict根据失效策略只有在达到阈值时才调用),
但是emitAggregate()只会调用一次(不管是TIMER还是COUNT,只有达到触发器的计数器阈值时才调用)。
最后在发射出去之后,要清空Window中的事件(如果允许清空的话,有些场景可能不需要清空),并且重置触发器的计数器。

AbstractAggregator

所有的Aggregator实现类都实现了AbstractAggregator抽象父类。

示例

场景:用户登陆事件(有IP,有账号)

RDBMS DataModel

Event ip account
E1 1.1.1.1 张三
E2 1.1.1.2 李四
E3 1.1.1.1 王五

NOSQL DataModel

Event key1 key2 key3 key4 type
E1 ip account 1.1.1.1 张三 login
E2 ip account 1.1.1.2 李四 login
E3 ip account 1.1.1.1 王五 login
Count
  1. 十分钟的登陆次数, key3能代表这是一条登陆事件
  2. 一天的交易次数,key3代表一条交易事件
  3. 一天的调用量,key3代表一条事件,可以用sequence_id字段表示

SQL:select count(key3) from tbl where key3 is not null

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public List<Flow> count() {
Flow flow = new FlowBuilder()
.id("flow1")
.flowDefs()
.stream("stream1")
.select().fields("key3").end()
.aggregate().aggregator(CountAggregator.class)
.config("operatedField", "key3")
.evict(Policy.COUNT, 50000)
.trigger(Policy.TIME, 5)
.windowEvictMillis(3600000) //60min window
.clearOnTrigger().end()
.endStream()
.endDefs()
.createFlow();
return asList(new Flow[]{flow});
}
GroupBy(Partition)

一天内各个合作方的调用量, key3代表一条调用记录,比如sequence_id

SQL:select key3, count(*) from tbl group by key3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public  List<Flow> partitionCount() {
Flow flow = new FlowBuilder()
.id("flow1")
.flowDefs()
.stream("stream1")
.select().fields("key3").end()
/**
* Every 5 seconds, emit the counts of events grouped by the key3 field.
* Don't allow more than 50000 items to exist in the window at any point in time (maxCount = 50000)
* remove this to get the total number of events
*/
.partition().fields("key3").end()
.aggregate().aggregator(CountAggregator.class)
.config("operatedField", "key3")
.evict(Policy.COUNT, 50000)
.trigger(Policy.TIME, 5)
.clearOnTrigger().end()
.endStream()
.endDefs()
.createFlow();
return asList(new Flow[]{flow});
}

Fields

AbstractAggregator有多种字段:

  1. operatedField
  2. outputField
  3. groupByFields

operatedField表示要取Event中该字段进行计算,groupByFields表示分组条件,这两个字段可以不同。
举例SQL:select sum(key3) from tbl group by key1,key2。对应的

  • operatedField = key3
  • groupByFields = [key1,key2]

其中groupByFields是由Aggregator之前的PartitionOp确定的,在buildWindow中创建
AggregatorWindow时会获取PartitionOp的Fields,并更新AggregatorOp的aggConfig Map。
以上面的SQL为例,在partition中指定fields=[key1,key2],在aggregate中指定operateField=key3。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//Application
.partition().fields("key1,key2").end()
.aggregate().aggregator(SumAggregator.class)
.config("operatedField", "key3")
.evict(Policy.COUNT, 50000)
.trigger(Policy.TIME, 5)
.clearOnTrigger().end()

//AggregatorBolt.buildWindow
FlowOp flowOp = getFlowOpFromStream(flows, flowId, stream, idx-1);
if(flowOp instanceof PartitionOp) {
PartitionOp partitionOp = (PartitionOp)flowOp;
Map<String,String> aggConfig = op.getConfig();
aggConfig.put(GROUP_BY, join(partitionOp.getFields(), GROUP_BY_DELIM));
//手动调用更新配置
agg.configure(aggConfig);
}

需要注意的是:

  1. operatedField只能有一个字段
  2. partition的groupByFields可以有多个,用逗号分隔

这样通过Event获取operateField的值,也只能有一个。即
如果我们想获取多个字段并计算怎么办?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public abstract class AbstractAggregator<T, F> implements Aggregator {
public static final String OPERATED_FIELD = "operatedField"; // field to operate with
public static final String OUTPUT_FIELD = "outputField"; // output field
public static final String ASSOC_FIELD = "assocField";

protected Map<String, Collection<Tuple>> groupedValues; // grouped fields description
protected String[] groupByFields; // fields to group by
protected String operatedField; // operated field name
protected String outputField = getOutputField(); // output field set by implementation

private String[] assocField; //关联字段,有两个,比如IP在账户上的关联个数.

public void configure(Map<String, String> configuration) {
if (configuration.get(GROUP_BY) != null) {
groupByFields = StringUtils.splitPreserveAllTokens(configuration.get(GROUP_BY), GROUP_BY_DELIM);
}
if (configuration.get(OUTPUT_FIELD) != null) {
outputField = configuration.get(OUTPUT_FIELD);
}
if (configuration.get(OPERATED_FIELD) != null) {
operatedField = configuration.get(OPERATED_FIELD);
}
if (configuration.get(ASSOC_FIELD) != null) {
assocField = StringUtils.splitPreserveAllTokens(configuration.get(ASSOC_FIELD), ",");
}
}

public void added(WindowItem item){
//第一个窗口项(事件)进来时满足条件.后面的窗口项因为groupedValues!=null,不满足条件.
if (groupedValues == null && groupByFields != null) {
groupedValues = new HashMap<String, Collection<Tuple>>();
for (String group : groupByFields) {
groupedValues.put(group, item.getEvent().getAll(group));
}
}
if (item.getEvent().get(operatedField) != null) {
//参数是事件记录中操作字段的值. 而不是事件记录本身.比如要聚合sum(key3),则我们要把key3的值取出来,作为sum的参数
add(((F) item.getEvent().get(operatedField).getValue()));
}
//TODO: 关联字段
if(assocField != null){
//如果关联的两个字段类型不同呢, 所以最好自定义的Aggregator的F为Object类型.
add(((F) item.getEvent().get(assocField[0]).getValue()), ((F) item.getEvent().get(assocField[1]).getValue()));
}
}
}

CountAggregator

DistinctCountAggregator

AssocCountAggregator


文章目录
  1. 1. Introduce
  2. 2. 示例
  3. 3. Topology组件
  4. 4. Builder
    1. 4.1. 算子Builder
  5. 5. FlowmixBuilder
    1. 5.1. Event Spout
    2. 5.2. FlowLoader Spout
    3. 5.3. Tick Spout
    4. 5.4. FlowInitializerBolt
    5. 5.5. Dynamic FlowOp’s Bolt Definition
      1. 5.5.1. FlowInitBolt下一个Bolt
      2. 5.5.2. 算子Bolt之间的上下游依赖关系
  6. 6. 算子Bolt
    1. 6.1. SelectBolt
    2. 6.2. FilterBolt
    3. 6.3. EachBolt
    4. 6.4. PartitionBolt
    5. 6.5. Event
    6. 6.6. AggregatorBolt
      1. 6.6.1. 处理Event事件
      2. 6.6.2. AggregateOp
        1. 6.6.2.1. AggregateWindow
      3. 6.6.3. Window
      4. 6.6.4. time evict/count emit
      5. 6.6.5. buildWindow
      6. 6.6.6. 处理Tick心跳
      7. 6.6.7. emitAggregate
    7. 6.7. AbstractAggregator
      1. 6.7.1. 示例
        1. 6.7.1.1. Count
        2. 6.7.1.2. GroupBy(Partition)
      2. 6.7.2. Fields
      3. 6.7.3. CountAggregator
      4. 6.7.4. DistinctCountAggregator
      5. 6.7.5. AssocCountAggregator