Skip to content

Commit

Permalink
Support ingesting native histograms in Ruler appender (cortexproject#…
Browse files Browse the repository at this point in the history
…6029)

* support ingesting native histogram samples in Ruler pusher

Signed-off-by: Ben Ye <[email protected]>

* handle evaluation delay for native histograms

Signed-off-by: Ben Ye <[email protected]>

* fix unit test

Signed-off-by: Ben Ye <[email protected]>

---------

Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 authored Jun 19, 2024
1 parent a19b867 commit d83f7b1
Show file tree
Hide file tree
Showing 5 changed files with 405 additions and 56 deletions.
9 changes: 9 additions & 0 deletions pkg/cortexpb/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ func ToWriteRequest(lbls []labels.Labels, samples []Sample, metadata []*MetricMe
return req
}

func (w *WriteRequest) AddHistogramTimeSeries(lbls []labels.Labels, histograms []Histogram) {
for i := 0; i < len(lbls); i++ {
ts := TimeseriesFromPool()
ts.Labels = append(ts.Labels, FromLabelsToLabelAdapters(lbls[i])...)
ts.Histograms = append(ts.Histograms, histograms[i])
w.Timeseries = append(w.Timeseries, PreallocTimeseries{TimeSeries: ts})
}
}

// FromLabelAdaptersToLabels casts []LabelAdapter to labels.Labels.
// It uses unsafe, but as LabelAdapter == labels.Label this should be safe.
// This allows us to use labels.Labels directly in protos.
Expand Down
16 changes: 9 additions & 7 deletions pkg/cortexpb/timeseries.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ import (
)

var (
expectedTimeseries = 100
expectedLabels = 20
expectedSamplesPerSeries = 10
expectedExemplarsPerSeries = 1
expectedTimeseries = 100
expectedLabels = 20
expectedSamplesPerSeries = 10
expectedExemplarsPerSeries = 1
expectedHistogramsPerSeries = 1

/*
We cannot pool these as pointer-to-slice because the place we use them is in WriteRequest which is generated from Protobuf
Expand All @@ -31,9 +32,10 @@ var (
timeSeriesPool = sync.Pool{
New: func() interface{} {
return &TimeSeries{
Labels: make([]LabelAdapter, 0, expectedLabels),
Samples: make([]Sample, 0, expectedSamplesPerSeries),
Exemplars: make([]Exemplar, 0, expectedExemplarsPerSeries),
Labels: make([]LabelAdapter, 0, expectedLabels),
Samples: make([]Sample, 0, expectedSamplesPerSeries),
Exemplars: make([]Exemplar, 0, expectedExemplarsPerSeries),
Histograms: make([]Histogram, 0, expectedHistogramsPerSeries),
}
},
}
Expand Down
35 changes: 31 additions & 4 deletions pkg/ruler/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,34 @@ type PusherAppender struct {
pusher Pusher
labels []labels.Labels
samples []cortexpb.Sample
histogramLabels []labels.Labels
histograms []cortexpb.Histogram
userID string
evaluationDelay time.Duration
}

func (a *PusherAppender) AppendHistogram(storage.SeriesRef, labels.Labels, int64, *histogram.Histogram, *histogram.FloatHistogram) (storage.SeriesRef, error) {
return 0, errors.New("querying native histograms is not supported")
func (a *PusherAppender) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
if h == nil && fh == nil {
return 0, errors.New("no histogram")
}

if h != nil {
// A histogram sample is considered stale if its sum is set to NaN.
// https://github.com/prometheus/prometheus/blob/b6ef745016fa9472fdd0ae20f75a9682e01d1e5c/tsdb/head_append.go#L339-L346
if a.evaluationDelay > 0 && (value.IsStaleNaN(h.Sum)) {
t -= a.evaluationDelay.Milliseconds()
}
a.histograms = append(a.histograms, cortexpb.HistogramToHistogramProto(t, h))
} else {
// A histogram sample is considered stale if its sum is set to NaN.
// https://github.com/prometheus/prometheus/blob/b6ef745016fa9472fdd0ae20f75a9682e01d1e5c/tsdb/head_append.go#L339-L346
if a.evaluationDelay > 0 && (value.IsStaleNaN(fh.Sum)) {
t -= a.evaluationDelay.Milliseconds()
}
a.histograms = append(a.histograms, cortexpb.FloatHistogramToHistogramProto(t, fh))
}
a.histogramLabels = append(a.histogramLabels, l)
return 0, nil
}

func (a *PusherAppender) Append(_ storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
Expand Down Expand Up @@ -85,10 +107,11 @@ func (a *PusherAppender) AppendExemplar(_ storage.SeriesRef, _ labels.Labels, _
func (a *PusherAppender) Commit() error {
a.totalWrites.Inc()

req := cortexpb.ToWriteRequest(a.labels, a.samples, nil, nil, cortexpb.RULE)
req.AddHistogramTimeSeries(a.histogramLabels, a.histograms)
// Since a.pusher is distributor, client.ReuseSlice will be called in a.pusher.Push.
// We shouldn't call client.ReuseSlice here.
_, err := a.pusher.Push(user.InjectOrgID(a.ctx, a.userID), cortexpb.ToWriteRequest(a.labels, a.samples, nil, nil, cortexpb.RULE))

_, err := a.pusher.Push(user.InjectOrgID(a.ctx, a.userID), req)
if err != nil {
// Don't report errors that ended with 4xx HTTP status code (series limits, duplicate samples, out of order, etc.)
if resp, ok := httpgrpc.HTTPResponseFromError(err); !ok || resp.Code/100 != 4 {
Expand All @@ -98,6 +121,8 @@ func (a *PusherAppender) Commit() error {

a.labels = nil
a.samples = nil
a.histogramLabels = nil
a.histograms = nil
return err
}

Expand All @@ -108,6 +133,8 @@ func (a *PusherAppender) UpdateMetadata(_ storage.SeriesRef, _ labels.Labels, _
func (a *PusherAppender) Rollback() error {
a.labels = nil
a.samples = nil
a.histogramLabels = nil
a.histograms = nil
return nil
}

Expand Down
Loading

0 comments on commit d83f7b1

Please sign in to comment.