From 703ebff221eeac457ae4c62c8bbbe47f2e8c30b1 Mon Sep 17 00:00:00 2001 From: joanestebanr <129153821+joanestebanr@users.noreply.github.com> Date: Thu, 9 May 2024 16:58:39 +0200 Subject: [PATCH] checking reorg and executing complete. Missing mark check blocks --- cmd/run/run_cmd.go | 2 +- etherman/etherman.go | 20 ++- state/entities/tx.go | 5 +- state/model/l1infotree_state.go | 4 + state/model/reorg_state.go | 98 ++++++++++++ state/state.go | 4 + state/storage/interface.go | 5 + state/storage/pgstorage/kv.go | 75 +++++++++ state/storage/pgstorage/kv_test.go | 88 +++++++++++ state/storage/pgstorage/migrations/0004.sql | 10 ++ state/storage/pgstorage/reset.go | 10 +- state/storage/pgstorage/tx.go | 31 +++- synchronizer/common/reorg_error.go | 11 +- .../common/syncinterfaces/etherman.go | 1 + synchronizer/common/syncinterfaces/state.go | 6 + synchronizer/common/syncinterfaces/storage.go | 5 - synchronizer/interfaces.go | 1 + synchronizer/l1_sync/block_range_iterator.go | 79 ++++++++++ .../l1_sync/bock_range_iterator_test.go | 15 ++ synchronizer/l1_sync/l1_sequential_sync.go | 146 ++++++++++-------- .../l1_sync/mocks/etherman_interface.go | 59 +++++++ synchronizer/mocks/etherman_full_interface.go | 59 +++++++ synchronizer/mocks/etherman_interface.go | 59 +++++++ synchronizer/synchronizer_impl.go | 95 ++++++++---- 24 files changed, 766 insertions(+), 122 deletions(-) create mode 100644 state/model/reorg_state.go create mode 100644 state/storage/pgstorage/kv.go create mode 100644 state/storage/pgstorage/kv_test.go create mode 100644 state/storage/pgstorage/migrations/0004.sql create mode 100644 synchronizer/l1_sync/block_range_iterator.go create mode 100644 synchronizer/l1_sync/bock_range_iterator_test.go diff --git a/cmd/run/run_cmd.go b/cmd/run/run_cmd.go index c4cf27d..9d3a3ca 100644 --- a/cmd/run/run_cmd.go +++ b/cmd/run/run_cmd.go @@ -20,5 +20,5 @@ func RunCmd(cliCtx *cli.Context) error { log.Error("Error creating synchronizer", err) return err } - return sync.Sync(true) + return sync.Sync(false) } diff --git a/etherman/etherman.go b/etherman/etherman.go index 9296adf..f09516e 100644 --- a/etherman/etherman.go +++ b/etherman/etherman.go @@ -976,6 +976,24 @@ func (etherMan *Client) updateL1InfoTreeEvent(ctx context.Context, vLog types.Lo (*blocksOrder)[block.BlockHash] = append((*blocksOrder)[block.BlockHash], order) return nil } + +func (etherMan *Client) GetL1BlockByNumber(ctx context.Context, blockNumber uint64) (*Block, error) { + ethBlock, err := etherMan.EthClient.BlockByNumber(ctx, new(big.Int).SetUint64(blockNumber)) + if err != nil { + return nil, err + } + t := time.Unix(int64(ethBlock.Time()), 0) + + //block := prepareBlock(vLog, t, fullBlock) + block := Block{ + BlockNumber: ethBlock.NumberU64(), + BlockHash: ethBlock.Hash(), + ParentHash: ethBlock.ParentHash(), + ReceivedAt: t, + } + return &block, nil +} + func (etherMan *Client) retrieveFullBlockbyHash(ctx context.Context, blockHash common.Hash) (*Block, error) { var err error var fullBlock *types.Block @@ -1006,7 +1024,7 @@ func (etherMan *Client) retrieveFullBlockbyHash(ctx context.Context, blockHash c //block := prepareBlock(vLog, t, fullBlock) block := Block{ BlockNumber: fullBlock.NumberU64(), - BlockHash: blockHash, + BlockHash: fullBlock.Hash(), ParentHash: fullBlock.ParentHash(), ReceivedAt: t, } diff --git a/state/entities/tx.go b/state/entities/tx.go index 2dd2dd7..4e14db1 100644 --- a/state/entities/tx.go +++ b/state/entities/tx.go @@ -4,9 +4,12 @@ import ( "context" ) +type TxCallbackType func(Tx, error) + type Tx 
interface {
 	Commit(ctx context.Context) error
 	Rollback(ctx context.Context) error
-	AddRollbackCallback(func())
+	AddRollbackCallback(TxCallbackType)
+	AddCommitCallback(TxCallbackType)
 }
diff --git a/state/model/l1infotree_state.go b/state/model/l1infotree_state.go
index 94fcc94..87ab254 100644
--- a/state/model/l1infotree_state.go
+++ b/state/model/l1infotree_state.go
@@ -42,6 +42,10 @@ func HashLeaf(leaf *L1InfoTreeLeaf) common.Hash {
 	return l1infotree.HashLeafData(leaf.GlobalExitRoot, leaf.PreviousBlockHash, timestamp)
 }
 
+func (s *L1InfoTreeState) OnReorg(reorg ReorgExecutionResult) {
+	log.Infof("Reorg: clearing L1InfoTree cache")
+	s.l1InfoTree = nil
+}
 func (s *L1InfoTreeState) BuildL1InfoTreeCacheIfNeed(ctx context.Context, dbTx stateTxType) error {
 	if s.l1InfoTree != nil {
 		return nil
diff --git a/state/model/reorg_state.go b/state/model/reorg_state.go
new file mode 100644
index 0000000..9cf2661
--- /dev/null
+++ b/state/model/reorg_state.go
@@ -0,0 +1,98 @@
+package model
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+)
+
+// ReorgRequest contains the information needed to execute a reorg
+type ReorgRequest struct {
+	FirstL1BlockNumberToKeep uint64
+	ReasonError              error
+}
+
+func (r *ReorgRequest) String() string {
+	return fmt.Sprintf("FirstL1BlockNumberToKeep: %d, ReasonError: %s", r.FirstL1BlockNumberToKeep, r.ReasonError)
+}
+
+// ReorgExecutionResult contains the outcome of a reorg execution
+type ReorgExecutionResult struct {
+	Request           ReorgRequest
+	ExecutionCounter  uint64 // Number of reorgs executed in this run (not a globally unique ID)
+	ExecutionError    error
+	ExecutionTime     time.Time
+	ExecutionDuration time.Duration
+}
+
+func (r ReorgExecutionResult) IsSuccess() bool {
+	return r.ExecutionError == nil
+}
+
+func (r *ReorgExecutionResult) String() string {
+	return fmt.Sprintf("Request: %s, ExecutionCounter: %d, ExecutionError: %v, ExecutionTime: %s, ExecutionDuration: %s",
+		r.Request.String(), r.ExecutionCounter, r.ExecutionError, r.ExecutionTime.String(), r.ExecutionDuration.String())
+}
+
+type ReorgCallbackType = func(ReorgExecutionResult)
+
+type StorageReorgInterface interface {
+	ResetToL1BlockNumber(ctx context.Context, firstBlockNumberToKeep uint64, dbTx storageTxType) error
+}
+
+type ReorgState struct {
+	mutex            sync.Mutex
+	storage          StorageReorgInterface
+	onReorgCallbacks []ReorgCallbackType
+	lastReorgResult  *ReorgExecutionResult
+}
+
+func NewReorgState(storage StorageReorgInterface) *ReorgState {
+	return &ReorgState{
+		storage: storage,
+	}
+}
+
+func (s *ReorgState) AddOnReorgCallback(f ReorgCallbackType) {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	s.onReorgCallbacks = append(s.onReorgCallbacks, f)
+}
+
+func (s *ReorgState) ExecuteReorg(ctx context.Context, reorgRequest ReorgRequest, dbTx storageTxType) ReorgExecutionResult {
+	startTime := time.Now()
+	err := s.storage.ResetToL1BlockNumber(ctx, reorgRequest.FirstL1BlockNumberToKeep, dbTx)
+	res := s.createNewResult(reorgRequest, err, startTime)
+	dbTx.AddCommitCallback(s.onTxCommit)
+	dbTx.AddRollbackCallback(s.onTxRollback)
+	return res
+}
+
+func (s *ReorgState) onTxCommit(dbTx storageTxType, err error) {
+	if err == nil {
+		for _, f := range s.onReorgCallbacks {
+			f(*s.lastReorgResult)
+		}
+	}
+}
+
+func (s *ReorgState) onTxRollback(dbTx storageTxType, err error) {
+}
+
+func (s *ReorgState) createNewResult(reorgRequest ReorgRequest, err error, startTime time.Time) ReorgExecutionResult {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	res := 
ReorgExecutionResult{ + Request: reorgRequest, + ExecutionCounter: 1, + ExecutionError: err, + ExecutionTime: startTime, + ExecutionDuration: time.Since(startTime), + } + if s.lastReorgResult != nil { + res.ExecutionCounter = s.lastReorgResult.ExecutionCounter + 1 + } + s.lastReorgResult = &res + return res +} diff --git a/state/state.go b/state/state.go index fa353a3..344e744 100644 --- a/state/state.go +++ b/state/state.go @@ -10,6 +10,7 @@ type State struct { *model.ForkIdState *model.L1InfoTreeState *model.BatchState + *model.ReorgState storage.BlockStorer } @@ -19,7 +20,10 @@ func NewState(storageImpl storage.Storer) *State { model.NewForkIdState(storageImpl), model.NewL1InfoTreeManager(storageImpl), model.NewBatchState(storageImpl), + model.NewReorgState(storageImpl), storageImpl, } + // Connect cache invalidation on Reorg + res.ReorgState.AddOnReorgCallback(res.L1InfoTreeState.OnReorg) return res } diff --git a/state/storage/interface.go b/state/storage/interface.go index e21f715..4e614ac 100644 --- a/state/storage/interface.go +++ b/state/storage/interface.go @@ -48,6 +48,10 @@ type virtualBatchStorer interface { GetVirtualBatchByBatchNumber(ctx context.Context, batchNumber uint64, dbTx storageTxType) (*VirtualBatch, error) } +type reorgStorer interface { + ResetToL1BlockNumber(ctx context.Context, firstBlockNumberToKeep uint64, dbTx storageTxType) error +} + type txStorer interface { BeginTransaction(ctx context.Context) (storageTxType, error) } @@ -59,4 +63,5 @@ type Storer interface { l1infoTreeStorer virtualBatchStorer sequencedBatchStorer + reorgStorer } diff --git a/state/storage/pgstorage/kv.go b/state/storage/pgstorage/kv.go new file mode 100644 index 0000000..f11d6b3 --- /dev/null +++ b/state/storage/pgstorage/kv.go @@ -0,0 +1,75 @@ +package pgstorage + +import ( + "context" + "encoding/json" +) + +func (p *PostgresStorage) KVSetString(ctx context.Context, key string, value string, dbTx dbTxType) error { + e := p.getExecQuerier(getPgTx(dbTx)) + const setSQL = "INSERT INTO sync.kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value = $2" + if _, err := e.Exec(ctx, setSQL, key, value); err != nil { + return err + } + return nil +} + +func (p *PostgresStorage) KVSetJson(ctx context.Context, key string, value interface{}, dbTx dbTxType) error { + jsonValue, err := json.Marshal(value) + if err != nil { + return err + } + return p.KVSetString(ctx, key, string(jsonValue), dbTx) +} + +func (p *PostgresStorage) KVSetUint64(ctx context.Context, key string, value uint64, dbTx dbTxType) error { + jsonValue, err := json.Marshal(value) + if err != nil { + return err + } + return p.KVSetString(ctx, key, string(jsonValue), dbTx) +} + +func (p *PostgresStorage) KVGetString(ctx context.Context, key string, dbTx dbTxType) (string, error) { + e := p.getExecQuerier(getPgTx(dbTx)) + const getSQL = "SELECT value FROM sync.kv WHERE key = $1" + row := e.QueryRow(ctx, getSQL, key) + var value string + err := row.Scan(&value) + err = translatePgxError(err, "KVGetString") + if err != nil { + return "", err + } + return value, nil +} + +func (p *PostgresStorage) KVGetJson(ctx context.Context, key string, value interface{}, dbTx dbTxType) error { + valueStr, err := p.KVGetString(ctx, key, dbTx) + if err != nil { + return err + } + return json.Unmarshal([]byte(valueStr), value) +} + +func (p *PostgresStorage) KVGetUint64(ctx context.Context, key string, dbTx dbTxType) (uint64, error) { + valueStr, err := p.KVGetString(ctx, key, dbTx) + if err != nil { + return 0, err + } + value := 
uint64(0) + err = json.Unmarshal([]byte(valueStr), &value) + return value, err +} + +func (p *PostgresStorage) KVExists(ctx context.Context, key string, dbTx dbTxType) (bool, error) { + e := p.getExecQuerier(getPgTx(dbTx)) + const existsSQL = "SELECT EXISTS(SELECT 1 FROM sync.kv WHERE key = $1)" + row := e.QueryRow(ctx, existsSQL, key) + var exists bool + err := row.Scan(&exists) + err = translatePgxError(err, "KVExists") + if err != nil { + return false, err + } + return exists, nil +} diff --git a/state/storage/pgstorage/kv_test.go b/state/storage/pgstorage/kv_test.go new file mode 100644 index 0000000..3e50071 --- /dev/null +++ b/state/storage/pgstorage/kv_test.go @@ -0,0 +1,88 @@ +package pgstorage_test + +import ( + "context" + "testing" + + "github.com/0xPolygonHermez/zkevm-synchronizer-l1/state/storage/pgstorage" + "github.com/stretchr/testify/require" +) + +const ( + testKey = "fake_key_used_for_unittest" +) + +func TestKVSet(t *testing.T) { + skipDatabaseTestIfNeeded(t) + ctx := context.TODO() + dbConfig := getStorageConfig() + err := pgstorage.ResetDB(dbConfig) + require.NoError(t, err) + storage, err := pgstorage.NewPostgresStorage(dbConfig) + require.NoError(t, err) + dbTx, err := storage.BeginTransaction(ctx) + require.NoError(t, err) + exists, err := storage.KVExists(ctx, testKey, dbTx) + require.NoError(t, err) + require.False(t, exists) + err = storage.KVSetString(ctx, testKey, "fake_value", dbTx) + require.NoError(t, err) + exists, err = storage.KVExists(ctx, testKey, dbTx) + require.NoError(t, err) + require.True(t, exists) + value, err := storage.KVGetString(ctx, testKey, dbTx) + require.NoError(t, err) + require.Equal(t, "fake_value", value) + dbTx.Commit(ctx) +} + +type kvTestStruct struct { + A int + B string +} + +func TestKVJson(t *testing.T) { + skipDatabaseTestIfNeeded(t) + ctx := context.TODO() + dbConfig := getStorageConfig() + err := pgstorage.ResetDB(dbConfig) + require.NoError(t, err) + storage, err := pgstorage.NewPostgresStorage(dbConfig) + require.NoError(t, err) + dbTx, err := storage.BeginTransaction(ctx) + require.NoError(t, err) + data := kvTestStruct{A: 1, B: "test"} + err = storage.KVSetJson(ctx, testKey, data, dbTx) + require.NoError(t, err) + var dataRead kvTestStruct + err = storage.KVGetJson(ctx, testKey, &dataRead, dbTx) + require.NoError(t, err) + require.Equal(t, data, dataRead) + + dbTx.Commit(ctx) +} + +func TestKVUint64(t *testing.T) { + skipDatabaseTestIfNeeded(t) + ctx := context.TODO() + dbConfig := getStorageConfig() + err := pgstorage.ResetDB(dbConfig) + require.NoError(t, err) + storage, err := pgstorage.NewPostgresStorage(dbConfig) + require.NoError(t, err) + dbTx, err := storage.BeginTransaction(ctx) + require.NoError(t, err) + data := uint64(1234) + err = storage.KVSetUint64(ctx, testKey, data, dbTx) + require.NoError(t, err) + dataRead, err := storage.KVGetUint64(ctx, testKey, dbTx) + require.NoError(t, err) + require.Equal(t, data, dataRead) + + err = storage.KVSetString(ctx, testKey, "not a number", dbTx) + require.NoError(t, err) + _, err = storage.KVGetUint64(ctx, testKey, dbTx) + require.Error(t, err) + + dbTx.Commit(ctx) +} diff --git a/state/storage/pgstorage/migrations/0004.sql b/state/storage/pgstorage/migrations/0004.sql new file mode 100644 index 0000000..d987c86 --- /dev/null +++ b/state/storage/pgstorage/migrations/0004.sql @@ -0,0 +1,10 @@ +-- +migrate Up +CREATE TABLE IF NOT EXISTS sync.kv ( + key VARCHAR(256) PRIMARY KEY, + value VARCHAR +); + + +-- +migrate Down + +DROP TABLE IF EXISTS sync.kv; \ No newline at end 
of file diff --git a/state/storage/pgstorage/reset.go b/state/storage/pgstorage/reset.go index ee48aff..197eab0 100644 --- a/state/storage/pgstorage/reset.go +++ b/state/storage/pgstorage/reset.go @@ -2,10 +2,14 @@ package pgstorage import ( "context" - "log" ) -func (p *PostgresStorage) Reset(ctx context.Context, blockNumber uint64, dbTx dbTxType) error { - log.Fatal("Not implemented") +// ResetToL1BlockNumber resets the state to a block for the given DB tx +func (p *PostgresStorage) ResetToL1BlockNumber(ctx context.Context, firstBlockNumberToKeep uint64, dbTx dbTxType) error { + e := p.getExecQuerier(getPgTx(dbTx)) + const resetSQL = "DELETE FROM sync.block WHERE block_num > $1" + if _, err := e.Exec(ctx, resetSQL, firstBlockNumberToKeep); err != nil { + return err + } return nil } diff --git a/state/storage/pgstorage/tx.go b/state/storage/pgstorage/tx.go index abaf1f3..1e1d6fe 100644 --- a/state/storage/pgstorage/tx.go +++ b/state/storage/pgstorage/tx.go @@ -3,25 +3,42 @@ package pgstorage import ( "context" + "github.com/0xPolygonHermez/zkevm-synchronizer-l1/state/entities" "github.com/jackc/pgx/v4" ) type Tx interface { pgx.Tx - AddRollbackCallback(func()) + entities.Tx } +type TxCallbackType = entities.TxCallbackType + type stateImplementationTx struct { pgx.Tx - rollbackCallbacks []func() + rollbackCallbacks []TxCallbackType + commitCallbacks []TxCallbackType } -func (s *stateImplementationTx) AddRollbackCallback(cb func()) { +func (s *stateImplementationTx) AddRollbackCallback(cb TxCallbackType) { s.rollbackCallbacks = append(s.rollbackCallbacks, cb) } -func (tx *stateImplementationTx) Rollback(ctx context.Context) error { - for _, cb := range tx.rollbackCallbacks { - cb() +func (s *stateImplementationTx) AddCommitCallback(cb TxCallbackType) { + s.commitCallbacks = append(s.commitCallbacks, cb) +} + +func (s *stateImplementationTx) Commit(ctx context.Context) error { + err := s.Tx.Commit(ctx) + for _, cb := range s.commitCallbacks { + cb(s, err) + } + return err +} + +func (s *stateImplementationTx) Rollback(ctx context.Context) error { + err := s.Tx.Rollback(ctx) + for _, cb := range s.rollbackCallbacks { + cb(s, err) } - return tx.Tx.Rollback(ctx) + return err } diff --git a/synchronizer/common/reorg_error.go b/synchronizer/common/reorg_error.go index e8dba3d..92cf462 100644 --- a/synchronizer/common/reorg_error.go +++ b/synchronizer/common/reorg_error.go @@ -26,6 +26,13 @@ func IsReorgError(err error) bool { return ok } +func CastReorgError(err error) *ReorgError { + if reorgErr, ok := err.(*ReorgError); ok { + return reorgErr + } + return nil +} + // GetReorgErrorBlockNumber returns the block number that caused the reorg func GetReorgErrorBlockNumber(err error) uint64 { if reorgErr, ok := err.(*ReorgError); ok { @@ -34,8 +41,8 @@ func GetReorgErrorBlockNumber(err error) uint64 { return 0 } -// GetReorgError returns the error that caused the reorg -func GetReorgError(err error) error { +// GetReorgErrorWrappedError returns the wrapped error that caused the reorg +func GetReorgErrorWrappedError(err error) error { if reorgErr, ok := err.(*ReorgError); ok { return reorgErr.Err } diff --git a/synchronizer/common/syncinterfaces/etherman.go b/synchronizer/common/syncinterfaces/etherman.go index aa19ad0..8eb8383 100644 --- a/synchronizer/common/syncinterfaces/etherman.go +++ b/synchronizer/common/syncinterfaces/etherman.go @@ -14,6 +14,7 @@ type EthermanFullInterface interface { HeaderByNumber(ctx context.Context, number *big.Int) (*ethTypes.Header, error) GetRollupInfoByBlockRange(ctx 
context.Context, fromBlock uint64, toBlock *uint64) ([]etherman.Block, map[common.Hash][]etherman.Order, error) EthBlockByNumber(ctx context.Context, blockNumber uint64) (*ethTypes.Block, error) + GetL1BlockByNumber(ctx context.Context, blockNumber uint64) (*etherman.Block, error) GetLatestBatchNumber() (uint64, error) GetTrustedSequencerURL() (string, error) VerifyGenBlockNumber(ctx context.Context, genBlockNumber uint64) (bool, error) diff --git a/synchronizer/common/syncinterfaces/state.go b/synchronizer/common/syncinterfaces/state.go index 025940d..92c9111 100644 --- a/synchronizer/common/syncinterfaces/state.go +++ b/synchronizer/common/syncinterfaces/state.go @@ -21,6 +21,11 @@ type stateOnSequencedBatchesManager interface { OnSequencedBatchesOnL1(ctx context.Context, seq model.SequenceOfBatches, dbTx stateTxType) error } +type stateReorgManager interface { + AddOnReorgCallback(f model.ReorgCallbackType) + ExecuteReorg(ctx context.Context, reorgRequest model.ReorgRequest, dbTx stateTxType) model.ReorgExecutionResult +} + type StateInterface interface { AddL1InfoTreeLeafAndAssignIndex(ctx context.Context, exitRoot *entities.L1InfoTreeLeaf, dbTx stateTxType) (*entities.L1InfoTreeLeaf, error) @@ -35,4 +40,5 @@ type StateInterface interface { StateTxProvider stateOnSequencedBatchesManager StorageBlockReaderInterface + stateReorgManager } diff --git a/synchronizer/common/syncinterfaces/storage.go b/synchronizer/common/syncinterfaces/storage.go index 7d7bd6a..9e7ba29 100644 --- a/synchronizer/common/syncinterfaces/storage.go +++ b/synchronizer/common/syncinterfaces/storage.go @@ -18,10 +18,6 @@ type StorageBlockReaderInterface interface { GetPreviousBlock(ctx context.Context, offset uint64, fromBlockNumber *uint64, dbTx stateTxType) (*entities.L1Block, error) } -type StorageResetInterface interface { - Reset(ctx context.Context, blockNumber uint64, dbTx stateTxType) error -} - type StorageForkIDInterface interface { AddForkID(ctx context.Context, forkID pgstorage.ForkIDInterval, dbTx stateTxType) error GetForkIDs(ctx context.Context, dbTx stateTxType) ([]pgstorage.ForkIDInterval, error) @@ -62,7 +58,6 @@ type StorageInterface interface { StorageTransactionInterface StorageBlockWriterInterface StorageBlockReaderInterface - StorageResetInterface StorageForkIDInterface StorageL1InfoTreeInterface StorageSequenceBatchesInterface diff --git a/synchronizer/interfaces.go b/synchronizer/interfaces.go index 3835cbb..327e200 100644 --- a/synchronizer/interfaces.go +++ b/synchronizer/interfaces.go @@ -14,6 +14,7 @@ type EthermanInterface interface { HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) GetRollupInfoByBlockRange(ctx context.Context, fromBlock uint64, toBlock *uint64) ([]etherman.Block, map[common.Hash][]etherman.Order, error) EthBlockByNumber(ctx context.Context, blockNumber uint64) (*types.Block, error) + GetL1BlockByNumber(ctx context.Context, blockNumber uint64) (*etherman.Block, error) //GetNetworkID(ctx context.Context) (uint, error) GetRollupID() uint GetL1BlockUpgradeLxLy(ctx context.Context, genesisBlock *uint64) (uint64, error) diff --git a/synchronizer/l1_sync/block_range_iterator.go b/synchronizer/l1_sync/block_range_iterator.go new file mode 100644 index 0000000..02c3bb2 --- /dev/null +++ b/synchronizer/l1_sync/block_range_iterator.go @@ -0,0 +1,79 @@ +package l1sync + +import ( + "fmt" + "log" +) + +type BlockRange struct { + FromBlock, ToBlock uint64 + OverlappedFirstBlock bool +} + +func (b BlockRange) InsideRange(blockNumber uint64) bool { + if 
b.OverlappedFirstBlock { + return blockNumber >= b.FromBlock && blockNumber <= b.ToBlock + } + return blockNumber > b.FromBlock && blockNumber <= b.ToBlock +} + +func (b BlockRange) String() string { + return fmt.Sprintf("FromBlock: %d, ToBlock: %d overlappedFirstBlock:%t", b.FromBlock, b.ToBlock, b.OverlappedFirstBlock) +} + +type BlockRangeIterator struct { + fromBlock, toBlock uint64 + SyncChunkSize uint64 + MaximumBlock uint64 +} + +func NewBlockRangeIterator(fromBlock, syncChunkSize uint64, maximumBlock uint64) *BlockRangeIterator { + res := &BlockRangeIterator{ + fromBlock: fromBlock, + toBlock: fromBlock, + SyncChunkSize: syncChunkSize, + MaximumBlock: maximumBlock, + } + res = res.NextRange(fromBlock) + return res +} +func (i *BlockRangeIterator) IsLastRange() bool { + return i.toBlock >= i.MaximumBlock +} + +func (i *BlockRangeIterator) NextRange(fromBlock uint64) *BlockRangeIterator { + // The FromBlock is the new block (can be the previous one if no blocks found in the range) + if fromBlock < i.fromBlock { + log.Fatal("FromBlock is less than the current fromBlock") + } + i.fromBlock = fromBlock + // Extend toBlock by sync chunk size + i.toBlock = i.toBlock + i.SyncChunkSize + + if i.toBlock > i.MaximumBlock { + i.toBlock = i.MaximumBlock + } + if i.fromBlock > i.toBlock { + return nil + } + return i +} + +func (i *BlockRangeIterator) GetRange(overlappedFirst bool) BlockRange { + if overlappedFirst { + return BlockRange{ + FromBlock: i.fromBlock, + ToBlock: i.toBlock, + OverlappedFirstBlock: overlappedFirst, + } + } + return BlockRange{ + FromBlock: i.fromBlock + 1, + ToBlock: i.toBlock, + OverlappedFirstBlock: overlappedFirst, + } +} + +func (i *BlockRangeIterator) String() string { + return fmt.Sprintf("FromBlock: %d, ToBlock: %d, MaximumBlock: %d", i.fromBlock, i.toBlock, i.MaximumBlock) +} diff --git a/synchronizer/l1_sync/bock_range_iterator_test.go b/synchronizer/l1_sync/bock_range_iterator_test.go new file mode 100644 index 0000000..b4bfc5a --- /dev/null +++ b/synchronizer/l1_sync/bock_range_iterator_test.go @@ -0,0 +1,15 @@ +package l1sync_test + +import ( + "testing" + + l1sync "github.com/0xPolygonHermez/zkevm-synchronizer-l1/synchronizer/l1_sync" + "github.com/stretchr/testify/require" +) + +func TestKK(t *testing.T) { + it := l1sync.NewBlockRangeIterator(100, 10, 120) + require.False(t, it.IsLastRange()) + it = it.NextRange(100) + require.True(t, it.IsLastRange()) +} diff --git a/synchronizer/l1_sync/l1_sequential_sync.go b/synchronizer/l1_sync/l1_sequential_sync.go index f28ce11..f65b7d8 100644 --- a/synchronizer/l1_sync/l1_sequential_sync.go +++ b/synchronizer/l1_sync/l1_sequential_sync.go @@ -14,6 +14,7 @@ import ( type EthermanInterface interface { GetRollupInfoByBlockRange(ctx context.Context, fromBlock uint64, toBlock *uint64) ([]etherman.Block, map[common.Hash][]etherman.Order, error) + GetL1BlockByNumber(ctx context.Context, blockNumber uint64) (*etherman.Block, error) } type StateL1SeqInterface interface { @@ -50,8 +51,9 @@ type L1SequentialSync struct { } type L1SequentialSyncConfig struct { - SyncChunkSize uint64 - GenesisBlockNumber uint64 + SyncChunkSize uint64 + GenesisBlockNumber uint64 + AllowEmptyBlocksAsCheckPoints bool } func NewL1SequentialSync(blockPointsRetriever BlockPointsRetriever, @@ -101,44 +103,6 @@ func (s *BlockPointsRetrieverImplementation) GetL1BlockPoints(ctx context.Contex }, nil } -type IterateBlockRange struct { - FromBlock, ToBlock uint64 - SyncChunkSize uint64 - MaximumBlock uint64 -} - -func NewIterateBlockRange(fromBlock, 
syncChunkSize uint64, maximumBlock uint64) *IterateBlockRange { - res := &IterateBlockRange{ - FromBlock: fromBlock, - ToBlock: fromBlock, - SyncChunkSize: syncChunkSize, - MaximumBlock: maximumBlock, - } - res = res.NextRange(fromBlock) - return res -} -func (i *IterateBlockRange) IsLastRange() bool { - return i.FromBlock >= i.MaximumBlock -} - -func (i *IterateBlockRange) NextRange(fromBlock uint64) *IterateBlockRange { - // The FromBlock is the new block (can be the previous one if no blocks found in the range) - i.FromBlock = fromBlock - // Extend toBlock by sync chunk size - i.ToBlock = i.ToBlock + i.SyncChunkSize - - if i.ToBlock > i.MaximumBlock { - i.ToBlock = i.MaximumBlock - } - if i.FromBlock > i.ToBlock { - return nil - } - return i -} -func (i *IterateBlockRange) String() string { - return fmt.Sprintf("FromBlock: %d, ToBlock: %d, MaximumBlock: %d", i.FromBlock, i.ToBlock, i.MaximumBlock) -} - // This function syncs the node from a specific block to the latest // returns the last block synced and an error if any // returns true if the sync is completed @@ -153,26 +117,27 @@ func (s *L1SequentialSync) SyncBlocksSequential(ctx context.Context, lastEthBloc if lastEthBlockSynced.BlockNumber > 0 { fromBlock = lastEthBlockSynced.BlockNumber } - blockRange := NewIterateBlockRange(fromBlock, s.cfg.SyncChunkSize, blockPoints.L1LastBlockToSync) + blockRangeIterator := NewBlockRangeIterator(fromBlock, s.cfg.SyncChunkSize, blockPoints.L1LastBlockToSync) for { - if blockRange == nil { + if blockRangeIterator == nil { log.Debugf("Nothing to do starting from %d to %d. Skipping...", fromBlock, blockPoints.L1LastBlockToSync) return lastEthBlockSynced, true, nil } - - log.Infof("Syncing %s", blockRange.String()) - lastEthBlockSynced, synced, err := s.iteration(ctx, blockRange.FromBlock, blockRange.ToBlock, blockPoints.L1FinalizedBlockNumber, lastEthBlockSynced) + blockRange := blockRangeIterator.GetRange(lastEthBlockSynced.HasEvents && !lastEthBlockSynced.Checked) + log.Infof("Syncing %s", blockRangeIterator.String()) + synced := false + lastEthBlockSynced, synced, err = s.iteration(ctx, blockRange, blockPoints.L1FinalizedBlockNumber, lastEthBlockSynced) if err != nil { return lastEthBlockSynced, false, err } if synced { return lastEthBlockSynced, true, nil } - if blockRange.IsLastRange() { + if blockRangeIterator.IsLastRange() { break } - blockRange = blockRange.NextRange(lastEthBlockSynced.BlockNumber) + blockRangeIterator = blockRangeIterator.NextRange(lastEthBlockSynced.BlockNumber) } return lastEthBlockSynced, true, nil } @@ -198,47 +163,54 @@ func (s *L1SequentialSync) checkResponseGetRollupInfoByBlockRangeForOverlappedFi } // firstBlockRequestIsOverlapped = means that fromBlock is one that we already have and have events -func (s *L1SequentialSync) iteration(ctx context.Context, fromBlock, toBlock, finalizedBlockNumber uint64, lastEthBlockSynced *stateBlockType) (*stateBlockType, bool, error) { +func (s *L1SequentialSync) iteration(ctx context.Context, blockRange BlockRange, finalizedBlockNumber uint64, lastEthBlockSynced *stateBlockType) (*stateBlockType, bool, error) { - log.Infof("Getting rollup info from block %d to block %d", fromBlock, toBlock) + log.Infof("Getting rollup info range: %s finalizedBlock:%d", blockRange.String(), finalizedBlockNumber) // This function returns the rollup information contained in the ethereum blocks and an extra param called order. 
// Order param is a map that contains the event order to allow the synchronizer store the info in the same order that is readed.
 	// Name can be different in the order struct. For instance: Batches or Name:NewSequencers. This name is an identifier to check
 	// if the next info that must be stored in the db is a new sequencer or a batch. The value pos (position) tells what is the
 	// array index where this value is.
-	firstBlockRequestIsOverlapped := lastEthBlockSynced.HasEvents
-	if !firstBlockRequestIsOverlapped {
-		log.Infof("Request is not overlapped, so is not going to detect reorgs. FromBlock: %d toBlock:%d", fromBlock)
-	}
-	blocks, order, err := s.etherMan.GetRollupInfoByBlockRange(ctx, fromBlock, &toBlock)
+	toBlock := blockRange.ToBlock
+	blocks, order, err := s.etherMan.GetRollupInfoByBlockRange(ctx, blockRange.FromBlock, &toBlock)
 	if err != nil {
 		log.Errorf("error getting rollup info by block range. Err: %v", err)
 		return lastEthBlockSynced, false, err
 	}
-	if firstBlockRequestIsOverlapped {
-		err = s.checkResponseGetRollupInfoByBlockRangeForOverlappedFirstBlock(blocks, fromBlock)
+	if blockRange.OverlappedFirstBlock {
+		err = s.checkResponseGetRollupInfoByBlockRangeForOverlappedFirstBlock(blocks, blockRange.FromBlock)
 		if err != nil {
 			return lastEthBlockSynced, false, err
 		}
 	}
 	var initBlockReceived *etherman.Block
-	if len(blocks) != 0 && firstBlockRequestIsOverlapped {
+	if len(blocks) != 0 && blockRange.OverlappedFirstBlock {
+		// The first block is overlapped: it has already been processed, so we only use it
+		// to check for reorgs (verify that it has not changed between the previous checkReorg and the GetRollupInfoByBlockRange call)
 		initBlockReceived = &blocks[0]
 		// First position of the array must be deleted
 		blocks = removeBlockElement(blocks, 0)
 	}
-	// Check reorg again to be sure that the chain has not changed between the previous checkReorg and the call GetRollupInfoByBlockRange
-	block, lastBadBlockNumber, err := s.reorgManager.CheckReorg(lastEthBlockSynced, initBlockReceived)
-	if err != nil {
-		log.Errorf("error checking reorgs. Retrying... Err: %v", err)
-		return lastEthBlockSynced, false, fmt.Errorf("error checking reorgs. Err:%w", err)
-	}
-	if block != nil {
-		// In fact block.BlockNumber is the first ok block, so add 1 to be the first block wrong
-		// maybe doesnt exists
-		err := syncommon.NewReorgError(lastBadBlockNumber, fmt.Errorf("reorg detected. First valid block is %d, lastBadBlock is %d by CheckReorg func", block.BlockNumber, lastBadBlockNumber))
-		return block, false, err
+
+	if !lastEthBlockSynced.Checked || initBlockReceived != nil {
+		// Check reorg again to be sure that the chain has not changed between the previous checkReorg and the call GetRollupInfoByBlockRange
+		log.Debugf("Checking reorgs between lastEthBlockSynced =%d and initBlockReceived: %v", lastEthBlockSynced.BlockNumber, initBlockReceived)
+		block, lastBadBlockNumber, err := s.reorgManager.CheckReorg(lastEthBlockSynced, initBlockReceived)
+		log.Debugf("Checking reorgs between lastEthBlockSynced =%d [AFTER]", lastEthBlockSynced.BlockNumber)
+		if err != nil {
+			log.Errorf("error checking reorgs. Retrying... Err: %v", err)
+			return lastEthBlockSynced, false, fmt.Errorf("error checking reorgs. Err:%w", err)
+		}
+		if block != nil {
+			// block.BlockNumber is actually the last valid block, so the first wrong block is block.BlockNumber+1
+			// (which may not even exist)
+			err := syncommon.NewReorgError(lastBadBlockNumber, fmt.Errorf("reorg detected. First valid block is %d, lastBadBlock is %d by CheckReorg func", block.BlockNumber, lastBadBlockNumber))
+			return block, false, err
+		}
+	} else {
+		log.Debugf("Skipping reorg check because lastEthBlockSynced %d is checked", lastEthBlockSynced.BlockNumber)
+	}
 
 	err = s.blockRangeProcessor.ProcessBlockRange(ctx, blocks, order, finalizedBlockNumber)
@@ -253,10 +225,48 @@ func (s *L1SequentialSync) iteration(ctx context.Context, fromBlock, toBlock, fi
 	for i := range blocks {
 		log.Info("Position: ", i, ". New block. BlockNumber: ", blocks[i].BlockNumber, ". BlockHash: ", blocks[i].BlockHash)
 	}
+	} else {
+		// Decide whether to create an empty block to move the FromBlock forward.
+		// We can ignore the error: if we can't create the empty block, we simply keep extending toBlock
+		emptyBlock, _ := s.CreateBlockWithoutRollupInfoIfNeeded(ctx, blockRange, finalizedBlockNumber)
+		if emptyBlock != nil {
+			log.Infof("Creating empty block BlockNumber: %d", emptyBlock.BlockNumber)
+			lastEthBlockSynced = emptyBlock
+
+		}
 	}
 	return lastEthBlockSynced, false, nil
 }
+
+// CreateBlockWithoutRollupInfoIfNeeded creates a block without rollup info when
+// fromBlock + SyncChunkSize is still a finalized block: a finalized block cannot be
+// reorged, so it is safe to store it as a checkpoint even though we cannot check it for reorgs
+func (s *L1SequentialSync) CreateBlockWithoutRollupInfoIfNeeded(ctx context.Context, blockRange BlockRange, finalizedBlockNumber uint64) (*stateBlockType, error) {
+	if !s.cfg.AllowEmptyBlocksAsCheckPoints {
+		return nil, nil
+	}
+	proposedBlockNumber := blockRange.FromBlock + s.cfg.SyncChunkSize
+
+	if !blockRange.InsideRange(proposedBlockNumber) {
+		return nil, nil
+	}
+	if entities.IsBlockFinalized(proposedBlockNumber, finalizedBlockNumber) {
+		// Create a block without rollup info
+		emptyBlock, err := s.etherMan.GetL1BlockByNumber(ctx, proposedBlockNumber)
+		if err != nil || emptyBlock == nil {
+			log.Warnf("error getting block %d from the blockchain. Error: %v", proposedBlockNumber, err)
+			return nil, err
+		}
+		err = s.blockRangeProcessor.ProcessBlockRange(ctx, []etherman.Block{*emptyBlock}, nil, finalizedBlockNumber)
+		if err != nil {
+			log.Warnf("error processing the block range. 
Err: %v", err) + return nil, err + } + return entities.NewL1BlockFromEthermanBlock(emptyBlock, true), nil + } + return nil, nil +} + /* func (s *L1SequentialSync) reorgDetected(ctx context.Context, lastEthBlockSynced *stateBlockType) (*stateBlockType, error) { prevBlock, err := s.state.GetPreviousBlock(ctx, 1, nil) diff --git a/synchronizer/l1_sync/mocks/etherman_interface.go b/synchronizer/l1_sync/mocks/etherman_interface.go index 5e73651..060fb39 100644 --- a/synchronizer/l1_sync/mocks/etherman_interface.go +++ b/synchronizer/l1_sync/mocks/etherman_interface.go @@ -25,6 +25,65 @@ func (_m *EthermanInterface) EXPECT() *EthermanInterface_Expecter { return &EthermanInterface_Expecter{mock: &_m.Mock} } +// GetL1BlockByNumber provides a mock function with given fields: ctx, blockNumber +func (_m *EthermanInterface) GetL1BlockByNumber(ctx context.Context, blockNumber uint64) (*etherman.Block, error) { + ret := _m.Called(ctx, blockNumber) + + if len(ret) == 0 { + panic("no return value specified for GetL1BlockByNumber") + } + + var r0 *etherman.Block + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, uint64) (*etherman.Block, error)); ok { + return rf(ctx, blockNumber) + } + if rf, ok := ret.Get(0).(func(context.Context, uint64) *etherman.Block); ok { + r0 = rf(ctx, blockNumber) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*etherman.Block) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, uint64) error); ok { + r1 = rf(ctx, blockNumber) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// EthermanInterface_GetL1BlockByNumber_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetL1BlockByNumber' +type EthermanInterface_GetL1BlockByNumber_Call struct { + *mock.Call +} + +// GetL1BlockByNumber is a helper method to define mock.On call +// - ctx context.Context +// - blockNumber uint64 +func (_e *EthermanInterface_Expecter) GetL1BlockByNumber(ctx interface{}, blockNumber interface{}) *EthermanInterface_GetL1BlockByNumber_Call { + return &EthermanInterface_GetL1BlockByNumber_Call{Call: _e.mock.On("GetL1BlockByNumber", ctx, blockNumber)} +} + +func (_c *EthermanInterface_GetL1BlockByNumber_Call) Run(run func(ctx context.Context, blockNumber uint64)) *EthermanInterface_GetL1BlockByNumber_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(uint64)) + }) + return _c +} + +func (_c *EthermanInterface_GetL1BlockByNumber_Call) Return(_a0 *etherman.Block, _a1 error) *EthermanInterface_GetL1BlockByNumber_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *EthermanInterface_GetL1BlockByNumber_Call) RunAndReturn(run func(context.Context, uint64) (*etherman.Block, error)) *EthermanInterface_GetL1BlockByNumber_Call { + _c.Call.Return(run) + return _c +} + // GetRollupInfoByBlockRange provides a mock function with given fields: ctx, fromBlock, toBlock func (_m *EthermanInterface) GetRollupInfoByBlockRange(ctx context.Context, fromBlock uint64, toBlock *uint64) ([]etherman.Block, map[common.Hash][]etherman.Order, error) { ret := _m.Called(ctx, fromBlock, toBlock) diff --git a/synchronizer/mocks/etherman_full_interface.go b/synchronizer/mocks/etherman_full_interface.go index 44a7b18..a532ea2 100644 --- a/synchronizer/mocks/etherman_full_interface.go +++ b/synchronizer/mocks/etherman_full_interface.go @@ -87,6 +87,65 @@ func (_c *EthermanFullInterface_EthBlockByNumber_Call) RunAndReturn(run func(con return _c } +// GetL1BlockByNumber provides a mock function with given fields: ctx, 
blockNumber +func (_m *EthermanFullInterface) GetL1BlockByNumber(ctx context.Context, blockNumber uint64) (*etherman.Block, error) { + ret := _m.Called(ctx, blockNumber) + + if len(ret) == 0 { + panic("no return value specified for GetL1BlockByNumber") + } + + var r0 *etherman.Block + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, uint64) (*etherman.Block, error)); ok { + return rf(ctx, blockNumber) + } + if rf, ok := ret.Get(0).(func(context.Context, uint64) *etherman.Block); ok { + r0 = rf(ctx, blockNumber) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*etherman.Block) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, uint64) error); ok { + r1 = rf(ctx, blockNumber) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// EthermanFullInterface_GetL1BlockByNumber_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetL1BlockByNumber' +type EthermanFullInterface_GetL1BlockByNumber_Call struct { + *mock.Call +} + +// GetL1BlockByNumber is a helper method to define mock.On call +// - ctx context.Context +// - blockNumber uint64 +func (_e *EthermanFullInterface_Expecter) GetL1BlockByNumber(ctx interface{}, blockNumber interface{}) *EthermanFullInterface_GetL1BlockByNumber_Call { + return &EthermanFullInterface_GetL1BlockByNumber_Call{Call: _e.mock.On("GetL1BlockByNumber", ctx, blockNumber)} +} + +func (_c *EthermanFullInterface_GetL1BlockByNumber_Call) Run(run func(ctx context.Context, blockNumber uint64)) *EthermanFullInterface_GetL1BlockByNumber_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(uint64)) + }) + return _c +} + +func (_c *EthermanFullInterface_GetL1BlockByNumber_Call) Return(_a0 *etherman.Block, _a1 error) *EthermanFullInterface_GetL1BlockByNumber_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *EthermanFullInterface_GetL1BlockByNumber_Call) RunAndReturn(run func(context.Context, uint64) (*etherman.Block, error)) *EthermanFullInterface_GetL1BlockByNumber_Call { + _c.Call.Return(run) + return _c +} + // GetL1BlockUpgradeLxLy provides a mock function with given fields: ctx, genesisBlock func (_m *EthermanFullInterface) GetL1BlockUpgradeLxLy(ctx context.Context, genesisBlock uint64) (uint64, error) { ret := _m.Called(ctx, genesisBlock) diff --git a/synchronizer/mocks/etherman_interface.go b/synchronizer/mocks/etherman_interface.go index 87737f6..424e9b6 100644 --- a/synchronizer/mocks/etherman_interface.go +++ b/synchronizer/mocks/etherman_interface.go @@ -203,6 +203,65 @@ func (_c *EthermanInterface_GetForks_Call) RunAndReturn(run func(context.Context return _c } +// GetL1BlockByNumber provides a mock function with given fields: ctx, blockNumber +func (_m *EthermanInterface) GetL1BlockByNumber(ctx context.Context, blockNumber uint64) (*etherman.Block, error) { + ret := _m.Called(ctx, blockNumber) + + if len(ret) == 0 { + panic("no return value specified for GetL1BlockByNumber") + } + + var r0 *etherman.Block + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, uint64) (*etherman.Block, error)); ok { + return rf(ctx, blockNumber) + } + if rf, ok := ret.Get(0).(func(context.Context, uint64) *etherman.Block); ok { + r0 = rf(ctx, blockNumber) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*etherman.Block) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, uint64) error); ok { + r1 = rf(ctx, blockNumber) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// EthermanInterface_GetL1BlockByNumber_Call is a *mock.Call that 
shadows Run/Return methods with type explicit version for method 'GetL1BlockByNumber' +type EthermanInterface_GetL1BlockByNumber_Call struct { + *mock.Call +} + +// GetL1BlockByNumber is a helper method to define mock.On call +// - ctx context.Context +// - blockNumber uint64 +func (_e *EthermanInterface_Expecter) GetL1BlockByNumber(ctx interface{}, blockNumber interface{}) *EthermanInterface_GetL1BlockByNumber_Call { + return &EthermanInterface_GetL1BlockByNumber_Call{Call: _e.mock.On("GetL1BlockByNumber", ctx, blockNumber)} +} + +func (_c *EthermanInterface_GetL1BlockByNumber_Call) Run(run func(ctx context.Context, blockNumber uint64)) *EthermanInterface_GetL1BlockByNumber_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(uint64)) + }) + return _c +} + +func (_c *EthermanInterface_GetL1BlockByNumber_Call) Return(_a0 *etherman.Block, _a1 error) *EthermanInterface_GetL1BlockByNumber_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *EthermanInterface_GetL1BlockByNumber_Call) RunAndReturn(run func(context.Context, uint64) (*etherman.Block, error)) *EthermanInterface_GetL1BlockByNumber_Call { + _c.Call.Return(run) + return _c +} + // GetL1BlockUpgradeLxLy provides a mock function with given fields: ctx, genesisBlock func (_m *EthermanInterface) GetL1BlockUpgradeLxLy(ctx context.Context, genesisBlock *uint64) (uint64, error) { ret := _m.Called(ctx, genesisBlock) diff --git a/synchronizer/synchronizer_impl.go b/synchronizer/synchronizer_impl.go index 0ac44ac..08bc5de 100644 --- a/synchronizer/synchronizer_impl.go +++ b/synchronizer/synchronizer_impl.go @@ -7,10 +7,12 @@ import ( "github.com/0xPolygonHermez/zkevm-synchronizer-l1/log" "github.com/0xPolygonHermez/zkevm-synchronizer-l1/state/entities" + "github.com/0xPolygonHermez/zkevm-synchronizer-l1/state/model" "github.com/0xPolygonHermez/zkevm-synchronizer-l1/synchronizer/actions/elderberry" "github.com/0xPolygonHermez/zkevm-synchronizer-l1/synchronizer/actions/etrog" "github.com/0xPolygonHermez/zkevm-synchronizer-l1/synchronizer/actions/incaberry" "github.com/0xPolygonHermez/zkevm-synchronizer-l1/synchronizer/actions/processor_manager" + "github.com/0xPolygonHermez/zkevm-synchronizer-l1/synchronizer/common" "github.com/0xPolygonHermez/zkevm-synchronizer-l1/synchronizer/common/syncinterfaces" syncconfig "github.com/0xPolygonHermez/zkevm-synchronizer-l1/synchronizer/config" "github.com/0xPolygonHermez/zkevm-synchronizer-l1/synchronizer/l1_check_block" @@ -87,9 +89,12 @@ func NewSynchronizerImpl( sync.l1Sync = l1sync.NewL1SequentialSync(blocksRetriever, ethMan, state, sync.blockRangeProcessor, reorgManager, l1sync.L1SequentialSyncConfig{ - SyncChunkSize: cfg.SyncChunkSize, - GenesisBlockNumber: sync.genBlockNumber, + SyncChunkSize: cfg.SyncChunkSize, + GenesisBlockNumber: sync.genBlockNumber, + AllowEmptyBlocksAsCheckPoints: true, }) + + state.AddOnReorgCallback(sync.OnReorgExecuted) return &sync, nil } @@ -106,6 +111,10 @@ func (s *SynchronizerImpl) SetCallbackOnReorgDone(callback func(newFirstL1BlockN log.Fatal("Not implemented") } +func (s *SynchronizerImpl) OnReorgExecuted(reorg model.ReorgExecutionResult) { + log.Infof("Reorg executed! %s", reorg.String()) +} + // Sync function will read the last state synced and will continue from that point. 
// Sync() will read blockchain events to detect rollup updates func (s *SynchronizerImpl) Sync(returnOnSync bool) error { @@ -141,9 +150,15 @@ func (s *SynchronizerImpl) Sync(returnOnSync bool) error { //Sync L1Blocks var isSynced bool - //if lastBlockSynced, isSynced, err = s.syncBlocks(lastBlockSynced); err != nil { if lastBlockSynced, isSynced, err = s.l1Sync.SyncBlocksSequential(s.ctx, lastBlockSynced); err != nil { log.Warnf("networkID: %d, error syncing blocks: %v", s.networkID, err) + + err = s.executeReorgIfNeeded(common.CastReorgError(err)) + if err != nil { + log.Errorf("networkID: %d, error resetting the state to a previous block. Error: %v", s.networkID, err) + continue + } + lastBlockSynced, err = s.storage.GetLastBlock(s.ctx, nil) if err != nil { log.Fatalf("networkID: %d, error getting lastBlockSynced to resume the synchronization... Error: ", s.networkID, err) @@ -152,48 +167,59 @@ func (s *SynchronizerImpl) Sync(returnOnSync bool) error { continue } } - if !s.synced { - // Check latest Block - header, err := s.etherMan.HeaderByNumber(s.ctx, nil) - if err != nil { - log.Warnf("networkID: %d, error getting latest block from. Error: %s", s.networkID, err.Error()) - continue - } - lastKnownBlock := header.Number.Uint64() - log.Debugf("NetworkID: %d, lastBlockSynced: %d, lastKnownBlock: %d", s.networkID, lastBlockSynced.BlockNumber, lastKnownBlock) - if isSynced && !s.synced { - log.Infof("NetworkID %d Synced! lastL1Block: %d lastBlockSynced:%d ", s.networkID, lastKnownBlock, lastBlockSynced.BlockNumber) - waitDuration = s.cfg.SyncInterval.Duration - s.synced = true - if returnOnSync { - log.Infof("NetworkID: %d, Synchronization finished, returning because returnOnSync=true", s.networkID) - return nil - } + s.setSyncedStatus(isSynced) + if s.synced { + log.Infof("NetworkID %d Synced! lastBlockSynced:%d ", s.networkID, lastBlockSynced.BlockNumber) + if returnOnSync { + log.Infof("NetworkID: %d, Synchronization finished, returning because returnOnSync=true", s.networkID) + return nil } - if lastBlockSynced.BlockNumber > lastKnownBlock { - if s.networkID == 0 { - log.Fatalf("networkID: %d, error: latest Synced BlockNumber (%d) is higher than the latest Proposed block (%d) in the network", s.networkID, lastBlockSynced.BlockNumber, lastKnownBlock) - } else { - log.Errorf("networkID: %d, error: latest Synced BlockNumber (%d) is higher than the latest Proposed block (%d) in the network", s.networkID, lastBlockSynced.BlockNumber, lastKnownBlock) - err = s.resetState(lastKnownBlock) - if err != nil { - log.Errorf("networkID: %d, error resetting the state to a previous block. Error: %v", s.networkID, err) - continue - } - } - } - } else { - s.synced = isSynced + waitDuration = s.cfg.SyncInterval.Duration } } } } +func (s *SynchronizerImpl) setSyncedStatus(synced bool) { + s.synced = synced +} + // Stop function stops the synchronizer func (s *SynchronizerImpl) Stop() { s.cancelCtx() } +func (s *SynchronizerImpl) executeReorgIfNeeded(reorgError *common.ReorgError) error { + if reorgError == nil { + return nil + } + dbTx, err := s.state.BeginTransaction(s.ctx) + if err != nil { + log.Errorf("networkID: %d, error starting a db transaction to execute reorg. Error: %v", s.networkID, err) + return err + } + + req := model.ReorgRequest{ + FirstL1BlockNumberToKeep: reorgError.BlockNumber - 1, // Previous block to last bad block + ReasonError: reorgError.Err, + } + + result := s.state.ExecuteReorg(s.ctx, req, dbTx) + if !result.IsSuccess() { + log.Errorf("networkID: %d, error executing reorg. 
Error: %v", s.networkID, result.ExecutionError) + // I don't care about result of Rollback + _ = dbTx.Rollback(s.ctx) + return result.ExecutionError + } + errCommit := dbTx.Commit(s.ctx) + if errCommit != nil { + log.Errorf("networkID: %d, error committing reorg. Error: %v", s.networkID, errCommit) + return errCommit + } + return nil + +} +/* // This function allows reset the state until an specific ethereum block func (s *SynchronizerImpl) resetState(blockNumber uint64) error { log.Infof("NetworkID: %d. Reverting synchronization to block: %d", s.networkID, blockNumber) @@ -228,3 +254,4 @@ func (s *SynchronizerImpl) resetState(blockNumber uint64) error { return nil } +*/