From e338559a6882b65bceece9166554fb0db0c43918 Mon Sep 17 00:00:00 2001 From: Chase Engelbrecht Date: Mon, 23 Oct 2023 16:44:27 -0500 Subject: [PATCH] Retry on dynamic index creation when an OpenSearchException is thrown Signed-off-by: Chase Engelbrecht --- .../sink/opensearch/OpenSearchSink.java | 4 +- .../opensearch/index/DynamicIndexManager.java | 27 +++++++- .../index/DynamicIndexManagerTests.java | 62 +++++++++++++++++++ 3 files changed, 90 insertions(+), 3 deletions(-) diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index ea7a46b44c..3d3f3cd55b 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -331,8 +331,8 @@ public void doOutput(final Collection> records) { String indexName = configuredIndexAlias; try { indexName = indexManager.getIndexName(event.formatString(indexName, expressionEvaluator)); - } catch (IOException | EventKeyNotFoundException e) { - LOG.error("There was an exception when constructing the index name. Check the dlq if configured to see details about the affected Event: {}", e.getMessage()); + } catch (final Exception e) { + LOG.error("There was an exception when constructing the index name. Check the dlq if configured to see details about the affected Event: {}", e.getMessage(), e); dynamicIndexDroppedEvents.increment(); logFailureForDlqObjects(List.of(DlqObject.builder() .withEventHandle(event.getEventHandle()) diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DynamicIndexManager.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DynamicIndexManager.java index a2e613a13b..83eec171fb 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DynamicIndexManager.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DynamicIndexManager.java @@ -5,18 +5,25 @@ package org.opensearch.dataprepper.plugins.sink.opensearch.index; +import org.opensearch.client.opensearch._types.OpenSearchException; import org.opensearch.client.RestHighLevelClient; import org.opensearch.client.opensearch.OpenSearchClient; import org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSinkConfiguration; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.util.concurrent.TimeUnit; import static com.google.common.base.Preconditions.checkNotNull; public class DynamicIndexManager extends AbstractIndexManager { + private static final Logger LOG = LoggerFactory.getLogger(DynamicIndexManager.class); + private static final int INDEX_SETUP_RETRY_WAIT_TIME_MS = 1000; + private Cache indexManagerCache; final int CACHE_EXPIRE_AFTER_ACCESS_TIME_MINUTES = 30; final int APPROXIMATE_INDEX_MANAGER_SIZE = 32; @@ -72,9 +79,27 @@ public String getIndexName(final String dynamicIndexAlias) throws IOException { indexManager = indexManagerFactory.getIndexManager( indexType, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy, fullIndexAlias); indexManagerCache.put(fullIndexAlias, indexManager); - indexManager.setupIndex(); + setupIndexWithRetries(indexManager); } return indexManager.getIndexName(fullIndexAlias); } + + private void setupIndexWithRetries(final IndexManager indexManager) throws IOException { + boolean isIndexSetup = false; + + while (!isIndexSetup) { + try { + indexManager.setupIndex(); + isIndexSetup = true; + } catch (final OpenSearchException e) { + LOG.warn("Failed to setup dynamic index with an exception. ", e); + try { + Thread.sleep(INDEX_SETUP_RETRY_WAIT_TIME_MS); + } catch (final InterruptedException ex) { + LOG.warn("Interrupted while sleeping between index setup retries"); + } + } + } + } } diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DynamicIndexManagerTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DynamicIndexManagerTests.java index 8513e70fe7..de8eb37f4e 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DynamicIndexManagerTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DynamicIndexManagerTests.java @@ -9,6 +9,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mock; +import org.opensearch.client.opensearch._types.OpenSearchException; import org.opensearch.client.IndicesClient; import org.opensearch.client.RestHighLevelClient; import org.opensearch.client.opensearch.OpenSearchClient; @@ -28,6 +29,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -165,4 +167,64 @@ public void missingDynamicIndexTest() throws IOException { JacksonEvent event = JacksonEvent.builder().withEventType(EVENT_TYPE).withData(Map.of(RandomStringUtils.randomAlphabetic(10), DYNAMIC)).build(); assertThrows(EventKeyNotFoundException.class, () -> dynamicIndexManager.getIndexName(event.formatString(configuredIndexAlias))); } + + @Test + public void getIndexName_DoesNotRetryOnNonOpenSearchExceptions() throws IOException { + when(indexConfiguration.getIndexAlias()).thenReturn(INDEX_ALIAS); + String configuredIndexAlias = openSearchSinkConfiguration.getIndexConfiguration().getIndexAlias(); + String expectedIndexAlias = INDEX_ALIAS.replace("${" + ID + "}", DYNAMIC); + innerIndexManager = mock(IndexManager.class); + when(mockIndexManagerFactory.getIndexManager( + IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy, expectedIndexAlias)).thenReturn(innerIndexManager); + doThrow(new RuntimeException()) + .when(innerIndexManager).setupIndex(); + + JacksonEvent event = JacksonEvent.builder().withEventType(EVENT_TYPE).withData(Map.of(ID, DYNAMIC)).build(); + assertThrows(RuntimeException.class, () -> dynamicIndexManager.getIndexName(event.formatString(configuredIndexAlias))); + + verify(innerIndexManager, times(1)).setupIndex(); + } + + @Test + public void getIndexName_DoesRetryOnOpenSearchExceptions_UntilSuccess() throws IOException { + when(indexConfiguration.getIndexAlias()).thenReturn(INDEX_ALIAS); + when(clusterSettingsParser.getStringValueClusterSetting(any(GetClusterSettingsResponse.class), eq(IndexConstants.ISM_ENABLED_SETTING))) + .thenReturn("true"); + String configuredIndexAlias = openSearchSinkConfiguration.getIndexConfiguration().getIndexAlias(); + String expectedIndexAlias = INDEX_ALIAS.replace("${" + ID + "}", DYNAMIC); + innerIndexManager = mock(IndexManager.class); + when(mockIndexManagerFactory.getIndexManager( + IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy, expectedIndexAlias)).thenReturn(innerIndexManager); + doThrow(new OpenSearchException(new RuntimeException())) + .doThrow(new OpenSearchException(new RuntimeException())) + .doNothing() + .when(innerIndexManager).setupIndex(); + when(innerIndexManager.getIndexName(expectedIndexAlias)).thenReturn(expectedIndexAlias); + + JacksonEvent event = JacksonEvent.builder().withEventType(EVENT_TYPE).withData(Map.of(ID, DYNAMIC)).build(); + final String indexName = dynamicIndexManager.getIndexName(event.formatString(configuredIndexAlias)); + assertThat(expectedIndexAlias, equalTo(indexName)); + + verify(innerIndexManager, times(3)).setupIndex(); + } + + @Test + public void getIndexName_DoesRetryOnOpenSearchExceptions_UntilFailure() throws IOException { + when(indexConfiguration.getIndexAlias()).thenReturn(INDEX_ALIAS); + String configuredIndexAlias = openSearchSinkConfiguration.getIndexConfiguration().getIndexAlias(); + String expectedIndexAlias = INDEX_ALIAS.replace("${" + ID + "}", DYNAMIC); + innerIndexManager = mock(IndexManager.class); + when(mockIndexManagerFactory.getIndexManager( + IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy, expectedIndexAlias)).thenReturn(innerIndexManager); + doThrow(new OpenSearchException(new RuntimeException())) + .doThrow(new OpenSearchException(new RuntimeException())) + .doThrow(new RuntimeException()) + .when(innerIndexManager).setupIndex(); + when(innerIndexManager.getIndexName(expectedIndexAlias)).thenReturn(expectedIndexAlias); + + JacksonEvent event = JacksonEvent.builder().withEventType(EVENT_TYPE).withData(Map.of(ID, DYNAMIC)).build(); + assertThrows(RuntimeException.class, () -> dynamicIndexManager.getIndexName(event.formatString(configuredIndexAlias))); + + verify(innerIndexManager, times(3)).setupIndex(); + } }