Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Infinite loop: kminion doesn't expect empty consumer offset #110

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion minion/offset_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,9 @@ func (s *Service) checkIfConsumerLagIsCaughtUp(ctx context.Context) {
highWaterMark := partition.Offset - 1
consumedOffset := consumedOffsets[partition.Partition]
partitionLag := highWaterMark - consumedOffset
if partitionLag < 0 {
if partitionLag < 0 || consumedOffset == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's not going to work because at the very beginning (before we consumed a message at all) consumedOffset would be 0.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any ideas how to define what is:

  • 0 because we haven't consumed a message at all
  • 0 because there's nothing to consume?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I need to fetch low water marks somehow here. This way the check would be lowWaterMark == highWaterMark

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree I don't think there's a way around it without having access to the low water marks. Because in the case of 100 being low and high water mark we don't expect any messages and would still be done with the message - just as you described in your filed issue.

partitionLag = 0
continue
}

if partitionLag > 0 {
Expand Down