Skip to content

Commit

Permalink
[PLAT-111995] detect and prevent polluted data return to user (#51)
Browse files Browse the repository at this point in the history
  • Loading branch information
jnyi authored Jul 7, 2024
2 parents 40c085e + 1ec9240 commit e2cca9c
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 3 deletions.
14 changes: 11 additions & 3 deletions pkg/dedup/merge_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,17 @@ func (m *mergedSeriesIterator) Seek(t int64) chunkenc.ValueType {
}
}
currT := it.AtT()
if currT >= t && currT < picked {
picked = currT
m.lastIter = it
if currT >= t {
if currT < picked {
picked = currT
m.lastIter = it
} else if currT == picked {
_, currV := it.At()
_, pickedV := m.lastIter.At()
if currV < pickedV {
m.lastIter = it
}
}
}
}
if picked == math.MaxInt64 {
Expand Down
35 changes: 35 additions & 0 deletions pkg/dedup/merge_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,41 @@ func TestMergedSeriesIterator(t *testing.T) {
},
},
},
{
name: "Avoid corrupt Values",
input: []series{
{
lset: labels.Labels{{Name: "a", Value: "5"}, {Name: "c", Value: "6"}},
samples: []sample{{10000, 1}, {20000, 23492}, {30000, 3}, {50000, 5}},
}, {
lset: labels.Labels{{Name: "a", Value: "5"}, {Name: "c", Value: "6"}},
samples: []sample{{10000, 1}, {20000, 2}, {30000, 3}, {50000, 5}},
}, {
lset: labels.Labels{{Name: "a", Value: "5"}, {Name: "c", Value: "6"}},
samples: []sample{{10000, 1}, {20000, 2}, {30000, 3}, {50000, 5}},
},
{
lset: labels.Labels{{Name: "b", Value: "5"}, {Name: "c", Value: "6"}},
samples: []sample{{10000, 1}, {20000, 2}, {30000, 3}, {50000, 5}},
}, {
lset: labels.Labels{{Name: "b", Value: "5"}, {Name: "c", Value: "6"}},
samples: []sample{{10000, 1}, {20000, 2}, {30000, 3}, {50000, 5}},
}, {
lset: labels.Labels{{Name: "b", Value: "5"}, {Name: "c", Value: "6"}},
samples: []sample{{10000, 1}, {20000, 1234}, {30000, 3}, {50000, 5}},
},
},
exp: []series{
{
lset: labels.Labels{{Name: "a", Value: "5"}, {Name: "c", Value: "6"}},
samples: []sample{{10000, 1}, {20000, 2}, {30000, 3}, {50000, 5}},
},
{
lset: labels.Labels{{Name: "b", Value: "5"}, {Name: "c", Value: "6"}},
samples: []sample{{10000, 1}, {20000, 2}, {30000, 3}, {50000, 5}},
},
},
},
{
name: "ignore sampling interval too small",
input: []series{
Expand Down
42 changes: 42 additions & 0 deletions pkg/store/detector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package store

import (
"strings"

"github.com/prometheus/prometheus/model/labels"
"github.com/thanos-io/thanos/pkg/store/storepb"
)

// So far we found there is a bug in prometheus tsdb code when OOO is enabled.
// This is a workaround to detect the issue when tsdb selects irrelevant matched data.
// See https://github.com/thanos-io/thanos/issues/7481
func detectCorruptLabels(lbls labels.Labels, matchers []storepb.LabelMatcher) bool {
for _, m := range matchers {
v := lbls.Get(m.Name)
if v == "" {
continue
}
if (m.Type == storepb.LabelMatcher_EQ && v != m.Value) ||
(m.Type == storepb.LabelMatcher_NEQ && v == m.Value) {
return true
} else if m.Name == labels.MetricName &&
(m.Type == storepb.LabelMatcher_RE || m.Type == storepb.LabelMatcher_NRE) {
matcher, err := labels.NewFastRegexMatcher(m.Value)
return err != nil ||
(m.Type == storepb.LabelMatcher_RE && !matcher.MatchString(v)) ||
(m.Type == storepb.LabelMatcher_NRE && matcher.MatchString(v))
}
}
return false
}

func requestMatches(matchers []storepb.LabelMatcher) string {
var b strings.Builder
for _, m := range matchers {
b.WriteString(m.String())
}
return b.String()
}
34 changes: 34 additions & 0 deletions pkg/store/detector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package store

import (
"testing"

"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/assert"
"github.com/thanos-io/thanos/pkg/store/storepb"
)

func TestDetectCorruptLabels_EQ(t *testing.T) {
good := labels.New(labels.Label{Name: labels.MetricName, Value: "up"})
bad := labels.New(labels.Label{Name: labels.MetricName, Value: "kube_proxy_corrupt"})
eq := []storepb.LabelMatcher{{Name: "__name__", Type: storepb.LabelMatcher_EQ, Value: "up"}}
neq := []storepb.LabelMatcher{{Name: "__name__", Type: storepb.LabelMatcher_NEQ, Value: "up"}}
assert.False(t, detectCorruptLabels(good, eq))
assert.True(t, detectCorruptLabels(bad, eq))
assert.False(t, detectCorruptLabels(bad, neq))
assert.True(t, detectCorruptLabels(good, neq))
}

func TestDetectCorruptLabels_RE(t *testing.T) {
good := labels.New(labels.Label{Name: labels.MetricName, Value: "usage_database_pool"})
bad := labels.New(labels.Label{Name: labels.MetricName, Value: "kube_proxy_corrupt"})
re := []storepb.LabelMatcher{{Name: "__name__", Type: storepb.LabelMatcher_RE, Value: "usage_.+_pool"}}
nre := []storepb.LabelMatcher{{Name: "__name__", Type: storepb.LabelMatcher_NRE, Value: "usage_.+_pool"}}
assert.False(t, detectCorruptLabels(good, re))
assert.True(t, detectCorruptLabels(bad, re))
assert.False(t, detectCorruptLabels(bad, nre))
assert.True(t, detectCorruptLabels(good, nre))
}
4 changes: 4 additions & 0 deletions pkg/store/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,10 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_Ser
if !shardMatcher.MatchesLabels(completeLabelset) {
continue
}
if detectCorruptLabels(completeLabelset, r.Matchers) {
return status.Errorf(codes.DataLoss, "corrupt prometheus tsdb index detected: requesting %s, got unmatched series %s",
requestMatches(r.Matchers), completeLabelset.String())
}

storeSeries := storepb.Series{Labels: labelpb.ZLabelsFromPromLabels(completeLabelset)}
if r.SkipChunks {
Expand Down

0 comments on commit e2cca9c

Please sign in to comment.