Skip to content

Commit

Permalink
Add support for a mixedglobal scope to facilitate migrating leaf vene…
Browse files Browse the repository at this point in the history
…urs.
  • Loading branch information
clin-stripe committed Nov 20, 2018
1 parent 3251872 commit 8d6d344
Show file tree
Hide file tree
Showing 14 changed files with 161 additions and 61 deletions.
4 changes: 4 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package veneur

import "github.com/stripe/veneur/metricingester"

type Config struct {
Aggregates []string `yaml:"aggregates"`
AwsAccessKeyID string `yaml:"aws_access_key_id"`
Expand Down Expand Up @@ -95,4 +97,6 @@ type Config struct {
TraceLightstepNumClients int `yaml:"trace_lightstep_num_clients"`
TraceLightstepReconnectPeriod string `yaml:"trace_lightstep_reconnect_period"`
TraceMaxLengthBytes int `yaml:"trace_max_length_bytes"`

AdditionalMetricSinks []metricingester.Sink
}
2 changes: 1 addition & 1 deletion forward_grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func TestE2EForwardingGRPCMetrics(t *testing.T) {
}

assert.ElementsMatch(t, expectedNames, actualNames,
"The global Veneur didn't flush the right metrics")
"The global Veneur didn't flush the right metrics.\nEXPECTED: %v\nACTUAL: %v", expectedNames, actualNames)
close(done)
}()
ff.local.Flush(context.TODO())
Expand Down
19 changes: 15 additions & 4 deletions importsrv/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,25 @@ func (s *Server) SendMetrics(ctx context.Context, mlist *forwardrpc.MetricList)
// min/max/count values locally.
//
// We need a special metric type to represent this: the MixedHistogram.
//
// We also need to distinguish between the behavior before all metrics were forwarded
// to the behavior after. The before case is represented by Scope_Mixed, which makes that host's
// "min/max/count" not get emitted by the central veneur, since the local veneur is emitting it
// still. Scoped_MixedGlobal makes the central veneur emit the min/max/count.
//
// TODO(clin): After we completely migrate to the new veneur, delete support for this latter distinction!
switch m.GetScope() {
case metricpb.Scope_Local, metricpb.Scope_Global:
case metricpb.Scope_Mixed:
s.ingester.Merge(
metricingester.NewHistogramDigest(m.Name, v.Histogram, tags, hostname),
metricingester.NewMixedHistogramDigest(m.Name, v.Histogram, tags, hostname, false),
)
case metricpb.Scope_Mixed:
case metricpb.Scope_MixedGlobal:
s.ingester.Merge(
metricingester.NewMixedHistogramDigest(m.Name, v.Histogram, tags, hostname),
metricingester.NewMixedHistogramDigest(m.Name, v.Histogram, tags, hostname, true),
)
case metricpb.Scope_Local, metricpb.Scope_Global:
s.ingester.Merge(
metricingester.NewHistogramDigest(m.Name, v.Histogram, tags, hostname),
)
}
case nil:
Expand Down
39 changes: 34 additions & 5 deletions importsrv/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,23 @@ var testE2EFlushingCases = []struct {
"gauge not present",
},
{
pbmetrics(pbhisto("test", []float64{1, 2, 3})),
pbmetrics(pbhisto("test", []float64{1, 2, 3}, pbscope(metricpb.Scope_MixedGlobal))),
samplers.TMetrics(
samplers.TGauge("test.min", 1),
samplers.TGauge("test.max", 3),
samplers.TGauge("test.count", 3),
samplers.TGauge("test.50percentile", 2),
samplers.TGauge("test.95percentile", 2.925),
),
"histo not present",
"mixed global histo not present",
},
{
pbmetrics(pbhisto("test", []float64{1, 2, 3}, pbscope(metricpb.Scope_Mixed))),
samplers.TMetrics(
samplers.TGauge("test.50percentile", 2),
samplers.TGauge("test.95percentile", 2.925),
),
"mixed histo not correct",
},
{
pbmetrics(pbset("test", []string{"asf", "clin", "aditya"})),
Expand Down Expand Up @@ -131,7 +139,7 @@ var testE2EFlushingCases = []struct {

{
pbmetrics(
pbhisto("test", []float64{1, 2, 3}, pbscope(metricpb.Scope_Mixed), pbhostname("a")),
pbhisto("test", []float64{1, 2, 3}, pbscope(metricpb.Scope_MixedGlobal), pbhostname("a")),
),
samplers.TMetrics(
samplers.TGauge("test.min", 1, samplers.OptHostname("a")),
Expand All @@ -140,12 +148,22 @@ var testE2EFlushingCases = []struct {
samplers.TGauge("test.50percentile", 2),
samplers.TGauge("test.95percentile", 2.925),
),
"mixed histos not reporting host level aggregates for one host",
"global mixed histos not reporting host level aggregates for one host",
},
{
pbmetrics(
pbhisto("test", []float64{1, 2, 3}, pbscope(metricpb.Scope_Mixed), pbhostname("a")),
pbhisto("test", []float64{4, 5, 6}, pbscope(metricpb.Scope_Mixed), pbhostname("b")),
),
samplers.TMetrics(
samplers.TGauge("test.50percentile", 2),
samplers.TGauge("test.95percentile", 2.925),
),
"mixed histos should not report host metrics",
},
{
pbmetrics(
pbhisto("test", []float64{1, 2, 3}, pbscope(metricpb.Scope_MixedGlobal), pbhostname("a")),
pbhisto("test", []float64{4, 5, 6}, pbscope(metricpb.Scope_MixedGlobal), pbhostname("b")),
),
samplers.TMetrics(
samplers.TGauge("test.min", 1, samplers.OptHostname("a")),
Expand All @@ -157,6 +175,17 @@ var testE2EFlushingCases = []struct {
samplers.TGauge("test.50percentile", 3.5),
samplers.TGauge("test.95percentile", 5.85),
),
"global mixed histos not reporting host level aggregates for two hosts",
},
{
pbmetrics(
pbhisto("test", []float64{1, 2, 3}, pbscope(metricpb.Scope_Mixed), pbhostname("a")),
pbhisto("test", []float64{4, 5, 6}, pbscope(metricpb.Scope_Mixed), pbhostname("b")),
),
samplers.TMetrics(
samplers.TGauge("test.50percentile", 3.5),
samplers.TGauge("test.95percentile", 5.85),
),
"mixed histos not reporting host level aggregates for two hosts",
},

Expand Down
6 changes: 1 addition & 5 deletions metricingester/aggingestor.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,6 @@ func NewFlushingIngester(
aggregates samplers.Aggregate,
options ...ingesterOption,
) AggregatingIngestor {
if workers < 1 {
panic("more than one worker required")
}

var aggW []aggWorker
for i := 0; i < workers; i++ {
aggW = append(aggW, newAggWorker())
Expand Down Expand Up @@ -74,7 +70,7 @@ func (a AggregatingIngestor) Ingest(m Metric) error {

func (a AggregatingIngestor) Merge(d Digest) error {
var workerid metricHash
if d.digestType == mixedHistoDigest {
if d.digestType == mixedHistoDigest || d.digestType == mixedGlobalHistoDigest {
workerid = d.MixedHash() % metricHash(len(a.workers))
} else {
workerid = d.Hash() % metricHash(len(a.workers))
Expand Down
3 changes: 3 additions & 0 deletions metricingester/aggworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ func (a aggWorker) merge(d Digest) {
a.samplers.mixedHistograms[key] = samplers.NewMixedHisto(d.name, d.tags, samplers.OptMixedHistoHostname(d.hostname))
}
a.samplers.mixedHistograms[key].Merge(d.hostname, d.histodigest)
if d.flushMixed {
a.samplers.mixedHosts[d.hostname] = struct{}{}
}
case histoDigest:
key := d.Key()
if _, present := a.samplers.histograms[key]; !present {
Expand Down
2 changes: 1 addition & 1 deletion metricingester/sinkflusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (s sinkFlusher) Flush(ctx context.Context, envelope samplerEnvelope) {
metrics = append(metrics, sampler.Flush(time.Second, s.percentiles, s.aggregates, true)...)
}
for _, sampler := range envelope.mixedHistograms {
metrics = append(metrics, sampler.Flush(s.percentiles, s.aggregates)...)
metrics = append(metrics, sampler.Flush(s.percentiles, s.aggregates, envelope.mixedHosts)...)
}

if len(metrics) == 0 {
Expand Down
25 changes: 25 additions & 0 deletions metricingester/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ type digestType int
const (
histoDigest digestType = iota + 1
mixedHistoDigest
mixedGlobalHistoDigest
setDigest
)

Expand All @@ -109,6 +110,8 @@ type Digest struct {

histodigest *metricpb.HistogramValue
setdigest *metricpb.SetValue

flushMixed bool
}

// TODO(clin): Ideally merge this with Hash when we choose an algorithm with sufficiently
Expand Down Expand Up @@ -164,6 +167,7 @@ func NewMixedHistogramDigest(
digest *metricpb.HistogramValue,
tags []string,
hostname string,
flushMixed bool,
) Digest {
sort.Sort(sort.StringSlice(tags))
return Digest{
Expand All @@ -172,6 +176,23 @@ func NewMixedHistogramDigest(
digestType: mixedHistoDigest,
hostname: hostname,
histodigest: digest,
flushMixed: flushMixed,
}
}

func NewMixedGlobalHistogramDigest(
name string,
digest *metricpb.HistogramValue,
tags []string,
hostname string,
) Digest {
sort.Sort(sort.StringSlice(tags))
return Digest{
name: name,
tags: tags,
digestType: mixedGlobalHistoDigest,
hostname: hostname,
histodigest: digest,
}
}

Expand Down Expand Up @@ -214,6 +235,8 @@ type samplerEnvelope struct {
histograms map[metricKey]*samplers.Histo
mixedHistograms map[metricKey]samplers.MixedHisto
sets map[metricKey]*samplers.Set

mixedHosts map[string]struct{}
}

func newSamplerEnvelope() samplerEnvelope {
Expand All @@ -223,5 +246,7 @@ func newSamplerEnvelope() samplerEnvelope {
histograms: make(map[metricKey]*samplers.Histo),
mixedHistograms: make(map[metricKey]samplers.MixedHisto),
sets: make(map[metricKey]*samplers.Set),

mixedHosts: make(map[string]struct{}),
}
}
73 changes: 38 additions & 35 deletions samplers/metricpb/metric.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions samplers/metricpb/metric.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ enum Scope {
Mixed = 0;
Local = 1;
Global = 2;
MixedGlobal = 3;
}

// Type can be any of the valid metric types recognized by Veneur.
Expand Down
5 changes: 4 additions & 1 deletion samplers/mixedhisto.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (m MixedHisto) Sample(sample float64, sampleRate float32, host string) {
}

// Flush returns metrics for the specified percentiles and aggregates.
func (m MixedHisto) Flush(percentiles []float64, aggregates HistogramAggregates) []InterMetric {
func (m MixedHisto) Flush(percentiles []float64, aggregates HistogramAggregates, mixedHosts map[string]struct{}) []InterMetric {
ms := m.histo.Flush(0, percentiles, HistogramAggregates{}, false)

// doesn't support median! Would require implementing separated digests.
Expand All @@ -84,6 +84,9 @@ func (m MixedHisto) Flush(percentiles []float64, aggregates HistogramAggregates)
}
}
for host, _ := range m.max {
if _, ok := mixedHosts[host]; !ok {
continue
}
if (aggregates.Value & AggregateMax) != 0 {
ms = append(ms, metric("max", m.max[host], host))
}
Expand Down
8 changes: 6 additions & 2 deletions samplers/mixedhisto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,13 @@ func testSample(ps []float64, aggs Aggregate, inms []sampleCase, ts []TestMetric
return func(t *testing.T) {
t.Parallel()
mh := NewMixedHisto("test", nil)
mixedServers := make(map[string]struct{})
for _, inm := range inms {
mh.Sample(inm.val, 1, inm.host)
mixedServers[inm.host] = struct{}{}
}

results := ToTestMetrics(mh.Flush(ps, HistogramAggregates{aggs, 0}))
results := ToTestMetrics(mh.Flush(ps, HistogramAggregates{aggs, 0}, mixedServers))
assert.ElementsMatch(
t,
results,
Expand Down Expand Up @@ -283,11 +285,13 @@ func testMerge(ps []float64, aggs Aggregate, mergeCase []mergeCase, ts []TestMet
return func(t *testing.T) {
t.Parallel()
mh := NewMixedHisto("test", nil)
mixedServers := make(map[string]struct{})
for _, c := range mergeCase {
mh.Merge(c.host, histvalue(c.samples))
mixedServers[c.host] = struct{}{}
}

results := ToTestMetrics(mh.Flush(ps, HistogramAggregates{aggs, 0}))
results := ToTestMetrics(mh.Flush(ps, HistogramAggregates{aggs, 0}, mixedServers))
assert.ElementsMatch(
t,
results,
Expand Down
Loading

0 comments on commit 8d6d344

Please sign in to comment.