Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Upgrade to google.golang.org/grpc v1.66.2 #9401

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ require (
golang.org/x/net v0.29.0
golang.org/x/sync v0.8.0
golang.org/x/time v0.6.0
google.golang.org/grpc v1.66.0
google.golang.org/grpc v1.66.2
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
)
Expand Down Expand Up @@ -317,7 +317,3 @@ replace github.com/prometheus/alertmanager => github.com/grafana/prometheus-aler

// Replacing with a fork commit based on v1.17.1 with https://github.com/twmb/franz-go/pull/803 cherry-picked.
replace github.com/twmb/franz-go => github.com/dimitarvdimitrov/franz-go v0.0.0-20240904145554-ceadc28d3bd9

// Pin Google GRPC to v1.65.0 as v1.66.0 has API changes and also potentially performance regressions.
// Following https://github.com/grafana/dskit/pull/581
replace google.golang.org/grpc => google.golang.org/grpc v1.65.0
323 changes: 78 additions & 245 deletions go.sum

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1382,6 +1382,7 @@ func NextOrCleanup(next PushFunc, pushReq *Request) (_ PushFunc, maybeCleanup fu
func (d *Distributor) Push(ctx context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error) {
pushReq := NewParsedRequest(req)
pushReq.AddCleanup(func() {
req.FreeBuffer()
mimirpb.ReuseSlice(req.Timeseries)
})

Expand Down
16 changes: 11 additions & 5 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6078,15 +6078,17 @@ func (i *mockIngester) QueryExemplars(ctx context.Context, req *client.ExemplarQ
}

if len(exemplars) > 0 {
res.Timeseries = append(res.Timeseries, mimirpb.TimeSeries{
Labels: series.Labels,
Exemplars: exemplars,
res.Timeseries = append(res.Timeseries, mimirpb.CustomTimeSeries{
TimeSeries: &mimirpb.TimeSeries{
Labels: series.Labels,
Exemplars: exemplars,
},
})
}
}

// Sort series by labels because the real ingester returns sorted ones.
slices.SortFunc(res.Timeseries, func(a, b mimirpb.TimeSeries) int {
slices.SortFunc(res.Timeseries, func(a, b mimirpb.CustomTimeSeries) int {
aKey := mimirpb.FromLabelAdaptersToKeyString(a.Labels)
bKey := mimirpb.FromLabelAdaptersToKeyString(b.Labels)
return strings.Compare(aKey, bKey)
Expand Down Expand Up @@ -6118,7 +6120,11 @@ func (i *mockIngester) MetricsForLabelMatchers(ctx context.Context, req *client.
for _, matchers := range multiMatchers {
for _, ts := range i.timeseries {
if match(ts.Labels, matchers) {
response.Metric = append(response.Metric, &mimirpb.Metric{Labels: ts.Labels})
response.Metric = append(response.Metric, mimirpb.CustomMetric{
Metric: &mimirpb.Metric{
Labels: ts.Labels,
},
})
}
}
}
Expand Down
10 changes: 7 additions & 3 deletions pkg/distributor/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func mergeExemplarSets(a, b []mimirpb.Exemplar) []mimirpb.Exemplar {

func mergeExemplarQueryResponses(results []*ingester_client.ExemplarQueryResponse) *ingester_client.ExemplarQueryResponse {
var keys []string
exemplarResults := make(map[string]mimirpb.TimeSeries)
exemplarResults := make(map[string]mimirpb.CustomTimeSeries)
for _, r := range results {
for _, ts := range r.Timeseries {
lbls := mimirpb.FromLabelAdaptersToKeyString(ts.Labels)
Expand All @@ -193,7 +193,7 @@ func mergeExemplarQueryResponses(results []*ingester_client.ExemplarQueryRespons
// Query results from each ingester were sorted, but are not necessarily still sorted after merging.
slices.Sort(keys)

result := make([]mimirpb.TimeSeries, len(exemplarResults))
result := make([]mimirpb.CustomTimeSeries, len(exemplarResults))
for i, k := range keys {
result[i] = exemplarResults[k]
}
Expand All @@ -204,7 +204,7 @@ func mergeExemplarQueryResponses(results []*ingester_client.ExemplarQueryRespons
type ingesterQueryResult struct {
// Why retain the batches rather than build a single slice? We don't need a single slice for each ingester, so building a single slice for each ingester is a waste of time.
chunkseriesBatches [][]ingester_client.TimeSeriesChunk
timeseriesBatches [][]mimirpb.TimeSeries
timeseriesBatches [][]mimirpb.CustomTimeSeries
streamingSeries seriesChunksStream
}

Expand Down Expand Up @@ -266,6 +266,10 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
return ingesterQueryResult{}, err
}

defer func() {
resp.FreeBuffer()
}()

if len(resp.Timeseries) > 0 {
for _, series := range resp.Timeseries {
if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil {
Expand Down
78 changes: 41 additions & 37 deletions pkg/distributor/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,52 +565,52 @@ func TestMergeExemplars(t *testing.T) {
labels2 := []mimirpb.LabelAdapter{{Name: "label1", Value: "foo2"}}

for i, c := range []struct {
seriesA []mimirpb.TimeSeries
seriesB []mimirpb.TimeSeries
expected []mimirpb.TimeSeries
seriesA []mimirpb.CustomTimeSeries
seriesB []mimirpb.CustomTimeSeries
expected []mimirpb.CustomTimeSeries
nonReversible bool
}{
{
seriesA: []mimirpb.TimeSeries{{Labels: labels1, Exemplars: []mimirpb.Exemplar{}}},
seriesB: []mimirpb.TimeSeries{{Labels: labels1, Exemplars: []mimirpb.Exemplar{}}},
expected: []mimirpb.TimeSeries{{Labels: labels1, Exemplars: []mimirpb.Exemplar{}}},
seriesA: []mimirpb.CustomTimeSeries{{TimeSeries: &mimirpb.TimeSeries{Labels: labels1, Exemplars: []mimirpb.Exemplar{}}}},
seriesB: []mimirpb.CustomTimeSeries{{TimeSeries: &mimirpb.TimeSeries{Labels: labels1, Exemplars: []mimirpb.Exemplar{}}}},
expected: []mimirpb.CustomTimeSeries{{TimeSeries: &mimirpb.TimeSeries{Labels: labels1, Exemplars: []mimirpb.Exemplar{}}}},
},
{
seriesA: []mimirpb.TimeSeries{{Labels: labels1, Exemplars: []mimirpb.Exemplar{exemplar1}}},
seriesB: []mimirpb.TimeSeries{{Labels: labels1, Exemplars: []mimirpb.Exemplar{}}},
expected: []mimirpb.TimeSeries{{Labels: labels1, Exemplars: []mimirpb.Exemplar{exemplar1}}},
seriesA: []mimirpb.CustomTimeSeries{{TimeSeries: &mimirpb.TimeSeries{Labels: labels1, Exemplars: []mimirpb.Exemplar{exemplar1}}}},
seriesB: []mimirpb.CustomTimeSeries{{TimeSeries: &mimirpb.TimeSeries{Labels: labels1, Exemplars: []mimirpb.Exemplar{}}}},
expected: []mimirpb.CustomTimeSeries{{TimeSeries: &mimirpb.TimeSeries{Labels: labels1, Exemplars: []mimirpb.Exemplar{exemplar1}}}},
},
{
seriesA: []mimirpb.TimeSeries{{Labels: labels1, Exemplars: []mimirpb.Exemplar{exemplar1}}},
seriesB: []mimirpb.TimeSeries{{Labels: labels1, Exemplars: []mimirpb.Exemplar{exemplar1}}},
expected: []mimirpb.TimeSeries{{Labels: labels1, Exemplars: []mimirpb.Exemplar{exemplar1}}},
seriesA: []mimirpb.CustomTimeSeries{{TimeSeries: &mimirpb.TimeSeries{Labels: labels1, Exemplars: []mimirpb.Exemplar{exemplar1}}}},
seriesB: []mimirpb.CustomTimeSeries{{TimeSeries: &mimirpb.TimeSeries{Labels: labels1, Exemplars: []mimirpb.Exemplar{exemplar1}}}},
expected: []mimirpb.CustomTimeSeries{{TimeSeries: &mimirpb.TimeSeries{Labels: labels1, Exemplars: []mimirpb.Exemplar{exemplar1}}}},
},
{
seriesA: []mimirpb.TimeSeries{{Labels: labels1, Exemplars: []mimirpb.Exemplar{exemplar1, exemplar2, exemplar3}}},
seriesB: []mimirpb.TimeSeries{{Labels: labels1, Exemplars: []mimirpb.Exemplar{exemplar1, exemplar3, exemplar4}}},
expected: []mimirpb.TimeSeries{{Labels: labels1, Exemplars: []mimirpb.Exemplar{exemplar1, exemplar2, exemplar3, exemplar4}}},
seriesA: []mimirpb.CustomTimeSeries{{TimeSeries: &mimirpb.TimeSeries{Labels: labels1, Exemplars: []mimirpb.Exemplar{exemplar1, exemplar2, exemplar3}}}},
seriesB: []mimirpb.CustomTimeSeries{{TimeSeries: &mimirpb.TimeSeries{Labels: labels1, Exemplars: []mimirpb.Exemplar{exemplar1, exemplar3, exemplar4}}}},
expected: []mimirpb.CustomTimeSeries{{TimeSeries: &mimirpb.TimeSeries{Labels: labels1, Exemplars: []mimirpb.Exemplar{exemplar1, exemplar2, exemplar3, exemplar4}}}},
},
{ // Ensure that when there are exemplars with duplicate timestamps, the first one wins.
seriesA: []mimirpb.TimeSeries{{Labels: labels1, Exemplars: []mimirpb.Exemplar{exemplar1, exemplar2, exemplar3}}},
seriesB: []mimirpb.TimeSeries{{Labels: labels1, Exemplars: []mimirpb.Exemplar{exemplar5, exemplar3, exemplar4}}},
expected: []mimirpb.TimeSeries{{Labels: labels1, Exemplars: []mimirpb.Exemplar{exemplar1, exemplar2, exemplar3, exemplar4}}},
seriesA: []mimirpb.CustomTimeSeries{{TimeSeries: &mimirpb.TimeSeries{Labels: labels1, Exemplars: []mimirpb.Exemplar{exemplar1, exemplar2, exemplar3}}}},
seriesB: []mimirpb.CustomTimeSeries{{TimeSeries: &mimirpb.TimeSeries{Labels: labels1, Exemplars: []mimirpb.Exemplar{exemplar5, exemplar3, exemplar4}}}},
expected: []mimirpb.CustomTimeSeries{{TimeSeries: &mimirpb.TimeSeries{Labels: labels1, Exemplars: []mimirpb.Exemplar{exemplar1, exemplar2, exemplar3, exemplar4}}}},
nonReversible: true,
},
{ // Disjoint exemplars on two different series.
seriesA: []mimirpb.TimeSeries{{Labels: labels1, Exemplars: []mimirpb.Exemplar{exemplar1, exemplar2}}},
seriesB: []mimirpb.TimeSeries{{Labels: labels2, Exemplars: []mimirpb.Exemplar{exemplar3, exemplar4}}},
expected: []mimirpb.TimeSeries{
{Labels: labels1, Exemplars: []mimirpb.Exemplar{exemplar1, exemplar2}},
{Labels: labels2, Exemplars: []mimirpb.Exemplar{exemplar3, exemplar4}}},
seriesA: []mimirpb.CustomTimeSeries{{TimeSeries: &mimirpb.TimeSeries{Labels: labels1, Exemplars: []mimirpb.Exemplar{exemplar1, exemplar2}}}},
seriesB: []mimirpb.CustomTimeSeries{{TimeSeries: &mimirpb.TimeSeries{Labels: labels2, Exemplars: []mimirpb.Exemplar{exemplar3, exemplar4}}}},
expected: []mimirpb.CustomTimeSeries{
{TimeSeries: &mimirpb.TimeSeries{Labels: labels1, Exemplars: []mimirpb.Exemplar{exemplar1, exemplar2}}},
{TimeSeries: &mimirpb.TimeSeries{Labels: labels2, Exemplars: []mimirpb.Exemplar{exemplar3, exemplar4}}}},
},
{ // Second input adds to first on one series.
seriesA: []mimirpb.TimeSeries{
{Labels: labels1, Exemplars: []mimirpb.Exemplar{exemplar1, exemplar2}},
{Labels: labels2, Exemplars: []mimirpb.Exemplar{exemplar3}}},
seriesB: []mimirpb.TimeSeries{{Labels: labels2, Exemplars: []mimirpb.Exemplar{exemplar4}}},
expected: []mimirpb.TimeSeries{
{Labels: labels1, Exemplars: []mimirpb.Exemplar{exemplar1, exemplar2}},
{Labels: labels2, Exemplars: []mimirpb.Exemplar{exemplar3, exemplar4}}},
seriesA: []mimirpb.CustomTimeSeries{
{TimeSeries: &mimirpb.TimeSeries{Labels: labels1, Exemplars: []mimirpb.Exemplar{exemplar1, exemplar2}}},
{TimeSeries: &mimirpb.TimeSeries{Labels: labels2, Exemplars: []mimirpb.Exemplar{exemplar3}}}},
seriesB: []mimirpb.CustomTimeSeries{{TimeSeries: &mimirpb.TimeSeries{Labels: labels2, Exemplars: []mimirpb.Exemplar{exemplar4}}}},
expected: []mimirpb.CustomTimeSeries{
{TimeSeries: &mimirpb.TimeSeries{Labels: labels1, Exemplars: []mimirpb.Exemplar{exemplar1, exemplar2}}},
{TimeSeries: &mimirpb.TimeSeries{Labels: labels2, Exemplars: []mimirpb.Exemplar{exemplar3, exemplar4}}}},
},
} {
t.Run(fmt.Sprint("test", i), func(t *testing.T) {
Expand All @@ -629,19 +629,23 @@ func TestMergeExemplars(t *testing.T) {

func makeExemplarQueryResponse(numSeries int) *ingester_client.ExemplarQueryResponse {
now := time.Now()
ts := make([]mimirpb.TimeSeries, numSeries)
ts := make([]mimirpb.CustomTimeSeries, 0, numSeries)
for i := 0; i < numSeries; i++ {
lbls := labels.NewBuilder(labels.EmptyLabels())
lbls.Set(model.MetricNameLabel, "foo")
for i := 0; i < 10; i++ {
lbls.Set(fmt.Sprintf("name_%d", i), fmt.Sprintf("value_%d_%d", i, rand.Intn(10)))
}
ts[i].Labels = mimirpb.FromLabelsToLabelAdapters(lbls.Labels())
ts[i].Exemplars = []mimirpb.Exemplar{{
Labels: []mimirpb.LabelAdapter{{Name: "traceid", Value: "trace1"}},
Value: float64(i),
TimestampMs: now.Add(time.Hour).UnixNano() / int64(time.Millisecond),
}}
ts = append(ts, mimirpb.CustomTimeSeries{
TimeSeries: &mimirpb.TimeSeries{
Labels: mimirpb.FromLabelsToLabelAdapters(lbls.Labels()),
Exemplars: []mimirpb.Exemplar{{
Labels: []mimirpb.LabelAdapter{{Name: "traceid", Value: "trace1"}},
Value: float64(i),
TimestampMs: now.Add(time.Hour).UnixNano() / int64(time.Millisecond),
}},
},
})
}

return &ingester_client.ExemplarQueryResponse{Timeseries: ts}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/client/chunkcompat.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func TimeSeriesChunksToMatrix(from, through model.Time, serieses []TimeSeriesChu
return result, nil
}

func TimeseriesToMatrix(from, through model.Time, series []mimirpb.TimeSeries) (model.Matrix, error) {
func TimeseriesToMatrix(from, through model.Time, series []mimirpb.CustomTimeSeries) (model.Matrix, error) {
if series == nil {
return nil, nil
}
Expand Down
50 changes: 50 additions & 0 deletions pkg/ingester/client/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,44 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"google.golang.org/grpc/mem"

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/storage/chunk"
)

var _ mimirpb.UnmarshalerV2 = &QueryStreamResponse{}

func (m *QueryStreamResponse) SetBuffer(buf mem.Buffer) {
for _, ts := range m.Timeseries {
ts.Buffer = buf
}
}

func (m *QueryStreamResponse) FreeBuffer() {
for _, ts := range m.Timeseries {
if ts.Buffer != nil {
ts.Buffer.Free()
}
}
}

var _ mimirpb.UnmarshalerV2 = &ExemplarQueryResponse{}

func (m *ExemplarQueryResponse) SetBuffer(buf mem.Buffer) {
for _, ts := range m.Timeseries {
ts.Buffer = buf
}
}

func (m *ExemplarQueryResponse) FreeBuffer() {
for _, ts := range m.Timeseries {
if ts.Buffer != nil {
ts.Buffer.Free()
}
}
}

func ChunksCount(series []TimeSeriesChunk) int {
if len(series) == 0 {
return 0
Expand Down Expand Up @@ -65,3 +99,19 @@ func ChunkFromMeta(meta chunks.Meta) (Chunk, error) {
func DefaultMetricsMetadataRequest() *MetricsMetadataRequest {
return &MetricsMetadataRequest{Limit: -1, LimitPerMetric: -1, Metric: ""}
}

var _ mimirpb.UnmarshalerV2 = &MetricsForLabelMatchersResponse{}

func (m *MetricsForLabelMatchersResponse) SetBuffer(buf mem.Buffer) {
for _, metric := range m.Metric {
metric.Buffer = buf
}
}

func (m *MetricsForLabelMatchersResponse) FreeBuffer() {
for _, metric := range m.Metric {
if metric.Buffer != nil {
metric.Buffer.Free()
}
}
}
Loading
Loading