Skip to content

Commit

Permalink
dispatcher method to StoreBlobs
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Jul 19, 2024
1 parent 59dd16a commit abb41a2
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 0 deletions.
69 changes: 69 additions & 0 deletions disperser/batcher/grpc/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,55 @@ func (c *dispatcher) sendChunks(ctx context.Context, blobs []*core.BlobMessage,
return sig, nil
}

// SendBlobsToOperator sends blobs to an operator via the node's StoreBlobs endpoint
// It returns the signatures of the blobs sent to the operator in the same order as the blobs
// with nil values for blobs that were not attested by the operator
func (c *dispatcher) SendBlobsToOperator(ctx context.Context, blobs []*core.BlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) ([]*core.Signature, error) {
// TODO Add secure Grpc

conn, err := grpc.Dial(
core.OperatorSocket(op.Socket).GetDispersalSocket(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
c.logger.Warn("Disperser cannot connect to operator dispersal socket", "dispersal_socket", core.OperatorSocket(op.Socket).GetDispersalSocket(), "err", err)
return nil, err
}
defer conn.Close()

gc := node.NewDispersalClient(conn)
ctx, cancel := context.WithTimeout(ctx, c.Timeout)
defer cancel()
start := time.Now()
request, totalSize, err := GetStoreBlobsRequest(blobs, batchHeader)
if err != nil {
return nil, err
}
c.logger.Debug("sending chunks to operator", "operator", op.Socket, "num blobs", len(blobs), "size", totalSize, "request message size", proto.Size(request), "request serialization time", time.Since(start))
opt := grpc.MaxCallSendMsgSize(60 * 1024 * 1024 * 1024)
reply, err := gc.StoreBlobs(ctx, request, opt)

if err != nil {
return nil, err
}

signaturesInBytes := reply.GetSignatures()
signatures := make([]*core.Signature, len(signaturesInBytes))
for _, sigBytes := range signaturesInBytes {
sig := sigBytes.GetValue()
if sig != nil {
point, err := new(core.Signature).Deserialize(sig)
if err != nil {
return nil, err
}
signatures = append(signatures, &core.Signature{G1Point: point})
} else {
signatures = append(signatures, nil)
}
}
return signatures, nil
}

func GetStoreChunksRequest(blobMessages []*core.BlobMessage, batchHeader *core.BatchHeader) (*node.StoreChunksRequest, int64, error) {
blobs := make([]*node.Blob, len(blobMessages))
totalSize := int64(0)
Expand All @@ -167,6 +216,26 @@ func GetStoreChunksRequest(blobMessages []*core.BlobMessage, batchHeader *core.B
return request, totalSize, nil
}

func GetStoreBlobsRequest(blobMessages []*core.BlobMessage, batchHeader *core.BatchHeader) (*node.StoreBlobsRequest, int64, error) {
blobs := make([]*node.Blob, len(blobMessages))
totalSize := int64(0)
for i, blob := range blobMessages {
var err error
blobs[i], err = getBlobMessage(blob)
if err != nil {
return nil, 0, err
}
totalSize += int64(blob.Bundles.Size())
}

request := &node.StoreBlobsRequest{
Blobs: blobs,
ReferenceBlockNumber: uint32(batchHeader.ReferenceBlockNumber),
}

return request, totalSize, nil
}

func getBlobMessage(blob *core.BlobMessage) (*node.Blob, error) {
if blob.BlobHeader == nil {
return nil, errors.New("blob header is nil")
Expand Down
1 change: 1 addition & 0 deletions disperser/disperser.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ type BlobStore interface {

type Dispatcher interface {
DisperseBatch(context.Context, *core.IndexedOperatorState, []core.EncodedBlob, *core.BatchHeader) chan core.SigningMessage
SendBlobsToOperator(ctx context.Context, blobs []*core.BlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) ([]*core.Signature, error)
}

// GenerateReverseIndexKey returns the key used to store the blob key in the reverse index
Expand Down
8 changes: 8 additions & 0 deletions disperser/mock/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,11 @@ func (d *Dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOpera

return update
}

func (d *Dispatcher) SendBlobsToOperator(ctx context.Context, blobs []*core.BlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) ([]*core.Signature, error) {
args := d.Called(ctx, blobs, batchHeader, op)
if args.Get(0) == nil {
return nil, args.Error(1)
}
return args.Get(0).([]*core.Signature), args.Error(1)
}

0 comments on commit abb41a2

Please sign in to comment.