diff --git a/CHANGELOG.md b/CHANGELOG.md index edb3d0c5c3..1b97cd3b3f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,7 +12,7 @@ * [FEATURE] Ruler: Support sending native histogram samples to Ingester. #6029 * [FEATURE] Ruler: Add support for filtering out alerts in ListRules API. #6011 * [FEATURE] Query Frontend: Added a query rejection mechanism to block resource-intensive queries. #6005 -* [FEATURE] OTLP: Support ingesting OTLP exponential metrics as native histograms. #6071 +* [FEATURE] OTLP: Support ingesting OTLP exponential metrics as native histograms. #6071 #6135 * [FEATURE] Ingester: Add `ingester.instance-limits.max-inflight-query-requests` to allow limiting ingester concurrent queries. #6081 * [FEATURE] Distributor: Add `validation.max-native-histogram-buckets` to limit max number of bucket count. Distributor will try to automatically reduce histogram resolution until it is within the bucket limit or resolution cannot be reduced anymore. #6104 * [FEATURE] Store Gateway: Token bucket limiter. #6016 diff --git a/integration/e2ecortex/client.go b/integration/e2ecortex/client.go index 5f45d6d59f..b6ea5fb7f9 100644 --- a/integration/e2ecortex/client.go +++ b/integration/e2ecortex/client.go @@ -173,7 +173,7 @@ func createDataPointsGauge(newMetric pmetric.Metric, attributes map[string]any, } func createDataPointsExponentialHistogram(newMetric pmetric.Metric, attributes map[string]any, histograms []prompb.Histogram) { - newMetric.SetEmptyExponentialHistogram() + newMetric.SetEmptyExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) for _, h := range histograms { datapoint := newMetric.ExponentialHistogram().DataPoints().AppendEmpty() datapoint.SetTimestamp(pcommon.Timestamp(h.Timestamp * time.Millisecond.Nanoseconds())) diff --git a/integration/otlp_test.go b/integration/otlp_test.go index 0d56c3751b..9a603e8cb0 100644 --- a/integration/otlp_test.go +++ b/integration/otlp_test.go @@ -77,7 +77,7 @@ func TestOTLP(t *testing.T) { i := rand.Uint32() histogramSeries := e2e.GenerateHistogramSeries("histogram_series", now, i, false, prompb.Label{Name: "job", Value: "test"}) - res, err = c.Push(histogramSeries) + res, err = c.OTLP(histogramSeries) require.NoError(t, err) require.Equal(t, 200, res.StatusCode) diff --git a/pkg/cortexpb/histograms.go b/pkg/cortexpb/histograms.go index 2e2afef457..60e7207a19 100644 --- a/pkg/cortexpb/histograms.go +++ b/pkg/cortexpb/histograms.go @@ -13,13 +13,41 @@ package cortexpb -import "github.com/prometheus/prometheus/model/histogram" +import ( + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/prompb" +) func (h Histogram) IsFloatHistogram() bool { _, ok := h.GetCount().(*Histogram_CountFloat) return ok } +// HistogramPromProtoToHistogramProto converts a prometheus protobuf Histogram to cortex protobuf Histogram. +func HistogramPromProtoToHistogramProto(h prompb.Histogram) Histogram { + ph := Histogram{ + Sum: h.Sum, + Schema: h.Schema, + ZeroThreshold: h.ZeroThreshold, + NegativeSpans: spansPromProtoToSpansProto(h.NegativeSpans), + NegativeDeltas: h.NegativeDeltas, + NegativeCounts: h.NegativeCounts, + PositiveSpans: spansPromProtoToSpansProto(h.PositiveSpans), + PositiveDeltas: h.PositiveDeltas, + PositiveCounts: h.PositiveCounts, + ResetHint: Histogram_ResetHint(h.ResetHint), + TimestampMs: h.Timestamp, + } + if h.IsFloatHistogram() { + ph.Count = &Histogram_CountFloat{CountFloat: h.GetCountFloat()} + ph.ZeroCount = &Histogram_ZeroCountFloat{ZeroCountFloat: h.GetZeroCountFloat()} + } else { + ph.Count = &Histogram_CountInt{CountInt: h.GetCountInt()} + ph.ZeroCount = &Histogram_ZeroCountInt{ZeroCountInt: h.GetZeroCountInt()} + } + return ph +} + // HistogramProtoToHistogram extracts a (normal integer) Histogram from the // provided proto message. The caller has to make sure that the proto message // represents an interger histogram and not a float histogram. @@ -118,3 +146,12 @@ func spansToSpansProto(s []histogram.Span) []BucketSpan { return spans } + +func spansPromProtoToSpansProto(s []prompb.BucketSpan) []BucketSpan { + spans := make([]BucketSpan, len(s)) + for i := 0; i < len(s); i++ { + spans[i] = BucketSpan{Offset: s[i].Offset, Length: s[i].Length} + } + + return spans +} diff --git a/pkg/util/push/otlp.go b/pkg/util/push/otlp.go index b3bae4dd4e..14c3c98e5c 100644 --- a/pkg/util/push/otlp.go +++ b/pkg/util/push/otlp.go @@ -58,9 +58,10 @@ func OTLPHandler(sourceIPs *middleware.SourceIPExtractor, push Func) http.Handle tsList := []cortexpb.PreallocTimeseries(nil) for _, v := range promConverter.TimeSeries() { tsList = append(tsList, cortexpb.PreallocTimeseries{TimeSeries: &cortexpb.TimeSeries{ - Labels: makeLabels(v.Labels), - Samples: makeSamples(v.Samples), - Exemplars: makeExemplars(v.Exemplars), + Labels: makeLabels(v.Labels), + Samples: makeSamples(v.Samples), + Exemplars: makeExemplars(v.Exemplars), + Histograms: makeHistograms(v.Histograms), }}) } prwReq.Timeseries = tsList @@ -112,6 +113,14 @@ func makeExemplars(in []prompb.Exemplar) []cortexpb.Exemplar { return out } +func makeHistograms(in []prompb.Histogram) []cortexpb.Histogram { + out := make([]cortexpb.Histogram, 0, len(in)) + for _, h := range in { + out = append(out, cortexpb.HistogramPromProtoToHistogramProto(h)) + } + return out +} + func convertToMetricsAttributes(md pmetric.Metrics) pmetric.Metrics { cloneMd := pmetric.NewMetrics() md.CopyTo(cloneMd) diff --git a/pkg/util/push/otlp_test.go b/pkg/util/push/otlp_test.go index 641fc2c566..be7e1f79b9 100644 --- a/pkg/util/push/otlp_test.go +++ b/pkg/util/push/otlp_test.go @@ -124,6 +124,11 @@ func verifyOTLPWriteRequestHandler(t *testing.T, expectSource cortexpb.WriteRequ // TODO: test more things assert.Equal(t, expectSource, request.Source) assert.False(t, request.SkipLabelNameValidation) + for _, ts := range request.Timeseries { + assert.NotEmpty(t, ts.Labels) + // Make sure at least one of sample, exemplar or histogram is set. + assert.True(t, len(ts.Samples) > 0 || len(ts.Exemplars) > 0 || len(ts.Histograms) > 0) + } return &cortexpb.WriteResponse{}, nil } }