Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NODE-2596 Optimised active leases #3884

Merged
merged 23 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions node/src/main/protobuf/waves/database.proto
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,8 @@ message LeaseDetails {
Expired expired = 12;
}
}

message LeaseIdsAndDetailsSeq {
repeated bytes ids = 1;
repeated LeaseDetails details = 2;
}
1 change: 1 addition & 0 deletions node/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ waves {
db {
directory = ${waves.directory}"/data"
store-transactions-by-address = true
store-lease-states-by-address = true
store-invoke-script-results = true
store-state-hashes = false
# Limits the size of caches which are used during block validation. Lower values slightly decrease memory consumption,
Expand Down
2 changes: 1 addition & 1 deletion node/src/main/scala/com/wavesplatform/Application.scala
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con
override val blocksApi: CommonBlocksApi =
CommonBlocksApi(blockchainUpdater, loadBlockMetaAt(rdb.db, blockchainUpdater), loadBlockInfoAt(rdb, blockchainUpdater))
override val accountsApi: CommonAccountsApi =
CommonAccountsApi(() => blockchainUpdater.snapshotBlockchain, rdb, blockchainUpdater)
CommonAccountsApi(() => blockchainUpdater.snapshotBlockchain, rdb, blockchainUpdater, settings.dbSettings.storeLeaseStatesByAddress)
override val assetsApi: CommonAssetsApi =
CommonAssetsApi(() => blockchainUpdater.bestLiquidSnapshot.orEmpty, rdb.db, blockchainUpdater)
}
Expand Down
2 changes: 1 addition & 1 deletion node/src/main/scala/com/wavesplatform/Explorer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ object Explorer extends ScorexLogging {
val s = Scheduler.fixedPool("foo-bar", 8, executionModel = ExecutionModel.AlwaysAsyncExecution)

def countEntries(): Future[Long] = {
CommonAccountsApi(() => SnapshotBlockchain(reader, StateSnapshot.empty), rdb, reader)
CommonAccountsApi(() => SnapshotBlockchain(reader, StateSnapshot.empty), rdb, reader, settings.dbSettings.storeLeaseStatesByAddress)
.dataStream(Address.fromString("3PC9BfRwJWWiw9AREE2B3eWzCks3CYtg4yo").explicitGet(), None)
.countL
.runToFuture(s)
Expand Down
2 changes: 1 addition & 1 deletion node/src/main/scala/com/wavesplatform/Importer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ object Importer extends ScorexLogging {
Application.loadBlockInfoAt(rdb, blockchainUpdater)
)
override def accountsApi: CommonAccountsApi =
CommonAccountsApi(() => blockchainUpdater.snapshotBlockchain, rdb, blockchainUpdater)
CommonAccountsApi(() => blockchainUpdater.snapshotBlockchain, rdb, blockchainUpdater, settings.dbSettings.storeLeaseStatesByAddress)
override def assetsApi: CommonAssetsApi =
CommonAssetsApi(() => blockchainUpdater.bestLiquidSnapshot.orEmpty, rdb.db, blockchainUpdater)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import com.wavesplatform.account.Address
import com.wavesplatform.api.common.AddressTransactions.TxByAddressIterator.BatchSize
import com.wavesplatform.common.state.ByteStr
import com.wavesplatform.database.protobuf.EthereumTransactionMeta
import com.wavesplatform.database.{AddressId, DBExt, DBResource, Key, KeyTags, Keys, RDB, readTransactionHNSeqAndType}
import com.wavesplatform.database.{AddressId, DBExt, DBResource, Key, Keys, RDB, readTransactionHNSeqAndType}
import com.wavesplatform.state.{Height, InvokeScriptResult, StateSnapshot, TransactionId, TxMeta, TxNum}
import com.wavesplatform.transaction.{Authorized, EthereumTransaction, GenesisTransaction, Transaction, TransactionType}
import monix.eval.Task
Expand Down Expand Up @@ -118,7 +118,7 @@ object AddressTransactions {
.filter { case (_, tx) => types.isEmpty || types.contains(tx.tpe) }
.collect { case (m, tx: Authorized) if sender.forall(_ == tx.sender.toAddress) => (m, tx, None) }

class TxByAddressIterator(
private class TxByAddressIterator(
db: DBResource,
txHandle: RDB.TxHandle,
addressId: AddressId,
Expand All @@ -127,9 +127,7 @@ object AddressTransactions {
sender: Option[Address],
types: Set[Transaction.Type]
) extends AbstractIterator[Seq[(TxMeta, Transaction, Option[TxNum])]] {
val prefix: Array[Byte] = KeyTags.AddressTransactionHeightTypeAndNums.prefixBytes ++ addressId.toByteArray
val seqNr: Int = db.get(Keys.addressTransactionSeqNr(addressId))

private val seqNr = db.get(Keys.addressTransactionSeqNr(addressId))
db.withSafePrefixIterator(_.seekForPrev(Keys.addressTransactionHN(addressId, seqNr).keyBytes))()

final override def computeNext(): Seq[(TxMeta, Transaction, Option[TxNum])] = db.withSafePrefixIterator { dbIterator =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,15 @@ import com.google.common.base.Charsets
import com.google.common.collect.AbstractIterator
import com.wavesplatform.account.{Address, Alias}
import com.wavesplatform.api.common.AddressPortfolio.{assetBalanceIterator, nftIterator}
import com.wavesplatform.api.common.TransactionMeta.Ethereum
import com.wavesplatform.api.common.lease.AddressLeaseInfo
import com.wavesplatform.common.state.ByteStr
import com.wavesplatform.common.utils.EitherExt2
import com.wavesplatform.database.{DBExt, DBResource, KeyTags, Keys, RDB}
import com.wavesplatform.features.BlockchainFeatures
import com.wavesplatform.lang.ValidationError
import com.wavesplatform.protobuf.transaction.PBRecipients
import com.wavesplatform.state.patch.CancelLeasesToDisabledAliases
import com.wavesplatform.state.reader.LeaseDetails.Status
import com.wavesplatform.state.reader.SnapshotBlockchain
import com.wavesplatform.state.{AccountScriptInfo, AssetDescription, Blockchain, DataEntry, Height, InvokeScriptResult, TxMeta}
import com.wavesplatform.state.{AccountScriptInfo, AssetDescription, Blockchain, DataEntry}
import com.wavesplatform.transaction.Asset.IssuedAsset
import com.wavesplatform.transaction.EthereumTransaction.Invocation
import com.wavesplatform.transaction.TxValidationError.GenericError
import com.wavesplatform.transaction.lease.LeaseTransaction
import com.wavesplatform.transaction.{EthereumTransaction, TransactionType}
import monix.eval.Task
import monix.reactive.Observable

Expand Down Expand Up @@ -60,7 +53,8 @@ object CommonAccountsApi {
def apply(
compositeBlockchain: () => SnapshotBlockchain,
rdb: RDB,
blockchain: Blockchain
blockchain: Blockchain,
leaseStatesAreStoredByAddress: Boolean
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed

): CommonAccountsApi = new CommonAccountsApi {

override def balance(address: Address, confirmations: Int = 0): Long =
Expand Down Expand Up @@ -130,82 +124,13 @@ object CommonAccountsApi {
override def resolveAlias(alias: Alias): Either[ValidationError, Address] = blockchain.resolveAlias(alias)

override def activeLeases(address: Address): Observable[LeaseInfo] =
addressTransactions(
rdb,
Some(Height(blockchain.height) -> compositeBlockchain().snapshot),
address,
None,
Set(TransactionType.Lease, TransactionType.InvokeScript, TransactionType.InvokeExpression, TransactionType.Ethereum),
None
).flatMapIterable {
case TransactionMeta(leaseHeight, lt: LeaseTransaction, TxMeta.Status.Succeeded) if leaseIsActive(lt.id()) =>
Seq(
LeaseInfo(
lt.id(),
lt.id(),
lt.sender.toAddress,
blockchain.resolveAlias(lt.recipient).explicitGet(),
lt.amount.value,
leaseHeight,
LeaseInfo.Status.Active
)
)
case TransactionMeta.Invoke(invokeHeight, originTransaction, TxMeta.Status.Succeeded, _, Some(scriptResult)) =>
extractLeases(address, scriptResult, originTransaction.id(), invokeHeight)
case Ethereum(height, tx @ EthereumTransaction(_: Invocation, _, _, _), TxMeta.Status.Succeeded, _, _, Some(scriptResult)) =>
extractLeases(address, scriptResult, tx.id(), height)
case _ => Seq()
}

private def extractLeases(subject: Address, result: InvokeScriptResult, txId: ByteStr, height: Height): Seq[LeaseInfo] = {
(for {
lease <- result.leases
details <- blockchain.leaseDetails(lease.id) if details.isActive
sender = details.sender.toAddress
recipient <- blockchain.resolveAlias(lease.recipient).toOption if subject == sender || subject == recipient
} yield LeaseInfo(
lease.id,
txId,
sender,
recipient,
lease.amount,
height,
LeaseInfo.Status.Active
)) ++ {
result.invokes.flatMap(i => extractLeases(subject, i.stateChanges, txId, height))
}
}

private def resolveDisabledAlias(leaseId: ByteStr): Either[ValidationError, Address] =
CancelLeasesToDisabledAliases.patchData
.get(leaseId)
.fold[Either[ValidationError, Address]](Left(GenericError("Unknown lease ID"))) { case (_, recipientAddress) =>
Right(recipientAddress)
}

def leaseInfo(leaseId: ByteStr): Option[LeaseInfo] = blockchain.leaseDetails(leaseId) map { ld =>
LeaseInfo(
leaseId,
ld.sourceId,
ld.sender.toAddress,
blockchain.resolveAlias(ld.recipient).orElse(resolveDisabledAlias(leaseId)).explicitGet(),
ld.amount,
ld.height,
ld.status match {
case Status.Active => LeaseInfo.Status.Active
case Status.Cancelled(_, _) => LeaseInfo.Status.Canceled
case Status.Expired(_) => LeaseInfo.Status.Expired
},
ld.status.cancelHeight,
ld.status.cancelTransactionId
)
}
AddressLeaseInfo.activeLeases(rdb, compositeBlockchain().snapshot, blockchain, address)

private[this] def leaseIsActive(id: ByteStr): Boolean =
blockchain.leaseDetails(id).exists(_.isActive)
def leaseInfo(leaseId: ByteStr): Option[LeaseInfo] =
blockchain.leaseDetails(leaseId).map(LeaseInfo.fromLeaseDetails(leaseId, _, blockchain))
}

class AddressDataIterator(
private class AddressDataIterator(
db: DBResource,
address: Address,
entriesFromDiff: Array[DataEntry[?]],
Expand Down
32 changes: 31 additions & 1 deletion node/src/main/scala/com/wavesplatform/api/common/LeaseInfo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,45 @@ package com.wavesplatform.api.common
import com.wavesplatform.account.Address
import com.wavesplatform.api.common.LeaseInfo.Status
import com.wavesplatform.common.state.ByteStr
import com.wavesplatform.common.utils.EitherExt2
import com.wavesplatform.lang.ValidationError
import com.wavesplatform.state.Blockchain
import com.wavesplatform.state.patch.CancelLeasesToDisabledAliases
import com.wavesplatform.state.reader.LeaseDetails
import com.wavesplatform.transaction.TxValidationError.GenericError

object LeaseInfo {
type Status = Status.Value
//noinspection TypeAnnotation
// noinspection TypeAnnotation
object Status extends Enumeration {
val Active = Value(1)
val Canceled = Value(0)
val Expired = Value(2)
}

def fromLeaseDetails(id: ByteStr, details: LeaseDetails, blockchain: Blockchain): LeaseInfo =
LeaseInfo(
id,
details.sourceId,
details.sender.toAddress,
blockchain.resolveAlias(details.recipient).orElse(resolveDisabledAlias(id)).explicitGet(),
details.amount,
details.height,
details.status match {
case LeaseDetails.Status.Active => LeaseInfo.Status.Active
case LeaseDetails.Status.Cancelled(_, _) => LeaseInfo.Status.Canceled
case LeaseDetails.Status.Expired(_) => LeaseInfo.Status.Expired
},
details.status.cancelHeight,
details.status.cancelTransactionId
)

private def resolveDisabledAlias(leaseId: ByteStr): Either[ValidationError, Address] =
CancelLeasesToDisabledAliases.patchData
.get(leaseId)
.fold[Either[ValidationError, Address]](Left(GenericError("Unknown lease ID"))) { case (_, recipientAddress) =>
Right(recipientAddress)
}
}

case class LeaseInfo(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.wavesplatform.api.common.lease

import com.wavesplatform.account.Address
import com.wavesplatform.api.common.LeaseInfo.Status.Active
import com.wavesplatform.api.common.LeaseInfo
import com.wavesplatform.common.state.ByteStr
import com.wavesplatform.database.{AddressId, DBExt, DBResource, Keys, RDB}
import com.wavesplatform.state.reader.LeaseDetails
import com.wavesplatform.state.{Blockchain, StateSnapshot}
import monix.eval.Task
import monix.reactive.Observable

import scala.jdk.CollectionConverters.IteratorHasAsScala

object AddressLeaseInfo {
def activeLeases(
rdb: RDB,
snapshot: StateSnapshot,
blockchain: Blockchain,
subject: Address
): Observable[LeaseInfo] = {
val snapshotLeases = leasesFromSnapshot(snapshot, blockchain, subject)
val dbLeases = leasesFromDb(rdb, blockchain, subject)
(Observable.fromIterable(snapshotLeases) ++ dbLeases.filterNot(info => snapshotLeases.exists(_.id == info.id)))
.filter(_.status == Active)
}

private def leasesFromSnapshot(snapshot: StateSnapshot, blockchain: Blockchain, subject: Address): Seq[LeaseInfo] =
snapshot.leaseStates.collect {
case (id, details) if subject == details.sender.toAddress || blockchain.resolveAlias(details.recipient).exists(subject == _) =>
LeaseInfo.fromLeaseDetails(id, details, blockchain)
}.toSeq

private def leasesFromDb(rdb: RDB, blockchain: Blockchain, subject: Address): Observable[LeaseInfo] =
for {
dbResource <- rdb.db.resourceObservable
(leaseId, details) <- dbResource
.get(Keys.addressId(subject))
.map(fromLeaseDbIterator(dbResource, _))
.getOrElse(Observable.empty)
} yield LeaseInfo.fromLeaseDetails(leaseId, details, blockchain)

private def fromLeaseDbIterator(dbResource: DBResource, addressId: AddressId): Observable[(ByteStr, LeaseDetails)] =
Observable
.fromIterator(Task(new LeaseByAddressIterator(dbResource, addressId).asScala))
.concatMapIterable(identity)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.wavesplatform.api.common.lease

import com.google.common.collect.AbstractIterator
import com.wavesplatform.common.state.ByteStr
import com.wavesplatform.database.{AddressId, DBResource, Keys, readLeaseSeq}
import com.wavesplatform.state.reader.LeaseDetails

import scala.collection.mutable

private class LeaseByAddressIterator(db: DBResource, addressId: AddressId) extends AbstractIterator[Seq[(ByteStr, LeaseDetails)]] {
private val seqNr = db.get(Keys.addressLeaseSeqNr(addressId))
db.withSafePrefixIterator(_.seekForPrev(Keys.addressLeaseSeq(addressId, seqNr).keyBytes))()

final override def computeNext(): Seq[(ByteStr, LeaseDetails)] =
db.withSafePrefixIterator { dbIterator =>
val buffer = mutable.Map[ByteStr, LeaseDetails]()
while (dbIterator.isValid) {
readLeaseSeq(dbIterator.value()).foreach { case (id, newDetails) =>
buffer.updateWith(id)(_.fold(Option(newDetails))(details => if (details.isActive) Some(newDetails) else None))
}
dbIterator.prev()
}
if (buffer.nonEmpty)
buffer.toSeq
else
endOfData()
}(
endOfData()
)
}
4 changes: 3 additions & 1 deletion node/src/main/scala/com/wavesplatform/database/KeyTags.scala
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ object KeyTags extends Enumeration {
StateHash,
EthereumTransactionMeta,
NthTransactionStateSnapshotAtHeight,
MaliciousMinerBanHeights = Value
MaliciousMinerBanHeights,
AddressLeaseInfoSeqNr,
AddressLeaseInfoSeq = Value

final implicit class KeyTagExt(val t: KeyTag) extends AnyVal {
@inline def prefixBytes: Array[Byte] = Shorts.toByteArray(t.id.toShort)
Expand Down
19 changes: 12 additions & 7 deletions node/src/main/scala/com/wavesplatform/database/Keys.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,7 @@ object DataNode {

object Keys {
import KeyHelpers.*
import KeyTags.{
AddressId as AddressIdTag,
EthereumTransactionMeta as EthereumTransactionMetaTag,
InvokeScriptResult as InvokeScriptResultTag,
LeaseDetails as LeaseDetailsTag,
*
}
import KeyTags.{AddressId as AddressIdTag, EthereumTransactionMeta as EthereumTransactionMetaTag, InvokeScriptResult as InvokeScriptResultTag, LeaseDetails as LeaseDetailsTag, *}

val version: Key[Int] = intKey(Version, default = 1)
val height: Key[Height] =
Expand Down Expand Up @@ -191,6 +185,17 @@ object Keys {
writeTransactionHNSeqAndType
)

def addressLeaseSeqNr(addressId: AddressId): Key[Int] =
bytesSeqNr(AddressLeaseInfoSeqNr, addressId.toByteArray)

def addressLeaseSeq(addressId: AddressId, seqNr: Int): Key[Option[Seq[(ByteStr, LeaseDetails)]]] =
Key.opt(
AddressLeaseInfoSeq,
hBytes(addressId.toByteArray, seqNr),
readLeaseSeq,
writeLeaseSeq
)

def transactionMetaById(txId: TransactionId, cfh: RDB.TxMetaHandle): Key[Option[TransactionMeta]] =
Key.opt(
TransactionMetaById,
Expand Down
18 changes: 18 additions & 0 deletions node/src/main/scala/com/wavesplatform/database/RocksDBWriter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import scala.jdk.CollectionConverters.*
import scala.util.control.NonFatal

object RocksDBWriter extends ScorexLogging {

/** {{{
* ([10, 7, 4], 5, 11) => [10, 7, 4]
* ([10, 7], 5, 11) => [10, 7, 1]
Expand Down Expand Up @@ -537,6 +538,23 @@ class RocksDBWriter(
}
}

if (dbSettings.storeLeaseStatesByAddress) {
val addressIdWithLeases =
for {
(leaseId, details) <- snapshot.leaseStates.toSeq
address <- resolveAlias(details.recipient).toSeq :+ details.sender.toAddress
addressId = this.addressIdWithFallback(address, newAddresses)
} yield (addressId, (leaseId, details))
val leasesByAddressId = addressIdWithLeases.groupMap { case (addressId, _) => (addressId, Keys.addressLeaseSeqNr(addressId)) }(_._2)
rw.multiGetInts(leasesByAddressId.keys.map(_._2).toSeq)
.zip(leasesByAddressId)
.foreach { case (prevSeqNr, ((addressId, leaseSeqKey), leaseIdsAndDetails)) =>
val nextSeqNr = prevSeqNr.getOrElse(0) + 1
rw.put(Keys.addressLeaseSeq(addressId, nextSeqNr), Some(leaseIdsAndDetails))
rw.put(leaseSeqKey, nextSeqNr)
}
}

for ((alias, address) <- snapshot.aliases) {
val key = Keys.addressIdOfAlias(alias)
val value = addressIdWithFallback(address, newAddresses)
Expand Down
Loading