diff --git a/build.sbt b/build.sbt index 8c9f7562..62927f63 100644 --- a/build.sbt +++ b/build.sbt @@ -236,7 +236,7 @@ lazy val commonSettings = Seq( compilerOptions, Test / fork := true, libraryDependencies ++= compilerPlugins, - // mimaPreviousArtifacts := Seq(), // TODO + mimaPreviousArtifacts := Set(), // TODO libraryDependencies ++= Seq( "com.disneystreaming" %% "weaver-cats" % Versions.Weaver, "com.disneystreaming" %% "weaver-framework" % Versions.Weaver, diff --git a/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/taps.scala b/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/taps.scala index ed00d6b3..ea91f377 100644 --- a/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/taps.scala +++ b/connectors/activemq/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/taps.scala @@ -142,8 +142,8 @@ private[activemq] object taps { } private def subscriberStream[F[_], A](subscriber: SinkQueueWithCancel[A])(implicit F: Async[F]): Stream[F, A] = { - val pull = Async[F].fromFuture(F.delay(subscriber.pull())) val cancel = F.delay(subscriber.cancel()) + val pull = Async[F].fromFutureCancelable(F.delay((subscriber.pull(), cancel))) Stream.repeatEval(pull).unNoneTerminate.onFinalize(cancel) }