Storm Illustrated

Storm Component

Message Passing

Acker

Trident

TODO

Some Code

public interface ISpout extends Serializable {
    void open(Map conf, TopologyContext context,
              SpoutOutputCollector collector);
    void close();
    void nextTuple();
    void ack(Object msgId);
    void fail(Object msgId);
}
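A minimal sketch of how these lifecycle callbacks are typically wired together, assuming Storm 1.x package names (org.apache.storm.*) and the BaseRichSpout convenience base class. The ReplayingLineSpout class, its in-memory queue, and the pending map are hypothetical illustrations, not part of the original post:

import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Hypothetical spout: emits lines from an in-memory queue and replays them on fail().
public class ReplayingLineSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private Queue<String> lines;
    private Map<Object, String> pending;       // msgId -> line, awaiting ack

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.lines = new LinkedList<String>();
        this.pending = new ConcurrentHashMap<Object, String>();
        lines.add("hello storm");
        lines.add("hello trident");
    }

    public void nextTuple() {
        String line = lines.poll();
        if (line == null) return;              // nothing to emit right now
        Object msgId = UUID.randomUUID();      // id the acker reports back with
        pending.put(msgId, line);
        collector.emit(new Values(line), msgId);
    }

    public void ack(Object msgId) {
        pending.remove(msgId);                 // tuple tree fully processed
    }

    public void fail(Object msgId) {
        String line = pending.remove(msgId);   // timed out or explicitly failed
        if (line != null) lines.add(line);     // re-queue for replay
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}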
// The classes and main() below are assumed to be nested in a single enclosing topology
// class, which the original listing does not show. Imports assume the Storm 1.x package
// layout (org.apache.storm.*); on Storm 0.x the same classes live under backtype.storm.*.
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

// Splits each incoming sentence into words. Every word is emitted anchored to the input
// tuple, so it joins the tuple tree the acker tracks, and the input is acked explicitly.
public static class SplitSentenceBolt extends BaseRichBolt {
    private OutputCollector collector;

    public void prepare(Map config, TopologyContext context,
                        OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            this.collector.emit(tuple, new Values(word));   // anchored emit
        }
        this.collector.ack(tuple);                          // explicit ack
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

// Keeps a running count per word in memory and emits (word, count) downstream.
public static class WordCountBolt extends BaseRichBolt {
    OutputCollector collector;
    Map<String, Integer> counts = new HashMap<String, Integer>();

    public void prepare(Map stormConf, TopologyContext context,
                        OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        String word = tuple.getString(0);
        Integer count = counts.get(word);
        if (count == null) count = 0;
        count++;
        counts.put(word, count);
        collector.emit(tuple, new Values(word, count));
        collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

// The same split bolt written against BaseBasicBolt: anchoring and acking are handled
// automatically, so only the emit logic remains. Renamed here to avoid clashing with the
// BaseRichBolt version above; shown for comparison, not wired into the topology below.
public static class SplitSentenceBasicBolt extends BaseBasicBolt {

    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String sentence = tuple.getString(0);
        for (String word : sentence.split(" ")) {
            collector.emit(new Values(word));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

// Emits a random sentence every 5 seconds, tagged with an increasing message id so the
// ack/fail callbacks can be observed. Extends BaseRichSpout (rather than implementing
// IRichSpout directly) so the unused lifecycle methods get empty defaults.
public static class RandomSentenceSpout extends BaseRichSpout {
    SpoutOutputCollector collector;
    Random rand;
    String[] sentences = null;
    AtomicInteger counter = null;

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        rand = new Random();
        counter = new AtomicInteger(1);
        sentences = new String[]{
            "the cow jumped over the moon",
            "an apple a day keeps the doctor away",
            "four score and seven years ago",
            "snow white and the seven dwarfs",
            "i am at two with nature"
        };
    }

    public void nextTuple() {
        Utils.sleep(5000);
        int msgId = counter.getAndIncrement();
        String sentence = sentences[rand.nextInt(sentences.length)];
        System.out.println(">>>>>>>>>>emit sentence[" + msgId + "]:" + sentence);
        // Emitting with a message id enables guaranteed processing for this tuple tree.
        this.collector.emit(new Values(sentence), msgId);
    }

    public void ack(Object msgId) {
        System.out.println(">>>>>>>>>>ack:" + msgId);
    }

    public void fail(Object msgId) {
        System.out.println(">>>>>>>>>>fail:" + msgId);
    }

    // Declares the "sentence" field that SplitSentenceBolt reads by name.
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
}

// Minimal stand-in for the PrinterBolt referenced below; the original listing omits it.
public static class PrinterBolt extends BaseBasicBolt {
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        System.out.println(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}

public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new RandomSentenceSpout(), 1);
    builder.setBolt("split", new SplitSentenceBolt(), 2).shuffleGrouping("spout");
    // fieldsGrouping on "word" routes the same word to the same counting task.
    builder.setBolt("count", new WordCountBolt(), 2).fieldsGrouping("split", new Fields("word"));
    builder.setBolt("print", new PrinterBolt(), 1).shuffleGrouping("count");

    Config conf = new Config();
    conf.setNumAckers(1);   // one acker task tracks the tuple trees

    if (args != null && args.length > 0) {
        conf.setNumWorkers(3);
        StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
    } else {
        conf.setMaxTaskParallelism(3);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("word-count", conf, builder.createTopology());
        Thread.sleep(32000);
        cluster.shutdown();
    }
}
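One detail worth calling out in the listing above: the bolts emit with the input tuple as an anchor (collector.emit(tuple, ...)), which is what ties their output into the tuple tree that the acker (enabled by conf.setNumAckers(1)) traces back to the spout's msgId. A minimal sketch of the difference between anchored and unanchored emits inside a BaseRichBolt's execute(), assuming the same collector field as in the bolts above; the string values are placeholders:

public void execute(Tuple input) {
    // Anchored: the new tuple joins input's tuple tree; if it later fails or times out,
    // the spout's fail(msgId) is called and the sentence can be replayed.
    collector.emit(input, new Values("anchored-value"));

    // Unanchored: the new tuple is not tracked by the acker; losing it downstream
    // will not fail the original spout tuple.
    collector.emit(new Values("unanchored-value"));

    // The input itself must still be acked (or failed) exactly once.
    collector.ack(input);
}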
