Skip to content

Commit

Permalink
Add a testing framework and an end to end test. Add wiring until test…
Browse files Browse the repository at this point in the history
… compiles.
  • Loading branch information
clin-stripe committed Nov 10, 2018
1 parent 4e50592 commit 1678923
Show file tree
Hide file tree
Showing 5 changed files with 489 additions and 123 deletions.
337 changes: 233 additions & 104 deletions importsrv/server_test.go
Original file line number Diff line number Diff line change
@@ -1,76 +1,237 @@
package importsrv
package importsrv_test

import (
"context"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/stripe/veneur/sinks/channel"

"github.com/stripe/veneur/metricingester"
"github.com/stripe/veneur/samplers"

"github.com/stretchr/testify/require"
"github.com/stripe/veneur/forwardrpc"
"github.com/stripe/veneur/importsrv"
"github.com/stripe/veneur/samplers/metricpb"
)

// testE2EFlushingCases drives TestE2EFlushingIngester. Each case sends the
// protobuf metrics in `in` through the import server and flushing ingester,
// then asserts that the flushed output matches `out`. `msg` doubles as the
// subtest name and the assertion message.
var testE2EFlushingCases = []struct {
	in  []*metricpb.Metric
	out []result
	msg string
}{

	// basic test cases

	{
		pbmetrics(pbcounter("test", 5)),
		results(rcounter("test", 5)),
		"counter not present",
	},
	{
		pbmetrics(pbgauge("test", 100.5)),
		results(rgauge("test", 100.5)),
		"gauge not present",
	},
	{
		pbmetrics(pbhisto("test", []float64{1, 2, 3})),
		results(rgauge("test.min", 1), rgauge("test.max", 3), rcounter("test.count", 3)),
		"histo not present",
	},
	{
		pbmetrics(pbset("test", []string{"asf"})),
		// A set flushes as a gauge of its cardinality: one unique member
		// yields 1. (Was 3, which contradicted the aggregation cases below
		// where {"hey","hey2","hey3","hey4"} => 4 and {"a","b"} => 2.)
		results(rgauge("test", 1)),
		"set not present",
	},

	// basic tests that things are aggregated correctly

	{
		pbmetrics(pbcounter("test", 5), pbcounter("test", 10), pbcounter("test2", 3)),
		results(rcounter("test", 15), rcounter("test2", 3)),
		"counters not aggregated correctly",
	},
	{
		// gauges follow last-write-wins semantics.
		pbmetrics(pbgauge("test", 5), pbgauge("test", 10)),
		results(rgauge("test", 10)),
		"gauge not aggregated correctly",
	},
	{
		// sets union their members; 4 distinct values => cardinality 4.
		pbmetrics(pbset("test", []string{"hey", "hey2"}), pbset("test", []string{"hey3", "hey4"})),
		results(rgauge("test", 4)),
		"sets not aggregated correctly",
	},
	{
		pbmetrics(pbcounter("test", 5), pbcounter("test", 10)),
		results(rcounter("test", 15)),
		"counters not aggregated correctly",
	},
	{
		// The same metric name with different types must aggregate
		// independently, not collide.
		pbmetrics(pbcounter("test", 3), pbgauge("test", 4), pbset("test", []string{"a", "b"})),
		results(rcounter("test", 3), rgauge("test", 4), rgauge("test", 2)),
		"different types not aggregated separately",
	},

	// test special aggregation rules

	{
		// Tag order must not affect the aggregation key.
		pbmetrics(
			pbcounter("test", 5, pbtags("a:b", "c:d")),
			pbcounter("test", 3, pbtags("c:d", "a:b")),
		),
		results(rcounter("test", 5, tags("a:b", "c:d"))),
		"out of order tags don't aggregate together",
	},
	{
		// Hostname is part of the aggregation key: same host merges,
		// different (or empty) hosts stay separate.
		pbmetrics(
			pbcounter("test", 5, pbhostname("a")),
			pbcounter("test", 5, pbhostname("a")),
			pbcounter("test", 3, pbhostname("b")),
			pbcounter("test", 3, pbhostname("")),
		),
		results(
			rcounter("test", 10, rhostname("a")),
			rcounter("test", 3, rhostname("b")),
			rcounter("test", 3, rhostname("")),
		),
		"hostnames not being aggregated separately",
	},
	{
		pbmetrics(
			pbcounter("test", 5, pbhostname("a"), pbscope(metricpb.Scope_Local)),
			pbcounter("test", 5, pbhostname("a"), pbscope(metricpb.Scope_Global)),
			pbcounter("test", 5, pbhostname("a"), pbscope(metricpb.Scope_Mixed)),
			pbcounter("test", 3, pbhostname("a")),
		),
		results(
			rcounter("test", 18, rhostname("a")),
		),
		"scope field should be ignored for counter types",
	},
	{
		pbmetrics(
			pbgauge("test", 5, pbhostname("a"), pbscope(metricpb.Scope_Local)),
			pbgauge("test", 6, pbhostname("a"), pbscope(metricpb.Scope_Global)),
			pbgauge("test", 7, pbhostname("a"), pbscope(metricpb.Scope_Mixed)),
			pbgauge("test", 3, pbhostname("a")),
		),
		results(
			rgauge("test", 3, rhostname("a")),
		),
		"scope field should be ignored for gauge types",
	},

	// mixed histogram fun

	{
		// Mixed-scope histograms emit host-tagged min/max/count plus
		// global percentiles (no hostname on the percentile).
		pbmetrics(
			pbhisto("test", []float64{1, 2, 3}, pbscope(metricpb.Scope_Mixed), pbhostname("a")),
		),
		results(
			rgauge("test.min", 1, rhostname("a")),
			rgauge("test.max", 3, rhostname("a")),
			rgauge("test.count", 3, rhostname("a")),
			rgauge("test.p99", 3),
		),
		"mixed histos not reporting host level aggregates",
	},
	{
		pbmetrics(
			pbhisto("test", []float64{1, 2, 3}, pbscope(metricpb.Scope_Mixed), pbhostname("a")),
			pbhisto("test", []float64{4, 5, 6}, pbscope(metricpb.Scope_Mixed), pbhostname("b")),
		),
		results(
			rgauge("test.min", 1, rhostname("a")),
			rgauge("test.max", 3, rhostname("a")),
			rgauge("test.count", 3, rhostname("a")),
			rgauge("test.min", 4, rhostname("b")),
			rgauge("test.max", 6, rhostname("b")),
			rgauge("test.count", 3, rhostname("b")),
			rgauge("test.p99", 6),
		),
		"mixed histos not reporting host level aggregates",
	},
	{
		pbmetrics(),
		results(),
		"empty metrics proto failed",
	},

	// sink tests

	// crazy random "real world" tests
}

