Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add document_version and document_version_type parameters to the open… #3591

Merged
merged 3 commits into from
Nov 8, 2023

Conversation

graytaylor0
Copy link
Member

…search sink for conditional indexing of documents

Description

This change adds 2 new parameters to the OpenSearch sink

document_version - A Data Prepper format expression such as ${key} or ${expression()}
document_version_type - One of internal, external, or external_gte

OpenSearch documentation on version and version_type (https://opensearch.org/docs/latest/api-reference/document-apis/index-document/). The documentation incorrectly states that version is an Integer. It is a Long. I tested this manually in OpenSearch 2.9 will epochMilli timestamps, and the SDK also has it as a Long.

Given the following documents being sent to OpenSearch

{"version": 123, "val": "4349", "action": "index"}
{"version": 126, "val": "4349", "action": "index"}
{"version": 125, "doc_id": "4349", "action": "index"}
{"version": 128, "doc_id": "4349", "action": "delete"}
{"version": 127, "doc_id": "4349", "action": "index"}
{"version": 130, "doc_id": "4349", "action": "index", "value": "result"}

and configuration

        document_version: "${/version}"
        document_version_type: "external"  

and enabled DEBUG logging, you would see the following response errors for version conflicts for the third index action (key 125 and the index action following the delete (key 127), and the resultant document in OpenSearch would be the document will the highest key, which is the final request with 130

2023-11-05T10:28:10,155 [log-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy - Received version conflict from OpenSearch: [4349]: version conflict, current version [126] is higher or equal to the one provided [125]
2023-11-05T10:28:10,155 [log-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy - Received version conflict from OpenSearch: [4349]: version conflict, current version [128] is higher or equal to the one provided [127]

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

// We only check the expression if it matches (.*) to mitigate the issue with the antlr logger ()
// These can't be keys because (, ) is invalid for keys, so we attempt to validate the expression. All expressions currently
// contain a function ( ) so this will detect them to validate against.
if (name.matches("(.*)") && !isValidExpressionStatement(name)) {
Copy link
Collaborator

@kkondaka kkondaka Nov 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this (matches()) doing a very expensive operation to do every time? Is it just making sure there is ( in the beginning and ) at the end?

Copy link
Member Author

@graytaylor0 graytaylor0 Nov 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This validation is only supposed to be called to validate the format expression on startup. It's a way to avoid running isValidExpressionStatement and to have those antlr logs show up and to let it be valid when the usage is ${key}. The () is meant to check that we are using an expression function, so we call validate. I'm not sure of the best way to do this. This would be easy if we required the keys to start with /key, then we could easily tell the difference between key and expression. But we can't require that until a 3.0 due to breaking change

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just check for first character being '(' and last character being ')'. Doing regex match is lot more expensive if we are doing once. You should do something like

if (name.charAt(0) == '(' && name.charAt(name.length()-1) == ')' && !isValidExpressionStatement(name))

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't start with ( though? Does it always end with )? I could change it to just check for the ending with )

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to check if it's a valid key or a valid expression, and only is invalid when both are the case. This covers a lot since ( is an invalid character in keys

chenqi0805
chenqi0805 previously approved these changes Nov 6, 2023
…search sink for conditional indexing of documents

Signed-off-by: Taylor Gray <[email protected]>
Signed-off-by: Taylor Gray <[email protected]>
chenqi0805
chenqi0805 previously approved these changes Nov 7, 2023
Signed-off-by: Taylor Gray <[email protected]>
@@ -34,4 +34,6 @@ default Boolean evaluateConditional(final String statement, final Event context)
}

Boolean isValidExpressionStatement(final String statement);
}

Boolean isValidFormatExpressions(final String format);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You have a typo here. This takes a singular expression, but the name uses plural "expressions."

@@ -313,6 +318,10 @@ private AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> createBulkReq
if (bulkItemResponse.error() != null) {
if (!NON_RETRY_STATUS.contains(bulkItemResponse.status())) {
requestToReissue.addOperation(bulkOperation);
} else if (VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) {
documentsVersionConflictErrors.increment();
LOG.debug("Received version conflict from OpenSearch: {}", bulkItemResponse.error().reason());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm generally keen on this logging, but we should also be consistent. Why log here when we often don't log?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's debug log to share details about version conflicts. Some users may wish to know which records had conflicts

Copy link
Member Author

@graytaylor0 graytaylor0 Nov 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other failures are logged, just elsewhere. This one is dropped instead of sending to DLQ or logging the failure i OpenSearchSink, so we log here

continue;
}

Long version = null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a lot of untested code here. We should have unit tests for these.

Are there opportunities to start splitting logic out of this class to help with testing?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This entire class does not have a unit test, but sure I can start it to test this new code

@@ -646,9 +646,11 @@ public void testOpenSearchBulkActionsCreateWithExpression() throws IOException,
pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField);
Event event = (Event)testRecords.get(0).getData();
event.getMetadata().setAttribute("action", "create");
final String actionFormatExpression = "${getMetadata(\"action\")}";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be good to add an integration test which creates two documents with the same id and version and then we see the error handled correctly. But, it's not critical.

@graytaylor0 graytaylor0 requested a review from dlvenable November 7, 2023 22:33
@graytaylor0 graytaylor0 dismissed dlvenable’s stale review November 8, 2023 00:38

Addressing review comments in follow up PR

@graytaylor0 graytaylor0 merged commit edb13ef into opensearch-project:main Nov 8, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants