Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

init consumer using rabbit fs2 #81

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,10 @@ lazy val example = project
"com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingVersion,
"org.scalatest" %% "scalatest" % scalaTestVersion,
"org.typelevel" %% "cats-effect-testing-scalatest" % catsEffectScalaTestVersion % "test",
"com.typesafe" % "config" % typeSafeVersion
"com.typesafe" % "config" % typeSafeVersion,
"ch.qos.logback" % "logback-classic" % logbackVersion,
"dev.profunktor" %% "fs2-rabbit" % "5.0.0",
"dev.profunktor" %% "fs2-rabbit-circe" % "5.0.0"
)
)

Expand Down
4 changes: 2 additions & 2 deletions core/src/test/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - "%msg"%n</pattern>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - "%delivery"%n</pattern>
</encoder>
</appender>

<appender name="ITVSYSLOG" class="ch.qos.logback.classic.net.SyslogAppender">
<syslogHost>localhost</syslogHost>
<facility>LOCAL3</facility>
<suffixPattern>content-delivery - work-order-to-workflow-translator - [%thread] %-5level %logger - "%msg"%n</suffixPattern>
<suffixPattern>content-delivery - work-order-to-workflow-translator - [%thread] %-5level %logger - "%delivery"%n</suffixPattern>
</appender>

<logger name="ch.qos.logback" level="WARN"/>
Expand Down
4 changes: 2 additions & 2 deletions example/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - "%msg"%n</pattern>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - "%delivery"%n</pattern>
</encoder>
</appender>

<appender name="ITVSYSLOG" class="ch.qos.logback.classic.net.SyslogAppender">
<syslogHost>localhost</syslogHost>
<facility>LOCAL3</facility>
<suffixPattern>content-delivery - work-order-to-workflow-translator - [%thread] %-5level %logger - "%msg"%n</suffixPattern>
<suffixPattern>content-delivery - work-order-to-workflow-translator - [%thread] %-5level %logger - "%delivery"%n</suffixPattern>
</appender>

<logger name="org.apache.qpid" level="ERROR"/>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
package com.itv.bucky.example.requeue

import cats.data.Kleisli
import cats.effect.{IO, Resource}
import cats.implicits._
import com.itv.bucky
import com.itv.bucky.decl.{ExchangeType => BuckyExchangeType}
import com.itv.bucky.publish.PublishCommand
import com.itv.bucky.{
AmqpClient,
Envelope,
Handler,
Payload,
Publisher,
consume,
decl,
publish,
ExchangeName => BuckyExchangeName,
QueueName => BuckyQueueName,
RoutingKey => BuckyRoutingKey
}
import com.rabbitmq.client.LongString
import dev.profunktor.fs2rabbit.arguments.SafeArg
import dev.profunktor.fs2rabbit.config.Fs2RabbitConfig
import dev.profunktor.fs2rabbit.config.declaration.{DeclarationExchangeConfig, DeclarationQueueConfig}
import dev.profunktor.fs2rabbit.effects.{EnvelopeDecoder, MessageEncoder}
import dev.profunktor.fs2rabbit.interpreter.RabbitClient
import dev.profunktor.fs2rabbit.model.AmqpFieldValue._
import dev.profunktor.fs2rabbit.model.{
AMQPChannel,
AMQPConnection,
AckResult,
AmqpFieldValue,
AmqpMessage,
AmqpProperties,
QueueBindingArgs,
ExchangeName => Fs2ExchangeName,
ExchangeType => Fs2ExchangeType,
QueueName => Fs2RabbitQueueName,
RoutingKey => Fs2RoutingKey
}
import scodec.bits.ByteVector

import java.nio.charset.StandardCharsets.UTF_8
import scala.concurrent.duration.FiniteDuration

