Skip to content

Commit

Permalink
Add normalize_index flag to normalize invalid dynamic indices (#3634)
Browse files Browse the repository at this point in the history
Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 authored Nov 11, 2023
1 parent 418480c commit d267486
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class MetadataKeyAttributes {

static final String EVENT_TIMESTAMP_METADATA_ATTRIBUTE = "dynamodb_timestamp";

static final String EVENT_DYNAMODB_ITEM_VERSION = "dynamodb_item_version";
static final String EVENT_VERSION_FROM_TIMESTAMP = "document_version";

static final String EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE = "opensearch_action";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import java.util.Map;

import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_DYNAMODB_ITEM_VERSION;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_VERSION_FROM_TIMESTAMP;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_TABLE_NAME_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_TIMESTAMP_METADATA_ATTRIBUTE;
Expand Down Expand Up @@ -97,7 +97,7 @@ public void addToBuffer(final AcknowledgementSet acknowledgementSet,
eventMetadata.setAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE, eventCreationTimeMillis);
eventMetadata.setAttribute(DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE, eventName);
eventMetadata.setAttribute(EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE, mapStreamEventNameToBulkAction(eventName));
eventMetadata.setAttribute(EVENT_DYNAMODB_ITEM_VERSION, eventVersionNumber);
eventMetadata.setAttribute(EVENT_VERSION_FROM_TIMESTAMP, eventVersionNumber);

String partitionKey = getAttributeValue(keys, tableInfo.getMetadata().getPartitionKeyAttributeName());
eventMetadata.setAttribute(PARTITION_KEY_METADATA_ATTRIBUTE, partitionKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter.EXPORT_RECORDS_PROCESSED_COUNT;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter.EXPORT_RECORDS_PROCESSING_ERROR_COUNT;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_DYNAMODB_ITEM_VERSION;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_VERSION_FROM_TIMESTAMP;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_TIMESTAMP_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PARTITION_KEY_METADATA_ATTRIBUTE;
Expand Down Expand Up @@ -146,7 +146,7 @@ void test_writeSingleRecordToBuffer() throws Exception {
assertThat(event.getMetadata().getAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE), notNullValue());
assertThat(event.getMetadata().getAttribute(DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE), nullValue());
assertThat(event.getMetadata().getAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE), notNullValue());
assertThat(event.getMetadata().getAttribute(EVENT_DYNAMODB_ITEM_VERSION), equalTo(0L));
assertThat(event.getMetadata().getAttribute(EVENT_VERSION_FROM_TIMESTAMP), equalTo(0L));
assertThat(event.getEventHandle(), notNullValue());
assertThat(event.getEventHandle().getExternalOriginationTime(), nullValue());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_DYNAMODB_ITEM_VERSION;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_VERSION_FROM_TIMESTAMP;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_TIMESTAMP_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PARTITION_KEY_METADATA_ATTRIBUTE;
Expand Down Expand Up @@ -180,7 +180,7 @@ void writingToBuffer_with_nth_event_in_that_second_returns_expected_that_timesta
assertThat(firstEventForSecond.getMetadata().getAttribute(EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE), equalTo(OpenSearchBulkActions.INDEX.toString()));
assertThat(firstEventForSecond.getMetadata().getAttribute(DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE), equalTo("INSERT"));
assertThat(firstEventForSecond.getMetadata().getAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE), equalTo(timestamp.toEpochMilli()));
assertThat(firstEventForSecond.getMetadata().getAttribute(EVENT_DYNAMODB_ITEM_VERSION), equalTo(timestamp.toEpochMilli() * 1000));
assertThat(firstEventForSecond.getMetadata().getAttribute(EVENT_VERSION_FROM_TIMESTAMP), equalTo(timestamp.toEpochMilli() * 1000));
assertThat(firstEventForSecond.getEventHandle(), notNullValue());
assertThat(firstEventForSecond.getEventHandle().getExternalOriginationTime(), equalTo(timestamp));

Expand All @@ -195,7 +195,7 @@ void writingToBuffer_with_nth_event_in_that_second_returns_expected_that_timesta
assertThat(secondEventForSameSecond.getMetadata().getAttribute(EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE), equalTo(OpenSearchBulkActions.INDEX.toString()));
assertThat(secondEventForSameSecond.getMetadata().getAttribute(DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE), equalTo("INSERT"));
assertThat(secondEventForSameSecond.getMetadata().getAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE), equalTo(timestamp.toEpochMilli()));
assertThat(secondEventForSameSecond.getMetadata().getAttribute(EVENT_DYNAMODB_ITEM_VERSION), equalTo(timestamp.toEpochMilli() * 1000 + 1));
assertThat(secondEventForSameSecond.getMetadata().getAttribute(EVENT_VERSION_FROM_TIMESTAMP), equalTo(timestamp.toEpochMilli() * 1000 + 1));
assertThat(secondEventForSameSecond.getEventHandle(), notNullValue());
assertThat(secondEventForSameSecond.getEventHandle().getExternalOriginationTime(), equalTo(timestamp));

