Skip to content

Commit

Permalink
Merge pull request #37 from narma/fetch_more_elements_in_repeatZIO_re…
Browse files Browse the repository at this point in the history
…gardless_of_page_structure

fetch more elements in repeatZIO regardless of page structure
  • Loading branch information
myazinn authored Jul 11, 2023
2 parents 2e4639e + 8883f78 commit 01d605f
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 10 deletions.
28 changes: 26 additions & 2 deletions src/it/scala/zio/cassandra/session/SessionSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ object SessionSpec extends ZIOCassandraSpec with ZIOCassandraSpecUtils {
|where id = 'key' and p_nr = ? and seq_nr >= ? and seq_nr <= ?""".stripMargin
)
res <- session.repeatZIO(selectStatement(st)).runCount
} yield assertTrue(records.size == res.toInt)
} yield assertTrue(res.toInt == records.size)
},
test("select continuously over multiple key partitions (dsl query)") {
val partitionSize = 10L
Expand All @@ -228,7 +228,31 @@ object SessionSpec extends ZIOCassandraSpec with ZIOCassandraSpecUtils {
cql"""select * from $table where id = 'key' and p_nr = $pn and seq_nr >= ${pn * partitionSize} and seq_nr <= ${(pn + 1) * partitionSize}"""
}
}.runCount
} yield assertTrue(records.size == res.toInt)
} yield assertTrue(res.toInt == records.size)
},
test("select continuously over multiple key partitions (dsl query) and deal nicely with page end corner case") {
val partitionSize = 10L
for {
session <- ZIO.service[Session]
tbl = "table_" + UUID.randomUUID().toString.replaceAll("-", "_")
table = lift(s"$keyspace.$tbl")
_ <- session.execute {
cqlConst"create table $table(id text, p_nr bigint, seq_nr bigint, primary key((id, p_nr), seq_nr))"
}
records = Chunk.fromIterable(0L.until(37L))
_ <- records.mapZIO { i =>
session.execute(cql"""insert into $table (id, p_nr, seq_nr) values ('key', ${i / partitionSize}, $i)""")
}
partitionNr <- Ref.make(0L)
// read all records per key partition
res <- session.repeatZIO {
partitionNr.getAndUpdate(_ + 1).map { pn =>
cql"""select seq_nr from $table where id = 'key' and p_nr = $pn and seq_nr >= ${pn * partitionSize} and seq_nr <= ${(pn + 1) * partitionSize}"""
.config(_.setPageSize(1))
.as[Long]
}
}.runCollect
} yield assertTrue(res.toSet == records.toSet)
}
)
}
20 changes: 12 additions & 8 deletions src/main/scala/zio/cassandra/session/Session.scala
Original file line number Diff line number Diff line change
Expand Up @@ -122,21 +122,25 @@ object Session {
ch: ZIO[R, Throwable, AsyncResultSet],
fn: Row => A,
continuous: Boolean,
next: => ZChannel[R, Any, Any, Any, Throwable, Chunk[A], Any]
next: => ZChannel[R, Any, Any, Any, Throwable, Chunk[A], Any],
partitionNonEmpty: Boolean = false
): ZChannel[R, Any, Any, Any, Throwable, Chunk[A], Any] =
ZChannel.fromZIO(ch).flatMap {
case rs if rs.hasMorePages =>
case rs if rs.hasMorePages =>
write(rs.currentPage(), fn) *> loop(
ZIO.fromCompletionStage(rs.fetchNextPage()),
fn,
continuous,
next
next,
partitionNonEmpty = true
)
case rs if rs.currentPage().iterator().hasNext =>
if (continuous) {
write(rs.currentPage(), fn) *> next
} else write(rs.currentPage(), fn)
case _ => ZChannel.unit
case rs =>
val hasMoreElements = rs.currentPage().iterator().hasNext

val curr = if (hasMoreElements) write(rs.currentPage(), fn) else ZChannel.unit
val next_ = if (continuous && (hasMoreElements || partitionNonEmpty)) next else ZChannel.unit

curr *> next_
}

override def select(stmt: Statement[_]): Stream[Throwable, Row] =
Expand Down

0 comments on commit 01d605f

Please sign in to comment.