Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KafkaConsumer.CommittablePartitionedSource sub stream failure #413

Open
Arkatufus opened this issue Jan 3, 2025 · 1 comment
Open

KafkaConsumer.CommittablePartitionedSource sub stream failure #413

Arkatufus opened this issue Jan 3, 2025 · 1 comment

Comments

@Arkatufus
Copy link
Contributor

There are evidence that attributes being passed to the source were not being passed on correctly to the source of the partition sub-stream. A failing sub-stream did not use the same supervisor directive as the parent stream. This was observed when OnDeserializationError was used to log bad messages and the decider was supposed to use Directive.Resume instead of stop.

@jblackburn21
Copy link

Hi, I've been testing this with a small change, and has been working as I'd expect. Current in KafkaConsumerActor<K, V>, the ProcessError is executed even if Directive.Resume is returned from the decider. The ProcessError sends a Status.Failure to all of the sub sources, which fails the stages, and sends a KafkaConsumerActorMetadata.Internal.Stop back to the consumer actor so that everything shuts down.

  private void ProcessExceptions(Exception exception)
  {
      if (exception == null)
          return;

      var directive = _decider(exception);
      ProcessError(exception);
      if (directive == Directive.Resume)
          return;
      
       _pollCancellation?.Cancel();
      if(directive == Directive.Stop && _log.IsErrorEnabled)
          _log.Error(exception, "Exception when polling from consumer, KafkaConsumerActor actor: {0}", exception.Message);
      Context.Stop(Self);
  }

Moving the ProcessError after the directive check is working for my scenario, but I'm not sure if there was another reason to always process the errors.

  private void ProcessExceptions(Exception exception)
  {
      if (exception == null)
          return;

      var directive = _decider(exception);
      
      if (directive == Directive.Resume)
          return;
      
      ProcessError(exception);

       _pollCancellation?.Cancel();
      if(directive == Directive.Stop && _log.IsErrorEnabled)
          _log.Error(exception, "Exception when polling from consumer, KafkaConsumerActor actor: {0}", exception.Message);
      Context.Stop(Self);
  }

I can send over a PR if this logic change is agreed upon.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants