Skip to content

Commit

Permalink
NODE-2293: Improve blockchain updates (#3359)
Browse files Browse the repository at this point in the history
  • Loading branch information
Karasiq authored Mar 24, 2021
1 parent 7f83b02 commit a4e96dd
Show file tree
Hide file tree
Showing 38 changed files with 1,448 additions and 761 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import com.wavesplatform.account.{Address, AddressScheme, KeyPair}
import com.wavesplatform.block.Block
import com.wavesplatform.common.state.ByteStr
import com.wavesplatform.common.utils._
import com.wavesplatform.database.{LevelDBWriter, openDB}
import com.wavesplatform.database.{openDB, LevelDBWriter}
import com.wavesplatform.protobuf.transaction.PBRecipients
import com.wavesplatform.state.{Diff, Portfolio}
import com.wavesplatform.transaction.{GenesisTransaction, Proofs}
import com.wavesplatform.transaction.Asset.IssuedAsset
import com.wavesplatform.transaction.assets.IssueTransaction
import com.wavesplatform.transaction.{GenesisTransaction, Proofs}
import com.wavesplatform.utils.{NTP, ScorexLogging}
import monix.reactive.Observer

Expand Down Expand Up @@ -92,7 +92,7 @@ object RollbackBenchmark extends ScorexLogging {

log.info("Rolling back")
val start = System.nanoTime()
levelDBWriter.rollbackTo(genesisBlock.id())
levelDBWriter.rollbackTo(1)
val end = System.nanoTime()
log.info(f"Rollback took ${(end - start) * 1e-6}%.3f ms")
levelDBWriter.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class BlockchainUpdates(private val context: Context) extends Extension with Sco
private[this] implicit val scheduler = Schedulers.fixedPool(sys.runtime.availableProcessors(), "blockchain-updates")

private[this] val settings = context.settings.config.as[BlockchainUpdatesSettings]("waves.blockchain-updates")
private[this] val repo = new UpdatesRepoImpl(s"${context.settings.directory}/blockchain-updates")
private[this] val repo = new UpdatesRepoImpl(s"${context.settings.directory}/blockchain-updates", context.blocksApi)

private[this] var grpcServer: Server = null

Expand All @@ -40,15 +40,18 @@ class BlockchainUpdates(private val context: Context) extends Extension with Sco
val exception = new IllegalStateException(s"BlockchainUpdates at height $extensionHeight is lower than node at height $nodeHeight")
log.error("BlockchainUpdates startup check failed", exception)
throw exception
} else if (nodeHeight > 0) {
} else if (nodeHeight == 0) {
if (extensionHeight > 0) log.warn("Data has been reset, dropping entire blockchain updates data")
repo.rollback(context.blockchain, ByteStr.empty, 0, sendEvent = false).get
} else {
(repo.updateForHeight(nodeHeight), context.blockchain.blockHeader(nodeHeight)) match {
case (Success(Some(extensionBlockAtNodeHeight)), Some(lastNodeBlockHeader)) =>
val lastNodeBlockId = lastNodeBlockHeader.id.value()
case (Success(extensionBlockAtNodeHeight), Some(lastNodeBlockHeader)) =>
val lastNodeBlockId = lastNodeBlockHeader.id()

// check if extension is on fork. Block ids must be equal at node height
if (extensionBlockAtNodeHeight.toId != lastNodeBlockId) {
if (extensionBlockAtNodeHeight.id != lastNodeBlockId) {
val exception = new IllegalStateException(
s"BlockchainUpdates extension has forked: at node height $nodeHeight node block id is $lastNodeBlockId, extension's is ${extensionBlockAtNodeHeight.toId}"
s"BlockchainUpdates extension has forked: at node height $nodeHeight node block id is $lastNodeBlockId, extension's is ${extensionBlockAtNodeHeight.id}"
)
log.error("BlockchainUpdates startup check failed", exception)
throw exception
Expand All @@ -58,16 +61,10 @@ class BlockchainUpdates(private val context: Context) extends Extension with Sco
if (extensionHeight > nodeHeight) {
log.warn(s"BlockchainUpdates at height $extensionHeight is higher than node at height $nodeHeight, rolling back BlockchainUpdates")
repo
.rollback(RollbackCompleted(extensionBlockAtNodeHeight.toId, extensionBlockAtNodeHeight.toHeight))
.recoverWith { case _: Throwable => Failure(new RuntimeException("BlockchainUpdates failed to rollback at startup")) }
.rollback(context.blockchain, extensionBlockAtNodeHeight.id, extensionBlockAtNodeHeight.height, sendEvent = false)
.recoverWith { case err: Throwable => Failure(new RuntimeException("BlockchainUpdates failed to rollback at startup", err)) }
.get
}
case (Success(None), Some(_)) =>
val exception = new RuntimeException(
s"BlockchainUpdates has no block at height $nodeHeight, while node has one at startup. Extension height: $extensionHeight, node height: $nodeHeight"
)
log.error("BlockchainUpdates startup check failed", exception)
throw exception
case (Failure(ex), _) =>
val exception = new RuntimeException(s"BlockchainUpdates failed to get extension block info at node height at startup", ex)
log.error("BlockchainUpdates startup check failed", ex)
Expand Down Expand Up @@ -136,10 +133,10 @@ class BlockchainUpdates(private val context: Context) extends Extension with Sco
minerReward: Option[Long],
blockchainBeforeWithMinerReward: Blockchain
): Unit = {
val newBlock = BlockAppended.from(block, diff, minerReward, blockchainBeforeWithMinerReward)
val newBlock = BlockAppended.from(block, diff, blockchainBeforeWithMinerReward)
repo.appendBlock(newBlock).get
if (newBlock.toHeight % 100 == 0) {
log.debug(s"BlockchainUpdates appended blocks up to ${newBlock.toHeight}")
if (newBlock.height % 100 == 0) {
log.debug(s"BlockchainUpdates appended blocks up to ${newBlock.height}")
}
}

Expand All @@ -154,13 +151,11 @@ class BlockchainUpdates(private val context: Context) extends Extension with Sco
repo.appendMicroBlock(newMicroBlock).get
}

override def onRollback(toBlockId: ByteStr, toHeight: Int): Unit = {
val rollbackCompleted = RollbackCompleted(toBlockId, toHeight)
repo.rollback(rollbackCompleted).get
override def onRollback(blockchainBefore: Blockchain, toBlockId: ByteStr, toHeight: Int): Unit = {
repo.rollback(blockchainBefore, toBlockId, toHeight).get
}

override def onMicroBlockRollback(toBlockId: ByteStr, height: Int): Unit = {
val microBlockRollbackCompleted = MicroBlockRollbackCompleted(toBlockId, height)
repo.rollbackMicroBlock(microBlockRollbackCompleted).get
override def onMicroBlockRollback(blockchainBefore: Blockchain, toBlockId: ByteStr): Unit = {
repo.rollbackMicroBlock(blockchainBefore, toBlockId).get
}
}
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
package com.wavesplatform.events.api.grpc

import scala.concurrent.Future
import scala.util.{Failure, Success}

import com.wavesplatform.api.grpc._
import com.wavesplatform.events.api.grpc.protobuf._
import com.wavesplatform.events.protobuf.serde._
import com.wavesplatform.events.repo.UpdatesRepo
import com.wavesplatform.utils.ScorexLogging
import io.grpc.stub.StreamObserver
import io.grpc.{Status, StatusRuntimeException}
import io.grpc.stub.StreamObserver
import monix.execution.Scheduler

import scala.concurrent.Future
import scala.util.{Failure, Success}

class BlockchainUpdatesApiGrpcImpl(repo: UpdatesRepo.Read with UpdatesRepo.Stream)(implicit sc: Scheduler)
extends BlockchainUpdatesApiGrpc.BlockchainUpdatesApi
with ScorexLogging {
override def getBlockUpdate(request: GetBlockUpdateRequest): Future[GetBlockUpdateResponse] = Future {
repo.updateForHeight(request.height) match {
case Success(Some(upd)) => GetBlockUpdateResponse(Some(upd.protobuf))
case Success(None) => throw new StatusRuntimeException(Status.NOT_FOUND)
case Success(upd) => GetBlockUpdateResponse(Some(upd.protobuf))
case Failure(_: NoSuchElementException) => throw new StatusRuntimeException(Status.NOT_FOUND)
case Failure(e: IllegalArgumentException) =>
throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription(e.getMessage))
case Failure(exception) =>
Expand All @@ -28,21 +28,30 @@ class BlockchainUpdatesApiGrpcImpl(repo: UpdatesRepo.Read with UpdatesRepo.Strea
}

override def getBlockUpdatesRange(request: GetBlockUpdatesRangeRequest): Future[GetBlockUpdatesRangeResponse] = {
repo.updatesRange(request.fromHeight, request.toHeight) // TODO: Use stream
.take(1000) // Limit
.toListL
.runAsyncLogErr
.map(updates => GetBlockUpdatesRangeResponse(updates.map(_.protobuf)))
.wrapErrors
if (request.fromHeight <= 0) {
Future.failed(new IllegalArgumentException("height must be a positive integer")).wrapErrors
} else if (request.toHeight < request.fromHeight) {
Future.failed(new IllegalArgumentException("toHeight should be >= fromHeight")).wrapErrors
} else {
repo
.updatesRange(request.fromHeight, request.toHeight) // TODO: Use stream
.take(1000) // Limit
.toListL
.runAsyncLogErr
.map(updates => GetBlockUpdatesRangeResponse(updates.map(_.protobuf)))
.wrapErrors
}
}

override def subscribe(request: SubscribeRequest, responseObserver: StreamObserver[SubscribeEvent]): Unit = {
if (request.fromHeight <= 0) {
responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("height must be a positive integer")))
responseObserver.failWith(new IllegalArgumentException("height must be a positive integer"))
} else if (request.toHeight != 0 && request.toHeight < request.fromHeight) {
responseObserver.failWith(new IllegalArgumentException("toHeight should be >= fromHeight"))
} else {
val updatesPB = repo
.stream(request.fromHeight)
.takeWhile(bu => request.toHeight == 0 || bu.toHeight <= request.toHeight)
.takeWhile(bu => request.toHeight == 0 || bu.height <= request.toHeight)
.map(elem => SubscribeEvent(update = Some(elem.protobuf)))

responseObserver.completeWith(updatesPB)
Expand Down
Loading

0 comments on commit a4e96dd

Please sign in to comment.