Skip to content

Commit

Permalink
Fix OTLP native histogram push (cortexproject#6135)
Browse files Browse the repository at this point in the history
  • Loading branch information
yeya24 committed Aug 5, 2024
1 parent b2ce96c commit ce9c4ba
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 7 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
* [FEATURE] Ruler: Support sending native histogram samples to Ingester. #6029
* [FEATURE] Ruler: Add support for filtering out alerts in ListRules API. #6011
* [FEATURE] Query Frontend: Added a query rejection mechanism to block resource-intensive queries. #6005
* [FEATURE] OTLP: Support ingesting OTLP exponential metrics as native histograms. #6071
* [FEATURE] OTLP: Support ingesting OTLP exponential metrics as native histograms. #6071 #6135
* [FEATURE] Ingester: Add `ingester.instance-limits.max-inflight-query-requests` to allow limiting ingester concurrent queries. #6081
* [FEATURE] Distributor: Add `validation.max-native-histogram-buckets` to limit max number of bucket count. Distributor will try to automatically reduce histogram resolution until it is within the bucket limit or resolution cannot be reduced anymore. #6104
* [FEATURE] Store Gateway: Token bucket limiter. #6016
Expand Down
2 changes: 1 addition & 1 deletion integration/e2ecortex/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func createDataPointsGauge(newMetric pmetric.Metric, attributes map[string]any,
}

func createDataPointsExponentialHistogram(newMetric pmetric.Metric, attributes map[string]any, histograms []prompb.Histogram) {
newMetric.SetEmptyExponentialHistogram()
newMetric.SetEmptyExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
for _, h := range histograms {
datapoint := newMetric.ExponentialHistogram().DataPoints().AppendEmpty()
datapoint.SetTimestamp(pcommon.Timestamp(h.Timestamp * time.Millisecond.Nanoseconds()))
Expand Down
2 changes: 1 addition & 1 deletion integration/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestOTLP(t *testing.T) {

i := rand.Uint32()
histogramSeries := e2e.GenerateHistogramSeries("histogram_series", now, i, false, prompb.Label{Name: "job", Value: "test"})
res, err = c.Push(histogramSeries)
res, err = c.OTLP(histogramSeries)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

Expand Down
39 changes: 38 additions & 1 deletion pkg/cortexpb/histograms.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,41 @@

package cortexpb

import "github.com/prometheus/prometheus/model/histogram"
import (
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/prompb"
)

func (h Histogram) IsFloatHistogram() bool {
_, ok := h.GetCount().(*Histogram_CountFloat)
return ok
}

// HistogramPromProtoToHistogramProto converts a prometheus protobuf Histogram to cortex protobuf Histogram.
func HistogramPromProtoToHistogramProto(h prompb.Histogram) Histogram {
ph := Histogram{
Sum: h.Sum,
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
NegativeSpans: spansPromProtoToSpansProto(h.NegativeSpans),
NegativeDeltas: h.NegativeDeltas,
NegativeCounts: h.NegativeCounts,
PositiveSpans: spansPromProtoToSpansProto(h.PositiveSpans),
PositiveDeltas: h.PositiveDeltas,
PositiveCounts: h.PositiveCounts,
ResetHint: Histogram_ResetHint(h.ResetHint),
TimestampMs: h.Timestamp,
}
if h.IsFloatHistogram() {
ph.Count = &Histogram_CountFloat{CountFloat: h.GetCountFloat()}
ph.ZeroCount = &Histogram_ZeroCountFloat{ZeroCountFloat: h.GetZeroCountFloat()}
} else {
ph.Count = &Histogram_CountInt{CountInt: h.GetCountInt()}
ph.ZeroCount = &Histogram_ZeroCountInt{ZeroCountInt: h.GetZeroCountInt()}
}
return ph
}

// HistogramProtoToHistogram extracts a (normal integer) Histogram from the
// provided proto message. The caller has to make sure that the proto message
// represents an interger histogram and not a float histogram.
Expand Down Expand Up @@ -118,3 +146,12 @@ func spansToSpansProto(s []histogram.Span) []BucketSpan {

return spans
}

func spansPromProtoToSpansProto(s []prompb.BucketSpan) []BucketSpan {
spans := make([]BucketSpan, len(s))
for i := 0; i < len(s); i++ {
spans[i] = BucketSpan{Offset: s[i].Offset, Length: s[i].Length}
}

return spans
}
15 changes: 12 additions & 3 deletions pkg/util/push/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,10 @@ func OTLPHandler(sourceIPs *middleware.SourceIPExtractor, push Func) http.Handle
tsList := []cortexpb.PreallocTimeseries(nil)
for _, v := range promConverter.TimeSeries() {
tsList = append(tsList, cortexpb.PreallocTimeseries{TimeSeries: &cortexpb.TimeSeries{
Labels: makeLabels(v.Labels),
Samples: makeSamples(v.Samples),
Exemplars: makeExemplars(v.Exemplars),
Labels: makeLabels(v.Labels),
Samples: makeSamples(v.Samples),
Exemplars: makeExemplars(v.Exemplars),
Histograms: makeHistograms(v.Histograms),
}})
}
prwReq.Timeseries = tsList
Expand Down Expand Up @@ -112,6 +113,14 @@ func makeExemplars(in []prompb.Exemplar) []cortexpb.Exemplar {
return out
}

func makeHistograms(in []prompb.Histogram) []cortexpb.Histogram {
out := make([]cortexpb.Histogram, 0, len(in))
for _, h := range in {
out = append(out, cortexpb.HistogramPromProtoToHistogramProto(h))
}
return out
}

func convertToMetricsAttributes(md pmetric.Metrics) pmetric.Metrics {
cloneMd := pmetric.NewMetrics()
md.CopyTo(cloneMd)
Expand Down
5 changes: 5 additions & 0 deletions pkg/util/push/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ func verifyOTLPWriteRequestHandler(t *testing.T, expectSource cortexpb.WriteRequ
// TODO: test more things
assert.Equal(t, expectSource, request.Source)
assert.False(t, request.SkipLabelNameValidation)
for _, ts := range request.Timeseries {
assert.NotEmpty(t, ts.Labels)
// Make sure at least one of sample, exemplar or histogram is set.
assert.True(t, len(ts.Samples) > 0 || len(ts.Exemplars) > 0 || len(ts.Histograms) > 0)
}
return &cortexpb.WriteResponse{}, nil
}
}

0 comments on commit ce9c4ba

Please sign in to comment.