Skip to content

Commit

Permalink
[minibatcher] Fullbatch state management (#665)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored Jul 26, 2024
1 parent 58a2469 commit f36918b
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 35 deletions.
137 changes: 126 additions & 11 deletions disperser/batcher/inmem/minibatch_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package inmem

import (
"context"
"fmt"
"errors"
"sort"
"sync"

"github.com/Layr-Labs/eigenda/core"
Expand All @@ -11,6 +12,8 @@ import (
"github.com/google/uuid"
)

var BatchNotFound = errors.New("batch not found")

type minibatchStore struct {
// BatchRecords maps batch IDs to batch records
BatchRecords map[uuid.UUID]*batcher.BatchRecord
Expand Down Expand Up @@ -53,11 +56,39 @@ func (m *minibatchStore) GetBatch(ctx context.Context, batchID uuid.UUID) (*batc

b, ok := m.BatchRecords[batchID]
if !ok {
return nil, fmt.Errorf("batch not found")
return nil, BatchNotFound
}
return b, nil
}

func (m *minibatchStore) GetBatchesByStatus(ctx context.Context, status batcher.BatchStatus) ([]*batcher.BatchRecord, error) {
m.mu.RLock()
defer m.mu.RUnlock()

var batches []*batcher.BatchRecord
for _, b := range m.BatchRecords {
if b.Status == status {
batches = append(batches, b)
}
}
sort.Slice(batches, func(i, j int) bool {
return batches[i].CreatedAt.Before(batches[j].CreatedAt)
})
return batches, nil
}

func (m *minibatchStore) UpdateBatchStatus(ctx context.Context, batchID uuid.UUID, status batcher.BatchStatus) error {
m.mu.Lock()
defer m.mu.Unlock()

b, ok := m.BatchRecords[batchID]
if !ok {
return BatchNotFound
}
b.Status = status
return nil
}

func (m *minibatchStore) PutMinibatch(ctx context.Context, minibatch *batcher.MinibatchRecord) error {
m.mu.Lock()
defer m.mu.Unlock()
Expand All @@ -75,11 +106,30 @@ func (m *minibatchStore) GetMinibatch(ctx context.Context, batchID uuid.UUID, mi
defer m.mu.RUnlock()

if _, ok := m.MinibatchRecords[batchID]; !ok {
return nil, nil
return nil, BatchNotFound
}
return m.MinibatchRecords[batchID][minibatchIndex], nil
}

func (m *minibatchStore) GetMinibatches(ctx context.Context, batchID uuid.UUID) ([]*batcher.MinibatchRecord, error) {
m.mu.RLock()
defer m.mu.RUnlock()

if _, ok := m.MinibatchRecords[batchID]; !ok {
return nil, nil
}

res := make([]*batcher.MinibatchRecord, 0, len(m.MinibatchRecords[batchID]))
for _, minibatch := range m.MinibatchRecords[batchID] {
res = append(res, minibatch)
}
sort.Slice(res, func(i, j int) bool {
return res[i].MinibatchIndex < res[j].MinibatchIndex
})

return res, nil
}

func (m *minibatchStore) PutDispersalRequest(ctx context.Context, request *batcher.DispersalRequest) error {
m.mu.Lock()
defer m.mu.Unlock()
Expand Down Expand Up @@ -109,7 +159,7 @@ func (m *minibatchStore) GetDispersalRequest(ctx context.Context, batchID uuid.U
m.mu.RLock()
defer m.mu.RUnlock()

requests, err := m.GetDispersalRequests(ctx, batchID, minibatchIndex)
requests, err := m.GetMinibatchDispersalRequests(ctx, batchID, minibatchIndex)
if err != nil {
return nil, err
}
Expand All @@ -121,12 +171,12 @@ func (m *minibatchStore) GetDispersalRequest(ctx context.Context, batchID uuid.U
return nil, nil
}

func (m *minibatchStore) GetDispersalRequests(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*batcher.DispersalRequest, error) {
func (m *minibatchStore) GetMinibatchDispersalRequests(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*batcher.DispersalRequest, error) {
m.mu.RLock()
defer m.mu.RUnlock()

if _, ok := m.DispersalRequests[batchID]; !ok {
return nil, nil
return nil, BatchNotFound
}

return m.DispersalRequests[batchID][minibatchIndex], nil
Expand Down Expand Up @@ -161,7 +211,7 @@ func (m *minibatchStore) GetDispersalResponse(ctx context.Context, batchID uuid.
m.mu.RLock()
defer m.mu.RUnlock()

responses, err := m.GetDispersalResponses(ctx, batchID, minibatchIndex)
responses, err := m.GetMinibatchDispersalResponses(ctx, batchID, minibatchIndex)
if err != nil {
return nil, err
}
Expand All @@ -173,20 +223,85 @@ func (m *minibatchStore) GetDispersalResponse(ctx context.Context, batchID uuid.
return nil, nil
}

func (m *minibatchStore) GetDispersalResponses(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*batcher.DispersalResponse, error) {
func (m *minibatchStore) GetMinibatchDispersalResponses(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*batcher.DispersalResponse, error) {
m.mu.RLock()
defer m.mu.RUnlock()

if _, ok := m.DispersalResponses[batchID]; !ok {
return nil, nil
return nil, BatchNotFound
}

return m.DispersalResponses[batchID][minibatchIndex], nil
}

func (m *minibatchStore) GetPendingBatch(ctx context.Context) (*batcher.BatchRecord, error) {
func (m *minibatchStore) GetLatestFormedBatch(ctx context.Context) (batch *batcher.BatchRecord, minibatches []*batcher.MinibatchRecord, err error) {
m.mu.RLock()
defer m.mu.RUnlock()

return nil, nil
batches, err := m.GetBatchesByStatus(ctx, batcher.BatchStatusFormed)
if err != nil {
return nil, nil, err
}
if len(batches) == 0 {
return nil, nil, nil
}

batch = batches[0]
minibatches, err = m.GetMinibatches(ctx, batches[0].ID)
if err != nil {
return nil, nil, err
}

return batch, minibatches, nil
}

func (m *minibatchStore) getDispersals(ctx context.Context, batchID uuid.UUID) ([]*batcher.DispersalRequest, []*batcher.DispersalResponse, error) {
m.mu.RLock()
defer m.mu.RUnlock()

if _, ok := m.DispersalRequests[batchID]; !ok {
return nil, nil, BatchNotFound
}

if _, ok := m.DispersalResponses[batchID]; !ok {
return nil, nil, BatchNotFound
}

requests := make([]*batcher.DispersalRequest, 0)
for _, reqs := range m.DispersalRequests[batchID] {
requests = append(requests, reqs...)
}

responses := make([]*batcher.DispersalResponse, 0)
for _, resp := range m.DispersalResponses[batchID] {
responses = append(responses, resp...)
}

return requests, responses, nil
}

func (m *minibatchStore) BatchDispersed(ctx context.Context, batchID uuid.UUID) (bool, error) {
dispersed := true
requests, responses, err := m.getDispersals(ctx, batchID)
if err != nil {
return false, err
}

if len(requests) == 0 || len(responses) == 0 {
return false, nil
}

if len(requests) != len(responses) {
m.logger.Info("number of minibatch dispersal requests does not match the number of responses", "batchID", batchID, "numRequests", len(requests), "numResponses", len(responses))
return false, nil
}

for _, resp := range responses {
if resp.RespondedAt.IsZero() {
dispersed = false
m.logger.Info("response pending", "batchID", batchID, "minibatchIndex", resp.MinibatchIndex, "operatorID", resp.OperatorID.Hex())
}
}

return dispersed, nil
}
4 changes: 2 additions & 2 deletions disperser/batcher/inmem/minibatch_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestPutDispersalRequest(t *testing.T) {
err = s.PutDispersalRequest(ctx, req2)
assert.NoError(t, err)

r, err := s.GetDispersalRequests(ctx, id, minibatchIndex)
r, err := s.GetMinibatchDispersalRequests(ctx, id, minibatchIndex)
assert.NoError(t, err)
assert.Len(t, r, 2)
assert.Equal(t, req1, r[0])
Expand Down Expand Up @@ -136,7 +136,7 @@ func TestPutDispersalResponse(t *testing.T) {
err = s.PutDispersalResponse(ctx, resp2)
assert.NoError(t, err)

r, err := s.GetDispersalResponses(ctx, id, minibatchIndex)
r, err := s.GetMinibatchDispersalResponses(ctx, id, minibatchIndex)
assert.NoError(t, err)
assert.Len(t, r, 2)

Expand Down
34 changes: 31 additions & 3 deletions disperser/batcher/minibatch_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,30 @@ import (
"github.com/google/uuid"
)

type BatchStatus uint

// Pending: the batch has been created and minibatches are being added. There can be only one pending batch at a time.
// Formed: the batch has been formed and no more minibatches can be added. Implies that all minibatch records and dispersal request records have been created.
//
// Attested: the batch has been attested.
// Failed: the batch has failed.
//
// The batch lifecycle is as follows:
// Pending -> Formed -> Attested
// \ /
// \-> Failed <-/
const (
BatchStatusPending BatchStatus = iota
BatchStatusFormed
BatchStatusAttested
BatchStatusFailed
)

type BatchRecord struct {
ID uuid.UUID
CreatedAt time.Time
ReferenceBlockNumber uint
Status BatchStatus
HeaderHash [32]byte
AggregatePubKey *core.G2Point
AggregateSignature *core.Signature
Expand Down Expand Up @@ -46,13 +66,21 @@ type DispersalResponse struct {
type MinibatchStore interface {
PutBatch(ctx context.Context, batch *BatchRecord) error
GetBatch(ctx context.Context, batchID uuid.UUID) (*BatchRecord, error)
GetBatchesByStatus(ctx context.Context, status BatchStatus) ([]*BatchRecord, error)
UpdateBatchStatus(ctx context.Context, batchID uuid.UUID, status BatchStatus) error
PutMinibatch(ctx context.Context, minibatch *MinibatchRecord) error
GetMinibatch(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) (*MinibatchRecord, error)
GetMinibatches(ctx context.Context, batchID uuid.UUID) ([]*MinibatchRecord, error)
PutDispersalRequest(ctx context.Context, request *DispersalRequest) error
GetDispersalRequest(ctx context.Context, batchID uuid.UUID, minibatchIndex uint, opID core.OperatorID) (*DispersalRequest, error)
GetDispersalRequests(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*DispersalRequest, error)
GetMinibatchDispersalRequests(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*DispersalRequest, error)
PutDispersalResponse(ctx context.Context, response *DispersalResponse) error
GetDispersalResponse(ctx context.Context, batchID uuid.UUID, minibatchIndex uint, opID core.OperatorID) (*DispersalResponse, error)
GetDispersalResponses(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*DispersalResponse, error)
GetPendingBatch(ctx context.Context) (*BatchRecord, error)
GetMinibatchDispersalResponses(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*DispersalResponse, error)

// GetLatestFormedBatch returns the latest batch that has been formed.
// If there is no formed batch, it returns nil.
// It also returns the minibatches that belong to the batch in the ascending order of minibatch index.
GetLatestFormedBatch(ctx context.Context) (batch *BatchRecord, minibatches []*MinibatchRecord, err error)
BatchDispersed(ctx context.Context, batchID uuid.UUID) (bool, error)
}
45 changes: 30 additions & 15 deletions disperser/batcher/minibatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@ func NewMinibatcher(
AssignmentCoordinator: assignmentCoordinator,
EncodingStreamer: encodingStreamer,
Pool: workerpool,
ReferenceBlockNumber: 0,
MinibatchIndex: 0,

ReferenceBlockNumber: 0,
BatchID: uuid.Nil,
MinibatchIndex: 0,

ethClient: ethClient,
logger: logger.With("component", "Minibatcher"),
Expand Down Expand Up @@ -138,7 +140,18 @@ func (b *Minibatcher) HandleSingleBatch(ctx context.Context) error {
}
log.Debug("CreateMinibatch took", "duration", time.Since(stageTimer).String())

// Processing new full batch
if b.ReferenceBlockNumber < batch.BatchHeader.ReferenceBlockNumber {
// Update status of the previous batch
if b.BatchID != uuid.Nil {
err = b.MinibatchStore.UpdateBatchStatus(ctx, b.BatchID, BatchStatusFormed)
if err != nil {
_ = b.handleFailure(ctx, batch.BlobMetadata, FailReason("error updating batch status"))
return fmt.Errorf("error updating batch status: %w", err)
}
}

// Create new batch
b.BatchID, err = uuid.NewV7()
if err != nil {
_ = b.handleFailure(ctx, batch.BlobMetadata, FailReason("error generating batch UUID"))
Expand Down Expand Up @@ -212,22 +225,24 @@ func (b *Minibatcher) DisperseBatch(ctx context.Context, state *core.IndexedOper
for id, op := range state.IndexedOperators {
opInfo := op
opID := id
req := &DispersalRequest{
BatchID: batchID,
MinibatchIndex: minibatchIndex,
OperatorID: opID,
Socket: op.Socket,
NumBlobs: uint(len(blobs)),
RequestedAt: time.Now().UTC(),
}
err := b.MinibatchStore.PutDispersalRequest(ctx, req)
if err != nil {
b.logger.Error("failed to put dispersal request", "err", err)
continue
}
b.Pool.Submit(func() {
req := &DispersalRequest{
BatchID: batchID,
MinibatchIndex: minibatchIndex,
OperatorID: opID,
Socket: op.Socket,
NumBlobs: uint(len(blobs)),
RequestedAt: time.Now().UTC(),
}
err := b.MinibatchStore.PutDispersalRequest(ctx, req)
signatures, err := b.SendBlobsToOperatorWithRetries(ctx, blobs, batchHeader, opInfo, opID, int(b.MaxNumRetriesPerDispersal))
if err != nil {
b.logger.Error("failed to put dispersal request", "err", err)
return
b.logger.Errorf("failed to send blobs to operator %s: %v", opID.Hex(), err)
}
signatures, err := b.SendBlobsToOperatorWithRetries(ctx, blobs, batchHeader, opInfo, opID, int(b.MaxNumRetriesPerDispersal))

// Update the minibatch state
err = b.MinibatchStore.PutDispersalResponse(ctx, &DispersalResponse{
DispersalRequest: *req,
Expand Down
8 changes: 4 additions & 4 deletions disperser/batcher/minibatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func TestDisperseMinibatch(t *testing.T) {

c.pool.StopWait()
c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 2)
dispersalRequests, err := c.minibatchStore.GetDispersalRequests(ctx, c.minibatcher.BatchID, 0)
dispersalRequests, err := c.minibatchStore.GetMinibatchDispersalRequests(ctx, c.minibatcher.BatchID, 0)
assert.NoError(t, err)
assert.Len(t, dispersalRequests, 2)
opIDs := make([]core.OperatorID, 2)
Expand All @@ -185,7 +185,7 @@ func TestDisperseMinibatch(t *testing.T) {
}
assert.ElementsMatch(t, opIDs, []core.OperatorID{opId0, opId1})

dispersalResponses, err := c.minibatchStore.GetDispersalResponses(ctx, c.minibatcher.BatchID, 0)
dispersalResponses, err := c.minibatchStore.GetMinibatchDispersalResponses(ctx, c.minibatcher.BatchID, 0)
assert.NoError(t, err)
assert.Len(t, dispersalResponses, 2)
for _, resp := range dispersalResponses {
Expand Down Expand Up @@ -262,7 +262,7 @@ func TestDisperseMinibatchFailure(t *testing.T) {

c.pool.StopWait()
c.dispatcher.AssertNumberOfCalls(t, "SendBlobsToOperator", 2)
dispersalRequests, err := c.minibatchStore.GetDispersalRequests(ctx, c.minibatcher.BatchID, 0)
dispersalRequests, err := c.minibatchStore.GetMinibatchDispersalRequests(ctx, c.minibatcher.BatchID, 0)
assert.NoError(t, err)
assert.Len(t, dispersalRequests, 2)
opIDs := make([]core.OperatorID, 2)
Expand All @@ -276,7 +276,7 @@ func TestDisperseMinibatchFailure(t *testing.T) {
}
assert.ElementsMatch(t, opIDs, []core.OperatorID{opId0, opId1})

dispersalResponses, err := c.minibatchStore.GetDispersalResponses(ctx, c.minibatcher.BatchID, 0)
dispersalResponses, err := c.minibatchStore.GetMinibatchDispersalResponses(ctx, c.minibatcher.BatchID, 0)
assert.NoError(t, err)
assert.Len(t, dispersalResponses, 2)
for _, resp := range dispersalResponses {
Expand Down

0 comments on commit f36918b

Please sign in to comment.