From 293eb6cdb6fccbde581d96ce2c4ce1c59d18c337 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Wed, 22 Nov 2023 18:04:06 +0000 Subject: [PATCH] Fix crash in Kafka consumer when negative acknowledments are received Signed-off-by: Krishna Kondaka --- .../plugins/kafka/consumer/KafkaCustomConsumer.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java index 479303bf7b..099daf754f 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java @@ -224,14 +224,20 @@ private void addAcknowledgedOffsets(final TopicPartition topicPartition, final R if (Objects.isNull(commitTracker) && errLogRateLimiter.isAllowed(System.currentTimeMillis())) { LOG.error("Commit tracker not found for TopicPartition: {}", topicPartition); + return; } - final OffsetAndMetadata offsetAndMetadata = - partitionCommitTrackerMap.get(partitionId).addCompletedOffsets(offsetRange); + final OffsetAndMetadata offsetAndMetadata = commitTracker.addCompletedOffsets(offsetRange); updateOffsetsToCommit(topicPartition, offsetAndMetadata); } private void resetOffsets() { + // resetting offsets is similar to committing acknowledged offsets. Throttle the frequency of resets by + // checking current time with last commit time. Same "lastCommitTime" and commit interval are used in both cases + long currentTimeMillis = System.currentTimeMillis(); + if ((currentTimeMillis - lastCommitTime) < topicConfig.getCommitInterval().toMillis()) { + return; + } if (partitionsToReset.size() > 0) { partitionsToReset.forEach(partition -> { try { @@ -244,6 +250,8 @@ private void resetOffsets() { consumer.seek(partition, offsetAndMetadata); } partitionCommitTrackerMap.remove(partition.partition()); + final long epoch = getCurrentTimeNanos(); + ownedPartitionsEpoch.put(partition, epoch); } catch (Exception e) { LOG.error("Failed to seek to last committed offset upon negative acknowledgement {}", partition, e); } @@ -493,7 +501,6 @@ public void onPartitionsAssigned(Collection partitions) { continue; } LOG.info("Assigned partition {}", topicPartition); - partitionCommitTrackerMap.remove(topicPartition.partition()); ownedPartitionsEpoch.put(topicPartition, epoch); } }