Skip to content

Commit

Permalink
Fix issue where we start from checkpoint for PIT with acks to instead…
Browse files Browse the repository at this point in the history
… start from beginning (#3610)

Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 authored Nov 8, 2023
1 parent 3e558c0 commit f08c16c
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op
LOG.info("Starting processing for index: '{}'", indexName);
Optional<OpenSearchIndexProgressState> openSearchIndexProgressStateOptional = openSearchIndexPartition.getPartitionState();

if (openSearchIndexProgressStateOptional.isEmpty()) {
// We can't checkpoint acks yet so need to restart from the beginning of index when acks are enabled for now
if (openSearchSourceConfiguration.isAcknowledgmentsEnabled() || openSearchIndexProgressStateOptional.isEmpty()) {
openSearchIndexProgressStateOptional = Optional.of(initializeProgressState());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class WorkerCommonUtils {
static final Duration BACKOFF_ON_EXCEPTION = Duration.ofSeconds(60);

static final long DEFAULT_CHECKPOINT_INTERVAL_MILLS = 5 * 60_000;
static final Duration ACKNOWLEDGEMENT_SET_TIMEOUT = Duration.ofHours(1);
static final Duration ACKNOWLEDGEMENT_SET_TIMEOUT = Duration.ofMinutes(20);
static final Duration STARTING_BACKOFF = Duration.ofMillis(500);
static final Duration MAX_BACKOFF = Duration.ofSeconds(60);
static final int BACKOFF_RATE = 2;
Expand Down

0 comments on commit f08c16c

Please sign in to comment.