Skip to content

Commit

Permalink
Kinesis source implementation using Data Prepper source coordination.
Browse files Browse the repository at this point in the history
Signed-off-by: Souvik Bose <[email protected]>
  • Loading branch information
sb2k16 committed Jul 15, 2024
1 parent b4b71e2 commit 2210184
Show file tree
Hide file tree
Showing 28 changed files with 2,694 additions and 1 deletion.
1 change: 1 addition & 0 deletions data-prepper-plugins/kinesis-source/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

40 changes: 40 additions & 0 deletions data-prepper-plugins/kinesis-source/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
id 'java'
}

dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:common')
implementation project(':data-prepper-plugins:buffer-common')
implementation libs.armeria.core
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation project(':data-prepper-plugins:blocking-buffer')
implementation 'software.amazon.awssdk:kinesis'
// https://mvnrepository.com/artifact/com.fasterxml.jackson.dataformat/jackson-dataformat-ion
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-ion'

// https://mvnrepository.com/artifact/software.amazon.kinesis/amazon-kinesis-client
implementation 'software.amazon.kinesis:amazon-kinesis-client:2.6.0'
// Lombok is compile-time only (annotation processing); it must not leak into the runtime classpath.
compileOnly 'org.projectlombok:lombok:1.18.20'
annotationProcessor 'org.projectlombok:lombok:1.18.20'

// NOTE(review): junit-bom is declared but no `test { useJUnitPlatform() }` is visible in this
// file — confirm the root/core build applies it, otherwise JUnit 5 tests will silently not run.
testImplementation platform('org.junit:junit-bom:5.9.1')
testImplementation 'org.junit.jupiter:junit-jupiter'
}

