Skip to content

Commit

Permalink
DEX-1707 leveldb metrics (#756)
Browse files Browse the repository at this point in the history
* DEX-1707 added metrics

Co-authored-by: Evgeniya Tsybrenko <[email protected]>
Co-authored-by: alexander-branevskiy <[email protected]>
  • Loading branch information
3 people authored Oct 6, 2022
1 parent 2798a31 commit a96d71d
Show file tree
Hide file tree
Showing 9 changed files with 302 additions and 144 deletions.
36 changes: 26 additions & 10 deletions dex/src/main/scala/com/wavesplatform/dex/db/AssetPairsDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package com.wavesplatform.dex.db

import com.wavesplatform.dex.db.leveldb.LevelDb
import com.wavesplatform.dex.domain.asset.AssetPair
import com.wavesplatform.dex.meta.getSimpleName
import com.wavesplatform.dex.tool.OnComplete

trait AssetPairsDb[F[_]] {
def add(pair: AssetPair): F[Unit]
Expand All @@ -12,23 +14,37 @@ trait AssetPairsDb[F[_]] {

object AssetPairsDb {

def levelDb[F[_]](levelDb: LevelDb[F]): AssetPairsDb[F] = new AssetPairsDb[F] {
private val cls = getSimpleName(this)

def add(pair: AssetPair): F[Unit] = levelDb.put(DbKeys.assetPair(pair), ())
def remove(pair: AssetPair): F[Unit] = levelDb.delete(DbKeys.assetPair(pair))
def levelDb[F[_]: OnComplete](levelDb: LevelDb[F]): AssetPairsDb[F] = new AssetPairsDb[F] {

def all(): F[Set[AssetPair]] = levelDb.readOnly { ro =>
val r = Set.newBuilder[AssetPair]
def add(pair: AssetPair): F[Unit] =
measureDb(cls, "add") {
levelDb.put(DbKeys.assetPair(pair), ())
}

ro.iterateOver(DbKeys.AssetPairsPrefix) { pair =>
r += AssetPair.fromBytes(pair.getKey.drop(2))._1
def remove(pair: AssetPair): F[Unit] =
measureDb(cls, "remove") {
levelDb.delete(DbKeys.assetPair(pair))
}

r.result()
}
def all(): F[Set[AssetPair]] =
measureDb(cls, "all") {
levelDb.readOnly { ro =>
val r = Set.newBuilder[AssetPair]

ro.iterateOver(DbKeys.AssetPairsPrefix) { pair =>
r += AssetPair.fromBytes(pair.getKey.drop(2))._1
}

r.result()
}
}

def contains(pair: AssetPair): F[Boolean] =
levelDb.has(DbKeys.assetPair(pair))
measureDb(cls, "contains") {
levelDb.has(DbKeys.assetPair(pair))
}

}

Expand Down
19 changes: 16 additions & 3 deletions dex/src/main/scala/com/wavesplatform/dex/db/AssetsDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,29 @@ package com.wavesplatform.dex.db
import com.wavesplatform.dex.db.leveldb.LevelDb
import com.wavesplatform.dex.domain.asset.Asset.IssuedAsset
import com.wavesplatform.dex.grpc.integration.dto.BriefAssetDescription
import com.wavesplatform.dex.meta.getSimpleName
import com.wavesplatform.dex.tool.OnComplete

trait AssetsDb[F[_]] extends AssetsReadOnlyDb[F] {
def put(asset: IssuedAsset, item: BriefAssetDescription): F[Unit]
}

object AssetsDb {

def levelDb[F[_]](levelDb: LevelDb[F]): AssetsDb[F] = new AssetsDb[F] {
override def put(asset: IssuedAsset, record: BriefAssetDescription): F[Unit] = levelDb.put(DbKeys.asset(asset), Some(record))
override def get(asset: IssuedAsset): F[Option[BriefAssetDescription]] = levelDb.get(DbKeys.asset(asset))
private val cls = getSimpleName(this)

def levelDb[F[_]: OnComplete](levelDb: LevelDb[F]): AssetsDb[F] = new AssetsDb[F] {

override def put(asset: IssuedAsset, record: BriefAssetDescription): F[Unit] =
measureDb(cls, "put") {
levelDb.put(DbKeys.asset(asset), Some(record))
}

override def get(asset: IssuedAsset): F[Option[BriefAssetDescription]] =
measureDb(cls, "get") {
levelDb.get(DbKeys.asset(asset))
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ trait AssetsReadOnlyDb[F[_]] {
object AssetsReadOnlyDb {

implicit final class AssetsReadOnlyDbOps[F[_]](val self: AssetsReadOnlyDb[F]) extends AnyVal {

def contains(asset: IssuedAsset)(implicit F: Applicative[F]): F[Boolean] = self.get(asset).map(_.nonEmpty)

def get(asset: Asset)(implicit F: Applicative[F]): F[Option[BriefAssetDescription]] = asset match {
case asset: IssuedAsset => self.get(asset)
case Asset.Waves => BriefAssetDescription.someWavesDescription.pure[F]
}
def get(asset: Asset)(implicit F: Applicative[F]): F[Option[BriefAssetDescription]] =
asset match {
case asset: IssuedAsset => self.get(asset)
case Asset.Waves => BriefAssetDescription.someWavesDescription.pure[F]
}

// Can't use MonadError here, because it is not implemented for Id
def unsafeGet(asset: Asset)(implicit F: Applicative[F]): F[BriefAssetDescription] =
Expand Down
42 changes: 26 additions & 16 deletions dex/src/main/scala/com/wavesplatform/dex/db/ExchangeTxStorage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import com.wavesplatform.dex.db.leveldb.{LevelDb, ReadWriteDb}
import com.wavesplatform.dex.domain.bytes.ByteStr
import com.wavesplatform.dex.domain.order.Order
import com.wavesplatform.dex.domain.transaction.ExchangeTransaction
import com.wavesplatform.dex.meta.getSimpleName
import com.wavesplatform.dex.tool.OnComplete

trait ExchangeTxStorage[F[_]] {
def put(tx: ExchangeTransaction): F[Unit]
Expand All @@ -12,16 +14,32 @@ trait ExchangeTxStorage[F[_]] {

object ExchangeTxStorage {

def levelDB[F[_]](db: LevelDb[F]): ExchangeTxStorage[F] = new ExchangeTxStorage[F] {
private val cls = getSimpleName(this)

override def put(tx: ExchangeTransaction): F[Unit] = db.readWrite { rw =>
val txKey = DbKeys.exchangeTransaction(tx.id())
if (!rw.has(txKey)) {
rw.put(txKey, Some(tx))
appendTxId(rw, tx.buyOrder.id(), tx.id())
appendTxId(rw, tx.sellOrder.id(), tx.id())
def levelDB[F[_]: OnComplete](db: LevelDb[F]): ExchangeTxStorage[F] = new ExchangeTxStorage[F] {

override def put(tx: ExchangeTransaction): F[Unit] =
measureDb(cls, "put") {
db.readWrite { rw =>
val txKey = DbKeys.exchangeTransaction(tx.id())
if (!rw.has(txKey)) {
rw.put(txKey, Some(tx))
appendTxId(rw, tx.buyOrder.id(), tx.id())
appendTxId(rw, tx.sellOrder.id(), tx.id())
}
}
}

override def transactionsByOrder(orderId: Order.Id): F[Seq[ExchangeTransaction]] =
measureDb(cls, "transactionsByOrder") {
db.readOnly { ro =>
for {
seqNr <- 1 to ro.get(DbKeys.orderTxIdsSeqNr(orderId))
txId = ro.get(DbKeys.orderTxId(orderId, seqNr))
tx <- ro.get(DbKeys.exchangeTransaction(txId))
} yield tx
}
}
}

private def appendTxId(rw: ReadWriteDb, orderId: ByteStr, txId: ByteStr): Unit = {
val key = DbKeys.orderTxIdsSeqNr(orderId)
Expand All @@ -30,14 +48,6 @@ object ExchangeTxStorage {
rw.put(DbKeys.orderTxId(orderId, nextSeqNr), txId)
}

override def transactionsByOrder(orderId: Order.Id): F[Seq[ExchangeTransaction]] = db.readOnly { ro =>
for {
seqNr <- 1 to ro.get(DbKeys.orderTxIdsSeqNr(orderId))
txId = ro.get(DbKeys.orderTxId(orderId, seqNr))
tx <- ro.get(DbKeys.exchangeTransaction(txId))
} yield tx
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import cats.syntax.apply.catsSyntaxTuple2Semigroupal
import com.google.common.primitives.Longs
import com.wavesplatform.dex.db.leveldb.{Key, LevelDb}
import com.wavesplatform.dex.domain.asset.AssetPair
import com.wavesplatform.dex.meta.getSimpleName
import com.wavesplatform.dex.model.OrderBookSnapshot
import com.wavesplatform.dex.queue.ValidatedCommandWithMeta.Offset
import com.wavesplatform.dex.tool.OnComplete

import java.nio.ByteBuffer

Expand All @@ -20,44 +22,61 @@ trait OrderBookSnapshotDb[F[_]] {

object OrderBookSnapshotDb {

def levelDb[F[_]](levelDb: LevelDb[F]): OrderBookSnapshotDb[F] = new OrderBookSnapshotDb[F] {
private val cls = getSimpleName(this)

override def get(assetPair: AssetPair): F[Option[(Offset, OrderBookSnapshot)]] = levelDb.readOnly { ro =>
val (obOffsetKey, obKey) = keys(assetPair)
(ro.get(obOffsetKey), ro.get(obKey)).tupled
}
def levelDb[F[_]: OnComplete](levelDb: LevelDb[F]): OrderBookSnapshotDb[F] = new OrderBookSnapshotDb[F] {

override def update(assetPair: AssetPair, offset: Offset, newSnapshot: Option[OrderBookSnapshot]): F[Unit] = levelDb.readWrite { rw =>
val (obOffsetKey, obKey) = keys(assetPair)
rw.put(obOffsetKey, Some(offset))
newSnapshot.foreach(x => rw.put(obKey, Some(x)))
}
override def get(assetPair: AssetPair): F[Option[(Offset, OrderBookSnapshot)]] =
measureDb(cls, "get") {
levelDb.readOnly { ro =>
val (obOffsetKey, obKey) = keys(assetPair)
(ro.get(obOffsetKey), ro.get(obKey)).tupled
}
}

override def update(assetPair: AssetPair, offset: Offset, newSnapshot: Option[OrderBookSnapshot]): F[Unit] =
measureDb(cls, "update") {
levelDb.readWrite { rw =>
val (obOffsetKey, obKey) = keys(assetPair)
rw.put(obOffsetKey, Some(offset))
newSnapshot.foreach(x => rw.put(obKey, Some(x)))
}
}

override def delete(assetPair: AssetPair): F[Unit] = levelDb.readWrite { rw =>
val (obOffsetKey, obKey) = keys(assetPair)
rw.delete(obOffsetKey)
rw.delete(obKey)
}
override def delete(assetPair: AssetPair): F[Unit] =
measureDb(cls, "delete") {
levelDb.readWrite { rw =>
val (obOffsetKey, obKey) = keys(assetPair)
rw.delete(obOffsetKey)
rw.delete(obKey)
}
}

def iterateOffsets(pred: AssetPair => Boolean): F[Map[AssetPair, Offset]] = levelDb.readOnly { ro =>
val m = Map.newBuilder[AssetPair, Offset]
ro.iterateOver(DbKeys.OrderBookSnapshotOffsetPrefix) { entry =>
val pair = AssetPair.fromBytes(entry.getKey.drop(2))._1
if (pred(pair))
m.addOne(pair -> Longs.fromByteArray(entry.getValue))
def iterateOffsets(pred: AssetPair => Boolean): F[Map[AssetPair, Offset]] =
measureDb(cls, "iterateOffsets") {
levelDb.readOnly { ro =>
val m = Map.newBuilder[AssetPair, Offset]
ro.iterateOver(DbKeys.OrderBookSnapshotOffsetPrefix) { entry =>
val pair = AssetPair.fromBytes(entry.getKey.drop(2))._1
if (pred(pair))
m.addOne(pair -> Longs.fromByteArray(entry.getValue))
}
m.result()
}
}
m.result()
}

def iterateSnapshots(pred: AssetPair => Boolean): F[Map[AssetPair, OrderBookSnapshot]] = levelDb.readOnly { ro =>
val m = Map.newBuilder[AssetPair, OrderBookSnapshot]
ro.iterateOver(DbKeys.OrderBookSnapshotPrefix) { entry =>
val pair = AssetPair.fromBytes(entry.getKey.drop(2))._1
if (pred(pair))
m.addOne(pair -> OrderBookSnapshot.fromBytes(ByteBuffer.wrap(entry.getValue)))
def iterateSnapshots(pred: AssetPair => Boolean): F[Map[AssetPair, OrderBookSnapshot]] =
measureDb(cls, "iterateSnapshots") {
levelDb.readOnly { ro =>
val m = Map.newBuilder[AssetPair, OrderBookSnapshot]
ro.iterateOver(DbKeys.OrderBookSnapshotPrefix) { entry =>
val pair = AssetPair.fromBytes(entry.getKey.drop(2))._1
if (pred(pair))
m.addOne(pair -> OrderBookSnapshot.fromBytes(ByteBuffer.wrap(entry.getValue)))
}
m.result()
}
}
m.result()
}

private def keys(assetPair: AssetPair): (Key[Option[Offset]], Key[Option[OrderBookSnapshot]]) =
(DbKeys.orderBookSnapshotOffset(assetPair), DbKeys.orderBookSnapshot(assetPair))
Expand Down
Loading

0 comments on commit a96d71d

Please sign in to comment.