Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(avail): Avail potential bug fix #1323

Merged
merged 9 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 97 additions & 22 deletions da/avail/avail.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"time"

"github.com/avast/retry-go/v4"
Expand Down Expand Up @@ -119,6 +120,8 @@ func (c *DataAvailabilityLayerClient) Init(config []byte, pubsubServer *pubsub.S

// Set defaults
c.pubsubServer = pubsubServer

// TODO: Make configurable
c.txInclusionTimeout = defaultTxInculsionTimeout
c.batchRetryDelay = defaultBatchRetryDelay
c.batchRetryAttempts = defaultBatchRetryAttempts
Expand All @@ -128,6 +131,14 @@ func (c *DataAvailabilityLayerClient) Init(config []byte, pubsubServer *pubsub.S
apply(c)
}

metrics.RollappConsecutiveFailedDASubmission.Set(0)
return nil
}

// Start starts DataAvailabilityLayerClient instance.
func (c *DataAvailabilityLayerClient) Start() error {
c.logger.Info("Starting Avail Data Availability Layer Client.")
c.ctx, c.cancel = context.WithCancel(context.Background())
// If client wasn't set, create a new one
if c.client == nil {
substrateApiClient, err := gsrpc.NewSubstrateAPI(c.config.ApiURL)
Expand All @@ -141,15 +152,9 @@ func (c *DataAvailabilityLayerClient) Init(config []byte, pubsubServer *pubsub.S
}
}

metrics.RollappConsecutiveFailedDASubmission.Set(0)
// check for synced client
go c.sync()

c.ctx, c.cancel = context.WithCancel(context.Background())
return nil
}

// Start starts DataAvailabilityLayerClient instance.
func (c *DataAvailabilityLayerClient) Start() error {
c.synced <- struct{}{}
return nil
}

Expand Down Expand Up @@ -179,7 +184,7 @@ func (c *DataAvailabilityLayerClient) RetrieveBatches(daMetaData *da.DASubmitMet
BaseResult: da.BaseResult{
Code: da.StatusError,
Message: err.Error(),
Error: err,
Error: errors.Join(da.ErrRetrieval, err),
},
}
}
Expand All @@ -189,7 +194,7 @@ func (c *DataAvailabilityLayerClient) RetrieveBatches(daMetaData *da.DASubmitMet
BaseResult: da.BaseResult{
Code: da.StatusError,
Message: err.Error(),
Error: err,
Error: errors.Join(da.ErrRetrieval, err),
},
}
}
Expand All @@ -207,24 +212,34 @@ func (c *DataAvailabilityLayerClient) RetrieveBatches(daMetaData *da.DASubmitMet
err := proto.Unmarshal(data, &pbBatch)
if err != nil {
c.logger.Error("unmarshal batch", "daHeight", daMetaData.Height, "error", err)
continue
break
}
// Convert the proto batch to a batch
batch := &types.Batch{}
err = batch.FromProto(&pbBatch)
if err != nil {
c.logger.Error("batch from proto", "daHeight", daMetaData.Height, "error", err)
continue
break
}
// Add the batch to the list
batches = append(batches, batch)
// Remove the bytes we just decoded.
data = data[proto.Size(&pbBatch):]

}
}
}

// if no batches, return error
if len(batches) == 0 {
return da.ResultRetrieveBatch{
BaseResult: da.BaseResult{
Code: da.StatusError,
Message: "Blob not found",
Error: da.ErrBlobNotFound,
},
}
}

