Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: write on database the number of retries per certificate and the certificates in a history table #208

Merged
merged 10 commits into from
Dec 2, 2024
41 changes: 11 additions & 30 deletions .github/workflows/test-e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,26 +33,17 @@ jobs:

- name: Build Docker
run: make build-docker

# this is better to get the action in
- name: Install kurtosis
shell: bash
run: |
echo "deb [trusted=yes] https://apt.fury.io/kurtosis-tech/ /" | sudo tee /etc/apt/sources.list.d/kurtosis.list
sudo apt update
sudo apt install kurtosis-cli=1.4.1
kurtosis version

- name: Disable kurtosis analytics
shell: bash
run: kurtosis analytics disable

- name: Install yq
shell: bash
run: |
pip3 install yq
yq --version


- name: Checkout kurtosis-cdk
uses: actions/checkout@v4
with:
repository: 0xPolygon/kurtosis-cdk
path: kurtosis-cdk
ref: v0.2.24

- name: Install Kurtosis CDK tools
uses: ./kurtosis-cdk/.github/actions/setup-kurtosis-cdk

- name: Install polycli
run: |
POLYCLI_VERSION="${{ vars.POLYCLI_VERSION }}"
Expand All @@ -63,16 +54,6 @@ jobs:
sudo chmod +x /usr/local/bin/polycli
/usr/local/bin/polycli version

- name: Install foundry
uses: foundry-rs/foundry-toolchain@v1

- name: checkout kurtosis-cdk
uses: actions/checkout@v4
with:
repository: 0xPolygon/kurtosis-cdk
path: "kurtosis-cdk"
ref: "v0.2.21"

- name: Setup Bats and bats libs
uses: bats-core/[email protected]

Expand Down
13 changes: 2 additions & 11 deletions .github/workflows/test-resequence.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
with:
path: cdk

- name: Checkout kurtosis-cdk
- name: Checkout cdk-erigon
uses: actions/checkout@v4
with:
repository: 0xPolygonHermez/cdk-erigon
Expand All @@ -34,21 +34,12 @@ jobs:
uses: actions/checkout@v4
with:
repository: 0xPolygon/kurtosis-cdk
ref: a7a80b7b5d98a69a23415ab0018e556257a6dfb6
path: kurtosis-cdk
ref: v0.2.24

- name: Install Kurtosis CDK tools
uses: ./kurtosis-cdk/.github/actions/setup-kurtosis-cdk

- name: Install Foundry
uses: foundry-rs/foundry-toolchain@v1

- name: Install yq
run: |
sudo curl -L https://github.com/mikefarah/yq/releases/download/v4.44.2/yq_linux_amd64 -o /usr/local/bin/yq
sudo chmod +x /usr/local/bin/yq
/usr/local/bin/yq --version