Expand All @@ -210,7 +210,7 @@ void writingToBuffer_with_nth_event_in_that_second_returns_expected_that_timesta
assertThat(thirdEventWithOlderSecond.getMetadata().getAttribute(EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE), equalTo(OpenSearchBulkActions.INDEX.toString()));
assertThat(thirdEventWithOlderSecond.getMetadata().getAttribute(DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE), equalTo("INSERT"));
assertThat(thirdEventWithOlderSecond.getMetadata().getAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE), equalTo(olderSecond.toEpochMilli()));
assertThat(thirdEventWithOlderSecond.getMetadata().getAttribute(EVENT_DYNAMODB_ITEM_VERSION), equalTo(olderSecond.toEpochMilli() * 1000));
assertThat(thirdEventWithOlderSecond.getMetadata().getAttribute(EVENT_VERSION_FROM_TIMESTAMP), equalTo(olderSecond.toEpochMilli() * 1000));
assertThat(thirdEventWithOlderSecond.getEventHandle(), notNullValue());
assertThat(thirdEventWithOlderSecond.getEventHandle().getExternalOriginationTime(), equalTo(olderSecond));

Expand All @@ -225,7 +225,7 @@ void writingToBuffer_with_nth_event_in_that_second_returns_expected_that_timesta
assertThat(fourthEventWithNewerSecond.getMetadata().getAttribute(EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE), equalTo(OpenSearchBulkActions.INDEX.toString()));
assertThat(fourthEventWithNewerSecond.getMetadata().getAttribute(DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE), equalTo("INSERT"));
assertThat(fourthEventWithNewerSecond.getMetadata().getAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE), equalTo(newerSecond.toEpochMilli()));
assertThat(fourthEventWithNewerSecond.getMetadata().getAttribute(EVENT_DYNAMODB_ITEM_VERSION), equalTo(newerSecond.toEpochMilli() * 1000));
assertThat(fourthEventWithNewerSecond.getMetadata().getAttribute(EVENT_VERSION_FROM_TIMESTAMP), equalTo(newerSecond.toEpochMilli() * 1000));
assertThat(fourthEventWithNewerSecond.getEventHandle(), notNullValue());
assertThat(fourthEventWithNewerSecond.getEventHandle().getExternalOriginationTime(), equalTo(newerSecond));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSinkConfiguration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

import static com.google.common.base.Preconditions.checkNotNull;

