Store backfill_processes status for protection against interruptions
bitwiseguy committed Jun 17, 2024
1 parent 9c07467 commit 495fd46
Showing 10 changed files with 312 additions and 51 deletions.
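The hunks below read and write a storage.BackfillProcesses map guarded by a package-level BackfillMu mutex. The file defining those names is among the ten changed files but is not shown on this page; as a reading aid, here is a minimal sketch of what the call sites imply (an inference, not the commit's actual code):

	// Sketch inferred from the call sites in archiver.go and file.go below;
	// the commit's real definitions live in a changed file not shown here.
	package storage

	import (
		"sync"

		v1 "github.com/attestantio/go-eth2-client/api/v1"
		"github.com/ethereum/go-ethereum/common"
	)

	// BackfillMu guards reads and writes of the persisted backfill state.
	var BackfillMu sync.Mutex

	// BackfillProcess records where one backfill walk started (Start) and
	// the block it most recently persisted (Current).
	type BackfillProcess struct {
		Start   v1.BeaconBlockHeader
		Current v1.BeaconBlockHeader
	}

	// BackfillProcesses maps each walk's starting block root to its state.
	type BackfillProcesses map[common.Hash]BackfillProcess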
api/service/api.go (2 changes: 1 addition & 1 deletion)

@@ -179,7 +179,7 @@ func (a *API) blobSidecarHandler(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	result, storageErr := a.dataStoreClient.Read(r.Context(), beaconBlockHash)
+	result, storageErr := a.dataStoreClient.ReadBlob(r.Context(), beaconBlockHash)
 	if storageErr != nil {
 		if errors.Is(storageErr, storage.ErrNotFound) {
 			errUnknownBlock.write(w)
api/service/api_test.go (4 changes: 2 additions & 2 deletions)

@@ -92,10 +92,10 @@ func TestAPIService(t *testing.T) {
 		},
 	}
 
-	err := fs.Write(context.Background(), blockOne)
+	err := fs.WriteBlob(context.Background(), blockOne)
 	require.NoError(t, err)
 
-	err = fs.Write(context.Background(), blockTwo)
+	err = fs.WriteBlob(context.Background(), blockTwo)
 	require.NoError(t, err)
 
 	beaconClient.Headers["finalized"] = &v1.BeaconBlockHeader{
archiver/service/archiver.go (78 changes: 56 additions & 22 deletions)

@@ -119,7 +119,7 @@ func (a *Archiver) persistBlobsForBlockToS3(ctx context.Context, blockIdentifier
 	}
 
 	// The blob that is being written has not been validated. It is assumed that the beacon node is trusted.
-	err = a.dataStoreClient.Write(ctx, blobData)
+	err = a.dataStoreClient.WriteBlob(ctx, blobData)
 
 	if err != nil {
 		a.log.Error("failed to write blob", "err", err)
@@ -135,32 +135,66 @@
 // to the archivers storage or the origin block in the configuration. This is used to ensure that any gaps can be filled.
 // If an error is encountered persisting a block, it will retry after waiting for a period of time.
 func (a *Archiver) backfillBlobs(ctx context.Context, latest *v1.BeaconBlockHeader) {
-	current, alreadyExists, err := latest, false, error(nil)
-
-	defer func() {
-		a.log.Info("backfill complete", "endHash", current.Root.String(), "startHash", latest.Root.String())
-	}()
-
-	for !alreadyExists {
-		previous := current
-
-		if common.Hash(current.Root) == a.cfg.OriginBlock {
-			a.log.Info("reached origin block", "hash", current.Root.String())
-			return
-		}
-
-		current, alreadyExists, err = a.persistBlobsForBlockToS3(ctx, previous.Header.Message.ParentRoot.String(), false)
-		if err != nil {
-			a.log.Error("failed to persist blobs for block, will retry", "err", err, "hash", previous.Header.Message.ParentRoot.String())
-			// Revert back to block we failed to fetch
-			current = previous
-			time.Sleep(backfillErrorRetryInterval)
-			continue
-		}
-
-		if !alreadyExists {
-			a.metrics.RecordProcessedBlock(metrics.BlockSourceBackfill)
-		}
+	// Add backfill process that starts at latest slot, then loop through all backfill processes
+	backfillProcesses, err := a.dataStoreClient.ReadBackfillProcesses(ctx)
+	if err != nil {
+		a.log.Crit("failed to read backfill_processes", "err", err)
+	}
+	backfillProcesses[common.Hash(latest.Root)] = storage.BackfillProcess{Start: *latest, Current: *latest}
+	a.dataStoreClient.WriteBackfillProcesses(ctx, backfillProcesses)
+
+	backfillLoop := func(start *v1.BeaconBlockHeader, current *v1.BeaconBlockHeader) {
+		curr, alreadyExists, err := current, false, error(nil)
+		count := 0
+		a.log.Info("backfill process initiated",
+			"currHash", curr.Root.String(),
+			"currSlot", curr.Header.Message.Slot,
+			"startHash", start.Root.String(),
+			"startSlot", start.Header.Message.Slot,
+		)
+
+		defer func() {
+			a.log.Info("backfill process complete",
+				"endHash", curr.Root.String(),
+				"endSlot", curr.Header.Message.Slot,
+				"startHash", start.Root.String(),
+				"startSlot", start.Header.Message.Slot,
+			)
+			delete(backfillProcesses, common.Hash(start.Root))
+			a.dataStoreClient.WriteBackfillProcesses(ctx, backfillProcesses)
+		}()
+
+		for !alreadyExists {
+			previous := curr
+
+			if common.Hash(curr.Root) == a.cfg.OriginBlock {
+				a.log.Info("reached origin block", "hash", curr.Root.String())
+				return
+			}
+
+			curr, alreadyExists, err = a.persistBlobsForBlockToS3(ctx, previous.Header.Message.ParentRoot.String(), false)
+			if err != nil {
+				a.log.Error("failed to persist blobs for block, will retry", "err", err, "hash", previous.Header.Message.ParentRoot.String())
+				// Revert back to block we failed to fetch
+				curr = previous
+				time.Sleep(backfillErrorRetryInterval)
+				continue
+			}
+
+			if !alreadyExists {
+				a.metrics.RecordProcessedBlock(metrics.BlockSourceBackfill)
+			}
+
+			count++
+			if count%10 == 0 {
+				backfillProcesses[common.Hash(start.Root)] = storage.BackfillProcess{Start: *start, Current: *curr}
+				a.dataStoreClient.WriteBackfillProcesses(ctx, backfillProcesses)
+			}
+		}
+	}
+
+	for _, process := range backfillProcesses {
+		backfillLoop(&process.Start, &process.Current)
 	}
 }
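Read as a whole, this hunk makes backfill progress durable: each walk is keyed by its starting block root, checkpoints its Current header every 10 blocks, and deletes its entry in the defer only once it reaches the origin block or an already-archived blob. After an interruption, the final for loop finds any entry whose walk never completed and resumes it from the persisted Current header, so at most about ten blocks are re-fetched instead of restarting from the chain head. Passing &process.Start and &process.Current from the range variable is safe here because backfillLoop runs synchronously before the next iteration.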
archiver/service/archiver_test.go (87 changes: 83 additions & 4 deletions)

@@ -89,7 +89,7 @@ func TestArchiver_BackfillToOrigin(t *testing.T) {
 	svc, fs := setup(t, beacon)
 
 	// We have the current head, which is block 5 written to storage
-	err := fs.Write(context.Background(), storage.BlobData{
+	err := fs.WriteBlob(context.Background(), storage.BlobData{
 		Header: storage.Header{
 			BeaconBlockHash: blobtest.Five,
 		},
@@ -119,7 +119,7 @@ func TestArchiver_BackfillToExistingBlock(t *testing.T) {
 	svc, fs := setup(t, beacon)
 
 	// We have the current head, which is block 5 written to storage
-	err := fs.Write(context.Background(), storage.BlobData{
+	err := fs.WriteBlob(context.Background(), storage.BlobData{
 		Header: storage.Header{
 			BeaconBlockHash: blobtest.Five,
 		},
@@ -130,7 +130,7 @@
 	require.NoError(t, err)
 
 	// We also have block 1 written to storage
-	err = fs.Write(context.Background(), storage.BlobData{
+	err = fs.WriteBlob(context.Background(), storage.BlobData{
 		Header: storage.Header{
 			BeaconBlockHash: blobtest.One,
 		},
@@ -156,13 +156,92 @@
 		require.NoError(t, err)
 		require.True(t, exists)
 
-		data, err := fs.Read(context.Background(), blob)
+		data, err := fs.ReadBlob(context.Background(), blob)
 		require.NoError(t, err)
 		require.NotNil(t, data)
 		require.Equal(t, data.BlobSidecars.Data, beacon.Blobs[blob.String()])
 	}
 }
 
+func TestArchiver_BackfillFinishOldProcess(t *testing.T) {
+	beacon := beacontest.NewDefaultStubBeaconClient(t)
+	svc, fs := setup(t, beacon)
+
+	// We have the current head, which is block 5 written to storage
+	err := fs.WriteBlob(context.Background(), storage.BlobData{
+		Header: storage.Header{
+			BeaconBlockHash: blobtest.Five,
+		},
+		BlobSidecars: storage.BlobSidecars{
+			Data: beacon.Blobs[blobtest.Five.String()],
+		},
+	})
+	require.NoError(t, err)
+
+	// We also have block 3 written to storage
+	err = fs.WriteBlob(context.Background(), storage.BlobData{
+		Header: storage.Header{
+			BeaconBlockHash: blobtest.Three,
+		},
+		BlobSidecars: storage.BlobSidecars{
+			Data: beacon.Blobs[blobtest.Three.String()],
+		},
+	})
+	require.NoError(t, err)
+
+	// We also have block 1 written to storage
+	err = fs.WriteBlob(context.Background(), storage.BlobData{
+		Header: storage.Header{
+			BeaconBlockHash: blobtest.One,
+		},
+		BlobSidecars: storage.BlobSidecars{
+			Data: beacon.Blobs[blobtest.One.String()],
+		},
+	})
+	require.NoError(t, err)
+
+	// We expect to backfill blob 4 first, then 2 in a separate process
+	expectedBlobs := []common.Hash{blobtest.Four, blobtest.Two}
+
+	for _, blob := range expectedBlobs {
+		exists, err := fs.Exists(context.Background(), blob)
+		require.NoError(t, err)
+		require.False(t, exists)
+	}
+
+	actualProcesses, err := svc.dataStoreClient.ReadBackfillProcesses(context.Background())
+	expectedProcesses := make(storage.BackfillProcesses)
+	require.NoError(t, err)
+	require.Equal(t, expectedProcesses, actualProcesses)
+
+	expectedProcesses[blobtest.Three] = storage.BackfillProcess{Start: *beacon.Headers[blobtest.Three.String()], Current: *beacon.Headers[blobtest.Three.String()]}
+	err = svc.dataStoreClient.WriteBackfillProcesses(context.Background(), expectedProcesses)
+	require.NoError(t, err)
+
+	actualProcesses, err = svc.dataStoreClient.ReadBackfillProcesses(context.Background())
+	require.NoError(t, err)
+	require.Equal(t, expectedProcesses, actualProcesses)
+
+	svc.backfillBlobs(context.Background(), beacon.Headers[blobtest.Five.String()])
+
+	for _, blob := range expectedBlobs {
+		exists, err := fs.Exists(context.Background(), blob)
+		require.NoError(t, err)
+		require.True(t, exists)
+
+		data, err := fs.ReadBlob(context.Background(), blob)
+		require.NoError(t, err)
+		require.NotNil(t, data)
+		require.Equal(t, data.BlobSidecars.Data, beacon.Blobs[blob.String()])
+	}
+
+	actualProcesses, err = svc.dataStoreClient.ReadBackfillProcesses(context.Background())
+	require.NoError(t, err)
+	svc.log.Info("backfill processes", "processes", actualProcesses)
+	require.Equal(t, storage.BackfillProcesses{}, actualProcesses)
+
+}
+
 func TestArchiver_LatestStopsAtExistingBlock(t *testing.T) {
 	beacon := beacontest.NewDefaultStubBeaconClient(t)
 	svc, fs := setup(t, beacon)
common/blobtest/helpers.go (2 changes: 2 additions & 0 deletions)

@@ -17,6 +17,8 @@
 	Three = common.Hash{3}
 	Four  = common.Hash{4}
 	Five  = common.Hash{5}
+	Six   = common.Hash{6}
+	Seven = common.Hash{7}
 
 	StartSlot = uint64(10)
 	EndSlot   = uint64(15)
common/storage/file.go (57 changes: 54 additions & 3 deletions)

@@ -16,10 +16,21 @@ type FileStorage struct {
 }
 
 func NewFileStorage(dir string, l log.Logger) *FileStorage {
-	return &FileStorage{
+	storage := &FileStorage{
 		log:       l,
 		directory: dir,
 	}
+
+	_, err := storage.ReadBackfillProcesses(context.Background())
+	if err == ErrNotFound {
+		storage.log.Info("creating empty backfill_processes object")
+		err = storage.WriteBackfillProcesses(context.Background(), BackfillProcesses{})
+		if err != nil {
+			storage.log.Crit("failed to create backfill_processes file")
+		}
+	}
+
+	return storage
 }
 
@@ -33,7 +44,7 @@ func (s *FileStorage) Exists(_ context.Context, hash common.Hash) (bool, error)
 	return true, nil
 }
 
-func (s *FileStorage) Read(_ context.Context, hash common.Hash) (BlobData, error) {
+func (s *FileStorage) ReadBlob(_ context.Context, hash common.Hash) (BlobData, error) {
 	data, err := os.ReadFile(s.fileName(hash))
 	if err != nil {
 		if os.IsNotExist(err) {
@@ -51,7 +62,47 @@ func (s *FileStorage) Read(_ context.Context, hash common.Hash) (BlobData, error
 	return result, nil
 }
 
-func (s *FileStorage) Write(_ context.Context, data BlobData) error {
+func (s *FileStorage) ReadBackfillProcesses(ctx context.Context) (BackfillProcesses, error) {
+	BackfillMu.Lock()
+	defer BackfillMu.Unlock()
+
+	data, err := os.ReadFile(path.Join(s.directory, "backfill_processes"))
+	if err != nil {
+		if os.IsNotExist(err) {
+			return BackfillProcesses{}, ErrNotFound
+		}
+
+		return BackfillProcesses{}, err
+	}
+	var result BackfillProcesses
+	err = json.Unmarshal(data, &result)
+	if err != nil {
+		s.log.Warn("error decoding backfill_processes", "err", err)
+		return BackfillProcesses{}, ErrMarshaling
+	}
+	return result, nil
+}
+
+func (s *FileStorage) WriteBackfillProcesses(_ context.Context, data BackfillProcesses) error {
+	BackfillMu.Lock()
+	defer BackfillMu.Unlock()
+
+	b, err := json.Marshal(data)
+	if err != nil {
+		s.log.Warn("error encoding backfill_processes", "err", err)
+		return ErrMarshaling
+	}
+	err = os.WriteFile(path.Join(s.directory, "backfill_processes"), b, 0644)
+	if err != nil {
+		s.log.Warn("error writing backfill_processes", "err", err)
+		return err
+	}
+
+	s.log.Info("wrote backfill_processes")
+	return nil
+}
+
+func (s *FileStorage) WriteBlob(_ context.Context, data BlobData) error {
 	b, err := json.Marshal(data)
 	if err != nil {
 		s.log.Warn("error encoding blob", "err", err)
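For orientation, a short usage sketch of the new FileStorage helpers; the module import path and directory here are assumptions for illustration, everything else mirrors the API added above:

	// Hypothetical usage: persist backfill state, then read it back the way a
	// restarted process would. The import path is assumed, not from this page.
	package main

	import (
		"context"

		"github.com/base-org/blob-archiver/common/storage"
		"github.com/ethereum/go-ethereum/common"
		"github.com/ethereum/go-ethereum/log"
	)

	func main() {
		ctx := context.Background()
		store := storage.NewFileStorage("/tmp/blob-archive", log.Root())

		// NewFileStorage seeds an empty backfill_processes file, so this
		// returns an empty, non-nil map on first run.
		procs, err := store.ReadBackfillProcesses(ctx)
		if err != nil {
			log.Crit("read failed", "err", err)
		}

		procs[common.Hash{1}] = storage.BackfillProcess{} // Start/Current headers elided in this sketch
		if err := store.WriteBackfillProcesses(ctx, procs); err != nil {
			log.Crit("write failed", "err", err)
		}

		// A new FileStorage over the same directory, e.g. after a restart,
		// sees the same state; this is what makes backfills resumable.
		store2 := storage.NewFileStorage("/tmp/blob-archive", log.Root())
		procs2, _ := store2.ReadBackfillProcesses(ctx)
		log.Info("recovered", "count", len(procs2))
	}

Storing the whole map as one JSON file behind a process-wide mutex is a deliberately simple design; only the archiver mutates this state, so it gets by without file locks or atomic-rename machinery.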
common/storage/file_test.go (14 changes: 7 additions & 7 deletions)

@@ -29,7 +29,7 @@ func runTestExists(t *testing.T, s DataStore) {
 	require.NoError(t, err)
 	require.False(t, exists)
 
-	err = s.Write(context.Background(), BlobData{
+	err = s.WriteBlob(context.Background(), BlobData{
 		Header: Header{
 			BeaconBlockHash: id,
 		},
@@ -52,19 +52,19 @@ func TestExists(t *testing.T) {
 func runTestRead(t *testing.T, s DataStore) {
 	id := common.Hash{1, 2, 3}
 
-	_, err := s.Read(context.Background(), id)
+	_, err := s.ReadBlob(context.Background(), id)
 	require.Error(t, err)
 	require.True(t, errors.Is(err, ErrNotFound))
 
-	err = s.Write(context.Background(), BlobData{
+	err = s.WriteBlob(context.Background(), BlobData{
 		Header: Header{
 			BeaconBlockHash: id,
 		},
 		BlobSidecars: BlobSidecars{},
 	})
 	require.NoError(t, err)
 
-	data, err := s.Read(context.Background(), id)
+	data, err := s.ReadBlob(context.Background(), id)
 	require.NoError(t, err)
 	require.Equal(t, id, data.Header.BeaconBlockHash)
 }
@@ -84,14 +84,14 @@ func TestBrokenStorage(t *testing.T) {
 	// Delete the directory to simulate broken storage
 	cleanup()
 
-	_, err := fs.Read(context.Background(), id)
+	_, err := fs.ReadBlob(context.Background(), id)
 	require.Error(t, err)
 
 	exists, err := fs.Exists(context.Background(), id)
 	require.False(t, exists)
 	require.NoError(t, err) // No error should be returned, as in this test we've just delted the directory
 
-	err = fs.Write(context.Background(), BlobData{
+	err = fs.WriteBlob(context.Background(), BlobData{
 		Header: Header{
 			BeaconBlockHash: id,
 		},
@@ -109,7 +109,7 @@ func TestReadInvalidData(t *testing.T) {
 	err := os.WriteFile(fs.fileName(id), []byte("invalid json"), 0644)
 	require.NoError(t, err)
 
-	_, err = fs.Read(context.Background(), id)
+	_, err = fs.ReadBlob(context.Background(), id)
 	require.Error(t, err)
 	require.True(t, errors.Is(err, ErrMarshaling))
 }