Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fs2-rabbit backend #82

Merged
merged 30 commits into from
Jul 11, 2024
Merged
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
c32699d
init consumer using rabbit fs2
thomasl8 Jun 9, 2023
1a8aff2
implement amqpClient interface with fs2 rabbit
thomasl8 Jun 9, 2023
521c432
fix requeue
thomasl8 Jun 9, 2023
4edb9c6
WIP separation into modules
andrewgee Jun 15, 2023
b89a68e
All compilation and tests
andrewgee Jun 16, 2023
b84c7cd
Finish off Delivery MessageEncoder and support all declaration options
andrewgee Jun 21, 2023
243b0a6
Remove temporary client in examples module
andrewgee Jun 21, 2023
9be1b06
Requeue on reject for RequeueImmediately support
andrewgee Jun 21, 2023
a9edcef
Restore example to JavaBackend client
andrewgee Jun 21, 2023
b54c346
Bleh! Type erasure is the worst
andrewgee Jun 21, 2023
82bd57d
`deliveryEncoder` tests
andrewgee Jun 28, 2023
b28cab8
WIP
andrewgee Jan 2, 2024
6794781
Get compiling again after a rebase
andrewgee Jan 26, 2024
1d1b51e
Passing tests
andrewgee Jan 26, 2024
06fdcb0
Abstract out fs2-rabbit config
andrewgee Jan 26, 2024
a71e5c4
Move unit tests to relevant module
andrewgee Jan 26, 2024
99e7b50
Fix up test logging
andrewgee Jan 26, 2024
3a02be6
Sort declarations before executing them!
andrewgee Jan 26, 2024
8b84713
Try existing integration tests with new client
andrewgee Jan 26, 2024
b7949a9
Added lots
JosBogan Jul 5, 2024
f92eb0f
Reworked publisher to return an F[Publisher[F, ???]]
andrewgee Jul 5, 2024
e611e99
Fixed error in publishing
JosBogan Jul 10, 2024
ffdae11
working for cross compile
JosBogan Jul 10, 2024
548d06d
Supporting 2.12
JosBogan Jul 10, 2024
e123926
More 2.12
JosBogan Jul 10, 2024
7e0dc47
More 2.12 madness
JosBogan Jul 10, 2024
da665da
revert version
JosBogan Jul 10, 2024
1d74b7c
Co-authored-by: Andrew Gee <andrew.gee@itv.com> 🫥
JosBogan Jul 10, 2024
c575c41
Time :(
JosBogan Jul 10, 2024
80055a4
success?
andrewgee Jul 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
WIP separation into modules
andrewgee committed Jan 26, 2024
commit 4edb9c6c14b87ddb1821106791aa92581514d031
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package com.itv.bucky.backend.fs2rabbit

import cats.data.Kleisli
import cats.effect.implicits.genSpawnOps
import cats.effect.{Async, Resource}
import cats.implicits._
import com.itv.bucky
import com.itv.bucky.decl.ExchangeType
import com.itv.bucky.publish.PublishCommand
import com.itv.bucky.{AmqpClient, AmqpClientConfig, Envelope, ExchangeName, Handler, Payload, Publisher, QueueName, RoutingKey, consume, decl, publish}
import com.rabbitmq.client.LongString
import dev.profunktor.fs2rabbit.arguments.SafeArg
import dev.profunktor.fs2rabbit.config.Fs2RabbitConfig
import dev.profunktor.fs2rabbit.config.declaration.{DeclarationExchangeConfig, DeclarationQueueConfig}
import dev.profunktor.fs2rabbit.effects.{EnvelopeDecoder, MessageEncoder}
import dev.profunktor.fs2rabbit.interpreter.RabbitClient
import dev.profunktor.fs2rabbit.model
import dev.profunktor.fs2rabbit.model.AmqpFieldValue.{BooleanVal, ByteArrayVal, ByteVal, DecimalVal, DoubleVal, FloatVal, IntVal, LongVal, NullVal, ShortVal, StringVal, TimestampVal}
import scodec.bits.ByteVector

import java.nio.charset.StandardCharsets.UTF_8
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.language.higherKinds

class Fs2RabbitAmqpClient[F[_]](client: RabbitClient[F], connection: model.AMQPConnection, publishChannel: model.AMQPChannel)(implicit F: Async[F]) extends AmqpClient[F] {

implicit val deliveryEncoder: MessageEncoder[F, PublishCommand] =
Kleisli { publishCommand =>
val fs2MessageHeaders: Map[String, model.AmqpFieldValue] = publishCommand.basicProperties.headers.mapValues {
case bd: java.math.BigDecimal => DecimalVal.unsafeFrom(bd)
case ts: java.time.Instant => TimestampVal.from(ts)
case d: java.util.Date => TimestampVal.from(d)
// case t: java.util.Map[String@unchecked, AnyRef@unchecked] =>
// TableVal(t.asScala.toMap.map { case (key, v) => ShortString.unsafeFrom(key) -> unsafeFrom(v) })
case byte: java.lang.Byte => ByteVal(byte)
case double: java.lang.Double => DoubleVal(double)
case float: java.lang.Float => FloatVal(float)
case short: java.lang.Short => ShortVal(short)
case byteArray: Array[Byte] => ByteArrayVal(ByteVector(byteArray))
case b: java.lang.Boolean => BooleanVal(b)
case i: java.lang.Integer => IntVal(i)
case l: java.lang.Long => LongVal(l)
case s: java.lang.String => StringVal(s)
case ls: LongString => StringVal(ls.toString)
// case a: java.util.List[AnyRef@unchecked] => ArrayVal(a.asScala.toVector.map(unsafeFrom))
case _ => NullVal
}

val message = model.AmqpMessage(publishCommand.body.value, model.AmqpProperties.empty.copy(
contentEncoding = Some(UTF_8.name()),
headers = fs2MessageHeaders,
expiration = publishCommand.basicProperties.expiration
))

F.pure(message)
}

def deliveryDecoder(queueName: QueueName): EnvelopeDecoder[F, consume.Delivery] =
Kleisli { amqpEnvelope =>

val messageProperties = publish.MessageProperties.basic.copy(
headers = amqpEnvelope.properties.headers.mapValues(_.toValueWriterCompatibleJava)
)

F.pure(
consume.Delivery(
Payload(amqpEnvelope.payload),
consume.ConsumerTag.create(queueName),
Envelope(
amqpEnvelope.deliveryTag.value,
amqpEnvelope.redelivered,
ExchangeName(amqpEnvelope.exchangeName.value),
RoutingKey(amqpEnvelope.routingKey.value)
),
messageProperties
)
)
}

override def declare(declarations: decl.Declaration*): F[Unit] = declare(declarations.toIterable)

override def declare(declarations: Iterable[decl.Declaration]): F[Unit] = {

def argumentsFromAnyRef(arguments: Map[String, AnyRef]): Map[String, SafeArg] =
arguments.mapValues {
case arg: String => arg
case arg: BigDecimal => arg
case arg: Integer => arg.intValue()
case arg: java.lang.Long => arg.longValue()
case arg: java.lang.Double => arg.doubleValue()
case arg: java.lang.Float => arg.floatValue()
case arg: java.lang.Short => arg.shortValue()
case arg: java.lang.Boolean => arg.booleanValue()
case arg: java.lang.Byte => arg.byteValue()
case arg: java.util.Date => arg
case t => throw new IllegalArgumentException(s"Unsupported type for rabbit arguments $t")
}

def exchangeTypeToFs2ExchangeType(exchangeType: ExchangeType): model.ExchangeType =
exchangeType match {
case decl.Direct => model.ExchangeType.Direct
case decl.Topic => model.ExchangeType.Topic
case decl.Headers => model.ExchangeType.Headers
case decl.Fanout => model.ExchangeType.FanOut
}

implicit val channel: model.AMQPChannel = publishChannel
declarations.toList.traverse_ {
case decl.Exchange(name, exchangeType, isDurable, shouldAutoDelete, isInternal, arguments, bindings) =>
client.declareExchange(
DeclarationExchangeConfig
.default(model.ExchangeName(name.value), exchangeTypeToFs2ExchangeType(exchangeType))
.copy(arguments = argumentsFromAnyRef(arguments))
) *>
declare(bindings)
case decl.Binding(exchangeName, queueName, routingKey, arguments) =>
client.bindQueue(
model.QueueName(queueName.value),
model.ExchangeName(exchangeName.value),
model.RoutingKey(routingKey.value),
model.QueueBindingArgs(argumentsFromAnyRef(arguments))
)
case decl.ExchangeBinding(destinationExchangeName, sourceExchangeName, routingKey, arguments) => ???
case decl.Queue(name, isDurable, isExclusive, shouldAutoDelete, arguments) =>
client.declareQueue(DeclarationQueueConfig.default(model.QueueName(name.value)).copy(arguments = argumentsFromAnyRef(arguments)))
}

}

override def publisher(): Publisher[F, publish.PublishCommand] = (publishCommand: PublishCommand) => {
implicit val channel: model.AMQPChannel = publishChannel
println(publishCommand)
client
.createPublisher[PublishCommand](model.ExchangeName(publishCommand.exchange.value), model.RoutingKey(publishCommand.routingKey.value))
.flatMap(f => f(publishCommand))
}

override def registerConsumer(
queueName: bucky.QueueName,
handler: Handler[F, consume.Delivery],
exceptionalAction: consume.ConsumeAction,
prefetchCount: Int,
shutdownTimeout: FiniteDuration,
shutdownRetry: FiniteDuration
): Resource[F, Unit] =
client.createChannel(connection).flatMap { implicit channel =>
implicit val decoder: EnvelopeDecoder[F, consume.Delivery] = deliveryDecoder(queueName)
Resource.eval(client.createAckerConsumer[consume.Delivery](model.QueueName(queueName.value))).flatMap { case (acker, consumer) =>
consumer
.evalMap(delivery => handler(delivery.payload).map(_ -> delivery.deliveryTag))
.evalMap {
case (consume.Ack, tag) => acker(model.AckResult.Ack(tag))
case (consume.DeadLetter, tag) => acker(model.AckResult.NAck(tag))
case (consume.RequeueImmediately, tag) => ??? // TODO
}
.compile
.drain
.background
.map(_ => ())
}
}

override def isConnectionOpen: F[Boolean] = F.pure(connection.value.isOpen)

}

object Fs2RabbitAmqpClient {
def apply[F[_]](config: AmqpClientConfig)(implicit async: Async[F]): Resource[F, AmqpClient[F]] = {
val fs2RabbitConfig = Fs2RabbitConfig(
config.host, config.port, "/", 10.seconds, ssl = false, Some(config.username), Some(config.password), requeueOnNack = false, requeueOnReject = false, Some(10)
)
for {
client <- RabbitClient.default[F](fs2RabbitConfig).resource
connection <- client.createConnection
publishChannel <- client.createConnectionChannel
} yield new Fs2RabbitAmqpClient(client, connection, publishChannel)
}
}
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
package com.itv.bucky
package com.itv.bucky.backend.javaamqp

import cats.effect._
import cats.effect.implicits._
import cats.effect.std.Dispatcher
import cats.implicits._
import com.itv.bucky.backend.javaamqp.publish.PendingConfirmListener
import com.itv.bucky.consume._
import com.itv.bucky.publish._
import com.itv.bucky.decl.Declaration
import com.itv.bucky.publish.PendingConfirmListener
import com.itv.bucky.publish._
import com.itv.bucky.{AmqpClientConfig, Handler, QueueName}
import com.typesafe.scalalogging.StrictLogging

import scala.collection.immutable.TreeMap
import scala.util.Try
import cats.effect.{Deferred, Ref, Temporal}

import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import scala.util.Try

private[bucky] case class AmqpClientConnectionManager[F[_]](
amqpConfig: AmqpClientConfig,
@@ -89,7 +87,7 @@ private[bucky] object AmqpClientConnectionManager extends StrictLogging {
pendingConfirmations <- Ref.of[F, TreeMap[Long, Deferred[F, Boolean]]](TreeMap.empty)
pendingReturn <- Ref.of[F, Boolean](false)
_ <- publishChannel.confirmSelect
confirmListener <- F.blocking(publish.PendingConfirmListener(pendingConfirmations, pendingReturn, dispatcher))
confirmListener <- F.blocking(PendingConfirmListener(pendingConfirmations, pendingReturn, dispatcher))
_ <- publishChannel.addConfirmListener(confirmListener)
_ <- publishChannel.addReturnListener(confirmListener)
} yield AmqpClientConnectionManager(config, publishChannel, confirmListener, dispatcher, executionContext)
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package com.itv.bucky
package com.itv.bucky.backend.javaamqp

import cats.effect.implicits._
import cats.effect.std.Dispatcher
import cats.effect.{Async, Sync}
import cats.implicits._
import com.itv.bucky.backend.javaamqp.consume.Consumer
import com.itv.bucky.consume._
import com.itv.bucky.decl._
import com.itv.bucky.publish.PublishCommand
import com.itv.bucky.{Envelope, Handler, QueueName}
import com.rabbitmq.client.AMQP.BasicProperties
import com.rabbitmq.client.{ConfirmListener, DefaultConsumer, ReturnListener, Channel => RabbitChannel, Envelope => RabbitMQEnvelope}
import com.typesafe.scalalogging.StrictLogging
@@ -142,7 +144,7 @@ object Channel {
case Left(e) =>
F.delay(logger.debug(s"Handler failure with {} will recover to: {}", e.getMessage, onHandlerException)) *> F.delay(onHandlerException)
}
.flatMap(sendAction(_)(Envelope.fromEnvelope(envelope)))
.flatMap(sendAction(_)(EnvelopeConversion.fromJavaEnvelope(envelope)))
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.itv.bucky.backend.javaamqp

import com.itv.bucky.{Envelope, ExchangeName, RoutingKey}
import com.rabbitmq.client.{Envelope => RabbitEnvelope}

object EnvelopeConversion {
def fromJavaEnvelope(envelope: RabbitEnvelope): Envelope =
Envelope(envelope.getDeliveryTag, envelope.isRedeliver, ExchangeName(envelope.getExchange), RoutingKey(envelope.getRoutingKey))
}
Loading