Skip to content

Commit

Permalink
Add document_version and document_version_type parameters to the open… (
Browse files Browse the repository at this point in the history
#3591)

Add document_version and document_version_type parameters to the opensearch sink for conditional indexing of documents

Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 authored Nov 8, 2023
1 parent 2067523 commit edb13ef
Show file tree
Hide file tree
Showing 13 changed files with 311 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,6 @@ default Boolean evaluateConditional(final String statement, final Event context)
}

Boolean isValidExpressionStatement(final String statement);
}

Boolean isValidFormatExpressions(final String format);
}
Original file line number Diff line number Diff line change
Expand Up @@ -318,29 +318,6 @@ public String formatString(final String format, final ExpressionEvaluator expres
return formatStringInternal(format, expressionEvaluator);
}

public static boolean isValidFormatExpressions(final String format, final ExpressionEvaluator expressionEvaluator) {
if (Objects.isNull(expressionEvaluator)) {
return false;
}

int fromIndex = 0;
int position = 0;
while ((position = format.indexOf("${", fromIndex)) != -1) {
int endPosition = format.indexOf("}", position + 1);
if (endPosition == -1) {
return false;
}
String name = format.substring(position + 2, endPosition);

Object val;
if (!expressionEvaluator.isValidExpressionStatement(name)) {
return false;
}
fromIndex = endPosition + 1;
}
return true;
}

