From 0b056f3db7d7d3df59ee45dbaaf361802d4a7a48 Mon Sep 17 00:00:00 2001 From: Manjunath Date: Mon, 25 Nov 2024 17:12:36 +0530 Subject: [PATCH] #000: Added a metrics registry class to register all the metrics --- .../obsrv/core/otel/MetricRegistry.scala | 86 +++++++++++++++++++ .../core/otel/OTelMetricsGenerator.scala | 45 ++++------ 2 files changed, 102 insertions(+), 29 deletions(-) create mode 100644 framework/src/main/scala/org/sunbird/obsrv/core/otel/MetricRegistry.scala diff --git a/framework/src/main/scala/org/sunbird/obsrv/core/otel/MetricRegistry.scala b/framework/src/main/scala/org/sunbird/obsrv/core/otel/MetricRegistry.scala new file mode 100644 index 00000000..13d3519c --- /dev/null +++ b/framework/src/main/scala/org/sunbird/obsrv/core/otel/MetricRegistry.scala @@ -0,0 +1,86 @@ +package org.sunbird.obsrv.core.otel + +import io.opentelemetry.api.OpenTelemetry +import io.opentelemetry.api.metrics.{LongCounter, Meter} + +object MetricRegistry { + private val oTel: OpenTelemetry = OTelService.init() + private val meter: Meter = oTel.meterBuilder("obsrv-pipeline").build() + + val errorCount: LongCounter = meter.counterBuilder("event.error.count") + .setDescription("Dataset Error Event Count") + .setUnit("1") + .build() + + val processingTimeCounter: LongCounter = meter.counterBuilder("pipeline.processing.time") + .setDescription("Processing Time") + .setUnit("ms") + .build() + + val totalProcessingTimeCounter: LongCounter = meter.counterBuilder("pipeline.total.processing.time") + .setDescription("Total Processing Time") + .setUnit("ms") + .build() + + val latencyTimeCounter: LongCounter = meter.counterBuilder("pipeline.latency.time") + .setDescription("Latency Time") + .setUnit("ms") + .build() + + val extractorEventCounter: LongCounter = meter.counterBuilder("pipeline.extractor.events.count") + .setDescription("Count of Extractor Events") + .setUnit("1") + .build() + + val extractorTimeCounter: LongCounter = meter.counterBuilder("pipeline.extractor.time") + .setDescription("Extractor Processing Time") + .setUnit("ms") + .build() + + val transformStatusCounter: LongCounter = meter.counterBuilder("pipeline.transform.status") + .setDescription("Data transform Status") + .setUnit("1") + .build() + + val transformTimeCounter: LongCounter = meter.counterBuilder("pipeline.transform.time") + .setDescription("Transformation Processing Time") + .setUnit("ms") + .build() + + val denormStatusCounter: LongCounter = meter.counterBuilder("pipeline.denorm.status") + .setDescription("Denorm Status") + .setUnit("1") + .build() + + val denormTimeCounter: LongCounter = meter.counterBuilder("pipeline.denorm.time") + .setDescription("Denormalization Processing Time") + .setUnit("ms") + .build() + + val dedupStatusCounter: LongCounter = meter.counterBuilder("pipeline.de-dup.status") + .setDescription("De-dup Status") + .setUnit("1") + .build() + + val dedupTimeCounter: LongCounter = meter.counterBuilder("pipeline.dedup.time") + .setDescription("Deduplication Processing Time") + .setUnit("ms") + .build() + + val validatorTimeCounter: LongCounter = meter.counterBuilder("pipeline.validator.time") + .setDescription("Validator Processing Time") + .setUnit("ms") + .build() + + val validatorStatusCounter: LongCounter = meter.counterBuilder("pipeline.validator.status") + .setDescription("Validator Status") + .setUnit("1") + .build() + + val extractorStatusCounter: LongCounter = meter.counterBuilder("pipeline.extractor.status") + .setDescription("Extractor Status") + .setUnit("1") + .build() + +} + diff --git a/framework/src/main/scala/org/sunbird/obsrv/core/otel/OTelMetricsGenerator.scala b/framework/src/main/scala/org/sunbird/obsrv/core/otel/OTelMetricsGenerator.scala index 8764e5bc..3f805793 100644 --- a/framework/src/main/scala/org/sunbird/obsrv/core/otel/OTelMetricsGenerator.scala +++ b/framework/src/main/scala/org/sunbird/obsrv/core/otel/OTelMetricsGenerator.scala @@ -45,76 +45,63 @@ object OTelMetricsGenerator { // Extractor Job Metrics stats.extractor_events.foreach { events => - val extractorEventCounter: LongCounter = meter.counterBuilder("pipeline.extractor.events.count").setDescription("Count of Extractor Events").setUnit("1").build() - extractorEventCounter.add(events.toLong, contextAttributes) + MetricRegistry.extractorEventCounter.add(events.toLong, contextAttributes) } stats.extractor_time.foreach { time => - val extractorTimeCounter: LongCounter = meter.counterBuilder("pipeline.extractor.time").setDescription("Extractor Processing Time").setUnit("ms").build() - extractorTimeCounter.add(time, contextAttributes) + MetricRegistry.extractorTimeCounter.add(time, contextAttributes) } - stats.extractor_status.foreach { status => - val extractorStatusCounter: LongCounter = meter.counterBuilder("pipeline.extractor.status").setDescription("Extractor Status").setUnit("1").build() - extractorStatusCounter.add(1, Attributes.builder().put("status", status.toString).putAll(contextAttributes).build()) + MetricRegistry.extractorStatusCounter.add(1, Attributes.builder().put("status", status.toString).putAll(contextAttributes).build()) } // Schema Validator Metrics stats.validator_status.foreach { status => - val validatorStatusCounter: LongCounter = meter.counterBuilder("pipeline.validator.status").setDescription("Validator Status").setUnit("1").build() - validatorStatusCounter.add(1, Attributes.builder().put("status", status.toString).putAll(contextAttributes).build()) + MetricRegistry.validatorStatusCounter.add(1, Attributes.builder().put("status", status.toString).putAll(contextAttributes).build()) } stats.validator_time.foreach { time => - val validatorTimeCounter: LongCounter = meter.counterBuilder("pipeline.validator.time").setDescription("Validator Processing Time").setUnit("ms").build() - validatorTimeCounter.add(time, contextAttributes) + MetricRegistry.validatorTimeCounter.add(time, contextAttributes) } // De-Duplication Metrics stats.dedup_time.foreach { time => - val dedupTimeCounter: LongCounter = meter.counterBuilder("pipeline.dedup.time").setDescription("Deduplication Processing Time").setUnit("ms").build() - dedupTimeCounter.add(time, contextAttributes) + MetricRegistry.dedupTimeCounter.add(time, contextAttributes) } stats.dedup_status.foreach { status => - val dedupStatusCounter: LongCounter = meter.counterBuilder("pipeline.de-dup.status").setDescription("De-dup Status").setUnit("1").build() - dedupStatusCounter.add(1, Attributes.builder().put("status", status.toString).putAll(contextAttributes).build() + MetricRegistry.dedupStatusCounter.add(1, Attributes.builder().put("status", status.toString).putAll(contextAttributes).build() ) } // De-normalisation Metrics stats.denorm_time.foreach { time => - val denormTimeCounter: LongCounter = meter.counterBuilder("pipeline.denorm.time").setDescription("Denormalization Processing Time").setUnit("ms").build() - denormTimeCounter.add(time, contextAttributes) + MetricRegistry.denormTimeCounter.add(time, contextAttributes) } stats.denorm_status.foreach { status => - val denormStatusCounter: LongCounter = meter.counterBuilder("pipeline.denorm.status").setDescription("Denorm Status").setUnit("1").build() - denormStatusCounter.add(1, Attributes.builder().put("status", status.toString).putAll(contextAttributes).build()) + MetricRegistry.denormStatusCounter.add(1, Attributes.builder().put("status", status.toString).putAll(contextAttributes).build()) } // Data transformation Metrics stats.transform_time.foreach { time => - val transformTimeCounter: LongCounter = meter.counterBuilder("pipeline.transform.time").setDescription("Transformation Processing Time").setUnit("ms").build() - transformTimeCounter.add(time, contextAttributes) + MetricRegistry.transformTimeCounter.add(time, contextAttributes) } stats.transform_status.foreach { status => - val transformStatusCounter: LongCounter = meter.counterBuilder("pipeline.transform.status").setDescription("Data transform Status").setUnit("1").build() - transformStatusCounter.add(1, Attributes.builder().put("status", status.toString).putAll(contextAttributes).build()) + MetricRegistry.transformStatusCounter.add(1, Attributes.builder().put("status", status.toString).putAll(contextAttributes).build()) } // Common timestamp Metrics stats.total_processing_time.foreach { time => - val totalProcessingTimeCounter: LongCounter = meter.counterBuilder("pipeline.total.processing.time").setDescription("Total Processing Time").setUnit("ms").build() - totalProcessingTimeCounter.add(time, contextAttributes) + //val totalProcessingTimeCounter: LongCounter = meter.counterBuilder("pipeline.total.processing.time").setDescription("Total Processing Time").setUnit("ms").build() + MetricRegistry.totalProcessingTimeCounter.add(time, contextAttributes) } stats.latency_time.foreach { time => - val latencyTimeCounter: LongCounter = meter.counterBuilder("pipeline.latency.time").setDescription("Latency Time").setUnit("ms").build() - latencyTimeCounter.add(time, contextAttributes) + MetricRegistry.latencyTimeCounter.add(time, contextAttributes) } stats.processing_time.foreach { time => - val processingTimeCounter: LongCounter = meter.counterBuilder("pipeline.processing.time").setDescription("Processing Time").setUnit("ms").build() - processingTimeCounter.add(time, contextAttributes) + //val processingTimeCounter: LongCounter = meter.counterBuilder("pipeline.processing.time").setDescription("Processing Time").setUnit("ms").build() + MetricRegistry.processingTimeCounter.add(time, contextAttributes) } }