From d1534fcca76ccaf67b77b887f9fb6f4fd3d7beee Mon Sep 17 00:00:00 2001 From: Roman Behma <13855864+begmaroman@users.noreply.github.com> Date: Tue, 16 Jul 2024 09:34:56 +0100 Subject: [PATCH] Reporting offchain data gap detection (#102) --- .mockery.yaml | 6 ++ cmd/main.go | 2 +- db/db.go | 143 +++++++++++++++++++++---------- db/db_test.go | 96 +++++++++++++++++++++ db/migrations/0005.sql | 3 + mocks/db.generated.go | 58 +++++++++++++ mocks/gaps_detector.generated.go | 79 +++++++++++++++++ services/status/status.go | 27 ++++-- services/status/status_test.go | 7 +- synchronizer/batches.go | 88 ++++++++++++++++--- synchronizer/batches_test.go | 73 +++++++++++++++- synchronizer/store.go | 21 +++-- synchronizer/store_test.go | 94 ++++++++++++++++++++ types/types.go | 9 +- 14 files changed, 625 insertions(+), 81 deletions(-) create mode 100644 mocks/gaps_detector.generated.go diff --git a/.mockery.yaml b/.mockery.yaml index 0256cef9..bb55d443 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -36,3 +36,9 @@ packages: SequencerTracker: config: filename: sequencer_tracker.generated.go + github.com/0xPolygon/cdk-data-availability/services/status: + config: + interfaces: + GapsDetector: + config: + filename: gaps_detector.generated.go diff --git a/cmd/main.go b/cmd/main.go index bfc6032b..0923e214 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -151,7 +151,7 @@ func start(cliCtx *cli.Context) error { []rpc.Service{ { Name: status.APISTATUS, - Service: status.NewEndpoints(storage), + Service: status.NewEndpoints(storage, batchSynchronizer), }, { Name: sync.APISYNC, diff --git a/db/db.go b/db/db.go index d1353691..683792f7 100644 --- a/db/db.go +++ b/db/db.go @@ -11,6 +11,75 @@ import ( "github.com/jmoiron/sqlx" ) +const ( + // storeLastProcessedBlockSQL is a query that stores the last processed block for a given task + storeLastProcessedBlockSQL = ` + INSERT INTO data_node.sync_tasks (task, block) + VALUES ($1, $2) + ON CONFLICT (task) DO UPDATE + SET block = EXCLUDED.block, 
processed = NOW();` + + // getLastProcessedBlockSQL is a query that returns the last processed block for a given task + getLastProcessedBlockSQL = `SELECT block FROM data_node.sync_tasks WHERE task = $1;` + + // storeUnresolvedBatchesSQL is a query that stores unresolved batches in the database + storeUnresolvedBatchesSQL = ` + INSERT INTO data_node.unresolved_batches (num, hash) + VALUES ($1, $2) + ON CONFLICT (num, hash) DO NOTHING; + ` + + // getUnresolvedBatchKeysSQL is a query that returns the unresolved batch keys from the database + getUnresolvedBatchKeysSQL = `SELECT num, hash FROM data_node.unresolved_batches LIMIT $1;` + + // deleteUnresolvedBatchKeysSQL is a query that deletes the unresolved batch keys from the database + deleteUnresolvedBatchKeysSQL = `DELETE FROM data_node.unresolved_batches WHERE num = $1 AND hash = $2;` + + // storeOffChainDataSQL is a query that stores offchain data in the database + storeOffChainDataSQL = ` + INSERT INTO data_node.offchain_data (key, value, batch_num) + VALUES ($1, $2, $3) + ON CONFLICT (key) DO UPDATE + SET value = EXCLUDED.value, batch_num = EXCLUDED.batch_num; + ` + + // getOffchainDataSQL is a query that returns the offchain data for a given key + getOffchainDataSQL = ` + SELECT key, value, batch_num + FROM data_node.offchain_data + WHERE key = $1 LIMIT 1; + ` + + // listOffchainDataSQL is a query that returns the offchain data for a given list of keys + listOffchainDataSQL = ` + SELECT key, value, batch_num + FROM data_node.offchain_data + WHERE key IN (?); + ` + + // countOffchainDataSQL is a query that returns the count of rows in the offchain_data table + countOffchainDataSQL = "SELECT COUNT(*) FROM data_node.offchain_data;" + + // selectOffchainDataGapsSQL is a query that returns the gaps in the offchain_data table + selectOffchainDataGapsSQL = ` + WITH numbered_batches AS ( + SELECT + batch_num, + ROW_NUMBER() OVER (ORDER BY batch_num) AS row_number + FROM data_node.offchain_data + ) + SELECT + 
nb1.batch_num AS current_batch_num, + nb2.batch_num AS next_batch_num + FROM + numbered_batches nb1 + LEFT JOIN numbered_batches nb2 ON nb1.row_number = nb2.row_number - 1 + WHERE + nb1.batch_num IS NOT NULL + AND nb2.batch_num IS NOT NULL + AND nb1.batch_num + 1 <> nb2.batch_num;` +) + var ( // ErrStateNotSynchronized indicates the state database may be empty ErrStateNotSynchronized = errors.New("state not synchronized") @@ -28,8 +97,8 @@ type DB interface { GetOffChainData(ctx context.Context, key common.Hash) (*types.OffChainData, error) ListOffChainData(ctx context.Context, keys []common.Hash) ([]types.OffChainData, error) StoreOffChainData(ctx context.Context, od []types.OffChainData) error - CountOffchainData(ctx context.Context) (uint64, error) + DetectOffchainDataGaps(ctx context.Context) (map[uint64]uint64, error) } // DB is the database layer of the data node @@ -46,13 +115,6 @@ func New(pg *sqlx.DB) DB { // StoreLastProcessedBlock stores a record of a block processed by the synchronizer for named task func (db *pgDB) StoreLastProcessedBlock(ctx context.Context, block uint64, task string) error { - const storeLastProcessedBlockSQL = ` - INSERT INTO data_node.sync_tasks (task, block) - VALUES ($1, $2) - ON CONFLICT (task) DO UPDATE - SET block = EXCLUDED.block, processed = NOW(); - ` - if _, err := db.pg.ExecContext(ctx, storeLastProcessedBlockSQL, task, block); err != nil { return err } @@ -62,8 +124,6 @@ func (db *pgDB) StoreLastProcessedBlock(ctx context.Context, block uint64, task // GetLastProcessedBlock returns the latest block successfully processed by the synchronizer for named task func (db *pgDB) GetLastProcessedBlock(ctx context.Context, task string) (uint64, error) { - const getLastProcessedBlockSQL = "SELECT block FROM data_node.sync_tasks WHERE task = $1;" - var ( lastBlock uint64 ) @@ -77,12 +137,6 @@ func (db *pgDB) GetLastProcessedBlock(ctx context.Context, task string) (uint64, // StoreUnresolvedBatchKeys stores unresolved batch keys in 
the database func (db *pgDB) StoreUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey) error { - const storeUnresolvedBatchesSQL = ` - INSERT INTO data_node.unresolved_batches (num, hash) - VALUES ($1, $2) - ON CONFLICT (num, hash) DO NOTHING; - ` - tx, err := db.pg.BeginTxx(ctx, nil) if err != nil { return err @@ -107,8 +161,6 @@ func (db *pgDB) StoreUnresolvedBatchKeys(ctx context.Context, bks []types.BatchK // GetUnresolvedBatchKeys returns the unresolved batch keys from the database func (db *pgDB) GetUnresolvedBatchKeys(ctx context.Context, limit uint) ([]types.BatchKey, error) { - const getUnresolvedBatchKeysSQL = "SELECT num, hash FROM data_node.unresolved_batches LIMIT $1;" - rows, err := db.pg.QueryxContext(ctx, getUnresolvedBatchKeysSQL, limit) if err != nil { return nil, err @@ -137,11 +189,6 @@ func (db *pgDB) GetUnresolvedBatchKeys(ctx context.Context, limit uint) ([]types // DeleteUnresolvedBatchKeys deletes the unresolved batch keys from the database func (db *pgDB) DeleteUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey) error { - const deleteUnresolvedBatchKeysSQL = ` - DELETE FROM data_node.unresolved_batches - WHERE num = $1 AND hash = $2; - ` - tx, err := db.pg.BeginTxx(ctx, nil) if err != nil { return err @@ -166,13 +213,6 @@ func (db *pgDB) DeleteUnresolvedBatchKeys(ctx context.Context, bks []types.Batch // StoreOffChainData stores and array of key values in the Db func (db *pgDB) StoreOffChainData(ctx context.Context, od []types.OffChainData) error { - const storeOffChainDataSQL = ` - INSERT INTO data_node.offchain_data (key, value, batch_num) - VALUES ($1, $2, $3) - ON CONFLICT (key) DO UPDATE - SET value = EXCLUDED.value, batch_num = EXCLUDED.batch_num; - ` - tx, err := db.pg.BeginTxx(ctx, nil) if err != nil { return err @@ -198,12 +238,6 @@ func (db *pgDB) StoreOffChainData(ctx context.Context, od []types.OffChainData) // GetOffChainData returns the value identified by the key func (db *pgDB) GetOffChainData(ctx 
context.Context, key common.Hash) (*types.OffChainData, error) { - const getOffchainDataSQL = ` - SELECT key, value, batch_num - FROM data_node.offchain_data - WHERE key = $1 LIMIT 1; - ` - data := struct { Key string `db:"key"` Value string `db:"value"` @@ -231,12 +265,6 @@ func (db *pgDB) ListOffChainData(ctx context.Context, keys []common.Hash) ([]typ return nil, nil } - const listOffchainDataSQL = ` - SELECT key, value, batch_num - FROM data_node.offchain_data - WHERE key IN (?); - ` - preparedKeys := make([]string, len(keys)) for i, key := range keys { preparedKeys[i] = key.Hex() @@ -280,12 +308,37 @@ func (db *pgDB) ListOffChainData(ctx context.Context, keys []common.Hash) ([]typ // CountOffchainData returns the count of rows in the offchain_data table func (db *pgDB) CountOffchainData(ctx context.Context) (uint64, error) { - const countQuery = "SELECT COUNT(*) FROM data_node.offchain_data;" - var count uint64 - if err := db.pg.QueryRowContext(ctx, countQuery).Scan(&count); err != nil { + if err := db.pg.QueryRowContext(ctx, countOffchainDataSQL).Scan(&count); err != nil { return 0, err } return count, nil } + +// DetectOffchainDataGaps returns the number of gaps in the offchain_data table +func (db *pgDB) DetectOffchainDataGaps(ctx context.Context) (map[uint64]uint64, error) { + rows, err := db.pg.QueryxContext(ctx, selectOffchainDataGapsSQL) + if err != nil { + return nil, err + } + + defer rows.Close() + + type row struct { + CurrentBatchNum uint64 `db:"current_batch_num"` + NextBatchNum uint64 `db:"next_batch_num"` + } + + gaps := make(map[uint64]uint64) + for rows.Next() { + var data row + if err = rows.StructScan(&data); err != nil { + return nil, err + } + + gaps[data.CurrentBatchNum] = data.NextBatchNum + } + + return gaps, nil +} diff --git a/db/db_test.go b/db/db_test.go index 413cdbf5..6a186985 100644 --- a/db/db_test.go +++ b/db/db_test.go @@ -726,6 +726,102 @@ func Test_DB_CountOffchainData(t *testing.T) { } } +func 
Test_DB_DetectOffchainDataGaps(t *testing.T) { + t.Parallel() + + testTable := []struct { + name string + seed []types.OffChainData + gaps map[uint64]uint64 + returnErr error + }{ + { + name: "one gap found", + seed: []types.OffChainData{{ + Key: common.HexToHash("key1"), + Value: []byte("value1"), + BatchNum: 1, + }, { + Key: common.HexToHash("key2"), + Value: []byte("value2"), + BatchNum: 2, + }, { + Key: common.HexToHash("key4"), + Value: []byte("value4"), + BatchNum: 4, + }}, + gaps: map[uint64]uint64{ + 2: 4, + }, + }, + { + name: "error returned", + seed: []types.OffChainData{{ + Key: common.HexToHash("key1"), + Value: []byte("value1"), + }}, + returnErr: errors.New("test error"), + }, + } + + for _, tt := range testTable { + tt := tt + + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + db, mock, err := sqlmock.New() + require.NoError(t, err) + + defer db.Close() + + wdb := sqlx.NewDb(db, "postgres") + + // Seed data + seedOffchainData(t, wdb, mock, tt.seed) + + expected := mock.ExpectQuery(`WITH numbered_batches AS \( + SELECT + batch_num, + ROW_NUMBER\(\) OVER \(ORDER BY batch_num\) AS row_number + FROM data_node\.offchain_data + \) + SELECT + nb1\.batch_num AS current_batch_num, + nb2\.batch_num AS next_batch_num + FROM + numbered_batches nb1 + LEFT JOIN numbered_batches nb2 ON nb1\.row_number = nb2\.row_number - 1 + WHERE + nb1\.batch_num IS NOT NULL + AND nb2\.batch_num IS NOT NULL + AND nb1\.batch_num \+ 1 <> nb2\.batch_num`) + + if tt.returnErr != nil { + expected.WillReturnError(tt.returnErr) + } else { + rows := sqlmock.NewRows([]string{"current_batch_num", "next_batch_num"}) + for k, v := range tt.gaps { + rows.AddRow(k, v) + } + expected.WillReturnRows(rows) + } + + dbPG := New(wdb) + + actual, err := dbPG.DetectOffchainDataGaps(context.Background()) + if tt.returnErr != nil { + require.ErrorIs(t, err, tt.returnErr) + } else { + require.NoError(t, err) + require.Equal(t, tt.gaps, actual) + } + + require.NoError(t, mock.ExpectationsWereMet()) 
+ }) + } +} + func seedOffchainData(t *testing.T, db *sqlx.DB, mock sqlmock.Sqlmock, od []types.OffChainData) { t.Helper() diff --git a/db/migrations/0005.sql b/db/migrations/0005.sql index 6436ee02..41cbb20d 100644 --- a/db/migrations/0005.sql +++ b/db/migrations/0005.sql @@ -5,5 +5,8 @@ ALTER TABLE data_node.offchain_data DROP COLUMN IF EXISTS batch_num; ALTER TABLE data_node.offchain_data ADD COLUMN IF NOT EXISTS batch_num BIGINT NOT NULL DEFAULT 0; +-- Ensure batch_num is indexed for optimal performance +CREATE INDEX idx_batch_num ON data_node.offchain_data(batch_num); + -- It triggers resync with an updated logic of all batches UPDATE data_node.sync_tasks SET block = 0 WHERE task = 'L1'; \ No newline at end of file diff --git a/mocks/db.generated.go b/mocks/db.generated.go index cf708af2..d21533c9 100644 --- a/mocks/db.generated.go +++ b/mocks/db.generated.go @@ -128,6 +128,64 @@ func (_c *DB_DeleteUnresolvedBatchKeys_Call) RunAndReturn(run func(context.Conte return _c } +// DetectOffchainDataGaps provides a mock function with given fields: ctx +func (_m *DB) DetectOffchainDataGaps(ctx context.Context) (map[uint64]uint64, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for DetectOffchainDataGaps") + } + + var r0 map[uint64]uint64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (map[uint64]uint64, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) map[uint64]uint64); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[uint64]uint64) + } + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// DB_DetectOffchainDataGaps_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DetectOffchainDataGaps' +type DB_DetectOffchainDataGaps_Call struct { + *mock.Call +} + +// DetectOffchainDataGaps is a helper method to define mock.On 
call +// - ctx context.Context +func (_e *DB_Expecter) DetectOffchainDataGaps(ctx interface{}) *DB_DetectOffchainDataGaps_Call { + return &DB_DetectOffchainDataGaps_Call{Call: _e.mock.On("DetectOffchainDataGaps", ctx)} +} + +func (_c *DB_DetectOffchainDataGaps_Call) Run(run func(ctx context.Context)) *DB_DetectOffchainDataGaps_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *DB_DetectOffchainDataGaps_Call) Return(_a0 map[uint64]uint64, _a1 error) *DB_DetectOffchainDataGaps_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *DB_DetectOffchainDataGaps_Call) RunAndReturn(run func(context.Context) (map[uint64]uint64, error)) *DB_DetectOffchainDataGaps_Call { + _c.Call.Return(run) + return _c +} + // GetLastProcessedBlock provides a mock function with given fields: ctx, task func (_m *DB) GetLastProcessedBlock(ctx context.Context, task string) (uint64, error) { ret := _m.Called(ctx, task) diff --git a/mocks/gaps_detector.generated.go b/mocks/gaps_detector.generated.go new file mode 100644 index 00000000..a3f3f982 --- /dev/null +++ b/mocks/gaps_detector.generated.go @@ -0,0 +1,79 @@ +// Code generated by mockery v2.40.1. DO NOT EDIT. 
+ +package mocks + +import mock "github.com/stretchr/testify/mock" + +// GapsDetector is an autogenerated mock type for the GapsDetector type +type GapsDetector struct { + mock.Mock +} + +type GapsDetector_Expecter struct { + mock *mock.Mock +} + +func (_m *GapsDetector) EXPECT() *GapsDetector_Expecter { + return &GapsDetector_Expecter{mock: &_m.Mock} +} + +// Gaps provides a mock function with given fields: +func (_m *GapsDetector) Gaps() map[uint64]uint64 { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Gaps") + } + + var r0 map[uint64]uint64 + if rf, ok := ret.Get(0).(func() map[uint64]uint64); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[uint64]uint64) + } + } + + return r0 +} + +// GapsDetector_Gaps_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Gaps' +type GapsDetector_Gaps_Call struct { + *mock.Call +} + +// Gaps is a helper method to define mock.On call +func (_e *GapsDetector_Expecter) Gaps() *GapsDetector_Gaps_Call { + return &GapsDetector_Gaps_Call{Call: _e.mock.On("Gaps")} +} + +func (_c *GapsDetector_Gaps_Call) Run(run func()) *GapsDetector_Gaps_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *GapsDetector_Gaps_Call) Return(_a0 map[uint64]uint64) *GapsDetector_Gaps_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *GapsDetector_Gaps_Call) RunAndReturn(run func() map[uint64]uint64) *GapsDetector_Gaps_Call { + _c.Call.Return(run) + return _c +} + +// NewGapsDetector creates a new instance of GapsDetector. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. 
+func NewGapsDetector(t interface { + mock.TestingT + Cleanup(func()) +}) *GapsDetector { + mock := &GapsDetector{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/services/status/status.go b/services/status/status.go index 664e5c7c..6b3d2a2a 100644 --- a/services/status/status.go +++ b/services/status/status.go @@ -15,17 +15,25 @@ import ( // APISTATUS is the namespace of the status service const APISTATUS = "status" +// GapsDetector is an interface for detecting gaps in the offchain data +type GapsDetector interface { + // Gaps returns a map of gaps in the offchain data + Gaps() map[uint64]uint64 +} + // Endpoints contains implementations for the "status" RPC endpoints type Endpoints struct { - db db.DB - startTime time.Time + db db.DB + startTime time.Time + gapsDetector GapsDetector } // NewEndpoints returns Endpoints -func NewEndpoints(db db.DB) *Endpoints { +func NewEndpoints(db db.DB, gapsDetector GapsDetector) *Endpoints { return &Endpoints{ - db: db, - startTime: time.Now(), + db: db, + startTime: time.Now(), + gapsDetector: gapsDetector, } } @@ -45,9 +53,10 @@ func (s *Endpoints) GetStatus() (interface{}, rpc.Error) { } return types.DACStatus{ - Version: dataavailability.Version, - Uptime: uptime, - KeyCount: rowCount, - BackfillProgress: backfillProgress, + Version: dataavailability.Version, + Uptime: uptime, + KeyCount: rowCount, + BackfillProgress: backfillProgress, + OffchainDataGapsExist: len(s.gapsDetector.Gaps()) > 0, }, nil } diff --git a/services/status/status_test.go b/services/status/status_test.go index d6ccbfd1..cc9b737f 100644 --- a/services/status/status_test.go +++ b/services/status/status_test.go @@ -52,7 +52,11 @@ func TestEndpoints_GetStatus(t *testing.T) { dbMock.On("GetLastProcessedBlock", mock.Anything, mock.Anything). 
Return(tt.getLastProcessedBlock, tt.getLastProcessedBlockErr) - statusEndpoints := NewEndpoints(dbMock) + gapDetectorMock := mocks.NewGapsDetector(t) + + gapDetectorMock.On("Gaps").Return(map[uint64]uint64{1: 1}) + + statusEndpoints := NewEndpoints(dbMock, gapDetectorMock) actual, err := statusEndpoints.GetStatus() @@ -69,6 +73,7 @@ func TestEndpoints_GetStatus(t *testing.T) { require.Equal(t, "v0.1.0", dacStatus.Version) require.Equal(t, tt.countOffchainData, dacStatus.KeyCount) require.Equal(t, tt.getLastProcessedBlock, dacStatus.BackfillProgress) + require.True(t, dacStatus.OffchainDataGapsExist) } }) } diff --git a/synchronizer/batches.go b/synchronizer/batches.go index cd83a93a..c313ed8d 100644 --- a/synchronizer/batches.go +++ b/synchronizer/batches.go @@ -1,6 +1,7 @@ package synchronizer import ( + "bytes" "context" "fmt" "math/rand" @@ -32,19 +33,21 @@ type SequencerTracker interface { // BatchSynchronizer watches for number events, checks if they are // "locally" stored, then retrieves and stores missing data type BatchSynchronizer struct { - client etherman.Etherman - stop chan struct{} - retry time.Duration - rpcTimeout time.Duration - blockBatchSize uint - self common.Address - db db.DB - committee *CommitteeMapSafe - syncLock sync.Mutex - reorgs <-chan BlockReorg - events chan *polygonvalidiumetrog.PolygonvalidiumetrogSequenceBatches - sequencer SequencerTracker - rpcClientFactory client.Factory + client etherman.Etherman + stop chan struct{} + retry time.Duration + rpcTimeout time.Duration + blockBatchSize uint + self common.Address + db db.DB + committee *CommitteeMapSafe + syncLock sync.Mutex + reorgs <-chan BlockReorg + events chan *polygonvalidiumetrog.PolygonvalidiumetrogSequenceBatches + sequencer SequencerTracker + rpcClientFactory client.Factory + offchainDataGaps map[uint64]uint64 + offchainDataGapsLock sync.RWMutex } // NewBatchSynchronizer creates the BatchSynchronizer @@ -73,6 +76,7 @@ func NewBatchSynchronizer( events: make(chan 
*polygonvalidiumetrog.PolygonvalidiumetrogSequenceBatches),
 		sequencer:        sequencer,
 		rpcClientFactory: rpcClientFactory,
+		offchainDataGaps: make(map[uint64]uint64),
 	}
 	return synchronizer, synchronizer.resolveCommittee()
 }
@@ -101,6 +105,7 @@ func (bs *BatchSynchronizer) Start(ctx context.Context) {
 	go bs.processUnresolvedBatches(ctx)
 	go bs.produceEvents(ctx)
 	go bs.handleReorgs(ctx)
+	go bs.startOffchainDataGapsDetection(ctx)
 }
 
 // Stop stops the synchronizer
@@ -108,6 +113,17 @@ func (bs *BatchSynchronizer) Stop() {
 	close(bs.stop)
 }
 
+// Gaps returns the offchain data gaps
+func (bs *BatchSynchronizer) Gaps() map[uint64]uint64 {
+	bs.offchainDataGapsLock.RLock()
+	gaps := make(map[uint64]uint64, len(bs.offchainDataGaps))
+	for key, value := range bs.offchainDataGaps {
+		gaps[key] = value
+	}
+	bs.offchainDataGapsLock.RUnlock()
+	return gaps
+}
+
 func (bs *BatchSynchronizer) handleReorgs(ctx context.Context) {
 	log.Info("starting reorgs handler")
 	for {
@@ -440,3 +456,49 @@ func (bs *BatchSynchronizer) resolveWithMember(
 		BatchNum: batch.Number,
 	}, nil
 }
+
+func (bs *BatchSynchronizer) startOffchainDataGapsDetection(ctx context.Context) {
+	log.Info("starting offchain data gaps detection")
+	for {
+		delay := time.NewTimer(time.Minute)
+		select {
+		case <-delay.C:
+			if err := bs.detectOffchainDataGaps(ctx); err != nil {
+				log.Error(err)
+			}
+		case <-bs.stop:
+			return
+		}
+	}
+}
+
+// detectOffchainDataGaps detects offchain data gaps and reports them in logs and the service state. 
+func (bs *BatchSynchronizer) detectOffchainDataGaps(ctx context.Context) error { + // Detect offchain data gaps + gaps, err := detectOffchainDataGaps(ctx, bs.db) + if err != nil { + return fmt.Errorf("failed to detect offchain data gaps: %v", err) + } + + // No gaps found, all good + if len(gaps) == 0 { + return nil + } + + // Log the detected gaps and store the detected gaps in the service state + gapsRaw := new(bytes.Buffer) + bs.offchainDataGapsLock.Lock() + bs.offchainDataGaps = make(map[uint64]uint64, len(gaps)) + for key, value := range gaps { + bs.offchainDataGaps[key] = value + + if _, err = fmt.Fprintf(gapsRaw, "%d=>%d\n", key, value); err != nil { + log.Errorf("failed to write offchain data gaps: %v", err) + } + } + bs.offchainDataGapsLock.Unlock() + + log.Warnf("detected offchain data gaps (current batch number => expected batch number): %s", gapsRaw.String()) + + return nil +} diff --git a/synchronizer/batches_test.go b/synchronizer/batches_test.go index 9f179f72..bbb905a9 100644 --- a/synchronizer/batches_test.go +++ b/synchronizer/batches_test.go @@ -724,7 +724,7 @@ func TestBatchSynchronizer_HandleUnresolvedBatches(t *testing.T) { })*/ } -func TestBatchSyncronizer_HandleReorgs(t *testing.T) { +func TestBatchSynchronizer_HandleReorgs(t *testing.T) { t.Parallel() type testConfig struct { @@ -804,3 +804,74 @@ func TestBatchSyncronizer_HandleReorgs(t *testing.T) { }) }) } + +func TestBatchSynchronizer_detectOffchainDataGaps(t *testing.T) { + t.Parallel() + + type testConfig struct { + // db mock + detectOffchainDataGapsArgs []interface{} + detectOffchainDataGapsReturns []interface{} + + expectedGaps map[uint64]uint64 + isErrorExpected bool + } + + testFn := func(t *testing.T, config testConfig) { + t.Helper() + + dbMock := mocks.NewDB(t) + + if config.detectOffchainDataGapsArgs != nil && config.detectOffchainDataGapsReturns != nil { + dbMock.On("DetectOffchainDataGaps", config.detectOffchainDataGapsArgs...).Return( + 
config.detectOffchainDataGapsReturns...).Once() + } + + batchSynronizer := &BatchSynchronizer{ + db: dbMock, + } + + err := batchSynronizer.detectOffchainDataGaps(context.Background()) + if config.isErrorExpected { + require.Error(t, err) + } else { + require.NoError(t, err) + require.Equal(t, config.expectedGaps, batchSynronizer.Gaps()) + } + + dbMock.AssertExpectations(t) + } + + t.Run("no gaps detected", func(t *testing.T) { + t.Parallel() + + testFn(t, testConfig{ + detectOffchainDataGapsArgs: []interface{}{mock.Anything}, + detectOffchainDataGapsReturns: []interface{}{map[uint64]uint64{}, nil}, + expectedGaps: map[uint64]uint64{}, + isErrorExpected: false, + }) + }) + + t.Run("one gap detected", func(t *testing.T) { + t.Parallel() + + testFn(t, testConfig{ + detectOffchainDataGapsArgs: []interface{}{mock.Anything}, + detectOffchainDataGapsReturns: []interface{}{map[uint64]uint64{1: 3}, nil}, + expectedGaps: map[uint64]uint64{1: 3}, + isErrorExpected: false, + }) + }) + + t.Run("failed to detect gaps", func(t *testing.T) { + t.Parallel() + + testFn(t, testConfig{ + detectOffchainDataGapsArgs: []interface{}{mock.Anything}, + detectOffchainDataGapsReturns: []interface{}{nil, errors.New("test error")}, + expectedGaps: map[uint64]uint64{}, + isErrorExpected: true, + }) + }) +} diff --git a/synchronizer/store.go b/synchronizer/store.go index b0be7cfa..3ba53734 100644 --- a/synchronizer/store.go +++ b/synchronizer/store.go @@ -43,13 +43,6 @@ func setStartBlock(parentCtx context.Context, db dbTypes.DB, block uint64, syncT return db.StoreLastProcessedBlock(ctx, block, string(syncTask)) } -func listOffchainData(parentCtx context.Context, db dbTypes.DB, keys []common.Hash) ([]types.OffChainData, error) { - ctx, cancel := context.WithTimeout(parentCtx, dbTimeout) - defer cancel() - - return db.ListOffChainData(ctx, keys) -} - func storeUnresolvedBatchKeys(parentCtx context.Context, db dbTypes.DB, keys []types.BatchKey) error { ctx, cancel := context.WithTimeout(parentCtx, 
dbTimeout) defer cancel() @@ -71,9 +64,23 @@ func deleteUnresolvedBatchKeys(parentCtx context.Context, db dbTypes.DB, keys [] return db.DeleteUnresolvedBatchKeys(ctx, keys) } +func listOffchainData(parentCtx context.Context, db dbTypes.DB, keys []common.Hash) ([]types.OffChainData, error) { + ctx, cancel := context.WithTimeout(parentCtx, dbTimeout) + defer cancel() + + return db.ListOffChainData(ctx, keys) +} + func storeOffchainData(parentCtx context.Context, db dbTypes.DB, data []types.OffChainData) error { ctx, cancel := context.WithTimeout(parentCtx, dbTimeout) defer cancel() return db.StoreOffChainData(ctx, data) } + +func detectOffchainDataGaps(parentCtx context.Context, db dbTypes.DB) (map[uint64]uint64, error) { + ctx, cancel := context.WithTimeout(parentCtx, dbTimeout) + defer cancel() + + return db.DetectOffchainDataGaps(ctx) +} diff --git a/synchronizer/store_test.go b/synchronizer/store_test.go index 7a9315a3..c32091e1 100644 --- a/synchronizer/store_test.go +++ b/synchronizer/store_test.go @@ -14,6 +14,8 @@ import ( ) func Test_getStartBlock(t *testing.T) { + t.Parallel() + testError := errors.New("test error") tests := []struct { @@ -50,7 +52,11 @@ func Test_getStartBlock(t *testing.T) { }, } for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + testDB := tt.db(t) if block, err := getStartBlock(context.Background(), testDB, L1SyncTask); tt.wantErr { @@ -64,6 +70,8 @@ func Test_getStartBlock(t *testing.T) { } func Test_setStartBlock(t *testing.T) { + t.Parallel() + testError := errors.New("test error") tests := []struct { @@ -101,7 +109,11 @@ func Test_setStartBlock(t *testing.T) { }, } for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + testDB := tt.db(t) if err := setStartBlock(context.Background(), testDB, tt.block, L1SyncTask); tt.wantErr { @@ -114,6 +126,8 @@ func Test_setStartBlock(t *testing.T) { } func Test_storeUnresolvedBatchKeys(t *testing.T) { + t.Parallel() + 
testError := errors.New("test error") testData := []types.BatchKey{ { @@ -155,7 +169,11 @@ func Test_storeUnresolvedBatchKeys(t *testing.T) { }, } for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + testDB := tt.db(t) if err := storeUnresolvedBatchKeys(context.Background(), testDB, tt.keys); tt.wantErr { @@ -168,6 +186,8 @@ func Test_storeUnresolvedBatchKeys(t *testing.T) { } func Test_getUnresolvedBatchKeys(t *testing.T) { + t.Parallel() + testError := errors.New("test error") testData := []types.BatchKey{ { @@ -209,7 +229,11 @@ func Test_getUnresolvedBatchKeys(t *testing.T) { }, } for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + testDB := tt.db(t) if keys, err := getUnresolvedBatchKeys(context.Background(), testDB); tt.wantErr { @@ -223,6 +247,8 @@ func Test_getUnresolvedBatchKeys(t *testing.T) { } func Test_deleteUnresolvedBatchKeys(t *testing.T) { + t.Parallel() + testError := errors.New("test error") testData := []types.BatchKey{ { @@ -263,7 +289,11 @@ func Test_deleteUnresolvedBatchKeys(t *testing.T) { }, } for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + testDB := tt.db(t) if err := deleteUnresolvedBatchKeys(context.Background(), testDB, testData); tt.wantErr { @@ -276,6 +306,8 @@ func Test_deleteUnresolvedBatchKeys(t *testing.T) { } func Test_storeOffchainData(t *testing.T) { + t.Parallel() + testError := errors.New("test error") testData := []types.OffChainData{ { @@ -317,7 +349,11 @@ func Test_storeOffchainData(t *testing.T) { }, } for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + testDB := tt.db(t) if err := storeOffchainData(context.Background(), testDB, tt.data); tt.wantErr { @@ -328,3 +364,61 @@ func Test_storeOffchainData(t *testing.T) { }) } } + +func Test_detectOffchainDataGaps(t *testing.T) { + t.Parallel() + + testError := errors.New("test error") + + tests := []struct { + name 
string + db func(t *testing.T) db.DB + gaps map[uint64]uint64 + wantErr bool + }{ + { + name: "DetectOffchainDataGaps returns error", + db: func(t *testing.T) db.DB { + t.Helper() + + mockDB := mocks.NewDB(t) + + mockDB.On("DetectOffchainDataGaps", mock.Anything).Return(nil, testError) + + return mockDB + }, + gaps: nil, + wantErr: true, + }, + { + name: "all good", + db: func(t *testing.T) db.DB { + t.Helper() + + mockDB := mocks.NewDB(t) + + mockDB.On("DetectOffchainDataGaps", mock.Anything).Return(map[uint64]uint64{1: 3}, nil) + + return mockDB + }, + gaps: map[uint64]uint64{1: 3}, + wantErr: false, + }, + } + for _, tt := range tests { + tt := tt + + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + testDB := tt.db(t) + + if gaps, err := detectOffchainDataGaps(context.Background(), testDB); tt.wantErr { + require.ErrorIs(t, err, testError) + } else { + require.NoError(t, err) + require.Equal(t, tt.gaps, gaps) + } + }) + } +} diff --git a/types/types.go b/types/types.go index 0aa5df98..40829a5a 100644 --- a/types/types.go +++ b/types/types.go @@ -17,10 +17,11 @@ const ( // DACStatus contains DAC status info type DACStatus struct { - Uptime string `json:"uptime"` - Version string `json:"version"` - KeyCount uint64 `json:"key_count"` - BackfillProgress uint64 `json:"backfill_progress"` + Uptime string `json:"uptime"` + Version string `json:"version"` + KeyCount uint64 `json:"key_count"` + BackfillProgress uint64 `json:"backfill_progress"` + OffchainDataGapsExist bool `json:"offchain_data_gaps_exist"` } // BatchKey is the pairing of batch number and data hash of a batch