Add ddb source fixes/improvements (#3676)

Signed-off-by: Taylor Gray <[email protected]>
graytaylor0 authored Nov 16, 2023
1 parent 77d8812 commit 22647dc
Showing 12 changed files with 189 additions and 29 deletions.
@@ -231,13 +231,15 @@ public Optional<SourcePartitionStoreItem> getAvailablePartition(final String own

// For ASSIGNED partitions we are sorting based on partitionOwnershipTimeout, so if any item has partitionOwnershipTimeout
// in the future, we can know that the remaining items will not be available.
if (SourcePartitionStatus.ASSIGNED.equals(sourcePartitionStatus) && Instant.now().isBefore(item.getPartitionOwnershipTimeout())) {
if (SourcePartitionStatus.ASSIGNED.equals(sourcePartitionStatus) && item.getPartitionOwnershipTimeout() != null &&
Instant.now().isBefore(item.getPartitionOwnershipTimeout())) {
return Optional.empty();
}

// For CLOSED partitions we are sorting based on reOpenAt time, so if any item has reOpenAt in the future,
// we can know that the remaining items will not be ready to be acquired again.
if (SourcePartitionStatus.CLOSED.equals(sourcePartitionStatus) && Instant.now().isBefore(item.getReOpenAt())) {
if (SourcePartitionStatus.CLOSED.equals(sourcePartitionStatus) && item.getReOpenAt() != null &&
Instant.now().isBefore(item.getReOpenAt())) {
return Optional.empty();
}

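A minimal illustration (not part of this commit) of why the null guards above matter: java.time.Instant.isBefore dereferences its argument, so an item whose partitionOwnershipTimeout or reOpenAt was never populated would make the availability scan throw instead of treating the partition as acquirable.

import java.time.Instant;

public class NullTimeoutGuardSketch {
    public static void main(String[] args) {
        Instant partitionOwnershipTimeout = null; // e.g. a partition that has never been owned

        // Guarded comparison, mirroring the fix: only compare when the timestamp is present.
        boolean stillOwned = partitionOwnershipTimeout != null
                && Instant.now().isBefore(partitionOwnershipTimeout);
        System.out.println("still owned: " + stillOwned); // prints "still owned: false", no exception

        // Without the guard, the next line would throw a NullPointerException:
        // Instant.now().isBefore(partitionOwnershipTimeout);
    }
}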
@@ -21,6 +21,7 @@
import org.opensearch.dataprepper.plugins.source.dynamodb.leader.ShardManager;
import org.opensearch.dataprepper.plugins.source.dynamodb.stream.ShardConsumerFactory;
import org.opensearch.dataprepper.plugins.source.dynamodb.stream.StreamScheduler;
import org.opensearch.dataprepper.plugins.source.dynamodb.utils.BackoffCalculator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
@@ -94,7 +95,7 @@ public void start(Buffer<Record<Event>> buffer) {
Runnable fileLoaderScheduler = new DataFileScheduler(coordinator, loaderFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig);

ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, buffer);
Runnable streamScheduler = new StreamScheduler(coordinator, consumerFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig);
Runnable streamScheduler = new StreamScheduler(coordinator, consumerFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig, new BackoffCalculator());
// leader scheduler will handle the initialization
Runnable leaderScheduler = new LeaderScheduler(coordinator, dynamoDbClient, shardManager, tableConfigs);

@@ -33,7 +33,7 @@ public class DynamoDBSourceConfig {
private boolean acknowledgments = false;

@JsonProperty("shard_acknowledgment_timeout")
private Duration shardAcknowledgmentTimeout = Duration.ofMinutes(3);
private Duration shardAcknowledgmentTimeout = Duration.ofMinutes(10);

@JsonProperty("s3_data_file_acknowledgment_timeout")
private Duration dataFileAcknowledgmentTimeout = Duration.ofMinutes(5);
@@ -184,8 +184,8 @@ public void run() {
while ((line = reader.readLine()) != null) {
if (shouldStop) {
checkpointer.checkpoint(lastLineProcessed);
LOG.debug("Should Stop flag is set to True, looks like shutdown has triggered");
throw new RuntimeException("Load is interrupted");
LOG.warn("Loading data file s3://{}/{} was interrupted by a shutdown signal, giving up ownership of data file", bucketName, key);
throw new RuntimeException("Loading data file interrupted");
}

lineCount += 1;
@@ -43,7 +43,7 @@ public class DataFileScheduler implements Runnable {
/**
* Default interval to acquire a lease from coordination store
*/
private static final int DEFAULT_LEASE_INTERVAL_MILLIS = 15_000;
private static final int DEFAULT_LEASE_INTERVAL_MILLIS = 2_000;

static final String EXPORT_S3_OBJECTS_PROCESSED_COUNT = "exportS3ObjectsProcessed";
static final String ACTIVE_EXPORT_S3_OBJECT_CONSUMERS_GAUGE = "activeExportS3ObjectConsumers";
@@ -111,6 +111,9 @@ private void processDataFilePartition(DataFilePartition dataFilePartition) {
runLoader.whenComplete(completeDataLoader(dataFilePartition));
} else {
runLoader.whenComplete((v, ex) -> {
if (ex != null) {
coordinator.giveUpPartition(dataFilePartition);
}
numOfWorkers.decrementAndGet();
});
}
@@ -147,7 +150,7 @@ public void run() {
}
}
}
LOG.warn("Data file scheduler is interrupted, Stop all data file loaders...");
LOG.warn("Data file scheduler is interrupted, stopping all data file loaders...");
// Cannot call executor.shutdownNow() here
// Otherwise the final checkpoint will fail due to SDK interruption.
executor.shutdown();
@@ -264,7 +264,7 @@ private String getOrCreateExportArn(ExportPartition exportPartition) {
return state.getExportArn();
}

LOG.info("Try to submit a new export job for table {} with export time {}", exportPartition.getTableArn(), exportPartition.getExportTime());
LOG.info("Submitting a new export job for table {} with export time {}", exportPartition.getTableArn(), exportPartition.getExportTime());
// submit a new export request
String exportArn = exportTaskManager.submitExportJob(exportPartition.getTableArn(), state.getBucket(), state.getPrefix(), state.getKmsKeyId(), exportPartition.getExportTime());

@@ -99,6 +99,10 @@ public class ShardConsumer implements Runnable {

private final Duration shardAcknowledgmentTimeout;

private final String shardId;

private long recordsWrittenToBuffer;

private ShardConsumer(Builder builder) {
this.dynamoDbStreamsClient = builder.dynamoDbStreamsClient;
this.checkpointer = builder.checkpointer;
@@ -111,6 +115,8 @@ private ShardConsumer(Builder builder) {
recordConverter = new StreamRecordConverter(bufferAccumulator, builder.tableInfo, builder.pluginMetrics);
this.acknowledgementSet = builder.acknowledgementSet;
this.shardAcknowledgmentTimeout = builder.dataFileAcknowledgmentTimeout;
this.shardId = builder.shardId;
this.recordsWrittenToBuffer = 0;
}

public static Builder builder(final DynamoDbStreamsClient dynamoDbStreamsClient, final PluginMetrics pluginMetrics, final Buffer<Record<Event>> buffer) {
@@ -138,6 +144,8 @@ static class Builder {

private boolean waitForExport;

private String shardId;

private AcknowledgementSet acknowledgementSet;
private Duration dataFileAcknowledgmentTimeout;

@@ -152,6 +160,11 @@ public Builder tableInfo(TableInfo tableInfo) {
return this;
}

public Builder shardId(final String shardId) {
this.shardId = shardId;
return this;
}

public Builder checkpointer(StreamCheckpointer checkpointer) {
this.checkpointer = checkpointer;
return this;
@@ -220,7 +233,9 @@ public void run() {
}

if (System.currentTimeMillis() - lastCheckpointTime > DEFAULT_CHECKPOINT_INTERVAL_MILLS) {
LOG.debug("Perform regular checkpointing for Shard Consumer");
if (shardId != null) {
LOG.info("{} records written to buffer for shard {}", recordsWrittenToBuffer, shardId);
}
checkpointer.checkpoint(sequenceNumber);
lastCheckpointTime = System.currentTimeMillis();
}
@@ -245,6 +260,7 @@ public void run() {
.filter(record -> record.dynamodb().approximateCreationDateTime().isAfter(startTime))
.collect(Collectors.toList());
recordConverter.writeToBuffer(acknowledgementSet, records);
recordsWrittenToBuffer += records.size();
long delay = System.currentTimeMillis() - lastEventTime.toEpochMilli();
interval = delay > GET_RECORD_DELAY_THRESHOLD_MILLS ? MINIMUM_GET_RECORD_INTERVAL_MILLS : GET_RECORD_INTERVAL_MILLS;

@@ -260,17 +276,21 @@ public void run() {
}
}

// interrupted
if (shouldStop) {
// Do last checkpoint and then quit
LOG.warn("Processing for shard {} was interrupted by a shutdown signal, giving up shard", shardId);
checkpointer.checkpoint(sequenceNumber);
throw new RuntimeException("Consuming shard was interrupted from shutdown");
}

if (acknowledgementSet != null) {
checkpointer.updateShardForAcknowledgmentWait(shardAcknowledgmentTimeout);
acknowledgementSet.complete();
}

// interrupted
if (shouldStop) {
// Do last checkpoint and then quit
LOG.error("Should Stop flag is set to True, looks like shutdown has triggered");
checkpointer.checkpoint(sequenceNumber);
throw new RuntimeException("Shard Consumer is interrupted");
if (shardId != null) {
LOG.info("Completed writing shard {} to buffer after reaching the end of the shard", shardId);
}

if (waitForExport) {
@@ -337,10 +357,10 @@ private boolean shouldSkip() {

Instant lastEventTime = response.records().get(response.records().size() - 1).dynamodb().approximateCreationDateTime();
if (lastEventTime.isBefore(startTime)) {
LOG.info("LastShardIterator is provided, and Last Event Time is earlier than export time, skip processing");
LOG.info("LastShardIterator is provided, and Last Event Time is earlier than {}, skip processing", startTime);
return true;
} else {
LOG.info("LastShardIterator is provided, and Last Event Time is later than export time, start processing");
LOG.info("LastShardIterator is provided, and Last Event Time is later than {}, start processing", startTime);
return false;
}
}
@@ -58,7 +58,7 @@ public Runnable createConsumer(final StreamPartition streamPartition,
final AcknowledgementSet acknowledgementSet,
final Duration shardAcknowledgmentTimeout) {

LOG.info("Try to start a Shard Consumer for " + streamPartition.getShardId());
LOG.info("Starting to consume shard " + streamPartition.getShardId());

// Check and get the current state.
Optional<StreamProgressState> progressState = streamPartition.getProgressState();
@@ -82,8 +82,7 @@ public Runnable createConsumer(final StreamPartition streamPartition,

String shardIterator = getShardIterator(streamPartition.getStreamArn(), streamPartition.getShardId(), sequenceNumber);
if (shardIterator == null) {
LOG.info("Unable to get a shard iterator, looks like the shard has expired");
LOG.error("Failed to start a Shard Consumer for " + streamPartition.getShardId());
LOG.error("Failed to start consuming shard '{}'. Unable to get a shard iterator for this shard, this shard may have expired", streamPartition.getShardId());
return null;
}

@@ -12,6 +12,7 @@
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.plugins.source.dynamodb.DynamoDBSourceConfig;
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition;
import org.opensearch.dataprepper.plugins.source.dynamodb.utils.BackoffCalculator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -28,6 +29,7 @@
*/
public class StreamScheduler implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(StreamScheduler.class);
private static final Logger SHARD_COUNT_LOGGER = LoggerFactory.getLogger("org.opensearch.dataprepper.plugins.source.dynamodb.stream.ShardCountLogger");

/**
* Max number of shards each node can handle in parallel
@@ -51,18 +53,22 @@ public class StreamScheduler implements Runnable {
private final AtomicLong shardsInProcessing;
private final AcknowledgementSetManager acknowledgementSetManager;
private final DynamoDBSourceConfig dynamoDBSourceConfig;
private final BackoffCalculator backoffCalculator;
private int noAvailableShardsCount = 0;


public StreamScheduler(final EnhancedSourceCoordinator coordinator,
final ShardConsumerFactory consumerFactory,
final PluginMetrics pluginMetrics,
final AcknowledgementSetManager acknowledgementSetManager,
final DynamoDBSourceConfig dynamoDBSourceConfig) {
final DynamoDBSourceConfig dynamoDBSourceConfig,
final BackoffCalculator backoffCalculator) {
this.coordinator = coordinator;
this.consumerFactory = consumerFactory;
this.pluginMetrics = pluginMetrics;
this.acknowledgementSetManager = acknowledgementSetManager;
this.dynamoDBSourceConfig = dynamoDBSourceConfig;
this.backoffCalculator = backoffCalculator;

executor = Executors.newFixedThreadPool(MAX_JOB_COUNT);
activeChangeEventConsumers = pluginMetrics.gauge(ACTIVE_CHANGE_EVENT_CONSUMERS, new AtomicLong());
@@ -93,6 +99,9 @@ private void processStreamPartition(StreamPartition streamPartition) {
if (acknowledgmentsEnabled) {
runConsumer.whenComplete((v, ex) -> {
numOfWorkers.decrementAndGet();
if (ex != null) {
coordinator.giveUpPartition(streamPartition);
}
if (numOfWorkers.get() == 0) {
activeChangeEventConsumers.decrementAndGet();
}
@@ -102,6 +111,10 @@ private void processStreamPartition(StreamPartition streamPartition) {
runConsumer.whenComplete(completeConsumer(streamPartition));
}
numOfWorkers.incrementAndGet();
if (numOfWorkers.get() % 10 == 0) {
SHARD_COUNT_LOGGER.info("Actively processing {} shards", numOfWorkers.get());
}

if (numOfWorkers.get() >= 1) {
activeChangeEventConsumers.incrementAndGet();
}
@@ -122,11 +135,14 @@ public void run() {
if (sourcePartition.isPresent()) {
StreamPartition streamPartition = (StreamPartition) sourcePartition.get();
processStreamPartition(streamPartition);
noAvailableShardsCount = 0;
} else {
noAvailableShardsCount++;
}
}

try {
Thread.sleep(DEFAULT_LEASE_INTERVAL_MILLIS);
Thread.sleep(backoffCalculator.calculateBackoffToAcquireNextShard(noAvailableShardsCount, numOfWorkers));
} catch (final InterruptedException e) {
LOG.info("InterruptedException occurred");
break;
@@ -162,12 +178,10 @@ private BiConsumer completeConsumer(StreamPartition streamPartition) {
if (ex == null) {
LOG.info("Shard consumer for {} is completed", streamPartition.getShardId());
coordinator.completePartition(streamPartition);

} else {
// Do nothing
// The consumer must have already done one last checkpointing.
LOG.debug("Shard consumer completed with exception");
LOG.error(ex.toString());
LOG.error("Received an exception while processing shard {}, giving up shard: {}", streamPartition.getShardId(), ex);
coordinator.giveUpPartition(streamPartition);
}
};
@@ -0,0 +1,49 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.dynamodb.utils;

import org.opensearch.dataprepper.plugins.source.dynamodb.stream.StreamScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

import static com.google.common.math.LongMath.pow;
import static com.google.common.primitives.Longs.min;
import static java.lang.Math.max;

public class BackoffCalculator {

private static final Logger LOG = LoggerFactory.getLogger(StreamScheduler.class);

private static final Random RANDOM = new Random();

static final Duration STARTING_BACKOFF = Duration.ofMillis(500);
static final Duration MAX_BACKOFF_WITH_SHARDS = Duration.ofSeconds(15);
static final Duration MAX_BACKOFF_NO_SHARDS_ACQUIRED = Duration.ofSeconds(15);
static final int BACKOFF_RATE = 2;
static final Duration MAX_JITTER = Duration.ofSeconds(2);
static final Duration MIN_JITTER = Duration.ofSeconds(-2);

public long calculateBackoffToAcquireNextShard(final int noAvailableShardCount, final AtomicInteger shardsAcquired) {

// When no shards are available to process we backoff exponentially based on how many consecutive attempts have been made without getting a shard
// This limits calls to the coordination store
if (noAvailableShardCount > 0) {
if (noAvailableShardCount % 10 == 0) {
LOG.info("No shards acquired after {} attempts", noAvailableShardCount);
}

final long jitterMillis = MIN_JITTER.toMillis() + RANDOM.nextInt((int) (MAX_JITTER.toMillis() - MIN_JITTER.toMillis() + 1));
return max(1, min(STARTING_BACKOFF.toMillis() * pow(BACKOFF_RATE, noAvailableShardCount - 1) + jitterMillis, MAX_BACKOFF_NO_SHARDS_ACQUIRED.toMillis()));
}

// When shards are being acquired we backoff linearly based on how many shards this node is actively processing, to encourage a fast start but still a balance of shards between nodes
return max(500, min(MAX_BACKOFF_WITH_SHARDS.toMillis(), shardsAcquired.get() * STARTING_BACKOFF.toMillis()));
}
}
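A rough usage sketch (not part of this commit) of how the new BackoffCalculator behaves: while no shards are available the sleep grows exponentially with up to two seconds of jitter and is capped at 15 seconds; once shards are being acquired the sleep grows roughly linearly with the number of shards the node is already processing, so busy nodes yield to idle ones.

import java.util.concurrent.atomic.AtomicInteger;

import org.opensearch.dataprepper.plugins.source.dynamodb.utils.BackoffCalculator;

public class BackoffCalculatorSketch {
    public static void main(String[] args) {
        final BackoffCalculator backoffCalculator = new BackoffCalculator();

        // Consecutive attempts that acquire no shard back off roughly 500 ms, 1 s, 2 s, 4 s, ...
        // plus or minus up to 2 s of jitter, never exceeding 15 s.
        for (int attempt = 1; attempt <= 6; attempt++) {
            final long sleepMillis = backoffCalculator.calculateBackoffToAcquireNextShard(attempt, new AtomicInteger(0));
            System.out.printf("attempt %d with no shard acquired -> sleep ~%d ms%n", attempt, sleepMillis);
        }

        // When shards are being acquired, the backoff is 500 ms per shard this node already owns,
        // capped at 15 s and never below 500 ms.
        final long sleepHoldingFourShards = backoffCalculator.calculateBackoffToAcquireNextShard(0, new AtomicInteger(4));
        System.out.println("holding 4 shards -> sleep " + sleepHoldingFourShards + " ms"); // 2000 ms
    }
}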