Skip to content

Commit

Permalink
some ride-runner fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
phearnot committed Jan 29, 2025
1 parent 56801b9 commit 755cf0d
Show file tree
Hide file tree
Showing 19 changed files with 43 additions and 39 deletions.
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ lazy val `waves-node` = (project in file("."))
`node-it`,
`node-testkit`,
`node-tests`,
// `node-generator`, // TODO: [scala3] enable
`node-generator`,
benchmark,
// `ride-runner` // TODO: [scala3] enable
)
Expand Down Expand Up @@ -239,7 +239,7 @@ checkPRRaw := Def
(`repl-js` / Compile / fastOptJS).value
(`node-it` / Test / compile).value
(benchmark / Test / compile).value
// (`node-generator` / Compile / compile).value // TODO: [scala3] enable
(`node-generator` / Compile / compile).value // TODO: [scala3] enable
// (`ride-runner` / Test / compile).value // TODO: [scala3] enable
}
)
Expand Down
19 changes: 8 additions & 11 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,13 @@ object Dependencies {
val curve25519 = "com.wavesplatform" % "curve25519-java" % "0.6.6"
val nettyHandler = "io.netty" % "netty-handler" % "4.1.116.Final"

val shapeless = Def.setting("org.typelevel" %% "shapeless3-deriving" % "3.4.3")

val playJson = "org.playframework" %% "play-json" % "3.0.4"

val scalaTest = "org.scalatest" %% "scalatest" % "3.2.19" % Test
val scalaJsTest = Def.setting("com.lihaoyi" %%% "utest" % "0.8.5" % Test)

val sttp3 = "com.softwaremill.sttp.client3" % "core_2.13" % "3.10.2"
val sttp3Monix = "com.softwaremill.sttp.client3" %% "monix" % "3.10.2"
val sttp3 = "com.softwaremill.sttp.client3" %% "core" % "3.10.2"
val sttp3Monix = "com.softwaremill.sttp.client3" %% "monix" % "3.10.2"

val bouncyCastleProvider = "org.bouncycastle" % s"bcprov-jdk18on" % "1.79"

Expand All @@ -53,8 +51,8 @@ object Dependencies {
monixModule("eval").value,
"org.typelevel" %%% s"cats-core" % "2.12.0",
"com.lihaoyi" %%% "fastparse" % "3.1.1",
"org.typelevel" %%% "cats-mtl" % "1.5.0",
"ch.obermuhlner" % "big-math" % "2.3.2",
"org.typelevel" %%% "cats-mtl" % "1.5.0",
"ch.obermuhlner" % "big-math" % "2.3.2",
googleGuava, // BaseEncoding.base16()
curve25519,
bouncyCastleProvider,
Expand Down Expand Up @@ -98,12 +96,11 @@ object Dependencies {
("org.rudogma" %%% "supertagged" % "2.0-RC2")
.exclude("org.scala-js", "scalajs-library_2.13")
.cross(CrossVersion.for3Use2_13),
"commons-net" % "commons-net" % "3.11.1",
"commons-io" % "commons-io" % "2.18.0",
"com.github.pureconfig" %% "pureconfig-core" % "0.17.8",
"commons-net" % "commons-net" % "3.11.1",
"commons-io" % "commons-io" % "2.18.0",
"com.github.pureconfig" %% "pureconfig-core" % "0.17.8",
"com.github.pureconfig" %% "pureconfig-generic-scala3" % "0.17.8",

"net.logstash.logback" % "logstash-logback-encoder" % "8.0" % Runtime,
"net.logstash.logback" % "logstash-logback-encoder" % "8.0" % Runtime,
kamonCore,
kamonModule("system-metrics"),
kamonModule("influxdb"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.typesafe.config.ConfigMemorySize
import com.wavesplatform.api.GrpcChannelSettings.ChannelOptionsSettings
import io.grpc.netty.{InternalNettyChannelBuilder, NettyChannelBuilder}
import io.netty.channel.ChannelOption
import pureconfig.ConfigReader

import scala.concurrent.duration.FiniteDuration
import scala.util.chaining.*
Expand All @@ -18,7 +19,7 @@ final case class GrpcChannelSettings(
maxInboundMessageSize: ConfigMemorySize,
channelOptions: ChannelOptionsSettings,
maxConcurrentCalls: Option[Int] = None
) {
) derives ConfigReader {

def toNettyChannelBuilder(target: String): NettyChannelBuilder =
NettyChannelBuilder
Expand All @@ -39,5 +40,5 @@ final case class GrpcChannelSettings(
}

object GrpcChannelSettings {
final case class ChannelOptionsSettings(connectTimeout: FiniteDuration)
final case class ChannelOptionsSettings(connectTimeout: FiniteDuration) derives ConfigReader
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.wavesplatform.common.utils.{Base64, EitherExt2}
import com.wavesplatform.lang.API
import com.wavesplatform.lang.script.Script
import com.wavesplatform.lang.v1.estimator.v3.ScriptEstimatorV3
import com.wavesplatform.common.utils.EitherExt2.explicitGet

object ScriptUtil {
def from(src: String, libraries: Map[String, String] = Map.empty): Script =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import com.wavesplatform.transaction.Asset.{IssuedAsset, Waves}
import com.wavesplatform.transaction.TxValidationError.AliasDoesNotExist
import com.wavesplatform.transaction.transfer.{TransferTransaction, TransferTransactionLike}
import com.wavesplatform.transaction.{Asset, Proofs, Transaction, TxPositiveAmount}
import com.wavesplatform.common.utils.EitherExt2.explicitGet

import scala.util.chaining.scalaUtilChainingOps

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scala.util.chaining.scalaUtilChainingOps
class DefaultDiskCaches private (storage: RideDbAccess, initialBlockHeadersLastHeight: Option[Height]) extends DiskCaches with ScorexLogging {
override val addressIds: AddressIdDiskCache = new AddressIdDiskCache {
private val lastAddressIdKey = KvPairs.LastAddressId.at(())
private val lastAddressId = new AtomicLong(storage.directReadOnly(_.getOpt(lastAddressIdKey).getOrElse(-1L)))
private val lastAddressId = new AtomicLong(storage.directReadOnly(_.getOpt(lastAddressIdKey).fold(-1L)(_.toLong)))

private val addressIdCache: Cache[Address, java.lang.Long] =
Caffeine
Expand All @@ -41,7 +41,7 @@ class DefaultDiskCaches private (storage: RideDbAccess, initialBlockHeadersLastH
addressIdCache.get(
address,
{ address =>
ctx.getOpt(KvPairs.AddressToId.at(address)).fold[JLong](null)(x => JLong.valueOf(x))
ctx.getOpt(KvPairs.AddressToId.at(address)).fold[JLong](null)(x => JLong.valueOf(x.toLong))
}
)
).map(x => AddressId(x.toLong))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.wavesplatform.transaction.serialization.impl.DataTxSerializer
import com.wavesplatform.transaction.{Asset, AssetIdLength, Transaction}
import org.rocksdb.ColumnFamilyHandle
import com.wavesplatform.protobuf.snapshot.TransactionStatus as PBStatus
import com.wavesplatform.common.utils.EitherExt2.explicitGet

import java.io.{ByteArrayOutputStream, OutputStream}
import java.nio.ByteBuffer
Expand Down Expand Up @@ -196,7 +197,7 @@ object KvPairs {
extends KvPair[state.Height, List[TransactionId]](109)(implicitly, AsBytes.listAsBytes.consumeAll(transactionIdWithLenAsBytes))
object Transactions extends KvPair[TransactionId, Option[state.Height]](110)

implicit val addressId: AsBytes[AddressId] = AsBytes.longAsBytes.transform(AddressId(_), x => x)
implicit val addressId: AsBytes[AddressId] = AsBytes.longAsBytes.transform(AddressId(_), _.toLong)

implicit val addressAsBytes: AsBytes[Address] =
AsBytes.byteArrayAsBytes.fixed(Address.AddressLength).transform[Address](Address.fromBytes(_).explicitGet(), _.bytes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import com.typesafe.config.ConfigMemorySize
import com.wavesplatform.ride.runner.caches.RemoteData
import com.wavesplatform.ride.runner.caches.mem.MemBlockchainDataCache.Settings
import com.wavesplatform.ride.runner.stats.KamonCaffeineStats
import pureconfig.ConfigReader

class MemBlockchainDataCache(settings: Settings) {
private val backend = Caffeine
Expand Down Expand Up @@ -37,5 +38,5 @@ class MemBlockchainDataCache(settings: Settings) {
}

object MemBlockchainDataCache {
case class Settings(size: ConfigMemorySize)
case class Settings(size: ConfigMemorySize) derives ConfigReader
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import com.wavesplatform.state.{AssetDescription, AssetScriptInfo, DataEntry, He
import com.wavesplatform.transaction.Asset.IssuedAsset
import com.wavesplatform.utils.StringBytes
import com.wavesplatform.{account, state, transaction}
import com.wavesplatform.common.utils.EitherExt2.explicitGet

sealed trait MemCacheKey extends Product with Serializable {
type ValueT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import com.wavesplatform.ride.runner.caches.RemoteData
import com.wavesplatform.ride.runner.caches.disk.{KvHistoryPair, KvPair}
import com.wavesplatform.state.Height
import org.rocksdb.ColumnFamilyHandle
import shapeless.=:!=

import scala.annotation.unused

Expand Down Expand Up @@ -64,10 +63,7 @@ trait ReadOnly {
}

def getRemoteDataOpt[T](key: Key[Option[T]]): RemoteData[T] = RemoteData(getOpt(key))

def readFromDb[K, V](k: K, kvHistoryPair: KvHistoryPair[K, V], maxHeight: Height)(implicit @unused ev: V =:!= Option[?]): RemoteData[V] =
readFromDbRaw(k, kvHistoryPair, maxHeight).fold(RemoteData.unknown[V])(RemoteData.Cached(_))


def readFromDb[K, V](k: K, kvHistoryPair: KvHistoryPair[K, Option[V]], maxHeight: Height): RemoteData[V] =
readFromDbRaw(k, kvHistoryPair, maxHeight).fold(RemoteData.unknown[V])(RemoteData.loaded)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import com.wavesplatform.ride.runner.caches.disk.KvHistoryPair
import com.wavesplatform.ride.runner.db.Heights.{splitHeightsAt, splitHeightsAtRollback}
import com.wavesplatform.state.Height
import org.rocksdb.ColumnFamilyHandle
import shapeless.=:!=

import scala.annotation.unused
import scala.collection.mutable
Expand Down Expand Up @@ -38,7 +37,7 @@ trait ReadWrite extends ReadOnly {
k: K,
kvHistoryPair: KvHistoryPair[K, V],
fromHeight: Height
)(implicit @unused ev: V =:!= Option[?]): RemoteData[V] =
): RemoteData[V] =
RemoteData.cachedOrUnknown(removeFromAndGetLatestExistedBase(k, kvHistoryPair, fromHeight))

def removeFromAndGetLatestExisted[K, V](k: K, kvHistoryPair: KvHistoryPair[K, Option[V]], fromHeight: Height): RemoteData[V] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,23 @@ package com.wavesplatform.ride.runner.db
import com.wavesplatform.utils.ScorexLogging
import monix.eval.Task
import org.rocksdb.*
import shapeless.<:!<

import scala.util.Using

trait RideDbAccess {
def batchedReadOnly[T](f: ReadOnly => T)(implicit ev: T <:!< Task[?]): T
def batchedReadWrite[T](f: ReadWrite => T)(implicit ev: T <:!< Task[?]): T
def batchedReadOnly[T](f: ReadOnly => T): T
def batchedReadWrite[T](f: ReadWrite => T): T
def directReadOnly[T](f: ReadOnly => T): T
def directReadWrite[T](f: ReadWrite => T): T
}

object RideDbAccess {
def fromRocksDb(db: RocksDB): RideDbAccess = new RideDbAccess with ScorexLogging {
override def batchedReadOnly[T](f: ReadOnly => T)(implicit ev: T <:!< Task[?]): T = withReadOptions { ro =>
override def batchedReadOnly[T](f: ReadOnly => T): T = withReadOptions { ro =>
f(new BatchedReadOnly(db, ro))
}

override def batchedReadWrite[T](f: ReadWrite => T)(implicit ev: T <:!< Task[?]): T = withReadOptions { ro =>
override def batchedReadWrite[T](f: ReadWrite => T): T = withReadOptions { ro =>
Using.resource(mkWriteOptions()) { wo =>
Using.resource(SynchronizedWriteBatch()) { wb =>
val r = f(new BatchedReadWrite(db, ro, wb))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.typesafe.config.ConfigMemorySize
import com.wavesplatform.api.GrpcChannelSettings
import com.wavesplatform.ride.runner.caches.mem.MemBlockchainDataCache
import com.wavesplatform.ride.runner.db.RideRocksDb
import pureconfig.ConfigReader

import scala.concurrent.duration.FiniteDuration

Expand All @@ -23,7 +24,7 @@ case class RideRunnerCommonSettings(
grpcApiChannel: GrpcChannelSettings,
blockchainUpdatesApiChannel: GrpcChannelSettings,
delayBeforeForceRestartBlockchainUpdates: FiniteDuration
) {
) derives ConfigReader {
val availableProcessors = Runtime.getRuntime.availableProcessors()
val exactRideSchedulerThreads = rideSchedulerThreads.getOrElse(availableProcessors * 2).min(4)
val grpcConnectorExecutorThreads = grpcApiMaxConcurrentRequests.fold(availableProcessors * 2)(_ + 1) // +1 for Blockchain Updates
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ case class RideRunnerGlobalSettings(
restApi: RestAPISettings,
rideRunner: RideRunnerCommonSettings,
rideCompareService: WavesRideRunnerCompareService.Settings
) {
) derives ConfigReader {
// Consider the service as unhealthy if it don't update events in more than this duration.
// Should be more than publicApi.noDataTimeout, because it could be fixed after a restart of the blockchain updates stream.
val unhealthyIdleTimeoutMs: Long = (publicApi.noDataTimeout + 30.seconds).toMillis
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package com.wavesplatform.ride.runner.entrypoints.settings

import com.typesafe.config.ConfigMemorySize
import pureconfig.ConfigReader

import scala.concurrent.duration.FiniteDuration

case class RideRunnerResponseCacheSettings(
size: ConfigMemorySize,
ttl: FiniteDuration,
gcThreshold: Int
)
) derives ConfigReader
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package com.wavesplatform.ride.runner.entrypoints.settings

import pureconfig.ConfigReader

import scala.concurrent.duration.FiniteDuration

case class WavesPublicApiSettings(
restApi: String,
grpcApi: String,
grpcBlockchainUpdatesApi: String,
noDataTimeout: FiniteDuration
)
) derives ConfigReader
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.wavesplatform.account.Address
import com.wavesplatform.common.state.ByteStr
import com.wavesplatform.features.BlockchainFeatures
import com.wavesplatform.transaction.Asset.IssuedAsset
import pureconfig.ConfigReader

case class RideRunnerBlockchainState(
height: Int = 3296626,
Expand All @@ -12,4 +13,4 @@ case class RideRunnerBlockchainState(
assets: Map[IssuedAsset, RideRunnerAsset] = Map.empty,
blocks: Map[Int, RideRunnerBlock] = Map.empty,
transactions: Map[ByteStr, RideRunnerTransaction] = Map.empty
)
) derives ConfigReader
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.wavesplatform.ride.runner.input

import com.wavesplatform.transaction.TxNonNegativeAmount
import pureconfig.ConfigReader

case class RideRunnerLeaseBalance(in: TxNonNegativeAmount = TxNonNegativeAmount(0), out: TxNonNegativeAmount = TxNonNegativeAmount(0))
case class RideRunnerLeaseBalance(in: TxNonNegativeAmount = TxNonNegativeAmount.unsafeFrom(0), out: TxNonNegativeAmount = TxNonNegativeAmount.unsafeFrom(0))
derives ConfigReader
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ private final class OnErrorRetryWithObservable[A](source: Observable[A], p: Part
private def loop(subscriber: Subscriber[A], task: OrderedCancelable, retryIdx: Long, firstMessage: Option[A]): Unit = {
val cancelable = source.unsafeSubscribeFn(new Subscriber[A] {
implicit val scheduler: Scheduler = subscriber.scheduler
private[this] var isDone = false
private var isDone = false

// This should be here, calling subscriber.onNext in onError doesn't work
private[this] var ack: Future[Ack] = firstMessage.fold[Future[Ack]](Continue)(subscriber.onNext)
private var ack: Future[Ack] = firstMessage.fold[Future[Ack]](Continue)(subscriber.onNext)

def onNext(elem: A): Future[Ack] = {
ack = subscriber.onNext(elem)
Expand Down

0 comments on commit 755cf0d

Please sign in to comment.