diff --git a/data-prepper-plugins/dynamodb-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbClientWrapper.java b/data-prepper-plugins/dynamodb-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbClientWrapper.java
index 159e04ccc7..ca806e36d5 100644
--- a/data-prepper-plugins/dynamodb-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbClientWrapper.java
+++ b/data-prepper-plugins/dynamodb-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbClientWrapper.java
@@ -231,13 +231,15 @@ public Optional<SourcePartitionStoreItem> getAvailablePartition(final String own
 
             // For ASSIGNED partitions we are sorting based on partitionOwnershipTimeout, so if any item has partitionOwnershipTimeout
             // in the future, we can know that the remaining items will not be available.
-            if (SourcePartitionStatus.ASSIGNED.equals(sourcePartitionStatus) && Instant.now().isBefore(item.getPartitionOwnershipTimeout())) {
+            if (SourcePartitionStatus.ASSIGNED.equals(sourcePartitionStatus) && item.getPartitionOwnershipTimeout() != null &&
+                    Instant.now().isBefore(item.getPartitionOwnershipTimeout())) {
                 return Optional.empty();
             }
 
             // For CLOSED partitions we are sorting based on reOpenAt time, so if any item has reOpenAt in the future,
             // we can know that the remaining items will not be ready to be acquired again.
-            if (SourcePartitionStatus.CLOSED.equals(sourcePartitionStatus) && Instant.now().isBefore(item.getReOpenAt())) {
+            if (SourcePartitionStatus.CLOSED.equals(sourcePartitionStatus) && item.getReOpenAt() != null &&
+                    Instant.now().isBefore(item.getReOpenAt())) {
                 return Optional.empty();
             }
 
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java
index 26e868e5ef..45b51ac4f2 100644
--- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java
@@ -21,6 +21,7 @@
 import org.opensearch.dataprepper.plugins.source.dynamodb.leader.ShardManager;
 import org.opensearch.dataprepper.plugins.source.dynamodb.stream.ShardConsumerFactory;
 import org.opensearch.dataprepper.plugins.source.dynamodb.stream.StreamScheduler;
+import org.opensearch.dataprepper.plugins.source.dynamodb.utils.BackoffCalculator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
@@ -94,7 +95,7 @@ public void start(Buffer<Record<Event>> buffer) {
         Runnable fileLoaderScheduler = new DataFileScheduler(coordinator, loaderFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig);
 
         ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, buffer);
-        Runnable streamScheduler = new StreamScheduler(coordinator, consumerFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig);
+        Runnable streamScheduler = new StreamScheduler(coordinator, consumerFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig, new BackoffCalculator());
         // leader scheduler will handle the initialization
         Runnable leaderScheduler = new LeaderScheduler(coordinator, dynamoDbClient, shardManager, tableConfigs);
 
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceConfig.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceConfig.java
index 2a682ce280..5babe81a29 100644
--- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceConfig.java
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceConfig.java
@@ -33,7 +33,7 @@ public class DynamoDBSourceConfig {
     private boolean acknowledgments = false;
 
     @JsonProperty("shard_acknowledgment_timeout")
-    private Duration shardAcknowledgmentTimeout = Duration.ofMinutes(3);
+    private Duration shardAcknowledgmentTimeout = Duration.ofMinutes(10);
 
     @JsonProperty("s3_data_file_acknowledgment_timeout")
     private Duration dataFileAcknowledgmentTimeout = Duration.ofMinutes(5);
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoader.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoader.java
index 2cdf923e8f..650f5f33f4 100644
--- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoader.java
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoader.java
@@ -184,8 +184,8 @@ public void run() {
             while ((line = reader.readLine()) != null) {
                 if (shouldStop) {
                     checkpointer.checkpoint(lastLineProcessed);
-                    LOG.debug("Should Stop flag is set to True, looks like shutdown has triggered");
-                    throw new RuntimeException("Load is interrupted");
+                    LOG.warn("Loading data file s3://{}/{} was interrupted by a shutdown signal, giving up ownership of data file", bucketName, key);
+                    throw new RuntimeException("Loading data file interrupted");
                 }
 
                 lineCount += 1;
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java
index 5d9871cd9e..23b0888377 100644
--- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java
@@ -43,7 +43,7 @@ public class DataFileScheduler implements Runnable {
     /**
      * Default interval to acquire a lease from coordination store
      */
-    private static final int DEFAULT_LEASE_INTERVAL_MILLIS = 15_000;
+    private static final int DEFAULT_LEASE_INTERVAL_MILLIS = 2_000;
 
     static final String EXPORT_S3_OBJECTS_PROCESSED_COUNT = "exportS3ObjectsProcessed";
     static final String ACTIVE_EXPORT_S3_OBJECT_CONSUMERS_GAUGE = "activeExportS3ObjectConsumers";
@@ -111,6 +111,9 @@ private void processDataFilePartition(DataFilePartition dataFilePartition) {
             runLoader.whenComplete(completeDataLoader(dataFilePartition));
         } else {
             runLoader.whenComplete((v, ex) -> {
+                if (ex != null) {
+                    coordinator.giveUpPartition(dataFilePartition);
+                }
                 numOfWorkers.decrementAndGet();
             });
         }
@@ -147,7 +150,7 @@ public void run() {
                 }
             }
         }
-        LOG.warn("Data file scheduler is interrupted, Stop all data file loaders...");
+        LOG.warn("Data file scheduler is interrupted, stopping all data file loaders...");
         // Cannot call executor.shutdownNow() here
         // Otherwise the final checkpoint will fail due to SDK interruption.
         executor.shutdown();
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportScheduler.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportScheduler.java
index 234455bf9e..a8a75e1447 100644
--- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportScheduler.java
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportScheduler.java
@@ -264,7 +264,7 @@ private String getOrCreateExportArn(ExportPartition exportPartition) {
             return state.getExportArn();
         }
 
-        LOG.info("Try to submit a new export job for table {} with export time {}", exportPartition.getTableArn(), exportPartition.getExportTime());
+        LOG.info("Submitting a new export job for table {} with export time {}", exportPartition.getTableArn(), exportPartition.getExportTime());
         // submit a new export request
         String exportArn = exportTaskManager.submitExportJob(exportPartition.getTableArn(), state.getBucket(), state.getPrefix(), state.getKmsKeyId(), exportPartition.getExportTime());
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java
index 87491c3896..049065885e 100644
--- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java
@@ -99,6 +99,10 @@ public class ShardConsumer implements Runnable {
 
     private final Duration shardAcknowledgmentTimeout;
 
+    private final String shardId;
+
+    private long recordsWrittenToBuffer;
+
     private ShardConsumer(Builder builder) {
         this.dynamoDbStreamsClient = builder.dynamoDbStreamsClient;
         this.checkpointer = builder.checkpointer;
@@ -111,6 +115,8 @@ private ShardConsumer(Builder builder) {
         recordConverter = new StreamRecordConverter(bufferAccumulator, builder.tableInfo, builder.pluginMetrics);
         this.acknowledgementSet = builder.acknowledgementSet;
         this.shardAcknowledgmentTimeout = builder.dataFileAcknowledgmentTimeout;
+        this.shardId = builder.shardId;
+        this.recordsWrittenToBuffer = 0;
     }
 
     public static Builder builder(final DynamoDbStreamsClient dynamoDbStreamsClient, final PluginMetrics pluginMetrics, final Buffer<Record<Event>> buffer) {
@@ -138,6 +144,8 @@ static class Builder {
 
         private boolean waitForExport;
 
+        private String shardId;
+
         private AcknowledgementSet acknowledgementSet;
 
         private Duration dataFileAcknowledgmentTimeout;
@@ -152,6 +160,11 @@ public Builder tableInfo(TableInfo tableInfo) {
             return this;
         }
 
+        public Builder shardId(final String shardId) {
+            this.shardId = shardId;
+            return this;
+        }
+
         public Builder checkpointer(StreamCheckpointer checkpointer) {
             this.checkpointer = checkpointer;
             return this;
@@ -220,7 +233,9 @@ public void run() {
             }
 
             if (System.currentTimeMillis() - lastCheckpointTime > DEFAULT_CHECKPOINT_INTERVAL_MILLS) {
-                LOG.debug("Perform regular checkpointing for Shard Consumer");
+                if (shardId != null) {
+                    LOG.info("{} records written to buffer for shard {}", recordsWrittenToBuffer, shardId);
+                }
                 checkpointer.checkpoint(sequenceNumber);
                 lastCheckpointTime = System.currentTimeMillis();
             }
@@ -245,6 +260,7 @@ public void run() {
                     .filter(record -> record.dynamodb().approximateCreationDateTime().isAfter(startTime))
                     .collect(Collectors.toList());
             recordConverter.writeToBuffer(acknowledgementSet, records);
+            recordsWrittenToBuffer += records.size();
             long delay = System.currentTimeMillis() - lastEventTime.toEpochMilli();
             interval = delay > GET_RECORD_DELAY_THRESHOLD_MILLS ? MINIMUM_GET_RECORD_INTERVAL_MILLS : GET_RECORD_INTERVAL_MILLS;
@@ -260,17 +276,21 @@ public void run() {
             }
         }
 
+        // interrupted
+        if (shouldStop) {
+            // Do last checkpoint and then quit
+            LOG.warn("Processing for shard {} was interrupted by a shutdown signal, giving up shard", shardId);
+            checkpointer.checkpoint(sequenceNumber);
+            throw new RuntimeException("Consuming shard was interrupted from shutdown");
+        }
+
         if (acknowledgementSet != null) {
             checkpointer.updateShardForAcknowledgmentWait(shardAcknowledgmentTimeout);
             acknowledgementSet.complete();
         }
 
-        // interrupted
-        if (shouldStop) {
-            // Do last checkpoint and then quit
-            LOG.error("Should Stop flag is set to True, looks like shutdown has triggered");
-            checkpointer.checkpoint(sequenceNumber);
-            throw new RuntimeException("Shard Consumer is interrupted");
+        if (shardId != null) {
+            LOG.info("Completed writing shard {} to buffer after reaching the end of the shard", shardId);
         }
 
         if (waitForExport) {
@@ -337,10 +357,10 @@ private boolean shouldSkip() {
 
         Instant lastEventTime = response.records().get(response.records().size() - 1).dynamodb().approximateCreationDateTime();
         if (lastEventTime.isBefore(startTime)) {
-            LOG.info("LastShardIterator is provided, and Last Event Time is earlier than export time, skip processing");
+            LOG.info("LastShardIterator is provided, and Last Event Time is earlier than {}, skip processing", startTime);
             return true;
         } else {
-            LOG.info("LastShardIterator is provided, and Last Event Time is later than export time, start processing");
+            LOG.info("LastShardIterator is provided, and Last Event Time is later than {}, start processing", startTime);
             return false;
         }
     }
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java
index feb78a555d..6f17087c97 100644
--- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java
@@ -58,7 +58,7 @@ public Runnable createConsumer(final StreamPartition streamPartition,
                                    final AcknowledgementSet acknowledgementSet,
                                    final Duration shardAcknowledgmentTimeout) {
 
-        LOG.info("Try to start a Shard Consumer for " + streamPartition.getShardId());
+        LOG.info("Starting to consume shard " + streamPartition.getShardId());
 
         // Check and get the current state.
         Optional<StreamProgressState> progressState = streamPartition.getProgressState();
@@ -82,8 +82,7 @@ public Runnable createConsumer(final StreamPartition streamPartition,
 
         String shardIterator = getShardIterator(streamPartition.getStreamArn(), streamPartition.getShardId(), sequenceNumber);
         if (shardIterator == null) {
-            LOG.info("Unable to get a shard iterator, looks like the shard has expired");
-            LOG.error("Failed to start a Shard Consumer for " + streamPartition.getShardId());
+            LOG.error("Failed to start consuming shard '{}'. Unable to get a shard iterator; the shard may have expired", streamPartition.getShardId());
             return null;
         }
 
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java
index 34ea3fc6fb..eee6d5807e 100644
--- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java
@@ -12,6 +12,7 @@
 import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
 import org.opensearch.dataprepper.plugins.source.dynamodb.DynamoDBSourceConfig;
 import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition;
+import org.opensearch.dataprepper.plugins.source.dynamodb.utils.BackoffCalculator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,6 +29,7 @@
  */
 public class StreamScheduler implements Runnable {
     private static final Logger LOG = LoggerFactory.getLogger(StreamScheduler.class);
+    private static final Logger SHARD_COUNT_LOGGER = LoggerFactory.getLogger("org.opensearch.dataprepper.plugins.source.dynamodb.stream.ShardCountLogger");
 
     /**
      * Max number of shards each node can handle in parallel
@@ -51,18 +53,22 @@ public class StreamScheduler implements Runnable {
     private final AtomicLong shardsInProcessing;
     private final AcknowledgementSetManager acknowledgementSetManager;
     private final DynamoDBSourceConfig dynamoDBSourceConfig;
+    private final BackoffCalculator backoffCalculator;
+    private int noAvailableShardsCount = 0;
 
     public StreamScheduler(final EnhancedSourceCoordinator coordinator,
                            final ShardConsumerFactory consumerFactory,
                            final PluginMetrics pluginMetrics,
                            final AcknowledgementSetManager acknowledgementSetManager,
-                           final DynamoDBSourceConfig dynamoDBSourceConfig) {
+                           final DynamoDBSourceConfig dynamoDBSourceConfig,
+                           final BackoffCalculator backoffCalculator) {
         this.coordinator = coordinator;
         this.consumerFactory = consumerFactory;
         this.pluginMetrics = pluginMetrics;
         this.acknowledgementSetManager = acknowledgementSetManager;
         this.dynamoDBSourceConfig = dynamoDBSourceConfig;
+        this.backoffCalculator = backoffCalculator;
 
         executor = Executors.newFixedThreadPool(MAX_JOB_COUNT);
         activeChangeEventConsumers = pluginMetrics.gauge(ACTIVE_CHANGE_EVENT_CONSUMERS, new AtomicLong());
@@ -93,6 +99,9 @@ private void processStreamPartition(StreamPartition streamPartition) {
         if (acknowledgmentsEnabled) {
             runConsumer.whenComplete((v, ex) -> {
                 numOfWorkers.decrementAndGet();
+                if (ex != null) {
+                    coordinator.giveUpPartition(streamPartition);
+                }
                 if (numOfWorkers.get() == 0) {
                     activeChangeEventConsumers.decrementAndGet();
                 }
@@ -102,6 +111,10 @@ private void processStreamPartition(StreamPartition streamPartition) {
             runConsumer.whenComplete(completeConsumer(streamPartition));
         }
         numOfWorkers.incrementAndGet();
+        if (numOfWorkers.get() % 10 == 0) {
+            SHARD_COUNT_LOGGER.info("Actively processing {} shards", numOfWorkers.get());
+        }
+
         if (numOfWorkers.get() >= 1) {
             activeChangeEventConsumers.incrementAndGet();
         }
@@ -122,11 +135,14 @@ public void run() {
                 if (sourcePartition.isPresent()) {
                     StreamPartition streamPartition = (StreamPartition) sourcePartition.get();
                     processStreamPartition(streamPartition);
+                    noAvailableShardsCount = 0;
+                } else {
+                    noAvailableShardsCount++;
                 }
             }
 
             try {
-                Thread.sleep(DEFAULT_LEASE_INTERVAL_MILLIS);
+                Thread.sleep(backoffCalculator.calculateBackoffToAcquireNextShard(noAvailableShardsCount, numOfWorkers));
             } catch (final InterruptedException e) {
                 LOG.info("InterruptedException occurred");
                 break;
@@ -162,12 +178,10 @@ private BiConsumer<Void, Throwable> completeConsumer(StreamPartition streamPartition) {
             if (ex == null) {
                 LOG.info("Shard consumer for {} is completed", streamPartition.getShardId());
                 coordinator.completePartition(streamPartition);
-
             } else {
                 // Do nothing
                 // The consumer must have already done one last checkpointing.
-                LOG.debug("Shard consumer completed with exception");
-                LOG.error(ex.toString());
+                LOG.error("Received an exception while processing shard {}, giving up shard: {}", streamPartition.getShardId(), ex);
                 coordinator.giveUpPartition(streamPartition);
             }
         };
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/utils/BackoffCalculator.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/utils/BackoffCalculator.java
new file mode 100644
index 0000000000..2065310665
--- /dev/null
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/utils/BackoffCalculator.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.source.dynamodb.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.google.common.math.LongMath.pow;
+import static com.google.common.primitives.Longs.min;
+import static java.lang.Math.max;
+
+public class BackoffCalculator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BackoffCalculator.class);
+
+    private static final Random RANDOM = new Random();
+
+    static final Duration STARTING_BACKOFF = Duration.ofMillis(500);
+    static final Duration MAX_BACKOFF_WITH_SHARDS = Duration.ofSeconds(15);
+    static final Duration MAX_BACKOFF_NO_SHARDS_ACQUIRED = Duration.ofSeconds(15);
+    static final int BACKOFF_RATE = 2;
+    static final Duration MAX_JITTER = Duration.ofSeconds(2);
+    static final Duration MIN_JITTER = Duration.ofSeconds(-2);
+
+    public long calculateBackoffToAcquireNextShard(final int noAvailableShardCount, final AtomicInteger shardsAcquired) {
+
+        // When no shards are available to process we back off exponentially based on how many consecutive attempts have been made without getting a shard.
+        // This limits calls to the coordination store.
+        if (noAvailableShardCount > 0) {
+            if (noAvailableShardCount % 10 == 0) {
+                LOG.info("No shards acquired after {} attempts", noAvailableShardCount);
+            }
+
+            final long jitterMillis = MIN_JITTER.toMillis() + RANDOM.nextInt((int) (MAX_JITTER.toMillis() - MIN_JITTER.toMillis() + 1));
+            return max(1, min(STARTING_BACKOFF.toMillis() * pow(BACKOFF_RATE, noAvailableShardCount - 1) + jitterMillis, MAX_BACKOFF_NO_SHARDS_ACQUIRED.toMillis()));
+        }
+
+        // When shards are being acquired we back off linearly based on how many shards this node is actively processing, to encourage a fast start while still balancing shards between nodes.
+        return max(500, min(MAX_BACKOFF_WITH_SHARDS.toMillis(), shardsAcquired.get() * STARTING_BACKOFF.toMillis()));
+    }
+}
diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java
index e0094edee4..34d5db1ad3 100644
--- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java
+++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java
@@ -18,6 +18,7 @@
 import org.opensearch.dataprepper.plugins.source.dynamodb.DynamoDBSourceConfig;
 import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition;
 import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState;
+import org.opensearch.dataprepper.plugins.source.dynamodb.utils.BackoffCalculator;
 import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
 
 import java.time.Duration;
@@ -28,6 +29,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 
@@ -59,6 +61,9 @@ class StreamSchedulerTest {
     @Mock
     private DynamoDBSourceConfig dynamoDBSourceConfig;
 
+    @Mock
+    private BackoffCalculator backoffCalculator;
+
     private StreamScheduler scheduler;
 
@@ -112,10 +117,16 @@ void setup() {
 
     @Test
    public void test_normal_run() throws InterruptedException {
+        when(backoffCalculator.calculateBackoffToAcquireNextShard(eq(0), eq(new AtomicInteger(1))))
+                .thenReturn(1L);
+
+        when(backoffCalculator.calculateBackoffToAcquireNextShard(eq(1), any(AtomicInteger.class)))
+                .thenReturn(10000L);
+
         when(consumerFactory.createConsumer(any(StreamPartition.class), eq(null), any(Duration.class))).thenReturn(() -> System.out.println("Hello"));
         when(coordinator.acquireAvailablePartition(StreamPartition.PARTITION_TYPE)).thenReturn(Optional.of(streamPartition)).thenReturn(Optional.empty());
 
-        scheduler = new StreamScheduler(coordinator, consumerFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig);
+        scheduler = new StreamScheduler(coordinator, consumerFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig, backoffCalculator);
 
         ExecutorService executorService = Executors.newSingleThreadExecutor();
 
@@ -128,6 +139,7 @@ public void test_normal_run() throws InterruptedException {
         verify(coordinator).acquireAvailablePartition(StreamPartition.PARTITION_TYPE);
         // Should start a new consumer
         verify(consumerFactory).createConsumer(any(StreamPartition.class), eq(null), any(Duration.class));
+        // Should mark the stream partition as completed.
         verify(coordinator).completePartition(any(StreamPartition.class));
 
@@ -139,6 +151,12 @@ public void test_normal_run() throws InterruptedException {
 
     @Test
     public void test_normal_run_with_acknowledgments() throws InterruptedException {
+        when(backoffCalculator.calculateBackoffToAcquireNextShard(eq(0), eq(new AtomicInteger(1))))
+                .thenReturn(1L);
+
+        when(backoffCalculator.calculateBackoffToAcquireNextShard(eq(1), any(AtomicInteger.class)))
+                .thenReturn(10000L);
+
         given(coordinator.acquireAvailablePartition(StreamPartition.PARTITION_TYPE)).willReturn(Optional.of(streamPartition)).willReturn(Optional.empty());
         given(dynamoDBSourceConfig.isAcknowledgmentsEnabled()).willReturn(true);
@@ -154,7 +172,7 @@ public void test_normal_run_with_acknowledgments() throws InterruptedException {
 
         when(consumerFactory.createConsumer(any(StreamPartition.class), eq(acknowledgementSet), eq(shardAcknowledgmentTimeout))).thenReturn(() -> System.out.println("Hello"));
 
-        scheduler = new StreamScheduler(coordinator, consumerFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig);
+        scheduler = new StreamScheduler(coordinator, consumerFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig, backoffCalculator);
 
         ExecutorService executorService = Executors.newSingleThreadExecutor();
 
@@ -168,6 +186,7 @@ public void test_normal_run_with_acknowledgments() throws InterruptedException {
         verify(coordinator).acquireAvailablePartition(StreamPartition.PARTITION_TYPE);
         // Should start a new consumer
         verify(consumerFactory).createConsumer(any(StreamPartition.class), any(AcknowledgementSet.class), any(Duration.class));
+        // Should mark the stream partition as completed.
         verify(coordinator).completePartition(any(StreamPartition.class));
 
@@ -180,7 +199,7 @@ public void test_normal_run_with_acknowledgments() throws InterruptedException {
     @Test
     void run_catches_exception_and_retries_when_exception_is_thrown_during_processing() throws InterruptedException {
         given(coordinator.acquireAvailablePartition(StreamPartition.PARTITION_TYPE)).willThrow(RuntimeException.class);
-        scheduler = new StreamScheduler(coordinator, consumerFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig);
+        scheduler = new StreamScheduler(coordinator, consumerFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig, backoffCalculator);
 
         ExecutorService executorService = Executors.newSingleThreadExecutor();
         final Future<?> future = executorService.submit(() -> scheduler.run());
diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/utils/BackoffCalculatorTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/utils/BackoffCalculatorTest.java
new file mode 100644
index 0000000000..cfbbeaedca
--- /dev/null
+++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/utils/BackoffCalculatorTest.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.source.dynamodb.utils;
+
+import org.hamcrest.Matchers;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.opensearch.dataprepper.plugins.source.dynamodb.utils.BackoffCalculator.MAX_BACKOFF_NO_SHARDS_ACQUIRED;
+
+public class BackoffCalculatorTest {
+
+    @ParameterizedTest
+    @MethodSource("countsToExpectedBackoffRange")
+    void calculateBackoffToAcquireNextShardReturnsExpectedBackoffValues(
+            final int noAvailableShardCount,
+            final int shardsAcquiredCount,
+            final long minExpectedBackoff,
+            final long maxExpectedBackoff
+    ) {
+
+        final BackoffCalculator objectUnderTest = new BackoffCalculator();
+
+        final long backOffForShardCounts = objectUnderTest.calculateBackoffToAcquireNextShard(noAvailableShardCount, new AtomicInteger(shardsAcquiredCount));
+
+        assertThat(backOffForShardCounts, Matchers.greaterThanOrEqualTo(minExpectedBackoff));
+        assertThat(backOffForShardCounts, Matchers.lessThanOrEqualTo(maxExpectedBackoff));
+    }
+
+    private static Stream<Arguments> countsToExpectedBackoffRange() {
+        return Stream.of(
+                Arguments.of(0, 0, 500, 500),
+                Arguments.of(0, 1, 500, 500),
+                Arguments.of(0, 2, 1000, 1000),
+                Arguments.of(0, 29, 14_500, 14_500),
+                Arguments.of(0, 30, 15_000, 15_000),
+                Arguments.of(2, 1, 1, 3_000),
+                Arguments.of(3, 0, 1, 4_000),
+                Arguments.of(4, 2, 2_000, 6_000),
+                Arguments.of(5, 6, 6_000, 10_000),
+                Arguments.of(6, 6, 14_000, 15_000),
+                Arguments.of(8, 2, MAX_BACKOFF_NO_SHARDS_ACQUIRED.toMillis(), MAX_BACKOFF_NO_SHARDS_ACQUIRED.toMillis())
+        );
+    }
+}
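
Note for reviewers: below is a minimal sketch (not part of the change) of the polling cadence the new BackoffCalculator produces. It assumes the class added above is on the classpath; the class name BackoffScheduleSketch is illustrative only, and the no-shards values vary run to run because of the +/-2s jitter.

    // BackoffScheduleSketch.java -- illustrative only, not part of this change.
    import org.opensearch.dataprepper.plugins.source.dynamodb.utils.BackoffCalculator;

    import java.util.concurrent.atomic.AtomicInteger;

    public class BackoffScheduleSketch {
        public static void main(final String[] args) {
            final BackoffCalculator calculator = new BackoffCalculator();

            // No shards acquired: nominal 500ms * 2^(attempts - 1), plus +/-2s jitter,
            // clamped to [1ms, 15s] -- roughly 500ms, 1s, 2s, 4s, 8s, then pinned near 15s.
            for (int attempt = 1; attempt <= 7; attempt++) {
                System.out.printf("no shards, attempt %d: %d ms%n",
                        attempt, calculator.calculateBackoffToAcquireNextShard(attempt, new AtomicInteger(0)));
            }

            // Shards held: deterministic 500ms per actively processed shard, floor 500ms, cap 15s.
            for (final int shardsHeld : new int[] {0, 1, 4, 30, 40}) {
                System.out.printf("%d shards held: %d ms%n",
                        shardsHeld, calculator.calculateBackoffToAcquireNextShard(0, new AtomicInteger(shardsHeld)));
            }
        }
    }

The 500ms-per-shard linear path is what lets an idle node grab shards quickly at startup while nodes already holding many shards poll the coordination store less aggressively.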