diff --git a/Dockerfile b/Dockerfile
index bb103b2a..c03c08ba 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,47 +1,61 @@
-FROM --platform=linux/x86_64 maven:3.9.4-eclipse-temurin-11-focal AS build-core
+FROM maven:3.9.4-eclipse-temurin-11-focal AS build-core
 COPY . /app
 RUN mvn clean install -DskipTests -f /app/framework/pom.xml
 RUN mvn clean install -DskipTests -f /app/dataset-registry/pom.xml
 RUN mvn clean install -DskipTests -f /app/transformation-sdk/pom.xml
 
-FROM --platform=linux/x86_64 maven:3.9.4-eclipse-temurin-11-focal AS build-pipeline
+FROM maven:3.9.4-eclipse-temurin-11-focal AS build-pipeline
 COPY --from=build-core /root/.m2 /root/.m2
 COPY . /app
 RUN mvn clean package -DskipTests -f /app/pipeline/pom.xml
 
-FROM --platform=linux/x86_64 sanketikahub/flink:1.15.2-scala_2.12-jdk-11 as extractor-image
+FROM sanketikahub/flink:1.17.2-scala_2.12-java11 AS extractor-image
 USER flink
-COPY --from=build-pipeline /app/pipeline/extractor/target/extractor-1.0.0.jar $FLINK_HOME/lib/
+RUN mkdir -p $FLINK_HOME/usrlib
+COPY --from=build-pipeline /app/pipeline/extractor/target/extractor-1.0.0.jar $FLINK_HOME/usrlib/
 
-FROM --platform=linux/x86_64 sanketikahub/flink:1.15.2-scala_2.12-jdk-11 as preprocessor-image
+FROM sanketikahub/flink:1.17.2-scala_2.12-java11 AS preprocessor-image
 USER flink
-COPY --from=build-pipeline /app/pipeline/preprocessor/target/preprocessor-1.0.0.jar $FLINK_HOME/lib/
+RUN mkdir -p $FLINK_HOME/usrlib
+COPY --from=build-pipeline /app/pipeline/preprocessor/target/preprocessor-1.0.0.jar $FLINK_HOME/usrlib/
 
-FROM --platform=linux/x86_64 sanketikahub/flink:1.15.2-scala_2.12-jdk-11 as denormalizer-image
+FROM sanketikahub/flink:1.17.2-scala_2.12-java11 AS denormalizer-image
 USER flink
-COPY --from=build-pipeline /app/pipeline/denormalizer/target/denormalizer-1.0.0.jar $FLINK_HOME/lib/
+RUN mkdir -p $FLINK_HOME/usrlib
+COPY --from=build-pipeline /app/pipeline/denormalizer/target/denormalizer-1.0.0.jar $FLINK_HOME/usrlib/
 
-FROM --platform=linux/x86_64 sanketikahub/flink:1.15.2-scala_2.12-jdk-11 as transformer-image
+FROM sanketikahub/flink:1.17.2-scala_2.12-java11 AS transformer-image
 USER flink
-COPY --from=build-pipeline /app/pipeline/transformer/target/transformer-1.0.0.jar $FLINK_HOME/lib/
+RUN mkdir -p $FLINK_HOME/usrlib
+COPY --from=build-pipeline /app/pipeline/transformer/target/transformer-1.0.0.jar $FLINK_HOME/usrlib/
 
-FROM --platform=linux/x86_64 sanketikahub/flink:1.15.2-scala_2.12-jdk-11 as router-image
+FROM sanketikahub/flink:1.17.2-scala_2.12-java11 AS router-image
 USER flink
-COPY --from=build-pipeline /app/pipeline/druid-router/target/druid-router-1.0.0.jar $FLINK_HOME/lib/
+RUN mkdir -p $FLINK_HOME/usrlib
+COPY --from=build-pipeline /app/pipeline/druid-router/target/druid-router-1.0.0.jar $FLINK_HOME/usrlib/
 
-FROM --platform=linux/x86_64 sanketikahub/flink:1.15.2-scala_2.12-jdk-11 as unified-image
+FROM sanketikahub/flink:1.17.2-scala_2.12-java11 AS unified-image
 USER flink
-COPY --from=build-pipeline /app/pipeline/unified-pipeline/target/unified-pipeline-1.0.0.jar $FLINK_HOME/lib/
+RUN mkdir -p $FLINK_HOME/usrlib
+COPY --from=build-pipeline /app/pipeline/unified-pipeline/target/unified-pipeline-1.0.0.jar $FLINK_HOME/usrlib/
 
-FROM --platform=linux/x86_64 sanketikahub/flink:1.15.2-scala_2.12-jdk-11 as master-data-processor-image
+FROM sanketikahub/flink:1.17.2-scala_2.12-java11 AS master-data-processor-image
 USER flink
-COPY --from=build-pipeline /app/pipeline/master-data-processor/target/master-data-processor-1.0.0.jar $FLINK_HOME/lib
+RUN mkdir -p $FLINK_HOME/usrlib
+COPY --from=build-pipeline /app/pipeline/master-data-processor/target/master-data-processor-1.0.0.jar $FLINK_HOME/usrlib/
 
-FROM --platform=linux/x86_64 sanketikahub/flink:1.15.0-scala_2.12-lakehouse as lakehouse-connector-image
+FROM sanketikahub/flink:1.17.2-scala_2.12-java11 AS lakehouse-connector-image
 USER flink
-RUN mkdir $FLINK_HOME/custom-lib
-COPY ./pipeline/hudi-connector/target/hudi-connector-1.0.0.jar $FLINK_HOME/custom-lib
+RUN wget https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
+RUN wget https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.17.2/flink-s3-fs-hadoop-1.17.2.jar
+RUN wget https://repo.maven.apache.org/maven2/org/apache/hudi/hudi-flink1.17-bundle/0.15.0/hudi-flink1.17-bundle-0.15.0.jar
+RUN mv flink-shaded-hadoop-2-uber-2.8.3-10.0.jar $FLINK_HOME/lib
+RUN mv flink-s3-fs-hadoop-1.17.2.jar $FLINK_HOME/lib
+RUN mv hudi-flink1.17-bundle-0.15.0.jar $FLINK_HOME/lib
+# RUN mkdir $FLINK_HOME/custom-lib
+COPY --from=build-pipeline /app/pipeline/hudi-connector/target/hudi-connector-1.0.0.jar $FLINK_HOME/lib
 
-FROM --platform=linux/x86_64 sanketikahub/flink:1.15.2-scala_2.12-jdk-11 as cache-indexer-image
+FROM sanketikahub/flink:1.17.2-scala_2.12-java11 AS cache-indexer-image
 USER flink
-COPY --from=build-pipeline /app/pipeline/cache-indexer/target/cache-indexer-1.0.0.jar $FLINK_HOME/lib
\ No newline at end of file
+RUN mkdir -p $FLINK_HOME/usrlib
+COPY --from=build-pipeline /app/pipeline/cache-indexer/target/cache-indexer-1.0.0.jar $FLINK_HOME/usrlib/
\ No newline at end of file
diff --git a/data-products/pom.xml b/data-products/pom.xml
index 977030d0..584e4553 100644
--- a/data-products/pom.xml
+++ b/data-products/pom.xml
@@ -15,8 +15,22 @@
1.4.0 11 2.14.1 + + 2.17.2 + + + + com.fasterxml.jackson jackson-bom ${jackson-bom.version} pom import + + + + org.apache.spark
@@ -67,17 +81,17 @@
com.fasterxml.jackson.core jackson-annotations - 2.15.2 + com.fasterxml.jackson.core jackson-core - 2.15.2 + com.fasterxml.jackson.module jackson-module-scala_${scala.maj.version} - 2.15.2 + com.fasterxml.jackson.core
@@ -88,7 +102,7 @@
com.fasterxml.jackson.core jackson-databind - 2.15.2 + org.sunbird.obsrv
diff --git a/dataset-registry/pom.xml b/dataset-registry/pom.xml
index fd17db70..9261bf3a 100644
--- a/dataset-registry/pom.xml
+++ b/dataset-registry/pom.xml
@@ -13,6 +13,7 @@
UTF-8 2.12 2.12.11 + 1.17.2 11 1.4.0 release-4.6.0
@@ -80,6 +81,13 @@
test-jar test + + org.apache.flink flink-runtime ${flink.version} test + tests +
diff --git a/dataset-registry/src/test/scala/org/sunbird/obsrv/spec/BaseSpecWithDatasetRegistry.scala b/dataset-registry/src/test/scala/org/sunbird/obsrv/spec/BaseSpecWithDatasetRegistry.scala
index f206e193..a3bd9b81 100644
--- a/dataset-registry/src/test/scala/org/sunbird/obsrv/spec/BaseSpecWithDatasetRegistry.scala
+++ b/dataset-registry/src/test/scala/org/sunbird/obsrv/spec/BaseSpecWithDatasetRegistry.scala
@@ -1,9 +1,12 @@
 package org.sunbird.obsrv.spec
 
 import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.flink.api.scala.metrics.ScalaGauge
+import org.apache.flink.runtime.testutils.InMemoryReporter
 import org.sunbird.obsrv.core.util.{PostgresConnect, PostgresConnectionConfig}
 import org.sunbird.spec.BaseSpecWithPostgres
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 class BaseSpecWithDatasetRegistry extends BaseSpecWithPostgres {
@@ -42,8 +45,8 @@ class BaseSpecWithDatasetRegistry extends
BaseSpecWithPostgres { } private def insertTestData(postgresConnect: PostgresConnect): Unit = { - postgresConnect.execute("insert into datasets(id, type, data_schema, validation_config, extraction_config, dedup_config, router_config, dataset_config, status, data_version, api_version, entry_topic, created_by, updated_by, created_date, updated_date) values ('d1', 'event', '{\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",\"$id\":\"https://sunbird.obsrv.com/test.json\",\"title\":\"Test Schema\",\"description\":\"Test Schema\",\"type\":\"object\",\"properties\":{\"id\":{\"type\":\"string\"},\"vehicleCode\":{\"type\":\"string\"},\"date\":{\"type\":\"string\"},\"dealer\":{\"type\":\"object\",\"properties\":{\"dealerCode\":{\"type\":\"string\"},\"locationId\":{\"type\":\"string\"},\"email\":{\"type\":\"string\"},\"phone\":{\"type\":\"string\"}},\"required\":[\"dealerCode\",\"locationId\"]},\"metrics\":{\"type\":\"object\",\"properties\":{\"bookingsTaken\":{\"type\":\"number\"},\"deliveriesPromised\":{\"type\":\"number\"},\"deliveriesDone\":{\"type\":\"number\"}}}},\"required\":[\"id\",\"vehicleCode\",\"date\",\"dealer\",\"metrics\"]}', '{\"validate\": true, \"mode\": \"Strict\"}', '{\"is_batch_event\": true, \"extraction_key\": \"events\", \"dedup_config\": {\"drop_duplicates\": true, \"dedup_key\": \"id\", \"dedup_period\": 3}}', '{\"drop_duplicates\": true, \"dedup_key\": \"id\", \"dedup_period\": 3}', '{\"topic\":\"d1-events\"}', '{\"data_key\":\"id\",\"timestamp_key\":\"date\",\"entry_topic\":\"ingest\",\"redis_db_host\":\"localhost\",\"redis_db_port\":"+config.getInt("redis.port")+",\"redis_db\":2}', 'Live', 2, 'v1', 'ingest', 'System', 'System', now(), now());") - postgresConnect.execute("update datasets set denorm_config = '{\"redis_db_host\":\"localhost\",\"redis_db_port\":"+config.getInt("redis.port")+",\"denorm_fields\":[{\"denorm_key\":\"vehicleCode\",\"redis_db\":2,\"denorm_out_field\":\"vehicleData\"}]}' where id='d1';") + postgresConnect.execute("insert into datasets(id, type, data_schema, validation_config, extraction_config, dedup_config, router_config, dataset_config, status, data_version, api_version, entry_topic, created_by, updated_by, created_date, updated_date) values ('d1', 'event', '{\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",\"$id\":\"https://sunbird.obsrv.com/test.json\",\"title\":\"Test Schema\",\"description\":\"Test Schema\",\"type\":\"object\",\"properties\":{\"id\":{\"type\":\"string\"},\"vehicleCode\":{\"type\":\"string\"},\"date\":{\"type\":\"string\"},\"dealer\":{\"type\":\"object\",\"properties\":{\"dealerCode\":{\"type\":\"string\"},\"locationId\":{\"type\":\"string\"},\"email\":{\"type\":\"string\"},\"phone\":{\"type\":\"string\"}},\"required\":[\"dealerCode\",\"locationId\"]},\"metrics\":{\"type\":\"object\",\"properties\":{\"bookingsTaken\":{\"type\":\"number\"},\"deliveriesPromised\":{\"type\":\"number\"},\"deliveriesDone\":{\"type\":\"number\"}}}},\"required\":[\"id\",\"vehicleCode\",\"date\",\"dealer\",\"metrics\"]}', '{\"validate\": true, \"mode\": \"Strict\"}', '{\"is_batch_event\": true, \"extraction_key\": \"events\", \"dedup_config\": {\"drop_duplicates\": true, \"dedup_key\": \"id\", \"dedup_period\": 3}}', '{\"drop_duplicates\": true, \"dedup_key\": \"id\", \"dedup_period\": 3}', '{\"topic\":\"d1-events\"}', '{\"data_key\":\"id\",\"timestamp_key\":\"date\",\"entry_topic\":\"ingest\",\"redis_db_host\":\"localhost\",\"redis_db_port\":" + config.getInt("redis.port") + ",\"redis_db\":2}', 'Live', 2, 'v1', 'ingest', 
'System', 'System', now(), now());") + postgresConnect.execute("update datasets set denorm_config = '{\"redis_db_host\":\"localhost\",\"redis_db_port\":" + config.getInt("redis.port") + ",\"denorm_fields\":[{\"denorm_key\":\"vehicleCode\",\"redis_db\":2,\"denorm_out_field\":\"vehicleData\"}]}' where id='d1';") postgresConnect.execute("insert into dataset_transformations values('tf1', 'd1', 'dealer.email', '{\"type\":\"mask\",\"expr\":\"dealer.email\"}', 'Strict', 'System', 'System', now(), now());") postgresConnect.execute("insert into dataset_transformations values('tf2', 'd1', 'dealer.maskedPhone', '{\"type\":\"mask\",\"expr\": \"dealer.phone\"}', null, 'System', 'System', now(), now());") postgresConnect.execute("insert into datasets(id, type, data_schema, router_config, dataset_config, status, api_version, entry_topic, created_by, updated_by, created_date, updated_date, tags) values ('d2', 'event', '{\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",\"$id\":\"https://sunbird.obsrv.com/test.json\",\"title\":\"Test Schema\",\"description\":\"Test Schema\",\"type\":\"object\",\"properties\":{\"id\":{\"type\":\"string\"},\"vehicleCode\":{\"type\":\"string\"},\"date\":{\"type\":\"string\"},\"dealer\":{\"type\":\"object\",\"properties\":{\"dealerCode\":{\"type\":\"string\"},\"locationId\":{\"type\":\"string\"},\"email\":{\"type\":\"string\"},\"phone\":{\"type\":\"string\"}},\"required\":[\"dealerCode\",\"locationId\"]},\"metrics\":{\"type\":\"object\",\"properties\":{\"bookingsTaken\":{\"type\":\"number\"},\"deliveriesPromised\":{\"type\":\"number\"},\"deliveriesDone\":{\"type\":\"number\"}}}},\"required\":[\"id\",\"vehicleCode\",\"date\",\"dealer\",\"metrics\"]}', '{\"topic\":\"d2-events\"}', '{\"data_key\":\"id\",\"timestamp_key\":\"date\",\"entry_topic\":\"ingest\"}', 'Live', 'v1', 'ingest', 'System', 'System', now(), now(), ARRAY['Tag1','Tag2']);") @@ -60,4 +63,20 @@ class BaseSpecWithDatasetRegistry extends BaseSpecWithPostgres { }).groupBy(f => f._1).mapValues(f => f.map(p => (p._2, p._3, p._4))).mapValues(f => f.groupBy(p => p._1).mapValues(q => q.map(r => (r._2, r._3)).toMap)) } + def getMetrics(metricsReporter: InMemoryReporter, dataset: String, debug: Option[Boolean] = None): Map[String, Long] = { + val groups = metricsReporter.findGroups(dataset).asScala + groups.map(group => metricsReporter.getMetricsByGroup(group).asScala) + .map(group => group.map { case (k, v) => + val value = if(v.isInstanceOf[ScalaGauge[Long]]) v.asInstanceOf[ScalaGauge[Long]].getValue() else 0 + if (debug.isDefined && debug.get) + Console.println("Metric", k, value) + k -> value + }) + .map(f => f.toMap) + .reduce((map1, map2) => { + val mergedMap = map2.map { case (k: String, v: Long) => k -> (v + map1.getOrElse(k, 0l)) } + map1 ++ mergedMap + }) + } + } \ No newline at end of file diff --git a/framework/pom.xml b/framework/pom.xml index 4b3b194d..842004e1 100644 --- a/framework/pom.xml +++ b/framework/pom.xml @@ -19,7 +19,20 @@ 1.9.13 1.4.0 release-4.6.0 + + 2.17.2 + + + + com.fasterxml.jackson + jackson-bom + ${jackson-bom.version} + pom + import + + + org.apache.flink @@ -30,6 +43,12 @@ org.apache.flink flink-connector-kafka ${flink.version} + + + org.apache.kafka + kafka-clients + + org.apache.kafka @@ -54,17 +73,25 @@ com.fasterxml.jackson.core jackson-databind - 2.15.2 + com.fasterxml.jackson.module jackson-module-scala_${scala.maj.version} - 2.15.2 + com.fasterxml.jackson.core jackson-databind + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + 
jackson-annotations + diff --git a/pipeline/cache-indexer/src/test/scala/org/sunbird/obsrv/pipeline/CacheIndexerStreamTaskTestSpec.scala b/pipeline/cache-indexer/src/test/scala/org/sunbird/obsrv/pipeline/CacheIndexerStreamTaskTestSpec.scala index b95d754d..aca017f0 100644 --- a/pipeline/cache-indexer/src/test/scala/org/sunbird/obsrv/pipeline/CacheIndexerStreamTaskTestSpec.scala +++ b/pipeline/cache-indexer/src/test/scala/org/sunbird/obsrv/pipeline/CacheIndexerStreamTaskTestSpec.scala @@ -2,12 +2,11 @@ package org.sunbird.obsrv.pipeline import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig} import org.apache.flink.configuration.Configuration -import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration +import org.apache.flink.runtime.testutils.{InMemoryReporter, MiniClusterResourceConfiguration} import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.test.util.MiniClusterWithClientResource import org.apache.kafka.common.serialization.StringDeserializer import org.scalatest.Matchers._ -import org.sunbird.obsrv.BaseMetricsReporter import org.sunbird.obsrv.core.cache.RedisConnect import org.sunbird.obsrv.core.model.ErrorConstants import org.sunbird.obsrv.core.model.Models.SystemEvent @@ -18,15 +17,13 @@ import org.sunbird.obsrv.pipeline.task.CacheIndexerConfig import org.sunbird.obsrv.spec.BaseSpecWithDatasetRegistry import org.sunbird.obsrv.streaming.CacheIndexerStreamTask -import scala.collection.mutable -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.Future import scala.concurrent.duration._ class CacheIndexerStreamTaskTestSpec extends BaseSpecWithDatasetRegistry { + private val metricsReporter = InMemoryReporter.createWithRetainedMetrics val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder() - .setConfiguration(testConfiguration()) + .setConfiguration(metricsReporter.addToConfiguration(new Configuration())) .setNumberSlotsPerTaskManager(1) .setNumberTaskManagers(1) .build) @@ -42,16 +39,8 @@ class CacheIndexerStreamTaskTestSpec extends BaseSpecWithDatasetRegistry { ) implicit val deserializer: StringDeserializer = new StringDeserializer() - def testConfiguration(): Configuration = { - val config = new Configuration() - config.setString("metrics.reporter", "job_metrics_reporter") - config.setString("metrics.reporter.job_metrics_reporter.class", classOf[BaseMetricsReporter].getName) - config - } - override def beforeAll(): Unit = { super.beforeAll() - BaseMetricsReporter.gaugeMetrics.clear() EmbeddedKafka.start()(embeddedKafkaConfig) val postgresConnect = new PostgresConnect(postgresConfig) insertTestData(postgresConnect) @@ -85,9 +74,7 @@ class CacheIndexerStreamTaskTestSpec extends BaseSpecWithDatasetRegistry { implicit val env: StreamExecutionEnvironment = FlinkUtil.getExecutionContext(cacheIndexerConfig) val task = new CacheIndexerStreamTask(cacheIndexerConfig, kafkaConnector) task.process(env) - Future { - env.execute(cacheIndexerConfig.jobName) - } + env.executeAsync(cacheIndexerConfig.jobName) val input = EmbeddedKafka.consumeNumberMessagesFrom[String](config.getString("kafka.output.system.event.topic"), 1, timeout = 30.seconds) input.size should be(1) @@ -109,20 +96,19 @@ class CacheIndexerStreamTaskTestSpec extends BaseSpecWithDatasetRegistry { event.ctx.dataset_type should be(Some("master")) }) - val mutableMetricsMap = mutable.Map[String, Long](); - BaseMetricsReporter.gaugeMetrics.toMap.mapValues(f => f.getValue()).map(f => 
mutableMetricsMap.put(f._1, f._2)) - cacheIndexerConfig.successTag().getId should be("processing_stats") - mutableMetricsMap(s"${cacheIndexerConfig.jobName}.dataset3.${cacheIndexerConfig.totalEventCount}") should be(3) - mutableMetricsMap(s"${cacheIndexerConfig.jobName}.dataset3.${cacheIndexerConfig.successEventCount}") should be(3) - mutableMetricsMap(s"${cacheIndexerConfig.jobName}.dataset3.${cacheIndexerConfig.successInsertCount}") should be(2) - mutableMetricsMap(s"${cacheIndexerConfig.jobName}.dataset3.${cacheIndexerConfig.successUpdateCount}") should be(1) + val dataset3Metrics = getMetrics(metricsReporter, "dataset3") + dataset3Metrics(cacheIndexerConfig.totalEventCount) should be(3) + dataset3Metrics(cacheIndexerConfig.successEventCount) should be(3) + dataset3Metrics(cacheIndexerConfig.successInsertCount) should be(2) + dataset3Metrics(cacheIndexerConfig.successUpdateCount) should be(1) - mutableMetricsMap(s"${cacheIndexerConfig.jobName}.dataset4.${cacheIndexerConfig.totalEventCount}") should be(2) - mutableMetricsMap(s"${cacheIndexerConfig.jobName}.dataset4.${cacheIndexerConfig.successEventCount}") should be(1) - mutableMetricsMap(s"${cacheIndexerConfig.jobName}.dataset4.${cacheIndexerConfig.successInsertCount}") should be(1) - mutableMetricsMap(s"${cacheIndexerConfig.jobName}.dataset4.${cacheIndexerConfig.eventFailedMetricsCount}") should be(1) + val dataset4Metrics = getMetrics(metricsReporter, "dataset4") + dataset4Metrics(cacheIndexerConfig.totalEventCount) should be(2) + dataset4Metrics(cacheIndexerConfig.successEventCount) should be(1) + dataset4Metrics(cacheIndexerConfig.successInsertCount) should be(1) + dataset4Metrics(cacheIndexerConfig.eventFailedMetricsCount) should be(1) val redisConnection = new RedisConnect(cacheIndexerConfig.redisHost, cacheIndexerConfig.redisPort, cacheIndexerConfig.redisConnectionTimeout) val jedis1 = redisConnection.getConnection(3) @@ -138,5 +124,4 @@ class CacheIndexerStreamTaskTestSpec extends BaseSpecWithDatasetRegistry { jedis2.close() } - } diff --git a/pipeline/dataset-router/src/test/scala/org/sunbird/obsrv/router/DynamicRouterStreamTaskTestSpec.scala b/pipeline/dataset-router/src/test/scala/org/sunbird/obsrv/router/DynamicRouterStreamTaskTestSpec.scala index 98370128..1d246e4d 100644 --- a/pipeline/dataset-router/src/test/scala/org/sunbird/obsrv/router/DynamicRouterStreamTaskTestSpec.scala +++ b/pipeline/dataset-router/src/test/scala/org/sunbird/obsrv/router/DynamicRouterStreamTaskTestSpec.scala @@ -2,12 +2,11 @@ package org.sunbird.obsrv.router import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig} import org.apache.flink.configuration.Configuration -import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration +import org.apache.flink.runtime.testutils.{InMemoryReporter, MiniClusterResourceConfiguration} import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.test.util.MiniClusterWithClientResource import org.apache.kafka.common.serialization.StringDeserializer import org.scalatest.Matchers._ -import org.sunbird.obsrv.BaseMetricsReporter import org.sunbird.obsrv.core.model.Models.SystemEvent import org.sunbird.obsrv.core.model._ import org.sunbird.obsrv.core.streaming.FlinkKafkaConnector @@ -15,15 +14,13 @@ import org.sunbird.obsrv.core.util.{FlinkUtil, JSONUtil, PostgresConnect} import org.sunbird.obsrv.router.task.{DynamicRouterConfig, DynamicRouterStreamTask} import org.sunbird.obsrv.spec.BaseSpecWithDatasetRegistry -import scala.collection.mutable -import 
scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.Future import scala.concurrent.duration._ class DynamicRouterStreamTaskTestSpec extends BaseSpecWithDatasetRegistry { + private val metricsReporter = InMemoryReporter.createWithRetainedMetrics val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder() - .setConfiguration(testConfiguration()) + .setConfiguration(metricsReporter.addToConfiguration(new Configuration())) .setNumberSlotsPerTaskManager(1) .setNumberTaskManagers(1) .build) @@ -39,16 +36,8 @@ class DynamicRouterStreamTaskTestSpec extends BaseSpecWithDatasetRegistry { ) implicit val deserializer: StringDeserializer = new StringDeserializer() - def testConfiguration(): Configuration = { - val config = new Configuration() - config.setString("metrics.reporter", "job_metrics_reporter") - config.setString("metrics.reporter.job_metrics_reporter.class", classOf[BaseMetricsReporter].getName) - config - } - override def beforeAll(): Unit = { super.beforeAll() - BaseMetricsReporter.gaugeMetrics.clear() EmbeddedKafka.start()(embeddedKafkaConfig) val postgresConnect = new PostgresConnect(postgresConfig) insertTestData(postgresConnect) @@ -86,9 +75,7 @@ class DynamicRouterStreamTaskTestSpec extends BaseSpecWithDatasetRegistry { implicit val env: StreamExecutionEnvironment = FlinkUtil.getExecutionContext(routerConfig) val task = new DynamicRouterStreamTask(routerConfig, kafkaConnector) task.process(env) - Future { - env.execute(routerConfig.jobName) - } + env.executeAsync(routerConfig.jobName) val outputs = EmbeddedKafka.consumeNumberMessagesFrom[String]("d1-events", 1, timeout = 30.seconds) validateOutputs(outputs) @@ -99,10 +86,7 @@ class DynamicRouterStreamTaskTestSpec extends BaseSpecWithDatasetRegistry { val systemEvents = EmbeddedKafka.consumeNumberMessagesFrom[String](routerConfig.kafkaSystemTopic, 2, timeout = 30.seconds) validateSystemEvents(systemEvents) - val mutableMetricsMap = mutable.Map[String, Long]() - BaseMetricsReporter.gaugeMetrics.toMap.mapValues(f => f.getValue()).map(f => mutableMetricsMap.put(f._1, f._2)) - Console.println("### DynamicRouterStreamTaskTestSpec:metrics ###", JSONUtil.serialize(getPrintableMetrics(mutableMetricsMap))) - validateMetrics(mutableMetricsMap) + validateMetrics(metricsReporter) } private def validateOutputs(outputs: List[String]): Unit = { @@ -142,7 +126,7 @@ class DynamicRouterStreamTaskTestSpec extends BaseSpecWithDatasetRegistry { event.ctx.pdata.id should be(routerConfig.jobName) event.ctx.pdata.`type` should be(PDataType.flink) event.ctx.pdata.pid.get should be(Producer.router) - if(event.data.error.isDefined) { + if (event.data.error.isDefined) { val errorLog = event.data.error.get errorLog.error_level should be(ErrorLevel.critical) errorLog.pdata_id should be(Producer.router) @@ -152,20 +136,25 @@ class DynamicRouterStreamTaskTestSpec extends BaseSpecWithDatasetRegistry { errorLog.error_message should be(ErrorConstants.INDEX_KEY_MISSING_OR_BLANK.errorMsg) errorLog.error_type should be(FunctionalError.MissingTimestampKey) } else { - event.data.pipeline_stats.isDefined should be (true) - event.data.pipeline_stats.get.latency_time.isDefined should be (true) - event.data.pipeline_stats.get.processing_time.isDefined should be (true) - event.data.pipeline_stats.get.total_processing_time.isDefined should be (true) + event.data.pipeline_stats.isDefined should be(true) + event.data.pipeline_stats.get.latency_time.isDefined should be(true) + 
event.data.pipeline_stats.get.processing_time.isDefined should be(true) + event.data.pipeline_stats.get.total_processing_time.isDefined should be(true) } }) } - private def validateMetrics(mutableMetricsMap: mutable.Map[String, Long]): Unit = { - mutableMetricsMap(s"${routerConfig.jobName}.d1.${routerConfig.routerTotalCount}") should be(1) - mutableMetricsMap(s"${routerConfig.jobName}.d1.${routerConfig.routerSuccessCount}") should be(1) - mutableMetricsMap(s"${routerConfig.jobName}.d2.${routerConfig.routerTotalCount}") should be(1) - mutableMetricsMap(s"${routerConfig.jobName}.d2.${routerConfig.eventFailedMetricsCount}") should be(1) + private def validateMetrics(metricsReporter: InMemoryReporter): Unit = { + + val d1Metrics = getMetrics(metricsReporter, "d1") + d1Metrics(routerConfig.routerTotalCount) should be(1) + d1Metrics(routerConfig.routerSuccessCount) should be(1) + + val d2Metrics = getMetrics(metricsReporter, "d2") + d2Metrics(routerConfig.routerTotalCount) should be(1) + d2Metrics(routerConfig.eventFailedMetricsCount) should be(1) + } } diff --git a/pipeline/denormalizer/src/test/scala/org/sunbird/obsrv/denormalizer/DenormalizerStreamTaskTestSpec.scala b/pipeline/denormalizer/src/test/scala/org/sunbird/obsrv/denormalizer/DenormalizerStreamTaskTestSpec.scala index 89256390..ec167b89 100644 --- a/pipeline/denormalizer/src/test/scala/org/sunbird/obsrv/denormalizer/DenormalizerStreamTaskTestSpec.scala +++ b/pipeline/denormalizer/src/test/scala/org/sunbird/obsrv/denormalizer/DenormalizerStreamTaskTestSpec.scala @@ -2,12 +2,11 @@ package org.sunbird.obsrv.denormalizer import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig} import org.apache.flink.configuration.Configuration -import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration +import org.apache.flink.runtime.testutils.{InMemoryReporter, MiniClusterResourceConfiguration} import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.test.util.MiniClusterWithClientResource import org.apache.kafka.common.serialization.StringDeserializer import org.scalatest.Matchers._ -import org.sunbird.obsrv.BaseMetricsReporter import org.sunbird.obsrv.core.cache.RedisConnect import org.sunbird.obsrv.core.model.Models.SystemEvent import org.sunbird.obsrv.core.model._ @@ -19,15 +18,13 @@ import org.sunbird.obsrv.model.DatasetModels._ import org.sunbird.obsrv.model.DatasetStatus import org.sunbird.obsrv.spec.BaseSpecWithDatasetRegistry -import scala.collection.mutable -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.Future import scala.concurrent.duration._ class DenormalizerStreamTaskTestSpec extends BaseSpecWithDatasetRegistry { + private val metricsReporter = InMemoryReporter.createWithRetainedMetrics val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder() - .setConfiguration(testConfiguration()) + .setConfiguration(metricsReporter.addToConfiguration(new Configuration())) .setNumberSlotsPerTaskManager(1) .setNumberTaskManagers(1) .build) @@ -44,16 +41,8 @@ class DenormalizerStreamTaskTestSpec extends BaseSpecWithDatasetRegistry { ) implicit val deserializer: StringDeserializer = new StringDeserializer() - def testConfiguration(): Configuration = { - val config = new Configuration() - config.setString("metrics.reporter", "job_metrics_reporter") - config.setString("metrics.reporter.job_metrics_reporter.class", classOf[BaseMetricsReporter].getName) - config - } - override def beforeAll(): Unit = { 
super.beforeAll() - BaseMetricsReporter.gaugeMetrics.clear() EmbeddedKafka.start()(embeddedKafkaConfig) val postgresConnect = new PostgresConnect(postgresConfig) insertTestData(postgresConnect) @@ -98,9 +87,7 @@ class DenormalizerStreamTaskTestSpec extends BaseSpecWithDatasetRegistry { implicit val env: StreamExecutionEnvironment = FlinkUtil.getExecutionContext(denormConfig) val task = new DenormalizerStreamTask(denormConfig, kafkaConnector) task.process(env) - Future { - env.execute(denormConfig.jobName) - } + env.executeAsync(denormConfig.jobName) val outputs = EmbeddedKafka.consumeNumberMessagesFrom[String](denormConfig.denormOutputTopic, 4, timeout = 30.seconds) validateOutputs(outputs) @@ -108,10 +95,7 @@ class DenormalizerStreamTaskTestSpec extends BaseSpecWithDatasetRegistry { val systemEvents = EmbeddedKafka.consumeNumberMessagesFrom[String](denormConfig.kafkaSystemTopic, 3, timeout = 30.seconds) validateSystemEvents(systemEvents) - val mutableMetricsMap = mutable.Map[String, Long]() - BaseMetricsReporter.gaugeMetrics.toMap.mapValues(f => f.getValue()).map(f => mutableMetricsMap.put(f._1, f._2)) - Console.println("### DenormalizerStreamTaskTestSpec:metrics ###", JSONUtil.serialize(getPrintableMetrics(mutableMetricsMap))) - validateMetrics(mutableMetricsMap) + validateMetrics(metricsReporter) } it should "validate dynamic cache creation within DenormCache" in { @@ -164,13 +148,17 @@ class DenormalizerStreamTaskTestSpec extends BaseSpecWithDatasetRegistry { }) } - private def validateMetrics(mutableMetricsMap: mutable.Map[String, Long]): Unit = { - mutableMetricsMap(s"${denormConfig.jobName}.d1.${denormConfig.denormTotal}") should be(3) - mutableMetricsMap(s"${denormConfig.jobName}.d1.${denormConfig.denormFailed}") should be(1) - mutableMetricsMap(s"${denormConfig.jobName}.d1.${denormConfig.denormSuccess}") should be(1) - mutableMetricsMap(s"${denormConfig.jobName}.d1.${denormConfig.denormPartialSuccess}") should be(1) - mutableMetricsMap(s"${denormConfig.jobName}.d2.${denormConfig.denormTotal}") should be(1) - mutableMetricsMap(s"${denormConfig.jobName}.d2.${denormConfig.eventsSkipped}") should be(1) + private def validateMetrics(metricsReporter: InMemoryReporter): Unit = { + + val d1Metrics = getMetrics(metricsReporter, "d1") + d1Metrics(denormConfig.denormTotal) should be(3) + d1Metrics(denormConfig.denormFailed) should be(1) + d1Metrics(denormConfig.denormSuccess) should be(1) + d1Metrics(denormConfig.denormPartialSuccess) should be(1) + + val d2Metrics = getMetrics(metricsReporter, "d2") + d2Metrics(denormConfig.denormTotal) should be(1) + d2Metrics(denormConfig.eventsSkipped) should be(1) } } diff --git a/pipeline/denormalizer/src/test/scala/org/sunbird/obsrv/denormalizer/DenormalizerWindowStreamTaskTestSpec.scala b/pipeline/denormalizer/src/test/scala/org/sunbird/obsrv/denormalizer/DenormalizerWindowStreamTaskTestSpec.scala index 964bf70d..b273470d 100644 --- a/pipeline/denormalizer/src/test/scala/org/sunbird/obsrv/denormalizer/DenormalizerWindowStreamTaskTestSpec.scala +++ b/pipeline/denormalizer/src/test/scala/org/sunbird/obsrv/denormalizer/DenormalizerWindowStreamTaskTestSpec.scala @@ -2,12 +2,11 @@ package org.sunbird.obsrv.denormalizer import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig} import org.apache.flink.configuration.Configuration -import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration +import org.apache.flink.runtime.testutils.{InMemoryReporter, MiniClusterResourceConfiguration} import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.test.util.MiniClusterWithClientResource import org.apache.kafka.common.serialization.StringDeserializer import org.scalatest.Matchers._ -import org.sunbird.obsrv.BaseMetricsReporter import org.sunbird.obsrv.core.cache.RedisConnect import org.sunbird.obsrv.core.model.Models.SystemEvent import org.sunbird.obsrv.core.model._ @@ -16,15 +15,13 @@ import org.sunbird.obsrv.core.util.{FlinkUtil, JSONUtil, PostgresConnect} import org.sunbird.obsrv.denormalizer.task.{DenormalizerConfig, DenormalizerWindowStreamTask} import org.sunbird.obsrv.spec.BaseSpecWithDatasetRegistry -import scala.collection.mutable -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.Future import scala.concurrent.duration._ class DenormalizerWindowStreamTaskTestSpec extends BaseSpecWithDatasetRegistry { + private val metricsReporter = InMemoryReporter.createWithRetainedMetrics val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder() - .setConfiguration(testConfiguration()) + .setConfiguration(metricsReporter.addToConfiguration(new Configuration())) .setNumberSlotsPerTaskManager(1) .setNumberTaskManagers(1) .build) @@ -41,16 +38,8 @@ class DenormalizerWindowStreamTaskTestSpec extends BaseSpecWithDatasetRegistry { ) implicit val deserializer: StringDeserializer = new StringDeserializer() - def testConfiguration(): Configuration = { - val config = new Configuration() - config.setString("metrics.reporter", "job_metrics_reporter") - config.setString("metrics.reporter.job_metrics_reporter.class", classOf[BaseMetricsReporter].getName) - config - } - override def beforeAll(): Unit = { super.beforeAll() - BaseMetricsReporter.gaugeMetrics.clear() EmbeddedKafka.start()(embeddedKafkaConfig) val postgresConnect = new PostgresConnect(postgresConfig) insertTestData(postgresConnect) @@ -70,7 +59,7 @@ class DenormalizerWindowStreamTaskTestSpec extends BaseSpecWithDatasetRegistry { } private def insertTestData(postgresConnect: PostgresConnect): Unit = { - postgresConnect.execute("insert into datasets(id, type, data_schema, validation_config, extraction_config, dedup_config, router_config, dataset_config, status, data_version, api_version, entry_topic, created_by, updated_by, created_date, updated_date) values ('d3', 'dataset', '{\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",\"$id\":\"https://sunbird.obsrv.com/test.json\",\"title\":\"Test Schema\",\"description\":\"Test Schema\",\"type\":\"object\",\"properties\":{\"id\":{\"type\":\"string\"},\"vehicleCode\":{\"type\":\"string\"},\"date\":{\"type\":\"string\"},\"dealer\":{\"type\":\"object\",\"properties\":{\"dealerCode\":{\"type\":\"string\"},\"locationId\":{\"type\":\"string\"},\"email\":{\"type\":\"string\"},\"phone\":{\"type\":\"string\"}},\"required\":[\"dealerCode\",\"locationId\"]},\"metrics\":{\"type\":\"object\",\"properties\":{\"bookingsTaken\":{\"type\":\"number\"},\"deliveriesPromised\":{\"type\":\"number\"},\"deliveriesDone\":{\"type\":\"number\"}}}},\"required\":[\"id\",\"vehicleCode\",\"date\",\"dealer\",\"metrics\"]}', '{\"validate\": true, \"mode\": \"Strict\"}', '{\"is_batch_event\": true, \"extraction_key\": \"events\", \"dedup_config\": {\"drop_duplicates\": true, \"dedup_key\": \"id\", \"dedup_period\": 3}}', '{\"drop_duplicates\": true, \"dedup_key\": \"id\", \"dedup_period\": 3}', '{\"topic\":\"d1-events\"}', 
'{\"data_key\":\"id\",\"timestamp_key\":\"date\",\"entry_topic\":\"ingest\",\"redis_db_host\":\"localhost\",\"redis_db_port\":"+config.getInt("redis.port")+",\"redis_db\":2}', 'Live', 2, 'v1', 'ingest', 'System', 'System', now(), now());") + postgresConnect.execute("insert into datasets(id, type, data_schema, validation_config, extraction_config, dedup_config, router_config, dataset_config, status, data_version, api_version, entry_topic, created_by, updated_by, created_date, updated_date) values ('d3', 'dataset', '{\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",\"$id\":\"https://sunbird.obsrv.com/test.json\",\"title\":\"Test Schema\",\"description\":\"Test Schema\",\"type\":\"object\",\"properties\":{\"id\":{\"type\":\"string\"},\"vehicleCode\":{\"type\":\"string\"},\"date\":{\"type\":\"string\"},\"dealer\":{\"type\":\"object\",\"properties\":{\"dealerCode\":{\"type\":\"string\"},\"locationId\":{\"type\":\"string\"},\"email\":{\"type\":\"string\"},\"phone\":{\"type\":\"string\"}},\"required\":[\"dealerCode\",\"locationId\"]},\"metrics\":{\"type\":\"object\",\"properties\":{\"bookingsTaken\":{\"type\":\"number\"},\"deliveriesPromised\":{\"type\":\"number\"},\"deliveriesDone\":{\"type\":\"number\"}}}},\"required\":[\"id\",\"vehicleCode\",\"date\",\"dealer\",\"metrics\"]}', '{\"validate\": true, \"mode\": \"Strict\"}', '{\"is_batch_event\": true, \"extraction_key\": \"events\", \"dedup_config\": {\"drop_duplicates\": true, \"dedup_key\": \"id\", \"dedup_period\": 3}}', '{\"drop_duplicates\": true, \"dedup_key\": \"id\", \"dedup_period\": 3}', '{\"topic\":\"d1-events\"}', '{\"data_key\":\"id\",\"timestamp_key\":\"date\",\"entry_topic\":\"ingest\",\"redis_db_host\":\"localhost\",\"redis_db_port\":" + config.getInt("redis.port") + ",\"redis_db\":2}', 'Live', 2, 'v1', 'ingest', 'System', 'System', now(), now());") postgresConnect.execute("update datasets set denorm_config = '" + s"""{"redis_db_host":"localhost","redis_db_port":$redisPort,"denorm_fields":[{"denorm_key":"vehicleCode","redis_db":3,"denorm_out_field":"vehicle_data"},{"denorm_key":"dealer.dealerCode","redis_db":4,"denorm_out_field":"dealer_data"}]}""" + "' where id='d1';") val redisConnection = new RedisConnect(denormConfig.redisHost, denormConfig.redisPort, denormConfig.redisConnectionTimeout) redisConnection.getConnection(3).set("HYUN-CRE-D6", EventFixture.DENORM_DATA_1) @@ -98,9 +87,7 @@ class DenormalizerWindowStreamTaskTestSpec extends BaseSpecWithDatasetRegistry { implicit val env: StreamExecutionEnvironment = FlinkUtil.getExecutionContext(denormConfig) val task = new DenormalizerWindowStreamTask(denormConfig, kafkaConnector) task.process(env) - Future { - env.execute(denormConfig.jobName) - } + env.executeAsync(denormConfig.jobName) val outputs = EmbeddedKafka.consumeNumberMessagesFrom[String](denormConfig.denormOutputTopic, 4, timeout = 30.seconds) validateOutputs(outputs) @@ -108,10 +95,7 @@ class DenormalizerWindowStreamTaskTestSpec extends BaseSpecWithDatasetRegistry { val systemEvents = EmbeddedKafka.consumeNumberMessagesFrom[String](denormConfig.kafkaSystemTopic, 5, timeout = 30.seconds) validateSystemEvents(systemEvents) - val mutableMetricsMap = mutable.Map[String, Long]() - BaseMetricsReporter.gaugeMetrics.toMap.mapValues(f => f.getValue()).map(f => mutableMetricsMap.put(f._1, f._2)) - Console.println("### DenormalizerStreamWindowTaskTestSpec:metrics ###", JSONUtil.serialize(getPrintableMetrics(mutableMetricsMap))) - validateMetrics(mutableMetricsMap) + validateMetrics(metricsReporter) } private def 
validateOutputs(outputs: List[String]): Unit = { @@ -136,7 +120,7 @@ class DenormalizerWindowStreamTaskTestSpec extends BaseSpecWithDatasetRegistry { systemEvents.count(f => { val event = JSONUtil.deserialize[SystemEvent](f) Producer.validator.equals(event.ctx.pdata.pid.get) - }) should be (2) + }) should be(2) systemEvents.count(f => { val event = JSONUtil.deserialize[SystemEvent](f) FunctionalError.MissingEventData.equals(event.data.error.get.error_type) @@ -199,15 +183,23 @@ class DenormalizerWindowStreamTaskTestSpec extends BaseSpecWithDatasetRegistry { }) } - private def validateMetrics(mutableMetricsMap: mutable.Map[String, Long]): Unit = { - mutableMetricsMap(s"${denormConfig.jobName}.d1.${denormConfig.denormTotal}") should be(3) - mutableMetricsMap(s"${denormConfig.jobName}.d1.${denormConfig.denormFailed}") should be(1) - mutableMetricsMap(s"${denormConfig.jobName}.d1.${denormConfig.denormSuccess}") should be(1) - mutableMetricsMap(s"${denormConfig.jobName}.d1.${denormConfig.denormPartialSuccess}") should be(1) - mutableMetricsMap(s"${denormConfig.jobName}.d2.${denormConfig.denormTotal}") should be(1) - mutableMetricsMap(s"${denormConfig.jobName}.d2.${denormConfig.eventsSkipped}") should be(1) - mutableMetricsMap(s"${denormConfig.jobName}.d3.${denormConfig.eventFailedMetricsCount}") should be(1) - mutableMetricsMap(s"${denormConfig.jobName}.dxyz.${denormConfig.eventFailedMetricsCount}") should be(1) + private def validateMetrics(metricsReporter: InMemoryReporter): Unit = { + + val d1Metrics = getMetrics(metricsReporter, "d1") + d1Metrics(denormConfig.denormTotal) should be(3) + d1Metrics(denormConfig.denormFailed) should be(1) + d1Metrics(denormConfig.denormSuccess) should be(1) + d1Metrics(denormConfig.denormPartialSuccess) should be(1) + + val d2Metrics = getMetrics(metricsReporter, "d2") + d2Metrics(denormConfig.denormTotal) should be(1) + d2Metrics(denormConfig.eventsSkipped) should be(1) + + val d3Metrics = getMetrics(metricsReporter, "d3") + d3Metrics(denormConfig.eventFailedMetricsCount) should be(1) + + val dxyzMetrics = getMetrics(metricsReporter, "dxyz") + dxyzMetrics(denormConfig.eventFailedMetricsCount) should be(1) } } diff --git a/pipeline/extractor/src/test/scala/org/sunbird/obsrv/extractor/ExtractorStreamTestSpec.scala b/pipeline/extractor/src/test/scala/org/sunbird/obsrv/extractor/ExtractorStreamTestSpec.scala index 3574249a..5a8b74c2 100644 --- a/pipeline/extractor/src/test/scala/org/sunbird/obsrv/extractor/ExtractorStreamTestSpec.scala +++ b/pipeline/extractor/src/test/scala/org/sunbird/obsrv/extractor/ExtractorStreamTestSpec.scala @@ -3,12 +3,11 @@ package org.sunbird.obsrv.extractor import com.typesafe.config.{Config, ConfigFactory} import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig} import org.apache.flink.configuration.Configuration -import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration +import org.apache.flink.runtime.testutils.{InMemoryReporter, MiniClusterResourceConfiguration} import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.test.util.MiniClusterWithClientResource import org.apache.kafka.common.serialization.StringDeserializer import org.scalatest.Matchers._ -import org.sunbird.obsrv.BaseMetricsReporter import org.sunbird.obsrv.core.cache.RedisConnect import org.sunbird.obsrv.core.model.Models.SystemEvent import org.sunbird.obsrv.core.model.SystemConfig @@ -17,15 +16,13 @@ import org.sunbird.obsrv.core.util.{FlinkUtil, JSONUtil} import 
org.sunbird.obsrv.extractor.task.{ExtractorConfig, ExtractorStreamTask} import org.sunbird.obsrv.spec.BaseSpecWithDatasetRegistry -import scala.collection.mutable -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.Future import scala.concurrent.duration._ class ExtractorStreamTestSpec extends BaseSpecWithDatasetRegistry { + private val metricsReporter = InMemoryReporter.createWithRetainedMetrics val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder() - .setConfiguration(testConfiguration()) + .setConfiguration(metricsReporter.addToConfiguration(new Configuration())) .setNumberSlotsPerTaskManager(1) .setNumberTaskManagers(1) .build) @@ -41,16 +38,8 @@ class ExtractorStreamTestSpec extends BaseSpecWithDatasetRegistry { ) implicit val deserializer: StringDeserializer = new StringDeserializer() - def testConfiguration(): Configuration = { - val config = new Configuration() - config.setString("metrics.reporter", "job_metrics_reporter") - config.setString("metrics.reporter.job_metrics_reporter.class", classOf[BaseMetricsReporter].getName) - config - } - override def beforeAll(): Unit = { super.beforeAll() - BaseMetricsReporter.gaugeMetrics.clear() EmbeddedKafka.start()(embeddedKafkaConfig) createTestTopics() EmbeddedKafka.publishStringMessageToKafka(pConfig.kafkaInputTopic, EventFixture.INVALID_JSON) @@ -82,9 +71,8 @@ class ExtractorStreamTestSpec extends BaseSpecWithDatasetRegistry { implicit val env: StreamExecutionEnvironment = FlinkUtil.getExecutionContext(pConfig) val task = new ExtractorStreamTask(pConfig, kafkaConnector) task.process(env) - Future { - env.execute(pConfig.jobName) - } + env.executeAsync(pConfig.jobName) + val batchFailedEvents = EmbeddedKafka.consumeNumberMessagesFrom[String](pConfig.kafkaBatchFailedTopic, 1, timeout = 30.seconds) val invalidEvents = EmbeddedKafka.consumeNumberMessagesFrom[String](pConfig.kafkaFailedTopic, 2, timeout = 30.seconds) val systemEvents = EmbeddedKafka.consumeNumberMessagesFrom[String](pConfig.kafkaSystemTopic, 6, timeout = 30.seconds) @@ -95,18 +83,15 @@ class ExtractorStreamTestSpec extends BaseSpecWithDatasetRegistry { validateInvalidEvents(invalidEvents) validateSystemEvents(systemEvents) - val mutableMetricsMap = mutable.Map[String, Long]() - BaseMetricsReporter.gaugeMetrics.toMap.mapValues(f => f.getValue()).map(f => mutableMetricsMap.put(f._1, f._2)) - Console.println("### ExtractorStreamTestSpec:metrics ###", JSONUtil.serialize(getPrintableMetrics(mutableMetricsMap))) - validateMetrics(mutableMetricsMap) + validateMetrics(metricsReporter) val config2: Config = ConfigFactory.load("test2.conf") val extractorConfig = new ExtractorConfig(config2) - extractorConfig.eventMaxSize should be (SystemConfig.getLong("maxEventSize", 1048576L)) + extractorConfig.eventMaxSize should be(SystemConfig.getLong("maxEventSize", 1048576L)) } private def validateOutputEvents(outputEvents: List[String]) = { - outputEvents.size should be (3) + outputEvents.size should be(3) //TODO: Add assertions for all 3 events /* (OutEvent,{"event":{"dealer":{"dealerCode":"KUNUnited","locationId":"KUN1","email":"dealer1@gmail.com","phone":"9849012345"},"vehicleCode":"HYUN-CRE-D6","id":"1","date":"2023-03-01","metrics":{"bookingsTaken":50,"deliveriesPromised":20,"deliveriesDone":19}},"obsrv_meta":{"flags":{"extractor":"success"},"syncts":1701760331686,"prevProcessingTime":1701760337492,"error":{},"processingStartTime":1701760337087,"timespans":{"extractor":405}},"dataset":"d1"}) @@ -137,7 +122,7 @@ 
class ExtractorStreamTestSpec extends BaseSpecWithDatasetRegistry { systemEvents.foreach(se => { val event = JSONUtil.deserialize[SystemEvent](se) - if(event.ctx.dataset.getOrElse("ALL").equals("ALL")) + if (event.ctx.dataset.getOrElse("ALL").equals("ALL")) event.ctx.dataset_type should be(None) else event.ctx.dataset_type should be(Some("event")) @@ -154,14 +139,18 @@ class ExtractorStreamTestSpec extends BaseSpecWithDatasetRegistry { */ } - private def validateMetrics(mutableMetricsMap: mutable.Map[String, Long]): Unit = { + private def validateMetrics(metricsReporter: InMemoryReporter): Unit = { + + val allMetrics = getMetrics(metricsReporter, "ALL") + allMetrics(pConfig.eventFailedMetricsCount) should be(1) + + val d1Metrics = getMetrics(metricsReporter, "d1") + d1Metrics(pConfig.totalEventCount) should be(5) + d1Metrics(pConfig.eventFailedMetricsCount) should be(2) + d1Metrics(pConfig.skippedExtractionCount) should be(1) + d1Metrics(pConfig.successEventCount) should be(2) + d1Metrics(pConfig.successExtractionCount) should be(3) - mutableMetricsMap(s"${pConfig.jobName}.ALL.${pConfig.eventFailedMetricsCount}") should be(1) - mutableMetricsMap(s"${pConfig.jobName}.d1.${pConfig.totalEventCount}") should be(5) - mutableMetricsMap(s"${pConfig.jobName}.d1.${pConfig.eventFailedMetricsCount}") should be(2) - mutableMetricsMap(s"${pConfig.jobName}.d1.${pConfig.skippedExtractionCount}") should be(1) - mutableMetricsMap(s"${pConfig.jobName}.d1.${pConfig.successEventCount}") should be(2) - mutableMetricsMap(s"${pConfig.jobName}.d1.${pConfig.successExtractionCount}") should be(3) } } \ No newline at end of file diff --git a/pipeline/hudi-connector/pom.xml b/pipeline/hudi-connector/pom.xml index a76a47c4..49267e3e 100644 --- a/pipeline/hudi-connector/pom.xml +++ b/pipeline/hudi-connector/pom.xml @@ -12,8 +12,21 @@ UTF-8 1.4.0 + 2.13.4.20221013 + + + + com.fasterxml.jackson + jackson-bom + ${jackson-bom.version} + pom + import + + + + org.apache.flink @@ -27,10 +40,35 @@ + + com.fasterxml.jackson.core + jackson-databind + provided + + + com.fasterxml.jackson.core + jackson-annotations + provided + + + com.fasterxml.jackson.core + jackson-core + provided + org.sunbird.obsrv framework 1.0.0 + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + + org.sunbird.obsrv @@ -65,6 +103,10 @@ com.fasterxml.jackson.core jackson-databind + + com.fasterxml.jackson.core + jackson-annotations + org.slf4j slf4j-log4j12 @@ -104,6 +146,10 @@ com.fasterxml.jackson.core jackson-databind + + com.fasterxml.jackson.core + jackson-annotations + @@ -119,6 +165,10 @@ com.fasterxml.jackson.core jackson-databind + + com.fasterxml.jackson.core + jackson-annotations + org.apache.hadoop hadoop-common diff --git a/pipeline/pom.xml b/pipeline/pom.xml index 8de20840..c879f367 100644 --- a/pipeline/pom.xml +++ b/pipeline/pom.xml @@ -22,7 +22,7 @@ transformer dataset-router unified-pipeline - master-data-processor + hudi-connector cache-indexer diff --git a/pipeline/preprocessor/pom.xml b/pipeline/preprocessor/pom.xml index 7c80236d..9604e5a6 100644 --- a/pipeline/preprocessor/pom.xml +++ b/pipeline/preprocessor/pom.xml @@ -44,6 +44,20 @@ com.networknt json-schema-validator 1.5.1 + + + com.fasterxml.jackson.core + jackson-databind + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-annotations + + com.google.guava diff --git a/pipeline/preprocessor/src/test/scala/org/sunbird/obsrv/preprocessor/PipelinePreprocessorStreamTestSpec.scala 
b/pipeline/preprocessor/src/test/scala/org/sunbird/obsrv/preprocessor/PipelinePreprocessorStreamTestSpec.scala index bafb9013..8c28672e 100644 --- a/pipeline/preprocessor/src/test/scala/org/sunbird/obsrv/preprocessor/PipelinePreprocessorStreamTestSpec.scala +++ b/pipeline/preprocessor/src/test/scala/org/sunbird/obsrv/preprocessor/PipelinePreprocessorStreamTestSpec.scala @@ -2,12 +2,11 @@ package org.sunbird.obsrv.preprocessor import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig} import org.apache.flink.configuration.Configuration -import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration +import org.apache.flink.runtime.testutils.{InMemoryReporter, MiniClusterResourceConfiguration} import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.test.util.MiniClusterWithClientResource import org.apache.kafka.common.serialization.StringDeserializer import org.scalatest.Matchers._ -import org.sunbird.obsrv.BaseMetricsReporter import org.sunbird.obsrv.core.cache.RedisConnect import org.sunbird.obsrv.core.model.ErrorConstants import org.sunbird.obsrv.core.model.Models.SystemEvent @@ -17,15 +16,13 @@ import org.sunbird.obsrv.preprocessor.fixture.EventFixtures import org.sunbird.obsrv.preprocessor.task.{PipelinePreprocessorConfig, PipelinePreprocessorStreamTask} import org.sunbird.obsrv.spec.BaseSpecWithDatasetRegistry -import scala.collection.mutable -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.Future import scala.concurrent.duration._ class PipelinePreprocessorStreamTestSpec extends BaseSpecWithDatasetRegistry { + private val metricsReporter = InMemoryReporter.createWithRetainedMetrics val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder() - .setConfiguration(testConfiguration()) + .setConfiguration(metricsReporter.addToConfiguration(new Configuration())) .setNumberSlotsPerTaskManager(1) .setNumberTaskManagers(1) .build) @@ -41,16 +38,8 @@ class PipelinePreprocessorStreamTestSpec extends BaseSpecWithDatasetRegistry { ) implicit val deserializer: StringDeserializer = new StringDeserializer() - def testConfiguration(): Configuration = { - val config = new Configuration() - config.setString("metrics.reporter", "job_metrics_reporter") - config.setString("metrics.reporter.job_metrics_reporter.class", classOf[BaseMetricsReporter].getName) - config - } - override def beforeAll(): Unit = { super.beforeAll() - BaseMetricsReporter.gaugeMetrics.clear() EmbeddedKafka.start()(embeddedKafkaConfig) prepareTestData() createTestTopics() @@ -79,8 +68,8 @@ class PipelinePreprocessorStreamTestSpec extends BaseSpecWithDatasetRegistry { postgresConnect.execute("insert into datasets(id, type, data_schema, validation_config, router_config, dataset_config, status, api_version, entry_topic, created_by, updated_by, created_date, updated_date) values ('d4', 'dataset', '" + 
"""{"$schema":"https://json-schema.org/draft/2020-12/schema","type":"object","properties":{"id":{"type":"string"},"vehicleCode":{"type":"string"},"date":{"type":"string"},"dealer":{"type":"object","properties":{"dealerCode":{"type":"string"},"locationId":{"type":"string"},"email":{"type":"string"},"phone":{"type":"string"}},"additionalProperties":false,"required":["dealerCode","locationId"]},"metrics":{"type":"object","properties":{"bookingsTaken":{"type":"integer"},"deliveriesPromised":{"type":"integer"},"deliveriesDone":{"type":"integer"}},"additionalProperties":false}},"additionalProperties":false,"required":["id","vehicleCode","date"]}""" + "', '{\"validate\": true, \"mode\": \"Strict\"}', '{\"topic\":\"d2-events\"}', '{\"data_key\":\"id\",\"timestamp_key\":\"date\",\"entry_topic\":\"ingest\"}', 'Live', 'v1', 'ingest', 'System', 'System', now(), now());") postgresConnect.execute("insert into datasets(id, type, data_schema, validation_config, router_config, dataset_config, status, api_version, entry_topic, created_by, updated_by, created_date, updated_date) values ('d5', 'dataset', '" + """{"$schema":"https://json-schema.org/draft/2020-12/schema","type":"object","properties":{"id":{"type":"string"},"vehicleCode":{"type":"string"},"date":{"type":"string"},"dealer":{"type":"object","properties":{"dealerCode":{"type":"string"},"locationId":{"type":"string"},"email":{"type":"string"},"phone":{"type":"string"}},"additionalProperties":false,"required":["dealerCode","locationId"]},"metrics":{"type":"object","properties":{"bookingsTaken":{"type":"integer"},"deliveriesPromised":{"type":"integer"},"deliveriesDone":{"type":"integer"}},"additionalProperties":false}},"additionalProperties":false,"required":["id","vehicleCode","date"]}""" + "', '{\"validate\": true, \"mode\": \"IgnoreNewFields\"}', '{\"topic\":\"d2-events\"}', '{\"data_key\":\"id\",\"timestamp_key\":\"date\",\"entry_topic\":\"ingest\"}', 'Live', 'v1', 'ingest', 'System', 'System', now(), now());") postgresConnect.execute("insert into datasets(id, type, data_schema, validation_config, router_config, dataset_config, status, api_version, entry_topic, created_by, updated_by, created_date, updated_date) values ('d6', 'dataset', '" + """{"$schema":"https://json-schema.org/draft/2020-12/schema","type":"object","properties":{"id":{"type":"string","maxLength":5},"vehicleCode":{"type":"string"},"date":{"type":"string"},"dealer":{"type":"object","properties":{"dealerCode":{"type":"string"},"locationId":{"type":"string"},"email":{"type":"string"},"phone":{"type":"string"}},"additionalProperties":false,"required":["dealerCode","locationId"]},"metrics":{"type":"object","properties":{"bookingsTaken":{"type":"integer"},"deliveriesPromised":{"type":"integer"},"deliveriesDone":{"type":"integer"}},"additionalProperties":false}},"additionalProperties":false,"required":["id","vehicleCode","date"]}""" + "', '{\"validate\": true, \"mode\": \"DiscardNewFields\"}', '{\"topic\":\"d2-events\"}', '{\"data_key\":\"id\",\"timestamp_key\":\"date\",\"entry_topic\":\"ingest\"}', 'Live', 'v1', 'ingest', 'System', 'System', now(), now());") - postgresConnect.execute("insert into datasets(id, type, data_schema, validation_config, router_config, dataset_config, status, api_version, entry_topic, created_by, updated_by, created_date, updated_date) values ('d7', 'dataset', '"+EventFixtures.INVALID_SCHEMA+"', '{\"validate\": true, \"mode\": \"Strict\"}','{\"topic\":\"d2-events\"}', '{\"data_key\":\"id\",\"timestamp_key\":\"date\",\"entry_topic\":\"ingest\"}', 'Live', 'v1', 
'ingest', 'System', 'System', now(), now());") - postgresConnect.execute("insert into datasets(id, type, data_schema, validation_config, extraction_config, dedup_config, router_config, dataset_config, status, data_version, api_version, entry_topic, created_by, updated_by, created_date, updated_date) values ('d8', 'dataset', '{\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",\"$id\":\"https://sunbird.obsrv.com/test.json\",\"title\":\"Test Schema\",\"description\":\"Test Schema\",\"type\":\"object\",\"properties\":{\"id\":{\"type\":\"string\"},\"vehicleCode\":{\"type\":\"string\"},\"date\":{\"type\":\"string\"},\"dealer\":{\"type\":\"object\",\"properties\":{\"dealerCode\":{\"type\":\"string\"},\"locationId\":{\"type\":\"string\"},\"email\":{\"type\":\"string\"},\"phone\":{\"type\":\"string\"}},\"required\":[\"dealerCode\",\"locationId\"]},\"metrics\":{\"type\":\"object\",\"properties\":{\"bookingsTaken\":{\"type\":\"number\"},\"deliveriesPromised\":{\"type\":\"number\"},\"deliveriesDone\":{\"type\":\"number\"}}}},\"required\":[\"id\",\"vehicleCode\",\"date\",\"dealer\",\"metrics\"]}', '{\"validate\": false, \"mode\": \"Strict\"}', '{\"is_batch_event\": true, \"extraction_key\": \"events\", \"dedup_config\": {\"drop_duplicates\": true, \"dedup_key\": \"id\", \"dedup_period\": 3}}', '{\"drop_duplicates\": true, \"dedup_key\": \"id\", \"dedup_period\": 3}', '{\"topic\":\"d1-events\"}', '{\"data_key\":\"id\",\"timestamp_key\":\"date\",\"entry_topic\":\"ingest\",\"redis_db_host\":\"localhost\",\"redis_db_port\":"+config.getInt("redis.port")+",\"redis_db\":2}', 'Live', 2, 'v1', 'ingest', 'System', 'System', now(), now());") + postgresConnect.execute("insert into datasets(id, type, data_schema, validation_config, router_config, dataset_config, status, api_version, entry_topic, created_by, updated_by, created_date, updated_date) values ('d7', 'dataset', '" + EventFixtures.INVALID_SCHEMA + "', '{\"validate\": true, \"mode\": \"Strict\"}','{\"topic\":\"d2-events\"}', '{\"data_key\":\"id\",\"timestamp_key\":\"date\",\"entry_topic\":\"ingest\"}', 'Live', 'v1', 'ingest', 'System', 'System', now(), now());") + postgresConnect.execute("insert into datasets(id, type, data_schema, validation_config, extraction_config, dedup_config, router_config, dataset_config, status, data_version, api_version, entry_topic, created_by, updated_by, created_date, updated_date) values ('d8', 'dataset', '{\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",\"$id\":\"https://sunbird.obsrv.com/test.json\",\"title\":\"Test Schema\",\"description\":\"Test Schema\",\"type\":\"object\",\"properties\":{\"id\":{\"type\":\"string\"},\"vehicleCode\":{\"type\":\"string\"},\"date\":{\"type\":\"string\"},\"dealer\":{\"type\":\"object\",\"properties\":{\"dealerCode\":{\"type\":\"string\"},\"locationId\":{\"type\":\"string\"},\"email\":{\"type\":\"string\"},\"phone\":{\"type\":\"string\"}},\"required\":[\"dealerCode\",\"locationId\"]},\"metrics\":{\"type\":\"object\",\"properties\":{\"bookingsTaken\":{\"type\":\"number\"},\"deliveriesPromised\":{\"type\":\"number\"},\"deliveriesDone\":{\"type\":\"number\"}}}},\"required\":[\"id\",\"vehicleCode\",\"date\",\"dealer\",\"metrics\"]}', '{\"validate\": false, \"mode\": \"Strict\"}', '{\"is_batch_event\": true, \"extraction_key\": \"events\", \"dedup_config\": {\"drop_duplicates\": true, \"dedup_key\": \"id\", \"dedup_period\": 3}}', '{\"drop_duplicates\": true, \"dedup_key\": \"id\", \"dedup_period\": 3}', '{\"topic\":\"d1-events\"}', 
'{\"data_key\":\"id\",\"timestamp_key\":\"date\",\"entry_topic\":\"ingest\",\"redis_db_host\":\"localhost\",\"redis_db_port\":" + config.getInt("redis.port") + ",\"redis_db\":2}', 'Live', 2, 'v1', 'ingest', 'System', 'System', now(), now());") postgresConnect.closeConnection() } @@ -103,10 +92,8 @@ class PipelinePreprocessorStreamTestSpec extends BaseSpecWithDatasetRegistry { implicit val env: StreamExecutionEnvironment = FlinkUtil.getExecutionContext(pConfig) val task = new PipelinePreprocessorStreamTask(pConfig, kafkaConnector) task.process(env) - Future { - env.execute(pConfig.jobName) - Thread.sleep(5000) - } + env.executeAsync(pConfig.jobName) + val outputEvents = EmbeddedKafka.consumeNumberMessagesFrom[String](pConfig.kafkaUniqueTopic, 5, timeout = 30.seconds) val invalidEvents = EmbeddedKafka.consumeNumberMessagesFrom[String](pConfig.kafkaInvalidTopic, 7, timeout = 30.seconds) val systemEvents = EmbeddedKafka.consumeNumberMessagesFrom[String](pConfig.kafkaSystemTopic, 8, timeout = 30.seconds) @@ -114,11 +101,7 @@ class PipelinePreprocessorStreamTestSpec extends BaseSpecWithDatasetRegistry { validateOutputEvents(outputEvents) validateInvalidEvents(invalidEvents) validateSystemEvents(systemEvents) - - val mutableMetricsMap = mutable.Map[String, Long]() - BaseMetricsReporter.gaugeMetrics.toMap.mapValues(f => f.getValue()).map(f => mutableMetricsMap.put(f._1, f._2)) - Console.println("### PipelinePreprocessorStreamTestSpec:metrics ###", JSONUtil.serialize(getPrintableMetrics(mutableMetricsMap))) - validateMetrics(mutableMetricsMap) + validateMetrics(metricsReporter) } @@ -177,44 +160,55 @@ class PipelinePreprocessorStreamTestSpec extends BaseSpecWithDatasetRegistry { */ } - private def validateMetrics(mutableMetricsMap: mutable.Map[String, Long]): Unit = { - mutableMetricsMap(s"${pConfig.jobName}.ALL.${pConfig.eventFailedMetricsCount}") should be(1) - mutableMetricsMap(s"${pConfig.jobName}.dX.${pConfig.eventFailedMetricsCount}") should be(1) - - mutableMetricsMap(s"${pConfig.jobName}.d1.${pConfig.validationFailureMetricsCount}") should be(1) - mutableMetricsMap(s"${pConfig.jobName}.d1.${pConfig.duplicationProcessedEventMetricsCount}") should be(1) - mutableMetricsMap(s"${pConfig.jobName}.d1.${pConfig.duplicationEventMetricsCount}") should be(1) - mutableMetricsMap(s"${pConfig.jobName}.d1.${pConfig.validationSuccessMetricsCount}") should be(2) - mutableMetricsMap(s"${pConfig.jobName}.d1.${pConfig.validationTotalMetricsCount}") should be(3) - mutableMetricsMap(s"${pConfig.jobName}.d1.${pConfig.duplicationTotalMetricsCount}") should be(2) - - mutableMetricsMap(s"${pConfig.jobName}.d2.${pConfig.duplicationSkippedEventMetricsCount}") should be(1) - mutableMetricsMap(s"${pConfig.jobName}.d2.${pConfig.validationSkipMetricsCount}") should be(1) - mutableMetricsMap(s"${pConfig.jobName}.d2.${pConfig.eventFailedMetricsCount}") should be(1) - mutableMetricsMap(s"${pConfig.jobName}.d2.${pConfig.validationTotalMetricsCount}") should be(1) - mutableMetricsMap(s"${pConfig.jobName}.d2.${pConfig.duplicationTotalMetricsCount}") should be(1) - - mutableMetricsMap(s"${pConfig.jobName}.d3.${pConfig.validationTotalMetricsCount}") should be(1) - mutableMetricsMap(s"${pConfig.jobName}.d3.${pConfig.eventIgnoredMetricsCount}") should be(1) - - mutableMetricsMap(s"${pConfig.jobName}.d4.${pConfig.validationTotalMetricsCount}") should be(2) - mutableMetricsMap(s"${pConfig.jobName}.d4.${pConfig.validationFailureMetricsCount}") should be(2) - - 
mutableMetricsMap(s"${pConfig.jobName}.d5.${pConfig.validationTotalMetricsCount}") should be(1) - mutableMetricsMap(s"${pConfig.jobName}.d5.${pConfig.validationSuccessMetricsCount}") should be(1) - mutableMetricsMap(s"${pConfig.jobName}.d5.${pConfig.duplicationTotalMetricsCount}") should be(1) - mutableMetricsMap(s"${pConfig.jobName}.d5.${pConfig.duplicationSkippedEventMetricsCount}") should be(1) - - mutableMetricsMap(s"${pConfig.jobName}.d6.${pConfig.validationTotalMetricsCount}") should be(2) - mutableMetricsMap(s"${pConfig.jobName}.d6.${pConfig.validationSuccessMetricsCount}") should be(1) - mutableMetricsMap(s"${pConfig.jobName}.d6.${pConfig.validationFailureMetricsCount}") should be(1) - mutableMetricsMap(s"${pConfig.jobName}.d6.${pConfig.duplicationTotalMetricsCount}") should be(1) - mutableMetricsMap(s"${pConfig.jobName}.d6.${pConfig.duplicationSkippedEventMetricsCount}") should be(1) - - mutableMetricsMap(s"${pConfig.jobName}.d8.${pConfig.validationTotalMetricsCount}") should be(1) - mutableMetricsMap(s"${pConfig.jobName}.d8.${pConfig.validationSkipMetricsCount}") should be(1) - mutableMetricsMap(s"${pConfig.jobName}.d8.${pConfig.duplicationTotalMetricsCount}") should be(1) - mutableMetricsMap(s"${pConfig.jobName}.d8.${pConfig.duplicationProcessedEventMetricsCount}") should be(1) + private def validateMetrics(metricsReporter: InMemoryReporter): Unit = { + val allMetrics = getMetrics(metricsReporter, "ALL") + allMetrics(pConfig.eventFailedMetricsCount) should be(1) + + val dxMetrics = getMetrics(metricsReporter, "dX") + dxMetrics(pConfig.eventFailedMetricsCount) should be(1) + + val d1Metrics = getMetrics(metricsReporter, "d1") + d1Metrics(pConfig.validationFailureMetricsCount) should be(1) + d1Metrics(pConfig.duplicationProcessedEventMetricsCount) should be(1) + d1Metrics(pConfig.duplicationEventMetricsCount) should be(1) + d1Metrics(pConfig.validationSuccessMetricsCount) should be(2) + d1Metrics(pConfig.validationTotalMetricsCount) should be(3) + d1Metrics(pConfig.duplicationTotalMetricsCount) should be(2) + + val d2Metrics = getMetrics(metricsReporter, "d2") + d2Metrics(pConfig.duplicationSkippedEventMetricsCount) should be(1) + d2Metrics(pConfig.validationSkipMetricsCount) should be(1) + d2Metrics(pConfig.eventFailedMetricsCount) should be(1) + d2Metrics(pConfig.validationTotalMetricsCount) should be(1) + d2Metrics(pConfig.duplicationTotalMetricsCount) should be(1) + + val d3Metrics = getMetrics(metricsReporter, "d3") + d3Metrics(pConfig.validationTotalMetricsCount) should be(1) + d3Metrics(pConfig.eventIgnoredMetricsCount) should be(1) + + val d4Metrics = getMetrics(metricsReporter, "d4") + d4Metrics(pConfig.validationTotalMetricsCount) should be(2) + d4Metrics(pConfig.validationFailureMetricsCount) should be(2) + + val d5Metrics = getMetrics(metricsReporter, "d5") + d5Metrics(pConfig.validationTotalMetricsCount) should be(1) + d5Metrics(pConfig.validationSuccessMetricsCount) should be(1) + d5Metrics(pConfig.duplicationTotalMetricsCount) should be(1) + d5Metrics(pConfig.duplicationSkippedEventMetricsCount) should be(1) + + val d6Metrics = getMetrics(metricsReporter, "d6") + d6Metrics(pConfig.validationTotalMetricsCount) should be(2) + d6Metrics(pConfig.validationSuccessMetricsCount) should be(1) + d6Metrics(pConfig.validationFailureMetricsCount) should be(1) + d6Metrics(pConfig.duplicationTotalMetricsCount) should be(1) + d6Metrics(pConfig.duplicationSkippedEventMetricsCount) should be(1) + + val d8Metrics = getMetrics(metricsReporter, "d8") + 
+    d8Metrics(pConfig.validationTotalMetricsCount) should be(1)
+    d8Metrics(pConfig.validationSkipMetricsCount) should be(1)
+    d8Metrics(pConfig.duplicationTotalMetricsCount) should be(1)
+    d8Metrics(pConfig.duplicationProcessedEventMetricsCount) should be(1)
+  }
 }
\ No newline at end of file
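Note on the execution change above: the specs previously wrapped the blocking env.execute(...) in a discarded Future (plus a Thread.sleep) to keep the test thread free. Flink's executeAsync does this natively by submitting the job and returning a JobClient. A minimal sketch of the pattern, reusing the names from the spec above (the explicit cancel at the end is optional and not part of this change):

// Sketch only: non-blocking job submission as now used by the test specs.
implicit val env: StreamExecutionEnvironment = FlinkUtil.getExecutionContext(pConfig)
val task = new PipelinePreprocessorStreamTask(pConfig, kafkaConnector)
task.process(env)
val jobClient = env.executeAsync(pConfig.jobName) // returns a JobClient handle immediately
// ... consume from the unique/invalid/system topics and run the assertions ...
jobClient.cancel() // optional explicit cleanup; returns a CompletableFuture[Void]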
diff --git a/pipeline/transformer/src/test/scala/org/sunbird/obsrv/transformer/TransformerStreamTestSpec.scala b/pipeline/transformer/src/test/scala/org/sunbird/obsrv/transformer/TransformerStreamTestSpec.scala
index 76500f19..a86df369 100644
--- a/pipeline/transformer/src/test/scala/org/sunbird/obsrv/transformer/TransformerStreamTestSpec.scala
+++ b/pipeline/transformer/src/test/scala/org/sunbird/obsrv/transformer/TransformerStreamTestSpec.scala
@@ -2,12 +2,11 @@ package org.sunbird.obsrv.transformer
 import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
+import org.apache.flink.runtime.testutils.{InMemoryReporter, MiniClusterResourceConfiguration}
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.test.util.MiniClusterWithClientResource
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.scalatest.Matchers._
-import org.sunbird.obsrv.BaseMetricsReporter
 import org.sunbird.obsrv.core.model.Models.SystemEvent
 import org.sunbird.obsrv.core.model._
 import org.sunbird.obsrv.core.streaming.FlinkKafkaConnector
@@ -15,15 +14,13 @@ import org.sunbird.obsrv.core.util.{FlinkUtil, JSONUtil, PostgresConnect}
 import org.sunbird.obsrv.spec.BaseSpecWithDatasetRegistry
 import org.sunbird.obsrv.transformer.task.{TransformerConfig, TransformerStreamTask}
-import scala.collection.mutable
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.Future
 import scala.concurrent.duration._
 
 class TransformerStreamTestSpec extends BaseSpecWithDatasetRegistry {
+  private val metricsReporter = InMemoryReporter.createWithRetainedMetrics
   val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder()
-    .setConfiguration(testConfiguration())
+    .setConfiguration(metricsReporter.addToConfiguration(new Configuration()))
     .setNumberSlotsPerTaskManager(1)
     .setNumberTaskManagers(1)
     .build)
@@ -40,16 +37,8 @@ class TransformerStreamTestSpec extends BaseSpecWithDatasetRegistry {
   )
   implicit val deserializer: StringDeserializer = new StringDeserializer()
-  def testConfiguration(): Configuration = {
-    val config = new Configuration()
-    config.setString("metrics.reporter", "job_metrics_reporter")
-    config.setString("metrics.reporter.job_metrics_reporter.class", classOf[BaseMetricsReporter].getName)
-    config
-  }
-
   override def beforeAll(): Unit = {
     super.beforeAll()
-    BaseMetricsReporter.gaugeMetrics.clear()
     EmbeddedKafka.start()(embeddedKafkaConfig)
     insertTestData()
     createTestTopics()
@@ -94,9 +83,7 @@ class TransformerStreamTestSpec extends BaseSpecWithDatasetRegistry {
     implicit val env: StreamExecutionEnvironment = FlinkUtil.getExecutionContext(transformerConfig)
     val task = new TransformerStreamTask(transformerConfig, kafkaConnector)
     task.process(env)
-    Future {
-      env.execute(transformerConfig.jobName)
-    }
+    env.executeAsync(transformerConfig.jobName)
     val outputs = EmbeddedKafka.consumeNumberMessagesFrom[String](transformerConfig.kafkaTransformTopic, 3, timeout = 30.seconds)
     validateOutputs(outputs)
@@ -107,10 +94,7 @@ class TransformerStreamTestSpec extends BaseSpecWithDatasetRegistry {
     val systemEvents = EmbeddedKafka.consumeNumberMessagesFrom[String](transformerConfig.kafkaSystemTopic, 5, timeout = 30.seconds)
     validateSystemEvents(systemEvents)
-    val mutableMetricsMap = mutable.Map[String, Long]()
-    BaseMetricsReporter.gaugeMetrics.toMap.mapValues(f => f.getValue()).map(f => mutableMetricsMap.put(f._1, f._2))
-    Console.println("### DenormalizerStreamTaskTestSpec:metrics ###", JSONUtil.serialize(getPrintableMetrics(mutableMetricsMap)))
-    validateMetrics(mutableMetricsMap)
+    validateMetrics(metricsReporter)
     transformerConfig.successTag().getId should be("transformed-events")
   }
@@ -150,9 +134,9 @@ class TransformerStreamTestSpec extends BaseSpecWithDatasetRegistry {
       val event = msg(Constants.EVENT).asInstanceOf[String]
       val obsrvMeta = msg(Constants.OBSRV_META).asInstanceOf[Map[String, AnyRef]]
       obsrvMeta("timespans").asInstanceOf[Map[String, AnyRef]]("transformer").asInstanceOf[Int] should be > 0
-      obsrvMeta("flags").asInstanceOf[Map[String, AnyRef]]("transformer").asInstanceOf[String] should be (StatusCode.failed.toString)
-      obsrvMeta("error").asInstanceOf[Map[String, AnyRef]]("src").asInstanceOf[String] should be (Producer.transformer.toString)
-      obsrvMeta("error").asInstanceOf[Map[String, AnyRef]]("error_code").asInstanceOf[String] should be (ErrorConstants.ERR_TRANSFORMATION_FAILED.errorCode)
+      obsrvMeta("flags").asInstanceOf[Map[String, AnyRef]]("transformer").asInstanceOf[String] should be(StatusCode.failed.toString)
+      obsrvMeta("error").asInstanceOf[Map[String, AnyRef]]("src").asInstanceOf[String] should be(Producer.transformer.toString)
+      obsrvMeta("error").asInstanceOf[Map[String, AnyRef]]("error_code").asInstanceOf[String] should be(ErrorConstants.ERR_TRANSFORMATION_FAILED.errorCode)
       idx match {
         case 0 => event should be("{\"event\":{\"dealer\":{\"maskedPhone\":\"98******45\",\"locationId\":\"KUN1\",\"dealerCode\":\"D123\",\"phone\":\"9849012345\"},\"vehicleCode\":\"HYUN-CRE-D6\",\"id\":\"1235\",\"date\":\"2023-03-01\",\"metrics\":{\"bookingsTaken\":50,\"deliveriesPromised\":20,\"deliveriesDone\":19}},\"dataset\":\"d1\"}")
@@ -211,19 +195,24 @@ class TransformerStreamTestSpec extends BaseSpecWithDatasetRegistry {
     */
   }
-  private def validateMetrics(mutableMetricsMap: mutable.Map[String, Long]): Unit = {
-    mutableMetricsMap(s"${transformerConfig.jobName}.d1.${transformerConfig.totalEventCount}") should be(2)
-    mutableMetricsMap(s"${transformerConfig.jobName}.d1.${transformerConfig.transformSuccessCount}") should be(1)
-    mutableMetricsMap(s"${transformerConfig.jobName}.d1.${transformerConfig.transformFailedCount}") should be(1)
+  private def validateMetrics(metricsReporter: InMemoryReporter): Unit = {
+
+    val d1Metrics = getMetrics(metricsReporter, "d1")
+    d1Metrics(transformerConfig.totalEventCount) should be(2)
+    d1Metrics(transformerConfig.transformSuccessCount) should be(1)
+    d1Metrics(transformerConfig.transformFailedCount) should be(1)
 
-    mutableMetricsMap(s"${transformerConfig.jobName}.d2.${transformerConfig.totalEventCount}") should be(1)
-    mutableMetricsMap(s"${transformerConfig.jobName}.d2.${transformerConfig.transformPartialCount}") should be(1)
+    val d2Metrics = getMetrics(metricsReporter, "d2")
+    d2Metrics(transformerConfig.totalEventCount) should be(1)
+    d2Metrics(transformerConfig.transformPartialCount) should be(1)
 
-    mutableMetricsMap(s"${transformerConfig.jobName}.d3.${transformerConfig.totalEventCount}") should be(1)
-    mutableMetricsMap(s"${transformerConfig.jobName}.d3.${transformerConfig.transformSkippedCount}") should be(1)
+    val d3Metrics = getMetrics(metricsReporter, "d3")
+    d3Metrics(transformerConfig.totalEventCount) should be(1)
+    d3Metrics(transformerConfig.transformSkippedCount) should be(1)
 
-    mutableMetricsMap(s"${transformerConfig.jobName}.d4.${transformerConfig.totalEventCount}") should be(1)
-    mutableMetricsMap(s"${transformerConfig.jobName}.d4.${transformerConfig.transformFailedCount}") should be(1)
+    val d4Metrics = getMetrics(metricsReporter, "d4")
+    d4Metrics(transformerConfig.totalEventCount) should be(1)
+    d4Metrics(transformerConfig.transformFailedCount) should be(1)
   }
 }
\ No newline at end of file
diff --git a/pipeline/unified-pipeline/pom.xml b/pipeline/unified-pipeline/pom.xml
index 37d06fe3..75e8dd1f 100644
--- a/pipeline/unified-pipeline/pom.xml
+++ b/pipeline/unified-pipeline/pom.xml
@@ -163,7 +163,7 @@
         org.apache.maven.plugins
         maven-shade-plugin
-        3.2.1
+        3.6.0
@@ -173,6 +173,7 @@
         false
+        true
         com.google.code.findbugs:jsr305
@@ -180,8 +181,6 @@
-
         *:*
         META-INF/*.SF
@@ -190,9 +189,15 @@
+
-        in.sanketika.obsrv.pipeline.task.UnifiedPipelineStreamTask
+        org.sunbird.obsrv.pipeline.task.UnifiedPipelineStreamTask
config.setString("metrics.reporter.job_metrics_reporter.class", classOf[BaseMetricsReporter].getName) - config - } - override def beforeAll(): Unit = { super.beforeAll() - BaseMetricsReporter.gaugeMetrics.clear() EmbeddedKafka.start()(embeddedKafkaConfig) createTestTopics() EmbeddedKafka.publishStringMessageToKafka(config.getString("kafka.input.topic"), EventFixture.VALID_BATCH_EVENT_D1) @@ -85,9 +75,7 @@ class UnifiedPipelineStreamTaskTestSpec extends BaseSpecWithDatasetRegistry { implicit val env: StreamExecutionEnvironment = FlinkUtil.getExecutionContext(unifiedPipelineConfig) val task = new UnifiedPipelineStreamTask(config, unifiedPipelineConfig, kafkaConnector) task.process(env) - Future { - env.execute(unifiedPipelineConfig.jobName) - } + env.executeAsync(unifiedPipelineConfig.jobName) try { val d1Events = EmbeddedKafka.consumeNumberMessagesFrom[String]("d1-events", 1, timeout = 30.seconds) @@ -104,42 +92,37 @@ class UnifiedPipelineStreamTaskTestSpec extends BaseSpecWithDatasetRegistry { case ex: Exception => ex.printStackTrace() } - val mutableMetricsMap = mutable.Map[String, Long](); - BaseMetricsReporter.gaugeMetrics.toMap.mapValues(f => f.getValue()).map(f => mutableMetricsMap.put(f._1, f._2)) - Console.println("### UnifiedPipelineStreamTaskTestSpec:metrics ###", JSONUtil.serialize(getPrintableMetrics(mutableMetricsMap))) - - mutableMetricsMap("ExtractorJob.d1.extractor-total-count") should be(4) - mutableMetricsMap("ExtractorJob.d1.extractor-duplicate-count") should be(1) - mutableMetricsMap("ExtractorJob.d1.extractor-event-count") should be(1) - mutableMetricsMap("ExtractorJob.d1.extractor-success-count") should be(1) - mutableMetricsMap("ExtractorJob.d1.extractor-failed-count") should be(2) - mutableMetricsMap("ExtractorJob.d2.extractor-total-count") should be(2) - mutableMetricsMap("ExtractorJob.d2.failed-event-count") should be(1) - mutableMetricsMap("ExtractorJob.d2.extractor-skipped-count") should be(1) - - mutableMetricsMap("PipelinePreprocessorJob.d1.validator-total-count") should be(1) - mutableMetricsMap("PipelinePreprocessorJob.d1.validator-success-count") should be(1) - mutableMetricsMap("PipelinePreprocessorJob.d1.dedup-total-count") should be(1) - mutableMetricsMap("PipelinePreprocessorJob.d1.dedup-success-count") should be(1) - mutableMetricsMap("PipelinePreprocessorJob.d2.validator-total-count") should be(1) - mutableMetricsMap("PipelinePreprocessorJob.d2.validator-skipped-count") should be(1) - mutableMetricsMap("PipelinePreprocessorJob.d2.dedup-total-count") should be(1) - mutableMetricsMap("PipelinePreprocessorJob.d2.dedup-skipped-count") should be(1) - - mutableMetricsMap("DenormalizerJob.d1.denorm-total") should be(1) - mutableMetricsMap("DenormalizerJob.d1.denorm-failed") should be(1) - mutableMetricsMap("DenormalizerJob.d2.denorm-total") should be(1) - mutableMetricsMap("DenormalizerJob.d2.denorm-skipped") should be(1) - - mutableMetricsMap("TransformerJob.d1.transform-total-count") should be(1) - mutableMetricsMap("TransformerJob.d1.transform-success-count") should be(1) - mutableMetricsMap("TransformerJob.d2.transform-total-count") should be(1) - mutableMetricsMap("TransformerJob.d2.transform-skipped-count") should be(1) - - mutableMetricsMap("DruidRouterJob.d1.router-total-count") should be(1) - mutableMetricsMap("DruidRouterJob.d1.router-success-count") should be(1) - mutableMetricsMap("DruidRouterJob.d2.router-total-count") should be(1) - mutableMetricsMap("DruidRouterJob.d2.router-success-count") should be(1) + val d1Metrics = 
diff --git a/pom.xml b/pom.xml
index 6022b9a5..ef514a08 100644
--- a/pom.xml
+++ b/pom.xml
@@ -29,7 +29,7 @@
         2.12
         2.12.11
         1.17.2
-        2.8.1
+        3.7.1
         11
         1.9.13
         1.4.0
diff --git a/transformation-sdk/pom.xml b/transformation-sdk/pom.xml
index 10d393ce..70fa100c 100644
--- a/transformation-sdk/pom.xml
+++ b/transformation-sdk/pom.xml
@@ -13,8 +13,8 @@
         UTF-8
         2.12
         2.12.11
-        1.15.2
-        2.8.1
+        1.17.2
+        3.7.1
         11
         1.9.13
         1.4.0
@@ -34,12 +34,20 @@
         com.ibm.jsonata4java
         JSONata4Java
-        2.2.6
+        2.5.1
         com.fasterxml.jackson.core
         jackson-databind
+
+        com.fasterxml.jackson.core
+        jackson-core
+
+
+        com.fasterxml.jackson.core
+        jackson-annotations
+
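The transformation-sdk hunk above also bumps JSONata4Java (2.2.6 to 2.5.1), the expression engine behind dataset transformations such as the phone masking seen in the transformer fixtures. A small, illustrative sketch of how the library is typically used (this is not the SDK's own wrapper code):

import com.api.jsonata4java.expressions.Expressions
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}

object JsonataMaskingSketch {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper()
    val event: JsonNode = mapper.readTree("""{"dealer":{"phone":"9849012345"}}""")
    // Keep the first two and last two digits, mask the middle of the phone number.
    val expr = Expressions.parse("$join([$substring(dealer.phone, 0, 2), '******', $substring(dealer.phone, 8)])")
    val masked: JsonNode = expr.evaluate(event)
    println(masked.asText()) // prints 98******45
  }
}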