Skip to content

Commit

Permalink
Merge pull request #744 from wavesplatform/NODE-421-height-0
Browse files Browse the repository at this point in the history
NODE-421 Cache last block in lastBlockInfo
  • Loading branch information
vsuharnikov authored Dec 13, 2017
2 parents f427812 + 17a7200 commit 8d4ab86
Showing 1 changed file with 12 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import com.wavesplatform.utils.{UnsupportedFeature, forceStopApplication}
import kamon.Kamon
import kamon.metric.instrument.Time
import monix.eval.Coeval
import monix.reactive.Observable
import monix.reactive.subjects.ConcurrentSubject
import scorex.block.{Block, MicroBlock}
import scorex.transaction.ValidationError.{BlockAppendError, GenericError, MicroBlockAppendError}
Expand All @@ -40,7 +41,9 @@ class BlockchainUpdaterImpl private(persisted: StateWriter with SnapshotStateRea
private lazy val inMemDiffs: Synchronized[NEL[BlockDiff]] = Synchronized(NEL.one(BlockDiff.empty)) // fresh head
private lazy val ngState: Synchronized[Option[NgState]] = Synchronized(Option.empty[NgState])

override val lastBlockInfo = ConcurrentSubject.publish[LastBlockInfo](monix.execution.Scheduler.singleThread("last-block-info-publisher"))
private val internalLastBlockInfo = ConcurrentSubject.publish[LastBlockInfo](monix.execution.Scheduler.singleThread("last-block-info-publisher"))
override val lastBlockInfo: Observable[LastBlockInfo] = internalLastBlockInfo.cache(1)
lastBlockInfo.subscribe()(monix.execution.Scheduler.global) // Start caching

private val unsafeDiffByRange = BlockDiffer.unsafeDiffByRange(settings.blockchainSettings.functionalitySettings, featureProvider, historyWriter, maxTransactionsPerChunk) _

Expand All @@ -61,6 +64,11 @@ class BlockchainUpdaterImpl private(persisted: StateWriter with SnapshotStateRea
new NgHistoryReader(() => read { implicit l => ngState() }, historyWriter, settings.blockchainSettings.functionalitySettings)
}

// Store last block information in a cache
historyReader.lastBlock.foreach { b =>
internalLastBlockInfo.onNext(LastBlockInfo(b.uniqueId, historyReader.height(), historyReader.score(), blockchainReady))
}

private def syncPersistedAndInMemory(): Unit = write { implicit l =>
log.info(heights("State rebuild started"))

Expand Down Expand Up @@ -182,7 +190,7 @@ class BlockchainUpdaterImpl private(persisted: StateWriter with SnapshotStateRea
_ map { case ((newBlockDiff, discacrded)) =>
val height = historyWriter.height() + 1
ngState.set(Some(new NgState(block, newBlockDiff, featuresApprovedWithBlock(block))))
historyReader.lastBlockId().foreach(id => lastBlockInfo.onNext(LastBlockInfo(id, historyReader.height(), historyReader.score(), blockchainReady)))
historyReader.lastBlockId().foreach(id => internalLastBlockInfo.onNext(LastBlockInfo(id, historyReader.height(), historyReader.score(), blockchainReady)))
log.info(s"$block appended. New height: $height)")
discacrded
}
Expand Down Expand Up @@ -233,7 +241,7 @@ class BlockchainUpdaterImpl private(persisted: StateWriter with SnapshotStateRea
}

val totalDiscardedBlocks: Seq[Block] = discardedHistoryBlocks ++ discardedNgBlock.toSeq
if (totalDiscardedBlocks.nonEmpty) lastBlockInfo.onNext(LastBlockInfo(blockId, historyReader.height(), historyReader.score(), blockchainReady))
if (totalDiscardedBlocks.nonEmpty) internalLastBlockInfo.onNext(LastBlockInfo(blockId, historyReader.height(), historyReader.score(), blockchainReady))
TxsInBlockchainStats.record(-totalDiscardedBlocks.size)
Right(totalDiscardedBlocks)
}
Expand Down Expand Up @@ -263,7 +271,7 @@ class BlockchainUpdaterImpl private(persisted: StateWriter with SnapshotStateRea
} yield {
log.info(s"$microBlock appended")
ng.append(microBlock, diff, System.currentTimeMillis())
lastBlockInfo.onNext(LastBlockInfo(microBlock.totalResBlockSig, historyReader.height(), historyReader.score(), ready = true))
internalLastBlockInfo.onNext(LastBlockInfo(microBlock.totalResBlockSig, historyReader.height(), historyReader.score(), ready = true))
}
}
}
Expand Down

0 comments on commit 8d4ab86

Please sign in to comment.