diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java index 38bb5d1caa..a9d91e78f0 100644 --- a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java @@ -896,7 +896,7 @@ public void testBulkActionUpdateWithActions() throws IOException, InterruptedExc @Test public void testBulkActionUpdateWithDocumentRootKey() throws IOException, InterruptedException { - final String testIndexAlias = "test-alias-upd1"; + final String testIndexAlias = "test-alias-update"; final String testTemplateFile = Objects.requireNonNull( getClass().getClassLoader().getResource(TEST_TEMPLATE_BULK_FILE)).getFile(); @@ -960,9 +960,40 @@ public void testBulkActionUpdateWithDocumentRootKey() throws IOException, Interr sink.shutdown(); } + @Test + public void testBulkActionUpsertWithActionsAndNoCreate() throws IOException, InterruptedException { + final String testIndexAlias = "test-alias-upsert-no-create2"; + final String testTemplateFile = Objects.requireNonNull( + getClass().getClassLoader().getResource(TEST_TEMPLATE_BULK_FILE)).getFile(); + + final String testIdField = "someId"; + final String testId = "foo"; + List> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson2(testIdField, testId, "key", "value"))); + + List> aList = new ArrayList<>(); + Map actionMap = new HashMap<>(); + actionMap.put("type", OpenSearchBulkActions.UPSERT.toString()); + aList.add(actionMap); + + final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, testTemplateFile); + pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField); + pluginSetting.getSettings().put(IndexConfiguration.ACTIONS, aList); + OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + + sink.output(testRecords); + List> retSources = getSearchResponseDocSources(testIndexAlias); + + assertThat(retSources.size(), equalTo(1)); + Map source = retSources.get(0); + assertThat((String) source.get("key"), equalTo("value")); + assertThat((String) source.get(testIdField), equalTo(testId)); + assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1))); + sink.shutdown(); + } + @Test public void testBulkActionUpsertWithActions() throws IOException, InterruptedException { - final String testIndexAlias = "test-alias-upd2"; + final String testIndexAlias = "test-alias-upsert"; final String testTemplateFile = Objects.requireNonNull( getClass().getClassLoader().getResource(TEST_TEMPLATE_BULK_FILE)).getFile(); @@ -1725,6 +1756,7 @@ private void wipeAllOpenSearchIndices() throws IOException { .filter(Predicate.not(indexName -> indexName.startsWith(".opendistro_"))) .filter(Predicate.not(indexName -> indexName.startsWith(".opensearch-"))) .filter(Predicate.not(indexName -> indexName.startsWith(".opensearch_"))) + .filter(Predicate.not(indexName -> indexName.startsWith(".ql"))) .filter(Predicate.not(indexName -> indexName.startsWith(".plugins-ml-config"))) .forEach(indexName -> { try { diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index c657f6b792..f2d3f66619 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -309,7 +309,7 @@ private BulkOperation getBulkOperationForAction(final String action, } - final UpdateOperation.Builder updateOperationBuilder = (action.toLowerCase() == OpenSearchBulkActions.UPSERT.toString()) ? + final UpdateOperation.Builder updateOperationBuilder = (StringUtils.equals(action.toLowerCase(), OpenSearchBulkActions.UPSERT.toString())) ? new UpdateOperation.Builder<>() .index(indexName) .document(filteredJsonNode)