-
Notifications
You must be signed in to change notification settings - Fork 215
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Delay reading from the Kafka buffer as long as the circuit breaker is open #4135
Delay reading from the Kafka buffer as long as the circuit breaker is open #4135
Conversation
} else if (pauseConsumePredicate.pauseConsuming()) { | ||
LOG.debug("Pause and skip the next consume from Kafka topic due to an external condition: {}", pauseConsumePredicate); | ||
Thread.sleep(10000); | ||
continue; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
kafka consumer max.poll.interval will expire if we continue to pause for longer duration. This problem looks similar to #4023 where bufferAccumator gets full and we explicitly pause kafka consumer so that max poll does not expire. Not sure if we are taking in the PR (kkondaka). May be we can combine these 2 conditions and have a common solution.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hshardeesi , This is a good point. I'll take another look at that angle. Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hshardeesi , I pushed a change that pauses the Kafka consumer. I ended up calling it in a different location because this pause happens before any poll
calls whereas the other happens while writing to the buffer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch @hshardeesi I misunderstood it as part of shutdown handling.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
functionally this looks correct except that in other case (#4023) we don't pause consumer immediately but delay it until we are close to expiring kafka max.poll timeout. I feel this pause/resume logic should be abstracted out into separate class, similar to pausePredicate. abstraction can maintain state on when & what partitions to pause/resume etc and the core logic remains at one place.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hshardeesi , These are all good points. I agree with consolidating the logic to make it consistent. Do we need that in this PR, or can we follow this on?
… predicate is in place. This will allow the Kafka buffer to wait for the circuit breaker to close before reading. Signed-off-by: David Venable <[email protected]>
Signed-off-by: David Venable <[email protected]>
b793536
to
12a636b
Compare
Signed-off-by: David Venable <[email protected]>
consumer.pause(consumer.assignment()); | ||
Thread.sleep(10000); | ||
continue; | ||
} else if(paused) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this "else" required?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure it is strictly required. But, it avoids the extra call to resume()
if we are not currently paused.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dlvenable that should not happen, right? We the resume is done, paused should be set to false. Looks like that's missing in your code.
continue; | ||
} else if(paused) { | ||
LOG.debug("Resume consuming from Kafka topic."); | ||
consumer.resume(consumer.assignment()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess you missed setting paused = false
here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch. I just pushed a fix.
Signed-off-by: David Venable <[email protected]>
Description
The
kafka
buffer was pausing 10 seconds to wait for the circuit breaker to close. However, it would then consume after that. This change will keep holding until the circuit breaker closes again. This is similar to how inputs into buffers work.Issues Resolved
N/A
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.