1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
| public static class SplitSentenceBolt extends BaseRichBolt { private OutputCollector collector;
public void prepare(Map config, TopologyContext context, OutputCollector collector) { this.collector = collector; }
public void execute(Tuple tuple) { String sentence = tuple.getStringByField("sentence"); String[] words = sentence.split(" "); for (String word : words) { this.collector.emit(tuple, new Values(word)); } this.collector.ack(tuple); }
public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
/**
 * Bolt that keeps a running count per word.
 *
 * <p>Counts are held in an in-memory map owned by this bolt instance. For
 * every input tuple the word's count is increased by one and a
 * {@code ("word", "count")} tuple carrying the new total is emitted, after
 * which the input tuple is acked.
 */
public static class WordCountBolt extends BaseRichBolt {
    OutputCollector collector;
    Map<String, Integer> counts = new HashMap<String, Integer>();

    /**
     * Stores the supplied collector for later use in {@code execute}.
     *
     * @param stormConf not used by this bolt
     * @param context   not used by this bolt
     * @param collector collector used for all later emits and acks
     */
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Increments the count of the word found at position 0 of the tuple and emits the new total.
     *
     * @param tuple input tuple whose first value is the word, as a string
     */
    public void execute(Tuple tuple) {
        final String word = tuple.getString(0);
        final Integer seenSoFar = counts.get(word);
        // A word that has not been seen before starts at 1.
        final Integer updated = (seenSoFar == null) ? 1 : seenSoFar + 1;
        counts.put(word, updated);
        collector.emit(tuple, new Values(word, updated));
        collector.ack(tuple);
    }

    /** Declares the output fields {@code "word"} and {@code "count"}, in that order. */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
/**
 * Sentence splitter written against {@code BaseBasicBolt}.
 *
 * <p>Takes the string at position 0 of each input tuple, splits it on single
 * space characters, and emits every piece under the output field
 * {@code "word"}. This class makes no explicit ack call.
 * NOTE(review): presumably acking is handled by the basic-bolt machinery;
 * confirm against the Storm documentation.
 */
public class SplitSentenceBolt extends BaseBasicBolt {

    /**
     * Emits one tuple per space-separated piece of the incoming sentence.
     *
     * @param tuple     input tuple whose first value is the sentence, as a string
     * @param collector collector through which the pieces are emitted
     */
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        final String[] pieces = tuple.getString(0).split(" ");
        for (int i = 0; i < pieces.length; i++) {
            collector.emit(new Values(pieces[i]));
        }
    }

    /** Declares the single output field {@code "word"}. */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
/**
 * Spout that emits a randomly chosen sentence from a fixed list.
 *
 * <p>Each call to {@code nextTuple} sleeps via {@code Utils.sleep(5000)},
 * picks one of five hard-coded sentences, and emits it with a message id
 * taken from a counter that starts at 1. Emits, acks and fails are all
 * reported on standard output.
 *
 * <p>NOTE(review): only {@code open}, {@code nextTuple}, {@code ack} and
 * {@code fail} are defined here. No output-field declaration is visible in
 * this class; confirm whether the remaining {@code IRichSpout} methods were
 * left out of this excerpt.
 */
public static class RandomSentenceSpout implements IRichSpout {
    SpoutOutputCollector collector;
    Random rand;
    String[] sentences = null;
    AtomicInteger counter = null;

    /**
     * Stores the collector and initialises the random source, the message-id
     * counter and the sentence list.
     *
     * @param conf      not used by this spout
     * @param context   not used by this spout
     * @param collector collector used for all later emits
     */
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.rand = new Random();
        this.counter = new AtomicInteger(1);
        this.sentences = new String[] {
            "the cow jumped over the moon",
            "an apple a day keeps the doctor away",
            "four score and seven years ago",
            "snow white and the seven dwarfs",
            "i am at two with nature"
        };
    }

    /** Waits, then emits one random sentence tagged with the next message id. */
    public void nextTuple() {
        Utils.sleep(5000);
        int msgId = counter.getAndIncrement();
        int pick = rand.nextInt(sentences.length);
        String sentence = sentences[pick];
        String logLine = ">>>>>>>>>>emit sentence[" + msgId + "]:" + sentence;
        System.out.println(logLine);
        // The message id is passed along with the values so ack/fail can report it.
        collector.emit(new Values(sentence), msgId);
    }

    /**
     * Prints the id of a message reported as acked.
     *
     * @param msgId id that was supplied when the tuple was emitted
     */
    public void ack(Object msgId) {
        System.out.println(">>>>>>>>>>ack:" + msgId);
    }

    /**
     * Prints the id of a message reported as failed; the tuple is not re-emitted.
     *
     * @param msgId id that was supplied when the tuple was emitted
     */
    public void fail(Object msgId) {
        System.out.println(">>>>>>>>>>fail:" + msgId);
    }
}
/**
 * Wires up the word-count topology and runs it.
 *
 * <p>Topology layout: {@code "spout"} (1 executor) feeds {@code "split"}
 * (2, shuffle grouping), which feeds {@code "count"} (2, fields grouping on
 * {@code "word"}), which feeds {@code "print"} (1, shuffle grouping).
 *
 * <p>With at least one argument the topology is submitted through
 * {@code StormSubmitter} under the name {@code args[0]} with 3 workers.
 * Without arguments it runs in a {@code LocalCluster} for 32 seconds. The
 * local cluster is shut down in a {@code finally} block so it is released
 * even when submission fails or the sleep is interrupted.
 *
 * @param args optional; {@code args[0]} is the topology name for cluster submission
 * @throws Exception if submission fails or the local-mode wait is interrupted
 */
public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new RandomSentenceSpout(), 1);
    builder.setBolt("split", new SplitSentenceBolt(), 2).shuffleGrouping("spout");
    // NOTE(review): WordCountBolt2 and PrinterBolt are not defined in the visible
    // part of this file; confirm they exist elsewhere.
    builder.setBolt("count", new WordCountBolt2(), 2).fieldsGrouping("split", new Fields("word"));
    builder.setBolt("print", new PrinterBolt(), 1).shuffleGrouping("count");

    Config conf = new Config();
    conf.setNumAckers(1);

    if (args != null && args.length > 0) {
        conf.setNumWorkers(3);
        StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        return;
    }

    conf.setMaxTaskParallelism(3);
    LocalCluster cluster = new LocalCluster();
    try {
        cluster.submitTopology("word-count", conf, builder.createTopology());
        Thread.sleep(32000);
    } finally {
        // Always release the in-process cluster, even on failure or interruption.
        cluster.shutdown();
    }
}
|