KafkaConsumer should continue to poll while waiting for buffer #4023
Changes from 3 commits
@@ -75,6 +75,7 @@ public class KafkaCustomConsumer implements Runnable, ConsumerRebalanceListener
     private final String topicName;
     private final TopicConsumerConfig topicConfig;
     private MessageFormat schema;
+    private boolean paused;
     private final BufferAccumulator<Record<Event>> bufferAccumulator;
     private final Buffer<Record<Event>> buffer;
     private static final ObjectMapper objectMapper = new ObjectMapper();
@@ -110,6 +111,7 @@ public KafkaCustomConsumer(final KafkaConsumer consumer,
         this.shutdownInProgress = shutdownInProgress;
         this.consumer = consumer;
         this.buffer = buffer;
+        this.paused = false;
         this.byteDecoder = byteDecoder;
         this.topicMetrics = topicMetrics;
         this.pauseConsumePredicate = pauseConsumePredicate;
@@ -170,10 +172,15 @@ private AcknowledgementSet createAcknowledgementSet(Map<TopicPartition, CommitOf
         return acknowledgementSet;
     }

-    <T> void consumeRecords() throws Exception {
-        try {
+    <T> ConsumerRecords<String, T> doPoll() throws Exception {
         ConsumerRecords<String, T> records =
             consumer.poll(Duration.ofMillis(topicConfig.getThreadWaitingTime().toMillis()/2));
+        return records;
+    }
+
+    <T> void consumeRecords() throws Exception {
+        try {
+            ConsumerRecords<String, T> records = doPoll();
             if (Objects.nonNull(records) && !records.isEmpty() && records.count() > 0) {
                 Map<TopicPartition, CommitOffsetRange> offsets = new HashMap<>();
                 AcknowledgementSet acknowledgementSet = null;
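Extracting doPoll() is what lets the consumer keep polling even while it is blocked on the downstream buffer; if poll() is not called within max.poll.interval.ms, the group coordinator treats the member as failed and triggers a rebalance. As a standalone illustration of that pattern with the plain Kafka client (not this project's code; the topic, group id, bootstrap servers, and the in-memory queue standing in for the pipeline buffer are all assumptions), a minimal sketch might look like this:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PausingConsumerSketch {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumption
        props.put("group.id", "pausing-consumer-sketch");   // assumption
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Stand-in for the pipeline buffer that can run out of capacity.
        final BlockingQueue<String> buffer = new ArrayBlockingQueue<>(1024);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("sketch-topic"));     // assumption
            boolean paused = false;

            while (true) {
                // poll() has to keep being called; a gap longer than
                // max.poll.interval.ms gets this member kicked out of the group.
                final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));

                for (final ConsumerRecord<String, String> record : records) {
                    while (!buffer.offer(record.value())) {
                        // Buffer is full: pause every assigned partition so the
                        // extra poll() calls below return no further records.
                        if (!paused) {
                            consumer.pause(consumer.assignment());
                            paused = true;
                        }
                        consumer.poll(Duration.ofMillis(100));
                    }
                }

                // Buffer accepted the records again: pick up where we left off.
                if (paused) {
                    consumer.resume(consumer.assignment());
                    paused = false;
                }
            }
        }
    }
}
```

Calling poll() while the assignment is paused keeps the member inside max.poll.interval.ms without delivering more records from the paused partitions, which is the behaviour the change below relies on.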
@@ -411,29 +418,51 @@ private <T> Record<Event> getRecord(ConsumerRecord<String, T> consumerRecord, in
         return new Record<Event>(event);
     }

-    private void processRecord(final AcknowledgementSet acknowledgementSet, final Record<Event> record) {
+    private <T> void processRecord(final AcknowledgementSet acknowledgementSet, final Record<Event> record) {
         // Always add record to acknowledgementSet before adding to
         // buffer because another thread may take and process
         // buffer contents before the event record is added
         // to acknowledgement set
         if (acknowledgementSet != null) {
             acknowledgementSet.add(record.getData());
         }
+        long numRetries = 0;
+        final int retrySleepTimeMs = 100;
+        // Donot pause until half the poll interval time has expired
+        final long maxRetries = topicConfig.getMaxPollInterval().toMillis() / (2 * retrySleepTimeMs);
Comment: These two variables (…): perhaps make (…)? You can make a static field for the other.
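As an aside on the retry budget in the lines above: Kafka's default max.poll.interval.ms is 300000 ms, so with a 100 ms sleep per attempt the formula pauses the consumer after roughly 150 seconds, half the poll interval. A small sketch of that arithmetic (the 300000 ms default is Kafka's; everything else mirrors the diff):

```java
import java.time.Duration;

public class RetryBudgetSketch {
    public static void main(String[] args) {
        final Duration maxPollInterval = Duration.ofMillis(300_000); // Kafka default max.poll.interval.ms
        final int retrySleepTimeMs = 100;                            // sleep between buffer retries

        // Same formula as the diff: keep retrying for half the poll interval, then pause.
        final long maxRetries = maxPollInterval.toMillis() / (2 * retrySleepTimeMs);

        System.out.println("maxRetries = " + maxRetries);                                      // 1500
        System.out.println("seconds before pausing = " + maxRetries * retrySleepTimeMs / 1000); // 150
    }
}
```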
         while (true) {
             try {
                 bufferAccumulator.add(record);
Comment: What happens to this record that was caught in the exception? Is it lost?
Comment: I see, the consumer retries in the while loop.
Reply: No. The record is eventually put in the buffer because we are in an infinite loop here.
                 break;
             } catch (Exception e) {
+                if (!paused && numRetries++ > maxRetries) {
+                    paused = true;
Comment: Should the Kafka consumer be paused for all exceptions?
Reply: In all cases of exception we are trying forever to put the record in the buffer, right? That is why we are pausing in all cases of exceptions.
+                    consumer.pause(consumer.assignment());
Comment: This is very critical logic. We should have a unit test to verify that we call (…).
Reply: Added a new test case to test pause/resume.
+                }
                 if (e instanceof SizeOverflowException) {
                     topicMetrics.getNumberOfBufferSizeOverflows().increment();
                 } else {
                     LOG.debug("Error while adding record to buffer, retrying ", e);
                 }
                 try {
-                    Thread.sleep(100);
+                    Thread.sleep(retrySleepTimeMs);
+                    if (paused) {
+                        ConsumerRecords<String, T> records = doPoll();
Comment: The code does not really do anything with the value of (…).
+                        if (records.count() > 0) {
+                            LOG.debug("Unexpected records received while the consumer is paused. Resetting the paritions to retry from last read pointer");
Comment: Do we need to make it INFO so that it shows up in the logs?
Reply: It seems like WARN level to me, but you probably have more context.
+                            synchronized(this) {
+                                partitionsToReset.addAll(consumer.assignment());
+                            };
+                            break;
+                        }
+                    }
                 } catch (Exception ex) {} // ignore the exception because it only means the thread slept for shorter time
             }
         }
+        if (paused) {
+            consumer.resume(consumer.assignment());
Comment: I think there may be an issue here. We may have to call resume on exactly the same partitions that we paused. For example, say the consumer initially had 4 partitions that were paused. While paused, 2 got revoked, and after reassignment we resumed only the 2 assigned partitions. Now the same 2 partitions are assigned back; will they remain paused, since we never called resume on them? It would be good to test.
Reply: If the partitions have moved out of the consumer, I do not think they will be in the paused state. I do not think we can resume partitions that are not currently owned by the consumer. But I will try to test it.
Reply: I have tested this scenario. When the partitions are assigned back, they are not paused. Once partitions are revoked, they are removed from the consumer's assignment; when they are assigned back, they are assigned like any other new partition, with no stale state from the previous assignment.
+            paused = false;
+        }
     }

     private <T> void iterateRecordPartitions(ConsumerRecords<String, T> records, final AcknowledgementSet acknowledgementSet,
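The thread above notes that a pause/resume test case was added; that test is not part of this diff. Purely as an illustration of how the behaviour can be verified with Mockito, the sketch below exercises a hypothetical writeWithBackpressure helper (a simplification, not the project's KafkaCustomConsumer) against a buffer stand-in that rejects the first few writes, and asserts that pause() and resume() are each called on the assignment:

```java
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.time.Duration;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;

class PauseResumeSketchTest {

    /** Hypothetical, simplified version of the retry-until-buffered logic. */
    static void writeWithBackpressure(final Consumer<String, String> consumer,
                                      final java.util.function.Consumer<String> buffer,
                                      final String record,
                                      final long maxRetries) {
        boolean paused = false;
        long numRetries = 0;
        while (true) {
            try {
                buffer.accept(record);
                break;
            } catch (final RuntimeException e) {
                if (!paused && numRetries++ > maxRetries) {
                    paused = true;
                    consumer.pause(consumer.assignment());
                }
                consumer.poll(Duration.ofMillis(10)); // keep polling while waiting
            }
        }
        if (paused) {
            consumer.resume(consumer.assignment());
        }
    }

    @Test
    void pausesWhileBufferIsFullAndResumesAfterwards() {
        @SuppressWarnings("unchecked")
        final Consumer<String, String> consumer = mock(Consumer.class);
        final Set<TopicPartition> assignment = Set.of(new TopicPartition("sketch-topic", 0));
        when(consumer.assignment()).thenReturn(assignment);
        when(consumer.poll(any(Duration.class))).thenReturn(ConsumerRecords.empty());

        // Buffer stand-in: reject the first five writes, then accept.
        final AtomicInteger attempts = new AtomicInteger();
        final java.util.function.Consumer<String> flakyBuffer = value -> {
            if (attempts.incrementAndGet() <= 5) {
                throw new RuntimeException("buffer full");
            }
        };

        writeWithBackpressure(consumer, flakyBuffer, "hello", 2);

        verify(consumer).pause(assignment);
        verify(consumer).resume(assignment);
    }
}
```

Mockito's verify() defaults to expecting exactly one call, so a test in this shape also pins down that the !paused guard prevents repeated pause() calls.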
@@ -503,6 +532,9 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                 LOG.info("Assigned partition {}", topicPartition);
                 ownedPartitionsEpoch.put(topicPartition, epoch);
             }
+            if (paused) {
+                consumer.pause(consumer.assignment());
+            }
         }
         dumpTopicPartitionOffsets(partitions);
     }
@@ -520,6 +552,9 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                 ownedPartitionsEpoch.remove(topicPartition);
                 partitionCommitTrackerMap.remove(topicPartition.partition());
             }
+            if (paused) {
Comment: Is it really required on partition revocation?
Reply: Probably not. I thought about it before adding this code, and I do not see any downside to doing it. Let me know if you can think of any side effects of this code.
+                consumer.pause(consumer.assignment());
+            }
         }
     }
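As the earlier thread establishes, partitions that are revoked and later reassigned come back unpaused, which is why onPartitionsAssigned re-applies pause() when the consumer is still backed up. A self-contained sketch of that idea as a plain ConsumerRebalanceListener (the class name and the shared AtomicBoolean flag are assumptions, not this project's types):

```java
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

/**
 * Re-applies the paused state after a rebalance: newly (re)assigned partitions
 * start out unpaused, so the listener pauses them again if the consumer is
 * still waiting on downstream capacity.
 */
public class RepauseOnRebalanceListener implements ConsumerRebalanceListener {

    private final Consumer<?, ?> consumer;
    private final AtomicBoolean paused;   // shared "buffer is full" flag (assumption)

    public RepauseOnRebalanceListener(final Consumer<?, ?> consumer, final AtomicBoolean paused) {
        this.consumer = consumer;
        this.paused = paused;
    }

    @Override
    public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
        if (paused.get()) {
            // Pause the whole current assignment, which now includes the new partitions.
            consumer.pause(consumer.assignment());
        }
    }

    @Override
    public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
        // Nothing strictly required here: revoked partitions lose their paused flag
        // anyway, and the remaining assignment is re-paused on the next assignment callback.
    }
}
```

Whether re-pausing is also needed in onPartitionsRevoked is the open question in the thread above; this sketch leaves it out, while the PR keeps it on the grounds that it has no downside.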
Comment: If you change the code below to use `?` (see my other comment), this template T becomes unnecessary and you can remove it.