Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Handle NormalShutdownReason in MergeHub #1741

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.apache.pekko
import pekko.Done
import pekko.stream.KillSwitches
import pekko.stream.ThrottleMode
import pekko.stream.impl.ActorPublisher
import pekko.stream.testkit.StreamSpec
import pekko.stream.testkit.TestPublisher
import pekko.stream.testkit.TestSubscriber
Expand Down Expand Up @@ -51,6 +52,16 @@ class HubSpec extends StreamSpec {
upstream.expectCancellation()
}

"do not throw exceptions when upstream completes normally" in {
EventFilter.error("Upstream producer failed with exception, removing from MergeHub now",
occurrences = 0).intercept {
val (sink, result) = MergeHub.source[Int](16).take(10).toMat(Sink.seq)(Keep.both).run()
Source.failed(ActorPublisher.NormalShutdownReason).runWith(sink)
Source(1 to 10).runWith(sink)
result.futureValue.sorted should ===(1 to 10)
}
}

"notify existing producers if consumer cancels after a few elements" in {
val (sink, result) = MergeHub.source[Int](16).take(5).toMat(Sink.seq)(Keep.both).run()
val upstream = TestPublisher.probe[Int]()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import pekko.annotation.InternalApi
import pekko.dispatch.AbstractNodeQueue
import pekko.stream._
import pekko.stream.Attributes.LogLevels
import pekko.stream.impl.ActorPublisher
import pekko.stream.stage._

/**
Expand Down Expand Up @@ -356,7 +357,8 @@ private[pekko] class MergeHub[T](perProducerBufferSize: Int, drainingEnabled: Bo

// Make some noise
override def onUpstreamFailure(ex: Throwable): Unit = {
throw new MergeHub.ProducerFailed(
if (ex eq ActorPublisher.NormalShutdownReason) completeStage()
else throw new MergeHub.ProducerFailed(
"Upstream producer failed with exception, " +
"removing from MergeHub now",
ex)
Expand Down
Loading