From f08c16c38728f68696572ae6ace8177be18320b1 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Wed, 8 Nov 2023 14:08:42 -0600 Subject: [PATCH] Fix issue where we start from checkpoint for PIT with acks to instead start from beginning (#3610) Signed-off-by: Taylor Gray --- .../plugins/source/opensearch/worker/PitWorker.java | 3 ++- .../plugins/source/opensearch/worker/WorkerCommonUtils.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java index 9050b6fd87..bc9f531d95 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java @@ -156,7 +156,8 @@ private void processIndex(final SourcePartition op LOG.info("Starting processing for index: '{}'", indexName); Optional openSearchIndexProgressStateOptional = openSearchIndexPartition.getPartitionState(); - if (openSearchIndexProgressStateOptional.isEmpty()) { + // We can't checkpoint acks yet so need to restart from the beginning of index when acks are enabled for now + if (openSearchSourceConfiguration.isAcknowledgmentsEnabled() || openSearchIndexProgressStateOptional.isEmpty()) { openSearchIndexProgressStateOptional = Optional.of(initializeProgressState()); } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/WorkerCommonUtils.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/WorkerCommonUtils.java index 7c5a132017..5e490f1ff1 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/WorkerCommonUtils.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/WorkerCommonUtils.java @@ -28,7 +28,7 @@ public class WorkerCommonUtils { static final Duration BACKOFF_ON_EXCEPTION = Duration.ofSeconds(60); static final long DEFAULT_CHECKPOINT_INTERVAL_MILLS = 5 * 60_000; - static final Duration ACKNOWLEDGEMENT_SET_TIMEOUT = Duration.ofHours(1); + static final Duration ACKNOWLEDGEMENT_SET_TIMEOUT = Duration.ofMinutes(20); static final Duration STARTING_BACKOFF = Duration.ofMillis(500); static final Duration MAX_BACKOFF = Duration.ofSeconds(60); static final int BACKOFF_RATE = 2;