Skip to content

Commit

Permalink
Persistence, basics
Browse files Browse the repository at this point in the history
  • Loading branch information
IgorCandido committed May 6, 2019
1 parent fce4f7c commit a165e96
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 2 deletions.
17 changes: 15 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,23 @@ version := "0.1"

scalaVersion := "2.12.8"

val akkaPersistence = "com.typesafe.akka" %% "akka-persistence" % "2.5.22"
val akkaTest = "com.typesafe.akka" %% "akka-testkit" % "2.5.22" % Test
val akka = "com.typesafe.akka" %% "akka-actor" % "2.5.22"
val akka = "com.typesafe.akka" %% "akka-actor" % "2.5.22"
val scalaTest = "org.scalatest" %% "scalatest" % "3.2.0-SNAP10" % Test
val scalaCheck = "org.scalacheck" %% "scalacheck" % "1.14.0" % Test
val cats = "org.typelevel" %% "cats-core" % "1.6.0"
val leveldb = "org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8"
val leveld80 = "org.iq80.leveldb" % "leveldb" % "0.7"
val leveldbjniAll = "org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8"

libraryDependencies ++= Seq(akka, akkaTest, scalaTest, scalaCheck, cats)
libraryDependencies ++= Seq(
akka,
akkaTest,
akkaPersistence,
leveld80,
leveldbjniAll,
scalaTest,
scalaCheck,
cats
)
9 changes: 9 additions & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"

akka.persistence.journal.leveldb.dir = "target/example/journal"
akka.persistence.snapshot-store.local.dir = "target/example/snapshots"

# DO NOT USE THIS IN PRODUCTION !!!
# See also https://github.com/typesafehub/activator/issues/287
akka.persistence.journal.leveldb.native = false
70 changes: 70 additions & 0 deletions src/main/scala/Peristence/AkkaPersistent.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package Peristence

import Peristence.AkkaPersistent.Cmd
import akka.actor.{ActorSystem, Props}
import akka.persistence.{PersistentActor, SnapshotOffer}

import scala.io.StdIn

object AkkaPersistent{
def props = Props(new AkkaPersistent)

case class Cmd(data: String)
case class Evt(data: String)

case class State(events: List[String] = Nil){
def updated(evt: Evt): State = State(evt.data :: events)
def size: Int = events.length

override def toString: String = events.reverse.toString
}
}

class AkkaPersistent extends PersistentActor{
import AkkaPersistent._
var state: State = State()

override def persistenceId: String = "persistenceExample-id-1"

def updateState(event: Evt): Unit =
state = state.updated(event)

def numEvents =
state.size

override def receiveRecover: Receive = {
case evt: Evt => updateState(evt)
case SnapshotOffer(_, snapshot: State) => state = snapshot
}

val snapShotInterval = 1000
override def receiveCommand: Receive = {
case Cmd(data) =>
persist(Evt(s"${data}-${numEvents}")) { event =>
updateState(event)
context.system.eventStream.publish(event)
if(lastSequenceNr % snapShotInterval == 0 && lastSequenceNr != 0)
saveSnapshot(state)
}

defer(Evt(s"${data}-${numEvents} In Memory")) {event =>
println(s"${event} is being handled deferred in memory")
}
case "print" => println(state)
}
}

object example extends App{
val system = ActorSystem("persistenceTest")

val persistent = system.actorOf(AkkaPersistent.props, "persistent")

persistent ! Cmd("test")
persistent ! Cmd("test 1")
persistent ! "print"

StdIn.readLine()
system.terminate()
}

// todo At-Least-Once Example

0 comments on commit a165e96

Please sign in to comment.