From 1e871e9f6df24cbf73f2e7b6621bff8ca137c3b4 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Wed, 20 Dec 2023 23:02:34 +0000 Subject: [PATCH 1/5] Support format expressions for routing in the opensearch sink Signed-off-by: Krishna Kondaka --- .../sink/opensearch/OpenSearchSinkIT.java | 62 ++++++++++++++++++- .../sink/opensearch/OpenSearchSink.java | 9 ++- 2 files changed, 68 insertions(+), 3 deletions(-) diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java index 5d8865aca9..03ba90342d 100644 --- a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java @@ -127,6 +127,7 @@ public class OpenSearchSinkIT { @Mock private AwsCredentialsSupplier awsCredentialsSupplier; + @Mock private ExpressionEvaluator expressionEvaluator; public OpenSearchSink createObjectUnderTest(PluginSetting pluginSetting, boolean doInitialize) { @@ -217,6 +218,7 @@ public void testInstantiateSinkRawSpanDefault() throws IOException { @Test @DisabledIf(value = "isES6", disabledReason = TRACE_INGESTION_TEST_DISABLED_REASON) public void testInstantiateSinkRawSpanReservedAliasAlreadyUsedAsIndex() throws IOException { + final String reservedIndexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW); final Request request = new Request(HttpMethod.PUT, reservedIndexAlias); client.performRequest(request); @@ -1016,9 +1018,9 @@ public void testEventOutputWithTags() throws IOException, InterruptedException { @Test @DisabledIf(value = "isES6", disabledReason = TRACE_INGESTION_TEST_DISABLED_REASON) public void testEventOutput() throws IOException, 
InterruptedException { - + final String spanId = UUID.randomUUID().toString(); final Event testEvent = JacksonEvent.builder() - .withData("{\"log\": \"foobar\"}") + .withData("{\"log\": \"foobar\", \"spanId\": \""+spanId+"\"}") .withEventType("event") .build(); @@ -1032,6 +1034,7 @@ public void testEventOutput() throws IOException, InterruptedException { final List> retSources = getSearchResponseDocSources(expIndexAlias); final Map expectedContent = new HashMap<>(); expectedContent.put("log", "foobar"); + expectedContent.put("spanId", spanId); assertThat(retSources.size(), equalTo(1)); assertThat(retSources.containsAll(Arrays.asList(expectedContent)), equalTo(true)); @@ -1116,6 +1119,61 @@ public void testOpenSearchRoutingField(final String testRoutingField) throws IOE sink.shutdown(); } + @ParameterizedTest + @ValueSource(strings = {"info/ids/rid", "rid"}) + public void testOpenSearchRoutingFieldWithExpressions(final String testRoutingField) throws IOException, InterruptedException { + final String expectedRoutingField = UUID.randomUUID().toString(); + final String testIndexAlias = "test_index"; + final Event testEvent = JacksonEvent.builder() + .withData(Map.of("arbitrary_data", UUID.randomUUID().toString())) + .withEventType("event") + .build(); + testEvent.put(testRoutingField, expectedRoutingField); + + final List> testRecords = Collections.singletonList(new Record<>(testEvent)); + + final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, null); + pluginSetting.getSettings().put(IndexConfiguration.ROUTING_FIELD, "${/"+testRoutingField+"}"); + when(expressionEvaluator.isValidFormatExpression(any(String.class))).thenReturn(true); + final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + sink.output(testRecords); + + final List routingFields = getSearchResponseRoutingFields(testIndexAlias); + for (String routingField : routingFields) { + assertThat(routingField, equalTo(expectedRoutingField)); + } + sink.shutdown(); + } + 
+ @ParameterizedTest + @ValueSource(strings = {"info/ids/rid", "rid"}) + public void testOpenSearchRoutingFieldWithMixedExpressions(final String testRoutingField) throws IOException, InterruptedException { + final String routingField = UUID.randomUUID().toString(); + final String testIndexAlias = "test_index"; + final Event testEvent = JacksonEvent.builder() + .withData(Map.of("arbitrary_data", UUID.randomUUID().toString())) + .withEventType("event") + .build(); + testEvent.put(testRoutingField, routingField); + + final List> testRecords = Collections.singletonList(new Record<>(testEvent)); + + final String prefix = RandomStringUtils.randomAlphabetic(5); + final String suffix = RandomStringUtils.randomAlphabetic(6); + final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, null); + pluginSetting.getSettings().put(IndexConfiguration.ROUTING_FIELD, prefix+"-${/"+testRoutingField+"}-"+suffix); + when(expressionEvaluator.isValidFormatExpression(any(String.class))).thenReturn(true); + final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + sink.output(testRecords); + final String expectedRoutingField = prefix+"-"+routingField+"-"+suffix; + + final List routingFields = getSearchResponseRoutingFields(testIndexAlias); + for (String field : routingFields) { + assertThat(field, equalTo(expectedRoutingField)); + } + sink.shutdown(); + } + @ParameterizedTest @ValueSource(strings = {"info/ids/id", "id"}) public void testOpenSearchDynamicIndex(final String testIndex) throws IOException, InterruptedException { diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index 9c2352b3e1..9ea0104f71 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ 
b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -473,7 +473,14 @@ private SerializedJson getDocument(final Event event) { } } - String routing = (routingField != null) ? event.get(routingField, String.class) : null; + String routing = null; + if (routingField != null) { + if (expressionEvaluator.isValidFormatExpression(routingField)) { + routing = event.formatString(routingField, expressionEvaluator); + } else { + routing = event.get(routingField, String.class); + } + } final String document = DocumentBuilder.build(event, documentRootKey, sinkContext.getTagsTargetKey(), sinkContext.getIncludeKeys(), sinkContext.getExcludeKeys()); From 0cc3b09a1d537a6a89315757c376cf09e0ba6a54 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Wed, 3 Jan 2024 03:58:13 +0000 Subject: [PATCH 2/5] Addressed review comments Signed-off-by: Krishna Kondaka --- data-prepper-plugins/opensearch/README.md | 66 +++++++++++++++---- .../sink/opensearch/OpenSearchSinkIT.java | 47 +++++++++---- .../sink/opensearch/OpenSearchSink.java | 14 ++-- .../opensearch/index/IndexConfiguration.java | 17 +++++ 4 files changed, 116 insertions(+), 28 deletions(-) diff --git a/data-prepper-plugins/opensearch/README.md b/data-prepper-plugins/opensearch/README.md index 6ea7f3972c..6d9e28f469 100644 --- a/data-prepper-plugins/opensearch/README.md +++ b/data-prepper-plugins/opensearch/README.md @@ -75,7 +75,7 @@ Default is null. - `aws_sts_role_arn`: A IAM role arn which the sink plugin will assume to sign request to Amazon OpenSearch Service. If not provided the plugin will use the default credentials. -- `aws_sts_external_id`: An optional external ID to use when assuming an IAM role. +- `aws_sts_external_id`: An optional external ID to use when assuming an IAM role. - `aws_sts_header_overrides`: An optional map of header overrides to make when assuming the IAM role for the sink plugin. @@ -95,7 +95,7 @@ Default is null. 
- `index_type` (optional): a String from the list [`custom`, `trace-analytics-raw`, `trace-analytics-service-map`, `management_disabled`], which represents an index type. Defaults to `custom` if `serverless` is `false` in [AWS Configuration](#aws_configuration), otherwise defaults to `management_disabled`. This index_type instructs Sink plugin what type of data it is handling. -- `enable_request_compression` (optional): A boolean that enables or disables request compression when sending requests to OpenSearch. For `distribution_version` set to `es6`, default value is `false`, otherwise default value is `true`. +- `enable_request_compression` (optional): A boolean that enables or disables request compression when sending requests to OpenSearch. For `distribution_version` set to `es6`, default value is `false`, otherwise default value is `true`. ``` APM trace analytics raw span data type example: @@ -128,7 +128,7 @@ Default is null. "traceGroupName": "MakePayement.auto" } ``` -- `index`: A String used as index name for custom data type. Applicable and required only If index_type is explicitly `custom` or defaults to be `custom`. +- `index`: A String used as index name for custom data type. Applicable and required only If `index_type` is explicitly `custom` or defaults to be `custom`. * This index name can be a plain string, such as `application`, `my-index-name`. * This index name can also be a plain string plus a date-time pattern as a suffix, such as `application-%{yyyy.MM.dd}`, `my-index-name-%{yyyy.MM.dd.HH}`. When OpenSearch Sink is sending data to OpenSearch, the date-time pattern will be replaced by actual UTC time. The pattern supports all the symbols that represent one hour or above and are listed in [Java DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html). For example, with an index pattern like `my-index-name-%{yyyy.MM.dd}`, a new index is created for each day such as `my-index-name-2022.01.25`. 
For another example, with an index pattern like `my-index-name-%{yyyy.MM.dd.HH}`, a new index is created for each hour such as `my-index-name-2022.01.25.13`. * This index name can also be a formatted string (with or without date-time pattern suffix), such as `my-${index}-name`. When OpenSearchSink is sending data to OpenSearch, the format portion "${index}" will be replaced by it's value in the event that is being processed. The format may also be like "${index1/index2/index3}" in which case the field "index1/index2/index3" is searched in the event and replaced by its value. @@ -136,7 +136,7 @@ Default is null. - `template_type`(optional): Defines what type of OpenSearch template to use. The available options are `v1` and `index-template`. The default value is `v1`, which uses the original OpenSearch templates available at the `_template` API endpoints. Select `index-template` to use composable index templates which are available at OpenSearch's `_index_template` endpoint. Note: when `distribution_version` is `es6`, `template_type` is enforced into `v1`. - `template_file`(optional): A json file path or AWS S3 URI to be read as index template for custom data ingestion. The json file content should be the json value of -`"template"` key in the json content of OpenSearch [Index templates API](https://opensearch.org/docs/latest/opensearch/index-templates/), +`"template"` key in the json content of OpenSearch [Index templates API](https://opensearch.org/docs/latest/opensearch/index-templates/), e.g. [otel-v1-apm-span-index-template.json](https://github.com/opensearch-project/data-prepper/blob/main/data-prepper-plugins/opensearch/src/main/resources/otel-v1-apm-span-index-template.json) - `number_of_shards` (optional): The number of primary shards that an index should have on the destination OpenSearch server. This parameter is effective only when `template_file` is either explicitly provided in Sink configuration or built-in. 
If this parameter is set, it would override the value in index template file. OpenSearch documentation has [more about this parameter](https://opensearch.org/docs/latest/opensearch/rest-api/index-apis/create-index/). @@ -178,7 +178,7 @@ all the records received from the upstream prepper at a time will be sent as a s If a single record turns out to be larger than the set bulk size, it will be sent as a bulk request of a single document. - `estimate_bulk_size_using_compression` (optional): A boolean dictating whether to compress the bulk requests when estimating -the size. This option is ignored if request compression is not enabled for the OpenSearch client. This is an experimental +the size. This option is ignored if request compression is not enabled for the OpenSearch client. This is an experimental feature and makes no guarantees about the accuracy of the estimation. Default is false. - `max_local_compressions_for_estimation` (optional): An integer of the maximum number of times to compress a partially packed @@ -189,11 +189,53 @@ and is ignored unless `estimate_bulk_size_using_compression` is enabled. Default If this timeout expires before a bulk request has reached the bulk_size, the request will be flushed as-is. Set to -1 to disable the flush timeout and instead flush whatever is present at the end of each batch. Default is 60,000, or one minute. -- `document_id_field` (optional): A string of document identifier which is used as `id` for the document when it is stored in the OpenSearch. Each incoming record is searched for this field and if it is present, it is used as the id for the document, if it is not present, a unique id is generated by the OpenSearch when storing the document. Standard Data Prepper Json pointer syntax is used for retrieving the value. If the field has "/" in it then the incoming record is searched in the json sub-objects instead of just in the root of the json object. 
For example, if the field is specified as `info/id`, then the root of the event is searched for `info` and if it is found, then `id` is searched inside it. The value specified for `id` is used as the document id. This field can also be a Data Prepper expression - that is evaluated to determine the document_id_field. For example, setting to `getMetadata(\"some_metadata_key\")` will use the value of the metadata key - as the document_id +- `document_id_field` (optional) (deprecated) : A string of document identifier which is used as `id` for the document when it is stored in the OpenSearch. Each incoming record is searched for this field and if it is present, it is used as the id for the document, if it is not present, a unique id is generated by the OpenSearch when storing the document. Standard Data Prepper Json pointer syntax is used for retrieving the value. If the field has "/" in it then the incoming record is searched in the json sub-objects instead of just in the root of the json object. For example, if the field is specified as `info/id`, then the root of the event is searched for `info` and if it is found, then `id` is searched inside it. The value specified for `id` is used as the document id. This field can also be a Data Prepper expression that is evaluated to determine the document_id_field. For example, setting to `getMetadata(\"some_metadata_key\")` will use the value of the metadata key as the `document_id` -- `routing_field` (optional): A string of routing field which is used as hash for generating sharding id for the document when it is stored in the OpenSearch. Each incoming record is searched for this field and if it is present, it is used as the routing field for the document, if it is not present, default routing mechanism used by the OpenSearch when storing the document. Standard Data Prepper Json pointer syntax is used for retrieving the value. 
If the field has "/" in it then the incoming record is searched in the json sub-objects instead of just in the root of the json object. For example, if the field is specified as `info/id`, then the root of the event is searched for `info` and if it is found, then `id` is searched inside it. The value specified for `id` is used as the routing id +- `document_id` (optional): A string of document identifier which is used as `id` for the document when it is stored in the OpenSearch. Each incoming record is searched for this field and if it is present, it is used as the id for the document, if it is not present, a unique id is generated by the OpenSearch when storing the document. Standard Data Prepper Json pointer syntax is used for retrieving the value. If the field has "/" in it then the incoming record is searched in the json sub-objects instead of just in the root of the json object. For example, if the field is specified as `info/id`, then the root of the event is searched for `info` and if it is found, then `id` is searched inside it. The value specified for `id` is used as the document id. This field can also be a Data Prepper expression that is evaluated to determine the `document_id`. For example, setting to `getMetadata(\"some_metadata_key\")` will use the value of the metadata key as the document_id + * This `document_id` string can also be a formatted string, such as `doc-${docId}-name`. When OpenSearchSink is sending data to OpenSearch, the format portion "${docId}" will be replaced by its value in the event that is being processed. The format may also be like "${docId1/docId2/docId3}" in which case the field "docId1/docId2/docId3" is searched in the event and replaced by its value. + * Additionally, the formatted string can include expressions to evaluate to format the document id. 
For example, `my-${docId}-${getMetadata(\"some_metadata_key\")}-name` will inject both the `docId` value from the Event, as well as the value of `some_metadata_key` from the Event metadata to construct the document id. + +- `routing_field` (optional) (deprecated) : A string of routing field which is used as hash for generating sharding id for the document when it is stored in the OpenSearch. Each incoming record is searched for this field and if it is present, it is used as the routing field for the document, if it is not present, default routing mechanism used by the OpenSearch when storing the document. Standard Data Prepper Json pointer syntax is used for retrieving the value. If the field has "/" in it then the incoming record is searched in the json sub-objects instead of just in the root of the json object. For example, if the field is specified as `info/id`, then the root of the event is searched for `info` and if it is found, then `id` is searched inside it. The value specified for `id` is used as the `routing id` + +- `routing` (optional): A string which is used as hash for generating sharding id for the document when it is stored in the OpenSearch. Each incoming record is searched for this field and if it is present, it is used as the routing field for the document, if it is not present, default routing mechanism used by the OpenSearch when storing the document. Standard Data Prepper Json pointer syntax is used for retrieving the value. If the field has "/" in it then the incoming record is searched in the json sub-objects instead of just in the root of the json object. For example, if the field is specified as `info/id`, then the root of the event is searched for `info` and if it is found, then `id` is searched inside it. The value specified for `id` is used as the routing id. + * This routing string can also be a formatted string, such as `routing-${rid}-name`. 
When OpenSearchSink is sending data to OpenSearch, the format portion "${rid}" will be replaced by its value in the event that is being processed. The format may also be like "${rid1/rid2/rid3}" in which case the field "rid1/rid2/rid3" is searched in the event and replaced by its value. + * Additionally, the formatted string can include expressions to evaluate to format the routing string. For example, `my-${rid}-${getMetadata(\"some_metadata_key\")}-name` will inject both the `rid` value from the Event, as well as the value of `some_metadata_key` from the Event metadata to construct the routing string. + Examples: + 1. Routing config with simple strings +``` + sink: + opensearch: + hosts: ["https://your-amazon-opensearch-service-endpoint"] + aws_sigv4: true + cert: path/to/cert + insecure: false + routing: "test_routing_string" + bulk_size: 4 +``` + + 2. Routing config with keys from event +``` + sink: + opensearch: + hosts: ["https://your-amazon-opensearch-service-endpoint"] + aws_sigv4: true + cert: path/to/cert + insecure: false + routing: "${/info/id}" + bulk_size: 4 +``` + + 3. Routing config with more complex expressions +``` + sink: + opensearch: + hosts: ["https://your-amazon-opensearch-service-endpoint"] + aws_sigv4: true + cert: path/to/cert + insecure: false + routing: '${/info/id}-test-${getMetadata("metadata_key")}' + bulk_size: 4 +``` - `ism_policy_file` (optional): A String of absolute file path or AWS S3 URI for an ISM (Index State Management) policy JSON file. This policy file is effective only when there is no built-in policy file for the index type. For example, `custom` index type is currently the only one without a built-in policy file, thus it would use the policy file here if it's provided through this parameter. 
OpenSearch documentation has more about [ISM policies.](https://opensearch.org/docs/latest/im-plugin/ism/policies/) @@ -207,7 +249,7 @@ the flush timeout and instead flush whatever is present at the end of each batch - `trace_analytics_service_map`: No longer supported starting Data Prepper 2.0. Use `index_type` instead. -- `document_root_key`: The key in the event that will be used as the root in the document. The default is the root of the event. If the key does not exist the entire event is written as the document. If the value at the `document_root_key` is a basic type (ie String, int, etc), the document will have a structure of `{"data": }`. For example, If we have the following sample event: +- `document_root_key`: The key in the event that will be used as the root in the document. The default is the root of the event. If the key does not exist the entire event is written as the document. If the value at the `document_root_key` is a basic type (ie String, int, etc), the document will have a structure of `{"data": }`. For example, If we have the following sample event: ``` { @@ -296,7 +338,7 @@ if `exclude_keys` is set to ["message", "status"], the document written to OpenS * `region` (Optional) : The AWS region to use for credentials. Defaults to [standard SDK behavior to determine the region](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html). * `sts_role_arn` (Optional) : The STS role to assume for requests to AWS. Defaults to null, which will use the [standard SDK behavior for credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html). * `sts_header_overrides` (Optional): A map of header overrides to make when assuming the IAM role for the sink plugin. -* `serverless` (Optional): A boolean flag to indicate the OpenSearch backend is Amazon OpenSearch Serverless. Default to `false`. 
Notice that [ISM policies.](https://opensearch.org/docs/latest/im-plugin/ism/policies/) is not supported in Amazon OpenSearch Serverless and thus any ISM related configuration value has no effect, i.e. `ism_policy_file`. +* `serverless` (Optional): A boolean flag to indicate the OpenSearch backend is Amazon OpenSearch Serverless. Default to `false`. Notice that [ISM policies.](https://opensearch.org/docs/latest/im-plugin/ism/policies/) is not supported in Amazon OpenSearch Serverless and thus any ISM related configuration value has no effect, i.e. `ism_policy_file`. * `serverless_options` (Optional): Additional options you can specify when using serverless. #### Serverless Configuration @@ -355,7 +397,7 @@ If the events received by the OpenSearch Sink have end-to-end acknowledgements e This plugin is compatible with Java 8. See -- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md) +- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md) - [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md) # OpenSearch Source diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java index 03ba90342d..39c23a3fb4 100644 --- a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java @@ -1121,55 +1121,80 @@ public void testOpenSearchRoutingField(final String testRoutingField) throws IOE @ParameterizedTest @ValueSource(strings = {"info/ids/rid", "rid"}) - public void testOpenSearchRoutingFieldWithExpressions(final String testRoutingField) throws 
IOException, InterruptedException { - final String expectedRoutingField = UUID.randomUUID().toString(); + public void testOpenSearchRouting(final String testRouting) throws IOException, InterruptedException { + final String expectedRouting = UUID.randomUUID().toString(); final String testIndexAlias = "test_index"; final Event testEvent = JacksonEvent.builder() .withData(Map.of("arbitrary_data", UUID.randomUUID().toString())) .withEventType("event") .build(); - testEvent.put(testRoutingField, expectedRoutingField); + testEvent.put(testRouting, expectedRouting); final List> testRecords = Collections.singletonList(new Record<>(testEvent)); final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, null); - pluginSetting.getSettings().put(IndexConfiguration.ROUTING_FIELD, "${/"+testRoutingField+"}"); + pluginSetting.getSettings().put(IndexConfiguration.ROUTING_FIELD, testRouting); + final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + sink.output(testRecords); + + final List routingFields = getSearchResponseRoutingFields(testIndexAlias); + for (String routingField : routingFields) { + assertThat(routingField, equalTo(expectedRouting)); + } + sink.shutdown(); + } + + @ParameterizedTest + @ValueSource(strings = {"info/ids/rid", "rid"}) + public void testOpenSearchRoutingWithExpressions(final String testRouting) throws IOException, InterruptedException { + final String expectedRouting = UUID.randomUUID().toString(); + final String testIndexAlias = "test_index"; + final Event testEvent = JacksonEvent.builder() + .withData(Map.of("arbitrary_data", UUID.randomUUID().toString())) + .withEventType("event") + .build(); + testEvent.put(testRouting, expectedRouting); + + final List> testRecords = Collections.singletonList(new Record<>(testEvent)); + + final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, null); + pluginSetting.getSettings().put(IndexConfiguration.ROUTING, "${/"+testRouting+"}"); 
when(expressionEvaluator.isValidFormatExpression(any(String.class))).thenReturn(true); final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); sink.output(testRecords); final List routingFields = getSearchResponseRoutingFields(testIndexAlias); for (String routingField : routingFields) { - assertThat(routingField, equalTo(expectedRoutingField)); + assertThat(routingField, equalTo(expectedRouting)); } sink.shutdown(); } @ParameterizedTest @ValueSource(strings = {"info/ids/rid", "rid"}) - public void testOpenSearchRoutingFieldWithMixedExpressions(final String testRoutingField) throws IOException, InterruptedException { - final String routingField = UUID.randomUUID().toString(); + public void testOpenSearchRoutingWithMixedExpressions(final String testRouting) throws IOException, InterruptedException { + final String routing = UUID.randomUUID().toString(); final String testIndexAlias = "test_index"; final Event testEvent = JacksonEvent.builder() .withData(Map.of("arbitrary_data", UUID.randomUUID().toString())) .withEventType("event") .build(); - testEvent.put(testRoutingField, routingField); + testEvent.put(testRouting, routing); final List> testRecords = Collections.singletonList(new Record<>(testEvent)); final String prefix = RandomStringUtils.randomAlphabetic(5); final String suffix = RandomStringUtils.randomAlphabetic(6); final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, null); - pluginSetting.getSettings().put(IndexConfiguration.ROUTING_FIELD, prefix+"-${/"+testRoutingField+"}-"+suffix); + pluginSetting.getSettings().put(IndexConfiguration.ROUTING, prefix+"-${/"+testRouting+"}-"+suffix); when(expressionEvaluator.isValidFormatExpression(any(String.class))).thenReturn(true); final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); sink.output(testRecords); - final String expectedRoutingField = prefix+"-"+routingField+"-"+suffix; + final String expectedRouting = prefix+"-"+routing+"-"+suffix; final List 
routingFields = getSearchResponseRoutingFields(testIndexAlias); for (String field : routingFields) { - assertThat(field, equalTo(expectedRoutingField)); + assertThat(field, equalTo(expectedRouting)); } sink.shutdown(); } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index 9ea0104f71..e92c1e5b0b 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -112,6 +112,7 @@ public class OpenSearchSink extends AbstractSink> { private final String documentIdField; private final String documentId; private final String routingField; + private final String routing; private final String action; private final List> actions; private final String documentRootKey; @@ -163,6 +164,7 @@ public OpenSearchSink(final PluginSetting pluginSetting, this.documentIdField = openSearchSinkConfig.getIndexConfiguration().getDocumentIdField(); this.documentId = openSearchSinkConfig.getIndexConfiguration().getDocumentId(); this.routingField = openSearchSinkConfig.getIndexConfiguration().getRoutingField(); + this.routing = openSearchSinkConfig.getIndexConfiguration().getRouting(); this.action = openSearchSinkConfig.getIndexConfiguration().getAction(); this.actions = openSearchSinkConfig.getIndexConfiguration().getActions(); this.documentRootKey = openSearchSinkConfig.getIndexConfiguration().getDocumentRootKey(); @@ -473,18 +475,20 @@ private SerializedJson getDocument(final Event event) { } } - String routing = null; + String routingValue = null; if (routingField != null) { - if (expressionEvaluator.isValidFormatExpression(routingField)) { - routing = event.formatString(routingField, expressionEvaluator); 
+ routingValue = event.get(routingField, String.class); + } else if (routing != null) { + if (expressionEvaluator.isValidFormatExpression(routing)) { + routingValue = event.formatString(routing, expressionEvaluator); } else { - routing = event.get(routingField, String.class); + routingValue = event.get(routing, String.class); } } final String document = DocumentBuilder.build(event, documentRootKey, sinkContext.getTagsTargetKey(), sinkContext.getIncludeKeys(), sinkContext.getExcludeKeys()); - return SerializedJson.fromStringAndOptionals(document, docId, routing); + return SerializedJson.fromStringAndOptionals(document, docId, routingValue); } private void flushBatch(AccumulatingBulkRequest accumulatingBulkRequest) { diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java index edceb5b7e5..3f129b8933 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java @@ -56,6 +56,7 @@ public class IndexConfiguration { public static final String DOCUMENT_ID_FIELD = "document_id_field"; public static final String DOCUMENT_ID = "document_id"; public static final String ROUTING_FIELD = "routing_field"; + public static final String ROUTING = "routing"; public static final String ISM_POLICY_FILE = "ism_policy_file"; public static final long DEFAULT_BULK_SIZE = 5L; public static final boolean DEFAULT_ESTIMATE_BULK_SIZE_USING_COMPRESSION = false; @@ -81,6 +82,7 @@ public class IndexConfiguration { private final String documentIdField; private final String documentId; private final String routingField; + private final String routing; private final long bulkSize; 
private final boolean estimateBulkSizeUsingCompression; private int maxLocalCompressionsForEstimation; @@ -144,6 +146,7 @@ private IndexConfiguration(final Builder builder) { this.maxLocalCompressionsForEstimation = builder.maxLocalCompressionsForEstimation; this.flushTimeout = builder.flushTimeout; this.routingField = builder.routingField; + this.routing = builder.routing; String documentIdField = builder.documentIdField; String documentId = builder.documentId; @@ -255,8 +258,12 @@ public static IndexConfiguration readIndexConfig(final PluginSetting pluginSetti } final String routingField = pluginSetting.getStringOrDefault(ROUTING_FIELD, null); + final String routing = pluginSetting.getStringOrDefault(ROUTING, null); if (routingField != null) { + LOG.warn("routing_field is deprecated in favor of routing, and support for document_field will be removed in a future major version release."); builder = builder.withRoutingField(routingField); + } else if (routing != null) { + builder = builder.withRouting(routing); } final String ismPolicyFile = pluginSetting.getStringOrDefault(ISM_POLICY_FILE, null); @@ -325,6 +332,10 @@ public String getRoutingField() { return routingField; } + public String getRouting() { + return routing; + } + public long getBulkSize() { return bulkSize; } @@ -447,6 +458,7 @@ public static class Builder { private int numShards; private int numReplicas; private String routingField; + private String routing; private String documentIdField; private String documentId; private long bulkSize = DEFAULT_BULK_SIZE; @@ -517,6 +529,11 @@ public Builder withRoutingField(final String routingField) { return this; } + public Builder withRouting(final String routing) { + this.routing = routing; + return this; + } + public Builder withBulkSize(final long bulkSize) { this.bulkSize = bulkSize; return this; From 25956ceb132acfa2aabdb53a019d55dd9c3ec3e5 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Thu, 4 Jan 2024 21:43:25 +0000 Subject: [PATCH 3/5] Addressed 
review comments Signed-off-by: Krishna Kondaka --- .../plugins/sink/opensearch/OpenSearchSink.java | 10 +++++----- .../sink/opensearch/index/IndexConfiguration.java | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index e92c1e5b0b..c319da8955 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -479,11 +479,11 @@ private SerializedJson getDocument(final Event event) { if (routingField != null) { routingValue = event.get(routingField, String.class); } else if (routing != null) { - if (expressionEvaluator.isValidFormatExpression(routing)) { - routingValue = event.formatString(routing, expressionEvaluator); - } else { - routingValue = event.get(routing, String.class); - } + try { + routingValue = event.formatString(routing, expressionEvaluator); + } catch (final ExpressionEvaluationException | EventKeyNotFoundException e) { + LOG.error("Unable to construct routing with format {}, the document_id will be generated by OpenSearch", routing, e); + } } final String document = DocumentBuilder.build(event, documentRootKey, sinkContext.getTagsTargetKey(), sinkContext.getIncludeKeys(), sinkContext.getExcludeKeys()); diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java index 3f129b8933..b3bbb213b2 100644 --- 
a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java @@ -260,7 +260,7 @@ public static IndexConfiguration readIndexConfig(final PluginSetting pluginSetti final String routingField = pluginSetting.getStringOrDefault(ROUTING_FIELD, null); final String routing = pluginSetting.getStringOrDefault(ROUTING, null); if (routingField != null) { - LOG.warn("routing_field is deprecated in favor of routing, and support for document_field will be removed in a future major version release."); + LOG.warn("routing_field is deprecated in favor of routing, and support for routing_field will be removed in a future major version release."); builder = builder.withRoutingField(routingField); } else if (routing != null) { builder = builder.withRouting(routing); From c13ebcf3284e434d9a1258cc46e38c7ab465a6c4 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Fri, 5 Jan 2024 04:29:59 +0000 Subject: [PATCH 4/5] Addressed review comments Signed-off-by: Krishna Kondaka --- .../sink/opensearch/OpenSearchSinkIT.java | 17 +++++++++++++---- .../plugins/sink/opensearch/OpenSearchSink.java | 2 +- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java index 39c23a3fb4..403f8be0ec 100644 --- a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java @@ -88,6 +88,7 @@ import static org.hamcrest.CoreMatchers.nullValue; import 
static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.closeTo; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.params.provider.Arguments.arguments; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -1120,7 +1121,7 @@ public void testOpenSearchRoutingField(final String testRoutingField) throws IOE } @ParameterizedTest - @ValueSource(strings = {"info/ids/rid", "rid"}) + @ValueSource(strings = {"", "info/ids/rid", "rid"}) public void testOpenSearchRouting(final String testRouting) throws IOException, InterruptedException { final String expectedRouting = UUID.randomUUID().toString(); final String testIndexAlias = "test_index"; @@ -1128,18 +1129,26 @@ public void testOpenSearchRouting(final String testRouting) throws IOException, .withData(Map.of("arbitrary_data", UUID.randomUUID().toString())) .withEventType("event") .build(); - testEvent.put(testRouting, expectedRouting); + if (!testRouting.isEmpty()) { + testEvent.put(testRouting, expectedRouting); + } final List> testRecords = Collections.singletonList(new Record<>(testEvent)); final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, null); - pluginSetting.getSettings().put(IndexConfiguration.ROUTING_FIELD, testRouting); + if (!testRouting.isEmpty()) { + pluginSetting.getSettings().put(IndexConfiguration.ROUTING, "${"+testRouting+"}"); + } final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); sink.output(testRecords); final List routingFields = getSearchResponseRoutingFields(testIndexAlias); for (String routingField : routingFields) { - assertThat(routingField, equalTo(expectedRouting)); + if (!testRouting.isEmpty()) { + assertThat(routingField, equalTo(expectedRouting)); + } else { + assertTrue(Objects.isNull(routingField)); + } } sink.shutdown(); } diff --git 
a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index c319da8955..4445ba6b85 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -482,7 +482,7 @@ private SerializedJson getDocument(final Event event) { try { routingValue = event.formatString(routing, expressionEvaluator); } catch (final ExpressionEvaluationException | EventKeyNotFoundException e) { - LOG.error("Unable to construct routing with format {}, the document_id will be generated by OpenSearch", routing, e); + LOG.error("Unable to construct routing with format {}, the routing will be generated by OpenSearch", routing, e); } } From 45e291dc7dab5ba23219ce45ff3ab58c483d4fa9 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Sun, 7 Jan 2024 23:13:39 +0000 Subject: [PATCH 5/5] Addressed review comments Signed-off-by: Krishna Kondaka --- .../sink/opensearch/OpenSearchSink.java | 3 +- .../sink/opensearch/OpenSearchSinkTest.java | 51 ++++++++++++++++--- 2 files changed, 44 insertions(+), 10 deletions(-) diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index 4445ba6b85..c657f6b792 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -461,8 +461,7 @@ public void doOutput(final Collection> records) { lastFlushTimeMap.put(threadId, 
lastFlushTime); } - private SerializedJson getDocument(final Event event) { - + SerializedJson getDocument(final Event event) { String docId = null; if (Objects.nonNull(documentIdField)) { diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java index 237f4b9a2f..8789929823 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java @@ -43,6 +43,7 @@ import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.Collections; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; @@ -56,6 +57,7 @@ import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.lenient; import static org.opensearch.dataprepper.model.sink.SinkLatencyMetrics.EXTERNAL_LATENCY; import static org.opensearch.dataprepper.model.sink.SinkLatencyMetrics.INTERNAL_LATENCY; import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.BULKREQUEST_ERRORS; @@ -128,30 +130,31 @@ void setup() { final RetryConfiguration retryConfiguration = mock(RetryConfiguration.class); when(retryConfiguration.getDlq()).thenReturn(Optional.empty()); - when(retryConfiguration.getDlqFile()).thenReturn(null); + lenient().when(retryConfiguration.getDlqFile()).thenReturn(null); final ConnectionConfiguration connectionConfiguration = mock(ConnectionConfiguration.class); final RestHighLevelClient restHighLevelClient = mock(RestHighLevelClient.class); - 
when(connectionConfiguration.createClient(awsCredentialsSupplier)).thenReturn(restHighLevelClient); - when(connectionConfiguration.createOpenSearchClient(restHighLevelClient, awsCredentialsSupplier)).thenReturn(openSearchClient); + lenient().when(connectionConfiguration.createClient(awsCredentialsSupplier)).thenReturn(restHighLevelClient); + lenient().when(connectionConfiguration.createOpenSearchClient(restHighLevelClient, awsCredentialsSupplier)).thenReturn(openSearchClient); when(indexConfiguration.getAction()).thenReturn("index"); when(indexConfiguration.getDocumentId()).thenReturn(null); when(indexConfiguration.getDocumentIdField()).thenReturn(null); when(indexConfiguration.getRoutingField()).thenReturn(null); + when(indexConfiguration.getRouting()).thenReturn(null); when(indexConfiguration.getActions()).thenReturn(null); when(indexConfiguration.getDocumentRootKey()).thenReturn(null); - when(indexConfiguration.getVersionType()).thenReturn(null); - when(indexConfiguration.getVersionExpression()).thenReturn(null); - when(indexConfiguration.getIndexAlias()).thenReturn(UUID.randomUUID().toString()); - when(indexConfiguration.getTemplateType()).thenReturn(TemplateType.V1); + lenient().when(indexConfiguration.getVersionType()).thenReturn(null); + lenient().when(indexConfiguration.getVersionExpression()).thenReturn(null); + lenient().when(indexConfiguration.getIndexAlias()).thenReturn(UUID.randomUUID().toString()); + lenient().when(indexConfiguration.getTemplateType()).thenReturn(TemplateType.V1); when(indexConfiguration.getIndexType()).thenReturn(IndexType.CUSTOM); when(indexConfiguration.getBulkSize()).thenReturn(DEFAULT_BULK_SIZE); when(indexConfiguration.getFlushTimeout()).thenReturn(DEFAULT_FLUSH_TIMEOUT); when(openSearchSinkConfiguration.getIndexConfiguration()).thenReturn(indexConfiguration); when(openSearchSinkConfiguration.getRetryConfiguration()).thenReturn(retryConfiguration); - 
when(openSearchSinkConfiguration.getConnectionConfiguration()).thenReturn(connectionConfiguration); + lenient().when(openSearchSinkConfiguration.getConnectionConfiguration()).thenReturn(connectionConfiguration); when(pluginMetrics.counter(MetricNames.RECORDS_IN)).thenReturn(mock(Counter.class)); when(pluginMetrics.timer(MetricNames.TIME_ELAPSED)).thenReturn(mock(Timer.class)); @@ -238,6 +241,38 @@ void doOutput_with_invalid_version_expression_catches_NumberFormatException_and_ verify(dynamicDocumentVersionDroppedEvents).increment(); } + @Test + void test_routing_field_in_document() throws IOException { + String routingFieldKey = UUID.randomUUID().toString(); + String routingKey = UUID.randomUUID().toString(); + String routingFieldValue = UUID.randomUUID().toString(); + when(indexConfiguration.getRoutingField()).thenReturn(routingFieldKey); + when(indexConfiguration.getRouting()).thenReturn(routingKey); + final OpenSearchSink objectUnderTest = createObjectUnderTest(); + final Event event = JacksonEvent.builder() + .withEventType("event") + .withData(Collections.singletonMap(routingFieldKey, routingFieldValue)) + .build(); + assertThat(objectUnderTest.getDocument(event).getRoutingField(), equalTo(Optional.of(routingFieldValue))); + + } + + @Test + void test_routing_in_document() throws IOException { + String routingValue = UUID.randomUUID().toString(); + String routingKey = UUID.randomUUID().toString(); + final OpenSearchSink objectUnderTest = createObjectUnderTest(); + final Event event = JacksonEvent.builder() + .withEventType("event") + .withData(Collections.singletonMap(routingKey, routingValue)) + .build(); + assertThat(objectUnderTest.getDocument(event).getRoutingField(), equalTo(Optional.empty())); + + when(indexConfiguration.getRouting()).thenReturn("${"+routingKey+"}"); + final OpenSearchSink objectUnderTest2 = createObjectUnderTest(); + assertThat(objectUnderTest2.getDocument(event).getRoutingField(), equalTo(Optional.of(routingValue))); + } + @Test void 
doOutput_with_invalid_version_expression_result_catches_RuntimeException_and_creates_DLQObject() throws IOException {