Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fs2-rabbit backend #82

Merged
merged 30 commits into from
Jul 11, 2024
Merged
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
c32699d
init consumer using rabbit fs2
thomasl8 Jun 9, 2023
1a8aff2
implement amqpClient interface with fs2 rabbit
thomasl8 Jun 9, 2023
521c432
fix requeue
thomasl8 Jun 9, 2023
4edb9c6
WIP separation into modules
andrewgee Jun 15, 2023
b89a68e
All compilation and tests
andrewgee Jun 16, 2023
b84c7cd
Finish off Delivery MessageEncoder and support all declaration options
andrewgee Jun 21, 2023
243b0a6
Remove temporary client in examples module
andrewgee Jun 21, 2023
9be1b06
Requeue on reject for RequeueImmediately support
andrewgee Jun 21, 2023
a9edcef
Restore example to JavaBackend client
andrewgee Jun 21, 2023
b54c346
Bleh! Type erasure is the worst
andrewgee Jun 21, 2023
82bd57d
`deliveryEncoder` tests
andrewgee Jun 28, 2023
b28cab8
WIP
andrewgee Jan 2, 2024
6794781
Get compiling again after a rebase
andrewgee Jan 26, 2024
1d1b51e
Passing tests
andrewgee Jan 26, 2024
06fdcb0
Abstract out fs2-rabbit config
andrewgee Jan 26, 2024
a71e5c4
Move unit tests to relevant module
andrewgee Jan 26, 2024
99e7b50
Fix up test logging
andrewgee Jan 26, 2024
3a02be6
Sort declarations before executing them!
andrewgee Jan 26, 2024
8b84713
Try existing integration tests with new client
andrewgee Jan 26, 2024
b7949a9
Added lots
JosBogan Jul 5, 2024
f92eb0f
Reworked publisher to return an F[Publisher[F, ???]]
andrewgee Jul 5, 2024
e611e99
Fixed error in publishing
JosBogan Jul 10, 2024
ffdae11
working for cross compile
JosBogan Jul 10, 2024
548d06d
Supporting 2.12
JosBogan Jul 10, 2024
e123926
More 2.12
JosBogan Jul 10, 2024
7e0dc47
More 2.12 madness
JosBogan Jul 10, 2024
da665da
revert version
JosBogan Jul 10, 2024
1d74b7c
Co-authored-by: Andrew Gee <andrew.gee@itv.com> 🫥
JosBogan Jul 10, 2024
c575c41
Time :(
JosBogan Jul 10, 2024
80055a4
success?
andrewgee Jul 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Time :(
Co-authored-by: Andrew Gee <andrew.gee@itv.com> 🫥
  • Loading branch information
JosBogan committed Jul 10, 2024
commit c575c415e7367d3b1c42891cd4324ff26016da7f
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package com.itv.bucky.backend.fs2rabbit

import cats.data.Kleisli
import cats.effect.implicits.genSpawnOps
import cats.effect.implicits.{genSpawnOps, genTemporalOps}
import cats.effect.std.Dispatcher
import cats.effect.{Async, Resource}
import cats.effect.{Async, Ref, Resource}
import cats.implicits._
import com.itv.bucky
import com.itv.bucky.backend.fs2rabbit.Fs2RabbitAmqpClient.deliveryDecoder
@@ -50,13 +50,14 @@ import dev.profunktor.fs2rabbit.model.AmqpFieldValue.{
import dev.profunktor.fs2rabbit.model.{AMQPChannel, PublishingFlag, ShortString}
import scodec.bits.ByteVector

import java.util.Date
import java.util.{Date, UUID}
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._
import scala.language.higherKinds
import Fs2RabbitAmqpClient._
import cats.effect.kernel.Temporal

class Fs2RabbitAmqpClient[F[_]: Async](
class Fs2RabbitAmqpClient[F[_]: Async: Temporal](
client: RabbitClient[F],
connection: model.AMQPConnection,
publishChannel: model.AMQPChannel,
@@ -165,6 +166,13 @@ class Fs2RabbitAmqpClient[F[_]: Async](
}
.map(amqpClientConnectionManager.addConfirmListeningToPublisher)

private def repeatUntil[A](eval: F[A])(pred: A => Boolean)(sleep: FiniteDuration): F[Unit] =
for {
result <- eval
ended <- Async[F].pure(pred(result))
_ <- if (ended) Async[F].unit else Temporal[F].sleep(sleep) *> repeatUntil(eval)(pred)(sleep)
} yield ()

override def registerConsumer(
queueName: bucky.QueueName,
handler: Handler[F, consume.Delivery],
@@ -175,19 +183,37 @@ class Fs2RabbitAmqpClient[F[_]: Async](
): Resource[F, Unit] =
client.createChannel(connection).flatMap { implicit channel =>
implicit val decoder: EnvelopeDecoder[F, consume.Delivery] = deliveryDecoder(queueName)
Resource.eval(client.createAckerConsumer[consume.Delivery](model.QueueName(queueName.value))).flatMap { case (acker, consumer) =>
consumer
.evalMap(delivery => handler(delivery.payload).map(_ -> delivery.deliveryTag))
.evalMap {
case (consume.Ack, tag) => acker(model.AckResult.Ack(tag))
case (consume.DeadLetter, tag) => acker(model.AckResult.NAck(tag))
case (consume.RequeueImmediately, tag) => acker(model.AckResult.Reject(tag))
Resource
.make(Ref.of[F, Set[UUID]](Set.empty))(set =>
repeatUntil(set.get.flatTap(ids => Async[F].blocking(println(s"$ids left"))))(_.isEmpty)(shutdownRetry).timeout(shutdownTimeout)
)
.flatMap(consumptionIds =>
Resource.eval(client.createAckerConsumer[consume.Delivery](model.QueueName(queueName.value))).flatMap { case (acker, consumer) =>
consumer
.evalMap(delivery =>
for {
uuid <- Async[F].delay(UUID.randomUUID())
_ <- consumptionIds.update(set => set + uuid)
_ <- consumptionIds.get.flatTap(ids => Async[F].blocking(println("set is: " + ids)))
res <- handler(delivery.payload).attempt
_ <- Async[F].blocking(println("past the handler!"))
tag = delivery.deliveryTag
_ <- consumptionIds.update(set => set - uuid)
_ <- consumptionIds.get.flatTap(ids => Async[F].blocking(println("set is: " + ids)))
result <- Async[F].fromEither(res)
} yield (result, tag)
)
.evalMap {
case (consume.Ack, tag) => acker(model.AckResult.Ack(tag))
case (consume.DeadLetter, tag) => acker(model.AckResult.NAck(tag))
case (consume.RequeueImmediately, tag) => acker(model.AckResult.Reject(tag))
}
.compile
.drain
.background
.map(_ => ())
}
.compile
.drain
.background
.map(_ => ())
}
)
}

override def isConnectionOpen: F[Boolean] = Async[F].pure(connection.value.isOpen)
7 changes: 4 additions & 3 deletions it/src/test/scala/com/itv/bucky/ShutdownTimeoutTest.scala
Original file line number Diff line number Diff line change
@@ -57,7 +57,7 @@ class ShutdownTimeoutTest extends AsyncFunSuite with IntegrationSpec with Effect

Fs2RabbitAmqpClient[IO](config)
.use { client =>
val handler = StubHandlers.recordingHandler[IO, Delivery]((_: Delivery) => IO.sleep(3.seconds).map(_ => Ack))
val handler = (_: Delivery) => IO.sleep(3.seconds).map(_ => Ack)
Resource
.eval(client.declare(declarations))
.flatMap(_ =>
@@ -68,14 +68,15 @@ class ShutdownTimeoutTest extends AsyncFunSuite with IntegrationSpec with Effect
.use { _ =>
val pcb = publishCommandBuilder[String](implicitly).using(exchangeName).using(routingKey)
client.publisher().flatMap { publisher =>
publisher(pcb.toPublishCommand("a message"))
publisher(pcb.toPublishCommand("a message")) *> IO
.sleep(3.seconds)
.flatMap(_ => test)
}
}
}
}

test("Should wait until a handler finishes executing before shuttind down") {
test("Should wait until a handler finishes executing before shutting down") {
val clock = Clock.systemUTC()
val start = Instant.now(clock)
runTest[Instant](IO.delay(Instant.now())).map { result =>