[5/N] Chunk encoding optimization: Add support of new encoding at Node (
jianoaix authored Jul 17, 2024
1 parent eaaf416 commit 00bdade
Showing 9 changed files with 463 additions and 231 deletions.
319 changes: 160 additions & 159 deletions api/grpc/node/node.pb.go

Large diffs are not rendered by default.

18 changes: 10 additions & 8 deletions api/proto/node/node.proto
@@ -55,19 +55,21 @@ message RetrieveChunksRequest {
uint32 quorum_id = 3;
}

// This describes how the chunks returned in RetrieveChunksReply are encoded.
// Used to facilitate the decoding of chunks.
enum ChunkEncoding {
UNKNOWN = 0;
GNARK = 1;
GOB = 2;
}

message RetrieveChunksReply {
// All chunks the Node is storing for the requested blob per RetrieveChunksRequest.
repeated bytes chunks = 1;
// Describes how the chunks above are encoded.
enum ChunkEncoding {
UNKNOWN = 0;
GNARK = 1;
GOB = 2;
}
// How the above chunks are encoded.
ChunkEncoding encoding = 2;
}


// See RetrieveChunksRequest for documentation of each parameter of GetBlobHeaderRequest.
message GetBlobHeaderRequest {
bytes batch_header_hash = 1;
@@ -177,5 +179,5 @@ message NodeInfoReply {
string arch = 2;
string os = 3;
uint32 num_cpu = 4;
uint64 mem_bytes = 5;
}
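
With ChunkEncoding promoted to a top-level enum and surfaced in RetrieveChunksReply, retrieval clients can branch on how the returned chunks are serialized. A minimal client-side sketch, assuming the generated Go bindings in api/grpc/node; decodeGnarkChunk and decodeGobChunk are hypothetical placeholders for the real frame deserializers, not part of this PR:

package client

import (
	"fmt"

	pb "github.com/Layr-Labs/eigenda/api/grpc/node"
)

// Hypothetical stand-ins for the real chunk deserializers.
func decodeGnarkChunk(data []byte) error { return nil }
func decodeGobChunk(data []byte) error   { return nil }

// decodeReply dispatches on the ChunkEncoding advertised by the Node.
func decodeReply(reply *pb.RetrieveChunksReply) error {
	for _, raw := range reply.GetChunks() {
		var err error
		switch reply.GetEncoding() {
		case pb.ChunkEncoding_GNARK:
			err = decodeGnarkChunk(raw)
		case pb.ChunkEncoding_GOB:
			err = decodeGobChunk(raw)
		default:
			err = fmt.Errorf("unsupported chunk encoding: %d", reply.GetEncoding())
		}
		if err != nil {
			return err
		}
	}
	return nil
}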
18 changes: 14 additions & 4 deletions core/data.go
@@ -30,13 +30,23 @@ type SecurityParam struct {
QuorumRate common.RateParam
}

type BundleEncodingFormat = uint8

const (
// We use uint8 to count the number of quorums, so we can have at most 255 quorums,
// which means the max ID cannot be larger than 254 (from 0 to 254, there are 255
// different IDs).
MaxQuorumID = 254

GnarkBundleEncodingFormat = 1
// How many bits for the bundle's header.
NumBundleHeaderBits = 64
// How many bits (out of header) for representing the bundle's encoding format.
NumBundleEncodingFormatBits = 8

// The list of supported encoding formats for bundles.
// Values must be in range [0, 255].
GobBundleEncodingFormat BundleEncodingFormat = 0
GnarkBundleEncodingFormat BundleEncodingFormat = 1
)

func (s *SecurityParam) String() string {
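
The constants above define a 64-bit, little-endian bundle header: the encoding format occupies the top NumBundleEncodingFormatBits (8) bits and the chunk length the remaining 56. A standalone sketch of the pack/unpack arithmetic that Serialize and Deserialize below now share (constants inlined for illustration):

package main

import (
	"encoding/binary"
	"fmt"
)

const (
	numHeaderBits = 64 // NumBundleHeaderBits
	numFormatBits = 8  // NumBundleEncodingFormatBits
	gnarkFormat   = 1  // GnarkBundleEncodingFormat
)

func main() {
	chunkLen := uint64(64)

	// Pack: format in the top 8 bits, chunk length in the low 56 bits.
	meta := (uint64(gnarkFormat) << (numHeaderBits - numFormatBits)) | chunkLen
	header := make([]byte, 8)
	binary.LittleEndian.PutUint64(header, meta)
	// Little-endian byte order puts the format in the LAST byte of the header.
	fmt.Println(header) // [64 0 0 0 0 0 0 1]

	// Unpack: shift down to recover the format; shift up then down to clear it
	// and recover the chunk length.
	format := meta >> (numHeaderBits - numFormatBits)
	length := (meta << numFormatBits) >> numFormatBits
	fmt.Println(format, length) // 1 64
}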
@@ -198,7 +208,7 @@ func (b Bundle) Serialize() ([]byte, error) {
}
result := make([]byte, size+8)
buf := result
metadata := uint64(GnarkBundleEncodingFormat) | (uint64(len(b[0].Coeffs)) << 8)
metadata := (uint64(GnarkBundleEncodingFormat) << (NumBundleHeaderBits - NumBundleEncodingFormatBits)) | uint64(len(b[0].Coeffs))
binary.LittleEndian.PutUint64(buf, metadata)
buf = buf[8:]
for _, f := range b {
@@ -218,10 +228,10 @@ func (b Bundle) Deserialize(data []byte) (Bundle, error) {
}
// Parse metadata
meta := binary.LittleEndian.Uint64(data)
if (meta & 0xFF) != GnarkBundleEncodingFormat {
if (meta >> (NumBundleHeaderBits - NumBundleEncodingFormatBits)) != uint64(GnarkBundleEncodingFormat) {
return nil, errors.New("invalid bundle data encoding format")
}
chunkLen := meta >> 8
chunkLen := (meta << NumBundleEncodingFormatBits) >> NumBundleEncodingFormatBits
if chunkLen == 0 {
return nil, errors.New("chunk length must be greater than zero")
}
4 changes: 2 additions & 2 deletions core/data_test.go
@@ -58,19 +58,19 @@ func TestInvalidBundleDeser(t *testing.T) {
assert.EqualError(t, err, "invalid bundle data encoding format")

invalidChunkLen := make([]byte, 0, 8)
invalidChunkLen = append(invalidChunkLen, byte(1))
for i := 0; i < 7; i++ {
invalidChunkLen = append(invalidChunkLen, byte(0))
}
invalidChunkLen = append(invalidChunkLen, byte(1))
_, err = new(core.Bundle).Deserialize(invalidChunkLen)
assert.EqualError(t, err, "chunk length must be greater than zero")

data := make([]byte, 0, 9)
data = append(data, byte(1))
for i := 0; i < 6; i++ {
data = append(data, byte(0))
}
data = append(data, byte(0b00100000))
data = append(data, byte(1))
data = append(data, byte(5))
data = append(data, byte(0b01000000))
_, err = new(core.Bundle).Deserialize(data)
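
Because the new header is a little-endian uint64 with the format in its top eight bits, the format byte now sits at index 7 instead of index 0, while bytes 0 through 6 carry the chunk length. That is why these test vectors move the byte(1) format marker from the front of the buffer to the back.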
4 changes: 2 additions & 2 deletions node/grpc/server.go
@@ -294,13 +294,13 @@ func (s *Server) RetrieveChunks(ctx context.Context, in *pb.RetrieveChunksReques
return nil, errors.New("request rate limited")
}

chunks, ok := s.node.Store.GetChunks(ctx, batchHeaderHash, int(in.GetBlobIndex()), uint8(in.GetQuorumId()))
chunks, format, ok := s.node.Store.GetChunks(ctx, batchHeaderHash, int(in.GetBlobIndex()), uint8(in.GetQuorumId()))
if !ok {
s.node.Metrics.RecordRPCRequest("RetrieveChunks", "failure", time.Since(start))
return nil, fmt.Errorf("could not find chunks for batchHeaderHash %v, blob index: %v, quorumID: %v", batchHeaderHash, in.GetBlobIndex(), in.GetQuorumId())
}
s.node.Metrics.RecordRPCRequest("RetrieveChunks", "success", time.Since(start))
return &pb.RetrieveChunksReply{Chunks: chunks}, nil
return &pb.RetrieveChunksReply{Chunks: chunks, Encoding: format}, nil
}

func (s *Server) GetBlobHeader(ctx context.Context, in *pb.GetBlobHeaderRequest) (*pb.GetBlobHeaderReply, error) {
23 changes: 18 additions & 5 deletions node/grpc/utils.go
@@ -54,17 +54,30 @@ func GetBlobMessages(in *pb.StoreChunksRequest, numWorkers int) ([]*core.BlobMes
return
}

format := node.GetBundleEncodingFormat(blob)
bundles := make(map[core.QuorumID]core.Bundle, len(blob.GetBundles()))
for j, chunks := range blob.GetBundles() {
for j, bundle := range blob.GetBundles() {
quorumID := blob.GetHeader().GetQuorumHeaders()[j].GetQuorumId()
bundles[uint8(quorumID)] = make([]*encoding.Frame, len(chunks.GetChunks()))
for k, data := range chunks.GetChunks() {
chunk, err := new(encoding.Frame).Deserialize(data)
if format == core.GnarkBundleEncodingFormat {
bundleMsg, err := new(core.Bundle).Deserialize(bundle.GetBundle())
if err != nil {
resultChan <- err
return
}
bundles[uint8(quorumID)][k] = chunk
bundles[uint8(quorumID)] = bundleMsg
} else if format == core.GobBundleEncodingFormat {
bundles[uint8(quorumID)] = make([]*encoding.Frame, len(bundle.GetChunks()))
for k, data := range bundle.GetChunks() {
chunk, err := new(encoding.Frame).Deserialize(data)
if err != nil {
resultChan <- err
return
}
bundles[uint8(quorumID)][k] = chunk
}
} else {
resultChan <- fmt.Errorf("invalid bundle encoding format: %d", format)
return
}
}
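
GetBundleEncodingFormat is called here and again in StoreBatch below, but its definition lives elsewhere in the node package and is outside this diff. A plausible sketch of its behavior, assuming it keys off which proto field the disperser populated (an assumption for illustration, not code from this PR):

package node

import (
	pb "github.com/Layr-Labs/eigenda/api/grpc/node"
	"github.com/Layr-Labs/eigenda/core"
)

// getBundleEncodingFormat (hypothetical sketch): the new Gnark path ships each
// bundle as a single packed byte string, while the legacy Gob path ships a
// repeated list of per-chunk byte strings.
func getBundleEncodingFormat(blob *pb.Blob) core.BundleEncodingFormat {
	for _, bundle := range blob.GetBundles() {
		if len(bundle.GetBundle()) > 0 {
			return core.GnarkBundleEncodingFormat
		}
	}
	return core.GobBundleEncodingFormat
}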

154 changes: 123 additions & 31 deletions node/store.go
@@ -5,12 +5,15 @@ import (
"context"
"encoding/binary"
"errors"
"fmt"
"time"

"github.com/Layr-Labs/eigenda/api/grpc/node"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigenda/node/leveldb"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/consensys/gnark-crypto/ecc/bn254"
"github.com/ethereum/go-ethereum/common/hexutil"
"google.golang.org/protobuf/proto"
)
@@ -241,12 +244,21 @@ func (s *Store) StoreBatch(ctx context.Context, header *core.BatchHeader, blobs
if len(rawBlob.GetBundles()) != len(blob.Bundles) {
return nil, errors.New("internal error: the number of bundles in parsed blob must be the same as in raw blob")
}
format := GetBundleEncodingFormat(rawBlob)
rawBundles := make(map[core.QuorumID][]byte)
rawChunks := make(map[core.QuorumID][][]byte)
for i, chunks := range rawBlob.GetBundles() {
for i, bundle := range rawBlob.GetBundles() {
quorumID := uint8(rawBlob.GetHeader().GetQuorumHeaders()[i].GetQuorumId())
rawChunks[quorumID] = make([][]byte, len(chunks.GetChunks()))
for j, chunk := range chunks.GetChunks() {
rawChunks[quorumID][j] = chunk
if format == core.GnarkBundleEncodingFormat {
if len(bundle.GetChunks()) > 0 && len(bundle.GetChunks()[0]) > 0 {
return nil, errors.New("chunks of a bundle are encoded together already")
}
rawBundles[quorumID] = bundle.GetBundle()
} else {
rawChunks[quorumID] = make([][]byte, len(bundle.GetChunks()))
for j, chunk := range bundle.GetChunks() {
rawChunks[quorumID][j] = chunk
}
}
}
serializationDuration += time.Since(start)
@@ -258,22 +270,36 @@
log.Error("Cannot generate the key for storing blob:", "err", err)
return nil, err
}
if len(rawChunks[quorumID]) != len(bundle) {
return nil, errors.New("internal error: the number of chunks in parsed blob bundle must be the same as in raw blob bundle")
}

bundleRaw := make([][]byte, len(bundle))
for i := 0; i < len(bundle); i++ {
bundleRaw[i] = rawChunks[quorumID][i]
if format == core.GnarkBundleEncodingFormat {
rawBundle, ok := rawBundles[quorumID]
if ok {
size += int64(len(rawBundle))
keys = append(keys, key)
values = append(values, rawBundle)
}
} else if format == core.GobBundleEncodingFormat {
if len(rawChunks[quorumID]) != len(bundle) {
return nil, errors.New("internal error: the number of chunks in parsed blob bundle must be the same as in raw blob bundle")
}
chunksBytes, ok := rawChunks[quorumID]
if ok {

bundleRaw := make([][]byte, len(bundle))
for i := 0; i < len(bundle); i++ {
bundleRaw[i] = chunksBytes[i]
}
chunkBytes, err := EncodeChunks(bundleRaw)
if err != nil {
return nil, err
}
size += int64(len(chunkBytes))
keys = append(keys, key)
values = append(values, chunkBytes)
}
} else {
return nil, fmt.Errorf("invalid bundle encoding format: %d", format)
}
chunkBytes, err := EncodeChunks(bundleRaw)
if err != nil {
return nil, err
}
size += int64(len(chunkBytes))

keys = append(keys, key)
values = append(values, chunkBytes)
}
encodingDuration += time.Since(start)
}
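
Note the asymmetry in the new write path: a Gnark bundle arrives from the disperser as a single pre-packed byte string (hence the sanity check above rejecting Gnark bundles that also carry per-chunk data), so the node persists it verbatim, while Gob chunks still arrive one by one and are flattened into a single value with EncodeChunks before being written to the store.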
@@ -321,24 +347,24 @@ func (s *Store) GetBlobHeader(ctx context.Context, batchHeaderHash [32]byte, blo

// GetChunks returns the list of byte arrays stored for the given blobKey, along with their
// encoding format and a boolean indicating whether the read succeeded and the chunks were
// decoded correctly.
func (s *Store) GetChunks(ctx context.Context, batchHeaderHash [32]byte, blobIndex int, quorumID core.QuorumID) ([][]byte, bool) {
func (s *Store) GetChunks(ctx context.Context, batchHeaderHash [32]byte, blobIndex int, quorumID core.QuorumID) ([][]byte, node.ChunkEncoding, bool) {
log := s.logger

blobKey, err := EncodeBlobKey(batchHeaderHash, blobIndex, quorumID)
if err != nil {
return nil, false
return nil, node.ChunkEncoding_UNKNOWN, false
}
data, err := s.db.Get(blobKey)
if err != nil {
return nil, false
return nil, node.ChunkEncoding_UNKNOWN, false
}
log.Debug("Retrieved chunk", "blobKey", hexutil.Encode(blobKey), "length", len(data))

chunks, err := DecodeChunks(data)
chunks, format, err := DecodeChunks(data)
if err != nil {
return nil, false
return nil, format, false
}
return chunks, true
return chunks, format, true
}

// HasKey returns if a given key has been stored.
@@ -374,11 +400,42 @@ func EncodeChunks(chunks [][]byte) ([]byte, error) {
return result, nil
}

// DecodeGnarkChunks converts a flattened array of gnark chunks into an array of its
// constituent chunks, throwing an error in case the chunks were not serialized correctly.
func DecodeGnarkChunks(data []byte) ([][]byte, error) {
format, chunkLen, err := parseHeader(data)
if err != nil {
return nil, err
}
if format != core.GnarkBundleEncodingFormat {
return nil, errors.New("invalid bundle data encoding format")
}
if chunkLen == 0 {
return nil, errors.New("chunk length must be greater than zero")
}
chunkSize := bn254.SizeOfG1AffineCompressed + encoding.BYTES_PER_SYMBOL*int(chunkLen)
chunks := make([][]byte, 0)
buf := data[8:]
for len(buf) > 0 {
if len(buf) < chunkSize {
return nil, errors.New("invalid data to decode")
}
chunks = append(chunks, buf[:chunkSize])
buf = buf[chunkSize:]
}
return chunks, nil
}
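
Gnark-encoded chunks need no per-chunk length prefixes because every chunk has the same fixed size: one compressed bn254 G1 point (the proof) followed by chunkLen field elements. For instance, assuming bn254.SizeOfG1AffineCompressed and encoding.BYTES_PER_SYMBOL are both 32 bytes, a bundle with chunkLen = 64 gives chunkSize = 32 + 32*64 = 2080, and the payload after the 8-byte header must divide evenly into 2080-byte chunks.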

// DecodeChunks((len(chunks[0]), chunks[0], len(chunks[1]), chunks[1], ...)) = chunks
func DecodeChunks(data []byte) ([][]byte, error) {
func DecodeGobChunks(data []byte) ([][]byte, error) {
format, chunkLen, err := parseHeader(data)
if err != nil {
return nil, err
}
if format != core.GobBundleEncodingFormat {
return nil, errors.New("invalid bundle data encoding format")
}
if chunkLen == 0 {
return nil, errors.New("chunk length must be greater than zero")
}
chunks := make([][]byte, 0)
buf := data
for len(buf) > 0 {
@@ -391,13 +448,48 @@ func DecodeChunks(data []byte) ([][]byte, error) {
if len(buf) < int(chunkSize) {
return nil, errors.New("invalid data to decode")
}
chunk := buf[:chunkSize]
chunks = append(chunks, buf[:chunkSize])
buf = buf[chunkSize:]
}
return chunks, nil
}

// parseHeader parses the header and returns the encoding format and the chunk length.
func parseHeader(data []byte) (core.BundleEncodingFormat, uint64, error) {
if len(data) < 8 {
return 0, 0, errors.New("no header found, the data size is less than 8 bytes")
}
meta := binary.LittleEndian.Uint64(data)
format := meta >> (core.NumBundleHeaderBits - core.NumBundleEncodingFormatBits)
chunkLen := (meta << core.NumBundleEncodingFormatBits) >> core.NumBundleEncodingFormatBits
return uint8(format), chunkLen, nil
}

// DecodeChunks converts a flattened array of chunks into an array of its constituent chunks,
// throwing an error in case the chunks were not serialized correctly.
func DecodeChunks(data []byte) ([][]byte, node.ChunkEncoding, error) {
// Empty chunk is valid, but there is nothing to decode.
if len(data) == 0 {
return [][]byte{}, node.ChunkEncoding_UNKNOWN, nil
}
format, _, err := parseHeader(data)
if err != nil {
return nil, node.ChunkEncoding_UNKNOWN, err
}

// Note: the encoding format IDs may not be the same as the field ID in protobuf.
// For example, GobBundleEncodingFormat is 1 but node.ChunkEncoding_GOB has proto
// field ID 2.
switch format {
case 0:
chunks, err := DecodeGobChunks(data)
return chunks, node.ChunkEncoding_GOB, err
case 1:
chunks, err := DecodeGnarkChunks(data)
return chunks, node.ChunkEncoding_GNARK, err
default:
return nil, node.ChunkEncoding_UNKNOWN, errors.New("invalid data encoding format")
}
}
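
One subtlety: DecodeGobChunks walks the buffer from the start (buf := data) rather than skipping an 8-byte header, because the legacy Gob layout has no dedicated header; the first eight bytes are just the first chunk's little-endian length prefix. Since any realistic chunk is far smaller than 2^56 bytes, that prefix's top byte is zero, which is exactly why parseHeader reports format 0 (Gob) for legacy data.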

func copyBytes(src []byte) []byte {
