Skip to content

Commit

Permalink
[node] AttestBatch endpoint (#676)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored Aug 15, 2024
1 parent b53aee8 commit 12f1a6f
Show file tree
Hide file tree
Showing 14 changed files with 921 additions and 526 deletions.
20 changes: 10 additions & 10 deletions api/clients/node_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"errors"
"time"

"github.com/Layr-Labs/eigenda/api/grpc/node"
grpcnode "github.com/Layr-Labs/eigenda/api/grpc/node"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/encoding"
node_utils "github.com/Layr-Labs/eigenda/node/grpc"
"github.com/Layr-Labs/eigenda/node"
"github.com/wealdtech/go-merkletree/v2"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
Expand Down Expand Up @@ -50,11 +50,11 @@ func (c client) GetBlobHeader(
}
defer conn.Close()

n := node.NewRetrievalClient(conn)
n := grpcnode.NewRetrievalClient(conn)
nodeCtx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()

request := &node.GetBlobHeaderRequest{
request := &grpcnode.GetBlobHeaderRequest{
BatchHeaderHash: batchHeaderHash[:],
BlobIndex: blobIndex,
}
Expand All @@ -64,7 +64,7 @@ func (c client) GetBlobHeader(
return nil, nil, err
}

blobHeader, err := node_utils.GetBlobHeaderFromProto(reply.GetBlobHeader())
blobHeader, err := node.GetBlobHeaderFromProto(reply.GetBlobHeader())
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -99,11 +99,11 @@ func (c client) GetChunks(
return
}

n := node.NewRetrievalClient(conn)
n := grpcnode.NewRetrievalClient(conn)
nodeCtx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()

request := &node.RetrieveChunksRequest{
request := &grpcnode.RetrieveChunksRequest{
BatchHeaderHash: batchHeaderHash[:],
BlobIndex: blobIndex,
QuorumId: uint32(quorumID),
Expand All @@ -123,11 +123,11 @@ func (c client) GetChunks(
for i, data := range reply.GetChunks() {
var chunk *encoding.Frame
switch reply.GetChunkEncodingFormat() {
case node.ChunkEncodingFormat_GNARK:
case grpcnode.ChunkEncodingFormat_GNARK:
chunk, err = new(encoding.Frame).DeserializeGnark(data)
case node.ChunkEncodingFormat_GOB:
case grpcnode.ChunkEncodingFormat_GOB:
chunk, err = new(encoding.Frame).Deserialize(data)
case node.ChunkEncodingFormat_UNKNOWN:
case grpcnode.ChunkEncodingFormat_UNKNOWN:
// For backward compatibility, we fallback the UNKNOWN to GOB
chunk, err = new(encoding.Frame).Deserialize(data)
if err != nil {
Expand Down
17 changes: 7 additions & 10 deletions core/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,11 @@ func (v *shardValidator) UpdateOperatorID(operatorID OperatorID) {
}

func (v *shardValidator) ValidateBatch(batchHeader *BatchHeader, blobs []*BlobMessage, operatorState *OperatorState, pool common.WorkerPool) error {
err := validateBatchHeaderRoot(batchHeader, blobs)
headers := make([]*BlobHeader, len(blobs))
for i, blob := range blobs {
headers[i] = blob.BlobHeader
}
err := ValidateBatchHeaderRoot(batchHeader, headers)
if err != nil {
return err
}
Expand Down Expand Up @@ -207,18 +211,11 @@ func (v *shardValidator) VerifyBlobLengthWorker(blobCommitments encoding.BlobCom
out <- nil
}

func validateBatchHeaderRoot(batchHeader *BatchHeader, blobs []*BlobMessage) error {
func ValidateBatchHeaderRoot(batchHeader *BatchHeader, blobHeaders []*BlobHeader) error {
// Check the batch header root

headers := make([]*BlobHeader, len(blobs))

for i, blob := range blobs {
headers[i] = blob.BlobHeader
}

derivedHeader := &BatchHeader{}

_, err := derivedHeader.SetBatchRoot(headers)
_, err := derivedHeader.SetBatchRoot(blobHeaders)
if err != nil {
return fmt.Errorf("failed to compute batch header root: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/fxamacker/cbor/v2 v2.5.0
github.com/gin-contrib/logger v0.2.6
github.com/gin-gonic/gin v1.9.1
github.com/golang/protobuf v1.5.4
github.com/hashicorp/go-multierror v1.1.1
github.com/joho/godotenv v1.5.1
github.com/onsi/ginkgo/v2 v2.11.0
Expand Down Expand Up @@ -99,7 +100,6 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-bexpr v0.1.10 // indirect
Expand Down
112 changes: 105 additions & 7 deletions node/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"reflect"
"runtime"
"sync"
"time"
Expand All @@ -21,6 +22,8 @@ import (
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/golang/protobuf/ptypes/wrappers"
"github.com/shirou/gopsutil/mem"
"github.com/wealdtech/go-merkletree/v2"
"github.com/wealdtech/go-merkletree/v2/keccak256"

_ "go.uber.org/automaxprocs"

Expand Down Expand Up @@ -155,12 +158,12 @@ func (s *Server) handleStoreChunksRequest(ctx context.Context, in *pb.StoreChunk
start := time.Now()

// Get batch header hash
batchHeader, err := GetBatchHeader(in)
batchHeader, err := node.GetBatchHeader(in.GetBatchHeader())
if err != nil {
return nil, err
}

blobs, err := GetBlobMessages(in.GetBlobs(), s.node.Config.NumBatchDeserializationWorkers)
blobs, err := node.GetBlobMessages(in.GetBlobs(), s.node.Config.NumBatchDeserializationWorkers)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -196,7 +199,7 @@ func (s *Server) validateStoreChunkRequest(in *pb.StoreChunksRequest) error {
if blob.GetHeader() == nil {
return api.NewInvalidArgError("missing blob header in request")
}
if ValidatePointsFromBlobHeader(blob.GetHeader()) != nil {
if node.ValidatePointsFromBlobHeader(blob.GetHeader()) != nil {
return api.NewInvalidArgError("invalid points contained in the blob header in request")
}
if len(blob.GetHeader().GetQuorumHeaders()) == 0 {
Expand Down Expand Up @@ -264,7 +267,7 @@ func (s *Server) validateStoreBlobsRequest(in *pb.StoreBlobsRequest) error {
if blob.GetHeader() == nil {
return api.NewInvalidArgError("missing blob header in request")
}
if ValidatePointsFromBlobHeader(blob.GetHeader()) != nil {
if node.ValidatePointsFromBlobHeader(blob.GetHeader()) != nil {
return api.NewInvalidArgError("invalid points contained in the blob header in request")
}
if len(blob.GetHeader().GetQuorumHeaders()) == 0 {
Expand Down Expand Up @@ -308,7 +311,7 @@ func (s *Server) StoreBlobs(ctx context.Context, in *pb.StoreBlobsRequest) (*pb.
s.node.Logger.Info("StoreBlobs RPC request received", "numBlobs", len(in.Blobs), "reqMsgSize", proto.Size(in), "blobHeadersSize", blobHeadersSize, "bundleSize", bundleSize, "referenceBlockNumber", in.GetReferenceBlockNumber())

// Process the request
blobs, err := GetBlobMessages(in.GetBlobs(), s.node.Config.NumBatchDeserializationWorkers)
blobs, err := node.GetBlobMessages(in.GetBlobs(), s.node.Config.NumBatchDeserializationWorkers)
if err != nil {
return nil, err
}
Expand All @@ -334,7 +337,44 @@ func (s *Server) StoreBlobs(ctx context.Context, in *pb.StoreBlobsRequest) (*pb.
}

func (s *Server) AttestBatch(ctx context.Context, in *pb.AttestBatchRequest) (*pb.AttestBatchReply, error) {
return nil, errors.New("AttestBatch is not implemented")
start := time.Now()

// Validate the batch root
blobHeaderHashes := make([][32]byte, len(in.GetBlobHeaderHashes()))
for i, hash := range in.GetBlobHeaderHashes() {
if len(hash) != 32 {
return nil, api.NewInvalidArgError("invalid blob header hash")
}
var h [32]byte
copy(h[:], hash)
blobHeaderHashes[i] = h
}
batchHeader, err := node.GetBatchHeader(in.GetBatchHeader())
if err != nil {
return nil, fmt.Errorf("failed to get the batch header: %w", err)
}
err = s.node.ValidateBatchContents(ctx, blobHeaderHashes, batchHeader)
if err != nil {
return nil, fmt.Errorf("failed to validate the batch header root: %w", err)
}

// Store the mapping from batch header + blob index to blob header hashes
err = s.node.Store.StoreBatchBlobMapping(ctx, batchHeader, blobHeaderHashes)
if err != nil {
return nil, fmt.Errorf("failed to store the batch blob mapping: %w", err)
}

// Sign the batch header
batchHeaderHash, err := batchHeader.GetBatchHeaderHash()
if err != nil {
return nil, fmt.Errorf("failed to get the batch header hash: %w", err)
}
sig := s.node.KeyPair.SignMessage(batchHeaderHash)

s.node.Logger.Info("AttestBatch complete", "duration", time.Since(start))
return &pb.AttestBatchReply{
Signature: sig.Serialize(),
}, nil
}

func (s *Server) RetrieveChunks(ctx context.Context, in *pb.RetrieveChunksRequest) (*pb.RetrieveChunksReply, error) {
Expand Down Expand Up @@ -445,6 +485,64 @@ func (s *Server) GetBlobHeader(ctx context.Context, in *pb.GetBlobHeaderRequest)
}, nil
}

// rebuildMerkleTree rebuilds the merkle tree from the blob headers and batch header.
func (s *Server) rebuildMerkleTree(batchHeaderHash [32]byte) (*merkletree.MerkleTree, error) {
batchHeaderBytes, err := s.node.Store.GetBatchHeader(context.Background(), batchHeaderHash)
if err != nil {
return nil, errors.New("failed to get the batch header from Store")
}

batchHeader, err := new(core.BatchHeader).Deserialize(batchHeaderBytes)
if err != nil {
return nil, err
}

blobIndex := 0
leafs := make([][]byte, 0)
for {
blobHeaderBytes, err := s.node.Store.GetBlobHeader(context.Background(), batchHeaderHash, blobIndex)
if err != nil {
if errors.Is(err, node.ErrKeyNotFound) {
break
}
return nil, err
}

var protoBlobHeader pb.BlobHeader
err = proto.Unmarshal(blobHeaderBytes, &protoBlobHeader)
if err != nil {
return nil, err
}

blobHeader, err := node.GetBlobHeaderFromProto(&protoBlobHeader)
if err != nil {
return nil, err
}

blobHeaderHash, err := blobHeader.GetBlobHeaderHash()
if err != nil {
return nil, err
}
leafs = append(leafs, blobHeaderHash[:])
blobIndex++
}

if len(leafs) == 0 {
return nil, errors.New("no blob header found")
}

tree, err := merkletree.NewTree(merkletree.WithData(leafs), merkletree.WithHashType(keccak256.New()))
if err != nil {
return nil, err
}

if !reflect.DeepEqual(tree.Root(), batchHeader.BatchRoot[:]) {
return nil, errors.New("invalid batch header")
}

return tree, nil
}

func (s *Server) getBlobHeader(ctx context.Context, batchHeaderHash [32]byte, blobIndex int) (*core.BlobHeader, *pb.BlobHeader, error) {

blobHeaderBytes, err := s.node.Store.GetBlobHeader(ctx, batchHeaderHash, blobIndex)
Expand All @@ -458,7 +556,7 @@ func (s *Server) getBlobHeader(ctx context.Context, batchHeaderHash [32]byte, bl
return nil, nil, err
}

blobHeader, err := GetBlobHeaderFromProto(&protoBlobHeader)
blobHeader, err := node.GetBlobHeaderFromProto(&protoBlobHeader)
if err != nil {
return nil, nil, err
}
Expand Down
46 changes: 38 additions & 8 deletions node/grpc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,16 +416,46 @@ func TestStoreBlobs(t *testing.T) {
func TestAttestBatch(t *testing.T) {
server := newTestServer(t, true)

reqToCopy, _, _, _, _ := makeStoreChunksRequest(t, 66, 33)
reqToCopy, _, _, blobHeaders, _ := makeStoreChunksRequest(t, 66, 33)
reqToCopy.BatchHeader = nil
req := &pb.AttestBatchRequest{
BatchHeader: reqToCopy.BatchHeader,
BlobHeaderHashes: [][]byte{},
req := &pb.StoreBlobsRequest{
Blobs: reqToCopy.Blobs,
ReferenceBlockNumber: 1,
}
reply, err := server.AttestBatch(context.Background(), req)
assert.Nil(t, reply)
assert.Error(t, err)
assert.Equal(t, strings.Compare(err.Error(), "AttestBatch is not implemented"), 0)
reply, err := server.StoreBlobs(context.Background(), req)
assert.NoError(t, err)
assert.NotNil(t, reply.GetSignatures())

assert.Len(t, blobHeaders, 2)
bhh0, err := blobHeaders[0].GetBlobHeaderHash()
assert.NoError(t, err)
bhh1, err := blobHeaders[1].GetBlobHeaderHash()
assert.NoError(t, err)
batchHeader := &core.BatchHeader{
ReferenceBlockNumber: 1,
BatchRoot: [32]byte{},
}
_, err = batchHeader.SetBatchRoot([]*core.BlobHeader{blobHeaders[0], blobHeaders[1]})
assert.NoError(t, err)
attestReq := &pb.AttestBatchRequest{
BatchHeader: &pb.BatchHeader{
BatchRoot: batchHeader.BatchRoot[:],
ReferenceBlockNumber: 1,
},
BlobHeaderHashes: [][]byte{bhh0[:], bhh1[:]},
}
attestReply, err := server.AttestBatch(context.Background(), attestReq)
assert.NotNil(t, reply)
assert.NoError(t, err)
sig := attestReply.GetSignature()
assert.NotNil(t, sig)
batchHeaderHash, err := batchHeader.GetBatchHeaderHash()
assert.NoError(t, err)
point, err := new(core.Signature).Deserialize(sig)
assert.NoError(t, err)
s := &core.Signature{G1Point: point}
ok := s.Verify(keyPair.GetPubKeyG2(), batchHeaderHash)
assert.True(t, ok)
}

func TestRetrieveChunks(t *testing.T) {
Expand Down
Loading

0 comments on commit 12f1a6f

Please sign in to comment.