Skip to content

Commit

Permalink
Reader fragment (#749)
Browse files Browse the repository at this point in the history
Signed-off-by: Cody Littley <[email protected]>
  • Loading branch information
cody-littley authored Oct 3, 2024
1 parent e44ba45 commit b3d1c35
Show file tree
Hide file tree
Showing 10 changed files with 476 additions and 43 deletions.
19 changes: 19 additions & 0 deletions api/clients/mock/retrieval_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,22 @@ func (c *MockRetrievalClient) RetrieveBlob(
result := args.Get(0)
return result.([]byte), args.Error(1)
}

func (c *MockRetrievalClient) RetrieveBlobChunks(
ctx context.Context,
batchHeaderHash [32]byte,
blobIndex uint32,
referenceBlockNumber uint,
batchRoot [32]byte,
quorumID core.QuorumID) (*clients.BlobChunks, error) {

args := c.Called(batchHeaderHash, blobIndex, referenceBlockNumber, batchRoot, quorumID)
return args.Get(0).(*clients.BlobChunks), args.Error(1)
}

func (c *MockRetrievalClient) CombineChunks(chunks *clients.BlobChunks) ([]byte, error) {
args := c.Called(chunks)

result := args.Get(0)
return result.([]byte), args.Error(1)
}
72 changes: 65 additions & 7 deletions api/clients/retrieval_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,50 @@ import (
"context"
"errors"
"fmt"

"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/wealdtech/go-merkletree/v2"

"github.com/gammazero/workerpool"
"github.com/wealdtech/go-merkletree/v2"
"github.com/wealdtech/go-merkletree/v2/keccak256"
)

// RetrievalClient is an object that can retrieve blobs from the network.
type RetrievalClient interface {

// RetrieveBlob fetches a blob from the network. This method is equivalent to calling
// RetrieveBlobChunks to get the chunks and then CombineChunks to recombine those chunks into the original blob.
RetrieveBlob(
ctx context.Context,
batchHeaderHash [32]byte,
blobIndex uint32,
referenceBlockNumber uint,
batchRoot [32]byte,
quorumID core.QuorumID) ([]byte, error)

// RetrieveBlobChunks downloads the chunks of a blob from the network but do not recombine them. Use this method
// if detailed information about which node returned which chunk is needed. Otherwise, use RetrieveBlob.
RetrieveBlobChunks(
ctx context.Context,
batchHeaderHash [32]byte,
blobIndex uint32,
referenceBlockNumber uint,
batchRoot [32]byte,
quorumID core.QuorumID) (*BlobChunks, error)

// CombineChunks recombines the chunks into the original blob.
CombineChunks(chunks *BlobChunks) ([]byte, error)
}

// BlobChunks is a collection of chunks retrieved from the network which can be recombined into a blob.
type BlobChunks struct {
Chunks []*encoding.Frame
Indices []encoding.ChunkNumber
EncodingParams encoding.EncodingParams
BlobHeaderLength uint
Assignments map[core.OperatorID]core.Assignment
AssignmentInfo core.AssignmentInfo
}

type retrievalClient struct {
Expand All @@ -33,16 +59,14 @@ type retrievalClient struct {
numConnections int
}

var _ RetrievalClient = (*retrievalClient)(nil)

// NewRetrievalClient creates a new retrieval client.
func NewRetrievalClient(
logger logging.Logger,
chainState core.IndexedChainState,
assignmentCoordinator core.AssignmentCoordinator,
nodeClient NodeClient,
verifier encoding.Verifier,
numConnections int,
) (*retrievalClient, error) {
numConnections int) (RetrievalClient, error) {

return &retrievalClient{
logger: logger.With("component", "RetrievalClient"),
Expand All @@ -54,13 +78,31 @@ func NewRetrievalClient(
}, nil
}

// RetrieveBlob retrieves a blob from the network.
func (r *retrievalClient) RetrieveBlob(
ctx context.Context,
batchHeaderHash [32]byte,
blobIndex uint32,
referenceBlockNumber uint,
batchRoot [32]byte,
quorumID core.QuorumID) ([]byte, error) {

chunks, err := r.RetrieveBlobChunks(ctx, batchHeaderHash, blobIndex, referenceBlockNumber, batchRoot, quorumID)
if err != nil {
return nil, err
}

return r.CombineChunks(chunks)
}

// RetrieveBlobChunks retrieves the chunks of a blob from the network but does not recombine them.
func (r *retrievalClient) RetrieveBlobChunks(ctx context.Context,
batchHeaderHash [32]byte,
blobIndex uint32,
referenceBlockNumber uint,
batchRoot [32]byte,
quorumID core.QuorumID) (*BlobChunks, error) {

indexedOperatorState, err := r.indexedChainState.GetIndexedOperatorState(ctx, referenceBlockNumber, []core.QuorumID{quorumID})
if err != nil {
return nil, err
Expand Down Expand Up @@ -173,5 +215,21 @@ func (r *retrievalClient) RetrieveBlob(
indices = append(indices, assignment.GetIndices()...)
}

return r.verifier.Decode(chunks, indices, encodingParams, uint64(blobHeader.Length)*encoding.BYTES_PER_SYMBOL)
return &BlobChunks{
Chunks: chunks,
Indices: indices,
EncodingParams: encodingParams,
BlobHeaderLength: blobHeader.Length,
Assignments: assignments,
AssignmentInfo: info,
}, nil
}

// CombineChunks recombines the chunks into the original blob.
func (r *retrievalClient) CombineChunks(chunks *BlobChunks) ([]byte, error) {
return r.verifier.Decode(
chunks.Chunks,
chunks.Indices,
chunks.EncodingParams,
uint64(chunks.BlobHeaderLength)*encoding.BYTES_PER_SYMBOL)
}
1 change: 0 additions & 1 deletion tools/traffic/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
NumReadInstances: ctx.GlobalUint(NumReadInstancesFlag.Name),
ReadRequestInterval: ctx.Duration(ReadRequestIntervalFlag.Name),
RequiredDownloads: ctx.Float64(RequiredDownloadsFlag.Name),
ReadOverflowTableSize: ctx.Uint(ReadOverflowTableSizeFlag.Name),
FetchBatchHeaderTimeout: ctx.Duration(FetchBatchHeaderTimeoutFlag.Name),
RetrieveBlobChunksTimeout: ctx.Duration(RetrieveBlobChunksTimeoutFlag.Name),
StatusTrackerChannelCapacity: ctx.Uint(VerificationChannelCapacityFlag.Name),
Expand Down
8 changes: 0 additions & 8 deletions tools/traffic/config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,13 +192,6 @@ var (
Value: 3.0,
EnvVar: common.PrefixEnvVar(envPrefix, "REQUIRED_DOWNLOADS"),
}
ReadOverflowTableSizeFlag = cli.UintFlag{
Name: common.PrefixFlag(FlagPrefix, "read-overflow-table-size"),
Usage: "Size of the overflow table for read requests.",
Required: false,
Value: 1024,
EnvVar: common.PrefixEnvVar(envPrefix, "READ_OVERFLOW_TABLE_SIZE"),
}
FetchBatchHeaderTimeoutFlag = cli.DurationFlag{
Name: common.PrefixFlag(FlagPrefix, "fetch-batch-header-timeout"),
Usage: "Amount of time to wait for a batch header to be fetched.",
Expand Down Expand Up @@ -243,7 +236,6 @@ var optionalFlags = []cli.Flag{
FetchBatchHeaderTimeoutFlag,
RetrieveBlobChunksTimeoutFlag,
GetBlobStatusTimeoutFlag,
ReadOverflowTableSizeFlag,
WriteTimeoutFlag,
VerificationChannelCapacityFlag,
}
Expand Down
4 changes: 0 additions & 4 deletions tools/traffic/config/worker_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ type WorkerConfig struct {
// 0 or 1 times with the specified probability (e.g. 0.2 means each blob has a 20% chance of being downloaded).
// If greater than 1.0, then each blob will be downloaded the specified number of times.
RequiredDownloads float64
// The size of a table of blobs to optionally read when we run out of blobs that we are required to read. Blobs
// that are no longer required are added to this table, and when the table is at capacity they are randomly retired.
// Set this to 0 to disable this feature.
ReadOverflowTableSize uint
// The amount of time to wait for a batch header to be fetched.
FetchBatchHeaderTimeout time.Duration
// The amount of time to wait for a blob to be retrieved.
Expand Down
16 changes: 0 additions & 16 deletions tools/traffic/metrics/latency_metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,6 @@ func (metric *latencyMetric) ReportLatency(latency time.Duration) {
metric.metrics.latency.WithLabelValues(metric.description).Observe(latency.Seconds())
}

// InvokeAndReportLatency performs an operation. If the operation does not produce an error, then the latency
// of the operation is reported to the metrics framework.
func InvokeAndReportLatency[T any](metric LatencyMetric, operation func() (T, error)) (T, error) {
start := time.Now()

t, err := operation()

if err == nil {
end := time.Now()
duration := end.Sub(start)
metric.ReportLatency(duration)
}

return t, err
}

// NewLatencyMetric creates a new prometheus collector for latency metrics.
func buildLatencyCollector(namespace string, registry *prometheus.Registry) *prometheus.SummaryVec {
return promauto.With(registry).NewSummaryVec(
Expand Down
Loading

0 comments on commit b3d1c35

Please sign in to comment.