diff --git a/archiver/service/archiver.go b/archiver/service/archiver.go
index efbc35e..1ad6c1b 100644
--- a/archiver/service/archiver.go
+++ b/archiver/service/archiver.go
@@ -15,10 +15,17 @@ import (
 	"github.com/base-org/blob-archiver/common/storage"
 	"github.com/ethereum-optimism/optimism/op-service/httputil"
 	opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
+	"github.com/ethereum-optimism/optimism/op-service/retry"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/log"
 )
 
+// liveFetchBlobMaximumRetries is the attempt limit passed to retry.Do2 for each block persisted by processBlocksUntilKnownBlock.
+const liveFetchBlobMaximumRetries = 10
+
+// startupFetchBlobMaximumRetries is the attempt limit passed to retry.Do2 when Start seeds the archiver from the head block.
+const startupFetchBlobMaximumRetries = 3
+
 const backfillErrorRetryInterval = 5 * time.Second
 
 var ErrAlreadyStopped = errors.New("already stopped")
@@ -75,7 +82,11 @@ func (a *ArchiverService) Start(ctx context.Context) error {
 
 	a.log.Info("Archiver API server started", "address", srv.Addr().String())
 
-	currentBlob, _, err := a.persistBlobsForBlockToS3(ctx, "head")
+	// Retry with exponential backoff before giving up on seeding the initial block.
+	currentBlob, _, err := retry.Do2(ctx, startupFetchBlobMaximumRetries, retry.Exponential(), func() (*v1.BeaconBlockHeader, bool, error) {
+		return a.persistBlobsForBlockToS3(ctx, "head")
+	})
+
 	if err != nil {
 		a.log.Error("failed to seed archiver with initial block", "err", err)
 		return err
@@ -223,7 +234,10 @@ func (a *ArchiverService) processBlocksUntilKnownBlock(ctx context.Context) {
 	currentBlockId := "head"
 
 	for {
-		current, alreadyExisted, err := a.persistBlobsForBlockToS3(ctx, currentBlockId)
+		// Retry with exponential backoff before reporting the block as failed.
+		current, alreadyExisted, err := retry.Do2(ctx, liveFetchBlobMaximumRetries, retry.Exponential(), func() (*v1.BeaconBlockHeader, bool, error) {
+			return a.persistBlobsForBlockToS3(ctx, currentBlockId)
+		})
 
 		if err != nil {
 			a.log.Error("failed to update live blobs for block", "err", err, "blockId", currentBlockId)
diff --git a/archiver/service/archiver_test.go b/archiver/service/archiver_test.go
index c334bd3..25a537b 100644
--- a/archiver/service/archiver_test.go
+++ b/archiver/service/archiver_test.go
@@ -249,3 +249,59 @@ func TestArchiver_LatestStopsAtOrigin(t *testing.T) {
 		require.Equal(t, data.BlobSidecars.Data, beacon.Blobs[hash.String()])
 	}
 }
+
+// TestArchiver_LatestRetriesOnFailure checks that a single failed storage write is retried and blocks 5 and 4 are still persisted.
+func TestArchiver_LatestRetriesOnFailure(t *testing.T) {
+	beacon := beacontest.NewDefaultStubBeaconClient(t)
+	svc, fs := setup(t, beacon)
+
+	// 5 is the current head; since three already exists, we should write 5 and 4 and stop at three
+	fs.WriteOrFail(t, storage.BlobData{
+		Header: storage.Header{
+			BeaconBlockHash: blobtest.Three,
+		},
+		BlobSidecars: storage.BlobSidecars{
+			Data: beacon.Blobs[blobtest.Three.String()],
+		},
+	})
+
+	fs.CheckNotExistsOrFail(t, blobtest.Five)
+	fs.CheckNotExistsOrFail(t, blobtest.Four)
+	fs.CheckExistsOrFail(t, blobtest.Three)
+
+	// One failure is retried
+	fs.WritesFailTimes(1)
+	svc.processBlocksUntilKnownBlock(context.Background())
+
+	fs.CheckExistsOrFail(t, blobtest.Five)
+	fs.CheckExistsOrFail(t, blobtest.Four)
+	fs.CheckExistsOrFail(t, blobtest.Three)
+}
+
+// TestArchiver_LatestHaltsOnPersistentError checks that nothing new is persisted once writes keep failing past the retry limit.
+func TestArchiver_LatestHaltsOnPersistentError(t *testing.T) {
+	beacon := beacontest.NewDefaultStubBeaconClient(t)
+	svc, fs := setup(t, beacon)
+
+	// 5 is the current head and three already exists; a successful run would write 5 and 4 and stop at three
+	fs.WriteOrFail(t, storage.BlobData{
+		Header: storage.Header{
+			BeaconBlockHash: blobtest.Three,
+		},
+		BlobSidecars: storage.BlobSidecars{
+			Data: beacon.Blobs[blobtest.Three.String()],
+		},
+	})
+
+	fs.CheckNotExistsOrFail(t, blobtest.Five)
+	fs.CheckNotExistsOrFail(t, blobtest.Four)
+	fs.CheckExistsOrFail(t, blobtest.Three)
+
+	// Retries the maximum number of times, then fails and will not write the blobs
+	fs.WritesFailTimes(liveFetchBlobMaximumRetries + 1)
+	svc.processBlocksUntilKnownBlock(context.Background())
+
+	fs.CheckNotExistsOrFail(t, blobtest.Five)
+	fs.CheckNotExistsOrFail(t, blobtest.Four)
+	fs.CheckExistsOrFail(t, blobtest.Three)
+}
diff --git a/common/storage/storagetest/stub.go b/common/storage/storagetest/stub.go
index ebef425..0389e60 100644
--- a/common/storage/storagetest/stub.go
+++ b/common/storage/storagetest/stub.go
@@ -12,6 +12,7 @@ import (
 
 type TestFileStorage struct {
 	*storage.FileStorage
+	writeFailCount int // number of upcoming Write calls that return storage.ErrStorage
 }
 
 func NewTestFileStorage(t *testing.T, l log.Logger) *TestFileStorage {
@@ -21,6 +22,22 @@ func NewTestFileStorage(t *testing.T, l log.Logger) *TestFileStorage {
 	}
 }
 
+// WritesFailTimes makes the next `times` calls to Write fail with storage.ErrStorage.
+func (fs *TestFileStorage) WritesFailTimes(times int) {
+	fs.writeFailCount = times
+}
+
+// Write returns storage.ErrStorage while injected failures remain, consuming one
+// per call; otherwise it forwards ctx and data to the embedded FileStorage.
+func (fs *TestFileStorage) Write(ctx context.Context, data storage.BlobData) error {
+	if fs.writeFailCount > 0 {
+		fs.writeFailCount--
+		return storage.ErrStorage
+	}
+
+	return fs.FileStorage.Write(ctx, data)
+}
+
 func (fs *TestFileStorage) CheckExistsOrFail(t *testing.T, hash common.Hash) {
 	exists, err := fs.Exists(context.Background(), hash)
 	require.NoError(t, err)