// toResults projects a slice of InterMetrics into the flat result records the
// test table compares against, joining each metric's tags into a single
// comma-separated string.
func toResults(metrics []samplers.InterMetric) []result {
	converted := make([]result, 0, len(metrics))
	for i := range metrics {
		m := &metrics[i]
		converted = append(converted, result{
			Name:     m.Name,
			Tags:     strings.Join(m.Tags, ","),
			Value:    m.Value,
			Type:     m.Type,
			Message:  m.Message,
			Hostname: m.HostName,
			Sinks:    m.Sinks,
		})
	}
	return converted
}

// TestE2EFlushingIngester tests the integration of the import endpoint with
// the flushing ingester.
// TestE2EFlushingIngester tests the integration of the import endpoint with
// the flushing ingester: each table case is sent through SendMetrics, a flush
// is forced by writing to the overridden flush channel, and the per-worker
// flushed output (collected via a channel sink) is compared to the expected
// results.
func TestE2EFlushingIngester(t *testing.T) {
	for _, tc := range testE2EFlushingCases {
		// Shadow the loop variable: t.Parallel() pauses the subtest until the
		// range loop finishes, so without this every parallel subtest would
		// observe only the final table entry.
		tc := tc
		t.Run(tc.msg, func(t *testing.T) {
			t.Parallel()

			// SETUP TEST

			// This sink collects the flushed results; buffered so worker
			// flushes never block.
			rc := make(chan []samplers.InterMetric, 20)
			cms, _ := channel.NewChannelMetricSink(rc)

			flushc := make(chan time.Time)
			workers := 5
			ing := metricingester.NewFlushingIngester(
				workers, // we want to test the parallel case
				1,       // this field is practically meaningless since we override the flush channel
				[]metricingester.Sink{cms},
				metricingester.FlushChan(flushc), // override the flush ticker channel so we control when flush
			)
			s := importsrv.New(ing)
			defer s.Stop()

			// EXECUTE THE TEST

			ing.Start()
			defer ing.Stop()
			_, err := s.SendMetrics(context.Background(), &forwardrpc.MetricList{Metrics: tc.in})
			require.NoError(t, err)
			flushc <- time.Now() // trigger the flush ourselves

			// COLLECT RESULTS

			// Each worker flushes one batch; gather all of them or time out.
			// (Named "flushed" rather than "results" to avoid shadowing the
			// results(...) table-builder helper.)
			var flushed []samplers.InterMetric
			for i := 0; i < workers; i++ {
				select {
				case batch := <-rc:
					flushed = append(flushed, batch...)
				case <-time.After(time.Second):
					t.Fatal("took too long to flush metric results")
				}
			}

			// ASSERT

			assert.ElementsMatch(t, tc.out, toResults(flushed), tc.msg)
		})
	}
}

