diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java index eee6d5807e..bd58e9e0d5 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java @@ -24,6 +24,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; +import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY; + /** * A scheduler to manage all the stream related work in one place */ @@ -100,6 +102,8 @@ private void processStreamPartition(StreamPartition streamPartition) { runConsumer.whenComplete((v, ex) -> { numOfWorkers.decrementAndGet(); if (ex != null) { + LOG.error(NOISY, "Received exception while processing shard {}, giving up this shard for reprocessing: {}", + streamPartition.getShardId(), ex); coordinator.giveUpPartition(streamPartition); } if (numOfWorkers.get() == 0) {