diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java index ef161200b1..341d5277bc 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java @@ -387,12 +387,21 @@ public Map toMap() { return mapper.convertValue(jsonNode, MAP_TYPE_REFERENCE); } + + public static boolean isValidEventKey(final String key) { + try { + checkKey(key); + return true; + } catch (final Exception e) { + return false; + } + } private String checkAndTrimKey(final String key) { checkKey(key); return trimKey(key); } - private void checkKey(final String key) { + private static void checkKey(final String key) { checkNotNull(key, "key cannot be null"); checkArgument(!key.isEmpty(), "key cannot be an empty string"); if (key.length() > MAX_KEY_LENGTH) { @@ -409,7 +418,7 @@ private String trimKey(final String key) { return trimmedLeadingSlash.endsWith(SEPARATOR) ? trimmedLeadingSlash.substring(0, trimmedLeadingSlash.length() - 2) : trimmedLeadingSlash; } - private boolean isValidKey(final String key) { + private static boolean isValidKey(final String key) { for (int i = 0; i < key.length(); i++) { char c = key.charAt(i); diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java index 65e861c51d..4fe8f272cc 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java @@ -799,6 +799,11 @@ void testJsonStringBuilderWithExcludeKeys() { } + @ParameterizedTest + @CsvSource(value = {"test_key, true", "/test_key, true", "inv(alid, false", "getMetadata(\"test_key\"), false"}) + void isValidEventKey_returns_expected_result(final String key, final boolean isValid) { + assertThat(JacksonEvent.isValidEventKey(key), equalTo(isValid)); + } private static Map createComplexDataMap() { final Map dataObject = new HashMap<>(); diff --git a/data-prepper-expression/src/main/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluator.java b/data-prepper-expression/src/main/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluator.java index 842bb1f1c9..89d8b15a84 100644 --- a/data-prepper-expression/src/main/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluator.java +++ b/data-prepper-expression/src/main/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluator.java @@ -7,6 +7,7 @@ import org.antlr.v4.runtime.tree.ParseTree; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; import javax.inject.Inject; import javax.inject.Named; @@ -65,10 +66,8 @@ public Boolean isValidFormatExpressions(final String format) { String name = format.substring(position + 2, endPosition); Object val; - // We only check the expression if it matches (.*) to mitigate the issue with the antlr logger () - // These can't be keys because (, ) is invalid for keys, so we attempt to validate the expression. All expressions currently - // contain a function ( ) so this will detect them to validate against. - if (name.matches("(.*)") && !isValidExpressionStatement(name)) { + // Invalid if it is not a valid key and not a valid expression statement + if (!JacksonEvent.isValidEventKey(name) && !isValidExpressionStatement(name)) { return false; } fromIndex = endPosition + 1; diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java index f49b4438e4..23c4641663 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java @@ -318,6 +318,10 @@ private AccumulatingBulkRequest createBulkReq if (bulkItemResponse.error() != null) { if (!NON_RETRY_STATUS.contains(bulkItemResponse.status())) { requestToReissue.addOperation(bulkOperation); + } else if (VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) { + documentsVersionConflictErrors.increment(); + LOG.info("Received version conflict from OpenSearch: {}", bulkItemResponse.error().reason()); + bulkOperation.releaseEventHandle(true); } else { nonRetryableFailures.add(FailedBulkOperation.builder() .withBulkOperation(bulkOperation) diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index 99852cdd33..b1edff4802 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -88,7 +88,7 @@ public class OpenSearchSink extends AbstractSink> { public static final String INVALID_ACTION_ERRORS = "invalidActionErrors"; public static final String BULKREQUEST_SIZE_BYTES = "bulkRequestSizeBytes"; public static final String DYNAMIC_INDEX_DROPPED_EVENTS = "dynamicIndexDroppedEvents"; - public static final String INVALID_VERSION_EXPRESSION_DROPPED_EVENTS = "invalidVersionExpressionDroppedEvents"; + public static final String INVALID_VERSION_EXPRESSION_DROPPED_EVENTS = "dynamicDocumentVersionDroppedEvents"; private static final Logger LOG = LoggerFactory.getLogger(OpenSearchSink.class); private static final int INITIALIZE_RETRY_WAIT_TIME_MS = 5000; @@ -122,7 +122,7 @@ public class OpenSearchSink extends AbstractSink> { private final Counter invalidActionErrorsCounter; private final Counter dynamicIndexDroppedEvents; private final DistributionSummary bulkRequestSizeBytesSummary; - private final Counter versionExpressionDroppedEventsCounter; + private final Counter dynamicDocumentVersionDroppedEvents; private OpenSearchClient openSearchClient; private ObjectMapper objectMapper; private volatile boolean initialized; @@ -151,7 +151,7 @@ public OpenSearchSink(final PluginSetting pluginSetting, invalidActionErrorsCounter = pluginMetrics.counter(INVALID_ACTION_ERRORS); dynamicIndexDroppedEvents = pluginMetrics.counter(DYNAMIC_INDEX_DROPPED_EVENTS); bulkRequestSizeBytesSummary = pluginMetrics.summary(BULKREQUEST_SIZE_BYTES); - versionExpressionDroppedEventsCounter = pluginMetrics.counter(INVALID_VERSION_EXPRESSION_DROPPED_EVENTS); + dynamicDocumentVersionDroppedEvents = pluginMetrics.counter(INVALID_VERSION_EXPRESSION_DROPPED_EVENTS); this.openSearchSinkConfig = OpenSearchSinkConfiguration.readESConfig(pluginSetting, expressionEvaluator); this.bulkSize = ByteSizeUnit.MB.toBytes(openSearchSinkConfig.getIndexConfiguration().getBulkSize()); @@ -371,13 +371,13 @@ public void doOutput(final Collection> records) { versionExpressionEvaluationResult = event.formatString(versionExpression, expressionEvaluator); version = Long.valueOf(event.formatString(versionExpression, expressionEvaluator)); } catch (final NumberFormatException e) { - LOG.warn("Unable to convert the result of evaluating version_expression {} to Long for an Event. {} must be a valid Long type", versionExpression, versionExpressionEvaluationResult); + LOG.warn("Unable to convert the result of evaluating document_version '{}' to Long for an Event. The evaluation result '{}' must be a valid Long type", versionExpression, versionExpressionEvaluationResult); logFailureForDlqObjects(List.of(createDlqObjectFromEvent(event, indexName, e.getMessage())), e); - versionExpressionDroppedEventsCounter.increment(); + dynamicDocumentVersionDroppedEvents.increment(); } catch (final RuntimeException e) { - LOG.error("There was an exception when evaluating the version_expression {}. Check the dlq if configured to see details about the affected Event: {}", versionExpression, e.getMessage()); + LOG.error("There was an exception when evaluating the document_version '{}'. Check the dlq if configured to see details about the affected Event: {}", versionExpression, e.getMessage()); logFailureForDlqObjects(List.of(createDlqObjectFromEvent(event, indexName, e.getMessage())), e); - versionExpressionDroppedEventsCounter.increment(); + dynamicDocumentVersionDroppedEvents.increment(); } } diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java index 0ea6af5e73..10390b7de3 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java @@ -374,11 +374,13 @@ public void testExecuteWithMaxRetriesAndSuccesses() throws Exception { final IndexOperation indexOperation2 = new IndexOperation.Builder().index(testIndex).id("2").document(arbitraryDocument()).build(); final IndexOperation indexOperation3 = new IndexOperation.Builder().index(testIndex).id("3").document(arbitraryDocument()).build(); final IndexOperation indexOperation4 = new IndexOperation.Builder().index(testIndex).id("4").document(arbitraryDocument()).build(); + final IndexOperation indexOperation5 = new IndexOperation.Builder().index(testIndex).id("5").document(arbitraryDocument()).build(); final AccumulatingBulkRequest accumulatingBulkRequest = new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()); accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().index(indexOperation1).build(), eventHandle1)); accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().index(indexOperation2).build(), eventHandle2)); accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().index(indexOperation3).build(), eventHandle3)); accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().index(indexOperation4).build(), eventHandle4)); + accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().index(indexOperation5).build(), eventHandle5)); bulkRetryStrategy.execute(accumulatingBulkRequest); MatcherAssert.assertThat(maxRetriesLimitReached, equalTo(true)); assertEquals(numEventsSucceeded, 2); @@ -407,7 +409,6 @@ public void testExecuteNonRetryableResponse() throws Exception { final IndexOperation indexOperation2 = new IndexOperation.Builder().index(testIndex).id("2").document(arbitraryDocument()).build(); final IndexOperation indexOperation3 = new IndexOperation.Builder().index(testIndex).id("3").document(arbitraryDocument()).build(); final IndexOperation indexOperation4 = new IndexOperation.Builder().index(testIndex).id("4").document(arbitraryDocument()).build(); - final IndexOperation indexOperation5 = new IndexOperation.Builder().index(testIndex).id("5").document(arbitraryDocument()).build(); final AccumulatingBulkRequest accumulatingBulkRequest = new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()); accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().index(indexOperation1).build(), eventHandle1)); accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().index(indexOperation2).build(), eventHandle2)); @@ -604,7 +605,7 @@ private BulkResponse bulkSecondResponse(final BulkRequest bulkRequest) { private BulkResponse bulkSecondResponseWithFailures(final BulkRequest bulkRequest) { final int requestSize = bulkRequest.operations().size(); - assert requestSize == 3; + assert requestSize == 2; final List bulkItemResponses = Arrays.asList( internalServerErrorItemResponse(index), internalServerErrorItemResponse(index)); return new BulkResponse.Builder().items(bulkItemResponses).errors(true).took(10).build(); diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkConfigurationTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkConfigurationTests.java index 8789bd872c..78f3efc516 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkConfigurationTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkConfigurationTests.java @@ -132,16 +132,17 @@ public void testReadESConfigWithBulkActionCreate() { @Test public void testReadESConfigWithBulkActionCreateExpression() { + final String actionFormatExpression = "${getMetadata(\"action\")}"; final Map metadata = new HashMap<>(); metadata.put(IndexConfiguration.INDEX_TYPE, IndexType.TRACE_ANALYTICS_RAW.getValue()); - metadata.put(IndexConfiguration.ACTION, "${getMetadata(\"action\")}"); + metadata.put(IndexConfiguration.ACTION, actionFormatExpression); metadata.put(ConnectionConfiguration.HOSTS, TEST_HOSTS); final PluginSetting pluginSetting = new PluginSetting(PLUGIN_NAME, metadata); pluginSetting.setPipelineName(PIPELINE_NAME); expressionEvaluator = mock(ExpressionEvaluator.class); - when(expressionEvaluator.isValidExpressionStatement(anyString())).thenReturn(true); + when(expressionEvaluator.isValidFormatExpressions(actionFormatExpression)).thenReturn(true); final OpenSearchSinkConfiguration openSearchSinkConfiguration = OpenSearchSinkConfiguration.readESConfig(pluginSetting, expressionEvaluator);