Skip to content

Commit

Permalink
pevm: unordered merge mode flag (#202)
Browse files Browse the repository at this point in the history
  • Loading branch information
welkin22 authored Oct 22, 2024
1 parent 2b01ec3 commit 4db9987
Show file tree
Hide file tree
Showing 12 changed files with 120 additions and 45 deletions.
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ var (
utils.RollupSuperchainUpgradesFlag,
utils.ParallelTxLegacyFlag,
utils.ParallelTxFlag,
utils.ParallelTxUnorderedMergeFlag,
utils.ParallelTxNumFlag,
utils.ParallelTxDAGFlag,
utils.ParallelTxDAGFileFlag,
Expand Down
13 changes: 13 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -1107,6 +1107,12 @@ Please note that --` + MetricsHTTPFlag.Name + ` must be set to start the server.
Category: flags.VMCategory,
}

ParallelTxUnorderedMergeFlag = &cli.BoolFlag{
Name: "parallel.unordered-merge",
Usage: "Enable unordered merge mode, during the parallel confirm phase, merge transaction execution results without following the transaction order.",
Category: flags.VMCategory,
}

ParallelTxNumFlag = &cli.IntFlag{
Name: "parallel.num",
Usage: "Number of slot for transaction execution, only valid in parallel mode (runtime calculated, no fixed default value)",
Expand Down Expand Up @@ -2049,6 +2055,13 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
cfg.ParallelTxMode = ctx.Bool(ParallelTxFlag.Name)
}

if ctx.IsSet(ParallelTxUnorderedMergeFlag.Name) {
cfg.ParallelTxUnorderedMerge = ctx.Bool(ParallelTxUnorderedMergeFlag.Name)
if ctx.IsSet(ParallelTxLegacyFlag.Name) && ctx.Bool(ParallelTxLegacyFlag.Name) {
log.Warn("ParallelTxUnorderedMergeFlag does not have any effect in ParallelTxLegacy mode")
}
}

if ctx.IsSet(ParallelTxDAGFlag.Name) {
cfg.EnableParallelTxDAG = ctx.Bool(ParallelTxDAGFlag.Name)
}
Expand Down
6 changes: 5 additions & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ var (

parallelTxNumMeter = metrics.NewRegisteredMeter("chain/parallel/txs", nil)
parallelConflictTxNumMeter = metrics.NewRegisteredMeter("chain/parallel/conflicttxs", nil)
parallelExecutionTimer = metrics.NewRegisteredTimer("chain/parallel/exec", nil)
parallelConfirmTimer = metrics.NewRegisteredTimer("chain/parallel/confirm", nil)
parallelTxLevelsSizeMeter = metrics.NewRegisteredGauge("chain/parallel/txlevel/size", nil)
parallelTxLevelTxSizeMeter = metrics.NewRegisteredGauge("chain/parallel/txlevel/txsize", nil)

blockGasUsedGauge = metrics.NewRegisteredGauge("chain/block/gas/used", nil)
mgaspsGauge = metrics.NewRegisteredGauge("chain/mgas/ps", nil)
Expand Down Expand Up @@ -2751,7 +2755,7 @@ func (bc *BlockChain) HeaderChainForceSetHead(headNumber uint64) {
}

func (bc *BlockChain) TxDAGEnabledWhenMine() bool {
return bc.enableTxDAG && bc.txDAGWriteCh == nil && bc.txDAGReader == nil
return bc.enableTxDAG && bc.txDAGWriteCh == nil && bc.txDAGReader == nil && !bc.vmConfig.EnableParallelExec && !bc.vmConfig.EnableParallelExecLegacy
}

func (bc *BlockChain) SetupTxDAGGeneration(output string, readFile bool) {
Expand Down
25 changes: 20 additions & 5 deletions core/parallel_state_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"fmt"
"runtime"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)

var runner chan func()
Expand Down Expand Up @@ -99,7 +101,7 @@ func (cq *confirmQueue) collect(result *PEVMTxResult) error {
return nil
}

func (cq *confirmQueue) confirmWithTrust(level TxLevel, execute func(*PEVMTxRequest) *PEVMTxResult, confirm func(*PEVMTxResult) error) (error, int) {
func (cq *confirmQueue) confirmWithUnordered(level TxLevel, execute func(*PEVMTxRequest) *PEVMTxResult, confirm func(*PEVMTxResult) error) (error, int) {
// find all able-to-confirm transactions, and try to confirm them
for _, tx := range level {
i := tx.txIndex
Expand Down Expand Up @@ -187,16 +189,23 @@ func (cq *confirmQueue) rerun(i int, execute func(*PEVMTxRequest) *PEVMTxResult,

// run runs the transactions in parallel
// execute must return a non-nil result, otherwise it panics.
func (tls TxLevels) Run(execute func(*PEVMTxRequest) *PEVMTxResult, confirm func(*PEVMTxResult) error) (error, int) {
func (tls TxLevels) Run(execute func(*PEVMTxRequest) *PEVMTxResult, confirm func(*PEVMTxResult) error, unorderedMerge bool) (error, int) {
toConfirm := &confirmQueue{
queue: make([]confirmation, tls.txCount()),
confirmed: -1,
}

trustDAG := false
totalExecutionTime := int64(0)
totalConfirmTime := int64(0)
maxLevelTxCount := 0

// execute all transactions in parallel
for _, txLevel := range tls {
start := time.Now()
log.Debug("txLevel tx count", "tx count", len(txLevel))
if len(txLevel) > maxLevelTxCount {
maxLevelTxCount = len(txLevel)
}
wait := sync.WaitGroup{}
trunks := txLevel.Split(runtime.NumCPU())
wait.Add(len(trunks))
Expand All @@ -215,9 +224,11 @@ func (tls TxLevels) Run(execute func(*PEVMTxRequest) *PEVMTxResult, confirm func
runner <- run
}
wait.Wait()
totalExecutionTime += time.Since(start).Nanoseconds()
start = time.Now()
// all transactions of current level are executed, now try to confirm.
if trustDAG {
if err, txIndex := toConfirm.confirmWithTrust(txLevel, execute, confirm); err != nil {
if unorderedMerge {
if err, txIndex := toConfirm.confirmWithUnordered(txLevel, execute, confirm); err != nil {
// something very wrong, stop the process
return err, txIndex
}
Expand All @@ -227,7 +238,11 @@ func (tls TxLevels) Run(execute func(*PEVMTxRequest) *PEVMTxResult, confirm func
return err, txIndex
}
}
totalConfirmTime += time.Since(start).Nanoseconds()
}
parallelTxLevelTxSizeMeter.Update(int64(maxLevelTxCount))
parallelExecutionTimer.Update(time.Duration(totalExecutionTime))
parallelConfirmTimer.Update(time.Duration(totalConfirmTime))
return nil, 0
}

Expand Down
16 changes: 7 additions & 9 deletions core/parallel_state_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,7 @@ func setTxIndex(allReq []*PEVMTxRequest) {
func TestTxLevelRun(t *testing.T) {
// case 1: empty txs
case1 := func() {
levels([]uint64{}, [][]int{}).Run(
func(*PEVMTxRequest) *PEVMTxResult { return nil },
func(*PEVMTxResult) error { return nil })
levels([]uint64{}, [][]int{}).Run(func(*PEVMTxRequest) *PEVMTxResult { return nil }, func(*PEVMTxResult) error { return nil }, false)
}
// case 2: 4 txs with no dependencies, no conflicts
case2 := func() {
Expand All @@ -287,7 +285,7 @@ func TestTxLevelRun(t *testing.T) {
nil, nil, nil, nil,
})
caller := caller{txs: make(map[*PEVMTxRequest]*mockTx)}
err, _ := NewTxLevels(allReqs, txdag).Run(caller.execute, caller.confirm)
err, _ := NewTxLevels(allReqs, txdag).Run(caller.execute, caller.confirm, false)
ok := checkMainDB(map[int]int{1: 0, 2: 0, 3: 0, 4: 0, 5: 11, 6: 21, 7: 31, 8: 41})
if err != nil {
t.Fatalf("failed, err:%v", err)
Expand Down Expand Up @@ -323,7 +321,7 @@ func TestTxLevelRun(t *testing.T) {
nil, nil, {0}, {1},
})
caller := caller{txs: make(map[*PEVMTxRequest]*mockTx)}
err, _ := NewTxLevels(allReqs, txdag).Run(caller.execute, caller.confirm)
err, _ := NewTxLevels(allReqs, txdag).Run(caller.execute, caller.confirm, false)
ok := checkMainDB(map[int]int{1: 0, 2: 0, 3: 0, 4: 0, 5: 11, 6: 21})
if err != nil {
t.Fatalf("failed, err:%v", err)
Expand Down Expand Up @@ -359,7 +357,7 @@ func TestTxLevelRun(t *testing.T) {
{0}, nil, {-1}, {-1},
})
caller := caller{txs: make(map[*PEVMTxRequest]*mockTx)}
err, _ := NewTxLevels(allReqs, txdag).Run(caller.execute, caller.confirm)
err, _ := NewTxLevels(allReqs, txdag).Run(caller.execute, caller.confirm, false)
ok := checkMainDB(map[int]int{1: 0, 2: 0, 3: 0, 4: 0, 5: 11, 6: 21})
if err != nil {
t.Fatalf("failed, err:%v", err)
Expand Down Expand Up @@ -407,7 +405,7 @@ func TestTxLevelRun(t *testing.T) {
res[i+2000] = i
}
caller := caller{txs: make(map[*PEVMTxRequest]*mockTx)}
err, _ := NewTxLevels(allReqs, nil).Run(caller.execute, caller.confirm)
err, _ := NewTxLevels(allReqs, nil).Run(caller.execute, caller.confirm, false)
ok := checkMainDB(res)
if err != nil {
t.Fatalf("failed, err:%v", err)
Expand All @@ -433,7 +431,7 @@ func TestTxLevelRun(t *testing.T) {
}
setTxIndex(allReqs)
caller := caller{txs: make(map[*PEVMTxRequest]*mockTx)}
err, _ := NewTxLevels(allReqs, nil).Run(caller.execute, caller.confirm)
err, _ := NewTxLevels(allReqs, nil).Run(caller.execute, caller.confirm, false)
ok := checkMainDB(map[int]int{1: 5, 2: 20, 3: 10})
if err != nil {
t.Fatalf("failed, err:%v", err)
Expand Down Expand Up @@ -462,7 +460,7 @@ func TestTxLevelRun(t *testing.T) {
}
caller := caller{txs: make(map[*PEVMTxRequest]*mockTx)}
setTxIndex(allReqs)
err, _ := NewTxLevels(allReqs, dag).Run(caller.execute, caller.confirm)
err, _ := NewTxLevels(allReqs, dag).Run(caller.execute, caller.confirm, false)
ok := checkMainDB(map[int]int{1: 5, 2: 20, 3: 10})
if err != nil {
t.Fatalf("failed, err:%v", err)
Expand Down
30 changes: 25 additions & 5 deletions core/pevm_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,16 @@ type PEVMProcessor struct {
commonTxs []*types.Transaction
receipts types.Receipts
debugConflictRedoNum uint64
unorderedMerge bool
}

func newPEVMProcessor(config *params.ChainConfig, bc *BlockChain, engine consensus.Engine) *PEVMProcessor {
processor := &PEVMProcessor{
StateProcessor: *NewStateProcessor(config, bc, engine),
unorderedMerge: bc.vmConfig.EnableParallelUnorderedMerge,
}
log.Info("Parallel execution mode is enabled", "Parallel Num", ParallelNum(),
"CPUNum", runtime.NumCPU())
"CPUNum", runtime.NumCPU(), "unorderedMerge", processor.unorderedMerge)
return processor
}

Expand Down Expand Up @@ -120,9 +122,14 @@ func (p *PEVMProcessor) executeInSlot(maindb *state.StateDB, txReq *PEVMTxReques
// if it is in Stage 2 it is a likely result, not 100% sure
func (p *PEVMProcessor) toConfirmTxIndexResult(txResult *PEVMTxResult) error {
txReq := txResult.txReq
if err := p.hasConflict(txResult); err != nil {
log.Info(fmt.Sprintf("HasConflict!! block: %d, txIndex: %d\n", txResult.txReq.block.NumberU64(), txResult.txReq.txIndex))
return err
if !p.unorderedMerge || !txReq.useDAG {
// If we do not use a DAG, then we need to check for conflicts to ensure correct execution.
// When we perform an unordered merge, we cannot conduct conflict checks
// and can only choose to trust that the DAG is correct and that conflicts do not exist.
if err := p.hasConflict(txResult); err != nil {
log.Info(fmt.Sprintf("HasConflict!! block: %d, txIndex: %d\n", txResult.txReq.block.NumberU64(), txResult.txReq.txIndex))
return err
}
}

// goroutine unsafe operation will be handled from here for safety
Expand Down Expand Up @@ -253,6 +260,8 @@ func (p *PEVMProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
// parallel execution
start := time.Now()
txLevels := NewTxLevels(p.allTxReqs, txDAG)
log.Debug("txLevels size", "txLevels size", len(txLevels))
parallelTxLevelsSizeMeter.Update(int64(len(txLevels)))
buildLevelsDuration := time.Since(start)
var executeDurations, confirmDurations int64 = 0, 0
err, txIndex := txLevels.Run(func(pr *PEVMTxRequest) (res *PEVMTxResult) {
Expand All @@ -274,8 +283,9 @@ func (p *PEVMProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
atomic.AddUint64(&p.debugConflictRedoNum, 1)
}
}(time.Now())
log.Debug("pevm confirm", "txIndex", pr.txReq.txIndex)
return p.confirmTxResult(statedb, gp, pr)
})
}, p.unorderedMerge && txDAG != nil)
parallelRunDuration := time.Since(start) - buildLevelsDuration
if err != nil {
tx := allTxs[txIndex]
Expand Down Expand Up @@ -320,7 +330,17 @@ func (p *PEVMProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
p.engine.Finalize(p.bc, header, statedb, p.commonTxs, block.Uncles(), withdrawals)

var allLogs []*types.Log
var lindex = 0
var cumulativeGasUsed uint64
for _, receipt := range p.receipts {
// reset the log index
for _, log := range receipt.Logs {
log.Index = uint(lindex)
lindex++
}
// re-calculate the cumulativeGasUsed
cumulativeGasUsed += receipt.GasUsed
receipt.CumulativeGasUsed = cumulativeGasUsed
allLogs = append(allLogs, receipt.Logs...)
}
return p.receipts, allLogs, *usedGas, nil
Expand Down
6 changes: 3 additions & 3 deletions core/state/pevm_statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,9 +486,9 @@ func (pst *UncommittedDB) Merge(deleteEmptyObjects bool) error {
// so we don't need to merge anything.
return nil
}
if err := pst.conflictsToMaindb(); err != nil {
return err
}
//if err := pst.conflictsToMaindb(); err != nil {
// return err
//}

// 0. set the TxContext
pst.maindb.SetTxContext(pst.txHash, pst.txIndex)
Expand Down
1 change: 0 additions & 1 deletion core/types/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ func ValidatePlainTxDAG(d TxDAG, txCnt int) error {
return fmt.Errorf("PlainTxDAG contains unordered dependency, tx: %v", i)
}
}

}
return nil
}
Expand Down
22 changes: 22 additions & 0 deletions core/types/dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"
"time"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/golang/snappy"

"github.com/cometbft/cometbft/libs/rand"
Expand All @@ -21,16 +22,37 @@ var (

func TestEncodeTxDAGCalldata(t *testing.T) {
tg := mockSimpleDAG()
originTg := tg
data, err := EncodeTxDAGCalldata(tg)
assert.Equal(t, nil, err)
tg, err = DecodeTxDAGCalldata(data)
assert.Equal(t, nil, err)
assert.Equal(t, true, tg.TxCount() > 0)
assert.Equal(t, originTg, tg)

_, err = DecodeTxDAGCalldata(nil)
assert.NotEqual(t, nil, err)
}

func TestDecodeCalldata(t *testing.T) {
calldata := "0x5517ed8c000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000001b201f901aef901abc2c002c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c2c152c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c1c0c2c0010000000000000000000000000000"
decode, err := hexutil.Decode(calldata)
if err != nil {
return
}
dagCalldata, err := DecodeTxDAGCalldata(decode)
if err != nil {
t.Errorf("Error decoding calldata: %s", err)
return
}
//for i := 0; i < dagCalldata.TxCount(); i++ {
// dep := dagCalldata.TxDep(i)
// log.Printf("idx:%d,dep:%v", i, dep.TxIndexes)
//}
assert.Equal(t, true, dagCalldata.TxDep(186).Exist(82))
assert.Equal(t, 0, dagCalldata.TxDep(187).Count())
}

func TestTxDAG_SetTxDep(t *testing.T) {
dag := mockSimpleDAG()
require.NoError(t, dag.SetTxDep(9, NewTxDep(nil, NonDependentRelFlag)))
Expand Down
21 changes: 11 additions & 10 deletions core/vm/interpreter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,17 @@ type PrecompileOverrides func(params.Rules, PrecompiledContract, common.Address)

// Config are the configuration options for the Interpreter
type Config struct {
Tracer EVMLogger // Opcode logger
NoBaseFee bool // Forces the EIP-1559 baseFee to 0 (needed for 0 price calls)
EnablePreimageRecording bool // Enables recording of SHA3/keccak preimages
ExtraEips []int // Additional EIPS that are to be enabled
EnableParallelExecLegacy bool // Whether to execute transaction in parallel mode when do full sync
EnableParallelExec bool // Whether to execute transaction in parallel mode when do full sync
ParallelTxNum int // Number of slot for transaction execution
OptimismPrecompileOverrides PrecompileOverrides // Precompile overrides for Optimism
EnableOpcodeOptimizations bool // Enable opcode optimization
TxDAG types.TxDAG
Tracer EVMLogger // Opcode logger
NoBaseFee bool // Forces the EIP-1559 baseFee to 0 (needed for 0 price calls)
EnablePreimageRecording bool // Enables recording of SHA3/keccak preimages
ExtraEips []int // Additional EIPS that are to be enabled
EnableParallelExecLegacy bool // Whether to execute transaction in parallel mode when do full sync
EnableParallelExec bool // Whether to execute transaction in parallel mode when do full sync
ParallelTxNum int // Number of slot for transaction execution
OptimismPrecompileOverrides PrecompileOverrides // Precompile overrides for Optimism
EnableOpcodeOptimizations bool // Enable opcode optimization
TxDAG types.TxDAG
EnableParallelUnorderedMerge bool // Whether to enable unordered merge in parallel mode
}

// ScopeContext contains the things that are per-call, such as stack and memory,
Expand Down
11 changes: 6 additions & 5 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,11 +223,12 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
journalFilePath := fmt.Sprintf("%s/%s", stack.ResolvePath(ChainData), JournalFileName)
var (
vmConfig = vm.Config{
EnablePreimageRecording: config.EnablePreimageRecording,
EnableParallelExecLegacy: config.ParallelTxLegacyMode,
EnableParallelExec: config.ParallelTxMode,
ParallelTxNum: config.ParallelTxNum,
EnableOpcodeOptimizations: config.EnableOpcodeOptimizing,
EnablePreimageRecording: config.EnablePreimageRecording,
EnableParallelExecLegacy: config.ParallelTxLegacyMode,
EnableParallelExec: config.ParallelTxMode,
EnableParallelUnorderedMerge: config.ParallelTxUnorderedMerge,
ParallelTxNum: config.ParallelTxNum,
EnableOpcodeOptimizations: config.EnableOpcodeOptimizing,
}
cacheConfig = &core.CacheConfig{
TrieCleanLimit: config.TrieCleanCache,
Expand Down
13 changes: 7 additions & 6 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,12 +219,13 @@ type Config struct {
RollupDisableTxPoolAdmission bool
RollupHaltOnIncompatibleProtocolVersion string

ParallelTxLegacyMode bool // Whether to execute transaction in parallel mode when do full sync
ParallelTxMode bool // Whether to execute transaction in parallel mode when do full sync
ParallelTxNum int // Number of slot for transaction execution
EnableOpcodeOptimizing bool
EnableParallelTxDAG bool
ParallelTxDAGFile string
ParallelTxLegacyMode bool // Whether to execute transaction in parallel mode when do full sync
ParallelTxMode bool // Whether to execute transaction in parallel mode when do full sync
ParallelTxNum int // Number of slot for transaction execution
EnableOpcodeOptimizing bool
EnableParallelTxDAG bool
ParallelTxDAGFile string
ParallelTxUnorderedMerge bool // Whether to enable unordered merge in parallel mode
}

// CreateConsensusEngine creates a consensus engine for the given chain config.
Expand Down

0 comments on commit 4db9987

Please sign in to comment.