diff --git a/data-prepper-plugins/dynamodb-source/build.gradle b/data-prepper-plugins/dynamodb-source/build.gradle
index 71b8dc2afc..d3a57de0e6 100644
--- a/data-prepper-plugins/dynamodb-source/build.gradle
+++ b/data-prepper-plugins/dynamodb-source/build.gradle
@@ -26,6 +26,7 @@ dependencies {
     implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-ion'
     implementation project(path: ':data-prepper-plugins:aws-plugin-api')
+    implementation project(path: ':data-prepper-plugins:buffer-common')
 
     testImplementation platform('org.junit:junit-bom:5.9.1')
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java
index 6b5430d997..f09b5b8ded 100644
--- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java
@@ -41,6 +41,7 @@
 import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
 import software.amazon.awssdk.services.s3.S3Client;
 
+import java.time.Duration;
 import java.time.Instant;
 import java.util.List;
 import java.util.Map;
@@ -69,6 +70,9 @@ public class DynamoDBService {
 
     private final PluginMetrics pluginMetrics;
 
+    static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60);
+    static final int DEFAULT_BUFFER_BATCH_SIZE = 1_000;
+
     public DynamoDBService(EnhancedSourceCoordinator coordinator, ClientFactory clientFactory, DynamoDBSourceConfig sourceConfig, PluginMetrics pluginMetrics) {
         this.coordinator = coordinator;
@@ -156,14 +160,13 @@ public void init() {
         Instant startTime = Instant.now();
 
         if (tableInfo.getMetadata().isExportRequired()) {
-//            exportTime = Instant.now();
             createExportPartition(tableInfo.getTableArn(), startTime, tableInfo.getMetadata().getExportBucket(), tableInfo.getMetadata().getExportPrefix());
         }
 
         if (tableInfo.getMetadata().isStreamRequired()) {
             List<String> shardIds;
             // start position by default is TRIM_HORIZON if not provided.
-            if (tableInfo.getMetadata().isExportRequired() || String.valueOf(StreamStartPosition.LATEST).equals(tableInfo.getMetadata().getStreamStartPosition())) {
+            if (tableInfo.getMetadata().isExportRequired() || tableInfo.getMetadata().getStreamStartPosition() == StreamStartPosition.LATEST) {
                 // For a continued data extraction process that involves both export and stream
                 // The export must be completed and loaded before stream can start.
                 // Moreover, there should not be any gaps between the export time and the time start reading the stream
@@ -274,15 +277,13 @@ private TableInfo getTableInfo(TableConfig tableConfig) {
             throw new InvalidPluginConfigurationException(errorMessage);
         }
         // Validate view type of DynamoDB stream
-        if (describeTableResult.table().streamSpecification() != null) {
-            String viewType = describeTableResult.table().streamSpecification().streamViewTypeAsString();
-            LOG.debug("The stream view type for table " + tableName + " is " + viewType);
-            List<String> supportedType = List.of("NEW_IMAGE", "NEW_AND_OLD_IMAGES");
-            if (!supportedType.contains(viewType)) {
-                String errorMessage = "Stream " + tableConfig.getTableArn() + " is enabled with " + viewType + ". Supported types are " + supportedType;
-                LOG.error(errorMessage);
-                throw new InvalidPluginConfigurationException(errorMessage);
-            }
+        String viewType = describeTableResult.table().streamSpecification().streamViewTypeAsString();
+        LOG.debug("The stream view type for table " + tableName + " is " + viewType);
+        List<String> supportedType = List.of("NEW_IMAGE", "NEW_AND_OLD_IMAGES");
+        if (!supportedType.contains(viewType)) {
+            String errorMessage = "Stream " + tableConfig.getTableArn() + " is enabled with " + viewType + ". Supported types are " + supportedType;
+            LOG.error(errorMessage);
+            throw new InvalidPluginConfigurationException(errorMessage);
         }
         streamStartPosition = tableConfig.getStreamConfig().getStartPosition();
     }
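The `StreamStartPosition` hunk above fixes a real comparison bug, not just style: `String.valueOf(StreamStartPosition.LATEST).equals(...)` invokes `String.equals(Object)` with an enum argument, which can never return true, so a configured `LATEST` start position was silently treated as `TRIM_HORIZON`. A minimal, self-contained demonstration of the pitfall (the enum here is a stand-in for the plugin's own `StreamStartPosition`):

```java
public class EnumComparisonDemo {
    // Stand-in for the plugin's StreamStartPosition enum.
    enum StreamStartPosition { TRIM_HORIZON, LATEST }

    public static void main(String[] args) {
        StreamStartPosition configured = StreamStartPosition.LATEST;

        // Old check: a String can never equal an enum, so this is always false.
        System.out.println(String.valueOf(StreamStartPosition.LATEST).equals(configured)); // false

        // Fixed check: enum constants are singletons, so == is safe and intent-revealing.
        System.out.println(configured == StreamStartPosition.LATEST); // true
    }
}
```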
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSource.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSource.java
index ef35c82825..ffb1809ac6 100644
--- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSource.java
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSource.java
@@ -10,7 +10,6 @@
 import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
 import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
 import org.opensearch.dataprepper.model.buffer.Buffer;
-import org.opensearch.dataprepper.model.configuration.PluginSetting;
 import org.opensearch.dataprepper.model.event.Event;
 import org.opensearch.dataprepper.model.plugin.PluginFactory;
 import org.opensearch.dataprepper.model.record.Record;
@@ -46,7 +45,7 @@ public class DynamoDBSource implements Source<Record<Event>>, UsesEnhancedSourceCoordination {
 
     @DataPrepperPluginConstructor
-    public DynamoDBSource(PluginMetrics pluginMetrics, final DynamoDBSourceConfig sourceConfig, final PluginFactory pluginFactory, final PluginSetting pluginSetting, final AwsCredentialsSupplier awsCredentialsSupplier) {
+    public DynamoDBSource(PluginMetrics pluginMetrics, final DynamoDBSourceConfig sourceConfig, final PluginFactory pluginFactory, final AwsCredentialsSupplier awsCredentialsSupplier) {
         LOG.info("Create DynamoDB Source");
         this.pluginMetrics = pluginMetrics;
         this.sourceConfig = sourceConfig;
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverter.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverter.java
index 70a6cbcf31..9f770dff69 100644
--- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverter.java
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverter.java
@@ -8,8 +8,8 @@
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.dataformat.ion.IonObjectMapper;
 import io.micrometer.core.instrument.Counter;
+import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
 import org.opensearch.dataprepper.metrics.PluginMetrics;
-import org.opensearch.dataprepper.model.buffer.Buffer;
 import org.opensearch.dataprepper.model.event.Event;
 import org.opensearch.dataprepper.model.record.Record;
 import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
@@ -18,7 +18,6 @@
 
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 public class ExportRecordConverter extends RecordConverter {
@@ -37,8 +36,8 @@ public class ExportRecordConverter extends RecordConverter {
     private final Counter exportRecordSuccessCounter;
     private final Counter exportRecordErrorCounter;
 
-    public ExportRecordConverter(Buffer<Record<Event>> buffer, TableInfo tableInfo, PluginMetrics pluginMetrics) {
-        super(buffer, tableInfo);
+    public ExportRecordConverter(final BufferAccumulator<Record<Event>> bufferAccumulator, TableInfo tableInfo, PluginMetrics pluginMetrics) {
+        super(bufferAccumulator, tableInfo);
         this.pluginMetrics = pluginMetrics;
         this.exportRecordSuccessCounter = pluginMetrics.counter(EXPORT_RECORDS_PROCESSED_COUNT);
         this.exportRecordErrorCounter = pluginMetrics.counter(EXPORT_RECORDS_PROCESSING_ERROR_COUNT);
@@ -60,21 +59,26 @@ String getEventType() {
     }
 
     public void writeToBuffer(List<String> lines) {
-        List<Map<String, Object>> data = lines.stream()
-                .map(this::convertToMap)
-                .map(d -> (Map<String, Object>) d.get(ITEM_KEY))
-                .collect(Collectors.toList());
-        List<Record<Event>> events = data.stream().map(this::convertToEvent).collect(Collectors.toList());
+        int eventCount = 0;
+        for (String line : lines) {
+            Map<String, Object> data = (Map<String, Object>) convertToMap(line).get(ITEM_KEY);
+            try {
+                addToBuffer(data);
+                eventCount++;
+            } catch (Exception e) {
+                // will this cause too many logs?
+                LOG.error("Failed to add event to buffer due to {}", e.getMessage());
+            }
+        }
 
         try {
-            writeEventsToBuffer(events);
-            exportRecordSuccessCounter.increment(events.size());
+            flushBuffer();
+            exportRecordSuccessCounter.increment(eventCount);
         } catch (Exception e) {
-            LOG.error("Failed to write {} events to buffer due to {}", events.size(), e.getMessage());
-            exportRecordErrorCounter.increment(events.size());
+            LOG.error("Failed to write {} events to buffer due to {}", eventCount, e.getMessage());
+            exportRecordErrorCounter.increment(eventCount);
         }
     }
-
 }
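Each line of a DynamoDB S3 export is an Amazon Ion document with the item nested under an `Item` field, which is why `writeToBuffer` unwraps `ITEM_KEY` before calling `addToBuffer`. A standalone sketch of that parsing step using the `jackson-dataformat-ion` dependency the plugin already declares; the sample line mirrors the one in the new unit test, and the class name is illustrative:

```java
import java.util.Map;

import com.fasterxml.jackson.dataformat.ion.IonObjectMapper;

public class IonExportLineDemo {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws Exception {
        // One line of a DynamoDB export file: an Ion document with the item under "Item".
        String line = " $ion_1_0 {Item:{PK:\"pk-1\",SK:\"sk-1\"}}";

        IonObjectMapper mapper = new IonObjectMapper();
        Map<String, Object> parsed = mapper.readValue(line, Map.class);

        // The converter buffers only the unwrapped item, equivalent to ITEM_KEY above.
        Map<String, Object> item = (Map<String, Object>) parsed.get("Item");
        System.out.println(item); // {PK=pk-1, SK=sk-1}
    }
}
```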
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/MetadataKeyAttributes.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/MetadataKeyAttributes.java
index 0286627ba6..204ec82e02 100644
--- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/MetadataKeyAttributes.java
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/MetadataKeyAttributes.java
@@ -6,15 +6,15 @@
 package org.opensearch.dataprepper.plugins.source.dynamodb.converter;
 
 public class MetadataKeyAttributes {
-    static final String PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE = "primary_key";
+    static final String PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE = "primary_key";
 
-    static final String PARTITION_KEY_METADATA_ATTRIBUTE = "partition_key";
+    static final String PARTITION_KEY_METADATA_ATTRIBUTE = "partition_key";
 
-    static final String SORT_KEY_METADATA_ATTRIBUTE = "sort_key";
+    static final String SORT_KEY_METADATA_ATTRIBUTE = "sort_key";
 
-    static final String EVENT_TIMESTAMP_METADATA_ATTRIBUTE = "ts";
+    static final String EVENT_TIMESTAMP_METADATA_ATTRIBUTE = "ts";
 
-    static final String STREAM_EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE = "op";
+    static final String EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE = "op";
 
-    static final String EVENT_TABLE_NAME_METADATA_ATTRIBUTE = "table_name";
+    static final String EVENT_TABLE_NAME_METADATA_ATTRIBUTE = "table_name";
 }
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/RecordConverter.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/RecordConverter.java
index cedf7fb0f1..d5c5a059a8 100644
--- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/RecordConverter.java
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/RecordConverter.java
@@ -5,7 +5,7 @@
 package org.opensearch.dataprepper.plugins.source.dynamodb.converter;
 
-import org.opensearch.dataprepper.model.buffer.Buffer;
+import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
 import org.opensearch.dataprepper.model.event.Event;
 import org.opensearch.dataprepper.model.event.EventMetadata;
 import org.opensearch.dataprepper.model.event.JacksonEvent;
@@ -13,16 +13,14 @@
 import org.opensearch.dataprepper.model.record.Record;
 import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
 
-import java.time.Instant;
-import java.util.List;
 import java.util.Map;
 
-import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE;
+import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE;
 import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_TABLE_NAME_METADATA_ATTRIBUTE;
 import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_TIMESTAMP_METADATA_ATTRIBUTE;
 import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PARTITION_KEY_METADATA_ATTRIBUTE;
+import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE;
 import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.SORT_KEY_METADATA_ATTRIBUTE;
-import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.STREAM_EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE;
 
 /**
  * Base Record Processor definition.
@@ -33,68 +31,72 @@ public abstract class RecordConverter {
 
     private static final String DEFAULT_ACTION = OpenSearchBulkActions.INDEX.toString();
 
-    private static final int DEFAULT_WRITE_TIMEOUT_MILLIS = 60_000;
-
-    private final Buffer<Record<Event>> buffer;
+    private final BufferAccumulator<Record<Event>> bufferAccumulator;
 
     private final TableInfo tableInfo;
 
-    public RecordConverter(Buffer<Record<Event>> buffer, TableInfo tableInfo) {
-        this.buffer = buffer;
+    public RecordConverter(final BufferAccumulator<Record<Event>> bufferAccumulator, TableInfo tableInfo) {
+        this.bufferAccumulator = bufferAccumulator;
         this.tableInfo = tableInfo;
     }
 
-
     abstract String getEventType();
 
     /**
-     * Default method to conduct the document ID value,
-     * Using partition key plus sort key (if any)
+     * Extract the value based on attribute map
+     *
+     * @param data A map of attribute name and value
+     * @param attributeName Attribute name
+     * @return the related attribute value, return null if the attribute name doesn't exist.
      */
-    String getId(Map<String, Object> data) {
-        String partitionKey = String.valueOf(data.get(tableInfo.getMetadata().getPartitionKeyAttributeName()));
-        if (tableInfo.getMetadata().getSortKeyAttributeName() == null) {
-            return partitionKey;
+    private String getAttributeValue(final Map<String, Object> data, String attributeName) {
+        if (data.containsKey(attributeName)) {
+            return String.valueOf(data.get(attributeName));
         }
-        String sortKey = String.valueOf(data.get(tableInfo.getMetadata().getSortKeyAttributeName()));
-        return partitionKey + "_" + sortKey;
+        return null;
     }
 
-    String getPartitionKey(final Map<String, Object> data) {
-        return String.valueOf(data.get(tableInfo.getMetadata().getPartitionKeyAttributeName()));
+    void flushBuffer() throws Exception {
+        bufferAccumulator.flush();
     }
 
-    String getSortKey(final Map<String, Object> data) {
-        return String.valueOf(data.get(tableInfo.getMetadata().getSortKeyAttributeName()));
-    }
-
-    void writeEventsToBuffer(List<Record<Event>> events) throws Exception {
-        buffer.writeAll(events, DEFAULT_WRITE_TIMEOUT_MILLIS);
-    }
-
-    public Record<Event> convertToEvent(Map<String, Object> data, Instant eventCreationTime, String streamEventName) {
-        Event event;
-        event = JacksonEvent.builder()
+    /**
+     * Add event record to buffer
+     *
+     * @param data A map to hold event data, note that it may be empty.
+     * @param keys A map to hold the keys (partition key and sort key)
+     * @param eventCreationTimeMillis Creation timestamp of the event
+     * @param eventName Event name
+     * @throws Exception Exception if failed to write to buffer.
+     */
+    public void addToBuffer(Map<String, Object> data, Map<String, Object> keys, long eventCreationTimeMillis, String eventName) throws Exception {
+        Event event = JacksonEvent.builder()
                 .withEventType(getEventType())
                 .withData(data)
                 .build();
 
         EventMetadata eventMetadata = event.getMetadata();
 
         eventMetadata.setAttribute(EVENT_TABLE_NAME_METADATA_ATTRIBUTE, tableInfo.getTableName());
-        if (eventCreationTime != null) {
-            eventMetadata.setAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE, eventCreationTime.toEpochMilli());
+        eventMetadata.setAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE, eventCreationTimeMillis);
+        eventMetadata.setAttribute(EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE, mapStreamEventNameToBulkAction(eventName));
+
+        String partitionKey = getAttributeValue(keys, tableInfo.getMetadata().getPartitionKeyAttributeName());
+        eventMetadata.setAttribute(PARTITION_KEY_METADATA_ATTRIBUTE, partitionKey);
+
+        String sortKey = getAttributeValue(keys, tableInfo.getMetadata().getSortKeyAttributeName());
+        if (sortKey != null) {
+            eventMetadata.setAttribute(SORT_KEY_METADATA_ATTRIBUTE, sortKey);
+            eventMetadata.setAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE, partitionKey + "_" + sortKey);
+        } else {
+            eventMetadata.setAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE, partitionKey);
         }
-
-        eventMetadata.setAttribute(STREAM_EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE, mapStreamEventNameToBulkAction(streamEventName));
-        eventMetadata.setAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE, getId(data));
-        eventMetadata.setAttribute(PARTITION_KEY_METADATA_ATTRIBUTE, getPartitionKey(data));
-        eventMetadata.setAttribute(SORT_KEY_METADATA_ATTRIBUTE, getSortKey(data));
-
-        return new Record<>(event);
+        bufferAccumulator.add(new Record<>(event));
     }
 
-    public Record<Event> convertToEvent(Map<String, Object> data) {
-        return convertToEvent(data, null, null);
+    public void addToBuffer(Map<String, Object> data) throws Exception {
+        // Export data doesn't have an event timestamp
+        // Default to current timestamp when the event is added to buffer
+        addToBuffer(data, data, System.currentTimeMillis(), null);
     }
 
     private String mapStreamEventNameToBulkAction(final String streamEventName) {
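The core of this change: converters no longer call `Buffer.writeAll` with a fixed 60-second timeout; they hand each record to a `BufferAccumulator`, which batches up to a configured record count and writes through either when the batch fills or when the caller invokes `flush()`. A minimal sketch of the pattern, assuming only the `buffer-common` API surface this diff itself uses (`create`, `add`, `flush`); the helper method and sample payload are illustrative:

```java
import java.time.Duration;
import java.util.Map;

import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;

class BufferAccumulatorSketch {

    // Illustrative helper: batch records toward the pipeline buffer.
    static void writeOne(Buffer<Record<Event>> buffer) throws Exception {
        BufferAccumulator<Record<Event>> accumulator =
                BufferAccumulator.create(buffer, 1_000, Duration.ofSeconds(60));

        Event event = JacksonEvent.builder()
                .withEventType("EXPORT")
                .withData(Map.of("PK", "example-partition-key"))
                .build();

        // add() holds the record locally; the accumulator writes through
        // to the buffer once the configured batch size is reached.
        accumulator.add(new Record<>(event));

        // flush() pushes any remainder, which is why both converters call
        // flushBuffer() after their per-record loops.
        accumulator.flush();
    }
}
```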
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverter.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverter.java
index bdfe6dcbe1..4b0bba0a4a 100644
--- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverter.java
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverter.java
@@ -9,18 +9,19 @@
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.micrometer.core.instrument.Counter;
+import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
 import org.opensearch.dataprepper.metrics.PluginMetrics;
-import org.opensearch.dataprepper.model.buffer.Buffer;
 import org.opensearch.dataprepper.model.event.Event;
-import org.opensearch.dataprepper.model.record.Record;
 import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.enhanced.dynamodb.document.EnhancedDocument;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.Record;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 public class StreamRecordConverter extends RecordConverter {
     private static final Logger LOG = LoggerFactory.getLogger(StreamRecordConverter.class);
@@ -39,8 +40,8 @@ public class StreamRecordConverter extends RecordConverter {
     private final Counter changeEventSuccessCounter;
     private final Counter changeEventErrorCounter;
 
-    public StreamRecordConverter(Buffer<org.opensearch.dataprepper.model.record.Record<Event>> buffer, TableInfo tableInfo, PluginMetrics pluginMetrics) {
-        super(buffer, tableInfo);
+    public StreamRecordConverter(final BufferAccumulator<org.opensearch.dataprepper.model.record.Record<Event>> bufferAccumulator, TableInfo tableInfo, PluginMetrics pluginMetrics) {
+        super(bufferAccumulator, tableInfo);
         this.pluginMetrics = pluginMetrics;
         this.changeEventSuccessCounter = pluginMetrics.counter(CHANGE_EVENTS_PROCESSED_COUNT);
         this.changeEventErrorCounter = pluginMetrics.counter(CHANGE_EVENTS_PROCESSING_ERROR_COUNT);
@@ -51,30 +52,67 @@ String getEventType() {
         return "STREAM";
     }
 
-    public void writeToBuffer(List<Record> records) {
-        // TODO: What if convert failed.
-        List<org.opensearch.dataprepper.model.record.Record<Event>> events = records.stream()
-                .map(record -> convertToEvent(
-                        toMap(EnhancedDocument.fromAttributeValueMap(record.dynamodb().newImage()).toJson()),
-                        record.dynamodb().approximateCreationDateTime(),
-                        record.eventNameAsString()))
-                .collect(Collectors.toList());
+
+    public void writeToBuffer(List<Record> records) {
+
+        int eventCount = 0;
+        for (Record record : records) {
+            // NewImage may be empty
+            Map<String, Object> data = convertData(record.dynamodb().newImage());
+            // Always get keys from dynamodb().keys()
+            Map<String, Object> keys = convertKeys(record.dynamodb().keys());
+
+            try {
+                addToBuffer(data, keys, record.dynamodb().approximateCreationDateTime().toEpochMilli(), record.eventNameAsString());
+                eventCount++;
+            } catch (Exception e) {
+                // will this cause too many logs?
+                LOG.error("Failed to add event to buffer due to {}", e.getMessage());
+                changeEventErrorCounter.increment();
+            }
+        }
 
         try {
-            writeEventsToBuffer(events);
-            changeEventSuccessCounter.increment(events.size());
+            flushBuffer();
+            changeEventSuccessCounter.increment(eventCount);
         } catch (Exception e) {
-            LOG.error("Failed to write {} events to buffer due to {}", events.size(), e.getMessage());
-            changeEventErrorCounter.increment(events.size());
+            LOG.error("Failed to write {} events to buffer due to {}", eventCount, e.getMessage());
+            changeEventErrorCounter.increment(eventCount);
         }
     }
 
-    private Map<String, Object> toMap(String jsonData) {
+    /**
+     * Convert the DynamoDB attribute map to a normal map for data
+     */
+    private Map<String, Object> convertData(Map<String, AttributeValue> data) {
         try {
+            String jsonData = EnhancedDocument.fromAttributeValueMap(data).toJson();
             return MAPPER.readValue(jsonData, MAP_TYPE_REFERENCE);
         } catch (JsonProcessingException e) {
             return null;
         }
     }
+
+    /**
+     * Convert the DynamoDB attribute map to a normal map for keys
+     * This method may not be necessary, can use convertData() alternatively
+     */
+    private Map<String, Object> convertKeys(Map<String, AttributeValue> keys) {
+        Map<String, Object> result = new HashMap<>();
+        // The attribute type for key can only be N, B or S
+        keys.forEach(((attributeName, attributeValue) -> {
+            if (attributeValue.type() == AttributeValue.Type.N) {
+                // N for number
+                result.put(attributeName, attributeValue.n());
+            } else if (attributeValue.type() == AttributeValue.Type.B) {
+                // B for Binary
+                result.put(attributeName, attributeValue.b().toString());
+            } else {
+                result.put(attributeName, attributeValue.s());
+            }
+        }));
+        return result;
+
+    }
 }
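`convertKeys` leans on the DynamoDB rule that key attributes are restricted to the `S` (string), `N` (number), and `B` (binary) types, so three branches cover every case. A self-contained demo of the same type dispatch on the SDK's `AttributeValue` (key names and values are illustrative):

```java
import java.util.Map;

import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

public class KeyConversionDemo {
    public static void main(String[] args) {
        // Table keys can only ever be S, N, or B attributes.
        Map<String, AttributeValue> keys = Map.of(
                "PK", AttributeValue.builder().s("user#123").build(),
                "SK", AttributeValue.builder().n("42").build());

        keys.forEach((attributeName, attributeValue) -> {
            // Mirrors the branching in convertKeys() above.
            final Object plainValue;
            if (attributeValue.type() == AttributeValue.Type.N) {
                plainValue = attributeValue.n();            // numbers arrive as strings
            } else if (attributeValue.type() == AttributeValue.Type.B) {
                plainValue = attributeValue.b().toString(); // binary in its SdkBytes form
            } else {
                plainValue = attributeValue.s();
            }
            System.out.println(attributeName + " -> " + plainValue);
        });
    }
}
```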
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderFactory.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderFactory.java
index 75dccf8218..9e2378e3df 100644
--- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderFactory.java
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderFactory.java
@@ -5,6 +5,7 @@
 package org.opensearch.dataprepper.plugins.source.dynamodb.export;
 
+import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
 import org.opensearch.dataprepper.metrics.PluginMetrics;
 import org.opensearch.dataprepper.model.buffer.Buffer;
 import org.opensearch.dataprepper.model.event.Event;
@@ -15,34 +16,37 @@
 import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
 import software.amazon.awssdk.services.s3.S3Client;
 
+import java.time.Duration;
+
 /**
  * Factory class for DataFileLoader thread.
  */
 public class DataFileLoaderFactory {
+    static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60);
+    static final int DEFAULT_BUFFER_BATCH_SIZE = 1_000;
 
     private final EnhancedSourceCoordinator coordinator;
-    private final S3ObjectReader fileReader;
-
+    private final S3ObjectReader objectReader;
     private final PluginMetrics pluginMetrics;
-
     private final Buffer<Record<Event>> buffer;
 
-    public DataFileLoaderFactory(EnhancedSourceCoordinator coordinator, S3Client s3Client, PluginMetrics pluginMetrics, Buffer<Record<Event>> buffer) {
+    public DataFileLoaderFactory(EnhancedSourceCoordinator coordinator, S3Client s3Client, PluginMetrics pluginMetrics, final Buffer<Record<Event>> buffer) {
         this.coordinator = coordinator;
         this.pluginMetrics = pluginMetrics;
         this.buffer = buffer;
-        fileReader = new S3ObjectReader(s3Client);
+        objectReader = new S3ObjectReader(s3Client);
     }
 
     public Runnable createDataFileLoader(DataFilePartition dataFilePartition, TableInfo tableInfo) {
-        ExportRecordConverter recordProcessor = new ExportRecordConverter(buffer, tableInfo, pluginMetrics);
+        final BufferAccumulator<Record<Event>> bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT);
+        ExportRecordConverter recordProcessor = new ExportRecordConverter(bufferAccumulator, tableInfo, pluginMetrics);
+
         DataFileCheckpointer checkpointer = new DataFileCheckpointer(coordinator, dataFilePartition);
 
         // Start a data loader thread.
         DataFileLoader loader = DataFileLoader.builder()
-                .s3ObjectReader(fileReader)
+                .s3ObjectReader(objectReader)
                .bucketName(dataFilePartition.getBucket())
                .key(dataFilePartition.getKey())
                .recordConverter(recordProcessor)
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java
index 6c56323cca..8c1979e320 100644
--- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java
@@ -35,7 +35,7 @@ public class DataFileScheduler implements Runnable {
     /**
      * Maximum concurrent data loader per node
      */
-    private static final int MAX_JOB_COUNT = 4;
+    private static final int MAX_JOB_COUNT = 3;
 
     /**
      * Default interval to acquire a lease from coordination store
@@ -85,7 +85,7 @@ private void processDataFilePartition(DataFilePartition dataFilePartition) {
 
     @Override
     public void run() {
-        LOG.info("Start running Data File Scheduler");
+        LOG.debug("Start running Data File Scheduler");
 
         while (!Thread.currentThread().isInterrupted()) {
             if (numOfWorkers.get() < MAX_JOB_COUNT) {
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java
index 1c35f882ad..ba311456b1 100644
--- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java
@@ -5,6 +5,7 @@
 package org.opensearch.dataprepper.plugins.source.dynamodb.stream;
 
+import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
 import org.opensearch.dataprepper.metrics.PluginMetrics;
 import org.opensearch.dataprepper.model.buffer.Buffer;
 import org.opensearch.dataprepper.model.event.Event;
@@ -20,6 +21,7 @@
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
 
+import java.time.Duration;
 import java.time.Instant;
 import java.util.Optional;
 
@@ -31,17 +33,21 @@ public class ShardConsumerFactory {
 
     private static final int STREAM_TO_TABLE_OFFSET = "stream/".length();
 
+    static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60);
+    static final int DEFAULT_BUFFER_BATCH_SIZE = 1_000;
+
     private final DynamoDbStreamsClient streamsClient;
     private final EnhancedSourceCoordinator enhancedSourceCoordinator;
     private final PluginMetrics pluginMetrics;
     private final ShardManager shardManager;
-
     private final Buffer<Record<Event>> buffer;
 
+
     public ShardConsumerFactory(final EnhancedSourceCoordinator enhancedSourceCoordinator,
                                 final DynamoDbStreamsClient streamsClient,
                                 final PluginMetrics pluginMetrics,
-                                final ShardManager shardManager, final Buffer<Record<Event>> buffer) {
+                                final ShardManager shardManager,
+                                final Buffer<Record<Event>> buffer) {
         this.streamsClient = streamsClient;
         this.enhancedSourceCoordinator = enhancedSourceCoordinator;
         this.pluginMetrics = pluginMetrics;
@@ -77,7 +83,9 @@ public Runnable createConsumer(StreamPartition streamPartition) {
         StreamCheckpointer checkpointer = new StreamCheckpointer(enhancedSourceCoordinator, streamPartition);
         String tableArn = getTableArn(streamPartition.getStreamArn());
         TableInfo tableInfo = getTableInfo(tableArn);
-        StreamRecordConverter recordConverter = new StreamRecordConverter(buffer, tableInfo, pluginMetrics);
+        final BufferAccumulator<Record<Event>> bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT);
+
+        StreamRecordConverter recordConverter = new StreamRecordConverter(bufferAccumulator, tableInfo, pluginMetrics);
 
         ShardConsumer shardConsumer = ShardConsumer.builder(streamsClient)
                 .recordConverter(recordConverter)
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardManager.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardManager.java
index 1e342178de..32ae8b573b 100644
--- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardManager.java
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardManager.java
@@ -38,12 +38,18 @@ public ShardManager(DynamoDbStreamsClient streamsClient) {
         this.streamsClient = streamsClient;
     }
 
-    private List<Shard> listShards(String streamArn) {
+    /**
+     * List all shards using describe stream API.
+     *
+     * @param streamArn Stream Arn
+     * @param lastEvaluatedShardId Start shard id for listing, useful when trying to get child shards. If not provided, all shards will be returned.
+     * @return A list of {@link Shard}
+     */
+    private List<Shard> listShards(String streamArn, String lastEvaluatedShardId) {
         LOG.info("Start getting all shards from {}", streamArn);
         long startTime = System.currentTimeMillis();
         // Get all the shard IDs from the stream.
         List<Shard> shards = new ArrayList<>();
-        String lastEvaluatedShardId = null;
         do {
             DescribeStreamRequest req = DescribeStreamRequest.builder()
                     .streamArn(streamArn)
@@ -70,12 +76,12 @@ private List<Shard> listShards(String streamArn) {
      * Get a list of Child Shard Ids based on a parent shard id provided.
      *
      * @param streamArn Stream Arn
-     * @param shardId Parent Shard Id
-     * @return A list of child shard Ids.
+     * @param shardId Parent Shard id
+     * @return A list of child shard ids.
      */
     public List<String> getChildShardIds(String streamArn, String shardId) {
         LOG.debug("Getting child ids for " + shardId);
-        List<Shard> shards = listShards(streamArn);
+        List<Shard> shards = listShards(streamArn, shardId);
         return shards.stream()
                 .filter(s -> shardId.equals(s.parentShardId()))
                 .map(s -> s.shardId())
@@ -90,7 +96,7 @@ public List<String> getChildShardIds(String streamArn, String shardId) {
      * @return A list of shard Ids
      */
     public List<String> getActiveShards(String streamArn) {
-        List<Shard> shards = listShards(streamArn);
+        List<Shard> shards = listShards(streamArn, null);
         return shards.stream()
                 .filter(s -> s.sequenceNumberRange().endingSequenceNumber() == null)
                 .map(s -> s.shardId())
@@ -155,7 +161,7 @@ public String getShardIterator(String streamArn, String shardId, String sequenceNumber) {
      * @return A list of root shard Ids
      */
     public List<String> getRootShardIds(String streamArn) {
-        List<Shard> shards = listShards(streamArn);
+        List<Shard> shards = listShards(streamArn, null);
 
         List<String> childIds = shards.stream().map(shard -> shard.shardId()).collect(Collectors.toList());
         List<String> rootIds = shards.stream()
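The reworked `listShards` threads a `lastEvaluatedShardId` through the `DescribeStream` pagination loop, which lets `getChildShardIds` start listing from the parent shard rather than walking the whole stream, while `getActiveShards` and `getRootShardIds` pass `null` to keep the full scan. A sketch of the loop the hunk implies; it assumes the usual AWS SDK v2 `exclusiveStartShardId` paging parameter, since the hunk cuts off inside the request builder:

```java
import java.util.ArrayList;
import java.util.List;

import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse;
import software.amazon.awssdk.services.dynamodb.model.Shard;
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;

class ShardPaginationSketch {

    static List<Shard> listShards(DynamoDbStreamsClient streamsClient, String streamArn, String startShardId) {
        List<Shard> shards = new ArrayList<>();
        String lastEvaluatedShardId = startShardId; // null means "start from the oldest shard"
        do {
            DescribeStreamRequest request = DescribeStreamRequest.builder()
                    .streamArn(streamArn)
                    .exclusiveStartShardId(lastEvaluatedShardId)
                    .build();
            DescribeStreamResponse response = streamsClient.describeStream(request);
            shards.addAll(response.streamDescription().shards());
            // A non-null lastEvaluatedShardId means another page of shards remains.
            lastEvaluatedShardId = response.streamDescription().lastEvaluatedShardId();
        } while (lastEvaluatedShardId != null);
        return shards;
    }
}
```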
diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverterTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverterTest.java
index 838bb9f0ab..e79d231cb3 100644
--- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverterTest.java
+++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverterTest.java
@@ -12,29 +12,37 @@
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
+import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
 import org.opensearch.dataprepper.metrics.PluginMetrics;
-import org.opensearch.dataprepper.model.buffer.Buffer;
 import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.event.JacksonEvent;
+import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
 import org.opensearch.dataprepper.model.record.Record;
 import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
 import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableMetadata;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.Random;
 import java.util.UUID;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyDouble;
-import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.BDDMockito.given;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoInteractions;
-import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter.EXPORT_RECORDS_PROCESSING_ERROR_COUNT;
 import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter.EXPORT_RECORDS_PROCESSED_COUNT;
+import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter.EXPORT_RECORDS_PROCESSING_ERROR_COUNT;
+import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE;
+import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_TIMESTAMP_METADATA_ATTRIBUTE;
+import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PARTITION_KEY_METADATA_ATTRIBUTE;
+import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE;
+import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.SORT_KEY_METADATA_ATTRIBUTE;
 
 @ExtendWith(MockitoExtension.class)
 class ExportRecordConverterTest {
@@ -43,7 +51,7 @@ class ExportRecordConverterTest {
     private PluginMetrics pluginMetrics;
 
     @Mock
-    private Buffer<Record<Event>> buffer;
+    private BufferAccumulator<Record<Event>> bufferAccumulator;
 
     private TableInfo tableInfo;
 
@@ -97,16 +105,40 @@ void test_writeToBuffer() throws Exception {
         int numberOfRecords = random.nextInt(10);
 
         List<String> data = generateData(numberOfRecords);
-        ExportRecordConverter recordConverter = new ExportRecordConverter(buffer, tableInfo, pluginMetrics);
+        ExportRecordConverter recordConverter = new ExportRecordConverter(bufferAccumulator, tableInfo, pluginMetrics);
 
-        final ArgumentCaptor<Collection<Record<Event>>> writeRequestArgumentCaptor = ArgumentCaptor.forClass(Collection.class);
-        doNothing().when(buffer).writeAll(writeRequestArgumentCaptor.capture(), anyInt());
 
         recordConverter.writeToBuffer(data);
-
-        assertThat(writeRequestArgumentCaptor.getValue().size(), equalTo(numberOfRecords));
+        verify(bufferAccumulator, times(numberOfRecords)).add(any(Record.class));
         verify(exportRecordSuccess).increment(anyDouble());
 
         verifyNoInteractions(exportRecordErrors);
     }
+
+    @Test
+    void test_writeSingleRecordToBuffer() throws Exception {
+
+        final String pk = UUID.randomUUID().toString();
+        final String sk = UUID.randomUUID().toString();
+        String line = " $ion_1_0 {Item:{PK:\"" + pk + "\",SK:\"" + sk + "\"}}";
+
+        final ArgumentCaptor<Record> recordArgumentCaptor = ArgumentCaptor.forClass(Record.class);
+
+        ExportRecordConverter recordConverter = new ExportRecordConverter(bufferAccumulator, tableInfo, pluginMetrics);
+        doNothing().when(bufferAccumulator).add(recordArgumentCaptor.capture());
+//        doNothing().when(bufferAccumulator).flush();
+
+        recordConverter.writeToBuffer(List.of(line));
+        verify(bufferAccumulator).add(any(Record.class));
+        verify(bufferAccumulator).flush();
+        assertThat(recordArgumentCaptor.getValue().getData(), notNullValue());
+        JacksonEvent event = (JacksonEvent) recordArgumentCaptor.getValue().getData();
+
+        assertThat(event.getMetadata(), notNullValue());
+
+        assertThat(event.getMetadata().getAttribute(PARTITION_KEY_METADATA_ATTRIBUTE), equalTo(pk));
+        assertThat(event.getMetadata().getAttribute(SORT_KEY_METADATA_ATTRIBUTE), equalTo(sk));
+        assertThat(event.getMetadata().getAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE), equalTo(pk + "_" + sk));
+        assertThat(event.getMetadata().getAttribute(EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE), equalTo(OpenSearchBulkActions.INDEX.toString()));
+        assertThat(event.getMetadata().getAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE), notNullValue());
+    }
 }
\ No newline at end of file
diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java
index 1b9b161ecc..23ac32f6df 100644
--- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java
+++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java
@@ -12,9 +12,11 @@
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
+import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
 import org.opensearch.dataprepper.metrics.PluginMetrics;
-import org.opensearch.dataprepper.model.buffer.Buffer;
 import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.event.JacksonEvent;
+import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
 import org.opensearch.dataprepper.model.record.Record;
 import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
 import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableMetadata;
@@ -24,7 +26,6 @@
 
 import java.time.Instant;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -32,14 +33,21 @@
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyDouble;
-import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.BDDMockito.given;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoInteractions;
-import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter.CHANGE_EVENTS_PROCESSING_ERROR_COUNT;
+import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE;
+import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_TIMESTAMP_METADATA_ATTRIBUTE;
+import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PARTITION_KEY_METADATA_ATTRIBUTE;
+import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE;
+import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.SORT_KEY_METADATA_ATTRIBUTE;
 import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter.CHANGE_EVENTS_PROCESSED_COUNT;
+import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter.CHANGE_EVENTS_PROCESSING_ERROR_COUNT;
 
 @ExtendWith(MockitoExtension.class)
 class StreamRecordConverterTest {
@@ -47,7 +55,7 @@ class StreamRecordConverterTest {
     private PluginMetrics pluginMetrics;
 
     @Mock
-    private Buffer<Record<Event>> buffer;
+    private BufferAccumulator<Record<Event>> bufferAccumulator;
 
     private TableInfo tableInfo;
 
@@ -90,21 +98,46 @@ void test_writeToBuffer() throws Exception {
 
         int numberOfRecords = random.nextInt(10);
 
-        List<software.amazon.awssdk.services.dynamodb.model.Record> data = buildRecords(numberOfRecords);
-
-        StreamRecordConverter recordConverter = new StreamRecordConverter(buffer, tableInfo, pluginMetrics);
+        List<software.amazon.awssdk.services.dynamodb.model.Record> records = buildRecords(numberOfRecords);
 
-        final ArgumentCaptor<Collection<Record<Event>>> writeRequestArgumentCaptor = ArgumentCaptor.forClass(Collection.class);
-        doNothing().when(buffer).writeAll(writeRequestArgumentCaptor.capture(), anyInt());
+        StreamRecordConverter recordConverter = new StreamRecordConverter(bufferAccumulator, tableInfo, pluginMetrics);
 
-
-        recordConverter.writeToBuffer(data);
-        assertThat(writeRequestArgumentCaptor.getValue().size(), equalTo(numberOfRecords));
+        recordConverter.writeToBuffer(records);
+        verify(bufferAccumulator, times(numberOfRecords)).add(any(Record.class));
+        verify(bufferAccumulator).flush();
         verify(changeEventSuccessCounter).increment(anyDouble());
 
         verifyNoInteractions(changeEventErrorCounter);
+    }
+
+    @Test
+    void test_writeSingleRecordToBuffer() throws Exception {
+
+        List<software.amazon.awssdk.services.dynamodb.model.Record> records = buildRecords(1);
+        final ArgumentCaptor<Record> recordArgumentCaptor = ArgumentCaptor.forClass(Record.class);
+        software.amazon.awssdk.services.dynamodb.model.Record record = records.get(0);
+
+        StreamRecordConverter recordConverter = new StreamRecordConverter(bufferAccumulator, tableInfo, pluginMetrics);
+        doNothing().when(bufferAccumulator).add(recordArgumentCaptor.capture());
+
+        recordConverter.writeToBuffer(records);
+        verify(bufferAccumulator).add(any(Record.class));
+        verify(bufferAccumulator).flush();
+        verify(changeEventSuccessCounter).increment(anyDouble());
+        assertThat(recordArgumentCaptor.getValue().getData(), notNullValue());
+        JacksonEvent event = (JacksonEvent) recordArgumentCaptor.getValue().getData();
+
+        assertThat(event.getMetadata(), notNullValue());
+        String partitionKey = record.dynamodb().keys().get(partitionKeyAttrName).s();
+        String sortKey = record.dynamodb().keys().get(sortKeyAttrName).s();
+        assertThat(event.getMetadata().getAttribute(PARTITION_KEY_METADATA_ATTRIBUTE), equalTo(partitionKey));
+        assertThat(event.getMetadata().getAttribute(SORT_KEY_METADATA_ATTRIBUTE), equalTo(sortKey));
+        assertThat(event.getMetadata().getAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE), equalTo(partitionKey + "_" + sortKey));
+        assertThat(event.getMetadata().getAttribute(EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE), equalTo(OpenSearchBulkActions.CREATE.toString()));
+        assertThat(event.getMetadata().getAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE), equalTo(record.dynamodb().approximateCreationDateTime().toEpochMilli()));
+
+        verifyNoInteractions(changeEventErrorCounter);
     }
 
     private List<software.amazon.awssdk.services.dynamodb.model.Record> buildRecords(int count) {
@@ -116,6 +149,7 @@ private List<software.amazon.awssdk.services.dynamodb.model.Record> buildRecords(int count) {
 
             StreamRecord streamRecord = StreamRecord.builder()
                     .newImage(data)
+                    .keys(data)
                     .sequenceNumber(UUID.randomUUID().toString())
                     .approximateCreationDateTime(Instant.now())
                     .build();
diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderFactoryTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderFactoryTest.java
index d1f7426ad7..061cb91a93 100644
--- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderFactoryTest.java
+++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderFactoryTest.java
@@ -45,7 +45,6 @@ class DataFileLoaderFactoryTest {
 
     @Mock
     private Buffer<Record<Event>> buffer;
 
-
     private TableInfo tableInfo;
 
     private final String tableName = UUID.randomUUID().toString();