diff --git a/disperser/batcher/minibatcher.go b/disperser/batcher/minibatcher.go index c6c83e2a7..b553aefeb 100644 --- a/disperser/batcher/minibatcher.go +++ b/disperser/batcher/minibatcher.go @@ -21,6 +21,15 @@ type MinibatcherConfig struct { MaxNumRetriesPerDispersal uint } +type BatchState struct { + BatchID uuid.UUID + ReferenceBlockNumber uint + BlobHeaders []*core.BlobHeader + BlobMetadata []*disperser.BlobMetadata + OperatorState *core.IndexedOperatorState + NumMinibatches uint +} + type Minibatcher struct { MinibatcherConfig @@ -33,8 +42,9 @@ type Minibatcher struct { Pool common.WorkerPool // local state + Batches map[uuid.UUID]*BatchState ReferenceBlockNumber uint - BatchID uuid.UUID + CurrentBatchID uuid.UUID MinibatchIndex uint ethClient common.EthClient @@ -48,7 +58,6 @@ func NewMinibatcher( dispatcher disperser.Dispatcher, chainState core.IndexedChainState, assignmentCoordinator core.AssignmentCoordinator, - // aggregator core.SignatureAggregator, encodingStreamer *EncodingStreamer, ethClient common.EthClient, workerpool common.WorkerPool, @@ -64,8 +73,9 @@ func NewMinibatcher( EncodingStreamer: encodingStreamer, Pool: workerpool, + Batches: make(map[uuid.UUID]*BatchState), ReferenceBlockNumber: 0, - BatchID: uuid.Nil, + CurrentBatchID: uuid.Nil, MinibatchIndex: 0, ethClient: ethClient, @@ -104,6 +114,15 @@ func (b *Minibatcher) Start(ctx context.Context) error { return nil } +func (b *Minibatcher) PopBatchState(batchID uuid.UUID) *BatchState { + batchState, ok := b.Batches[batchID] + if !ok { + return nil + } + delete(b.Batches, batchID) + return batchState +} + func (b *Minibatcher) handleFailure(ctx context.Context, blobMetadatas []*disperser.BlobMetadata, reason FailReason) error { var result *multierror.Error numPermanentFailures := 0 @@ -139,43 +158,51 @@ func (b *Minibatcher) HandleSingleBatch(ctx context.Context) error { return err } log.Debug("CreateMinibatch took", "duration", time.Since(stageTimer).String()) - // Processing new full batch if b.ReferenceBlockNumber < batch.BatchHeader.ReferenceBlockNumber { // Update status of the previous batch - if b.BatchID != uuid.Nil { - err = b.MinibatchStore.UpdateBatchStatus(ctx, b.BatchID, BatchStatusFormed) + if b.CurrentBatchID != uuid.Nil { + err = b.MinibatchStore.UpdateBatchStatus(ctx, b.CurrentBatchID, BatchStatusFormed) if err != nil { _ = b.handleFailure(ctx, batch.BlobMetadata, FailReason("error updating batch status")) return fmt.Errorf("error updating batch status: %w", err) } } - // Create new batch - b.BatchID, err = uuid.NewV7() + // Reset local batch state and create new batch + b.CurrentBatchID, err = uuid.NewV7() if err != nil { _ = b.handleFailure(ctx, batch.BlobMetadata, FailReason("error generating batch UUID")) return fmt.Errorf("error generating batch ID: %w", err) } - batchHeaderHash, err := batch.BatchHeader.GetBatchHeaderHash() - if err != nil { - _ = b.handleFailure(ctx, batch.BlobMetadata, FailReason("error getting batch header hash")) - return fmt.Errorf("error getting batch header hash: %w", err) - } b.MinibatchIndex = 0 b.ReferenceBlockNumber = batch.BatchHeader.ReferenceBlockNumber err = b.MinibatchStore.PutBatch(ctx, &BatchRecord{ - ID: b.BatchID, + ID: b.CurrentBatchID, CreatedAt: time.Now().UTC(), ReferenceBlockNumber: b.ReferenceBlockNumber, - HeaderHash: batchHeaderHash, + Status: BatchStatusPending, }) if err != nil { _ = b.handleFailure(ctx, batch.BlobMetadata, FailReason("error storing batch record")) return fmt.Errorf("error storing batch record: %w", err) } + b.Batches[b.CurrentBatchID] = &BatchState{ + BatchID: b.CurrentBatchID, + ReferenceBlockNumber: b.ReferenceBlockNumber, + BlobHeaders: make([]*core.BlobHeader, 0), + BlobMetadata: make([]*disperser.BlobMetadata, 0), + OperatorState: batch.State, + NumMinibatches: 0, + } } + // Accumulate batch metadata + batchState := b.Batches[b.CurrentBatchID] + batchState.BlobHeaders = append(batchState.BlobHeaders, batch.BlobHeaders...) + batchState.BlobMetadata = append(batchState.BlobMetadata, batch.BlobMetadata...) + batchState.NumMinibatches++ + // Store minibatch record blobHeaderHashes := make([][32]byte, 0, len(batch.EncodedBlobs)) batchSize := int64(0) @@ -189,7 +216,7 @@ func (b *Minibatcher) HandleSingleBatch(ctx context.Context) error { batchSize += blob.BlobHeader.EncodedSizeAllQuorums() } err = b.MinibatchStore.PutMinibatch(ctx, &MinibatchRecord{ - BatchID: b.BatchID, + BatchID: b.CurrentBatchID, MinibatchIndex: b.MinibatchIndex, BlobHeaderHashes: blobHeaderHashes, BatchSize: uint64(batchSize), @@ -201,9 +228,9 @@ func (b *Minibatcher) HandleSingleBatch(ctx context.Context) error { } // Dispatch encoded batch - log.Debug("Dispatching encoded batch...", "batchID", b.BatchID, "minibatchIndex", b.MinibatchIndex, "referenceBlockNumber", b.ReferenceBlockNumber, "numBlobs", len(batch.EncodedBlobs)) + log.Debug("Dispatching encoded batch...", "batchID", b.CurrentBatchID, "minibatchIndex", b.MinibatchIndex, "referenceBlockNumber", b.ReferenceBlockNumber, "numBlobs", len(batch.EncodedBlobs)) stageTimer = time.Now() - b.DisperseBatch(ctx, batch.State, batch.EncodedBlobs, batch.BatchHeader, b.BatchID, b.MinibatchIndex) + b.DisperseBatch(ctx, batch.State, batch.EncodedBlobs, batch.BatchHeader, b.CurrentBatchID, b.MinibatchIndex) log.Debug("DisperseBatch took", "duration", time.Since(stageTimer).String()) h, err := batch.State.OperatorState.Hash() diff --git a/disperser/batcher/minibatcher_test.go b/disperser/batcher/minibatcher_test.go index 400b5120c..1f7c33e1f 100644 --- a/disperser/batcher/minibatcher_test.go +++ b/disperser/batcher/minibatcher_test.go @@ -136,47 +136,142 @@ func TestDisperseMinibatch(t *testing.T) { _, _ = queueBlob(t, ctx, &blob1, c.blobStore) _, _ = queueBlob(t, ctx, &blob2, c.blobStore) - // Start the batcher out := make(chan batcher.EncodingResultOrStatus) err := c.encodingStreamer.RequestEncoding(ctx, out) assert.NoError(t, err) - err = c.encodingStreamer.ProcessEncodedBlobs(ctx, <-out) + encoded1 := <-out + err = c.encodingStreamer.ProcessEncodedBlobs(ctx, encoded1) assert.NoError(t, err) - err = c.encodingStreamer.ProcessEncodedBlobs(ctx, <-out) + encoded2 := <-out + err = c.encodingStreamer.ProcessEncodedBlobs(ctx, encoded2) assert.NoError(t, err) count, _ := c.encodingStreamer.EncodedBlobstore.GetEncodedResultSize() assert.Equal(t, 2, count) err = c.minibatcher.HandleSingleBatch(ctx) assert.NoError(t, err) - assert.NotNil(t, c.minibatcher.BatchID) + assert.NotNil(t, c.minibatcher.CurrentBatchID) assert.Equal(t, c.minibatcher.MinibatchIndex, uint(1)) assert.Equal(t, c.minibatcher.ReferenceBlockNumber, initialBlock) + assert.Len(t, c.minibatcher.Batches, 1) + assert.Equal(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].BatchID, c.minibatcher.CurrentBatchID) + assert.Equal(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].NumMinibatches, uint(1)) + assert.Equal(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].ReferenceBlockNumber, initialBlock) + assert.Len(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].BlobHeaders, 2) + assert.ElementsMatch(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].BlobMetadata, []*disperser.BlobMetadata{encoded1.BlobMetadata, encoded2.BlobMetadata}) - b, err := c.minibatchStore.GetBatch(ctx, c.minibatcher.BatchID) + // Second minibatch + blob3 := makeTestBlob([]*core.SecurityParam{{ + QuorumID: 0, + AdversaryThreshold: 80, + ConfirmationThreshold: 100, + }}) + _, _ = queueBlob(t, ctx, &blob3, c.blobStore) + err = c.encodingStreamer.RequestEncoding(ctx, out) + assert.NoError(t, err) + encoded3 := <-out + err = c.encodingStreamer.ProcessEncodedBlobs(ctx, encoded3) + assert.NoError(t, err) + err = c.minibatcher.HandleSingleBatch(ctx) + assert.NoError(t, err) + assert.NotNil(t, c.minibatcher.CurrentBatchID) + assert.Equal(t, c.minibatcher.MinibatchIndex, uint(2)) + assert.Equal(t, c.minibatcher.ReferenceBlockNumber, initialBlock) + assert.Len(t, c.minibatcher.Batches, 1) + assert.Equal(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].BatchID, c.minibatcher.CurrentBatchID) + assert.Equal(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].NumMinibatches, uint(2)) + assert.Equal(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].ReferenceBlockNumber, initialBlock) + assert.Len(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].BlobHeaders, 3) + assert.ElementsMatch(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].BlobMetadata, []*disperser.BlobMetadata{encoded1.BlobMetadata, encoded2.BlobMetadata, encoded3.BlobMetadata}) + assert.NotNil(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].OperatorState) + + b, err := c.minibatchStore.GetBatch(ctx, c.minibatcher.CurrentBatchID) assert.NoError(t, err) assert.NotNil(t, b) - assert.Equal(t, c.minibatcher.BatchID, b.ID) - assert.NotNil(t, b.HeaderHash) + assert.Equal(t, c.minibatcher.CurrentBatchID, b.ID) assert.NotNil(t, b.CreatedAt) assert.Equal(t, c.minibatcher.ReferenceBlockNumber, b.ReferenceBlockNumber) - mb, err := c.minibatchStore.GetMinibatch(ctx, c.minibatcher.BatchID, 0) + mb, err := c.minibatchStore.GetMinibatch(ctx, c.minibatcher.CurrentBatchID, 0) assert.NoError(t, err) assert.NotNil(t, mb) - assert.Equal(t, c.minibatcher.BatchID, mb.BatchID) + assert.Equal(t, c.minibatcher.CurrentBatchID, mb.BatchID) assert.Equal(t, uint(0), mb.MinibatchIndex) assert.Len(t, mb.BlobHeaderHashes, 2) assert.Equal(t, uint64(12800), mb.BatchSize) assert.Equal(t, c.minibatcher.ReferenceBlockNumber, mb.ReferenceBlockNumber) + mb, err = c.minibatchStore.GetMinibatch(ctx, c.minibatcher.CurrentBatchID, 1) + assert.NoError(t, err) + assert.NotNil(t, mb) + assert.Equal(t, c.minibatcher.CurrentBatchID, mb.BatchID) + assert.Equal(t, uint(1), mb.MinibatchIndex) + assert.Len(t, mb.BlobHeaderHashes, 1) + assert.Equal(t, uint64(7680), mb.BatchSize) + assert.Equal(t, c.minibatcher.ReferenceBlockNumber, mb.ReferenceBlockNumber) + + // Create a new minibatch with increased reference block number + // Test that the previous batch is marked as formed and that the new batch is created with the correct reference block number + _, _ = queueBlob(t, ctx, &blob1, c.blobStore) + _, _ = queueBlob(t, ctx, &blob2, c.blobStore) + + err = c.encodingStreamer.UpdateReferenecBlock(initialBlock + 10) + assert.NoError(t, err) + err = c.encodingStreamer.RequestEncoding(ctx, out) + assert.NoError(t, err) + encoded4 := <-out + err = c.encodingStreamer.ProcessEncodedBlobs(ctx, encoded4) + assert.NoError(t, err) + encoded5 := <-out + err = c.encodingStreamer.ProcessEncodedBlobs(ctx, encoded5) + assert.NoError(t, err) + err = c.minibatcher.HandleSingleBatch(ctx) + assert.NoError(t, err) + assert.NotNil(t, c.minibatcher.CurrentBatchID) c.pool.StopWait() - c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 2) - dispersalRequests, err := c.minibatchStore.GetMinibatchDispersalRequests(ctx, c.minibatcher.BatchID, 0) + + // previous batch should be marked as formed + b, err = c.minibatchStore.GetBatch(ctx, b.ID) + assert.NoError(t, err) + assert.NotNil(t, b) + assert.Equal(t, b.Status, batcher.BatchStatusFormed) + + // new batch should be created + assert.NotEqual(t, c.minibatcher.CurrentBatchID, b.ID) + assert.Equal(t, c.minibatcher.MinibatchIndex, uint(1)) + assert.Equal(t, c.minibatcher.ReferenceBlockNumber, initialBlock+10) + assert.Len(t, c.minibatcher.Batches, 2) + assert.Equal(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].BatchID, c.minibatcher.CurrentBatchID) + assert.Equal(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].NumMinibatches, uint(1)) + assert.Equal(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].ReferenceBlockNumber, initialBlock+10) + assert.Len(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].BlobHeaders, 2) + assert.ElementsMatch(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].BlobMetadata, []*disperser.BlobMetadata{encoded4.BlobMetadata, encoded5.BlobMetadata}) + assert.NotNil(t, c.minibatcher.Batches[c.minibatcher.CurrentBatchID].OperatorState) + + newBatch, err := c.minibatchStore.GetBatch(ctx, c.minibatcher.CurrentBatchID) + assert.NoError(t, err) + assert.NotNil(t, newBatch) + assert.Equal(t, newBatch.ReferenceBlockNumber, initialBlock+10) + assert.Equal(t, newBatch.Status, batcher.BatchStatusPending) + + // Test PopBatchState + batchState := c.minibatcher.PopBatchState(b.ID) + assert.NotNil(t, batchState) + assert.Equal(t, batchState.BatchID, b.ID) + assert.Equal(t, batchState.ReferenceBlockNumber, initialBlock) + assert.Equal(t, batchState.NumMinibatches, uint(2)) + assert.Len(t, batchState.BlobHeaders, 3) + assert.ElementsMatch(t, batchState.BlobMetadata, []*disperser.BlobMetadata{encoded1.BlobMetadata, encoded2.BlobMetadata, encoded3.BlobMetadata}) + assert.NotNil(t, batchState.OperatorState) + assert.Len(t, c.minibatcher.Batches, 1) + assert.Nil(t, c.minibatcher.Batches[b.ID]) + + c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 6) + dispersalRequests, err := c.minibatchStore.GetMinibatchDispersalRequests(ctx, b.ID, 0) assert.NoError(t, err) assert.Len(t, dispersalRequests, 2) opIDs := make([]core.OperatorID, 2) for i, req := range dispersalRequests { - assert.Equal(t, req.BatchID, c.minibatcher.BatchID) + assert.Equal(t, req.BatchID, b.ID) assert.Equal(t, req.MinibatchIndex, uint(0)) assert.Equal(t, req.NumBlobs, uint(2)) assert.NotNil(t, req.Socket) @@ -185,11 +280,11 @@ func TestDisperseMinibatch(t *testing.T) { } assert.ElementsMatch(t, opIDs, []core.OperatorID{opId0, opId1}) - dispersalResponses, err := c.minibatchStore.GetMinibatchDispersalResponses(ctx, c.minibatcher.BatchID, 0) + dispersalResponses, err := c.minibatchStore.GetMinibatchDispersalResponses(ctx, b.ID, 0) assert.NoError(t, err) assert.Len(t, dispersalResponses, 2) for _, resp := range dispersalResponses { - assert.Equal(t, resp.BatchID, c.minibatcher.BatchID) + assert.Equal(t, resp.BatchID, b.ID) assert.Equal(t, resp.MinibatchIndex, uint(0)) assert.NotNil(t, resp.RespondedAt) assert.NoError(t, resp.Error) @@ -240,21 +335,21 @@ func TestDisperseMinibatchFailure(t *testing.T) { err = c.minibatcher.HandleSingleBatch(ctx) assert.NoError(t, err) - assert.NotNil(t, c.minibatcher.BatchID) + assert.NotNil(t, c.minibatcher.CurrentBatchID) assert.Equal(t, c.minibatcher.MinibatchIndex, uint(1)) assert.Equal(t, c.minibatcher.ReferenceBlockNumber, initialBlock) - b, err := c.minibatchStore.GetBatch(ctx, c.minibatcher.BatchID) + b, err := c.minibatchStore.GetBatch(ctx, c.minibatcher.CurrentBatchID) assert.NoError(t, err) assert.NotNil(t, b) - assert.Equal(t, c.minibatcher.BatchID, b.ID) + assert.Equal(t, c.minibatcher.CurrentBatchID, b.ID) assert.NotNil(t, b.HeaderHash) assert.NotNil(t, b.CreatedAt) assert.Equal(t, c.minibatcher.ReferenceBlockNumber, b.ReferenceBlockNumber) - mb, err := c.minibatchStore.GetMinibatch(ctx, c.minibatcher.BatchID, 0) + mb, err := c.minibatchStore.GetMinibatch(ctx, c.minibatcher.CurrentBatchID, 0) assert.NoError(t, err) assert.NotNil(t, mb) - assert.Equal(t, c.minibatcher.BatchID, mb.BatchID) + assert.Equal(t, c.minibatcher.CurrentBatchID, mb.BatchID) assert.Equal(t, uint(0), mb.MinibatchIndex) assert.Len(t, mb.BlobHeaderHashes, 2) assert.Equal(t, uint64(12800), mb.BatchSize) @@ -262,12 +357,12 @@ func TestDisperseMinibatchFailure(t *testing.T) { c.pool.StopWait() c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 2) - dispersalRequests, err := c.minibatchStore.GetMinibatchDispersalRequests(ctx, c.minibatcher.BatchID, 0) + dispersalRequests, err := c.minibatchStore.GetMinibatchDispersalRequests(ctx, c.minibatcher.CurrentBatchID, 0) assert.NoError(t, err) assert.Len(t, dispersalRequests, 2) opIDs := make([]core.OperatorID, 2) for i, req := range dispersalRequests { - assert.Equal(t, req.BatchID, c.minibatcher.BatchID) + assert.Equal(t, req.BatchID, c.minibatcher.CurrentBatchID) assert.Equal(t, req.MinibatchIndex, uint(0)) assert.Equal(t, req.NumBlobs, uint(2)) assert.NotNil(t, req.Socket) @@ -276,11 +371,11 @@ func TestDisperseMinibatchFailure(t *testing.T) { } assert.ElementsMatch(t, opIDs, []core.OperatorID{opId0, opId1}) - dispersalResponses, err := c.minibatchStore.GetMinibatchDispersalResponses(ctx, c.minibatcher.BatchID, 0) + dispersalResponses, err := c.minibatchStore.GetMinibatchDispersalResponses(ctx, c.minibatcher.CurrentBatchID, 0) assert.NoError(t, err) assert.Len(t, dispersalResponses, 2) for _, resp := range dispersalResponses { - assert.Equal(t, resp.BatchID, c.minibatcher.BatchID) + assert.Equal(t, resp.BatchID, c.minibatcher.CurrentBatchID) assert.Equal(t, resp.MinibatchIndex, uint(0)) assert.NotNil(t, resp.RespondedAt) assert.NoError(t, resp.Error)