diff --git a/disperser/dataapi/metrics_handlers.go b/disperser/dataapi/metrics_handlers.go
index 5f18ba422..9b6c68dcb 100644
--- a/disperser/dataapi/metrics_handlers.go
+++ b/disperser/dataapi/metrics_handlers.go
@@ -11,8 +11,9 @@ import (
 )
 
 const (
-	avgThroughputWindowSize    = 120 // The time window (in seconds) to calculate the data throughput.
 	maxWorkersGetOperatorState = 10  // The maximum number of workers to use when querying operator state.
+	defaultThroughputRateSecs  = 240 // 4m rate is used for < 7d window to match $__rate_interval
+	sevenDayThroughputRateSecs = 660 // 11m rate is used for >= 7d window to match $__rate_interval
 )
 
 func (s *server) getMetric(ctx context.Context, startTime int64, endTime int64) (*Metric, error) {
@@ -79,7 +80,11 @@ func (s *server) getMetric(ctx context.Context, startTime int64, endTime int64)
 }
 
 func (s *server) getThroughput(ctx context.Context, start int64, end int64) ([]*Throughput, error) {
-	result, err := s.promClient.QueryDisperserAvgThroughputBlobSizeBytes(ctx, time.Unix(start, 0), time.Unix(end, 0), avgThroughputWindowSize)
+	throughputRateSecs := uint16(defaultThroughputRateSecs)
+	if end-start >= 7*24*60*60 {
+		throughputRateSecs = uint16(sevenDayThroughputRateSecs)
+	}
+	result, err := s.promClient.QueryDisperserAvgThroughputBlobSizeBytes(ctx, time.Unix(start, 0), time.Unix(end, 0), throughputRateSecs)
 	if err != nil {
 		return nil, err
 	}
@@ -89,7 +94,9 @@ func (s *server) getThroughput(ctx context.Context, start int64, end int64) ([]*
 	}
 
 	throughputs := make([]*Throughput, 0)
-	for i := avgThroughputWindowSize; i < len(result.Values); i++ {
+	// Skip the first throughputRateSecs samples. The window is widened to int rather
+	// than narrowing len() to uint16, which would wrap past 65535 samples.
+	for i := int(throughputRateSecs); i < len(result.Values); i++ {
 		v := result.Values[i]
 		throughputs = append(throughputs, &Throughput{
 			Timestamp: uint64(v.Timestamp.Unix()),
diff --git a/disperser/dataapi/prometheus_client.go b/disperser/dataapi/prometheus_client.go
index 0d9db872f..939cbc099 100644
--- a/disperser/dataapi/prometheus_client.go
+++ 
b/disperser/dataapi/prometheus_client.go
@@ -11,14 +11,14 @@ import (
 
 const (
 	// maxNumOfDataPoints is the maximum number of data points that can be queried from Prometheus based on latency that this API can provide
-	maxNumOfDataPoints        = 3500
-	throughputRateWindowInSec = 60
+	maxNumOfDataPoints = 3500
+	minRateWindowSecs  = 120 // floor (in seconds) applied to the rate window by QueryDisperserAvgThroughputBlobSizeBytes
 )
 
 type (
 	PrometheusClient interface {
 		QueryDisperserBlobSizeBytesPerSecond(ctx context.Context, start time.Time, end time.Time) (*PrometheusResult, error)
-		QueryDisperserAvgThroughputBlobSizeBytes(ctx context.Context, start time.Time, end time.Time, windowSizeInSec uint8) (*PrometheusResult, error)
+		QueryDisperserAvgThroughputBlobSizeBytes(ctx context.Context, start time.Time, end time.Time, windowSizeInSec uint16) (*PrometheusResult, error)
 	}
 
 	PrometheusResultValues struct {
@@ -47,12 +47,12 @@ func (pc *prometheusClient) QueryDisperserBlobSizeBytesPerSecond(ctx context.Con
 	return pc.queryRange(ctx, query, start, end)
 }
 
-func (pc *prometheusClient) QueryDisperserAvgThroughputBlobSizeBytes(ctx context.Context, start time.Time, end time.Time, windowSizeInSec uint8) (*PrometheusResult, error) {
-	if windowSizeInSec < throughputRateWindowInSec {
-		windowSizeInSec = throughputRateWindowInSec
+func (pc *prometheusClient) QueryDisperserAvgThroughputBlobSizeBytes(ctx context.Context, start time.Time, end time.Time, throughputRateSecs uint16) (*PrometheusResult, error) {
+	if throughputRateSecs < minRateWindowSecs { // raise shorter windows up to the minRateWindowSecs floor
+		throughputRateSecs = minRateWindowSecs
 	}
 
-	query := fmt.Sprintf("sum by (job) (rate(eigenda_batcher_blobs_total{state=\"confirmed\",data=\"size\",cluster=\"%s\"}[%ds]))", pc.cluster, windowSizeInSec)
+	query := fmt.Sprintf("sum by (job) (rate(eigenda_batcher_blobs_total{state=\"confirmed\",data=\"size\",cluster=\"%s\"}[%ds]))", pc.cluster, throughputRateSecs)
 	return pc.queryRange(ctx, query, start, end)
 }
 
diff --git a/disperser/dataapi/server_test.go b/disperser/dataapi/server_test.go
index fc3fd5117..e7b09c5ea 100644
--- 
a/disperser/dataapi/server_test.go
+++ b/disperser/dataapi/server_test.go
@@ -323,10 +323,10 @@ func TestFetchMetricsThroughputHandler(t *testing.T) {
 	}
 
 	assert.Equal(t, http.StatusOK, res.StatusCode)
-	assert.Equal(t, 3481, len(response))
-	assert.Equal(t, float64(11666.666666666666), response[0].Throughput)
-	assert.Equal(t, uint64(1701292800), response[0].Timestamp)
-	assert.Equal(t, float64(3.599722666666646e+07), totalThroughput)
+	assert.Equal(t, 3361, len(response)) // NOTE(review): presumably 3601 fixture samples minus the 240 skipped for the default rate window — confirm against the mock
+	assert.Equal(t, float64(12000), response[0].Throughput)
+	assert.Equal(t, uint64(1701292920), response[0].Timestamp) // NOTE(review): 120s later than the old expectation (1701292800); presumably 1s per sample — confirm
+	assert.Equal(t, float64(3.503022666666651e+07), totalThroughput)
 }
 
 func TestEjectOperatorHandler(t *testing.T) {