Expand All @@ -27,6 +27,8 @@ public class DynamicIndexManager extends AbstractIndexManager {
private Cache<String, IndexManager> indexManagerCache;
final int CACHE_EXPIRE_AFTER_ACCESS_TIME_MINUTES = 30;
final int APPROXIMATE_INDEX_MANAGER_SIZE = 32;
private static final String INVALID_INDEX_CHARACTERS = "[:\\\"*+/\\\\|?#><]";
private static final Pattern INVALID_REGEX_CHARACTERS_PATTERN = Pattern.compile(INVALID_INDEX_CHARACTERS);
private final long cacheSizeInKB = 1024;
protected RestHighLevelClient restHighLevelClient;
protected OpenSearchClient openSearchClient;
Expand Down Expand Up @@ -73,6 +75,11 @@ public String getIndexName(final String dynamicIndexAlias) throws IOException {
throw new IOException("index alias is null");
}
String fullIndexAlias = AbstractIndexManager.getIndexAliasWithDate(dynamicIndexAlias);

if (openSearchSinkConfiguration.getIndexConfiguration().isNormalizeIndex()) {
fullIndexAlias = normalizeIndex(fullIndexAlias);
}

IndexManager indexManager = indexManagerCache.getIfPresent(fullIndexAlias);
if (indexManager == null) {
indexManager = indexManagerFactory.getIndexManager(
Expand All @@ -82,7 +89,6 @@ public String getIndexName(final String dynamicIndexAlias) throws IOException {
}
return indexManager.getIndexName(fullIndexAlias);
}

private void setupIndexWithRetries(final IndexManager indexManager) throws IOException {
boolean isIndexSetup = false;

Expand All @@ -100,5 +106,29 @@ private void setupIndexWithRetries(final IndexManager indexManager) throws IOExc
}
}
}
// Restrictions on index names (https://opensearch.org/docs/1.0/opensearch/rest-api/create-index/#index-naming-restrictions)
private String normalizeIndex(final String indexName) {
String normalizedIndexName = indexName.toLowerCase(Locale.ROOT);

normalizedIndexName = INVALID_REGEX_CHARACTERS_PATTERN.matcher(normalizedIndexName).replaceAll("");

while (normalizedIndexName.startsWith("_") || normalizedIndexName.startsWith("-")) {
if (normalizedIndexName.length() == 1) {
throw new RuntimeException(String.format(
"Unable to normalize index '%s'. This index name is invalid.", indexName)
);
}

normalizedIndexName = normalizedIndexName.substring(1);
}

if (normalizedIndexName.isBlank()) {
throw new RuntimeException(String.format(
"Unable to normalize index '%s'. The result after normalization was an empty String.", indexName)
);
}

return normalizedIndexName;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public class IndexConfiguration {
public static final String DOCUMENT_ROOT_KEY = "document_root_key";
public static final String DOCUMENT_VERSION_EXPRESSION = "document_version";
public static final String DOCUMENT_VERSION_TYPE = "document_version_type";
public static final String NORMALIZE_INDEX = "normalize_index";

private IndexType indexType;
private TemplateType templateType;
Expand All @@ -96,6 +97,7 @@ public class IndexConfiguration {
private final String documentRootKey;
private final String versionExpression;
private final VersionType versionType;
private final boolean normalizeIndex;

private static final String S3_PREFIX = "s3://";
private static final String DEFAULT_AWS_REGION = "us-east-1";
Expand All @@ -112,6 +114,7 @@ private IndexConfiguration(final Builder builder) {
this.s3Client = builder.s3Client;
this.versionExpression = builder.versionExpression;
this.versionType = builder.versionType;
this.normalizeIndex = builder.normalizeIndex;

determineTemplateType(builder);

Expand Down Expand Up @@ -230,6 +233,8 @@ public static IndexConfiguration readIndexConfig(final PluginSetting pluginSetti

final String versionExpression = pluginSetting.getStringOrDefault(DOCUMENT_VERSION_EXPRESSION, null);
final String versionType = pluginSetting.getStringOrDefault(DOCUMENT_VERSION_TYPE, null);
final boolean normalizeIndex = pluginSetting.getBooleanOrDefault(NORMALIZE_INDEX, false);
builder = builder.withNormalizeIndex(normalizeIndex);

builder = builder.withVersionExpression(versionExpression);
if (versionExpression != null && (!expressionEvaluator.isValidFormatExpression(versionExpression))) {
Expand Down Expand Up @@ -376,6 +381,8 @@ public String getDocumentRootKey() {

public String getVersionExpression() { return versionExpression; }

public boolean isNormalizeIndex() { return normalizeIndex; }

/**
* This method is used in the creation of IndexConfiguration object. It takes in the template file path
* or index type and returns the index template read from the file or specific to index type or returns an
Expand Down Expand Up @@ -458,6 +465,7 @@ public static class Builder {
private String documentRootKey;
private VersionType versionType;
private String versionExpression;
private boolean normalizeIndex;

public Builder withIndexAlias(final String indexAlias) {
checkArgument(indexAlias != null, "indexAlias cannot be null.");
Expand Down Expand Up @@ -626,6 +634,11 @@ public Builder withVersionType(final String versionType) {
return this;
}

public Builder withNormalizeIndex(final boolean normalizeIndex) {
this.normalizeIndex = normalizeIndex;
return this;
}

private VersionType getVersionType(final String versionType) {
switch (versionType.toLowerCase()) {
case "internal":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.opensearch.client.opensearch._types.OpenSearchException;
import org.mockito.MockedStatic;
import org.opensearch.client.IndicesClient;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.opensearch.OpenSearchClient;
Expand All @@ -24,13 +28,16 @@
import java.util.Optional;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.startsWith;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -230,4 +237,50 @@ public void getIndexName_DoesRetryOnOpenSearchExceptions_UntilFailure() throws I

verify(innerIndexManager, times(3)).setupIndex();
}

@ParameterizedTest
@CsvSource(value = {"INVALID_INDEX#, invalid_index", "-AAA:\\\"*+/\\\\|?#><, aaa", "_TeST_InDeX<, test_index", "--<t, t"})
public void normalize_index_correctly_normalizes_invalid_indexes(final String dynamicIndexName, final String normalizedDynamicIndexName) throws IOException {
when(indexConfiguration.isNormalizeIndex()).thenReturn(true);
innerIndexManager = mock(IndexManager.class);


when(mockIndexManagerFactory.getIndexManager(
IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy, normalizedDynamicIndexName)).thenReturn(innerIndexManager);
when(innerIndexManager.getIndexName(normalizedDynamicIndexName)).thenReturn(normalizedDynamicIndexName);

final String result = dynamicIndexManager.getIndexName(dynamicIndexName);
assertThat(result, equalTo(normalizedDynamicIndexName));
}

@Test
public void normalize_index_correctly_normalizes_indexes_correctly_with_data_time_patterns() throws IOException {
final String dynamicIndexName = "-<_-test-%{yyyy.MM.dd}";
final String indexWithDateTimePatternResolved = "-<_-test-2023.11.11";
final String normalizedDynamicIndexName = "test-2023.11.11";
when(indexConfiguration.isNormalizeIndex()).thenReturn(true);
innerIndexManager = mock(IndexManager.class);


when(mockIndexManagerFactory.getIndexManager(
IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy, normalizedDynamicIndexName)).thenReturn(innerIndexManager);
when(innerIndexManager.getIndexName(normalizedDynamicIndexName)).thenReturn(normalizedDynamicIndexName);

try (final MockedStatic<AbstractIndexManager> abstractIndexManagerMockedStatic = mockStatic(AbstractIndexManager.class)) {
abstractIndexManagerMockedStatic.when(() -> AbstractIndexManager.getIndexAliasWithDate(dynamicIndexName))
.thenReturn(indexWithDateTimePatternResolved);
final String result = dynamicIndexManager.getIndexName(dynamicIndexName);
assertThat(result, equalTo(normalizedDynamicIndexName));
}
}

@ParameterizedTest
@ValueSource(strings = {"*-<-", "<?"})
public void normalize_index_resulting_in_empty_index_throws_expected_exception(final String dynamicIndexName) {
when(indexConfiguration.isNormalizeIndex()).thenReturn(true);
final RuntimeException exception = assertThrows(RuntimeException.class, () -> dynamicIndexManager.getIndexName(dynamicIndexName));

assertThat(exception, notNullValue());
assertThat(exception.getMessage(), startsWith("Unable to normalize index"));
}
}

0 comments on commit d267486

Please sign in to comment.