From 42e763e5805bf4b1f1621bc55cdaed45d7005ac6 Mon Sep 17 00:00:00 2001 From: kkondaka <41027584+kkondaka@users.noreply.github.com> Date: Thu, 29 Feb 2024 14:44:27 -0800 Subject: [PATCH] Fix pipeline latency to calculate correct latency when persistent buffer is used (#4187) * Fix pipeline latency to calculate correct latency when persistent buffer is used Signed-off-by: Krishna Kondaka * Fixed checkstyle error and addressed comments Signed-off-by: Krishna Kondaka * Fixed failing tests Signed-off-by: Krishna Kondaka * Fixed failing tests Signed-off-by: Krishna Kondaka * Fixed failing tests Signed-off-by: Krishna Kondaka --------- Signed-off-by: Krishna Kondaka Co-authored-by: Krishna Kondaka --- .../dataprepper/model/codec/ByteDecoder.java | 4 +- .../dataprepper/model/codec/JsonDecoder.java | 20 +++-- .../dataprepper/model/log/JacksonOtelLog.java | 13 ++++ .../metric/JacksonExponentialHistogram.java | 13 ++++ .../model/metric/JacksonGauge.java | 13 ++++ .../model/metric/JacksonHistogram.java | 13 ++++ .../model/metric/JacksonMetric.java | 14 ++++ .../dataprepper/model/metric/JacksonSum.java | 13 ++++ .../model/metric/JacksonSummary.java | 13 ++++ .../dataprepper/model/trace/JacksonSpan.java | 13 ++++ .../model/codec/JsonDecoderTest.java | 28 ++++++- .../model/log/JacksonOtelLogTest.java | 10 +++ .../JacksonExponentialHistogramTest.java | 10 +++ .../model/metric/JacksonGaugeTest.java | 10 +++ .../model/metric/JacksonHistogramTest.java | 10 +++ .../model/metric/JacksonSumTest.java | 11 +++ .../model/metric/JacksonSummaryTest.java | 11 +++ .../model/trace/JacksonSpanTest.java | 8 ++ .../kafka/consumer/KafkaCustomConsumer.java | 3 +- .../kafka/producer/KafkaCustomProducer.java | 8 +- .../source/otellogs/OTelLogsGrpcService.java | 3 +- .../otellogs/OTelLogsGrpcServiceTest.java | 3 +- .../otelmetrics/OTelMetricsRawProcessor.java | 3 +- .../otelmetrics/OTelMetricsGrpcService.java | 3 +- .../plugins/otel/codec/OTelLogsDecoder.java | 5 +- .../plugins/otel/codec/OTelMetricDecoder.java | 5 +- .../plugins/otel/codec/OTelProtoCodec.java | 75 ++++++++++++------- .../plugins/otel/codec/OTelTraceDecoder.java | 6 +- .../otel/codec/OTelLogsDecoderTest.java | 3 +- .../otel/codec/OTelMetricsDecoderTest.java | 3 +- .../otel/codec/OTelProtoCodecTest.java | 26 +++---- .../otel/codec/OTelTraceDecoderTest.java | 3 +- .../oteltrace/OTelTraceGrpcService.java | 3 +- .../oteltrace/OTelTraceGrpcServiceTest.java | 3 +- .../plugins/codec/json/JsonInputCodec.java | 9 +++ 35 files changed, 318 insertions(+), 73 deletions(-) diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/ByteDecoder.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/ByteDecoder.java index 46420ca7cc..eb43f72489 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/ByteDecoder.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/ByteDecoder.java @@ -11,6 +11,7 @@ import java.io.InputStream; import java.io.Serializable; import java.util.function.Consumer; +import java.time.Instant; public interface ByteDecoder extends Serializable { /** @@ -18,9 +19,10 @@ public interface ByteDecoder extends Serializable { * {@link Record} loaded from the {@link InputStream}. * * @param inputStream The input stream for code to process + * @param timeReceived The time received value to be populated in the Record * @param eventConsumer The consumer which handles each event from the stream * @throws IOException throws IOException when invalid input is received or incorrect codec name is provided */ - void parse(InputStream inputStream, Consumer> eventConsumer) throws IOException; + void parse(InputStream inputStream, Instant timeReceived, Consumer> eventConsumer) throws IOException; } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/JsonDecoder.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/JsonDecoder.java index 1aba7e56ee..f0793aa65f 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/JsonDecoder.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/JsonDecoder.java @@ -16,6 +16,7 @@ import org.opensearch.dataprepper.model.record.Record; import java.io.IOException; import java.io.InputStream; +import java.time.Instant; import java.util.Map; import java.util.Objects; import java.util.function.Consumer; @@ -24,7 +25,7 @@ public class JsonDecoder implements ByteDecoder { private final ObjectMapper objectMapper = new ObjectMapper(); private final JsonFactory jsonFactory = new JsonFactory(); - public void parse(InputStream inputStream, Consumer> eventConsumer) throws IOException { + public void parse(InputStream inputStream, Instant timeReceived, Consumer> eventConsumer) throws IOException { Objects.requireNonNull(inputStream); Objects.requireNonNull(eventConsumer); @@ -32,25 +33,28 @@ public void parse(InputStream inputStream, Consumer> eventConsumer while (!jsonParser.isClosed() && jsonParser.nextToken() != JsonToken.END_OBJECT) { if (jsonParser.getCurrentToken() == JsonToken.START_ARRAY) { - parseRecordsArray(jsonParser, eventConsumer); + parseRecordsArray(jsonParser, timeReceived, eventConsumer); } } } - private void parseRecordsArray(final JsonParser jsonParser, final Consumer> eventConsumer) throws IOException { + private void parseRecordsArray(final JsonParser jsonParser, final Instant timeReceived, final Consumer> eventConsumer) throws IOException { while (jsonParser.nextToken() != JsonToken.END_ARRAY) { final Map innerJson = objectMapper.readValue(jsonParser, Map.class); - final Record record = createRecord(innerJson); + final Record record = createRecord(innerJson, timeReceived); eventConsumer.accept(record); } } - private Record createRecord(final Map json) { - final JacksonEvent event = (JacksonEvent)JacksonLog.builder() + private Record createRecord(final Map json, final Instant timeReceived) { + final JacksonLog.Builder logBuilder = JacksonLog.builder() .withData(json) - .getThis() - .build(); + .getThis(); + if (timeReceived != null) { + logBuilder.withTimeReceived(timeReceived); + } + final JacksonEvent event = (JacksonEvent)logBuilder.build(); return new Record<>(event); } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/log/JacksonOtelLog.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/log/JacksonOtelLog.java index bb0c02be6d..b18013f637 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/log/JacksonOtelLog.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/log/JacksonOtelLog.java @@ -10,6 +10,7 @@ import org.opensearch.dataprepper.model.event.EventType; import org.opensearch.dataprepper.model.event.JacksonEvent; +import java.time.Instant; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -161,6 +162,18 @@ public Builder withAttributes(final Map attributes) { return getThis(); } + /** + * Sets the time received for populating event origination time in event handle + * + * @param timeReceived time received + * @return the builder + * @since 2.7 + */ + @Override + public Builder withTimeReceived(final Instant timeReceived) { + return (Builder)super.withTimeReceived(timeReceived); + } + /** * Sets the observed time of the log event * diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonExponentialHistogram.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonExponentialHistogram.java index b865ce0eb5..a1ba387ee2 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonExponentialHistogram.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonExponentialHistogram.java @@ -7,6 +7,7 @@ import org.opensearch.dataprepper.model.event.EventType; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -237,6 +238,18 @@ public JacksonExponentialHistogram.Builder withPositiveOffset(int offset) { return this; } + /** + * Sets the time received for populating event origination time in event handle + * + * @param timeReceived time received + * @return the builder + * @since 2.7 + */ + @Override + public JacksonExponentialHistogram.Builder withTimeReceived(final Instant timeReceived) { + return (JacksonExponentialHistogram.Builder)super.withTimeReceived(timeReceived); + } + /** * Sets the offset for the negative buckets * diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonGauge.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonGauge.java index 4df5bf4793..33c633d951 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonGauge.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonGauge.java @@ -7,6 +7,7 @@ import org.opensearch.dataprepper.model.event.EventType; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -67,6 +68,18 @@ public Builder withValue(final Double value) { return this; } + /** + * Sets the time received for populating event origination time in event handle + * + * @param timeReceived time received + * @return the builder + * @since 2.7 + */ + @Override + public Builder withTimeReceived(final Instant timeReceived) { + return (Builder)super.withTimeReceived(timeReceived); + } + /** * Returns a newly created {@link JacksonGauge} * @return a JacksonGauge diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonHistogram.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonHistogram.java index f9e066875d..0a325bf7fd 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonHistogram.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonHistogram.java @@ -7,6 +7,7 @@ import org.opensearch.dataprepper.model.event.EventType; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -190,6 +191,18 @@ public JacksonHistogram.Builder withAggregationTemporality(String aggregationTe return this; } + /** + * Sets the time received for populating event origination time in event handle + * + * @param timeReceived time received + * @return the builder + * @since 2.7 + */ + @Override + public JacksonHistogram.Builder withTimeReceived(final Instant timeReceived) { + return (JacksonHistogram.Builder)super.withTimeReceived(timeReceived); + } + /** * Sets the buckets for this histogram * @param buckets a list of buckets diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonMetric.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonMetric.java index 04ab33a347..4a30f3a637 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonMetric.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonMetric.java @@ -9,6 +9,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import org.opensearch.dataprepper.model.event.JacksonEvent; +import java.time.Instant; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -234,6 +235,19 @@ public T withSchemaUrl(final String schemaUrl) { return getThis(); } + /** + * Sets the time received for populating event origination time in event handle + * + * @param timeReceived time received + * @return the builder + * @since 2.7 + */ + @Override + public T withTimeReceived(final Instant timeReceived) { + return (T)super.withTimeReceived(timeReceived); + } + + /** * Sets the exemplars that are associated with this metric event * @param exemplars sets the exemplars for this metric diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonSum.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonSum.java index 9835650dd0..a5b9c4e1a0 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonSum.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonSum.java @@ -7,6 +7,7 @@ import org.opensearch.dataprepper.model.event.EventType; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -101,6 +102,18 @@ public Builder withIsMonotonic(final boolean isMonotonic) { return this; } + /** + * Sets the time received for populating event origination time in event handle + * + * @param timeReceived time received + * @return the builder + * @since 2.7 + */ + @Override + public Builder withTimeReceived(final Instant timeReceived) { + return (Builder)super.withTimeReceived(timeReceived); + } + /** * Returns a newly created {@link JacksonSum} * @return a JacksonSum diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonSummary.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonSummary.java index 2196e59075..01425d0c8c 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonSummary.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/metric/JacksonSummary.java @@ -7,6 +7,7 @@ import org.opensearch.dataprepper.model.event.EventType; +import java.time.Instant; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -106,6 +107,18 @@ public Builder withSum(double sum) { return this; } + /** + * Sets the time received for populating event origination time in event handle + * + * @param timeReceived time received + * @return the builder + * @since 2.7 + */ + @Override + public Builder withTimeReceived(final Instant timeReceived) { + return (Builder)super.withTimeReceived(timeReceived); + } + /** * Sets the count * @param count the count of this summary diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/trace/JacksonSpan.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/trace/JacksonSpan.java index d6bb211ca2..08a7fdccac 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/trace/JacksonSpan.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/trace/JacksonSpan.java @@ -15,6 +15,7 @@ import org.opensearch.dataprepper.model.event.EventType; import org.opensearch.dataprepper.model.event.JacksonEvent; +import java.time.Instant; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; @@ -444,6 +445,18 @@ public Builder withTraceGroup(final String traceGroup) { return this; } + /** + * Sets the time received for populating event origination time in event handle + * + * @param timeReceived time received + * @return the builder + * @since 2.7 + */ + @Override + public Builder withTimeReceived(final Instant timeReceived) { + return (Builder)super.withTimeReceived(timeReceived); + } + /** * Sets the duration of the span * diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/JsonDecoderTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/JsonDecoderTest.java index 1c3c789c79..d2c7287313 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/JsonDecoderTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/JsonDecoderTest.java @@ -2,9 +2,11 @@ import org.junit.jupiter.api.Test; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.DefaultEventHandle; import org.opensearch.dataprepper.model.record.Record; import java.io.ByteArrayInputStream; +import java.time.Instant; import java.util.Map; import java.util.Random; import java.util.UUID; @@ -18,12 +20,13 @@ public class JsonDecoderTest { private JsonDecoder jsonDecoder; private Record receivedRecord; + private Instant receivedTime; private JsonDecoder createObjectUnderTest() { return new JsonDecoder(); } - @BeforeEach +@BeforeEach void setup() { jsonDecoder = createObjectUnderTest(); receivedRecord = null; @@ -36,7 +39,7 @@ void test_basicJsonDecoder() { int intValue = r.nextInt(); String inputString = "[{\"key1\":\""+stringValue+"\", \"key2\":"+intValue+"}]"; try { - jsonDecoder.parse(new ByteArrayInputStream(inputString.getBytes()), (record) -> { + jsonDecoder.parse(new ByteArrayInputStream(inputString.getBytes()), null, (record) -> { receivedRecord = record; }); } catch (Exception e){} @@ -47,4 +50,25 @@ void test_basicJsonDecoder() { assertThat(map.get("key2"), equalTo(intValue)); } + @Test + void test_basicJsonDecoder_withTimeReceived() { + String stringValue = UUID.randomUUID().toString(); + Random r = new Random(); + int intValue = r.nextInt(); + String inputString = "[{\"key1\":\""+stringValue+"\", \"key2\":"+intValue+"}]"; + final Instant now = Instant.now(); + try { + jsonDecoder.parse(new ByteArrayInputStream(inputString.getBytes()), now, (record) -> { + receivedRecord = record; + receivedTime = ((DefaultEventHandle)(((Event)record.getData()).getEventHandle())).getInternalOriginationTime(); + }); + } catch (Exception e){} + + assertNotEquals(receivedRecord, null); + Map map = receivedRecord.getData().toMap(); + assertThat(map.get("key1"), equalTo(stringValue)); + assertThat(map.get("key2"), equalTo(intValue)); + assertThat(receivedTime, equalTo(now)); + } + } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/log/JacksonOtelLogTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/log/JacksonOtelLogTest.java index fc8213323b..0fe7cebd9c 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/log/JacksonOtelLogTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/log/JacksonOtelLogTest.java @@ -12,7 +12,9 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.skyscreamer.jsonassert.JSONAssert; +import org.opensearch.dataprepper.model.event.DefaultEventHandle; +import java.time.Instant; import java.util.Date; import java.util.Map; import java.util.UUID; @@ -83,6 +85,14 @@ public void testGetServiceName() { assertThat(name, is(equalTo(TEST_SERVICE_NAME))); } + @Test + public void testGetTimeReceived() { + Instant now = Instant.now(); + builder.withTimeReceived(now); + log = builder.build(); + assertThat(((DefaultEventHandle)log.getEventHandle()).getInternalOriginationTime(), is(now)); + } + @Test public void testGetSchemaUrl() { final String schemaUrl = log.getSchemaUrl(); diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/metric/JacksonExponentialHistogramTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/metric/JacksonExponentialHistogramTest.java index fb63a041b1..724d3ee130 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/metric/JacksonExponentialHistogramTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/metric/JacksonExponentialHistogramTest.java @@ -11,8 +11,10 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.opensearch.dataprepper.model.event.TestObject; +import org.opensearch.dataprepper.model.event.DefaultEventHandle; import org.skyscreamer.jsonassert.JSONAssert; +import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.time.ZonedDateTime; @@ -246,6 +248,14 @@ public void testGetAttributes_withNull_mustBeEmpty() { assertThat(histogram.getAttributes(), is(anEmptyMap())); } + @Test + public void testGetTimeReceived() { + Instant now = Instant.now(); + builder.withTimeReceived(now); + JacksonExponentialHistogram histogram = builder.build(); + assertThat(((DefaultEventHandle)histogram.getEventHandle()).getInternalOriginationTime(), is(now)); + } + @Test public void testHistogramToJsonString() throws JSONException { histogram.put("foo", "bar"); diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/metric/JacksonGaugeTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/metric/JacksonGaugeTest.java index 842f35bc4f..55fda1085c 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/metric/JacksonGaugeTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/metric/JacksonGaugeTest.java @@ -11,8 +11,10 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.opensearch.dataprepper.model.event.TestObject; +import org.opensearch.dataprepper.model.event.DefaultEventHandle; import org.skyscreamer.jsonassert.JSONAssert; +import java.time.Instant; import java.util.Arrays; import java.util.Date; import java.util.List; @@ -89,6 +91,14 @@ public void testGetName() { assertThat(name, is(equalTo(TEST_NAME))); } + @Test + public void testGetTimeReceived() { + Instant now = Instant.now(); + builder.withTimeReceived(now); + gauge = builder.build(); + assertThat(((DefaultEventHandle)gauge.getEventHandle()).getInternalOriginationTime(), is(now)); + } + @Test public void testGetDescription() { final String description = gauge.getDescription(); diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/metric/JacksonHistogramTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/metric/JacksonHistogramTest.java index 15b22802a2..f9d6cffc11 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/metric/JacksonHistogramTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/metric/JacksonHistogramTest.java @@ -11,8 +11,10 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.opensearch.dataprepper.model.event.TestObject; +import org.opensearch.dataprepper.model.event.DefaultEventHandle; import org.skyscreamer.jsonassert.JSONAssert; +import java.time.Instant; import java.util.Arrays; import java.util.Date; import java.util.List; @@ -140,6 +142,14 @@ public void testGetCount() { assertThat(count, is(equalTo(TEST_COUNT))); } + @Test + public void testGetTimeReceived() { + Instant now = Instant.now(); + builder.withTimeReceived(now); + histogram = builder.build(); + assertThat(((DefaultEventHandle)histogram.getEventHandle()).getInternalOriginationTime(), is(now)); + } + @Test public void testGetServiceName() { final String name = histogram.getServiceName(); diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/metric/JacksonSumTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/metric/JacksonSumTest.java index 58cab11f11..4dd47541e9 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/metric/JacksonSumTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/metric/JacksonSumTest.java @@ -9,6 +9,9 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.event.DefaultEventHandle; + +import java.time.Instant; import java.util.Date; import java.util.Map; import java.util.UUID; @@ -96,6 +99,14 @@ public void testGetServiceName() { assertThat(name, is(equalTo(TEST_SERVICE_NAME))); } + @Test + public void testGetTimeReceived() { + Instant now = Instant.now(); + builder.withTimeReceived(now); + sum = builder.build(); + assertThat(((DefaultEventHandle)sum.getEventHandle()).getInternalOriginationTime(), is(now)); + } + @Test public void testGetAggregationTemporality() { final String aggregationTemporality = sum.getAggregationTemporality(); diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/metric/JacksonSummaryTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/metric/JacksonSummaryTest.java index 31820b86c6..6bc865169e 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/metric/JacksonSummaryTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/metric/JacksonSummaryTest.java @@ -10,6 +10,9 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.event.DefaultEventHandle; + +import java.time.Instant; import java.util.Arrays; import java.util.Date; import java.util.List; @@ -116,6 +119,14 @@ public void testGetSum() { assertThat(sum, is(equalTo(TEST_SUM))); } + @Test + public void testGetTimeReceived() { + Instant now = Instant.now(); + builder.withTimeReceived(now); + summary = builder.build(); + assertThat(((DefaultEventHandle)summary.getEventHandle()).getInternalOriginationTime(), is(now)); + } + @Test public void testGetCount() { diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/trace/JacksonSpanTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/trace/JacksonSpanTest.java index 2648b172aa..b9b7e6e959 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/trace/JacksonSpanTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/trace/JacksonSpanTest.java @@ -15,6 +15,7 @@ import org.opensearch.dataprepper.model.event.EventMetadata; import org.opensearch.dataprepper.model.event.EventType; import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.event.DefaultEventHandle; import java.time.Instant; import java.util.Arrays; @@ -440,6 +441,13 @@ public void testBuilder_withNullDroppedAttributesCount_createsSpanWithDefaultVal assertThat(span.getDroppedAttributesCount(), is(equalTo(0))); } + @Test + public void testGetTimeReceived() { + Instant now = Instant.now(); + final JacksonSpan span = builder.withTimeReceived(now).build(); + assertThat(((DefaultEventHandle)span.getEventHandle()).getInternalOriginationTime(), is(now)); + } + @Test public void testBuilder_withNullEvents_createsSpanWithDefaultValue() { final JacksonSpan span = builder.withEvents(null).build(); diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java index 0583dbddbe..581bf41a4e 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java @@ -381,6 +381,7 @@ public void run() { } private Record getRecord(ConsumerRecord consumerRecord, int partition) { + Instant now = Instant.now(); Map data = new HashMap<>(); Event event; Object value = consumerRecord.value(); @@ -496,7 +497,7 @@ private void iterateRecordPartitions(ConsumerRecords records, fin if (schema == MessageFormat.BYTES) { InputStream inputStream = new ByteArrayInputStream((byte[])consumerRecord.value()); if(byteDecoder != null) { - byteDecoder.parse(inputStream, (record) -> { + byteDecoder.parse(inputStream, Instant.ofEpochMilli(consumerRecord.timestamp()), (record) -> { processRecord(acknowledgementSet, record); }); } else { diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java index fe0291662b..56e9783d2b 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java @@ -169,11 +169,11 @@ private void publishAvroMessage(final Record record, final String key) th } Future send(final String topicName, String key, final Object record) throws Exception { - if (Objects.isNull(key)) { - return producer.send(new ProducerRecord(topicName, record), callBack(record)); - } + ProducerRecord producerRecord = Objects.isNull(key) ? + new ProducerRecord(topicName, record) : + new ProducerRecord(topicName, key, record); - return producer.send(new ProducerRecord(topicName, key, record), callBack(record)); + return producer.send(producerRecord, callBack(record)); } private void publishJsonMessage(final Record record, final String key) throws IOException, ProcessingException, Exception { diff --git a/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsGrpcService.java b/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsGrpcService.java index 4a63dabac0..e800e48e74 100644 --- a/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsGrpcService.java +++ b/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsGrpcService.java @@ -25,6 +25,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Instant; import java.util.List; import java.util.stream.Collectors; @@ -84,7 +85,7 @@ private void processRequest(final ExportLogsServiceRequest request, final Stream final List logs; try { - logs = oTelProtoDecoder.parseExportLogsServiceRequest(request); + logs = oTelProtoDecoder.parseExportLogsServiceRequest(request, Instant.now()); } catch (Exception e) { LOG.error("Failed to parse the request {} due to:", request, e); throw new BadRequestException(e.getMessage(), e); diff --git a/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsGrpcServiceTest.java b/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsGrpcServiceTest.java index a5a9353981..28b8cbeca4 100644 --- a/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsGrpcServiceTest.java +++ b/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsGrpcServiceTest.java @@ -35,6 +35,7 @@ import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec; import java.io.IOException; +import java.time.Instant; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -189,7 +190,7 @@ public void export_BufferTimeout_responseObserverOnError() throws Exception { public void export_BadRequest_responseObserverOnError() { final String testMessage = "test message"; final RuntimeException testException = new RuntimeException(testMessage); - when(mockOTelProtoDecoder.parseExportLogsServiceRequest(any())).thenThrow(testException); + when(mockOTelProtoDecoder.parseExportLogsServiceRequest(any(), any(Instant.class))).thenThrow(testException); objectUnderTest = generateOTelLogsGrpcService(mockOTelProtoDecoder); try (MockedStatic mockedStatic = mockStatic(ServiceRequestContext.class)) { diff --git a/data-prepper-plugins/otel-metrics-raw-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/OTelMetricsRawProcessor.java b/data-prepper-plugins/otel-metrics-raw-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/OTelMetricsRawProcessor.java index 4ee2a26408..87ec477a26 100644 --- a/data-prepper-plugins/otel-metrics-raw-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/OTelMetricsRawProcessor.java +++ b/data-prepper-plugins/otel-metrics-raw-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otelmetrics/OTelMetricsRawProcessor.java @@ -23,6 +23,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -92,7 +93,7 @@ public Collection> doExecute(Collection> reco } ExportMetricsServiceRequest request = ((Record)rec).getData(); - recordsOut.addAll(otelProtoDecoder.parseExportMetricsServiceRequest(request, droppedCounter, otelMetricsRawProcessorConfig.getExponentialHistogramMaxAllowedScale(), otelMetricsRawProcessorConfig.getCalculateHistogramBuckets(), otelMetricsRawProcessorConfig.getCalculateExponentialHistogramBuckets(), flattenAttributesFlag)); + recordsOut.addAll(otelProtoDecoder.parseExportMetricsServiceRequest(request, droppedCounter, otelMetricsRawProcessorConfig.getExponentialHistogramMaxAllowedScale(), Instant.now(), otelMetricsRawProcessorConfig.getCalculateHistogramBuckets(), otelMetricsRawProcessorConfig.getCalculateExponentialHistogramBuckets(), flattenAttributesFlag)); } recordsDroppedMetricsRawCounter.increment(droppedCounter.get()); return recordsOut; diff --git a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java index 8705fb03fc..e88cf18b5f 100644 --- a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java +++ b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java @@ -25,6 +25,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Instant; import java.util.Collection; import java.util.concurrent.atomic.AtomicInteger; @@ -90,7 +91,7 @@ private void processRequest(final ExportMetricsServiceRequest request, final Str Collection> metrics; AtomicInteger droppedCounter = new AtomicInteger(0); - metrics = oTelProtoDecoder.parseExportMetricsServiceRequest(request, droppedCounter, DEFAULT_EXPONENTIAL_HISTOGRAM_MAX_ALLOWED_SCALE, true, true, true); + metrics = oTelProtoDecoder.parseExportMetricsServiceRequest(request, droppedCounter, DEFAULT_EXPONENTIAL_HISTOGRAM_MAX_ALLOWED_SCALE, Instant.now(), true, true, true); recordsDroppedCounter.increment(droppedCounter.get()); recordsCreatedCounter.increment(metrics.size()); buffer.writeAll(metrics, bufferWriteTimeoutInMillis); diff --git a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsDecoder.java b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsDecoder.java index 6c491e7510..c140e9591d 100644 --- a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsDecoder.java +++ b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsDecoder.java @@ -16,6 +16,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.function.Consumer; +import java.time.Instant; public class OTelLogsDecoder implements ByteDecoder { @@ -23,10 +24,10 @@ public class OTelLogsDecoder implements ByteDecoder { public OTelLogsDecoder() { otelProtoDecoder = new OTelProtoCodec.OTelProtoDecoder(); } - public void parse(InputStream inputStream, Consumer> eventConsumer) throws IOException { + public void parse(InputStream inputStream, Instant timeReceivedMs, Consumer> eventConsumer) throws IOException { ExportLogsServiceRequest request = ExportLogsServiceRequest.parseFrom(inputStream); AtomicInteger droppedCounter = new AtomicInteger(0); - List logs = otelProtoDecoder.parseExportLogsServiceRequest(request); + List logs = otelProtoDecoder.parseExportLogsServiceRequest(request, timeReceivedMs); for (OpenTelemetryLog log: logs) { eventConsumer.accept(new Record<>(log)); } diff --git a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelMetricDecoder.java b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelMetricDecoder.java index 1918842c22..d7bdd0c5fd 100644 --- a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelMetricDecoder.java +++ b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelMetricDecoder.java @@ -16,6 +16,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.io.IOException; import java.io.InputStream; +import java.time.Instant; import java.util.function.Consumer; @@ -24,11 +25,11 @@ public class OTelMetricDecoder implements ByteDecoder { public OTelMetricDecoder() { otelProtoDecoder = new OTelProtoCodec.OTelProtoDecoder(); } - public void parse(InputStream inputStream, Consumer> eventConsumer) throws IOException { + public void parse(InputStream inputStream, Instant timeReceivedMs, Consumer> eventConsumer) throws IOException { ExportMetricsServiceRequest request = ExportMetricsServiceRequest.parseFrom(inputStream); AtomicInteger droppedCounter = new AtomicInteger(0); Collection> records = - otelProtoDecoder.parseExportMetricsServiceRequest(request, droppedCounter, OTelProtoCodec.DEFAULT_EXPONENTIAL_HISTOGRAM_MAX_ALLOWED_SCALE, true, true, false); + otelProtoDecoder.parseExportMetricsServiceRequest(request, droppedCounter, OTelProtoCodec.DEFAULT_EXPONENTIAL_HISTOGRAM_MAX_ALLOWED_SCALE, timeReceivedMs, true, true, false); for (Record record: records) { eventConsumer.accept((Record)record); } diff --git a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodec.java b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodec.java index d4a5eab2ff..8875e90e53 100644 --- a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodec.java +++ b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodec.java @@ -163,9 +163,9 @@ public static long timeISO8601ToNanos(final String timeISO08601) { public static class OTelProtoDecoder { - public List parseExportTraceServiceRequest(final ExportTraceServiceRequest exportTraceServiceRequest) { + public List parseExportTraceServiceRequest(final ExportTraceServiceRequest exportTraceServiceRequest, final Instant timeReceived) { return exportTraceServiceRequest.getResourceSpansList().stream() - .flatMap(rs -> parseResourceSpans(rs).stream()).collect(Collectors.toList()); + .flatMap(rs -> parseResourceSpans(rs, timeReceived).stream()).collect(Collectors.toList()); } public Map splitExportTraceServiceRequestByTraceId(final ExportTraceServiceRequest exportTraceServiceRequest) { @@ -188,12 +188,12 @@ public Map splitExportTraceServiceRequestByTr return result; } - public List parseExportLogsServiceRequest(final ExportLogsServiceRequest exportLogsServiceRequest) { + public List parseExportLogsServiceRequest(final ExportLogsServiceRequest exportLogsServiceRequest, final Instant timeReceived) { return exportLogsServiceRequest.getResourceLogsList().stream() - .flatMap(rs -> parseResourceLogs(rs).stream()).collect(Collectors.toList()); + .flatMap(rs -> parseResourceLogs(rs, timeReceived).stream()).collect(Collectors.toList()); } - protected Collection parseResourceLogs(ResourceLogs rs) { + protected Collection parseResourceLogs(ResourceLogs rs, final Instant timeReceived) { final String serviceName = OTelProtoCodec.getServiceName(rs.getResource()).orElse(null); final Map resourceAttributes = OTelProtoCodec.getResourceAttributes(rs.getResource()); final String schemaUrl = rs.getSchemaUrl(); @@ -205,7 +205,8 @@ protected Collection parseResourceLogs(ResourceLogs rs) { serviceName, OTelProtoCodec.getInstrumentationLibraryAttributes(ils.getInstrumentationLibrary()), resourceAttributes, - schemaUrl)) + schemaUrl, + timeReceived)) .flatMap(Collection::stream); Stream mappedScopeListLogs = rs.getScopeLogsList() @@ -215,7 +216,8 @@ protected Collection parseResourceLogs(ResourceLogs rs) { serviceName, OTelProtoCodec.getInstrumentationScopeAttributes(sls.getScope()), resourceAttributes, - schemaUrl)) + schemaUrl, + timeReceived)) .flatMap(Collection::stream); return Stream.concat(mappedInstrumentationLibraryLogs, mappedScopeListLogs).collect(Collectors.toList()); @@ -260,26 +262,26 @@ protected Map splitResourceSpansByTraceId(final ResourceS return result; } - protected List parseResourceSpans(final ResourceSpans resourceSpans) { + protected List parseResourceSpans(final ResourceSpans resourceSpans, final Instant timeReceived) { final String serviceName = getServiceName(resourceSpans.getResource()).orElse(null); final Map resourceAttributes = getResourceAttributes(resourceSpans.getResource()); if (resourceSpans.getScopeSpansList().size() > 0) { - return parseScopeSpans(resourceSpans.getScopeSpansList(), serviceName, resourceAttributes); + return parseScopeSpans(resourceSpans.getScopeSpansList(), serviceName, resourceAttributes, timeReceived); } if (resourceSpans.getInstrumentationLibrarySpansList().size() > 0) { - return parseInstrumentationLibrarySpans(resourceSpans.getInstrumentationLibrarySpansList(), serviceName, resourceAttributes); + return parseInstrumentationLibrarySpans(resourceSpans.getInstrumentationLibrarySpansList(), serviceName, resourceAttributes, timeReceived); } LOG.debug("No spans found to parse from ResourceSpans object: {}", resourceSpans); return Collections.emptyList(); } - private List parseScopeSpans(final List scopeSpansList, final String serviceName, final Map resourceAttributes) { + private List parseScopeSpans(final List scopeSpansList, final String serviceName, final Map resourceAttributes, final Instant timeReceived) { return scopeSpansList.stream() .map(scopeSpans -> parseSpans(scopeSpans.getSpansList(), scopeSpans.getScope(), - OTelProtoCodec::getInstrumentationScopeAttributes, serviceName, resourceAttributes)) + OTelProtoCodec::getInstrumentationScopeAttributes, serviceName, resourceAttributes, timeReceived)) .flatMap(Collection::stream) .collect(Collectors.toList()); } @@ -305,11 +307,12 @@ private Map> splitScopeSpansByTraceId(final List parseInstrumentationLibrarySpans(final List instrumentationLibrarySpansList, - final String serviceName, final Map resourceAttributes) { + final String serviceName, final Map resourceAttributes, + final Instant timeReceived) { return instrumentationLibrarySpansList.stream() .map(instrumentationLibrarySpans -> parseSpans(instrumentationLibrarySpans.getSpansList(), instrumentationLibrarySpans.getInstrumentationLibrary(), this::getInstrumentationLibraryAttributes, - serviceName, resourceAttributes)) + serviceName, resourceAttributes, timeReceived)) .flatMap(Collection::stream) .collect(Collectors.toList()); } @@ -354,20 +357,22 @@ private Map> splitSpansByTrac private List parseSpans(final List spans, final T scope, final Function> scopeAttributesGetter, - final String serviceName, final Map resourceAttributes) { + final String serviceName, final Map resourceAttributes, + final Instant timeReceived) { return spans.stream() .map(span -> { final Map scopeAttributes = scopeAttributesGetter.apply(scope); - return parseSpan(span, scopeAttributes, serviceName, resourceAttributes); + return parseSpan(span, scopeAttributes, serviceName, resourceAttributes, timeReceived); }) .collect(Collectors.toList()); } protected List processLogsList(final List logsList, - final String serviceName, - final Map ils, - final Map resourceAttributes, - final String schemaUrl) { + final String serviceName, + final Map ils, + final Map resourceAttributes, + final String schemaUrl, + final Instant timeReceived) { return logsList.stream() .map(log -> JacksonOtelLog.builder() .withTime(OTelProtoCodec.convertUnixNanosToISO8601(log.getTimeUnixNano())) @@ -388,12 +393,13 @@ protected List processLogsList(final List logsList, .withSeverityText(log.getSeverityText()) .withDroppedAttributesCount(log.getDroppedAttributesCount()) .withBody(OTelProtoCodec.convertAnyValue(log.getBody())) + .withTimeReceived(timeReceived) .build()) .collect(Collectors.toList()); } protected Span parseSpan(final io.opentelemetry.proto.trace.v1.Span sp, final Map instrumentationScopeAttributes, - final String serviceName, final Map resourceAttributes) { + final String serviceName, final Map resourceAttributes, final Instant timeReceived) { return JacksonSpan.builder() .withSpanId(convertByteStringToString(sp.getSpanId())) .withTraceId(convertByteStringToString(sp.getTraceId())) @@ -420,6 +426,7 @@ protected Span parseSpan(final io.opentelemetry.proto.trace.v1.Span sp, final Ma .withTraceGroup(getTraceGroup(sp)) .withDurationInNanos(sp.getEndTimeUnixNano() - sp.getStartTimeUnixNano()) .withTraceGroupFields(getTraceGroupFields(sp)) + .withTimeReceived(timeReceived) .build(); } @@ -583,6 +590,7 @@ public Collection> parseExportMetricsServiceRequest( final ExportMetricsServiceRequest request, AtomicInteger droppedCounter, final Integer exponentialHistogramMaxAllowedScale, + final Instant timeReceived, final boolean calculateHistogramBuckets, final boolean calculateExponentialHistogramBuckets, final boolean flattenAttributes) { @@ -594,12 +602,12 @@ public Collection> parseExportMetricsServiceRequest( for (InstrumentationLibraryMetrics is : rs.getInstrumentationLibraryMetricsList()) { final Map ils = OTelProtoCodec.getInstrumentationLibraryAttributes(is.getInstrumentationLibrary()); - recordsOut.addAll(processMetricsList(is.getMetricsList(), serviceName, ils, resourceAttributes, schemaUrl, droppedCounter, exponentialHistogramMaxAllowedScale, calculateHistogramBuckets, calculateExponentialHistogramBuckets, flattenAttributes)); + recordsOut.addAll(processMetricsList(is.getMetricsList(), serviceName, ils, resourceAttributes, schemaUrl, droppedCounter, exponentialHistogramMaxAllowedScale, timeReceived, calculateHistogramBuckets, calculateExponentialHistogramBuckets, flattenAttributes)); } for (ScopeMetrics sm : rs.getScopeMetricsList()) { final Map ils = OTelProtoCodec.getInstrumentationScopeAttributes(sm.getScope()); - recordsOut.addAll(processMetricsList(sm.getMetricsList(), serviceName, ils, resourceAttributes, schemaUrl, droppedCounter, exponentialHistogramMaxAllowedScale, calculateHistogramBuckets, calculateExponentialHistogramBuckets, flattenAttributes)); + recordsOut.addAll(processMetricsList(sm.getMetricsList(), serviceName, ils, resourceAttributes, schemaUrl, droppedCounter, exponentialHistogramMaxAllowedScale, timeReceived, calculateHistogramBuckets, calculateExponentialHistogramBuckets, flattenAttributes)); } } return recordsOut; @@ -613,6 +621,7 @@ private List> processMetricsList( final String schemaUrl, AtomicInteger droppedCounter, final Integer exponentialHistogramMaxAllowedScale, + final Instant timeReceived, final boolean calculateHistogramBuckets, final boolean calculateExponentialHistogramBuckets, final boolean flattenAttributes) { @@ -620,15 +629,15 @@ private List> processMetricsList( for (io.opentelemetry.proto.metrics.v1.Metric metric : metricsList) { try { if (metric.hasGauge()) { - recordsOut.addAll(mapGauge(metric, serviceName, ils, resourceAttributes, schemaUrl, flattenAttributes)); + recordsOut.addAll(mapGauge(metric, serviceName, ils, resourceAttributes, schemaUrl, timeReceived, flattenAttributes)); } else if (metric.hasSum()) { - recordsOut.addAll(mapSum(metric, serviceName, ils, resourceAttributes, schemaUrl, flattenAttributes)); + recordsOut.addAll(mapSum(metric, serviceName, ils, resourceAttributes, schemaUrl, timeReceived, flattenAttributes)); } else if (metric.hasSummary()) { - recordsOut.addAll(mapSummary(metric, serviceName, ils, resourceAttributes, schemaUrl, flattenAttributes)); + recordsOut.addAll(mapSummary(metric, serviceName, ils, resourceAttributes, schemaUrl, timeReceived, flattenAttributes)); } else if (metric.hasHistogram()) { - recordsOut.addAll(mapHistogram(metric, serviceName, ils, resourceAttributes, schemaUrl, calculateHistogramBuckets, flattenAttributes)); + recordsOut.addAll(mapHistogram(metric, serviceName, ils, resourceAttributes, schemaUrl, timeReceived, calculateHistogramBuckets, flattenAttributes)); } else if (metric.hasExponentialHistogram()) { - recordsOut.addAll(mapExponentialHistogram(metric, serviceName, ils, resourceAttributes, schemaUrl, exponentialHistogramMaxAllowedScale, calculateExponentialHistogramBuckets, flattenAttributes)); + recordsOut.addAll(mapExponentialHistogram(metric, serviceName, ils, resourceAttributes, schemaUrl, exponentialHistogramMaxAllowedScale, timeReceived, calculateExponentialHistogramBuckets, flattenAttributes)); } } catch (Exception e) { LOG.warn("Error while processing metrics", e); @@ -644,6 +653,7 @@ private List> mapGauge( final Map ils, final Map resourceAttributes, final String schemaUrl, + final Instant timeReceived, final boolean flattenAttributes) { return metric.getGauge().getDataPointsList().stream() .map(dp -> JacksonGauge.builder() @@ -664,6 +674,7 @@ private List> mapGauge( .withSchemaUrl(schemaUrl) .withExemplars(OTelProtoCodec.convertExemplars(dp.getExemplarsList())) .withFlags(dp.getFlags()) + .withTimeReceived(timeReceived) .build(flattenAttributes)) .map(Record::new) .collect(Collectors.toList()); @@ -675,6 +686,7 @@ private List> mapSum( final Map ils, final Map resourceAttributes, final String schemaUrl, + final Instant timeReceived, final boolean flattenAttributes) { return metric.getSum().getDataPointsList().stream() .map(dp -> JacksonSum.builder() @@ -697,6 +709,7 @@ private List> mapSum( .withSchemaUrl(schemaUrl) .withExemplars(OTelProtoCodec.convertExemplars(dp.getExemplarsList())) .withFlags(dp.getFlags()) + .withTimeReceived(timeReceived) .build(flattenAttributes)) .map(Record::new) .collect(Collectors.toList()); @@ -708,6 +721,7 @@ private List> mapSummary( final Map ils, final Map resourceAttributes, final String schemaUrl, + final Instant timeReceived, final boolean flattenAttributes) { return metric.getSummary().getDataPointsList().stream() .map(dp -> JacksonSummary.builder() @@ -730,6 +744,7 @@ private List> mapSummary( )) .withSchemaUrl(schemaUrl) .withFlags(dp.getFlags()) + .withTimeReceived(timeReceived) .build(flattenAttributes)) .map(Record::new) .collect(Collectors.toList()); @@ -741,6 +756,7 @@ private List> mapHistogram( final Map ils, final Map resourceAttributes, final String schemaUrl, + final Instant timeReceived, final boolean calculateHistogramBuckets, final boolean flattenAttributes) { return metric.getHistogram().getDataPointsList().stream() @@ -768,6 +784,7 @@ private List> mapHistogram( )) .withSchemaUrl(schemaUrl) .withExemplars(OTelProtoCodec.convertExemplars(dp.getExemplarsList())) + .withTimeReceived(timeReceived) .withFlags(dp.getFlags()); if (calculateHistogramBuckets) { builder.withBuckets(OTelProtoCodec.createBuckets(dp.getBucketCountsList(), dp.getExplicitBoundsList())); @@ -787,6 +804,7 @@ private List> mapExponentialHistogram( final Map resourceAttributes, final String schemaUrl, final Integer exponentialHistogramMaxAllowedScale, + final Instant timeReceived, final boolean calculateExponentialHistogramBuckets, final boolean flattenAttributes) { return metric.getExponentialHistogram() @@ -827,6 +845,7 @@ private List> mapExponentialHistogram( )) .withSchemaUrl(schemaUrl) .withExemplars(OTelProtoCodec.convertExemplars(dp.getExemplarsList())) + .withTimeReceived(timeReceived) .withFlags(dp.getFlags()); if (calculateExponentialHistogramBuckets) { diff --git a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTraceDecoder.java b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTraceDecoder.java index 47c3fd03e9..302723021a 100644 --- a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTraceDecoder.java +++ b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTraceDecoder.java @@ -18,6 +18,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.function.Consumer; +import java.time.Instant; public class OTelTraceDecoder implements ByteDecoder { @@ -25,11 +26,10 @@ public class OTelTraceDecoder implements ByteDecoder { public OTelTraceDecoder() { otelProtoDecoder = new OTelProtoCodec.OTelProtoDecoder(); } - public void parse(InputStream inputStream, Consumer> eventConsumer) throws IOException { + public void parse(InputStream inputStream, Instant timeReceivedMs, Consumer> eventConsumer) throws IOException { ExportTraceServiceRequest request = ExportTraceServiceRequest.parseFrom(inputStream); AtomicInteger droppedCounter = new AtomicInteger(0); - List spans = - otelProtoDecoder.parseExportTraceServiceRequest(request); + List spans = otelProtoDecoder.parseExportTraceServiceRequest(request, timeReceivedMs); for (Span span: spans) { eventConsumer.accept(new Record<>(span)); } diff --git a/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsDecoderTest.java b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsDecoderTest.java index 8fef72b2fa..7c64d88a20 100644 --- a/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsDecoderTest.java +++ b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsDecoderTest.java @@ -12,6 +12,7 @@ import java.io.InputStreamReader; import java.io.IOException; +import java.time.Instant; import java.util.Objects; import java.util.Map; import com.google.protobuf.util.JsonFormat; @@ -65,7 +66,7 @@ private void validateLog(OpenTelemetryLog logRecord) { public void testParse() throws Exception { final ExportLogsServiceRequest request = buildExportLogsServiceRequestFromJsonFile(TEST_REQUEST_LOGS_FILE); InputStream inputStream = new ByteArrayInputStream((byte[])request.toByteArray()); - createObjectUnderTest().parse(inputStream, (record) -> { + createObjectUnderTest().parse(inputStream, Instant.now(), (record) -> { validateLog((OpenTelemetryLog)record.getData()); }); diff --git a/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelMetricsDecoderTest.java b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelMetricsDecoderTest.java index c6bdd9cc89..4dfb58dfa6 100644 --- a/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelMetricsDecoderTest.java +++ b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelMetricsDecoderTest.java @@ -27,6 +27,7 @@ import org.opensearch.dataprepper.model.metric.JacksonSum; import org.opensearch.dataprepper.model.metric.JacksonHistogram; +import java.time.Instant; import java.util.Collections; import java.util.List; import java.util.Objects; @@ -87,7 +88,7 @@ private void validateMetric(Event event) { public void testParse() throws Exception { final ExportMetricsServiceRequest request = buildExportMetricsServiceRequestFromJsonFile(TEST_REQUEST_METRICS_FILE); InputStream inputStream = new ByteArrayInputStream((byte[])request.toByteArray()); - createObjectUnderTest().parse(inputStream, (record) -> { + createObjectUnderTest().parse(inputStream, Instant.now(), (record) -> { validateMetric((Event)record.getData()); }); diff --git a/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodecTest.java b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodecTest.java index 0b75ea6f46..b2e42c6c20 100644 --- a/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodecTest.java +++ b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodecTest.java @@ -211,28 +211,28 @@ public void testSplitExportTraceServiceRequestWithMultipleTraces() throws Except @Test public void testParseExportTraceServiceRequest() throws IOException { final ExportTraceServiceRequest exportTraceServiceRequest = buildExportTraceServiceRequestFromJsonFile(TEST_REQUEST_TRACE_JSON_FILE); - final List spans = decoderUnderTest.parseExportTraceServiceRequest(exportTraceServiceRequest); + final List spans = decoderUnderTest.parseExportTraceServiceRequest(exportTraceServiceRequest, Instant.now()); validateSpans(spans); } @Test public void testParseExportTraceServiceRequest_InstrumentationLibrarySpans() throws IOException { final ExportTraceServiceRequest exportTraceServiceRequest = buildExportTraceServiceRequestFromJsonFile(TEST_REQUEST_INSTRUMENTATION_LIBRARY_TRACE_JSON_FILE); - final List spans = decoderUnderTest.parseExportTraceServiceRequest(exportTraceServiceRequest); + final List spans = decoderUnderTest.parseExportTraceServiceRequest(exportTraceServiceRequest, Instant.now()); validateSpans(spans); } @Test public void testParseExportTraceServiceRequest_ScopeSpansTakesPrecedenceOverInstrumentationLibrarySpans() throws IOException { final ExportTraceServiceRequest exportTraceServiceRequest = buildExportTraceServiceRequestFromJsonFile(TEST_REQUEST_BOTH_SPAN_TYPES_JSON_FILE); - final List spans = decoderUnderTest.parseExportTraceServiceRequest(exportTraceServiceRequest); + final List spans = decoderUnderTest.parseExportTraceServiceRequest(exportTraceServiceRequest, Instant.now()); validateSpans(spans); } @Test public void testParseExportTraceServiceRequest_NoSpans() throws IOException { final ExportTraceServiceRequest exportTraceServiceRequest = buildExportTraceServiceRequestFromJsonFile(TEST_REQUEST_NO_SPANS_JSON_FILE); - final List spans = decoderUnderTest.parseExportTraceServiceRequest(exportTraceServiceRequest); + final List spans = decoderUnderTest.parseExportTraceServiceRequest(exportTraceServiceRequest, Instant.now()); assertThat(spans.size(), is(equalTo(0))); } @@ -492,7 +492,7 @@ public void testTraceGroupFields() { @Test public void testParseExportLogsServiceRequest_ScopedLogs() throws IOException { final ExportLogsServiceRequest exportLogsServiceRequest = buildExportLogsServiceRequestFromJsonFile(TEST_REQUEST_LOGS_JSON_FILE); - List logs = decoderUnderTest.parseExportLogsServiceRequest(exportLogsServiceRequest); + List logs = decoderUnderTest.parseExportLogsServiceRequest(exportLogsServiceRequest, Instant.now()); assertThat(logs.size() , is(equalTo(1))); validateLog(logs.get(0)); @@ -501,7 +501,7 @@ public void testParseExportLogsServiceRequest_ScopedLogs() throws IOException { @Test public void testParseExportLogsServiceRequest_InstrumentationLibraryLogs() throws IOException { final ExportLogsServiceRequest exportLogsServiceRequest = buildExportLogsServiceRequestFromJsonFile(TEST_REQUEST_LOGS_IS_JSON_FILE); - List logs = decoderUnderTest.parseExportLogsServiceRequest(exportLogsServiceRequest); + List logs = decoderUnderTest.parseExportLogsServiceRequest(exportLogsServiceRequest, Instant.now()); assertThat(logs.size() , is(equalTo(1))); validateLog(logs.get(0)); @@ -527,7 +527,7 @@ private void validateLog(OpenTelemetryLog logRecord) { @Test public void testParseExportLogsServiceRequest_InstrumentationLibrarySpans() throws IOException { final ExportTraceServiceRequest exportTraceServiceRequest = buildExportTraceServiceRequestFromJsonFile(TEST_REQUEST_INSTRUMENTATION_LIBRARY_TRACE_JSON_FILE); - final List spans = decoderUnderTest.parseExportTraceServiceRequest(exportTraceServiceRequest); + final List spans = decoderUnderTest.parseExportTraceServiceRequest(exportTraceServiceRequest, Instant.now()); validateSpans(spans); } @@ -535,7 +535,7 @@ public void testParseExportLogsServiceRequest_InstrumentationLibrarySpans() thro public void testParseExportMetricsServiceRequest_Guage() throws IOException { final ExportMetricsServiceRequest exportMetricsServiceRequest = buildExportMetricsServiceRequestFromJsonFile(TEST_REQUEST_GAUGE_METRICS_JSON_FILE); AtomicInteger droppedCount = new AtomicInteger(0); - final Collection> metrics = decoderUnderTest.parseExportMetricsServiceRequest(exportMetricsServiceRequest, droppedCount, 10, true, true, true); + final Collection> metrics = decoderUnderTest.parseExportMetricsServiceRequest(exportMetricsServiceRequest, droppedCount, 10, Instant.now(), true, true, true); validateGaugeMetricRequest(metrics); } @@ -544,7 +544,7 @@ public void testParseExportMetricsServiceRequest_Guage() throws IOException { public void testParseExportMetricsServiceRequest_Sum() throws IOException { final ExportMetricsServiceRequest exportMetricsServiceRequest = buildExportMetricsServiceRequestFromJsonFile(TEST_REQUEST_SUM_METRICS_JSON_FILE); AtomicInteger droppedCount = new AtomicInteger(0); - final Collection> metrics = decoderUnderTest.parseExportMetricsServiceRequest(exportMetricsServiceRequest, droppedCount, 10, true, true, true); + final Collection> metrics = decoderUnderTest.parseExportMetricsServiceRequest(exportMetricsServiceRequest, droppedCount, 10, Instant.now(), true, true, true); validateSumMetricRequest(metrics); } @@ -552,7 +552,7 @@ public void testParseExportMetricsServiceRequest_Sum() throws IOException { public void testParseExportMetricsServiceRequest_Histogram() throws IOException { final ExportMetricsServiceRequest exportMetricsServiceRequest = buildExportMetricsServiceRequestFromJsonFile(TEST_REQUEST_HISTOGRAM_METRICS_JSON_FILE); AtomicInteger droppedCount = new AtomicInteger(0); - final Collection> metrics = decoderUnderTest.parseExportMetricsServiceRequest(exportMetricsServiceRequest, droppedCount, 10, true, true, true); + final Collection> metrics = decoderUnderTest.parseExportMetricsServiceRequest(exportMetricsServiceRequest, droppedCount, 10, Instant.now(), true, true, true); validateHistogramMetricRequest(metrics); } @@ -560,7 +560,7 @@ public void testParseExportMetricsServiceRequest_Histogram() throws IOException public void testParseExportMetricsServiceRequest_Histogram_WithNoExplicitBounds() throws IOException { final ExportMetricsServiceRequest exportMetricsServiceRequest = buildExportMetricsServiceRequestFromJsonFile(TEST_REQUEST_HISTOGRAM_METRICS_NO_EXPLICIT_BOUNDS_JSON_FILE); AtomicInteger droppedCount = new AtomicInteger(0); - final Collection> metrics = decoderUnderTest.parseExportMetricsServiceRequest(exportMetricsServiceRequest, droppedCount, 10, true, true, true); + final Collection> metrics = decoderUnderTest.parseExportMetricsServiceRequest(exportMetricsServiceRequest, droppedCount, 10, Instant.now(), true, true, true); validateHistogramMetricRequestNoExplicitBounds(metrics); } @@ -944,13 +944,13 @@ public void testTimeCodec() { @Test public void testOTelProtoCodecConsistency() throws IOException, DecoderException { final ExportTraceServiceRequest request = buildExportTraceServiceRequestFromJsonFile(TEST_REQUEST_TRACE_JSON_FILE); - final List spansFirstDec = decoderUnderTest.parseExportTraceServiceRequest(request); + final List spansFirstDec = decoderUnderTest.parseExportTraceServiceRequest(request, Instant.now()); final List resourceSpansList = new ArrayList<>(); for (final Span span : spansFirstDec) { resourceSpansList.add(encoderUnderTest.convertToResourceSpans(span)); } final List spansSecondDec = resourceSpansList.stream() - .flatMap(rs -> decoderUnderTest.parseResourceSpans(rs).stream()).collect(Collectors.toList()); + .flatMap(rs -> decoderUnderTest.parseResourceSpans(rs, Instant.now()).stream()).collect(Collectors.toList()); assertThat(spansFirstDec.size(), equalTo(spansSecondDec.size())); for (int i = 0; i < spansFirstDec.size(); i++) { assertThat(spansFirstDec.get(i).toJsonString(), equalTo(spansSecondDec.get(i).toJsonString())); diff --git a/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTraceDecoderTest.java b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTraceDecoderTest.java index a6cc6d122b..0e976a14e2 100644 --- a/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTraceDecoderTest.java +++ b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTraceDecoderTest.java @@ -12,6 +12,7 @@ import java.io.InputStreamReader; import java.io.IOException; +import java.time.Instant; import java.util.Objects; import com.google.protobuf.util.JsonFormat; import org.opensearch.dataprepper.model.trace.Span; @@ -66,7 +67,7 @@ private void validateSpan(Span span) { public void testParse() throws Exception { final ExportTraceServiceRequest request = buildExportTraceServiceRequestFromJsonFile(TEST_REQUEST_TRACES_FILE); InputStream inputStream = new ByteArrayInputStream((byte[])request.toByteArray()); - createObjectUnderTest().parse(inputStream, (record) -> { + createObjectUnderTest().parse(inputStream, Instant.now(), (record) -> { validateSpan((Span)record.getData()); }); diff --git a/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceGrpcService.java b/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceGrpcService.java index 5ac38d8e4b..a5799605ff 100644 --- a/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceGrpcService.java +++ b/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceGrpcService.java @@ -26,6 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Instant; import java.util.Collection; import java.util.List; import java.util.Map; @@ -88,7 +89,7 @@ private void processRequest(final ExportTraceServiceRequest request, final Strea final Collection spans; try { - spans = oTelProtoDecoder.parseExportTraceServiceRequest(request); + spans = oTelProtoDecoder.parseExportTraceServiceRequest(request, Instant.now()); } catch (final Exception e) { LOG.warn(DataPrepperMarkers.SENSITIVE, "Failed to parse request with error '{}'. Request body: {}.", e.getMessage(), request); throw new BadRequestException(e.getMessage(), e); diff --git a/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceGrpcServiceTest.java b/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceGrpcServiceTest.java index d6aa5503c5..ff145a61a9 100644 --- a/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceGrpcServiceTest.java +++ b/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceGrpcServiceTest.java @@ -36,6 +36,7 @@ import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec; import java.io.IOException; +import java.time.Instant; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -198,7 +199,7 @@ public void export_BufferTimeout_responseObserverOnError() throws Exception { public void export_BadRequest_responseObserverOnError() throws Exception { final String testMessage = "test message"; final RuntimeException testException = new RuntimeException(testMessage); - when(mockOTelProtoDecoder.parseExportTraceServiceRequest(any())).thenThrow(testException); + when(mockOTelProtoDecoder.parseExportTraceServiceRequest(any(), any(Instant.class))).thenThrow(testException); objectUnderTest = generateOTelTraceGrpcService(mockOTelProtoDecoder); try (MockedStatic mockedStatic = mockStatic(ServiceRequestContext.class)) { diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodec.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodec.java index 724787879f..6222682e2a 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodec.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodec.java @@ -8,11 +8,20 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.model.codec.JsonDecoder; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; + +import java.io.IOException; +import java.io.InputStream; +import java.util.function.Consumer; /** * An implementation of {@link InputCodec} which parses JSON Objects for arrays. */ @DataPrepperPlugin(name = "json", pluginType = InputCodec.class) public class JsonInputCodec extends JsonDecoder implements InputCodec { + public void parse(InputStream inputStream, Consumer> eventConsumer) throws IOException { + parse(inputStream, null, eventConsumer); + } }