Skip to content

Commit

Permalink
Reporting offchain data gap detection (#102)
Browse files Browse the repository at this point in the history
  • Loading branch information
begmaroman authored Jul 16, 2024
1 parent 7833b6e commit d1534fc
Show file tree
Hide file tree
Showing 14 changed files with 625 additions and 81 deletions.
6 changes: 6 additions & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,9 @@ packages:
SequencerTracker:
config:
filename: sequencer_tracker.generated.go
github.com/0xPolygon/cdk-data-availability/services/status:
config:
interfaces:
GapsDetector:
config:
filename: gaps_detector.generated.go
2 changes: 1 addition & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func start(cliCtx *cli.Context) error {
[]rpc.Service{
{
Name: status.APISTATUS,
Service: status.NewEndpoints(storage),
Service: status.NewEndpoints(storage, batchSynchronizer),
},
{
Name: sync.APISYNC,
Expand Down
143 changes: 98 additions & 45 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,75 @@ import (
"github.com/jmoiron/sqlx"
)

const (
	// storeLastProcessedBlockSQL upserts the last processed block for a named
	// synchronizer task and refreshes its processed timestamp on conflict.
	storeLastProcessedBlockSQL = `
		INSERT INTO data_node.sync_tasks (task, block)
		VALUES ($1, $2)
		ON CONFLICT (task) DO UPDATE
		SET block = EXCLUDED.block, processed = NOW();`

	// getLastProcessedBlockSQL returns the last processed block for a given task.
	getLastProcessedBlockSQL = `SELECT block FROM data_node.sync_tasks WHERE task = $1;`

	// storeUnresolvedBatchesSQL inserts an unresolved batch key (num, hash);
	// an already-recorded key is silently ignored via ON CONFLICT DO NOTHING.
	storeUnresolvedBatchesSQL = `
		INSERT INTO data_node.unresolved_batches (num, hash)
		VALUES ($1, $2)
		ON CONFLICT (num, hash) DO NOTHING;
	`

	// getUnresolvedBatchKeysSQL returns up to $1 unresolved batch keys.
	// NOTE(review): there is no ORDER BY, so which subset is returned under
	// LIMIT is unspecified — confirm callers do not rely on ordering.
	getUnresolvedBatchKeysSQL = `SELECT num, hash FROM data_node.unresolved_batches LIMIT $1;`

	// deleteUnresolvedBatchKeysSQL deletes a single unresolved batch key by (num, hash).
	deleteUnresolvedBatchKeysSQL = `DELETE FROM data_node.unresolved_batches WHERE num = $1 AND hash = $2;`

	// storeOffChainDataSQL upserts an offchain data row by key, overwriting
	// both the value and the batch number when the key already exists.
	storeOffChainDataSQL = `
		INSERT INTO data_node.offchain_data (key, value, batch_num)
		VALUES ($1, $2, $3)
		ON CONFLICT (key) DO UPDATE
		SET value = EXCLUDED.value, batch_num = EXCLUDED.batch_num;
	`

	// getOffchainDataSQL returns the offchain data row for a single key.
	getOffchainDataSQL = `
		SELECT key, value, batch_num
		FROM data_node.offchain_data
		WHERE key = $1 LIMIT 1;
	`

	// listOffchainDataSQL returns the offchain data for a given list of keys.
	// Uses a `?` placeholder (not $n) — presumably expanded via sqlx.In and
	// rebound for Postgres by the caller; verify against ListOffChainData.
	listOffchainDataSQL = `
		SELECT key, value, batch_num
		FROM data_node.offchain_data
		WHERE key IN (?);
	`

	// countOffchainDataSQL returns the total number of rows in the offchain_data table.
	countOffchainDataSQL = "SELECT COUNT(*) FROM data_node.offchain_data;"

	// selectOffchainDataGapsSQL pairs every batch_num with its successor in
	// ascending order (via ROW_NUMBER) and returns the pairs that are not
	// consecutive, i.e. the boundaries of gaps in the stored batch numbers.
	// The IS NOT NULL filters turn the LEFT JOIN into an effective inner join,
	// dropping the final row that has no successor.
	// NOTE(review): if batch_num is not unique across rows, a duplicated value
	// n would be reported as a bogus gap (n -> n) because n + 1 <> n; confirm
	// uniqueness or add DISTINCT inside the CTE.
	selectOffchainDataGapsSQL = `
		WITH numbered_batches AS (
			SELECT
				batch_num,
				ROW_NUMBER() OVER (ORDER BY batch_num) AS row_number
			FROM data_node.offchain_data
		)
		SELECT
			nb1.batch_num AS current_batch_num,
			nb2.batch_num AS next_batch_num
		FROM
			numbered_batches nb1
		LEFT JOIN numbered_batches nb2 ON nb1.row_number = nb2.row_number - 1
		WHERE
			nb1.batch_num IS NOT NULL
			AND nb2.batch_num IS NOT NULL
			AND nb1.batch_num + 1 <> nb2.batch_num;`
)

var (
// ErrStateNotSynchronized indicates the state database may be empty
ErrStateNotSynchronized = errors.New("state not synchronized")
Expand All @@ -28,8 +97,8 @@ type DB interface {
GetOffChainData(ctx context.Context, key common.Hash) (*types.OffChainData, error)
ListOffChainData(ctx context.Context, keys []common.Hash) ([]types.OffChainData, error)
StoreOffChainData(ctx context.Context, od []types.OffChainData) error

CountOffchainData(ctx context.Context) (uint64, error)
DetectOffchainDataGaps(ctx context.Context) (map[uint64]uint64, error)
}

// DB is the database layer of the data node
Expand All @@ -46,13 +115,6 @@ func New(pg *sqlx.DB) DB {

// StoreLastProcessedBlock stores a record of a block processed by the synchronizer for named task
func (db *pgDB) StoreLastProcessedBlock(ctx context.Context, block uint64, task string) error {
const storeLastProcessedBlockSQL = `
INSERT INTO data_node.sync_tasks (task, block)
VALUES ($1, $2)
ON CONFLICT (task) DO UPDATE
SET block = EXCLUDED.block, processed = NOW();
`

if _, err := db.pg.ExecContext(ctx, storeLastProcessedBlockSQL, task, block); err != nil {
return err
}
Expand All @@ -62,8 +124,6 @@ func (db *pgDB) StoreLastProcessedBlock(ctx context.Context, block uint64, task

// GetLastProcessedBlock returns the latest block successfully processed by the synchronizer for named task
func (db *pgDB) GetLastProcessedBlock(ctx context.Context, task string) (uint64, error) {
const getLastProcessedBlockSQL = "SELECT block FROM data_node.sync_tasks WHERE task = $1;"

var (
lastBlock uint64
)
Expand All @@ -77,12 +137,6 @@ func (db *pgDB) GetLastProcessedBlock(ctx context.Context, task string) (uint64,

// StoreUnresolvedBatchKeys stores unresolved batch keys in the database
func (db *pgDB) StoreUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey) error {
const storeUnresolvedBatchesSQL = `
INSERT INTO data_node.unresolved_batches (num, hash)
VALUES ($1, $2)
ON CONFLICT (num, hash) DO NOTHING;
`

tx, err := db.pg.BeginTxx(ctx, nil)
if err != nil {
return err
Expand All @@ -107,8 +161,6 @@ func (db *pgDB) StoreUnresolvedBatchKeys(ctx context.Context, bks []types.BatchK

// GetUnresolvedBatchKeys returns the unresolved batch keys from the database
func (db *pgDB) GetUnresolvedBatchKeys(ctx context.Context, limit uint) ([]types.BatchKey, error) {
const getUnresolvedBatchKeysSQL = "SELECT num, hash FROM data_node.unresolved_batches LIMIT $1;"

rows, err := db.pg.QueryxContext(ctx, getUnresolvedBatchKeysSQL, limit)
if err != nil {
return nil, err
Expand Down Expand Up @@ -137,11 +189,6 @@ func (db *pgDB) GetUnresolvedBatchKeys(ctx context.Context, limit uint) ([]types

// DeleteUnresolvedBatchKeys deletes the unresolved batch keys from the database
func (db *pgDB) DeleteUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey) error {
const deleteUnresolvedBatchKeysSQL = `
DELETE FROM data_node.unresolved_batches
WHERE num = $1 AND hash = $2;
`

tx, err := db.pg.BeginTxx(ctx, nil)
if err != nil {
return err
Expand All @@ -166,13 +213,6 @@ func (db *pgDB) DeleteUnresolvedBatchKeys(ctx context.Context, bks []types.Batch

// StoreOffChainData stores and array of key values in the Db
func (db *pgDB) StoreOffChainData(ctx context.Context, od []types.OffChainData) error {
const storeOffChainDataSQL = `
INSERT INTO data_node.offchain_data (key, value, batch_num)
VALUES ($1, $2, $3)
ON CONFLICT (key) DO UPDATE
SET value = EXCLUDED.value, batch_num = EXCLUDED.batch_num;
`

tx, err := db.pg.BeginTxx(ctx, nil)
if err != nil {
return err
Expand All @@ -198,12 +238,6 @@ func (db *pgDB) StoreOffChainData(ctx context.Context, od []types.OffChainData)

// GetOffChainData returns the value identified by the key
func (db *pgDB) GetOffChainData(ctx context.Context, key common.Hash) (*types.OffChainData, error) {
const getOffchainDataSQL = `
SELECT key, value, batch_num
FROM data_node.offchain_data
WHERE key = $1 LIMIT 1;
`

data := struct {
Key string `db:"key"`
Value string `db:"value"`
Expand Down Expand Up @@ -231,12 +265,6 @@ func (db *pgDB) ListOffChainData(ctx context.Context, keys []common.Hash) ([]typ
return nil, nil
}

const listOffchainDataSQL = `
SELECT key, value, batch_num
FROM data_node.offchain_data
WHERE key IN (?);
`

preparedKeys := make([]string, len(keys))
for i, key := range keys {
preparedKeys[i] = key.Hex()
Expand Down Expand Up @@ -280,12 +308,37 @@ func (db *pgDB) ListOffChainData(ctx context.Context, keys []common.Hash) ([]typ

// CountOffchainData returns the count of rows in the offchain_data table
func (db *pgDB) CountOffchainData(ctx context.Context) (uint64, error) {
const countQuery = "SELECT COUNT(*) FROM data_node.offchain_data;"

var count uint64
if err := db.pg.QueryRowContext(ctx, countQuery).Scan(&count); err != nil {
if err := db.pg.QueryRowContext(ctx, countOffchainDataSQL).Scan(&count); err != nil {
return 0, err
}

return count, nil
}

// DetectOffchainDataGaps scans the offchain_data table for non-consecutive
// batch numbers and returns the gap boundaries as a map from each batch
// number to the next one actually present (current -> next). An empty map
// means no gaps were detected.
func (db *pgDB) DetectOffchainDataGaps(ctx context.Context) (map[uint64]uint64, error) {
	rows, err := db.pg.QueryxContext(ctx, selectOffchainDataGapsSQL)
	if err != nil {
		return nil, err
	}

	defer rows.Close()

	// row mirrors the two columns produced by selectOffchainDataGapsSQL.
	type row struct {
		CurrentBatchNum uint64 `db:"current_batch_num"`
		NextBatchNum    uint64 `db:"next_batch_num"`
	}

	gaps := make(map[uint64]uint64)
	for rows.Next() {
		var data row
		if err = rows.StructScan(&data); err != nil {
			return nil, err
		}

		gaps[data.CurrentBatchNum] = data.NextBatchNum
	}

	// Surface any error that terminated iteration early; without this check a
	// partially-read result set would be silently returned as complete.
	if err = rows.Err(); err != nil {
		return nil, err
	}

	return gaps, nil
}
96 changes: 96 additions & 0 deletions db/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,102 @@ func Test_DB_CountOffchainData(t *testing.T) {
}
}

// Test_DB_DetectOffchainDataGaps verifies that DetectOffchainDataGaps maps
// the rows returned by the gap-detection query into a batch-number map and
// that query errors are propagated to the caller.
func Test_DB_DetectOffchainDataGaps(t *testing.T) {
	t.Parallel()

	tests := []struct {
		name      string
		seed      []types.OffChainData
		gaps      map[uint64]uint64
		returnErr error
	}{
		{
			name: "one gap found",
			seed: []types.OffChainData{{
				Key:      common.HexToHash("key1"),
				Value:    []byte("value1"),
				BatchNum: 1,
			}, {
				Key:      common.HexToHash("key2"),
				Value:    []byte("value2"),
				BatchNum: 2,
			}, {
				Key:      common.HexToHash("key4"),
				Value:    []byte("value4"),
				BatchNum: 4,
			}},
			gaps: map[uint64]uint64{
				2: 4,
			},
		},
		{
			name: "error returned",
			seed: []types.OffChainData{{
				Key:   common.HexToHash("key1"),
				Value: []byte("value1"),
			}},
			returnErr: errors.New("test error"),
		},
	}

	for _, tc := range tests {
		tc := tc

		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()

			// Build a mocked *sql.DB and wrap it in sqlx for the DB layer.
			mockDB, mock, err := sqlmock.New()
			require.NoError(t, err)

			defer mockDB.Close()

			sqlxDB := sqlx.NewDb(mockDB, "postgres")

			// Pre-populate the mocked store with the test fixture.
			seedOffchainData(t, sqlxDB, mock, tc.seed)

			query := mock.ExpectQuery(`WITH numbered_batches AS \(
		SELECT
			batch_num,
			ROW_NUMBER\(\) OVER \(ORDER BY batch_num\) AS row_number
		FROM data_node\.offchain_data
	\)
	SELECT
		nb1\.batch_num AS current_batch_num,
		nb2\.batch_num AS next_batch_num
	FROM
		numbered_batches nb1
	LEFT JOIN numbered_batches nb2 ON nb1\.row_number = nb2\.row_number - 1
	WHERE
		nb1\.batch_num IS NOT NULL
		AND nb2\.batch_num IS NOT NULL
		AND nb1\.batch_num \+ 1 <> nb2\.batch_num`)

			switch {
			case tc.returnErr != nil:
				query.WillReturnError(tc.returnErr)
			default:
				result := sqlmock.NewRows([]string{"current_batch_num", "next_batch_num"})
				for from, to := range tc.gaps {
					result.AddRow(from, to)
				}
				query.WillReturnRows(result)
			}

			storage := New(sqlxDB)

			gaps, err := storage.DetectOffchainDataGaps(context.Background())
			if tc.returnErr != nil {
				require.ErrorIs(t, err, tc.returnErr)
			} else {
				require.NoError(t, err)
				require.Equal(t, tc.gaps, gaps)
			}

			require.NoError(t, mock.ExpectationsWereMet())
		})
	}
}

func seedOffchainData(t *testing.T, db *sqlx.DB, mock sqlmock.Sqlmock, od []types.OffChainData) {
t.Helper()

Expand Down
3 changes: 3 additions & 0 deletions db/migrations/0005.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,8 @@ ALTER TABLE data_node.offchain_data DROP COLUMN IF EXISTS batch_num;
ALTER TABLE data_node.offchain_data
ADD COLUMN IF NOT EXISTS batch_num BIGINT NOT NULL DEFAULT 0;

-- Ensure batch_num is indexed for optimal performance
CREATE INDEX idx_batch_num ON data_node.offchain_data(batch_num);

-- It triggers resync with an updated logic of all batches
UPDATE data_node.sync_tasks SET block = 0 WHERE task = 'L1';
Loading

0 comments on commit d1534fc

Please sign in to comment.