From 8883f780cf4298d0b4c62fea81830a5d002008c6 Mon Sep 17 00:00:00 2001 From: nikita myazin Date: Tue, 11 Jul 2023 00:32:37 +0300 Subject: [PATCH] fetch more elements in repeatZIO regardless of page structure --- .../zio/cassandra/session/SessionSpec.scala | 28 +++++++++++++++++-- .../scala/zio/cassandra/session/Session.scala | 20 +++++++------ 2 files changed, 38 insertions(+), 10 deletions(-) diff --git a/src/it/scala/zio/cassandra/session/SessionSpec.scala b/src/it/scala/zio/cassandra/session/SessionSpec.scala index 8fbfce7..a3ebddf 100644 --- a/src/it/scala/zio/cassandra/session/SessionSpec.scala +++ b/src/it/scala/zio/cassandra/session/SessionSpec.scala @@ -206,7 +206,7 @@ object SessionSpec extends ZIOCassandraSpec with ZIOCassandraSpecUtils { |where id = 'key' and p_nr = ? and seq_nr >= ? and seq_nr <= ?""".stripMargin ) res <- session.repeatZIO(selectStatement(st)).runCount - } yield assertTrue(records.size == res.toInt) + } yield assertTrue(res.toInt == records.size) }, test("select continuously over multiple key partitions (dsl query)") { val partitionSize = 10L @@ -228,7 +228,31 @@ object SessionSpec extends ZIOCassandraSpec with ZIOCassandraSpecUtils { cql"""select * from $table where id = 'key' and p_nr = $pn and seq_nr >= ${pn * partitionSize} and seq_nr <= ${(pn + 1) * partitionSize}""" } }.runCount - } yield assertTrue(records.size == res.toInt) + } yield assertTrue(res.toInt == records.size) + }, + test("select continuously over multiple key partitions (dsl query) and deal nicely with page end corner case") { + val partitionSize = 10L + for { + session <- ZIO.service[Session] + tbl = "table_" + UUID.randomUUID().toString.replaceAll("-", "_") + table = lift(s"$keyspace.$tbl") + _ <- session.execute { + cqlConst"create table $table(id text, p_nr bigint, seq_nr bigint, primary key((id, p_nr), seq_nr))" + } + records = Chunk.fromIterable(0L.until(37L)) + _ <- records.mapZIO { i => + session.execute(cql"""insert into $table (id, p_nr, seq_nr) values ('key', ${i / partitionSize}, $i)""") + } + partitionNr <- Ref.make(0L) + // read all records per key partition + res <- session.repeatZIO { + partitionNr.getAndUpdate(_ + 1).map { pn => + cql"""select seq_nr from $table where id = 'key' and p_nr = $pn and seq_nr >= ${pn * partitionSize} and seq_nr <= ${(pn + 1) * partitionSize}""" + .config(_.setPageSize(1)) + .as[Long] + } + }.runCollect + } yield assertTrue(res.toSet == records.toSet) } ) } diff --git a/src/main/scala/zio/cassandra/session/Session.scala b/src/main/scala/zio/cassandra/session/Session.scala index be1b698..99b2a06 100644 --- a/src/main/scala/zio/cassandra/session/Session.scala +++ b/src/main/scala/zio/cassandra/session/Session.scala @@ -122,21 +122,25 @@ object Session { ch: ZIO[R, Throwable, AsyncResultSet], fn: Row => A, continuous: Boolean, - next: => ZChannel[R, Any, Any, Any, Throwable, Chunk[A], Any] + next: => ZChannel[R, Any, Any, Any, Throwable, Chunk[A], Any], + partitionNonEmpty: Boolean = false ): ZChannel[R, Any, Any, Any, Throwable, Chunk[A], Any] = ZChannel.fromZIO(ch).flatMap { - case rs if rs.hasMorePages => + case rs if rs.hasMorePages => write(rs.currentPage(), fn) *> loop( ZIO.fromCompletionStage(rs.fetchNextPage()), fn, continuous, - next + next, + partitionNonEmpty = true ) - case rs if rs.currentPage().iterator().hasNext => - if (continuous) { - write(rs.currentPage(), fn) *> next - } else write(rs.currentPage(), fn) - case _ => ZChannel.unit + case rs => + val hasMoreElements = rs.currentPage().iterator().hasNext + + val curr = if (hasMoreElements) write(rs.currentPage(), fn) else ZChannel.unit + val next_ = if (continuous && (hasMoreElements || partitionNonEmpty)) next else ZChannel.unit + + curr *> next_ } override def select(stmt: Statement[_]): Stream[Throwable, Row] =