-
Notifications
You must be signed in to change notification settings - Fork 21
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added option to customize partition events by stage logic subclasses #52
Added option to customize partition events by stage logic subclasses #52
Conversation
During PR #48 review process, I thought that |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me, but left 1 question
src/Akka.Streams.Kafka/Stages/Consumers/Abstract/SingleSourceStageLogic.cs
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
@IgorFedchenko FYI, on the previous commit had one test failure: Akka.Streams.Kafka.Tests.Akka.Streams.Kafka.Tests.Integration.PlainSourceIntegrationTests.PlainSource_consumes_messages_from_KafkaProducer_with_subscribe_to_topic You can see the logs and such if you click through here: https://github.com/akkadotnet/Akka.Streams.Kafka/runs/227053848 Might have a race condition of some kind with that test. |
@Aaronontheweb Good point. Created issue for that - looks little bit wierd, so unfortunately can not fix this right now, "on the fly". |
This is a small PR, which adds a way to customize partition events handling by different stage logic classes.
Before this PR partitions events were handled by
SingleSourceStageLogic
without any option to override it. Besides, original kafka driver callbacks also provideconsumer
instance passed to callbacks, so actually handling could use some consumer's methods.Now consumer is passed to this callbacks, and the callbacks itself may be customized by logic subclasses (looks like only for
TransactionalSourceLogic
so far).