
System.ArgumentException: Unexpected records polled potentially thrown during a rebalance #415

Open
jblackburn21 opened this issue Jan 7, 2025 · 5 comments

@jblackburn21

Version Information
Version of Akka.NET? 1.5.33
Which Akka.NET Modules? Akka.Streams.Kafka

Describe the bug
We use the KafkaConsumer.CommittablePartitionedSource running in Kubernetes with autoscaling enabled. When pods are spun up or down, it is possible for the SubSourceLogic to get out of sync with the TopicPartition assignment on the consumer, which results in an exception being thrown and the consumer shutting down. The PartitionsAssignedHandler and PartitionsRevokedHandler are invoked as a side effect of _consumer.Consume(), and that timing does not appear to be handled properly.
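
For context, the stream is wired roughly like this (a simplified sketch, not our production code; broker address, group id, deserializers, and the logging body are placeholders):

```csharp
using System;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.Streams.Kafka.Dsl;
using Akka.Streams.Kafka.Settings;
using Confluent.Kafka;

var system = ActorSystem.Create("KafkaStream");
var materializer = system.Materializer();

var consumerSettings = ConsumerSettings<Null, string>
    .Create(system, null, null)              // default key/value deserializers
    .WithBootstrapServers("localhost:9092")  // assumption: local broker
    .WithGroupId("members-consumer");

// One sub-source is emitted per assigned TopicPartition; each sub-source is
// driven by its own SubSourceLogic, which is what gets out of sync with the
// consumer's actual assignment during a rebalance.
KafkaConsumer.CommittablePartitionedSource(consumerSettings, Subscriptions.Topics("members"))
    .RunForeach(pair =>
    {
        var (topicPartition, source) = pair;
        source.RunForeach(
            msg => Console.WriteLine($"{topicPartition}: {msg.Record.Value}"),
            materializer);
        // Real code would commit the message's committable offset after processing (omitted).
    }, materializer);
```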

To Reproduce
Steps to reproduce the behavior:

NOTE: I added additional logging to KafkaConsumerActor in order to better capture the current state during rebalancing

  1. Run kafka locally using docker

  2. Create a topic with 10 partitions, in this case I'm using a members topic

  3. Run a producer that generates keyed messages so that all 10 partitions receive traffic (see the producer sketch after this list)

  4. Start 3 instances of a consumer:

    • Start Consumer 1, all 10 partitions are assigned
    consumer 1 - 10 assignments: members [[0]], members [[1]], members [[2]], members [[3]], members [[4]], members [[5]], members [[6]], members [[7]], members [[8]], members [[9]]
    • Start Consumer 2, partitions are reassigned to the new consumer
    consumer 1 - 5 assignments: members [[0]], members [[1]], members [[2]], members [[3]], members [[4]]
    consumer 2 - 5 assignments: members [[5]], members [[6]], members [[7]], members [[8]], members [[9]]
    • Start Consumer 3, partitions are reassigned to the new consumer
    consumer 1 - 4 assignments: members [[0]], members [[1]], members [[2]], members [[3]]
    consumer 2 - 3 assignments: members [[4]], members [[5]], members [[6]]
    consumer 3 - 3 assignments: members [[7]], members [[8]], members [[9]]
  5. Stop Consumer 1, check logs for consumers 2 and 3 to see if they stopped. It may take multiple restarts of consumer 1 to trigger the error.
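
A minimal Confluent.Kafka producer for step 3 might look like this (assumes a local broker on localhost:9092 and the members topic; key/value contents are arbitrary):

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

var config = new ProducerConfig { BootstrapServers = "localhost:9092" };

using var producer = new ProducerBuilder<string, string>(config).Build();

for (var i = 0; ; i++)
{
    // Vary the key so the default partitioner spreads records over all 10 partitions.
    producer.Produce("members", new Message<string, string>
    {
        Key = $"member-{i % 100}",
        Value = $"payload-{i}"
    });
    Thread.Sleep(100);
}
```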

Expected behavior
The KafkaConsumerActor should properly update _requests and _requestors in the PartitionsRevokedHandler so that it no longer expects to consume messages from partitions that are no longer assigned.
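
To illustrate the expected behavior only (the field names and types below are hypothetical stand-ins, not the real KafkaConsumerActor internals), the idea is that the revoke handler prunes revoked partitions from every outstanding request before the next poll:

```csharp
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using Confluent.Kafka;

// Hypothetical stand-in for the actor's request bookkeeping; the real fields
// and types inside KafkaConsumerActor differ. This only illustrates the
// pruning that the Expected behavior above describes.
public sealed class RequestBookkeeping
{
    // requestor (sub-source stage) -> partitions it is currently waiting on
    private readonly Dictionary<string, ImmutableHashSet<TopicPartition>> _requests = new();

    public void TrackRequest(string requestor, IEnumerable<TopicPartition> partitions) =>
        _requests[requestor] = partitions.ToImmutableHashSet();

    // To be called from PartitionsRevokedHandler: drop revoked partitions from
    // every outstanding request so the next poll no longer expects records from them.
    public void OnPartitionsRevoked(IEnumerable<TopicPartition> revoked)
    {
        var revokedSet = revoked.ToImmutableHashSet();
        foreach (var requestor in _requests.Keys.ToList())
        {
            var remaining = _requests[requestor].Except(revokedSet);
            if (remaining.IsEmpty)
                _requests.Remove(requestor);
            else
                _requests[requestor] = remaining;
        }
    }
}
```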

Actual behavior

When a rebalance happens, partitions are revoked and then new partitions are assigned.

First, the next _consumer.Consume() completes with zero records to process and zero assignments, since the partitions have been revoked.

This can be seen with these logs from Consumer 2:

[13:53:40 INF] [2cadbea5-c6a8-40cf-ad57-3a757c315b7b] Messages requested from: [akka://KafkaStream/user/members-consumer/StreamSupervisor-1/$$j#1512204890], for: members [[2]]
[13:53:40 INF] [2cadbea5-c6a8-40cf-ad57-3a757c315b7b] Delayed poll when messages requested, periodic: False
[13:53:40 INF] [2cadbea5-c6a8-40cf-ad57-3a757c315b7b] Poll requested, periodic: False
[13:53:40 INF] [2cadbea5-c6a8-40cf-ad57-3a757c315b7b] Starting poll with rebalancing: False, 5 requests: members [[4]], members [[0]], members [[2]], members [[3]], members [[1]], 5 assignments: members [[0]], members [[1]], members [[2]], members [[3]], members [[4]]
[13:53:40 INF] [2cadbea5-c6a8-40cf-ad57-3a757c315b7b] Processing 0 records, 0 assignments:

The PartitionsRevokedHandler is then called, where the IPartitionEventHandler notifies the SubSourceLogic that partitions have been revoked and _rebalanceInProgress is set to true.

[13:53:40 INF] [cdc639ef-c511-47e1-8567-44e19debc586] Partitions revoked: members [[0]], members [[1]], members [[2]], members [[3]], members [[4]]

Depending on timing, a Poll message can be sent to the KafkaConsumerActor after the revoke and before new partitions are assigned. These logs show that when _consumer.Consume() is called, the assignments are empty, but there are still active _requests from the SubSourceLogic.

[13:53:40 INF] [1ea20efb-b0ad-4df0-bbe1-38abb67f7265] Poll requested, periodic: True
[13:53:40 INF] [1ea20efb-b0ad-4df0-bbe1-38abb67f7265] Starting poll with rebalancing: True, 5 requests: members [[4]], members [[0]], members [[2]], members [[3]], members [[1]], 0 assignments: 

When the _consumer.Consume() completes, it returns records from the newly assigned partitions.

[13:53:40 INF] [1ea20efb-b0ad-4df0-bbe1-38abb67f7265] Processing 5 records, 3 assignments: members [[4]], members [[5]], members [[6]]

Inside ProcessResult() there is a check to validate that the messages that were consumed came from the partitions that were requested. However, due to timing during a rebalance, these can be out of sync and a System.ArgumentException is thrown.

System.ArgumentException: Unexpected records polled. Expected: [members [[0]], members [[1]], members [[2]], members [[3]], members [[4]]], result: [members [[4]], members [[5]]], consumer assignment: [members [[4]], members [[5]], members [[6]]]
   at Akka.Streams.Kafka.Stages.Consumers.Actors.KafkaConsumerActor`2.ProcessResult(Guid pollCorrelationId, IImmutableSet`1 partitionsToFetch, List`1 rawResult) in /Users/jblackburn/repos/github/akka/Akka.Streams.Kafka/src/Akka.Streams.Kafka/Stages/Consumers/Actors/KafkaConsumerActor.cs:line 658
   at Akka.Streams.Kafka.Stages.Consumers.Actors.KafkaConsumerActor`2.Poll(Guid pollCorrelationId) in /Users/jblackburn/repos/github/akka/Akka.Streams.Kafka/src/Akka.Streams.Kafka/Stages/Consumers/Actors/KafkaConsumerActor.cs:line 588
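
The failing check boils down to something like the following (a simplified sketch, not the actual Akka.Streams.Kafka source): the invariant holds during a stable assignment but breaks across a rebalance.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using Confluent.Kafka;

public static class PollValidation
{
    // Simplified illustration of the ProcessResult validation: every polled
    // record must come from a partition that was actually requested,
    // otherwise the poll is rejected with an ArgumentException.
    public static void ValidatePolledRecords(
        IImmutableSet<TopicPartition> partitionsToFetch,
        IReadOnlyList<ConsumeResult<Null, string>> rawResult)
    {
        var fetched = rawResult.Select(r => r.TopicPartition).ToImmutableHashSet();
        if (!fetched.IsSubsetOf(partitionsToFetch))
            throw new ArgumentException(
                $"Unexpected records polled. Expected: [{string.Join(", ", partitionsToFetch)}], " +
                $"result: [{string.Join(", ", fetched)}]");
    }
}
```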

The exception is handled by ProcessExceptions() and the KafkaConsumerActor is stopped.

[13:53:40 INF] [1ea20efb-b0ad-4df0-bbe1-38abb67f7265] Sending failure to [akka://KafkaStream/user/members-consumer/StreamSupervisor-1/$$g#765079443], [akka://KafkaStream/user/members-consumer/StreamSupervisor-1/$$a#1997365679], [akka://KafkaStream/user/members-consumer/StreamSupervisor-1/$$h#1166729880], [akka://KafkaStream/user/members-consumer/StreamSupervisor-1/$$j#1512204890], [akka://KafkaStream/user/members-consumer/StreamSupervisor-1/$$k#1142637371], [akka://KafkaStream/user/members-consumer/StreamSupervisor-1/$$i#1325051480]. Error: Unexpected records polled. Expected: [members [[0]], members [[1]], members [[2]], members [[3]], members [[4]]], result: [members [[4]], members [[5]]], consumer assignment: [members [[4]], members [[5]], members [[6]]]
[13:53:40 ERR] [1ea20efb-b0ad-4df0-bbe1-38abb67f7265] Exception when polling from consumer, KafkaConsumerActor actor: Unexpected records polled. Expected: [members [[0]], members [[1]], members [[2]], members [[3]], members [[4]]], result: [members [[4]], members [[5]]], consumer assignment: [members [[4]], members [[5]], members [[6]]]

Screenshots
N/A

Environment
Dotnet 8 running on Mac, with kafka running in docker desktop


@jblackburn21
Author

I have added this app to test with, steps are documented here: https://github.com/jblackburn21/akka-streams-kafka-lab/blob/main/docs/akka-streams-kafka-issue-415.md

I have also pushed more detailed logging to this branch on my fork: https://github.com/jblackburn21/Akka.Streams.Kafka/tree/kafka-consumer-logging

@Aaronontheweb
Member

Thanks @jblackburn21 - we're going to look into this; we're wondering if there's a relationship with #414, which we're also looking into right now

@Aaronontheweb
Member

@jblackburn21 we've been able to reproduce the issue! we'll update this ticket as we go

@Arkatufus
Contributor

Arkatufus commented Feb 26, 2025

Here's what has been observed so far:

The original Akka.Streams.Kafka code was ported from the alpakka-kafka repository in 2019. As far as we can tell, the event handling is the same.

There are significant differences between the Java org.apache.kafka and .NET Confluent.Kafka implementations: .NET Confluent.Kafka is based on the native librdkafka C library, while org.apache.kafka implements its own low-level network I/O.

The org.apache.kafka consumer client implementation is tightly coupled with the underlying driver; it implements a batching fetch at the network level for each .consume() call to improve performance. The .NET Confluent.Kafka consumer client calls the underlying librdkafka API and is limited to a single message per .Consume() call.
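
To make the difference concrete, a .NET caller that wants a batch has to drain Consume itself, e.g. (plain Confluent.Kafka sketch, shown for illustration only, not the KafkaConsumerActor internals):

```csharp
using System;
using System.Collections.Generic;
using Confluent.Kafka;

public static class ConsumeBatching
{
    // Each Consume call yields at most one message, so a "batch" has to be
    // assembled by draining librdkafka's local queue until it is empty.
    public static List<ConsumeResult<Ignore, string>> DrainBatch(
        IConsumer<Ignore, string> consumer, int maxRecords)
    {
        var batch = new List<ConsumeResult<Ignore, string>>();
        while (batch.Count < maxRecords)
        {
            // TimeSpan.Zero: return immediately when nothing is buffered locally.
            var result = consumer.Consume(TimeSpan.Zero);
            if (result == null) break;
            batch.Add(result);
        }
        return batch;
    }
}
```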

Some speculations:

I'm not sure how the org.apache.kafka client handles a partition revoked/lost event from the broker, but in the .NET consumer client it seems the event is passed directly by the consumer actor to the partition event handlers.

There is a possibility that the .NET consumer client does not validate its buffer content when a repartition happens; the buffer might retain messages from a partition after that partition has been revoked or lost.

@Arkatufus
Contributor

Researching what the client is doing, it appears that the newest "partition.assignment.strategy" default value leverages the new "cooperative strategy", which brings a behavior change: https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#partition-assignment-strategy

In the new default, "partition.assignment.strategy" is set to [RangeAssignor, CooperativeStickyAssignor], where the first partition assignment behaves like the old behavior, but any subsequent assignment (due to a partition rebalance) uses the "cooperative strategy" behavior, which differs from what we're using in code right now.
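
If the cooperative protocol turns out to be the trigger, one way to rule it out is to pin the strategy explicitly on the raw Confluent.Kafka config (a hedged workaround sketch, not a confirmed fix; the same property would need to be applied to the settings Akka.Streams.Kafka builds its consumer from):

```csharp
using Confluent.Kafka;

// Pins the strategy to the eager Range assignor, so every rebalance revokes
// the full assignment before reassigning (the behavior the current code
// appears to assume). Untested as a fix; shown only to make the config concrete.
var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",   // assumption: local broker
    GroupId = "members-consumer",
    PartitionAssignmentStrategy = PartitionAssignmentStrategy.Range
};
```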

Here is the XML-DOC from the ConsumerBuilder.SetPartitionsAssignedHandler:

Specify a handler that will be called when a new consumer group partition assignment has been received by this consumer.

Following execution of the handler, consumption will resume from the last committed offset for each partition, or if there is no committed offset, in accordance with the auto.offset.reset configuration property.

Kafka supports two rebalance protocols: EAGER (range and roundrobin assignors) and COOPERATIVE (incremental) (cooperative-sticky assignor). Use the PartitionAssignmentStrategy configuration property to specify which assignor to use.

EAGER Rebalancing (range, roundrobin)

Partitions passed to the handler represent the entire set of partitions to consume from. There will be exactly one call to the partitions revoked or partitions lost handler (if they have been set using SetPartitionsRevokedHandler / SetPartitionsLostHandler) corresponding to every call to this handler.

COOPERATIVE (Incremental) Rebalancing

Partitions passed to the handler are an incremental assignment - i.e. they are in addition to those already being consumed from.

Remarks:
Executes as a side-effect of the Consumer.Consume call (on the same thread).

(Incremental)Assign/Unassign must not be called in the handler.

Exceptions: Any exception thrown by your partitions assigned handler will be wrapped in a ConsumeException with ErrorCode ErrorCode.Local_Application and thrown by the initiating call to Consume.
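
For reference, those handlers are registered on a raw Confluent.Kafka consumer via ConsumerBuilder as below (a standalone sketch; Akka.Streams.Kafka hooks its IPartitionEventHandler callbacks in through this same mechanism):

```csharp
using System;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",   // assumption: local broker
    GroupId = "members-consumer"
};

// The handlers execute as a side effect of Consume, on the consuming thread,
// which is why their timing interleaves with KafkaConsumerActor's Poll cycle.
using var consumer = new ConsumerBuilder<Ignore, string>(config)
    .SetPartitionsAssignedHandler((c, assigned) =>
        Console.WriteLine($"Assigned: {string.Join(", ", assigned)}"))
    .SetPartitionsRevokedHandler((c, revoked) =>
        Console.WriteLine($"Revoked: {string.Join(", ", revoked)}"))
    .SetPartitionsLostHandler((c, lost) =>
        Console.WriteLine($"Lost: {string.Join(", ", lost)}"))
    .Build();

consumer.Subscribe("members");
```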
