From 74d8271819c855d5c433b314e6026c62d0aa9c73 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Sat, 24 Feb 2024 01:29:49 +0000 Subject: [PATCH 1/4] Add new OTEL Metrics source that creates events Signed-off-by: Krishna Kondaka --- .../source/otelmetrics/OTelMetrics.java | 272 ++++++++++++++++++ .../otelmetrics/OTelMetricsGrpcService.java | 37 +-- .../source/otelmetrics/OTelMetricsSource.java | 17 +- 3 files changed, 295 insertions(+), 31 deletions(-) create mode 100644 data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetrics.java diff --git a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetrics.java b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetrics.java new file mode 100644 index 0000000000..0bcaca992e --- /dev/null +++ b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetrics.java @@ -0,0 +1,272 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.otelmetrics; + +import com.linecorp.armeria.server.HttpService; +import com.linecorp.armeria.server.Server; +import com.linecorp.armeria.server.ServerBuilder; +import com.linecorp.armeria.server.encoding.DecodingService; +import com.linecorp.armeria.server.grpc.GrpcService; +import com.linecorp.armeria.server.grpc.GrpcServiceBuilder; +import com.linecorp.armeria.server.healthcheck.HealthCheckService; +import io.grpc.MethodDescriptor; +import io.grpc.ServerInterceptor; +import io.grpc.ServerInterceptors; +import io.grpc.protobuf.services.ProtoReflectionService; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; +import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc; +import org.opensearch.dataprepper.GrpcRequestExceptionHandler; +import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider; +import org.opensearch.dataprepper.plugins.codec.CompressionOption; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.configuration.PipelineDescription; +import static org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec.DEFAULT_EXPONENTIAL_HISTOGRAM_MAX_ALLOWED_SCALE; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.metric.Metric; +import org.opensearch.dataprepper.model.source.Source; +import org.opensearch.dataprepper.model.codec.ByteDecoder; +import org.opensearch.dataprepper.plugins.otel.codec.OTelMetricDecoder; +import org.opensearch.dataprepper.plugins.certificate.CertificateProvider; +import org.opensearch.dataprepper.plugins.certificate.model.Certificate; +import org.opensearch.dataprepper.plugins.health.HealthGrpcService; +import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec; +import org.opensearch.dataprepper.plugins.source.otelmetrics.certificate.CertificateProviderFactory; +import org.opensearch.dataprepper.exceptions.BufferWriteException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.function.Function; +import java.util.concurrent.atomic.AtomicInteger; + +@DataPrepperPlugin(name = "otel_metrics", pluginType = Source.class, pluginConfigurationType = OTelMetricsSourceConfig.class) +public class OTelMetrics implements Source> { + private static final Logger LOG = LoggerFactory.getLogger(OTelMetricsSource.class); + private static final String HTTP_HEALTH_CHECK_PATH = "/health"; + private static final String REGEX_HEALTH = "regex:^/(?!health$).*$"; + private static final String PIPELINE_NAME_PLACEHOLDER = "${pipelineName}"; + + private final OTelMetricsSourceConfig oTelMetricsSourceConfig; + private final String pipelineName; + private final PluginMetrics pluginMetrics; + private final GrpcAuthenticationProvider authenticationProvider; + private final CertificateProviderFactory certificateProviderFactory; + private final GrpcRequestExceptionHandler requestExceptionHandler; + private Server server; + private final ByteDecoder byteDecoder; + + @DataPrepperPluginConstructor + public OTelMetrics(final OTelMetricsSourceConfig oTelMetricsSourceConfig, final PluginMetrics pluginMetrics, + final PluginFactory pluginFactory, final PipelineDescription pipelineDescription) { + this(oTelMetricsSourceConfig, pluginMetrics, pluginFactory, new CertificateProviderFactory(oTelMetricsSourceConfig), pipelineDescription); + } + + // accessible only in the same package for unit test + OTelMetrics(final OTelMetricsSourceConfig oTelMetricsSourceConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory, + final CertificateProviderFactory certificateProviderFactory, final PipelineDescription pipelineDescription) { + oTelMetricsSourceConfig.validateAndInitializeCertAndKeyFileInS3(); + this.oTelMetricsSourceConfig = oTelMetricsSourceConfig; + this.pluginMetrics = pluginMetrics; + this.certificateProviderFactory = certificateProviderFactory; + this.pipelineName = pipelineDescription.getPipelineName(); + this.authenticationProvider = createAuthenticationProvider(pluginFactory); + this.requestExceptionHandler = new GrpcRequestExceptionHandler(pluginMetrics); + this.byteDecoder = new OTelMetricDecoder(); + } + + @Override + public ByteDecoder getDecoder() { + return byteDecoder; + } + + @Override + public void start(Buffer> buffer) { + if (buffer == null) { + throw new IllegalStateException("Buffer provided is null"); + } + + if (server == null) { + final int bufferWriteTimeoutInMillis = + (int) (oTelMetricsSourceConfig.getRequestTimeoutInMillis() * 0.8); + + final OTelMetricsGrpcService oTelMetricsGrpcService = new OTelMetricsGrpcService( + (int) (oTelMetricsSourceConfig.getRequestTimeoutInMillis() * 0.8), + request -> { + try { + if (buffer.isByteBuffer()) { + buffer.writeBytes(request.toByteArray(), null, bufferWriteTimeoutInMillis); + } else { + Collection> metrics; + AtomicInteger droppedCounter = new AtomicInteger(0); + + OTelProtoCodec.OTelProtoDecoder oTelProtoDecoder = new OTelProtoCodec.OTelProtoDecoder(); + metrics = oTelProtoDecoder.parseExportMetricsServiceRequest(request, droppedCounter, DEFAULT_EXPONENTIAL_HISTOGRAM_MAX_ALLOWED_SCALE, true, true, false); + buffer.writeAll(metrics, bufferWriteTimeoutInMillis); + } + } catch (Exception e) { + LOG.error("Failed to write the request of size {} due to:", request.toString().length(), e); + throw new BufferWriteException(e.getMessage(), e); + } + }, + pluginMetrics + ); + + final List serverInterceptors = getAuthenticationInterceptor(); + + final GrpcServiceBuilder grpcServiceBuilder = GrpcService + .builder() + .useClientTimeoutHeader(false) + .useBlockingTaskExecutor(true) + .exceptionMapping(requestExceptionHandler); + + final MethodDescriptor methodDescriptor = MetricsServiceGrpc.getExportMethod(); + final String oTelMetricsSourcePath = oTelMetricsSourceConfig.getPath(); + if (oTelMetricsSourcePath != null) { + final String transformedOTelMetricsSourcePath = oTelMetricsSourcePath.replace(PIPELINE_NAME_PLACEHOLDER, pipelineName); + grpcServiceBuilder.addService(transformedOTelMetricsSourcePath, + ServerInterceptors.intercept(oTelMetricsGrpcService, serverInterceptors), methodDescriptor); + } else { + grpcServiceBuilder.addService(ServerInterceptors.intercept(oTelMetricsGrpcService, serverInterceptors)); + } + + if (oTelMetricsSourceConfig.hasHealthCheck()) { + LOG.info("Health check is enabled"); + grpcServiceBuilder.addService(new HealthGrpcService()); + } + + if (oTelMetricsSourceConfig.hasProtoReflectionService()) { + LOG.info("Proto reflection service is enabled"); + grpcServiceBuilder.addService(ProtoReflectionService.newInstance()); + } + + grpcServiceBuilder.enableUnframedRequests(oTelMetricsSourceConfig.enableUnframedRequests()); + + final ServerBuilder sb = Server.builder(); + sb.disableServerHeader(); + if (CompressionOption.NONE.equals(oTelMetricsSourceConfig.getCompression())) { + sb.service(grpcServiceBuilder.build()); + } else { + sb.service(grpcServiceBuilder.build(), DecodingService.newDecorator()); + } + + if(oTelMetricsSourceConfig.enableHttpHealthCheck()) { + sb.service(HTTP_HEALTH_CHECK_PATH, HealthCheckService.builder().longPolling(0).build()); + } + + if(oTelMetricsSourceConfig.getAuthentication() != null) { + final Optional> optionalHttpAuthenticationService = + authenticationProvider.getHttpAuthenticationService(); + + if(oTelMetricsSourceConfig.isUnauthenticatedHealthCheck()) { + optionalHttpAuthenticationService.ifPresent(httpAuthenticationService -> + sb.decorator(REGEX_HEALTH, httpAuthenticationService)); + } else { + optionalHttpAuthenticationService.ifPresent(sb::decorator); + } + } + + sb.requestTimeoutMillis(oTelMetricsSourceConfig.getRequestTimeoutInMillis()); + if(oTelMetricsSourceConfig.getMaxRequestLength() != null) { + sb.maxRequestLength(oTelMetricsSourceConfig.getMaxRequestLength().getBytes()); + } + + // ACM Cert for SSL takes preference + if (oTelMetricsSourceConfig.isSsl() || oTelMetricsSourceConfig.useAcmCertForSSL()) { + LOG.info("SSL/TLS is enabled."); + final CertificateProvider certificateProvider = certificateProviderFactory.getCertificateProvider(); + final Certificate certificate = certificateProvider.getCertificate(); + sb.https(oTelMetricsSourceConfig.getPort()).tls( + new ByteArrayInputStream(certificate.getCertificate().getBytes(StandardCharsets.UTF_8)), + new ByteArrayInputStream(certificate.getPrivateKey().getBytes(StandardCharsets.UTF_8) + ) + ); + } else { + LOG.warn("Creating otel_metrics_source without SSL/TLS. This is not secure."); + LOG.warn("In order to set up TLS for the otel_metrics_source, go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/otel-metrics-source#ssl"); + sb.http(oTelMetricsSourceConfig.getPort()); + } + + sb.maxNumConnections(oTelMetricsSourceConfig.getMaxConnectionCount()); + sb.blockingTaskExecutor( + Executors.newScheduledThreadPool(oTelMetricsSourceConfig.getThreadCount()), + true); + + server = sb.build(); + } + try { + server.start().get(); + } catch (ExecutionException ex) { + if (ex.getCause() != null && ex.getCause() instanceof RuntimeException) { + throw (RuntimeException) ex.getCause(); + } else { + throw new RuntimeException(ex); + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new RuntimeException(ex); + } + LOG.info("Started otel_metrics_source..."); + } + + @Override + public void stop() { + if (server != null) { + try { + server.stop().get(); + } catch (ExecutionException ex) { + if (ex.getCause() != null && ex.getCause() instanceof RuntimeException) { + throw (RuntimeException) ex.getCause(); + } else { + throw new RuntimeException(ex); + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new RuntimeException(ex); + } + } + LOG.info("Stopped otel_metrics_source."); + } + + private List getAuthenticationInterceptor() { + final ServerInterceptor authenticationInterceptor = authenticationProvider.getAuthenticationInterceptor(); + if (authenticationInterceptor == null) { + return Collections.emptyList(); + } + return Collections.singletonList(authenticationInterceptor); + } + + private GrpcAuthenticationProvider createAuthenticationProvider(final PluginFactory pluginFactory) { + final PluginModel authenticationConfiguration = oTelMetricsSourceConfig.getAuthentication(); + + if (authenticationConfiguration == null || authenticationConfiguration.getPluginName().equals(GrpcAuthenticationProvider.UNAUTHENTICATED_PLUGIN_NAME)) { + LOG.warn("Creating otel-metrics-source without authentication. This is not secure."); + LOG.warn("In order to set up Http Basic authentication for the otel-metrics-source, go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/otel-metrics-source#authentication-configurations"); + } + + final PluginSetting authenticationPluginSetting; + if (authenticationConfiguration != null) { + authenticationPluginSetting = new PluginSetting(authenticationConfiguration.getPluginName(), authenticationConfiguration.getPluginSettings()); + } else { + authenticationPluginSetting = new PluginSetting(GrpcAuthenticationProvider.UNAUTHENTICATED_PLUGIN_NAME, Collections.emptyMap()); + } + authenticationPluginSetting.setPipelineName(pipelineName); + return pluginFactory.loadPlugin(GrpcAuthenticationProvider.class, authenticationPluginSetting); + } +} diff --git a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java index b4c45e5a05..2fe60687c1 100644 --- a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java +++ b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java @@ -14,14 +14,13 @@ import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc; -import org.opensearch.dataprepper.exceptions.BufferWriteException; import org.opensearch.dataprepper.exceptions.RequestCancelledException; import org.opensearch.dataprepper.metrics.PluginMetrics; -import org.opensearch.dataprepper.model.buffer.Buffer; -import org.opensearch.dataprepper.model.record.Record; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.function.Consumer; + public class OTelMetricsGrpcService extends MetricsServiceGrpc.MetricsServiceImplBase { private static final Logger LOG = LoggerFactory.getLogger(OTelMetricsGrpcService.class); @@ -31,7 +30,7 @@ public class OTelMetricsGrpcService extends MetricsServiceGrpc.MetricsServiceImp public static final String REQUEST_PROCESS_DURATION = "requestProcessDuration"; private final int bufferWriteTimeoutInMillis; - private final Buffer> buffer; + private final Consumer consumer; private final Counter requestsReceivedCounter; private final Counter successRequestsCounter; @@ -40,10 +39,11 @@ public class OTelMetricsGrpcService extends MetricsServiceGrpc.MetricsServiceImp public OTelMetricsGrpcService(int bufferWriteTimeoutInMillis, - Buffer> buffer, + + final Consumer consumer, final PluginMetrics pluginMetrics) { this.bufferWriteTimeoutInMillis = bufferWriteTimeoutInMillis; - this.buffer = buffer; + this.consumer = consumer; requestsReceivedCounter = pluginMetrics.counter(REQUESTS_RECEIVED); successRequestsCounter = pluginMetrics.counter(SUCCESS_REQUESTS); @@ -51,15 +51,6 @@ public OTelMetricsGrpcService(int bufferWriteTimeoutInMillis, requestProcessDuration = pluginMetrics.timer(REQUEST_PROCESS_DURATION); } - public void rawExport(final ExportMetricsServiceRequest request) { - try { - if (buffer.isByteBuffer()) { - buffer.writeBytes(request.toByteArray(), null, bufferWriteTimeoutInMillis); - } - } catch (Exception e) { - } - } - @Override public void export(final ExportMetricsServiceRequest request, final StreamObserver responseObserver) { requestsReceivedCounter.increment(); @@ -77,21 +68,7 @@ public void export(final ExportMetricsServiceRequest request, final StreamObserv } private void processRequest(final ExportMetricsServiceRequest request, final StreamObserver responseObserver) { - try { - if (buffer.isByteBuffer()) { - buffer.writeBytes(request.toByteArray(), null, bufferWriteTimeoutInMillis); - } else { - buffer.write(new Record<>(request), bufferWriteTimeoutInMillis); - } - } catch (Exception e) { - if (ServiceRequestContext.current().isTimedOut()) { - LOG.warn("Exception writing to buffer but request already timed out.", e); - return; - } - - LOG.error("Failed to write the request of size {} due to:", request.toString().length(), e); - throw new BufferWriteException(e.getMessage(), e); - } + consumer.accept(request); if (ServiceRequestContext.current().isTimedOut()) { LOG.warn("Buffer write completed successfully but request already timed out."); diff --git a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java index 85e6982e23..05cebb8f67 100644 --- a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java +++ b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java @@ -38,6 +38,7 @@ import org.opensearch.dataprepper.plugins.certificate.model.Certificate; import org.opensearch.dataprepper.plugins.health.HealthGrpcService; import org.opensearch.dataprepper.plugins.source.otelmetrics.certificate.CertificateProviderFactory; +import org.opensearch.dataprepper.exceptions.BufferWriteException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -98,9 +99,23 @@ public void start(Buffer> buffer) { if (server == null) { + final int bufferWriteTimeoutInMillis = + (int) (oTelMetricsSourceConfig.getRequestTimeoutInMillis() * 0.8); final OTelMetricsGrpcService oTelMetricsGrpcService = new OTelMetricsGrpcService( (int) (oTelMetricsSourceConfig.getRequestTimeoutInMillis() * 0.8), - buffer, + + request -> { + try { + if (buffer.isByteBuffer()) { + buffer.writeBytes(request.toByteArray(), null, bufferWriteTimeoutInMillis); + } else { + buffer.write(new Record<>(request), bufferWriteTimeoutInMillis); + } + } catch (Exception e) { + LOG.error("Failed to write the request of size {} due to:", request.toString().length(), e); + throw new BufferWriteException(e.getMessage(), e); + } + }, pluginMetrics ); From ca0cfe849e3c7afb8ae8aadc8a28efd4e6a29718 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Sat, 24 Feb 2024 07:54:59 +0000 Subject: [PATCH 2/4] Modified to replace existing processor with new functionality where new events are created in the source Signed-off-by: Krishna Kondaka --- .../source/otelmetrics/OTelMetrics.java | 272 ------------------ .../otelmetrics/OTelMetricsGrpcService.java | 37 ++- .../source/otelmetrics/OTelMetricsSource.java | 19 +- .../OTelMetricsGrpcServiceTest.java | 27 +- .../otelmetrics/OTelMetricsSourceTest.java | 45 ++- 5 files changed, 89 insertions(+), 311 deletions(-) delete mode 100644 data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetrics.java diff --git a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetrics.java b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetrics.java deleted file mode 100644 index 0bcaca992e..0000000000 --- a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetrics.java +++ /dev/null @@ -1,272 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.dataprepper.plugins.source.otelmetrics; - -import com.linecorp.armeria.server.HttpService; -import com.linecorp.armeria.server.Server; -import com.linecorp.armeria.server.ServerBuilder; -import com.linecorp.armeria.server.encoding.DecodingService; -import com.linecorp.armeria.server.grpc.GrpcService; -import com.linecorp.armeria.server.grpc.GrpcServiceBuilder; -import com.linecorp.armeria.server.healthcheck.HealthCheckService; -import io.grpc.MethodDescriptor; -import io.grpc.ServerInterceptor; -import io.grpc.ServerInterceptors; -import io.grpc.protobuf.services.ProtoReflectionService; -import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; -import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; -import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc; -import org.opensearch.dataprepper.GrpcRequestExceptionHandler; -import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider; -import org.opensearch.dataprepper.plugins.codec.CompressionOption; -import org.opensearch.dataprepper.metrics.PluginMetrics; -import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; -import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; -import org.opensearch.dataprepper.model.buffer.Buffer; -import org.opensearch.dataprepper.model.configuration.PipelineDescription; -import static org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec.DEFAULT_EXPONENTIAL_HISTOGRAM_MAX_ALLOWED_SCALE; -import org.opensearch.dataprepper.model.configuration.PluginModel; -import org.opensearch.dataprepper.model.configuration.PluginSetting; -import org.opensearch.dataprepper.model.plugin.PluginFactory; -import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.model.metric.Metric; -import org.opensearch.dataprepper.model.source.Source; -import org.opensearch.dataprepper.model.codec.ByteDecoder; -import org.opensearch.dataprepper.plugins.otel.codec.OTelMetricDecoder; -import org.opensearch.dataprepper.plugins.certificate.CertificateProvider; -import org.opensearch.dataprepper.plugins.certificate.model.Certificate; -import org.opensearch.dataprepper.plugins.health.HealthGrpcService; -import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec; -import org.opensearch.dataprepper.plugins.source.otelmetrics.certificate.CertificateProviderFactory; -import org.opensearch.dataprepper.exceptions.BufferWriteException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.ByteArrayInputStream; -import java.nio.charset.StandardCharsets; -import java.util.Collections; -import java.util.Collection; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.function.Function; -import java.util.concurrent.atomic.AtomicInteger; - -@DataPrepperPlugin(name = "otel_metrics", pluginType = Source.class, pluginConfigurationType = OTelMetricsSourceConfig.class) -public class OTelMetrics implements Source> { - private static final Logger LOG = LoggerFactory.getLogger(OTelMetricsSource.class); - private static final String HTTP_HEALTH_CHECK_PATH = "/health"; - private static final String REGEX_HEALTH = "regex:^/(?!health$).*$"; - private static final String PIPELINE_NAME_PLACEHOLDER = "${pipelineName}"; - - private final OTelMetricsSourceConfig oTelMetricsSourceConfig; - private final String pipelineName; - private final PluginMetrics pluginMetrics; - private final GrpcAuthenticationProvider authenticationProvider; - private final CertificateProviderFactory certificateProviderFactory; - private final GrpcRequestExceptionHandler requestExceptionHandler; - private Server server; - private final ByteDecoder byteDecoder; - - @DataPrepperPluginConstructor - public OTelMetrics(final OTelMetricsSourceConfig oTelMetricsSourceConfig, final PluginMetrics pluginMetrics, - final PluginFactory pluginFactory, final PipelineDescription pipelineDescription) { - this(oTelMetricsSourceConfig, pluginMetrics, pluginFactory, new CertificateProviderFactory(oTelMetricsSourceConfig), pipelineDescription); - } - - // accessible only in the same package for unit test - OTelMetrics(final OTelMetricsSourceConfig oTelMetricsSourceConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory, - final CertificateProviderFactory certificateProviderFactory, final PipelineDescription pipelineDescription) { - oTelMetricsSourceConfig.validateAndInitializeCertAndKeyFileInS3(); - this.oTelMetricsSourceConfig = oTelMetricsSourceConfig; - this.pluginMetrics = pluginMetrics; - this.certificateProviderFactory = certificateProviderFactory; - this.pipelineName = pipelineDescription.getPipelineName(); - this.authenticationProvider = createAuthenticationProvider(pluginFactory); - this.requestExceptionHandler = new GrpcRequestExceptionHandler(pluginMetrics); - this.byteDecoder = new OTelMetricDecoder(); - } - - @Override - public ByteDecoder getDecoder() { - return byteDecoder; - } - - @Override - public void start(Buffer> buffer) { - if (buffer == null) { - throw new IllegalStateException("Buffer provided is null"); - } - - if (server == null) { - final int bufferWriteTimeoutInMillis = - (int) (oTelMetricsSourceConfig.getRequestTimeoutInMillis() * 0.8); - - final OTelMetricsGrpcService oTelMetricsGrpcService = new OTelMetricsGrpcService( - (int) (oTelMetricsSourceConfig.getRequestTimeoutInMillis() * 0.8), - request -> { - try { - if (buffer.isByteBuffer()) { - buffer.writeBytes(request.toByteArray(), null, bufferWriteTimeoutInMillis); - } else { - Collection> metrics; - AtomicInteger droppedCounter = new AtomicInteger(0); - - OTelProtoCodec.OTelProtoDecoder oTelProtoDecoder = new OTelProtoCodec.OTelProtoDecoder(); - metrics = oTelProtoDecoder.parseExportMetricsServiceRequest(request, droppedCounter, DEFAULT_EXPONENTIAL_HISTOGRAM_MAX_ALLOWED_SCALE, true, true, false); - buffer.writeAll(metrics, bufferWriteTimeoutInMillis); - } - } catch (Exception e) { - LOG.error("Failed to write the request of size {} due to:", request.toString().length(), e); - throw new BufferWriteException(e.getMessage(), e); - } - }, - pluginMetrics - ); - - final List serverInterceptors = getAuthenticationInterceptor(); - - final GrpcServiceBuilder grpcServiceBuilder = GrpcService - .builder() - .useClientTimeoutHeader(false) - .useBlockingTaskExecutor(true) - .exceptionMapping(requestExceptionHandler); - - final MethodDescriptor methodDescriptor = MetricsServiceGrpc.getExportMethod(); - final String oTelMetricsSourcePath = oTelMetricsSourceConfig.getPath(); - if (oTelMetricsSourcePath != null) { - final String transformedOTelMetricsSourcePath = oTelMetricsSourcePath.replace(PIPELINE_NAME_PLACEHOLDER, pipelineName); - grpcServiceBuilder.addService(transformedOTelMetricsSourcePath, - ServerInterceptors.intercept(oTelMetricsGrpcService, serverInterceptors), methodDescriptor); - } else { - grpcServiceBuilder.addService(ServerInterceptors.intercept(oTelMetricsGrpcService, serverInterceptors)); - } - - if (oTelMetricsSourceConfig.hasHealthCheck()) { - LOG.info("Health check is enabled"); - grpcServiceBuilder.addService(new HealthGrpcService()); - } - - if (oTelMetricsSourceConfig.hasProtoReflectionService()) { - LOG.info("Proto reflection service is enabled"); - grpcServiceBuilder.addService(ProtoReflectionService.newInstance()); - } - - grpcServiceBuilder.enableUnframedRequests(oTelMetricsSourceConfig.enableUnframedRequests()); - - final ServerBuilder sb = Server.builder(); - sb.disableServerHeader(); - if (CompressionOption.NONE.equals(oTelMetricsSourceConfig.getCompression())) { - sb.service(grpcServiceBuilder.build()); - } else { - sb.service(grpcServiceBuilder.build(), DecodingService.newDecorator()); - } - - if(oTelMetricsSourceConfig.enableHttpHealthCheck()) { - sb.service(HTTP_HEALTH_CHECK_PATH, HealthCheckService.builder().longPolling(0).build()); - } - - if(oTelMetricsSourceConfig.getAuthentication() != null) { - final Optional> optionalHttpAuthenticationService = - authenticationProvider.getHttpAuthenticationService(); - - if(oTelMetricsSourceConfig.isUnauthenticatedHealthCheck()) { - optionalHttpAuthenticationService.ifPresent(httpAuthenticationService -> - sb.decorator(REGEX_HEALTH, httpAuthenticationService)); - } else { - optionalHttpAuthenticationService.ifPresent(sb::decorator); - } - } - - sb.requestTimeoutMillis(oTelMetricsSourceConfig.getRequestTimeoutInMillis()); - if(oTelMetricsSourceConfig.getMaxRequestLength() != null) { - sb.maxRequestLength(oTelMetricsSourceConfig.getMaxRequestLength().getBytes()); - } - - // ACM Cert for SSL takes preference - if (oTelMetricsSourceConfig.isSsl() || oTelMetricsSourceConfig.useAcmCertForSSL()) { - LOG.info("SSL/TLS is enabled."); - final CertificateProvider certificateProvider = certificateProviderFactory.getCertificateProvider(); - final Certificate certificate = certificateProvider.getCertificate(); - sb.https(oTelMetricsSourceConfig.getPort()).tls( - new ByteArrayInputStream(certificate.getCertificate().getBytes(StandardCharsets.UTF_8)), - new ByteArrayInputStream(certificate.getPrivateKey().getBytes(StandardCharsets.UTF_8) - ) - ); - } else { - LOG.warn("Creating otel_metrics_source without SSL/TLS. This is not secure."); - LOG.warn("In order to set up TLS for the otel_metrics_source, go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/otel-metrics-source#ssl"); - sb.http(oTelMetricsSourceConfig.getPort()); - } - - sb.maxNumConnections(oTelMetricsSourceConfig.getMaxConnectionCount()); - sb.blockingTaskExecutor( - Executors.newScheduledThreadPool(oTelMetricsSourceConfig.getThreadCount()), - true); - - server = sb.build(); - } - try { - server.start().get(); - } catch (ExecutionException ex) { - if (ex.getCause() != null && ex.getCause() instanceof RuntimeException) { - throw (RuntimeException) ex.getCause(); - } else { - throw new RuntimeException(ex); - } - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - throw new RuntimeException(ex); - } - LOG.info("Started otel_metrics_source..."); - } - - @Override - public void stop() { - if (server != null) { - try { - server.stop().get(); - } catch (ExecutionException ex) { - if (ex.getCause() != null && ex.getCause() instanceof RuntimeException) { - throw (RuntimeException) ex.getCause(); - } else { - throw new RuntimeException(ex); - } - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - throw new RuntimeException(ex); - } - } - LOG.info("Stopped otel_metrics_source."); - } - - private List getAuthenticationInterceptor() { - final ServerInterceptor authenticationInterceptor = authenticationProvider.getAuthenticationInterceptor(); - if (authenticationInterceptor == null) { - return Collections.emptyList(); - } - return Collections.singletonList(authenticationInterceptor); - } - - private GrpcAuthenticationProvider createAuthenticationProvider(final PluginFactory pluginFactory) { - final PluginModel authenticationConfiguration = oTelMetricsSourceConfig.getAuthentication(); - - if (authenticationConfiguration == null || authenticationConfiguration.getPluginName().equals(GrpcAuthenticationProvider.UNAUTHENTICATED_PLUGIN_NAME)) { - LOG.warn("Creating otel-metrics-source without authentication. This is not secure."); - LOG.warn("In order to set up Http Basic authentication for the otel-metrics-source, go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/otel-metrics-source#authentication-configurations"); - } - - final PluginSetting authenticationPluginSetting; - if (authenticationConfiguration != null) { - authenticationPluginSetting = new PluginSetting(authenticationConfiguration.getPluginName(), authenticationConfiguration.getPluginSettings()); - } else { - authenticationPluginSetting = new PluginSetting(GrpcAuthenticationProvider.UNAUTHENTICATED_PLUGIN_NAME, Collections.emptyMap()); - } - authenticationPluginSetting.setPipelineName(pipelineName); - return pluginFactory.loadPlugin(GrpcAuthenticationProvider.class, authenticationPluginSetting); - } -} diff --git a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java index 2fe60687c1..0b8e093bc7 100644 --- a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java +++ b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java @@ -14,12 +14,19 @@ import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc; +import org.opensearch.dataprepper.exceptions.BufferWriteException; import org.opensearch.dataprepper.exceptions.RequestCancelledException; +import static org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec.DEFAULT_EXPONENTIAL_HISTOGRAM_MAX_ALLOWED_SCALE; +import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.metric.Metric; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.function.Consumer; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicInteger; public class OTelMetricsGrpcService extends MetricsServiceGrpc.MetricsServiceImplBase { private static final Logger LOG = LoggerFactory.getLogger(OTelMetricsGrpcService.class); @@ -30,7 +37,7 @@ public class OTelMetricsGrpcService extends MetricsServiceGrpc.MetricsServiceImp public static final String REQUEST_PROCESS_DURATION = "requestProcessDuration"; private final int bufferWriteTimeoutInMillis; - private final Consumer consumer; + private final Buffer> buffer; private final Counter requestsReceivedCounter; private final Counter successRequestsCounter; @@ -39,11 +46,10 @@ public class OTelMetricsGrpcService extends MetricsServiceGrpc.MetricsServiceImp public OTelMetricsGrpcService(int bufferWriteTimeoutInMillis, - - final Consumer consumer, + Buffer> buffer, final PluginMetrics pluginMetrics) { this.bufferWriteTimeoutInMillis = bufferWriteTimeoutInMillis; - this.consumer = consumer; + this.buffer = buffer; requestsReceivedCounter = pluginMetrics.counter(REQUESTS_RECEIVED); successRequestsCounter = pluginMetrics.counter(SUCCESS_REQUESTS); @@ -68,7 +74,26 @@ public void export(final ExportMetricsServiceRequest request, final StreamObserv } private void processRequest(final ExportMetricsServiceRequest request, final StreamObserver responseObserver) { - consumer.accept(request); + try { + if (buffer.isByteBuffer()) { + buffer.writeBytes(request.toByteArray(), null, bufferWriteTimeoutInMillis); + } else { + Collection> metrics; + AtomicInteger droppedCounter = new AtomicInteger(0); + + OTelProtoCodec.OTelProtoDecoder oTelProtoDecoder = new OTelProtoCodec.OTelProtoDecoder(); + metrics = oTelProtoDecoder.parseExportMetricsServiceRequest(request, droppedCounter, DEFAULT_EXPONENTIAL_HISTOGRAM_MAX_ALLOWED_SCALE, true, true, false); + buffer.writeAll(metrics, bufferWriteTimeoutInMillis); + } + } catch (Exception e) { + if (ServiceRequestContext.current().isTimedOut()) { + LOG.warn("Exception writing to buffer but request already timed out.", e); + return; + } + + LOG.error("Failed to write the request of size {} due to:", request.toString().length(), e); + throw new BufferWriteException(e.getMessage(), e); + } if (ServiceRequestContext.current().isTimedOut()) { LOG.warn("Buffer write completed successfully but request already timed out."); diff --git a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java index 05cebb8f67..d3c641d121 100644 --- a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java +++ b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java @@ -31,6 +31,7 @@ import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.metric.Metric; import org.opensearch.dataprepper.model.source.Source; import org.opensearch.dataprepper.model.codec.ByteDecoder; import org.opensearch.dataprepper.plugins.otel.codec.OTelMetricDecoder; @@ -38,7 +39,6 @@ import org.opensearch.dataprepper.plugins.certificate.model.Certificate; import org.opensearch.dataprepper.plugins.health.HealthGrpcService; import org.opensearch.dataprepper.plugins.source.otelmetrics.certificate.CertificateProviderFactory; -import org.opensearch.dataprepper.exceptions.BufferWriteException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,7 +52,7 @@ import java.util.function.Function; @DataPrepperPlugin(name = "otel_metrics_source", pluginType = Source.class, pluginConfigurationType = OTelMetricsSourceConfig.class) -public class OTelMetricsSource implements Source> { +public class OTelMetricsSource implements Source> { private static final Logger LOG = LoggerFactory.getLogger(OTelMetricsSource.class); private static final String HTTP_HEALTH_CHECK_PATH = "/health"; private static final String REGEX_HEALTH = "regex:^/(?!health$).*$"; @@ -92,7 +92,7 @@ public ByteDecoder getDecoder() { } @Override - public void start(Buffer> buffer) { + public void start(Buffer> buffer) { if (buffer == null) { throw new IllegalStateException("Buffer provided is null"); } @@ -104,18 +104,7 @@ public void start(Buffer> buffer) { final OTelMetricsGrpcService oTelMetricsGrpcService = new OTelMetricsGrpcService( (int) (oTelMetricsSourceConfig.getRequestTimeoutInMillis() * 0.8), - request -> { - try { - if (buffer.isByteBuffer()) { - buffer.writeBytes(request.toByteArray(), null, bufferWriteTimeoutInMillis); - } else { - buffer.write(new Record<>(request), bufferWriteTimeoutInMillis); - } - } catch (Exception e) { - LOG.error("Failed to write the request of size {} due to:", request.toString().length(), e); - throw new BufferWriteException(e.getMessage(), e); - } - }, + buffer, pluginMetrics ); diff --git a/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcServiceTest.java b/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcServiceTest.java index f257f64f83..d7a46be0fb 100644 --- a/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcServiceTest.java +++ b/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcServiceTest.java @@ -29,17 +29,20 @@ import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.event.Event; +import java.util.Collection; import java.util.Collections; +import java.util.Map; import java.util.concurrent.TimeoutException; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; +import static org.hamcrest.Matchers.hasEntry; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -48,15 +51,20 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; +import io.opentelemetry.proto.metrics.v1.NumberDataPoint; +import io.opentelemetry.proto.metrics.v1.Gauge; @ExtendWith(MockitoExtension.class) public class OTelMetricsGrpcServiceTest { + private static NumberDataPoint.Builder p1 = NumberDataPoint.newBuilder().setAsInt(4); + private static Gauge gauge = Gauge.newBuilder().addDataPoints(p1).build(); private static final ExportMetricsServiceRequest METRICS_REQUEST = ExportMetricsServiceRequest.newBuilder() .addResourceMetrics(ResourceMetrics.newBuilder() .addInstrumentationLibraryMetrics(InstrumentationLibraryMetrics.newBuilder() - .addMetrics(Metric.newBuilder().build()) + .addMetrics(Metric.newBuilder().setGauge(gauge).setUnit("seconds").setName("name").build()) .build())).build(); + private static Map expectedMetric = Map.of("unit", (Object)"seconds", "name", (Object)"name", "kind", (Object)"GAUGE"); private static PluginSetting pluginSetting; private final int bufferWriteTimeoutInMillis = 100000; @@ -76,7 +84,7 @@ public class OTelMetricsGrpcServiceTest { private ServiceRequestContext serviceRequestContext; @Captor - private ArgumentCaptor recordCaptor; + private ArgumentCaptor> recordCaptor; @Captor ArgumentCaptor bytesCaptor; @@ -111,7 +119,7 @@ public void export_Success_responseObserverOnCompleted() throws Exception { sut.export(METRICS_REQUEST, responseObserver); } - verify(buffer, times(1)).write(recordCaptor.capture(), anyInt()); + verify(buffer, times(1)).writeAll(recordCaptor.capture(), anyInt()); verify(responseObserver, times(1)).onNext(ExportMetricsServiceResponse.newBuilder().build()); verify(responseObserver, times(1)).onCompleted(); verify(requestsReceivedCounter, times(1)).increment(); @@ -122,8 +130,11 @@ public void export_Success_responseObserverOnCompleted() throws Exception { assertThat(payloadLengthCaptor.getValue().intValue(), equalTo(METRICS_REQUEST.getSerializedSize())); verify(requestProcessDuration, times(1)).record(ArgumentMatchers.any()); - Record capturedRecord = recordCaptor.getValue(); - assertEquals(METRICS_REQUEST, capturedRecord.getData()); + Collection capturedRecords = recordCaptor.getValue(); + Record capturedRecord = (Record)(capturedRecords.toArray()[0]); + Map map = ((Event)capturedRecord.getData()).toMap(); + + expectedMetric.forEach((k, v) -> assertThat(map, hasEntry((String)k, (Object)v))); } @Test @@ -151,14 +162,14 @@ public void export_Success_with_ByteBuffer_responseObserverOnCompleted() throws @Test public void export_BufferTimeout_responseObserverOnError() throws Exception { - doThrow(new TimeoutException()).when(buffer).write(any(Record.class), anyInt()); + doThrow(new TimeoutException()).when(buffer).writeAll(any(Collection.class), anyInt()); try (MockedStatic mockedStatic = mockStatic(ServiceRequestContext.class)) { mockedStatic.when(ServiceRequestContext::current).thenReturn(serviceRequestContext); assertThrows(BufferWriteException.class, () -> sut.export(METRICS_REQUEST, responseObserver)); } - verify(buffer, times(1)).write(any(Record.class), anyInt()); + verify(buffer, times(1)).writeAll(any(Collection.class), anyInt()); verifyNoInteractions(responseObserver); verify(requestsReceivedCounter, times(1)).increment(); verifyNoInteractions(successRequestsCounter); diff --git a/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java b/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java index 66cab56203..0cef22115a 100644 --- a/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java +++ b/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java @@ -34,6 +34,11 @@ import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc; +import io.opentelemetry.proto.metrics.v1.NumberDataPoint; + +import io.opentelemetry.proto.common.v1.InstrumentationLibrary; +import io.opentelemetry.proto.metrics.v1.Gauge; +import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics; import io.opentelemetry.proto.common.v1.AnyValue; import io.opentelemetry.proto.common.v1.KeyValue; import io.opentelemetry.proto.metrics.v1.ResourceMetrics; @@ -62,6 +67,7 @@ import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.metric.Metric; import org.opensearch.dataprepper.model.types.ByteCount; import org.opensearch.dataprepper.plugins.GrpcBasicAuthenticationProvider; import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer; @@ -79,6 +85,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Base64; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -163,7 +170,7 @@ class OTelMetricsSourceTest { private OTelMetricsSourceConfig oTelMetricsSourceConfig; @Mock - private BlockingBuffer> buffer; + private BlockingBuffer> buffer; @Mock private HttpBasicAuthenticationConfig httpBasicAuthenticationConfig; @@ -901,12 +908,12 @@ void gRPC_request_writes_to_buffer_with_successful_response() throws Exception { final ExportMetricsServiceResponse exportResponse = client.export(createExportMetricsRequest()); assertThat(exportResponse, notNullValue()); - final ArgumentCaptor> bufferWriteArgumentCaptor = ArgumentCaptor.forClass(Record.class); - verify(buffer).write(bufferWriteArgumentCaptor.capture(), anyInt()); + final ArgumentCaptor>> bufferWriteArgumentCaptor = ArgumentCaptor.forClass(Collection.class); + verify(buffer).writeAll(bufferWriteArgumentCaptor.capture(), anyInt()); - final Record actualBufferWrites = bufferWriteArgumentCaptor.getValue(); + final Collection> actualBufferWrites = bufferWriteArgumentCaptor.getValue(); assertThat(actualBufferWrites, notNullValue()); - assertThat(actualBufferWrites.getData().getResourceMetricsCount(), equalTo(1)); + assertThat(actualBufferWrites.size(), equalTo(1)); } @Test @@ -935,12 +942,13 @@ void gRPC_with_auth_request_writes_to_buffer_with_successful_response() throws E final ExportMetricsServiceResponse exportResponse = client.export(createExportMetricsRequest()); assertThat(exportResponse, notNullValue()); - final ArgumentCaptor> bufferWriteArgumentCaptor = ArgumentCaptor.forClass(Record.class); - verify(buffer).write(bufferWriteArgumentCaptor.capture(), anyInt()); + //final ArgumentCaptor> bufferWriteArgumentCaptor = ArgumentCaptor.forClass(Record.class); + final ArgumentCaptor>> bufferWriteArgumentCaptor = ArgumentCaptor.forClass(Collection.class); + verify(buffer).writeAll(bufferWriteArgumentCaptor.capture(), anyInt()); - final Record actualBufferWrites = bufferWriteArgumentCaptor.getValue(); + final Collection> actualBufferWrites = bufferWriteArgumentCaptor.getValue(); assertThat(actualBufferWrites, notNullValue()); - assertThat(actualBufferWrites.getData().getResourceMetricsCount(), equalTo(1)); + assertThat(actualBufferWrites.size(), equalTo(1)); } @Test @@ -971,7 +979,7 @@ void gRPC_request_returns_expected_status_for_exceptions_from_buffer( doThrow(bufferExceptionClass) .when(buffer) - .write(any(Record.class), anyInt()); + .writeAll(any(Collection.class), anyInt()); final ExportMetricsServiceRequest exportMetricsRequest = createExportMetricsRequest(); final StatusRuntimeException actualException = assertThrows(StatusRuntimeException.class, () -> client.export(exportMetricsRequest)); @@ -1014,9 +1022,26 @@ private ExportMetricsServiceRequest createExportMetricsRequest() { .setKey("service.name") .setValue(AnyValue.newBuilder().setStringValue("service").build()) ).build(); + NumberDataPoint.Builder p1 = NumberDataPoint.newBuilder().setAsInt(4); + Gauge gauge = Gauge.newBuilder().addDataPoints(p1).build(); + + io.opentelemetry.proto.metrics.v1.Metric.Builder metric = io.opentelemetry.proto.metrics.v1.Metric.newBuilder() + .setGauge(gauge) + .setUnit("seconds") + .setName("name") + .setDescription("description"); + InstrumentationLibraryMetrics isntLib = InstrumentationLibraryMetrics.newBuilder() + .addMetrics(metric) + .setInstrumentationLibrary(InstrumentationLibrary.newBuilder() + .setName("ilname") + .setVersion("ilversion") + .build()) + .build(); + final ResourceMetrics resourceMetrics = ResourceMetrics.newBuilder() .setResource(resource) + .addInstrumentationLibraryMetrics(isntLib) .build(); return ExportMetricsServiceRequest.newBuilder() From f00afb297e99cc3fe0bf24c4e5afb94280b41708 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Sat, 24 Feb 2024 22:12:14 +0000 Subject: [PATCH 3/4] Addressed review comments Signed-off-by: Krishna Kondaka --- .../source/otelmetrics/OTelMetricsGrpcService.java | 14 ++++++++++++-- .../source/otelmetrics/OTelMetricsSource.java | 6 ++---- .../source/otelmetrics/OTelMetricsSourceTest.java | 1 - 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java index 0b8e093bc7..eaf3cf5f23 100644 --- a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java +++ b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java @@ -33,19 +33,25 @@ public class OTelMetricsGrpcService extends MetricsServiceGrpc.MetricsServiceImp public static final String REQUESTS_RECEIVED = "requestsReceived"; public static final String SUCCESS_REQUESTS = "successRequests"; + public static final String RECORDS_CREATED = "recordsCreated"; + public static final String RECORDS_DROPPED = "recordsDropped"; public static final String PAYLOAD_SIZE = "payloadSize"; public static final String REQUEST_PROCESS_DURATION = "requestProcessDuration"; private final int bufferWriteTimeoutInMillis; + private final OTelProtoCodec.OTelProtoDecoder oTelProtoDecoder; private final Buffer> buffer; private final Counter requestsReceivedCounter; private final Counter successRequestsCounter; + private final Counter recordsCreatedCounter; + private final Counter recordsDroppedCounter; private final DistributionSummary payloadSizeSummary; private final Timer requestProcessDuration; public OTelMetricsGrpcService(int bufferWriteTimeoutInMillis, + final OTelProtoCodec.OTelProtoDecoder oTelProtoDecoder, Buffer> buffer, final PluginMetrics pluginMetrics) { this.bufferWriteTimeoutInMillis = bufferWriteTimeoutInMillis; @@ -53,8 +59,11 @@ public OTelMetricsGrpcService(int bufferWriteTimeoutInMillis, requestsReceivedCounter = pluginMetrics.counter(REQUESTS_RECEIVED); successRequestsCounter = pluginMetrics.counter(SUCCESS_REQUESTS); + recordsCreatedCounter = pluginMetrics.counter(RECORDS_CREATED); + recordsDroppedCounter = pluginMetrics.counter(RECORDS_DROPPED); payloadSizeSummary = pluginMetrics.summary(PAYLOAD_SIZE); requestProcessDuration = pluginMetrics.timer(REQUEST_PROCESS_DURATION); + this.oTelProtoDecoder = oTelProtoDecoder; } @Override @@ -79,10 +88,11 @@ private void processRequest(final ExportMetricsServiceRequest request, final Str buffer.writeBytes(request.toByteArray(), null, bufferWriteTimeoutInMillis); } else { Collection> metrics; - AtomicInteger droppedCounter = new AtomicInteger(0); - OTelProtoCodec.OTelProtoDecoder oTelProtoDecoder = new OTelProtoCodec.OTelProtoDecoder(); + AtomicInteger droppedCounter = new AtomicInteger(0); metrics = oTelProtoDecoder.parseExportMetricsServiceRequest(request, droppedCounter, DEFAULT_EXPONENTIAL_HISTOGRAM_MAX_ALLOWED_SCALE, true, true, false); + recordsDroppedCounter.increment(droppedCounter.get()); + recordsCreatedCounter.increment(metrics.size()); buffer.writeAll(metrics, bufferWriteTimeoutInMillis); } } catch (Exception e) { diff --git a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java index d3c641d121..337006cc71 100644 --- a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java +++ b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java @@ -35,6 +35,7 @@ import org.opensearch.dataprepper.model.source.Source; import org.opensearch.dataprepper.model.codec.ByteDecoder; import org.opensearch.dataprepper.plugins.otel.codec.OTelMetricDecoder; +import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec; import org.opensearch.dataprepper.plugins.certificate.CertificateProvider; import org.opensearch.dataprepper.plugins.certificate.model.Certificate; import org.opensearch.dataprepper.plugins.health.HealthGrpcService; @@ -98,12 +99,9 @@ public void start(Buffer> buffer) { } if (server == null) { - - final int bufferWriteTimeoutInMillis = - (int) (oTelMetricsSourceConfig.getRequestTimeoutInMillis() * 0.8); final OTelMetricsGrpcService oTelMetricsGrpcService = new OTelMetricsGrpcService( (int) (oTelMetricsSourceConfig.getRequestTimeoutInMillis() * 0.8), - + new OTelProtoCodec.OTelProtoDecoder(), buffer, pluginMetrics ); diff --git a/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java b/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java index 0cef22115a..daf6ae363a 100644 --- a/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java +++ b/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java @@ -942,7 +942,6 @@ void gRPC_with_auth_request_writes_to_buffer_with_successful_response() throws E final ExportMetricsServiceResponse exportResponse = client.export(createExportMetricsRequest()); assertThat(exportResponse, notNullValue()); - //final ArgumentCaptor> bufferWriteArgumentCaptor = ArgumentCaptor.forClass(Record.class); final ArgumentCaptor>> bufferWriteArgumentCaptor = ArgumentCaptor.forClass(Collection.class); verify(buffer).writeAll(bufferWriteArgumentCaptor.capture(), anyInt()); From 37e1664a59dab1c2b2152700d89ccc01396ad390 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Sun, 25 Feb 2024 02:47:34 +0000 Subject: [PATCH 4/4] Fixed failing tests Signed-off-by: Krishna Kondaka --- .../source/otelmetrics/OTelMetricsGrpcServiceTest.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcServiceTest.java b/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcServiceTest.java index d7a46be0fb..be5c1c817d 100644 --- a/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcServiceTest.java +++ b/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcServiceTest.java @@ -13,6 +13,7 @@ import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics; +import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec; import io.opentelemetry.proto.metrics.v1.Metric; import io.opentelemetry.proto.metrics.v1.ResourceMetrics; import org.junit.jupiter.api.BeforeEach; @@ -73,6 +74,10 @@ public class OTelMetricsGrpcServiceTest { @Mock private Counter successRequestsCounter; @Mock + private Counter droppedCounter; + @Mock + private Counter createdCounter; + @Mock private DistributionSummary payloadSize; @Mock private Timer requestProcessDuration; @@ -100,6 +105,8 @@ public void setup() { when(mockPluginMetrics.counter(OTelMetricsGrpcService.REQUESTS_RECEIVED)).thenReturn(requestsReceivedCounter); when(mockPluginMetrics.counter(OTelMetricsGrpcService.SUCCESS_REQUESTS)).thenReturn(successRequestsCounter); + when(mockPluginMetrics.counter(OTelMetricsGrpcService.RECORDS_CREATED)).thenReturn(createdCounter); + when(mockPluginMetrics.counter(OTelMetricsGrpcService.RECORDS_DROPPED)).thenReturn(droppedCounter); when(mockPluginMetrics.summary(OTelMetricsGrpcService.PAYLOAD_SIZE)).thenReturn(payloadSize); when(mockPluginMetrics.timer(OTelMetricsGrpcService.REQUEST_PROCESS_DURATION)).thenReturn(requestProcessDuration); doAnswer(invocation -> { @@ -109,7 +116,7 @@ public void setup() { when(serviceRequestContext.isTimedOut()).thenReturn(false); - sut = new OTelMetricsGrpcService(bufferWriteTimeoutInMillis, buffer, mockPluginMetrics); + sut = new OTelMetricsGrpcService(bufferWriteTimeoutInMillis, new OTelProtoCodec.OTelProtoDecoder(), buffer, mockPluginMetrics); } @Test