Skip to content

Commit

Permalink
Improve logging message for no shards found to indicate that export m… (
Browse files Browse the repository at this point in the history
#3681)

Improve logging message for no shards found to indicate that export may still be ongoing

Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 authored Nov 16, 2023
1 parent 22647dc commit 92224c2
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void start(Buffer<Record<Event>> buffer) {
Runnable fileLoaderScheduler = new DataFileScheduler(coordinator, loaderFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig);

ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, buffer);
Runnable streamScheduler = new StreamScheduler(coordinator, consumerFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig, new BackoffCalculator());
Runnable streamScheduler = new StreamScheduler(coordinator, consumerFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig, new BackoffCalculator(dynamoDBSourceConfig.getTableConfigs().get(0).getExportConfig() != null));
// leader scheduler will handle the initialization
Runnable leaderScheduler = new LeaderScheduler(coordinator, dynamoDbClient, shardManager, tableConfigs);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,30 @@ public class BackoffCalculator {
static final Duration MAX_JITTER = Duration.ofSeconds(2);
static final Duration MIN_JITTER = Duration.ofSeconds(-2);

private final boolean isExportConfigured;



public BackoffCalculator(final boolean isExportConfigured) {
this.isExportConfigured = isExportConfigured;
}

public long calculateBackoffToAcquireNextShard(final int noAvailableShardCount, final AtomicInteger shardsAcquired) {

// When no shards are available to process we backoff exponentially based on how many consecutive attempts have been made without getting a shard
// This limits calls to the coordination store
if (noAvailableShardCount > 0) {
if (noAvailableShardCount % 10 == 0) {
LOG.info("No shards acquired after {} attempts", noAvailableShardCount);
if (noAvailableShardCount % 50 == 0 && shardsAcquired.get() == 0) {
String errorMessage = String.format("No new shards acquired after %s attempts. This means that all shards are currently being consumed", noAvailableShardCount);

if (isExportConfigured) {
errorMessage += ", or that the export is still in progress. New shards will not be consumed until the export is fully processed.";
}
LOG.info(errorMessage);
}

final long jitterMillis = MIN_JITTER.toMillis() + RANDOM.nextInt((int) (MAX_JITTER.toMillis() - MIN_JITTER.toMillis() + 1));
return max(1, min(STARTING_BACKOFF.toMillis() * pow(BACKOFF_RATE, noAvailableShardCount - 1) + jitterMillis, MAX_BACKOFF_NO_SHARDS_ACQUIRED.toMillis()));
return max(1, min(STARTING_BACKOFF.toMillis() * pow(BACKOFF_RATE, (int) min(noAvailableShardCount - 1, 8)) + jitterMillis, MAX_BACKOFF_NO_SHARDS_ACQUIRED.toMillis()));
}

// When shards are being acquired we backoff linearly based on how many shards this node is actively processing, to encourage a fast start but still a balance of shards between nodes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ void calculateBackoffToAcquireNextShardReturnsExpectedBackoffValues(
final long maxExpectedBackoff
) {

final BackoffCalculator objectUnderTest = new BackoffCalculator();
final BackoffCalculator objectUnderTest = new BackoffCalculator(false);

final long backOffForShardCounts = objectUnderTest.calculateBackoffToAcquireNextShard(noAvailableShardCount, new AtomicInteger(shardsAcquiredCount));

Expand All @@ -47,7 +47,8 @@ private static Stream<Arguments> countsToExpectedBackoffRange() {
Arguments.of(4, 2, 2_000, 6_000),
Arguments.of(5, 6, 6_000, 10_000),
Arguments.of(6, 6, 14_000, 15_000),
Arguments.of(8, 2, MAX_BACKOFF_NO_SHARDS_ACQUIRED.toMillis(), MAX_BACKOFF_NO_SHARDS_ACQUIRED.toMillis())
Arguments.of(8, 2, MAX_BACKOFF_NO_SHARDS_ACQUIRED.toMillis(), MAX_BACKOFF_NO_SHARDS_ACQUIRED.toMillis()),
Arguments.of(70, 2, MAX_BACKOFF_NO_SHARDS_ACQUIRED.toMillis(), MAX_BACKOFF_NO_SHARDS_ACQUIRED.toMillis())
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,6 @@ static void completeIndexPartition(final OpenSearchSourceConfiguration openSearc

static long calculateExponentialBackoffAndJitter(final int retryCount) {
final long jitterMillis = MIN_JITTER.toMillis() + RANDOM.nextInt((int) (MAX_JITTER.toMillis() - MIN_JITTER.toMillis() + 1));
return max(1, min(STARTING_BACKOFF.toMillis() * pow(BACKOFF_RATE, retryCount - 1) + jitterMillis, MAX_BACKOFF.toMillis()));
return max(1, min(STARTING_BACKOFF.toMillis() * pow(BACKOFF_RATE, (int) min(retryCount - 1, 10)) + jitterMillis, MAX_BACKOFF.toMillis()));
}
}

0 comments on commit 92224c2

Please sign in to comment.