From d53a6f265f239936f1e0c8ac27cf5cf5a5856b83 Mon Sep 17 00:00:00 2001 From: Boyang Jerry Peng Date: Wed, 28 Feb 2018 10:40:56 -0800 Subject: [PATCH] fix WindowedWordCountTopology (#2755) * fix WindowedWordCountTopology * fix checkstyle --- .../api/WindowedWordCountTopology.java | 34 +++++++++++++------ 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/examples/src/java/com/twitter/heron/examples/api/WindowedWordCountTopology.java b/examples/src/java/com/twitter/heron/examples/api/WindowedWordCountTopology.java index 4f9caf944e2..3dc6e7a9562 100644 --- a/examples/src/java/com/twitter/heron/examples/api/WindowedWordCountTopology.java +++ b/examples/src/java/com/twitter/heron/examples/api/WindowedWordCountTopology.java @@ -15,6 +15,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.Random; import com.twitter.heron.api.Config; import com.twitter.heron.api.HeronSubmitter; @@ -42,6 +43,7 @@ private WindowedWordCountTopology() { private static class SentenceSpout extends BaseRichSpout { private static final long serialVersionUID = 2879005791639364028L; private SpoutOutputCollector collector; + private Random rand = new Random(); @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { @@ -57,7 +59,16 @@ public void open(Map map, TopologyContext topologyContext, @Override public void nextTuple() { - collector.emit(new Values("Mary had a little lamb")); + + String[] sentences = { + "Mary had a little lamb", + "The quick brown fox jumps over the lazy dog", + "The book is in front of the table", + "Mary plays the piano" + }; + + int n = rand.nextInt(sentences.length); + collector.emit(new Values(sentences[n])); } } @@ -81,7 +92,6 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { private static class WindowSumBolt extends BaseWindowedBolt { private static final long serialVersionUID = 8458595466693183050L; private OutputCollector collector; - private Map counts = new HashMap(); @Override @SuppressWarnings("HiddenField") @@ -92,17 +102,18 @@ public void prepare(Map topoConf, TopologyContext context, Outpu @Override public void execute(TupleWindow inputWindow) { - int sum = counts.get("sum"); + Map counts = new HashMap(); + for (Tuple tuple : inputWindow.get()) { - sum += tuple.getIntegerByField("value"); + String word = tuple.getStringByField("word"); + if (!counts.containsKey(word)) { + counts.put(word, 0); + } + int previousCount = counts.get(word); + counts.put(word, previousCount + 1); } - counts.put("sum", sum); - collector.emit(new Values(sum)); - } - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("sum")); + System.out.println("Word Counts for window: " + counts); } } @@ -113,9 +124,10 @@ public static void main(String[] args) throws AlreadyAliveException, InvalidTopo builder.setSpout("sentence", new SentenceSpout(), parallelism); builder.setBolt("split", new SplitSentence(), parallelism).shuffleGrouping("sentence"); builder.setBolt("consumer", new WindowSumBolt() - .withWindow(BaseWindowedBolt.Count.of(10)), parallelism) + .withWindow(BaseWindowedBolt.Count.of(10000), BaseWindowedBolt.Count.of(5000)), parallelism) .fieldsGrouping("split", new Fields("word")); Config conf = new Config(); + conf.setMaxSpoutPending(1000000); HeronSubmitter.submitTopology(args[0], conf, builder.createTopology()); }