diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventHandle.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventHandle.java index 86278bea74..cb7c12da84 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventHandle.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventHandle.java @@ -8,6 +8,9 @@ import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import java.lang.ref.WeakReference; +import java.util.ArrayList; +import java.util.List; +import java.util.function.BiConsumer; import java.time.Instant; import java.io.Serializable; @@ -15,11 +18,13 @@ public class DefaultEventHandle implements EventHandle, InternalEventHandle, Ser private Instant externalOriginationTime; private final Instant internalOriginationTime; private WeakReference acknowledgementSetRef; + private List> releaseConsumers; public DefaultEventHandle(final Instant internalOriginationTime) { this.acknowledgementSetRef = null; this.externalOriginationTime = null; this.internalOriginationTime = internalOriginationTime; + this.releaseConsumers = new ArrayList<>(); } @Override @@ -51,9 +56,17 @@ public Instant getExternalOriginationTime() { @Override public void release(boolean result) { + for (final BiConsumer consumer: releaseConsumers) { + consumer.accept(this, result); + } AcknowledgementSet acknowledgementSet = getAcknowledgementSet(); if (acknowledgementSet != null) { acknowledgementSet.release(this, result); } } + + @Override + public void onRelease(BiConsumer releaseConsumer) { + releaseConsumers.add(releaseConsumer); + } } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/EventHandle.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/EventHandle.java index 64ef3be574..6049a196e3 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/EventHandle.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/EventHandle.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.model.event; import java.time.Instant; +import java.util.function.BiConsumer; public interface EventHandle { /** @@ -41,5 +42,6 @@ public interface EventHandle { */ Instant getInternalOriginationTime(); + void onRelease(BiConsumer releaseConsumer); } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java index f116d89e9e..1c3e596265 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java @@ -11,6 +11,7 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.event.Event; import java.util.Collection; @@ -22,6 +23,7 @@ public abstract class AbstractSink> implements Sink { protected static final int DEFAULT_WAIT_TIME_MS = 1000; protected final PluginMetrics pluginMetrics; private final Counter recordsInCounter; + private final SinkLatencyMetrics latencyMetrics; private final Timer timeElapsedTimer; private Thread retryThread; private int maxRetries; @@ -31,6 +33,7 @@ public AbstractSink(final PluginSetting pluginSetting, int numRetries, int waitT this.pluginMetrics = PluginMetrics.fromPluginSetting(pluginSetting); recordsInCounter = pluginMetrics.counter(MetricNames.RECORDS_IN); timeElapsedTimer = pluginMetrics.timer(MetricNames.TIME_ELAPSED); + this.latencyMetrics = new SinkLatencyMetrics(pluginMetrics); retryThread = null; this.maxRetries = numRetries; this.waitTimeMs = waitTimeMs; @@ -77,6 +80,20 @@ public void shutdown() { } } + @Override + public void updateLatencyMetrics(Collection records) { + for (final Record record : records) { + if (record.getData() instanceof Event) { + Event event = (Event)record.getData(); + event.getEventHandle().onRelease((eventHandle, result) -> { + if (result) { + latencyMetrics.update(eventHandle); + } + }); + } + } + } + Thread.State getRetryThreadState() { if (retryThread != null) { return retryThread.getState(); diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/Sink.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/Sink.java index 0ce6fa5ac1..178566ba5b 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/Sink.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/Sink.java @@ -38,4 +38,12 @@ public interface Sink> { */ boolean isReady(); + /** + * updates latency metrics of sink + * + * @param events list of events used for updating the latency metrics + */ + default void updateLatencyMetrics(final Collection events) { + } + } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkLatencyMetrics.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkLatencyMetrics.java new file mode 100644 index 0000000000..31b584271f --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkLatencyMetrics.java @@ -0,0 +1,33 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.sink; + +import org.opensearch.dataprepper.metrics.PluginMetrics; +import io.micrometer.core.instrument.DistributionSummary; +import org.opensearch.dataprepper.model.event.EventHandle; + +import java.time.Duration; +import java.time.Instant; + +public class SinkLatencyMetrics { + public static final String INTERNAL_LATENCY = "internalLatency"; + public static final String EXTERNAL_LATENCY = "externalLatency"; + private final DistributionSummary internalLatencySummary; + private final DistributionSummary externalLatencySummary; + + public SinkLatencyMetrics(PluginMetrics pluginMetrics) { + internalLatencySummary = pluginMetrics.summary(INTERNAL_LATENCY); + externalLatencySummary = pluginMetrics.summary(EXTERNAL_LATENCY); + } + public void update(final EventHandle eventHandle) { + Instant now = Instant.now(); + internalLatencySummary.record(Duration.between(eventHandle.getInternalOriginationTime(), now).toMillis()); + if (eventHandle.getExternalOriginationTime() == null) { + return; + } + externalLatencySummary.record(Duration.between(eventHandle.getExternalOriginationTime(), now).toMillis()); + } +} diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventHandleTests.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventHandleTests.java index 3f1af4fb26..b2a66b2d1d 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventHandleTests.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventHandleTests.java @@ -20,6 +20,7 @@ class DefaultEventHandleTests { @Mock private AcknowledgementSet acknowledgementSet; + private int count; @Test void testBasic() { @@ -56,4 +57,16 @@ void testWithExternalOriginationTime() { assertThat(eventHandle.getExternalOriginationTime(), equalTo(now.minusSeconds(60))); eventHandle.release(true); } + + @Test + void testWithOnReleaseHandler() { + Instant now = Instant.now(); + count = 0; + DefaultEventHandle eventHandle = new DefaultEventHandle(now); + eventHandle.onRelease((handle, result) -> {if (result) count++; }); + eventHandle.release(true); + assertThat(count, equalTo(1)); + + } + } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/AbstractSinkTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/AbstractSinkTest.java index e4b19cf7ca..6d58f7dd71 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/AbstractSinkTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/AbstractSinkTest.java @@ -13,6 +13,12 @@ import org.opensearch.dataprepper.metrics.MetricsTestUtil; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.event.EventHandle; + +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.mock; import java.time.Duration; import java.util.Arrays; @@ -25,6 +31,7 @@ import static org.awaitility.Awaitility.await; public class AbstractSinkTest { + private int count; @Test public void testMetrics() { final String sinkName = "testSink"; @@ -35,6 +42,8 @@ public void testMetrics() { AbstractSink> abstractSink = new AbstractSinkImpl(pluginSetting); abstractSink.initialize(); Assert.assertEquals(abstractSink.isReady(), true); + abstractSink.updateLatencyMetrics(Arrays.asList( + new Record<>(UUID.randomUUID().toString()))); abstractSink.output(Arrays.asList( new Record<>(UUID.randomUUID().toString()), new Record<>(UUID.randomUUID().toString()), @@ -80,6 +89,61 @@ public void testSinkNotReady() { abstractSink.shutdown(); } + @Test + public void testSinkWithRegisterEventReleaseHandler() { + final String sinkName = "testSink"; + final String pipelineName = "pipelineName"; + MetricsTestUtil.initMetrics(); + PluginSetting pluginSetting = new PluginSetting(sinkName, Collections.emptyMap()); + pluginSetting.setPipelineName(pipelineName); + AbstractSink> abstractSink = new AbstractEventSinkImpl(pluginSetting); + abstractSink.initialize(); + Assert.assertEquals(abstractSink.isReady(), true); + count = 0; + Event event = JacksonEvent.builder() + .withEventType("event") + .build(); + Record record = mock(Record.class); + EventHandle eventHandle = mock(EventHandle.class); + when(record.getData()).thenReturn(event); + + abstractSink.updateLatencyMetrics(Arrays.asList(record)); + abstractSink.output(Arrays.asList(record)); + await().atMost(Duration.ofSeconds(5)) + .until(abstractSink::isReady); + abstractSink.shutdown(); + } + + private static class AbstractEventSinkImpl extends AbstractSink> { + + public AbstractEventSinkImpl(PluginSetting pluginSetting) { + super(pluginSetting, 10, 1000); + } + + @Override + public void doOutput(Collection> records) { + for (final Record record: records) { + Event event = record.getData(); + event.getEventHandle().release(true); + } + } + + @Override + public void shutdown() { + super.shutdown(); + } + + @Override + public void doInitialize() { + } + + @Override + public boolean isReady() { + return true; + } + } + + private static class AbstractSinkImpl extends AbstractSink> { public AbstractSinkImpl(PluginSetting pluginSetting) { diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkLatencyMetricsTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkLatencyMetricsTest.java new file mode 100644 index 0000000000..4cf5043cae --- /dev/null +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkLatencyMetricsTest.java @@ -0,0 +1,71 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.sink; + +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.event.EventHandle; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import io.micrometer.core.instrument.DistributionSummary; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; + +import java.time.Instant; + +class SinkLatencyMetricsTest { + + private PluginMetrics pluginMetrics; + private EventHandle eventHandle; + private SinkLatencyMetrics latencyMetrics; + private DistributionSummary internalLatencySummary; + private DistributionSummary externalLatencySummary; + + public SinkLatencyMetrics createObjectUnderTest() { + return new SinkLatencyMetrics(pluginMetrics); + } + + @BeforeEach + void setup() { + pluginMetrics = mock(PluginMetrics.class); + SimpleMeterRegistry registry = new SimpleMeterRegistry(); + internalLatencySummary = DistributionSummary + .builder("internalLatency") + .baseUnit("milliseconds") + .register(registry); + externalLatencySummary = DistributionSummary + .builder("externalLatency") + .baseUnit("milliseconds") + .register(registry); + when(pluginMetrics.summary(SinkLatencyMetrics.INTERNAL_LATENCY)).thenReturn(internalLatencySummary); + when(pluginMetrics.summary(SinkLatencyMetrics.EXTERNAL_LATENCY)).thenReturn(externalLatencySummary); + eventHandle = mock(EventHandle.class); + when(eventHandle.getInternalOriginationTime()).thenReturn(Instant.now()); + latencyMetrics = createObjectUnderTest(); + } + + @Test + public void testInternalOriginationTime() { + latencyMetrics.update(eventHandle); + assertThat(internalLatencySummary.count(), equalTo(1L)); + } + + @Test + public void testExternalOriginationTime() { + when(eventHandle.getExternalOriginationTime()).thenReturn(Instant.now().minusMillis(10)); + latencyMetrics.update(eventHandle); + assertThat(internalLatencySummary.count(), equalTo(1L)); + assertThat(externalLatencySummary.count(), equalTo(1L)); + assertThat(externalLatencySummary.max(), greaterThanOrEqualTo(10.0)); + } +} + + diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkTest.java new file mode 100644 index 0000000000..5f66a623aa --- /dev/null +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkTest.java @@ -0,0 +1,43 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.sink; + +import org.opensearch.dataprepper.model.record.Record; +import org.junit.jupiter.api.Test; + +import java.util.Collection; +import java.util.Collections; + +public class SinkTest { + private static class SinkTestClass implements Sink> { + + @Override + public boolean isReady() { + return true; + } + + @Override + public void shutdown() { + } + + @Override + public void initialize() { + } + + @Override + public void output(Collection> records) { + } + + }; + + SinkTestClass sink; + + @Test + public void testSinkUpdateLatencyMetrics() { + sink = new SinkTestClass(); + sink.updateLatencyMetrics(Collections.emptyList()); + } +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/Pipeline.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/Pipeline.java index 0bd5d469ca..a71f8d14b1 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/Pipeline.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/Pipeline.java @@ -344,8 +344,11 @@ List> publishToSinks(final Collection records) { InactiveAcknowledgementSetManager.getInstance(), sinks); router.route(records, sinks, getRecordStrategy, (sink, events) -> - sinkFutures.add(sinkExecutorService.submit(() -> sink.output(events), null)) - ); + sinkFutures.add(sinkExecutorService.submit(() -> { + sink.updateLatencyMetrics(events); + sink.output(events); + }, null)) + ); return sinkFutures; } } diff --git a/data-prepper-plugins/date-processor/README.md b/data-prepper-plugins/date-processor/README.md index a5c147ecd6..c4d9acd4f5 100644 --- a/data-prepper-plugins/date-processor/README.md +++ b/data-prepper-plugins/date-processor/README.md @@ -104,6 +104,8 @@ processor: * Type: String * Default: `Locale.ROOT` +* `to_origination_metadata` (Optional): When this option is used, matched time is put into the event's metadata as an instance of `Instant`. + ## Metrics * `dateProcessingMatchSuccessCounter`: Number of records that match with at least one pattern specified in match configuration option. diff --git a/data-prepper-plugins/date-processor/build.gradle b/data-prepper-plugins/date-processor/build.gradle index 6433af9b6d..743761201d 100644 --- a/data-prepper-plugins/date-processor/build.gradle +++ b/data-prepper-plugins/date-processor/build.gradle @@ -12,5 +12,6 @@ dependencies { implementation project(':data-prepper-test-common') implementation 'com.fasterxml.jackson.core:jackson-databind' implementation 'io.micrometer:micrometer-core' + implementation libs.commons.lang3 testImplementation libs.commons.lang3 } diff --git a/data-prepper-plugins/date-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessor.java b/data-prepper-plugins/date-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessor.java index b9094926ee..d0808053b2 100644 --- a/data-prepper-plugins/date-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessor.java +++ b/data-prepper-plugins/date-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessor.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Objects; import java.util.stream.Collectors; +import org.apache.commons.lang3.tuple.Pair; @DataPrepperPlugin(name = "date", pluginType = Processor.class, pluginConfigurationType = DateProcessorConfig.class) public class DateProcessor extends AbstractProcessor, Record> { @@ -71,7 +72,16 @@ public Collection> doExecute(Collection> records) { zonedDateTime = getDateTimeFromTimeReceived(record); else if (keyToParse != null && !keyToParse.isEmpty()) { - zonedDateTime = getDateTimeFromMatch(record); + Pair result = getDateTimeFromMatch(record); + if (result != null) { + zonedDateTime = result.getLeft(); + Instant timeStamp = result.getRight(); + if (dateProcessorConfig.getToOriginationMetadata()) { + Event event = (Event)record.getData(); + event.getMetadata().setExternalOriginationTime(timeStamp); + event.getEventHandle().setExternalOriginationTime(timeStamp); + } + } populateDateProcessorMetrics(zonedDateTime); } @@ -119,7 +129,7 @@ private String getDateTimeFromTimeReceived(final Record record) { return timeReceived.atZone(dateProcessorConfig.getDestinationZoneId()).format(getOutputFormatter()); } - private String getDateTimeFromMatch(final Record record) { + private Pair getDateTimeFromMatch(final Record record) { final String sourceTimestamp = getSourceTimestamp(record); if (sourceTimestamp == null) return null; @@ -136,12 +146,12 @@ private String getSourceTimestamp(final Record record) { } } - private String getFormattedDateTimeString(final String sourceTimestamp) { + private Pair getFormattedDateTimeString(final String sourceTimestamp) { for (DateTimeFormatter formatter : dateTimeFormatters) { try { - return ZonedDateTime.parse(sourceTimestamp, formatter).format(getOutputFormatter().withZone(dateProcessorConfig.getDestinationZoneId())); + ZonedDateTime tmp = ZonedDateTime.parse(sourceTimestamp, formatter); + return Pair.of(tmp.format(getOutputFormatter().withZone(dateProcessorConfig.getDestinationZoneId())), tmp.toInstant()); } catch (Exception ignored) { - } } diff --git a/data-prepper-plugins/date-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorConfig.java b/data-prepper-plugins/date-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorConfig.java index fd6c1f25be..5e06b48cbb 100644 --- a/data-prepper-plugins/date-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorConfig.java +++ b/data-prepper-plugins/date-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorConfig.java @@ -15,6 +15,7 @@ public class DateProcessorConfig { static final Boolean DEFAULT_FROM_TIME_RECEIVED = false; + static final Boolean DEFAULT_TO_ORIGINATION_METADATA = false; static final String DEFAULT_DESTINATION = "@timestamp"; static final String DEFAULT_SOURCE_TIMEZONE = ZoneId.systemDefault().toString(); static final String DEFAULT_DESTINATION_TIMEZONE = ZoneId.systemDefault().toString(); @@ -45,6 +46,9 @@ public List getPatterns() { @JsonProperty("from_time_received") private Boolean fromTimeReceived = DEFAULT_FROM_TIME_RECEIVED; + @JsonProperty("to_origination_metadata") + private Boolean toOriginationMetadata = DEFAULT_TO_ORIGINATION_METADATA; + @JsonProperty("match") private List match; @@ -76,6 +80,10 @@ public Boolean getFromTimeReceived() { return fromTimeReceived; } + public Boolean getToOriginationMetadata() { + return toOriginationMetadata; + } + public List getMatch() { return match; } diff --git a/data-prepper-plugins/date-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorConfigTest.java b/data-prepper-plugins/date-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorConfigTest.java index 0959c9db2f..db604039fa 100644 --- a/data-prepper-plugins/date-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorConfigTest.java +++ b/data-prepper-plugins/date-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorConfigTest.java @@ -51,8 +51,9 @@ void isValidMatchAndFromTimestampReceived_should_return_true_if_from_time_receiv } @Test - void isValidMatchAndFromTimestampReceived_should_return_false_if_from_time_received_and_match_are_not_configured() { - assertThat(dateProcessorConfig.isValidMatchAndFromTimestampReceived(), equalTo(false)); + void testToOriginationMetadata_should_return_true() throws NoSuchFieldException, IllegalAccessException { + reflectivelySetField(dateProcessorConfig, "toOriginationMetadata", true); + assertThat(dateProcessorConfig.getToOriginationMetadata(), equalTo(true)); } @Test @@ -178,4 +179,4 @@ private void reflectivelySetField(final DateProcessorConfig dateProcessorConfig, field.setAccessible(false); } } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/date-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorTests.java b/data-prepper-plugins/date-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorTests.java index a45daeb56b..ce3906a635 100644 --- a/data-prepper-plugins/date-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorTests.java +++ b/data-prepper-plugins/date-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorTests.java @@ -83,7 +83,8 @@ void setup() { lenient().when(pluginMetrics.counter(DateProcessor.DATE_PROCESSING_MATCH_SUCCESS)).thenReturn(dateProcessingMatchSuccessCounter); lenient().when(pluginMetrics.counter(DateProcessor.DATE_PROCESSING_MATCH_FAILURE)).thenReturn(dateProcessingMatchFailureCounter); when(mockDateProcessorConfig.getDateWhen()).thenReturn(null); - expectedDateTime = LocalDateTime.now(); + expectedInstant = Instant.now(); + expectedDateTime = LocalDateTime.ofInstant(expectedInstant, ZoneId.systemDefault()); } @AfterEach @@ -361,6 +362,35 @@ void match_with_different_year_formats_test(String pattern) { verify(dateProcessingMatchSuccessCounter, times(1)).increment(); } + @ParameterizedTest + @ValueSource(strings = {"yyyy MM dd HH mm ss"}) + void match_with_to_origination_metadata(String pattern) { + when(mockDateMatch.getKey()).thenReturn("logDate"); + when(mockDateMatch.getPatterns()).thenReturn(Collections.singletonList(pattern)); + + List dateMatches = Collections.singletonList(mockDateMatch); + when(mockDateProcessorConfig.getMatch()).thenReturn(dateMatches); + when(mockDateProcessorConfig.getSourceZoneId()).thenReturn(ZoneId.systemDefault()); + when(mockDateProcessorConfig.getDestinationZoneId()).thenReturn(ZoneId.systemDefault()); + when(mockDateProcessorConfig.getSourceLocale()).thenReturn(Locale.ROOT); + when(mockDateProcessorConfig.getToOriginationMetadata()).thenReturn(true); + + dateProcessor = createObjectUnderTest(); + + Map testData = getTestData(); + testData.put("logDate", expectedDateTime.format(DateTimeFormatter.ofPattern(pattern))); + + final Record record = buildRecordWithEvent(testData); + final List> processedRecords = (List>) dateProcessor.doExecute(Collections.singletonList(record)); + + Event event = (Event)processedRecords.get(0).getData(); + Assertions.assertTrue(event.getMetadata().getExternalOriginationTime() != null); + Assertions.assertTrue(event.getEventHandle().getExternalOriginationTime() != null); + ZonedDateTime expectedZonedDatetime = expectedDateTime.atZone(mockDateProcessorConfig.getSourceZoneId()).truncatedTo(ChronoUnit.SECONDS); + Assertions.assertTrue(expectedZonedDatetime.equals(event.getMetadata().getExternalOriginationTime().atZone(mockDateProcessorConfig.getSourceZoneId()))); + verify(dateProcessingMatchSuccessCounter, times(1)).increment(); + } + @ParameterizedTest @ValueSource(strings = {"MMM/dd", "MM dd"}) void match_without_year_test(String pattern) { diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationWrapper.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationWrapper.java index f4c1ebb0b9..7f11db2234 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationWrapper.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationWrapper.java @@ -47,9 +47,7 @@ public class BulkOperationWrapper { private final SerializedJson jsonNode; public BulkOperationWrapper(final BulkOperation bulkOperation) { - this.bulkOperation = bulkOperation; - this.eventHandle = null; - this.jsonNode = null; + this(bulkOperation, null, null); } public BulkOperationWrapper(final BulkOperation bulkOperation, final EventHandle eventHandle, final SerializedJson jsonNode) { @@ -60,10 +58,7 @@ public BulkOperationWrapper(final BulkOperation bulkOperation, final EventHandle } public BulkOperationWrapper(final BulkOperation bulkOperation, final EventHandle eventHandle) { - checkNotNull(bulkOperation); - this.bulkOperation = bulkOperation; - this.eventHandle = eventHandle; - this.jsonNode = null; + this(bulkOperation, eventHandle, null); } public BulkOperation getBulkOperation() { @@ -75,9 +70,7 @@ public EventHandle getEventHandle() { } public void releaseEventHandle(boolean result) { - if (eventHandle != null) { - eventHandle.release(result); - } + eventHandle.release(result); } public Object getDocument() { diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java index 7c05b99aa9..b2556e3192 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java @@ -37,6 +37,7 @@ import java.util.StringJoiner; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; +import java.util.function.Supplier; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.notNullValue; @@ -106,13 +107,42 @@ public void setUp() { } } + public BulkRetryStrategy createObjectUnderTest( + final RequestFunction, BulkResponse> requestFunction, + final BiConsumer, Throwable> logFailure, + final Supplier bulkRequestSupplier +) { + return new BulkRetryStrategy( + requestFunction, + logFailure, + pluginMetrics, + Integer.MAX_VALUE, + bulkRequestSupplier, + pluginSetting); + } + + public BulkRetryStrategy createObjectUnderTest( + final RequestFunction, BulkResponse> requestFunction, + final BiConsumer, Throwable> logFailure, + final int maxRetries, + final Supplier bulkRequestSupplier +) { + return new BulkRetryStrategy( + requestFunction, + logFailure, + pluginMetrics, + maxRetries, + bulkRequestSupplier, + pluginSetting); + } + @Test public void testCanRetry() { AccumulatingBulkRequest accumulatingBulkRequest = mock(AccumulatingBulkRequest.class); - final BulkRetryStrategy bulkRetryStrategy = new BulkRetryStrategy( + final BulkRetryStrategy bulkRetryStrategy = createObjectUnderTest( bulkRequest -> mock(BulkResponse.class), - (docWriteRequest, throwable) -> {}, pluginMetrics, Integer.MAX_VALUE, - () -> mock(AccumulatingBulkRequest.class), pluginSetting); + (docWriteRequest, throwable) -> {}, + () -> mock(AccumulatingBulkRequest.class)); final String testIndex = "foo"; final BulkResponseItem bulkItemResponse1 = successItemResponse(testIndex); final BulkResponseItem bulkItemResponse2 = badRequestItemResponse(testIndex); @@ -139,9 +169,9 @@ public void testExecuteSuccessOnFirstAttempt() throws Exception { numEventsSucceeded = 0; numEventsFailed = 0; - final BulkRetryStrategy bulkRetryStrategy = new BulkRetryStrategy( - client::bulk, logFailureConsumer, pluginMetrics, Integer.MAX_VALUE, - () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()), pluginSetting); + final BulkRetryStrategy bulkRetryStrategy = createObjectUnderTest( + client::bulk, logFailureConsumer, + () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder())); final IndexOperation indexOperation1 = new IndexOperation.Builder().index(testIndex).id("1").document(arbitraryDocument()).build(); final IndexOperation indexOperation2 = new IndexOperation.Builder().index(testIndex).id("2").document(arbitraryDocument()).build(); @@ -178,9 +208,9 @@ public void testExecuteRetryable() throws Exception { numEventsSucceeded = 0; numEventsFailed = 0; - final BulkRetryStrategy bulkRetryStrategy = new BulkRetryStrategy( - client::bulk, logFailureConsumer, pluginMetrics, Integer.MAX_VALUE, - () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()), pluginSetting); + final BulkRetryStrategy bulkRetryStrategy = createObjectUnderTest( + client::bulk, logFailureConsumer, + () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder())); final IndexOperation indexOperation1 = new IndexOperation.Builder().index(testIndex).id("1").document(arbitraryDocument()).build(); final IndexOperation indexOperation2 = new IndexOperation.Builder().index(testIndex).id("2").document(arbitraryDocument()).build(); final IndexOperation indexOperation3 = new IndexOperation.Builder().index(testIndex).id("3").document(arbitraryDocument()).build(); @@ -241,9 +271,9 @@ public void testExecuteNonRetryableException() throws Exception { numEventsSucceeded = 0; numEventsFailed = 0; - final BulkRetryStrategy bulkRetryStrategy = new BulkRetryStrategy( - client::bulk, logFailureConsumer, pluginMetrics, Integer.MAX_VALUE, - () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()), pluginSetting); + final BulkRetryStrategy bulkRetryStrategy = createObjectUnderTest( + client::bulk, logFailureConsumer, + () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder())); final IndexOperation indexOperation1 = new IndexOperation.Builder().index(testIndex).id("1").document(arbitraryDocument()).build(); final IndexOperation indexOperation2 = new IndexOperation.Builder().index(testIndex).id("2").document(arbitraryDocument()).build(); final IndexOperation indexOperation3 = new IndexOperation.Builder().index(testIndex).id("3").document(arbitraryDocument()).build(); @@ -306,9 +336,9 @@ public void testExecuteWithMaxRetries() throws Exception { maxRetriesLimitReached = false; client.maxRetriesTestValue = MAX_RETRIES; logFailureConsumer = this::logFailureMaxRetries; - final BulkRetryStrategy bulkRetryStrategy = new BulkRetryStrategy( - client::bulk, logFailureConsumer, pluginMetrics, MAX_RETRIES, - () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()), pluginSetting); + final BulkRetryStrategy bulkRetryStrategy = createObjectUnderTest( + client::bulk, logFailureConsumer, MAX_RETRIES, + () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder())); final IndexOperation indexOperation1 = new IndexOperation.Builder().index(testIndex).id("1").document(arbitraryDocument()).build(); final IndexOperation indexOperation2 = new IndexOperation.Builder().index(testIndex).id("2").document(arbitraryDocument()).build(); final IndexOperation indexOperation3 = new IndexOperation.Builder().index(testIndex).id("3").document(arbitraryDocument()).build(); @@ -336,9 +366,9 @@ public void testExecuteWithMaxRetriesWithException() throws Exception { client.maxRetriesTestValue = MAX_RETRIES; client.maxRetriesWithException = true; logFailureConsumer = this::logFailureMaxRetries; - final BulkRetryStrategy bulkRetryStrategy = new BulkRetryStrategy( - client::bulk, logFailureConsumer, pluginMetrics, MAX_RETRIES, - () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()), pluginSetting); + final BulkRetryStrategy bulkRetryStrategy = createObjectUnderTest( + client::bulk, logFailureConsumer, MAX_RETRIES, + () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder())); final IndexOperation indexOperation1 = new IndexOperation.Builder().index(testIndex).id("1").document(arbitraryDocument()).build(); final IndexOperation indexOperation2 = new IndexOperation.Builder().index(testIndex).id("2").document(arbitraryDocument()).build(); final IndexOperation indexOperation3 = new IndexOperation.Builder().index(testIndex).id("3").document(arbitraryDocument()).build(); @@ -366,9 +396,9 @@ public void testExecuteWithMaxRetriesAndSuccesses() throws Exception { client.maxRetriesTestValue = MAX_RETRIES; client.maxRetriesWithSuccesses = true; logFailureConsumer = this::logFailureMaxRetries; - final BulkRetryStrategy bulkRetryStrategy = new BulkRetryStrategy( - client::bulk, logFailureConsumer, pluginMetrics, MAX_RETRIES, - () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()), pluginSetting); + final BulkRetryStrategy bulkRetryStrategy = createObjectUnderTest( + client::bulk, logFailureConsumer, MAX_RETRIES, + () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder())); final IndexOperation indexOperation1 = new IndexOperation.Builder().index(testIndex).id("1").document(arbitraryDocument()).build(); final IndexOperation indexOperation2 = new IndexOperation.Builder().index(testIndex).id("2").document(arbitraryDocument()).build(); final IndexOperation indexOperation3 = new IndexOperation.Builder().index(testIndex).id("3").document(arbitraryDocument()).build(); @@ -393,9 +423,9 @@ public void testExecuteNonRetryableResponse() throws Exception { numEventsSucceeded = 0; numEventsFailed = 0; - final BulkRetryStrategy bulkRetryStrategy = new BulkRetryStrategy( - client::bulk, logFailureConsumer, pluginMetrics, Integer.MAX_VALUE, - () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()), pluginSetting); + final BulkRetryStrategy bulkRetryStrategy = createObjectUnderTest( + client::bulk, logFailureConsumer, + () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder())); final IndexOperation indexOperation1 = new IndexOperation.Builder().index(testIndex).id("1").document(arbitraryDocument()).build(); final IndexOperation indexOperation2 = new IndexOperation.Builder().index(testIndex).id("2").document(arbitraryDocument()).build(); final IndexOperation indexOperation3 = new IndexOperation.Builder().index(testIndex).id("3").document(arbitraryDocument()).build(); diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingUncompressedBulkRequestTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingUncompressedBulkRequestTest.java index a1ea5159f0..5d6996059b 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingUncompressedBulkRequestTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingUncompressedBulkRequestTest.java @@ -219,4 +219,4 @@ private SizedDocument generateDocumentWithLength(long documentLength) { when(sizedDocument.getDocumentSize()).thenReturn(documentLength); return sizedDocument; } -} \ No newline at end of file +}