diff --git a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java index b4c45e5a05..eaf3cf5f23 100644 --- a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java +++ b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java @@ -16,48 +16,54 @@ import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc; import org.opensearch.dataprepper.exceptions.BufferWriteException; import org.opensearch.dataprepper.exceptions.RequestCancelledException; +import static org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec.DEFAULT_EXPONENTIAL_HISTOGRAM_MAX_ALLOWED_SCALE; +import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.metric.Metric; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicInteger; + public class OTelMetricsGrpcService extends MetricsServiceGrpc.MetricsServiceImplBase { private static final Logger LOG = LoggerFactory.getLogger(OTelMetricsGrpcService.class); public static final String REQUESTS_RECEIVED = "requestsReceived"; public static final String SUCCESS_REQUESTS = "successRequests"; + public static final String RECORDS_CREATED = "recordsCreated"; + public static final String RECORDS_DROPPED = "recordsDropped"; public static final String PAYLOAD_SIZE = "payloadSize"; public static final String REQUEST_PROCESS_DURATION = "requestProcessDuration"; private final int bufferWriteTimeoutInMillis; - private final Buffer> buffer; + private final OTelProtoCodec.OTelProtoDecoder oTelProtoDecoder; + private final Buffer> buffer; private final Counter requestsReceivedCounter; private final Counter successRequestsCounter; + private final Counter recordsCreatedCounter; + private final Counter recordsDroppedCounter; private final DistributionSummary payloadSizeSummary; private final Timer requestProcessDuration; public OTelMetricsGrpcService(int bufferWriteTimeoutInMillis, - Buffer> buffer, + final OTelProtoCodec.OTelProtoDecoder oTelProtoDecoder, + Buffer> buffer, final PluginMetrics pluginMetrics) { this.bufferWriteTimeoutInMillis = bufferWriteTimeoutInMillis; this.buffer = buffer; requestsReceivedCounter = pluginMetrics.counter(REQUESTS_RECEIVED); successRequestsCounter = pluginMetrics.counter(SUCCESS_REQUESTS); + recordsCreatedCounter = pluginMetrics.counter(RECORDS_CREATED); + recordsDroppedCounter = pluginMetrics.counter(RECORDS_DROPPED); payloadSizeSummary = pluginMetrics.summary(PAYLOAD_SIZE); requestProcessDuration = pluginMetrics.timer(REQUEST_PROCESS_DURATION); - } - - public void rawExport(final ExportMetricsServiceRequest request) { - try { - if (buffer.isByteBuffer()) { - buffer.writeBytes(request.toByteArray(), null, bufferWriteTimeoutInMillis); - } - } catch (Exception e) { - } + this.oTelProtoDecoder = oTelProtoDecoder; } @Override @@ -81,7 +87,13 @@ private void processRequest(final ExportMetricsServiceRequest request, final Str if (buffer.isByteBuffer()) { buffer.writeBytes(request.toByteArray(), null, bufferWriteTimeoutInMillis); } else { - buffer.write(new Record<>(request), bufferWriteTimeoutInMillis); + Collection> metrics; + + AtomicInteger droppedCounter = new AtomicInteger(0); + metrics = oTelProtoDecoder.parseExportMetricsServiceRequest(request, droppedCounter, DEFAULT_EXPONENTIAL_HISTOGRAM_MAX_ALLOWED_SCALE, true, true, false); + recordsDroppedCounter.increment(droppedCounter.get()); + recordsCreatedCounter.increment(metrics.size()); + buffer.writeAll(metrics, bufferWriteTimeoutInMillis); } } catch (Exception e) { if (ServiceRequestContext.current().isTimedOut()) { diff --git a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java index 85e6982e23..337006cc71 100644 --- a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java +++ b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java @@ -31,9 +31,11 @@ import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.metric.Metric; import org.opensearch.dataprepper.model.source.Source; import org.opensearch.dataprepper.model.codec.ByteDecoder; import org.opensearch.dataprepper.plugins.otel.codec.OTelMetricDecoder; +import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec; import org.opensearch.dataprepper.plugins.certificate.CertificateProvider; import org.opensearch.dataprepper.plugins.certificate.model.Certificate; import org.opensearch.dataprepper.plugins.health.HealthGrpcService; @@ -51,7 +53,7 @@ import java.util.function.Function; @DataPrepperPlugin(name = "otel_metrics_source", pluginType = Source.class, pluginConfigurationType = OTelMetricsSourceConfig.class) -public class OTelMetricsSource implements Source> { +public class OTelMetricsSource implements Source> { private static final Logger LOG = LoggerFactory.getLogger(OTelMetricsSource.class); private static final String HTTP_HEALTH_CHECK_PATH = "/health"; private static final String REGEX_HEALTH = "regex:^/(?!health$).*$"; @@ -91,15 +93,15 @@ public ByteDecoder getDecoder() { } @Override - public void start(Buffer> buffer) { + public void start(Buffer> buffer) { if (buffer == null) { throw new IllegalStateException("Buffer provided is null"); } if (server == null) { - final OTelMetricsGrpcService oTelMetricsGrpcService = new OTelMetricsGrpcService( (int) (oTelMetricsSourceConfig.getRequestTimeoutInMillis() * 0.8), + new OTelProtoCodec.OTelProtoDecoder(), buffer, pluginMetrics ); diff --git a/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcServiceTest.java b/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcServiceTest.java index f257f64f83..be5c1c817d 100644 --- a/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcServiceTest.java +++ b/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcServiceTest.java @@ -13,6 +13,7 @@ import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics; +import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec; import io.opentelemetry.proto.metrics.v1.Metric; import io.opentelemetry.proto.metrics.v1.ResourceMetrics; import org.junit.jupiter.api.BeforeEach; @@ -29,17 +30,20 @@ import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.event.Event; +import java.util.Collection; import java.util.Collections; +import java.util.Map; import java.util.concurrent.TimeoutException; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; +import static org.hamcrest.Matchers.hasEntry; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -48,15 +52,20 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; +import io.opentelemetry.proto.metrics.v1.NumberDataPoint; +import io.opentelemetry.proto.metrics.v1.Gauge; @ExtendWith(MockitoExtension.class) public class OTelMetricsGrpcServiceTest { + private static NumberDataPoint.Builder p1 = NumberDataPoint.newBuilder().setAsInt(4); + private static Gauge gauge = Gauge.newBuilder().addDataPoints(p1).build(); private static final ExportMetricsServiceRequest METRICS_REQUEST = ExportMetricsServiceRequest.newBuilder() .addResourceMetrics(ResourceMetrics.newBuilder() .addInstrumentationLibraryMetrics(InstrumentationLibraryMetrics.newBuilder() - .addMetrics(Metric.newBuilder().build()) + .addMetrics(Metric.newBuilder().setGauge(gauge).setUnit("seconds").setName("name").build()) .build())).build(); + private static Map expectedMetric = Map.of("unit", (Object)"seconds", "name", (Object)"name", "kind", (Object)"GAUGE"); private static PluginSetting pluginSetting; private final int bufferWriteTimeoutInMillis = 100000; @@ -65,6 +74,10 @@ public class OTelMetricsGrpcServiceTest { @Mock private Counter successRequestsCounter; @Mock + private Counter droppedCounter; + @Mock + private Counter createdCounter; + @Mock private DistributionSummary payloadSize; @Mock private Timer requestProcessDuration; @@ -76,7 +89,7 @@ public class OTelMetricsGrpcServiceTest { private ServiceRequestContext serviceRequestContext; @Captor - private ArgumentCaptor recordCaptor; + private ArgumentCaptor> recordCaptor; @Captor ArgumentCaptor bytesCaptor; @@ -92,6 +105,8 @@ public void setup() { when(mockPluginMetrics.counter(OTelMetricsGrpcService.REQUESTS_RECEIVED)).thenReturn(requestsReceivedCounter); when(mockPluginMetrics.counter(OTelMetricsGrpcService.SUCCESS_REQUESTS)).thenReturn(successRequestsCounter); + when(mockPluginMetrics.counter(OTelMetricsGrpcService.RECORDS_CREATED)).thenReturn(createdCounter); + when(mockPluginMetrics.counter(OTelMetricsGrpcService.RECORDS_DROPPED)).thenReturn(droppedCounter); when(mockPluginMetrics.summary(OTelMetricsGrpcService.PAYLOAD_SIZE)).thenReturn(payloadSize); when(mockPluginMetrics.timer(OTelMetricsGrpcService.REQUEST_PROCESS_DURATION)).thenReturn(requestProcessDuration); doAnswer(invocation -> { @@ -101,7 +116,7 @@ public void setup() { when(serviceRequestContext.isTimedOut()).thenReturn(false); - sut = new OTelMetricsGrpcService(bufferWriteTimeoutInMillis, buffer, mockPluginMetrics); + sut = new OTelMetricsGrpcService(bufferWriteTimeoutInMillis, new OTelProtoCodec.OTelProtoDecoder(), buffer, mockPluginMetrics); } @Test @@ -111,7 +126,7 @@ public void export_Success_responseObserverOnCompleted() throws Exception { sut.export(METRICS_REQUEST, responseObserver); } - verify(buffer, times(1)).write(recordCaptor.capture(), anyInt()); + verify(buffer, times(1)).writeAll(recordCaptor.capture(), anyInt()); verify(responseObserver, times(1)).onNext(ExportMetricsServiceResponse.newBuilder().build()); verify(responseObserver, times(1)).onCompleted(); verify(requestsReceivedCounter, times(1)).increment(); @@ -122,8 +137,11 @@ public void export_Success_responseObserverOnCompleted() throws Exception { assertThat(payloadLengthCaptor.getValue().intValue(), equalTo(METRICS_REQUEST.getSerializedSize())); verify(requestProcessDuration, times(1)).record(ArgumentMatchers.any()); - Record capturedRecord = recordCaptor.getValue(); - assertEquals(METRICS_REQUEST, capturedRecord.getData()); + Collection capturedRecords = recordCaptor.getValue(); + Record capturedRecord = (Record)(capturedRecords.toArray()[0]); + Map map = ((Event)capturedRecord.getData()).toMap(); + + expectedMetric.forEach((k, v) -> assertThat(map, hasEntry((String)k, (Object)v))); } @Test @@ -151,14 +169,14 @@ public void export_Success_with_ByteBuffer_responseObserverOnCompleted() throws @Test public void export_BufferTimeout_responseObserverOnError() throws Exception { - doThrow(new TimeoutException()).when(buffer).write(any(Record.class), anyInt()); + doThrow(new TimeoutException()).when(buffer).writeAll(any(Collection.class), anyInt()); try (MockedStatic mockedStatic = mockStatic(ServiceRequestContext.class)) { mockedStatic.when(ServiceRequestContext::current).thenReturn(serviceRequestContext); assertThrows(BufferWriteException.class, () -> sut.export(METRICS_REQUEST, responseObserver)); } - verify(buffer, times(1)).write(any(Record.class), anyInt()); + verify(buffer, times(1)).writeAll(any(Collection.class), anyInt()); verifyNoInteractions(responseObserver); verify(requestsReceivedCounter, times(1)).increment(); verifyNoInteractions(successRequestsCounter); diff --git a/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java b/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java index 66cab56203..daf6ae363a 100644 --- a/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java +++ b/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java @@ -34,6 +34,11 @@ import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc; +import io.opentelemetry.proto.metrics.v1.NumberDataPoint; + +import io.opentelemetry.proto.common.v1.InstrumentationLibrary; +import io.opentelemetry.proto.metrics.v1.Gauge; +import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics; import io.opentelemetry.proto.common.v1.AnyValue; import io.opentelemetry.proto.common.v1.KeyValue; import io.opentelemetry.proto.metrics.v1.ResourceMetrics; @@ -62,6 +67,7 @@ import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.metric.Metric; import org.opensearch.dataprepper.model.types.ByteCount; import org.opensearch.dataprepper.plugins.GrpcBasicAuthenticationProvider; import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer; @@ -79,6 +85,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Base64; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -163,7 +170,7 @@ class OTelMetricsSourceTest { private OTelMetricsSourceConfig oTelMetricsSourceConfig; @Mock - private BlockingBuffer> buffer; + private BlockingBuffer> buffer; @Mock private HttpBasicAuthenticationConfig httpBasicAuthenticationConfig; @@ -901,12 +908,12 @@ void gRPC_request_writes_to_buffer_with_successful_response() throws Exception { final ExportMetricsServiceResponse exportResponse = client.export(createExportMetricsRequest()); assertThat(exportResponse, notNullValue()); - final ArgumentCaptor> bufferWriteArgumentCaptor = ArgumentCaptor.forClass(Record.class); - verify(buffer).write(bufferWriteArgumentCaptor.capture(), anyInt()); + final ArgumentCaptor>> bufferWriteArgumentCaptor = ArgumentCaptor.forClass(Collection.class); + verify(buffer).writeAll(bufferWriteArgumentCaptor.capture(), anyInt()); - final Record actualBufferWrites = bufferWriteArgumentCaptor.getValue(); + final Collection> actualBufferWrites = bufferWriteArgumentCaptor.getValue(); assertThat(actualBufferWrites, notNullValue()); - assertThat(actualBufferWrites.getData().getResourceMetricsCount(), equalTo(1)); + assertThat(actualBufferWrites.size(), equalTo(1)); } @Test @@ -935,12 +942,12 @@ void gRPC_with_auth_request_writes_to_buffer_with_successful_response() throws E final ExportMetricsServiceResponse exportResponse = client.export(createExportMetricsRequest()); assertThat(exportResponse, notNullValue()); - final ArgumentCaptor> bufferWriteArgumentCaptor = ArgumentCaptor.forClass(Record.class); - verify(buffer).write(bufferWriteArgumentCaptor.capture(), anyInt()); + final ArgumentCaptor>> bufferWriteArgumentCaptor = ArgumentCaptor.forClass(Collection.class); + verify(buffer).writeAll(bufferWriteArgumentCaptor.capture(), anyInt()); - final Record actualBufferWrites = bufferWriteArgumentCaptor.getValue(); + final Collection> actualBufferWrites = bufferWriteArgumentCaptor.getValue(); assertThat(actualBufferWrites, notNullValue()); - assertThat(actualBufferWrites.getData().getResourceMetricsCount(), equalTo(1)); + assertThat(actualBufferWrites.size(), equalTo(1)); } @Test @@ -971,7 +978,7 @@ void gRPC_request_returns_expected_status_for_exceptions_from_buffer( doThrow(bufferExceptionClass) .when(buffer) - .write(any(Record.class), anyInt()); + .writeAll(any(Collection.class), anyInt()); final ExportMetricsServiceRequest exportMetricsRequest = createExportMetricsRequest(); final StatusRuntimeException actualException = assertThrows(StatusRuntimeException.class, () -> client.export(exportMetricsRequest)); @@ -1014,9 +1021,26 @@ private ExportMetricsServiceRequest createExportMetricsRequest() { .setKey("service.name") .setValue(AnyValue.newBuilder().setStringValue("service").build()) ).build(); + NumberDataPoint.Builder p1 = NumberDataPoint.newBuilder().setAsInt(4); + Gauge gauge = Gauge.newBuilder().addDataPoints(p1).build(); + + io.opentelemetry.proto.metrics.v1.Metric.Builder metric = io.opentelemetry.proto.metrics.v1.Metric.newBuilder() + .setGauge(gauge) + .setUnit("seconds") + .setName("name") + .setDescription("description"); + InstrumentationLibraryMetrics isntLib = InstrumentationLibraryMetrics.newBuilder() + .addMetrics(metric) + .setInstrumentationLibrary(InstrumentationLibrary.newBuilder() + .setName("ilname") + .setVersion("ilversion") + .build()) + .build(); + final ResourceMetrics resourceMetrics = ResourceMetrics.newBuilder() .setResource(resource) + .addInstrumentationLibraryMetrics(isntLib) .build(); return ExportMetricsServiceRequest.newBuilder()