diff --git a/build.sbt b/build.sbt index e7aae6e0c4..3df6f95953 100644 --- a/build.sbt +++ b/build.sbt @@ -142,7 +142,7 @@ lazy val `waves-node` = (project in file(".")) `node-it`, `node-testkit`, `node-tests`, - // `node-generator`, // TODO: [scala3] enable + `node-generator`, benchmark, // `ride-runner` // TODO: [scala3] enable ) @@ -239,7 +239,7 @@ checkPRRaw := Def (`repl-js` / Compile / fastOptJS).value (`node-it` / Test / compile).value (benchmark / Test / compile).value - // (`node-generator` / Compile / compile).value // TODO: [scala3] enable + (`node-generator` / Compile / compile).value // TODO: [scala3] enable // (`ride-runner` / Test / compile).value // TODO: [scala3] enable } ) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index f27f292c84..816eefe482 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -32,15 +32,13 @@ object Dependencies { val curve25519 = "com.wavesplatform" % "curve25519-java" % "0.6.6" val nettyHandler = "io.netty" % "netty-handler" % "4.1.116.Final" - val shapeless = Def.setting("org.typelevel" %% "shapeless3-deriving" % "3.4.3") - val playJson = "org.playframework" %% "play-json" % "3.0.4" val scalaTest = "org.scalatest" %% "scalatest" % "3.2.19" % Test val scalaJsTest = Def.setting("com.lihaoyi" %%% "utest" % "0.8.5" % Test) - val sttp3 = "com.softwaremill.sttp.client3" % "core_2.13" % "3.10.2" - val sttp3Monix = "com.softwaremill.sttp.client3" %% "monix" % "3.10.2" + val sttp3 = "com.softwaremill.sttp.client3" %% "core" % "3.10.2" + val sttp3Monix = "com.softwaremill.sttp.client3" %% "monix" % "3.10.2" val bouncyCastleProvider = "org.bouncycastle" % s"bcprov-jdk18on" % "1.79" @@ -53,8 +51,8 @@ object Dependencies { monixModule("eval").value, "org.typelevel" %%% s"cats-core" % "2.12.0", "com.lihaoyi" %%% "fastparse" % "3.1.1", - "org.typelevel" %%% "cats-mtl" % "1.5.0", - "ch.obermuhlner" % "big-math" % "2.3.2", + "org.typelevel" %%% "cats-mtl" % "1.5.0", + "ch.obermuhlner" % "big-math" % "2.3.2", googleGuava, // BaseEncoding.base16() curve25519, bouncyCastleProvider, @@ -98,12 +96,11 @@ object Dependencies { ("org.rudogma" %%% "supertagged" % "2.0-RC2") .exclude("org.scala-js", "scalajs-library_2.13") .cross(CrossVersion.for3Use2_13), - "commons-net" % "commons-net" % "3.11.1", - "commons-io" % "commons-io" % "2.18.0", - "com.github.pureconfig" %% "pureconfig-core" % "0.17.8", + "commons-net" % "commons-net" % "3.11.1", + "commons-io" % "commons-io" % "2.18.0", + "com.github.pureconfig" %% "pureconfig-core" % "0.17.8", "com.github.pureconfig" %% "pureconfig-generic-scala3" % "0.17.8", - - "net.logstash.logback" % "logstash-logback-encoder" % "8.0" % Runtime, + "net.logstash.logback" % "logstash-logback-encoder" % "8.0" % Runtime, kamonCore, kamonModule("system-metrics"), kamonModule("influxdb"), diff --git a/ride-runner/src/main/scala/com/wavesplatform/api/GrpcChannelSettings.scala b/ride-runner/src/main/scala/com/wavesplatform/api/GrpcChannelSettings.scala index d90599b930..933845a4a7 100644 --- a/ride-runner/src/main/scala/com/wavesplatform/api/GrpcChannelSettings.scala +++ b/ride-runner/src/main/scala/com/wavesplatform/api/GrpcChannelSettings.scala @@ -4,6 +4,7 @@ import com.typesafe.config.ConfigMemorySize import com.wavesplatform.api.GrpcChannelSettings.ChannelOptionsSettings import io.grpc.netty.{InternalNettyChannelBuilder, NettyChannelBuilder} import io.netty.channel.ChannelOption +import pureconfig.ConfigReader import scala.concurrent.duration.FiniteDuration import scala.util.chaining.* @@ -18,7 +19,7 @@ final case class GrpcChannelSettings( maxInboundMessageSize: ConfigMemorySize, channelOptions: ChannelOptionsSettings, maxConcurrentCalls: Option[Int] = None -) { +) derives ConfigReader { def toNettyChannelBuilder(target: String): NettyChannelBuilder = NettyChannelBuilder @@ -39,5 +40,5 @@ final case class GrpcChannelSettings( } object GrpcChannelSettings { - final case class ChannelOptionsSettings(connectTimeout: FiniteDuration) + final case class ChannelOptionsSettings(connectTimeout: FiniteDuration) derives ConfigReader } diff --git a/ride-runner/src/main/scala/com/wavesplatform/ride/ScriptUtil.scala b/ride-runner/src/main/scala/com/wavesplatform/ride/ScriptUtil.scala index 94e877b6ea..7b06dd72f3 100644 --- a/ride-runner/src/main/scala/com/wavesplatform/ride/ScriptUtil.scala +++ b/ride-runner/src/main/scala/com/wavesplatform/ride/ScriptUtil.scala @@ -4,6 +4,7 @@ import com.wavesplatform.common.utils.{Base64, EitherExt2} import com.wavesplatform.lang.API import com.wavesplatform.lang.script.Script import com.wavesplatform.lang.v1.estimator.v3.ScriptEstimatorV3 +import com.wavesplatform.common.utils.EitherExt2.explicitGet object ScriptUtil { def from(src: String, libraries: Map[String, String] = Map.empty): Script = diff --git a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/blockchain/ImmutableBlockchain.scala b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/blockchain/ImmutableBlockchain.scala index e6d202883b..97e471c07b 100644 --- a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/blockchain/ImmutableBlockchain.scala +++ b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/blockchain/ImmutableBlockchain.scala @@ -21,6 +21,7 @@ import com.wavesplatform.transaction.Asset.{IssuedAsset, Waves} import com.wavesplatform.transaction.TxValidationError.AliasDoesNotExist import com.wavesplatform.transaction.transfer.{TransferTransaction, TransferTransactionLike} import com.wavesplatform.transaction.{Asset, Proofs, Transaction, TxPositiveAmount} +import com.wavesplatform.common.utils.EitherExt2.explicitGet import scala.util.chaining.scalaUtilChainingOps diff --git a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/caches/disk/DefaultDiskCaches.scala b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/caches/disk/DefaultDiskCaches.scala index 57639c12db..218fe73e19 100644 --- a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/caches/disk/DefaultDiskCaches.scala +++ b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/caches/disk/DefaultDiskCaches.scala @@ -23,7 +23,7 @@ import scala.util.chaining.scalaUtilChainingOps class DefaultDiskCaches private (storage: RideDbAccess, initialBlockHeadersLastHeight: Option[Height]) extends DiskCaches with ScorexLogging { override val addressIds: AddressIdDiskCache = new AddressIdDiskCache { private val lastAddressIdKey = KvPairs.LastAddressId.at(()) - private val lastAddressId = new AtomicLong(storage.directReadOnly(_.getOpt(lastAddressIdKey).getOrElse(-1L))) + private val lastAddressId = new AtomicLong(storage.directReadOnly(_.getOpt(lastAddressIdKey).fold(-1L)(_.toLong))) private val addressIdCache: Cache[Address, java.lang.Long] = Caffeine @@ -41,7 +41,7 @@ class DefaultDiskCaches private (storage: RideDbAccess, initialBlockHeadersLastH addressIdCache.get( address, { address => - ctx.getOpt(KvPairs.AddressToId.at(address)).fold[JLong](null)(x => JLong.valueOf(x)) + ctx.getOpt(KvPairs.AddressToId.at(address)).fold[JLong](null)(x => JLong.valueOf(x.toLong)) } ) ).map(x => AddressId(x.toLong)) diff --git a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/caches/disk/KvPairs.scala b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/caches/disk/KvPairs.scala index a6645cc536..7f1a96781a 100644 --- a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/caches/disk/KvPairs.scala +++ b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/caches/disk/KvPairs.scala @@ -23,6 +23,7 @@ import com.wavesplatform.transaction.serialization.impl.DataTxSerializer import com.wavesplatform.transaction.{Asset, AssetIdLength, Transaction} import org.rocksdb.ColumnFamilyHandle import com.wavesplatform.protobuf.snapshot.TransactionStatus as PBStatus +import com.wavesplatform.common.utils.EitherExt2.explicitGet import java.io.{ByteArrayOutputStream, OutputStream} import java.nio.ByteBuffer @@ -196,7 +197,7 @@ object KvPairs { extends KvPair[state.Height, List[TransactionId]](109)(implicitly, AsBytes.listAsBytes.consumeAll(transactionIdWithLenAsBytes)) object Transactions extends KvPair[TransactionId, Option[state.Height]](110) - implicit val addressId: AsBytes[AddressId] = AsBytes.longAsBytes.transform(AddressId(_), x => x) + implicit val addressId: AsBytes[AddressId] = AsBytes.longAsBytes.transform(AddressId(_), _.toLong) implicit val addressAsBytes: AsBytes[Address] = AsBytes.byteArrayAsBytes.fixed(Address.AddressLength).transform[Address](Address.fromBytes(_).explicitGet(), _.bytes) diff --git a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/caches/mem/MemBlockchainDataCache.scala b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/caches/mem/MemBlockchainDataCache.scala index 46a5695232..1e4a57dc66 100644 --- a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/caches/mem/MemBlockchainDataCache.scala +++ b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/caches/mem/MemBlockchainDataCache.scala @@ -5,6 +5,7 @@ import com.typesafe.config.ConfigMemorySize import com.wavesplatform.ride.runner.caches.RemoteData import com.wavesplatform.ride.runner.caches.mem.MemBlockchainDataCache.Settings import com.wavesplatform.ride.runner.stats.KamonCaffeineStats +import pureconfig.ConfigReader class MemBlockchainDataCache(settings: Settings) { private val backend = Caffeine @@ -37,5 +38,5 @@ class MemBlockchainDataCache(settings: Settings) { } object MemBlockchainDataCache { - case class Settings(size: ConfigMemorySize) + case class Settings(size: ConfigMemorySize) derives ConfigReader } diff --git a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/caches/mem/MemCacheKey.scala b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/caches/mem/MemCacheKey.scala index 2c7d1f4d81..b181f81b6e 100644 --- a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/caches/mem/MemCacheKey.scala +++ b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/caches/mem/MemCacheKey.scala @@ -14,6 +14,7 @@ import com.wavesplatform.state.{AssetDescription, AssetScriptInfo, DataEntry, He import com.wavesplatform.transaction.Asset.IssuedAsset import com.wavesplatform.utils.StringBytes import com.wavesplatform.{account, state, transaction} +import com.wavesplatform.common.utils.EitherExt2.explicitGet sealed trait MemCacheKey extends Product with Serializable { type ValueT diff --git a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/ReadOnly.scala b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/ReadOnly.scala index d3b5c1b692..be6d6eecc6 100644 --- a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/ReadOnly.scala +++ b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/ReadOnly.scala @@ -6,7 +6,6 @@ import com.wavesplatform.ride.runner.caches.RemoteData import com.wavesplatform.ride.runner.caches.disk.{KvHistoryPair, KvPair} import com.wavesplatform.state.Height import org.rocksdb.ColumnFamilyHandle -import shapeless.=:!= import scala.annotation.unused @@ -64,10 +63,7 @@ trait ReadOnly { } def getRemoteDataOpt[T](key: Key[Option[T]]): RemoteData[T] = RemoteData(getOpt(key)) - - def readFromDb[K, V](k: K, kvHistoryPair: KvHistoryPair[K, V], maxHeight: Height)(implicit @unused ev: V =:!= Option[?]): RemoteData[V] = - readFromDbRaw(k, kvHistoryPair, maxHeight).fold(RemoteData.unknown[V])(RemoteData.Cached(_)) - + def readFromDb[K, V](k: K, kvHistoryPair: KvHistoryPair[K, Option[V]], maxHeight: Height): RemoteData[V] = readFromDbRaw(k, kvHistoryPair, maxHeight).fold(RemoteData.unknown[V])(RemoteData.loaded) diff --git a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/ReadWrite.scala b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/ReadWrite.scala index 3c6d7c30b6..ce793611cb 100644 --- a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/ReadWrite.scala +++ b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/ReadWrite.scala @@ -7,7 +7,6 @@ import com.wavesplatform.ride.runner.caches.disk.KvHistoryPair import com.wavesplatform.ride.runner.db.Heights.{splitHeightsAt, splitHeightsAtRollback} import com.wavesplatform.state.Height import org.rocksdb.ColumnFamilyHandle -import shapeless.=:!= import scala.annotation.unused import scala.collection.mutable @@ -38,7 +37,7 @@ trait ReadWrite extends ReadOnly { k: K, kvHistoryPair: KvHistoryPair[K, V], fromHeight: Height - )(implicit @unused ev: V =:!= Option[?]): RemoteData[V] = + ): RemoteData[V] = RemoteData.cachedOrUnknown(removeFromAndGetLatestExistedBase(k, kvHistoryPair, fromHeight)) def removeFromAndGetLatestExisted[K, V](k: K, kvHistoryPair: KvHistoryPair[K, Option[V]], fromHeight: Height): RemoteData[V] = diff --git a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/RideDbAccess.scala b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/RideDbAccess.scala index 91c5e23c67..a826ff0081 100644 --- a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/RideDbAccess.scala +++ b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/RideDbAccess.scala @@ -3,24 +3,23 @@ package com.wavesplatform.ride.runner.db import com.wavesplatform.utils.ScorexLogging import monix.eval.Task import org.rocksdb.* -import shapeless.<:!< import scala.util.Using trait RideDbAccess { - def batchedReadOnly[T](f: ReadOnly => T)(implicit ev: T <:!< Task[?]): T - def batchedReadWrite[T](f: ReadWrite => T)(implicit ev: T <:!< Task[?]): T + def batchedReadOnly[T](f: ReadOnly => T): T + def batchedReadWrite[T](f: ReadWrite => T): T def directReadOnly[T](f: ReadOnly => T): T def directReadWrite[T](f: ReadWrite => T): T } object RideDbAccess { def fromRocksDb(db: RocksDB): RideDbAccess = new RideDbAccess with ScorexLogging { - override def batchedReadOnly[T](f: ReadOnly => T)(implicit ev: T <:!< Task[?]): T = withReadOptions { ro => + override def batchedReadOnly[T](f: ReadOnly => T): T = withReadOptions { ro => f(new BatchedReadOnly(db, ro)) } - override def batchedReadWrite[T](f: ReadWrite => T)(implicit ev: T <:!< Task[?]): T = withReadOptions { ro => + override def batchedReadWrite[T](f: ReadWrite => T): T = withReadOptions { ro => Using.resource(mkWriteOptions()) { wo => Using.resource(SynchronizedWriteBatch()) { wb => val r = f(new BatchedReadWrite(db, ro, wb)) diff --git a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/entrypoints/settings/RideRunnerCommonSettings.scala b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/entrypoints/settings/RideRunnerCommonSettings.scala index d0ac7490b7..43547751a9 100644 --- a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/entrypoints/settings/RideRunnerCommonSettings.scala +++ b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/entrypoints/settings/RideRunnerCommonSettings.scala @@ -4,6 +4,7 @@ import com.typesafe.config.ConfigMemorySize import com.wavesplatform.api.GrpcChannelSettings import com.wavesplatform.ride.runner.caches.mem.MemBlockchainDataCache import com.wavesplatform.ride.runner.db.RideRocksDb +import pureconfig.ConfigReader import scala.concurrent.duration.FiniteDuration @@ -23,7 +24,7 @@ case class RideRunnerCommonSettings( grpcApiChannel: GrpcChannelSettings, blockchainUpdatesApiChannel: GrpcChannelSettings, delayBeforeForceRestartBlockchainUpdates: FiniteDuration -) { +) derives ConfigReader { val availableProcessors = Runtime.getRuntime.availableProcessors() val exactRideSchedulerThreads = rideSchedulerThreads.getOrElse(availableProcessors * 2).min(4) val grpcConnectorExecutorThreads = grpcApiMaxConcurrentRequests.fold(availableProcessors * 2)(_ + 1) // +1 for Blockchain Updates diff --git a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/entrypoints/settings/RideRunnerGlobalSettings.scala b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/entrypoints/settings/RideRunnerGlobalSettings.scala index bcf51c3d9d..4d07d79158 100644 --- a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/entrypoints/settings/RideRunnerGlobalSettings.scala +++ b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/entrypoints/settings/RideRunnerGlobalSettings.scala @@ -18,7 +18,7 @@ case class RideRunnerGlobalSettings( restApi: RestAPISettings, rideRunner: RideRunnerCommonSettings, rideCompareService: WavesRideRunnerCompareService.Settings -) { +) derives ConfigReader { // Consider the service as unhealthy if it don't update events in more than this duration. // Should be more than publicApi.noDataTimeout, because it could be fixed after a restart of the blockchain updates stream. val unhealthyIdleTimeoutMs: Long = (publicApi.noDataTimeout + 30.seconds).toMillis diff --git a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/entrypoints/settings/RideRunnerResponseCacheSettings.scala b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/entrypoints/settings/RideRunnerResponseCacheSettings.scala index 81edf34c23..242ba65c42 100644 --- a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/entrypoints/settings/RideRunnerResponseCacheSettings.scala +++ b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/entrypoints/settings/RideRunnerResponseCacheSettings.scala @@ -1,6 +1,7 @@ package com.wavesplatform.ride.runner.entrypoints.settings import com.typesafe.config.ConfigMemorySize +import pureconfig.ConfigReader import scala.concurrent.duration.FiniteDuration @@ -8,4 +9,4 @@ case class RideRunnerResponseCacheSettings( size: ConfigMemorySize, ttl: FiniteDuration, gcThreshold: Int -) +) derives ConfigReader diff --git a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/entrypoints/settings/WavesPublicApiSettings.scala b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/entrypoints/settings/WavesPublicApiSettings.scala index 6b0df8758a..adf7b0e42d 100644 --- a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/entrypoints/settings/WavesPublicApiSettings.scala +++ b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/entrypoints/settings/WavesPublicApiSettings.scala @@ -1,5 +1,7 @@ package com.wavesplatform.ride.runner.entrypoints.settings +import pureconfig.ConfigReader + import scala.concurrent.duration.FiniteDuration case class WavesPublicApiSettings( @@ -7,4 +9,4 @@ case class WavesPublicApiSettings( grpcApi: String, grpcBlockchainUpdatesApi: String, noDataTimeout: FiniteDuration -) +) derives ConfigReader diff --git a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/input/RideRunnerBlockchainState.scala b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/input/RideRunnerBlockchainState.scala index fdb1c575ff..a818141106 100644 --- a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/input/RideRunnerBlockchainState.scala +++ b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/input/RideRunnerBlockchainState.scala @@ -4,6 +4,7 @@ import com.wavesplatform.account.Address import com.wavesplatform.common.state.ByteStr import com.wavesplatform.features.BlockchainFeatures import com.wavesplatform.transaction.Asset.IssuedAsset +import pureconfig.ConfigReader case class RideRunnerBlockchainState( height: Int = 3296626, @@ -12,4 +13,4 @@ case class RideRunnerBlockchainState( assets: Map[IssuedAsset, RideRunnerAsset] = Map.empty, blocks: Map[Int, RideRunnerBlock] = Map.empty, transactions: Map[ByteStr, RideRunnerTransaction] = Map.empty -) +) derives ConfigReader diff --git a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/input/RideRunnerLeaseBalance.scala b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/input/RideRunnerLeaseBalance.scala index 578463bac5..ef4aecf951 100644 --- a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/input/RideRunnerLeaseBalance.scala +++ b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/input/RideRunnerLeaseBalance.scala @@ -1,5 +1,7 @@ package com.wavesplatform.ride.runner.input import com.wavesplatform.transaction.TxNonNegativeAmount +import pureconfig.ConfigReader -case class RideRunnerLeaseBalance(in: TxNonNegativeAmount = TxNonNegativeAmount(0), out: TxNonNegativeAmount = TxNonNegativeAmount(0)) +case class RideRunnerLeaseBalance(in: TxNonNegativeAmount = TxNonNegativeAmount.unsafeFrom(0), out: TxNonNegativeAmount = TxNonNegativeAmount.unsafeFrom(0)) + derives ConfigReader diff --git a/ride-runner/src/main/scala/monix/reactive/operators/OnErrorRetryWithObservable.scala b/ride-runner/src/main/scala/monix/reactive/operators/OnErrorRetryWithObservable.scala index 93aa74e113..d8735b63d0 100644 --- a/ride-runner/src/main/scala/monix/reactive/operators/OnErrorRetryWithObservable.scala +++ b/ride-runner/src/main/scala/monix/reactive/operators/OnErrorRetryWithObservable.scala @@ -15,10 +15,10 @@ private final class OnErrorRetryWithObservable[A](source: Observable[A], p: Part private def loop(subscriber: Subscriber[A], task: OrderedCancelable, retryIdx: Long, firstMessage: Option[A]): Unit = { val cancelable = source.unsafeSubscribeFn(new Subscriber[A] { implicit val scheduler: Scheduler = subscriber.scheduler - private[this] var isDone = false + private var isDone = false // This should be here, calling subscriber.onNext in onError doesn't work - private[this] var ack: Future[Ack] = firstMessage.fold[Future[Ack]](Continue)(subscriber.onNext) + private var ack: Future[Ack] = firstMessage.fold[Future[Ack]](Continue)(subscriber.onNext) def onNext(elem: A): Future[Ack] = { ack = subscriber.onNext(elem)