Base stage with consuming actor #48
Conversation
Because I was doing a lot of local testing and aborting test runs, I really needed #47 to be merged. So I pulled the changes from there into my working branch.
Merged #47 and updated this PR with the latest changes.
Some minor changes thus far, but I know you're still working on things @IgorFedchenko
src/Akka.Streams.Kafka/Stages/Consumers/Actors/KafkaConsumerActor.cs
CompleteStage();
break;
default:
    return;
So we ignore all messages until the ConsumerActor is terminated?
Yes, this is what Alpakka does.
I think this is mostly because we only get to this actor state after the onDownstreamFinished
state event, so there is no point in handling these messages anyway.
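For reference, the "swallow everything until termination" behavior described above can be sketched with Akka.NET's `Become` roughly like this (names are illustrative, not the actual implementation):

```csharp
// Hypothetical sketch of the stopping state: after downstream has finished,
// the stage switches to a receive that only reacts to termination and
// silently drops every other message.
private void BecomeStopping()
{
    Context.Become(message =>
    {
        switch (message)
        {
            case Terminated _:       // the consumer actor has shut down
                CompleteStage();
                return true;
            default:
                return true;         // ignore all other messages
        }
    });
}
```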
src/Akka.Streams.Kafka/Stages/Consumers/Exceptions/StoppingException.cs
src/Akka.Streams.Kafka.Tests/Integration/CommittableSourceIntegrationTests.cs
void AssignedPositions(IImmutableSet<TopicPartition> assignedTopicPartitions, IConsumer<K, V> consumer, TimeSpan positionTimeout);
}

internal static class CommitRefreshing
It seems this class implements the "commit refresh" feature: it stores committed offsets so they can be re-committed if no new commits have been made for some time (controlled by the commit-refresh-interval
setting, which can also be disabled).
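The idea can be sketched as follows (illustrative only; the names and shape here are assumptions, not the actual CommitRefreshing code):

```csharp
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using Confluent.Kafka;

// Sketch: remember the last committed offset per partition and, if nothing
// new has been committed for commit-refresh-interval, hand back the
// remembered offsets so they can be re-committed before the broker
// expires them.
internal class CommitRefreshingSketch
{
    private ImmutableDictionary<TopicPartition, TopicPartitionOffset> _committed =
        ImmutableDictionary<TopicPartition, TopicPartitionOffset>.Empty;
    private DateTime _lastCommit = DateTime.UtcNow;

    public void Committed(IEnumerable<TopicPartitionOffset> offsets)
    {
        foreach (var offset in offsets)
            _committed = _committed.SetItem(offset.TopicPartition, offset);
        _lastCommit = DateTime.UtcNow;
    }

    public IEnumerable<TopicPartitionOffset> RefreshOffsets(TimeSpan refreshInterval)
        => DateTime.UtcNow - _lastCommit > refreshInterval
            ? _committed.Values
            : Enumerable.Empty<TopicPartitionOffset>();
}
```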
// resume partitions to fetch
IImmutableSet<TopicPartition> partitionsToFetch = _requests.Values.SelectMany(v => v.Topics).ToImmutableHashSet();
/*
// TODO: Make consumer pause-resume calls work
@Aaronontheweb This is the trickiest thing left in this actor.
For some reason, if I uncomment the _consumer.Resume()
call, the following _consumer.Consume()
resets the consumer's offset to 0 and returns the first message in the partition. If we do not use pause/resume at all, everything works fine - at least until we need to switch between different partitions, which is not the case so far but will be required once the actor is shared between different stages.
I found a matching issue in the Confluent driver's tracker, but it has been marked as resolved for almost a year. I spent some time on it and decided to come back to it in a separate issue/PR.
For now I have commented out all pause/resume lines in the implementation, so instead of initially pausing assigned partitions and resuming them on demand, the consumer actor consumes all assigned partitions all the time. With a changing partition set this can lead to skipping messages from partitions that currently have no request: such partitions will still be consumed and the results ignored.
All right, after fixing the failing CI tests I realized that we do not need pause/resume that much: it is possible to reset the consumer's position each time we consume a message from the wrong partition. I updated the implementation, so now it should basically work even with changing partition sets. Sure, not consuming at all would be better than consuming and putting the offset back, but... that is what I have right now.
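The workaround amounts to something like the following sketch (simplified; `partitionsToFetch` and `ProcessResult` are illustrative stand-ins):

```csharp
// Sketch of the workaround: if the consumed message belongs to a partition
// nobody currently requested, seek back to its own offset so the same
// message is re-delivered later instead of being silently skipped.
var result = _consumer.Consume(TimeSpan.FromMilliseconds(100));
if (result != null)
{
    if (!partitionsToFetch.Contains(result.TopicPartition))
    {
        // Put the message "back": the next Consume() on this partition
        // will return it again once someone actually requests it.
        _consumer.Seek(result.TopicPartitionOffset);
    }
    else
    {
        ProcessResult(result); // hypothetical handler
    }
}
```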
@IgorFedchenko Can you create an issue for the Confluent driver if you think it's a bug with their code? Would love to see that cross-referenced somewhere on this repository too.
I created issue #51 in our repo, and will create an issue for the Confluent driver and reference our issue from there.
I was going to submit this PR for review, but one of the tests is failing in CI while passing on my local machine. I will push a few more commits to get more informative logs about this error and make it pass.
So now it looks pretty much fine to get your review, @Aaronontheweb
@IgorFedchenko looking at it now
Some very minor changes requested and some questions, but otherwise this is looking great.
@@ -98,7 +99,7 @@ public CommittableOffset(IInternalCommitter committer, PartitionOffset offset, s
 /// </summary>
 public Task Commit()
 {
-    return _committer.Commit();
+    return _committer.Commit(ImmutableList.Create(Offset));
Why this change?
This is actually a pretty important one.
Before this change, _committer
was not using any offsets at all: it just called the Kafka consumer's Commit()
method, which commits the consumer's current/latest offset.
So if I consumed 3 messages and called await consumedMessages[0].Commit()
on the first message with the smallest offset, that would still commit the consumer's latest offset (3 in this case).
Now, instead of committing everything consumed so far, we explicitly specify the message offset to be committed.
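The difference in raw Confluent.Kafka terms (a sketch; the real code routes this through `_committer`, and `result` stands for some previously consumed message):

```csharp
// Before: parameterless Commit() commits the consumer's current position,
// i.e. past the latest consumed message, even if we only meant to
// acknowledge an earlier one.
consumer.Commit();

// After: commit exactly the message being acknowledged. Kafka convention
// is to commit offset + 1, the position of the next message to read.
consumer.Commit(new[]
{
    new TopicPartitionOffset(result.TopicPartition, result.Offset.Value + 1)
});
```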
src/Akka.Streams.Kafka/Stages/Consumers/Abstract/BaseSingleSourceLogic.cs
Show resolved
Hide resolved
src/Akka.Streams.Kafka/Stages/Consumers/Exceptions/CommitTimeoutException.cs
Show resolved
Hide resolved
@Aaronontheweb Thanks for the review. I have answered the questions and made the minor changes, plus more serious changes like updating signatures of
LGTM
Nice work.
This PR is related to issue #36.
To move on and add different types of consumer stages, we need to add two more abstractions:

- BaseSingleSourceLogic - a new base class for SingleSourceStageLogic. While SingleSourceStageLogic still contains some configuration and callback handlers, the new BaseSingleSourceLogic handles consuming messages from the consumer actor.
- KafkaConsumerActor - an actor that manages the Kafka consumer client. Basically, it receives all subscriptions from stages, subscribes to all topics they are interested in, and consumes messages on demand, broadcasting them to requesters once consumed. Error handling is also performed in this actor, so stages are now only responsible for receiving error messages and handling them according to the supervision strategy.

I am creating this PR as a work-in-progress draft - right now the new implementation is failing all consuming tests. I will fix that and then move it to review.
Also, the code changes are pretty big, so I will try to put my notes here as part of the PR review to simplify code review.