From 881100a31345c6597478160f8832ea51007d8405 Mon Sep 17 00:00:00 2001 From: andyzhang2023 <147463846+andyzhang2023@users.noreply.github.com> Date: Thu, 16 Jan 2025 19:47:41 +0800 Subject: [PATCH] Fix pevm 20per performance issue (#256) Co-authored-by: andyzhang2023 --- cmd/utils/flags.go | 2 ++ core/blockchain.go | 82 +++++++++++++++++++++++++++++++--------------- 2 files changed, 57 insertions(+), 27 deletions(-) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 703fcaf2b..7c7b01bcc 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -2066,6 +2066,8 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { if ctx.IsSet(ParallelTxDATMaxDepthRatioFlag.Name) { cfg.ParallelTxDAGMaxDepthRatio = ctx.Float64(ParallelTxDATMaxDepthRatioFlag.Name) + } else { + cfg.ParallelTxDAGMaxDepthRatio = ParallelTxDATMaxDepthRatioFlag.Value } if ctx.IsSet(ParallelTxDAGSenderPrivFlag.Name) { diff --git a/core/blockchain.go b/core/blockchain.go index 993d71a7a..52196fe26 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -107,7 +107,7 @@ var ( // metrics to identify whether a block is processed in parallel EVM or serial EVM parallelInSequencial = metrics.NewRegisteredMeter("chain/parallel/sequencial", nil) - parallelTxDepth = metrics.NewRegisteredMeter("chain/parallel/txdepth", nil) + parallelTxDepth = metrics.NewRegisteredGauge("chain/parallel/txdepth", nil) // TxDepthRatio = TxDepth / TxNum * 100 parallelTxDepthRatio = metrics.NewRegisteredGauge("chain/parallel/txdepth/ratio", nil) @@ -581,7 +581,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis if bc.vmConfig.EnableParallelExec { bc.processor = newPEVMProcessor(chainConfig, bc, engine) bc.serialProcessor = NewStateProcessor(chainConfig, bc, engine) - log.Info("Parallel V2 enabled", "parallelNum", ParallelNum()) + log.Info("Parallel V2 enabled", "parallelNum", ParallelNum(), "TxDepthRation", bc.vmConfig.TxDAGMaxDepthRatio) } else { bc.processor = NewStateProcessor(chainConfig, bc, engine) bc.serialProcessor = bc.processor @@ -1772,27 +1772,26 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { return bc.insertChain(chain, true) } -func (bc *BlockChain) useSerialProcessor(block *types.Block) (bool, bool) { - // findout whether or not the dependencies of the block are too deep to be processed - // if the dependencies are too deep, we will fallback to serial processing - txCount := len(block.Transactions()) - _, depth := BuildTxLevels(txCount, bc.vmConfig.TxDAG) - var depthRatio float64 - - if txCount > 0 { - depthRatio = float64(depth) / float64(txCount) - } else { - depthRatio = 0 - } - tooDeep := depthRatio > bc.vmConfig.TxDAGMaxDepthRatio - isByzantium := bc.chainConfig.IsByzantium(block.Number()) - - txDAGMissButNecessary := bc.vmConfig.TxDAG == nil && (bc.vmConfig.EnableParallelUnorderedMerge || bc.vmConfig.EnableTxParallelMerge) - useSerialProcessor := !bc.vmConfig.EnableParallelExec || txDAGMissButNecessary || tooDeep || !isByzantium - +// userSerialProcessor decides whether or not to use serial processor for the block +// these are the reasons to use serial processor: +// 0. the parallel flag is not enabled +// 1. the block is empty +// 2. the block is not byzantium +// 3. the block is too deep in the dependency graph +// 4. the TxDAG is nil and parallel merge is enabled +func (bc *BlockChain) useSerialProcessor(block *types.Block) (useSerialProcessor bool) { + var ( + enableParallel bool + txCount int + depth int + depthRatio float64 + tooDeep bool + isByzantium bool + txDAGMissButNecessary bool + ) // mark the metrics defer func() { - parallelTxDepth.Mark(int64(depth)) + parallelTxDepth.Update(int64(depth)) parallelTxDepthRatio.Update(int64(depthRatio * 100)) // put reasons in expensive metrics if metrics.EnabledExpensive { @@ -1806,8 +1805,38 @@ func (bc *BlockChain) useSerialProcessor(block *types.Block) (bool, bool) { parallelConditionByzantium.Mark(1) } } + log.Info("run in parallel or sequencial", "block", block.Number(), "useSerialProcessor", useSerialProcessor, "depthRatio", depthRatio, "depth", depth, "txCount", txCount, "txDAG", bc.vmConfig.TxDAG != nil, "byz", isByzantium, "tooDeep", tooDeep, + "enableParallel", enableParallel, "enableParallelExec", bc.vmConfig.EnableParallelExec) }() - return useSerialProcessor, tooDeep + + txCount = len(block.Transactions()) + _, depth = BuildTxLevels(txCount, bc.vmConfig.TxDAG) + if txCount > 0 { + depthRatio = float64(depth) / float64(txCount) + } else { + depthRatio = 0 + } + // switch to serial processor if the block is empty + if txCount <= 1 { + return true + } + // switch to serial processor if parallel flag not enabled + if enableParallel = bc.vmConfig.EnableParallelExec; !enableParallel { + return true + } + // switch to serial processor if the block is not byzantium + if isByzantium = bc.chainConfig.IsByzantium(block.Number()); !isByzantium { + return true + } + // switch to serial processor if the block is too deep in the dependency graph + if tooDeep = (depthRatio > bc.vmConfig.TxDAGMaxDepthRatio); tooDeep { + return true + } + // switch to serial processor if the TxDAG is nil and parallel merge is enabled + if txDAGMissButNecessary = (bc.vmConfig.TxDAG == nil && (bc.vmConfig.EnableParallelUnorderedMerge || bc.vmConfig.EnableTxParallelMerge)); txDAGMissButNecessary { + return true + } + return false } // insertChain is the internal implementation of InsertChain, which assumes that @@ -2022,8 +2051,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) blockProcessedInParallel := false var ( - tooDeep, useSerialProcessor bool - depth int + useSerialProcessor bool ) // skip block process if we already have the state, receipts and logs from mining work if !(receiptExist && logExist && stateExist) { @@ -2037,8 +2065,9 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) bc.parseTxDAG(block) } - useSerialProcessor, tooDeep = bc.useSerialProcessor(block) - if !useSerialProcessor { + useSerialProcessor = bc.useSerialProcessor(block) + if !useSerialProcessor && bc.vmConfig.TxDAG != nil && bc.vmConfig.EnableTxParallelMerge { + //ParallelStateDB is used for parallel merge, and it works only when the block's TxDAG is not nil statedb, err = state.NewParallel(parent.Root, bc.stateCache, bc.snaps) } else { statedb, err = state.New(parent.Root, bc.stateCache, bc.snaps) @@ -2197,7 +2226,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) "storageUpdates", common.PrettyDuration(timers.StorageUpdates), "accountHashes", common.PrettyDuration(timers.AccountHashes), "storageHashes", common.PrettyDuration(timers.StorageHashes), - "tooDeep", tooDeep, "depth", depth, ) // Write the block to the chain and get the status.