Skip to content

Commit

Permalink
[batcher] Recover blob state (#657)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored Aug 1, 2024
1 parent bc9703c commit 04e5cf3
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 1 deletion.
20 changes: 19 additions & 1 deletion disperser/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,26 @@ func NewBatcher(
}, nil
}

func (b *Batcher) RecoverState(ctx context.Context) error {
metas, err := b.Queue.GetBlobMetadataByStatus(ctx, disperser.Dispersing)
if err != nil {
return fmt.Errorf("failed to get blobs in dispersing state: %w", err)
}
for _, meta := range metas {
err = b.Queue.MarkBlobProcessing(ctx, meta.GetBlobKey())
if err != nil {
return fmt.Errorf("failed to mark blob (%s) as processing: %w", meta.GetBlobKey(), err)
}
}
return nil
}

func (b *Batcher) Start(ctx context.Context) error {
err := b.ChainState.Start(ctx)
err := b.RecoverState(ctx)
if err != nil {
return fmt.Errorf("failed to recover state: %w", err)
}
err = b.ChainState.Start(ctx)
if err != nil {
return err
}
Expand Down
56 changes: 56 additions & 0 deletions disperser/batcher/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -751,3 +751,59 @@ func TestBlobAttestationFailures2(t *testing.T) {
err = batcher.HandleSingleBatch(ctx)
assert.NoError(t, err)
}

func TestBatcherRecoverState(t *testing.T) {
blob0 := makeTestBlob([]*core.SecurityParam{
{
QuorumID: 0,
AdversaryThreshold: 80,
ConfirmationThreshold: 100,
},
{
QuorumID: 2,
AdversaryThreshold: 80,
ConfirmationThreshold: 50,
},
})

blob1 := makeTestBlob([]*core.SecurityParam{
{
QuorumID: 0,
AdversaryThreshold: 80,
ConfirmationThreshold: 100,
},
{
QuorumID: 2,
AdversaryThreshold: 80,
ConfirmationThreshold: 100,
},
})

components, batcher, _ := makeBatcher(t)

blobStore := components.blobStore
ctx := context.Background()
_, key1 := queueBlob(t, ctx, &blob0, blobStore)
_, _ = queueBlob(t, ctx, &blob1, blobStore)

err := blobStore.MarkBlobDispersing(ctx, key1)
assert.NoError(t, err)
processingBlobs, err := blobStore.GetBlobMetadataByStatus(ctx, disperser.Processing)
assert.NoError(t, err)
assert.Len(t, processingBlobs, 1)

dispersingBlobs, err := blobStore.GetBlobMetadataByStatus(ctx, disperser.Dispersing)
assert.NoError(t, err)
assert.Len(t, dispersingBlobs, 1)

err = batcher.RecoverState(context.Background())
assert.NoError(t, err)

processingBlobs, err = blobStore.GetBlobMetadataByStatus(ctx, disperser.Processing)
assert.NoError(t, err)
assert.Len(t, processingBlobs, 2)

dispersingBlobs, err = blobStore.GetBlobMetadataByStatus(ctx, disperser.Dispersing)
assert.NoError(t, err)
assert.Len(t, dispersingBlobs, 0)
}

0 comments on commit 04e5cf3

Please sign in to comment.