return da.ResultRetrieveBatch{
BaseResult: da.BaseResult{
Code: da.StatusSuccess,
Expand Down Expand Up @@ -259,13 +274,7 @@ func (c *DataAvailabilityLayerClient) submitBatchLoop(dataBlob []byte) da.Result
for {
select {
case <-c.ctx.Done():
return da.ResultSubmitBatch{
BaseResult: da.BaseResult{
Code: da.StatusError,
Message: "context done",
Error: c.ctx.Err(),
},
}
return da.ResultSubmitBatch{}
default:
var daBlockHeight uint64
err := retry.Do(
Expand Down Expand Up @@ -381,11 +390,10 @@ func (c *DataAvailabilityLayerClient) broadcastTx(tx []byte) (uint64, error) {
if err != nil {
return 0, fmt.Errorf("%w: %s", da.ErrTxBroadcastNetworkError, err)
}
defer sub.Unsubscribe()

c.logger.Info("Submitted batch to avail. Waiting for inclusion event")

defer sub.Unsubscribe()

inclusionTimer := time.NewTimer(c.txInclusionTimeout)
defer inclusionTimer.Stop()

Expand Down Expand Up @@ -451,3 +459,70 @@ func (d *DataAvailabilityLayerClient) GetMaxBlobSizeBytes() uint32 {
// GetSignerBalance reports the DA-layer balance of the submitting signer.
// Balance querying is not implemented for Avail, so this is a stub that
// always yields the zero Balance value and no error.
func (c *DataAvailabilityLayerClient) GetSignerBalance() (da.Balance, error) {
	var balance da.Balance
	return balance, nil
}

// sync blocks until the Avail node reports itself as synced — i.e. its
// latest block height is within a small threshold of its finalized block
// height — and then signals readiness by sending on c.synced. It retries
// forever with a fixed delay, stopping early only when c.ctx is canceled.
// Intended to run as a goroutine (see the Start flow).
func (c *DataAvailabilityLayerClient) sync() {
	// getHeights fetches the finalized height and the current (latest)
	// height from the Avail client.
	getHeights := func() (uint64, uint64, error) {
		finalizedHash, err := c.client.GetFinalizedHead()
		if err != nil {
			return 0, 0, fmt.Errorf("failed to get finalized head: %w", err)
		}

		finalizedHeader, err := c.client.GetHeader(finalizedHash)
		if err != nil {
			return 0, 0, fmt.Errorf("failed to get finalized header: %w", err)
		}
		finalizedHeight := uint64(finalizedHeader.Number)

		currentBlock, err := c.client.GetBlockLatest()
		if err != nil {
			return 0, 0, fmt.Errorf("failed to get current block: %w", err)
		}
		currentHeight := uint64(currentBlock.Block.Header.Number)

		return finalizedHeight, currentHeight, nil
	}

	// checkSync returns nil when the node is considered synced.
	checkSync := func() error {
		finalizedHeight, currentHeight, err := getHeights()
		if err != nil {
			return err
		}

		// Absolute distance between the current and finalized heads.
		// BUG FIX: the previous uint64(math.Abs(float64(currentHeight -
		// finalizedHeight))) evaluated the unsigned subtraction FIRST, so
		// when finalizedHeight > currentHeight the difference wrapped
		// around to a huge value before math.Abs ever saw it. Take the
		// absolute difference branch-wise instead. (The file-level "math"
		// import added for this line is no longer needed — remove it.)
		var blocksBehind uint64
		if currentHeight >= finalizedHeight {
			blocksBehind = currentHeight - finalizedHeight
		} else {
			blocksBehind = finalizedHeight - currentHeight
		}
		defaultSyncThreshold := uint64(3)

		// Synced when within the threshold and at least one block exists.
		if blocksBehind <= defaultSyncThreshold && currentHeight > 0 {
			c.logger.Info("Node is synced",
				"current_height", currentHeight,
				"finalized_height", finalizedHeight,
				"blocks_behind", blocksBehind)
			return nil
		}

		c.logger.Debug("Node is not yet synced",
			"current_height", currentHeight,
			"finalized_height", finalizedHeight,
			"blocks_behind", blocksBehind)

		return fmt.Errorf("node not synced: current=%d, finalized=%d, behind=%d",
			currentHeight, finalizedHeight, blocksBehind)
	}

	// Retry checkSync until it succeeds or c.ctx is canceled.
	err := retry.Do(checkSync,
		retry.Attempts(0), // try forever
		retry.Context(c.ctx),
		retry.Delay(5*time.Second), // TODO: make configurable
		retry.LastErrorOnly(true),
		retry.DelayType(retry.FixedDelay),
		retry.OnRetry(func(n uint, err error) {
			c.logger.Error("sync Avail DA", "attempt", n, "error", err)
		}),
	)

	// NOTE(review): on context cancellation err is non-nil, yet we still
	// signal c.synced (matching prior behavior) so a blocked receiver is
	// released during shutdown — confirm this is the intended contract.
	c.logger.Info("Avail-node sync completed.", "err", err)
	c.synced <- struct{}{}
}
14 changes: 12 additions & 2 deletions da/avail/avail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,20 @@ func TestRetrieveBatches(t *testing.T) {
pubsubServer := pubsub.NewServer()
err = pubsubServer.Start()
assert.NoError(err)

// set mocks for sync flow
// Set the mock functions
mockSubstrateApiClient.On("GetFinalizedHead", mock.Anything).Return(availtypes.NewHash([]byte("123")), nil)
mockSubstrateApiClient.On("GetHeader", mock.Anything).Return(&availtypes.Header{Number: 1}, nil)
mockSubstrateApiClient.On("GetBlockLatest", mock.Anything).Return(&availtypes.SignedBlock{Block: availtypes.Block{Header: availtypes.Header{Number: 1}}}, nil)

// Start the DALC
dalc := avail.DataAvailabilityLayerClient{}
err = dalc.Init(configBytes, pubsubServer, nil, testutil.NewLogger(t), options...)
require.NoError(err)
err = dalc.Start()
require.NoError(err)
// Set the mock functions
mockSubstrateApiClient.On("GetBlockHash", mock.Anything).Return(availtypes.NewHash([]byte("123")), nil)

// Build batches for the block extrinsics
batch1 := testutil.MustGenerateBatchAndKey(0, 1)
batch2 := testutil.MustGenerateBatchAndKey(2, 3)
Expand Down Expand Up @@ -86,7 +92,11 @@ func TestRetrieveBatches(t *testing.T) {
},
},
}

// Set the mock functions
mockSubstrateApiClient.On("GetBlockHash", mock.Anything).Return(availtypes.NewHash([]byte("123")), nil)
mockSubstrateApiClient.On("GetBlock", mock.Anything).Return(signedBlock, nil)

// Retrieve the batches and make sure we only get the batches relevant for our app id
daMetaData := &da.DASubmitMetaData{
Height: 1,
Expand Down
4 changes: 0 additions & 4 deletions da/celestia/celestia.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,6 @@ func (c *DataAvailabilityLayerClient) Start() (err error) {
// Stop stops DataAvailabilityLayerClient.
func (c *DataAvailabilityLayerClient) Stop() error {
c.logger.Info("Stopping Celestia Data Availability Layer Client.")
err := c.pubsubServer.Stop()
if err != nil {
return err
}
c.cancel()
close(c.synced)
return nil
Expand Down
Loading