Skip to content

Commit

Permalink
Update libs (#259)
Browse files Browse the repository at this point in the history
* upgrade libs

* fix localstack

* clean compatible version
  • Loading branch information
bwiercinski authored Mar 10, 2023
1 parent 256b463 commit 2549985
Show file tree
Hide file tree
Showing 8 changed files with 138 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package com.ocadotechnology.pass4s.phobos

import com.ocadotechnology.pass4s.core.Destination
import com.ocadotechnology.pass4s.core.Message
import ru.tinkoff.phobos.encoding.EncodingError
import ru.tinkoff.phobos.encoding.XmlEncoder

object XmlMessage {
Expand All @@ -27,7 +28,7 @@ object XmlMessage {
destination: Destination[P],
metadata: Map[String, String] = Map(),
charset: String = "UTF-8"
): Message[P] =
Message(Message.Payload(XmlEncoder[A].encode(body, charset), metadata), destination)
): Either[EncodingError, Message[P]] =
XmlEncoder[A].encode(body, charset).map(payload => Message(Message.Payload(payload, metadata), destination))

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.ocadotechnology.pass4s.phobos

import cats.MonadError
import cats.MonadThrow
import cats.syntax.all._
import com.ocadotechnology.pass4s.core.Destination
import com.ocadotechnology.pass4s.core.Message
Expand All @@ -26,16 +25,24 @@ import com.ocadotechnology.pass4s.core.groupId.GroupIdMeta
import com.ocadotechnology.pass4s.core.groupId.MessageGroup
import com.ocadotechnology.pass4s.kernel.Consumer
import com.ocadotechnology.pass4s.kernel.Sender
import ru.tinkoff.phobos.decoding.DecodingError
import ru.tinkoff.phobos.decoding.XmlDecoder
import ru.tinkoff.phobos.encoding.EncodingError
import ru.tinkoff.phobos.encoding.XmlEncoder

object syntax {

final private[syntax] class AsXmlSenderPartiallyApplied[F[_], P, A](private val sender: Sender[F, Message[P]]) extends AnyVal {

@scala.annotation.nowarn("cat=unused-params")
def apply[R >: P](to: Destination[R])(implicit encoder: XmlEncoder[A], noGroupId: GroupIdMeta.Absent[R]): Sender[F, A] =
sender.contramap(XmlMessage(_, to).widen)
def apply[R >: P](
to: Destination[R]
)(
implicit M: MonadError[F, _ >: EncodingError],
encoder: XmlEncoder[A],
noGroupId: GroupIdMeta.Absent[R]
): Sender[F, A] =
sender.contramapM(XmlMessage(_, to).liftTo[F].map(_.widen))

}

Expand All @@ -47,10 +54,11 @@ object syntax {
to: Destination[R],
computeMetadata: A => Map[String, String]
)(
implicit encoder: XmlEncoder[A],
implicit M: MonadError[F, _ >: EncodingError],
encoder: XmlEncoder[A],
noGroupId: GroupIdMeta.Absent[R]
): Sender[F, A] =
sender.contramap(a => XmlMessage(a, to, computeMetadata(a)).widen)
sender.contramapM(a => XmlMessage(a, to, computeMetadata(a)).liftTo[F].map(_.widen))

}

Expand All @@ -61,11 +69,13 @@ object syntax {
to: Destination[R],
computeMetadata: A => Map[String, String] = _ => Map()
)(
implicit encoder: XmlEncoder[A],
implicit M: MonadError[F, _ >: EncodingError],
encoder: XmlEncoder[A],
groupIdMeta: GroupIdMeta[R],
messageGroup: MessageGroup[A]
): Sender[F, A] =
sender.asXmlSenderWithCustomMetadata[A](to, a => Map(groupIdMeta.groupIdKey -> messageGroup.groupId(a)) ++ computeMetadata(a))(
M,
encoder,
GroupIdMeta.Absent.iKnowWhatImDoing
)
Expand Down Expand Up @@ -98,14 +108,14 @@ object syntax {
}

implicit final class ConsumeXmlMessageSyntax[F[_]](private val consumer: Consumer[F, String]) {
def asXmlConsumer[A: XmlDecoder](implicit F: MonadThrow[F]): Consumer[F, A] =
def asXmlConsumer[A: XmlDecoder](implicit M: MonadError[F, _ >: DecodingError]): Consumer[F, A] =
consumer.mapM(XmlDecoder[A].decode(_).liftTo[F])
}

implicit final class ConsumeXmlGenericMessageSyntax[F[_], A](private val consumer: Consumer[F, A]) {

def asXmlConsumer[B: XmlDecoder](
implicit M: MonadError[F, _ >: ru.tinkoff.phobos.decoding.DecodingError],
implicit M: MonadError[F, _ >: DecodingError],
ev: A <:< Payload
): Consumer[F, B] =
consumer.mapM(msg => XmlDecoder[B].decode(msg.text).liftTo[F])
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2023 Ocado Technology
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ocadotechnology.pass4s.phobos

import cats.effect.IO
import com.ocadotechnology.pass4s.core.Destination
import com.ocadotechnology.pass4s.core.Message
import com.ocadotechnology.pass4s.core.Message.Payload
import com.ocadotechnology.pass4s.kernel.Consumer
import com.ocadotechnology.pass4s.kernel.Sender
import com.ocadotechnology.pass4s.phobos.syntax._
import ru.tinkoff.phobos.decoding.XmlDecoder
import ru.tinkoff.phobos.derivation.semiauto._
import ru.tinkoff.phobos.encoding.XmlEncoder
import weaver.SimpleIOSuite

import scala.reflect.runtime.universe._

object PhobosTests extends SimpleIOSuite {

object UnitEnd extends Destination[Unit] { override def name: String = "unit"; override def capability: Type = typeOf[Unit] }

case class MyEvent(foo: Int, bar: String)

object MyEvent {
implicit val xmlEncoder: XmlEncoder[MyEvent] = deriveXmlEncoder("journey")
implicit val xmlDecoder: XmlDecoder[MyEvent] = deriveXmlDecoder("journey")

val testValue = MyEvent(10, "string")
val testXml = "<?xml version='1.0' encoding='UTF-8'?><journey><foo>10</foo><bar>string</bar></journey>"
}

test("sender.asXmlSender") {
for {
sender <- Sender.testing[IO, Message[Unit]]
_ <- sender.asXmlSender[MyEvent](UnitEnd).sendOne(MyEvent.testValue)
sent <- sender.sent
} yield expect.all(sent == List(Message(Payload(MyEvent.testXml, Map()), UnitEnd)))
}

test("consumer.asXmlSender") {
val consumer = Consumer.one[IO, Payload](Payload(MyEvent.testXml, Map())).asXmlConsumer[MyEvent]
Consumer.toStreamBounded(1)(consumer).take(1).compile.lastOrError.map { consumedMessage =>
expect.all(consumedMessage == MyEvent.testValue)
}
}

}
47 changes: 25 additions & 22 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
ThisBuild / tlBaseVersion := "0.2" // current series x.y
ThisBuild / tlBaseVersion := "0.3" // current series x.y

ThisBuild / organization := "com.ocadotechnology"
ThisBuild / organizationName := "Ocado Technology"
Expand All @@ -20,9 +20,13 @@ ThisBuild / githubWorkflowBuild ++= Seq(
)

val Versions = new {
val Log4Cats = "2.3.2"
val Weaver = "0.7.15"
val Laserdisc = "5.1.0"
val ActiveMq = "5.17.4"
val CatsEffect = "3.4.8"
val Fs2 = "3.6.1"
val Logback = "1.4.5"
val Log4Cats = "2.5.0"
val Weaver = "0.8.1"
val Laserdisc = "6.0.0"
}

lazy val IntegrationTest = config("it") extend Test
Expand All @@ -37,15 +41,15 @@ lazy val root = (project in file("."))
"com.disneystreaming" %% "weaver-cats" % Versions.Weaver,
"com.disneystreaming" %% "weaver-framework" % Versions.Weaver,
"com.disneystreaming" %% "weaver-scalacheck" % Versions.Weaver,
"org.scalatest" %% "scalatest" % "3.2.14", // just for `shouldNot compile`
"org.scalatest" %% "scalatest" % "3.2.15", // just for `shouldNot compile`
"com.dimafeng" %% "testcontainers-scala-localstack-v2" % "0.40.12",
"com.amazonaws" % "aws-java-sdk-core" % "1.12.336" exclude ("*", "*"), // fixme after https://github.com/testcontainers/testcontainers-java/issues/4279
"com.amazonaws" % "aws-java-sdk-core" % "1.12.421" exclude ("*", "*"), // fixme after release of https://github.com/testcontainers/testcontainers-java/pull/5827
"com.dimafeng" %% "testcontainers-scala-mockserver" % "0.40.12",
"org.mock-server" % "mockserver-client-java" % "5.13.2",
"org.apache.activemq" % "activemq-broker" % "5.17.2",
"org.mock-server" % "mockserver-client-java" % "5.15.0",
"org.apache.activemq" % "activemq-broker" % Versions.ActiveMq,
"org.typelevel" %% "log4cats-core" % Versions.Log4Cats,
"org.typelevel" %% "log4cats-slf4j" % Versions.Log4Cats,
"ch.qos.logback" % "logback-classic" % "1.2.11"
"ch.qos.logback" % "logback-classic" % Versions.Logback
).map(_ % IntegrationTest),
Defaults.itSettings,
inConfig(IntegrationTest) {
Expand All @@ -63,17 +67,17 @@ lazy val core = module("core")
.settings(
libraryDependencies ++= Seq(
"org.scala-lang" % "scala-reflect" % scalaVersion.value,
"co.fs2" %% "fs2-core" % "3.3.0",
"org.typelevel" %% "cats-effect" % "3.3.14"
"co.fs2" %% "fs2-core" % Versions.Fs2,
"org.typelevel" %% "cats-effect" % Versions.CatsEffect
)
)

lazy val kernel = module("kernel").settings(
libraryDependencies ++= Seq(
"co.fs2" %% "fs2-core" % "3.3.0",
"org.typelevel" %% "cats-effect" % "3.3.9",
"co.fs2" %% "fs2-core" % Versions.Fs2,
"org.typelevel" %% "cats-effect" % Versions.CatsEffect,
"org.typelevel" %% "cats-tagless-core" % "0.14.0",
"org.typelevel" %% "cats-laws" % "2.8.0" % Test,
"org.typelevel" %% "cats-laws" % "2.9.0" % Test,
"com.disneystreaming" %% "weaver-discipline" % Versions.Weaver % Test
)
)
Expand All @@ -84,16 +88,15 @@ lazy val high = module("high")
// connectors

val awsSnykOverrides = Seq(
"commons-codec" % "commons-codec" % "1.15",
"software.amazon.awssdk" % "netty-nio-client" % "2.18.41"
"commons-codec" % "commons-codec" % "1.15"
)

lazy val activemq = module("activemq", directory = "connectors")
.settings(
name := "pass4s-connector-activemq",
libraryDependencies ++= Seq(
"com.lightbend.akka" %% "akka-stream-alpakka-jms" % "3.0.4",
"org.apache.activemq" % "activemq-pool" % "5.17.2",
"com.lightbend.akka" %% "akka-stream-alpakka-jms" % "4.0.0", // 5.x.x contains akka-streams +2.7.x which is licensed under BUSL 1.1
"org.apache.activemq" % "activemq-pool" % Versions.ActiveMq,
"org.typelevel" %% "log4cats-core" % Versions.Log4Cats
),
headerSources / excludeFilter := HiddenFileFilter || "taps.scala"
Expand Down Expand Up @@ -133,15 +136,15 @@ lazy val sqs = module("sqs", directory = "connectors")
lazy val circe = module("circe", directory = "addons")
.settings(
libraryDependencies ++= Seq(
"io.circe" %% "circe-parser" % "0.14.3"
"io.circe" %% "circe-parser" % "0.14.5"
)
)
.dependsOn(core, kernel)

lazy val phobos = module("phobos", directory = "addons")
.settings(
libraryDependencies ++= Seq(
"ru.tinkoff" %% "phobos-core" % "0.14.1"
"ru.tinkoff" %% "phobos-core" % "0.20.0"
)
)
.dependsOn(core, kernel)
Expand Down Expand Up @@ -202,10 +205,10 @@ lazy val demo = module("demo")
publishArtifact := false,
// mimaPreviousArtifacts := Set(), // TODO
libraryDependencies ++= Seq(
"io.circe" %% "circe-generic" % "0.14.3",
"io.circe" %% "circe-generic" % "0.14.5",
"org.typelevel" %% "log4cats-core" % Versions.Log4Cats,
"org.typelevel" %% "log4cats-slf4j" % Versions.Log4Cats,
"ch.qos.logback" % "logback-classic" % "1.2.11"
"ch.qos.logback" % "logback-classic" % Versions.Logback
)
)
.dependsOn(activemq, sns, sqs, extra, logging)
Expand Down
5 changes: 1 addition & 4 deletions pass4s-docker-compose/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,9 @@ services:
- "8161:8161"

localstack:
image: localstack/localstack-full:latest
image: localstack/localstack:0.14.5
ports:
- "4566-4599:4566-4599"
- "8080:8080"
environment:
- SERVICES=sns,sqs,s3
- DATA_DIR=/tmp/localstack/data
- DEBUG=1
- SETUP_IAM=true
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ object SnsTests extends MutableIOSuite {
implicit val messageGroup: MessageGroup[Foo] = _.order
}

val payload = Foo(2137, order = "uśmiechu")
val payload = Foo(2137, order = "some-order")

topicWithSubscriptionResource(snsClient, sqsClient)("fifo-topic", isFifo = true)
.use { case (topicArn, queueUrl) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import io.laserdisc.pure.sqs.tagless.SqsAsyncClientOp
import org.testcontainers.containers.localstack.LocalStackContainer.Service
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue
import software.amazon.awssdk.services.sqs.model.QueueAttributeName
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest
Expand Down Expand Up @@ -110,7 +111,7 @@ object SqsTests extends MutableIOSuite {
("bar", "2")
)

queueResource(client)("dedup-queue", isFifo = true, isDedup = true)
queueResource(client)("dedup-queue", isFifo = true)
.use { queueUrl =>
def sendMessageRequest(body: String, groupId: String) =
SendMessageRequest
Expand Down Expand Up @@ -157,9 +158,8 @@ object SqsTests extends MutableIOSuite {
}

test("when consumer is failing message should not be deleted").usingRes { case (broker, client) =>
queueResource(client)("input-dlq")
.mproduct(dlqUrl => queueResource(client)("input-queue", _.attributes(redrivePolicy(dlqUrl.value).asJava)))
.use { case (dlqUrl, queueUrl) =>
createQueueWithDlq(client, "input-queue", "input-dlq")
.use { case (queueUrl, dlqUrl) =>
val failingQueueConsumer = broker
.consumer(SqsEndpoint(queueUrl, SqsEndpoint.Settings(messageProcessingTimeout = 1.second)))
.consume(_ => IO.raiseError(new Exception("processing failed")))
Expand All @@ -180,9 +180,8 @@ object SqsTests extends MutableIOSuite {

test("when consumer is processing message for too long, then processing should be interrupted and message should not be deleted")
.usingRes { case (broker, client) =>
queueResource(client)("input-dlq")
.mproduct(dlqUrl => queueResource(client)("input-queue", _.attributes(redrivePolicy(dlqUrl.value).asJava)))
.use { case (dlqUrl, queueUrl) =>
createQueueWithDlq(client, "input-queue", "input-dlq")
.use { case (queueUrl, dlqUrl) =>
val longProcessingQueueConsumer = broker
.consumer(SqsEndpoint(queueUrl, SqsEndpoint.Settings(messageProcessingTimeout = 1.second)))
.consume(_ => IO.sleep(10.seconds))
Expand Down Expand Up @@ -224,6 +223,15 @@ object SqsTests extends MutableIOSuite {
}
}

private def createQueueWithDlq(client: SqsAsyncClientOp[IO], queueName: String, dlqName: String): Resource[IO, (SqsUrl, SqsUrl)] =
for {
dlqUrl <- queueResource(client)(dlqName)
queueAttributesRequest =
GetQueueAttributesRequest.builder().queueUrl(dlqUrl.value).attributeNames(QueueAttributeName.QUEUE_ARN).build()
dlqArn <- Resource.eval(client.getQueueAttributes(queueAttributesRequest)).map(_.attributes().get(QueueAttributeName.QUEUE_ARN))
queueUrl <- queueResource(client)(queueName, _.attributes(redrivePolicy(dlqArn).asJava))
} yield (queueUrl, dlqUrl)

private val redrivePolicy: String => Map[QueueAttributeName, String] =
dlqUrl => Map(QueueAttributeName.REDRIVE_POLICY -> s"""{"deadLetterTargetArn":"$dlqUrl","maxReceiveCount":"1"}""")

Expand Down
Loading

0 comments on commit 2549985

Please sign in to comment.