Skip to content

Commit

Permalink
Integrate sets and thread hostname through everything.
Browse files Browse the repository at this point in the history
  • Loading branch information
clin-stripe committed Nov 19, 2018
1 parent 9fd6d5b commit 3251872
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 30 deletions.
5 changes: 2 additions & 3 deletions importsrv/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ import (
"github.com/stripe/veneur/importsrv"
"github.com/stripe/veneur/metricingester"
"github.com/stripe/veneur/samplers"
"github.com/stripe/veneur/sinks/channel"

"github.com/stripe/veneur/samplers/metricpb"
"github.com/stripe/veneur/sinks/channel"
)

var testE2EFlushingCases = []struct {
Expand Down Expand Up @@ -46,7 +45,7 @@ var testE2EFlushingCases = []struct {
"histo not present",
},
{
pbmetrics(pbset("test", []string{"asf"})),
pbmetrics(pbset("test", []string{"asf", "clin", "aditya"})),
samplers.TMetrics(samplers.TGauge("test", 3)),
"set not present",
},
Expand Down
21 changes: 20 additions & 1 deletion importsrv/testtools_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package importsrv_test

import (
"github.com/axiomhq/hyperloglog"
"github.com/stripe/veneur/samplers/metricpb"
"github.com/stripe/veneur/tdigest"
)
Expand Down Expand Up @@ -79,5 +80,23 @@ func pbhisto(name string, values []float64, opts ...func(m *metricpb.Metric)) *m
}

