diff --git a/storm-client/src/jvm/org/apache/storm/testing/TupleCaptureBolt.java b/storm-client/src/jvm/org/apache/storm/testing/TupleCaptureBolt.java index 19bdf8651c3..6d073720297 100644 --- a/storm-client/src/jvm/org/apache/storm/testing/TupleCaptureBolt.java +++ b/storm-client/src/jvm/org/apache/storm/testing/TupleCaptureBolt.java @@ -17,22 +17,29 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Tuple; - public class TupleCaptureBolt implements IRichBolt { - public static final transient Map>> emitted_tuples = new HashMap<>(); - private String name; + /* + * Even though normally bolts do not need to care about thread safety, this particular bolt is different. + * It maintains a static field that is prepopulated before the topology starts, is written into by the topology, + * and is then read from after the topology is completed - all of this by potentially different threads. + */ + + private static final transient Map>> emitted_tuples = new ConcurrentHashMap<>(); + + private final String name; private OutputCollector collector; public TupleCaptureBolt() { name = UUID.randomUUID().toString(); - emitted_tuples.put(name, new HashMap>()); + emitted_tuples.put(name, new ConcurrentHashMap>()); } @Override @@ -43,11 +50,14 @@ public void prepare(Map topoConf, TopologyContext context, Outpu @Override public void execute(Tuple input) { String component = input.getSourceComponent(); - Map> captured = emitted_tuples.get(name); - if (!captured.containsKey(component)) { - captured.put(component, new ArrayList()); - } - captured.get(component).add(new FixedTuple(input.getSourceStreamId(), input.getValues())); + emitted_tuples.get(name) + .compute(component, (String key, List tuples) -> { + if (tuples == null) { + tuples = new ArrayList<>(); + } + tuples.add(new FixedTuple(input.getSourceStreamId(), input.getValues())); + return tuples; + }); collector.ack(input); } @@ -64,8 +74,9 @@ public Map> getAndRemoveResults() { } public Map> getAndClearResults() { - Map> ret = new HashMap<>(emitted_tuples.get(name)); - emitted_tuples.get(name).clear(); + Map> results = emitted_tuples.get(name); + Map> ret = new HashMap<>(results); + results.clear(); return ret; }