Skip to content

Commit

Permalink
Merge pull request #6 from base-org/improve-error-handling
Browse files Browse the repository at this point in the history
Add retries to blob fetching
  • Loading branch information
danyalprout authored Feb 19, 2024
2 parents 0fa146d + 6de7b7b commit ec55442
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 2 deletions.
12 changes: 10 additions & 2 deletions archiver/service/archiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@ import (
"github.com/base-org/blob-archiver/common/storage"
"github.com/ethereum-optimism/optimism/op-service/httputil"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum-optimism/optimism/op-service/retry"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)

const liveFetchBlobMaximumRetries = 10
const startupFetchBlobMaximumRetries = 3
const backfillErrorRetryInterval = 5 * time.Second

var ErrAlreadyStopped = errors.New("already stopped")
Expand Down Expand Up @@ -75,7 +78,10 @@ func (a *ArchiverService) Start(ctx context.Context) error {

a.log.Info("Archiver API server started", "address", srv.Addr().String())

currentBlob, _, err := a.persistBlobsForBlockToS3(ctx, "head")
currentBlob, _, err := retry.Do2(ctx, startupFetchBlobMaximumRetries, retry.Exponential(), func() (*v1.BeaconBlockHeader, bool, error) {
return a.persistBlobsForBlockToS3(ctx, "head")
})

if err != nil {
a.log.Error("failed to seed archiver with initial block", "err", err)
return err
Expand Down Expand Up @@ -223,7 +229,9 @@ func (a *ArchiverService) processBlocksUntilKnownBlock(ctx context.Context) {
currentBlockId := "head"

for {
current, alreadyExisted, err := a.persistBlobsForBlockToS3(ctx, currentBlockId)
current, alreadyExisted, err := retry.Do2(ctx, liveFetchBlobMaximumRetries, retry.Exponential(), func() (*v1.BeaconBlockHeader, bool, error) {
return a.persistBlobsForBlockToS3(ctx, currentBlockId)
})

if err != nil {
a.log.Error("failed to update live blobs for block", "err", err, "blockId", currentBlockId)
Expand Down
54 changes: 54 additions & 0 deletions archiver/service/archiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,3 +249,57 @@ func TestArchiver_LatestStopsAtOrigin(t *testing.T) {
require.Equal(t, data.BlobSidecars.Data, beacon.Blobs[hash.String()])
}
}

func TestArchiver_LatestRetriesOnFailure(t *testing.T) {
beacon := beacontest.NewDefaultStubBeaconClient(t)
svc, fs := setup(t, beacon)

// 5 is the current head, if three already exists, we should write 5 and 4 and stop at three
fs.WriteOrFail(t, storage.BlobData{
Header: storage.Header{
BeaconBlockHash: blobtest.Three,
},
BlobSidecars: storage.BlobSidecars{
Data: beacon.Blobs[blobtest.Three.String()],
},
})

fs.CheckNotExistsOrFail(t, blobtest.Five)
fs.CheckNotExistsOrFail(t, blobtest.Four)
fs.CheckExistsOrFail(t, blobtest.Three)

// One failure is retried
fs.WritesFailTimes(1)
svc.processBlocksUntilKnownBlock(context.Background())

fs.CheckExistsOrFail(t, blobtest.Five)
fs.CheckExistsOrFail(t, blobtest.Four)
fs.CheckExistsOrFail(t, blobtest.Three)
}

func TestArchiver_LatestHaltsOnPersistentError(t *testing.T) {
beacon := beacontest.NewDefaultStubBeaconClient(t)
svc, fs := setup(t, beacon)

// 5 is the current head, if three already exists, we should write 5 and 4 and stop at three
fs.WriteOrFail(t, storage.BlobData{
Header: storage.Header{
BeaconBlockHash: blobtest.Three,
},
BlobSidecars: storage.BlobSidecars{
Data: beacon.Blobs[blobtest.Three.String()],
},
})

fs.CheckNotExistsOrFail(t, blobtest.Five)
fs.CheckNotExistsOrFail(t, blobtest.Four)
fs.CheckExistsOrFail(t, blobtest.Three)

// Retries the maximum number of times, then fails and will not write the blobs
fs.WritesFailTimes(liveFetchBlobMaximumRetries + 1)
svc.processBlocksUntilKnownBlock(context.Background())

fs.CheckNotExistsOrFail(t, blobtest.Five)
fs.CheckNotExistsOrFail(t, blobtest.Four)
fs.CheckExistsOrFail(t, blobtest.Three)
}
14 changes: 14 additions & 0 deletions common/storage/storagetest/stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

type TestFileStorage struct {
*storage.FileStorage
writeFailCount int
}

func NewTestFileStorage(t *testing.T, l log.Logger) *TestFileStorage {
Expand All @@ -21,6 +22,19 @@ func NewTestFileStorage(t *testing.T, l log.Logger) *TestFileStorage {
}
}

func (s *TestFileStorage) WritesFailTimes(times int) {
s.writeFailCount = times
}

func (s *TestFileStorage) Write(_ context.Context, data storage.BlobData) error {
if s.writeFailCount > 0 {
s.writeFailCount--
return storage.ErrStorage
}

return s.FileStorage.Write(context.Background(), data)
}

func (fs *TestFileStorage) CheckExistsOrFail(t *testing.T, hash common.Hash) {
exists, err := fs.Exists(context.Background(), hash)
require.NoError(t, err)
Expand Down

0 comments on commit ec55442

Please sign in to comment.