diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/ConnectionConfiguration.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/ConnectionConfiguration.java index 5f84f24943..5fdfb0a2ef 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/ConnectionConfiguration.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/ConnectionConfiguration.java @@ -28,10 +28,9 @@ import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; import org.opensearch.dataprepper.aws.api.AwsRequestSigningApache4Interceptor; -import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.PreSerializedJsonpMapper; -import org.opensearch.dataprepper.plugins.source.opensearch.AuthConfig; import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.AwsAuthenticationConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.AuthConfig; import org.opensearch.dataprepper.plugins.source.opensearch.configuration.ServerlessOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java index 6e4c675820..172f17c4b4 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java @@ -10,16 +10,15 @@ import org.apache.commons.lang3.EnumUtils; import org.opensearch.client.opensearch._types.VersionType; import org.opensearch.dataprepper.expression.ExpressionEvaluator; -import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.plugins.sink.opensearch.DistributionVersion; import org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSinkConfig; import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.ActionConfiguration; +import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.AwsAuthenticationConfiguration; import org.opensearch.dataprepper.plugins.sink.opensearch.s3.FileReader; import org.opensearch.dataprepper.plugins.sink.opensearch.s3.S3ClientProvider; import org.opensearch.dataprepper.plugins.sink.opensearch.s3.S3FileReader; -import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.AwsAuthenticationConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.arns.Arn; diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresherTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresherTest.java index 34658ae4de..9bbed3a456 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresherTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresherTest.java @@ -9,7 +9,6 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.client.opensearch.OpenSearchClient; import org.opensearch.dataprepper.metrics.PluginMetrics; -import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.plugins.source.opensearch.AuthConfig; import java.util.function.Function; diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java index 31b77e0bf3..05859535c3 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java @@ -22,6 +22,7 @@ import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.metrics.MetricNames; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.configuration.PipelineDescription; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; @@ -41,10 +42,10 @@ import org.opensearch.dataprepper.plugins.sink.opensearch.index.TemplateType; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.UUID; -import java.util.Collections; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; @@ -53,12 +54,12 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockConstruction; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.mockito.Mockito.lenient; import static org.opensearch.dataprepper.model.sink.SinkLatencyMetrics.EXTERNAL_LATENCY; import static org.opensearch.dataprepper.model.sink.SinkLatencyMetrics.INTERNAL_LATENCY; import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.BULKREQUEST_ERRORS; @@ -79,9 +80,6 @@ public class OpenSearchSinkTest { @Mock private OpenSearchClient openSearchClient; - @Mock - private PluginFactory pluginFactory; - @Mock private SinkContext sinkContext; @@ -97,6 +95,9 @@ public class OpenSearchSinkTest { @Mock private OpenSearchSinkConfiguration openSearchSinkConfiguration; + @Mock + private PipelineDescription pipelineDescription; + @Mock private IndexConfiguration indexConfiguration; @@ -109,6 +110,9 @@ public class OpenSearchSinkTest { @Mock private Timer bulkRequestTimer; + @Mock + private OpenSearchSinkConfig openSearchSinkConfig; + @Mock private Counter bulkRequestErrorsCounter; @@ -129,8 +133,7 @@ public class OpenSearchSinkTest { @BeforeEach void setup() { - when(pluginSetting.getPipelineName()).thenReturn(UUID.randomUUID().toString()); - when(pluginSetting.getName()).thenReturn(UUID.randomUUID().toString()); + when(pipelineDescription.getPipelineName()).thenReturn(UUID.randomUUID().toString()); final RetryConfiguration retryConfiguration = mock(RetryConfiguration.class); when(retryConfiguration.getDlq()).thenReturn(Optional.empty()); @@ -146,7 +149,6 @@ void setup() { when(indexConfiguration.getDocumentIdField()).thenReturn(null); when(indexConfiguration.getRoutingField()).thenReturn(null); when(indexConfiguration.getRouting()).thenReturn(null); - when(indexConfiguration.getPipeline()).thenReturn(null); when(indexConfiguration.getActions()).thenReturn(null); when(indexConfiguration.getDocumentRootKey()).thenReturn(null); lenient().when(indexConfiguration.getVersionType()).thenReturn(null); @@ -184,10 +186,10 @@ private OpenSearchSink createObjectUnderTest() throws IOException { indexManagerFactory = mock; })) { pluginMetricsMockedStatic.when(() -> PluginMetrics.fromPluginSetting(pluginSetting)).thenReturn(pluginMetrics); - openSearchSinkConfigurationMockedStatic.when(() -> OpenSearchSinkConfiguration.readESConfig(pluginSetting, expressionEvaluator)) + openSearchSinkConfigurationMockedStatic.when(() -> OpenSearchSinkConfiguration.readOSConfig(openSearchSinkConfig, expressionEvaluator)) .thenReturn(openSearchSinkConfiguration); return new OpenSearchSink( - pluginSetting, pluginFactory, sinkContext, expressionEvaluator, awsCredentialsSupplier, pluginConfigObservable); + pluginSetting, sinkContext, expressionEvaluator, awsCredentialsSupplier, pipelineDescription, pluginConfigObservable, openSearchSinkConfig); } } @@ -204,7 +206,7 @@ void test_initialization() throws IOException { @Test void doOutput_with_invalid_version_expression_catches_NumberFormatException_and_creates_DLQObject() throws IOException { - + when(pluginSetting.getName()).thenReturn("opensearch"); final String versionExpression = UUID.randomUUID().toString(); when(indexConfiguration.getVersionExpression()).thenReturn(versionExpression); @@ -214,6 +216,7 @@ void doOutput_with_invalid_version_expression_catches_NumberFormatException_and_ final EventHandle eventHandle = mock(EventHandle.class); when(event.getEventHandle()).thenReturn(eventHandle); final String index = UUID.randomUUID().toString(); + when(event.formatString(pipelineDescription.getPipelineName(), expressionEvaluator)).thenReturn(null); when(event.formatString(versionExpression, expressionEvaluator)).thenReturn("not_a_number"); when(event.formatString(indexConfiguration.getIndexAlias(), expressionEvaluator)).thenReturn(index); final Record eventRecord = new Record<>(event); @@ -234,7 +237,7 @@ void doOutput_with_invalid_version_expression_catches_NumberFormatException_and_ when(dlqObjectBuilder.withFailedData(failedDlqData.capture())).thenReturn(dlqObjectBuilder); when(dlqObjectBuilder.withPluginName(pluginSetting.getName())).thenReturn(dlqObjectBuilder); when(dlqObjectBuilder.withPluginId(pluginSetting.getName())).thenReturn(dlqObjectBuilder); - when(dlqObjectBuilder.withPipelineName(pluginSetting.getPipelineName())).thenReturn(dlqObjectBuilder); + when(dlqObjectBuilder.withPipelineName(pipelineDescription.getPipelineName())).thenReturn(dlqObjectBuilder); when(dlqObject.getFailedData()).thenReturn(mock(FailedDlqData.class)); doNothing().when(dlqObject).releaseEventHandle(false); @@ -294,14 +297,16 @@ void test_routing_in_document() throws IOException { void test_pipeline_in_document() throws IOException { String pipelineValue = UUID.randomUUID().toString(); String pipelineKey = UUID.randomUUID().toString(); + String pipelineName = UUID.randomUUID().toString(); + when(pipelineDescription.getPipelineName()).thenReturn(pipelineName); final OpenSearchSink objectUnderTest = createObjectUnderTest(); final Event event = JacksonEvent.builder() .withEventType("event") .withData(Collections.singletonMap(pipelineKey, pipelineValue)) .build(); - assertThat(objectUnderTest.getDocument(event).getPipelineField(), equalTo(Optional.empty())); + assertThat(objectUnderTest.getDocument(event).getPipelineField(), equalTo(Optional.of(pipelineName))); - when(indexConfiguration.getPipeline()).thenReturn("${"+pipelineKey+"}"); + when(pipelineDescription.getPipelineName()).thenReturn("${"+pipelineKey+"}"); final OpenSearchSink objectUnderTest2 = createObjectUnderTest(); assertThat(objectUnderTest2.getDocument(event).getPipelineField(), equalTo(Optional.of(pipelineValue))); } @@ -309,6 +314,7 @@ void test_pipeline_in_document() throws IOException { @Test void doOutput_with_invalid_version_expression_result_catches_RuntimeException_and_creates_DLQObject() throws IOException { + when(pluginSetting.getName()).thenReturn("opensearch"); final String versionExpression = UUID.randomUUID().toString(); when(indexConfiguration.getVersionExpression()).thenReturn(versionExpression); @@ -318,6 +324,7 @@ void doOutput_with_invalid_version_expression_result_catches_RuntimeException_an final EventHandle eventHandle = mock(EventHandle.class); when(event.getEventHandle()).thenReturn(eventHandle); final String index = UUID.randomUUID().toString(); + when(event.formatString(pipelineDescription.getPipelineName(), expressionEvaluator)).thenReturn(null); when(event.formatString(versionExpression, expressionEvaluator)).thenThrow(RuntimeException.class); when(event.formatString(indexConfiguration.getIndexAlias(), expressionEvaluator)).thenReturn(index); final Record eventRecord = new Record<>(event); @@ -338,7 +345,7 @@ void doOutput_with_invalid_version_expression_result_catches_RuntimeException_an when(dlqObjectBuilder.withFailedData(failedDlqData.capture())).thenReturn(dlqObjectBuilder); when(dlqObjectBuilder.withPluginName(pluginSetting.getName())).thenReturn(dlqObjectBuilder); when(dlqObjectBuilder.withPluginId(pluginSetting.getName())).thenReturn(dlqObjectBuilder); - when(dlqObjectBuilder.withPipelineName(pluginSetting.getPipelineName())).thenReturn(dlqObjectBuilder); + when(dlqObjectBuilder.withPipelineName(pipelineDescription.getPipelineName())).thenReturn(dlqObjectBuilder); when(dlqObject.getFailedData()).thenReturn(mock(FailedDlqData.class)); doNothing().when(dlqObject).releaseEventHandle(false); diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java index 481ff9eaa7..3e61ba55c0 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java @@ -14,7 +14,6 @@ import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.ValueSource; import org.opensearch.dataprepper.expression.ExpressionEvaluator; -import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.plugins.sink.opensearch.DistributionVersion; import org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSinkConfig; @@ -53,9 +52,7 @@ import static org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration.DISTRIBUTION_VERSION; import static org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration.DOCUMENT_ROOT_KEY; import static org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration.DOCUMENT_VERSION_EXPRESSION; -import static org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration.PIPELINE; import static org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration.ROUTING; -import static org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration.ROUTING_FIELD; import static org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration.SERVERLESS; import static org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration.TEMPLATE_TYPE; import static org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConstants.RAW_DEFAULT_TEMPLATE_FILE;