Skip to content

Commit

Permalink
Merge pull request prometheus#14080 from bboreham/faster-exemplars
Browse files Browse the repository at this point in the history
[ENHANCEMENT] TSDB: Faster exemplars
  • Loading branch information
codesome committed May 30, 2024
2 parents 37b408c + 3ee52ab commit e47474d
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 49 deletions.
56 changes: 27 additions & 29 deletions tsdb/exemplar.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const (

type CircularExemplarStorage struct {
lock sync.RWMutex
exemplars []*circularBufferEntry
exemplars []circularBufferEntry
nextIndex int
metrics *ExemplarMetrics

Expand Down Expand Up @@ -121,7 +121,7 @@ func NewCircularExemplarStorage(length int64, m *ExemplarMetrics) (ExemplarStora
length = 0
}
c := &CircularExemplarStorage{
exemplars: make([]*circularBufferEntry, length),
exemplars: make([]circularBufferEntry, length),
index: make(map[string]*indexEntry, length/estimatedExemplarsPerSeries),
metrics: m,
}
Expand Down Expand Up @@ -214,12 +214,12 @@ func (ce *CircularExemplarStorage) ValidateExemplar(l labels.Labels, e exemplar.
// Optimize by moving the lock to be per series (& benchmark it).
ce.lock.RLock()
defer ce.lock.RUnlock()
return ce.validateExemplar(seriesLabels, e, false)
return ce.validateExemplar(ce.index[string(seriesLabels)], e, false)
}

// Not thread safe. The appended parameters tells us whether this is an external validation, or internal
// as a result of an AddExemplar call, in which case we should update any relevant metrics.
func (ce *CircularExemplarStorage) validateExemplar(key []byte, e exemplar.Exemplar, appended bool) error {
func (ce *CircularExemplarStorage) validateExemplar(idx *indexEntry, e exemplar.Exemplar, appended bool) error {
if len(ce.exemplars) == 0 {
return storage.ErrExemplarsDisabled
}
Expand All @@ -239,8 +239,7 @@ func (ce *CircularExemplarStorage) validateExemplar(key []byte, e exemplar.Exemp
return err
}

idx, ok := ce.index[string(key)]
if !ok {
if idx == nil {
return nil
}

Expand Down Expand Up @@ -292,7 +291,7 @@ func (ce *CircularExemplarStorage) Resize(l int64) int {
oldBuffer := ce.exemplars
oldNextIndex := int64(ce.nextIndex)

ce.exemplars = make([]*circularBufferEntry, l)
ce.exemplars = make([]circularBufferEntry, l)
ce.index = make(map[string]*indexEntry, l/estimatedExemplarsPerSeries)
ce.nextIndex = 0

Expand All @@ -311,10 +310,11 @@ func (ce *CircularExemplarStorage) Resize(l int64) int {
// This way we don't migrate exemplars that would just be overwritten when migrating later exemplars.
startIndex := (oldNextIndex - count + int64(len(oldBuffer))) % int64(len(oldBuffer))

var buf [1024]byte
for i := int64(0); i < count; i++ {
idx := (startIndex + i) % int64(len(oldBuffer))
if entry := oldBuffer[idx]; entry != nil {
ce.migrate(entry)
if oldBuffer[idx].ref != nil {
ce.migrate(&oldBuffer[idx], buf[:])
migrated++
}
}
Expand All @@ -328,9 +328,8 @@ func (ce *CircularExemplarStorage) Resize(l int64) int {

// migrate is like AddExemplar but reuses existing structs. Expected to be called in batch and requires
// external lock and does not compute metrics.
func (ce *CircularExemplarStorage) migrate(entry *circularBufferEntry) {
var buf [1024]byte
seriesLabels := entry.ref.seriesLabels.Bytes(buf[:])
func (ce *CircularExemplarStorage) migrate(entry *circularBufferEntry, buf []byte) {
seriesLabels := entry.ref.seriesLabels.Bytes(buf[:0])

idx, ok := ce.index[string(seriesLabels)]
if !ok {
Expand All @@ -344,7 +343,7 @@ func (ce *CircularExemplarStorage) migrate(entry *circularBufferEntry) {
idx.newest = ce.nextIndex

entry.next = noExemplar
ce.exemplars[ce.nextIndex] = entry
ce.exemplars[ce.nextIndex] = *entry

ce.nextIndex = (ce.nextIndex + 1) % len(ce.exemplars)
}
Expand All @@ -362,7 +361,8 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp
ce.lock.Lock()
defer ce.lock.Unlock()

err := ce.validateExemplar(seriesLabels, e, true)
idx, ok := ce.index[string(seriesLabels)]
err := ce.validateExemplar(idx, e, true)
if err != nil {
if errors.Is(err, storage.ErrDuplicateExemplar) {
// Duplicate exemplar, noop.
Expand All @@ -371,34 +371,32 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp
return err
}

_, ok := ce.index[string(seriesLabels)]
if !ok {
ce.index[string(seriesLabels)] = &indexEntry{oldest: ce.nextIndex, seriesLabels: l}
idx = &indexEntry{oldest: ce.nextIndex, seriesLabels: l}
ce.index[string(seriesLabels)] = idx
} else {
ce.exemplars[ce.index[string(seriesLabels)].newest].next = ce.nextIndex
ce.exemplars[idx.newest].next = ce.nextIndex
}

if prev := ce.exemplars[ce.nextIndex]; prev == nil {
ce.exemplars[ce.nextIndex] = &circularBufferEntry{}
} else {
if prev := &ce.exemplars[ce.nextIndex]; prev.ref != nil {
// There exists an exemplar already on this ce.nextIndex entry,
// drop it, to make place for others.
var buf [1024]byte
prevLabels := prev.ref.seriesLabels.Bytes(buf[:])
if prev.next == noExemplar {
// Last item for this series, remove index entry.
var buf [1024]byte
prevLabels := prev.ref.seriesLabels.Bytes(buf[:])
delete(ce.index, string(prevLabels))
} else {
ce.index[string(prevLabels)].oldest = prev.next
prev.ref.oldest = prev.next
}
}

// Default the next value to -1 (which we use to detect that we've iterated through all exemplars for a series in Select)
// since this is the first exemplar stored for this series.
ce.exemplars[ce.nextIndex].next = noExemplar
ce.exemplars[ce.nextIndex].exemplar = e
ce.exemplars[ce.nextIndex].ref = ce.index[string(seriesLabels)]
ce.index[string(seriesLabels)].newest = ce.nextIndex
ce.exemplars[ce.nextIndex].ref = idx
idx.newest = ce.nextIndex

ce.nextIndex = (ce.nextIndex + 1) % len(ce.exemplars)

Expand All @@ -416,15 +414,15 @@ func (ce *CircularExemplarStorage) computeMetrics() {
return
}

if next := ce.exemplars[ce.nextIndex]; next != nil {
if ce.exemplars[ce.nextIndex].ref != nil {
ce.metrics.exemplarsInStorage.Set(float64(len(ce.exemplars)))
ce.metrics.lastExemplarsTs.Set(float64(next.exemplar.Ts) / 1000)
ce.metrics.lastExemplarsTs.Set(float64(ce.exemplars[ce.nextIndex].exemplar.Ts) / 1000)
return
}

// We did not yet fill the buffer.
ce.metrics.exemplarsInStorage.Set(float64(ce.nextIndex))
if ce.exemplars[0] != nil {
if ce.exemplars[0].ref != nil {
ce.metrics.lastExemplarsTs.Set(float64(ce.exemplars[0].exemplar.Ts) / 1000)
}
}
Expand All @@ -438,7 +436,7 @@ func (ce *CircularExemplarStorage) IterateExemplars(f func(seriesLabels labels.L
idx := ce.nextIndex
l := len(ce.exemplars)
for i := 0; i < l; i, idx = i+1, (idx+1)%l {
if ce.exemplars[idx] == nil {
if ce.exemplars[idx].ref == nil {
continue
}
err := f(ce.exemplars[idx].ref.seriesLabels, ce.exemplars[idx].exemplar)
Expand Down
45 changes: 25 additions & 20 deletions tsdb/exemplar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,27 +415,29 @@ func BenchmarkAddExemplar(b *testing.B) {
// before adding.
exLabels := labels.FromStrings("trace_id", "89620921")

for _, n := range []int{10000, 100000, 1000000} {
b.Run(strconv.Itoa(n), func(b *testing.B) {
for j := 0; j < b.N; j++ {
b.StopTimer()
exs, err := NewCircularExemplarStorage(int64(n), eMetrics)
require.NoError(b, err)
es := exs.(*CircularExemplarStorage)
var l labels.Labels
b.StartTimer()

for i := 0; i < n; i++ {
if i%100 == 0 {
l = labels.FromStrings("service", strconv.Itoa(i))
}
err = es.AddExemplar(l, exemplar.Exemplar{Value: float64(i), Ts: int64(i), Labels: exLabels})
if err != nil {
require.NoError(b, err)
for _, capacity := range []int{1000, 10000, 100000} {
for _, n := range []int{10000, 100000, 1000000} {
b.Run(fmt.Sprintf("%d/%d", n, capacity), func(b *testing.B) {
for j := 0; j < b.N; j++ {
b.StopTimer()
exs, err := NewCircularExemplarStorage(int64(capacity), eMetrics)
require.NoError(b, err)
es := exs.(*CircularExemplarStorage)
var l labels.Labels
b.StartTimer()

for i := 0; i < n; i++ {
if i%100 == 0 {
l = labels.FromStrings("service", strconv.Itoa(i))
}
err = es.AddExemplar(l, exemplar.Exemplar{Value: float64(i), Ts: int64(i), Labels: exLabels})
if err != nil {
require.NoError(b, err)
}
}
}
}
})
})
}
}
}

Expand Down Expand Up @@ -480,8 +482,11 @@ func BenchmarkResizeExemplars(b *testing.B) {
require.NoError(b, err)
es := exs.(*CircularExemplarStorage)

var l labels.Labels
for i := 0; i < int(float64(tc.startSize)*float64(1.5)); i++ {
l := labels.FromStrings("service", strconv.Itoa(i))
if i%100 == 0 {
l = labels.FromStrings("service", strconv.Itoa(i))
}

err = es.AddExemplar(l, exemplar.Exemplar{Value: float64(i), Ts: int64(i)})
if err != nil {
Expand Down

0 comments on commit e47474d

Please sign in to comment.