用实例理解Storm的Stream概念

用实例理解Storm的Stream概念

缘起

事情源于在看基于Storm的CEP引擎:flowmix
FlowmixBuilder代码,
每个Bolt设置了这么多的Group
而且declareStream也声明了这么多的stream-id,
对于只写过WordCountTopology的小白而言,
直接懵逼了,没见过这么用的啊,我承认一开始是拒绝的,每个Bolt都设置了这么多Group,这TMD拓扑图是什么样的?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
 * Builds the flowmix topology: registers the event, flow-loader and tick spouts,
 * the initializer bolt, and one bolt per flow operation via {@code declarebolt}.
 *
 * @return the builder with every spout and bolt registered
 */
public TopologyBuilder create() {
    TopologyBuilder builder = new TopologyBuilder();

    // An event-loader parallelism of -1 falls back to the general parallelism hint.
    if (eventLoaderParallelism == -1) {
        builder.setSpout(EVENT, (IRichSpout) eventsComponent, parallelismHint);
    } else {
        builder.setSpout(EVENT, (IRichSpout) eventsComponent, eventLoaderParallelism);
    }
    builder.setSpout(FLOW_LOADER_STREAM, (IRichSpout) flowLoaderSpout, 1);
    builder.setSpout("tick", new TickSpout(1000), 1);

    // The initializer kicks off a flow, determining where to start.
    BoltDeclarer initializer = builder.setBolt(INITIALIZER, new FlowInitializerBolt(), parallelismHint);
    initializer.localOrShuffleGrouping(EVENT);
    initializer.allGrouping(FLOW_LOADER_STREAM, FLOW_LOADER_STREAM);

    // One bolt per flow operation; only the output bolt is declared with control == false.
    declarebolt(builder, FILTER, new FilterBolt(), parallelismHint, true);
    declarebolt(builder, SELECT, new SelectorBolt(), parallelismHint, true);
    declarebolt(builder, PARTITION, new PartitionBolt(), parallelismHint, true);
    declarebolt(builder, SWITCH, new SwitchBolt(), parallelismHint, true);
    declarebolt(builder, AGGREGATE, new AggregatorBolt(), parallelismHint, true);
    declarebolt(builder, JOIN, new JoinBolt(), parallelismHint, true);
    declarebolt(builder, EACH, new EachBolt(), parallelismHint, true);
    declarebolt(builder, SORT, new SortBolt(), parallelismHint, true);
    declarebolt(builder, SPLIT, new SplitBolt(), parallelismHint, true);
    declarebolt(builder, OUTPUT, outputBolt, parallelismHint, false);

    return builder;
}
/**
 * Registers {@code bolt} under {@code boltName} and subscribes it to the stream named
 * {@code boltName} of every flow-operation bolt, plus the flow-loader and tick streams.
 *
 * @param builder     topology builder to register the bolt on
 * @param boltName    component id of the bolt; also the stream id it listens to on each upstream bolt
 * @param bolt        bolt implementation
 * @param parallelism parallelism hint passed to {@code setBolt}
 * @param control     not used in this excerpt
 */
private static void declarebolt(TopologyBuilder builder, String boltName, IRichBolt bolt, int parallelism, boolean control) {
    BoltDeclarer inputs = builder.setBolt(boltName, bolt, parallelism);

    inputs.allGrouping(FLOW_LOADER_STREAM, FLOW_LOADER_STREAM);
    inputs.allGrouping("tick", "tick");
    inputs.localOrShuffleGrouping(INITIALIZER, boltName);
    inputs.localOrShuffleGrouping(FILTER, boltName);

    // Guaranteed partitions will always group the same flow for flows that have joins with default partitions.
    inputs.fieldsGrouping(PARTITION, boltName, new Fields(FLOW_ID, PARTITION));

    // The remaining upstream bolts all use local-or-shuffle grouping.
    for (String upstream : new String[] {AGGREGATE, SELECT, EACH, SORT, SWITCH, SPLIT, JOIN}) {
        inputs.localOrShuffleGrouping(upstream, boltName);
    }
}
/**
 * Declares one output stream per flow operation on {@code declarer}, all with the same fields.
 *
 * @param declarer output declarer of the calling component
 * @param fields   field layout shared by every declared stream
 */
public static void declareOutputStreams(OutputFieldsDeclarer declarer, Fields fields) {
    String[] streamIds = {PARTITION, FILTER, SELECT, AGGREGATE, SWITCH, SORT, JOIN, SPLIT, EACH, OUTPUT};
    for (String streamId : streamIds) {
        declarer.declareStream(streamId, fields);
    }
}

先来复习下经典的WordCountTopology

WordCountTopology Default Stream

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
/**
 * Classic word-count topology in which every component emits on Storm's default stream:
 * spout -> split -> count -> print.
 */
public class WordCountTopologySimple {

    /** Spout that emits a randomly chosen sentence on the default stream. */
    public static class RandomSentenceSpout extends BaseRichSpout {
        SpoutOutputCollector collector;
        Random rand;
        String[] sentences = null;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
            rand = new Random();
            sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
        }

        @Override
        public void nextTuple() {
            // Throttle the spout so the console output stays readable.
            Utils.sleep(1000);
            String sentence = sentences[rand.nextInt(sentences.length)];
            System.out.println("\n" + sentence);
            // Emitted without a message id.
            this.collector.emit(new Values(sentence));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("sentence"));
        }

        @Override
        public void ack(Object id) {}

        @Override
        public void fail(Object id) {}
    }

    /** Splits each incoming sentence on single spaces and emits one tuple per word. */
    public static class SplitSentenceBolt extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map config, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            String sentence = tuple.getStringByField("sentence");
            String[] words = sentence.split(" ");
            for (String word : words) {
                this.collector.emit(new Values(word));
            }
            // BaseRichBolt does not ack automatically, so ack explicitly.
            this.collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    /**
     * Keeps a running count per word and emits (word, count) after every update.
     *
     * <p>Extends {@code BaseBasicBolt}, whose {@code execute} receives its collector as a
     * parameter; no three-argument {@code prepare} exists on that base class, so none is
     * overridden here.
     */
    public static class WordCountBolt extends BaseBasicBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null) {
                count = 0;
            }
            count++;
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    /** Terminal bolt: prints each (word, count) pair and emits nothing. */
    public static class PrinterBolt extends BaseBasicBolt {

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String first = tuple.getString(0);
            int second = tuple.getInteger(1);
            System.out.println(first + "," + second);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer ofd) {}
    }

    /**
     * Wires the topology and runs it. With at least one argument the topology is submitted
     * under the name {@code args[0]}; otherwise it runs in a local cluster for ten seconds.
     *
     * @param args optional; {@code args[0]} is the topology name used for submission
     * @throws Exception if submission fails or the local run is interrupted
     */
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout(), 1);

        builder.setBolt("split", new SplitSentenceBolt(), 2).shuffleGrouping("spout");
        // Group by word so the same word always reaches the same counter task.
        builder.setBolt("count", new WordCountBolt(), 2).fieldsGrouping("split", new Fields("word"));
        builder.setBolt("print", new PrinterBolt(), 1).shuffleGrouping("count");

        Config conf = new Config();
        conf.setDebug(false);
        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());
            Thread.sleep(10000);
            cluster.shutdown();
        }
    }
}

SingleStream

默认情况下:Spout发送到下游Bolt的stream-id,以及Bolt发送到下游Bolt或者接收上游Spout/Bolt的stream-id都是default

stream-1

可以对Spout/Bolt在发送消息时自定义stream-id,同时必须在声明输出字段时,指定对应的stream-id。

代码说明:发射时指定一个stream-id,声明流时指定一个stream-id,topology设置Bolt时除了通过Group的component-id,还会指定上游组件的stream-id

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// Abbreviated listing: the collector, rand and sentences fields are not declared here.
class RandomSentenceSpout {
public void nextTuple() {
Utils.sleep(1000);
String sentence = sentences[rand.nextInt(sentences.length)];
System.out.println("\n" + sentence);
// Emit on the named stream "split-stream" instead of the default stream.
this.collector.emit("split-stream", new Values(sentence));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// The stream id passed to emit(...) is declared here with its field layout.
declarer.declareStream("split-stream", new Fields("sentence"));
}
}
// Abbreviated listing: the collector field and prepare(...) are not shown here.
class SplitSentenceBolt {
public void execute(Tuple tuple) {
String sentence = tuple.getStringByField("sentence");
String[] words = sentence.split(" ");
for (String word : words) {
// Each word is emitted on the named stream "count-stream".
this.collector.emit("count-stream", new Values(word));
}
this.collector.ack(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// Declares "count-stream" with the single field "word".
declarer.declareStream("count-stream", new Fields("word"));
}
}
// Abbreviated listing: the counts map and the collector field are not declared here.
class WordCountBolt {
public void execute(Tuple tuple) {
String word = tuple.getString(0);
Integer count = counts.get(word);
// First occurrence of a word starts from zero.
if (count == null) count = 0;
count++;
counts.put(word, count);
// The updated (word, count) pair is emitted on the named stream "print-stream".
collector.emit("print-stream", new Values(word, count));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// Declares "print-stream" with the fields "word" and "count".
declarer.declareStream("print-stream", new Fields("word", "count"));
}
}
// Abbreviated listing: main() is shown without its signature or the submission code.
class Topology {
main(){
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 1);

// Each grouping names both the upstream component id and the stream id to subscribe to.
builder.setBolt("split", new SplitSentenceBolt(), 2).shuffleGrouping("spout", "split-stream");
builder.setBolt("count", new WordCountBolt(), 2).fieldsGrouping("split", "count-stream", new Fields("word"));
builder.setBolt("print", new PrinterBolt(), 1).shuffleGrouping("count", "print-stream");
}
}

使用自定义stream-id,主要分成两个步骤:

stream-2_1

stream-2_2

下图示例详细说明了拓扑图中各个组件是怎么协调工作的:

stream-3

MultiStream

Spout/Bolt发射时可以指定多个stream-id,同样要在声明输出字段时指定所有在发射过程指定的stream-id。
虽然每条消息的输出消息流并不一定会用到所有的stream,比如下面示例中一条消息发射到stream1和stream3,
另外一条消息发射到stream2和stream3,stream1和stream2是互斥的,不可能同时发送到这两个stream。
但是可以看到在declareStream中,要同时指定所有的stream-id。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Routes each incoming word to stream1 or stream2 by comparing it with "j", and always to stream3.
public void execute(Tuple input) {
String word = input.getString(0);
// Words that sort before "j" go to stream1; words that sort after "j" go to stream2.
// A word equal to "j" matches neither branch.
if(word.compareTo("j") < 0){
collector.emit("stream1", new Values(word));
}else if(word.compareTo("j") > 0){
collector.emit("stream2", new Values(word));
}
// Every word is also emitted to stream3.
collector.emit("stream3", new Values(word));
}
// Declares all three stream ids used by emit(...), each with the single field "word".
public void declareOutputFields(final OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declareStream("stream1", new Fields("word"));
outputFieldsDeclarer.declareStream("stream2", new Fields("word"));
outputFieldsDeclarer.declareStream("stream3", new Fields("word"));
}

程序员都喜欢流程图,喏,下图左上角第一个就是了,右上角是对应到Storm中的Topology,下面两图示例了两条消息在Storm的消息流的走向。

stream-4

仿照上面的示例,对WordCountTopology的Spout/Bolt的发射方法都指定一个输出的stream-id,
同时在declareOutputFields声明多个输出的stream-id。

现在虽然Spout/Bolt声明了多个输出stream-id,但是emit时还是只发射到一个stream-id中。
所以本质上和前面的SingleStream是一样的,所以Topology不需要做任何改动也还是可以运行的。

代码说明:发射时指定一个stream-id,声明流时指定多个stream-id,topology设置Bolt时除了通过Group的component-id,还会指定上游组件的stream-id
emit不变,topology不变

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// Abbreviated listing: `sentence` and the collector field are not defined here.
class RandomSentenceSpout {
public void nextTuple() {
// Only "split-stream" is actually emitted to.
this.collector.emit("split-stream", new Values(sentence)); //⬅
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// Three stream ids are declared with the same fields; the marked one is the one used by emit(...).
declarer.declareStream("split-stream", new Fields("sentence")); //⬅
declarer.declareStream("count-stream", new Fields("sentence"));
declarer.declareStream("print-stream", new Fields("sentence"));
}
}
// Abbreviated listing: `words` and the collector field are not defined here.
class SplitSentenceBolt {
public void execute(Tuple tuple) {
for (String word : words) {
// Only "count-stream" is actually emitted to.
this.collector.emit("count-stream", new Values(word)); //⬅
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// Three stream ids are declared with the same fields; the marked one is the one used by emit(...).
declarer.declareStream("split-stream", new Fields("word"));
declarer.declareStream("count-stream", new Fields("word")); //⬅
declarer.declareStream("print-stream", new Fields("word"));
}
}
// Abbreviated listing: `word`, `count` and the collector field are not defined here.
class WordCountBolt {
public void execute(Tuple tuple) {
// Only "print-stream" is actually emitted to.
collector.emit("print-stream", new Values(word, count)); //⬅
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// Three stream ids are declared with the same fields; the marked one is the one used by emit(...).
declarer.declareStream("split-stream", new Fields("word", "count"));
declarer.declareStream("count-stream", new Fields("word", "count"));
declarer.declareStream("print-stream", new Fields("word", "count")); //⬅
}
}
// Abbreviated listing: main() is shown without its signature or the submission code.
class Topology {
main(){
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 1);

// Each bolt subscribes to exactly one (component id, stream id) pair.
builder.setBolt("split", new SplitSentenceBolt(), 2).shuffleGrouping("spout", "split-stream");
builder.setBolt("count", new WordCountBolt(), 2).fieldsGrouping("split", "count-stream", new Fields("word"));
builder.setBolt("print", new PrinterBolt(), 1).shuffleGrouping("count", "print-stream");
}
}

那么我们为什么还要在Spout/Bolt中定义多个输出流呢?观察这部分代码,stream-id都是一样的,不同的是Fields部分,
如果将每个Spout/Bolt的多个declarer.declareStream抽取出来:

1
2
3
4
5
6
/**
 * Declares the three named streams on {@code declarer}, all with the same field layout.
 *
 * @param declarer output declarer of the calling spout or bolt
 * @param fields   fields shared by every declared stream
 */
public static void declareStream(OutputFieldsDeclarer declarer, Fields fields) {
    String[] streamIds = {"split-stream", "count-stream", "print-stream"};
    for (String streamId : streamIds) {
        declarer.declareStream(streamId, fields);
    }
}

然后在Spout/Bolt的declareOutputFields调用declareStream方法一次声明所有的stream-id,只需要传递不同的Fields即可。

代码说明:声明多个stream时,每个组件的所有stream-id都一样,传入不同的Fields
emit不变,topology不变

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// Abbreviated listing: `sentence` and the collector field are not defined here.
class RandomSentenceSpout {
public void nextTuple() {
this.collector.emit("split-stream", new Values(sentence)); //⬅
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// Delegates to the shared declareStream helper, passing this component's fields.
declareStream(declarer, new Fields("sentence"));
}
}
// Abbreviated listing: `words` and the collector field are not defined here.
class SplitSentenceBolt {
public void execute(Tuple tuple) {
for (String word : words) {
this.collector.emit("count-stream", new Values(word)); //⬅
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// Delegates to the shared declareStream helper, passing this component's fields.
declareStream(declarer, new Fields("word"));
}
}
// Abbreviated listing: `word`, `count` and the collector field are not defined here.
class WordCountBolt {
public void execute(Tuple tuple) {
collector.emit("print-stream", new Values(word, count)); //⬅
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// Delegates to the shared declareStream helper, passing this component's fields.
declareStream(declarer, new Fields("word", "count"));
}
}
// Abbreviated listing: main() is shown without its signature or the submission code.
class Topology {
main(){
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 1);

// Each bolt subscribes to exactly one (component id, stream id) pair.
builder.setBolt("split", new SplitSentenceBolt(), 2)
.shuffleGrouping("spout", "split-stream");
builder.setBolt("count", new WordCountBolt(), 2)
.fieldsGrouping("split", "count-stream", new Fields("word"));
builder.setBolt("print", new PrinterBolt(), 1)
.shuffleGrouping("count", "print-stream");
}
}

这样的好处是,如果事先知道所有的stream-id,只需要定义好declareStream,每个bolt都调用这个全局的方法即可。
实际上这种方式对于构建动态拓扑图是很有用的。

stream-5

MultiGroup

通过把所有stream-id封装到一个方法中,而emit时只指定一个stream-id。
现在每个组件emit时只指定了一个stream-id,声明输出流时都指定了相同的stream-id集合。
也就是说Spout/Bolt中虽然声明了多个stream-id,但是一条消息只会选择一个stream-id。

那么可不可以对Group方式运用同样的方式呢,我们的目的是想要把setBolt这种逻辑也抽取出一个共同的方法。
下面这种方式肯定是不对的,首先无法抽取,因为每个Bolt的Group分组策略不同。

虽然是错误的,但是我们并没有对首尾组件用多个Group,这是为什么呢?
1.Spout没有所谓的分组,因为Spout就是源头,分组时指定component指的是当前component的数据源自这个指定的component
2.最后一个Bolt我们先不设置,这里有坑…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Deliberately invalid wiring: every bolt subscribes to all three upstream components with
// that bolt's own grouping type. Submission is rejected because [count] asks [spout] for a
// fields grouping on "word", a field the spout does not declare.
main(){
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 1);

builder.setBolt("split", new SplitSentenceBolt(), 2)
.shuffleGrouping("spout", "split-stream") //⬅
.shuffleGrouping("split", "split-stream")
.shuffleGrouping("count", "split-stream")
;
builder.setBolt("count", new WordCountBolt(), 2)
// This fields grouping on "word" against [spout] is the line that triggers the error.
.fieldsGrouping("spout", "count-stream", new Fields("word"))
.fieldsGrouping("split", "count-stream", new Fields("word")) //⬅
.fieldsGrouping("count", "count-stream", new Fields("word"))
;
builder.setBolt("print", new PrinterBolt(), 1)
.shuffleGrouping("count", "print-stream");
}

而且也无法构建拓扑图,比如WordCountBolt的输入component="spout"时,
在拓扑图中这个组件是RandomSentenceSpout,它的输出字段名称为"sentence",根本就没有word这个字段。
下面的错误也证实了这一点:Component: [count] subscribes from stream: [count-stream] of component [spout] with non-existent fields: #{"word"})
count这个组件(即WordCountBolt)订阅了spout组件(即RandomSentenceSpout)的count-stream输出流,但是spout组件并不存在word字段。

1
2
3
4
5
6972 [main] WARN  backtype.storm.daemon.nimbus - Topology submission exception. (topology name='word-count') #
<InvalidTopologyException InvalidTopologyException(msg:
Component: [count] subscribes from stream: [count-stream] of component [spout] with non-existent fields: #{"word"})>
7002 [main] ERROR org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Thread Thread[main,5,main] died
backtype.storm.generated.InvalidTopologyException: null

正确使用多个stream-id的姿势:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Valid wiring: the grouping type is chosen per upstream component (fields grouping only
// against [split], which declares "word"), so both bolts share the same grouping pattern
// and differ only in the stream id.
main(){
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 1);

builder.setBolt("split", new SplitSentenceBolt(), 2)
.shuffleGrouping("spout", "split-stream") //⬅
.fieldsGrouping("split", "split-stream", new Fields("word"))
.shuffleGrouping("count", "split-stream")
;
builder.setBolt("count", new WordCountBolt(), 2)
.shuffleGrouping("spout", "count-stream")
.fieldsGrouping("split", "count-stream", new Fields("word")) //⬅
.shuffleGrouping("count", "count-stream")
;
// The last bolt still subscribes to a single upstream stream.
builder.setBolt("print", new PrinterBolt(), 1)
.shuffleGrouping("count", "print-stream");
}

现在每个Bolt的Group方式都是一样的了,并且component-id也是一样的,只有最后的stream-id不同。
很好,可以像抽取declareStream那样抽取setBolt了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Same wiring as the explicit version, with the repeated grouping pattern moved into setBolt(...).
main(){
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout",new RandomSentenceSpout(),1);

setBolt(builder, new SplitSentenceBolt(), "split");
setBolt(builder, new WordCountBolt(), "count");
// The printer bolt is still wired by hand to a single upstream stream.
builder.setBolt("print", new PrinterBolt(), 1)
.shuffleGrouping("count", "print-stream");
}
/**
 * Registers {@code bolt} under {@code name} with parallelism 2 and subscribes it to the
 * stream {@code name + "-stream"} of the components "spout", "split" and "count".
 *
 * @param builder topology builder to register the bolt on
 * @param bolt    bolt implementation
 * @param name    component id; also the prefix of the stream id subscribed to
 */
public static void setBolt(TopologyBuilder builder,IRichBolt bolt,String name){
builder.setBolt(name, bolt, 2)
.shuffleGrouping("spout", name + "-stream")
.fieldsGrouping("split", name + "-stream", new Fields("word"))
.shuffleGrouping("count", name + "-stream")
;
}

每个Bolt都设置了多种分组策略,而分组的第一个参数component表示数据源自哪里,
现在SplitSentenceBolt和WordCountBolt都定义了三种分组策略,
那么是不是说[split]的数据源有:[spout],[split],[count],
同样[count]的数据源也有:[spout],[split],[count],这跟实际的Topology结构就完全不一样了。
可以看到下图的拓扑结构比原先的WordCountTopology多了几条线(而且还能自己指向自己我也是醉了)。

stream-7

不过虽然每个Bolt都有多个输入源,但是输入源组件不一定有指定的stream-id。
比如split的数据源虽然有三个[spout],[split],[count],但是这三个组件中实际向stream-id="split-stream"发射消息的组件
只有[spout],因此即使设置了三个数据源,另外两个数据源是无效的。

stream-8

同样[count]的数据源虽然也有三个[spout],[split],[count],但是这三个组件中实际向stream-id="count-stream"发射消息的组件也只有[split]。

stream-9

所以最后实际上拓扑图还是最原始的[spout]->[split]->[count]->[print],并不会出现之前出现的多条线以及自己指向自己的情况。

stream-10

最后一个Bolt

可以把最后一个PrinterBolt也加到每个Bolt的分组策略里吗?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Every bolt, including "print", now also subscribes to the "print" component.
// This wiring is rejected at submission: [count] subscribes to "count-stream" of [print],
// a stream that [print] does not have.
builder.setBolt("split", new SplitSentenceBolt(), 2)
.shuffleGrouping("spout", "split-stream") //⬅
.fieldsGrouping("split", "split-stream", new Fields("word"))
.shuffleGrouping("count", "split-stream")
.shuffleGrouping("print", "split-stream")
;
builder.setBolt("count", new WordCountBolt(), 2)
.shuffleGrouping("spout", "count-stream")
.fieldsGrouping("split", "count-stream", new Fields("word")) //⬅
.shuffleGrouping("count", "count-stream")
.shuffleGrouping("print", "count-stream")
;
builder.setBolt("print", new PrinterBolt(), 1)
.shuffleGrouping("spout", "print-stream")
.fieldsGrouping("split", "print-stream", new Fields("word"))
.shuffleGrouping("count", "print-stream") //⬅
.shuffleGrouping("print", "print-stream")
;

拓扑图是这样的,虚线表示实际上是不存在的(因为输入源本身没有发射到这些stream)。

stream-11

Oops….报错显示:[count]组件订阅了[print]组件中一个不存在的[count-stream]

1
2
3
4
5
9510 [main] WARN  backtype.storm.daemon.nimbus - Topology submission exception. (topology name='word-count') #
<InvalidTopologyException InvalidTopologyException(msg:Component:
[count] subscribes from non-existent stream: [count-stream] of component [print])>
9552 [main] ERROR org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Thread Thread[main,5,main] died
backtype.storm.generated.InvalidTopologyException: null

下面修改不同Bolt中和Print相关的分组方式,只有把Print全部注释掉才可以

  1. 不注释: [count] subscribes from non-existent stream: [count-stream] of component [print]
  2. 注释①: [split] subscribes from non-existent stream: [split-stream] of component [print]
  3. 注释①②: [print] subscribes from non-existent stream: [print-stream] of component [print]
  4. 注释①②③: SUCCESS!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Same wiring with the three subscriptions to "print" commented out. The circled numbers
// give the order in which they were disabled; submission succeeds only once all three are off.
builder.setBolt("split", new SplitSentenceBolt(), 2)
.shuffleGrouping("spout", "split-stream") //⬅
.fieldsGrouping("split", "split-stream", new Fields("word"))
.shuffleGrouping("count", "split-stream")
//.shuffleGrouping("print", "split-stream") //②
;
builder.setBolt("count", new WordCountBolt(), 2)
.shuffleGrouping("spout", "count-stream")
.fieldsGrouping("split", "count-stream", new Fields("word")) //⬅
.shuffleGrouping("count", "count-stream")
//.shuffleGrouping("print", "count-stream") //①
;
builder.setBolt("print", new PrinterBolt(), 1)
.shuffleGrouping("spout", "print-stream")
.fieldsGrouping("split", "print-stream", new Fields("word"))
.shuffleGrouping("count", "print-stream") //⬅
//.shuffleGrouping("print", "print-stream") //③
;

发生了什么事?不存在stream为什么就不行?可是前面以SplitSentenceBolt为例,split和count也不存在split-stream啊,为什么就不会报错呢?
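原因在于我们的PrinterBolt只是打印数据,然后什么都不做,它没有emit出任何消息,declareOutputFields也是空的,没有声明任何输出流,所以下图中从PrinterBolt出来的线根本就不存在!(Storm在提交拓扑时校验的是被订阅的stream是否已被上游组件声明:split和count通过declareStream声明了全部stream-id,所以订阅它们不会报错。)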

stream-12

怎么办呢,很简单,给PrintBolt添加一个带有stream-id的emit,同时也要在declareOutputFields中声明这个输出流。
只要PrintBolt有输出流,就不会报错了。也就是确保每个Bolt都会往下发送消息

stream-13

最终完整的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
public class WordCountTopologyStream3 {

public static class RandomSentenceSpout extends BaseRichSpout {
SpoutOutputCollector collector;
Random rand;
String[] sentences = null;

@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
rand = new Random();
sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
}

@Override
public void nextTuple() {
Utils.sleep(1000);
String sentence = sentences[rand.nextInt(sentences.length)];
System.out.println("\n" + sentence);
this.collector.emit("split-stream", new Values(sentence));
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declareStream(declarer, new Fields("sentence"));
}
public void ack(Object id) {}
public void fail(Object id) {}
}

public static class SplitSentenceBolt extends BaseRichBolt {
private OutputCollector collector;

@Override
public void prepare(Map config, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}

@Override
public void execute(Tuple tuple) {
String sentence = tuple.getStringByField("sentence");
String[] words = sentence.split(" ");
for (String word : words) {
this.collector.emit("count-stream", new Values(word));
}
this.collector.ack(tuple);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declareStream(declarer, new Fields("word"));
}
}

public static class WordCountBolt extends BaseRichBolt {
Map<String, Integer> counts = new HashMap<String, Integer>();
private OutputCollector collector;
@Override
public void prepare(Map config, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}

@Override
public void execute(Tuple tuple) {
String word = tuple.getString(0);
Integer count = counts.get(word);
if (count == null) count = 0;
count++;
counts.put(word, count);
collector.emit("print-stream", new Values(word, count));
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declareStream(declarer, new Fields("word", "count"));
}
}

public static class PrinterBolt extends BaseRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map config, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple tuple) {
String first = tuple.getString(0);
int second = tuple.getInteger(1);
System.out.println(first + "," + second);
collector.emit("whatever-stream", new Values(first + ":" + second)); //⬅
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declareStream(declarer, new Fields("word:count")); //⬅
}
}

public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 1);

setBolt(builder, new SplitSentenceBolt(), "split");
setBolt(builder, new WordCountBolt(), "count");
setBolt(builder, new PrinterBolt(), "print");

Config conf = new Config();
conf.setDebug(false);
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
} else {
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", conf, builder.createTopology());
Thread.sleep(10000);
cluster.shutdown();
}
}

public static void declareStream(OutputFieldsDeclarer declarer, Fields fields){
declarer.declareStream("split-stream", fields);
declarer.declareStream("count-stream", fields);
declarer.declareStream("print-stream", fields);
declarer.declareStream("whatever-stream", fields); //⬅
}

public static void setBolt(TopologyBuilder builder, IRichBolt bolt, String name){
builder.setBolt(name, bolt, 2)
.shuffleGrouping("spout", name + "-stream")
.fieldsGrouping("split", name + "-stream", new Fields("word"))
.shuffleGrouping("count", name + "-stream")
.shuffleGrouping("print", name + "-stream") //⬅
;
}
}

你以为这样就完了吗,如果把PrintBolt的输出stream-id去掉,即采用默认的default的话:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// Variant that emits on the default stream and declares only the default stream.
// Abbreviated listing: the collector field and prepare(...) are not shown here.
public static class PrinterBolt extends BaseRichBolt {
@Override
public void execute(Tuple tuple) {
String first = tuple.getString(0);
int second = tuple.getInteger(1);
System.out.println(first + "," + second);
//collector.emit("whatever-stream", new Values(first + ":" + second));
// No stream id given: the tuple goes to the default stream.
collector.emit(new Values(first + ":" + second));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
//declareStream(declarer, new Fields("word:count"));
// Declares only the default stream; none of the named streams is declared on this bolt.
declarer.declare(new Fields("word:count"));
}
}

// Declares the three named streams with the given fields; "whatever-stream" is disabled in this variant.
public static void declareStream(OutputFieldsDeclarer declarer, Fields fields){
declarer.declareStream("split-stream", fields);
declarer.declareStream("count-stream", fields);
declarer.declareStream("print-stream", fields);
//declarer.declareStream("whatever-stream", fields); //⬅
}

// Registers the bolt with parallelism 2 and subscribes it to the stream name + "-stream"
// of "spout", "split", "count" and "print".
public static void setBolt(TopologyBuilder builder, IRichBolt bolt, String name){
builder.setBolt(name, bolt, 2)
.shuffleGrouping("spout", name + "-stream")
.fieldsGrouping("split", name + "-stream", new Fields("word"))
.shuffleGrouping("count", name + "-stream")
.shuffleGrouping("print", name + "-stream")
;
}

还是报错:[count]组件订阅了[print]组件中不存在的[count-stream]

1
Component: [count] subscribes from non-existent stream: [count-stream] of component [print]

好吧,看来前面的组件都使用自定义的stream-id,最后一个组件也必须使用自定义的stream-id,即使这个stream-id看起来没什么意义!(准确地说:只要有组件订阅了[print]的某个stream-id,[print]就必须在declareOutputFields中声明这个stream-id。)

EOF.


文章目录
  1. 1. 缘起
  2. 2. WordCountTopology Default Stream
  3. 3. SingleStream
  4. 4. MultiStream
  5. 5. MultiGroup
  6. 6. 最后一个Bolt