Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Souvik Bose <[email protected]>
  • Loading branch information
sbose2k21 committed Sep 11, 2024
1 parent d3a115b commit e5132d9
Show file tree
Hide file tree
Showing 12 changed files with 394 additions and 108 deletions.
3 changes: 2 additions & 1 deletion data-prepper-plugins/kinesis-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ plugins {
dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:common')
implementation project(path: ':data-prepper-plugins:buffer-common')
implementation project(path: ':data-prepper-plugins:aws-plugin-api')
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'io.micrometer:micrometer-core'
implementation 'software.amazon.kinesis:amazon-kinesis-client:2.6.0'
compileOnly 'org.projectlombok:lombok:1.18.20'
annotationProcessor 'org.projectlombok:lombok:1.18.20'
implementation project(path: ':data-prepper-plugins:aws-plugin-api')

testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.kinesis.extension.KinesisLeaseConfig;
Expand Down Expand Up @@ -53,18 +57,18 @@ public class KinesisService {
private final String kclMetricsNamespaceName;
private final String pipelineName;
private final AcknowledgementSetManager acknowledgementSetManager;
private final KinesisSourceConfig sourceConfig;
private final KinesisSourceConfig kinesisSourceConfig;
private final KinesisAsyncClient kinesisClient;
private final DynamoDbAsyncClient dynamoDbClient;
private final CloudWatchAsyncClient cloudWatchClient;
private final WorkerIdentifierGenerator workerIdentifierGenerator;
private final InputCodec codec;

@Setter
private Scheduler scheduler;

private final ExecutorService executorService;

public KinesisService(final KinesisSourceConfig sourceConfig,
public KinesisService(final KinesisSourceConfig kinesisSourceConfig,
final KinesisClientFactory kinesisClientFactory,
final PluginMetrics pluginMetrics,
final PluginFactory pluginFactory,
Expand All @@ -73,7 +77,7 @@ public KinesisService(final KinesisSourceConfig sourceConfig,
final KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier,
final WorkerIdentifierGenerator workerIdentifierGenerator
){
this.sourceConfig = sourceConfig;
this.kinesisSourceConfig = kinesisSourceConfig;
this.pluginMetrics = pluginMetrics;
this.pluginFactory = pluginFactory;
this.acknowledgementSetManager = acknowledgementSetManager;
Expand All @@ -85,21 +89,24 @@ public KinesisService(final KinesisSourceConfig sourceConfig,
this.tableName = kinesisLeaseConfig.getLeaseCoordinationTable().getTableName();
this.kclMetricsNamespaceName = this.tableName;
this.dynamoDbClient = kinesisClientFactory.buildDynamoDBClient(kinesisLeaseConfig.getLeaseCoordinationTable().getAwsRegion());
this.kinesisClient = kinesisClientFactory.buildKinesisAsyncClient(sourceConfig.getAwsAuthenticationConfig().getAwsRegion());
this.kinesisClient = kinesisClientFactory.buildKinesisAsyncClient(kinesisSourceConfig.getAwsAuthenticationConfig().getAwsRegion());
this.cloudWatchClient = kinesisClientFactory.buildCloudWatchAsyncClient(kinesisLeaseConfig.getLeaseCoordinationTable().getAwsRegion());
this.pipelineName = pipelineDescription.getPipelineName();
this.applicationName = pipelineName;
this.workerIdentifierGenerator = workerIdentifierGenerator;
this.executorService = Executors.newFixedThreadPool(1);
final PluginModel codecConfiguration = kinesisSourceConfig.getCodec();
final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings());
this.codec = pluginFactory.loadPlugin(InputCodec.class, codecPluginSettings);
}

public void start(final Buffer<Record<Event>> buffer) {
if (buffer == null) {
throw new IllegalStateException("Buffer provided is null");
throw new IllegalStateException("Buffer provided is null.");
}

if (sourceConfig.getStreams() == null || sourceConfig.getStreams().isEmpty()) {
throw new IllegalStateException("Streams are empty!");
if (kinesisSourceConfig.getStreams() == null || kinesisSourceConfig.getStreams().isEmpty()) {
throw new InvalidPluginConfigurationException("No Kinesis streams provided.");
}

scheduler = getScheduler(buffer);
Expand Down Expand Up @@ -129,31 +136,30 @@ public Scheduler getScheduler(final Buffer<Record<Event>> buffer) {

public Scheduler createScheduler(final Buffer<Record<Event>> buffer) {
final ShardRecordProcessorFactory processorFactory = new KinesisShardRecordProcessorFactory(
buffer, sourceConfig, acknowledgementSetManager, pluginMetrics, pluginFactory);
buffer, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, codec);

ConfigsBuilder configsBuilder =
new ConfigsBuilder(
new KinesisMultiStreamTracker(kinesisClient, sourceConfig, applicationName),
new KinesisMultiStreamTracker(kinesisClient, kinesisSourceConfig, applicationName),
applicationName, kinesisClient, dynamoDbClient, cloudWatchClient,
workerIdentifierGenerator.generate(), processorFactory
)
.tableName(tableName)
.namespace(kclMetricsNamespaceName);

ConsumerStrategy consumerStrategy = sourceConfig.getConsumerStrategy();
ConsumerStrategy consumerStrategy = kinesisSourceConfig.getConsumerStrategy();
if (consumerStrategy == ConsumerStrategy.POLLING) {
configsBuilder.retrievalConfig().retrievalSpecificConfig(
new PollingConfig(kinesisClient)
.maxRecords(sourceConfig.getPollingConfig().getMaxPollingRecords())
.maxRecords(kinesisSourceConfig.getPollingConfig().getMaxPollingRecords())
.idleTimeBetweenReadsInMillis(
sourceConfig.getPollingConfig().getIdleTimeBetweenReads().toMillis()));
kinesisSourceConfig.getPollingConfig().getIdleTimeBetweenReads().toMillis()));
}

return new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig()
.billingMode(BillingMode.PAY_PER_REQUEST),
configsBuilder.leaseManagementConfig().billingMode(BillingMode.PAY_PER_REQUEST),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 *
 */

package org.opensearch.dataprepper.plugins.kinesis.source.converter;

import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Converts raw {@link KinesisClientRecord}s into Data Prepper {@link Record}s of
 * {@link Event} by decoding each record's payload with the configured {@link InputCodec}.
 * A single Kinesis record may yield zero or more events, depending on the codec.
 */
public class KinesisRecordConverter {

    private final InputCodec codec;

    public KinesisRecordConverter(final InputCodec codec) {
        this.codec = codec;
    }

    /**
     * Decodes every Kinesis record in the batch into events, preserving input order.
     *
     * @param kinesisClientRecords batch received from the KCL; may be empty
     * @return all events produced by the codec across the batch
     * @throws IOException if the codec fails to parse a record's payload
     */
    public List<Record<Event>> convert(final List<KinesisClientRecord> kinesisClientRecords) throws IOException {
        final List<Record<Event>> records = new ArrayList<>();
        for (KinesisClientRecord kinesisClientRecord : kinesisClientRecords) {
            processRecord(kinesisClientRecord, records::add);
        }
        return records;
    }

    /**
     * Copies one record's payload into memory and streams it through the codec.
     */
    private void processRecord(final KinesisClientRecord record, final Consumer<Record<Event>> eventConsumer) throws IOException {
        // Read from a read-only duplicate so the record's own ByteBuffer position is
        // left untouched; otherwise a second convert() over the same records would
        // see remaining() == 0 and silently produce no events.
        final ByteBuffer data = record.data().asReadOnlyBuffer();
        final byte[] payload = new byte[data.remaining()];
        data.get(payload);
        // try-with-resources for resource discipline (a no-op for
        // ByteArrayInputStream, but safe if the stream type ever changes).
        try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(payload)) {
            codec.parse(byteArrayInputStream, eventConsumer);
        }
    }
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.kinesis.source.processor;

import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.event.EventMetadata;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.kinesis.source.KinesisSource;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisStreamConfig;
import org.opensearch.dataprepper.plugins.kinesis.source.converter.KinesisRecordConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.kinesis.common.StreamIdentifier;
Expand All @@ -27,56 +35,52 @@
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.RecordProcessorCheckpointer;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

public class KinesisRecordProcessor implements ShardRecordProcessor {
private static final Logger LOG = LoggerFactory.getLogger(KinesisSource.class);
private final StreamIdentifier streamIdentifier;
private final KinesisStreamConfig kinesisStreamConfig;
private final Duration checkpointInterval;
private final KinesisSourceConfig kinesisSourceConfig;
private final Buffer<Record<Event>> buffer;
private final BufferAccumulator<Record<Event>> bufferAccumulator;
private final KinesisRecordConverter kinesisRecordConverter;
private String kinesisShardId;
private final InputCodec codec;
private long lastCheckpointTimeInMillis;
private final int bufferTimeoutMillis;
private final AcknowledgementSetManager acknowledgementSetManager;
private final Counter acknowledgementSetCallbackCounter;
private final Counter acknowledgementSetSuccesses;
private final Counter acknowledgementSetFailures;
private final Counter recordProcessingErrors;
private final Counter checkpointFailures;
private static final Duration ACKNOWLEDGEMENT_SET_TIMEOUT = Duration.ofSeconds(20);
public static final String ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME = "acknowledgementSetCallbackCounter";
public static final String ACKNOWLEDGEMENT_SET_SUCCESS_METRIC_NAME = "acknowledgementSetSuccesses";
public static final String ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME = "acknowledgementSetFailures";
public static final String KINESIS_RECORD_PROCESSING_ERRORS = "recordProcessingErrors";
public static final String KINESIS_CHECKPOINT_FAILURES = "checkpointFailures";
public static final String KINESIS_STREAM_TAG_KEY = "stream";

public KinesisRecordProcessor(Buffer<Record<Event>> buffer,
public KinesisRecordProcessor(final BufferAccumulator<Record<Event>> bufferAccumulator,
final KinesisSourceConfig kinesisSourceConfig,
final AcknowledgementSetManager acknowledgementSetManager,
final PluginMetrics pluginMetrics,
final PluginFactory pluginFactory,
final KinesisRecordConverter kinesisRecordConverter,
final StreamIdentifier streamIdentifier) {
this.bufferTimeoutMillis = (int) kinesisSourceConfig.getBufferTimeout().toMillis();
this.streamIdentifier = streamIdentifier;
this.kinesisSourceConfig = kinesisSourceConfig;
this.kinesisStreamConfig = getStreamConfig(kinesisSourceConfig);
final PluginModel codecConfiguration = kinesisSourceConfig.getCodec();
final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings());
this.codec = pluginFactory.loadPlugin(InputCodec.class, codecPluginSettings);
this.kinesisRecordConverter = kinesisRecordConverter;
this.acknowledgementSetManager = acknowledgementSetManager;
this.acknowledgementSetCallbackCounter = pluginMetrics.counterWithTags(ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName());
this.acknowledgementSetSuccesses = pluginMetrics.counterWithTags(ACKNOWLEDGEMENT_SET_SUCCESS_METRIC_NAME, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName());
this.acknowledgementSetFailures = pluginMetrics.counterWithTags(ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName());
this.recordProcessingErrors = pluginMetrics.counterWithTags(KINESIS_RECORD_PROCESSING_ERRORS, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName());
this.checkpointFailures = pluginMetrics.counterWithTags(KINESIS_CHECKPOINT_FAILURES, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName());
this.checkpointInterval = kinesisStreamConfig.getCheckPointInterval();
this.buffer = buffer;
this.bufferAccumulator = bufferAccumulator;
}

private KinesisStreamConfig getStreamConfig(final KinesisSourceConfig kinesisSourceConfig) {
Expand All @@ -93,41 +97,46 @@ public void initialize(InitializationInput initializationInput) {

private AcknowledgementSet createAcknowledgmentSet(final ProcessRecordsInput processRecordsInput) {
return acknowledgementSetManager.create((result) -> {
acknowledgementSetCallbackCounter.increment();
if (result) {
LOG.info("acknowledgements received");
acknowledgementSetSuccesses.increment();
LOG.info("acknowledgements received for shardId {}", kinesisShardId);
checkpoint(processRecordsInput.checkpointer());
} else {
LOG.info("acknowledgements received with false");
acknowledgementSetFailures.increment();
LOG.info("acknowledgements received with false for shardId {}", kinesisShardId);
}

}, ACKNOWLEDGEMENT_SET_TIMEOUT);
}

@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
List<Record<Event>> records = new ArrayList<>();

try {
Optional<AcknowledgementSet> acknowledgementSetOpt = Optional.empty();
boolean acknowledgementsEnabled = kinesisSourceConfig.isAcknowledgments();
if (acknowledgementsEnabled) {
acknowledgementSetOpt = Optional.of(createAcknowledgmentSet(processRecordsInput));
}

for (KinesisClientRecord record : processRecordsInput.records()) {
processRecord(record, records::add);
List<Record<Event>> records = kinesisRecordConverter.convert(processRecordsInput.records());

for (Record<Event> record: records) {
Event event = record.getData();
EventMetadata eventMetadata = event.getMetadata();
eventMetadata.setAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE,
streamIdentifier.streamName().toLowerCase());
bufferAccumulator.add(record);
}

acknowledgementSetOpt.ifPresent(acknowledgementSet -> records.forEach(record -> acknowledgementSet.add(record.getData())));

buffer.writeAll(records, bufferTimeoutMillis);
LOG.debug("Records written to buffer: {}", records.size());

acknowledgementSetOpt.ifPresent(AcknowledgementSet::complete);

// Checkpoint for shard
if (kinesisStreamConfig.isEnableCheckPoint() && System.currentTimeMillis() - lastCheckpointTimeInMillis > checkpointInterval.toMillis()) {
LOG.info("Regular checkpointing for shard " + kinesisShardId);
if (shouldCheckpointAfterBufferWrite()) {
LOG.debug("Regular checkpointing for shard {}", kinesisShardId);
checkpoint(processRecordsInput.checkpointer());
lastCheckpointTimeInMillis = System.currentTimeMillis();
}
Expand All @@ -137,14 +146,6 @@ public void processRecords(ProcessRecordsInput processRecordsInput) {
}
}

private void processRecord(KinesisClientRecord record, Consumer<Record<Event>> eventConsumer) throws IOException {
// Read bytebuffer
byte[] arr = new byte[record.data().remaining()];
record.data().get(arr);
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(arr);
codec.parse(byteArrayInputStream, eventConsumer);
}

@Override
public void leaseLost(LeaseLostInput leaseLostInput) {
LOG.debug("Lease Lost");
Expand All @@ -170,4 +171,10 @@ private void checkpoint(RecordProcessorCheckpointer checkpointer) {
checkpointFailures.increment();
}
}

private boolean shouldCheckpointAfterBufferWrite() {
return !kinesisSourceConfig.isAcknowledgments() &&
kinesisStreamConfig.isEnableCheckPoint() &&
System.currentTimeMillis() - lastCheckpointTimeInMillis >= checkpointInterval.toMillis();
}
}
Loading

0 comments on commit e5132d9

Please sign in to comment.