From f00afb297e99cc3fe0bf24c4e5afb94280b41708 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Sat, 24 Feb 2024 22:12:14 +0000 Subject: [PATCH] Addressed review comments Signed-off-by: Krishna Kondaka --- .../source/otelmetrics/OTelMetricsGrpcService.java | 14 ++++++++++++-- .../source/otelmetrics/OTelMetricsSource.java | 6 ++---- .../source/otelmetrics/OTelMetricsSourceTest.java | 1 - 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java index 0b8e093bc7..eaf3cf5f23 100644 --- a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java +++ b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java @@ -33,19 +33,25 @@ public class OTelMetricsGrpcService extends MetricsServiceGrpc.MetricsServiceImp public static final String REQUESTS_RECEIVED = "requestsReceived"; public static final String SUCCESS_REQUESTS = "successRequests"; + public static final String RECORDS_CREATED = "recordsCreated"; + public static final String RECORDS_DROPPED = "recordsDropped"; public static final String PAYLOAD_SIZE = "payloadSize"; public static final String REQUEST_PROCESS_DURATION = "requestProcessDuration"; private final int bufferWriteTimeoutInMillis; + private final OTelProtoCodec.OTelProtoDecoder oTelProtoDecoder; private final Buffer> buffer; private final Counter requestsReceivedCounter; private final Counter successRequestsCounter; + private final Counter recordsCreatedCounter; + private final Counter recordsDroppedCounter; private final DistributionSummary payloadSizeSummary; private final Timer requestProcessDuration; public OTelMetricsGrpcService(int bufferWriteTimeoutInMillis, + final OTelProtoCodec.OTelProtoDecoder oTelProtoDecoder, Buffer> buffer, final PluginMetrics pluginMetrics) { this.bufferWriteTimeoutInMillis = bufferWriteTimeoutInMillis; @@ -53,8 +59,11 @@ public OTelMetricsGrpcService(int bufferWriteTimeoutInMillis, requestsReceivedCounter = pluginMetrics.counter(REQUESTS_RECEIVED); successRequestsCounter = pluginMetrics.counter(SUCCESS_REQUESTS); + recordsCreatedCounter = pluginMetrics.counter(RECORDS_CREATED); + recordsDroppedCounter = pluginMetrics.counter(RECORDS_DROPPED); payloadSizeSummary = pluginMetrics.summary(PAYLOAD_SIZE); requestProcessDuration = pluginMetrics.timer(REQUEST_PROCESS_DURATION); + this.oTelProtoDecoder = oTelProtoDecoder; } @Override @@ -79,10 +88,11 @@ private void processRequest(final ExportMetricsServiceRequest request, final Str buffer.writeBytes(request.toByteArray(), null, bufferWriteTimeoutInMillis); } else { Collection> metrics; - AtomicInteger droppedCounter = new AtomicInteger(0); - OTelProtoCodec.OTelProtoDecoder oTelProtoDecoder = new OTelProtoCodec.OTelProtoDecoder(); + AtomicInteger droppedCounter = new AtomicInteger(0); metrics = oTelProtoDecoder.parseExportMetricsServiceRequest(request, droppedCounter, DEFAULT_EXPONENTIAL_HISTOGRAM_MAX_ALLOWED_SCALE, true, true, false); + recordsDroppedCounter.increment(droppedCounter.get()); + recordsCreatedCounter.increment(metrics.size()); buffer.writeAll(metrics, bufferWriteTimeoutInMillis); } } catch (Exception e) { diff --git a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java index d3c641d121..337006cc71 100644 --- a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java +++ b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java @@ -35,6 +35,7 @@ import org.opensearch.dataprepper.model.source.Source; import org.opensearch.dataprepper.model.codec.ByteDecoder; import org.opensearch.dataprepper.plugins.otel.codec.OTelMetricDecoder; +import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec; import org.opensearch.dataprepper.plugins.certificate.CertificateProvider; import org.opensearch.dataprepper.plugins.certificate.model.Certificate; import org.opensearch.dataprepper.plugins.health.HealthGrpcService; @@ -98,12 +99,9 @@ public void start(Buffer> buffer) { } if (server == null) { - - final int bufferWriteTimeoutInMillis = - (int) (oTelMetricsSourceConfig.getRequestTimeoutInMillis() * 0.8); final OTelMetricsGrpcService oTelMetricsGrpcService = new OTelMetricsGrpcService( (int) (oTelMetricsSourceConfig.getRequestTimeoutInMillis() * 0.8), - + new OTelProtoCodec.OTelProtoDecoder(), buffer, pluginMetrics ); diff --git a/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java b/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java index 0cef22115a..daf6ae363a 100644 --- a/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java +++ b/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java @@ -942,7 +942,6 @@ void gRPC_with_auth_request_writes_to_buffer_with_successful_response() throws E final ExportMetricsServiceResponse exportResponse = client.export(createExportMetricsRequest()); assertThat(exportResponse, notNullValue()); - //final ArgumentCaptor> bufferWriteArgumentCaptor = ArgumentCaptor.forClass(Record.class); final ArgumentCaptor>> bufferWriteArgumentCaptor = ArgumentCaptor.forClass(Collection.class); verify(buffer).writeAll(bufferWriteArgumentCaptor.capture(), anyInt());