Skip to content

Commit

Permalink
Merge pull request #16 from precog/reactivestreams-to-flow
Browse files Browse the repository at this point in the history
reactivestreams to flow
  • Loading branch information
domaspoliakas authored Apr 3, 2024
2 parents b33b985 + 9a70664 commit 14227dc
Show file tree
Hide file tree
Showing 11 changed files with 36 additions and 32 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest]
scala: [2.12.14, 2.13.7]
scala: [2.12.14, 2.13.12]
java: [[email protected]]
runs-on: ${{ matrix.os }}
steps:
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import ReleaseTransformations._
import microsites.CdnDirectives

lazy val scala212 = "2.12.14"
lazy val scala213 = "2.13.7"
lazy val scala213 = "2.13.12"
lazy val supportedScalaVersions = List(scala212, scala213)

ThisBuild / scalaVersion := scala213
Expand Down
8 changes: 5 additions & 3 deletions circe/src/main/scala/mongo4cats/circe.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ object circe extends JsonCodecs {
}

object implicits {
implicit def circeEncoderToEncoder[A: Encoder] = new BsonEncoder[A] {
implicit def circeEncoderToEncoder[A: Encoder]: BsonEncoder[A] = new BsonEncoder[A] {
def apply(a: A): BsonValue = {
val json = a.asJson
val wrapped = Json.obj(RootTag := json)
Expand All @@ -48,12 +48,14 @@ object circe extends JsonCodecs {
}
}

implicit def circeDecoderToDecoder[A: Decoder] = new BsonDecoder[A] {
implicit def circeDecoderToDecoder[A: Decoder]: BsonDecoder[A] = new BsonDecoder[A] {

val decoder = Decoder.instance[A](_.as[A])

def apply(b: BsonValue) = {
val doc = BsonDocument(RootTag -> (if (b == null) new BsonNull else b)).toJson()
val json = parser.parse(doc)
val jsonWithoutRoot = json.flatMap(_.hcursor.get[Json](RootTag))
val decoder = Decoder.instance[A](_.as[A])
jsonWithoutRoot
.flatMap(decoder.decodeJson(_))
.leftMap(x =>
Expand Down
5 changes: 3 additions & 2 deletions circe/src/test/scala/mongo4cats/MongoCollectionSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,14 @@ import java.time.{Instant, LocalDate}
import java.time.temporal.ChronoField.MILLI_OF_SECOND
import java.time.temporal.ChronoUnit
import scala.concurrent.Future
import mongo4cats.bson.BsonDocumentEncoder

class MongoCollectionSpec extends AsyncWordSpec with Matchers with EmbeddedMongo {

import MongoCollectionSpec._

implicit val personEnc = unsafe.circeDocumentEncoder[Person]
implicit val paymentEnc = unsafe.circeDocumentEncoder[Payment]
implicit val personEnc: BsonDocumentEncoder[Person] = unsafe.circeDocumentEncoder[Person]
implicit val paymentEnc: BsonDocumentEncoder[Payment] = unsafe.circeDocumentEncoder[Payment]

override val mongoPort: Int = 12348

Expand Down
15 changes: 8 additions & 7 deletions circe/src/test/scala/mongo4cats/circe.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,21 @@ class CirceSpec extends AnyWordSpec with Matchers with EitherValues {

"circe conversions" should {
"decode null as if it was Json.null" in {
circe.implicits.circeDecoderToDecoder[Unit](Decoder.instance { c =>
c.value.asNull.toRight(DecodingFailure("wasn't null!", Nil))
}).apply(null) shouldBe Right(())
circe.implicits
.circeDecoderToDecoder[Unit](Decoder.instance { c =>
c.value.asNull.toRight(DecodingFailure("wasn't null!", Nil))
})
.apply(null) shouldBe Right(())
}

"not report the internal root tag in history when reporting errors" in {

val deco = Decoder.instance(h => {
h.get[String]("hek")(Decoder.failedWithMessage("Bad!"))
})
val deco =
Decoder.instance(h => h.get[String]("hek")(Decoder.failedWithMessage("Bad!")))

val res = circe.implicits.circeDecoderToDecoder[String](deco).apply(new BsonString("hek"))

res.left.value.msg shouldBe "An error occured during decoding BsonValue BsonString{value='hek'}: DecodingFailure(Attempt to decode value on failed cursor, List(DownField(hek)))"
res.left.value.msg shouldBe "An error occured during decoding BsonValue BsonString{value='hek'}: DecodingFailure at .hek: Missing required field"

}
}
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/mongo4cats/bson/BsonDecoder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ object BsonDocumentDecoder extends LowLevelDocumentDecoder {
}

trait LowLevelDocumentDecoder {
implicit def narrowDecoder[A: BsonDecoder] = BsonDocumentDecoder.instance[A] {
(b: BsonDocument) =>
implicit def narrowDecoder[A: BsonDecoder]: BsonDocumentDecoder[A] =
BsonDocumentDecoder.instance[A] { (b: BsonDocument) =>
BsonDecoder[A].apply(b: BsonValue)
}
}
}

object BsonDecoder {
Expand Down
7 changes: 4 additions & 3 deletions core/src/main/scala/mongo4cats/helpers.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ package mongo4cats

import cats.effect.Async
import fs2.Stream
import fs2.interop.reactivestreams
import fs2.interop.flow
import org.reactivestreams.Publisher
import org.reactivestreams.FlowAdapters

object helpers {

Expand All @@ -36,9 +37,9 @@ object helpers {
boundedStream(1).compile.drain

def stream[F[_]: Async]: Stream[F, T] =
reactivestreams.fromPublisher(publisher, DefaultStreamChunkSize)
flow.fromPublisher(FlowAdapters.toFlowPublisher(publisher), DefaultStreamChunkSize)

def boundedStream[F[_]: Async](chunkSize: Int): Stream[F, T] =
reactivestreams.fromPublisher(publisher, chunkSize)
flow.fromPublisher(FlowAdapters.toFlowPublisher(publisher), chunkSize)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import mongo4cats.circe.unsafe
import mongo4cats.embedded.EmbeddedMongo

import java.time.Instant
import mongo4cats.bson.BsonDocumentEncoder

object CaseClassesWithCirceCodecs extends IOApp.Simple with EmbeddedMongo {

Expand All @@ -35,8 +36,8 @@ object CaseClassesWithCirceCodecs extends IOApp.Simple with EmbeddedMongo {
registrationDate: Instant
)

implicit val addressEnc = unsafe.circeDocumentEncoder[Address]
implicit val personEnc = unsafe.circeDocumentEncoder[Person]
implicit val addressEnc: BsonDocumentEncoder[Address] = unsafe.circeDocumentEncoder[Address]
implicit val personEnc: BsonDocumentEncoder[Person] = unsafe.circeDocumentEncoder[Person]

override val run: IO[Unit] =
withRunningEmbeddedMongo("localhost", 27017) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import mongo4cats.client.MongoClient
import mongo4cats.embedded.EmbeddedMongo

import java.time.Instant
import mongo4cats.bson.BsonDocumentEncoder

object DistinctNestedClassesWithCirceCodecs extends IOApp.Simple with EmbeddedMongo {

Expand All @@ -35,8 +36,8 @@ object DistinctNestedClassesWithCirceCodecs extends IOApp.Simple with EmbeddedMo
registrationDate: Instant
)

implicit val addressEnc = unsafe.circeDocumentEncoder[Address]
implicit val personEnc = unsafe.circeDocumentEncoder[Person]
implicit val addressEnc: BsonDocumentEncoder[Address] = unsafe.circeDocumentEncoder[Address]
implicit val personEnc: BsonDocumentEncoder[Person] = unsafe.circeDocumentEncoder[Person]

override val run: IO[Unit] =
withRunningEmbeddedMongo("localhost", 27017) {
Expand Down
11 changes: 4 additions & 7 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@ import sbt._

object Dependencies {
private object Versions {
val mongodb = "4.4.0"
val fs2 = "3.2.4"
val mongodb = "4.11.1"
val fs2 = "3.10.2"
val scalaCompat = "2.6.0"
val circe = "0.14.1"
val circe = "0.14.6"
val findbugsJsr305Version = "1.3.9"

val logback = "1.2.10"
val scalaTest = "3.2.10"

val testContainers = "0.39.12"
val testContainers = "0.40.10"

val embeddedMongo = "3.2.5"
val immutableValue = "2.8.8"
Expand All @@ -25,7 +25,6 @@ object Dependencies {
val findbugsJsr305Version = "com.google.code.findbugs" % "jsr305" % Versions.findbugsJsr305Version % Provided

val fs2Core = "co.fs2" %% "fs2-core" % Versions.fs2
val fs2RS = "co.fs2" %% "fs2-reactive-streams" % Versions.fs2
val scalaCompat = "org.scala-lang.modules" %% "scala-collection-compat" % Versions.scalaCompat

val circeCore = "io.circe" %% "circe-core" % Versions.circe
Expand All @@ -49,7 +48,6 @@ object Dependencies {
Libraries.mongodbDriverStreams,
Libraries.findbugsJsr305Version,
Libraries.fs2Core,
Libraries.fs2RS,
Libraries.scalaCompat
)

Expand All @@ -70,7 +68,6 @@ object Dependencies {

lazy val embedded = Seq(
Libraries.fs2Core,
Libraries.fs2RS,
Libraries.embeddedMongo,
Libraries.immutableValue,
Libraries.commonsCompress
Expand Down
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version in ThisBuild := "0.7.1"
version in ThisBuild := "0.7.2"

0 comments on commit 14227dc

Please sign in to comment.