From 1a4a7c3325122c602766f6ab31fd1f3ab7739e3d Mon Sep 17 00:00:00 2001 From: Luke Thomas Date: Fri, 9 Jun 2023 12:39:18 +0100 Subject: [PATCH 1/3] init consumer using rabbit fs2 --- build.sbt | 4 +- .../example/requeue/RequeueConsumer.scala | 108 +++++++++++++++--- 2 files changed, 98 insertions(+), 14 deletions(-) diff --git a/build.sbt b/build.sbt index 47ecd204..2d6b5663 100644 --- a/build.sbt +++ b/build.sbt @@ -192,7 +192,9 @@ lazy val example = project "com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingVersion, "org.scalatest" %% "scalatest" % scalaTestVersion, "org.typelevel" %% "cats-effect-testing-scalatest" % catsEffectScalaTestVersion % "test", - "com.typesafe" % "config" % typeSafeVersion + "com.typesafe" % "config" % typeSafeVersion, + "dev.profunktor" %% "fs2-rabbit" % "5.0.0", + "dev.profunktor" %% "fs2-rabbit-circe" % "5.0.0" ) ) diff --git a/example/src/main/scala/com/itv/bucky/example/requeue/RequeueConsumer.scala b/example/src/main/scala/com/itv/bucky/example/requeue/RequeueConsumer.scala index 22919fb7..d8430fcf 100644 --- a/example/src/main/scala/com/itv/bucky/example/requeue/RequeueConsumer.scala +++ b/example/src/main/scala/com/itv/bucky/example/requeue/RequeueConsumer.scala @@ -1,9 +1,10 @@ package com.itv.bucky.example.requeue +import cats.data.Kleisli import cats.effect.{ExitCode, IO, IOApp, Resource} import com.itv.bucky.Unmarshaller.StringPayloadUnmarshaller -import com.itv.bucky.decl._ -import com.itv.bucky._ +import com.itv.bucky.decl.{Declaration, Direct, Queue} +import com.itv.bucky.{AmqpClientConfig, DeliveryUnmarshaller, Payload, PayloadUnmarshaller, RequeueHandler, RoutingKey, defaultPreFetchCount} import com.itv.bucky.consume._ import com.itv.bucky.pattern.requeue._ import com.typesafe.config.{Config, ConfigFactory} @@ -11,24 +12,30 @@ import com.typesafe.scalalogging.StrictLogging import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ - import cats.effect._ import cats.implicits._ +import dev.profunktor.fs2rabbit.config.Fs2RabbitConfig +import dev.profunktor.fs2rabbit.config.declaration.DeclarationQueueConfig +import dev.profunktor.fs2rabbit.effects.EnvelopeDecoder +import dev.profunktor.fs2rabbit.interpreter.RabbitClient +import dev.profunktor.fs2rabbit.model +import dev.profunktor.fs2rabbit.model.{AckResult, AmqpEnvelope, DeliveryTag, ExchangeName, ExchangeType, QueueName, RabbitConnection} object RequeueConsumer extends IOApp with StrictLogging { object Declarations { - val queue = Queue(QueueName(s"requeue_string-1")) - val all: Iterable[Declaration] = requeueDeclarations(queue.name, RoutingKey(queue.name.value), Some(ExchangeName(s"${queue.name.value}.dlx")), Direct, retryAfter = 1.second) + val queueName: model.QueueName = model.QueueName(s"requeue_string-1") +// val all: Iterable[Declaration] = +// requeueDeclarations(queue.name, RoutingKey(queue.name.value), Some(ExchangeName(s"${queue.name.value}.dlx")), Direct, retryAfter = 1.second) } - val config: Config = ConfigFactory.load("bucky") + val config: Config = ConfigFactory.load("bucky") val amqpClientConfig: AmqpClientConfig = AmqpClientConfig(config.getString("rmq.host"), 5672, "guest", "guest") val stringToLogRequeueHandler: RequeueHandler[IO, String] = RequeueHandler[IO, String] { message: String => IO.delay { - logger.info(message) + println(message) message match { case "requeue" => Requeue @@ -38,11 +45,86 @@ object RequeueConsumer extends IOApp with StrictLogging { } } - override def run(args: List[String]): IO[ExitCode] = + override def run(args: List[String]): IO[ExitCode] = { + val config: Fs2RabbitConfig = Fs2RabbitConfig( + amqpClientConfig.host, + amqpClientConfig.port, + amqpClientConfig.virtualHost.getOrElse("/"), + 10.seconds, + ssl = false, + Some(amqpClientConfig.username), + Some(amqpClientConfig.password), + requeueOnNack = false, + requeueOnReject = false, + None + ) + (for { - amqpClient <- AmqpClient[IO](amqpClientConfig) - _ <- Resource.eval(amqpClient.declare(Declarations.all)) - _ <- amqpClient.registerRequeueConsumerOf(Declarations.queue.name, - stringToLogRequeueHandler) - } yield ()).use(_ => IO.never *> IO(ExitCode.Success)) + amqpClient <- RabbitClient.default[IO](config).resource + channel <- amqpClient.createConnectionChannel + _ <- Resource.eval(amqpClient.declareQueue(DeclarationQueueConfig.default(Declarations.queueName).copy(arguments = Map("x-dead-letter-exchange" -> s"${Declarations.queueName.value}.dlx")))(channel)) + _ <- Resource.eval(amqpClient.declareExchange(ExchangeName(s"${Declarations.queueName.value}.dlx"), ExchangeType.FanOut)(channel)) + _ <- Resource.eval(amqpClient.declareQueue(DeclarationQueueConfig.default(QueueName(s"${Declarations.queueName.value}.dlq")))(channel)) + _ <- Resource.eval(amqpClient.bindQueue(QueueName(s"${Declarations.queueName.value}.dlq"), ExchangeName(s"${Declarations.queueName.value}.dlx"), model.RoutingKey("*"))(channel)) +// _ <- Resource.eval(amqpClient.declare(Declarations.all)) + _ <- registerRequeueConsumerOf(amqpClient)(Declarations.queueName, stringToLogRequeueHandler) + } yield ()).use(_ => IO.never *> IO(ExitCode.Success)) + } + + def registerRequeueConsumerOf[T](client: RabbitClient[IO])( + queueName: model.QueueName, + handler: RequeueHandler[IO, T], + requeuePolicy: RequeuePolicy = RequeuePolicy(maximumProcessAttempts = 10, requeueAfter = 3.minutes), + onHandlerException: RequeueConsumeAction = Requeue, + unmarshalFailureAction: RequeueConsumeAction = DeadLetter, + onRequeueExpiryAction: T => IO[ConsumeAction] = (_: T) => IO.pure[ConsumeAction](DeadLetter), + prefetchCount: Int = defaultPreFetchCount + )(implicit unmarshaller: PayloadUnmarshaller[T]): Resource[IO, Unit] = { + + implicit val envelopeDecoder: EnvelopeDecoder[IO, T] = + new Kleisli[IO, AmqpEnvelope[Array[Byte]], T](amqpEnvelope => // TODO could we just use the circe Decoder? + unmarshaller.unmarshal(Payload(amqpEnvelope.payload)) match { + case Left(value) => IO.raiseError(value) + case Right(value) => IO.pure(value) + } + ) + + client.createConnectionChannel.flatMap { implicit channel => + Resource.eval(client.createAckerConsumer[T](queueName)).flatMap { case (acker, consumer) => + consumer + .evalMap(msg => handler(msg.payload).map(msg.deliveryTag -> _)) + .evalMap { + case (tag, Ack) => acker(AckResult.Ack(tag)) + case (tag, DeadLetter) => acker(AckResult.NAck(tag)) + case (tag, Requeue) => ??? + case (tag, RequeueImmediately) => ??? + } + .compile + .drain + .background + .void + } + } + } + + def registerDeliveryRequeueConsumerOf[T]( // TODO decode headers as an option so only need one consumer type? + queueName: model.QueueName, + handler: RequeueHandler[IO, T], + requeuePolicy: RequeuePolicy = RequeuePolicy(maximumProcessAttempts = 10, requeueAfter = 3.minutes), + onHandlerException: RequeueConsumeAction = Requeue, + unmarshalFailureAction: RequeueConsumeAction = DeadLetter, + onRequeueExpiryAction: T => IO[ConsumeAction] = (_: T) => IO.pure[ConsumeAction](DeadLetter), + prefetchCount: Int = defaultPreFetchCount + )(implicit unmarshaller: DeliveryUnmarshaller[T]): Resource[IO, Unit] = { + implicit val envelopeDecoder: EnvelopeDecoder[IO, T] = + new Kleisli[IO, AmqpEnvelope[Array[Byte]], T](amqpEnvelope => // TODO could we just use the circe Decoder? + unmarshaller.unmarshal(Delivery(Payload(amqpEnvelope.payload), ???, ???, ???)) match { + case Left(value) => IO.raiseError(value) + case Right(value) => IO.pure(value) + } + ) + + ??? + } + } From 85297e72ce352cd44a554dd087c6da118d77cfe3 Mon Sep 17 00:00:00 2001 From: Luke Thomas Date: Fri, 9 Jun 2023 15:25:23 +0100 Subject: [PATCH 2/3] implement amqpClient interface with fs2 rabbit --- .../example/requeue/Fs2RabbitAmqpClient.scala | 127 ++++++++++++++++++ .../example/requeue/RequeueConsumer.scala | 12 +- 2 files changed, 134 insertions(+), 5 deletions(-) create mode 100644 example/src/main/scala/com/itv/bucky/example/requeue/Fs2RabbitAmqpClient.scala diff --git a/example/src/main/scala/com/itv/bucky/example/requeue/Fs2RabbitAmqpClient.scala b/example/src/main/scala/com/itv/bucky/example/requeue/Fs2RabbitAmqpClient.scala new file mode 100644 index 00000000..3ad380c3 --- /dev/null +++ b/example/src/main/scala/com/itv/bucky/example/requeue/Fs2RabbitAmqpClient.scala @@ -0,0 +1,127 @@ +package com.itv.bucky.example.requeue + +import cats.data.Kleisli +import cats.effect.{IO, Resource} +import cats.implicits._ +import com.itv.bucky +import com.itv.bucky.publish.PublishCommand +import com.itv.bucky.{AmqpClient, Envelope, Handler, Payload, Publisher, consume, decl, publish, ExchangeName => BuckyExchangeName, RoutingKey => BuckyRoutingKey} +import com.itv.bucky.decl.{ExchangeType => BuckyExchangeType} +import dev.profunktor.fs2rabbit.arguments.{SafeArg, SafeArgument} +import dev.profunktor.fs2rabbit.config.Fs2RabbitConfig +import dev.profunktor.fs2rabbit.config.declaration.{DeclarationExchangeConfig, DeclarationQueueConfig} +import dev.profunktor.fs2rabbit.effects.{EnvelopeDecoder, MessageEncoder} +import dev.profunktor.fs2rabbit.interpreter.RabbitClient +import dev.profunktor.fs2rabbit.model +import dev.profunktor.fs2rabbit.model.{AMQPChannel, AMQPConnection, AckResult, AmqpMessage, AmqpProperties, QueueBindingArgs, ExchangeName => Fs2ExchangeName, ExchangeType => Fs2ExchangeType, QueueName => Fs2RabbitQueueName, RoutingKey => Fs2RoutingKey} + +import java.nio.charset.StandardCharsets.UTF_8 +import scala.concurrent.duration.FiniteDuration + +class Fs2RabbitAmqpClient private (config: Fs2RabbitConfig, client: RabbitClient[IO], connection: AMQPConnection, publishChannel: AMQPChannel) + extends AmqpClient[IO] { + + implicit val arrayByteEncoder: MessageEncoder[IO, Array[Byte]] = + Kleisli { bytes => + IO.pure(AmqpMessage(bytes, AmqpProperties.empty.copy(contentEncoding = Some(UTF_8.name())))) + } + + implicit val arrayByteDecoder: EnvelopeDecoder[IO, Array[Byte]] = + Kleisli { amqpEnvelope => + IO.pure(amqpEnvelope.payload) + } + + override def declare(declarations: decl.Declaration*): IO[Unit] = declare(declarations.toIterable) + + override def declare(declarations: Iterable[decl.Declaration]): IO[Unit] = { + + def argumentsFromAnyRef(arguments: Map[String, AnyRef]): Map[String, SafeArg] = { + arguments.mapValues { + case arg: String => arg + case arg: BigDecimal => arg +// case arg: Integer => Int.box(arg) // TODO are these needed? +// case arg: Long => Long.box(arg) +// case arg: Double => Double.box(arg) +// case arg: Float => Float.box(arg) +// case arg: Short => Short.box(arg) +// case arg: Boolean => Boolean.box(arg) +// case arg: Byte => Byte.box(arg) + case arg: java.util.Date => arg + case t => throw new IllegalArgumentException(s"Unsupported type for rabbit arguments $t") + } + } + + def exchangeTypeToFs2ExchangeType(exchangeType: BuckyExchangeType): Fs2ExchangeType = { + exchangeType match { + case decl.Direct => Fs2ExchangeType.Direct + case decl.Topic => Fs2ExchangeType.Topic + case decl.Headers => Fs2ExchangeType.Headers + case decl.Fanout => Fs2ExchangeType.FanOut + } + } + + implicit val channel: AMQPChannel = publishChannel + declarations.toList.traverse_{ + case decl.Exchange(name, exchangeType, isDurable, shouldAutoDelete, isInternal, arguments, bindings) => + client.declareExchange(DeclarationExchangeConfig.default(Fs2ExchangeName(name.value), exchangeTypeToFs2ExchangeType(exchangeType)).copy(arguments = argumentsFromAnyRef(arguments))) *> + declare(bindings) + case decl.Binding(exchangeName, queueName, routingKey, arguments) => + client.bindQueue(Fs2RabbitQueueName(queueName.value), Fs2ExchangeName(exchangeName.value), Fs2RoutingKey(routingKey.value), QueueBindingArgs(argumentsFromAnyRef(arguments))) + case decl.ExchangeBinding(destinationExchangeName, sourceExchangeName, routingKey, arguments) => ??? + case decl.Queue(name, isDurable, isExclusive, shouldAutoDelete, arguments) => + client.declareQueue(DeclarationQueueConfig.default(Fs2RabbitQueueName(name.value)).copy(arguments = argumentsFromAnyRef(arguments))) + } + + } + + override def publisher(): Publisher[IO, publish.PublishCommand] = (publishCommand: PublishCommand) => { + implicit val channel: AMQPChannel = publishChannel + client + .createPublisher[Array[Byte]](Fs2ExchangeName(publishCommand.exchange.value), Fs2RoutingKey(publishCommand.routingKey.value)) + .flatMap(f => f(publishCommand.body.value)) + } + + override def registerConsumer( + queueName: bucky.QueueName, + handler: Handler[IO, consume.Delivery], + exceptionalAction: consume.ConsumeAction, + prefetchCount: Int, + shutdownTimeout: FiniteDuration, + shutdownRetry: FiniteDuration + ): Resource[IO, Unit] = + client.createChannel(connection).flatMap { implicit channel => + Resource.eval(client.createAckerConsumer[Array[Byte]](Fs2RabbitQueueName(queueName.value))).flatMap { case (acker, consumer) => + consumer + .evalMap(msg => handler(amqpEnvelopeToDelivery(queueName, msg)).map(_ -> msg.deliveryTag)) + .evalMap { + case (consume.Ack, tag) => acker(AckResult.Ack(tag)) + case (consume.DeadLetter, tag) => acker(AckResult.NAck(tag)) + case (consume.RequeueImmediately, tag) => ??? // TODO + } + .compile + .drain + .background + .void + } + } + + private def amqpEnvelopeToDelivery(queueName: bucky.QueueName, msg: model.AmqpEnvelope[Array[Byte]]) = + consume.Delivery( + Payload(msg.payload), + consume.ConsumerTag.create(queueName), + Envelope(msg.deliveryTag.value, msg.redelivered, BuckyExchangeName(msg.exchangeName.value), BuckyRoutingKey(msg.routingKey.value)), + publish.MessageProperties.basic // TODO convert from fs2rabbit MessageProperties + ) + + override def isConnectionOpen: IO[Boolean] = IO(connection.value.isOpen) + +} + +object Fs2RabbitAmqpClient { + def apply(config: Fs2RabbitConfig): Resource[IO, Fs2RabbitAmqpClient] = + for { + client <- RabbitClient.default[IO](config).resource + connection <- client.createConnection + publishChannel <- client.createConnectionChannel + } yield new Fs2RabbitAmqpClient(config, client, connection, publishChannel) +} diff --git a/example/src/main/scala/com/itv/bucky/example/requeue/RequeueConsumer.scala b/example/src/main/scala/com/itv/bucky/example/requeue/RequeueConsumer.scala index d8430fcf..4060cc79 100644 --- a/example/src/main/scala/com/itv/bucky/example/requeue/RequeueConsumer.scala +++ b/example/src/main/scala/com/itv/bucky/example/requeue/RequeueConsumer.scala @@ -19,7 +19,8 @@ import dev.profunktor.fs2rabbit.config.declaration.DeclarationQueueConfig import dev.profunktor.fs2rabbit.effects.EnvelopeDecoder import dev.profunktor.fs2rabbit.interpreter.RabbitClient import dev.profunktor.fs2rabbit.model -import dev.profunktor.fs2rabbit.model.{AckResult, AmqpEnvelope, DeliveryTag, ExchangeName, ExchangeType, QueueName, RabbitConnection} +import dev.profunktor.fs2rabbit.model.{AMQPConnection, AckResult, AmqpEnvelope, AmqpMessage, DeliveryTag, ExchangeName, ExchangeType, QueueName, RabbitConnection} +import fs2.{Pipe, Pure} object RequeueConsumer extends IOApp with StrictLogging { @@ -61,17 +62,18 @@ object RequeueConsumer extends IOApp with StrictLogging { (for { amqpClient <- RabbitClient.default[IO](config).resource - channel <- amqpClient.createConnectionChannel + connection <- amqpClient.createConnection + channel <- amqpClient.createChannel(connection) _ <- Resource.eval(amqpClient.declareQueue(DeclarationQueueConfig.default(Declarations.queueName).copy(arguments = Map("x-dead-letter-exchange" -> s"${Declarations.queueName.value}.dlx")))(channel)) _ <- Resource.eval(amqpClient.declareExchange(ExchangeName(s"${Declarations.queueName.value}.dlx"), ExchangeType.FanOut)(channel)) _ <- Resource.eval(amqpClient.declareQueue(DeclarationQueueConfig.default(QueueName(s"${Declarations.queueName.value}.dlq")))(channel)) _ <- Resource.eval(amqpClient.bindQueue(QueueName(s"${Declarations.queueName.value}.dlq"), ExchangeName(s"${Declarations.queueName.value}.dlx"), model.RoutingKey("*"))(channel)) // _ <- Resource.eval(amqpClient.declare(Declarations.all)) - _ <- registerRequeueConsumerOf(amqpClient)(Declarations.queueName, stringToLogRequeueHandler) + _ <- registerRequeueConsumerOf(amqpClient, connection)(Declarations.queueName, stringToLogRequeueHandler) } yield ()).use(_ => IO.never *> IO(ExitCode.Success)) } - def registerRequeueConsumerOf[T](client: RabbitClient[IO])( + def registerRequeueConsumerOf[T](client: RabbitClient[IO], connection: AMQPConnection)( queueName: model.QueueName, handler: RequeueHandler[IO, T], requeuePolicy: RequeuePolicy = RequeuePolicy(maximumProcessAttempts = 10, requeueAfter = 3.minutes), @@ -89,7 +91,7 @@ object RequeueConsumer extends IOApp with StrictLogging { } ) - client.createConnectionChannel.flatMap { implicit channel => + client.createChannel(connection).flatMap { implicit channel => Resource.eval(client.createAckerConsumer[T](queueName)).flatMap { case (acker, consumer) => consumer .evalMap(msg => handler(msg.payload).map(msg.deliveryTag -> _)) From cecf40d549234e64a1659b8c9388b100d39433ad Mon Sep 17 00:00:00 2001 From: Luke Thomas Date: Fri, 9 Jun 2023 16:59:18 +0100 Subject: [PATCH 3/3] fix requeue --- build.sbt | 1 + core/src/test/resources/logback.xml | 4 +- example/src/main/resources/logback.xml | 4 +- .../example/requeue/Fs2RabbitAmqpClient.scala | 167 +++++++++++++----- .../example/requeue/RequeueConsumer.scala | 118 +++---------- kamon/src/test/resources/logback.xml | 4 +- test/src/it/resources/logback.xml | 4 +- test/src/test/resources/logback.xml | 4 +- 8 files changed, 152 insertions(+), 154 deletions(-) diff --git a/build.sbt b/build.sbt index 2d6b5663..e8d4e938 100644 --- a/build.sbt +++ b/build.sbt @@ -193,6 +193,7 @@ lazy val example = project "org.scalatest" %% "scalatest" % scalaTestVersion, "org.typelevel" %% "cats-effect-testing-scalatest" % catsEffectScalaTestVersion % "test", "com.typesafe" % "config" % typeSafeVersion, + "ch.qos.logback" % "logback-classic" % logbackVersion, "dev.profunktor" %% "fs2-rabbit" % "5.0.0", "dev.profunktor" %% "fs2-rabbit-circe" % "5.0.0" ) diff --git a/core/src/test/resources/logback.xml b/core/src/test/resources/logback.xml index 2c3d386a..8837a709 100644 --- a/core/src/test/resources/logback.xml +++ b/core/src/test/resources/logback.xml @@ -2,14 +2,14 @@ - %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - "%msg"%n + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - "%delivery"%n localhost LOCAL3 - content-delivery - work-order-to-workflow-translator - [%thread] %-5level %logger - "%msg"%n + content-delivery - work-order-to-workflow-translator - [%thread] %-5level %logger - "%delivery"%n diff --git a/example/src/main/resources/logback.xml b/example/src/main/resources/logback.xml index 29df8fb5..6a75a7bc 100644 --- a/example/src/main/resources/logback.xml +++ b/example/src/main/resources/logback.xml @@ -2,14 +2,14 @@ - %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - "%msg"%n + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - "%delivery"%n localhost LOCAL3 - content-delivery - work-order-to-workflow-translator - [%thread] %-5level %logger - "%msg"%n + content-delivery - work-order-to-workflow-translator - [%thread] %-5level %logger - "%delivery"%n diff --git a/example/src/main/scala/com/itv/bucky/example/requeue/Fs2RabbitAmqpClient.scala b/example/src/main/scala/com/itv/bucky/example/requeue/Fs2RabbitAmqpClient.scala index 3ad380c3..e2f1e3df 100644 --- a/example/src/main/scala/com/itv/bucky/example/requeue/Fs2RabbitAmqpClient.scala +++ b/example/src/main/scala/com/itv/bucky/example/requeue/Fs2RabbitAmqpClient.scala @@ -4,69 +4,144 @@ import cats.data.Kleisli import cats.effect.{IO, Resource} import cats.implicits._ import com.itv.bucky -import com.itv.bucky.publish.PublishCommand -import com.itv.bucky.{AmqpClient, Envelope, Handler, Payload, Publisher, consume, decl, publish, ExchangeName => BuckyExchangeName, RoutingKey => BuckyRoutingKey} import com.itv.bucky.decl.{ExchangeType => BuckyExchangeType} -import dev.profunktor.fs2rabbit.arguments.{SafeArg, SafeArgument} +import com.itv.bucky.publish.PublishCommand +import com.itv.bucky.{ + AmqpClient, + Envelope, + Handler, + Payload, + Publisher, + consume, + decl, + publish, + ExchangeName => BuckyExchangeName, + QueueName => BuckyQueueName, + RoutingKey => BuckyRoutingKey +} +import com.rabbitmq.client.LongString +import dev.profunktor.fs2rabbit.arguments.SafeArg import dev.profunktor.fs2rabbit.config.Fs2RabbitConfig import dev.profunktor.fs2rabbit.config.declaration.{DeclarationExchangeConfig, DeclarationQueueConfig} import dev.profunktor.fs2rabbit.effects.{EnvelopeDecoder, MessageEncoder} import dev.profunktor.fs2rabbit.interpreter.RabbitClient -import dev.profunktor.fs2rabbit.model -import dev.profunktor.fs2rabbit.model.{AMQPChannel, AMQPConnection, AckResult, AmqpMessage, AmqpProperties, QueueBindingArgs, ExchangeName => Fs2ExchangeName, ExchangeType => Fs2ExchangeType, QueueName => Fs2RabbitQueueName, RoutingKey => Fs2RoutingKey} +import dev.profunktor.fs2rabbit.model.AmqpFieldValue._ +import dev.profunktor.fs2rabbit.model.{ + AMQPChannel, + AMQPConnection, + AckResult, + AmqpFieldValue, + AmqpMessage, + AmqpProperties, + QueueBindingArgs, + ExchangeName => Fs2ExchangeName, + ExchangeType => Fs2ExchangeType, + QueueName => Fs2RabbitQueueName, + RoutingKey => Fs2RoutingKey +} +import scodec.bits.ByteVector import java.nio.charset.StandardCharsets.UTF_8 import scala.concurrent.duration.FiniteDuration -class Fs2RabbitAmqpClient private (config: Fs2RabbitConfig, client: RabbitClient[IO], connection: AMQPConnection, publishChannel: AMQPChannel) - extends AmqpClient[IO] { +class Fs2RabbitAmqpClient private (client: RabbitClient[IO], connection: AMQPConnection, publishChannel: AMQPChannel) extends AmqpClient[IO] { + + implicit val deliveryEncoder: MessageEncoder[IO, PublishCommand] = + Kleisli { publishCommand => + val fs2MessageHeaders: Map[String, AmqpFieldValue] = publishCommand.basicProperties.headers.mapValues { + case bd: java.math.BigDecimal => DecimalVal.unsafeFrom(bd) + case ts: java.time.Instant => TimestampVal.from(ts) + case d: java.util.Date => TimestampVal.from(d) +// case t: java.util.Map[String@unchecked, AnyRef@unchecked] => +// TableVal(t.asScala.toMap.map { case (key, v) => ShortString.unsafeFrom(key) -> unsafeFrom(v) }) + case byte: java.lang.Byte => ByteVal(byte) + case double: java.lang.Double => DoubleVal(double) + case float: java.lang.Float => FloatVal(float) + case short: java.lang.Short => ShortVal(short) + case byteArray: Array[Byte] => ByteArrayVal(ByteVector(byteArray)) + case b: java.lang.Boolean => BooleanVal(b) + case i: java.lang.Integer => IntVal(i) + case l: java.lang.Long => LongVal(l) + case s: java.lang.String => StringVal(s) + case ls: LongString => StringVal(ls.toString) +// case a: java.util.List[AnyRef@unchecked] => ArrayVal(a.asScala.toVector.map(unsafeFrom)) + case _ => NullVal + } + + val message = AmqpMessage(publishCommand.body.value, AmqpProperties.empty.copy( + contentEncoding = Some(UTF_8.name()), + headers = fs2MessageHeaders, + expiration = publishCommand.basicProperties.expiration + )) - implicit val arrayByteEncoder: MessageEncoder[IO, Array[Byte]] = - Kleisli { bytes => - IO.pure(AmqpMessage(bytes, AmqpProperties.empty.copy(contentEncoding = Some(UTF_8.name())))) + IO.println(message).as(message) } - implicit val arrayByteDecoder: EnvelopeDecoder[IO, Array[Byte]] = + def deliveryDecoder(queueName: BuckyQueueName): EnvelopeDecoder[IO, consume.Delivery] = Kleisli { amqpEnvelope => - IO.pure(amqpEnvelope.payload) + + val messageProperties = publish.MessageProperties.basic.copy( + headers = amqpEnvelope.properties.headers.mapValues(_.toValueWriterCompatibleJava) + ) + + IO.pure( + consume.Delivery( + Payload(amqpEnvelope.payload), + consume.ConsumerTag.create(queueName), + Envelope( + amqpEnvelope.deliveryTag.value, + amqpEnvelope.redelivered, + BuckyExchangeName(amqpEnvelope.exchangeName.value), + BuckyRoutingKey(amqpEnvelope.routingKey.value) + ), + messageProperties + ) + ) } override def declare(declarations: decl.Declaration*): IO[Unit] = declare(declarations.toIterable) override def declare(declarations: Iterable[decl.Declaration]): IO[Unit] = { - def argumentsFromAnyRef(arguments: Map[String, AnyRef]): Map[String, SafeArg] = { - arguments.mapValues { - case arg: String => arg - case arg: BigDecimal => arg -// case arg: Integer => Int.box(arg) // TODO are these needed? -// case arg: Long => Long.box(arg) -// case arg: Double => Double.box(arg) -// case arg: Float => Float.box(arg) -// case arg: Short => Short.box(arg) -// case arg: Boolean => Boolean.box(arg) -// case arg: Byte => Byte.box(arg) - case arg: java.util.Date => arg - case t => throw new IllegalArgumentException(s"Unsupported type for rabbit arguments $t") - } - } - - def exchangeTypeToFs2ExchangeType(exchangeType: BuckyExchangeType): Fs2ExchangeType = { + def argumentsFromAnyRef(arguments: Map[String, AnyRef]): Map[String, SafeArg] = + arguments.mapValues { + case arg: String => arg + case arg: BigDecimal => arg + case arg: Integer => arg.intValue() + case arg: java.lang.Long => arg.longValue() + case arg: java.lang.Double => arg.doubleValue() + case arg: java.lang.Float => arg.floatValue() + case arg: java.lang.Short => arg.shortValue() + case arg: java.lang.Boolean => arg.booleanValue() + case arg: java.lang.Byte => arg.byteValue() + case arg: java.util.Date => arg + case t => throw new IllegalArgumentException(s"Unsupported type for rabbit arguments $t") + } + + def exchangeTypeToFs2ExchangeType(exchangeType: BuckyExchangeType): Fs2ExchangeType = exchangeType match { - case decl.Direct => Fs2ExchangeType.Direct - case decl.Topic => Fs2ExchangeType.Topic + case decl.Direct => Fs2ExchangeType.Direct + case decl.Topic => Fs2ExchangeType.Topic case decl.Headers => Fs2ExchangeType.Headers - case decl.Fanout => Fs2ExchangeType.FanOut + case decl.Fanout => Fs2ExchangeType.FanOut } - } implicit val channel: AMQPChannel = publishChannel - declarations.toList.traverse_{ + declarations.toList.traverse_ { case decl.Exchange(name, exchangeType, isDurable, shouldAutoDelete, isInternal, arguments, bindings) => - client.declareExchange(DeclarationExchangeConfig.default(Fs2ExchangeName(name.value), exchangeTypeToFs2ExchangeType(exchangeType)).copy(arguments = argumentsFromAnyRef(arguments))) *> + client.declareExchange( + DeclarationExchangeConfig + .default(Fs2ExchangeName(name.value), exchangeTypeToFs2ExchangeType(exchangeType)) + .copy(arguments = argumentsFromAnyRef(arguments)) + ) *> declare(bindings) case decl.Binding(exchangeName, queueName, routingKey, arguments) => - client.bindQueue(Fs2RabbitQueueName(queueName.value), Fs2ExchangeName(exchangeName.value), Fs2RoutingKey(routingKey.value), QueueBindingArgs(argumentsFromAnyRef(arguments))) + client.bindQueue( + Fs2RabbitQueueName(queueName.value), + Fs2ExchangeName(exchangeName.value), + Fs2RoutingKey(routingKey.value), + QueueBindingArgs(argumentsFromAnyRef(arguments)) + ) case decl.ExchangeBinding(destinationExchangeName, sourceExchangeName, routingKey, arguments) => ??? case decl.Queue(name, isDurable, isExclusive, shouldAutoDelete, arguments) => client.declareQueue(DeclarationQueueConfig.default(Fs2RabbitQueueName(name.value)).copy(arguments = argumentsFromAnyRef(arguments))) @@ -76,9 +151,10 @@ class Fs2RabbitAmqpClient private (config: Fs2RabbitConfig, client: RabbitClient override def publisher(): Publisher[IO, publish.PublishCommand] = (publishCommand: PublishCommand) => { implicit val channel: AMQPChannel = publishChannel + println(publishCommand) client - .createPublisher[Array[Byte]](Fs2ExchangeName(publishCommand.exchange.value), Fs2RoutingKey(publishCommand.routingKey.value)) - .flatMap(f => f(publishCommand.body.value)) + .createPublisher[PublishCommand](Fs2ExchangeName(publishCommand.exchange.value), Fs2RoutingKey(publishCommand.routingKey.value)) + .flatMap(f => f(publishCommand)) } override def registerConsumer( @@ -90,9 +166,10 @@ class Fs2RabbitAmqpClient private (config: Fs2RabbitConfig, client: RabbitClient shutdownRetry: FiniteDuration ): Resource[IO, Unit] = client.createChannel(connection).flatMap { implicit channel => - Resource.eval(client.createAckerConsumer[Array[Byte]](Fs2RabbitQueueName(queueName.value))).flatMap { case (acker, consumer) => + implicit val decoder: EnvelopeDecoder[IO, consume.Delivery] = deliveryDecoder(queueName) + Resource.eval(client.createAckerConsumer[consume.Delivery](Fs2RabbitQueueName(queueName.value))).flatMap { case (acker, consumer) => consumer - .evalMap(msg => handler(amqpEnvelopeToDelivery(queueName, msg)).map(_ -> msg.deliveryTag)) + .evalMap(delivery => handler(delivery.payload).map(_ -> delivery.deliveryTag)) .evalMap { case (consume.Ack, tag) => acker(AckResult.Ack(tag)) case (consume.DeadLetter, tag) => acker(AckResult.NAck(tag)) @@ -105,14 +182,6 @@ class Fs2RabbitAmqpClient private (config: Fs2RabbitConfig, client: RabbitClient } } - private def amqpEnvelopeToDelivery(queueName: bucky.QueueName, msg: model.AmqpEnvelope[Array[Byte]]) = - consume.Delivery( - Payload(msg.payload), - consume.ConsumerTag.create(queueName), - Envelope(msg.deliveryTag.value, msg.redelivered, BuckyExchangeName(msg.exchangeName.value), BuckyRoutingKey(msg.routingKey.value)), - publish.MessageProperties.basic // TODO convert from fs2rabbit MessageProperties - ) - override def isConnectionOpen: IO[Boolean] = IO(connection.value.isOpen) } @@ -123,5 +192,5 @@ object Fs2RabbitAmqpClient { client <- RabbitClient.default[IO](config).resource connection <- client.createConnection publishChannel <- client.createConnectionChannel - } yield new Fs2RabbitAmqpClient(config, client, connection, publishChannel) + } yield new Fs2RabbitAmqpClient(client, connection, publishChannel) } diff --git a/example/src/main/scala/com/itv/bucky/example/requeue/RequeueConsumer.scala b/example/src/main/scala/com/itv/bucky/example/requeue/RequeueConsumer.scala index 4060cc79..134749ff 100644 --- a/example/src/main/scala/com/itv/bucky/example/requeue/RequeueConsumer.scala +++ b/example/src/main/scala/com/itv/bucky/example/requeue/RequeueConsumer.scala @@ -1,10 +1,9 @@ package com.itv.bucky.example.requeue -import cats.data.Kleisli import cats.effect.{ExitCode, IO, IOApp, Resource} import com.itv.bucky.Unmarshaller.StringPayloadUnmarshaller -import com.itv.bucky.decl.{Declaration, Direct, Queue} -import com.itv.bucky.{AmqpClientConfig, DeliveryUnmarshaller, Payload, PayloadUnmarshaller, RequeueHandler, RoutingKey, defaultPreFetchCount} +import com.itv.bucky.decl._ +import com.itv.bucky._ import com.itv.bucky.consume._ import com.itv.bucky.pattern.requeue._ import com.typesafe.config.{Config, ConfigFactory} @@ -15,23 +14,28 @@ import scala.concurrent.duration._ import cats.effect._ import cats.implicits._ import dev.profunktor.fs2rabbit.config.Fs2RabbitConfig -import dev.profunktor.fs2rabbit.config.declaration.DeclarationQueueConfig -import dev.profunktor.fs2rabbit.effects.EnvelopeDecoder -import dev.profunktor.fs2rabbit.interpreter.RabbitClient -import dev.profunktor.fs2rabbit.model -import dev.profunktor.fs2rabbit.model.{AMQPConnection, AckResult, AmqpEnvelope, AmqpMessage, DeliveryTag, ExchangeName, ExchangeType, QueueName, RabbitConnection} -import fs2.{Pipe, Pure} object RequeueConsumer extends IOApp with StrictLogging { object Declarations { - val queueName: model.QueueName = model.QueueName(s"requeue_string-1") -// val all: Iterable[Declaration] = -// requeueDeclarations(queue.name, RoutingKey(queue.name.value), Some(ExchangeName(s"${queue.name.value}.dlx")), Direct, retryAfter = 1.second) + val queue = Queue(QueueName(s"requeue_string-1")) + val all: Iterable[Declaration] = requeueDeclarations(queue.name, RoutingKey(queue.name.value), Some(ExchangeName(s"${queue.name.value}.dlx")), Direct, retryAfter = 1.second) } - val config: Config = ConfigFactory.load("bucky") + val config: Config = ConfigFactory.load("bucky") val amqpClientConfig: AmqpClientConfig = AmqpClientConfig(config.getString("rmq.host"), 5672, "guest", "guest") + val fs2RabbitConfig: Fs2RabbitConfig = Fs2RabbitConfig( + amqpClientConfig.host, + amqpClientConfig.port, + amqpClientConfig.virtualHost.getOrElse("/"), + 10.seconds, + ssl = false, + Some(amqpClientConfig.username), + Some(amqpClientConfig.password), + requeueOnNack = false, + requeueOnReject = false, + None + ) val stringToLogRequeueHandler: RequeueHandler[IO, String] = RequeueHandler[IO, String] { message: String => @@ -46,87 +50,11 @@ object RequeueConsumer extends IOApp with StrictLogging { } } - override def run(args: List[String]): IO[ExitCode] = { - val config: Fs2RabbitConfig = Fs2RabbitConfig( - amqpClientConfig.host, - amqpClientConfig.port, - amqpClientConfig.virtualHost.getOrElse("/"), - 10.seconds, - ssl = false, - Some(amqpClientConfig.username), - Some(amqpClientConfig.password), - requeueOnNack = false, - requeueOnReject = false, - None - ) - + override def run(args: List[String]): IO[ExitCode] = (for { - amqpClient <- RabbitClient.default[IO](config).resource - connection <- amqpClient.createConnection - channel <- amqpClient.createChannel(connection) - _ <- Resource.eval(amqpClient.declareQueue(DeclarationQueueConfig.default(Declarations.queueName).copy(arguments = Map("x-dead-letter-exchange" -> s"${Declarations.queueName.value}.dlx")))(channel)) - _ <- Resource.eval(amqpClient.declareExchange(ExchangeName(s"${Declarations.queueName.value}.dlx"), ExchangeType.FanOut)(channel)) - _ <- Resource.eval(amqpClient.declareQueue(DeclarationQueueConfig.default(QueueName(s"${Declarations.queueName.value}.dlq")))(channel)) - _ <- Resource.eval(amqpClient.bindQueue(QueueName(s"${Declarations.queueName.value}.dlq"), ExchangeName(s"${Declarations.queueName.value}.dlx"), model.RoutingKey("*"))(channel)) -// _ <- Resource.eval(amqpClient.declare(Declarations.all)) - _ <- registerRequeueConsumerOf(amqpClient, connection)(Declarations.queueName, stringToLogRequeueHandler) + amqpClient <- Fs2RabbitAmqpClient(fs2RabbitConfig) + _ <- Resource.eval(amqpClient.declare(Declarations.all)) + _ <- amqpClient.registerRequeueConsumerOf(Declarations.queue.name, + stringToLogRequeueHandler, requeuePolicy = RequeuePolicy(10, 5.seconds)) } yield ()).use(_ => IO.never *> IO(ExitCode.Success)) - } - - def registerRequeueConsumerOf[T](client: RabbitClient[IO], connection: AMQPConnection)( - queueName: model.QueueName, - handler: RequeueHandler[IO, T], - requeuePolicy: RequeuePolicy = RequeuePolicy(maximumProcessAttempts = 10, requeueAfter = 3.minutes), - onHandlerException: RequeueConsumeAction = Requeue, - unmarshalFailureAction: RequeueConsumeAction = DeadLetter, - onRequeueExpiryAction: T => IO[ConsumeAction] = (_: T) => IO.pure[ConsumeAction](DeadLetter), - prefetchCount: Int = defaultPreFetchCount - )(implicit unmarshaller: PayloadUnmarshaller[T]): Resource[IO, Unit] = { - - implicit val envelopeDecoder: EnvelopeDecoder[IO, T] = - new Kleisli[IO, AmqpEnvelope[Array[Byte]], T](amqpEnvelope => // TODO could we just use the circe Decoder? - unmarshaller.unmarshal(Payload(amqpEnvelope.payload)) match { - case Left(value) => IO.raiseError(value) - case Right(value) => IO.pure(value) - } - ) - - client.createChannel(connection).flatMap { implicit channel => - Resource.eval(client.createAckerConsumer[T](queueName)).flatMap { case (acker, consumer) => - consumer - .evalMap(msg => handler(msg.payload).map(msg.deliveryTag -> _)) - .evalMap { - case (tag, Ack) => acker(AckResult.Ack(tag)) - case (tag, DeadLetter) => acker(AckResult.NAck(tag)) - case (tag, Requeue) => ??? - case (tag, RequeueImmediately) => ??? - } - .compile - .drain - .background - .void - } - } - } - - def registerDeliveryRequeueConsumerOf[T]( // TODO decode headers as an option so only need one consumer type? - queueName: model.QueueName, - handler: RequeueHandler[IO, T], - requeuePolicy: RequeuePolicy = RequeuePolicy(maximumProcessAttempts = 10, requeueAfter = 3.minutes), - onHandlerException: RequeueConsumeAction = Requeue, - unmarshalFailureAction: RequeueConsumeAction = DeadLetter, - onRequeueExpiryAction: T => IO[ConsumeAction] = (_: T) => IO.pure[ConsumeAction](DeadLetter), - prefetchCount: Int = defaultPreFetchCount - )(implicit unmarshaller: DeliveryUnmarshaller[T]): Resource[IO, Unit] = { - implicit val envelopeDecoder: EnvelopeDecoder[IO, T] = - new Kleisli[IO, AmqpEnvelope[Array[Byte]], T](amqpEnvelope => // TODO could we just use the circe Decoder? - unmarshaller.unmarshal(Delivery(Payload(amqpEnvelope.payload), ???, ???, ???)) match { - case Left(value) => IO.raiseError(value) - case Right(value) => IO.pure(value) - } - ) - - ??? - } - -} +} \ No newline at end of file diff --git a/kamon/src/test/resources/logback.xml b/kamon/src/test/resources/logback.xml index 64ee35ff..fdece88e 100644 --- a/kamon/src/test/resources/logback.xml +++ b/kamon/src/test/resources/logback.xml @@ -2,14 +2,14 @@ - %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - "%msg"%n + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - "%delivery"%n localhost LOCAL3 - content-delivery - work-order-to-workflow-translator - [%thread] %-5level %logger - "%msg"%n + content-delivery - work-order-to-workflow-translator - [%thread] %-5level %logger - "%delivery"%n diff --git a/test/src/it/resources/logback.xml b/test/src/it/resources/logback.xml index 06ca111a..97819f2f 100644 --- a/test/src/it/resources/logback.xml +++ b/test/src/it/resources/logback.xml @@ -2,14 +2,14 @@ - %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - "%msg"%n + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - "%delivery"%n localhost LOCAL3 - content-delivery - work-order-to-workflow-translator - [%thread] %-5level %logger - "%msg"%n + content-delivery - work-order-to-workflow-translator - [%thread] %-5level %logger - "%delivery"%n diff --git a/test/src/test/resources/logback.xml b/test/src/test/resources/logback.xml index 2c3d386a..8837a709 100644 --- a/test/src/test/resources/logback.xml +++ b/test/src/test/resources/logback.xml @@ -2,14 +2,14 @@ - %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - "%msg"%n + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - "%delivery"%n localhost LOCAL3 - content-delivery - work-order-to-workflow-translator - [%thread] %-5level %logger - "%msg"%n + content-delivery - work-order-to-workflow-translator - [%thread] %-5level %logger - "%delivery"%n