Skip to content

Commit

Permalink
#6 [SV] - Master dataset processing pipeline, updated the stats outpu…
Browse files Browse the repository at this point in the history
…t to process stats for all events by removing the event itself
  • Loading branch information
SanthoshVasabhaktula committed Apr 10, 2023
1 parent 8a6e896 commit 1e1e262
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ class MasterDataProcessorFunction(config: MasterDataProcessorConfig) extends Win
metrics.incCounter(datasetId, config.successUpdateCount, result._2)
metrics.incCounter(datasetId, config.successEventCount, eventsList.size.toLong)

context.output(config.successTag(), markComplete(eventsList.head))
eventsList.foreach(event => {
event.remove(config.CONST_EVENT)
context.output(config.successTag(), markComplete(event))
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,8 @@ class MasterDataProcessorStreamTaskTestSpec extends BaseSpecWithDatasetRegistry
env.execute(masterDataConfig.jobName)
}

val input = EmbeddedKafka.consumeNumberMessagesFrom[String](config.getString("kafka.stats.topic"), 3, timeout = 30.seconds)
//input.foreach(Console.println("Event:", _))
input.size should be (3)
val input = EmbeddedKafka.consumeNumberMessagesFrom[String](config.getString("kafka.stats.topic"), 4, timeout = 30.seconds)
input.size should be (4)

val mutableMetricsMap = mutable.Map[String, Long]();
BaseMetricsReporter.gaugeMetrics.toMap.mapValues(f => f.getValue()).map(f => mutableMetricsMap.put(f._1, f._2))
Expand Down

0 comments on commit 1e1e262

Please sign in to comment.