// Coverage gate is intentionally disabled (minimum = 0.0) for this new plugin;
// presumably to be raised once tests land — TODO confirm.
jacocoTestCoverageVerification {
dependsOn jacocoTestReport
violationRules {
rule { //in addition to core projects rule
limit {
minimum = 0.0
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package org.opensearch.dataprepper.plugins.source.kinesis;

/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Size;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;

import java.util.Map;
import java.util.Optional;
import java.util.UUID;

/**
 * AWS authentication settings for the Kinesis source. When {@code sts_role_arn} is
 * configured, credentials are obtained by assuming that IAM role via STS (optionally
 * with an external ID and STS header overrides); otherwise the SDK default
 * credentials provider chain is used.
 */
public class AwsAuthenticationOptions {
    private static final String AWS_IAM_ROLE = "role";
    private static final String AWS_IAM = "iam";

    @JsonProperty("region")
    @Size(min = 1, message = "Region cannot be empty string")
    private String awsRegion;

    // Fixed: the message previously said "between 1 and 2048" while min is 20.
    @JsonProperty("sts_role_arn")
    @Size(min = 20, max = 2048, message = "awsStsRoleArn length should be between 20 and 2048 characters")
    private String awsStsRoleArn;

    @JsonProperty("sts_external_id")
    @Size(min = 2, max = 1224, message = "awsStsExternalId length should be between 2 and 1224 characters")
    private String awsStsExternalId;

    @JsonProperty("sts_header_overrides")
    @Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to override")
    private Map<String, String> awsStsHeaderOverrides;

    /** @return the configured region, or {@code null} if none was set */
    public Region getAwsRegion() {
        return awsRegion != null ? Region.of(awsRegion) : null;
    }

    public String getAwsStsRoleArn() {
        return awsStsRoleArn;
    }

    public String getAwsStsExternalId() {
        return awsStsExternalId;
    }

    public Map<String, String> getAwsStsHeaderOverrides() {
        return awsStsHeaderOverrides;
    }

    /**
     * Builds the credentials provider for all AWS clients created by this source.
     *
     * @return an STS assume-role provider when {@code sts_role_arn} is set,
     *         otherwise the default credentials provider chain
     * @throws IllegalArgumentException if the configured role ARN is malformed or not an IAM role
     */
    public AwsCredentialsProvider authenticateAwsConfiguration() {

        final AwsCredentialsProvider awsCredentialsProvider;
        if (awsStsRoleArn != null && !awsStsRoleArn.isEmpty()) {

            validateStsRoleArn();

            final StsClient stsClient = StsClient.builder()
                    .region(getAwsRegion())
                    .build();

            // NOTE(review): the "aws-secret-" session-name prefix looks copied from another
            // plugin — confirm whether a kinesis-specific prefix is intended.
            AssumeRoleRequest.Builder assumeRoleRequestBuilder = AssumeRoleRequest.builder()
                    .roleSessionName("aws-secret-" + UUID.randomUUID())
                    .roleArn(awsStsRoleArn);

            // Fixed: sts_external_id and sts_header_overrides were accepted from the
            // configuration but never applied to the assume-role request.
            if (awsStsExternalId != null && !awsStsExternalId.isEmpty()) {
                assumeRoleRequestBuilder = assumeRoleRequestBuilder.externalId(awsStsExternalId);
            }
            if (awsStsHeaderOverrides != null && !awsStsHeaderOverrides.isEmpty()) {
                assumeRoleRequestBuilder = assumeRoleRequestBuilder.overrideConfiguration(
                        configuration -> awsStsHeaderOverrides.forEach(configuration::putHeader));
            }

            awsCredentialsProvider = StsAssumeRoleCredentialsProvider.builder()
                    .stsClient(stsClient)
                    .refreshRequest(assumeRoleRequestBuilder.build())
                    .build();

        } else {
            // No role configured: fall back to the SDK default credential provider chain.
            awsCredentialsProvider = DefaultCredentialsProvider.create();
        }

        return awsCredentialsProvider;
    }

    // Ensures the ARN is a well-formed IAM role ARN (service "iam", resource type "role").
    private void validateStsRoleArn() {
        final Arn arn = getArn();
        if (!AWS_IAM.equals(arn.service())) {
            throw new IllegalArgumentException("sts_role_arn must be an IAM Role");
        }
        final Optional<String> resourceType = arn.resource().resourceType();
        if (resourceType.isEmpty() || !resourceType.get().equals(AWS_IAM_ROLE)) {
            throw new IllegalArgumentException("sts_role_arn must be an IAM Role");
        }
    }

    // Parses the configured role ARN, converting SDK parse failures into a clear config error.
    private Arn getArn() {
        try {
            return Arn.fromString(awsStsRoleArn);
        } catch (final Exception e) {
            throw new IllegalArgumentException(String.format("Invalid ARN format for awsStsRoleArn. Check the format of %s", awsStsRoleArn));
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package org.opensearch.dataprepper.plugins.source.kinesis;

import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
import software.amazon.awssdk.services.kinesis.model.StreamDescription;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy;
import software.amazon.kinesis.processor.MultiStreamTracker;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;


/**
 * {@link MultiStreamTracker} implementation that resolves each configured Kinesis stream
 * into a KCL {@link StreamConfig} by describing the stream and building a multi-stream
 * identifier of the form {@code accountId:streamName:creationEpochSeconds}.
 */
public class KinesisMultiStreamTracker implements MultiStreamTracker {
    private static final String COLON = ":";

    private final KinesisAsyncClient kinesisClient;
    private final KinesisSourceConfig sourceConfig;
    // NOTE(review): applicationName is stored but not read anywhere in this class — confirm
    // whether it is needed (e.g. for lease-table naming) or can be removed.
    private final String applicationName;

    public KinesisMultiStreamTracker(KinesisAsyncClient kinesisClient, final KinesisSourceConfig sourceConfig, final String applicationName) {
        this.kinesisClient = kinesisClient;
        this.sourceConfig = sourceConfig;
        this.applicationName = applicationName;
    }

    /**
     * Builds one {@link StreamConfig} per configured stream.
     *
     * @return the stream configurations, in configuration order
     * @throws RuntimeException if describing any stream fails; if the failure was an
     *         interrupt, the thread's interrupt status is restored before rethrowing
     */
    @Override
    public List<StreamConfig> streamConfigList() {
        final List<StreamConfig> streamConfigList = new ArrayList<>();
        for (final KinesisStreamConfig kinesisStreamConfig : sourceConfig.getStreams()) {
            try {
                streamConfigList.add(getStreamConfig(kinesisStreamConfig));
            } catch (final InterruptedException e) {
                // Fixed: re-set the interrupt flag so callers up the stack can observe it,
                // instead of swallowing it inside an opaque RuntimeException.
                Thread.currentThread().interrupt();
                throw new RuntimeException(
                        "Interrupted while describing stream " + kinesisStreamConfig.getName(), e);
            } catch (final Exception e) {
                throw new RuntimeException(e);
            }
        }
        return streamConfigList;
    }

    // Combines the resolved stream identifier with the configured initial position.
    private StreamConfig getStreamConfig(KinesisStreamConfig kinesisStreamConfig) throws Exception {
        StreamIdentifier sourceStreamIdentifier = getStreamIdentifier(kinesisStreamConfig);
        return new StreamConfig(sourceStreamIdentifier,
                InitialPositionInStreamExtended.newInitialPosition(kinesisStreamConfig.getInitialPosition()));
    }

    // Describes the stream (blocking on the async call) to obtain the account id and
    // creation timestamp required by the KCL multi-stream identifier format.
    private StreamIdentifier getStreamIdentifier(KinesisStreamConfig kinesisStreamConfig) throws Exception {
        DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder()
                .streamName(kinesisStreamConfig.getName())
                .build();
        DescribeStreamResponse describeStreamResponse = kinesisClient.describeStream(describeStreamRequest).get();
        String streamIdentifierString = getStreamIdentifierString(describeStreamResponse.streamDescription());
        return StreamIdentifier.multiStreamInstance(streamIdentifierString);
    }

    // Stream ARNs have the form arn:aws:kinesis:region:accountId:stream/name,
    // so the account id is the fifth colon-separated segment (index 4).
    private String getStreamIdentifierString(StreamDescription streamDescription) {
        String accountId = streamDescription.streamARN().split(COLON)[4];
        long creationEpochSecond = streamDescription.streamCreationTimestamp().getEpochSecond();
        return String.join(COLON, accountId, streamDescription.streamName(), String.valueOf(creationEpochSecond));
    }

    /**
     * Setting the deletion policy as autodetect and release shard lease with a wait time of 10 sec
     */
    @Override
    public FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy() {
        return new FormerStreamsLeasesDeletionStrategy.AutoDetectionAndDeferredDeletionStrategy() {
            @Override
            public Duration waitPeriodToDeleteFormerStreams() {
                return Duration.ofSeconds(10);
            }
        };

    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.kinesis;

import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.source.kinesis.leader.LeaderScheduler;
import org.opensearch.dataprepper.plugins.source.kinesis.leader.ShardManager;
import org.opensearch.dataprepper.plugins.source.kinesis.stream.ShardConsumerFactory;
import org.opensearch.dataprepper.plugins.source.kinesis.stream.StreamScheduler;
import org.opensearch.dataprepper.plugins.source.kinesis.utils.BackoffCalculator;
import org.opensearch.dataprepper.plugins.source.kinesis.utils.KinesisSourceAggregateMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.KinesisClientUtil;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Orchestrates the Kinesis source: builds the AWS clients from the source configuration,
 * then runs two long-lived scheduler threads — a leader scheduler (coordination /
 * initialization) and a stream scheduler (shard consumption into the Data Prepper buffer).
 */
public class KinesisService {

    private static final Logger LOG = LoggerFactory.getLogger(KinesisService.class);

    private final EnhancedSourceCoordinator coordinator;

    private final DynamoDbAsyncClient dynamoDbClient;

    private final KinesisSourceConfig kinesisSourceConfig;

    private final KinesisAsyncClient kinesisClient;

    private final ShardManager shardManager;

    private final ExecutorService executor;

    private final PluginMetrics pluginMetrics;

    private final KinesisSourceAggregateMetrics kinesisSourceAggregateMetrics;

    private final AcknowledgementSetManager acknowledgementSetManager;

    public KinesisService(final EnhancedSourceCoordinator coordinator,
                          final KinesisSourceConfig sourceConfig,
                          final PluginMetrics pluginMetrics,
                          final AcknowledgementSetManager acknowledgementSetManager) {
        this.coordinator = coordinator;
        this.pluginMetrics = pluginMetrics;
        this.acknowledgementSetManager = acknowledgementSetManager;
        this.kinesisSourceConfig = sourceConfig;
        this.kinesisSourceAggregateMetrics = new KinesisSourceAggregateMetrics();

        // Initialize AWS clients. DynamoDB backs the coordination/lease state; the Kinesis
        // async client is tuned via KinesisClientUtil per KCL recommendations.
        dynamoDbClient = DynamoDbAsyncClient.builder()
                .credentialsProvider(sourceConfig.getAwsAuthenticationOptions().authenticateAwsConfiguration())
                .region(sourceConfig.getAwsAuthenticationOptions().getAwsRegion())
                .build();
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(
                KinesisAsyncClient.builder()
                        .credentialsProvider(sourceConfig.getAwsAuthenticationOptions().authenticateAwsConfiguration())
                        .region(sourceConfig.getAwsAuthenticationOptions().getAwsRegion())
        );

        // A shard manager is responsible to retrieve the shard information from streams.
        shardManager = new ShardManager(kinesisClient, kinesisSourceAggregateMetrics);
        // One thread per scheduler submitted in start().
        executor = Executors.newFixedThreadPool(2);
    }

    /**
     * Starts the two long-running scheduler threads.
     * The data will be guaranteed to be sent to {@link Buffer} in order.
     *
     * @param buffer Data Prepper Buffer
     */
    public void start(Buffer<Record<Event>> buffer) {

        LOG.info("Start running Kinesis service");

        // NOTE(review): only the first configured stream is passed to the consumer factory
        // even though the config supports multiple streams — confirm this is intentional.
        ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, kinesisClient, pluginMetrics, kinesisSourceAggregateMetrics, buffer, kinesisSourceConfig.getStreams().get(0));
        Runnable streamScheduler = new StreamScheduler(coordinator, consumerFactory, pluginMetrics, acknowledgementSetManager, kinesisSourceConfig, new BackoffCalculator(false));
        // leader scheduler will handle the initialization
        Runnable leaderScheduler = new LeaderScheduler(coordinator, dynamoDbClient, shardManager, kinesisSourceConfig);

        // Schedulers run for the lifetime of the service; shutdown() interrupts them.
        executor.submit(leaderScheduler);
        executor.submit(streamScheduler);
    }

    /**
     * Interrupt the running of schedulers.
     * Each scheduler must implement logic for gracefully shutdown.
     */
    public void shutdown() {
        // Fixed: log message said "DynamoDB schedulers" — copy-paste from the DynamoDB source.
        LOG.info("Shutting down Kinesis schedulers");
        executor.shutdownNow();
    }

}
Loading

0 comments on commit 2210184

Please sign in to comment.