class Fs2RabbitAmqpClient private (client: RabbitClient[IO], connection: AMQPConnection, publishChannel: AMQPChannel) extends AmqpClient[IO] {

implicit val deliveryEncoder: MessageEncoder[IO, PublishCommand] =
Kleisli { publishCommand =>
val fs2MessageHeaders: Map[String, AmqpFieldValue] = publishCommand.basicProperties.headers.mapValues {
case bd: java.math.BigDecimal => DecimalVal.unsafeFrom(bd)
case ts: java.time.Instant => TimestampVal.from(ts)
case d: java.util.Date => TimestampVal.from(d)
// case t: java.util.Map[String@unchecked, AnyRef@unchecked] =>
// TableVal(t.asScala.toMap.map { case (key, v) => ShortString.unsafeFrom(key) -> unsafeFrom(v) })
case byte: java.lang.Byte => ByteVal(byte)
case double: java.lang.Double => DoubleVal(double)
case float: java.lang.Float => FloatVal(float)
case short: java.lang.Short => ShortVal(short)
case byteArray: Array[Byte] => ByteArrayVal(ByteVector(byteArray))
case b: java.lang.Boolean => BooleanVal(b)
case i: java.lang.Integer => IntVal(i)
case l: java.lang.Long => LongVal(l)
case s: java.lang.String => StringVal(s)
case ls: LongString => StringVal(ls.toString)
// case a: java.util.List[AnyRef@unchecked] => ArrayVal(a.asScala.toVector.map(unsafeFrom))
case _ => NullVal
}

val message = AmqpMessage(publishCommand.body.value, AmqpProperties.empty.copy(
contentEncoding = Some(UTF_8.name()),
headers = fs2MessageHeaders,
expiration = publishCommand.basicProperties.expiration
))

IO.println(message).as(message)
}

def deliveryDecoder(queueName: BuckyQueueName): EnvelopeDecoder[IO, consume.Delivery] =
Kleisli { amqpEnvelope =>

val messageProperties = publish.MessageProperties.basic.copy(
headers = amqpEnvelope.properties.headers.mapValues(_.toValueWriterCompatibleJava)
)

IO.pure(
consume.Delivery(
Payload(amqpEnvelope.payload),
consume.ConsumerTag.create(queueName),
Envelope(
amqpEnvelope.deliveryTag.value,
amqpEnvelope.redelivered,
BuckyExchangeName(amqpEnvelope.exchangeName.value),
BuckyRoutingKey(amqpEnvelope.routingKey.value)
),
messageProperties
)
)
}

override def declare(declarations: decl.Declaration*): IO[Unit] = declare(declarations.toIterable)

override def declare(declarations: Iterable[decl.Declaration]): IO[Unit] = {

def argumentsFromAnyRef(arguments: Map[String, AnyRef]): Map[String, SafeArg] =
arguments.mapValues {
case arg: String => arg
case arg: BigDecimal => arg
case arg: Integer => arg.intValue()
case arg: java.lang.Long => arg.longValue()
case arg: java.lang.Double => arg.doubleValue()
case arg: java.lang.Float => arg.floatValue()
case arg: java.lang.Short => arg.shortValue()
case arg: java.lang.Boolean => arg.booleanValue()
case arg: java.lang.Byte => arg.byteValue()
case arg: java.util.Date => arg
case t => throw new IllegalArgumentException(s"Unsupported type for rabbit arguments $t")
}

def exchangeTypeToFs2ExchangeType(exchangeType: BuckyExchangeType): Fs2ExchangeType =
exchangeType match {
case decl.Direct => Fs2ExchangeType.Direct
case decl.Topic => Fs2ExchangeType.Topic
case decl.Headers => Fs2ExchangeType.Headers
case decl.Fanout => Fs2ExchangeType.FanOut
}

implicit val channel: AMQPChannel = publishChannel
declarations.toList.traverse_ {
case decl.Exchange(name, exchangeType, isDurable, shouldAutoDelete, isInternal, arguments, bindings) =>
client.declareExchange(
DeclarationExchangeConfig
.default(Fs2ExchangeName(name.value), exchangeTypeToFs2ExchangeType(exchangeType))
.copy(arguments = argumentsFromAnyRef(arguments))
) *>
declare(bindings)
case decl.Binding(exchangeName, queueName, routingKey, arguments) =>
client.bindQueue(
Fs2RabbitQueueName(queueName.value),
Fs2ExchangeName(exchangeName.value),
Fs2RoutingKey(routingKey.value),
QueueBindingArgs(argumentsFromAnyRef(arguments))
)
case decl.ExchangeBinding(destinationExchangeName, sourceExchangeName, routingKey, arguments) => ???
case decl.Queue(name, isDurable, isExclusive, shouldAutoDelete, arguments) =>
client.declareQueue(DeclarationQueueConfig.default(Fs2RabbitQueueName(name.value)).copy(arguments = argumentsFromAnyRef(arguments)))
}

}

override def publisher(): Publisher[IO, publish.PublishCommand] = (publishCommand: PublishCommand) => {
implicit val channel: AMQPChannel = publishChannel
println(publishCommand)
client
.createPublisher[PublishCommand](Fs2ExchangeName(publishCommand.exchange.value), Fs2RoutingKey(publishCommand.routingKey.value))
.flatMap(f => f(publishCommand))
}

override def registerConsumer(
queueName: bucky.QueueName,
handler: Handler[IO, consume.Delivery],
exceptionalAction: consume.ConsumeAction,
prefetchCount: Int,
shutdownTimeout: FiniteDuration,
shutdownRetry: FiniteDuration
): Resource[IO, Unit] =
client.createChannel(connection).flatMap { implicit channel =>
implicit val decoder: EnvelopeDecoder[IO, consume.Delivery] = deliveryDecoder(queueName)
Resource.eval(client.createAckerConsumer[consume.Delivery](Fs2RabbitQueueName(queueName.value))).flatMap { case (acker, consumer) =>
consumer
.evalMap(delivery => handler(delivery.payload).map(_ -> delivery.deliveryTag))
.evalMap {
case (consume.Ack, tag) => acker(AckResult.Ack(tag))
case (consume.DeadLetter, tag) => acker(AckResult.NAck(tag))
case (consume.RequeueImmediately, tag) => ??? // TODO
}
.compile
.drain
.background
.void
}
}

override def isConnectionOpen: IO[Boolean] = IO(connection.value.isOpen)

}

