diff --git a/pipeline/master-data-processor/src/main/scala/org/sunbird/obsrv/pipeline/function/MasterDataProcessorFunction.scala b/pipeline/master-data-processor/src/main/scala/org/sunbird/obsrv/pipeline/function/MasterDataProcessorFunction.scala index f7c53044..759bc326 100644 --- a/pipeline/master-data-processor/src/main/scala/org/sunbird/obsrv/pipeline/function/MasterDataProcessorFunction.scala +++ b/pipeline/master-data-processor/src/main/scala/org/sunbird/obsrv/pipeline/function/MasterDataProcessorFunction.scala @@ -64,6 +64,9 @@ class MasterDataProcessorFunction(config: MasterDataProcessorConfig) extends Win metrics.incCounter(datasetId, config.successUpdateCount, result._2) metrics.incCounter(datasetId, config.successEventCount, eventsList.size.toLong) - context.output(config.successTag(), markComplete(eventsList.head)) + eventsList.foreach(event => { + event.remove(config.CONST_EVENT) + context.output(config.successTag(), markComplete(event)) + }) } } diff --git a/pipeline/master-data-processor/src/test/scala/org/sunbird/obsrv/pipeline/MasterDataProcessorStreamTaskTestSpec.scala b/pipeline/master-data-processor/src/test/scala/org/sunbird/obsrv/pipeline/MasterDataProcessorStreamTaskTestSpec.scala index bc2cda55..ee23eecc 100644 --- a/pipeline/master-data-processor/src/test/scala/org/sunbird/obsrv/pipeline/MasterDataProcessorStreamTaskTestSpec.scala +++ b/pipeline/master-data-processor/src/test/scala/org/sunbird/obsrv/pipeline/MasterDataProcessorStreamTaskTestSpec.scala @@ -88,9 +88,8 @@ class MasterDataProcessorStreamTaskTestSpec extends BaseSpecWithDatasetRegistry env.execute(masterDataConfig.jobName) } - val input = EmbeddedKafka.consumeNumberMessagesFrom[String](config.getString("kafka.stats.topic"), 3, timeout = 30.seconds) - //input.foreach(Console.println("Event:", _)) - input.size should be (3) + val input = EmbeddedKafka.consumeNumberMessagesFrom[String](config.getString("kafka.stats.topic"), 4, timeout = 30.seconds) + input.size should be (4) val mutableMetricsMap = mutable.Map[String, Long](); BaseMetricsReporter.gaugeMetrics.toMap.mapValues(f => f.getValue()).map(f => mutableMetricsMap.put(f._1, f._2))