diff --git a/importsrv/server_test.go b/importsrv/server_test.go index 42e9fc9de..184fb8b69 100644 --- a/importsrv/server_test.go +++ b/importsrv/server_test.go @@ -1,76 +1,237 @@ -package importsrv +package importsrv_test + +import ( + "context" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/stripe/veneur/sinks/channel" + + "github.com/stripe/veneur/metricingester" + "github.com/stripe/veneur/samplers" + + "github.com/stretchr/testify/require" + "github.com/stripe/veneur/forwardrpc" + "github.com/stripe/veneur/importsrv" + "github.com/stripe/veneur/samplers/metricpb" +) + +var testE2EFlushingCases = []struct { + in []*metricpb.Metric + out []result + msg string +}{ + + // basic test cases + + { + pbmetrics(pbcounter("test", 5)), + results(rcounter("test", 5)), + "counter not present", + }, + { + pbmetrics(pbgauge("test", 100.5)), + results(rgauge("test", 100.5)), + "gauge not present", + }, + { + pbmetrics(pbhisto("test", []float64{1, 2, 3})), + results(rgauge("test.min", 1), rgauge("test.max", 3), rcounter("test.count", 3)), + "histo not present", + }, + { + pbmetrics(pbset("test", []string{"asf"})), + results(rgauge("test", 3)), + "set not present", + }, + + // basic tests that things are aggregated correctly + + { + pbmetrics(pbcounter("test", 5), pbcounter("test", 10), pbcounter("test2", 3)), + results(rcounter("test", 15), rcounter("test2", 3)), + "counters not aggregated correctly", + }, + { + pbmetrics(pbgauge("test", 5), pbgauge("test", 10)), + results(rgauge("test", 10)), + "gauge not aggregated correctly", + }, + { + pbmetrics(pbset("test", []string{"hey", "hey2"}), pbset("test", []string{"hey3", "hey4"})), + results(rgauge("test", 4)), + "sets not aggregated correctly", + }, + { + pbmetrics(pbcounter("test", 5), pbcounter("test", 10)), + results(rcounter("test", 15)), + "counters not aggregated correctly", + }, + { + pbmetrics(pbcounter("test", 3), pbgauge("test", 4), pbset("test", []string{"a", "b"})), + results(rcounter("test", 3), rgauge("test", 4), rgauge("test", 2)), + "different types not aggregated separately", + }, + + // test special aggregation rules + + { + pbmetrics( + pbcounter("test", 5, pbtags("a:b", "c:d")), + pbcounter("test", 3, pbtags("c:d", "a:b")), + ), + results(rcounter("test", 5, tags("a:b", "c:d"))), + "out of order tags don't aggregate together", + }, + { + pbmetrics( + pbcounter("test", 5, pbhostname("a")), + pbcounter("test", 5, pbhostname("a")), + pbcounter("test", 3, pbhostname("b")), + pbcounter("test", 3, pbhostname("")), + ), + results( + rcounter("test", 10, rhostname("a")), + rcounter("test", 3, rhostname("b")), + rcounter("test", 3, rhostname("")), + ), + "hostnames not being aggregated separately", + }, + { + pbmetrics( + pbcounter("test", 5, pbhostname("a"), pbscope(metricpb.Scope_Local)), + pbcounter("test", 5, pbhostname("a"), pbscope(metricpb.Scope_Global)), + pbcounter("test", 5, pbhostname("a"), pbscope(metricpb.Scope_Mixed)), + pbcounter("test", 3, pbhostname("a")), + ), + results( + rcounter("test", 18, rhostname("a")), + ), + "scope field should be ignored for counter types", + }, + { + pbmetrics( + pbgauge("test", 5, pbhostname("a"), pbscope(metricpb.Scope_Local)), + pbgauge("test", 6, pbhostname("a"), pbscope(metricpb.Scope_Global)), + pbgauge("test", 7, pbhostname("a"), pbscope(metricpb.Scope_Mixed)), + pbgauge("test", 3, pbhostname("a")), + ), + results( + rgauge("test", 3, rhostname("a")), + ), + "scope field should be ignored for gauge types", + }, + + // mixed histogram fun + + { + pbmetrics( + pbhisto("test", []float64{1, 2, 3}, pbscope(metricpb.Scope_Mixed), pbhostname("a")), + ), + results( + rgauge("test.min", 1, rhostname("a")), + rgauge("test.max", 3, rhostname("a")), + rgauge("test.count", 3, rhostname("a")), + rgauge("test.p99", 3), + ), + "mixed histos not reporting host level aggregates", + }, + { + pbmetrics( + pbhisto("test", []float64{1, 2, 3}, pbscope(metricpb.Scope_Mixed), pbhostname("a")), + pbhisto("test", []float64{4, 5, 6}, pbscope(metricpb.Scope_Mixed), pbhostname("b")), + ), + results( + rgauge("test.min", 1, rhostname("a")), + rgauge("test.max", 3, rhostname("a")), + rgauge("test.count", 3, rhostname("a")), + rgauge("test.min", 4, rhostname("b")), + rgauge("test.max", 6, rhostname("b")), + rgauge("test.count", 3, rhostname("b")), + rgauge("test.p99", 6), + ), + "mixed histos not reporting host level aggregates", + }, + { + pbmetrics(), + results(), + "empty metrics proto failed", + }, + + // sink tests + + // crazy random "real world" tests +} + +func toResults(ms []samplers.InterMetric) (outs []result) { + for _, inm := range ms { + outs = append(outs, result{ + Name: inm.Name, + Tags: strings.Join(inm.Tags, ","), + Value: inm.Value, + Type: inm.Type, + Message: inm.Message, + Hostname: inm.HostName, + Sinks: inm.Sinks, + }) + } + return outs +} + +// TestE2EFlushingIngester tests the integration of the import endpoint with +// the flushing ingester. +func TestE2EFlushingIngester(t *testing.T) { + for _, tc := range testE2EFlushingCases { + t.Run(tc.msg, func(t *testing.T) { + t.Parallel() + + // SETUP TEST + + // this sink collects the results + rc := make(chan []samplers.InterMetric, 20) + cms, _ := channel.NewChannelMetricSink(rc) + + flushc := make(chan time.Time) + workers := 5 + ing := metricingester.NewFlushingIngester( + workers, // we want to test the parallel case + 1, // this field is practically meaningless since we override the flush channel + []metricingester.Sink{cms}, + metricingester.FlushChan(flushc), // override the flush ticker channel so we control when flush + ) + s := importsrv.New(ing) + defer s.Stop() + + // EXECUTE THE TEST + + ing.Start() + defer ing.Stop() + _, err := s.SendMetrics(context.Background(), &forwardrpc.MetricList{Metrics: tc.in}) + require.NoError(t, err) + flushc <- time.Now() + + // COLLECT RESULTS + + var results []samplers.InterMetric + for i := 0; i < workers; i++ { + select { + case result := <-rc: + results = append(results, result...) + case <-time.After(time.Second): + t.Fatal("took too long to flush metric results") + } + } + + // ASSERT + + assert.ElementsMatch(t, tc.out, toResults(results), tc.msg) + }) + } +} -// TODO(clin): Add tests back. -// -//import ( -// "context" -// "fmt" -// "math/rand" -// "testing" -// "time" -// -// "github.com/stretchr/testify/assert" -// "github.com/stripe/veneur/forwardrpc" -// "github.com/stripe/veneur/samplers/metricpb" -// metrictest "github.com/stripe/veneur/samplers/metricpb/testutils" -// "github.com/stripe/veneur/trace" -//) -// -//type testMetricIngester struct { -// metrics []*metricpb.Metric -//} -// -//func (mi *testMetricIngester) IngestMetrics(ms []*metricpb.Metric) { -// mi.metrics = append(mi.metrics, ms...) -//} -// -//func (mi *testMetricIngester) clear() { -// mi.metrics = mi.metrics[:0] -//} -// -//// Test that sending the same metric to a Veneur results in it being hashed -//// to the same worker every time -//func TestSendMetrics_ConsistentHash(t *testing.T) { -// ingesters := []*testMetricIngester{&testMetricIngester{}, &testMetricIngester{}} -// -// casted := make([]MetricIngester, len(ingesters)) -// for i, ingester := range ingesters { -// casted[i] = ingester -// } -// s := New(casted) -// -// inputs := []*metricpb.Metric{ -// &metricpb.Metric{Name: "test.counter", Type: metricpb.Type_Counter, Tags: []string{"tag:1"}}, -// &metricpb.Metric{Name: "test.gauge", Type: metricpb.Type_Gauge}, -// &metricpb.Metric{Name: "test.histogram", Type: metricpb.Type_Histogram, Tags: []string{"type:histogram"}}, -// &metricpb.Metric{Name: "test.set", Type: metricpb.Type_Set}, -// &metricpb.Metric{Name: "test.gauge3", Type: metricpb.Type_Gauge}, -// } -// -// // Send the same inputs many times -// for i := 0; i < 10; i++ { -// s.SendMetrics(context.Background(), &forwardrpc.MetricList{inputs}) -// -// assert.Equal(t, []*metricpb.Metric{inputs[0], inputs[4]}, -// ingesters[0].metrics, "Ingester 0 has the wrong metrics") -// assert.Equal(t, []*metricpb.Metric{inputs[1], inputs[2], inputs[3]}, -// ingesters[1].metrics, "Ingester 1 has the wrong metrics") -// -// for _, ingester := range ingesters { -// ingester.clear() -// } -// } -//} -// -//func TestSendMetrics_Empty(t *testing.T) { -// ingester := &testMetricIngester{} -// s := New([]MetricIngester{ingester}) -// s.SendMetrics(context.Background(), &forwardrpc.MetricList{}) -// -// assert.Empty(t, ingester.metrics, "The server shouldn't have submitted "+ -// "any metrics") -//} -// //func TestOptions_WithTraceClient(t *testing.T) { // c, err := trace.NewClient(trace.DefaultVeneurAddress) // if err != nil { @@ -82,38 +243,6 @@ package importsrv // "set the trace client") //} // -//type noopChannelMetricIngester struct { -// in chan []*metricpb.Metric -// quit chan struct{} -//} -// -//func newNoopChannelMetricIngester() *noopChannelMetricIngester { -// return &noopChannelMetricIngester{ -// in: make(chan []*metricpb.Metric), -// quit: make(chan struct{}), -// } -//} -// -//func (mi *noopChannelMetricIngester) start() { -// go func() { -// for { -// select { -// case <-mi.in: -// case <-mi.quit: -// return -// } -// } -// }() -//} -// -//func (mi *noopChannelMetricIngester) stop() { -// mi.quit <- struct{}{} -//} -// -//func (mi *noopChannelMetricIngester) IngestMetrics(ms []*metricpb.Metric) { -// mi.in <- ms -//} -// //func BenchmarkImportServerSendMetrics(b *testing.B) { // rand.Seed(time.Now().Unix()) // diff --git a/importsrv/testtools_test.go b/importsrv/testtools_test.go new file mode 100644 index 000000000..68e2f688e --- /dev/null +++ b/importsrv/testtools_test.go @@ -0,0 +1,156 @@ +package importsrv_test + +import ( + "strings" + + "github.com/stripe/veneur/samplers" + "github.com/stripe/veneur/samplers/metricpb" +) + +// +// PROTOBUF +// + +func pbmetrics(ms ...*metricpb.Metric) []*metricpb.Metric { + return ms +} + +// options + +func pbtags(tags ...string) func(m *metricpb.Metric) { + return func(m *metricpb.Metric) { + m.Tags = tags + } +} + +func pbhostname(hostname string) func(m *metricpb.Metric) { + return func(m *metricpb.Metric) { + m.Hostname = hostname + } +} + +func pbscope(scope metricpb.Scope) func(m *metricpb.Metric) { + return func(m *metricpb.Metric) { + m.Scope = scope + } +} + +// metric types + +func pbcounter(name string, value int64, opts ...func(m *metricpb.Metric)) *metricpb.Metric { + c := &metricpb.Metric{ + Name: name, + Value: &metricpb.Metric_Counter{Counter: &metricpb.CounterValue{Value: value}}, + Type: metricpb.Type_Counter, + } + + for _, opt := range opts { + opt(c) + } + return c +} + +func pbgauge(name string, value float64, opts ...func(m *metricpb.Metric)) *metricpb.Metric { + c := &metricpb.Metric{ + Name: name, + Value: &metricpb.Metric_Gauge{Gauge: &metricpb.GaugeValue{Value: value}}, + Type: metricpb.Type_Gauge, + } + + for _, opt := range opts { + opt(c) + } + return c +} + +func pbhisto(name string, values []float64, opts ...func(m *metricpb.Metric)) *metricpb.Metric { + return nil +} + +func pbset(name string, values []string, opts ...func(m *metricpb.Metric)) *metricpb.Metric { + return nil +} + +// +// RESULTS +// + +type result struct { + Name string + Tags string + Value float64 + Hostname string + Type samplers.MetricType + Message string + Sinks samplers.RouteInformation +} + +func results(rs ...result) []result { + return rs +} + +func rcounter(name string, value float64, opts ...func(r result) result) result { + r := result{ + Name: name, + Value: value, + Type: samplers.CounterMetric, + } + for _, opt := range opts { + r = opt(r) + } + return r +} + +func rgauge(name string, value float64, opts ...func(r result) result) result { + r := result{ + Name: name, + Value: value, + Type: samplers.GaugeMetric, + } + for _, opt := range opts { + r = opt(r) + } + return r +} + +func rstatus(name string, value float64, opts ...func(r result) result) result { + r := result{ + Name: name, + Value: value, + Type: samplers.StatusMetric, + } + for _, opt := range opts { + r = opt(r) + } + return r +} + +// opts + +func tags(ts ...string) func(r result) result { + return func(r result) result { + r.Tags = strings.Join(ts, ",") + return r + } +} + +func rhostname(hn string) func(r result) result { + return func(r result) result { + r.Hostname = hn + return r + } +} + +func message(msg string) func(r result) result { + return func(r result) result { + r.Message = msg + return r + } +} + +func sinks(ri samplers.RouteInformation) func(r result) result { + return func(r result) result { + r.Sinks = ri + return r + } +} diff --git a/metricingester/aggregate_worker.go b/metricingester/aggregate_worker.go index 338cea0da..170f1b8b8 100644 --- a/metricingester/aggregate_worker.go +++ b/metricingester/aggregate_worker.go @@ -12,6 +12,15 @@ type aggWorker struct { flush chan chan<- samplerEnvelope } +func newAggWorker() aggWorker { + return aggWorker{ + samplers: newSamplerEnvelope(), + inC: make(chan Metric), + mergeC: make(chan Digest), + flush: make(chan chan<- samplerEnvelope), + } +} + func (a aggWorker) Start() { go func() { for { diff --git a/metricingester/aggregating_ingestor.go b/metricingester/aggregating_ingestor.go index 3e636520f..17a8800e8 100644 --- a/metricingester/aggregating_ingestor.go +++ b/metricingester/aggregating_ingestor.go @@ -1,12 +1,56 @@ package metricingester -import "time" +import ( + "context" + "time" +) type AggregatingIngestor struct { - workers []aggWorker - flusher func(samplerEnvelope) error - interval time.Duration - quit chan struct{} + workers []aggWorker + flusher flusher + ticker *time.Ticker + tickerC <-chan time.Time + quit chan struct{} +} + +type flusher interface { + Flush(ctx context.Context, envelope samplerEnvelope) +} + +type ingesterOption func(AggregatingIngestor) AggregatingIngestor + +// Override the ticker channel that triggers flushing. Useful for testing. +func FlushChan(tckr <-chan time.Time) ingesterOption { + return func(option AggregatingIngestor) AggregatingIngestor { + option.tickerC = tckr + return option + } +} + +// NewFlushingIngester creates an ingester that flushes metrics to the specified sinks. +func NewFlushingIngester( + workers int, + interval time.Duration, + sinks []Sink, + options ...ingesterOption, +) AggregatingIngestor { + var aggW []aggWorker + for i := 0; i < workers; i++ { + aggW = append(aggW, newAggWorker()) + } + + t := time.NewTicker(interval) + ing := AggregatingIngestor{ + workers: aggW, + flusher: newSinkFlusher(sinks), + ticker: t, + tickerC: t.C, + quit: make(chan struct{}), + } + for _, opt := range options { + ing = opt(ing) + } + return ing } // TODO(clin): This needs to take ctx. @@ -23,11 +67,14 @@ func (a AggregatingIngestor) Merge(d Digest) error { } func (a AggregatingIngestor) Start() { + for _, w := range a.workers { + w.Start() + } + go func() { - ticker := time.NewTicker(a.interval) for { select { - case <-ticker.C: + case <-a.tickerC: a.flush() case <-a.quit: return @@ -37,11 +84,17 @@ func (a AggregatingIngestor) Start() { } func (a AggregatingIngestor) Stop() { + for _, w := range a.workers { + w.Stop() + } + a.ticker.Stop() close(a.quit) } func (a AggregatingIngestor) flush() { for _, w := range a.workers { - go a.flusher(w.Flush()) + go func() { + a.flusher.Flush(context.Background(), w.Flush()) + }() } } diff --git a/metricingester/central_flusher.go b/metricingester/central_flusher.go index efe73eebe..a501540cf 100644 --- a/metricingester/central_flusher.go +++ b/metricingester/central_flusher.go @@ -5,26 +5,45 @@ import ( "sync" "time" - "github.com/sirupsen/logrus" "github.com/stripe/veneur/samplers" - "github.com/stripe/veneur/trace" ) type sinkFlusher struct { aggregates samplers.HistogramAggregates percentiles []float64 - sinks []sink + sinks []Sink +} + +type sinkFlusherOpt func(sinkFlusher) sinkFlusher + +func optPercentiles(ps []float64) sinkFlusherOpt { + return func(f sinkFlusher) sinkFlusher { + f.percentiles = ps + return f + } +} - log *logrus.Logger - trace trace.Client +func optAggregates(as samplers.HistogramAggregates) sinkFlusherOpt { + return func(f sinkFlusher) sinkFlusher { + f.aggregates = as + return f + } +} + +func newSinkFlusher(sinks []Sink, opts ...sinkFlusherOpt) sinkFlusher { + sf := sinkFlusher{sinks: sinks} + for _, opt := range opts { + sf = opt(sf) + } + return sf } -type sink interface { +type Sink interface { Name() string Flush(context.Context, []samplers.InterMetric) error } -func (s sinkFlusher) Flush(ctx context.Context, envelope samplerEnvelope) error { +func (s sinkFlusher) Flush(ctx context.Context, envelope samplerEnvelope) { metrics := make([]samplers.InterMetric, 0, countMetrics(envelope)) // get metrics from envelope for _, sampler := range envelope.counters { @@ -44,23 +63,23 @@ func (s sinkFlusher) Flush(ctx context.Context, envelope samplerEnvelope) error } if len(metrics) == 0 { - return nil + return } // TODO(clin): Add back metrics once we finalize the metrics client pull request. wg := sync.WaitGroup{} for _, sinkInstance := range s.sinks { wg.Add(1) - go func(ms sink) { + go func(ms Sink) { err := ms.Flush(ctx, metrics) if err != nil { - s.log.WithError(err).WithField("sink", ms.Name()).Warn("Error flushing sink") + //s.log.WithError(err).WithField("Sink", ms.Name()).Warn("Error flushing Sink") } wg.Done() }(sinkInstance) } wg.Wait() - return nil + return } func countMetrics(samplers samplerEnvelope) (count int) {