Skip to content

Commit

Permalink
tests fix
Browse files Browse the repository at this point in the history
  • Loading branch information
stas-panasiuk committed Oct 28, 2024
1 parent 09af6e4 commit b51ac47
Showing 1 changed file with 12 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,18 @@ protected StreamExecutionEnvironment createPipeline(ParameterTool params) throws
SingleOutputStreamOperator<Message> parsed = results.map(new ParserChainMapFunction(configMap));


DataStream<Tuple2<String, Long>> counts = parsed
.map((MapFunction<Message, Tuple2<String, Long>>) message -> Tuple2.of(message.getSource(), 1L))
.keyBy(0)
.timeWindow(Time.milliseconds(
params.getLong(PARAM_COUNT_INTERVAL,
DEFAULT_COUNT_INTERVAL)))
.allowedLateness(Time.milliseconds(0))
.sum(1);
DataStream<Tuple2<String, Long>> counts = parsed.map(new MapFunction<Message, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(Message message) throws Exception {
return Tuple2.of(message.getSource(), 1L);
}
})
.keyBy(0)
.timeWindow(Time.milliseconds(
params.getLong(PARAM_COUNT_INTERVAL,
DEFAULT_COUNT_INTERVAL)))
.allowedLateness(Time.milliseconds(0))
.sum(1);
writeCounts(params, counts);


Expand Down

0 comments on commit b51ac47

Please sign in to comment.