// pbset builds a protobuf set metric whose HyperLogLog contains the given
// string values. Optional opts may mutate the metric before it is returned
// (e.g. to set tags or a hostname). Test helper only.
func pbset(name string, values []string, opts ...func(m *metricpb.Metric)) *metricpb.Metric {
	hll := hyperloglog.New()
	for _, s := range values {
		hll.Insert([]byte(s))
	}

	v, err := hll.MarshalBinary()
	if err != nil {
		// Marshal failure here is a programming error in test tooling,
		// so fail loudly rather than returning a bad fixture.
		panic(err)
	}
	m := &metricpb.Metric{
		Name:  name,
		Value: &metricpb.Metric_Set{Set: &metricpb.SetValue{HyperLogLog: v}},
		Type:  metricpb.Type_Set,
	}

	for _, opt := range opts {
		opt(m)
	}
	return m
}
5 changes: 5 additions & 0 deletions metricingester/aggingestor.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ func NewFlushingIngester(
aggregates samplers.Aggregate,
options ...ingesterOption,
) AggregatingIngestor {
if workers < 1 {
panic("at least one worker required")
}

var aggW []aggWorker
for i := 0; i < workers; i++ {
aggW = append(aggW, newAggWorker())
Expand Down Expand Up @@ -97,6 +101,7 @@ func (a AggregatingIngestor) Start() {
}

func (a AggregatingIngestor) Stop() {
// nb: tickers must be explicitly stopped to be GCed.
a.ticker.Stop()
close(a.quit)
for _, w := range a.workers {
Expand Down
22 changes: 15 additions & 7 deletions metricingester/aggworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,22 +67,30 @@ func (a aggWorker) ingest(m Metric) {
switch m.metricType {
case counter:
if _, present := a.samplers.counters[key]; !present {
a.samplers.counters[key] = samplers.NewCounter(m.name, m.tags)
a.samplers.counters[key] = &samplers.Counter{
Name: m.name,
Hostname: m.hostname,
Tags: m.tags,
}
}
a.samplers.counters[key].Sample(float64(m.countervalue), m.samplerate)
case gauge:
if _, present := a.samplers.gauges[key]; !present {
a.samplers.gauges[key] = samplers.NewGauge(m.name, m.tags)
a.samplers.gauges[key] = &samplers.Gauge{
Name: m.name,
Hostname: m.hostname,
Tags: m.tags,
}
}
a.samplers.gauges[key].Sample(m.gaugevalue, m.samplerate)
case set:
if _, present := a.samplers.sets[key]; !present {
a.samplers.sets[key] = samplers.NewSet(m.name, m.tags)
a.samplers.sets[key] = samplers.NewSet(m.name, m.tags, samplers.OptSetHostname(m.hostname))
}
a.samplers.sets[key].Sample(m.setvalue, m.samplerate)
case histogram:
if _, present := a.samplers.histograms[key]; !present {
a.samplers.histograms[key] = samplers.NewHist(m.name, m.tags)
a.samplers.histograms[key] = samplers.NewHist(m.name, m.tags, samplers.OptHistHostname(m.hostname))
}
a.samplers.histograms[key].Sample(m.histovalue, m.samplerate)
}
Expand All @@ -93,19 +101,19 @@ func (a aggWorker) merge(d Digest) {
case mixedHistoDigest:
key := d.MixedKey()
if _, present := a.samplers.mixedHistograms[key]; !present {
a.samplers.mixedHistograms[key] = samplers.NewMixedHisto(d.name, d.tags)
a.samplers.mixedHistograms[key] = samplers.NewMixedHisto(d.name, d.tags, samplers.OptMixedHistoHostname(d.hostname))
}
a.samplers.mixedHistograms[key].Merge(d.hostname, d.histodigest)
case histoDigest:
key := d.Key()
if _, present := a.samplers.histograms[key]; !present {
a.samplers.histograms[key] = samplers.NewHist(d.name, d.tags)
a.samplers.histograms[key] = samplers.NewHist(d.name, d.tags, samplers.OptHistHostname(d.hostname))
}
a.samplers.histograms[key].Merge(d.histodigest)
case setDigest:
key := d.Key()
if _, present := a.samplers.sets[key]; !present {
a.samplers.sets[key] = samplers.NewSet(d.name, d.tags)
a.samplers.sets[key] = samplers.NewSet(d.name, d.tags, samplers.OptSetHostname(d.hostname))
}
a.samplers.sets[key].Merge(d.setdigest)
}
Expand Down
20 changes: 17 additions & 3 deletions samplers/mixedhisto.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,29 @@ import (
"github.com/stripe/veneur/tdigest"
)

// optMixedHisto mutates a MixedHisto during construction and returns the
// updated value.
type optMixedHisto func(MixedHisto) MixedHisto

// OptMixedHistoHostname returns an option that sets the hostname a
// MixedHisto reports its metrics under.
func OptMixedHistoHostname(hostname string) optMixedHisto {
	return func(mh MixedHisto) MixedHisto {
		mh.hostname = hostname
		return mh
	}
}

// NewMixedHisto creates a new mixed histogram.
func NewMixedHisto(name string, tags []string) MixedHisto {
return MixedHisto{
histo: NewHist(name, tags),
func NewMixedHisto(name string, tags []string, opts ...optMixedHisto) MixedHisto {
m := MixedHisto{
min: make(map[string]float64),
max: make(map[string]float64),
weight: make(map[string]float64),
sum: make(map[string]float64),
reciprocalSum: make(map[string]float64),
}
for _, opt := range opts {
m = opt(m)
}
m.histo = NewHist(name, tags)
return m
}

// MixedHisto is a sampler for the MixedScope histogram case.
Expand All @@ -33,6 +46,7 @@ func NewMixedHisto(name string, tags []string) MixedHisto {
// Note that we don't support the "median" aggregate for mixed histograms.
type MixedHisto struct {
histo *Histo
hostname string
min map[string]float64
max map[string]float64
weight map[string]float64
Expand Down
70 changes: 54 additions & 16 deletions samplers/samplers.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,10 @@ func routeInfo(tags []string) RouteInformation {

// Counter is an accumulator
type Counter struct {
	Name     string   // metric name
	Tags     []string // tags attached to the metric at flush time
	Hostname string   // host the metric is reported under (InterMetric.HostName)
	value    int64    // accumulated count
}

// GetName returns the name of the counter.
Expand All @@ -154,6 +155,7 @@ func (c *Counter) Flush(interval time.Duration) []InterMetric {
Tags: tags,
Type: CounterMetric,
Sinks: routeInfo(tags),
HostName: c.Hostname,
}}
}

Expand Down Expand Up @@ -216,9 +218,10 @@ func NewCounter(Name string, Tags []string) *Counter {

// Gauge retains whatever the last value was.
type Gauge struct {
	Name     string   // metric name
	Tags     []string // tags attached to the metric at flush time
	Hostname string   // host the metric is reported under (InterMetric.HostName)
	value    float64  // last sampled value
}

// Sample takes on whatever value is passed in as a sample.
Expand All @@ -234,6 +237,7 @@ func (g *Gauge) Flush() []InterMetric {
Name: g.Name,
Timestamp: time.Now().Unix(),
Value: float64(g.value),
HostName: g.Hostname,
Tags: tags,
Type: GaugeMetric,
Sinks: routeInfo(tags),
Expand Down Expand Up @@ -365,9 +369,10 @@ func NewStatusCheck(Name string, Tags []string) *StatusCheck {

// Set is a list of unique values seen.
type Set struct {
	Name     string              // metric name
	Tags     []string            // tags attached to the metric at flush time
	Hostname string              // host the metric is reported under
	Hll      *hyperloglog.Sketch // approximate distinct-count of sampled values
}

// Sample checks if the supplied value is already in the filter. If not, it increments
Expand All @@ -376,16 +381,28 @@ func (s *Set) Sample(sample string, sampleRate float32) {
s.Hll.Insert([]byte(sample))
}

// setOpt mutates a Set during construction.
type setOpt func(*Set)

// OptSetHostname returns an option that sets the hostname a Set reports
// its metrics under.
func OptSetHostname(hostname string) setOpt {
	return func(set *Set) {
		set.Hostname = hostname
	}
}

// NewSet generates a new Set and returns it
func NewSet(Name string, Tags []string) *Set {
func NewSet(Name string, Tags []string, opts ...setOpt) *Set {
// error is only returned if precision is outside the 4-18 range
// TODO: this is the maximum precision, should it be configurable?
Hll := hyperloglog.New()
return &Set{
s := &Set{
Name: Name,
Tags: Tags,
Hll: Hll,
}
for _, opt := range opts {
opt(s)
}
return s
}

// Flush generates an InterMetric for the state of this Set.
Expand Down Expand Up @@ -465,9 +482,10 @@ func (s *Set) Merge(v *metricpb.SetValue) error {
// Histo is a collection of values that generates max, min, count, and
// percentiles over time.
type Histo struct {
Name string
Tags []string
Value *tdigest.MergingDigest
Name string
Tags []string
Value *tdigest.MergingDigest
Hostname string
// these values are computed from only the samples that came through this
// veneur instance, ignoring any histograms merged from elsewhere
// we separate them because they're easy to aggregate on the backend without
Expand All @@ -493,9 +511,17 @@ func (h *Histo) Sample(sample float64, sampleRate float32) {
h.LocalReciprocalSum += (1 / sample) * weight
}

// histOpt mutates a Histo during construction.
type histOpt func(*Histo)

// OptHistHostname returns an option that sets the hostname a Histo reports
// its metrics under.
func OptHistHostname(hostname string) histOpt {
	return func(histo *Histo) {
		histo.Hostname = hostname
	}
}

// NewHist generates a new Histo and returns it.
func NewHist(Name string, Tags []string) *Histo {
return &Histo{
func NewHist(Name string, Tags []string, opts ...histOpt) *Histo {
h := &Histo{
Name: Name,
Tags: Tags,
// we're going to allocate a lot of these, so we don't want them to be huge
Expand All @@ -504,6 +530,10 @@ func NewHist(Name string, Tags []string) *Histo {
LocalMax: math.Inf(-1),
LocalSum: 0,
}
for _, opt := range opts {
opt(h)
}
return h
}

// Flush generates InterMetrics for the current state of the Histo. percentiles
Expand Down Expand Up @@ -541,6 +571,7 @@ func (h *Histo) Flush(interval time.Duration, percentiles []float64, aggregates
Name: fmt.Sprintf("%s.max", h.Name),
Timestamp: now,
Value: val,
HostName: h.Hostname,
Tags: tags,
Type: GaugeMetric,
Sinks: sinks,
Expand All @@ -557,6 +588,7 @@ func (h *Histo) Flush(interval time.Duration, percentiles []float64, aggregates
Name: fmt.Sprintf("%s.min", h.Name),
Timestamp: now,
Value: val,
HostName: h.Hostname,
Tags: tags,
Type: GaugeMetric,
Sinks: sinks,
Expand All @@ -574,6 +606,7 @@ func (h *Histo) Flush(interval time.Duration, percentiles []float64, aggregates
Name: fmt.Sprintf("%s.sum", h.Name),
Timestamp: now,
Value: val,
HostName: h.Hostname,
Tags: tags,
Type: GaugeMetric,
Sinks: sinks,
Expand All @@ -593,6 +626,7 @@ func (h *Histo) Flush(interval time.Duration, percentiles []float64, aggregates
Name: fmt.Sprintf("%s.avg", h.Name),
Timestamp: now,
Value: val,
HostName: h.Hostname,
Tags: tags,
Type: GaugeMetric,
Sinks: sinks,
Expand All @@ -613,6 +647,7 @@ func (h *Histo) Flush(interval time.Duration, percentiles []float64, aggregates
Name: fmt.Sprintf("%s.count", h.Name),
Timestamp: now,
Value: val,
HostName: h.Hostname,
Tags: tags,
Type: CounterMetric,
Sinks: sinks,
Expand All @@ -628,6 +663,7 @@ func (h *Histo) Flush(interval time.Duration, percentiles []float64, aggregates
Name: fmt.Sprintf("%s.median", h.Name),
Timestamp: now,
Value: float64(h.Value.Quantile(0.5)),
HostName: h.Hostname,
Tags: tags,
Type: GaugeMetric,
Sinks: sinks,
Expand All @@ -648,6 +684,7 @@ func (h *Histo) Flush(interval time.Duration, percentiles []float64, aggregates
Name: fmt.Sprintf("%s.hmean", h.Name),
Timestamp: now,
Value: val,
HostName: h.Hostname,
Tags: tags,
Type: GaugeMetric,
Sinks: sinks,
Expand All @@ -664,6 +701,7 @@ func (h *Histo) Flush(interval time.Duration, percentiles []float64, aggregates
Name: fmt.Sprintf("%s.%dpercentile", h.Name, int(p*100)),
Timestamp: now,
Value: float64(h.Value.Quantile(p)),
HostName: h.Hostname,
Tags: tags,
Type: GaugeMetric,
Sinks: sinks,
Expand Down

0 comments on commit 3251872

Please sign in to comment.