private String formatStringInternal(final String format, final ExpressionEvaluator expressionEvaluator) {
int fromIndex = 0;
String result = "";
Expand All @@ -362,7 +339,7 @@ private String formatStringInternal(final String format, final ExpressionEvaluat
}

if (val == null) {
if (Objects.nonNull(expressionEvaluator) && expressionEvaluator.isValidExpressionStatement(name)) {
if (expressionEvaluator != null && expressionEvaluator.isValidExpressionStatement(name)) {
val = expressionEvaluator.evaluate(name, this);
} else {
throw new EventKeyNotFoundException(String.format("The key %s could not be found in the Event when formatting", name));
Expand Down Expand Up @@ -410,12 +387,21 @@ public Map<String, Object> toMap() {
return mapper.convertValue(jsonNode, MAP_TYPE_REFERENCE);
}


public static boolean isValidEventKey(final String key) {
try {
checkKey(key);
return true;
} catch (final Exception e) {
return false;
}
}
private String checkAndTrimKey(final String key) {
checkKey(key);
return trimKey(key);
}

private void checkKey(final String key) {
private static void checkKey(final String key) {
checkNotNull(key, "key cannot be null");
checkArgument(!key.isEmpty(), "key cannot be an empty string");
if (key.length() > MAX_KEY_LENGTH) {
Expand All @@ -432,7 +418,7 @@ private String trimKey(final String key) {
return trimmedLeadingSlash.endsWith(SEPARATOR) ? trimmedLeadingSlash.substring(0, trimmedLeadingSlash.length() - 2) : trimmedLeadingSlash;
}

private boolean isValidKey(final String key) {
private static boolean isValidKey(final String key) {
for (int i = 0; i < key.length(); i++) {
char c = key.charAt(i);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ public Object evaluate(final String statement, final Event event) {
public Boolean isValidExpressionStatement(final String statement) {
return true;
}

@Override
public Boolean isValidFormatExpressions(String format) {
return true;
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,25 +535,6 @@ public void testBuild_withFormatString(String formattedString, String finalStrin
assertThat(event.formatString(formattedString), is(equalTo(finalString)));
}

@ParameterizedTest
@CsvSource({
"abc-${/foo, false",
"abc-${/foo}, true",
"abc-${getMetadata(\"key\")}, true",
"abc-${getXYZ(\"key\")}, false"
})
public void testBuild_withIsValidFormatExpressions(final String format, final Boolean expectedResult) {
final ExpressionEvaluator expressionEvaluator = mock(ExpressionEvaluator.class);
when(expressionEvaluator.isValidExpressionStatement("/foo")).thenReturn(true);
when(expressionEvaluator.isValidExpressionStatement("getMetadata(\"key\")")).thenReturn(true);
assertThat(JacksonEvent.isValidFormatExpressions(format, expressionEvaluator), equalTo(expectedResult));
}

@Test
public void testBuild_withIsValidFormatExpressionsWithNullEvaluator() {
assertThat(JacksonEvent.isValidFormatExpressions("${}", null), equalTo(false));
}

@Test
public void formatString_with_expression_evaluator_catches_exception_when_Event_get_throws_exception() {

Expand Down Expand Up @@ -818,6 +799,11 @@ void testJsonStringBuilderWithExcludeKeys() {

}

@ParameterizedTest
@CsvSource(value = {"test_key, true", "/test_key, true", "inv(alid, false", "getMetadata(\"test_key\"), false"})
void isValidEventKey_returns_expected_result(final String key, final boolean isValid) {
assertThat(JacksonEvent.isValidEventKey(key), equalTo(isValid));
}

private static Map<String, Object> createComplexDataMap() {
final Map<String, Object> dataObject = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.antlr.v4.runtime.tree.ParseTree;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;

import javax.inject.Inject;
import javax.inject.Named;
Expand Down Expand Up @@ -52,4 +53,25 @@ public Boolean isValidExpressionStatement(final String statement) {
return false;
}
}

@Override
public Boolean isValidFormatExpressions(final String format) {
int fromIndex = 0;
int position = 0;
while ((position = format.indexOf("${", fromIndex)) != -1) {
int endPosition = format.indexOf("}", position + 1);
if (endPosition == -1) {
return false;
}
String name = format.substring(position + 2, endPosition);

Object val;
// Invalid if it is not a valid key and not a valid expression statement
if (!JacksonEvent.isValidEventKey(name) && !isValidExpressionStatement(name)) {
return false;
}
fromIndex = endPosition + 1;
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,26 @@

package org.opensearch.dataprepper.expression;

import org.opensearch.dataprepper.model.event.Event;
import org.antlr.v4.runtime.tree.ParseTree;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.event.Event;

import java.util.UUID;
import java.util.Random;
import java.util.UUID;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
Expand Down Expand Up @@ -119,5 +123,24 @@ void isValidExpressionStatement_returns_false_when_parse_throws() {
assertThat(result, equalTo(false));
}

@ParameterizedTest
@CsvSource({
"abc-${/foo, false",
"abc-${/foo}, true",
"abc-${getMetadata(\"key\")}, true",
"abc-${getXYZ(\"key\")}, true",
"abc-${invalid, false"
})
void isValidFormatExpressionsReturnsCorrectResult(final String format, final Boolean expectedResult) {
assertThat(statementEvaluator.isValidFormatExpressions(format), equalTo(expectedResult));
}

@ParameterizedTest
@ValueSource(strings = {"abc-${anyS(=tring}"})
void isValidFormatExpressionsReturnsFalseWhenIsValidKeyAndValidExpressionIsFalse(final String format) {
doThrow(RuntimeException.class).when(parser).parse(anyString());
assertThat(statementEvaluator.isValidFormatExpressions(format), equalTo(false));
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -646,9 +646,11 @@ public void testOpenSearchBulkActionsCreateWithExpression() throws IOException,
pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField);
Event event = (Event)testRecords.get(0).getData();
event.getMetadata().setAttribute("action", "create");
final String actionFormatExpression = "${getMetadata(\"action\")}";
when(expressionEvaluator.isValidFormatExpressions(actionFormatExpression)).thenReturn(true);
when(expressionEvaluator.isValidExpressionStatement("getMetadata(\"action\")")).thenReturn(true);
when(expressionEvaluator.evaluate("getMetadata(\"action\")", event)).thenReturn(event.getMetadata().getAttribute("action"));
pluginSetting.getSettings().put(IndexConfiguration.ACTION, "${getMetadata(\"action\")}");
pluginSetting.getSettings().put(IndexConfiguration.ACTION, actionFormatExpression);
final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true);
sink.output(testRecords);
final List<Map<String, Object>> retSources = getSearchResponseDocSources(testIndexAlias);
Expand Down Expand Up @@ -677,9 +679,11 @@ public void testOpenSearchBulkActionsCreateWithInvalidExpression() throws IOExce
pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField);
Event event = (Event)testRecords.get(0).getData();
event.getMetadata().setAttribute("action", "unknown");
final String actionFormatExpression = "${getMetadata(\"action\")}";
when(expressionEvaluator.isValidFormatExpressions(actionFormatExpression)).thenReturn(true);
when(expressionEvaluator.isValidExpressionStatement("getMetadata(\"action\")")).thenReturn(true);
when(expressionEvaluator.evaluate("getMetadata(\"action\")", event)).thenReturn(event.getMetadata().getAttribute("action"));
pluginSetting.getSettings().put(IndexConfiguration.ACTION, "${getMetadata(\"action\")}");
pluginSetting.getSettings().put(IndexConfiguration.ACTION, actionFormatExpression);
final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true);
sink.output(testRecords);
final List<Map<String, Object>> retSources = getSearchResponseDocSources(testIndexAlias);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@ public final class BulkRetryStrategy {
public static final String BULK_REQUEST_NOT_FOUND_ERRORS = "bulkRequestNotFoundErrors";
public static final String BULK_REQUEST_TIMEOUT_ERRORS = "bulkRequestTimeoutErrors";
public static final String BULK_REQUEST_SERVER_ERRORS = "bulkRequestServerErrors";
public static final String DOCUMENTS_VERSION_CONFLICT_ERRORS = "documentsVersionConflictErrors";
static final long INITIAL_DELAY_MS = 50;
static final long MAXIMUM_DELAY_MS = Duration.ofMinutes(10).toMillis();
static final String VERSION_CONFLICT_EXCEPTION_TYPE = "version_conflict_engine_exception";

private static final Set<Integer> NON_RETRY_STATUS = new HashSet<>(
Arrays.asList(
Expand Down Expand Up @@ -116,6 +118,7 @@ public final class BulkRetryStrategy {
private final Counter bulkRequestNotFoundErrors;
private final Counter bulkRequestTimeoutErrors;
private final Counter bulkRequestServerErrors;
private final Counter documentsVersionConflictErrors;
private static final Logger LOG = LoggerFactory.getLogger(BulkRetryStrategy.class);

static class BulkOperationRequestResponse {
Expand Down Expand Up @@ -160,6 +163,7 @@ public BulkRetryStrategy(final RequestFunction<AccumulatingBulkRequest<BulkOpera
bulkRequestNotFoundErrors = pluginMetrics.counter(BULK_REQUEST_NOT_FOUND_ERRORS);
bulkRequestTimeoutErrors = pluginMetrics.counter(BULK_REQUEST_TIMEOUT_ERRORS);
bulkRequestServerErrors = pluginMetrics.counter(BULK_REQUEST_SERVER_ERRORS);
documentsVersionConflictErrors = pluginMetrics.counter(DOCUMENTS_VERSION_CONFLICT_ERRORS);
}

private void incrementErrorCounters(final Exception e) {
Expand Down Expand Up @@ -240,7 +244,7 @@ private BulkOperationRequestResponse handleRetriesAndFailures(final Accumulating
LOG.warn("Bulk Operation Failed. Number of retries {}. Retrying... ", retryCount, e);
if (e == null) {
for (final BulkResponseItem bulkItemResponse : bulkResponse.items()) {
if (Objects.nonNull(bulkItemResponse.error())) {
if (bulkItemResponse.error() != null) {
LOG.warn("operation = {}, error = {}", bulkItemResponse.operationType(), bulkItemResponse.error().reason());
}
}
Expand All @@ -255,15 +259,16 @@ private BulkOperationRequestResponse handleRetriesAndFailures(final Accumulating
}

private void handleFailures(final AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> bulkRequest, final BulkResponse bulkResponse, final Throwable failure) {
LOG.warn("Bulk Operation Failed.", failure);
if (failure == null) {
for (final BulkResponseItem bulkItemResponse : bulkResponse.items()) {
if (Objects.nonNull(bulkItemResponse.error())) {
// Skip logging the error for version conflicts
if (bulkItemResponse.error() != null && !VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) {
LOG.warn("operation = {}, error = {}", bulkItemResponse.operationType(), bulkItemResponse.error().reason());
}
}
handleFailures(bulkRequest, bulkResponse.items());
} else {
LOG.warn("Bulk Operation Failed.", failure);
handleFailures(bulkRequest, failure);
}
bulkRequestFailedCounter.increment();
Expand Down Expand Up @@ -313,6 +318,10 @@ private AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> createBulkReq
if (bulkItemResponse.error() != null) {
if (!NON_RETRY_STATUS.contains(bulkItemResponse.status())) {
requestToReissue.addOperation(bulkOperation);
} else if (VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) {
documentsVersionConflictErrors.increment();
LOG.debug("Received version conflict from OpenSearch: {}", bulkItemResponse.error().reason());
bulkOperation.releaseEventHandle(true);
} else {
nonRetryableFailures.add(FailedBulkOperation.builder()
.withBulkOperation(bulkOperation)
Expand All @@ -338,10 +347,16 @@ private void handleFailures(final AccumulatingBulkRequest<BulkOperationWrapper,
final BulkResponseItem bulkItemResponse = itemResponses.get(i);
final BulkOperationWrapper bulkOperation = accumulatingBulkRequest.getOperationAt(i);
if (bulkItemResponse.error() != null) {
failures.add(FailedBulkOperation.builder()
.withBulkOperation(bulkOperation)
.withBulkResponseItem(bulkItemResponse)
.build());
if (VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) {
documentsVersionConflictErrors.increment();
LOG.debug("Received version conflict from OpenSearch: {}", bulkItemResponse.error().reason());
bulkOperation.releaseEventHandle(true);
} else {
failures.add(FailedBulkOperation.builder()
.withBulkOperation(bulkOperation)
.withBulkResponseItem(bulkItemResponse)
.build());
}
documentErrorsCounter.increment();
} else {
sentDocumentsCounter.increment();
Expand Down
Loading

0 comments on commit edb13ef

Please sign in to comment.