Skip to content

Commit

Permalink
checking reorg and executing complete. Missing mark check blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
joanestebanr committed May 9, 2024
1 parent f28bf7f commit 703ebff
Show file tree
Hide file tree
Showing 24 changed files with 766 additions and 122 deletions.
2 changes: 1 addition & 1 deletion cmd/run/run_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ func RunCmd(cliCtx *cli.Context) error {
log.Error("Error creating synchronizer", err)
return err
}
return sync.Sync(true)
return sync.Sync(false)
}
20 changes: 19 additions & 1 deletion etherman/etherman.go
Original file line number Diff line number Diff line change
Expand Up @@ -976,6 +976,24 @@ func (etherMan *Client) updateL1InfoTreeEvent(ctx context.Context, vLog types.Lo
(*blocksOrder)[block.BlockHash] = append((*blocksOrder)[block.BlockHash], order)
return nil
}

func (etherMan *Client) GetL1BlockByNumber(ctx context.Context, blockNumber uint64) (*Block, error) {
ethBlock, err := etherMan.EthClient.BlockByNumber(ctx, new(big.Int).SetUint64(blockNumber))
if err != nil {
return nil, err
}
t := time.Unix(int64(ethBlock.Time()), 0)

//block := prepareBlock(vLog, t, fullBlock)
block := Block{
BlockNumber: ethBlock.NumberU64(),
BlockHash: ethBlock.Hash(),
ParentHash: ethBlock.ParentHash(),
ReceivedAt: t,
}
return &block, nil
}