- name: Install polycli
run: |
POLYCLI_VERSION="${{ vars.POLYCLI_VERSION }}"
Expand Down
9 changes: 8 additions & 1 deletion aggsender/aggsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ func New(
l1InfoTreeSyncer *l1infotreesync.L1InfoTreeSync,
l2Syncer types.L2BridgeSyncer,
epochNotifier types.EpochNotifier) (*AggSender, error) {
storage, err := db.NewAggSenderSQLStorage(logger, cfg.StoragePath)
storageConfig := db.AggSenderSQLStorageConfig{
DBPath: cfg.StoragePath,
KeepCertificatesHistory: cfg.KeepCertificatesHistory,
}
storage, err := db.NewAggSenderSQLStorage(logger, storageConfig)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -153,12 +157,14 @@ func (a *AggSender) sendCertificate(ctx context.Context) (*agglayer.SignedCertif
return nil, err
}
previousToBlock := uint64(0)
retryCount := 0
if lastSentCertificateInfo != nil {
previousToBlock = lastSentCertificateInfo.ToBlock
if lastSentCertificateInfo.Status == agglayer.InError {
// if the last certificate was in error, we need to resend it
// from the block before the error
previousToBlock = lastSentCertificateInfo.FromBlock - 1
retryCount = lastSentCertificateInfo.RetryCount + 1
}
}

Expand Down Expand Up @@ -216,6 +222,7 @@ func (a *AggSender) sendCertificate(ctx context.Context) (*agglayer.SignedCertif
prevLER := common.BytesToHash(certificate.PrevLocalExitRoot[:])
certInfo := types.CertificateInfo{
Height: certificate.Height,
RetryCount: retryCount,
CertificateID: certificateHash,
NewLocalExitRoot: certificate.NewLocalExitRoot,
PreviousLocalExitRoot: &prevLER,
Expand Down
6 changes: 5 additions & 1 deletion aggsender/aggsender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1951,7 +1951,11 @@ func newAggsenderTestData(t *testing.T, creationFlags testDataFlags) *aggsenderT
pc, _, _, _ := runtime.Caller(1)
part := runtime.FuncForPC(pc)
dbPath := fmt.Sprintf("file:%d?mode=memory&cache=shared", part.Entry())
storage, err = db.NewAggSenderSQLStorage(logger, dbPath)
storageConfig := db.AggSenderSQLStorageConfig{
DBPath: dbPath,
KeepCertificatesHistory: true,
}
storage, err = db.NewAggSenderSQLStorage(logger, storageConfig)
require.NoError(t, err)
}

Expand Down
2 changes: 2 additions & 0 deletions aggsender/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type Config struct {
// DelayBeetweenRetries is the delay between retries:
// is used on store Certificate and also in initial check
DelayBeetweenRetries types.Duration `mapstructure:"DelayBeetweenRetries"`
// KeepCertificatesHistory is a flag to keep the certificates history on storage
KeepCertificatesHistory bool `mapstructure:"KeepCertificatesHistory"`
joanestebanr marked this conversation as resolved.
Show resolved Hide resolved
}

// String returns a string representation of the Config
Expand Down
56 changes: 40 additions & 16 deletions aggsender/db/aggsender_db_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,26 +36,33 @@ type AggSenderStorage interface {

var _ AggSenderStorage = (*AggSenderSQLStorage)(nil)

// AggSenderSQLStorageConfig is the configuration for the AggSenderSQLStorage
type AggSenderSQLStorageConfig struct {
DBPath string
KeepCertificatesHistory bool
}

// AggSenderSQLStorage is the struct that implements the AggSenderStorage interface
type AggSenderSQLStorage struct {
logger *log.Logger
db *sql.DB
cfg AggSenderSQLStorageConfig
}

// NewAggSenderSQLStorage creates a new AggSenderSQLStorage
func NewAggSenderSQLStorage(logger *log.Logger, dbPath string) (*AggSenderSQLStorage, error) {
if err := migrations.RunMigrations(dbPath); err != nil {
func NewAggSenderSQLStorage(logger *log.Logger, cfg AggSenderSQLStorageConfig) (*AggSenderSQLStorage, error) {
db, err := db.NewSQLiteDB(cfg.DBPath)
if err != nil {
return nil, err
}

db, err := db.NewSQLiteDB(dbPath)
if err != nil {
if err := migrations.RunMigrations(logger, db); err != nil {
return nil, err
}

return &AggSenderSQLStorage{
db: db,
logger: logger,
cfg: cfg,
}, nil
}

Expand Down Expand Up @@ -93,7 +100,7 @@ func (a *AggSenderSQLStorage) GetCertificateByHeight(height uint64) (*types.Cert
}

// getCertificateByHeight returns a certificate by its height using the provided db
func getCertificateByHeight(db meddler.DB,
func getCertificateByHeight(db db.Querier,
height uint64) (*types.CertificateInfo, error) {
var certificateInfo types.CertificateInfo
if err := meddler.QueryRow(db, &certificateInfo,
Expand All @@ -119,7 +126,7 @@ func (a *AggSenderSQLStorage) GetLastSentCertificate() (*types.CertificateInfo,
func (a *AggSenderSQLStorage) SaveLastSentCertificate(ctx context.Context, certificate types.CertificateInfo) error {
tx, err := db.NewTx(ctx, a.db)
if err != nil {
return err
return fmt.Errorf("saveLastSentCertificate NewTx. Err: %w", err)
}
defer func() {
if err != nil {
Expand All @@ -131,14 +138,14 @@ func (a *AggSenderSQLStorage) SaveLastSentCertificate(ctx context.Context, certi

cert, err := getCertificateByHeight(tx, certificate.Height)
if err != nil && !errors.Is(err, db.ErrNotFound) {
return err
return fmt.Errorf("saveLastSentCertificate getCertificateByHeight. Err: %w", err)
}

if cert != nil {
// we already have a certificate with this height
// we need to delete it before inserting the new one
if err = deleteCertificate(tx, cert.CertificateID); err != nil {
return err
if err = a.moveCertificateToHistoryOrDelete(tx, cert); err != nil {
return fmt.Errorf("saveLastSentCertificate moveCertificateToHistory Err: %w", err)
}
}

Expand All @@ -147,14 +154,31 @@ func (a *AggSenderSQLStorage) SaveLastSentCertificate(ctx context.Context, certi
}

if err = tx.Commit(); err != nil {
return err
return fmt.Errorf("saveLastSentCertificate commit. Err: %w", err)
}

a.logger.Debugf("inserted certificate - Height: %d. Hash: %s", certificate.Height, certificate.CertificateID)

return nil
}

func (a *AggSenderSQLStorage) moveCertificateToHistoryOrDelete(tx db.Querier,
certificate *types.CertificateInfo) error {
if a.cfg.KeepCertificatesHistory {
joanestebanr marked this conversation as resolved.
Show resolved Hide resolved
a.logger.Debugf("moving certificate to history - new CertificateID: %s", certificate.ID())
if _, err := tx.Exec(`INSERT INTO certificate_info_history SELECT * FROM certificate_info WHERE height = $1;`,
certificate.Height); err != nil {
return fmt.Errorf("error moving certificate to history: %w", err)
}
}
a.logger.Debugf("deleting certificate - CertificateID: %s", certificate.ID())
if err := deleteCertificate(tx, certificate.CertificateID); err != nil {
return fmt.Errorf("deleteCertificate %s . Error: %w", certificate.ID(), err)
}

return nil
}

// DeleteCertificate deletes a certificate from the storage
func (a *AggSenderSQLStorage) DeleteCertificate(ctx context.Context, certificateID common.Hash) error {
tx, err := db.NewTx(ctx, a.db)
Expand All @@ -169,7 +193,7 @@ func (a *AggSenderSQLStorage) DeleteCertificate(ctx context.Context, certificate
}
}()

if err = deleteCertificate(a.db, certificateID); err != nil {
if err = deleteCertificate(tx, certificateID); err != nil {
return err
}

Expand All @@ -183,8 +207,8 @@ func (a *AggSenderSQLStorage) DeleteCertificate(ctx context.Context, certificate
}

// deleteCertificate deletes a certificate from the storage using the provided db
func deleteCertificate(db meddler.DB, certificateID common.Hash) error {
if _, err := db.Exec(`DELETE FROM certificate_info WHERE certificate_id = $1;`, certificateID.String()); err != nil {
func deleteCertificate(tx db.Querier, certificateID common.Hash) error {
if _, err := tx.Exec(`DELETE FROM certificate_info WHERE certificate_id = $1;`, certificateID.String()); err != nil {
return fmt.Errorf("error deleting certificate info: %w", err)
}

Expand All @@ -205,8 +229,8 @@ func (a *AggSenderSQLStorage) UpdateCertificate(ctx context.Context, certificate
}
}()

if _, err = tx.Exec(`UPDATE certificate_info SET status = $1 WHERE certificate_id = $2;`,
certificate.Status, certificate.CertificateID.String()); err != nil {
if _, err = tx.Exec(`UPDATE certificate_info SET status = $1, updated_at = $2 WHERE certificate_id = $3;`,
certificate.Status, certificate.UpdatedAt, certificate.CertificateID.String()); err != nil {
return fmt.Errorf("error updating certificate info: %w", err)
}
if err = tx.Commit(); err != nil {
Expand Down
26 changes: 19 additions & 7 deletions aggsender/db/aggsender_db_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"time"

"github.com/0xPolygon/cdk/agglayer"
"github.com/0xPolygon/cdk/aggsender/db/migrations"
"github.com/0xPolygon/cdk/aggsender/types"
"github.com/0xPolygon/cdk/db"
"github.com/0xPolygon/cdk/log"
Expand All @@ -22,9 +21,12 @@ func Test_Storage(t *testing.T) {

path := path.Join(t.TempDir(), "file::memory:?cache=shared")
log.Debugf("sqlite path: %s", path)
require.NoError(t, migrations.RunMigrations(path))
cfg := AggSenderSQLStorageConfig{
DBPath: path,
KeepCertificatesHistory: true,
}

storage, err := NewAggSenderSQLStorage(log.WithFields("aggsender-db"), path)
storage, err := NewAggSenderSQLStorage(log.WithFields("aggsender-db"), cfg)
require.NoError(t, err)

updateTime := time.Now().UTC().UnixMilli()
Expand Down Expand Up @@ -201,6 +203,7 @@ func Test_Storage(t *testing.T) {
// Insert a certificate
certificate := types.CertificateInfo{
Height: 13,
RetryCount: 1234,
CertificateID: common.HexToHash("0xD"),
NewLocalExitRoot: common.HexToHash("0xE"),
FromBlock: 13,
Expand All @@ -213,12 +216,14 @@ func Test_Storage(t *testing.T) {

// Update the status of the certificate
certificate.Status = agglayer.Settled
certificate.UpdatedAt = updateTime + 1
require.NoError(t, storage.UpdateCertificate(ctx, certificate))

// Fetch the certificate and verify the status has been updated
certificateFromDB, err := storage.GetCertificateByHeight(certificate.Height)
require.NoError(t, err)
require.Equal(t, certificate.Status, certificateFromDB.Status)
require.Equal(t, certificate.Status, certificateFromDB.Status, "equal status")
require.Equal(t, certificate.UpdatedAt, certificateFromDB.UpdatedAt, "equal updated at")

require.NoError(t, storage.clean())
})
Expand All @@ -229,9 +234,12 @@ func Test_SaveLastSentCertificate(t *testing.T) {

path := path.Join(t.TempDir(), "file::memory:?cache=shared")
log.Debugf("sqlite path: %s", path)
require.NoError(t, migrations.RunMigrations(path))
cfg := AggSenderSQLStorageConfig{
DBPath: path,
KeepCertificatesHistory: true,
}

storage, err := NewAggSenderSQLStorage(log.WithFields("aggsender-db"), path)
storage, err := NewAggSenderSQLStorage(log.WithFields("aggsender-db"), cfg)
require.NoError(t, err)

updateTime := time.Now().UTC().UnixMilli()
Expand Down Expand Up @@ -372,7 +380,11 @@ func Test_SaveLastSentCertificate(t *testing.T) {
func Test_StoragePreviousLER(t *testing.T) {
ctx := context.TODO()
dbPath := path.Join(t.TempDir(), "Test_StoragePreviousLER.sqlite")
storage, err := NewAggSenderSQLStorage(log.WithFields("aggsender-db"), dbPath)
cfg := AggSenderSQLStorageConfig{
DBPath: dbPath,
KeepCertificatesHistory: true,
}
storage, err := NewAggSenderSQLStorage(log.WithFields("aggsender-db"), cfg)
require.NoError(t, err)
require.NotNil(t, storage)

Expand Down
25 changes: 22 additions & 3 deletions aggsender/db/migrations/0001.sql
Original file line number Diff line number Diff line change
@@ -1,16 +1,35 @@
-- +migrate Down
DROP TABLE IF EXISTS certificate_info;
DROP TABLE IF EXISTS certificate_info_history;
DROP TABLE IF EXISTS certificate_info_history;

-- +migrate Up
CREATE TABLE certificate_info (
height INTEGER NOT NULL,
certificate_id VARCHAR NOT NULL PRIMARY KEY,
retry_count INTEGER DEFAULT 0,
certificate_id VARCHAR NOT NULL,
status INTEGER NOT NULL,
previous_local_exit_root VARCHAR,
new_local_exit_root VARCHAR NOT NULL,
from_block INTEGER NOT NULL,
to_block INTEGER NOT NULL,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
signed_certificate TEXT
);
signed_certificate TEXT,
PRIMARY KEY (height)
);

CREATE TABLE certificate_info_history (
height INTEGER NOT NULL ,
retry_count INTEGER DEFAULT 0,
certificate_id VARCHAR NOT NULL,
status INTEGER NOT NULL,
previous_local_exit_root VARCHAR,
new_local_exit_root VARCHAR NOT NULL,
from_block INTEGER NOT NULL,
to_block INTEGER NOT NULL,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
signed_certificate TEXT,
goran-ethernal marked this conversation as resolved.
Show resolved Hide resolved
PRIMARY KEY (height, retry_count)
);
Loading
Loading