diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/expression/ExpressionEvaluator.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/expression/ExpressionEvaluator.java index c006a31cae..ea786ce5bd 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/expression/ExpressionEvaluator.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/expression/ExpressionEvaluator.java @@ -34,4 +34,6 @@ default Boolean evaluateConditional(final String statement, final Event context) } Boolean isValidExpressionStatement(final String statement); -} + + Boolean isValidFormatExpressions(final String format); +} \ No newline at end of file diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java index 1f36e01e25..341d5277bc 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java @@ -318,29 +318,6 @@ public String formatString(final String format, final ExpressionEvaluator expres return formatStringInternal(format, expressionEvaluator); } - public static boolean isValidFormatExpressions(final String format, final ExpressionEvaluator expressionEvaluator) { - if (Objects.isNull(expressionEvaluator)) { - return false; - } - - int fromIndex = 0; - int position = 0; - while ((position = format.indexOf("${", fromIndex)) != -1) { - int endPosition = format.indexOf("}", position + 1); - if (endPosition == -1) { - return false; - } - String name = format.substring(position + 2, endPosition); - - Object val; - if (!expressionEvaluator.isValidExpressionStatement(name)) { - return false; - } - fromIndex = endPosition + 1; - } - return true; - } - private String formatStringInternal(final String format, final ExpressionEvaluator expressionEvaluator) { int fromIndex = 0; String result = ""; @@ -362,7 +339,7 @@ private String formatStringInternal(final String format, final ExpressionEvaluat } if (val == null) { - if (Objects.nonNull(expressionEvaluator) && expressionEvaluator.isValidExpressionStatement(name)) { + if (expressionEvaluator != null && expressionEvaluator.isValidExpressionStatement(name)) { val = expressionEvaluator.evaluate(name, this); } else { throw new EventKeyNotFoundException(String.format("The key %s could not be found in the Event when formatting", name)); @@ -410,12 +387,21 @@ public Map toMap() { return mapper.convertValue(jsonNode, MAP_TYPE_REFERENCE); } + + public static boolean isValidEventKey(final String key) { + try { + checkKey(key); + return true; + } catch (final Exception e) { + return false; + } + } private String checkAndTrimKey(final String key) { checkKey(key); return trimKey(key); } - private void checkKey(final String key) { + private static void checkKey(final String key) { checkNotNull(key, "key cannot be null"); checkArgument(!key.isEmpty(), "key cannot be an empty string"); if (key.length() > MAX_KEY_LENGTH) { @@ -432,7 +418,7 @@ private String trimKey(final String key) { return trimmedLeadingSlash.endsWith(SEPARATOR) ? trimmedLeadingSlash.substring(0, trimmedLeadingSlash.length() - 2) : trimmedLeadingSlash; } - private boolean isValidKey(final String key) { + private static boolean isValidKey(final String key) { for (int i = 0; i < key.length(); i++) { char c = key.charAt(i); diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/expression/ExpressionEvaluatorTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/expression/ExpressionEvaluatorTest.java index 36a60ac447..6ce1af8660 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/expression/ExpressionEvaluatorTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/expression/ExpressionEvaluatorTest.java @@ -23,6 +23,11 @@ public Object evaluate(final String statement, final Event event) { public Boolean isValidExpressionStatement(final String statement) { return true; } + + @Override + public Boolean isValidFormatExpressions(String format) { + return true; + } } @Test diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java index deb68f83bd..4fe8f272cc 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java @@ -535,25 +535,6 @@ public void testBuild_withFormatString(String formattedString, String finalStrin assertThat(event.formatString(formattedString), is(equalTo(finalString))); } - @ParameterizedTest - @CsvSource({ - "abc-${/foo, false", - "abc-${/foo}, true", - "abc-${getMetadata(\"key\")}, true", - "abc-${getXYZ(\"key\")}, false" - }) - public void testBuild_withIsValidFormatExpressions(final String format, final Boolean expectedResult) { - final ExpressionEvaluator expressionEvaluator = mock(ExpressionEvaluator.class); - when(expressionEvaluator.isValidExpressionStatement("/foo")).thenReturn(true); - when(expressionEvaluator.isValidExpressionStatement("getMetadata(\"key\")")).thenReturn(true); - assertThat(JacksonEvent.isValidFormatExpressions(format, expressionEvaluator), equalTo(expectedResult)); - } - - @Test - public void testBuild_withIsValidFormatExpressionsWithNullEvaluator() { - assertThat(JacksonEvent.isValidFormatExpressions("${}", null), equalTo(false)); - } - @Test public void formatString_with_expression_evaluator_catches_exception_when_Event_get_throws_exception() { @@ -818,6 +799,11 @@ void testJsonStringBuilderWithExcludeKeys() { } + @ParameterizedTest + @CsvSource(value = {"test_key, true", "/test_key, true", "inv(alid, false", "getMetadata(\"test_key\"), false"}) + void isValidEventKey_returns_expected_result(final String key, final boolean isValid) { + assertThat(JacksonEvent.isValidEventKey(key), equalTo(isValid)); + } private static Map createComplexDataMap() { final Map dataObject = new HashMap<>(); diff --git a/data-prepper-expression/src/main/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluator.java b/data-prepper-expression/src/main/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluator.java index 072b3a7393..89d8b15a84 100644 --- a/data-prepper-expression/src/main/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluator.java +++ b/data-prepper-expression/src/main/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluator.java @@ -7,6 +7,7 @@ import org.antlr.v4.runtime.tree.ParseTree; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; import javax.inject.Inject; import javax.inject.Named; @@ -52,4 +53,25 @@ public Boolean isValidExpressionStatement(final String statement) { return false; } } + + @Override + public Boolean isValidFormatExpressions(final String format) { + int fromIndex = 0; + int position = 0; + while ((position = format.indexOf("${", fromIndex)) != -1) { + int endPosition = format.indexOf("}", position + 1); + if (endPosition == -1) { + return false; + } + String name = format.substring(position + 2, endPosition); + + Object val; + // Invalid if it is not a valid key and not a valid expression statement + if (!JacksonEvent.isValidEventKey(name) && !isValidExpressionStatement(name)) { + return false; + } + fromIndex = endPosition + 1; + } + return true; + } } diff --git a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluatorTest.java b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluatorTest.java index d68808dc85..0b8ac2ee03 100644 --- a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluatorTest.java +++ b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluatorTest.java @@ -5,22 +5,26 @@ package org.opensearch.dataprepper.expression; -import org.opensearch.dataprepper.model.event.Event; import org.antlr.v4.runtime.tree.ParseTree; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.event.Event; -import java.util.UUID; import java.util.Random; +import java.util.UUID; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; @@ -119,5 +123,24 @@ void isValidExpressionStatement_returns_false_when_parse_throws() { assertThat(result, equalTo(false)); } + @ParameterizedTest + @CsvSource({ + "abc-${/foo, false", + "abc-${/foo}, true", + "abc-${getMetadata(\"key\")}, true", + "abc-${getXYZ(\"key\")}, true", + "abc-${invalid, false" + }) + void isValidFormatExpressionsReturnsCorrectResult(final String format, final Boolean expectedResult) { + assertThat(statementEvaluator.isValidFormatExpressions(format), equalTo(expectedResult)); + } + + @ParameterizedTest + @ValueSource(strings = {"abc-${anyS(=tring}"}) + void isValidFormatExpressionsReturnsFalseWhenIsValidKeyAndValidExpressionIsFalse(final String format) { + doThrow(RuntimeException.class).when(parser).parse(anyString()); + assertThat(statementEvaluator.isValidFormatExpressions(format), equalTo(false)); + } + } diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java index 755ccb5283..fbe17763a0 100644 --- a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java @@ -646,9 +646,11 @@ public void testOpenSearchBulkActionsCreateWithExpression() throws IOException, pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField); Event event = (Event)testRecords.get(0).getData(); event.getMetadata().setAttribute("action", "create"); + final String actionFormatExpression = "${getMetadata(\"action\")}"; + when(expressionEvaluator.isValidFormatExpressions(actionFormatExpression)).thenReturn(true); when(expressionEvaluator.isValidExpressionStatement("getMetadata(\"action\")")).thenReturn(true); when(expressionEvaluator.evaluate("getMetadata(\"action\")", event)).thenReturn(event.getMetadata().getAttribute("action")); - pluginSetting.getSettings().put(IndexConfiguration.ACTION, "${getMetadata(\"action\")}"); + pluginSetting.getSettings().put(IndexConfiguration.ACTION, actionFormatExpression); final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); sink.output(testRecords); final List> retSources = getSearchResponseDocSources(testIndexAlias); @@ -677,9 +679,11 @@ public void testOpenSearchBulkActionsCreateWithInvalidExpression() throws IOExce pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField); Event event = (Event)testRecords.get(0).getData(); event.getMetadata().setAttribute("action", "unknown"); + final String actionFormatExpression = "${getMetadata(\"action\")}"; + when(expressionEvaluator.isValidFormatExpressions(actionFormatExpression)).thenReturn(true); when(expressionEvaluator.isValidExpressionStatement("getMetadata(\"action\")")).thenReturn(true); when(expressionEvaluator.evaluate("getMetadata(\"action\")", event)).thenReturn(event.getMetadata().getAttribute("action")); - pluginSetting.getSettings().put(IndexConfiguration.ACTION, "${getMetadata(\"action\")}"); + pluginSetting.getSettings().put(IndexConfiguration.ACTION, actionFormatExpression); final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); sink.output(testRecords); final List> retSources = getSearchResponseDocSources(testIndexAlias); diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java index 793029f6ef..5f6271010b 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java @@ -43,8 +43,10 @@ public final class BulkRetryStrategy { public static final String BULK_REQUEST_NOT_FOUND_ERRORS = "bulkRequestNotFoundErrors"; public static final String BULK_REQUEST_TIMEOUT_ERRORS = "bulkRequestTimeoutErrors"; public static final String BULK_REQUEST_SERVER_ERRORS = "bulkRequestServerErrors"; + public static final String DOCUMENTS_VERSION_CONFLICT_ERRORS = "documentsVersionConflictErrors"; static final long INITIAL_DELAY_MS = 50; static final long MAXIMUM_DELAY_MS = Duration.ofMinutes(10).toMillis(); + static final String VERSION_CONFLICT_EXCEPTION_TYPE = "version_conflict_engine_exception"; private static final Set NON_RETRY_STATUS = new HashSet<>( Arrays.asList( @@ -116,6 +118,7 @@ public final class BulkRetryStrategy { private final Counter bulkRequestNotFoundErrors; private final Counter bulkRequestTimeoutErrors; private final Counter bulkRequestServerErrors; + private final Counter documentsVersionConflictErrors; private static final Logger LOG = LoggerFactory.getLogger(BulkRetryStrategy.class); static class BulkOperationRequestResponse { @@ -160,6 +163,7 @@ public BulkRetryStrategy(final RequestFunction bulkRequest, final BulkResponse bulkResponse, final Throwable failure) { - LOG.warn("Bulk Operation Failed.", failure); if (failure == null) { for (final BulkResponseItem bulkItemResponse : bulkResponse.items()) { - if (Objects.nonNull(bulkItemResponse.error())) { + // Skip logging the error for version conflicts + if (bulkItemResponse.error() != null && !VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) { LOG.warn("operation = {}, error = {}", bulkItemResponse.operationType(), bulkItemResponse.error().reason()); } } handleFailures(bulkRequest, bulkResponse.items()); } else { + LOG.warn("Bulk Operation Failed.", failure); handleFailures(bulkRequest, failure); } bulkRequestFailedCounter.increment(); @@ -313,6 +318,10 @@ private AccumulatingBulkRequest createBulkReq if (bulkItemResponse.error() != null) { if (!NON_RETRY_STATUS.contains(bulkItemResponse.status())) { requestToReissue.addOperation(bulkOperation); + } else if (VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) { + documentsVersionConflictErrors.increment(); + LOG.debug("Received version conflict from OpenSearch: {}", bulkItemResponse.error().reason()); + bulkOperation.releaseEventHandle(true); } else { nonRetryableFailures.add(FailedBulkOperation.builder() .withBulkOperation(bulkOperation) @@ -338,10 +347,16 @@ private void handleFailures(final AccumulatingBulkRequest> { public static final String INVALID_ACTION_ERRORS = "invalidActionErrors"; public static final String BULKREQUEST_SIZE_BYTES = "bulkRequestSizeBytes"; public static final String DYNAMIC_INDEX_DROPPED_EVENTS = "dynamicIndexDroppedEvents"; + public static final String INVALID_VERSION_EXPRESSION_DROPPED_EVENTS = "dynamicDocumentVersionDroppedEvents"; private static final Logger LOG = LoggerFactory.getLogger(OpenSearchSink.class); private static final int INITIALIZE_RETRY_WAIT_TIME_MS = 5000; @@ -112,12 +114,15 @@ public class OpenSearchSink extends AbstractSink> { private final String documentRootKey; private String configuredIndexAlias; private final ReentrantLock lock; + private final VersionType versionType; + private final String versionExpression; private final Timer bulkRequestTimer; private final Counter bulkRequestErrorsCounter; private final Counter invalidActionErrorsCounter; private final Counter dynamicIndexDroppedEvents; private final DistributionSummary bulkRequestSizeBytesSummary; + private final Counter dynamicDocumentVersionDroppedEvents; private OpenSearchClient openSearchClient; private ObjectMapper objectMapper; private volatile boolean initialized; @@ -146,6 +151,7 @@ public OpenSearchSink(final PluginSetting pluginSetting, invalidActionErrorsCounter = pluginMetrics.counter(INVALID_ACTION_ERRORS); dynamicIndexDroppedEvents = pluginMetrics.counter(DYNAMIC_INDEX_DROPPED_EVENTS); bulkRequestSizeBytesSummary = pluginMetrics.summary(BULKREQUEST_SIZE_BYTES); + dynamicDocumentVersionDroppedEvents = pluginMetrics.counter(INVALID_VERSION_EXPRESSION_DROPPED_EVENTS); this.openSearchSinkConfig = OpenSearchSinkConfiguration.readESConfig(pluginSetting, expressionEvaluator); this.bulkSize = ByteSizeUnit.MB.toBytes(openSearchSinkConfig.getIndexConfiguration().getBulkSize()); @@ -157,6 +163,8 @@ public OpenSearchSink(final PluginSetting pluginSetting, this.action = openSearchSinkConfig.getIndexConfiguration().getAction(); this.actions = openSearchSinkConfig.getIndexConfiguration().getActions(); this.documentRootKey = openSearchSinkConfig.getIndexConfiguration().getDocumentRootKey(); + this.versionType = openSearchSinkConfig.getIndexConfiguration().getVersionType(); + this.versionExpression = openSearchSinkConfig.getIndexConfiguration().getVersionExpression(); this.indexManagerFactory = new IndexManagerFactory(new ClusterSettingsParser()); this.failedBulkOperationConverter = new FailedBulkOperationConverter(pluginSetting.getPipelineName(), pluginSetting.getName(), pluginSetting.getName()); @@ -260,7 +268,11 @@ public boolean isReady() { return initialized; } - private BulkOperation getBulkOperationForAction(final String action, final SerializedJson document, final String indexName, final JsonNode jsonNode) { + private BulkOperation getBulkOperationForAction(final String action, + final SerializedJson document, + final Long version, + final String indexName, + final JsonNode jsonNode) { BulkOperation bulkOperation; final Optional docId = document.getDocumentId(); final Optional routing = document.getRoutingField(); @@ -283,10 +295,14 @@ private BulkOperation getBulkOperationForAction(final String action, final Seria new UpdateOperation.Builder<>() .index(indexName) .document(jsonNode) - .upsert(jsonNode) : + .upsert(jsonNode) + .versionType(versionType) + .version(version) : new UpdateOperation.Builder<>() .index(indexName) - .document(jsonNode); + .document(jsonNode) + .versionType(versionType) + .version(version); docId.ifPresent(updateOperationBuilder::id); routing.ifPresent(updateOperationBuilder::routing); bulkOperation = new BulkOperation.Builder() @@ -300,13 +316,20 @@ private BulkOperation getBulkOperationForAction(final String action, final Seria docId.ifPresent(deleteOperationBuilder::id); routing.ifPresent(deleteOperationBuilder::routing); bulkOperation = new BulkOperation.Builder() - .delete(deleteOperationBuilder.build()) + .delete(deleteOperationBuilder + .versionType(versionType) + .version(version) + .build()) .build(); return bulkOperation; } // Default to "index" final IndexOperation.Builder indexOperationBuilder = - new IndexOperation.Builder<>().index(indexName).document(document); + new IndexOperation.Builder<>() + .index(indexName) + .document(document) + .version(version) + .versionType(versionType); docId.ifPresent(indexOperationBuilder::id); routing.ifPresent(indexOperationBuilder::routing); bulkOperation = new BulkOperation.Builder() @@ -337,16 +360,27 @@ public void doOutput(final Collection> records) { } catch (IOException | EventKeyNotFoundException e) { LOG.error("There was an exception when constructing the index name. Check the dlq if configured to see details about the affected Event: {}", e.getMessage()); dynamicIndexDroppedEvents.increment(); - logFailureForDlqObjects(List.of(DlqObject.builder() - .withEventHandle(event.getEventHandle()) - .withFailedData(FailedDlqData.builder().withDocument(event.toJsonString()).withIndex(indexName).withMessage(e.getMessage()).build()) - .withPluginName(pluginSetting.getName()) - .withPipelineName(pluginSetting.getPipelineName()) - .withPluginId(pluginSetting.getName()) - .build()), e); + logFailureForDlqObjects(List.of(createDlqObjectFromEvent(event, indexName, e.getMessage())), e); continue; } + Long version = null; + String versionExpressionEvaluationResult = null; + if (versionExpression != null) { + try { + versionExpressionEvaluationResult = event.formatString(versionExpression, expressionEvaluator); + version = Long.valueOf(event.formatString(versionExpression, expressionEvaluator)); + } catch (final NumberFormatException e) { + LOG.warn("Unable to convert the result of evaluating document_version '{}' to Long for an Event. The evaluation result '{}' must be a valid Long type", versionExpression, versionExpressionEvaluationResult); + logFailureForDlqObjects(List.of(createDlqObjectFromEvent(event, indexName, e.getMessage())), e); + dynamicDocumentVersionDroppedEvents.increment(); + } catch (final RuntimeException e) { + LOG.error("There was an exception when evaluating the document_version '{}'. Check the dlq if configured to see details about the affected Event: {}", versionExpression, e.getMessage()); + logFailureForDlqObjects(List.of(createDlqObjectFromEvent(event, indexName, e.getMessage())), e); + dynamicDocumentVersionDroppedEvents.increment(); + } + } + String eventAction = action; if (actions != null) { for (final Map actionEntry: actions) { @@ -373,7 +407,7 @@ public void doOutput(final Collection> records) { StringUtils.equals(action, OpenSearchBulkActions.DELETE.toString())) { serializedJsonNode = SerializedJson.fromJsonNode(event.getJsonNode(), document); } - BulkOperation bulkOperation = getBulkOperationForAction(eventAction, document, indexName, event.getJsonNode()); + BulkOperation bulkOperation = getBulkOperationForAction(eventAction, document, version, indexName, event.getJsonNode()); BulkOperationWrapper bulkOperationWrapper = new BulkOperationWrapper(bulkOperation, event.getEventHandle(), serializedJsonNode); final long estimatedBytesBeforeAdd = bulkRequest.estimateSizeInBytesWithDocument(bulkOperationWrapper); @@ -524,4 +558,20 @@ private void maybeUpdateServerlessNetworkPolicy() { connectionConfiguration.getServerlessVpceId()); } } + + private DlqObject createDlqObjectFromEvent(final Event event, + final String index, + final String message) { + return DlqObject.builder() + .withEventHandle(event.getEventHandle()) + .withFailedData(FailedDlqData.builder() + .withDocument(event.toJsonString()) + .withIndex(index) + .withMessage(message) + .build()) + .withPluginName(pluginSetting.getName()) + .withPipelineName(pluginSetting.getPipelineName()) + .withPluginId(pluginSetting.getName()) + .build(); + } } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java index 426675e9ea..c6ad893578 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java @@ -8,9 +8,9 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.lang3.EnumUtils; +import org.opensearch.client.opensearch._types.VersionType; import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.model.configuration.PluginSetting; -import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.plugins.sink.opensearch.DistributionVersion; @@ -70,6 +70,8 @@ public class IndexConfiguration { public static final String DISTRIBUTION_VERSION = "distribution_version"; public static final String AWS_OPTION = "aws"; public static final String DOCUMENT_ROOT_KEY = "document_root_key"; + public static final String DOCUMENT_VERSION_EXPRESSION = "document_version"; + public static final String DOCUMENT_VERSION_TYPE = "document_version_type"; private IndexType indexType; private TemplateType templateType; @@ -92,6 +94,8 @@ public class IndexConfiguration { private final boolean serverless; private final DistributionVersion distributionVersion; private final String documentRootKey; + private final String versionExpression; + private final VersionType versionType; private static final String S3_PREFIX = "s3://"; private static final String DEFAULT_AWS_REGION = "us-east-1"; @@ -106,6 +110,8 @@ private IndexConfiguration(final Builder builder) { this.s3AwsStsRoleArn = builder.s3AwsStsRoleArn; this.s3AwsExternalId = builder.s3AwsStsExternalId; this.s3Client = builder.s3Client; + this.versionExpression = builder.versionExpression; + this.versionType = builder.versionType; determineTemplateType(builder); @@ -222,6 +228,15 @@ public static IndexConfiguration readIndexConfig(final PluginSetting pluginSetti final String documentIdField = pluginSetting.getStringOrDefault(DOCUMENT_ID_FIELD, null); final String documentId = pluginSetting.getStringOrDefault(DOCUMENT_ID, null); + final String versionExpression = pluginSetting.getStringOrDefault(DOCUMENT_VERSION_EXPRESSION, null); + final String versionType = pluginSetting.getStringOrDefault(DOCUMENT_VERSION_TYPE, null); + + builder = builder.withVersionExpression(versionExpression); + if (versionExpression != null && (!expressionEvaluator.isValidFormatExpressions(versionExpression))) { + throw new InvalidPluginConfigurationException("document_version {} is not a valid format expression."); + } + + builder = builder.withVersionType(versionType); if (Objects.nonNull(documentIdField) && Objects.nonNull(documentId)) { throw new InvalidPluginConfigurationException("Both document_id_field and document_id cannot be used at the same time. It is preferred to only use document_id as document_id_field is deprecated."); @@ -357,6 +372,10 @@ public String getDocumentRootKey() { return documentRootKey; } + public VersionType getVersionType() { return versionType; } + + public String getVersionExpression() { return versionExpression; } + /** * This method is used in the creation of IndexConfiguration object. It takes in the template file path * or index type and returns the index template read from the file or specific to index type or returns an @@ -437,6 +456,8 @@ public static class Builder { private boolean serverless; private DistributionVersion distributionVersion; private String documentRootKey; + private VersionType versionType; + private String versionExpression; public Builder withIndexAlias(final String indexAlias) { checkArgument(indexAlias != null, "indexAlias cannot be null."); @@ -525,7 +546,7 @@ public Builder withIsmPolicyFile(final String ismPolicyFile) { public Builder withAction(final String action, final ExpressionEvaluator expressionEvaluator) { checkArgument((EnumUtils.isValidEnumIgnoreCase(OpenSearchBulkActions.class, action) || - (action.contains("${") && JacksonEvent.isValidFormatExpressions(action, expressionEvaluator))), "Invalid action \"" + action + "\". action must be one of the following: " + Arrays.stream(OpenSearchBulkActions.values()).collect(Collectors.toList())); + (action.contains("${") && expressionEvaluator.isValidFormatExpressions(action))), "action \"" + action + "\" is invalid. action must be one of the following: " + Arrays.stream(OpenSearchBulkActions.values()).collect(Collectors.toList())); this.action = action; return this; } @@ -535,7 +556,7 @@ public Builder withActions(final List> actions, final Expres String action = (String)actionMap.get("type"); if (action != null) { checkArgument((EnumUtils.isValidEnumIgnoreCase(OpenSearchBulkActions.class, action) || - (action.contains("${") && JacksonEvent.isValidFormatExpressions(action, expressionEvaluator))), "Invalid action \"" + action + "\". action must be one of the following: " + Arrays.stream(OpenSearchBulkActions.values()).collect(Collectors.toList())); + (action.contains("${") && expressionEvaluator.isValidFormatExpressions(action))), "action \"" + action + "\". action must be one of the following: " + Arrays.stream(OpenSearchBulkActions.values()).collect(Collectors.toList())); } } this.actions = actions; @@ -591,6 +612,44 @@ public Builder withDocumentRootKey(final String documentRootKey) { return this; } + public Builder withVersionType(final String versionType) { + if (versionType != null) { + try { + this.versionType = getVersionType(versionType); + } catch (final IllegalArgumentException e) { + throw new InvalidPluginConfigurationException( + String.format("version_type %s is invalid. version_type must be one of: %s", + versionType, Arrays.stream(VersionType.values()).collect(Collectors.toList()))); + } + } + + return this; + } + + private VersionType getVersionType(final String versionType) { + switch (versionType.toLowerCase()) { + case "internal": + return VersionType.Internal; + case "external": + return VersionType.External; + case "external_gte": + return VersionType.ExternalGte; + default: + throw new IllegalArgumentException(); + } + } + + public Builder withVersionExpression(final String versionExpression) { + if (versionExpression != null && !versionExpression.contains("${")) { + throw new InvalidPluginConfigurationException( + String.format("document_version %s is invalid. It must be in the format of \"${/key}\" or \"${expression}\"", versionExpression)); + } + + this.versionExpression = versionExpression; + + return this; + } + public IndexConfiguration build() { return new IndexConfiguration(this); } diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java index 7c05b99aa9..10390b7de3 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java @@ -5,31 +5,31 @@ package org.opensearch.dataprepper.plugins.sink.opensearch; -import org.opensearch.client.opensearch._types.ErrorResponse; -import org.opensearch.dataprepper.metrics.MetricNames; -import org.opensearch.dataprepper.metrics.MetricsTestUtil; -import org.opensearch.dataprepper.metrics.PluginMetrics; -import org.opensearch.dataprepper.model.event.EventHandle; -import org.opensearch.dataprepper.model.configuration.PluginSetting; import io.micrometer.core.instrument.Measurement; import org.hamcrest.MatcherAssert; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; -import org.opensearch.client.opensearch._types.OpenSearchException; +import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.client.opensearch._types.ErrorCause; +import org.opensearch.client.opensearch._types.ErrorResponse; +import org.opensearch.client.opensearch._types.OpenSearchException; import org.opensearch.client.opensearch.core.BulkRequest; import org.opensearch.client.opensearch.core.BulkResponse; import org.opensearch.client.opensearch.core.bulk.BulkOperation; import org.opensearch.client.opensearch.core.bulk.BulkResponseItem; import org.opensearch.client.opensearch.core.bulk.IndexOperation; +import org.opensearch.dataprepper.metrics.MetricNames; +import org.opensearch.dataprepper.metrics.MetricsTestUtil; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.AccumulatingBulkRequest; import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.JavaClientAccumulatingUncompressedBulkRequest; import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.SerializedJson; import org.opensearch.dataprepper.plugins.sink.opensearch.dlq.FailedBulkOperation; import org.opensearch.rest.RestStatus; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.lenient; import java.io.IOException; import java.util.Arrays; @@ -43,13 +43,14 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.mockito.ArgumentMatchers.eq; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.junit.jupiter.MockitoExtension; +import static org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.VERSION_CONFLICT_EXCEPTION_TYPE; @ExtendWith(MockitoExtension.class) public class BulkRetryStrategyTests { @@ -65,6 +66,7 @@ public class BulkRetryStrategyTests { private EventHandle eventHandle2; private EventHandle eventHandle3; private EventHandle eventHandle4; + private EventHandle eventHandle5; @BeforeEach public void setUp() { @@ -74,6 +76,7 @@ public void setUp() { eventHandle2 = mock(EventHandle.class); eventHandle3 = mock(EventHandle.class); eventHandle4 = mock(EventHandle.class); + eventHandle5 = mock(EventHandle.class); lenient().doAnswer(a -> { List l = a.getArgument(0); @@ -190,7 +193,6 @@ public void testExecuteRetryable() throws Exception { accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().index(indexOperation2).build(), eventHandle2)); accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().index(indexOperation3).build(), eventHandle3)); accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().index(indexOperation4).build(), eventHandle4)); - bulkRetryStrategy.execute(accumulatingBulkRequest); assertEquals(3, client.attempt); @@ -198,8 +200,8 @@ public void testExecuteRetryable() throws Exception { assertFalse(client.finalResponse.errors()); assertEquals("3", client.finalRequest.operations().get(0).index().id()); assertEquals("4", client.finalRequest.operations().get(1).index().id()); - assertEquals(numEventsSucceeded, 3); - assertEquals(numEventsFailed, 1); + assertEquals(3, numEventsSucceeded); + assertEquals(1, numEventsFailed); final ArgumentCaptor> failedBulkOperationsCaptor = ArgumentCaptor.forClass(List.class); ArgumentCaptor throwableArgCaptor = ArgumentCaptor.forClass(Throwable.class); @@ -208,12 +210,11 @@ public void testExecuteRetryable() throws Exception { final List failedBulkOperations = failedBulkOperationsCaptor.getValue(); MatcherAssert.assertThat(failedBulkOperations.size(), equalTo(1)); - failedBulkOperations.forEach(failedBulkOperation -> { - final BulkOperationWrapper bulkOperationWithHandle = failedBulkOperation.getBulkOperation(); - final BulkOperation bulkOperation = bulkOperationWithHandle.getBulkOperation(); - MatcherAssert.assertThat(bulkOperation.index().index(), equalTo(testIndex)); - MatcherAssert.assertThat(bulkOperation.index().id(), equalTo(String.valueOf("2"))); - }); + + final BulkOperationWrapper bulkOperationWithHandle = failedBulkOperations.get(0).getBulkOperation(); + final BulkOperation bulkOperation = bulkOperationWithHandle.getBulkOperation(); + MatcherAssert.assertThat(bulkOperation.index().index(), equalTo(testIndex)); + MatcherAssert.assertThat(bulkOperation.index().id(), equalTo("2")); // verify metrics final List documentsSuccessFirstAttemptMeasurements = MetricsTestUtil.getMeasurementList( @@ -373,15 +374,23 @@ public void testExecuteWithMaxRetriesAndSuccesses() throws Exception { final IndexOperation indexOperation2 = new IndexOperation.Builder().index(testIndex).id("2").document(arbitraryDocument()).build(); final IndexOperation indexOperation3 = new IndexOperation.Builder().index(testIndex).id("3").document(arbitraryDocument()).build(); final IndexOperation indexOperation4 = new IndexOperation.Builder().index(testIndex).id("4").document(arbitraryDocument()).build(); + final IndexOperation indexOperation5 = new IndexOperation.Builder().index(testIndex).id("5").document(arbitraryDocument()).build(); final AccumulatingBulkRequest accumulatingBulkRequest = new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()); accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().index(indexOperation1).build(), eventHandle1)); accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().index(indexOperation2).build(), eventHandle2)); accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().index(indexOperation3).build(), eventHandle3)); accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().index(indexOperation4).build(), eventHandle4)); + accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().index(indexOperation5).build(), eventHandle5)); bulkRetryStrategy.execute(accumulatingBulkRequest); MatcherAssert.assertThat(maxRetriesLimitReached, equalTo(true)); assertEquals(numEventsSucceeded, 2); assertEquals(numEventsFailed, 2); + + final List documentVersionConflictMeasurement = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(BulkRetryStrategy.DOCUMENTS_VERSION_CONFLICT_ERRORS).toString()); + assertEquals(1, documentVersionConflictMeasurement.size()); + assertEquals(1.0, documentVersionConflictMeasurement.get(0).getValue(), 0); } @Test @@ -458,6 +467,10 @@ private static BulkResponseItem internalServerErrorItemResponse(final String ind return customBulkFailureResponse(index, RestStatus.INTERNAL_SERVER_ERROR); } + private static BulkResponseItem versionConflictErrorItemResponse() { + return customBulkFailureResponse(RestStatus.CONFLICT, VERSION_CONFLICT_EXCEPTION_TYPE); + } + private static BulkResponseItem customBulkFailureResponse(final String index, final RestStatus restStatus) { final ErrorCause errorCause = mock(ErrorCause.class); final BulkResponseItem badResponse = mock(BulkResponseItem.class); @@ -466,6 +479,15 @@ private static BulkResponseItem customBulkFailureResponse(final String index, fi return badResponse; } + private static BulkResponseItem customBulkFailureResponse(final RestStatus restStatus, final String errorType) { + final ErrorCause errorCause = mock(ErrorCause.class); + lenient().when(errorCause.type()).thenReturn(errorType); + final BulkResponseItem badResponse = mock(BulkResponseItem.class); + lenient().when(badResponse.status()).thenReturn(restStatus.getStatus()); + lenient().when(badResponse.error()).thenReturn(errorCause); + return badResponse; + } + private SerializedJson arbitraryDocument() { return SerializedJson.fromStringAndOptionals("{}", null, null); } @@ -548,7 +570,8 @@ private BulkResponse bulkMaxRetriesResponseWithSuccesses(final BulkRequest bulkR internalServerErrorItemResponse(index), successItemResponse(index), successItemResponse(index), - tooManyRequestItemResponse(index)); + tooManyRequestItemResponse(index), + versionConflictErrorItemResponse()); return new BulkResponse.Builder().items(bulkItemResponses).errors(true).took(10).build(); } diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkConfigurationTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkConfigurationTests.java index 8789bd872c..78f3efc516 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkConfigurationTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkConfigurationTests.java @@ -132,16 +132,17 @@ public void testReadESConfigWithBulkActionCreate() { @Test public void testReadESConfigWithBulkActionCreateExpression() { + final String actionFormatExpression = "${getMetadata(\"action\")}"; final Map metadata = new HashMap<>(); metadata.put(IndexConfiguration.INDEX_TYPE, IndexType.TRACE_ANALYTICS_RAW.getValue()); - metadata.put(IndexConfiguration.ACTION, "${getMetadata(\"action\")}"); + metadata.put(IndexConfiguration.ACTION, actionFormatExpression); metadata.put(ConnectionConfiguration.HOSTS, TEST_HOSTS); final PluginSetting pluginSetting = new PluginSetting(PLUGIN_NAME, metadata); pluginSetting.setPipelineName(PIPELINE_NAME); expressionEvaluator = mock(ExpressionEvaluator.class); - when(expressionEvaluator.isValidExpressionStatement(anyString())).thenReturn(true); + when(expressionEvaluator.isValidFormatExpressions(actionFormatExpression)).thenReturn(true); final OpenSearchSinkConfiguration openSearchSinkConfiguration = OpenSearchSinkConfiguration.readESConfig(pluginSetting, expressionEvaluator); diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java index 46de83df7e..b0878f53e9 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java @@ -12,6 +12,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.ValueSource; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.plugins.sink.opensearch.DistributionVersion; @@ -49,6 +51,7 @@ import static org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration.AWS_OPTION; import static org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration.DISTRIBUTION_VERSION; import static org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration.DOCUMENT_ROOT_KEY; +import static org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration.DOCUMENT_VERSION_EXPRESSION; import static org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration.SERVERLESS; import static org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration.TEMPLATE_TYPE; import static org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConstants.RAW_DEFAULT_TEMPLATE_FILE; @@ -473,6 +476,41 @@ public void testReadIndexConfig_emptyDocumentRootKey() { assertThrows(IllegalArgumentException.class, () -> IndexConfiguration.readIndexConfig(pluginSetting)); } + @ParameterizedTest + @ValueSource(strings = {"${key}", "${getMetadata(\"key\")}"}) + public void testReadIndexConfig_withValidDocumentVersionExpression(final String versionExpression) { + + final ExpressionEvaluator expressionEvaluator = mock(ExpressionEvaluator.class); + when(expressionEvaluator.isValidFormatExpressions(versionExpression)).thenReturn(true); + + final Map metadata = initializeConfigMetaData( + IndexType.CUSTOM.getValue(), "foo", null, null, null, null, null); + metadata.put(DOCUMENT_VERSION_EXPRESSION, versionExpression); + + final PluginSetting pluginSetting = getPluginSetting(metadata); + + final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting, expressionEvaluator); + + assertThat(indexConfiguration, notNullValue()); + assertThat(indexConfiguration.getVersionExpression(), equalTo(versionExpression)); + } + + @Test + public void testReadIndexConfig_withInvalidDocumentVersionExpression_throws_InvalidPluginConfigurationException() { + final String versionExpression = UUID.randomUUID().toString(); + + final ExpressionEvaluator expressionEvaluator = mock(ExpressionEvaluator.class); + when(expressionEvaluator.isValidFormatExpressions(versionExpression)).thenReturn(false); + + final Map metadata = initializeConfigMetaData( + IndexType.CUSTOM.getValue(), "foo", null, null, null, null, null); + metadata.put(DOCUMENT_VERSION_EXPRESSION, versionExpression); + + final PluginSetting pluginSetting = getPluginSetting(metadata); + + assertThrows(InvalidPluginConfigurationException.class, () -> IndexConfiguration.readIndexConfig(pluginSetting, expressionEvaluator)); + } + @Test void getTemplateType_defaults_to_V1() { final Map metadata = initializeConfigMetaData(