func (etherMan *Client) retrieveFullBlockbyHash(ctx context.Context, blockHash common.Hash) (*Block, error) {
var err error
var fullBlock *types.Block
Expand Down Expand Up @@ -1006,7 +1024,7 @@ func (etherMan *Client) retrieveFullBlockbyHash(ctx context.Context, blockHash c
//block := prepareBlock(vLog, t, fullBlock)
block := Block{
BlockNumber: fullBlock.NumberU64(),
BlockHash: blockHash,
BlockHash: fullBlock.Hash(),
ParentHash: fullBlock.ParentHash(),
ReceivedAt: t,
}
Expand Down
5 changes: 4 additions & 1 deletion state/entities/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ import (
"context"
)

type TxCallbackType func(Tx, error)

type Tx interface {
Commit(ctx context.Context) error
Rollback(ctx context.Context) error

AddRollbackCallback(func())
AddRollbackCallback(TxCallbackType)
AddCommitCallback(TxCallbackType)
}
4 changes: 4 additions & 0 deletions state/model/l1infotree_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ func HashLeaf(leaf *L1InfoTreeLeaf) common.Hash {
return l1infotree.HashLeafData(leaf.GlobalExitRoot, leaf.PreviousBlockHash, timestamp)
}

func (s *L1InfoTreeState) OnReorg(reorg ReorgExecutionResult) {
log.Infof("Reorg: clean cache L1InfoTree")
s.l1InfoTree = nil
}
func (s *L1InfoTreeState) BuildL1InfoTreeCacheIfNeed(ctx context.Context, dbTx stateTxType) error {
if s.l1InfoTree != nil {
return nil
Expand Down
98 changes: 98 additions & 0 deletions state/model/reorg_state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package model

import (
"context"
"fmt"
"sync"
"time"
)

// ReorgRequest is a struct that contains the information needed to execute a reorg
type ReorgRequest struct {
FirstL1BlockNumberToKeep uint64
ReasonError error
}

func (r *ReorgRequest) String() string {
return fmt.Sprintf("FirstL1BlockNumberToKeep: %d, ReasonError: %s", r.FirstL1BlockNumberToKeep, r.ReasonError)
}

// ReorgExecutionResult is a struct that contains the information of the reorg execution
type ReorgExecutionResult struct {
Request ReorgRequest
ExecutionCounter uint64 // Number of reorg in this execution (is not a global unique ID!!!!)
ExecutionError error
ExecutionTime time.Time
ExecutionDuration time.Duration
}

func (r ReorgExecutionResult) IsSuccess() bool {
return r.ExecutionError == nil
}

func (r *ReorgExecutionResult) String() string {
return fmt.Sprintf("Request: %s, ExecutionCounter: %d, ExecutionError: %v, ExecutionTime: %s, ExecutionDuration: %s",
r.Request.String(), r.ExecutionCounter, r.ExecutionError, r.ExecutionTime.String(), r.ExecutionDuration.String())
}

type ReorgCallbackType = func(ReorgExecutionResult)

type StorageReorgInterface interface {
ResetToL1BlockNumber(ctx context.Context, firstBlockNumberToKeep uint64, dbTx storageTxType) error
}

type ReorgState struct {
mutex sync.Mutex
storage StorageReorgInterface
onReorgCallbacks []ReorgCallbackType
lastReorgResult *ReorgExecutionResult
}

func NewReorgState(storage StorageReorgInterface) *ReorgState {
return &ReorgState{
storage: storage,
}
}

func (s *ReorgState) AddOnReorgCallback(f ReorgCallbackType) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.onReorgCallbacks = append(s.onReorgCallbacks, f)
}

func (s *ReorgState) ExecuteReorg(ctx context.Context, reorgRequest ReorgRequest, dbTx storageTxType) ReorgExecutionResult {
startTime := time.Now()
err := s.storage.ResetToL1BlockNumber(ctx, reorgRequest.FirstL1BlockNumberToKeep, dbTx)
res := s.createNewResult(reorgRequest, err, startTime)
dbTx.AddCommitCallback(s.onTxCommit)
dbTx.AddCommitCallback(s.onTxRollback)
return res
}

func (s *ReorgState) onTxCommit(dbTx storageTxType, err error) {
if err != nil {
for _, f := range s.onReorgCallbacks {
f(*s.lastReorgResult)
}
}
}

func (s *ReorgState) onTxRollback(dbTx storageTxType, err error) {
}

func (s *ReorgState) createNewResult(reorgRequest ReorgRequest, err error, startTime time.Time) ReorgExecutionResult {
s.mutex.Lock()
defer s.mutex.Unlock()
res := ReorgExecutionResult{
Request: reorgRequest,
ExecutionCounter: 1,
ExecutionError: err,
ExecutionTime: startTime,
ExecutionDuration: time.Since(startTime),
}
if s.lastReorgResult != nil {
res.ExecutionCounter = s.lastReorgResult.ExecutionCounter + 1
}
s.lastReorgResult = &res
return res
}
4 changes: 4 additions & 0 deletions state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type State struct {
*model.ForkIdState
*model.L1InfoTreeState
*model.BatchState
*model.ReorgState
storage.BlockStorer
}

Expand All @@ -19,7 +20,10 @@ func NewState(storageImpl storage.Storer) *State {
model.NewForkIdState(storageImpl),
model.NewL1InfoTreeManager(storageImpl),
model.NewBatchState(storageImpl),
model.NewReorgState(storageImpl),
storageImpl,
}
// Connect cache invalidation on Reorg
res.ReorgState.AddOnReorgCallback(res.L1InfoTreeState.OnReorg)
return res
}
5 changes: 5 additions & 0 deletions state/storage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ type virtualBatchStorer interface {
GetVirtualBatchByBatchNumber(ctx context.Context, batchNumber uint64, dbTx storageTxType) (*VirtualBatch, error)
}

type reorgStorer interface {
ResetToL1BlockNumber(ctx context.Context, firstBlockNumberToKeep uint64, dbTx storageTxType) error
}

type txStorer interface {
BeginTransaction(ctx context.Context) (storageTxType, error)
}
Expand All @@ -59,4 +63,5 @@ type Storer interface {
l1infoTreeStorer
virtualBatchStorer
sequencedBatchStorer
reorgStorer
}
75 changes: 75 additions & 0 deletions state/storage/pgstorage/kv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package pgstorage

import (
"context"
"encoding/json"
)

func (p *PostgresStorage) KVSetString(ctx context.Context, key string, value string, dbTx dbTxType) error {
e := p.getExecQuerier(getPgTx(dbTx))
const setSQL = "INSERT INTO sync.kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value = $2"
if _, err := e.Exec(ctx, setSQL, key, value); err != nil {
return err
}
return nil
}

func (p *PostgresStorage) KVSetJson(ctx context.Context, key string, value interface{}, dbTx dbTxType) error {
jsonValue, err := json.Marshal(value)
if err != nil {
return err
}
return p.KVSetString(ctx, key, string(jsonValue), dbTx)
}

func (p *PostgresStorage) KVSetUint64(ctx context.Context, key string, value uint64, dbTx dbTxType) error {
jsonValue, err := json.Marshal(value)
if err != nil {
return err
}
return p.KVSetString(ctx, key, string(jsonValue), dbTx)
}

func (p *PostgresStorage) KVGetString(ctx context.Context, key string, dbTx dbTxType) (string, error) {
e := p.getExecQuerier(getPgTx(dbTx))
const getSQL = "SELECT value FROM sync.kv WHERE key = $1"
row := e.QueryRow(ctx, getSQL, key)
var value string
err := row.Scan(&value)
err = translatePgxError(err, "KVGetString")
if err != nil {
return "", err
}
return value, nil
}

func (p *PostgresStorage) KVGetJson(ctx context.Context, key string, value interface{}, dbTx dbTxType) error {
valueStr, err := p.KVGetString(ctx, key, dbTx)
if err != nil {
return err
}
return json.Unmarshal([]byte(valueStr), value)
}

func (p *PostgresStorage) KVGetUint64(ctx context.Context, key string, dbTx dbTxType) (uint64, error) {
valueStr, err := p.KVGetString(ctx, key, dbTx)
if err != nil {
return 0, err
}
value := uint64(0)
err = json.Unmarshal([]byte(valueStr), &value)
return value, err
}

func (p *PostgresStorage) KVExists(ctx context.Context, key string, dbTx dbTxType) (bool, error) {
e := p.getExecQuerier(getPgTx(dbTx))
const existsSQL = "SELECT EXISTS(SELECT 1 FROM sync.kv WHERE key = $1)"
row := e.QueryRow(ctx, existsSQL, key)
var exists bool
err := row.Scan(&exists)
err = translatePgxError(err, "KVExists")
if err != nil {
return false, err
}
return exists, nil
}
88 changes: 88 additions & 0 deletions state/storage/pgstorage/kv_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package pgstorage_test

import (
"context"
"testing"

"github.com/0xPolygonHermez/zkevm-synchronizer-l1/state/storage/pgstorage"
"github.com/stretchr/testify/require"
)

const (
testKey = "fake_key_used_for_unittest"
)

func TestKVSet(t *testing.T) {
skipDatabaseTestIfNeeded(t)
ctx := context.TODO()
dbConfig := getStorageConfig()
err := pgstorage.ResetDB(dbConfig)
require.NoError(t, err)
storage, err := pgstorage.NewPostgresStorage(dbConfig)
require.NoError(t, err)
dbTx, err := storage.BeginTransaction(ctx)
require.NoError(t, err)
exists, err := storage.KVExists(ctx, testKey, dbTx)
require.NoError(t, err)
require.False(t, exists)
err = storage.KVSetString(ctx, testKey, "fake_value", dbTx)
require.NoError(t, err)
exists, err = storage.KVExists(ctx, testKey, dbTx)
require.NoError(t, err)
require.True(t, exists)
value, err := storage.KVGetString(ctx, testKey, dbTx)
require.NoError(t, err)
require.Equal(t, "fake_value", value)
dbTx.Commit(ctx)

Check failure on line 36 in state/storage/pgstorage/kv_test.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `dbTx.Commit` is not checked (errcheck)
}

type kvTestStruct struct {
A int
B string
}

func TestKVJson(t *testing.T) {
skipDatabaseTestIfNeeded(t)
ctx := context.TODO()
dbConfig := getStorageConfig()
err := pgstorage.ResetDB(dbConfig)
require.NoError(t, err)
storage, err := pgstorage.NewPostgresStorage(dbConfig)
require.NoError(t, err)
dbTx, err := storage.BeginTransaction(ctx)
require.NoError(t, err)
data := kvTestStruct{A: 1, B: "test"}
err = storage.KVSetJson(ctx, testKey, data, dbTx)
require.NoError(t, err)
var dataRead kvTestStruct
err = storage.KVGetJson(ctx, testKey, &dataRead, dbTx)
require.NoError(t, err)
require.Equal(t, data, dataRead)

dbTx.Commit(ctx)
}

func TestKVUint64(t *testing.T) {
skipDatabaseTestIfNeeded(t)
ctx := context.TODO()
dbConfig := getStorageConfig()
err := pgstorage.ResetDB(dbConfig)
require.NoError(t, err)
storage, err := pgstorage.NewPostgresStorage(dbConfig)
require.NoError(t, err)
dbTx, err := storage.BeginTransaction(ctx)
require.NoError(t, err)
data := uint64(1234)
err = storage.KVSetUint64(ctx, testKey, data, dbTx)
require.NoError(t, err)
dataRead, err := storage.KVGetUint64(ctx, testKey, dbTx)
require.NoError(t, err)
require.Equal(t, data, dataRead)

err = storage.KVSetString(ctx, testKey, "not a number", dbTx)
require.NoError(t, err)
_, err = storage.KVGetUint64(ctx, testKey, dbTx)
require.Error(t, err)

dbTx.Commit(ctx)
}
10 changes: 10 additions & 0 deletions state/storage/pgstorage/migrations/0004.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- +migrate Up
CREATE TABLE IF NOT EXISTS sync.kv (
key VARCHAR(256) PRIMARY KEY,
value VARCHAR
);


-- +migrate Down

DROP TABLE IF EXISTS sync.kv;
10 changes: 7 additions & 3 deletions state/storage/pgstorage/reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ package pgstorage

import (
"context"
"log"
)

func (p *PostgresStorage) Reset(ctx context.Context, blockNumber uint64, dbTx dbTxType) error {
log.Fatal("Not implemented")
// ResetToL1BlockNumber resets the state to a block for the given DB tx
func (p *PostgresStorage) ResetToL1BlockNumber(ctx context.Context, firstBlockNumberToKeep uint64, dbTx dbTxType) error {
e := p.getExecQuerier(getPgTx(dbTx))
const resetSQL = "DELETE FROM sync.block WHERE block_num > $1"
if _, err := e.Exec(ctx, resetSQL, firstBlockNumberToKeep); err != nil {
return err
}
return nil
}
Loading

0 comments on commit 703ebff

Please sign in to comment.