object Fs2RabbitAmqpClient {
def apply(config: Fs2RabbitConfig): Resource[IO, Fs2RabbitAmqpClient] =
for {
client <- RabbitClient.default[IO](config).resource
connection <- client.createConnection
publishChannel <- client.createConnectionChannel
} yield new Fs2RabbitAmqpClient(client, connection, publishChannel)
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import com.typesafe.scalalogging.StrictLogging

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

import cats.effect._
import cats.implicits._
import dev.profunktor.fs2rabbit.config.Fs2RabbitConfig

object RequeueConsumer extends IOApp with StrictLogging {

Expand All @@ -24,11 +24,23 @@ object RequeueConsumer extends IOApp with StrictLogging {

val config: Config = ConfigFactory.load("bucky")
val amqpClientConfig: AmqpClientConfig = AmqpClientConfig(config.getString("rmq.host"), 5672, "guest", "guest")
val fs2RabbitConfig: Fs2RabbitConfig = Fs2RabbitConfig(
amqpClientConfig.host,
amqpClientConfig.port,
amqpClientConfig.virtualHost.getOrElse("/"),
10.seconds,
ssl = false,
Some(amqpClientConfig.username),
Some(amqpClientConfig.password),
requeueOnNack = false,
requeueOnReject = false,
None
)

val stringToLogRequeueHandler: RequeueHandler[IO, String] =
RequeueHandler[IO, String] { message: String =>
IO.delay {
logger.info(message)
println(message)

message match {
case "requeue" => Requeue
Expand All @@ -40,9 +52,9 @@ object RequeueConsumer extends IOApp with StrictLogging {

override def run(args: List[String]): IO[ExitCode] =
(for {
amqpClient <- AmqpClient[IO](amqpClientConfig)
_ <- Resource.eval(amqpClient.declare(Declarations.all))
_ <- amqpClient.registerRequeueConsumerOf(Declarations.queue.name,
stringToLogRequeueHandler)
} yield ()).use(_ => IO.never *> IO(ExitCode.Success))
}
amqpClient <- Fs2RabbitAmqpClient(fs2RabbitConfig)
_ <- Resource.eval(amqpClient.declare(Declarations.all))
_ <- amqpClient.registerRequeueConsumerOf(Declarations.queue.name,
stringToLogRequeueHandler, requeuePolicy = RequeuePolicy(10, 5.seconds))
} yield ()).use(_ => IO.never *> IO(ExitCode.Success))
}
4 changes: 2 additions & 2 deletions kamon/src/test/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - "%msg"%n</pattern>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - "%delivery"%n</pattern>
</encoder>
</appender>

<appender name="ITVSYSLOG" class="ch.qos.logback.classic.net.SyslogAppender">
<syslogHost>localhost</syslogHost>
<facility>LOCAL3</facility>
<suffixPattern>content-delivery - work-order-to-workflow-translator - [%thread] %-5level %logger - "%msg"%n</suffixPattern>
<suffixPattern>content-delivery - work-order-to-workflow-translator - [%thread] %-5level %logger - "%delivery"%n</suffixPattern>
</appender>

<logger name="ch.qos.logback" level="WARN"/>
Expand Down
4 changes: 2 additions & 2 deletions test/src/it/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - "%msg"%n</pattern>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - "%delivery"%n</pattern>
</encoder>
</appender>

<appender name="ITVSYSLOG" class="ch.qos.logback.classic.net.SyslogAppender">
<syslogHost>localhost</syslogHost>
<facility>LOCAL3</facility>
<suffixPattern>content-delivery - work-order-to-workflow-translator - [%thread] %-5level %logger - "%msg"%n</suffixPattern>
<suffixPattern>content-delivery - work-order-to-workflow-translator - [%thread] %-5level %logger - "%delivery"%n</suffixPattern>
</appender>

<logger name="ch.qos.logback" level="WARN"/>
Expand Down
4 changes: 2 additions & 2 deletions test/src/test/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - "%msg"%n</pattern>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - "%delivery"%n</pattern>
</encoder>
</appender>

<appender name="ITVSYSLOG" class="ch.qos.logback.classic.net.SyslogAppender">
<syslogHost>localhost</syslogHost>
<facility>LOCAL3</facility>
<suffixPattern>content-delivery - work-order-to-workflow-translator - [%thread] %-5level %logger - "%msg"%n</suffixPattern>
<suffixPattern>content-delivery - work-order-to-workflow-translator - [%thread] %-5level %logger - "%delivery"%n</suffixPattern>
</appender>

<logger name="ch.qos.logback" level="WARN"/>
Expand Down