Skip to content

Commit

Permalink
Simplify the bytes decoding of chunks (#607)
Browse files Browse the repository at this point in the history
  • Loading branch information
jianoaix authored Jun 18, 2024
1 parent 2ade913 commit 7c8751a
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 24 deletions.
37 changes: 13 additions & 24 deletions node/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"encoding/binary"
"errors"
"io"
"time"

"github.com/Layr-Labs/eigenda/api/grpc/node"
Expand Down Expand Up @@ -335,7 +334,7 @@ func (s *Store) GetChunks(ctx context.Context, batchHeaderHash [32]byte, blobInd
}
log.Debug("Retrieved chunk", "blobKey", hexutil.Encode(blobKey), "length", len(data))

chunks, err := decodeChunks(data)
chunks, err := DecodeChunks(data)
if err != nil {
return nil, false
}
Expand Down Expand Up @@ -378,34 +377,24 @@ func EncodeChunks(chunks [][]byte) ([]byte, error) {
// Converts a flattened array of chunks into an array of its constituent chunks,
// throwing an error in case the chunks were not serialized correctly
//
// decodeChunks((len(chunks[0]), chunks[0], len(chunks[1]), chunks[1], ...)) = chunks
func decodeChunks(data []byte) ([][]byte, error) {
buf := bytes.NewReader(data)
// DecodeChunks((len(chunks[0]), chunks[0], len(chunks[1]), chunks[1], ...)) = chunks
func DecodeChunks(data []byte) ([][]byte, error) {
chunks := make([][]byte, 0)

for {
var length uint64
err := binary.Read(buf, binary.LittleEndian, &length)
if errors.Is(err, io.EOF) {
break
}
if err != nil {
return nil, err
buf := data
for len(buf) > 0 {
if len(buf) < 8 {
return nil, errors.New("invalid data to decode")
}
chunkSize := binary.LittleEndian.Uint64(buf)
buf = buf[8:]

chunk := make([]byte, length)
_, err = buf.Read(chunk)
if errors.Is(err, io.EOF) {
break
}
if err != nil {
return nil, err
if len(buf) < int(chunkSize) {
return nil, errors.New("invalid data to decode")
}
chunk := buf[:chunkSize]
buf = buf[chunkSize:]

chunks = append(chunks, chunk)
if buf.Len() < 8 {
break
}
}

return chunks, nil
Expand Down
43 changes: 43 additions & 0 deletions node/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,27 @@ func CreateBatch(t *testing.T) (*core.BatchHeader, []*core.BlobMessage, []*pb.Bl
return &batchHeader, blobMessage, blobs
}

// TestEncodeDecodeChunks verifies that DecodeChunks is the inverse of
// EncodeChunks across many randomized chunk batches.
func TestEncodeDecodeChunks(t *testing.T) {
	const (
		numSamples = 32
		numChunks  = 10
		chunkSize  = 2 * 1024
	)
	for trial := 0; trial < numSamples; trial++ {
		// Build a batch of random chunks.
		original := make([][]byte, numChunks)
		for j := range original {
			buf := make([]byte, chunkSize)
			_, _ = cryptorand.Read(buf)
			original[j] = buf
		}

		// Encoding then decoding must reproduce the batch exactly.
		encoded, err := node.EncodeChunks(original)
		assert.Nil(t, err)
		roundTripped, err := node.DecodeChunks(encoded)
		assert.Nil(t, err)
		for j := range original {
			assert.True(t, bytes.Equal(roundTripped[j], original[j]))
		}
	}
}

func TestStoringBlob(t *testing.T) {
staleMeasure := uint32(1)
storeDuration := uint32(1)
Expand Down Expand Up @@ -286,3 +307,25 @@ func BenchmarkEncodeChunks(b *testing.B) {
_, _ = node.EncodeChunks(sampleChunks[i%numSamples])
}
}

// BenchmarkDecodeChunks measures DecodeChunks throughput over pre-encoded,
// randomized chunk batches. (Renamed from the misspelled
// "BenchmarkDecocodeChunks"; benchmarks are discovered by the testing
// framework, so no callers are affected.)
func BenchmarkDecodeChunks(b *testing.B) {
	numSamples := 32
	numChunks := 10
	chunkSize := 2 * 1024
	sampleChunks := make([][]byte, numSamples)
	for n := 0; n < numSamples; n++ {
		chunks := make([][]byte, numChunks)
		for i := 0; i < numChunks; i++ {
			chunk := make([]byte, chunkSize)
			_, _ = cryptorand.Read(chunk)
			chunks[i] = chunk
		}
		// Encode once up front so only decoding is measured.
		encoded, _ := node.EncodeChunks(chunks)
		sampleChunks[n] = encoded
	}

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		_, _ = node.DecodeChunks(sampleChunks[i%numSamples])
	}
}

0 comments on commit 7c8751a

Please sign in to comment.