// TODO(clin): Add tests back.
//
//import (
// "context"
// "fmt"
// "math/rand"
// "testing"
// "time"
//
// "github.com/stretchr/testify/assert"
// "github.com/stripe/veneur/forwardrpc"
// "github.com/stripe/veneur/samplers/metricpb"
// metrictest "github.com/stripe/veneur/samplers/metricpb/testutils"
// "github.com/stripe/veneur/trace"
//)
//
//type testMetricIngester struct {
// metrics []*metricpb.Metric
//}
//
//func (mi *testMetricIngester) IngestMetrics(ms []*metricpb.Metric) {
// mi.metrics = append(mi.metrics, ms...)
//}
//
//func (mi *testMetricIngester) clear() {
// mi.metrics = mi.metrics[:0]
//}
//
//// Test that sending the same metric to a Veneur results in it being hashed
//// to the same worker every time
//func TestSendMetrics_ConsistentHash(t *testing.T) {
// ingesters := []*testMetricIngester{&testMetricIngester{}, &testMetricIngester{}}
//
// casted := make([]MetricIngester, len(ingesters))
// for i, ingester := range ingesters {
// casted[i] = ingester
// }
// s := New(casted)
//
// inputs := []*metricpb.Metric{
// &metricpb.Metric{Name: "test.counter", Type: metricpb.Type_Counter, Tags: []string{"tag:1"}},
// &metricpb.Metric{Name: "test.gauge", Type: metricpb.Type_Gauge},
// &metricpb.Metric{Name: "test.histogram", Type: metricpb.Type_Histogram, Tags: []string{"type:histogram"}},
// &metricpb.Metric{Name: "test.set", Type: metricpb.Type_Set},
// &metricpb.Metric{Name: "test.gauge3", Type: metricpb.Type_Gauge},
// }
//
// // Send the same inputs many times
// for i := 0; i < 10; i++ {
// s.SendMetrics(context.Background(), &forwardrpc.MetricList{inputs})
//
// assert.Equal(t, []*metricpb.Metric{inputs[0], inputs[4]},
// ingesters[0].metrics, "Ingester 0 has the wrong metrics")
// assert.Equal(t, []*metricpb.Metric{inputs[1], inputs[2], inputs[3]},
// ingesters[1].metrics, "Ingester 1 has the wrong metrics")
//
// for _, ingester := range ingesters {
// ingester.clear()
// }
// }
//}
//
//func TestSendMetrics_Empty(t *testing.T) {
// ingester := &testMetricIngester{}
// s := New([]MetricIngester{ingester})
// s.SendMetrics(context.Background(), &forwardrpc.MetricList{})
//
// assert.Empty(t, ingester.metrics, "The server shouldn't have submitted "+
// "any metrics")
//}
//
//func TestOptions_WithTraceClient(t *testing.T) {
// c, err := trace.NewClient(trace.DefaultVeneurAddress)
// if err != nil {
Expand All @@ -82,38 +243,6 @@ package importsrv
// "set the trace client")
//}
//
//type noopChannelMetricIngester struct {
// in chan []*metricpb.Metric
// quit chan struct{}
//}
//
//func newNoopChannelMetricIngester() *noopChannelMetricIngester {
// return &noopChannelMetricIngester{
// in: make(chan []*metricpb.Metric),
// quit: make(chan struct{}),
// }
//}
//
//func (mi *noopChannelMetricIngester) start() {
// go func() {
// for {
// select {
// case <-mi.in:
// case <-mi.quit:
// return
// }
// }
// }()
//}
//
//func (mi *noopChannelMetricIngester) stop() {
// mi.quit <- struct{}{}
//}
//
//func (mi *noopChannelMetricIngester) IngestMetrics(ms []*metricpb.Metric) {
// mi.in <- ms
//}
//
//func BenchmarkImportServerSendMetrics(b *testing.B) {
// rand.Seed(time.Now().Unix())
//
Expand Down
Loading

0 comments on commit 1678923

Please sign in to comment.