diff --git a/streams-tests/jvm/src/test/scala/zio/stream/ZStreamPlatformSpecificSpec.scala b/streams-tests/jvm/src/test/scala/zio/stream/ZStreamPlatformSpecificSpec.scala index 9308dc6b46ca..212d8b81f641 100644 --- a/streams-tests/jvm/src/test/scala/zio/stream/ZStreamPlatformSpecificSpec.scala +++ b/streams-tests/jvm/src/test/scala/zio/stream/ZStreamPlatformSpecificSpec.scala @@ -3,6 +3,7 @@ package zio.stream import zio._ import zio.test.Assertion._ import zio.test._ +import zio.test.TestAspect.flaky import java.io.{FileNotFoundException, FileReader, IOException, OutputStream, Reader} import java.net.InetSocketAddress @@ -294,56 +295,59 @@ object ZStreamPlatformSpecificSpec extends ZIOBaseSpec { } ), suite("fromSocketServer")( - test("read data")(check(Gen.string.filter(_.nonEmpty)) { message => + test("read data") { for { - refOut <- Ref.make("") + messages <- Gen.string1(Gen.unicodeChar).runCollectN(200) + readMessages <- ZStream + .fromSocketServer(8896) + .zip(ZStream.managed(socketClient(8896))) + .flatMap { case (serverChannel, clientChannel) => + ZStream + .fromIterable(messages) + .mapZIO(m => + ZIO + .fromFutureJava( + clientChannel.write(ByteBuffer.wrap(m.getBytes("UTF-8"))) + ) *> serverChannel.read + .take(m.size.toLong) + .via(ZPipeline.utf8Decode) + .mkString + ) - server <- ZStream - .fromSocketServer(8896) - .foreach { c => - c.read - .via(ZPipeline.utf8Decode) - .mkString - .flatMap(s => refOut.update(_ + s)) - } - .fork - - _ <- socketClient(8896) - .use(c => ZIO.fromFutureJava(c.write(ByteBuffer.wrap(message.getBytes)))) - .retry(Schedule.forever) - - receive <- refOut.get.repeatWhileZIO(s => ZIO.succeed(s.isEmpty)) - - _ <- server.interrupt - } yield assert(receive)(equalTo(message)) - }), - test("write data")(check(Gen.string.filter(_.nonEmpty)) { message => - (for { - refOut <- Ref.make("") - - server <- ZStream - .fromSocketServer(8897) - .foreach(c => ZStream.fromIterable(message.getBytes).run(c.write)) - .fork - - _ <- socketClient(8897).use { c => - val buffer = ByteBuffer.allocate(message.getBytes.length) - - ZIO - .fromFutureJava(c.read(buffer)) - .repeatUntil(_ < 1) - .flatMap { _ => - (buffer: Buffer).flip() - refOut.update(_ => new String(buffer.array)) - } - }.retry(Schedule.forever) + } + .take(messages.size.toLong) + .runCollect + } yield assert(readMessages)(hasSameElementsDistinct(readMessages)) + }, + test("write data") { + for { + messages <- Gen.string1(Gen.unicodeChar).runCollectN(200) + writtenMessages <- ZStream + .fromSocketServer(8897) + .zip(ZStream.managed(socketClient(8897))) + .flatMap { case (serverChannel, clientChannel) => + ZStream + .fromIterable(messages) + .mapZIO(m => + ZStream.fromIterable(m.getBytes("UTF-8")).run(serverChannel.write) *> { + val buffer = ByteBuffer.allocate(m.getBytes("UTF-8").length) - receive <- refOut.get.repeatWhileZIO(s => ZIO.succeed(s.isEmpty)) + ZIO + .fromFutureJava(clientChannel.read(buffer)) + .repeatUntil(_ < 1) + .map { _ => + (buffer: Buffer).flip() + new String(buffer.array) + } + } + ) + } + .take(messages.size.toLong) + .runCollect + } yield assert(writtenMessages)(hasSameElementsDistinct(messages)) - _ <- server.interrupt - } yield assert(receive)(equalTo(message))) - }) - ), + } + ) @@ flaky(20), // socket connections can be flaky some times suite("fromOutputStreamWriter")( test("reads what is written") { check(Gen.listOf(Gen.chunkOf(Gen.byte)), Gen.int(1, 10)) { (bytess, chunkSize) =>