From 54873bff9f329f98704e9b7a4263aa41d9184678 Mon Sep 17 00:00:00 2001 From: Souvik Bose Date: Tue, 24 Sep 2024 14:25:02 -0700 Subject: [PATCH] Add integration test for Kinesis source (#4967) Signed-off-by: Souvik Bose --- .../kinesis-source/build.gradle | 32 +++ .../kinesis/source/KinesisSourceIT.java | 270 ++++++++++++++++++ .../source/ingester/KinesisIngester.java | 227 +++++++++++++++ .../kinesis/source/util/TestIDGenerator.java | 21 ++ .../processor/KinesisRecordProcessor.java | 2 +- .../aws-testing-cdk/bin/aws-testing-cdk.ts | 5 + .../lib/kinesis/KinesisSourceStack.ts | 79 +++++ 7 files changed, 635 insertions(+), 1 deletion(-) create mode 100644 data-prepper-plugins/kinesis-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisSourceIT.java create mode 100644 data-prepper-plugins/kinesis-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/kinesis/source/ingester/KinesisIngester.java create mode 100644 data-prepper-plugins/kinesis-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/kinesis/source/util/TestIDGenerator.java create mode 100644 testing/aws-testing-cdk/lib/kinesis/KinesisSourceStack.ts diff --git a/data-prepper-plugins/kinesis-source/build.gradle b/data-prepper-plugins/kinesis-source/build.gradle index c4a0614e36..5214c9c8ab 100644 --- a/data-prepper-plugins/kinesis-source/build.gradle +++ b/data-prepper-plugins/kinesis-source/build.gradle @@ -7,6 +7,25 @@ plugins { id 'java' } +sourceSets { + integrationTest { + java { + compileClasspath += main.output + test.output + runtimeClasspath += main.output + test.output + srcDir file('src/integrationTest/java') + } + } +} + +test { + useJUnitPlatform() +} + +configurations { + integrationTestImplementation.extendsFrom testImplementation + integrationTestRuntime.extendsFrom testRuntime +} + dependencies { implementation project(':data-prepper-api') implementation project(':data-prepper-plugins:common') @@ -30,6 +49,19 @@ dependencies { 
testImplementation project(':data-prepper-plugins:newline-codecs') } +task integrationTest(type: Test) { + group = 'verification' + testClassesDirs = sourceSets.integrationTest.output.classesDirs + useJUnitPlatform() + + classpath = sourceSets.integrationTest.runtimeClasspath + systemProperty 'tests.kinesis.source.aws.region', System.getProperty('tests.kinesis.source.aws.region') + + filter { + includeTestsMatching '*IT' + } +} + jacocoTestCoverageVerification { dependsOn jacocoTestReport violationRules { diff --git a/data-prepper-plugins/kinesis-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisSourceIT.java b/data-prepper-plugins/kinesis-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisSourceIT.java new file mode 100644 index 0000000000..028f73ad9f --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisSourceIT.java @@ -0,0 +1,270 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ * + */ + +package org.opensearch.dataprepper.plugins.kinesis.source; + +import io.micrometer.core.instrument.Counter; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opensearch.dataprepper.event.TestEventFactory; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.codec.InputCodec; +import org.opensearch.dataprepper.model.configuration.PipelineDescription; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.codec.json.NdjsonInputCodec; +import org.opensearch.dataprepper.plugins.codec.json.NdjsonInputConfig; +import org.opensearch.dataprepper.plugins.kinesis.extension.KinesisLeaseConfig; +import org.opensearch.dataprepper.plugins.kinesis.extension.KinesisLeaseConfigSupplier; +import org.opensearch.dataprepper.plugins.kinesis.extension.KinesisLeaseCoordinationTableConfig; +import org.opensearch.dataprepper.plugins.kinesis.source.configuration.AwsAuthenticationConfig; +import org.opensearch.dataprepper.plugins.kinesis.source.configuration.ConsumerStrategy; +import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig; +import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisStreamConfig; +import org.opensearch.dataprepper.plugins.kinesis.source.ingester.KinesisIngester; +import org.opensearch.dataprepper.plugins.kinesis.source.util.TestIDGenerator; +import software.amazon.awssdk.regions.Region; +import 
software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.dynamodb.DynamoDbClient; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.kinesis.common.InitialPositionInStream; +import software.amazon.kinesis.common.KinesisClientUtil; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; + +import static org.awaitility.Awaitility.await; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyCollection; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME; +import static org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.ACKNOWLEDGEMENT_SET_SUCCESS_METRIC_NAME; +import static org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.KINESIS_RECORD_PROCESSED; +import static org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.KINESIS_RECORD_PROCESSING_ERRORS; +import static org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.KINESIS_CHECKPOINT_FAILURES; +import static 
org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.KINESIS_STREAM_TAG_KEY; + +public class KinesisSourceIT { + private static final String AWS_REGION = "tests.kinesis.source.aws.region"; + private static final String STREAM_NAME_PREFIX = "Stream1"; + private static final String PIPELINE_NAME = "pipeline-id"; + private static final Duration CHECKPOINT_INTERVAL = Duration.ofMillis(0); + private static final String codec_plugin_name = "ndjson"; + private static final String LEASE_TABLE_PREFIX = "kinesis-lease-table"; + private static final int NUMBER_OF_RECORDS_TO_ACCUMULATE = 10; + + @Mock + private AcknowledgementSetManager acknowledgementSetManager; + + @Mock + private KinesisClientFactory kinesisClientFactory; + + @Mock + private Buffer> buffer; + + @Mock + private KinesisSourceConfig kinesisSourceConfig; + + @Mock + private AwsAuthenticationConfig awsAuthenticationConfig; + + @Mock + private KinesisStreamConfig kinesisStreamConfig; + + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private PluginFactory pluginFactory; + + @Mock + private PipelineDescription pipelineDescription; + + @Mock + private KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier; + + @Mock + private KinesisLeaseConfig kinesisLeaseConfig; + + @Mock + private WorkerIdentifierGenerator workerIdentifierGenerator; + + @Mock + private KinesisLeaseCoordinationTableConfig kinesisLeaseCoordinationTableConfig; + + @Mock + private InputCodec codec; + + @Mock + private Counter acknowledgementSetSuccesses; + + @Mock + private Counter acknowledgementSetFailures; + + @Mock + private Counter recordsProcessed; + + @Mock + private Counter recordProcessingErrors; + + @Mock + private Counter checkpointFailures; + + private KinesisClient kinesisClient; + + private DynamoDbClient dynamoDbClient; + + private KinesisIngester kinesisIngester; + + @BeforeEach + void setup() { + MockitoAnnotations.openMocks(this); + + 
when(awsAuthenticationConfig.getAwsRegion()).thenReturn(Region.of(System.getProperty(AWS_REGION))); + when(awsAuthenticationConfig.getAwsStsRoleArn()).thenReturn(UUID.randomUUID().toString()); + when(awsAuthenticationConfig.getAwsStsExternalId()).thenReturn(UUID.randomUUID().toString()); + final Map stsHeaderOverrides = Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + when(awsAuthenticationConfig.getAwsStsHeaderOverrides()).thenReturn(stsHeaderOverrides); + when(kinesisSourceConfig.getAwsAuthenticationConfig()).thenReturn(awsAuthenticationConfig); + + String testId = TestIDGenerator.generateRandomTestID(); + String streamName = STREAM_NAME_PREFIX + "_" + testId; + String leaseTableName = LEASE_TABLE_PREFIX + "_" + testId; + + when(kinesisStreamConfig.getName()).thenReturn(streamName); + when(kinesisStreamConfig.getCheckPointInterval()).thenReturn(CHECKPOINT_INTERVAL); + when(kinesisStreamConfig.getInitialPosition()).thenReturn(InitialPositionInStream.TRIM_HORIZON); + when(kinesisSourceConfig.getConsumerStrategy()).thenReturn(ConsumerStrategy.ENHANCED_FAN_OUT); + when(kinesisSourceConfig.getStreams()).thenReturn(List.of(kinesisStreamConfig)); + when(kinesisLeaseConfig.getLeaseCoordinationTable()).thenReturn(kinesisLeaseCoordinationTableConfig); + when(kinesisLeaseCoordinationTableConfig.getTableName()).thenReturn(leaseTableName); + when(kinesisLeaseCoordinationTableConfig.getRegion()).thenReturn(System.getProperty(AWS_REGION)); + when(kinesisLeaseCoordinationTableConfig.getAwsRegion()).thenReturn(Region.of(System.getProperty(AWS_REGION))); + when(kinesisLeaseConfigSupplier.getKinesisExtensionLeaseConfig()).thenReturn(Optional.ofNullable(kinesisLeaseConfig)); + + PluginModel pluginModel = mock(PluginModel.class); + when(pluginModel.getPluginName()).thenReturn(codec_plugin_name); + when(pluginModel.getPluginSettings()).thenReturn(Collections.emptyMap()); + when(kinesisSourceConfig.getCodec()).thenReturn(pluginModel); + 
when(kinesisSourceConfig.isAcknowledgments()).thenReturn(false); + when(kinesisSourceConfig.getNumberOfRecordsToAccumulate()).thenReturn(NUMBER_OF_RECORDS_TO_ACCUMULATE); + when(kinesisSourceConfig.getBufferTimeout()).thenReturn(Duration.ofMillis(1)); + + kinesisClientFactory = mock(KinesisClientFactory.class); + when(kinesisClientFactory.buildDynamoDBClient(kinesisLeaseCoordinationTableConfig.getAwsRegion())).thenReturn(DynamoDbAsyncClient.builder() + .region(Region.of(System.getProperty(AWS_REGION))) + .build()); + when(kinesisClientFactory.buildKinesisAsyncClient(awsAuthenticationConfig.getAwsRegion())).thenReturn(KinesisClientUtil.createKinesisAsyncClient( + KinesisAsyncClient.builder() + .region(Region.of(System.getProperty(AWS_REGION))) + )); + when(kinesisClientFactory.buildCloudWatchAsyncClient(kinesisLeaseCoordinationTableConfig.getAwsRegion())).thenReturn(CloudWatchAsyncClient.builder() + .region(Region.of(System.getProperty(AWS_REGION))) + .build()); + + when(pipelineDescription.getPipelineName()).thenReturn(PIPELINE_NAME); + + final String workerId = "worker-identifier-" + testId; + when(workerIdentifierGenerator.generate()).thenReturn(workerId); + + when(pluginMetrics.counterWithTags(ACKNOWLEDGEMENT_SET_SUCCESS_METRIC_NAME, KINESIS_STREAM_TAG_KEY, streamName)) + .thenReturn(acknowledgementSetSuccesses); + + when(pluginMetrics.counterWithTags(ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME, KINESIS_STREAM_TAG_KEY, streamName)) + .thenReturn(acknowledgementSetFailures); + + when(pluginMetrics.counterWithTags(KINESIS_RECORD_PROCESSED, KINESIS_STREAM_TAG_KEY, streamName)) + .thenReturn(recordsProcessed); + + when(pluginMetrics.counterWithTags(KINESIS_RECORD_PROCESSING_ERRORS, KINESIS_STREAM_TAG_KEY, streamName)) + .thenReturn(recordProcessingErrors); + + when(pluginMetrics.counterWithTags(KINESIS_CHECKPOINT_FAILURES, KINESIS_STREAM_TAG_KEY, streamName)) + .thenReturn(checkpointFailures); + + kinesisClient = 
KinesisClient.builder().region(Region.of(System.getProperty(AWS_REGION))).build(); + dynamoDbClient = DynamoDbClient.builder().region(Region.of(System.getProperty(AWS_REGION))).build(); + kinesisIngester = new KinesisIngester(kinesisClient, streamName, dynamoDbClient, leaseTableName); + + kinesisIngester.createStream(); + kinesisIngester.createLeaseTable(); + } + + @AfterEach + void cleanup() { + kinesisIngester.deleteLeaseTable(); + kinesisIngester.deleteStream(); + } + + @Test + public void testKinesisService() throws Exception { + when(pluginFactory.loadPlugin(eq(InputCodec.class), any())) + .thenReturn(new NdjsonInputCodec(new NdjsonInputConfig(), TestEventFactory.getTestEventFactory())); + + final List> actualRecordsWritten = new ArrayList<>(); + doAnswer(a -> actualRecordsWritten.addAll(a.getArgument(0, Collection.class))) + .when(buffer).writeAll(anyCollection(), anyInt()); + + // Send data to stream + final List logStream = new ArrayList<>(); + int numberOfRecords = 20; + for (int i = 1; i <= numberOfRecords; i++) { + logStream.add("input record" + i); + } + kinesisIngester.ingest(logStream); + + KinesisService kinesisService = new KinesisService( + kinesisSourceConfig, + kinesisClientFactory, + pluginMetrics, + pluginFactory, + pipelineDescription, + acknowledgementSetManager, + kinesisLeaseConfigSupplier, + workerIdentifierGenerator + ); + + kinesisService.start(buffer); + + await().atMost(Duration.ofSeconds(150)).untilAsserted( + () -> { + verify(buffer, atLeastOnce()).writeAll(anyCollection(), anyInt()); + assertThat(actualRecordsWritten, notNullValue()); + assertThat(actualRecordsWritten.size(), equalTo(numberOfRecords)); + } + ); + + kinesisService.shutDown(); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/kinesis-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/kinesis/source/ingester/KinesisIngester.java 
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 *
 */

package org.opensearch.dataprepper.plugins.kinesis.source.ingester;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
import software.amazon.awssdk.services.dynamodb.model.TableStatus;
import software.amazon.awssdk.services.dynamodb.waiters.DynamoDbWaiter;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
import software.amazon.awssdk.services.kinesis.model.KinesisException;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.awssdk.services.kinesis.model.StreamStatus;
import software.amazon.awssdk.services.kinesis.waiters.KinesisWaiter;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

/**
 * Integration-test helper that provisions a Kinesis stream and a DynamoDB lease
 * table, ingests JSON records into the stream, and tears both resources down.
 *
 * <p>All methods block until AWS reports the resource in its expected state, so
 * callers can use the resources immediately after the method returns.
 */
public class KinesisIngester {
    private static final int SHARDS = 4;
    // Hash-key attribute the KCL expects on its lease coordination table.
    private static final String LEASE_TABLE_PARTITION_KEY = "leaseKey";
    private static final String PARTITION_KEY = "record_key";
    private static final String ATTRIBUTE_VALUE = "record";

    private final KinesisClient kinesisClient;
    private final DynamoDbClient ddbClient;
    private final String streamName;
    private final String tableName;

    private final ObjectMapper objectMapper;

    /**
     * @param kinesisClient synchronous Kinesis client for the target region
     * @param streamName    name of the stream to create/ingest into/delete
     * @param ddbClient     synchronous DynamoDB client for the target region
     * @param tableName     name of the KCL lease table to create/delete
     */
    public KinesisIngester(final KinesisClient kinesisClient, final String streamName, final DynamoDbClient ddbClient, final String tableName) {
        this.kinesisClient = kinesisClient;
        this.ddbClient = ddbClient;
        this.tableName = tableName;
        this.streamName = streamName;
        this.objectMapper = new ObjectMapper();
    }

    /**
     * Creates the stream with {@value SHARDS} shards and blocks until it is ACTIVE.
     *
     * @throws RuntimeException wrapping any {@link KinesisException}
     */
    public void createStream() {
        final CreateStreamRequest createStreamRequest = CreateStreamRequest.builder()
                .streamName(streamName)
                .shardCount(SHARDS)
                .build();
        try {
            kinesisClient.createStream(createStreamRequest);
            final KinesisWaiter kinesisWaiter = kinesisClient.waiter();

            final DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder()
                    .streamName(streamName)
                    .build();

            // SDK waiter confirms existence; the explicit poll below confirms ACTIVE.
            kinesisWaiter.waitUntilStreamExists(describeStreamRequest);
            waitForStreamToBeActive();
        } catch (final KinesisException ex) {
            throw new RuntimeException(ex);
        }
    }

    /**
     * Creates the on-demand lease table keyed by {@value LEASE_TABLE_PARTITION_KEY}
     * and blocks until it is ACTIVE.
     *
     * @throws RuntimeException wrapping any {@link DynamoDbException}
     */
    public void createLeaseTable() {
        final CreateTableRequest createTableRequest = CreateTableRequest.builder()
                .tableName(tableName)
                .keySchema(getKeySchema())
                .attributeDefinitions(getAttributeDefinitions())
                .billingMode(BillingMode.PAY_PER_REQUEST)
                .build();

        try {
            ddbClient.createTable(createTableRequest);
            final DescribeTableRequest tableRequest = DescribeTableRequest.builder()
                    .tableName(tableName)
                    .build();
            // Wait until the Amazon DynamoDB table is created, then until it is ACTIVE.
            final DynamoDbWaiter dbWaiter = ddbClient.waiter();
            dbWaiter.waitUntilTableExists(tableRequest);
            waitForTableActive();
        } catch (final DynamoDbException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Deletes the lease table (deletion completes asynchronously on the AWS side).
     *
     * @throws RuntimeException wrapping any {@link DynamoDbException}
     */
    public void deleteLeaseTable() {
        final DeleteTableRequest request = DeleteTableRequest.builder()
                .tableName(tableName)
                .build();
        try {
            ddbClient.deleteTable(request);
        } catch (final DynamoDbException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Deletes the stream, also removing any registered enhanced-fan-out consumers.
     *
     * @throws RuntimeException wrapping any {@link KinesisException}
     */
    public void deleteStream() {
        final DeleteStreamRequest deleteStreamRequest = DeleteStreamRequest.builder()
                .streamName(streamName)
                .enforceConsumerDeletion(true)
                .build();
        try {
            kinesisClient.deleteStream(deleteStreamRequest);
        } catch (final KinesisException ex) {
            throw new RuntimeException(ex);
        }
    }

    // Polls DescribeTable until the table is ACTIVE; fails fast on any other
    // terminal status, and times out after 5 minutes.
    private void waitForTableActive() {
        final DescribeTableRequest describeTableRequest = DescribeTableRequest.builder()
                .tableName(tableName)
                .build();

        await().atMost(5, TimeUnit.MINUTES).pollInterval(10, TimeUnit.SECONDS).untilAsserted(
                () -> {
                    final DescribeTableResponse describeTableResponse = ddbClient.describeTable(describeTableRequest);
                    final TableStatus tableStatus = describeTableResponse.table().tableStatus();

                    if (!TableStatus.CREATING.equals(tableStatus) && !TableStatus.ACTIVE.equals(tableStatus)) {
                        throw new RuntimeException("Table is not creating or active.");
                    }

                    assertThat(tableStatus, equalTo(TableStatus.ACTIVE));
                }
        );
    }

    // Single HASH key on the lease-key attribute, as required by the KCL.
    private Collection<KeySchemaElement> getKeySchema() {
        final List<KeySchemaElement> keySchema = new ArrayList<>();
        keySchema.add(KeySchemaElement.builder()
                .attributeName(LEASE_TABLE_PARTITION_KEY)
                .keyType(KeyType.HASH)
                .build());

        return keySchema;
    }

    // Attribute definition backing the single string hash key.
    private Collection<AttributeDefinition> getAttributeDefinitions() {
        final List<AttributeDefinition> definitions = new ArrayList<>();
        definitions.add(AttributeDefinition.builder()
                .attributeName(LEASE_TABLE_PARTITION_KEY)
                .attributeType(ScalarAttributeType.S)
                .build());

        return definitions;
    }

    // Polls DescribeStream until the stream is ACTIVE; fails fast on any other
    // terminal status, and times out after 5 minutes.
    private void waitForStreamToBeActive() {
        final DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder()
                .streamName(streamName)
                .build();

        await().atMost(5, TimeUnit.MINUTES).pollInterval(10, TimeUnit.SECONDS).untilAsserted(
                () -> {
                    final DescribeStreamResponse describeStreamResponse =
                            kinesisClient.describeStream(describeStreamRequest);
                    final StreamStatus streamStatus = describeStreamResponse
                            .streamDescription().streamStatus();

                    if (!StreamStatus.CREATING.equals(streamStatus) && !StreamStatus.ACTIVE.equals(streamStatus)) {
                        throw new RuntimeException("Stream is not creating or active.");
                    }

                    assertThat(streamStatus, equalTo(StreamStatus.ACTIVE));
                }
        );
    }

    /**
     * Puts each log line into the stream as a one-record JSON object under a
     * random partition key (random keys spread records across all shards).
     *
     * @param logs payload strings to ingest
     * @throws RuntimeException wrapping any serialization or Kinesis failure
     */
    public void ingest(final List<String> logs) {
        try {
            for (final String log : logs) {
                putRecord(UUID.randomUUID().toString(), log);
            }
        } catch (final Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    // Serializes {record_key: key, record: value} as JSON and puts it on the stream.
    private void putRecord(final String key, final String value) throws JsonProcessingException {
        final Map<String, String> item = new HashMap<>();
        item.put(PARTITION_KEY, key);
        item.put(ATTRIBUTE_VALUE, value);
        final PutRecordRequest putRecordRequest = PutRecordRequest.builder()
                .streamName(streamName)
                .partitionKey(key)
                .data(SdkBytes.fromByteArray(objectMapper.writeValueAsBytes(item)))
                .build();

        try {
            kinesisClient.putRecord(putRecordRequest);
        } catch (final Exception ex) {
            throw new RuntimeException("Put Record failed, item key: " + key, ex);
        }
    }
}
+ * + */ + +package org.opensearch.dataprepper.plugins.kinesis.source.util; + +import org.apache.commons.lang3.RandomStringUtils; + +import java.util.Locale; + +public class TestIDGenerator { + public static String generateRandomTestID() { + return RandomStringUtils.random(8, true, false).toLowerCase(Locale.ROOT); + } +} diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessor.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessor.java index 6df0760ca3..8d13596ce3 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessor.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessor.java @@ -184,7 +184,7 @@ public void processRecords(ProcessRecordsInput processRecordsInput) { kinesisCheckpointerTracker.markSequenceNumberForCheckpoint(extendedSequenceNumber); } - LOG.debug("Number of Records {} written for stream: {}, shardId: {} to buffer: {}", eventCount, streamIdentifier.streamName(), kinesisShardId, records.size()); + LOG.debug("Number of Records {} written for stream: {}, shardId: {}", eventCount, streamIdentifier.streamName(), kinesisShardId); acknowledgementSetOpt.ifPresent(AcknowledgementSet::complete); diff --git a/testing/aws-testing-cdk/bin/aws-testing-cdk.ts b/testing/aws-testing-cdk/bin/aws-testing-cdk.ts index fcdd4914ae..756b61a01f 100644 --- a/testing/aws-testing-cdk/bin/aws-testing-cdk.ts +++ b/testing/aws-testing-cdk/bin/aws-testing-cdk.ts @@ -5,6 +5,7 @@ import {GitHubAccessStack} from '../lib/common/GitHubAccessStack'; import {SecretsManagerStack} from '../lib/aws-secrets-manager/SecretsManagerStack'; import {KmsStack} from '../lib/common/KmsStack'; import {S3SinkStack} from '../lib/s3/S3SinkStack'; +import { 
KinesisSourceStack } from '../lib/kinesis/KinesisSourceStack'; const app = new cdk.App(); @@ -21,3 +22,7 @@ new SecretsManagerStack(app, 'SecretsManagerStack', { new S3SinkStack(app, 'S3SinkStack', { testingRole: githubStack.gitHubActionsTestingRole }); + +new KinesisSourceStack(app, 'KinesisSourceStack', { + testingRole: githubStack.gitHubActionsTestingRole +}); \ No newline at end of file diff --git a/testing/aws-testing-cdk/lib/kinesis/KinesisSourceStack.ts b/testing/aws-testing-cdk/lib/kinesis/KinesisSourceStack.ts new file mode 100644 index 0000000000..3007737ce6 --- /dev/null +++ b/testing/aws-testing-cdk/lib/kinesis/KinesisSourceStack.ts @@ -0,0 +1,79 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import {Stack, StackProps} from 'aws-cdk-lib'; +import {Construct} from 'constructs'; +import {Effect, Role, PolicyDocument, PolicyStatement, Policy} from 'aws-cdk-lib/aws-iam'; + +export interface KinesisSourceStackProps extends StackProps { + readonly testingRole: Role; +} + +/** + * CDK stack that creates a Kinesis stream + */ +export class KinesisSourceStack extends Stack { + + constructor(scope: Construct, id: string, props: KinesisSourceStackProps) { + super(scope, id, props); + + const kinesisPolicyDocument: PolicyDocument = new PolicyDocument({ + statements: [ + new PolicyStatement({ + actions: [ + 'kinesis:ListStreams', + 'kinesis:ListStreamConsumers', + 'kinesis:ListShards', + 'kinesis:DescribeStream', + 'kinesis:GetRecords', + 'kinesis:GetResourcePolicy', + 'kinesis:SubscribeToShard', + 'kinesis:RegisterStreamConsumer', + 'kinesis:CreateStream', + 'kinesis:DeleteStream', + 'kinesis:PutRecord', + 'kinesis:PutRecords' + ], + effect: Effect.ALLOW, + resources: ['*'], + }), + ], + }); + + const kinesisPolicy: Policy = new Policy(this, 'KinesisPolicy', { + document: kinesisPolicyDocument + }); + + const dynamodbPolicyDocument: PolicyDocument = new PolicyDocument({ + statements: [ + new PolicyStatement({ + actions: 
[ + 'dynamodb:DescribeTimeToLive', + 'dynamodb:CreateTable', + 'dynamodb:DescribeTable', + 'dynamodb:GetItem', + 'dynamodb:PutItem', + 'dynamodb:UpdateItem', + 'dynamodb:Scan', + 'dynamodb:Query', + 'dynamodb:DeleteItem', + 'dynamodb:updateContinuousBackups' + ], + effect: Effect.ALLOW, + resources: ['*'], + }), + ], + }); + + const dynamodbPolicy: Policy = new Policy(this, 'DynamoDbPolicy', { + document: dynamodbPolicyDocument + }); + + props.testingRole.attachInlinePolicy(kinesisPolicy); + props.testingRole.attachInlinePolicy(dynamodbPolicy); + + } +} +