Skip to content

Commit

Permalink
STORM-3422: Make the TupleCaptureBolt thread-safe
Browse files Browse the repository at this point in the history
  • Loading branch information
Petr Janeček committed Jul 8, 2019
1 parent c6bf261 commit 60a93ad
Showing 1 changed file with 22 additions and 11 deletions.
33 changes: 22 additions & 11 deletions storm-client/src/jvm/org/apache/storm/testing/TupleCaptureBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,29 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;


public class TupleCaptureBolt implements IRichBolt {
public static final transient Map<String, Map<String, List<FixedTuple>>> emitted_tuples = new HashMap<>();

private String name;
/*
* Even though normally bolts do not need to care about thread safety, this particular bolt is different.
* It maintains a static field that is prepopulated before the topology starts, is written into by the topology,
* and is then read from after the topology is completed - all of this by potentially different threads.
*/

private static final transient Map<String, Map<String, List<FixedTuple>>> emitted_tuples = new ConcurrentHashMap<>();

private final String name;
private OutputCollector collector;

public TupleCaptureBolt() {
name = UUID.randomUUID().toString();
emitted_tuples.put(name, new HashMap<String, List<FixedTuple>>());
emitted_tuples.put(name, new ConcurrentHashMap<String, List<FixedTuple>>());
}

@Override
Expand All @@ -43,11 +50,14 @@ public void prepare(Map<String, Object> topoConf, TopologyContext context, Outpu
@Override
public void execute(Tuple input) {
String component = input.getSourceComponent();
Map<String, List<FixedTuple>> captured = emitted_tuples.get(name);
if (!captured.containsKey(component)) {
captured.put(component, new ArrayList<FixedTuple>());
}
captured.get(component).add(new FixedTuple(input.getSourceStreamId(), input.getValues()));
emitted_tuples.get(name)
.compute(component, (String key, List<FixedTuple> tuples) -> {
if (tuples == null) {
tuples = new ArrayList<>();
}
tuples.add(new FixedTuple(input.getSourceStreamId(), input.getValues()));
return tuples;
});
collector.ack(input);
}

Expand All @@ -64,8 +74,9 @@ public Map<String, List<FixedTuple>> getAndRemoveResults() {
}

public Map<String, List<FixedTuple>> getAndClearResults() {
Map<String, List<FixedTuple>> ret = new HashMap<>(emitted_tuples.get(name));
emitted_tuples.get(name).clear();
Map<String, List<FixedTuple>> results = emitted_tuples.get(name);
Map<String, List<FixedTuple>> ret = new HashMap<>(results);
results.clear();
return ret;
}

Expand Down

0 comments on commit 60a93ad

Please sign in to comment.