Skip to content

Commit

Permalink
Make ZStreamPlatformSpecificSpec fromSocketServer tests more stable (z…
Browse files Browse the repository at this point in the history
…io#6037)

* Make ZStreamPlatformSpecificSpec fromSocketServer tests more stable

* retry in case we have some socket communication issues
  • Loading branch information
kmatasfp authored Nov 19, 2021
1 parent 790fa20 commit 0966901
Showing 1 changed file with 50 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package zio.stream
import zio._
import zio.test.Assertion._
import zio.test._
import zio.test.TestAspect.flaky

import java.io.{FileNotFoundException, FileReader, IOException, OutputStream, Reader}
import java.net.InetSocketAddress
Expand Down Expand Up @@ -294,56 +295,59 @@ object ZStreamPlatformSpecificSpec extends ZIOBaseSpec {
}
),
suite("fromSocketServer")(
test("read data")(check(Gen.string.filter(_.nonEmpty)) { message =>
test("read data") {
for {
refOut <- Ref.make("")
messages <- Gen.string1(Gen.unicodeChar).runCollectN(200)
readMessages <- ZStream
.fromSocketServer(8896)
.zip(ZStream.managed(socketClient(8896)))
.flatMap { case (serverChannel, clientChannel) =>
ZStream
.fromIterable(messages)
.mapZIO(m =>
ZIO
.fromFutureJava(
clientChannel.write(ByteBuffer.wrap(m.getBytes("UTF-8")))
) *> serverChannel.read
.take(m.size.toLong)
.via(ZPipeline.utf8Decode)
.mkString
)

server <- ZStream
.fromSocketServer(8896)
.foreach { c =>
c.read
.via(ZPipeline.utf8Decode)
.mkString
.flatMap(s => refOut.update(_ + s))
}
.fork

_ <- socketClient(8896)
.use(c => ZIO.fromFutureJava(c.write(ByteBuffer.wrap(message.getBytes))))
.retry(Schedule.forever)

receive <- refOut.get.repeatWhileZIO(s => ZIO.succeed(s.isEmpty))

_ <- server.interrupt
} yield assert(receive)(equalTo(message))
}),
test("write data")(check(Gen.string.filter(_.nonEmpty)) { message =>
(for {
refOut <- Ref.make("")

server <- ZStream
.fromSocketServer(8897)
.foreach(c => ZStream.fromIterable(message.getBytes).run(c.write))
.fork

_ <- socketClient(8897).use { c =>
val buffer = ByteBuffer.allocate(message.getBytes.length)

ZIO
.fromFutureJava(c.read(buffer))
.repeatUntil(_ < 1)
.flatMap { _ =>
(buffer: Buffer).flip()
refOut.update(_ => new String(buffer.array))
}
}.retry(Schedule.forever)
}
.take(messages.size.toLong)
.runCollect
} yield assert(readMessages)(hasSameElementsDistinct(readMessages))
},
test("write data") {
for {
messages <- Gen.string1(Gen.unicodeChar).runCollectN(200)
writtenMessages <- ZStream
.fromSocketServer(8897)
.zip(ZStream.managed(socketClient(8897)))
.flatMap { case (serverChannel, clientChannel) =>
ZStream
.fromIterable(messages)
.mapZIO(m =>
ZStream.fromIterable(m.getBytes("UTF-8")).run(serverChannel.write) *> {
val buffer = ByteBuffer.allocate(m.getBytes("UTF-8").length)

receive <- refOut.get.repeatWhileZIO(s => ZIO.succeed(s.isEmpty))
ZIO
.fromFutureJava(clientChannel.read(buffer))
.repeatUntil(_ < 1)
.map { _ =>
(buffer: Buffer).flip()
new String(buffer.array)
}
}
)
}
.take(messages.size.toLong)
.runCollect
} yield assert(writtenMessages)(hasSameElementsDistinct(messages))

_ <- server.interrupt
} yield assert(receive)(equalTo(message)))
})
),
}
) @@ flaky(20), // socket connections can be flaky some times
suite("fromOutputStreamWriter")(
test("reads what is written") {
check(Gen.listOf(Gen.chunkOf(Gen.byte)), Gen.int(1, 10)) { (bytess, chunkSize) =>
Expand Down

0 comments on commit 0966901

Please sign in to comment.