diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala index ce6c4495b2..0a334c3729 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala @@ -21,6 +21,7 @@ import org.apache.pekko import pekko.Done import pekko.stream.KillSwitches import pekko.stream.ThrottleMode +import pekko.stream.impl.ActorPublisher import pekko.stream.testkit.StreamSpec import pekko.stream.testkit.TestPublisher import pekko.stream.testkit.TestSubscriber @@ -51,6 +52,16 @@ class HubSpec extends StreamSpec { upstream.expectCancellation() } + "do not throw exceptions when upstream completes normally" in { + EventFilter.error("Upstream producer failed with exception, removing from MergeHub now", + occurrences = 0).intercept { + val (sink, result) = MergeHub.source[Int](16).take(10).toMat(Sink.seq)(Keep.both).run() + Source.failed(ActorPublisher.NormalShutdownReason).runWith(sink) + Source(1 to 10).runWith(sink) + result.futureValue.sorted should ===(1 to 10) + } + } + "notify existing producers if consumer cancels after a few elements" in { val (sink, result) = MergeHub.source[Int](16).take(5).toMat(Sink.seq)(Keep.both).run() val upstream = TestPublisher.probe[Int]() diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala index 94c44005c1..23be24d7f4 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala @@ -33,6 +33,7 @@ import pekko.annotation.InternalApi import pekko.dispatch.AbstractNodeQueue import pekko.stream._ import pekko.stream.Attributes.LogLevels +import pekko.stream.impl.ActorPublisher import pekko.stream.stage._ /** @@ -356,7 +357,8 @@ private[pekko] class MergeHub[T](perProducerBufferSize: Int, drainingEnabled: Bo // Make some noise override def onUpstreamFailure(ex: Throwable): Unit = { - throw new MergeHub.ProducerFailed( + if (ex eq ActorPublisher.NormalShutdownReason) completeStage() + else throw new MergeHub.ProducerFailed( "Upstream producer failed with exception, " + "removing from MergeHub now", ex)