Skip to content

Commit

Permalink
Add support for querying last known sequence number by persistenceId. (
Browse files Browse the repository at this point in the history
…#267)

* Add support for querying last known sequence number by persistenceId.

* Align to existing code style.

* Update license headers for new files.

* Align to existing code style.

* Remove marker trait.

* mima issue

* Create CurrentLastKnownSequenceNumberByPersistenceIdTest.scala

* Return java.lang.Long value in javadsl.

---------

Co-authored-by: PJ Fanning <[email protected]>
  • Loading branch information
janjaali and pjfanning authored Feb 16, 2025
1 parent cc70ed5 commit e552ef4
Show file tree
Hide file tree
Showing 12 changed files with 173 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# https://github.com/apache/pekko-persistence-jdbc/pull/267
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.persistence.jdbc.query.dao.ReadJournalDao.lastPersistenceIdSequenceNumber")
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ class DefaultReadJournalDao(
override def journalSequence(offset: Long, limit: Long): Source[Long, NotUsed] =
Source.fromPublisher(db.stream(queries.journalSequenceQuery((offset, limit)).result))

override def lastPersistenceIdSequenceNumber(persistenceId: String): Future[Option[Long]] =
db.run(queries.lastPersistenceIdSequenceNumberQuery(persistenceId).result)

override def maxJournalSequence(): Future[Long] =
db.run(queries.maxJournalSequenceQuery.result)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ trait ReadJournalDao extends JournalDaoWithReadMessages {
*/
def journalSequence(offset: Long, limit: Long): Source[Long, NotUsed]

/**
* Returns the last known sequence number for the given `persistenceId`. Empty if the `persistenceId` is unknown.
*
* @param persistenceId The `persistenceId` for which the last known sequence number should be returned.
* @return Some sequence number or None if the `persistenceId` is unknown.
*/
def lastPersistenceIdSequenceNumber(persistenceId: String): Future[Option[Long]]

/**
* @return The value of the maximum (ordering) id in the journal
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class ReadJournalQueries(val profile: JdbcProfile, val readJournalConfig: ReadJo
val messagesQuery = Compiled(_messagesQuery _)
val eventsByTag = Compiled(_eventsByTag _)
val journalSequenceQuery = Compiled(_journalSequenceQuery _)
val lastPersistenceIdSequenceNumberQuery = Compiled(_lastPersistenceIdSequenceNumberQuery _)
val maxJournalSequenceQuery = Compiled {
JournalTable.map(_.ordering).max.getOrElse(0L)
}
Expand All @@ -43,6 +44,12 @@ class ReadJournalQueries(val profile: JdbcProfile, val readJournalConfig: ReadJo
baseTableQuery().join(TagTable).on(_.ordering === _.eventId)
}

private def _lastPersistenceIdSequenceNumberQuery(persistenceId: Rep[String]) =
baseTableQuery()
.filter(_.persistenceId === persistenceId)
.map(_.sequenceNumber)
.max

private def _messagesQuery(
persistenceId: Rep[String],
fromSequenceNr: Rep[Long],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ trait BaseByteArrayReadJournalDao extends ReadJournalDao with BaseJournalDaoWith
.via(serializer.deserializeFlow)
}

override def lastPersistenceIdSequenceNumber(persistenceId: String): Future[Option[Long]] =
db.run(queries.lastPersistenceIdSequenceNumberQuery(persistenceId).result)

override def messages(
persistenceId: String,
fromSequenceNr: Long,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class ReadJournalQueries(val profile: JdbcProfile, val readJournalConfig: ReadJo
val messagesQuery = Compiled(_messagesQuery _)
val eventsByTag = Compiled(_eventsByTag _)
val journalSequenceQuery = Compiled(_journalSequenceQuery _)
val lastPersistenceIdSequenceNumberQuery = Compiled(_lastPersistenceIdSequenceNumberQuery _)
val maxJournalSequenceQuery = Compiled {
JournalTable.map(_.ordering).max.getOrElse(0L)
}
Expand All @@ -38,6 +39,12 @@ class ReadJournalQueries(val profile: JdbcProfile, val readJournalConfig: ReadJo
private def _allPersistenceIdsDistinct(max: ConstColumn[Long]): Query[Rep[String], String, Seq] =
baseTableQuery().map(_.persistenceId).distinct.take(max)

private def _lastPersistenceIdSequenceNumberQuery(persistenceId: Rep[String]) =
baseTableQuery()
.filter(_.persistenceId === persistenceId)
.map(_.sequenceNumber)
.max

private def _messagesQuery(
persistenceId: Rep[String],
fromSequenceNr: Rep[Long],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,17 @@
package org.apache.pekko.persistence.jdbc.query.javadsl

import org.apache.pekko

import java.util.Optional
import java.util.concurrent.CompletionStage

import pekko.NotUsed
import pekko.persistence.jdbc.query.scaladsl.{ JdbcReadJournal => ScalaJdbcReadJournal }
import pekko.persistence.query.{ EventEnvelope, Offset }
import pekko.persistence.query.javadsl._
import pekko.stream.javadsl.Source
import pekko.util.FutureConverters._
import pekko.util.OptionConverters._

object JdbcReadJournal {
final val Identifier = ScalaJdbcReadJournal.Identifier
Expand Down Expand Up @@ -132,4 +138,16 @@ class JdbcReadJournal(journal: ScalaJdbcReadJournal)
*/
override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] =
journal.eventsByTag(tag, offset).asJava

/**
* Returns the last known sequence number for the given `persistenceId`. Empty if the `persistenceId` is unknown.
*
* @param persistenceId The `persistenceId` for which the last known sequence number should be returned.
* @return Some sequence number or None if the `persistenceId` is unknown.
*/
def currentLastKnownSequenceNumberByPersistenceId(persistenceId: String): CompletionStage[Optional[java.lang.Long]] =
journal
.currentLastKnownSequenceNumberByPersistenceId(persistenceId)
.asJava
.thenApply(_.map(java.lang.Long.valueOf).toJava)
}
Original file line number Diff line number Diff line change
Expand Up @@ -317,4 +317,13 @@ class JdbcReadJournal(config: Config, configPath: String)(implicit val system: E

def eventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed] =
eventsByTag(tag, offset, terminateAfterOffset = None)

/**
* Returns the last known sequence number for the given `persistenceId`. Empty if the `persistenceId` is unknown.
*
* @param persistenceId The `persistenceId` for which the last known sequence number should be returned.
* @return Some sequence number or None if the `persistenceId` is unknown.
*/
def currentLastKnownSequenceNumberByPersistenceId(persistenceId: String): Future[Option[Long]] =
readJournalDao.lastPersistenceIdSequenceNumber(persistenceId)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.persistence.jdbc.query

import org.scalatest.concurrent.ScalaFutures

abstract class CurrentLastKnownSequenceNumberByPersistenceIdTest(config: String) extends QueryTestSpec(config)
with ScalaFutures {

it should "return None for unknown persistenceId" in withActorSystem { implicit system =>
val journalOps = new ScalaJdbcReadJournalOperations(system)

journalOps
.currentLastKnownSequenceNumberByPersistenceId("unknown")
.futureValue shouldBe None
}

it should "return last sequence number for known persistenceId" in withActorSystem { implicit system =>
val journalOps = new ScalaJdbcReadJournalOperations(system)

withTestActors() { (actor1, _, _) =>
actor1 ! 1
actor1 ! 2
actor1 ! 3
actor1 ! 4

eventually {
journalOps
.currentLastKnownSequenceNumberByPersistenceId("my-1")
.futureValue shouldBe Some(4)

// Just ensuring that query targets the correct persistenceId.
journalOps
.currentLastKnownSequenceNumberByPersistenceId("my-2")
.futureValue shouldBe None
}
}
}
}

class H2ScalaCurrentLastKnownSequenceNumberByPersistenceIdTest
extends CurrentLastKnownSequenceNumberByPersistenceIdTest("h2-shared-db-application.conf")
with H2Cleaner
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ class ScalaJdbcReadJournalOperations(readJournal: JdbcReadJournal)(implicit syst
tp.within(within)(f(tp))
}

def currentLastKnownSequenceNumberByPersistenceId(persistenceId: String): Future[Option[Long]] =
readJournal.currentLastKnownSequenceNumberByPersistenceId(persistenceId)

override def countJournal: Future[Long] =
readJournal
.currentPersistenceIds()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ class TestProbeReadJournalDao(val probe: TestProbe) extends ReadJournalDao {
maxOffset: Long,
max: Long): Source[Try[(PersistentRepr, Set[String], Long)], NotUsed] = ???

override def lastPersistenceIdSequenceNumber(persistenceId: String): Future[Option[Long]] = ???

/**
* Returns a Source of bytes for a certain persistenceId
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/

package org.apache.pekko.persistence.jdbc.integration

import org.apache.pekko.persistence.jdbc.query.{
CurrentLastKnownSequenceNumberByPersistenceIdTest,
MysqlCleaner,
OracleCleaner,
PostgresCleaner,
SqlServerCleaner
}

// Note: these tests use the shared-db configs, the test for all (so not only current) events use the regular db config

class PostgresScalaCurrentLastKnownSequenceNumberByPersistenceIdTest
extends CurrentLastKnownSequenceNumberByPersistenceIdTest("postgres-shared-db-application.conf")
with PostgresCleaner

class MySQLScalaCurrentLastKnownSequenceNumberByPersistenceIdTest
extends CurrentLastKnownSequenceNumberByPersistenceIdTest("mysql-shared-db-application.conf")
with MysqlCleaner

class OracleScalaCurrentLastKnownSequenceNumberByPersistenceIdTest
extends CurrentLastKnownSequenceNumberByPersistenceIdTest("oracle-shared-db-application.conf")
with OracleCleaner

class SqlServerScalaCurrentLastKnownSequenceNumberByPersistenceIdTest
extends CurrentLastKnownSequenceNumberByPersistenceIdTest("sqlserver-shared-db-application.conf")
with SqlServerCleaner

0 comments on commit e552ef4

Please sign in to comment.