diff --git a/build.gradle b/build.gradle
index 3dccd497cf..f77ecc442b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -226,9 +226,6 @@ subprojects {
     test {
         useJUnitPlatform()
-        javaLauncher = javaToolchains.launcherFor {
-            languageVersion = JavaLanguageVersion.current()
-        }
         reports {
             junitXml.required
             html.required
diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java
index 26dd7e98a6..1c3e596265 100644
--- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java
+++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java
@@ -28,7 +28,6 @@ public abstract class AbstractSink<T extends Record<?>> implements Sink<T> {
     private Thread retryThread;
     private int maxRetries;
     private int waitTimeMs;
-    private SinkThread sinkThread;
     public AbstractSink(final PluginSetting pluginSetting, int numRetries, int waitTimeMs) {
         this.pluginMetrics = PluginMetrics.fromPluginSetting(pluginSetting);
@@ -52,8 +51,7 @@ public void initialize() {
         // the exceptions which are not retryable.
         doInitialize();
         if (!isReady() && retryThread == null) {
-            sinkThread = new SinkThread(this, maxRetries, waitTimeMs);
-            retryThread = new Thread(sinkThread);
+            retryThread = new Thread(new SinkThread(this, maxRetries, waitTimeMs));
             retryThread.start();
         }
     }
@@ -78,7 +76,7 @@ public void output(Collection<T> records) {
     @Override
     public void shutdown() {
         if (retryThread != null) {
-            sinkThread.stop();
+            retryThread.stop();
         }
     }
diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkThread.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkThread.java
index 451cef7dff..c304de37af 100644
--- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkThread.java
+++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkThread.java
@@ -10,8 +10,6 @@ class SinkThread implements Runnable {
     private int maxRetries;
     private int waitTimeMs;
-    private volatile boolean isStopped = false;
-
     public SinkThread(AbstractSink sink, int maxRetries, int waitTimeMs) {
         this.sink = sink;
         this.maxRetries = maxRetries;
@@ -21,15 +19,11 @@ public SinkThread(AbstractSink sink, int maxRetries, int waitTimeMs) {
     @Override
     public void run() {
         int numRetries = 0;
-        while (!sink.isReady() && numRetries++ < maxRetries && !isStopped) {
+        while (!sink.isReady() && numRetries++ < maxRetries) {
             try {
                 Thread.sleep(waitTimeMs);
                 sink.doInitialize();
             } catch (InterruptedException e){}
         }
     }
-
-    public void stop() {
-        isStopped = true;
-    }
 }
diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/AbstractSinkTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/AbstractSinkTest.java
index 8d1af7ea44..3b9fe7c007 100644
--- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/AbstractSinkTest.java
+++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/AbstractSinkTest.java
@@ -11,10 +11,15 @@
 import org.opensearch.dataprepper.metrics.MetricNames;
 import org.opensearch.dataprepper.metrics.MetricsTestUtil;
 import org.opensearch.dataprepper.model.configuration.PluginSetting;
+import org.opensearch.dataprepper.model.record.Record;
 import org.opensearch.dataprepper.model.event.Event;
-import org.opensearch.dataprepper.model.event.EventHandle;
 import org.opensearch.dataprepper.model.event.JacksonEvent;
-import 
org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.event.EventHandle; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.mock; import java.time.Duration; import java.util.Arrays; @@ -25,12 +30,6 @@ import java.util.UUID; import static org.awaitility.Awaitility.await; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; class AbstractSinkTest { private int count; @@ -72,13 +71,13 @@ void testMetrics() { } @Test - void testSinkNotReady() throws InterruptedException { + void testSinkNotReady() { final String sinkName = "testSink"; final String pipelineName = "pipelineName"; MetricsTestUtil.initMetrics(); PluginSetting pluginSetting = new PluginSetting(sinkName, Collections.emptyMap()); pluginSetting.setPipelineName(pipelineName); - AbstractSinkNotReadyImpl abstractSink = new AbstractSinkNotReadyImpl(pluginSetting); + AbstractSink> abstractSink = new AbstractSinkNotReadyImpl(pluginSetting); abstractSink.initialize(); assertEquals(abstractSink.isReady(), false); assertEquals(abstractSink.getRetryThreadState(), Thread.State.RUNNABLE); @@ -88,10 +87,7 @@ void testSinkNotReady() throws InterruptedException { await().atMost(Duration.ofSeconds(5)) .until(abstractSink::isReady); assertEquals(abstractSink.getRetryThreadState(), Thread.State.TERMINATED); - int initCountBeforeShutdown = abstractSink.initCount; abstractSink.shutdown(); - Thread.sleep(200); - assertThat(abstractSink.initCount, equalTo(initCountBeforeShutdown)); } @Test diff --git a/data-prepper-core/build.gradle b/data-prepper-core/build.gradle index c939129a1c..080538c5e4 100644 --- a/data-prepper-core/build.gradle +++ b/data-prepper-core/build.gradle @@ -48,6 +48,7 @@ dependencies { exclude group: 'commons-logging', module: 'commons-logging' } implementation 'software.amazon.cloudwatchlogs:aws-embedded-metrics:2.0.0-beta-1' + testImplementation 'org.apache.logging.log4j:log4j-jpl:2.23.0' testImplementation testLibs.spring.test implementation libs.armeria.core implementation libs.armeria.grpc @@ -88,6 +89,8 @@ task integrationTest(type: Test) { classpath = sourceSets.integrationTest.runtimeClasspath + systemProperty 'log4j.configurationFile', 'src/test/resources/log4j2.properties' + filter { includeTestsMatching '*IT' } diff --git a/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/avro/AvroAutoSchemaGeneratorTest.java b/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/avro/AvroAutoSchemaGeneratorTest.java index 1b66b62c37..622eb56a1b 100644 --- a/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/avro/AvroAutoSchemaGeneratorTest.java +++ b/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/avro/AvroAutoSchemaGeneratorTest.java @@ -17,7 +17,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Timer; +import java.util.Random; import java.util.UUID; import java.util.stream.Stream; @@ -218,7 +218,7 @@ static class SomeUnknownTypesArgumentsProvider implements ArgumentsProvider { @Override public Stream provideArguments(ExtensionContext context) { return Stream.of( - 
arguments(Timer.class), + arguments(Random.class), arguments(InputStream.class), arguments(File.class) ); diff --git a/data-prepper-plugins/blocking-buffer/src/test/java/org/opensearch/dataprepper/plugins/buffer/blockingbuffer/BlockingBufferTests.java b/data-prepper-plugins/blocking-buffer/src/test/java/org/opensearch/dataprepper/plugins/buffer/blockingbuffer/BlockingBufferTests.java index f3f28db174..194c810ec4 100644 --- a/data-prepper-plugins/blocking-buffer/src/test/java/org/opensearch/dataprepper/plugins/buffer/blockingbuffer/BlockingBufferTests.java +++ b/data-prepper-plugins/blocking-buffer/src/test/java/org/opensearch/dataprepper/plugins/buffer/blockingbuffer/BlockingBufferTests.java @@ -328,7 +328,7 @@ public Stream provideArguments(final ExtensionContext conte return Stream.of( Arguments.of(0, randomInt + 1, 0.0), Arguments.of(1, 100, 1.0), - Arguments.of(randomInt + 1, randomInt + 1, 100.0), + Arguments.of(randomInt, randomInt, 100.0), Arguments.of(randomInt, randomInt + 250, ((double) randomInt / (randomInt + 250)) * 100), Arguments.of(6, 9, 66.66666666666666), Arguments.of(531, 1000, 53.1), diff --git a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecTest.java b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecTest.java index a4b0377963..f85d1c6605 100644 --- a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecTest.java +++ b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputCodecTest.java @@ -11,12 +11,9 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; - import static org.mockito.Mockito.when; import static org.mockito.Mockito.mock; - import org.mockito.Mock; - import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.CoreMatchers.not; @@ -31,7 +28,6 @@ import java.io.ByteArrayInputStream; import java.time.Instant; -import java.time.temporal.ChronoUnit; import java.util.List; import java.util.LinkedList; import java.util.Map; @@ -60,7 +56,7 @@ public EventJsonInputCodec createInputCodec() { @ParameterizedTest @ValueSource(strings = {"", "{}"}) public void emptyTest(String input) throws Exception { - input = "{\"" + EventJsonDefines.VERSION + "\":\"" + DataPrepperVersion.getCurrentVersion().toString() + "\", \"" + EventJsonDefines.EVENTS + "\":[" + input + "]}"; + input = "{\""+EventJsonDefines.VERSION+"\":\""+DataPrepperVersion.getCurrentVersion().toString()+"\", \""+EventJsonDefines.EVENTS+"\":["+input+"]}"; ByteArrayInputStream inputStream = new ByteArrayInputStream(input.getBytes()); inputCodec = createInputCodec(); Consumer> consumer = mock(Consumer.class); @@ -74,15 +70,15 @@ public void inCompatibleVersionTest() throws Exception { final String key = UUID.randomUUID().toString(); final String value = UUID.randomUUID().toString(); Map data = Map.of(key, value); - Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS); + Instant startTime = Instant.now(); Event event = createEvent(data, startTime); Map dataMap = event.toMap(); Map metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class); - String input = "{\"" + EventJsonDefines.VERSION + "\":\"3.0\", \"" + EventJsonDefines.EVENTS + "\":["; + String 
input = "{\""+EventJsonDefines.VERSION+"\":\"3.0\", \""+EventJsonDefines.EVENTS+"\":["; String comma = ""; for (int i = 0; i < 2; i++) { - input += comma + "{\"data\":" + objectMapper.writeValueAsString(dataMap) + "," + "\"metadata\":" + objectMapper.writeValueAsString(metadataMap) + "}"; + input += comma+"{\"data\":"+objectMapper.writeValueAsString(dataMap)+","+"\"metadata\":"+objectMapper.writeValueAsString(metadataMap)+"}"; comma = ","; } input += "]}"; @@ -99,15 +95,15 @@ public void basicTest() throws Exception { final String key = UUID.randomUUID().toString(); final String value = UUID.randomUUID().toString(); Map data = Map.of(key, value); - Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS); + Instant startTime = Instant.now(); Event event = createEvent(data, startTime); Map dataMap = event.toMap(); Map metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class); - String input = "{\"" + EventJsonDefines.VERSION + "\":\"" + DataPrepperVersion.getCurrentVersion().toString() + "\", \"" + EventJsonDefines.EVENTS + "\":["; + String input = "{\""+EventJsonDefines.VERSION+"\":\""+DataPrepperVersion.getCurrentVersion().toString()+"\", \""+EventJsonDefines.EVENTS+"\":["; String comma = ""; for (int i = 0; i < 2; i++) { - input += comma + "{\"data\":" + objectMapper.writeValueAsString(dataMap) + "," + "\"metadata\":" + objectMapper.writeValueAsString(metadataMap) + "}"; + input += comma+"{\"data\":"+objectMapper.writeValueAsString(dataMap)+","+"\"metadata\":"+objectMapper.writeValueAsString(metadataMap)+"}"; comma = ","; } input += "]}"; @@ -115,8 +111,8 @@ public void basicTest() throws Exception { List> records = new LinkedList<>(); inputCodec.parse(inputStream, records::add); assertThat(records.size(), equalTo(2)); - for (Record record : records) { - Event e = (Event) record.getData(); + for(Record record : records) { + Event e = (Event)record.getData(); assertThat(e.get(key, String.class), equalTo(value)); assertThat(e.getMetadata().getTimeReceived(), equalTo(startTime)); assertThat(e.getMetadata().getTags().size(), equalTo(0)); @@ -130,15 +126,15 @@ public void test_with_timeReceivedOverridden() throws Exception { final String key = UUID.randomUUID().toString(); final String value = UUID.randomUUID().toString(); Map data = Map.of(key, value); - Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS).minusSeconds(5); + Instant startTime = Instant.now().minusSeconds(5); Event event = createEvent(data, startTime); Map dataMap = event.toMap(); Map metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class); - String input = "{\"" + EventJsonDefines.VERSION + "\":\"" + DataPrepperVersion.getCurrentVersion().toString() + "\", \"" + EventJsonDefines.EVENTS + "\":["; + String input = "{\""+EventJsonDefines.VERSION+"\":\""+DataPrepperVersion.getCurrentVersion().toString()+"\", \""+EventJsonDefines.EVENTS+"\":["; String comma = ""; for (int i = 0; i < 2; i++) { - input += comma + "{\"data\":" + objectMapper.writeValueAsString(dataMap) + "," + "\"metadata\":" + objectMapper.writeValueAsString(metadataMap) + "}"; + input += comma+"{\"data\":"+objectMapper.writeValueAsString(dataMap)+","+"\"metadata\":"+objectMapper.writeValueAsString(metadataMap)+"}"; comma = ","; } input += "]}"; @@ -146,8 +142,8 @@ public void test_with_timeReceivedOverridden() throws Exception { List> records = new LinkedList<>(); inputCodec.parse(inputStream, records::add); assertThat(records.size(), equalTo(2)); - for (Record record : records) { - Event e = (Event) 
record.getData(); + for(Record record : records) { + Event e = (Event)record.getData(); assertThat(e.get(key, String.class), equalTo(value)); assertThat(e.getMetadata().getTimeReceived(), not(equalTo(startTime))); assertThat(e.getMetadata().getTags().size(), equalTo(0)); @@ -163,7 +159,7 @@ private Event createEvent(final Map json, final Instant timeRece if (timeReceived != null) { logBuilder.withTimeReceived(timeReceived); } - final JacksonEvent event = (JacksonEvent) logBuilder.build(); + final JacksonEvent event = (JacksonEvent)logBuilder.build(); return event; } diff --git a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputOutputCodecTest.java b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputOutputCodecTest.java index 7ea8c49cd0..85e91e5a55 100644 --- a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputOutputCodecTest.java +++ b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonInputOutputCodecTest.java @@ -6,12 +6,9 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; - import static org.mockito.Mockito.when; import static org.mockito.Mockito.mock; - import org.mockito.Mock; - import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; @@ -25,7 +22,6 @@ import org.opensearch.dataprepper.model.log.JacksonLog; import java.time.Instant; -import java.time.temporal.ChronoUnit; import java.util.List; import java.util.LinkedList; import java.util.Map; @@ -68,7 +64,7 @@ public void basicTest() throws Exception { final String value = UUID.randomUUID().toString(); Map data = Map.of(key, value); - Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS); + Instant startTime = Instant.now(); Event event = createEvent(data, startTime); outputCodec = createOutputCodec(); inputCodec = createInputCodec(); @@ -79,8 +75,8 @@ public void basicTest() throws Exception { inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), records::add); assertThat(records.size(), equalTo(1)); - for (Record record : records) { - Event e = (Event) record.getData(); + for(Record record : records) { + Event e = (Event)record.getData(); assertThat(e.get(key, String.class), equalTo(value)); assertThat(e.getMetadata().getTimeReceived(), equalTo(startTime)); assertThat(e.getMetadata().getTags().size(), equalTo(0)); @@ -94,7 +90,7 @@ public void multipleEventsTest() throws Exception { final String value = UUID.randomUUID().toString(); Map data = Map.of(key, value); - Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS); + Instant startTime = Instant.now(); Event event = createEvent(data, startTime); outputCodec = createOutputCodec(); inputCodec = createInputCodec(); @@ -107,8 +103,8 @@ public void multipleEventsTest() throws Exception { inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), records::add); assertThat(records.size(), equalTo(3)); - for (Record record : records) { - Event e = (Event) record.getData(); + for(Record record : records) { + Event e = (Event)record.getData(); assertThat(e.get(key, String.class), equalTo(value)); assertThat(e.getMetadata().getTimeReceived(), equalTo(startTime)); assertThat(e.getMetadata().getTags().size(), equalTo(0)); @@ -126,7 +122,7 @@ public void extendedTest() throws Exception { Set tags = 
Set.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()); List tagsList = tags.stream().collect(Collectors.toList()); - Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS); + Instant startTime = Instant.now(); Event event = createEvent(data, startTime); Instant origTime = startTime.minusSeconds(5); event.getMetadata().setExternalOriginationTime(origTime); @@ -139,11 +135,11 @@ public void extendedTest() throws Exception { outputCodec.complete(outputStream); assertThat(outputCodec.getExtension(), equalTo(EventJsonOutputCodec.EVENT_JSON)); List> records = new LinkedList<>(); - inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), records::add); +inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), records::add); assertThat(records.size(), equalTo(1)); - for (Record record : records) { - Event e = (Event) record.getData(); + for(Record record : records) { + Event e = (Event)record.getData(); assertThat(e.get(key, String.class), equalTo(value)); assertThat(e.getMetadata().getTimeReceived(), equalTo(startTime)); assertThat(e.getMetadata().getTags(), equalTo(tags)); @@ -161,7 +157,7 @@ private Event createEvent(final Map json, final Instant timeRece if (timeReceived != null) { logBuilder.withTimeReceived(timeReceived); } - final JacksonEvent event = (JacksonEvent) logBuilder.build(); + final JacksonEvent event = (JacksonEvent)logBuilder.build(); return event; } diff --git a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonOutputCodecTest.java b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonOutputCodecTest.java index b32d2b62e9..51dda545cb 100644 --- a/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonOutputCodecTest.java +++ b/data-prepper-plugins/event-json-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/event_json/EventJsonOutputCodecTest.java @@ -11,7 +11,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mock; - import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; @@ -23,7 +22,6 @@ import org.opensearch.dataprepper.model.log.JacksonLog; import java.time.Instant; -import java.time.temporal.ChronoUnit; import java.util.Map; import java.util.UUID; @@ -51,7 +49,7 @@ public void basicTest() throws Exception { final String value = UUID.randomUUID().toString(); Map data = Map.of(key, value); - Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS); + Instant startTime = Instant.now(); Event event = createEvent(data, startTime); outputCodec = createOutputCodec(); outputCodec.start(outputStream, null, null); @@ -61,10 +59,10 @@ public void basicTest() throws Exception { Map dataMap = event.toMap(); Map metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class); //String expectedOutput = "{\"version\":\""+DataPrepperVersion.getCurrentVersion().toString()+"\",\""+EventJsonDefines.EVENTS+"\":["; - String expectedOutput = "{\"" + EventJsonDefines.VERSION + "\":\"" + DataPrepperVersion.getCurrentVersion().toString() + "\",\"" + EventJsonDefines.EVENTS + "\":["; + String expectedOutput = "{\""+EventJsonDefines.VERSION+"\":\""+DataPrepperVersion.getCurrentVersion().toString()+"\",\""+EventJsonDefines.EVENTS+"\":["; String comma = ""; for (int i = 0; i < 2; i++) { - expectedOutput += comma + "{\"" + EventJsonDefines.DATA + "\":" 
+ objectMapper.writeValueAsString(dataMap) + "," + "\"" + EventJsonDefines.METADATA + "\":" + objectMapper.writeValueAsString(metadataMap) + "}"; + expectedOutput += comma+"{\""+EventJsonDefines.DATA+"\":"+objectMapper.writeValueAsString(dataMap)+","+"\""+EventJsonDefines.METADATA+"\":"+objectMapper.writeValueAsString(metadataMap)+"}"; comma = ","; } expectedOutput += "]}"; @@ -80,7 +78,7 @@ private Event createEvent(final Map json, final Instant timeRece if (timeReceived != null) { logBuilder.withTimeReceived(timeReceived); } - final JacksonEvent event = (JacksonEvent) logBuilder.build(); + final JacksonEvent event = (JacksonEvent)logBuilder.build(); return event; } diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessorConfig.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessorConfig.java index 81f6bbab34..df23740344 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessorConfig.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessorConfig.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.processor.mutateevent; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; import jakarta.validation.Valid; import jakarta.validation.constraints.AssertTrue; import jakarta.validation.constraints.NotEmpty; @@ -16,25 +17,47 @@ public class AddEntryProcessorConfig { public static class Entry { + + @JsonPropertyDescription("The key of the new entry to be added. Some examples of keys include `my_key`, " + + "`myKey`, and `object/sub_Key`. The key can also be a format expression, for example, `${/key1}` to " + + "use the value of field `key1` as the key.") private String key; @JsonProperty("metadata_key") + @JsonPropertyDescription("The key for the new metadata attribute. The argument must be a literal string key " + + "and not a JSON Pointer. Either one string key or `metadata_key` is required.") private String metadataKey; + @JsonPropertyDescription("The value of the new entry to be added, which can be used with any of the " + + "following data types: strings, Booleans, numbers, null, nested objects, and arrays.") private Object value; + @JsonPropertyDescription("A format string to use as the value of the new entry, for example, " + + "`${key1}-${key2}`, where `key1` and `key2` are existing keys in the event. Required if neither " + + "`value` nor `value_expression` is specified.") private String format; @JsonProperty("value_expression") + @JsonPropertyDescription("An expression string to use as the value of the new entry. For example, `/key` " + + "is an existing key in the event with a type of either a number, a string, or a Boolean. " + + "Expressions can also contain functions returning number/string/integer. For example, " + + "`length(/key)` will return the length of the key in the event when the key is a string. 
For more " + + "information about keys, see [Expression syntax](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/).") private String valueExpression; @JsonProperty("add_when") + @JsonPropertyDescription("A [conditional expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/), " + + "such as `/some-key == \"test\"'`, that will be evaluated to determine whether the processor will be run on the event.") private String addWhen; @JsonProperty("overwrite_if_key_exists") + @JsonPropertyDescription("When set to `true`, the existing value is overwritten if `key` already exists " + + "in the event. The default value is `false`.") private boolean overwriteIfKeyExists = false; @JsonProperty("append_if_key_exists") + @JsonPropertyDescription("When set to `true`, the existing value will be appended if a `key` already " + + "exists in the event. An array will be created if the existing value is not an array. Default is `false`.") private boolean appendIfKeyExists = false; public String getKey() { @@ -110,6 +133,7 @@ public Entry() { @NotEmpty @NotNull @Valid + @JsonPropertyDescription("A list of entries to add to the event.") private List entries; public List getEntries() { diff --git a/data-prepper-plugins/rds-source/build.gradle b/data-prepper-plugins/rds-source/build.gradle index f83b1332eb..6d6a681646 100644 --- a/data-prepper-plugins/rds-source/build.gradle +++ b/data-prepper-plugins/rds-source/build.gradle @@ -21,6 +21,8 @@ dependencies { implementation 'com.fasterxml.jackson.core:jackson-core' implementation 'com.fasterxml.jackson.core:jackson-databind' + implementation 'com.zendesk:mysql-binlog-connector-java:0.29.2' + testImplementation project(path: ':data-prepper-test-common') testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' testImplementation project(path: ':data-prepper-test-event') diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java index 77956e6b0e..83e168f41f 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.source.rds; +import com.github.shyiko.mysql.binlog.BinaryLogClient; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; @@ -14,6 +15,8 @@ import org.opensearch.dataprepper.plugins.source.rds.export.DataFileScheduler; import org.opensearch.dataprepper.plugins.source.rds.export.ExportScheduler; import org.opensearch.dataprepper.plugins.source.rds.leader.LeaderScheduler; +import org.opensearch.dataprepper.plugins.source.rds.stream.BinlogClientFactory; +import org.opensearch.dataprepper.plugins.source.rds.stream.StreamScheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.rds.RdsClient; @@ -42,6 +45,7 @@ public class RdsService { private LeaderScheduler leaderScheduler; private ExportScheduler exportScheduler; private DataFileScheduler dataFileScheduler; + private StreamScheduler streamScheduler; public RdsService(final EnhancedSourceCoordinator sourceCoordinator, final RdsSourceConfig sourceConfig, @@ -78,6 +82,12 @@ public 
void start(Buffer> buffer) { runnableList.add(dataFileScheduler); } + if (sourceConfig.isStreamEnabled()) { + BinaryLogClient binaryLogClient = new BinlogClientFactory(sourceConfig, rdsClient).create(); + streamScheduler = new StreamScheduler(sourceCoordinator, sourceConfig, binaryLogClient, buffer, pluginMetrics); + runnableList.add(streamScheduler); + } + executor = Executors.newFixedThreadPool(runnableList.size()); runnableList.forEach(executor::submit); } @@ -93,6 +103,11 @@ public void shutdown() { exportScheduler.shutdown(); dataFileScheduler.shutdown(); } + + if (sourceConfig.isStreamEnabled()) { + streamScheduler.shutdown(); + } + leaderScheduler.shutdown(); executor.shutdownNow(); } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSource.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSource.java index 43806c0475..071fc5889b 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSource.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSource.java @@ -52,6 +52,7 @@ public RdsSource(final PluginMetrics pluginMetrics, @Override public void start(Buffer> buffer) { + LOG.info("Starting RDS source"); Objects.requireNonNull(sourceCoordinator); sourceCoordinator.createPartition(new LeaderPartition()); diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSourceConfig.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSourceConfig.java index cb8e41513a..4d90c475ec 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSourceConfig.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSourceConfig.java @@ -11,6 +11,7 @@ import org.opensearch.dataprepper.plugins.source.rds.configuration.AwsAuthenticationConfig; import org.opensearch.dataprepper.plugins.source.rds.configuration.EngineType; import org.opensearch.dataprepper.plugins.source.rds.configuration.ExportConfig; +import org.opensearch.dataprepper.plugins.source.rds.configuration.StreamConfig; import java.util.List; @@ -70,6 +71,12 @@ public class RdsSourceConfig { @Valid private ExportConfig exportConfig; + @JsonProperty("stream") + private StreamConfig streamConfig; + + @JsonProperty("authentication") + private AuthenticationConfig authenticationConfig; + public String getDbIdentifier() { return dbIdentifier; } @@ -117,4 +124,33 @@ public ExportConfig getExport() { public boolean isExportEnabled() { return exportConfig != null; } + + public StreamConfig getStream() { + return streamConfig; + } + + public boolean isStreamEnabled() { + return streamConfig != null; + } + + public AuthenticationConfig getAuthenticationConfig() { + return this.authenticationConfig; + } + + public static class AuthenticationConfig { + @JsonProperty("username") + private String username; + + @JsonProperty("password") + private String password; + + public String getUsername() { + return username; + } + + public String getPassword() { + return password; + } + + } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/configuration/StreamConfig.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/configuration/StreamConfig.java new file mode 
100644
index 0000000000..c246e56b45
--- /dev/null
+++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/configuration/StreamConfig.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.source.rds.configuration;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import jakarta.validation.constraints.Max;
+import jakarta.validation.constraints.Min;
+
+public class StreamConfig {
+
+    private static final int DEFAULT_S3_FOLDER_PARTITION_COUNT = 100;
+
+    @JsonProperty("partition_count")
+    @Min(1)
+    @Max(1000)
+    private int s3FolderPartitionCount = DEFAULT_S3_FOLDER_PARTITION_COUNT;
+
+    public int getPartitionCount() {
+        return s3FolderPartitionCount;
+    }
+}
diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/converter/MetadataKeyAttributes.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/converter/MetadataKeyAttributes.java
index 91eecdf07b..60e1ba2bd3 100644
--- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/converter/MetadataKeyAttributes.java
+++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/converter/MetadataKeyAttributes.java
@@ -14,7 +14,11 @@ public class MetadataKeyAttributes {
     static final String EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE = "opensearch_action";
+    static final String EVENT_DATABASE_NAME_METADATA_ATTRIBUTE = "database_name";
+    static final String EVENT_TABLE_NAME_METADATA_ATTRIBUTE = "table_name";
     static final String INGESTION_EVENT_TYPE_ATTRIBUTE = "ingestion_type";
+
+    static final String EVENT_S3_PARTITION_KEY = "s3_partition_key";
 }
diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/converter/S3PartitionCreator.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/converter/S3PartitionCreator.java
new file mode 100644
index 0000000000..d2096b6ff5
--- /dev/null
+++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/converter/S3PartitionCreator.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.source.rds.converter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class S3PartitionCreator {
+    private static final Logger LOG = LoggerFactory.getLogger(S3PartitionCreator.class);
+    private final int partitionCount;
+
+    S3PartitionCreator(final int partitionCount) {
+        this.partitionCount = partitionCount;
+    }
+
+    List<String> createPartitions() {
+        final List<String> partitions = new ArrayList<>();
+        for (int i = 0; i < partitionCount; i++) {
+            String partitionName = String.format("%02x", i) + "/";
+            partitions.add(partitionName);
+        }
+        LOG.info("S3 partitions created successfully.");
+        return partitions;
+    }
+}
\ No newline at end of file
diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/converter/StreamRecordConverter.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/converter/StreamRecordConverter.java
new file mode 100644
index 0000000000..511876d668
--- /dev/null
+++ 
b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/converter/StreamRecordConverter.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.source.rds.converter;
+
+import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.event.EventMetadata;
+import org.opensearch.dataprepper.model.event.JacksonEvent;
+import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_DATABASE_NAME_METADATA_ATTRIBUTE;
+import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE;
+import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_TABLE_NAME_METADATA_ATTRIBUTE;
+import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.INGESTION_EVENT_TYPE_ATTRIBUTE;
+import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE;
+
+/**
+ * Convert binlog row data into JacksonEvent
+ */
+public class StreamRecordConverter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StreamRecordConverter.class);
+
+    private final List<String> folderNames;
+
+    static final String S3_PATH_DELIMITER = "/";
+
+    static final String STREAM_EVENT_TYPE = "STREAM";
+
+    public StreamRecordConverter(final int partitionCount) {
+        S3PartitionCreator s3PartitionCreator = new S3PartitionCreator(partitionCount);
+        folderNames = s3PartitionCreator.createPartitions();
+    }
+
+    public Event convert(Map<String, Object> rowData,
+                         String databaseName,
+                         String tableName,
+                         OpenSearchBulkActions bulkAction,
+                         List<String> primaryKeys,
+                         String s3Prefix) {
+        final Event event = JacksonEvent.builder()
+                .withEventType("event")
+                .withData(rowData)
+                .build();
+
+        EventMetadata eventMetadata = event.getMetadata();
+
+        eventMetadata.setAttribute(EVENT_DATABASE_NAME_METADATA_ATTRIBUTE, databaseName);
+        eventMetadata.setAttribute(EVENT_TABLE_NAME_METADATA_ATTRIBUTE, tableName);
+        eventMetadata.setAttribute(EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE, bulkAction.toString());
+        eventMetadata.setAttribute(INGESTION_EVENT_TYPE_ATTRIBUTE, STREAM_EVENT_TYPE);
+
+        final String primaryKeyValue = primaryKeys.stream()
+                .map(rowData::get)
+                .map(String::valueOf)
+                .collect(Collectors.joining("|"));
+        eventMetadata.setAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE, primaryKeyValue);
+        eventMetadata.setAttribute(MetadataKeyAttributes.EVENT_S3_PARTITION_KEY, s3Prefix + S3_PATH_DELIMITER + hashKeyToPartition(primaryKeyValue));
+
+        return event;
+    }
+
+    private String hashKeyToPartition(final String key) {
+        return folderNames.get(hashKeyToIndex(key));
+    }
+    private int hashKeyToIndex(final String key) {
+        try {
+            // Create a SHA-256 hash instance
+            final MessageDigest digest = MessageDigest.getInstance("SHA-256");
+            // Hash the key
+            byte[] hashBytes = digest.digest(key.getBytes());
+            // Convert the hash to an integer
+            int hashValue = bytesToInt(hashBytes);
+            // Map the hash value to an index in the list
+            return 
Math.abs(hashValue) % folderNames.size(); + } catch (final NoSuchAlgorithmException e) { + return -1; + } + } + private int bytesToInt(byte[] bytes) { + return ByteBuffer.wrap(bytes).getInt(); + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/PartitionFactory.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/PartitionFactory.java index 6213263b09..419f1bf805 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/PartitionFactory.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/PartitionFactory.java @@ -11,6 +11,7 @@ import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.ExportPartition; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.GlobalState; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.LeaderPartition; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.StreamPartition; import java.util.function.Function; @@ -24,15 +25,18 @@ public EnhancedSourcePartition apply(SourcePartitionStoreItem partitionStoreItem String sourceIdentifier = partitionStoreItem.getSourceIdentifier(); String partitionType = sourceIdentifier.substring(sourceIdentifier.lastIndexOf('|') + 1); - if (LeaderPartition.PARTITION_TYPE.equals(partitionType)) { - return new LeaderPartition(partitionStoreItem); - } else if (ExportPartition.PARTITION_TYPE.equals(partitionType)) { - return new ExportPartition(partitionStoreItem); - } else if (DataFilePartition.PARTITION_TYPE.equals(partitionType)) { - return new DataFilePartition(partitionStoreItem); - } else { - // Unable to acquire other partitions. - return new GlobalState(partitionStoreItem); + switch (partitionType) { + case LeaderPartition.PARTITION_TYPE: + return new LeaderPartition(partitionStoreItem); + case ExportPartition.PARTITION_TYPE: + return new ExportPartition(partitionStoreItem); + case DataFilePartition.PARTITION_TYPE: + return new DataFilePartition(partitionStoreItem); + case StreamPartition.PARTITION_TYPE: + return new StreamPartition(partitionStoreItem); + default: + // Unable to acquire other partitions. + return new GlobalState(partitionStoreItem); } } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/partition/GlobalState.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/partition/GlobalState.java index c6f1d394a2..4110c1e286 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/partition/GlobalState.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/partition/GlobalState.java @@ -11,6 +11,12 @@ import java.util.Map; import java.util.Optional; +/** + * Global State is a special type of partition. The partition type is null. + * You can't acquire (own) a Global State. + * However, you can read and update Global State whenever required. + * The progress state is a Map object. 
+ */ public class GlobalState extends EnhancedSourcePartition> { private final String stateName; diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/partition/StreamPartition.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/partition/StreamPartition.java new file mode 100644 index 0000000000..2a3e971ac5 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/partition/StreamPartition.java @@ -0,0 +1,49 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.coordination.partition; + +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.source.rds.coordination.state.StreamProgressState; + +import java.util.Optional; + +public class StreamPartition extends EnhancedSourcePartition { + + public static final String PARTITION_TYPE = "STREAM"; + + private final String dbIdentifier; + private final StreamProgressState state; + + public StreamPartition(String dbIdentifier, StreamProgressState state) { + this.dbIdentifier = dbIdentifier; + this.state = state; + } + + public StreamPartition(SourcePartitionStoreItem sourcePartitionStoreItem) { + setSourcePartitionStoreItem(sourcePartitionStoreItem); + dbIdentifier = sourcePartitionStoreItem.getSourcePartitionKey(); + state = convertStringToPartitionProgressState(StreamProgressState.class, sourcePartitionStoreItem.getPartitionProgressState()); + } + + @Override + public String getPartitionType() { + return PARTITION_TYPE; + } + + @Override + public String getPartitionKey() { + return dbIdentifier; + } + + @Override + public Optional getProgressState() { + if (state != null) { + return Optional.of(state); + } + return Optional.empty(); + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/StreamProgressState.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/StreamProgressState.java new file mode 100644 index 0000000000..21873179da --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/StreamProgressState.java @@ -0,0 +1,45 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.coordination.state; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate; + +public class StreamProgressState { + + @JsonProperty("startPosition") + private BinlogCoordinate startPosition; + + @JsonProperty("currentPosition") + private BinlogCoordinate currentPosition; + + @JsonProperty("waitForExport") + private boolean waitForExport = false; + + public BinlogCoordinate getStartPosition() { + return startPosition; + } + + public void setStartPosition(BinlogCoordinate startPosition) { + this.startPosition = startPosition; + } + + public BinlogCoordinate getCurrentPosition() { + return currentPosition; + } + + public void setCurrentPosition(BinlogCoordinate currentPosition) { + this.currentPosition = currentPosition; + } + + public boolean 
shouldWaitForExport() { + return waitForExport; + } + + public void setWaitForExport(boolean waitForExport) { + this.waitForExport = waitForExport; + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileScheduler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileScheduler.java index d465d55076..0a2b2fb638 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileScheduler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileScheduler.java @@ -125,7 +125,7 @@ private void processDataFilePartition(DataFilePartition dataFilePartition) { updateLoadStatus(dataFilePartition.getExportTaskId(), DEFAULT_UPDATE_LOAD_STATUS_TIMEOUT); sourceCoordinator.completePartition(dataFilePartition); } else { - LOG.error("There was an exception while processing an S3 data file", (Throwable) ex); + LOG.error("There was an exception while processing an S3 data file", ex); sourceCoordinator.giveUpPartition(dataFilePartition); } numOfWorkers.decrementAndGet(); @@ -153,7 +153,10 @@ private void updateLoadStatus(String exportTaskId, Duration timeout) { try { sourceCoordinator.saveProgressStateForPartition(globalState, null); - // TODO: Stream is enabled and loadStatus.getLoadedFiles() == loadStatus.getTotalFiles(), create global state to indicate that stream can start + if (sourceConfig.isStreamEnabled() && loadStatus.getLoadedFiles() == loadStatus.getTotalFiles()) { + LOG.info("All exports are done, streaming can continue..."); + sourceCoordinator.createPartition(new GlobalState("stream-for-" + sourceConfig.getDbIdentifier(), null)); + } break; } catch (Exception e) { LOG.error("Failed to update the global status, looks like the status was out of date, will retry.."); diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java index 4831f1e91a..f35975f5b2 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java @@ -11,8 +11,10 @@ import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.ExportPartition; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.GlobalState; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.LeaderPartition; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.StreamPartition; import org.opensearch.dataprepper.plugins.source.rds.coordination.state.ExportProgressState; import org.opensearch.dataprepper.plugins.source.rds.coordination.state.LeaderProgressState; +import org.opensearch.dataprepper.plugins.source.rds.coordination.state.StreamProgressState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -101,6 +103,11 @@ private void init() { createExportPartition(sourceConfig, startTime); } + if (sourceConfig.isStreamEnabled()) { + LOG.debug("Stream is enabled. 
Creating stream partition in the source coordination store."); + createStreamPartition(sourceConfig); + } + LOG.debug("Update initialization state"); LeaderProgressState leaderProgressState = leaderPartition.getProgressState().get(); leaderProgressState.setInitialized(true); @@ -118,4 +125,10 @@ private void createExportPartition(RdsSourceConfig sourceConfig, Instant exportT sourceCoordinator.createPartition(exportPartition); } + private void createStreamPartition(RdsSourceConfig sourceConfig) { + final StreamProgressState progressState = new StreamProgressState(); + progressState.setWaitForExport(sourceConfig.isExportEnabled()); + StreamPartition streamPartition = new StreamPartition(sourceConfig.getDbIdentifier(), progressState); + sourceCoordinator.createPartition(streamPartition); + } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/BinlogCoordinate.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/BinlogCoordinate.java new file mode 100644 index 0000000000..6818dabe9b --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/BinlogCoordinate.java @@ -0,0 +1,33 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.model; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +public class BinlogCoordinate { + + @JsonProperty("binlogFilename") + private final String binlogFilename; + + @JsonProperty("binlogPosition") + private final long binlogPosition; + + @JsonCreator + public BinlogCoordinate(@JsonProperty("binlogFilename") String binlogFilename, + @JsonProperty("binlogPosition") long binlogPosition) { + this.binlogFilename = binlogFilename; + this.binlogPosition = binlogPosition; + } + + public String getBinlogFilename() { + return binlogFilename; + } + + public long getBinlogPosition() { + return binlogPosition; + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/TableMetadata.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/TableMetadata.java new file mode 100644 index 0000000000..f9ce48a3cc --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/TableMetadata.java @@ -0,0 +1,59 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.model; + +import java.util.List; + +public class TableMetadata { + private String databaseName; + private String tableName; + private List columnNames; + private List primaryKeys; + + public TableMetadata(String tableName, String databaseName, List columnNames, List primaryKeys) { + this.tableName = tableName; + this.databaseName = databaseName; + this.columnNames = columnNames; + this.primaryKeys = primaryKeys; + } + + public String getDatabaseName() { + return databaseName; + } + + public String getTableName() { + return tableName; + } + + public String getFullTableName() { + return databaseName + "." 
+ tableName; + } + + public List getColumnNames() { + return columnNames; + } + + public List getPrimaryKeys() { + return primaryKeys; + } + + public void setDatabaseName(String databaseName) { + this.databaseName = databaseName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public void setColumnNames(List columnNames) { + this.columnNames = columnNames; + } + + public void setPrimaryKeys(List primaryKeys) { + this.primaryKeys = primaryKeys; + } + +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientFactory.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientFactory.java new file mode 100644 index 0000000000..8802d7de46 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientFactory.java @@ -0,0 +1,46 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.stream; + +import com.github.shyiko.mysql.binlog.BinaryLogClient; +import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig; +import software.amazon.awssdk.services.rds.RdsClient; +import software.amazon.awssdk.services.rds.model.DBInstance; +import software.amazon.awssdk.services.rds.model.DescribeDbInstancesRequest; +import software.amazon.awssdk.services.rds.model.DescribeDbInstancesResponse; + +public class BinlogClientFactory { + + private final RdsSourceConfig sourceConfig; + + private final RdsClient rdsClient; + + public BinlogClientFactory(final RdsSourceConfig sourceConfig, final RdsClient rdsClient) { + this.sourceConfig = sourceConfig; + this.rdsClient = rdsClient; + } + + public BinaryLogClient create() { + DBInstance dbInstance = describeDbInstance(sourceConfig.getDbIdentifier()); + return new BinaryLogClient( + dbInstance.endpoint().address(), + dbInstance.endpoint().port(), + // For test + // "127.0.0.1", + // 3306, + sourceConfig.getAuthenticationConfig().getUsername(), + sourceConfig.getAuthenticationConfig().getPassword()); + } + + private DBInstance describeDbInstance(final String dbInstanceIdentifier) { + DescribeDbInstancesRequest request = DescribeDbInstancesRequest.builder() + .dbInstanceIdentifier(dbInstanceIdentifier) + .build(); + + DescribeDbInstancesResponse response = rdsClient.describeDBInstances(request); + return response.dbInstances().get(0); + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java new file mode 100644 index 0000000000..7ac0dcbe2b --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java @@ -0,0 +1,209 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.stream; + +import com.github.shyiko.mysql.binlog.BinaryLogClient; +import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData; +import com.github.shyiko.mysql.binlog.event.EventType; +import com.github.shyiko.mysql.binlog.event.TableMapEventData; +import com.github.shyiko.mysql.binlog.event.TableMapEventMetadata; +import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData; +import 
com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
+import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
+import org.opensearch.dataprepper.model.buffer.Buffer;
+import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
+import org.opensearch.dataprepper.model.record.Record;
+import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig;
+import org.opensearch.dataprepper.plugins.source.rds.converter.StreamRecordConverter;
+import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class BinlogEventListener implements BinaryLogClient.EventListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BinlogEventListener.class);
+
+    static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60);
+    static final int DEFAULT_BUFFER_BATCH_SIZE = 1_000;
+
+    /**
+     * TableId to TableMetadata mapping
+     */
+    private final Map<Long, TableMetadata> tableMetadataMap;
+
+    private final StreamRecordConverter recordConverter;
+    private final BufferAccumulator<Record<Event>> bufferAccumulator;
+    private final List<String> tableNames;
+    private final String s3Prefix;
+
+    public BinlogEventListener(final Buffer<Record<Event>> buffer, final RdsSourceConfig sourceConfig) {
+        tableMetadataMap = new HashMap<>();
+        recordConverter = new StreamRecordConverter(sourceConfig.getStream().getPartitionCount());
+        bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT);
+        s3Prefix = sourceConfig.getS3Prefix();
+        tableNames = sourceConfig.getTableNames();
+    }
+
+    @Override
+    public void onEvent(com.github.shyiko.mysql.binlog.event.Event event) {
+        EventType eventType = event.getHeader().getEventType();
+
+        switch (eventType) {
+            case TABLE_MAP:
+                handleTableMapEvent(event);
+                break;
+            case WRITE_ROWS:
+            case EXT_WRITE_ROWS:
+                handleInsertEvent(event);
+                break;
+            case UPDATE_ROWS:
+            case EXT_UPDATE_ROWS:
+                handleUpdateEvent(event);
+                break;
+            case DELETE_ROWS:
+            case EXT_DELETE_ROWS:
+                handleDeleteEvent(event);
+                break;
+        }
+    }
+
+    void handleTableMapEvent(com.github.shyiko.mysql.binlog.event.Event event) {
+        final TableMapEventData data = event.getData();
+        final TableMapEventMetadata tableMapEventMetadata = data.getEventMetadata();
+        final List<String> columnNames = tableMapEventMetadata.getColumnNames();
+        final List<String> primaryKeys = tableMapEventMetadata.getSimplePrimaryKeys().stream()
+                .map(columnNames::get)
+                .collect(Collectors.toList());
+        final TableMetadata tableMetadata = new TableMetadata(
+                data.getTable(), data.getDatabase(), columnNames, primaryKeys);
+        if (isTableOfInterest(tableMetadata.getFullTableName())) {
+            tableMetadataMap.put(data.getTableId(), tableMetadata);
+        }
+    }
+
+    void handleInsertEvent(com.github.shyiko.mysql.binlog.event.Event event) {
+        // get new row data from the event
+        LOG.debug("Handling insert event");
+        final WriteRowsEventData data = event.getData();
+        if (!tableMetadataMap.containsKey(data.getTableId())) {
+            LOG.debug("Cannot find table metadata, the event is likely not from a table of interest or the table metadata was not read");
+            return;
+        }
+        final TableMetadata tableMetadata = tableMetadataMap.get(data.getTableId());
+        final String fullTableName = tableMetadata.getFullTableName();
+        if (!isTableOfInterest(fullTableName)) 
{ + LOG.debug("The event is not from a table of interest"); + return; + } + final List columnNames = tableMetadata.getColumnNames(); + final List primaryKeys = tableMetadata.getPrimaryKeys(); + + // Construct data prepper JacksonEvent + for (final Object[] rowDataArray : data.getRows()) { + final Map rowDataMap = new HashMap<>(); + for (int i = 0; i < rowDataArray.length; i++) { + rowDataMap.put(columnNames.get(i), rowDataArray[i]); + } + + Event pipelineEvent = recordConverter.convert( + rowDataMap, tableMetadata.getDatabaseName(), tableMetadata.getTableName(), OpenSearchBulkActions.INDEX, primaryKeys, s3Prefix); + addToBuffer(new Record<>(pipelineEvent)); + } + + flushBuffer(); + } + + void handleUpdateEvent(com.github.shyiko.mysql.binlog.event.Event event) { + LOG.debug("Handling update event"); + final UpdateRowsEventData data = event.getData(); + if (!tableMetadataMap.containsKey(data.getTableId())) { + return; + } + final TableMetadata tableMetadata = tableMetadataMap.get(data.getTableId()); + final String fullTableName = tableMetadata.getFullTableName(); + if (!isTableOfInterest(fullTableName)) { + LOG.debug("The event is not from a table of interest"); + return; + } + final List columnNames = tableMetadata.getColumnNames(); + final List primaryKeys = tableMetadata.getPrimaryKeys(); + + for (Map.Entry updatedRow : data.getRows()) { + // updatedRow contains data before update as key and data after update as value + final Object[] rowData = updatedRow.getValue(); + + final Map dataMap = new HashMap<>(); + for (int i = 0; i < rowData.length; i++) { + dataMap.put(columnNames.get(i), rowData[i]); + } + + final Event pipelineEvent = recordConverter.convert( + dataMap, tableMetadata.getDatabaseName(), tableMetadata.getTableName(), OpenSearchBulkActions.INDEX, primaryKeys, s3Prefix); + addToBuffer(new Record<>(pipelineEvent)); + } + + flushBuffer(); + } + + void handleDeleteEvent(com.github.shyiko.mysql.binlog.event.Event event) { + LOG.debug("Handling delete event"); + final DeleteRowsEventData data = event.getData(); + if (!tableMetadataMap.containsKey(data.getTableId())) { + LOG.debug("Cannot find table metadata, the event is likely not from a table of interest or the table metadata was not read"); + return; + } + final TableMetadata tableMetadata = tableMetadataMap.get(data.getTableId()); + final String fullTableName = tableMetadata.getFullTableName(); + if (!isTableOfInterest(fullTableName)) { + LOG.debug("The event is not from a table of interest"); + return; + } + final List columnNames = tableMetadata.getColumnNames(); + final List primaryKeys = tableMetadata.getPrimaryKeys(); + + for (Object[] rowDataArray : data.getRows()) { + final Map rowDataMap = new HashMap<>(); + for (int i = 0; i < rowDataArray.length; i++) { + rowDataMap.put(columnNames.get(i), rowDataArray[i]); + } + + final Event pipelineEvent = recordConverter.convert( + rowDataMap, tableMetadata.getDatabaseName(), tableMetadata.getTableName(), OpenSearchBulkActions.DELETE, primaryKeys, s3Prefix); + addToBuffer(new Record<>(pipelineEvent)); + } + + flushBuffer(); + } + + private boolean isTableOfInterest(String tableName) { + return new HashSet<>(tableNames).contains(tableName); + } + + private void addToBuffer(final Record record) { + try { + bufferAccumulator.add(record); + } catch (Exception e) { + LOG.error("Failed to add event to buffer", e); + } + } + + private void flushBuffer() { + try { + bufferAccumulator.flush(); + } catch (Exception e) { + LOG.error("Failed to flush buffer", e); + } + } +} diff --git 
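Each of the insert/update/delete handlers above repeats the same core step: pair the positional row values from the binlog event with the column names captured from the earlier TABLE_MAP event, then hand the resulting map to StreamRecordConverter. The short, self-contained sketch below isolates just that pairing step for readers skimming the diff; the RowMapper class, its method name, and the sample columns are invented for illustration and are not part of this change.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: pairs positional binlog row values with column names,
// mirroring the inner loop in handleInsertEvent/handleUpdateEvent/handleDeleteEvent above.
final class RowMapper {
    static Map<String, Object> toRowMap(final List<String> columnNames, final Object[] rowValues) {
        final Map<String, Object> rowDataMap = new HashMap<>();
        for (int i = 0; i < rowValues.length; i++) {
            rowDataMap.put(columnNames.get(i), rowValues[i]);
        }
        return rowDataMap;
    }

    public static void main(String[] args) {
        // Example row from a table with columns (id, name); key order of the printed map is not guaranteed.
        System.out.println(toRowMap(List.of("id", "name"), new Object[]{1, "alice"}));
    }
}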
diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamScheduler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamScheduler.java
new file mode 100644
index 0000000000..0b42c95c38
--- /dev/null
+++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamScheduler.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.source.rds.stream;
+
+import com.github.shyiko.mysql.binlog.BinaryLogClient;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
+import org.opensearch.dataprepper.model.buffer.Buffer;
+import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.record.Record;
+import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
+import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
+import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig;
+import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.StreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+
+public class StreamScheduler implements Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StreamScheduler.class);
+
+    private static final int DEFAULT_TAKE_LEASE_INTERVAL_MILLIS = 60_000;
+
+    private final EnhancedSourceCoordinator sourceCoordinator;
+    private final RdsSourceConfig sourceConfig;
+    private final BinaryLogClient binaryLogClient;
+    private final PluginMetrics pluginMetrics;
+
+    private volatile boolean shutdownRequested = false;
+
+    public StreamScheduler(final EnhancedSourceCoordinator sourceCoordinator,
+                           final RdsSourceConfig sourceConfig,
+                           final BinaryLogClient binaryLogClient,
+                           final Buffer<Record<Event>> buffer,
+                           final PluginMetrics pluginMetrics) {
+        this.sourceCoordinator = sourceCoordinator;
+        this.sourceConfig = sourceConfig;
+        this.binaryLogClient = binaryLogClient;
+        this.binaryLogClient.registerEventListener(new BinlogEventListener(buffer, sourceConfig));
+        this.pluginMetrics = pluginMetrics;
+    }
+
+    @Override
+    public void run() {
+        LOG.debug("Start running Stream Scheduler");
+        while (!shutdownRequested && !Thread.currentThread().isInterrupted()) {
+            try {
+                final Optional<EnhancedSourcePartition> sourcePartition = sourceCoordinator.acquireAvailablePartition(StreamPartition.PARTITION_TYPE);
+                if (sourcePartition.isPresent()) {
+                    LOG.info("Acquired partition to read from stream");
+
+                    final StreamPartition streamPartition = (StreamPartition) sourcePartition.get();
+                    final StreamWorker streamWorker = StreamWorker.create(sourceCoordinator, binaryLogClient, pluginMetrics);
+                    streamWorker.processStream(streamPartition);
+                }
+
+                try {
+                    LOG.debug("Waiting to acquire stream partition.");
+                    Thread.sleep(DEFAULT_TAKE_LEASE_INTERVAL_MILLIS);
+                } catch (final InterruptedException e) {
+                    LOG.info("The StreamScheduler was interrupted while waiting to retry, stopping processing");
+                    break;
+                }
+
+            } catch (Exception e) {
+                LOG.error("Received an exception during stream processing, backing off and retrying", e);
+                try {
+                    Thread.sleep(DEFAULT_TAKE_LEASE_INTERVAL_MILLIS);
+                } catch (final InterruptedException ex) {
+                    LOG.info("The StreamScheduler was interrupted while waiting to retry, stopping processing");
+                    break;
+                }
+            }
+        }
+    }
+
+    public void shutdown() {
+        shutdownRequested = true;
+    }
+}
diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorker.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorker.java
new file mode 100644
index 0000000000..ce130d0d12
--- /dev/null
+++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorker.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.source.rds.stream;
+
+import com.github.shyiko.mysql.binlog.BinaryLogClient;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
+import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
+import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
+import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.StreamPartition;
+import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Optional;
+
+public class StreamWorker {
+    private static final Logger LOG = LoggerFactory.getLogger(StreamWorker.class);
+
+    private static final int DEFAULT_EXPORT_COMPLETE_WAIT_INTERVAL_MILLIS = 60_000;
+
+    private final EnhancedSourceCoordinator sourceCoordinator;
+    private final BinaryLogClient binaryLogClient;
+    private final PluginMetrics pluginMetrics;
+
+    StreamWorker(final EnhancedSourceCoordinator sourceCoordinator,
+                 final BinaryLogClient binaryLogClient,
+                 final PluginMetrics pluginMetrics) {
+        this.sourceCoordinator = sourceCoordinator;
+        this.binaryLogClient = binaryLogClient;
+        this.pluginMetrics = pluginMetrics;
+    }
+
+    public static StreamWorker create(final EnhancedSourceCoordinator sourceCoordinator,
+                                      final BinaryLogClient binaryLogClient,
+                                      final PluginMetrics pluginMetrics) {
+        return new StreamWorker(sourceCoordinator, binaryLogClient, pluginMetrics);
+    }
+
+    public void processStream(final StreamPartition streamPartition) {
+        // get current binlog position
+        BinlogCoordinate currentBinlogCoords = streamPartition.getProgressState().get().getCurrentPosition();
+
+        // set start of binlog stream to current position if exists
+        if (currentBinlogCoords != null) {
+            final String binlogFilename = currentBinlogCoords.getBinlogFilename();
+            final long binlogPosition = currentBinlogCoords.getBinlogPosition();
+            LOG.debug("Will start binlog stream from binlog file {} and position {}.", binlogFilename, binlogPosition);
+            binaryLogClient.setBinlogFilename(binlogFilename);
+            binaryLogClient.setBinlogPosition(binlogPosition);
+        }
+
+        while (shouldWaitForExport(streamPartition) && !Thread.currentThread().isInterrupted()) {
+            LOG.info("Initial load not completed yet for {}, waiting...", streamPartition.getPartitionKey());
+            try {
+                Thread.sleep(DEFAULT_EXPORT_COMPLETE_WAIT_INTERVAL_MILLIS);
+            } catch (final InterruptedException ex) {
+                LOG.info("The StreamScheduler was interrupted while waiting to retry, stopping processing");
+                Thread.currentThread().interrupt();
+                break;
+            }
+        }
+
+        try {
+            LOG.info("Connecting to binary log stream.");
+            binaryLogClient.connect();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                binaryLogClient.disconnect();
+            } catch (IOException e) {
+                LOG.error("Binary log client failed to disconnect.", e);
+            }
+        }
+    }
+
+    private boolean
shouldWaitForExport(final StreamPartition streamPartition) { + if (!streamPartition.getProgressState().get().shouldWaitForExport()) { + LOG.debug("Export is not enabled. Proceed with streaming."); + return false; + } + + return !isExportDone(streamPartition); + } + + private boolean isExportDone(StreamPartition streamPartition) { + final String dbIdentifier = streamPartition.getPartitionKey(); + Optional globalStatePartition = sourceCoordinator.getPartition("stream-for-" + dbIdentifier); + return globalStatePartition.isPresent(); + } +} diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/converter/S3PartitionCreatorTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/converter/S3PartitionCreatorTest.java new file mode 100644 index 0000000000..1018d90865 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/converter/S3PartitionCreatorTest.java @@ -0,0 +1,34 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.converter; + +import org.junit.jupiter.api.Test; + +import java.util.HashSet; +import java.util.List; +import java.util.Random; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + + +class S3PartitionCreatorTest { + + @Test + void test_createPartition_create_correct_number_of_distinct_partition_strings() { + final int partitionCount = new Random().nextInt(10) + 1; + final S3PartitionCreator s3PartitionCreator = createObjectUnderTest(partitionCount); + + final List partitions = s3PartitionCreator.createPartitions(); + + assertThat(partitions.size(), is(partitionCount)); + assertThat(new HashSet<>(partitions).size(), is(partitionCount)); + } + + private S3PartitionCreator createObjectUnderTest(final int partitionCount) { + return new S3PartitionCreator(partitionCount); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/converter/StreamRecordConverterTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/converter/StreamRecordConverterTest.java new file mode 100644 index 0000000000..8ccecc3ff1 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/converter/StreamRecordConverterTest.java @@ -0,0 +1,60 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.converter; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions; + +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.UUID; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.startsWith; +import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_DATABASE_NAME_METADATA_ATTRIBUTE; +import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE; +import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_S3_PARTITION_KEY; +import static 
org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_TABLE_NAME_METADATA_ATTRIBUTE; +import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE; +import static org.opensearch.dataprepper.plugins.source.rds.converter.StreamRecordConverter.S3_PATH_DELIMITER; + + +class StreamRecordConverterTest { + + private StreamRecordConverter streamRecordConverter; + + @BeforeEach + void setUp() { + streamRecordConverter = createObjectUnderTest(); + } + + @Test + void test_convert_returns_expected_event() { + Map rowData = Map.of("key1", "value1", "key2", "value2"); + final String databaseName = UUID.randomUUID().toString(); + final String tableName = UUID.randomUUID().toString(); + final OpenSearchBulkActions bulkAction = OpenSearchBulkActions.INDEX; + final List primaryKeys = List.of("key1"); + final String s3Prefix = UUID.randomUUID().toString(); + + Event event = streamRecordConverter.convert(rowData, databaseName, tableName, bulkAction, primaryKeys, s3Prefix); + + assertThat(event.toMap(), is(rowData)); + assertThat(event.getMetadata().getAttribute(EVENT_DATABASE_NAME_METADATA_ATTRIBUTE), is(databaseName)); + assertThat(event.getMetadata().getAttribute(EVENT_TABLE_NAME_METADATA_ATTRIBUTE), is(tableName)); + assertThat(event.getMetadata().getAttribute(EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE), is(bulkAction.toString())); + assertThat(event.getMetadata().getAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE), is("value1")); + assertThat(event.getMetadata().getAttribute(EVENT_S3_PARTITION_KEY).toString(), startsWith(s3Prefix + S3_PATH_DELIMITER)); + } + + private StreamRecordConverter createObjectUnderTest() { + return new StreamRecordConverter(new Random().nextInt(1000) + 1); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientFactoryTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientFactoryTest.java new file mode 100644 index 0000000000..52ba312b5a --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientFactoryTest.java @@ -0,0 +1,66 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.stream; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig; +import software.amazon.awssdk.services.rds.RdsClient; +import software.amazon.awssdk.services.rds.model.DBInstance; +import software.amazon.awssdk.services.rds.model.DescribeDbInstancesRequest; +import software.amazon.awssdk.services.rds.model.DescribeDbInstancesResponse; + +import java.util.List; +import java.util.Random; +import java.util.UUID; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class BinlogClientFactoryTest { + + @Mock + private RdsSourceConfig sourceConfig; + + @Mock + private RdsClient rdsClient; + + private BinlogClientFactory binlogClientFactory; + private Random random; + + @BeforeEach + 
void setUp() { + binlogClientFactory = createBinlogClientFactory(); + random = new Random(); + } + + @Test + void test_create() { + DescribeDbInstancesResponse describeDbInstancesResponse = mock(DescribeDbInstancesResponse.class); + DBInstance dbInstance = mock(DBInstance.class, RETURNS_DEEP_STUBS); + final String address = UUID.randomUUID().toString(); + final Integer port = random.nextInt(); + when(dbInstance.endpoint().address()).thenReturn(address); + when(dbInstance.endpoint().port()).thenReturn(port); + when(describeDbInstancesResponse.dbInstances()).thenReturn(List.of(dbInstance)); + when(sourceConfig.getDbIdentifier()).thenReturn(UUID.randomUUID().toString()); + when(rdsClient.describeDBInstances(any(DescribeDbInstancesRequest.class))).thenReturn(describeDbInstancesResponse); + RdsSourceConfig.AuthenticationConfig authenticationConfig = mock(RdsSourceConfig.AuthenticationConfig.class); + when(sourceConfig.getAuthenticationConfig()).thenReturn(authenticationConfig); + + binlogClientFactory.create(); + } + + private BinlogClientFactory createBinlogClientFactory() { + return new BinlogClientFactory(sourceConfig, rdsClient); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java new file mode 100644 index 0000000000..406a89cec9 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java @@ -0,0 +1,92 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.stream; + +import com.github.shyiko.mysql.binlog.event.EventType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.mockito.Answers; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig; + +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class BinlogEventListenerTest { + + @Mock + private Buffer> buffer; + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private RdsSourceConfig sourceConfig; + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private com.github.shyiko.mysql.binlog.event.Event binlogEvent; + + private static BinlogEventListener objectUnderTest; + + @BeforeEach + void setUp() { + objectUnderTest = spy(createObjectUnderTest()); + } + + @Test + void test_given_TableMap_event_then_calls_correct_handler() { + when(binlogEvent.getHeader().getEventType()).thenReturn(EventType.TABLE_MAP); + doNothing().when(objectUnderTest).handleTableMapEvent(binlogEvent); + + objectUnderTest.onEvent(binlogEvent); + + verify(objectUnderTest).handleTableMapEvent(binlogEvent); + } + + @ParameterizedTest + @EnumSource(names = {"WRITE_ROWS", "EXT_WRITE_ROWS"}) + void 
test_given_WriteRows_event_then_calls_correct_handler(EventType eventType) { + when(binlogEvent.getHeader().getEventType()).thenReturn(eventType); + doNothing().when(objectUnderTest).handleInsertEvent(binlogEvent); + + objectUnderTest.onEvent(binlogEvent); + + verify(objectUnderTest).handleInsertEvent(binlogEvent); + } + + @ParameterizedTest + @EnumSource(names = {"UPDATE_ROWS", "EXT_UPDATE_ROWS"}) + void test_given_UpdateRows_event_then_calls_correct_handler(EventType eventType) { + when(binlogEvent.getHeader().getEventType()).thenReturn(eventType); + doNothing().when(objectUnderTest).handleUpdateEvent(binlogEvent); + + objectUnderTest.onEvent(binlogEvent); + + verify(objectUnderTest).handleUpdateEvent(binlogEvent); + } + + @ParameterizedTest + @EnumSource(names = {"DELETE_ROWS", "EXT_DELETE_ROWS"}) + void test_given_DeleteRows_event_then_calls_correct_handler(EventType eventType) { + when(binlogEvent.getHeader().getEventType()).thenReturn(eventType); + doNothing().when(objectUnderTest).handleDeleteEvent(binlogEvent); + + objectUnderTest.onEvent(binlogEvent); + + verify(objectUnderTest).handleDeleteEvent(binlogEvent); + } + + private BinlogEventListener createObjectUnderTest() { + return new BinlogEventListener(buffer, sourceConfig); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamSchedulerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamSchedulerTest.java new file mode 100644 index 0000000000..1a152137ee --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamSchedulerTest.java @@ -0,0 +1,116 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.stream; + +import com.github.shyiko.mysql.binlog.BinaryLogClient; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Answers; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.StreamPartition; + +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.awaitility.Awaitility.await; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class StreamSchedulerTest { + + @Mock + private EnhancedSourceCoordinator sourceCoordinator; + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private RdsSourceConfig sourceConfig; + + @Mock + private BinaryLogClient 
binaryLogClient; + + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private Buffer> buffer; + + private StreamScheduler objectUnderTest; + + @BeforeEach + void setUp() { + objectUnderTest = createObjectUnderTest(); + } + + @Test + void test_given_no_stream_partition_then_no_stream_actions() throws InterruptedException { + when(sourceCoordinator.acquireAvailablePartition(StreamPartition.PARTITION_TYPE)).thenReturn(Optional.empty()); + + final ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(objectUnderTest); + await().atMost(Duration.ofSeconds(1)) + .untilAsserted(() -> verify(sourceCoordinator).acquireAvailablePartition(StreamPartition.PARTITION_TYPE)); + Thread.sleep(100); + executorService.shutdownNow(); + + verify(binaryLogClient).registerEventListener(any(BinlogEventListener.class)); + verifyNoMoreInteractions(binaryLogClient); + } + + @Test + void test_given_stream_partition_then_start_stream() throws InterruptedException { + final StreamPartition streamPartition = mock(StreamPartition.class); + when(sourceCoordinator.acquireAvailablePartition(StreamPartition.PARTITION_TYPE)).thenReturn(Optional.of(streamPartition)); + + StreamWorker streamWorker = mock(StreamWorker.class); + doNothing().when(streamWorker).processStream(streamPartition); + final ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(() -> { + try (MockedStatic streamWorkerMockedStatic = mockStatic(StreamWorker.class)) { + streamWorkerMockedStatic.when(() -> StreamWorker.create(sourceCoordinator, binaryLogClient, pluginMetrics)) + .thenReturn(streamWorker); + objectUnderTest.run(); + } + + }); + await().atMost(Duration.ofSeconds(1)) + .untilAsserted(() -> verify(sourceCoordinator).acquireAvailablePartition(StreamPartition.PARTITION_TYPE)); + Thread.sleep(100); + executorService.shutdownNow(); + + verify(streamWorker).processStream(streamPartition); + } + + @Test + void test_shutdown() { + lenient().when(sourceCoordinator.acquireAvailablePartition(StreamPartition.PARTITION_TYPE)).thenReturn(Optional.empty()); + + final ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(objectUnderTest); + objectUnderTest.shutdown(); + verifyNoMoreInteractions(sourceCoordinator); + executorService.shutdownNow(); + } + + private StreamScheduler createObjectUnderTest() { + return new StreamScheduler(sourceCoordinator, sourceConfig, binaryLogClient, buffer, pluginMetrics); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorkerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorkerTest.java new file mode 100644 index 0000000000..4dd3930466 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorkerTest.java @@ -0,0 +1,85 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.stream; + +import com.github.shyiko.mysql.binlog.BinaryLogClient; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import 
org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.StreamPartition; +import org.opensearch.dataprepper.plugins.source.rds.coordination.state.StreamProgressState; +import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate; + +import java.io.IOException; +import java.util.Optional; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class StreamWorkerTest { + + @Mock + private EnhancedSourceCoordinator sourceCoordinator; + + @Mock + private BinaryLogClient binaryLogClient; + + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private StreamPartition streamPartition; + + private StreamWorker streamWorker; + + @BeforeEach + void setUp() { + streamWorker = createObjectUnderTest(); + } + + @Test + void test_processStream_with_given_binlog_coordinates() throws IOException { + StreamProgressState streamProgressState = mock(StreamProgressState.class); + when(streamPartition.getProgressState()).thenReturn(Optional.of(streamProgressState)); + final String binlogFilename = "binlog-001"; + final Long binlogPosition = 100L; + when(streamProgressState.getCurrentPosition()).thenReturn(new BinlogCoordinate(binlogFilename, binlogPosition)); + when(streamProgressState.shouldWaitForExport()).thenReturn(false); + + streamWorker.processStream(streamPartition); + + verify(binaryLogClient).setBinlogFilename(binlogFilename); + verify(binaryLogClient).setBinlogPosition(binlogPosition); + verify(binaryLogClient).connect(); + } + + @Test + void test_processStream_without_current_binlog_coordinates() throws IOException { + StreamProgressState streamProgressState = mock(StreamProgressState.class); + when(streamPartition.getProgressState()).thenReturn(Optional.of(streamProgressState)); + final String binlogFilename = "binlog-001"; + final Long binlogPosition = 100L; + when(streamProgressState.getCurrentPosition()).thenReturn(null); + when(streamProgressState.shouldWaitForExport()).thenReturn(false); + + streamWorker.processStream(streamPartition); + + verify(binaryLogClient, never()).setBinlogFilename(binlogFilename); + verify(binaryLogClient, never()).setBinlogPosition(binlogPosition); + verify(binaryLogClient).connect(); + } + + private StreamWorker createObjectUnderTest() { + return new StreamWorker(sourceCoordinator, binaryLogClient, pluginMetrics); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java index 2b1d6b3e49..3cddb1a2e8 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java @@ -152,6 +152,7 @@ public void run() { } for (String partitionKey: partitionKeys) { + LOG.debug("Scan object worker is stopped, giving up partitions."); sourceCoordinator.giveUpPartition(partitionKey); } } @@ -203,6 +204,7 @@ private void startProcessingObject(final long waitTimeMillis) { deleteObjectsForPartition.forEach(s3ObjectDeleteWorker::deleteS3Object); objectsToDeleteForAcknowledgmentSets.remove(objectToProcess.get().getPartitionKey()); 
} else { + LOG.debug("Did not receive positive acknowledgement, giving up partition."); sourceCoordinator.giveUpPartition(objectToProcess.get().getPartitionKey()); } partitionKeys.remove(objectToProcess.get().getPartitionKey()); @@ -268,7 +270,7 @@ private void processFolderPartition(final SourcePartition sourceCoordinator.deletePartition(folderPartition.getPartitionKey()); return; } - + LOG.debug("No objects to process, giving up partition"); sourceCoordinator.giveUpPartition(folderPartition.getPartitionKey(), Instant.now()); return; } diff --git a/data-prepper-plugins/split-event-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessorConfig.java b/data-prepper-plugins/split-event-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessorConfig.java index c4af96a3d4..db70e3c6db 100644 --- a/data-prepper-plugins/split-event-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessorConfig.java +++ b/data-prepper-plugins/split-event-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/splitevent/SplitEventProcessorConfig.java @@ -11,6 +11,7 @@ package org.opensearch.dataprepper.plugins.processor.splitevent; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotNull; import jakarta.validation.constraints.Size; @@ -20,12 +21,15 @@ public class SplitEventProcessorConfig { @NotEmpty @NotNull @JsonProperty("field") + @JsonPropertyDescription("The event field to be split") private String field; @JsonProperty("delimiter_regex") + @JsonPropertyDescription("The regular expression used as the delimiter for splitting the field") private String delimiterRegex; @Size(min = 1, max = 1) + @JsonPropertyDescription("The delimiter used for splitting the field. If not specified, the default delimiter is used") private String delimiter; public String getField() { diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index a4413138c9..b82aa23a4f 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-8.8-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip networkTimeout=10000 validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME diff --git a/gradlew b/gradlew index b740cf1339..1aa94a4269 100755 --- a/gradlew +++ b/gradlew @@ -55,7 +55,7 @@ # Darwin, MinGW, and NonStop. # # (3) This script is generated from the Groovy template -# https://github.com/gradle/gradle/blob/HEAD/platforms/jvm/plugins-application/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt # within the Gradle project. # # You can find Gradle at https://github.com/gradle/gradle/.
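A note on the split_event settings annotated above: Data Prepper binds pipeline configuration onto classes like SplitEventProcessorConfig through Jackson, which is why the @JsonProperty names (field, delimiter_regex) and the new @JsonPropertyDescription text matter. The sketch below shows that binding in isolation with a plain ObjectMapper; it is a minimal illustration that bypasses Data Prepper's plugin framework and its bean validation (@NotEmpty, @NotNull, @Size), and the SplitEventConfigExample class and sample values are invented for the example.

import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Map;

public class SplitEventConfigExample {
    public static void main(String[] args) {
        // Jackson maps the "field" and "delimiter_regex" keys onto the annotated private fields.
        final ObjectMapper objectMapper = new ObjectMapper();
        final Map<String, Object> settings = Map.of(
                "field", "message",
                "delimiter_regex", "\\s+");

        final SplitEventProcessorConfig config =
                objectMapper.convertValue(settings, SplitEventProcessorConfig.class);

        System.out.println(config.getField()); // prints: message
        // Validation constraints are enforced separately by the plugin framework, not by convertValue().
    }
}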