Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support format expressions for routing in the opensearch sink #3863

Merged
merged 5 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 54 additions & 12 deletions data-prepper-plugins/opensearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ Default is null.

- `aws_sts_role_arn`: A IAM role arn which the sink plugin will assume to sign request to Amazon OpenSearch Service. If not provided the plugin will use the default credentials.

- `aws_sts_external_id`: An optional external ID to use when assuming an IAM role.
- `aws_sts_external_id`: An optional external ID to use when assuming an IAM role.

- `aws_sts_header_overrides`: An optional map of header overrides to make when assuming the IAM role for the sink plugin.

Expand All @@ -95,7 +95,7 @@ Default is null.

- `index_type` (optional): a String from the list [`custom`, `trace-analytics-raw`, `trace-analytics-service-map`, `management_disabled`], which represents an index type. Defaults to `custom` if `serverless` is `false` in [AWS Configuration](#aws_configuration), otherwise defaults to `management_disabled`. This index_type instructs Sink plugin what type of data it is handling.

- `enable_request_compression` (optional): A boolean that enables or disables request compression when sending requests to OpenSearch. For `distribution_version` set to `es6`, default value is `false`, otherwise default value is `true`.
- `enable_request_compression` (optional): A boolean that enables or disables request compression when sending requests to OpenSearch. For `distribution_version` set to `es6`, default value is `false`, otherwise default value is `true`.

```
APM trace analytics raw span data type example:
Expand Down Expand Up @@ -128,15 +128,15 @@ Default is null.
"traceGroupName": "MakePayement.auto"
}
```
- <a name="index"></a>`index`: A String used as index name for custom data type. Applicable and required only If index_type is explicitly `custom` or defaults to be `custom`.
- <a name="index"></a>`index`: A String used as index name for custom data type. Applicable and required only If `index_type` is explicitly `custom` or defaults to be `custom`.
* This index name can be a plain string, such as `application`, `my-index-name`.
* This index name can also be a plain string plus a date-time pattern as a suffix, such as `application-%{yyyy.MM.dd}`, `my-index-name-%{yyyy.MM.dd.HH}`. When OpenSearch Sink is sending data to OpenSearch, the date-time pattern will be replaced by actual UTC time. The pattern supports all the symbols that represent one hour or above and are listed in [Java DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html). For example, with an index pattern like `my-index-name-%{yyyy.MM.dd}`, a new index is created for each day such as `my-index-name-2022.01.25`. For another example, with an index pattern like `my-index-name-%{yyyy.MM.dd.HH}`, a new index is created for each hour such as `my-index-name-2022.01.25.13`.
* This index name can also be a formatted string (with or without date-time pattern suffix), such as `my-${index}-name`. When OpenSearchSink is sending data to OpenSearch, the format portion "${index}" will be replaced by it's value in the event that is being processed. The format may also be like "${index1/index2/index3}" in which case the field "index1/index2/index3" is searched in the event and replaced by its value.
- Additionally, the formatted string can include expressions to evaluate to format the index name. For example, `my-${index}-${getMetadata(\"some_metadata_key\")}-name` will inject both the `index` value from the Event, as well as the value of `some_metadata_key` from the Event metadata to construct the index name.
- <a name="template_type"></a>`template_type`(optional): Defines what type of OpenSearch template to use. The available options are `v1` and `index-template`. The default value is `v1`, which uses the original OpenSearch templates available at the `_template` API endpoints. Select `index-template` to use composable index templates which are available at OpenSearch's `_index_template` endpoint. Note: when `distribution_version` is `es6`, `template_type` is enforced into `v1`.

- <a name="template_file"></a>`template_file`(optional): A json file path or AWS S3 URI to be read as index template for custom data ingestion. The json file content should be the json value of
`"template"` key in the json content of OpenSearch [Index templates API](https://opensearch.org/docs/latest/opensearch/index-templates/),
`"template"` key in the json content of OpenSearch [Index templates API](https://opensearch.org/docs/latest/opensearch/index-templates/),
e.g. [otel-v1-apm-span-index-template.json](https://github.com/opensearch-project/data-prepper/blob/main/data-prepper-plugins/opensearch/src/main/resources/otel-v1-apm-span-index-template.json)

- `number_of_shards` (optional): The number of primary shards that an index should have on the destination OpenSearch server. This parameter is effective only when `template_file` is either explicitly provided in Sink configuration or built-in. If this parameter is set, it would override the value in index template file. OpenSearch documentation has [more about this parameter](https://opensearch.org/docs/latest/opensearch/rest-api/index-apis/create-index/).
Expand Down Expand Up @@ -178,7 +178,7 @@ all the records received from the upstream prepper at a time will be sent as a s
If a single record turns out to be larger than the set bulk size, it will be sent as a bulk request of a single document.

- `estimate_bulk_size_using_compression` (optional): A boolean dictating whether to compress the bulk requests when estimating
the size. This option is ignored if request compression is not enabled for the OpenSearch client. This is an experimental
the size. This option is ignored if request compression is not enabled for the OpenSearch client. This is an experimental
feature and makes no guarantees about the accuracy of the estimation. Default is false.

- `max_local_compressions_for_estimation` (optional): An integer of the maximum number of times to compress a partially packed
Expand All @@ -189,11 +189,53 @@ and is ignored unless `estimate_bulk_size_using_compression` is enabled. Default
If this timeout expires before a bulk request has reached the bulk_size, the request will be flushed as-is. Set to -1 to disable
the flush timeout and instead flush whatever is present at the end of each batch. Default is 60,000, or one minute.

- `document_id_field` (optional): A string of document identifier which is used as `id` for the document when it is stored in the OpenSearch. Each incoming record is searched for this field and if it is present, it is used as the id for the document, if it is not present, a unique id is generated by the OpenSearch when storing the document. Standard Data Prepper Json pointer syntax is used for retrieving the value. If the field has "/" in it then the incoming record is searched in the json sub-objects instead of just in the root of the json object. For example, if the field is specified as `info/id`, then the root of the event is searched for `info` and if it is found, then `id` is searched inside it. The value specified for `id` is used as the document id. This field can also be a Data Prepper expression
that is evaluated to determine the document_id_field. For example, setting to `getMetadata(\"some_metadata_key\")` will use the value of the metadata key
as the document_id
- `document_id_field` (optional) (deprecated) : A string of document identifier which is used as `id` for the document when it is stored in the OpenSearch. Each incoming record is searched for this field and if it is present, it is used as the id for the document, if it is not present, a unique id is generated by the OpenSearch when storing the document. Standard Data Prepper Json pointer syntax is used for retrieving the value. If the field has "/" in it then the incoming record is searched in the json sub-objects instead of just in the root of the json object. For example, if the field is specified as `info/id`, then the root of the event is searched for `info` and if it is found, then `id` is searched inside it. The value specified for `id` is used as the document id. This field can also be a Data Prepper expression that is evaluated to determine the document_id_field. For example, setting to `getMetadata(\"some_metadata_key\")` will use the value of the metadata key as the `document_id`

- `routing_field` (optional): A string of routing field which is used as hash for generating sharding id for the document when it is stored in the OpenSearch. Each incoming record is searched for this field and if it is present, it is used as the routing field for the document, if it is not present, default routing mechanism used by the OpenSearch when storing the document. Standard Data Prepper Json pointer syntax is used for retrieving the value. If the field has "/" in it then the incoming record is searched in the json sub-objects instead of just in the root of the json object. For example, if the field is specified as `info/id`, then the root of the event is searched for `info` and if it is found, then `id` is searched inside it. The value specified for `id` is used as the routing id
- `document_id` (optional): A string of document identifier which is used as `id` for the document when it is stored in the OpenSearch. Each incoming record is searched for this field and if it is present, it is used as the id for the document, if it is not present, a unique id is generated by the OpenSearch when storing the document. Standard Data Prepper Json pointer syntax is used for retrieving the value. If the field has "/" in it then the incoming record is searched in the json sub-objects instead of just in the root of the json object. For example, if the field is specified as `info/id`, then the root of the event is searched for `info` and if it is found, then `id` is searched inside it. The value specified for `id` is used as the document id. This field can also be a Data Prepper expression that is evaluated to determine the `document_id`. For example, setting to `getMetadata(\"some_metadata_key\")` will use the value of the metadata key as the document_id
* This `document_id` string can also be a formatted string, such as `doc-${docId}-name`. When OpenSearchSink is sending data to OpenSearch, the format portion "${docId}" will be replaced by it's value in the event that is being processed. The format may also be like "${docId1/docId2/docId3}" in which case the field "docId1/docId2/docId3" is searched in the event and replaced by its value.
* Additionally, the formatted string can include expressions to evaluate to format the document id. For example, `my-${docId}-${getMetadata(\"some_metadata_key\")}-name` will inject both the `docId` value from the Event, as well as the value of `some_metadata_key` from the Event metadata to construct the document id.

- `routing_field` (optional) (deprecated) : A string of routing field which is used as hash for generating sharding id for the document when it is stored in the OpenSearch. Each incoming record is searched for this field and if it is present, it is used as the routing field for the document, if it is not present, default routing mechanism used by the OpenSearch when storing the document. Standard Data Prepper Json pointer syntax is used for retrieving the value. If the field has "/" in it then the incoming record is searched in the json sub-objects instead of just in the root of the json object. For example, if the field is specified as `info/id`, then the root of the event is searched for `info` and if it is found, then `id` is searched inside it. The value specified for `id` is used as the `routing id`

- `routing` (optional): A string which is used as hash for generating sharding id for the document when it is stored in the OpenSearch. Each incoming record is searched for this field and if it is present, it is used as the routing field for the document, if it is not present, default routing mechanism used by the OpenSearch when storing the document. Standard Data Prepper Json pointer syntax is used for retrieving the value. If the field has "/" in it then the incoming record is searched in the json sub-objects instead of just in the root of the json object. For example, if the field is specified as `info/id`, then the root of the event is searched for `info` and if it is found, then `id` is searched inside it. The value specified for `id` is used as the routing id.
* This routing string can also be a formatted string, such as `routing-${rid}-name`. When OpenSearchSink is sending data to OpenSearch, the format portion "${rid}" will be replaced by it's value in the event that is being processed. The format may also be like "${rid1/rid2/rid3}" in which case the field "rid1/rid2/rid3" is searched in the event and replaced by its value.
* Additionally, the formatted string can include expressions to evaluate to format the routing string. For example, `my-${rid}-${getMetadata(\"some_metadata_key\")}-name` will inject both the `rid` value from the Event, as well as the value of `some_metadata_key` from the Event metadata to construct the routing string.
Examples:
1. Routing config with simple strings
```
sink:
opensearch:
hosts: ["https://your-amazon-opensearch-service-endpoint"]
aws_sigv4: true
cert: path/to/cert
insecure: false
routing: "test_routing_string"
bulk_size: 4
```

2. Routing config with keys from event
```
sink:
opensearch:
hosts: ["https://your-amazon-opensearch-service-endpoint"]
aws_sigv4: true
cert: path/to/cert
insecure: false
routing: "${/info/id}"
bulk_size: 4
```

3. Routing config with more complex expressions
```
sink:
opensearch:
hosts: ["https://your-amazon-opensearch-service-endpoint"]
aws_sigv4: true
cert: path/to/cert
insecure: false
routing: '${/info/id}-test-${getMetadata("metadata_key")}'
bulk_size: 4
```

- `ism_policy_file` (optional): A String of absolute file path or AWS S3 URI for an ISM (Index State Management) policy JSON file. This policy file is effective only when there is no built-in policy file for the index type. For example, `custom` index type is currently the only one without a built-in policy file, thus it would use the policy file here if it's provided through this parameter. OpenSearch documentation has more about [ISM policies.](https://opensearch.org/docs/latest/im-plugin/ism/policies/)

Expand All @@ -207,7 +249,7 @@ the flush timeout and instead flush whatever is present at the end of each batch

- `trace_analytics_service_map`: No longer supported starting Data Prepper 2.0. Use `index_type` instead.

- `document_root_key`: The key in the event that will be used as the root in the document. The default is the root of the event. If the key does not exist the entire event is written as the document. If the value at the `document_root_key` is a basic type (ie String, int, etc), the document will have a structure of `{"data": <value of the document_root_key>}`. For example, If we have the following sample event:
- `document_root_key`: The key in the event that will be used as the root in the document. The default is the root of the event. If the key does not exist the entire event is written as the document. If the value at the `document_root_key` is a basic type (ie String, int, etc), the document will have a structure of `{"data": <value of the document_root_key>}`. For example, If we have the following sample event:

```
{
Expand Down Expand Up @@ -296,7 +338,7 @@ if `exclude_keys` is set to ["message", "status"], the document written to OpenS
* `region` (Optional) : The AWS region to use for credentials. Defaults to [standard SDK behavior to determine the region](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html).
* `sts_role_arn` (Optional) : The STS role to assume for requests to AWS. Defaults to null, which will use the [standard SDK behavior for credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html).
* `sts_header_overrides` (Optional): A map of header overrides to make when assuming the IAM role for the sink plugin.
* `serverless` (Optional): A boolean flag to indicate the OpenSearch backend is Amazon OpenSearch Serverless. Default to `false`. Notice that [ISM policies.](https://opensearch.org/docs/latest/im-plugin/ism/policies/) is not supported in Amazon OpenSearch Serverless and thus any ISM related configuration value has no effect, i.e. `ism_policy_file`.
* `serverless` (Optional): A boolean flag to indicate the OpenSearch backend is Amazon OpenSearch Serverless. Default to `false`. Notice that [ISM policies.](https://opensearch.org/docs/latest/im-plugin/ism/policies/) is not supported in Amazon OpenSearch Serverless and thus any ISM related configuration value has no effect, i.e. `ism_policy_file`.
* `serverless_options` (Optional): Additional options you can specify when using serverless.

#### <a name="serverless_configuration">Serverless Configuration</a>
Expand Down Expand Up @@ -355,7 +397,7 @@ If the events received by the OpenSearch Sink have end-to-end acknowledgements e

This plugin is compatible with Java 8. See

- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md)

# OpenSearch Source
Expand Down
Loading
Loading