diff --git a/src/main/scala/com/wavesplatform/state2/BlockchainUpdaterImpl.scala b/src/main/scala/com/wavesplatform/state2/BlockchainUpdaterImpl.scala index 66e41a868fd..7b11afc9676 100644 --- a/src/main/scala/com/wavesplatform/state2/BlockchainUpdaterImpl.scala +++ b/src/main/scala/com/wavesplatform/state2/BlockchainUpdaterImpl.scala @@ -15,6 +15,7 @@ import com.wavesplatform.utils.{UnsupportedFeature, forceStopApplication} import kamon.Kamon import kamon.metric.instrument.Time import monix.eval.Coeval +import monix.reactive.Observable import monix.reactive.subjects.ConcurrentSubject import scorex.block.{Block, MicroBlock} import scorex.transaction.ValidationError.{BlockAppendError, GenericError, MicroBlockAppendError} @@ -40,7 +41,9 @@ class BlockchainUpdaterImpl private(persisted: StateWriter with SnapshotStateRea private lazy val inMemDiffs: Synchronized[NEL[BlockDiff]] = Synchronized(NEL.one(BlockDiff.empty)) // fresh head private lazy val ngState: Synchronized[Option[NgState]] = Synchronized(Option.empty[NgState]) - override val lastBlockInfo = ConcurrentSubject.publish[LastBlockInfo](monix.execution.Scheduler.singleThread("last-block-info-publisher")) + private val internalLastBlockInfo = ConcurrentSubject.publish[LastBlockInfo](monix.execution.Scheduler.singleThread("last-block-info-publisher")) + override val lastBlockInfo: Observable[LastBlockInfo] = internalLastBlockInfo.cache(1) + lastBlockInfo.subscribe()(monix.execution.Scheduler.global) // Start caching private val unsafeDiffByRange = BlockDiffer.unsafeDiffByRange(settings.blockchainSettings.functionalitySettings, featureProvider, historyWriter, maxTransactionsPerChunk) _ @@ -61,6 +64,11 @@ class BlockchainUpdaterImpl private(persisted: StateWriter with SnapshotStateRea new NgHistoryReader(() => read { implicit l => ngState() }, historyWriter, settings.blockchainSettings.functionalitySettings) } + // Store last block information in a cache + historyReader.lastBlock.foreach { b => + internalLastBlockInfo.onNext(LastBlockInfo(b.uniqueId, historyReader.height(), historyReader.score(), blockchainReady)) + } + private def syncPersistedAndInMemory(): Unit = write { implicit l => log.info(heights("State rebuild started")) @@ -182,7 +190,7 @@ class BlockchainUpdaterImpl private(persisted: StateWriter with SnapshotStateRea _ map { case ((newBlockDiff, discacrded)) => val height = historyWriter.height() + 1 ngState.set(Some(new NgState(block, newBlockDiff, featuresApprovedWithBlock(block)))) - historyReader.lastBlockId().foreach(id => lastBlockInfo.onNext(LastBlockInfo(id, historyReader.height(), historyReader.score(), blockchainReady))) + historyReader.lastBlockId().foreach(id => internalLastBlockInfo.onNext(LastBlockInfo(id, historyReader.height(), historyReader.score(), blockchainReady))) log.info(s"$block appended. New height: $height)") discacrded } @@ -233,7 +241,7 @@ class BlockchainUpdaterImpl private(persisted: StateWriter with SnapshotStateRea } val totalDiscardedBlocks: Seq[Block] = discardedHistoryBlocks ++ discardedNgBlock.toSeq - if (totalDiscardedBlocks.nonEmpty) lastBlockInfo.onNext(LastBlockInfo(blockId, historyReader.height(), historyReader.score(), blockchainReady)) + if (totalDiscardedBlocks.nonEmpty) internalLastBlockInfo.onNext(LastBlockInfo(blockId, historyReader.height(), historyReader.score(), blockchainReady)) TxsInBlockchainStats.record(-totalDiscardedBlocks.size) Right(totalDiscardedBlocks) } @@ -263,7 +271,7 @@ class BlockchainUpdaterImpl private(persisted: StateWriter with SnapshotStateRea } yield { log.info(s"$microBlock appended") ng.append(microBlock, diff, System.currentTimeMillis()) - lastBlockInfo.onNext(LastBlockInfo(microBlock.totalResBlockSig, historyReader.height(), historyReader.score(), ready = true)) + internalLastBlockInfo.onNext(LastBlockInfo(microBlock.totalResBlockSig, historyReader.height(), historyReader.score(), ready = true)) } } }