diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java index 5b463c793f..ed5e8dfe03 100644 --- a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java @@ -38,6 +38,7 @@ import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.metrics.MetricNames; import org.opensearch.dataprepper.metrics.MetricsTestUtil; +import org.opensearch.dataprepper.model.configuration.PipelineDescription; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventType; @@ -129,6 +130,8 @@ public class OpenSearchSinkIT { private SinkContext sinkContext; private String testTagsTargetKey; + ObjectMapper objectMapper; + @Mock private PluginFactory pluginFactory; @@ -138,24 +141,33 @@ public class OpenSearchSinkIT { @Mock private ExpressionEvaluator expressionEvaluator; + @Mock + private OpenSearchSinkConfig openSearchSinkConfig; + + @Mock + private PipelineDescription pipelineDescription; + + @Mock + private PluginSetting pluginSetting; + @Mock private PluginConfigObservable pluginConfigObservable; - public OpenSearchSink createObjectUnderTest(PluginSetting pluginSetting, boolean doInitialize) { + public OpenSearchSink createObjectUnderTest(OpenSearchSinkConfig openSearchSinkConfig, boolean doInitialize) { OpenSearchSink sink = new OpenSearchSink( - pluginSetting, pluginFactory, null, expressionEvaluator, awsCredentialsSupplier, pluginConfigObservable); + pluginSetting, null, expressionEvaluator, awsCredentialsSupplier, pipelineDescription, pluginConfigObservable, openSearchSinkConfig); if (doInitialize) { sink.doInitialize(); } return sink; } - public OpenSearchSink createObjectUnderTestWithSinkContext(PluginSetting pluginSetting, boolean doInitialize) { + public OpenSearchSink createObjectUnderTestWithSinkContext(OpenSearchSinkConfig openSearchSinkConfig, boolean doInitialize) { sinkContext = mock(SinkContext.class); testTagsTargetKey = RandomStringUtils.randomAlphabetic(5); when(sinkContext.getTagsTargetKey()).thenReturn(testTagsTargetKey); OpenSearchSink sink = new OpenSearchSink( - pluginSetting, pluginFactory, sinkContext, expressionEvaluator, awsCredentialsSupplier, pluginConfigObservable); + pluginSetting, null, expressionEvaluator, awsCredentialsSupplier, pipelineDescription, pluginConfigObservable, openSearchSinkConfig); if (doInitialize) { sink.doInitialize(); } @@ -187,8 +199,8 @@ public void cleanOpenSearch() throws Exception { @Test @DisabledIf(value = "isES6", disabledReason = TRACE_INGESTION_TEST_DISABLED_REASON) public void testInstantiateSinkRawSpanDefault() throws IOException { - final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null); - OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null); + OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW); assertThat(indexAlias, equalTo("otel-v1-apm-span")); Request request = new Request(HttpMethod.HEAD, indexAlias); @@ -215,7 +227,7 @@ public void testInstantiateSinkRawSpanDefault() throws IOException { assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK)); // Instantiate sink again - sink = createObjectUnderTest(pluginSetting, true); + sink = createObjectUnderTest(openSearchSinkConfig, true); // Make sure no new write index *-000001 is created under alias final String rolloverIndexName = String.format("%s-000002", indexAlias); request = new Request(HttpMethod.GET, rolloverIndexName + "/_alias"); @@ -232,8 +244,8 @@ public void testInstantiateSinkRawSpanDefault() throws IOException { @Test @DisabledIf(value = "isES6", disabledReason = LOG_INGESTION_TEST_DISABLED_REASON) public void testInstantiateSinkLogsDefaultLogSink() throws IOException { - final PluginSetting pluginSetting = generatePluginSetting(IndexType.LOG_ANALYTICS.getValue(), null, null); - OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(IndexType.LOG_ANALYTICS.getValue(), null, null); + OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.LOG_ANALYTICS); assertThat(indexAlias, equalTo("logs-otel-v1")); Request request = new Request(HttpMethod.HEAD, indexAlias); @@ -260,7 +272,7 @@ public void testInstantiateSinkLogsDefaultLogSink() throws IOException { assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK)); // Instantiate sink again - sink = createObjectUnderTest(pluginSetting, true); + sink = createObjectUnderTest(openSearchSinkConfig, true); // Make sure no new write index *-000001 is created under alias final String rolloverIndexName = String.format("%s-000002", indexAlias); request = new Request(HttpMethod.GET, rolloverIndexName + "/_alias"); @@ -277,8 +289,8 @@ public void testInstantiateSinkLogsDefaultLogSink() throws IOException { @Test @DisabledIf(value = "isES6", disabledReason = METRIC_INGESTION_TEST_DISABLED_REASON) public void testInstantiateSinkMetricsDefaultMetricSink() throws IOException { - final PluginSetting pluginSetting = generatePluginSetting(IndexType.METRIC_ANALYTICS.getValue(), null, null); - OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(IndexType.METRIC_ANALYTICS.getValue(), null, null); + OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.METRIC_ANALYTICS); assertThat(indexAlias, equalTo("metrics-otel-v1")); Request request = new Request(HttpMethod.HEAD, indexAlias); @@ -305,7 +317,7 @@ public void testInstantiateSinkMetricsDefaultMetricSink() throws IOException { assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK)); // Instantiate sink again - sink = createObjectUnderTest(pluginSetting, true); + sink = createObjectUnderTest(openSearchSinkConfig, true); // Make sure no new write index *-000001 is created under alias final String rolloverIndexName = String.format("%s-000002", indexAlias); request = new Request(HttpMethod.GET, rolloverIndexName + "/_alias"); @@ -326,8 +338,8 @@ public void testInstantiateSinkRawSpanReservedAliasAlreadyUsedAsIndex() throws I final String reservedIndexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW); final Request request = new Request(HttpMethod.PUT, reservedIndexAlias); client.performRequest(request); - final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null); - OpenSearchSink sink = createObjectUnderTest(pluginSetting, false); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null); + OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, false); Assert.assertThrows(String.format(AbstractIndexManager.INDEX_ALIAS_USED_AS_INDEX_ERROR, reservedIndexAlias), RuntimeException.class, () -> sink.doInitialize()); } @@ -344,9 +356,9 @@ public void testOutputRawSpanDefault(final boolean estimateBulkSizeUsingCompress @SuppressWarnings("unchecked") final Map expData2 = mapper.readValue(testDoc2, Map.class); final List> testRecords = Arrays.asList(jsonStringToRecord(testDoc1), jsonStringToRecord(testDoc2)); - final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null, + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null, estimateBulkSizeUsingCompression, isRequestCompressionEnabled); - final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + final OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); sink.output(testRecords); final String expIndexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW); @@ -413,7 +425,7 @@ public void testOutputRawSpanWithDLQ(final boolean estimateBulkSizeUsingCompress @SuppressWarnings("unchecked") final Map expData = mapper.readValue(testDoc2, Map.class); final List> testRecords = Arrays.asList(jsonStringToRecord(testDoc1), jsonStringToRecord(testDoc2)); - final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null, + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null, estimateBulkSizeUsingCompression, isRequestCompressionEnabled); // generate temporary directory for dlq file final File tempDirectory = Files.createTempDirectory("").toFile(); @@ -421,7 +433,7 @@ public void testOutputRawSpanWithDLQ(final boolean estimateBulkSizeUsingCompress final String expDLQFile = tempDirectory.getAbsolutePath() + "/test-dlq.txt"; pluginSetting.getSettings().put(RetryConfiguration.DLQ_FILE, expDLQFile); - final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + final OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); sink.output(testRecords); sink.shutdown(); @@ -466,8 +478,8 @@ public void testOutputRawSpanWithDLQ(final boolean estimateBulkSizeUsingCompress @Test @DisabledIf(value = "isES6", disabledReason = TRACE_INGESTION_TEST_DISABLED_REASON) public void testInstantiateSinkServiceMapDefault() throws IOException { - final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_SERVICE_MAP.getValue(), null, null); - final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(IndexType.TRACE_ANALYTICS_SERVICE_MAP.getValue(), null, null); + final OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_SERVICE_MAP); final Request request = new Request(HttpMethod.HEAD, indexAlias); final Response response = client.performRequest(request); @@ -492,9 +504,9 @@ public void testOutputServiceMapDefault(final boolean estimateBulkSizeUsingCompr @SuppressWarnings("unchecked") final Map expData = mapper.readValue(testDoc, Map.class); final List> testRecords = Collections.singletonList(jsonStringToRecord(testDoc)); - final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_SERVICE_MAP.getValue(), null, null, + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(IndexType.TRACE_ANALYTICS_SERVICE_MAP.getValue(), null, null, estimateBulkSizeUsingCompression, isRequestCompressionEnabled); - OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); sink.output(testRecords); final String expIndexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_SERVICE_MAP); final List> retSources = getSearchResponseDocSources(expIndexAlias); @@ -524,7 +536,7 @@ public void testOutputServiceMapDefault(final boolean estimateBulkSizeUsingCompr assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(expectedBulkRequestSizeBytes, 0)); // Check restart for index already exists - sink = createObjectUnderTest(pluginSetting, true); + sink = createObjectUnderTest(openSearchSinkConfig, true); sink.shutdown(); } @@ -533,8 +545,8 @@ public void testInstantiateSinkCustomIndex_NoRollOver() throws IOException { final String testIndexAlias = "test-alias"; final String testTemplateFile = Objects.requireNonNull( getClass().getClassLoader().getResource(TEST_TEMPLATE_V1_FILE)).getFile(); - final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, testTemplateFile); - OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(null, testIndexAlias, testTemplateFile); + OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); final String extraURI = DeclaredOpenSearchVersion.OPENDISTRO_0_10.compareTo( OpenSearchIntegrationHelper.getVersion()) >= 0 ? INCLUDE_TYPE_NAME_FALSE_URI : ""; final Request request = new Request(HttpMethod.HEAD, testIndexAlias + extraURI); @@ -543,7 +555,7 @@ public void testInstantiateSinkCustomIndex_NoRollOver() throws IOException { sink.shutdown(); // Check restart for index already exists - sink = createObjectUnderTest(pluginSetting, true); + sink = createObjectUnderTest(openSearchSinkConfig, true); sink.shutdown(); } @@ -559,9 +571,9 @@ public void testInstantiateSinkCustomIndex_WithIsmPolicy( final Map metadata = initializeConfigurationMetadata(null, indexAlias, testTemplateFile); metadata.put(IndexConfiguration.ISM_POLICY_FILE, TEST_CUSTOM_INDEX_POLICY_FILE); metadata.put(IndexConfiguration.TEMPLATE_TYPE, templateType); - final PluginSetting pluginSetting = generatePluginSettingByMetadata(metadata); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfigByMetadata(metadata); - OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); final String extraURI = DeclaredOpenSearchVersion.OPENDISTRO_0_10.compareTo( OpenSearchIntegrationHelper.getVersion()) >= 0 ? INCLUDE_TYPE_NAME_FALSE_URI : ""; Request request = new Request(HttpMethod.HEAD, indexAlias + extraURI); @@ -605,7 +617,7 @@ public void testInstantiateSinkCustomIndex_WithIsmPolicy( assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK)); // Instantiate sink again - sink = createObjectUnderTest(pluginSetting, true); + sink = createObjectUnderTest(openSearchSinkConfig, true); // Make sure no new write index *-000001 is created under alias final String rolloverIndexName = String.format("%s-000002", indexAlias); request = new Request(HttpMethod.GET, rolloverIndexName + "/_alias"); @@ -633,8 +645,8 @@ public void testInstantiateSinkDoesNotOverwriteNewerIndexTemplates( final String testTemplateFileV2 = getClass().getClassLoader().getResource(v2File).getFile(); // Create sink with template version 1 - PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, templateType, testTemplateFileV1); - OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(null, testIndexAlias, templateType, testTemplateFileV1); + OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); final String extraURI = DeclaredOpenSearchVersion.OPENDISTRO_0_10.compareTo( OpenSearchIntegrationHelper.getVersion()) >= 0 ? INCLUDE_TYPE_NAME_FALSE_URI : ""; @@ -652,8 +664,8 @@ public void testInstantiateSinkDoesNotOverwriteNewerIndexTemplates( sink.shutdown(); // Create sink with template version 2 - pluginSetting = generatePluginSetting(null, testIndexAlias, templateType, testTemplateFileV2); - sink = createObjectUnderTest(pluginSetting, true); + openSearchSinkConfig = generateOpenSearchSinkConfig(null, testIndexAlias, templateType, testTemplateFileV2); + sink = createObjectUnderTest(openSearchSinkConfig, true); getTemplateRequest = new Request(HttpMethod.GET, "/" + templatePath + "/" + expectedIndexTemplateName + extraURI); @@ -669,8 +681,8 @@ public void testInstantiateSinkDoesNotOverwriteNewerIndexTemplates( sink.shutdown(); // Create sink with template version 1 again - pluginSetting = generatePluginSetting(null, testIndexAlias, templateType, testTemplateFileV1); - sink = createObjectUnderTest(pluginSetting, true); + openSearchSinkConfig = generateOpenSearchSinkConfig(null, testIndexAlias, templateType, testTemplateFileV1); + sink = createObjectUnderTest(openSearchSinkConfig, true); getTemplateRequest = new Request(HttpMethod.GET, "/" + templatePath + "/" + expectedIndexTemplateName + extraURI); @@ -702,8 +714,8 @@ public void testIndexNameWithDateNotAsSuffixCreatesIndexTemplate( final String expectedIndexTemplateName = "prefix-suffix-index-template"; final String testTemplateFileV1 = getClass().getClassLoader().getResource(templateFile).getFile(); - PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, templateType, testTemplateFileV1); - OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(null, testIndexAlias, templateType, testTemplateFileV1); + OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); final String extraURI = DeclaredOpenSearchVersion.OPENDISTRO_0_10.compareTo( OpenSearchIntegrationHelper.getVersion()) >= 0 ? INCLUDE_TYPE_NAME_FALSE_URI : ""; @@ -814,9 +826,9 @@ public void testOutputCustomIndex() throws IOException, InterruptedException { final String testIdField = "someId"; final String testId = "foo"; final List> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId))); - final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, testTemplateFile); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(null, testIndexAlias, testTemplateFile); pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField); - final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + final OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); sink.output(testRecords); final List> retSources = getSearchResponseDocSources(testIndexAlias); assertThat(retSources.size(), equalTo(1)); @@ -840,10 +852,10 @@ public void testOpenSearchBulkActionsCreate() throws IOException, InterruptedExc final String testIdField = "someId"; final String testId = "foo"; final List> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId))); - final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, testTemplateFile); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(null, testIndexAlias, testTemplateFile); pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField); pluginSetting.getSettings().put(IndexConfiguration.ACTION, OpenSearchBulkActions.CREATE.toString()); - final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + final OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); sink.output(testRecords); final List> retSources = getSearchResponseDocSources(testIndexAlias); assertThat(retSources.size(), equalTo(1)); @@ -867,7 +879,7 @@ public void testOpenSearchBulkActionsCreateWithExpression() throws IOException, final String testIdField = "someId"; final String testId = "foo"; final List> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId))); - final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, testTemplateFile); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(null, testIndexAlias, testTemplateFile); pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField); Event event = (Event) testRecords.get(0).getData(); event.getMetadata().setAttribute("action", "create"); @@ -876,7 +888,7 @@ public void testOpenSearchBulkActionsCreateWithExpression() throws IOException, when(expressionEvaluator.isValidExpressionStatement("getMetadata(\"action\")")).thenReturn(true); when(expressionEvaluator.evaluate("getMetadata(\"action\")", event)).thenReturn(event.getMetadata().getAttribute("action")); pluginSetting.getSettings().put(IndexConfiguration.ACTION, actionFormatExpression); - final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + final OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); sink.output(testRecords); final List> retSources = getSearchResponseDocSources(testIndexAlias); assertThat(retSources.size(), equalTo(1)); @@ -900,7 +912,7 @@ public void testOpenSearchBulkActionsCreateWithInvalidExpression() throws IOExce final String testIdField = "someId"; final String testId = "foo"; final List> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId))); - final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, testTemplateFile); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(null, testIndexAlias, testTemplateFile); pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField); Event event = (Event) testRecords.get(0).getData(); event.getMetadata().setAttribute("action", "unknown"); @@ -909,7 +921,7 @@ public void testOpenSearchBulkActionsCreateWithInvalidExpression() throws IOExce when(expressionEvaluator.isValidExpressionStatement("getMetadata(\"action\")")).thenReturn(true); when(expressionEvaluator.evaluate("getMetadata(\"action\")", event)).thenReturn(event.getMetadata().getAttribute("action")); pluginSetting.getSettings().put(IndexConfiguration.ACTION, actionFormatExpression); - final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + final OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); sink.output(testRecords); final List> retSources = getSearchResponseDocSources(testIndexAlias); assertThat(retSources.size(), equalTo(0)); @@ -926,14 +938,14 @@ public void testBulkActionCreateWithActions() throws IOException, InterruptedExc final String testId = "foo"; final List> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId))); - final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, testTemplateFile); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(null, testIndexAlias, testTemplateFile); pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField); List> aList = new ArrayList<>(); Map aMap = new HashMap<>(); aMap.put("type", OpenSearchBulkActions.CREATE.toString()); aList.add(aMap); pluginSetting.getSettings().put(IndexConfiguration.ACTIONS, aList); - final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + final OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); sink.output(testRecords); final List> retSources = getSearchResponseDocSources(testIndexAlias); assertThat(retSources.size(), equalTo(1)); @@ -959,14 +971,14 @@ public void testBulkActionUpdateWithActions() throws IOException, InterruptedExc final String testId = "foo"; List> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson2(testIdField, testId, "name", "value1"))); - final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, testTemplateFile); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(null, testIndexAlias, testTemplateFile); pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField); List> aList = new ArrayList<>(); Map aMap = new HashMap<>(); aMap.put("type", OpenSearchBulkActions.CREATE.toString()); aList.add(aMap); pluginSetting.getSettings().put(IndexConfiguration.ACTIONS, aList); - OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); sink.output(testRecords); List> retSources = getSearchResponseDocSources(testIndexAlias); assertThat(retSources.size(), equalTo(1)); @@ -986,7 +998,7 @@ public void testBulkActionUpdateWithActions() throws IOException, InterruptedExc aMap.put("type", OpenSearchBulkActions.UPDATE.toString()); aList.add(aMap); pluginSetting.getSettings().put(IndexConfiguration.ACTIONS, aList); - sink = createObjectUnderTest(pluginSetting, true); + sink = createObjectUnderTest(openSearchSinkConfig, true); sink.output(testRecords); retSources = getSearchResponseDocSources(testIndexAlias); @@ -1015,7 +1027,7 @@ public void testBulkActionUpdateWithDocumentRootKey() throws IOException, Interr List> testRecords = Collections.singletonList(jsonStringToRecord(createJsonEvent)); - final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, testTemplateFile); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(null, testIndexAlias, testTemplateFile); pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ROOT_KEY, documentRootKey); pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField); @@ -1026,7 +1038,7 @@ public void testBulkActionUpdateWithDocumentRootKey() throws IOException, Interr aList.add(actionMap); pluginSetting.getSettings().put(IndexConfiguration.ACTIONS, aList); - OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); sink.output(testRecords); List> retSources = getSearchResponseDocSources(testIndexAlias); assertThat(retSources.size(), equalTo(1)); @@ -1051,7 +1063,7 @@ public void testBulkActionUpdateWithDocumentRootKey() throws IOException, Interr actionMap.put("type", OpenSearchBulkActions.UPDATE.toString()); aList.add(actionMap); pluginSetting.getSettings().put(IndexConfiguration.ACTIONS, aList); - sink = createObjectUnderTest(pluginSetting, true); + sink = createObjectUnderTest(openSearchSinkConfig, true); sink.output(testRecords); retSources = getSearchResponseDocSources(testIndexAlias); @@ -1078,10 +1090,10 @@ public void testBulkActionUpsertWithActionsAndNoCreate() throws IOException, Int actionMap.put("type", OpenSearchBulkActions.UPSERT.toString()); aList.add(actionMap); - final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, testTemplateFile); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(null, testIndexAlias, testTemplateFile); pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField); pluginSetting.getSettings().put(IndexConfiguration.ACTIONS, aList); - OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); sink.output(testRecords); List> retSources = getSearchResponseDocSources(testIndexAlias); @@ -1104,14 +1116,14 @@ public void testBulkActionUpsertWithActions() throws IOException, InterruptedExc final String testId = "foo"; List> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson2(testIdField, testId, "name", "value1"))); - final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, testTemplateFile); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(null, testIndexAlias, testTemplateFile); pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField); List> aList = new ArrayList<>(); Map aMap = new HashMap<>(); aMap.put("type", OpenSearchBulkActions.CREATE.toString()); aList.add(aMap); pluginSetting.getSettings().put(IndexConfiguration.ACTIONS, aList); - OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); sink.output(testRecords); List> retSources = getSearchResponseDocSources(testIndexAlias); assertThat(retSources.size(), equalTo(1)); @@ -1131,7 +1143,7 @@ public void testBulkActionUpsertWithActions() throws IOException, InterruptedExc aMap.put("type", OpenSearchBulkActions.UPSERT.toString()); aList.add(aMap); pluginSetting.getSettings().put(IndexConfiguration.ACTIONS, aList); - sink = createObjectUnderTest(pluginSetting, true); + sink = createObjectUnderTest(openSearchSinkConfig, true); sink.output(testRecords); retSources = getSearchResponseDocSources(testIndexAlias); @@ -1152,14 +1164,14 @@ public void testBulkActionUpsertWithoutCreate() throws IOException, InterruptedE final String testIdField = "someId"; final String testId = "foo"; List> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson3(testIdField, testId, "name", "value1", "newKey", "newValue"))); - final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, testTemplateFile); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(null, testIndexAlias, testTemplateFile); pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField); List> aList = new ArrayList<>(); Map aMap = new HashMap<>(); aMap.put("type", OpenSearchBulkActions.UPSERT.toString()); aList.add(aMap); pluginSetting.getSettings().put(IndexConfiguration.ACTIONS, aList); - OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); sink.output(testRecords); List> retSources = getSearchResponseDocSources(testIndexAlias); @@ -1188,14 +1200,14 @@ public void testBulkActionDeleteWithActions() throws IOException, InterruptedExc final String testId = "foo"; List> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId))); - final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, testTemplateFile); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(null, testIndexAlias, testTemplateFile); pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField); List> aList = new ArrayList<>(); Map aMap = new HashMap<>(); aMap.put("type", OpenSearchBulkActions.DELETE.toString()); aList.add(aMap); pluginSetting.getSettings().put(IndexConfiguration.ACTIONS, aList); - OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); sink.output(testRecords); List> retSources = getSearchResponseDocSources(testIndexAlias); assertThat(retSources.size(), equalTo(0)); @@ -1214,8 +1226,8 @@ public void testEventOutputWithTags() throws IOException, InterruptedException { final List> testRecords = Collections.singletonList(new Record<>(testEvent)); - final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null); - final OpenSearchSink sink = createObjectUnderTestWithSinkContext(pluginSetting, true); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null); + final OpenSearchSink sink = createObjectUnderTestWithSinkContext(openSearchSinkConfig, true); sink.output(testRecords); final String expIndexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW); @@ -1241,8 +1253,8 @@ public void testEventOutput() throws IOException, InterruptedException { final List> testRecords = Collections.singletonList(new Record<>(testEvent)); - final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null); - final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null); + final OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); verify(pluginConfigObservable).addPluginConfigObserver(any()); sink.output(testRecords); @@ -1272,8 +1284,8 @@ public void testEventOutputWithSpecialAndExtremeValues(final Object testValue) t final List> testRecords = Collections.singletonList(new Record<>(testEvent)); - final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, null); - final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(null, testIndexAlias, null); + final OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); sink.output(testRecords); final List> retSources = getSearchResponseDocSources(testIndexAlias); @@ -1298,9 +1310,9 @@ public void testOpenSearchDocumentId(final String testDocumentIdField) throws IO final List> testRecords = Collections.singletonList(new Record<>(testEvent)); - final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, null); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(null, testIndexAlias, null); pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testDocumentIdField); - final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + final OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); sink.output(testRecords); final List docIds = getSearchResponseDocIds(testIndexAlias); @@ -1323,9 +1335,9 @@ public void testOpenSearchRoutingField(final String testRoutingField) throws IOE final List> testRecords = Collections.singletonList(new Record<>(testEvent)); - final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, null); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(null, testIndexAlias, null); pluginSetting.getSettings().put(IndexConfiguration.ROUTING_FIELD, testRoutingField); - final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + final OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); sink.output(testRecords); final List routingFields = getSearchResponseRoutingFields(testIndexAlias); @@ -1350,11 +1362,11 @@ public void testOpenSearchRouting(final String testRouting) throws IOException, final List> testRecords = Collections.singletonList(new Record<>(testEvent)); - final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, null); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(null, testIndexAlias, null); if (!testRouting.isEmpty()) { pluginSetting.getSettings().put(IndexConfiguration.ROUTING, "${"+testRouting+"}"); } - final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + final OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); sink.output(testRecords); final List routingFields = getSearchResponseRoutingFields(testIndexAlias); @@ -1381,10 +1393,10 @@ public void testOpenSearchRoutingWithExpressions(final String testRouting) throw final List> testRecords = Collections.singletonList(new Record<>(testEvent)); - final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, null); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(null, testIndexAlias, null); pluginSetting.getSettings().put(IndexConfiguration.ROUTING, "${/"+testRouting+"}"); when(expressionEvaluator.isValidFormatExpression(any(String.class))).thenReturn(true); - final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + final OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); sink.output(testRecords); final List routingFields = getSearchResponseRoutingFields(testIndexAlias); @@ -1409,10 +1421,10 @@ public void testOpenSearchRoutingWithMixedExpressions(final String testRouting) final String prefix = RandomStringUtils.randomAlphabetic(5); final String suffix = RandomStringUtils.randomAlphabetic(6); - final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, null); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(null, testIndexAlias, null); pluginSetting.getSettings().put(IndexConfiguration.ROUTING, prefix+"-${/"+testRouting+"}-"+suffix); when(expressionEvaluator.isValidFormatExpression(any(String.class))).thenReturn(true); - final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + final OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); sink.output(testRecords); final String expectedRouting = prefix+"-"+routing+"-"+suffix; @@ -1441,8 +1453,8 @@ public void testOpenSearchDynamicIndex(final String testIndex) throws IOExceptio final List> testRecords = Collections.singletonList(new Record<>(testEvent)); - final PluginSetting pluginSetting = generatePluginSetting(null, dynamicTestIndexAlias, null); - final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(null, dynamicTestIndexAlias, null); + final OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); sink.output(testRecords); final List> retSources = getSearchResponseDocSources(testIndexAlias); assertThat(retSources.size(), equalTo(1)); @@ -1474,8 +1486,8 @@ public void testOpenSearchDynamicIndexWithDate(final String testIndex, final Str final List> testRecords = Collections.singletonList(new Record<>(testEvent)); - final PluginSetting pluginSetting = generatePluginSetting(null, dynamicTestIndexAlias, null); - final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(null, dynamicTestIndexAlias, null); + final OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); sink.output(testRecords); final List> retSources = getSearchResponseDocSources(expectedIndexAlias); assertThat(retSources.size(), equalTo(1)); @@ -1510,8 +1522,8 @@ public void testOpenSearchDynamicIndexWithDateNotAsSuffix( final List> testRecords = Collections.singletonList(new Record<>(testEvent)); - final PluginSetting pluginSetting = generatePluginSetting(null, dynamicTestIndexAlias, null); - final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(null, dynamicTestIndexAlias, null); + final OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); sink.output(testRecords); final List> retSources = getSearchResponseDocSources(expectedIndexAlias); assertThat(retSources.size(), equalTo(1)); @@ -1538,8 +1550,8 @@ public void testOpenSearchIndexWithDate(final String testDatePattern) throws IOE final List> testRecords = Collections.singletonList(new Record<>(testEvent)); - final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, null); - final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(null, testIndexAlias, null); + final OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); sink.output(testRecords); final List> retSources = getSearchResponseDocSources(expectedIndexName); assertThat(retSources.size(), equalTo(1)); @@ -1566,8 +1578,8 @@ public void testOpenSearchIndexWithDateNotAsSuffix(final String testIndexAlias) final List> testRecords = Collections.singletonList(new Record<>(testEvent)); - final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, null); - final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(null, testIndexAlias, null); + final OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); sink.output(testRecords); final List> retSources = getSearchResponseDocSources(expectedIndexName); assertThat(retSources.size(), equalTo(1)); @@ -1579,16 +1591,16 @@ public void testOpenSearchIndexWithDateNotAsSuffix(final String testIndexAlias) public void testOpenSearchIndexWithInvalidDate() throws IOException, InterruptedException { String invalidDatePattern = "yyyy-MM-dd HH:ss:mm"; final String invalidTestIndexAlias = "test-index-%{" + invalidDatePattern + "}"; - final PluginSetting pluginSetting = generatePluginSetting(null, invalidTestIndexAlias, null); - OpenSearchSink sink = createObjectUnderTest(pluginSetting, false); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(null, invalidTestIndexAlias, null); + OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, false); Assert.assertThrows(IllegalArgumentException.class, () -> sink.doInitialize()); } @Test public void testOpenSearchIndexWithInvalidChars() throws IOException, InterruptedException { final String invalidTestIndexAlias = "test#-index"; - final PluginSetting pluginSetting = generatePluginSetting(null, invalidTestIndexAlias, null); - OpenSearchSink sink = createObjectUnderTest(pluginSetting, false); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(null, invalidTestIndexAlias, null); + OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, false); Assert.assertThrows(RuntimeException.class, () -> sink.doInitialize()); } @@ -1614,8 +1626,8 @@ public void testOutputManagementDisabled() throws IOException, InterruptedExcept metadata.put(IndexConfiguration.INDEX_TYPE, IndexType.MANAGEMENT_DISABLED.getValue()); metadata.put(AUTHENTICATION, Map.of(USERNAME, username, PASSWORD, password)); metadata.put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField); - final PluginSetting pluginSetting = generatePluginSettingByMetadata(metadata); - final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfigByMetadata(metadata); + final OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); final String testTemplateFile = Objects.requireNonNull( getClass().getClassLoader().getResource("management-disabled-index-template.json")).getFile(); @@ -1657,33 +1669,38 @@ private Map initializeConfigurationMetadata(final String indexTy return metadata; } - private PluginSetting generatePluginSetting(final String indexType, final String indexAlias, - final String templateFilePath) { + private OpenSearchSinkConfig generateOpenSearchSinkConfig(final String indexType, final String indexAlias, + final String templateFilePath) throws JsonProcessingException { final Map metadata = initializeConfigurationMetadata(indexType, indexAlias, templateFilePath); - return generatePluginSettingByMetadata(metadata); + return generateOpenSearchSinkConfigByMetadata(metadata); } - private PluginSetting generatePluginSetting(final String indexType, final String indexAlias, + private OpenSearchSinkConfig generateOpenSearchSinkConfig(final String indexType, final String indexAlias, final String templateFilePath, final boolean estimateBulkSizeUsingCompression, - final boolean requestCompressionEnabled) { + final boolean requestCompressionEnabled) throws JsonProcessingException { final Map metadata = initializeConfigurationMetadata(indexType, indexAlias, templateFilePath); metadata.put(IndexConfiguration.ESTIMATE_BULK_SIZE_USING_COMPRESSION, estimateBulkSizeUsingCompression); metadata.put(ConnectionConfiguration.REQUEST_COMPRESSION_ENABLED, requestCompressionEnabled); - return generatePluginSettingByMetadata(metadata); + return generateOpenSearchSinkConfigByMetadata(metadata); } - private PluginSetting generatePluginSetting(final String indexType, final String indexAlias, + private OpenSearchSinkConfig generateOpenSearchSinkConfig(final String indexType, final String indexAlias, final String templateType, - final String templateFilePath) { + final String templateFilePath) throws JsonProcessingException { final Map metadata = initializeConfigurationMetadata(indexType, indexAlias, templateFilePath); metadata.put(IndexConfiguration.TEMPLATE_TYPE, templateType); - return generatePluginSettingByMetadata(metadata); + return generateOpenSearchSinkConfigByMetadata(metadata); } - private PluginSetting generatePluginSettingByMetadata(final Map configurationMetadata) { - final PluginSetting pluginSetting = new PluginSetting(PLUGIN_NAME, configurationMetadata); - pluginSetting.setPipelineName(PIPELINE_NAME); - return pluginSetting; + private OpenSearchSinkConfig generateOpenSearchSinkConfigByMetadata(final Map configurationMetadata) throws JsonProcessingException { + objectMapper = new ObjectMapper(); + List hosts = new ArrayList<>(); + hosts.add("test_host"); + configurationMetadata.put("hosts", hosts); + String json = new ObjectMapper().writeValueAsString(configurationMetadata); + OpenSearchSinkConfig openSearchSinkConfig = objectMapper.readValue(json, OpenSearchSinkConfig.class); + + return openSearchSinkConfig; } private String generateCustomRecordJson(final String idField, final String documentId) throws IOException {