From 22101847c048bcc2c829f1610599166fc8040b35 Mon Sep 17 00:00:00 2001 From: Souvik Bose Date: Mon, 15 Jul 2024 09:38:53 -0700 Subject: [PATCH] Kinesis source implementation using Data Prepper source coordination. Signed-off-by: Souvik Bose --- data-prepper-plugins/kinesis-source/README.md | 1 + .../kinesis-source/build.gradle | 40 ++ .../kinesis/AwsAuthenticationOptions.java | 104 +++++ .../kinesis/KinesisMultiStreamTracker.java | 80 ++++ .../source/kinesis/KinesisService.java | 115 +++++ .../plugins/source/kinesis/KinesisSource.java | 99 +++++ .../source/kinesis/KinesisSourceConfig.java | 74 ++++ .../source/kinesis/KinesisStreamConfig.java | 46 ++ .../converter/MetadataKeyAttributes.java | 24 ++ .../kinesis/converter/RecordConverter.java | 87 ++++ .../converter/StreamRecordConverter.java | 116 +++++ .../coordination/PartitionFactory.java | 39 ++ .../coordination/partition/GlobalState.java | 61 +++ .../partition/LeaderPartition.java | 53 +++ .../partition/StreamPartition.java | 64 +++ .../state/LeaderProgressState.java | 30 ++ .../state/StreamProgressState.java | 56 +++ .../kinesis/leader/LeaderScheduler.java | 320 ++++++++++++++ .../source/kinesis/leader/ShardCache.java | 96 +++++ .../source/kinesis/leader/ShardManager.java | 218 ++++++++++ .../source/kinesis/stream/ShardConsumer.java | 408 ++++++++++++++++++ .../kinesis/stream/ShardConsumerFactory.java | 161 +++++++ .../kinesis/stream/StreamCheckpointer.java | 66 +++ .../kinesis/stream/StreamScheduler.java | 190 ++++++++ .../kinesis/utils/BackoffCalculator.java | 60 +++ .../utils/KinesisSourceAggregateMetrics.java | 58 +++ .../source/kinesis/utils/TableUtil.java | 26 ++ settings.gradle | 3 +- 28 files changed, 2694 insertions(+), 1 deletion(-) create mode 100644 data-prepper-plugins/kinesis-source/README.md create mode 100644 data-prepper-plugins/kinesis-source/build.gradle create mode 100644 data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/AwsAuthenticationOptions.java create mode 100644 data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisMultiStreamTracker.java create mode 100644 data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisService.java create mode 100644 data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisSource.java create mode 100644 data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisSourceConfig.java create mode 100644 data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisStreamConfig.java create mode 100644 data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/converter/MetadataKeyAttributes.java create mode 100644 data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/converter/RecordConverter.java create mode 100644 data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/converter/StreamRecordConverter.java create mode 100644 data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/coordination/PartitionFactory.java create mode 100644 data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/coordination/partition/GlobalState.java create mode 100644 
data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/coordination/partition/LeaderPartition.java create mode 100644 data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/coordination/partition/StreamPartition.java create mode 100644 data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/coordination/state/LeaderProgressState.java create mode 100644 data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/coordination/state/StreamProgressState.java create mode 100644 data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/leader/LeaderScheduler.java create mode 100644 data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/leader/ShardCache.java create mode 100644 data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/leader/ShardManager.java create mode 100644 data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/stream/ShardConsumer.java create mode 100644 data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/stream/ShardConsumerFactory.java create mode 100644 data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/stream/StreamCheckpointer.java create mode 100644 data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/stream/StreamScheduler.java create mode 100644 data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/utils/BackoffCalculator.java create mode 100644 data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/utils/KinesisSourceAggregateMetrics.java create mode 100644 data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/utils/TableUtil.java diff --git a/data-prepper-plugins/kinesis-source/README.md b/data-prepper-plugins/kinesis-source/README.md new file mode 100644 index 0000000000..8b13789179 --- /dev/null +++ b/data-prepper-plugins/kinesis-source/README.md @@ -0,0 +1 @@ + diff --git a/data-prepper-plugins/kinesis-source/build.gradle b/data-prepper-plugins/kinesis-source/build.gradle new file mode 100644 index 0000000000..1388c443e7 --- /dev/null +++ b/data-prepper-plugins/kinesis-source/build.gradle @@ -0,0 +1,40 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +plugins { + id 'java' +} + +dependencies { + implementation project(':data-prepper-api') + implementation project(':data-prepper-plugins:common') + implementation project(':data-prepper-plugins:buffer-common') + implementation libs.armeria.core + implementation 'com.fasterxml.jackson.core:jackson-core' + implementation 'com.fasterxml.jackson.core:jackson-databind' + implementation project(':data-prepper-plugins:blocking-buffer') + implementation 'software.amazon.awssdk:kinesis' + // https://mvnrepository.com/artifact/com.fasterxml.jackson.dataformat/jackson-dataformat-ion + implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-ion' + + // https://mvnrepository.com/artifact/software.amazon.kinesis/amazon-kinesis-client + implementation 'software.amazon.kinesis:amazon-kinesis-client:2.6.0' + compileOnly 'org.projectlombok:lombok:1.18.20' + 
annotationProcessor 'org.projectlombok:lombok:1.18.20' + + testImplementation platform('org.junit:junit-bom:5.9.1') + testImplementation 'org.junit.jupiter:junit-jupiter' +} + +jacocoTestCoverageVerification { + dependsOn jacocoTestReport + violationRules { + rule { //in addition to core projects rule + limit { + minimum = 0.0 + } + } + } +} \ No newline at end of file diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/AwsAuthenticationOptions.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/AwsAuthenticationOptions.java new file mode 100644 index 0000000000..a752649af0 --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/AwsAuthenticationOptions.java @@ -0,0 +1,104 @@ +package org.opensearch.dataprepper.plugins.source.kinesis; + +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.Size; +import software.amazon.awssdk.arns.Arn; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.sts.StsClient; +import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; +import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; + +import java.util.Map; +import java.util.Optional; +import java.util.UUID; + +public class AwsAuthenticationOptions { + private static final String AWS_IAM_ROLE = "role"; + private static final String AWS_IAM = "iam"; + + @JsonProperty("region") + @Size(min = 1, message = "Region cannot be empty string") + private String awsRegion; + + @JsonProperty("sts_role_arn") + @Size(min = 20, max = 2048, message = "awsStsRoleArn length should be between 1 and 2048 characters") + private String awsStsRoleArn; + + @JsonProperty("sts_external_id") + @Size(min = 2, max = 1224, message = "awsStsExternalId length should be between 2 and 1224 characters") + private String awsStsExternalId; + + @JsonProperty("sts_header_overrides") + @Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to override") + private Map awsStsHeaderOverrides; + + public Region getAwsRegion() { + return awsRegion != null ? 
Region.of(awsRegion) : null; + } + + public String getAwsStsRoleArn() { + return awsStsRoleArn; + } + + public String getAwsStsExternalId() { + return awsStsExternalId; + } + + public Map getAwsStsHeaderOverrides() { + return awsStsHeaderOverrides; + } + + public AwsCredentialsProvider authenticateAwsConfiguration() { + + final AwsCredentialsProvider awsCredentialsProvider; + if (awsStsRoleArn != null && !awsStsRoleArn.isEmpty()) { + + validateStsRoleArn(); + + final StsClient stsClient = StsClient.builder() + .region(getAwsRegion()) + .build(); + + AssumeRoleRequest.Builder assumeRoleRequestBuilder = AssumeRoleRequest.builder() + .roleSessionName("aws-secret-" + UUID.randomUUID()) + .roleArn(awsStsRoleArn); + + awsCredentialsProvider = StsAssumeRoleCredentialsProvider.builder() + .stsClient(stsClient) + .refreshRequest(assumeRoleRequestBuilder.build()) + .build(); + + } else { + // use default credential provider + awsCredentialsProvider = DefaultCredentialsProvider.create(); + } + + return awsCredentialsProvider; + } + + private void validateStsRoleArn() { + final Arn arn = getArn(); + if (!AWS_IAM.equals(arn.service())) { + throw new IllegalArgumentException("sts_role_arn must be an IAM Role"); + } + final Optional resourceType = arn.resource().resourceType(); + if (resourceType.isEmpty() || !resourceType.get().equals(AWS_IAM_ROLE)) { + throw new IllegalArgumentException("sts_role_arn must be an IAM Role"); + } + } + + private Arn getArn() { + try { + return Arn.fromString(awsStsRoleArn); + } catch (final Exception e) { + throw new IllegalArgumentException(String.format("Invalid ARN format for awsStsRoleArn. Check the format of %s", awsStsRoleArn)); + } + } +} \ No newline at end of file diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisMultiStreamTracker.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisMultiStreamTracker.java new file mode 100644 index 0000000000..a637b67f14 --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisMultiStreamTracker.java @@ -0,0 +1,80 @@ +package org.opensearch.dataprepper.plugins.source.kinesis; + +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; +import software.amazon.awssdk.services.kinesis.model.StreamDescription; +import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.StreamConfig; +import software.amazon.kinesis.common.StreamIdentifier; +import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy; +import software.amazon.kinesis.processor.MultiStreamTracker; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; + + +public class KinesisMultiStreamTracker implements MultiStreamTracker { + private static final String COLON = ":"; + + private final KinesisAsyncClient kinesisClient; + private final KinesisSourceConfig sourceConfig; + private final String applicationName; + + public KinesisMultiStreamTracker(KinesisAsyncClient kinesisClient, final KinesisSourceConfig sourceConfig, final String applicationName) { + this.kinesisClient = kinesisClient; + this.sourceConfig = sourceConfig; + this.applicationName = applicationName; + } + + @Override + public List streamConfigList() { + 
List streamConfigList = new ArrayList<>(); + for (KinesisStreamConfig kinesisStreamConfig : sourceConfig.getStreams()) { + StreamConfig streamConfig; + try { + streamConfig = getStreamConfig(kinesisStreamConfig); + } catch (Exception e) { + throw new RuntimeException(e); + } + streamConfigList.add(streamConfig); + } + return streamConfigList; + } + + private StreamConfig getStreamConfig(KinesisStreamConfig kinesisStreamConfig) throws Exception { + StreamIdentifier sourceStreamIdentifier = getStreamIdentifier(kinesisStreamConfig); + return new StreamConfig(sourceStreamIdentifier, + InitialPositionInStreamExtended.newInitialPosition(kinesisStreamConfig.getInitialPosition())); + } + + private StreamIdentifier getStreamIdentifier(KinesisStreamConfig kinesisStreamConfig) throws Exception { + DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder() + .streamName(kinesisStreamConfig.getName()) + .build(); + DescribeStreamResponse describeStreamResponse = kinesisClient.describeStream(describeStreamRequest).get(); + String streamIdentifierString = getStreamIdentifierString(describeStreamResponse.streamDescription()); + return StreamIdentifier.multiStreamInstance(streamIdentifierString); + } + + private String getStreamIdentifierString(StreamDescription streamDescription) { + String accountId = streamDescription.streamARN().split(COLON)[4]; + long creationEpochSecond = streamDescription.streamCreationTimestamp().getEpochSecond(); + return String.join(COLON, accountId, streamDescription.streamName(), String.valueOf(creationEpochSecond)); + } + + /** + * Setting the deletion policy as autodetect and release shard lease with a wait time of 10 sec + */ + @Override + public FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy() { + return new FormerStreamsLeasesDeletionStrategy.AutoDetectionAndDeferredDeletionStrategy() { + @Override + public Duration waitPeriodToDeleteFormerStreams() { + return Duration.ofSeconds(10); + } + }; + + } +} diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisService.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisService.java new file mode 100644 index 0000000000..9f786766b3 --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisService.java @@ -0,0 +1,115 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.kinesis; + +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.plugins.source.kinesis.leader.LeaderScheduler; +import org.opensearch.dataprepper.plugins.source.kinesis.leader.ShardManager; +import org.opensearch.dataprepper.plugins.source.kinesis.stream.ShardConsumerFactory; +import org.opensearch.dataprepper.plugins.source.kinesis.stream.StreamScheduler; +import org.opensearch.dataprepper.plugins.source.kinesis.utils.BackoffCalculator; +import org.opensearch.dataprepper.plugins.source.kinesis.utils.KinesisSourceAggregateMetrics; +import org.slf4j.Logger; 
+import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.kinesis.common.KinesisClientUtil; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class KinesisService { + + private static final Logger LOG = LoggerFactory.getLogger(KinesisService.class); + + private final EnhancedSourceCoordinator coordinator; + + private final DynamoDbAsyncClient dynamoDbClient; + + private final KinesisSourceConfig kinesisSourceConfig; + + private final KinesisAsyncClient kinesisClient; + + private final ShardManager shardManager; + + private final ExecutorService executor; + + private final PluginMetrics pluginMetrics; + + private final KinesisSourceAggregateMetrics kinesisSourceAggregateMetrics; + + private final AcknowledgementSetManager acknowledgementSetManager; + + + public KinesisService(final EnhancedSourceCoordinator coordinator, + final KinesisSourceConfig sourceConfig, + final PluginMetrics pluginMetrics, + final AcknowledgementSetManager acknowledgementSetManager) { + this.coordinator = coordinator; + this.pluginMetrics = pluginMetrics; + this.acknowledgementSetManager = acknowledgementSetManager; + this.kinesisSourceConfig = sourceConfig; + this.kinesisSourceAggregateMetrics = new KinesisSourceAggregateMetrics(); + + // Initialize AWS clients + dynamoDbClient = DynamoDbAsyncClient.builder() + .credentialsProvider(sourceConfig.getAwsAuthenticationOptions().authenticateAwsConfiguration()) + .region(sourceConfig.getAwsAuthenticationOptions().getAwsRegion()) + .build(); + this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient( + KinesisAsyncClient.builder() + .credentialsProvider(sourceConfig.getAwsAuthenticationOptions().authenticateAwsConfiguration()) + .region(sourceConfig.getAwsAuthenticationOptions().getAwsRegion()) + ); + + // The shard manager is responsible for retrieving shard information from the streams. + shardManager = new ShardManager(kinesisClient, kinesisSourceAggregateMetrics); + executor = Executors.newFixedThreadPool(2); + } + + /** + * This service starts two long-running scheduler threads. + * Each thread is responsible for one type of job. + * The data is guaranteed to be sent to the {@link Buffer} in order. + * + * @param buffer Data Prepper Buffer + */ + public void start(Buffer<Record<Event>> buffer) { + + LOG.info("Start running Kinesis service"); + + ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, kinesisClient, pluginMetrics, kinesisSourceAggregateMetrics, buffer, kinesisSourceConfig.getStreams().get(0)); + Runnable streamScheduler = new StreamScheduler(coordinator, consumerFactory, pluginMetrics, acknowledgementSetManager, kinesisSourceConfig, new BackoffCalculator(false)); + // The leader scheduler will handle the initialization. + Runnable leaderScheduler = new LeaderScheduler(coordinator, dynamoDbClient, shardManager, kinesisSourceConfig); + + // May consider starting or shutting down the schedulers on demand; + // currently they keep running until the source shuts down. + executor.submit(leaderScheduler); + executor.submit(streamScheduler); + } + + /** + * Interrupt the running schedulers. + * Each scheduler must implement logic for graceful shutdown. + */ + public void shutdown() { + LOG.info("Shutting down Kinesis source schedulers"); + executor.shutdownNow(); + } + +} diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisSource.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisSource.java new file mode 100644 index 0000000000..f1c481333f --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisSource.java @@ -0,0 +1,99 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.kinesis; + +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.Source; +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.UsesEnhancedSourceCoordination; +import org.opensearch.dataprepper.plugins.source.kinesis.coordination.PartitionFactory; +import org.opensearch.dataprepper.plugins.source.kinesis.coordination.partition.LeaderPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Objects; +import java.util.function.Function; + +@DataPrepperPlugin(name = "kinesis", pluginType = Source.class, pluginConfigurationType = KinesisSourceConfig.class) +public class KinesisSource implements Source<Record<Event>>, UsesEnhancedSourceCoordination { + + private static final Logger LOG = LoggerFactory.getLogger(KinesisSource.class); + + private final PluginMetrics pluginMetrics; + + private final KinesisSourceConfig sourceConfig; + + private final PluginFactory pluginFactory; + + private final AcknowledgementSetManager acknowledgementSetManager; + + private EnhancedSourceCoordinator coordinator; + + private KinesisService kinesisService; + + private final boolean acknowledgementsEnabled; + + @DataPrepperPluginConstructor + public KinesisSource(final PluginMetrics pluginMetrics, + final KinesisSourceConfig sourceConfig, + final PluginFactory pluginFactory, + final AcknowledgementSetManager acknowledgementSetManager) { + LOG.info("Creating KinesisSource"); + this.pluginMetrics = pluginMetrics; + this.sourceConfig = sourceConfig; + this.pluginFactory = pluginFactory; + this.acknowledgementSetManager = acknowledgementSetManager; + this.acknowledgementsEnabled = sourceConfig.isAcknowledgments(); + } + + @Override + public boolean areAcknowledgementsEnabled() { + return acknowledgementsEnabled; + } + + @Override + public void start(Buffer<Record<Event>> buffer) { + Objects.requireNonNull(coordinator); + + coordinator.createPartition(new LeaderPartition()); + + // Create the Kinesis service + kinesisService = new KinesisService(coordinator, sourceConfig, pluginMetrics, acknowledgementSetManager); + + LOG.info("Starting Kinesis service"); + kinesisService.start(buffer); + } + + + @Override + public void stop() { + LOG.info("Stopping Kinesis source"); + if (Objects.nonNull(kinesisService)) { + kinesisService.shutdown(); + } + + } + + @Override + public void setEnhancedSourceCoordinator(final EnhancedSourceCoordinator sourceCoordinator) { + coordinator = sourceCoordinator; + coordinator.initialize(); + } + + @Override + public Function<SourcePartitionStoreItem, EnhancedSourcePartition> getPartitionFactory() { + return new PartitionFactory(); + } +} diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisSourceConfig.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisSourceConfig.java new file mode 100644 index 0000000000..fd89c0f999 --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisSourceConfig.java @@ -0,0 +1,74 @@ +package org.opensearch.dataprepper.plugins.source.kinesis; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotNull; +import jakarta.validation.constraints.Size; +import lombok.Getter; +import org.opensearch.dataprepper.model.configuration.PluginModel; + +import java.time.Duration; +import java.util.List; + + +public class KinesisSourceConfig { + + // Whether or not to do regular checkpointing. + // Regular checkpointing is good practice: it limits reprocessing when an unknown exception occurs. + // Otherwise, checkpointing is only triggered at shutdown or at shard end. + private static final boolean DEFAULT_ENABLE_CHECKPOINT = false; + + private static final Duration DEFAULT_TIME_OUT_IN_MILLIS = Duration.ofMillis(10000); + + @Getter + @JsonProperty("streams") + @NotNull + @Valid + @Size(min = 1, max = 4, message = "Provide at least 1 and at most 4 streams") + private List<KinesisStreamConfig> streams; + + @Getter + @JsonProperty("aws") + @NotNull + @Valid + private AwsAuthenticationOptions awsAuthenticationOptions; + + @Getter + @JsonProperty("codec") + private PluginModel codec; + + @Getter + @JsonProperty("enable_checkpoint") + private boolean enableCheckPoint = DEFAULT_ENABLE_CHECKPOINT; + + @Getter + @JsonProperty("buffer_timeout") + private Duration bufferTimeout = DEFAULT_TIME_OUT_IN_MILLIS; + + @JsonProperty("acknowledgments") + @Getter + private boolean acknowledgments = false; + + @JsonProperty("shard_acknowledgment_timeout") + private Duration shardAcknowledgmentTimeout = Duration.ofMinutes(10); + + public Duration getShardAcknowledgmentTimeout() { + return shardAcknowledgmentTimeout; + } +} + +/** + * Failover time in milliseconds. A worker which does not renew its lease within this time interval + * will be regarded as having problems, and its shards will be assigned to other workers. + * For applications that have a large number of shards, this may be set to a higher number to reduce + * the number of DynamoDB IOPS required for tracking leases. + * + *

<p>Default value: 10000L</p>
+ +private long failoverTimeMillis = 10000L; + + >>> Add Failover time (ms) for lease expiration. +**/ + + + diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisStreamConfig.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisStreamConfig.java new file mode 100644 index 0000000000..a50331a8a4 --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisStreamConfig.java @@ -0,0 +1,46 @@ +package org.opensearch.dataprepper.plugins.source.kinesis; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonValue; +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotNull; +import lombok.Getter; +import software.amazon.kinesis.common.InitialPositionInStream; + + +@Getter +public class KinesisStreamConfig { + + @JsonProperty("stream_name") + @NotNull + @Valid + private String name; + + @JsonProperty("stream_arn") + private String arn; + + @JsonProperty("initial_position") + private InitialPositionInStream initialPosition = InitialPositionInStream.LATEST; + + + @JsonProperty("consumer_strategy") + private ConsumerStrategy consumerStrategy = ConsumerStrategy.POLLING; + + enum ConsumerStrategy { + + POLLING("Polling"), + + ENHANCED_FAN_OUT("Fan-Out"); + + private final String value; + + ConsumerStrategy(String value) { + this.value = value; + } + + @JsonValue + public String getValue() { + return value; + } + } +} diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/converter/MetadataKeyAttributes.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/converter/MetadataKeyAttributes.java new file mode 100644 index 0000000000..3ac0650142 --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/converter/MetadataKeyAttributes.java @@ -0,0 +1,24 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.kinesis.converter; + +public class MetadataKeyAttributes { + static final String PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE = "primary_key"; + + static final String PARTITION_KEY_METADATA_ATTRIBUTE = "partition_key"; + + static final String SORT_KEY_METADATA_ATTRIBUTE = "sort_key"; + + static final String EVENT_TIMESTAMP_METADATA_ATTRIBUTE = "dynamodb_timestamp"; + + static final String EVENT_VERSION_FROM_TIMESTAMP = "document_version"; + + static final String EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE = "opensearch_action"; + + static final String DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE = "dynamodb_event_name"; + + static final String EVENT_TABLE_NAME_METADATA_ATTRIBUTE = "table_name"; +} diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/converter/RecordConverter.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/converter/RecordConverter.java new file mode 100644 index 0000000000..9d1bae2b85 --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/converter/RecordConverter.java @@ -0,0 +1,87 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package 
org.opensearch.dataprepper.plugins.source.kinesis.converter; + +import com.fasterxml.jackson.databind.JsonNode; +import org.opensearch.dataprepper.buffer.common.BufferAccumulator; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventMetadata; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions; +import org.opensearch.dataprepper.model.record.Record; + +import java.math.BigDecimal; +import java.util.Map; + +import static org.opensearch.dataprepper.plugins.source.kinesis.converter.MetadataKeyAttributes.EVENT_TIMESTAMP_METADATA_ATTRIBUTE; +import static org.opensearch.dataprepper.plugins.source.kinesis.converter.MetadataKeyAttributes.EVENT_VERSION_FROM_TIMESTAMP; + +/** + * Base Record Processor definition. + * The record processor is to transform the source data into a JacksonEvent, + * and then write to buffer. + */ +public abstract class RecordConverter { + + private static final String DEFAULT_ACTION = OpenSearchBulkActions.INDEX.toString(); + + private final BufferAccumulator> bufferAccumulator; + + public RecordConverter(final BufferAccumulator> bufferAccumulator) { + this.bufferAccumulator = bufferAccumulator; + } + + abstract String getEventType(); + + /** + * Extract the value based on attribute map + * + * @param data A map of attribute name and value + * @param attributeName Attribute name + * @return the related attribute value, return null if the attribute name doesn't exist. + */ + private String getAttributeValue(final Map data, String attributeName) { + if (data.containsKey(attributeName)) { + final Object value = data.get(attributeName); + if (value instanceof Number) { + return new BigDecimal(value.toString()).toPlainString(); + } + return String.valueOf(value); + } + return null; + } + + void flushBuffer() throws Exception { + bufferAccumulator.flush(); + } + + /** + * Add event record to buffer + * + * @param eventCreationTimeMillis Creation timestamp of the event + * @throws Exception Exception if failed to write to buffer. 
+ */ + public void addToBuffer(final AcknowledgementSet acknowledgementSet, + final JsonNode jsonNode, + final long eventCreationTimeMillis, + final long eventVersionNumber) throws Exception { + Event event = JacksonEvent.builder() + .withEventType(getEventType()) + .withData(jsonNode) + .build(); + + // Only set external origination time for stream events, not export + EventMetadata eventMetadata = event.getMetadata(); + eventMetadata.setAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE, eventCreationTimeMillis); + eventMetadata.setAttribute(EVENT_VERSION_FROM_TIMESTAMP, eventVersionNumber); + + if (acknowledgementSet != null) { + acknowledgementSet.add(event); + } + bufferAccumulator.add(new Record<>(event)); + } +} diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/converter/StreamRecordConverter.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/converter/StreamRecordConverter.java new file mode 100644 index 0000000000..eb9737a4a2 --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/converter/StreamRecordConverter.java @@ -0,0 +1,116 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.kinesis.converter; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.DistributionSummary; +import org.opensearch.dataprepper.buffer.common.BufferAccumulator; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.plugins.source.kinesis.KinesisStreamConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.kinesis.model.Record; + +import java.io.ByteArrayInputStream; +import java.time.Instant; +import java.util.List; +import java.util.Map; + +public class StreamRecordConverter extends RecordConverter { + private static final Logger LOG = LoggerFactory.getLogger(StreamRecordConverter.class); + + + static final String CHANGE_EVENTS_PROCESSED_COUNT = "changeEventsProcessed"; + static final String CHANGE_EVENTS_PROCESSING_ERROR_COUNT = "changeEventsProcessingErrors"; + static final String BYTES_RECEIVED = "bytesReceived"; + static final String BYTES_PROCESSED = "bytesProcessed"; + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + private static final TypeReference> MAP_TYPE_REFERENCE = new TypeReference>() { + }; + + private final KinesisStreamConfig streamConfig; + + private final PluginMetrics pluginMetrics; + + private final Counter changeEventSuccessCounter; + private final Counter changeEventErrorCounter; + private final DistributionSummary bytesReceivedSummary; + private final DistributionSummary bytesProcessedSummary; + + private Instant currentSecond; + private int recordsSeenThisSecond = 0; + + public StreamRecordConverter(final BufferAccumulator> bufferAccumulator, + final PluginMetrics pluginMetrics, + final KinesisStreamConfig streamConfig) { + super(bufferAccumulator); + this.pluginMetrics = pluginMetrics; + this.changeEventSuccessCounter = pluginMetrics.counter(CHANGE_EVENTS_PROCESSED_COUNT); + 
this.changeEventErrorCounter = pluginMetrics.counter(CHANGE_EVENTS_PROCESSING_ERROR_COUNT); + this.bytesReceivedSummary = pluginMetrics.summary(BYTES_RECEIVED); + this.bytesProcessedSummary = pluginMetrics.summary(BYTES_PROCESSED); + this.streamConfig = streamConfig; + } + + @Override + String getEventType() { + return "STREAM"; + } + + + public void writeToBuffer(final AcknowledgementSet acknowledgementSet, List<Record> records) { + + int eventCount = 0; + for (Record record : records) { + final long bytes = record.data().asByteArray().length; + try { + bytesReceivedSummary.record(bytes); + ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(record.data().asByteArray()); + // Reuse the shared ObjectMapper instead of constructing a new one per record. + JsonNode jsonNode = MAPPER.readValue(byteArrayInputStream, JsonNode.class); + + // Derive a tie-breaking version number from the approximate arrival timestamp. + final long eventVersionNumber = calculateTieBreakingVersionFromTimestamp(record.approximateArrivalTimestamp()); + addToBuffer(acknowledgementSet, jsonNode, record.approximateArrivalTimestamp().toEpochMilli(), eventVersionNumber); + bytesProcessedSummary.record(bytes); + eventCount++; + } catch (Exception e) { + // will this cause too many logs? + LOG.error("Failed to add event to buffer due to {}", e.getMessage()); + changeEventErrorCounter.increment(); + } + } + + try { + flushBuffer(); + changeEventSuccessCounter.increment(eventCount); + } catch (Exception e) { + LOG.error("Failed to write {} events to buffer due to {}", eventCount, e.getMessage()); + changeEventErrorCounter.increment(eventCount); + } + } + + // For an event whose arrival time falls in epoch second s, with n records already seen in that + // second, the version is s * 1_000_000 + n, so records within the same second keep their order. + private long calculateTieBreakingVersionFromTimestamp(final Instant eventTimeInSeconds) { + if (currentSecond == null) { + currentSecond = eventTimeInSeconds; + } else if (currentSecond.isAfter(eventTimeInSeconds)) { + return eventTimeInSeconds.getEpochSecond() * 1_000_000; + } else if (currentSecond.isBefore(eventTimeInSeconds)) { + recordsSeenThisSecond = 0; + currentSecond = eventTimeInSeconds; + } else { + recordsSeenThisSecond++; + } + + return eventTimeInSeconds.getEpochSecond() * 1_000_000 + recordsSeenThisSecond; + } +} diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/coordination/PartitionFactory.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/coordination/PartitionFactory.java new file mode 100644 index 0000000000..408712797e --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/coordination/PartitionFactory.java @@ -0,0 +1,39 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.kinesis.coordination; + +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.source.kinesis.coordination.partition.GlobalState; +import org.opensearch.dataprepper.plugins.source.kinesis.coordination.partition.LeaderPartition; +import org.opensearch.dataprepper.plugins.source.kinesis.coordination.partition.StreamPartition; + + +import java.util.function.Function; + +/** + * Special partition factory just for this Kinesis source.
+ */ +public class PartitionFactory implements Function { + + + @Override + public EnhancedSourcePartition apply(SourcePartitionStoreItem partitionStoreItem) { + String sourceIdentifier = partitionStoreItem.getSourceIdentifier(); + String partitionType = sourceIdentifier.substring(sourceIdentifier.lastIndexOf('|') + 1); + + if (StreamPartition.PARTITION_TYPE.equals(partitionType)) { + return new StreamPartition(partitionStoreItem); + } else if (LeaderPartition.PARTITION_TYPE.equals(partitionType)) { + return new LeaderPartition(partitionStoreItem); + } else { + // Unable to acquire other partitions. + return new GlobalState(partitionStoreItem); + } + } + + +} diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/coordination/partition/GlobalState.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/coordination/partition/GlobalState.java new file mode 100644 index 0000000000..de0e4b9605 --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/coordination/partition/GlobalState.java @@ -0,0 +1,61 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.kinesis.coordination.partition; + +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; + +import java.util.Map; +import java.util.Optional; + +/** + * Global State is a special type of partition. The partition type is null. + * You can't acquire (own) a Global State. + * However, you can read and update Global State whenever required. + * The progress state is a Map object. 
+ */ +public class GlobalState extends EnhancedSourcePartition> { + + private final String stateName; + + private Map state; + + public GlobalState(SourcePartitionStoreItem sourcePartitionStoreItem) { + setSourcePartitionStoreItem(sourcePartitionStoreItem); + this.stateName = sourcePartitionStoreItem.getSourcePartitionKey(); + this.state = convertStringToPartitionProgressState(null, sourcePartitionStoreItem.getPartitionProgressState()); + } + + public GlobalState(String stateName, Optional> state) { + this.stateName = stateName; + this.state = state.orElse(null); + + } + + @Override + public String getPartitionType() { + return null; + } + + @Override + public String getPartitionKey() { + return stateName; + } + + @Override + public Optional> getProgressState() { + if (state != null) { + return Optional.of(state); + } + return Optional.empty(); + } + + public void setProgressState(Map state) { + this.state = state; + } + + +} diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/coordination/partition/LeaderPartition.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/coordination/partition/LeaderPartition.java new file mode 100644 index 0000000000..4f93430701 --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/coordination/partition/LeaderPartition.java @@ -0,0 +1,53 @@ +package org.opensearch.dataprepper.plugins.source.kinesis.coordination.partition; + +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.source.kinesis.coordination.state.LeaderProgressState; + +import java.util.ArrayList; +import java.util.Optional; + +/** + *

A LeaderPartition is for tasks that should be done on a single node only.
+ * <p>
+ * Hence, whichever node owns the lease of this partition acts as the 'leader'.
+ * <p>
+ * In this Kinesis source design, a leader node is responsible for:
+ * <ul>
+ *   <li>Initialization process</li>
+ *   <li>Regular shard discovery</li>
+ * </ul>
+ */ +public class LeaderPartition extends EnhancedSourcePartition { + + public static final String PARTITION_TYPE = "LEADER"; + + private static final String DEFAULT_PARTITION_KEY = "GLOBAL"; + + private final LeaderProgressState state; + + public LeaderPartition() { + this.state = new LeaderProgressState(); + this.state.setInitialized(false); + this.state.setStreamArns(new ArrayList<>()); + } + + public LeaderPartition(SourcePartitionStoreItem sourcePartitionStoreItem) { + setSourcePartitionStoreItem(sourcePartitionStoreItem); + this.state = convertStringToPartitionProgressState(LeaderProgressState.class, sourcePartitionStoreItem.getPartitionProgressState()); + } + + @Override + public String getPartitionType() { + return PARTITION_TYPE; + } + + @Override + public String getPartitionKey() { + return DEFAULT_PARTITION_KEY; + } + + @Override + public Optional getProgressState() { + return Optional.of(state); + } + +} diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/coordination/partition/StreamPartition.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/coordination/partition/StreamPartition.java new file mode 100644 index 0000000000..8f9a0d3e93 --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/coordination/partition/StreamPartition.java @@ -0,0 +1,64 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.kinesis.coordination.partition; + +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.source.kinesis.coordination.state.StreamProgressState; + +import java.util.Optional; + +public class StreamPartition extends EnhancedSourcePartition { + + public static final String PARTITION_TYPE = "STREAM"; + + private final String streamArn; + + private final String shardId; + + private final StreamProgressState state; + + public StreamPartition(String streamArn, String shardId, Optional state) { + this.streamArn = streamArn; + this.shardId = shardId; + this.state = state.orElse(null); + } + + public StreamPartition(SourcePartitionStoreItem sourcePartitionStoreItem) { + setSourcePartitionStoreItem(sourcePartitionStoreItem); + String[] keySplits = sourcePartitionStoreItem.getSourcePartitionKey().split("\\|"); + streamArn = keySplits[0]; + shardId = keySplits[1]; + this.state = convertStringToPartitionProgressState(StreamProgressState.class, sourcePartitionStoreItem.getPartitionProgressState()); + + } + + @Override + public String getPartitionType() { + return PARTITION_TYPE; + } + + @Override + public String getPartitionKey() { + return streamArn + "|" + shardId; + } + + @Override + public Optional getProgressState() { + if (state != null) { + return Optional.of(state); + } + return Optional.empty(); + } + + public String getStreamArn() { + return streamArn; + } + + public String getShardId() { + return shardId; + } +} diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/coordination/state/LeaderProgressState.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/coordination/state/LeaderProgressState.java new file mode 100644 index 
0000000000..5b3ffcc90c --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/coordination/state/LeaderProgressState.java @@ -0,0 +1,30 @@ +package org.opensearch.dataprepper.plugins.source.kinesis.coordination.state; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; + +public class LeaderProgressState { + + @JsonProperty("initialized") + private boolean initialized = false; + + @JsonProperty("streamArns") + private List streamArns; + + public boolean isInitialized() { + return initialized; + } + + public void setInitialized(boolean initialized) { + this.initialized = initialized; + } + + public List getStreamArns() { + return streamArns; + } + + public void setStreamArns(List streamArns) { + this.streamArns = streamArns; + } +} diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/coordination/state/StreamProgressState.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/coordination/state/StreamProgressState.java new file mode 100644 index 0000000000..fe76f3fc2a --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/coordination/state/StreamProgressState.java @@ -0,0 +1,56 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.kinesis.coordination.state; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class StreamProgressState { + + @JsonProperty("startTime") + private long startTime; + + @JsonProperty("sequenceNumber") + private String sequenceNumber; + + + @JsonProperty("waitForExport") + private boolean waitForExport = false; + + @JsonProperty("endingSequenceNumber") + private String endingSequenceNumber; + + public long getStartTime() { + return startTime; + } + + public void setStartTime(long startTime) { + this.startTime = startTime; + } + + public String getSequenceNumber() { + return sequenceNumber; + } + + public void setSequenceNumber(String sequenceNumber) { + this.sequenceNumber = sequenceNumber; + } + + public boolean shouldWaitForExport() { + return waitForExport; + } + + public void setWaitForExport(boolean waitForExport) { + this.waitForExport = waitForExport; + } + + public String getEndingSequenceNumber() { + return endingSequenceNumber; + } + + public void setEndingSequenceNumber(String endingSequenceNumber) { + this.endingSequenceNumber = endingSequenceNumber; + } +} diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/leader/LeaderScheduler.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/leader/LeaderScheduler.java new file mode 100644 index 0000000000..4df0ecf172 --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/leader/LeaderScheduler.java @@ -0,0 +1,320 @@ +package org.opensearch.dataprepper.plugins.source.kinesis.leader; + +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.source.kinesis.KinesisSourceConfig; +import org.opensearch.dataprepper.plugins.source.kinesis.KinesisStreamConfig; +import 
org.opensearch.dataprepper.plugins.source.kinesis.coordination.partition.GlobalState; +import org.opensearch.dataprepper.plugins.source.kinesis.coordination.partition.LeaderPartition; +import org.opensearch.dataprepper.plugins.source.kinesis.coordination.partition.StreamPartition; +import org.opensearch.dataprepper.plugins.source.kinesis.coordination.state.LeaderProgressState; +import org.opensearch.dataprepper.plugins.source.kinesis.coordination.state.StreamProgressState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.kinesis.model.Shard; + +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +public class LeaderScheduler implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(LeaderScheduler.class); + + /** + * Default duration to extend the timeout of lease + */ + private static final int DEFAULT_EXTEND_LEASE_MINUTES = 3; + + /** + * Default interval to run lease check and shard discovery + */ + private static final Duration DEFAULT_LEASE_INTERVAL = Duration.ofMinutes(1); + + private final EnhancedSourceCoordinator coordinator; + + private final DynamoDbAsyncClient dynamoDbClient; + + private final ShardManager shardManager; + private final KinesisSourceConfig sourceConfig; + private final Duration leaseInterval; + + private LeaderPartition leaderPartition; + + private List streamArns; + + public LeaderScheduler(EnhancedSourceCoordinator coordinator, final DynamoDbAsyncClient dynamoDbClient, ShardManager shardManager, final KinesisSourceConfig sourceConfig) { + this(coordinator, dynamoDbClient, shardManager, sourceConfig, DEFAULT_LEASE_INTERVAL); + } + + LeaderScheduler(EnhancedSourceCoordinator coordinator, + final DynamoDbAsyncClient dynamoDbClient, + ShardManager shardManager, + final KinesisSourceConfig sourceConfig, + Duration leaseInterval) { + this.coordinator = coordinator; + this.dynamoDbClient = dynamoDbClient; + this.shardManager = shardManager; + this.sourceConfig = sourceConfig; + this.leaseInterval = leaseInterval; + } + + @Override + public void run() { + LOG.debug("Starting Leader Scheduler for initialization and shard discovery"); + + while (!Thread.currentThread().isInterrupted()) { + try { + // Try to acquire the lease if not owned. + if (leaderPartition == null) { + final Optional sourcePartition = coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE); + if (sourcePartition.isPresent()) { + LOG.info("Running as a LEADER node"); + leaderPartition = (LeaderPartition) sourcePartition.get(); + } + } + // Once owned, run Normal LEADER node process. 
+ // May want to quit this scheduler if streaming is not required + if (leaderPartition != null) { + LeaderProgressState leaderProgressState = leaderPartition.getProgressState().get(); + if (!leaderProgressState.isInitialized()) { + LOG.debug("The service has not been initialized yet"); + init(); + } else { + // The initialization process will populate this value; otherwise, read it from the saved state. + if (streamArns == null) { + streamArns = leaderProgressState.getStreamArns(); + } + } + + if (streamArns != null && !streamArns.isEmpty()) { + // Step 1: Run shard discovery + streamArns.forEach(streamArn -> { + shardManager.runDiscovery(streamArn); + }); + + // Step 2: Search all completed shards in the last 1 day (maximum time) + List<EnhancedSourcePartition> sourcePartitions = coordinator.queryCompletedPartitions( + StreamPartition.PARTITION_TYPE, + Instant.now().minus(Duration.ofDays(1)) + ); + + // Step 3: Find and create children partitions. + compareAndCreateChildrenPartitions(sourcePartitions); + } + + } + + } catch (Exception e) { + LOG.error("Exception occurred in primary scheduling loop", e); + } finally { + if (leaderPartition != null) { + // Extend the lease timeout; + // this node will remain the leader until shutdown. + coordinator.saveProgressStateForPartition(leaderPartition, Duration.ofMinutes(DEFAULT_EXTEND_LEASE_MINUTES)); + } + try { + Thread.sleep(leaseInterval.toMillis()); + } catch (final InterruptedException e) { + LOG.info("InterruptedException occurred"); + break; + } + } + } + // Should stop + LOG.warn("Quitting Leader Scheduler"); + if (leaderPartition != null) { + coordinator.giveUpPartition(leaderPartition); + } + } + + private void init() { + LOG.info("Trying to initialize Kinesis source"); + List<String> configuredStreamArns = sourceConfig.getStreams().stream().map(KinesisStreamConfig::getArn).collect(Collectors.toList()); + streamArns = new ArrayList<>(); + + configuredStreamArns.forEach(streamArn -> { + // Create a Global state in the coordination table for the configuration. + // The Global State is designed to be readable whenever needed, + // so that the jobs can refer to the configuration. + coordinator.createPartition(new GlobalState(streamArn, Optional.empty())); + + Instant startTime = Instant.now(); + // TODO: Revisit the use of start position. + // The behavior is the same in all cases, regardless of the configuration provided: + // only process events from the current date time, but still traverse the shards from the beginning. + List<Shard> shards = shardManager.runDiscovery(streamArn); + List<String> childIds = shards.stream().map(Shard::shardId).collect(Collectors.toList()); + + // Create partitions for the root shards. + List<Shard> rootShards = shards.stream() + .filter(shard -> shard.parentShardId() == null || !childIds.contains(shard.parentShardId())) + .collect(Collectors.toList()); + LOG.info("Found {} root shards in total", rootShards.size()); + rootShards.forEach(shard -> { + createRootStreamPartition(streamArn, shard, startTime, true); + }); + streamArns.add(streamArn); + }); + + LOG.debug("Update initialization state"); + LeaderProgressState leaderProgressState = leaderPartition.getProgressState().get(); + leaderProgressState.setStreamArns(streamArns); + leaderProgressState.setInitialized(true); + } + + + /** + * Compare and find all child shards, + * then try to create a stream partition for each.
+ */ + private void compareAndCreateChildrenPartitions(List sourcePartitions) { + + if (sourcePartitions == null || sourcePartitions.isEmpty()) { + return; + } + long startTime = System.currentTimeMillis(); + // Get the list of completed shard Ids. + List completedShardIds = sourcePartitions.stream() + .map(sourcePartition -> ((StreamPartition) sourcePartition).getShardId()) + .collect(Collectors.toList()); + // Try to Create a stream partition for each child shards that have been found and not completed yet. + // If a shard is already created, it won't be created again. + sourcePartitions.forEach(sourcePartition -> { + StreamPartition streamPartition = (StreamPartition) sourcePartition; + List childShardIds = shardManager.findChildShardIds(streamPartition.getStreamArn(), streamPartition.getShardId()); + if (childShardIds != null && !childShardIds.isEmpty()) { + childShardIds.forEach( + shardId -> { + if (!completedShardIds.contains(shardId)) { + createChildStreamPartition(streamPartition, shardId); + } + } + ); + } + }); + long endTime = System.currentTimeMillis(); + LOG.debug("Compare and create children partitions took {} milliseconds", endTime - startTime); + } + + + /** + * Conduct Metadata info for table and also perform validation on configuration. + * Once created, the info should not be changed. + */ +// private TableInfo getTableInfo(TableConfig tableConfig) { +// String tableName = TableUtil.getTableNameFromArn(tableConfig.getTableArn()); +// DescribeTableResponse describeTableResult; +// try { +// // Need to call describe table to get the Key schema for table +// // The key schema will be used when adding the metadata to event. +// DescribeTableRequest req = DescribeTableRequest.builder() +// .tableName(tableName) +// .build(); +// +// describeTableResult = dynamoDbClient.describeTable(req); +// } catch (Exception e) { +// LOG.error("Unable to call DescribeTableRequest to get information for table {} due to {}", tableName, e.getMessage()); +// throw new RuntimeException("Unable to get table information for " + tableName + ". Please make sure the permission is properly set"); +// } +// +// Map keys = describeTableResult.table().keySchema().stream().collect(Collectors.toMap( +// e -> e.keyTypeAsString(), e -> e.attributeName() +// )); +// // Validate if PITR is turn on or not for exports. 
+ + /** + * Create a partition for a stream job in the coordination table. + * + * @param streamArn Stream Arn + * @param shard A {@link Shard} + * @param exportTime the start time for change events; any change event created before this time should be ignored + * @param waitForExport whether the consumer should wait for an export to complete before streaming
+ */ + private void createRootStreamPartition(String streamArn, Shard shard, Instant exportTime, boolean waitForExport) { + StreamProgressState streamProgressState = new StreamProgressState(); + streamProgressState.setWaitForExport(waitForExport); + streamProgressState.setStartTime(exportTime.toEpochMilli()); + streamProgressState.setEndingSequenceNumber(shard.sequenceNumberRange().endingSequenceNumber()); + coordinator.createPartition(new StreamPartition(streamArn, shard.shardId(), Optional.of(streamProgressState))); + } + + /** + * Create a stream partition for a child shard ID. Some of the information is inherited from the parent partition. + */ + private void createChildStreamPartition(StreamPartition streamPartition, String childShardId) { + StreamProgressState parentStreamProgressState = streamPartition.getProgressState().get(); + StreamProgressState streamProgressState = new StreamProgressState(); + streamProgressState.setStartTime(parentStreamProgressState.getStartTime()); + streamProgressState.setEndingSequenceNumber(shardManager.getEndingSequenceNumber(childShardId)); + streamProgressState.setWaitForExport(parentStreamProgressState.shouldWaitForExport()); + StreamPartition partition = new StreamPartition(streamPartition.getStreamArn(), childShardId, Optional.of(streamProgressState)); + coordinator.createPartition(partition); + } +} diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/leader/ShardCache.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/leader/ShardCache.java new file mode 100644 index 0000000000..f958e650cf --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/leader/ShardCache.java @@ -0,0 +1,96 @@ +package org.opensearch.dataprepper.plugins.source.kinesis.leader; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * A caching store for quickly finding child shards by a given parent shard ID. + * Create one per stream to avoid conflicts between shard IDs. + */ +public class ShardCache { + + /** + * Common prefix for shard ID + */ + private static final String DEFAULT_SHARD_ID_PREFIX = "shardId-"; + + /** + * A HashMap-backed cache, where the key is a parent shard ID and the value is the list of its child shard IDs. + */ + private final Map<String, List<String>> cache; + + public ShardCache() { + this.cache = new HashMap<>(); + } + + /** + * Add a parent-child shard pair to the cache. + * + * @param shardId Shard ID + * @param parentShardId Parent Shard ID + */ + public void put(final String shardId, final String parentShardId) { + Objects.requireNonNull(shardId); + if (parentShardId != null && !parentShardId.isEmpty()) { + String trimmedParentShardId = removeShardIdPrefix(parentShardId); + String trimmedShardId = removeShardIdPrefix(shardId); + List<String> childShards = cache.getOrDefault(trimmedParentShardId, new ArrayList<>()); + childShards.add(trimmedShardId); + cache.put(trimmedParentShardId, childShards); + } + }
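A short usage sketch of the cache above with hypothetical shard IDs; the common prefix is trimmed on put and restored on get:

    ShardCache shardCache = new ShardCache();
    shardCache.put("shardId-000000000002", "shardId-000000000001");
    shardCache.put("shardId-000000000003", "shardId-000000000001");
    shardCache.get("shardId-000000000001"); // -> [shardId-000000000002, shardId-000000000003]
    shardCache.get("shardId-000000000002"); // -> null, no children recorded for this shard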
+ /** + * Get child shard IDs by parent shard ID from the cache. + * If none are found, return null. + * + * @param parentShardId Parent shard ID + * @return a list of child shard IDs + */ + public List<String> get(String parentShardId) { + List<String> childShardIds = cache.get(removeShardIdPrefix(parentShardId)); + if (childShardIds == null) { + return null; + } + return childShardIds.stream().map(this::appendShardIdPrefix).collect(Collectors.toList()); + } + + /** + * Clean up the cache + */ + public void clear() { + cache.clear(); + } + + + /** + * Get cache size + * + * @return size of the map + */ + public int size() { + return cache.size(); + } + + + /** + * Remove the common prefix to save space + */ + private String removeShardIdPrefix(String shardId) { + return shardId.substring(DEFAULT_SHARD_ID_PREFIX.length()); + } + + /** + * Append the common prefix back on retrieval + */ + private String appendShardIdPrefix(String shardId) { + return DEFAULT_SHARD_ID_PREFIX + shardId; + } + + +} diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/leader/ShardManager.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/leader/ShardManager.java new file mode 100644 index 0000000000..a224c71873 --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/leader/ShardManager.java @@ -0,0 +1,218 @@ +package org.opensearch.dataprepper.plugins.source.kinesis.leader; + +import org.opensearch.dataprepper.plugins.source.kinesis.utils.KinesisSourceAggregateMetrics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; +import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +/** + * A general manager class to handle shard-related tasks + */ +public class ShardManager { + + private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class); + + /** + * Max number of shards to return in the DescribeStream API call, maximum 100. + */ + private static final int MAX_SHARD_COUNT = 100; + + /** + * Default interval to clean up the cache and rebuild it + */ + private static final int DEFAULT_CLEAN_UP_CACHE_INTERVAL_MILLS = 10 * 60_000; + + /** + * A map for all streams, where the key is the stream ARN and the value is the related {@link StreamInfo}. + */ + private final Map<String, StreamInfo> streamMap; + + /** + * A map for storing ending sequence numbers, where the key is a shard ID and the value is its ending sequence number. + */ + private Map<String, String> endingSequenceNumberMap; + + private final KinesisAsyncClient streamsClient; + private final KinesisSourceAggregateMetrics kinesisSourceAggregateMetrics; + + public ShardManager(final KinesisAsyncClient streamsClient, final KinesisSourceAggregateMetrics kinesisSourceAggregateMetrics) { + this.streamsClient = streamsClient; + this.kinesisSourceAggregateMetrics = kinesisSourceAggregateMetrics; + streamMap = new HashMap<>(); + endingSequenceNumberMap = new HashMap<>(); + }

+ + /** + * This is the main process for shard discovery (listing shards using the DescribeStream API). + * It uses the last evaluated shard ID to speed up the listing, + * but still runs a full listing on a regular basis. + * + * Every time the process runs, it also builds the internal caching store, + * which is used to find child shards for a given parent.
+ * + * @param streamArn Stream ARN + * @return a list of {@link Shard} + */ + public List<Shard> runDiscovery(String streamArn) { + StreamInfo streamInfo = streamMap.get(streamArn); + + if (streamInfo == null) { + streamInfo = new StreamInfo(); + streamInfo.setLastCacheBuildTime(System.currentTimeMillis()); + streamInfo.setLastEvaluatedShardId(null); + streamInfo.setShardCache(new ShardCache()); + streamMap.put(streamArn, streamInfo); + } + + ShardCache shardCache = streamInfo.getShardCache(); + if (System.currentTimeMillis() - streamInfo.getLastCacheBuildTime() > DEFAULT_CLEAN_UP_CACHE_INTERVAL_MILLS) { + LOG.debug("Perform regular rebuild of cache."); + // Reset the listing marker + streamInfo.setLastEvaluatedShardId(null); + streamInfo.setLastCacheBuildTime(System.currentTimeMillis()); + // Clean up existing cache. + shardCache.clear(); + endingSequenceNumberMap.clear(); + } + + + LOG.debug("Last evaluated shard ID is {}", streamInfo.getLastEvaluatedShardId()); + List<Shard> shards = listShards(streamArn, streamInfo.getLastEvaluatedShardId()); + // build/update cache + if (!shards.isEmpty()) { + shards.forEach(shard -> { + shardCache.put(shard.shardId(), shard.parentShardId()); + }); + + if (streamInfo.getLastEvaluatedShardId() == null) { + endingSequenceNumberMap = shards.stream() + .filter(shard -> shard.sequenceNumberRange().endingSequenceNumber() != null) + .collect(Collectors.toMap( + Shard::shardId, + shard -> shard.sequenceNumberRange().endingSequenceNumber() + )); + } + LOG.debug("New last evaluated shard ID is {}", shards.get(shards.size() - 1).shardId()); + streamInfo.setLastEvaluatedShardId(shards.get(shards.size() - 1).shardId()); + } + return shards; + } + + /** + * The ending sequence number is used when creating a partition for a closed shard. + * + * @param shardId Shard ID + * @return the related ending sequence number if any, otherwise null + */ + public String getEndingSequenceNumber(String shardId) { + // May change this if multiple streams are supported. + return endingSequenceNumberMap.get(shardId); + } + + /** + * Find child shards from the cache. + * + * @param streamArn Stream ARN + * @param parentShardId Parent shard ID + * @return a list of shard IDs + */ + public List<String> findChildShardIds(String streamArn, String parentShardId) { + StreamInfo streamInfo = streamMap.get(streamArn); + if (streamInfo == null) { + return Collections.emptyList(); + } + + ShardCache shardCache = streamInfo.getShardCache(); + return shardCache.get(parentShardId); + }
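A hedged usage sketch of the discovery flow above (the names shardManager and streamArn are assumed to be in scope): the first call performs a full listing, and later calls are incremental until the periodic cache rebuild resets the marker.

    // First call: full listing; populates the shard cache and the ending-sequence-number map.
    List<Shard> allShards = shardManager.runDiscovery(streamArn);
    // Later calls only fetch shards created after the last evaluated shard ID.
    List<Shard> newShards = shardManager.runDiscovery(streamArn);
    // Child lookups are then served from the cache built during discovery.
    List<String> childIds = shardManager.findChildShardIds(streamArn, allShards.get(0).shardId());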
+ /** + * List all shards using the DescribeStream API. + * + * @param streamArn Stream Arn + * @param lastEvaluatedShardId Start shard ID for the listing, useful when trying to get child shards. If not provided, all shards will be returned. + * @return A list of {@link Shard} + */ + private List<Shard> listShards(String streamArn, String lastEvaluatedShardId) { + LOG.debug("Start listing all shards for stream {}", streamArn); + long startTime = System.currentTimeMillis(); + // Get all the shard IDs from the stream. + List<Shard> shards = new ArrayList<>(); + + try { + do { + DescribeStreamRequest req = DescribeStreamRequest.builder() + .streamARN(streamArn) + .limit(MAX_SHARD_COUNT) + .exclusiveStartShardId(lastEvaluatedShardId) + .build(); + + kinesisSourceAggregateMetrics.getStreamApiInvocations().increment(); + CompletableFuture<DescribeStreamResponse> describeStreamResult = streamsClient.describeStream(req); + shards.addAll(describeStreamResult.get().streamDescription().shards()); + + // If the stream description indicates more shards, continue listing from the last shard ID returned; otherwise stop. + if (describeStreamResult.get().streamDescription().hasMoreShards()) { + lastEvaluatedShardId = shards.get(shards.size() - 1).shardId(); + } else { + lastEvaluatedShardId = null; + } + + } while (lastEvaluatedShardId != null); + } catch (final SdkException e) { + LOG.error("Received an exception from Kinesis while listing shards: {}", e.getMessage()); + kinesisSourceAggregateMetrics.getStream4xxErrors().increment(); + return shards; + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + + long endTime = System.currentTimeMillis(); + LOG.info("Listing shards (DescribeStream call) took {} milliseconds with {} shards found", endTime - startTime, shards.size()); + return shards; + } + + + /** + * Extra state for shard discovery for each stream + */ + class StreamInfo { + private String lastEvaluatedShardId; + private long lastCacheBuildTime; + private ShardCache shardCache; + + public String getLastEvaluatedShardId() { + return lastEvaluatedShardId; + } + + public void setLastEvaluatedShardId(String lastEvaluatedShardId) { + this.lastEvaluatedShardId = lastEvaluatedShardId; + } + + public long getLastCacheBuildTime() { + return lastCacheBuildTime; + } + + public void setLastCacheBuildTime(long lastCacheBuildTime) { + this.lastCacheBuildTime = lastCacheBuildTime; + } + + public ShardCache getShardCache() { + return shardCache; + } + + public void setShardCache(ShardCache shardCache) { + this.shardCache = shardCache; + } + } +} diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/stream/ShardConsumer.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/stream/ShardConsumer.java new file mode 100644 index 0000000000..c5bcd0c079 --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/stream/ShardConsumer.java @@ -0,0 +1,408 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.kinesis.stream; + +import io.micrometer.core.instrument.Counter; +import org.opensearch.dataprepper.buffer.common.BufferAccumulator; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.source.kinesis.KinesisStreamConfig; +import org.opensearch.dataprepper.plugins.source.kinesis.converter.StreamRecordConverter; +import org.opensearch.dataprepper.plugins.source.kinesis.utils.KinesisSourceAggregateMetrics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.KinesisException;
+ +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * A basic data consumer to read from one shard + */ +public class ShardConsumer implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(ShardConsumer.class); + + /** + * A flag to interrupt the process + */ + private static volatile boolean shouldStop = false; + + /** + * An overlap added between the event creation time and the export time + */ + private static final Duration STREAM_EVENT_OVERLAP_TIME = Duration.ofMinutes(5); + + /** + * Max number of items to return per GetRecords call, maximum 1000. + */ + private static final int MAX_GET_RECORD_ITEM_COUNT = 1000; + + /** + * Idle time between GetRecords reads + */ + private static final int GET_RECORD_INTERVAL_MILLS = 300; + + /** + * Minimum idle time between GetRecords reads, used when the consumer is lagging behind the stream + */ + private static final int MINIMUM_GET_RECORD_INTERVAL_MILLS = 10; + + /** + * Delay threshold beyond which the minimum idle time is used + */ + private static final long GET_RECORD_DELAY_THRESHOLD_MILLS = 15_000; + + /** + * Default interval to check if export is completed. + */ + private static final int DEFAULT_WAIT_FOR_EXPORT_INTERVAL_MILLS = 60_000; + + + /** + * Default number of wait-for-export rounds between regular checkpoints. + */ + private static final int DEFAULT_WAIT_COUNT_TO_CHECKPOINT = 5; + + /** + * Default regular checkpoint interval + */ + private static final int DEFAULT_CHECKPOINT_INTERVAL_MILLS = 2 * 60_000; + + static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60); + static final int DEFAULT_BUFFER_BATCH_SIZE = 1_000; + static final String SHARD_PROGRESS = "shardProgress"; + + + private final KinesisAsyncClient kinesisClient; + + private final StreamRecordConverter recordConverter; + + private final StreamCheckpointer checkpointer; + + private String shardIterator; + + private final String lastShardIterator; + + private final Instant startTime; + + private boolean waitForExport; + + private final AcknowledgementSet acknowledgementSet; + + private final Duration shardAcknowledgmentTimeout; + + private final String shardId; + + private final KinesisSourceAggregateMetrics kinesisSourceAggregateMetrics; + + private final Counter shardProgress; + + private long recordsWrittenToBuffer; + + private ShardConsumer(Builder builder) { + this.shardProgress = builder.pluginMetrics.counter(SHARD_PROGRESS); + this.kinesisClient = builder.kinesisClient; + this.checkpointer = builder.checkpointer; + this.shardIterator = builder.shardIterator; + this.lastShardIterator = builder.lastShardIterator; + // Introduce an overlap + this.startTime = builder.startTime == null ?
Instant.MIN : builder.startTime.minus(STREAM_EVENT_OVERLAP_TIME); + this.waitForExport = builder.waitForExport; + final BufferAccumulator> bufferAccumulator = BufferAccumulator.create(builder.buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT); + recordConverter = new StreamRecordConverter(bufferAccumulator, builder.pluginMetrics, builder.streamConfig); + this.acknowledgementSet = builder.acknowledgementSet; + this.shardAcknowledgmentTimeout = builder.dataFileAcknowledgmentTimeout; + this.shardId = builder.shardId; + this.recordsWrittenToBuffer = 0; + this.kinesisSourceAggregateMetrics = builder.kinesisSourceAggregateMetrics; + } + + public static Builder builder(final KinesisAsyncClient kinesisClient, + final PluginMetrics pluginMetrics, + final KinesisSourceAggregateMetrics kinesisSourceAggregateMetrics, + final Buffer> buffer, + final KinesisStreamConfig streamConfig) { + return new Builder(kinesisClient, pluginMetrics, kinesisSourceAggregateMetrics, buffer, streamConfig); + } + + + static class Builder { + + private final KinesisAsyncClient kinesisClient; + + private final PluginMetrics pluginMetrics; + + private final KinesisSourceAggregateMetrics kinesisSourceAggregateMetrics; + + private final Buffer> buffer; + + //private TableInfo tableInfo; + + private StreamCheckpointer checkpointer; + + private String shardIterator; + + private String lastShardIterator; + + private Instant startTime; + + private boolean waitForExport; + + private String shardId; + + private AcknowledgementSet acknowledgementSet; + private Duration dataFileAcknowledgmentTimeout; + + private KinesisStreamConfig streamConfig; + + public Builder(final KinesisAsyncClient kinesisClient, + final PluginMetrics pluginMetrics, + final KinesisSourceAggregateMetrics kinesisSourceAggregateMetrics, + final Buffer> buffer, + final KinesisStreamConfig streamConfig) { + this.kinesisClient = kinesisClient; + this.pluginMetrics = pluginMetrics; + this.kinesisSourceAggregateMetrics = kinesisSourceAggregateMetrics; + this.buffer = buffer; + this.streamConfig = streamConfig; + } + +// public Builder tableInfo(TableInfo tableInfo) { +// this.tableInfo = tableInfo; +// return this; +// } + + public Builder shardId(final String shardId) { + this.shardId = shardId; + return this; + } + + public Builder checkpointer(StreamCheckpointer checkpointer) { + this.checkpointer = checkpointer; + return this; + } + + public Builder shardIterator(String shardIterator) { + this.shardIterator = shardIterator; + return this; + } + + public Builder lastShardIterator(String lastShardIterator) { + this.lastShardIterator = lastShardIterator; + return this; + } + + public Builder startTime(Instant startTime) { + this.startTime = startTime; + return this; + } + + public Builder waitForExport(boolean waitForExport) { + this.waitForExport = waitForExport; + return this; + } + + public Builder acknowledgmentSet(AcknowledgementSet acknowledgementSet) { + this.acknowledgementSet = acknowledgementSet; + return this; + } + + public Builder acknowledgmentSetTimeout(Duration dataFileAcknowledgmentTimeout) { + this.dataFileAcknowledgmentTimeout = dataFileAcknowledgmentTimeout; + return this; + } + + public ShardConsumer build() { + return new ShardConsumer(this); + } + + } + + + @Override + public void run() { + LOG.debug("Shard Consumer start to run..."); + // Check should skip processing or not. 
+ if (shouldSkip()) { + shardProgress.increment(); + if (acknowledgementSet != null) { + checkpointer.updateShardForAcknowledgmentWait(shardAcknowledgmentTimeout); + acknowledgementSet.complete(); + } + return; + } + + long lastCheckpointTime = System.currentTimeMillis(); + String sequenceNumber = ""; + int interval; + List records; + + while (!shouldStop) { + if (shardIterator == null) { + // End of Shard + LOG.debug("Reached end of shard"); + checkpointer.checkpoint(sequenceNumber); + break; + } + + if (System.currentTimeMillis() - lastCheckpointTime > DEFAULT_CHECKPOINT_INTERVAL_MILLS) { + LOG.debug("{} records written to buffer for shard {}", recordsWrittenToBuffer, shardId); + checkpointer.checkpoint(sequenceNumber); + lastCheckpointTime = System.currentTimeMillis(); + } + + GetRecordsResponse response = callGetRecords(shardIterator); + shardIterator = response.nextShardIterator(); + if (!response.records().isEmpty()) { + // Always use the last sequence number for checkpoint + sequenceNumber = response.records().get(response.records().size() - 1).sequenceNumber(); + Instant lastEventTime = response.records().get(response.records().size() - 1).approximateArrivalTimestamp(); + + if (lastEventTime.isBefore(startTime)) { + LOG.debug("Get {} events before start time, ignore...", response.records().size()); + continue; + } + if (waitForExport) { + checkpointer.checkpoint(sequenceNumber); + waitForExport(); + waitForExport = false; + } + records = response.records().stream() + .filter(record -> record.approximateArrivalTimestamp().isAfter(startTime)) + .collect(Collectors.toList()); + recordConverter.writeToBuffer(acknowledgementSet, records); + shardProgress.increment(); + recordsWrittenToBuffer += records.size(); + long delay = System.currentTimeMillis() - lastEventTime.toEpochMilli(); + interval = delay > GET_RECORD_DELAY_THRESHOLD_MILLS ? MINIMUM_GET_RECORD_INTERVAL_MILLS : GET_RECORD_INTERVAL_MILLS; + + } else { + interval = GET_RECORD_INTERVAL_MILLS; + shardProgress.increment(); + } + + try { + // Idle between get records call. 
+ Thread.sleep(interval); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + // interrupted + if (shouldStop) { + // Do last checkpoint and then quit + LOG.warn("Processing for shard {} was interrupted by a shutdown signal, giving up shard", shardId); + checkpointer.checkpoint(sequenceNumber); + throw new RuntimeException("Consuming shard was interrupted from shutdown"); + } + + if (acknowledgementSet != null) { + checkpointer.updateShardForAcknowledgmentWait(shardAcknowledgmentTimeout); + acknowledgementSet.complete(); + } + + if (waitForExport) { + waitForExport(); + } + } + + /** + * Wrapper around the GetRecords call + */ + private GetRecordsResponse callGetRecords(String shardIterator) { + // Use the shard iterator to read the stream records + GetRecordsRequest req = GetRecordsRequest.builder() + .shardIterator(shardIterator) + .limit(MAX_GET_RECORD_ITEM_COUNT) + .build(); + + try { + kinesisSourceAggregateMetrics.getStreamApiInvocations().increment(); + CompletableFuture<GetRecordsResponse> response = kinesisClient.getRecords(req); + return response.get(); + } catch (final KinesisException ex) { + if (ex.statusCode() >= 500) { + kinesisSourceAggregateMetrics.getStream5xxErrors().increment(); + } else { + kinesisSourceAggregateMetrics.getStream4xxErrors().increment(); + } + throw new RuntimeException(ex.getMessage()); + } catch (final Exception e) { + kinesisSourceAggregateMetrics.getStream4xxErrors().increment(); + throw new RuntimeException(e.getMessage()); + } + + } + + private void waitForExport() { + LOG.debug("Start waiting for export to be done and loaded"); + int numberOfWaits = 0; + while (!checkpointer.isExportDone()) { + LOG.debug("Export is in progress, wait..."); + try { + shardProgress.increment(); + Thread.sleep(DEFAULT_WAIT_FOR_EXPORT_INTERVAL_MILLS); + // The wait for export may take a long time. + // Need to extend the timeout of the ownership in the coordination store. + // Otherwise, the lease will expire. + numberOfWaits++; + if (numberOfWaits % DEFAULT_WAIT_COUNT_TO_CHECKPOINT == 0) { + // To extend the timeout of the lease + checkpointer.checkpoint(null); + } + } catch (InterruptedException e) { + LOG.error("Wait for export is interrupted ({})", e.getMessage()); + // Directly quit the process + throw new RuntimeException("Wait for export is interrupted."); + } + } + } + + /** + * Skip processing only when both conditions below are met: + * - a last shard iterator is provided (a shard with an ending sequence number), and + * - the shard is empty, or its last event timestamp is earlier than the start time. + */ + private boolean shouldSkip() { + // Do skip check + if (lastShardIterator != null && !lastShardIterator.isEmpty()) { + GetRecordsResponse response = callGetRecords(lastShardIterator); + if (response.records().isEmpty()) { + // Empty shard + LOG.info("LastShardIterator is provided, but there is no Last Event Time, skip processing"); + return true; + } + + Instant lastEventTime = response.records().get(response.records().size() - 1).approximateArrivalTimestamp(); + if (lastEventTime.isBefore(startTime)) { + LOG.info("LastShardIterator is provided, and Last Event Time is earlier than {}, skip processing", startTime); + return true; + } else { + LOG.info("LastShardIterator is provided, and Last Event Time is later than {}, start processing", startTime); + return false; + } + } + + return false; + } + + + /** + * Currently, this is to stop all consumers.
+ */ + public static void stopAll() { + shouldStop = true; + } + + +} \ No newline at end of file diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/stream/ShardConsumerFactory.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/stream/ShardConsumerFactory.java new file mode 100644 index 0000000000..17cd07ffda --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/stream/ShardConsumerFactory.java @@ -0,0 +1,161 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.kinesis.stream; + +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.plugins.source.kinesis.KinesisStreamConfig; +import org.opensearch.dataprepper.plugins.source.kinesis.coordination.partition.StreamPartition; +import org.opensearch.dataprepper.plugins.source.kinesis.coordination.state.StreamProgressState; +import org.opensearch.dataprepper.plugins.source.kinesis.utils.KinesisSourceAggregateMetrics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; +import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.kinesis.common.KinesisRequestsBuilder; + +import java.time.Duration; +import java.time.Instant; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +/** + * Factory class to create shard consumers + */ +public class ShardConsumerFactory { + private static final Logger LOG = LoggerFactory.getLogger(ShardConsumerFactory.class); + + + private final KinesisAsyncClient streamsClient; + + private final EnhancedSourceCoordinator enhancedSourceCoordinator; + private final KinesisSourceAggregateMetrics kinesisSourceAggregateMetrics; + private final PluginMetrics pluginMetrics; + private final Buffer<Record<Event>> buffer; + private final KinesisStreamConfig streamConfig; + + public ShardConsumerFactory(final EnhancedSourceCoordinator enhancedSourceCoordinator, + final KinesisAsyncClient streamsClient, + final PluginMetrics pluginMetrics, + final KinesisSourceAggregateMetrics kinesisSourceAggregateMetrics, + final Buffer<Record<Event>> buffer, + final KinesisStreamConfig streamConfig) { + this.streamsClient = streamsClient; + this.enhancedSourceCoordinator = enhancedSourceCoordinator; + this.kinesisSourceAggregateMetrics = kinesisSourceAggregateMetrics; + this.pluginMetrics = pluginMetrics; + this.buffer = buffer; + this.streamConfig = streamConfig; + } + + public Runnable createConsumer(final StreamPartition streamPartition, + final AcknowledgementSet acknowledgementSet, + final Duration shardAcknowledgmentTimeout) { +
+ LOG.info("Starting to consume shard {}", streamPartition.getShardId()); + + // Check and get the current state. + Optional<StreamProgressState> progressState = streamPartition.getProgressState(); + String sequenceNumber = null; + String lastShardIterator = null; + Instant startTime = null; + boolean waitForExport = false; + if (progressState.isPresent()) { + // We can't checkpoint with acks yet + sequenceNumber = acknowledgementSet == null ? null : progressState.get().getSequenceNumber(); + waitForExport = progressState.get().shouldWaitForExport(); + if (progressState.get().getStartTime() != 0) { + startTime = Instant.ofEpochMilli(progressState.get().getStartTime()); + } + // If an ending sequence number is present, get the shard iterator for the last record + String endingSequenceNumber = progressState.get().getEndingSequenceNumber(); + if (endingSequenceNumber != null && !endingSequenceNumber.isEmpty()) { + lastShardIterator = getShardIterator(streamPartition.getStreamArn(), streamPartition.getShardId(), endingSequenceNumber); + } + } + + String shardIterator = getShardIterator(streamPartition.getStreamArn(), streamPartition.getShardId(), sequenceNumber); + if (shardIterator == null) { + LOG.error("Failed to start consuming shard '{}'. Unable to get a shard iterator for this shard; it may have expired", streamPartition.getShardId()); + return null; + } + + StreamCheckpointer checkpointer = new StreamCheckpointer(enhancedSourceCoordinator, streamPartition); + + LOG.debug("Create shard consumer for {} with shardIter {}", streamPartition.getShardId(), shardIterator); + LOG.debug("Create shard consumer for {} with lastShardIter {}", streamPartition.getShardId(), lastShardIterator); + ShardConsumer shardConsumer = ShardConsumer.builder(streamsClient, pluginMetrics, kinesisSourceAggregateMetrics, buffer, streamConfig) + .checkpointer(checkpointer) + .shardIterator(shardIterator) + .shardId(streamPartition.getShardId()) + .lastShardIterator(lastShardIterator) + .startTime(startTime) + .waitForExport(waitForExport) + .acknowledgmentSet(acknowledgementSet) + .acknowledgmentSetTimeout(shardAcknowledgmentTimeout) + .build(); + return shardConsumer; + } + + /** + * Get a shard iterator to start reading stream records from a shard. + * If a sequence number is provided, use AT_SEQUENCE_NUMBER to retrieve the iterator; + * otherwise use TRIM_HORIZON to retrieve the iterator. + *

+ * Note that the shard may be expired; if so, null will be returned. +

+ * + * @param streamArn Stream Arn + * @param shardId Shard Id + * @param sequenceNumber The last sequence number processed, if any + * @return A shard iterator. + */ + public String getShardIterator(String streamArn, String shardId, String sequenceNumber) { + LOG.debug("Get Initial Shard Iter for {}", shardId); + GetShardIteratorRequest getShardIteratorRequest; + + if (sequenceNumber != null && !sequenceNumber.isEmpty()) { + LOG.debug("Get Shard Iterator at {}", sequenceNumber); + // There may be an overlap for 1 record + getShardIteratorRequest = KinesisRequestsBuilder.getShardIteratorRequestBuilder() + .shardId(shardId) + .streamARN(streamArn) + .shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER) + .startingSequenceNumber(sequenceNumber) + .build(); + } else { + LOG.debug("Get Shard Iterator from beginning (TRIM_HORIZON) for shard {}", shardId); + getShardIteratorRequest = KinesisRequestsBuilder.getShardIteratorRequestBuilder() + .shardId(shardId) + .streamARN(streamArn) + .shardIteratorType(ShardIteratorType.TRIM_HORIZON) + .build(); + } + + try { + kinesisSourceAggregateMetrics.getStreamApiInvocations().increment(); + CompletableFuture<GetShardIteratorResponse> getShardIteratorResult = streamsClient.getShardIterator(getShardIteratorRequest); + return getShardIteratorResult.get().shardIterator(); + } catch (SdkException e) { + kinesisSourceAggregateMetrics.getStream4xxErrors().increment(); + LOG.error("Exception when trying to get the shard iterator due to {}", e.getMessage()); + return null; + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + } +}
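One design note on the AT_SEQUENCE_NUMBER choice above: resuming at the checkpointed sequence number re-delivers that record once, which favors at-least-once delivery. A hypothetical alternative, not used by this patch and reusing the parameters of getShardIterator above, would resume strictly after the checkpoint:

    // Hypothetical alternative: skips the checkpointed record, avoiding the one-record
    // overlap, at the cost of losing it if the previous buffer write never completed.
    GetShardIteratorRequest afterCheckpoint = KinesisRequestsBuilder.getShardIteratorRequestBuilder()
            .shardId(shardId)
            .streamARN(streamArn)
            .shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
            .startingSequenceNumber(sequenceNumber)
            .build();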
diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/stream/StreamCheckpointer.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/stream/StreamCheckpointer.java new file mode 100644 index 0000000000..3a1f431858 --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/stream/StreamCheckpointer.java @@ -0,0 +1,66 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.kinesis.stream; + +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.source.kinesis.coordination.partition.StreamPartition; +import org.opensearch.dataprepper.plugins.source.kinesis.coordination.state.StreamProgressState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Optional; + +/** + * A helper class to handle the stream partition status and the progress state. + * It uses coordinator APIs under the hood. + */ +public class StreamCheckpointer { + private static final Logger LOG = LoggerFactory.getLogger(StreamCheckpointer.class); + + static final Duration CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE = Duration.ofMinutes(5); + + private final EnhancedSourceCoordinator coordinator; + + private final StreamPartition streamPartition; + + public StreamCheckpointer(EnhancedSourceCoordinator coordinator, StreamPartition streamPartition) { + this.coordinator = coordinator; + this.streamPartition = streamPartition; + } + + private void setSequenceNumber(String sequenceNumber) { + // Only update progress if the sequence number is not empty. + // A blank sequence number means the current sequence number in the progress state has not changed, so do nothing. + if (sequenceNumber != null && !sequenceNumber.isEmpty()) { + Optional<StreamProgressState> progressState = streamPartition.getProgressState(); + progressState.ifPresent(streamProgressState -> streamProgressState.setSequenceNumber(sequenceNumber)); + } + } + + /** + * Do a checkpoint with the latest sequence number processed. + * Note that this should be called on a regular basis even when there are no changes to the sequence number, + * as the checkpoint also extends the timeout of the lease. + * + * @param sequenceNumber The last sequence number + */ + public void checkpoint(String sequenceNumber) { + LOG.debug("Checkpoint shard {} with sequenceNumber {}", streamPartition.getShardId(), sequenceNumber); + setSequenceNumber(sequenceNumber); + coordinator.saveProgressStateForPartition(streamPartition, CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE); + } + + public boolean isExportDone() { + Optional<EnhancedSourcePartition> globalPartition = coordinator.getPartition(streamPartition.getStreamArn()); + return globalPartition.isPresent(); + } + + public void updateShardForAcknowledgmentWait(final Duration acknowledgmentSetTimeout) { + coordinator.saveProgressStateForPartition(streamPartition, acknowledgmentSetTimeout); + } +} diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/stream/StreamScheduler.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/stream/StreamScheduler.java new file mode 100644 index 0000000000..6899b5bea1 --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/stream/StreamScheduler.java @@ -0,0 +1,190 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.kinesis.stream; + +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.source.kinesis.KinesisSourceConfig; +import org.opensearch.dataprepper.plugins.source.kinesis.coordination.partition.StreamPartition; +import org.opensearch.dataprepper.plugins.source.kinesis.utils.BackoffCalculator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; + +/** + * A scheduler to manage all the stream related work in one place + */ +public class StreamScheduler implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(StreamScheduler.class); + private static final Logger SHARD_COUNT_LOGGER = LoggerFactory.getLogger("org.opensearch.dataprepper.plugins.source.kinesis.stream.ShardCountLogger"); + + /** + * Max number of shards each node can handle in parallel + */ + private static final int MAX_JOB_COUNT = 150; + + /** + * Default interval to acquire a lease from coordination store + */ + private static final int DEFAULT_LEASE_INTERVAL_MILLIS = 15_000; + + static final String ACTIVE_CHANGE_EVENT_CONSUMERS = "activeChangeEventConsumers"; + static final String SHARDS_IN_PROCESSING = "activeShardsInProcessing"; + + private final AtomicInteger numOfWorkers = new AtomicInteger(0); + private final EnhancedSourceCoordinator coordinator; + private final ShardConsumerFactory consumerFactory; + private final ExecutorService executor; + private final PluginMetrics pluginMetrics; + private final AtomicLong activeChangeEventConsumers; + private final AtomicLong shardsInProcessing; + private final AcknowledgementSetManager acknowledgementSetManager; + private final KinesisSourceConfig kinesisSourceConfig; + private final BackoffCalculator backoffCalculator; + private int noAvailableShardsCount = 0; + + + public StreamScheduler(final EnhancedSourceCoordinator coordinator, + final ShardConsumerFactory consumerFactory, + final PluginMetrics pluginMetrics, + final AcknowledgementSetManager acknowledgementSetManager, + final KinesisSourceConfig kinesisSourceConfig, + final BackoffCalculator backoffCalculator) { + this.coordinator = coordinator; + this.consumerFactory = consumerFactory; + this.pluginMetrics = pluginMetrics; + this.acknowledgementSetManager = acknowledgementSetManager; + this.kinesisSourceConfig = kinesisSourceConfig; + this.backoffCalculator = backoffCalculator; + + executor = Executors.newFixedThreadPool(MAX_JOB_COUNT); + activeChangeEventConsumers = pluginMetrics.gauge(ACTIVE_CHANGE_EVENT_CONSUMERS, new AtomicLong()); + shardsInProcessing = pluginMetrics.gauge(SHARDS_IN_PROCESSING, new AtomicLong()); + } + + private void processStreamPartition(StreamPartition streamPartition) { + final boolean acknowledgmentsEnabled = kinesisSourceConfig.isAcknowledgments(); + AcknowledgementSet acknowledgementSet = null; + + if (acknowledgmentsEnabled) { + acknowledgementSet = acknowledgementSetManager.create((result) -> { + if (result) { + LOG.info("Received acknowledgment of completion from sink for shard {}", streamPartition.getShardId()); + completeConsumer(streamPartition).accept(null, null); + } else { + LOG.warn("Negative acknowledgment received for shard {}, it will be retried", streamPartition.getShardId()); + coordinator.giveUpPartition(streamPartition); + } + }, kinesisSourceConfig.getShardAcknowledgmentTimeout()); + } + + Runnable shardConsumer = consumerFactory.createConsumer(streamPartition, acknowledgementSet, kinesisSourceConfig.getShardAcknowledgmentTimeout()); + if (shardConsumer != null) { + + CompletableFuture<Void> runConsumer = CompletableFuture.runAsync(shardConsumer, executor); + + if (acknowledgmentsEnabled) { + runConsumer.whenComplete((v, ex) -> { + numOfWorkers.decrementAndGet(); + if (ex != null) { + coordinator.giveUpPartition(streamPartition); + } + if (numOfWorkers.get() == 0) { + activeChangeEventConsumers.decrementAndGet(); + } +
shardsInProcessing.decrementAndGet(); + }); + } else { + runConsumer.whenComplete(completeConsumer(streamPartition)); + } + numOfWorkers.incrementAndGet(); + if (numOfWorkers.get() % 10 == 0) { + SHARD_COUNT_LOGGER.info("Actively processing {} shards", numOfWorkers.get()); + } + + if (numOfWorkers.get() >= 1) { + activeChangeEventConsumers.incrementAndGet(); + } + shardsInProcessing.incrementAndGet(); + } else { + // If failed to create a new consumer. + coordinator.completePartition(streamPartition); + } + } + + @Override + public void run() { + LOG.debug("Stream Scheduler start to run..."); + while (!Thread.currentThread().isInterrupted()) { + try { + if (numOfWorkers.get() < MAX_JOB_COUNT) { + final Optional sourcePartition = coordinator.acquireAvailablePartition(StreamPartition.PARTITION_TYPE); + if (sourcePartition.isPresent()) { + StreamPartition streamPartition = (StreamPartition) sourcePartition.get(); + processStreamPartition(streamPartition); + noAvailableShardsCount = 0; + } else { + noAvailableShardsCount++; + } + } + + try { + Thread.sleep(backoffCalculator.calculateBackoffToAcquireNextShard(noAvailableShardsCount, numOfWorkers)); + } catch (final InterruptedException e) { + LOG.info("InterruptedException occurred"); + break; + } + } catch (final Exception e) { + LOG.error("Received an exception while processing a shard for streams, backing off and retrying", e); + try { + Thread.sleep(DEFAULT_LEASE_INTERVAL_MILLIS); + } catch (final InterruptedException ex) { + LOG.info("The StreamScheduler was interrupted while waiting to retry, stopping processing"); + break; + } + } + } + // Should Stop + LOG.warn("Stream Scheduler is interrupted, looks like shutdown has triggered"); + + // Cannot call executor.shutdownNow() here + // Otherwise the final checkpoint will fail due to SDK interruption. + ShardConsumer.stopAll(); + executor.shutdown(); + } + + private BiConsumer completeConsumer(StreamPartition streamPartition) { + return (v, ex) -> { + if (!kinesisSourceConfig.isAcknowledgments()) { + numOfWorkers.decrementAndGet(); + if (numOfWorkers.get() == 0) { + activeChangeEventConsumers.decrementAndGet(); + } + shardsInProcessing.decrementAndGet(); + } + if (ex == null) { + LOG.info("Shard consumer for {} is completed", streamPartition.getShardId()); + coordinator.completePartition(streamPartition); + } else { + // Do nothing + // The consumer must have already done one last checkpointing. 
+ LOG.error("Received an exception while processing shard {}, giving up shard: {}", streamPartition.getShardId(), ex); + coordinator.giveUpPartition(streamPartition); + } + }; + } + +} \ No newline at end of file diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/utils/BackoffCalculator.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/utils/BackoffCalculator.java new file mode 100644 index 0000000000..3f22eb8565 --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/utils/BackoffCalculator.java @@ -0,0 +1,60 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.kinesis.utils; + +import org.opensearch.dataprepper.plugins.source.kinesis.stream.StreamScheduler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; + +import static com.google.common.math.LongMath.pow; +import static com.google.common.primitives.Longs.min; +import static java.lang.Math.max; + +public class BackoffCalculator { + + private static final Logger LOG = LoggerFactory.getLogger(StreamScheduler.class); + + private static final Random RANDOM = new Random(); + + static final Duration STARTING_BACKOFF = Duration.ofMillis(500); + static final Duration MAX_BACKOFF_WITH_SHARDS = Duration.ofSeconds(15); + static final Duration MAX_BACKOFF_NO_SHARDS_ACQUIRED = Duration.ofSeconds(15); + static final int BACKOFF_RATE = 2; + static final Duration MAX_JITTER = Duration.ofSeconds(2); + static final Duration MIN_JITTER = Duration.ofSeconds(-2); + + private final boolean isExportConfigured; + + public BackoffCalculator(final boolean isExportConfigured) { + this.isExportConfigured = isExportConfigured; + } + + public long calculateBackoffToAcquireNextShard(final int noAvailableShardCount, final AtomicInteger shardsAcquired) { + + // When no shards are available to process we backoff exponentially based on how many consecutive attempts have been made without getting a shard + // This limits calls to the coordination store + if (noAvailableShardCount > 0) { + if (noAvailableShardCount % 50 == 0 && shardsAcquired.get() == 0) { + String errorMessage = String.format("No new shards acquired after %s attempts. This means that all shards are currently being consumed", noAvailableShardCount); + + if (isExportConfigured) { + errorMessage += ", or that the export is still in progress. 
+ } + LOG.info(errorMessage); + } + + final long jitterMillis = MIN_JITTER.toMillis() + RANDOM.nextInt((int) (MAX_JITTER.toMillis() - MIN_JITTER.toMillis() + 1)); + return max(1, min(STARTING_BACKOFF.toMillis() * pow(BACKOFF_RATE, (int) min(noAvailableShardCount - 1, 8)) + jitterMillis, MAX_BACKOFF_NO_SHARDS_ACQUIRED.toMillis())); + } + + // When shards are being acquired, back off linearly based on how many shards this node is actively processing, to encourage a fast start while still balancing shards between nodes. + return max(500, min(MAX_BACKOFF_WITH_SHARDS.toMillis(), shardsAcquired.get() * STARTING_BACKOFF.toMillis())); + } +} diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/utils/KinesisSourceAggregateMetrics.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/utils/KinesisSourceAggregateMetrics.java new file mode 100644 index 0000000000..642f587b07 --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/utils/KinesisSourceAggregateMetrics.java @@ -0,0 +1,58 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.kinesis.utils; + +import io.micrometer.core.instrument.Counter; +import org.opensearch.dataprepper.metrics.PluginMetrics; + +public class KinesisSourceAggregateMetrics { + + private static final String KINESIS = "kinesis"; + + private static final String KINESIS_STREAM_5XX_EXCEPTIONS = "stream5xxErrors"; + private static final String KINESIS_STREAM_4XX_EXCEPTIONS = "stream4xxErrors"; + private static final String KINESIS_STREAM_API_INVOCATIONS = "streamApiInvocations"; + private static final String KINESIS_EXPORT_5XX_ERRORS = "export5xxErrors"; + private static final String KINESIS_EXPORT_4XX_ERRORS = "export4xxErrors"; + private static final String KINESIS_EXPORT_API_INVOCATIONS = "exportApiInvocations"; + + + + private final PluginMetrics pluginMetrics; + + private final Counter stream5xxErrors; + private final Counter stream4xxErrors; + private final Counter streamApiInvocations; + private final Counter export5xxErrors; + private final Counter export4xxErrors; + private final Counter exportApiInvocations; + + public KinesisSourceAggregateMetrics() { + this.pluginMetrics = PluginMetrics.fromPrefix(KINESIS); + this.stream5xxErrors = pluginMetrics.counter(KINESIS_STREAM_5XX_EXCEPTIONS); + this.stream4xxErrors = pluginMetrics.counter(KINESIS_STREAM_4XX_EXCEPTIONS); + this.streamApiInvocations = pluginMetrics.counter(KINESIS_STREAM_API_INVOCATIONS); + this.export5xxErrors = pluginMetrics.counter(KINESIS_EXPORT_5XX_ERRORS); + this.export4xxErrors = pluginMetrics.counter(KINESIS_EXPORT_4XX_ERRORS); + this.exportApiInvocations = pluginMetrics.counter(KINESIS_EXPORT_API_INVOCATIONS); + } + + public Counter getStream5xxErrors() { + return stream5xxErrors; + } + + public Counter getStream4xxErrors() { return stream4xxErrors; } + + public Counter getStreamApiInvocations() { return streamApiInvocations; } + + public Counter getExport5xxErrors() { + return export5xxErrors; + } + + public Counter getExport4xxErrors() { return export4xxErrors; } + + public Counter getExportApiInvocations() { return exportApiInvocations; } +} diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/utils/TableUtil.java
b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/utils/TableUtil.java new file mode 100644 index 0000000000..62f944dd6d --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/utils/TableUtil.java @@ -0,0 +1,26 @@ +package org.opensearch.dataprepper.plugins.source.kinesis.utils; + +import software.amazon.awssdk.arns.Arn; + +public class TableUtil { + + public static String getTableNameFromArn(String tableArn) { + Arn arn = Arn.fromString(tableArn); + // resourceAsString is table/xxx + return arn.resourceAsString().substring("table/".length()); + } + + public static String getTableArnFromStreamArn(String streamArn) { + // e.g. Given a stream arn: arn:aws:dynamodb:us-west-2:xxx:table/test-table/stream/2023-07-31T04:59:58.190 + // Returns arn:aws:dynamodb:us-west-2:xxx:table/test-table + return streamArn.substring(0, streamArn.lastIndexOf('/') - "stream/".length()); + } + + public static String getTableArnFromExportArn(String exportArn) { + // e.g. given export arn:arn:aws:dynamodb:us-west-2:123456789012:table/Thread/export/01693291918297-bfeccbea + // returns: arn:aws:dynamodb:us-west-2:123456789012:table/Thread + return exportArn.substring(0, exportArn.lastIndexOf("/export/")); + } + + +} diff --git a/settings.gradle b/settings.gradle index 8400ff98c2..64471772b0 100644 --- a/settings.gradle +++ b/settings.gradle @@ -175,4 +175,5 @@ include 'data-prepper-plugins:mongodb' include 'data-prepper-plugins:rds-source' include 'data-prepper-plugins:http-source-common' include 'data-prepper-plugins:http-common' -include 'data-prepper-plugins:lambda-sink' \ No newline at end of file +include 'data-prepper-plugins:lambda-sink' +include 'data-prepper-plugins:kinesis-source'
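Finally, a rough illustration of how the BackoffCalculator above behaves; the numbers are approximate because of the plus-or-minus 2 s jitter, and the snippet assumes the class is on the classpath:

    import java.util.concurrent.atomic.AtomicInteger;
    import org.opensearch.dataprepper.plugins.source.kinesis.utils.BackoffCalculator;

    BackoffCalculator backoff = new BackoffCalculator(false);
    AtomicInteger shardsHeld = new AtomicInteger(0);
    backoff.calculateBackoffToAcquireNextShard(1, shardsHeld); // ~500 ms plus jitter, floor of 1 ms
    backoff.calculateBackoffToAcquireNextShard(5, shardsHeld); // ~8 s plus jitter, capped at 15 s
    shardsHeld.set(4);
    backoff.calculateBackoffToAcquireNextShard(0, shardsHeld); // 4 * 500 ms = 2 s, no jitter applied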