Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add document_version and document_version_type parameters to the open… #3591

Merged
merged 3 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,6 @@ default Boolean evaluateConditional(final String statement, final Event context)
}

Boolean isValidExpressionStatement(final String statement);
}

Boolean isValidFormatExpressions(final String format);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You have a typo here. This takes a singular expression, but the name uses plural "expressions."

}
Original file line number Diff line number Diff line change
Expand Up @@ -318,29 +318,6 @@ public String formatString(final String format, final ExpressionEvaluator expres
return formatStringInternal(format, expressionEvaluator);
}

public static boolean isValidFormatExpressions(final String format, final ExpressionEvaluator expressionEvaluator) {
if (Objects.isNull(expressionEvaluator)) {
return false;
}

int fromIndex = 0;
int position = 0;
while ((position = format.indexOf("${", fromIndex)) != -1) {
int endPosition = format.indexOf("}", position + 1);
if (endPosition == -1) {
return false;
}
String name = format.substring(position + 2, endPosition);

Object val;
if (!expressionEvaluator.isValidExpressionStatement(name)) {
return false;
}
fromIndex = endPosition + 1;
}
return true;
}

private String formatStringInternal(final String format, final ExpressionEvaluator expressionEvaluator) {
int fromIndex = 0;
String result = "";
Expand All @@ -362,7 +339,7 @@ private String formatStringInternal(final String format, final ExpressionEvaluat
}

if (val == null) {
if (Objects.nonNull(expressionEvaluator) && expressionEvaluator.isValidExpressionStatement(name)) {
if (expressionEvaluator != null && expressionEvaluator.isValidExpressionStatement(name)) {
val = expressionEvaluator.evaluate(name, this);
} else {
throw new EventKeyNotFoundException(String.format("The key %s could not be found in the Event when formatting", name));
Expand Down Expand Up @@ -410,12 +387,21 @@ public Map<String, Object> toMap() {
return mapper.convertValue(jsonNode, MAP_TYPE_REFERENCE);
}


public static boolean isValidEventKey(final String key) {
try {
checkKey(key);
return true;
} catch (final Exception e) {
return false;
}
}
private String checkAndTrimKey(final String key) {
checkKey(key);
return trimKey(key);
}

private void checkKey(final String key) {
private static void checkKey(final String key) {
checkNotNull(key, "key cannot be null");
checkArgument(!key.isEmpty(), "key cannot be an empty string");
if (key.length() > MAX_KEY_LENGTH) {
Expand All @@ -432,7 +418,7 @@ private String trimKey(final String key) {
return trimmedLeadingSlash.endsWith(SEPARATOR) ? trimmedLeadingSlash.substring(0, trimmedLeadingSlash.length() - 2) : trimmedLeadingSlash;
}

private boolean isValidKey(final String key) {
private static boolean isValidKey(final String key) {
for (int i = 0; i < key.length(); i++) {
char c = key.charAt(i);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ public Object evaluate(final String statement, final Event event) {
public Boolean isValidExpressionStatement(final String statement) {
return true;
}

@Override
public Boolean isValidFormatExpressions(String format) {
return true;
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,25 +535,6 @@ public void testBuild_withFormatString(String formattedString, String finalStrin
assertThat(event.formatString(formattedString), is(equalTo(finalString)));
}

@ParameterizedTest
@CsvSource({
"abc-${/foo, false",
"abc-${/foo}, true",
"abc-${getMetadata(\"key\")}, true",
"abc-${getXYZ(\"key\")}, false"
})
public void testBuild_withIsValidFormatExpressions(final String format, final Boolean expectedResult) {
final ExpressionEvaluator expressionEvaluator = mock(ExpressionEvaluator.class);
when(expressionEvaluator.isValidExpressionStatement("/foo")).thenReturn(true);
when(expressionEvaluator.isValidExpressionStatement("getMetadata(\"key\")")).thenReturn(true);
assertThat(JacksonEvent.isValidFormatExpressions(format, expressionEvaluator), equalTo(expectedResult));
}

@Test
public void testBuild_withIsValidFormatExpressionsWithNullEvaluator() {
assertThat(JacksonEvent.isValidFormatExpressions("${}", null), equalTo(false));
}

@Test
public void formatString_with_expression_evaluator_catches_exception_when_Event_get_throws_exception() {

Expand Down Expand Up @@ -818,6 +799,11 @@ void testJsonStringBuilderWithExcludeKeys() {

}

@ParameterizedTest
@CsvSource(value = {"test_key, true", "/test_key, true", "inv(alid, false", "getMetadata(\"test_key\"), false"})
void isValidEventKey_returns_expected_result(final String key, final boolean isValid) {
assertThat(JacksonEvent.isValidEventKey(key), equalTo(isValid));
}

private static Map<String, Object> createComplexDataMap() {
final Map<String, Object> dataObject = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.antlr.v4.runtime.tree.ParseTree;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;

import javax.inject.Inject;
import javax.inject.Named;
Expand Down Expand Up @@ -52,4 +53,25 @@ public Boolean isValidExpressionStatement(final String statement) {
return false;
}
}

@Override
public Boolean isValidFormatExpressions(final String format) {
int fromIndex = 0;
int position = 0;
while ((position = format.indexOf("${", fromIndex)) != -1) {
int endPosition = format.indexOf("}", position + 1);
if (endPosition == -1) {
return false;
}
String name = format.substring(position + 2, endPosition);

Object val;
// Invalid if it is not a valid key and not a valid expression statement
if (!JacksonEvent.isValidEventKey(name) && !isValidExpressionStatement(name)) {
return false;
}
fromIndex = endPosition + 1;
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,26 @@

package org.opensearch.dataprepper.expression;

import org.opensearch.dataprepper.model.event.Event;
import org.antlr.v4.runtime.tree.ParseTree;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.event.Event;

import java.util.UUID;
import java.util.Random;
import java.util.UUID;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
Expand Down Expand Up @@ -119,5 +123,24 @@ void isValidExpressionStatement_returns_false_when_parse_throws() {
assertThat(result, equalTo(false));
}

@ParameterizedTest
@CsvSource({
"abc-${/foo, false",
"abc-${/foo}, true",
"abc-${getMetadata(\"key\")}, true",
"abc-${getXYZ(\"key\")}, true",
"abc-${invalid, false"
})
void isValidFormatExpressionsReturnsCorrectResult(final String format, final Boolean expectedResult) {
assertThat(statementEvaluator.isValidFormatExpressions(format), equalTo(expectedResult));
}

@ParameterizedTest
@ValueSource(strings = {"abc-${anyS(=tring}"})
void isValidFormatExpressionsReturnsFalseWhenIsValidKeyAndValidExpressionIsFalse(final String format) {
doThrow(RuntimeException.class).when(parser).parse(anyString());
assertThat(statementEvaluator.isValidFormatExpressions(format), equalTo(false));
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -646,9 +646,11 @@ public void testOpenSearchBulkActionsCreateWithExpression() throws IOException,
pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField);
Event event = (Event)testRecords.get(0).getData();
event.getMetadata().setAttribute("action", "create");
final String actionFormatExpression = "${getMetadata(\"action\")}";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be good to add an integration test which creates two documents with the same id and version and then we see the error handled correctly. But, it's not critical.

when(expressionEvaluator.isValidFormatExpressions(actionFormatExpression)).thenReturn(true);
when(expressionEvaluator.isValidExpressionStatement("getMetadata(\"action\")")).thenReturn(true);
when(expressionEvaluator.evaluate("getMetadata(\"action\")", event)).thenReturn(event.getMetadata().getAttribute("action"));
pluginSetting.getSettings().put(IndexConfiguration.ACTION, "${getMetadata(\"action\")}");
pluginSetting.getSettings().put(IndexConfiguration.ACTION, actionFormatExpression);
final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true);
sink.output(testRecords);
final List<Map<String, Object>> retSources = getSearchResponseDocSources(testIndexAlias);
Expand Down Expand Up @@ -677,9 +679,11 @@ public void testOpenSearchBulkActionsCreateWithInvalidExpression() throws IOExce
pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField);
Event event = (Event)testRecords.get(0).getData();
event.getMetadata().setAttribute("action", "unknown");
final String actionFormatExpression = "${getMetadata(\"action\")}";
when(expressionEvaluator.isValidFormatExpressions(actionFormatExpression)).thenReturn(true);
when(expressionEvaluator.isValidExpressionStatement("getMetadata(\"action\")")).thenReturn(true);
when(expressionEvaluator.evaluate("getMetadata(\"action\")", event)).thenReturn(event.getMetadata().getAttribute("action"));
pluginSetting.getSettings().put(IndexConfiguration.ACTION, "${getMetadata(\"action\")}");
pluginSetting.getSettings().put(IndexConfiguration.ACTION, actionFormatExpression);
final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true);
sink.output(testRecords);
final List<Map<String, Object>> retSources = getSearchResponseDocSources(testIndexAlias);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@ public final class BulkRetryStrategy {
public static final String BULK_REQUEST_NOT_FOUND_ERRORS = "bulkRequestNotFoundErrors";
public static final String BULK_REQUEST_TIMEOUT_ERRORS = "bulkRequestTimeoutErrors";
public static final String BULK_REQUEST_SERVER_ERRORS = "bulkRequestServerErrors";
public static final String DOCUMENTS_VERSION_CONFLICT_ERRORS = "documentsVersionConflictErrors";
static final long INITIAL_DELAY_MS = 50;
static final long MAXIMUM_DELAY_MS = Duration.ofMinutes(10).toMillis();
static final String VERSION_CONFLICT_EXCEPTION_TYPE = "version_conflict_engine_exception";

private static final Set<Integer> NON_RETRY_STATUS = new HashSet<>(
Arrays.asList(
Expand Down Expand Up @@ -116,6 +118,7 @@ public final class BulkRetryStrategy {
private final Counter bulkRequestNotFoundErrors;
private final Counter bulkRequestTimeoutErrors;
private final Counter bulkRequestServerErrors;
private final Counter documentsVersionConflictErrors;
private static final Logger LOG = LoggerFactory.getLogger(BulkRetryStrategy.class);

static class BulkOperationRequestResponse {
Expand Down Expand Up @@ -160,6 +163,7 @@ public BulkRetryStrategy(final RequestFunction<AccumulatingBulkRequest<BulkOpera
bulkRequestNotFoundErrors = pluginMetrics.counter(BULK_REQUEST_NOT_FOUND_ERRORS);
bulkRequestTimeoutErrors = pluginMetrics.counter(BULK_REQUEST_TIMEOUT_ERRORS);
bulkRequestServerErrors = pluginMetrics.counter(BULK_REQUEST_SERVER_ERRORS);
documentsVersionConflictErrors = pluginMetrics.counter(DOCUMENTS_VERSION_CONFLICT_ERRORS);
}

private void incrementErrorCounters(final Exception e) {
Expand Down Expand Up @@ -240,7 +244,7 @@ private BulkOperationRequestResponse handleRetriesAndFailures(final Accumulating
LOG.warn("Bulk Operation Failed. Number of retries {}. Retrying... ", retryCount, e);
if (e == null) {
for (final BulkResponseItem bulkItemResponse : bulkResponse.items()) {
if (Objects.nonNull(bulkItemResponse.error())) {
if (bulkItemResponse.error() != null) {
LOG.warn("operation = {}, error = {}", bulkItemResponse.operationType(), bulkItemResponse.error().reason());
}
}
Expand All @@ -255,15 +259,16 @@ private BulkOperationRequestResponse handleRetriesAndFailures(final Accumulating
}

private void handleFailures(final AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> bulkRequest, final BulkResponse bulkResponse, final Throwable failure) {
LOG.warn("Bulk Operation Failed.", failure);
if (failure == null) {
for (final BulkResponseItem bulkItemResponse : bulkResponse.items()) {
if (Objects.nonNull(bulkItemResponse.error())) {
// Skip logging the error for version conflicts
if (bulkItemResponse.error() != null && !VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) {
LOG.warn("operation = {}, error = {}", bulkItemResponse.operationType(), bulkItemResponse.error().reason());
}
}
handleFailures(bulkRequest, bulkResponse.items());
} else {
LOG.warn("Bulk Operation Failed.", failure);
handleFailures(bulkRequest, failure);
}
bulkRequestFailedCounter.increment();
Expand Down Expand Up @@ -313,6 +318,10 @@ private AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> createBulkReq
if (bulkItemResponse.error() != null) {
if (!NON_RETRY_STATUS.contains(bulkItemResponse.status())) {
requestToReissue.addOperation(bulkOperation);
} else if (VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) {
documentsVersionConflictErrors.increment();
LOG.debug("Received version conflict from OpenSearch: {}", bulkItemResponse.error().reason());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm generally keen on this logging, but we should also be consistent. Why log here when we often don't log?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's debug log to share details about version conflicts. Some users may wish to know which records had conflicts

Copy link
Member Author

@graytaylor0 graytaylor0 Nov 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other failures are logged, just elsewhere. This one is dropped instead of sending to DLQ or logging the failure i OpenSearchSink, so we log here

bulkOperation.releaseEventHandle(true);
} else {
nonRetryableFailures.add(FailedBulkOperation.builder()
.withBulkOperation(bulkOperation)
Expand All @@ -338,10 +347,16 @@ private void handleFailures(final AccumulatingBulkRequest<BulkOperationWrapper,
final BulkResponseItem bulkItemResponse = itemResponses.get(i);
final BulkOperationWrapper bulkOperation = accumulatingBulkRequest.getOperationAt(i);
if (bulkItemResponse.error() != null) {
failures.add(FailedBulkOperation.builder()
.withBulkOperation(bulkOperation)
.withBulkResponseItem(bulkItemResponse)
.build());
if (VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) {
documentsVersionConflictErrors.increment();
LOG.debug("Received version conflict from OpenSearch: {}", bulkItemResponse.error().reason());
bulkOperation.releaseEventHandle(true);
} else {
failures.add(FailedBulkOperation.builder()
.withBulkOperation(bulkOperation)
.withBulkResponseItem(bulkItemResponse)
.build());
}
documentErrorsCounter.increment();
} else {
sentDocumentsCounter.increment();
Expand Down
Loading