Skip to content

Commit

Permalink
Revert to using DynamoDB lease refresher
Browse files Browse the repository at this point in the history
Signed-off-by: Souvik Bose <[email protected]>
  • Loading branch information
sb2k16 committed Jul 18, 2024
1 parent e9ac708 commit 0ca6cf0
Showing 1 changed file with 8 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.leases.LeaseManagementConfig;
Expand Down Expand Up @@ -146,10 +147,12 @@ public void start(final Buffer<Record<Event>> buffer) {
final ShardRecordProcessorFactory processorFactory = new KinesisShardRecordProcessorFactory(
buffer, codec, sourceConfig, acknowledgementSetManager, pluginMetrics);

MultiTenantMultiStreamConfigsBuilder configsBuilder = new MultiTenantMultiStreamConfigsBuilder(
new KinesisMultiStreamTracker(kinesisClient, sourceConfig, applicationName),
applicationName, kinesisClient, dynamoClient, cloudWatchClient, workerIdentifier, processorFactory);
configsBuilder.tableName("KinesisDynamoDBLeaseCoordinationTable");
ConfigsBuilder configsBuilder =
new ConfigsBuilder(
new KinesisMultiStreamTracker(kinesisClient, sourceConfig, applicationName),
applicationName, kinesisClient, dynamoClient, cloudWatchClient,
workerIdentifier, processorFactory
).tableName(applicationName);

sourceConfig.getStreams().forEach(stream -> {
if (stream.getConsumerStrategy() == KinesisStreamConfig.ConsumerStrategy.POLLING) {
Expand All @@ -160,17 +163,11 @@ public void start(final Buffer<Record<Event>> buffer) {
}
});

MultiTenantDynamoDBLeaseManagementFactory multiTenantDynamoDBLeaseManagementFactory =
(MultiTenantDynamoDBLeaseManagementFactory) configsBuilder
.leaseManagementConfig()
.leaseManagementFactory(new MultiTenantDynamoDBLeaseSerializer(applicationName), true);

scheduler = new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig()
.billingMode(BillingMode.PAY_PER_REQUEST)
.leaseManagementFactory(multiTenantDynamoDBLeaseManagementFactory),
.billingMode(BillingMode.PAY_PER_REQUEST),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
Expand Down

0 comments on commit 0ca6cf0

Please sign in to comment.