Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Commit

Permalink
fix WindowedWordCountTopology (#2755)
Browse files Browse the repository at this point in the history
* fix WindowedWordCountTopology

* fix checkstyle
  • Loading branch information
jerrypeng authored Feb 28, 2018
1 parent d37ec86 commit d53a6f2
Showing 1 changed file with 23 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import com.twitter.heron.api.Config;
import com.twitter.heron.api.HeronSubmitter;
Expand Down Expand Up @@ -42,6 +43,7 @@ private WindowedWordCountTopology() {
private static class SentenceSpout extends BaseRichSpout {
private static final long serialVersionUID = 2879005791639364028L;
private SpoutOutputCollector collector;
private Random rand = new Random();

@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
Expand All @@ -57,7 +59,16 @@ public void open(Map map, TopologyContext topologyContext,

@Override
public void nextTuple() {
collector.emit(new Values("Mary had a little lamb"));

String[] sentences = {
"Mary had a little lamb",
"The quick brown fox jumps over the lazy dog",
"The book is in front of the table",
"Mary plays the piano"
};

int n = rand.nextInt(sentences.length);
collector.emit(new Values(sentences[n]));
}
}

Expand All @@ -81,7 +92,6 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
private static class WindowSumBolt extends BaseWindowedBolt {
private static final long serialVersionUID = 8458595466693183050L;
private OutputCollector collector;
private Map<String, Integer> counts = new HashMap<String, Integer>();

@Override
@SuppressWarnings("HiddenField")
Expand All @@ -92,17 +102,18 @@ public void prepare(Map<String, Object> topoConf, TopologyContext context, Outpu

@Override
public void execute(TupleWindow inputWindow) {
int sum = counts.get("sum");
Map<String, Integer> counts = new HashMap<String, Integer>();

for (Tuple tuple : inputWindow.get()) {
sum += tuple.getIntegerByField("value");
String word = tuple.getStringByField("word");
if (!counts.containsKey(word)) {
counts.put(word, 0);
}
int previousCount = counts.get(word);
counts.put(word, previousCount + 1);
}
counts.put("sum", sum);
collector.emit(new Values(sum));
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sum"));
System.out.println("Word Counts for window: " + counts);
}
}

Expand All @@ -113,9 +124,10 @@ public static void main(String[] args) throws AlreadyAliveException, InvalidTopo
builder.setSpout("sentence", new SentenceSpout(), parallelism);
builder.setBolt("split", new SplitSentence(), parallelism).shuffleGrouping("sentence");
builder.setBolt("consumer", new WindowSumBolt()
.withWindow(BaseWindowedBolt.Count.of(10)), parallelism)
.withWindow(BaseWindowedBolt.Count.of(10000), BaseWindowedBolt.Count.of(5000)), parallelism)
.fieldsGrouping("split", new Fields("word"));
Config conf = new Config();
conf.setMaxSpoutPending(1000000);

HeronSubmitter.submitTopology(args[0], conf, builder.createTopology());
}
Expand Down

0 comments on commit d53a6f2

Please sign in to comment.