diff --git a/api/clients/mock/retrieval_client.go b/api/clients/mock/retrieval_client.go
index 3a0e3ccf4..4c011c38a 100644
--- a/api/clients/mock/retrieval_client.go
+++ b/api/clients/mock/retrieval_client.go
@@ -35,3 +35,22 @@ func (c *MockRetrievalClient) RetrieveBlob(
 	result := args.Get(0)
 	return result.([]byte), args.Error(1)
 }
+
+func (c *MockRetrievalClient) RetrieveBlobChunks(
+	ctx context.Context,
+	batchHeaderHash [32]byte,
+	blobIndex uint32,
+	referenceBlockNumber uint,
+	batchRoot [32]byte,
+	quorumID core.QuorumID) (*clients.BlobChunks, error) {
+
+	args := c.Called(batchHeaderHash, blobIndex, referenceBlockNumber, batchRoot, quorumID)
+	return args.Get(0).(*clients.BlobChunks), args.Error(1)
+}
+
+func (c *MockRetrievalClient) CombineChunks(chunks *clients.BlobChunks) ([]byte, error) {
+	args := c.Called(chunks)
+
+	result := args.Get(0)
+	return result.([]byte), args.Error(1)
+}
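Reviewer note: the new mock methods follow the standard testify pattern — `Called` records the invocation and the test supplies canned return values via `On(...).Return(...)`. A minimal wiring sketch; the fixture values are illustrative and not part of this change:

```go
// Hypothetical test wiring for the new mock methods.
chunks := &clients.BlobChunks{}
client := &MockRetrievalClient{}

// RetrieveBlobChunks passes five arguments to Called (ctx is omitted),
// so the expectation lists five matchers.
client.On("RetrieveBlobChunks",
	mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
	Return(chunks, nil)
client.On("CombineChunks", chunks).Return([]byte{1, 2, 3}, nil)
```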
diff --git a/api/clients/retrieval_client.go b/api/clients/retrieval_client.go
index 8a310dd2d..8bc034cd2 100644
--- a/api/clients/retrieval_client.go
+++ b/api/clients/retrieval_client.go
@@ -4,17 +4,20 @@ import (
 	"context"
 	"errors"
 	"fmt"
-
 	"github.com/Layr-Labs/eigenda/core"
 	"github.com/Layr-Labs/eigenda/encoding"
 	"github.com/Layr-Labs/eigensdk-go/logging"
+	"github.com/wealdtech/go-merkletree/v2"
 
 	"github.com/gammazero/workerpool"
-	"github.com/wealdtech/go-merkletree/v2"
 	"github.com/wealdtech/go-merkletree/v2/keccak256"
 )
 
+// RetrievalClient is an object that can retrieve blobs from the network.
 type RetrievalClient interface {
+
+	// RetrieveBlob fetches a blob from the network. This method is equivalent to calling
+	// RetrieveBlobChunks to get the chunks and then CombineChunks to recombine those chunks into the original blob.
 	RetrieveBlob(
 		ctx context.Context,
 		batchHeaderHash [32]byte,
@@ -22,6 +25,29 @@ type RetrievalClient interface {
 		referenceBlockNumber uint,
 		batchRoot [32]byte,
 		quorumID core.QuorumID) ([]byte, error)
+
+	// RetrieveBlobChunks downloads the chunks of a blob from the network but does not recombine them. Use this method
+	// if detailed information about which node returned which chunk is needed. Otherwise, use RetrieveBlob.
+	RetrieveBlobChunks(
+		ctx context.Context,
+		batchHeaderHash [32]byte,
+		blobIndex uint32,
+		referenceBlockNumber uint,
+		batchRoot [32]byte,
+		quorumID core.QuorumID) (*BlobChunks, error)
+
+	// CombineChunks recombines the chunks into the original blob.
+	CombineChunks(chunks *BlobChunks) ([]byte, error)
+}
+
+// BlobChunks is a collection of chunks retrieved from the network which can be recombined into a blob.
+type BlobChunks struct {
+	Chunks           []*encoding.Frame
+	Indices          []encoding.ChunkNumber
+	EncodingParams   encoding.EncodingParams
+	BlobHeaderLength uint
+	Assignments      map[core.OperatorID]core.Assignment
+	AssignmentInfo   core.AssignmentInfo
 }
 
 type retrievalClient struct {
@@ -33,16 +59,14 @@ type retrievalClient struct {
 	numConnections int
 }
 
-var _ RetrievalClient = (*retrievalClient)(nil)
-
+// NewRetrievalClient creates a new retrieval client.
 func NewRetrievalClient(
 	logger logging.Logger,
 	chainState core.IndexedChainState,
 	assignmentCoordinator core.AssignmentCoordinator,
 	nodeClient NodeClient,
 	verifier encoding.Verifier,
-	numConnections int,
-) (*retrievalClient, error) {
+	numConnections int) (RetrievalClient, error) {
 
 	return &retrievalClient{
 		logger: logger.With("component", "RetrievalClient"),
@@ -54,6 +78,7 @@ func NewRetrievalClient(
 	}, nil
 }
 
+// RetrieveBlob retrieves a blob from the network.
func (r *retrievalClient) RetrieveBlob(
 	ctx context.Context,
 	batchHeaderHash [32]byte,
@@ -61,6 +86,23 @@ func (r *retrievalClient) RetrieveBlob(
 	referenceBlockNumber uint,
 	batchRoot [32]byte,
 	quorumID core.QuorumID) ([]byte, error) {
+
+	chunks, err := r.RetrieveBlobChunks(ctx, batchHeaderHash, blobIndex, referenceBlockNumber, batchRoot, quorumID)
+	if err != nil {
+		return nil, err
+	}
+
+	return r.CombineChunks(chunks)
+}
+
+// RetrieveBlobChunks retrieves the chunks of a blob from the network but does not recombine them.
+func (r *retrievalClient) RetrieveBlobChunks(ctx context.Context,
+	batchHeaderHash [32]byte,
+	blobIndex uint32,
+	referenceBlockNumber uint,
+	batchRoot [32]byte,
+	quorumID core.QuorumID) (*BlobChunks, error) {
+
 	indexedOperatorState, err := r.indexedChainState.GetIndexedOperatorState(ctx, referenceBlockNumber, []core.QuorumID{quorumID})
 	if err != nil {
 		return nil, err
@@ -173,5 +215,21 @@ func (r *retrievalClient) RetrieveBlob(
 		indices = append(indices, assignment.GetIndices()...)
 	}
 
-	return r.verifier.Decode(chunks, indices, encodingParams, uint64(blobHeader.Length)*encoding.BYTES_PER_SYMBOL)
+	return &BlobChunks{
+		Chunks:           chunks,
+		Indices:          indices,
+		EncodingParams:   encodingParams,
+		BlobHeaderLength: blobHeader.Length,
+		Assignments:      assignments,
+		AssignmentInfo:   info,
+	}, nil
+}
+
+// CombineChunks recombines the chunks into the original blob.
+func (r *retrievalClient) CombineChunks(chunks *BlobChunks) ([]byte, error) {
+	return r.verifier.Decode(
+		chunks.Chunks,
+		chunks.Indices,
+		chunks.EncodingParams,
+		uint64(chunks.BlobHeaderLength)*encoding.BYTES_PER_SYMBOL)
 }
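The split interface lets callers observe per-node results before recombination. A sketch of the intended two-step flow, assuming an already-constructed `RetrievalClient` and placeholder batch parameters (none of these values come from this diff):

```go
// Two-step retrieval: fetch chunks, inspect provenance, then recombine.
chunks, err := client.RetrieveBlobChunks(
	ctx, batchHeaderHash, blobIndex, referenceBlockNumber, batchRoot, core.QuorumID(0))
if err != nil {
	return nil, err
}

// Which operator was assigned which chunk indices.
for id, assignment := range chunks.Assignments {
	fmt.Printf("operator %x: chunks [%d, %d)\n",
		id, assignment.StartIndex, assignment.StartIndex+assignment.NumChunks)
}

// Equivalent to what RetrieveBlob would have returned directly.
blob, err := client.CombineChunks(chunks)
```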
diff --git a/tools/traffic/config/config.go b/tools/traffic/config/config.go
index 82c7251b8..3f7f7d41e 100644
--- a/tools/traffic/config/config.go
+++ b/tools/traffic/config/config.go
@@ -99,7 +99,6 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
 			NumReadInstances:             ctx.GlobalUint(NumReadInstancesFlag.Name),
 			ReadRequestInterval:          ctx.Duration(ReadRequestIntervalFlag.Name),
 			RequiredDownloads:            ctx.Float64(RequiredDownloadsFlag.Name),
-			ReadOverflowTableSize:        ctx.Uint(ReadOverflowTableSizeFlag.Name),
 			FetchBatchHeaderTimeout:      ctx.Duration(FetchBatchHeaderTimeoutFlag.Name),
 			RetrieveBlobChunksTimeout:    ctx.Duration(RetrieveBlobChunksTimeoutFlag.Name),
 			StatusTrackerChannelCapacity: ctx.Uint(VerificationChannelCapacityFlag.Name),
diff --git a/tools/traffic/config/flags.go b/tools/traffic/config/flags.go
index 951935db6..20105de62 100644
--- a/tools/traffic/config/flags.go
+++ b/tools/traffic/config/flags.go
@@ -192,13 +192,6 @@ var (
 		Value:    3.0,
 		EnvVar:   common.PrefixEnvVar(envPrefix, "REQUIRED_DOWNLOADS"),
 	}
-	ReadOverflowTableSizeFlag = cli.UintFlag{
-		Name:     common.PrefixFlag(FlagPrefix, "read-overflow-table-size"),
-		Usage:    "Size of the overflow table for read requests.",
-		Required: false,
-		Value:    1024,
-		EnvVar:   common.PrefixEnvVar(envPrefix, "READ_OVERFLOW_TABLE_SIZE"),
-	}
 	FetchBatchHeaderTimeoutFlag = cli.DurationFlag{
 		Name:     common.PrefixFlag(FlagPrefix, "fetch-batch-header-timeout"),
 		Usage:    "Amount of time to wait for a batch header to be fetched.",
@@ -243,7 +236,6 @@ var optionalFlags = []cli.Flag{
 	FetchBatchHeaderTimeoutFlag,
 	RetrieveBlobChunksTimeoutFlag,
 	GetBlobStatusTimeoutFlag,
-	ReadOverflowTableSizeFlag,
 	WriteTimeoutFlag,
 	VerificationChannelCapacityFlag,
 }
diff --git a/tools/traffic/config/worker_config.go b/tools/traffic/config/worker_config.go
index 322d07aac..08dead295 100644
--- a/tools/traffic/config/worker_config.go
+++ b/tools/traffic/config/worker_config.go
@@ -31,10 +31,6 @@ type WorkerConfig struct {
 	// 0 or 1 times with the specified probability (e.g. 0.2 means each blob has a 20% chance of being downloaded).
 	// If greater than 1.0, then each blob will be downloaded the specified number of times.
 	RequiredDownloads float64
-	// The size of a table of blobs to optionally read when we run out of blobs that we are required to read. Blobs
-	// that are no longer required are added to this table, and when the table is at capacity they are randomly retired.
-	// Set this to 0 to disable this feature.
-	ReadOverflowTableSize uint
 	// The amount of time to wait for a batch header to be fetched.
 	FetchBatchHeaderTimeout time.Duration
 	// The amount of time to wait for a blob to be retrieved.
diff --git a/tools/traffic/metrics/latency_metric.go b/tools/traffic/metrics/latency_metric.go
index 37413bfc2..892994079 100644
--- a/tools/traffic/metrics/latency_metric.go
+++ b/tools/traffic/metrics/latency_metric.go
@@ -22,22 +22,6 @@ func (metric *latencyMetric) ReportLatency(latency time.Duration) {
 	metric.metrics.latency.WithLabelValues(metric.description).Observe(latency.Seconds())
 }
 
-// InvokeAndReportLatency performs an operation. If the operation does not produce an error, then the latency
-// of the operation is reported to the metrics framework.
-func InvokeAndReportLatency[T any](metric LatencyMetric, operation func() (T, error)) (T, error) {
-	start := time.Now()
-
-	t, err := operation()
-
-	if err == nil {
-		end := time.Now()
-		duration := end.Sub(start)
-		metric.ReportLatency(duration)
-	}
-
-	return t, err
-}
-
 // NewLatencyMetric creates a new prometheus collector for latency metrics.
 func buildLatencyCollector(namespace string, registry *prometheus.Registry) *prometheus.SummaryVec {
 	return promauto.With(registry).NewSummaryVec(
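With the generic helper removed, call sites now time operations inline (see the blob_status_tracker.go and blob_writer.go hunks below). The replacement idiom, sketched here with a hypothetical operation and metrics:

```go
// Latency is still reported only on success, preserving the semantics
// of the removed InvokeAndReportLatency helper.
start := time.Now()
result, err := doOperation() // doOperation is a stand-in for the timed call
if err != nil {
	errorCountMetric.Increment()
	return nil, err
}
latencyMetric.ReportLatency(time.Since(start))
return result, nil
```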
diff --git a/tools/traffic/workers/blob_reader.go b/tools/traffic/workers/blob_reader.go
new file mode 100644
index 000000000..033eb9ee6
--- /dev/null
+++ b/tools/traffic/workers/blob_reader.go
@@ -0,0 +1,233 @@
+package workers
+
+import (
+	"context"
+	"crypto/md5"
+	"fmt"
+	"github.com/Layr-Labs/eigenda/api/clients"
+	"github.com/Layr-Labs/eigenda/core"
+	"github.com/Layr-Labs/eigenda/encoding"
+	"github.com/Layr-Labs/eigenda/retriever/eth"
+	"github.com/Layr-Labs/eigenda/tools/traffic/config"
+	"github.com/Layr-Labs/eigenda/tools/traffic/metrics"
+	"github.com/Layr-Labs/eigenda/tools/traffic/table"
+	"github.com/Layr-Labs/eigensdk-go/logging"
+	gcommon "github.com/ethereum/go-ethereum/common"
+	"math/big"
+	"sync"
+	"time"
+)
+
+// BlobReader reads blobs from the DA network at a configured rate.
+type BlobReader struct {
+	// The context for the generator. All work should cease when this context is cancelled.
+	ctx *context.Context
+
+	// Tracks the number of active goroutines within the generator.
+	waitGroup *sync.WaitGroup
+
+	// All logs should be written using this logger.
+	logger logging.Logger
+
+	// config contains the configuration for the generator.
+	config *config.WorkerConfig
+
+	retriever   clients.RetrievalClient
+	chainClient eth.ChainClient
+
+	// blobsToRead contains the blobs that we are required to read a certain number of times.
+	blobsToRead *table.BlobStore
+
+	// metrics for the blob reader.
+	metrics *blobReaderMetrics
+}
+
+type blobReaderMetrics struct {
+	generatorMetrics           metrics.Metrics
+	fetchBatchHeaderMetric     metrics.LatencyMetric
+	fetchBatchHeaderSuccess    metrics.CountMetric
+	fetchBatchHeaderFailure    metrics.CountMetric
+	readLatencyMetric          metrics.LatencyMetric
+	readSuccessMetric          metrics.CountMetric
+	readFailureMetric          metrics.CountMetric
+	recombinationSuccessMetric metrics.CountMetric
+	recombinationFailureMetric metrics.CountMetric
+	validBlobMetric            metrics.CountMetric
+	invalidBlobMetric          metrics.CountMetric
+	operatorSuccessMetrics     map[core.OperatorID]metrics.CountMetric
+	operatorFailureMetrics     map[core.OperatorID]metrics.CountMetric
+	requiredReadPoolSizeMetric metrics.GaugeMetric
+	optionalReadPoolSizeMetric metrics.GaugeMetric
+}
+
+// NewBlobReader creates a new BlobReader instance.
+func NewBlobReader(
+	ctx *context.Context,
+	waitGroup *sync.WaitGroup,
+	logger logging.Logger,
+	config *config.WorkerConfig,
+	retriever clients.RetrievalClient,
+	chainClient eth.ChainClient,
+	blobStore *table.BlobStore,
+	generatorMetrics metrics.Metrics) BlobReader {
+
+	return BlobReader{
+		ctx:         ctx,
+		waitGroup:   waitGroup,
+		logger:      logger,
+		config:      config,
+		retriever:   retriever,
+		chainClient: chainClient,
+		blobsToRead: blobStore,
+		metrics: &blobReaderMetrics{
+			generatorMetrics:           generatorMetrics,
+			fetchBatchHeaderMetric:     generatorMetrics.NewLatencyMetric("fetch_batch_header"),
+			fetchBatchHeaderSuccess:    generatorMetrics.NewCountMetric("fetch_batch_header_success"),
+			fetchBatchHeaderFailure:    generatorMetrics.NewCountMetric("fetch_batch_header_failure"),
+			recombinationSuccessMetric: generatorMetrics.NewCountMetric("recombination_success"),
+			recombinationFailureMetric: generatorMetrics.NewCountMetric("recombination_failure"),
+			readLatencyMetric:          generatorMetrics.NewLatencyMetric("read"),
+			validBlobMetric:            generatorMetrics.NewCountMetric("valid_blob"),
+			invalidBlobMetric:          generatorMetrics.NewCountMetric("invalid_blob"),
+			readSuccessMetric:          generatorMetrics.NewCountMetric("read_success"),
+			readFailureMetric:          generatorMetrics.NewCountMetric("read_failure"),
+			operatorSuccessMetrics:     make(map[core.OperatorID]metrics.CountMetric),
+			operatorFailureMetrics:     make(map[core.OperatorID]metrics.CountMetric),
+			requiredReadPoolSizeMetric: generatorMetrics.NewGaugeMetric("required_read_pool_size"),
+			optionalReadPoolSizeMetric: generatorMetrics.NewGaugeMetric("optional_read_pool_size"),
+		},
+	}
+}
+
+// Start begins a blob reader goroutine.
+func (r *BlobReader) Start() {
+	r.waitGroup.Add(1)
+	ticker := time.NewTicker(r.config.ReadRequestInterval)
+	go func() {
+		defer r.waitGroup.Done()
+		for {
+			select {
+			case <-(*r.ctx).Done():
+				err := (*r.ctx).Err()
+				if err != nil {
+					r.logger.Info("blob reader context closed", "err:", err)
+				}
+				return
+			case <-ticker.C:
+				r.randomRead()
+			}
+		}
+	}()
+}
+
+// randomRead reads a random blob.
+func (r *BlobReader) randomRead() {
+	metadata := r.blobsToRead.GetNext()
+	if metadata == nil {
+		// There are no blobs that we are required to read.
+		return
+	}
+
+	r.metrics.requiredReadPoolSizeMetric.Set(float64(r.blobsToRead.Size()))
+
+	ctxTimeout, cancel := context.WithTimeout(*r.ctx, r.config.FetchBatchHeaderTimeout)
+	defer cancel()
+
+	start := time.Now()
+	batchHeader, err := r.chainClient.FetchBatchHeader(
+		ctxTimeout,
+		gcommon.HexToAddress(r.config.EigenDAServiceManager),
+		metadata.BatchHeaderHash[:],
+		big.NewInt(int64(0)),
+		nil)
+	if err != nil {
+		r.logger.Error("failed to get batch header", "err:", err)
+		r.metrics.fetchBatchHeaderFailure.Increment()
+		return
+	}
+	r.metrics.fetchBatchHeaderMetric.ReportLatency(time.Since(start))
+
+	r.metrics.fetchBatchHeaderSuccess.Increment()
+
+	ctxTimeout, cancel = context.WithTimeout(*r.ctx, r.config.RetrieveBlobChunksTimeout)
+	defer cancel()
+
+	start = time.Now()
+	chunks, err := r.retriever.RetrieveBlobChunks(
+		ctxTimeout,
+		metadata.BatchHeaderHash,
+		uint32(metadata.BlobIndex),
+		uint(batchHeader.ReferenceBlockNumber),
+		batchHeader.BlobHeadersRoot,
+		core.QuorumID(0))
+	if err != nil {
+		r.logger.Error("failed to read chunks", "err:", err)
+		r.metrics.readFailureMetric.Increment()
+		return
+	}
+	r.metrics.readLatencyMetric.ReportLatency(time.Since(start))
+
+	r.metrics.readSuccessMetric.Increment()
+
+	assignments := chunks.Assignments
+
+	data, err := r.retriever.CombineChunks(chunks)
+	if err != nil {
+		r.logger.Error("failed to combine chunks", "err:", err)
+		r.metrics.recombinationFailureMetric.Increment()
+		return
+	}
+	r.metrics.recombinationSuccessMetric.Increment()
+
+	r.verifyBlob(metadata, &data)
+
+	indexSet := make(map[encoding.ChunkNumber]bool)
+	for index := range chunks.Indices {
+		indexSet[chunks.Indices[index]] = true
+	}
+
+	for id, assignment := range assignments {
+		for index := assignment.StartIndex; index < assignment.StartIndex+assignment.NumChunks; index++ {
+			if indexSet[index] {
+				r.reportChunk(id)
+			} else {
+				r.reportMissingChunk(id)
+			}
+		}
+	}
+}
+
+// reportChunk reports a successful chunk read.
+func (r *BlobReader) reportChunk(operatorId core.OperatorID) {
+	metric, exists := r.metrics.operatorSuccessMetrics[operatorId]
+	if !exists {
+		metric = r.metrics.generatorMetrics.NewCountMetric(fmt.Sprintf("operator_%x_returned_chunk", operatorId))
+		r.metrics.operatorSuccessMetrics[operatorId] = metric
+	}
+
+	metric.Increment()
+}
+
+// reportMissingChunk reports a missing chunk.
+func (r *BlobReader) reportMissingChunk(operatorId core.OperatorID) {
+	metric, exists := r.metrics.operatorFailureMetrics[operatorId]
+	if !exists {
+		metric = r.metrics.generatorMetrics.NewCountMetric(fmt.Sprintf("operator_%x_withheld_chunk", operatorId))
+		r.metrics.operatorFailureMetrics[operatorId] = metric
+	}
+
+	metric.Increment()
+}
+
+// verifyBlob performs sanity checks on the blob.
+func (r *BlobReader) verifyBlob(metadata *table.BlobMetadata, blob *[]byte) {
+	// Trim off the padding.
+	truncatedBlob := (*blob)[:metadata.Size]
+	recomputedChecksum := md5.Sum(truncatedBlob)
+
+	if metadata.Checksum == recomputedChecksum {
+		r.metrics.validBlobMetric.Increment()
+	} else {
+		r.metrics.invalidBlobMetric.Increment()
+	}
+}
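To make the per-operator accounting in `randomRead` concrete, a worked sketch with hypothetical operators: A was assigned chunk indices [0, 3) and all three came back; B was assigned [3, 5) and returned nothing. The operator IDs are invented for illustration, and the sketch assumes `core.OperatorID` is a byte-array type as its use as a map key suggests:

```go
operatorA := core.OperatorID{1} // hypothetical IDs
operatorB := core.OperatorID{2}

// indexSet holds the indices actually retrieved.
indexSet := map[encoding.ChunkNumber]bool{0: true, 1: true, 2: true}
assignments := map[core.OperatorID]core.Assignment{
	operatorA: {StartIndex: 0, NumChunks: 3},
	operatorB: {StartIndex: 3, NumChunks: 2},
}

// The loop in randomRead walks each assignment's index range, so here it
// would call reportChunk(operatorA) three times and reportMissingChunk(operatorB)
// twice, incrementing operator_<id>_returned_chunk and
// operator_<id>_withheld_chunk respectively.
_ = indexSet
_ = assignments
```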
diff --git a/tools/traffic/workers/blob_reader_test.go b/tools/traffic/workers/blob_reader_test.go
new file mode 100644
index 000000000..941351800
--- /dev/null
+++ b/tools/traffic/workers/blob_reader_test.go
@@ -0,0 +1,150 @@
+package workers
+
+import (
+	"context"
+	"crypto/md5"
+	"github.com/Layr-Labs/eigenda/api/clients"
+	apiMock "github.com/Layr-Labs/eigenda/api/clients/mock"
+	"github.com/Layr-Labs/eigenda/common"
+	tu "github.com/Layr-Labs/eigenda/common/testutils"
+	binding "github.com/Layr-Labs/eigenda/contracts/bindings/EigenDAServiceManager"
+	retrieverMock "github.com/Layr-Labs/eigenda/retriever/mock"
+	"github.com/Layr-Labs/eigenda/tools/traffic/config"
+	"github.com/Layr-Labs/eigenda/tools/traffic/metrics"
+	"github.com/Layr-Labs/eigenda/tools/traffic/table"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
+	"golang.org/x/exp/rand"
+	"sync"
+	"testing"
+	"time"
+)
+
+// TestBlobReader tests the BlobReader's basic functionality.
+func TestBlobReader(t *testing.T) {
+	tu.InitializeRandom()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	waitGroup := sync.WaitGroup{}
+	logger, err := common.NewLogger(common.DefaultLoggerConfig())
+	assert.Nil(t, err)
+
+	blobTable := table.NewBlobStore()
+
+	readerMetrics := metrics.NewMockMetrics()
+
+	chainClient := &retrieverMock.MockChainClient{}
+	chainClient.On(
+		"FetchBatchHeader",
+		mock.Anything,
+		mock.Anything,
+		mock.Anything,
+		mock.Anything,
+		mock.Anything).Return(&binding.IEigenDAServiceManagerBatchHeader{}, nil)
+	retrievalClient := &apiMock.MockRetrievalClient{}
+
+	blobReader := NewBlobReader(
+		&ctx,
+		&waitGroup,
+		logger,
+		&config.WorkerConfig{},
+		retrievalClient,
+		chainClient,
+		blobTable,
+		readerMetrics)
+
+	blobSize := 1024
+	readPermits := 2
+	blobCount := 100
+
+	invalidBlobCount := 0
+
+	// Insert some blobs into the table.
+	for i := 0; i < blobCount; i++ {
+
+		key := make([]byte, 32)
+		_, err = rand.Read(key)
+		assert.Nil(t, err)
+
+		blobData := make([]byte, blobSize)
+		_, err = rand.Read(blobData)
+		assert.Nil(t, err)
+
+		var checksum [16]byte
+		if i%10 == 0 {
+			// Simulate an invalid blob
+			invalidBlobCount++
+			_, err = rand.Read(checksum[:])
+			assert.Nil(t, err)
+		} else {
+			checksum = md5.Sum(blobData)
+		}
+
+		batchHeaderHash := [32]byte{}
+		_, err = rand.Read(batchHeaderHash[:])
+		assert.Nil(t, err)
+
+		blobMetadata, err := table.NewBlobMetadata(
+			key,
+			checksum,
+			uint(blobSize),
+			uint(i),
+			batchHeaderHash,
+			readPermits)
+		assert.Nil(t, err)
+
+		// Simplify tracking by hijacking the BlobHeaderLength field to store the blob index,
+		// which is used as a unique identifier within this test.
+		chunks := &clients.BlobChunks{BlobHeaderLength: blobMetadata.BlobIndex}
+		retrievalClient.On("RetrieveBlobChunks",
+			blobMetadata.BatchHeaderHash,
+			uint32(blobMetadata.BlobIndex),
+			mock.Anything,
+			mock.Anything,
+			mock.Anything).Return(chunks, nil)
+		retrievalClient.On("CombineChunks", chunks).Return(blobData, nil)
+
+		blobTable.Add(blobMetadata)
+	}
+
+	// Do a bunch of reads.
+	expectedTotalReads := uint(readPermits * blobCount)
+	for i := uint(0); i < expectedTotalReads; i++ {
+		blobReader.randomRead()
+
+		chainClient.AssertNumberOfCalls(t, "FetchBatchHeader", int(i+1))
+		retrievalClient.AssertNumberOfCalls(t, "RetrieveBlobChunks", int(i+1))
+		retrievalClient.AssertNumberOfCalls(t, "CombineChunks", int(i+1))
+
+		remainingPermits := uint(0)
+		for _, metadata := range blobTable.GetAll() {
+			remainingPermits += uint(metadata.RemainingReadPermits)
+		}
+		assert.Equal(t, remainingPermits, expectedTotalReads-i-1)
+
+		assert.Equal(t, i+1, uint(readerMetrics.GetCount("read_success")))
+		assert.Equal(t, i+1, uint(readerMetrics.GetCount("fetch_batch_header_success")))
+		assert.Equal(t, i+1, uint(readerMetrics.GetCount("recombination_success")))
+	}
+
+	expectedInvalidBlobs := uint(invalidBlobCount * readPermits)
+	expectedValidBlobs := expectedTotalReads - expectedInvalidBlobs
+
+	assert.Equal(t, expectedValidBlobs, uint(readerMetrics.GetCount("valid_blob")))
+	assert.Equal(t, expectedInvalidBlobs, uint(readerMetrics.GetCount("invalid_blob")))
+	assert.Equal(t, uint(0), uint(readerMetrics.GetGaugeValue("required_read_pool_size")))
+	assert.Equal(t, uint(0), uint(readerMetrics.GetGaugeValue("optional_read_pool_size")))
+
+	// Table is empty, so doing a random read should have no effect.
+	blobReader.randomRead()
+
+	// Give the system a moment to attempt to do work. This should not result in any reads.
+	time.Sleep(time.Second / 10)
+	assert.Equal(t, expectedTotalReads, uint(readerMetrics.GetCount("read_success")))
+	assert.Equal(t, expectedTotalReads, uint(readerMetrics.GetCount("fetch_batch_header_success")))
+	assert.Equal(t, expectedTotalReads, uint(readerMetrics.GetCount("recombination_success")))
+	assert.Equal(t, expectedValidBlobs, uint(readerMetrics.GetCount("valid_blob")))
+	assert.Equal(t, expectedInvalidBlobs, uint(readerMetrics.GetCount("invalid_blob")))
+
+	cancel()
+}
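For reference, the expected counts in this test work out as follows: with `blobCount = 100` and every tenth checksum corrupted, `invalidBlobCount = 10`; at `readPermits = 2` the loop performs 200 reads, so the test expects 20 `invalid_blob` and 180 `valid_blob` observations, with each iteration retiring exactly one read permit until the table drains to empty.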
diff --git a/tools/traffic/workers/blob_status_tracker.go b/tools/traffic/workers/blob_status_tracker.go
index cad394af4..8209b2e6e 100644
--- a/tools/traffic/workers/blob_status_tracker.go
+++ b/tools/traffic/workers/blob_status_tracker.go
@@ -203,15 +203,14 @@ func (tracker *BlobStatusTracker) getBlobStatus(key *UnconfirmedKey) (*disperser
 	ctxTimeout, cancel := context.WithTimeout(*tracker.ctx, tracker.config.GetBlobStatusTimeout)
 	defer cancel()
 
-	status, err := metrics.InvokeAndReportLatency[*disperser.BlobStatusReply](tracker.getStatusLatencyMetric,
-		func() (*disperser.BlobStatusReply, error) {
-			return tracker.disperser.GetBlobStatus(ctxTimeout, key.Key)
-		})
+	start := time.Now()
+	status, err := tracker.disperser.GetBlobStatus(ctxTimeout, key.Key)
 
 	if err != nil {
 		tracker.getStatusErrorCountMetric.Increment()
 		return nil, err
 	}
+	tracker.getStatusLatencyMetric.ReportLatency(time.Since(start))
 
 	return status, nil
 }
diff --git a/tools/traffic/workers/blob_writer.go b/tools/traffic/workers/blob_writer.go
index 81b6c8a9e..a8a8e71ad 100644
--- a/tools/traffic/workers/blob_writer.go
+++ b/tools/traffic/workers/blob_writer.go
@@ -111,13 +111,13 @@ func (writer *BlobWriter) writeNextBlob() {
 		writer.logger.Error("failed to get random data", "err", err)
 		return
 	}
-	key, err := metrics.InvokeAndReportLatency(writer.writeLatencyMetric, func() ([]byte, error) {
-		return writer.sendRequest(data)
-	})
+	start := time.Now()
+	key, err := writer.sendRequest(data)
 	if err != nil {
 		writer.writeFailureMetric.Increment()
 		writer.logger.Error("failed to send blob request", "err", err)
 		return
 	}
+	writer.writeLatencyMetric.ReportLatency(time.Since(start))
 
 	writer.writeSuccessMetric.Increment()
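Taken together, a rough sketch of how the traffic generator might assemble these pieces. The constructor arguments are stand-ins and the dependency setup is elided; this wiring is not part of this diff:

```go
// Hypothetical assembly of the reader with the new client.
client, err := clients.NewRetrievalClient(
	logger, chainState, assignmentCoordinator, nodeClient, verifier, 20)
if err != nil {
	return err
}

reader := workers.NewBlobReader(
	&ctx, &waitGroup, logger, workerConfig,
	client, chainClient, blobStore, generatorMetrics)
reader.Start()
```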