diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 2271120ad..000000000 --- a/.travis.yml +++ /dev/null @@ -1,51 +0,0 @@ -language: go -go: - - "1.11" - - "1.12" - - tip - -install: - - wget https://github.com/google/protobuf/releases/download/v3.1.0/protoc-3.1.0-linux-x86_64.zip - - unzip protoc-3.1.0-linux-x86_64.zip -d /tmp - - sudo cp /tmp/bin/protoc /usr/bin/protoc - - sudo chmod 777 /usr/bin/protoc - - rm protoc-3.1.0-linux-x86_64.zip - # After a new major version hits stable, replace this link - # It is only used for gofmt - - wget https://storage.googleapis.com/golang/go1.11.linux-amd64.tar.gz - - tar -C /tmp -xvf go1.11.linux-amd64.tar.gz go/bin/gofmt - - sudo mv /tmp/go/bin/gofmt /usr/bin/gofmt - - rm go1.11.linux-amd64.tar.gz - -before_script: - - go get -u github.com/ChimeraCoder/gojson/gojson - - go get -d -v github.com/gogo/protobuf/protoc-gen-gogofaster - - pushd $GOPATH/src/github.com/gogo/protobuf - - git fetch - - git checkout v1.2.1 - - popd - - go install github.com/gogo/protobuf/protoc-gen-gogofaster - - curl https://raw.githubusercontent.com/golang/dep/master/install.sh | sh - - go get -u golang.org/x/tools/cmd/stringer - - pushd $GOPATH/src/golang.org/x/tools/cmd/stringer - - git checkout 25101aadb97aa42907eee6a238d6d26a6cb3c756 - - go install - - popd - - - go generate - # We need to ignore changes to this one file - # because the go-generated version will have a different - # fileDescriptor0 depending on which version of Go was used - # to build protoc-gen-go. - - git checkout ssf/sample.pb.go - - dep check - - mv vendor ../ && /usr/bin/gofmt -w . && mv ../vendor . - - # See the corresponding lines in the Dockerfile for an explanation - # of this logic - - git add . - - git diff --cached - - git diff-index --cached --exit-code HEAD - -script: - - go test -race -v -timeout 60s ./... diff --git a/CHANGELOG.md b/CHANGELOG.md index 45f76a3d0..4757c5f8a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,8 @@ * A flush "watchdog" controlled by the config setting `flush_watchdog_missed_flushes`. If veneur has not started this many flushes, the watchdog panics and terminates veneur (so it can be restarted by process supervision). Thanks, [antifuchs](https://github.com/antifuchs)! * Splunk sink: Trace-related IDs are now represented in hexadecimal for cross-tool compatibility and a small byte savings. Thanks, [myndzi](https://github.com/myndzi) * Splunk sink: Indicator spans are now tagged with `"partial":true` when they would otherwise have been sampled, distinguishing between partial and full traces. Thanks, [myndzi](https://github.com/myndzi) +* New configuration options `veneur_metrics_scopes` and `veneur_metrics_additional_tags`, which allow configuring veneur such that it aggregates its own metrics globally (rather than reporting a set of internal metrics per instance/container/etc). Thanks, [antifuchs](https://github.com/antifuchs)! +* New SSF `sample` field: `scope`. This field lets clients tell Veneur what to do with the sample - it corresponds exactly to the `veneurglobalonly` and `veneurlocalonly` tags that metrics can hold. Thanks, [antifuchs](https://github.com/antifuchs)! ## Updated diff --git a/config.go b/config.go index 43ee76695..be319bf1f 100644 --- a/config.go +++ b/config.go @@ -98,7 +98,15 @@ type Config struct { TraceLightstepNumClients int `yaml:"trace_lightstep_num_clients"` TraceLightstepReconnectPeriod string `yaml:"trace_lightstep_reconnect_period"` TraceMaxLengthBytes int `yaml:"trace_max_length_bytes"` - XrayAddress string `yaml:"xray_address"` - XrayAnnotationTags []string `yaml:"xray_annotation_tags"` - XraySamplePercentage int `yaml:"xray_sample_percentage"` + VeneurMetricsAdditionalTags []string `yaml:"veneur_metrics_additional_tags"` + VeneurMetricsScopes struct { + Counter string `yaml:"counter"` + Gauge string `yaml:"gauge"` + Histogram string `yaml:"histogram"` + Set string `yaml:"set"` + Status string `yaml:"status"` + } `yaml:"veneur_metrics_scopes"` + XrayAddress string `yaml:"xray_address"` + XrayAnnotationTags []string `yaml:"xray_annotation_tags"` + XraySamplePercentage int `yaml:"xray_sample_percentage"` } diff --git a/consul_discovery_test.go b/consul_discovery_test.go index 235081b5d..6c9d25992 100644 --- a/consul_discovery_test.go +++ b/consul_discovery_test.go @@ -18,7 +18,7 @@ type ConsulOneRoundTripper struct { func (rt *ConsulOneRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { rec := httptest.NewRecorder() if strings.HasPrefix(req.URL.Path, "/v1/health/service/") { - resp, _ := ioutil.ReadFile("fixtures/consul/health_service_one.json") + resp, _ := ioutil.ReadFile("testdata/consul/health_service_one.json") rec.Write(resp) rec.Code = http.StatusOK rt.HealthGotCalled = true @@ -38,12 +38,12 @@ func (rt *ConsulChangingRoundTripper) RoundTrip(req *http.Request) (*http.Respon var resp []byte if rt.Count == 2 { // On the second invocation, return zero hosts! - resp, _ = ioutil.ReadFile("fixtures/consul/health_service_zero.json") + resp, _ = ioutil.ReadFile("testdata/consul/health_service_zero.json") } else if rt.Count == 1 { // On the second invocation, return two hosts! - resp, _ = ioutil.ReadFile("fixtures/consul/health_service_two.json") + resp, _ = ioutil.ReadFile("testdata/consul/health_service_two.json") } else { - resp, _ = ioutil.ReadFile("fixtures/consul/health_service_one.json") + resp, _ = ioutil.ReadFile("testdata/consul/health_service_one.json") } rec.Write(resp) rec.Code = http.StatusOK @@ -52,7 +52,7 @@ func (rt *ConsulChangingRoundTripper) RoundTrip(req *http.Request) (*http.Respon } else if req.URL.Path == "/v1/health/service/traceServiceName" { // These don't count. Since we make different calls, we'll return some junk // for tracing and leave forwarding to it's own thing. - resp, _ := ioutil.ReadFile("fixtures/consul/health_service_one.json") + resp, _ := ioutil.ReadFile("testdata/consul/health_service_one.json") rec.Write(resp) rec.Code = http.StatusOK } diff --git a/docs/development.md b/docs/development.md index a6d0d0190..4209404ec 100644 --- a/docs/development.md +++ b/docs/development.md @@ -41,3 +41,9 @@ concerning the run time of this process to veneur. `veneur-emit` has a bunch more options, check out the usage for it with `go run ./cmd/veneur-emit/main.go -help`! + +## Adding test data files + +When your tests depend on data files, put them into a `testdata` +directory next to the test file, and use relative paths to refer to +them. diff --git a/example.yaml b/example.yaml index 3a4e1cdca..49f474421 100644 --- a/example.yaml +++ b/example.yaml @@ -128,6 +128,41 @@ aggregates: - "max" - "count" +# Metrics that Veneur reports about its own operation. Each of the +# entries here can have the value "global", "local", "default" and "" +# ("default" and "" mean the same thing). Setting +# this to any value other than the default will make all metrics +# of that type have the following behavior: +# +# - "default"/"": scope remains unchanged +# - "global": scope for "default"-scoped metrics of that type will be +# changed to global, so they get forwarded to a global veneur node. +# - "local": scope for "default"-scoped metrics of that type will be +# changed to local, so they get reported from the local veneur node +# only. +# +# When this is unset in configuration, the default values for all +# metric types are "", indicating that veneur will use the default +# scope for each of the metrics it reports. +veneur_metrics_scopes: + counter: local + + # changing the setting for "gauge" to "global" is not recommended, + # as the global aggregation method for gauges is "last write wins". + gauge: local + + histogram: global + + set: global + + status: local + + +# Tags supplied here will be attached to all metrics that veneur +# reports about its own operation. +veneur_metrics_additional_tags: + - "veneur_internal_metric:true" + # == DEPRECATED == # This configuration has been replaced by datadog_flush_max_per_body. diff --git a/forward_grpc_test.go b/forward_grpc_test.go index 05e093c8a..745b5bea4 100644 --- a/forward_grpc_test.go +++ b/forward_grpc_test.go @@ -192,9 +192,10 @@ func TestE2EForwardingGRPCMetrics(t *testing.T) { } done := make(chan struct{}) go func() { - metrics := <-ch + defer close(done) - expectedNames := []string{ + expected := map[string]bool{} + for _, name := range []string{ testGRPCMetric("histogram.50percentile"), testGRPCMetric("histogram.75percentile"), testGRPCMetric("histogram.99percentile"), @@ -216,17 +217,34 @@ func TestE2EForwardingGRPCMetrics(t *testing.T) { testGRPCMetric("counter"), testGRPCMetric("gauge"), testGRPCMetric("set"), + } { + expected[name] = false } - actualNames := make([]string, len(metrics)) - for i, metric := range metrics { - actualNames[i] = metric.Name + metrics: + for { + metrics := <-ch + + for _, metric := range metrics { + _, ok := expected[metric.Name] + if !ok { + t.Errorf("unexpected metric %q", metric.Name) + continue + } + expected[metric.Name] = true + } + for name, got := range expected { + if !got { + // we have more metrics to read: + t.Logf("metric %q still missing", name) + continue metrics + } + } + // if there had been metrics to read, we'd + // have restarted the loop: + return } - assert.ElementsMatch(t, expectedNames, actualNames, - "The global Veneur didn't flush the right metrics: expectedNames=%v actualNames=%v", - expectedNames, actualNames) - close(done) }() ff.local.Flush(context.TODO()) ff.global.Flush(context.TODO()) diff --git a/http_test.go b/http_test.go index 7023f26af..0e102acf0 100644 --- a/http_test.go +++ b/http_test.go @@ -127,20 +127,20 @@ func testServerImport(t *testing.T, filename string, contentEncoding string) { func TestServerImportCompressed(t *testing.T) { // Test that the global veneur instance can handle // requests that provide compressed metrics - testServerImport(t, filepath.Join("fixtures", "import.deflate"), "deflate") + testServerImport(t, filepath.Join("testdata", "import.deflate"), "deflate") } func TestServerImportUncompressed(t *testing.T) { // Test that the global veneur instance can handle // requests that provide uncompressed metrics - testServerImport(t, filepath.Join("fixtures", "import.uncompressed"), "") + testServerImport(t, filepath.Join("testdata", "import.uncompressed"), "") } func TestServerImportGzip(t *testing.T) { // Test that the global veneur instance // returns a 400 for gzipped-input - f, err := os.Open(filepath.Join("fixtures", "import.uncompressed")) + f, err := os.Open(filepath.Join("testdata", "import.uncompressed")) assert.NoError(t, err, "Error reading response fixture") defer f.Close() @@ -171,7 +171,7 @@ func TestServerImportCompressedInvalid(t *testing.T) { //TODO(aditya) test that the metrics are properly reported - f, err := os.Open(filepath.Join("fixtures", "import.uncompressed")) + f, err := os.Open(filepath.Join("testdata", "import.uncompressed")) assert.NoError(t, err, "Error reading response fixture") defer f.Close() @@ -196,7 +196,7 @@ func TestServerImportUncompressedInvalid(t *testing.T) { //TODO(aditya) test that the metrics are properly reported - f, err := os.Open(filepath.Join("fixtures", "import.deflate")) + f, err := os.Open(filepath.Join("testdata", "import.deflate")) assert.NoError(t, err, "Error reading response fixture") defer f.Close() @@ -380,7 +380,7 @@ func testServerImportHelper(t *testing.T, data interface{}) { func BenchmarkNewSortableJSONMetrics(b *testing.B) { const numWorkers = 100 - filename := filepath.Join("fixtures", "import.deflate") + filename := filepath.Join("testdata", "import.deflate") contentEncoding := "deflate" f, err := os.Open(filename) diff --git a/plugin_test.go b/plugin_test.go index 4423b3de1..dd25bf5b2 100644 --- a/plugin_test.go +++ b/plugin_test.go @@ -123,7 +123,7 @@ func TestGlobalServerS3PluginFlush(t *testing.T) { client := &s3Mock.MockS3Client{} client.SetPutObject(func(input *s3.PutObjectInput) (*s3.PutObjectOutput, error) { - f, err := os.Open(path.Join("fixtures", "aws", "PutObject", "2016", "10", "14", "1476481302.tsv.gz")) + f, err := os.Open(path.Join("testdata", "aws", "PutObject", "2016", "10", "14", "1476481302.tsv.gz")) assert.NoError(t, err) defer f.Close() diff --git a/plugins/s3/s3_test.go b/plugins/s3/s3_test.go index 919a3af04..1aafa0e22 100644 --- a/plugins/s3/s3_test.go +++ b/plugins/s3/s3_test.go @@ -53,7 +53,7 @@ func TestS3Post(t *testing.T) { }() client := &s3Mock.MockS3Client{} - f, err := os.Open(path.Join("..", "..", "fixtures", "aws", "PutObject", "2016", "10", "13", "1476370612.tsv.gz")) + f, err := os.Open(path.Join("testdata", "aws", "PutObject", "2016", "10", "13", "1476370612.tsv.gz")) assert.NoError(t, err) defer f.Close() @@ -124,7 +124,7 @@ func TestS3Path(t *testing.T) { func TestS3PostNoCredentials(t *testing.T) { s3p := &S3Plugin{Logger: log, Svc: nil} - f, err := os.Open(path.Join("..", "..", "fixtures", "aws", "PutObject", "2016", "10", "07", "1475863542.json")) + f, err := os.Open(path.Join("testdata", "aws", "PutObject", "2016", "10", "07", "1475863542.json")) assert.NoError(t, err) defer f.Close() diff --git a/fixtures/aws/PutObject/2016/10/07/1475863542.json b/plugins/s3/testdata/aws/PutObject/2016/10/07/1475863542.json similarity index 100% rename from fixtures/aws/PutObject/2016/10/07/1475863542.json rename to plugins/s3/testdata/aws/PutObject/2016/10/07/1475863542.json diff --git a/fixtures/aws/PutObject/2016/10/13/1476370612.tsv.gz b/plugins/s3/testdata/aws/PutObject/2016/10/13/1476370612.tsv.gz similarity index 100% rename from fixtures/aws/PutObject/2016/10/13/1476370612.tsv.gz rename to plugins/s3/testdata/aws/PutObject/2016/10/13/1476370612.tsv.gz diff --git a/proxy_test.go b/proxy_test.go index 8701addd6..6e2ef3812 100644 --- a/proxy_test.go +++ b/proxy_test.go @@ -51,11 +51,11 @@ func (rt *ConsulTwoMetricRoundTripper) RoundTrip(req *http.Request) (*http.Respo rec := httptest.NewRecorder() if req.URL.Path == "/v1/health/service/forwardServiceName" { - resp, _ := ioutil.ReadFile("fixtures/consul/health_service_two.json") + resp, _ := ioutil.ReadFile("testdata/consul/health_service_two.json") rec.Write(resp) rec.Code = http.StatusOK } else if req.URL.Path == "/v1/health/service/traceServiceName" { - resp, _ := ioutil.ReadFile("fixtures/consul/health_service_two.json") + resp, _ := ioutil.ReadFile("testdata/consul/health_service_two.json") rec.Write(resp) rec.Code = http.StatusOK } else if req.URL.Path == "/api/v1/series" { diff --git a/regression_test.go b/regression_test.go index 6c2ed0ace..e25a6b33f 100644 --- a/regression_test.go +++ b/regression_test.go @@ -87,7 +87,7 @@ func TestNoTagName(t *testing.T) { } func TestOperation(t *testing.T) { - pbFile := filepath.Join("fixtures", "protobuf", "span-with-operation-062017.pb") + pbFile := filepath.Join("testdata", "protobuf", "span-with-operation-062017.pb") pb, err := os.Open(pbFile) assert.NoError(t, err) defer pb.Close() diff --git a/samplers/parser.go b/samplers/parser.go index 04dc70053..7ac98722d 100644 --- a/samplers/parser.go +++ b/samplers/parser.go @@ -266,6 +266,12 @@ func ParseMetricSSF(metric *ssf.SSFSample) (UDPMetric, error) { default: ret.Value = float64(metric.Value) } + switch metric.Scope { + case ssf.SSFSample_LOCAL: + ret.Scope = LocalOnly + case ssf.SSFSample_GLOBAL: + ret.Scope = GlobalOnly + } ret.SampleRate = metric.SampleRate tempTags := make([]string, 0, len(metric.Tags)) for key, value := range metric.Tags { diff --git a/samplers/samplers_test.go b/samplers/samplers_test.go index 694ae1948..1734cab04 100644 --- a/samplers/samplers_test.go +++ b/samplers/samplers_test.go @@ -565,51 +565,177 @@ func TestMetricKeyEquality(t *testing.T) { } func TestParseMetricSSF(t *testing.T) { - sample := ssf.SSFSample{ - Metric: ssf.SSFSample_GAUGE, - - Name: "my.test.metric", - Value: rand.Float32(), - Timestamp: time.Now().Unix(), - Message: "arbitrary test message", - Status: ssf.SSFSample_WARNING, - SampleRate: rand.Float32(), - Tags: map[string]string{ - "keats": "false", - "yeats": "false", - "wilde": "true", - "veneurglobalonly": "true", + val := rand.Float32() + now := time.Now().Unix() + sampleRate := rand.Float32() + + tests := []struct { + sample ssf.SSFSample + expected UDPMetric + }{ + { + sample: ssf.SSFSample{ + Metric: ssf.SSFSample_GAUGE, + + Name: "my.test.metric", + Value: val, + Timestamp: now, + Message: "arbitrary test message", + Status: ssf.SSFSample_WARNING, + SampleRate: sampleRate, + Tags: map[string]string{ + "keats": "false", + "yeats": "false", + "wilde": "true", + "veneurglobalonly": "true", + }, + Unit: "frobs per second", + }, + + expected: UDPMetric{ + MetricKey: MetricKey{ + Name: "my.test.metric", + Type: "gauge", + JoinedTags: "keats:false,wilde:true,yeats:false", + }, + Digest: 0x7ae783ad, + Value: val, + SampleRate: sampleRate, + Tags: []string{ + "keats:false", + "wilde:true", + "yeats:false", + }, + Scope: GlobalOnly, + }, }, - Unit: "frobs per second", - } + { + sample: ssf.SSFSample{ + Metric: ssf.SSFSample_GAUGE, + + Name: "my.test.metric", + Value: val, + Timestamp: now, + Message: "arbitrary test message", + Status: ssf.SSFSample_WARNING, + SampleRate: sampleRate, + Tags: map[string]string{ + "keats": "false", + "yeats": "false", + "wilde": "true", + "veneurlocalonly": "true", + }, + Unit: "frobs per second", + }, - expected := UDPMetric{ - MetricKey: MetricKey{ - Name: "my.test.metric", - Type: "gauge", - JoinedTags: "keats:false,wilde:true,yeats:false", + expected: UDPMetric{ + MetricKey: MetricKey{ + Name: "my.test.metric", + Type: "gauge", + JoinedTags: "keats:false,wilde:true,yeats:false", + }, + Digest: 0x7ae783ad, + Value: val, + SampleRate: sampleRate, + Tags: []string{ + "keats:false", + "wilde:true", + "yeats:false", + }, + Scope: LocalOnly, + }, }, - Digest: 0x7ae783ad, - Value: sample.Value, - SampleRate: sample.SampleRate, - Tags: []string{ - "keats:false", - "wilde:true", - "yeats:false", + { + sample: ssf.SSFSample{ + Metric: ssf.SSFSample_GAUGE, + + Name: "my.test.metric", + Value: val, + Timestamp: now, + Message: "arbitrary test message", + Status: ssf.SSFSample_WARNING, + SampleRate: sampleRate, + Scope: ssf.SSFSample_GLOBAL, + Tags: map[string]string{ + "keats": "false", + "yeats": "false", + "wilde": "true", + }, + Unit: "frobs per second", + }, + + expected: UDPMetric{ + MetricKey: MetricKey{ + Name: "my.test.metric", + Type: "gauge", + JoinedTags: "keats:false,wilde:true,yeats:false", + }, + Digest: 0x7ae783ad, + Value: val, + SampleRate: sampleRate, + Tags: []string{ + "keats:false", + "wilde:true", + "yeats:false", + }, + Scope: GlobalOnly, + }, + }, + { + sample: ssf.SSFSample{ + Metric: ssf.SSFSample_GAUGE, + + Name: "my.test.metric", + Value: val, + Timestamp: now, + Message: "arbitrary test message", + Status: ssf.SSFSample_WARNING, + SampleRate: sampleRate, + Scope: ssf.SSFSample_LOCAL, + Tags: map[string]string{ + "keats": "false", + "yeats": "false", + "wilde": "true", + }, + Unit: "frobs per second", + }, + + expected: UDPMetric{ + MetricKey: MetricKey{ + Name: "my.test.metric", + Type: "gauge", + JoinedTags: "keats:false,wilde:true,yeats:false", + }, + Digest: 0x7ae783ad, + Value: val, + SampleRate: sampleRate, + Tags: []string{ + "keats:false", + "wilde:true", + "yeats:false", + }, + Scope: LocalOnly, + }, }, - Scope: 2, } - udpMetric, err := ParseMetricSSF(&sample) - assert.NoError(t, err) - assert.Equal(t, udpMetric.MetricKey, expected.MetricKey) - assert.Equal(t, udpMetric.Type, expected.Type) - assert.Equal(t, udpMetric.Digest, expected.Digest) - assert.InEpsilon(t, udpMetric.Value, expected.Value, ε) - assert.InEpsilon(t, udpMetric.SampleRate, expected.SampleRate, ε) - assert.Equal(t, udpMetric.JoinedTags, expected.JoinedTags) - assert.Equal(t, udpMetric.Tags, expected.Tags) - assert.Equal(t, udpMetric.Scope, expected.Scope) + for i, elt := range tests { + sample := elt.sample + expected := elt.expected + t.Run(strconv.Itoa(i), func(t *testing.T) { + t.Parallel() + udpMetric, err := ParseMetricSSF(&sample) + assert.NoError(t, err) + assert.Equal(t, udpMetric.MetricKey, expected.MetricKey) + assert.Equal(t, udpMetric.Type, expected.Type) + assert.Equal(t, udpMetric.Digest, expected.Digest) + assert.InEpsilon(t, udpMetric.Value, expected.Value, ε) + assert.InEpsilon(t, udpMetric.SampleRate, expected.SampleRate, ε) + assert.Equal(t, udpMetric.JoinedTags, expected.JoinedTags) + assert.Equal(t, udpMetric.Tags, expected.Tags) + assert.Equal(t, udpMetric.Scope, expected.Scope) + }) + } } func BenchmarkParseMetricSSF(b *testing.B) { diff --git a/scopedstatsd/client.go b/scopedstatsd/client.go new file mode 100644 index 000000000..caffe8336 --- /dev/null +++ b/scopedstatsd/client.go @@ -0,0 +1,119 @@ +package scopedstatsd + +import ( + "time" + + "github.com/DataDog/datadog-go/statsd" + "github.com/stripe/veneur/ssf" +) + +// StatsdClient represents the statsd client functions that veneur's +// SSF sinks call (so as not to write-amplify by reporting its own +// metrics). +type Client interface { + Gauge(name string, value float64, tags []string, rate float64) error + Count(name string, value int64, tags []string, rate float64) error + Incr(name string, tags []string, rate float64) error + Histogram(name string, value float64, tags []string, rate float64) error + TimeInMilliseconds(name string, value float64, tags []string, rate float64) error + Timing(name string, value time.Duration, tags []string, rate float64) error +} + +// Ensure takes a statsd client and wraps it in such a way that it is +// safe to store in a struct if it should be nil. Otherwise returns +// the Client unchanged. +func Ensure(cl Client) Client { + if cl == nil { + return &ScopedClient{} + } + return cl +} + +// MetricScopes holds the scopes that are configured for each metric +// type. +type MetricScopes struct { + Gauge ssf.SSFSample_Scope + Count ssf.SSFSample_Scope + Histogram ssf.SSFSample_Scope +} + +type ScopedClient struct { + client *statsd.Client + + addTags []string + scopes MetricScopes +} + +var _ Client = &ScopedClient{} + +func addScopeTag(tags []string, scope ssf.SSFSample_Scope) []string { + switch scope { + case ssf.SSFSample_LOCAL: + return append(tags, "veneurlocalonly:true") + case ssf.SSFSample_GLOBAL: + return append(tags, "veneurglobalonly:true") + default: + return tags + } +} + +func (s *ScopedClient) Gauge(name string, value float64, tags []string, rate float64) error { + if s == nil { + return nil + } + tags = append(tags, s.addTags...) + tags = addScopeTag(tags, s.scopes.Gauge) + return s.client.Gauge(name, value, tags, rate) +} + +func (s *ScopedClient) Count(name string, value int64, tags []string, rate float64) error { + if s == nil { + return nil + } + tags = append(tags, s.addTags...) + tags = addScopeTag(tags, s.scopes.Count) + return s.client.Count(name, value, tags, rate) +} + +func (s *ScopedClient) Incr(name string, tags []string, rate float64) error { + if s == nil { + return nil + } + + return s.Count(name, 1, tags, rate) +} + +func (s *ScopedClient) TimeInMilliseconds(name string, value float64, tags []string, rate float64) error { + if s == nil { + return nil + } + tags = append(tags, s.addTags...) + tags = addScopeTag(tags, s.scopes.Histogram) + return s.client.TimeInMilliseconds(name, value, tags, rate) +} + +func (s *ScopedClient) Timing(name string, value time.Duration, tags []string, rate float64) error { + if s == nil { + return nil + } + tags = append(tags, s.addTags...) + tags = addScopeTag(tags, s.scopes.Histogram) + return s.client.Timing(name, value, tags, rate) +} + +func (s *ScopedClient) Histogram(name string, value float64, tags []string, rate float64) error { + if s == nil { + return nil + } + tags = append(tags, s.addTags...) + tags = addScopeTag(tags, s.scopes.Histogram) + return s.client.Histogram(name, value, tags, rate) +} + +func NewClient(inner *statsd.Client, addTags []string, scopes MetricScopes) *ScopedClient { + return &ScopedClient{ + client: inner, + addTags: addTags, + scopes: scopes, + } +} diff --git a/scopedstatsd/client_test.go b/scopedstatsd/client_test.go new file mode 100644 index 000000000..44d3438d1 --- /dev/null +++ b/scopedstatsd/client_test.go @@ -0,0 +1,56 @@ +package scopedstatsd + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestEnsure(t *testing.T) { + var theNilOne Client = nil + ensured := Ensure(theNilOne) + assert.NotNil(t, ensured) + assert.NoError(t, ensured.Count("hi", 0, nil, 1.0)) +} + +func TestDoesSomething(t *testing.T) { + type statsFunc func() error + + clients := []struct { + name string + client Client + }{ + {"nilClient", (*ScopedClient)(nil)}, + {"nilInner", NewClient(nil, []string{}, MetricScopes{})}, + } + for _, elt := range clients { + test := elt + t.Run(test.name, func(t *testing.T) { + t.Parallel() + cl := test.client + testFuncs := []statsFunc{ + func() error { + return cl.Gauge("hi", 1, nil, 1.0) + }, + func() error { + return cl.Count("hi", 1, nil, 1.0) + }, + func() error { + return cl.Incr("hi", nil, 1.0) + }, + func() error { + return cl.Timing("hi", 1, nil, 1.0) + }, + func() error { + return cl.Histogram("hi", 1, nil, 1.0) + }, + func() error { + return cl.TimeInMilliseconds("hi", 1, nil, 1.0) + }, + } + for _, fn := range testFuncs { + assert.NoError(t, fn()) + } + }) + } +} diff --git a/server.go b/server.go index b2c6d0aec..8ec7e2fc3 100644 --- a/server.go +++ b/server.go @@ -42,6 +42,7 @@ import ( s3p "github.com/stripe/veneur/plugins/s3" "github.com/stripe/veneur/protocol" "github.com/stripe/veneur/samplers" + "github.com/stripe/veneur/scopedstatsd" "github.com/stripe/veneur/sinks" "github.com/stripe/veneur/sinks/datadog" "github.com/stripe/veneur/sinks/debug" @@ -84,7 +85,7 @@ type Server struct { SpanWorker *SpanWorker SpanWorkerGoroutines int - Statsd *statsd.Client + Statsd *scopedstatsd.ScopedClient Sentry *raven.Client Hostname string @@ -156,6 +157,100 @@ func SetLogger(logger *logrus.Logger) { log = logger } +func scopeFromName(name string) (ssf.SSFSample_Scope, error) { + switch name { + case "default": + fallthrough + case "": + return ssf.SSFSample_DEFAULT, nil + case "global": + return ssf.SSFSample_GLOBAL, nil + case "local": + return ssf.SSFSample_LOCAL, nil + default: + return 0, fmt.Errorf("unknown metric scope option %q", name) + } +} + +func normalizeSpans(conf Config) trace.ClientParam { + return func(cl *trace.Client) error { + var err error + typeScopes := map[ssf.SSFSample_Metric]ssf.SSFSample_Scope{} + typeScopes[ssf.SSFSample_COUNTER], err = scopeFromName(conf.VeneurMetricsScopes.Counter) + if err != nil { + return err + } + typeScopes[ssf.SSFSample_GAUGE], err = scopeFromName(conf.VeneurMetricsScopes.Gauge) + if err != nil { + return err + } + typeScopes[ssf.SSFSample_HISTOGRAM], err = scopeFromName(conf.VeneurMetricsScopes.Histogram) + if err != nil { + return err + } + typeScopes[ssf.SSFSample_SET], err = scopeFromName(conf.VeneurMetricsScopes.Set) + if err != nil { + return err + } + typeScopes[ssf.SSFSample_STATUS], err = scopeFromName(conf.VeneurMetricsScopes.Status) + if err != nil { + return err + } + + tags := map[string]string{} + for _, elem := range conf.VeneurMetricsAdditionalTags { + tag := strings.SplitN(elem, ":", 2) + switch len(tag) { + case 2: + tags[tag[0]] = tag[1] + case 1: + tags[tag[0]] = "" + } + } + + normalizer := func(sample *ssf.SSFSample) { + // adjust tags: + if sample.Tags == nil { + sample.Tags = map[string]string{} + } + for k, v := range tags { + if _, ok := sample.Tags[k]; ok { + // do not overwrite existing tags: + continue + } + sample.Tags[k] = v + } + + // adjust the scope: + toScope := typeScopes[sample.Metric] + if sample.Scope != ssf.SSFSample_DEFAULT || toScope == ssf.SSFSample_DEFAULT { + return + } + sample.Scope = toScope + } + option := trace.NormalizeSamples(normalizer) + return option(cl) + } +} + +func scopesFromConfig(conf Config) (scopedstatsd.MetricScopes, error) { + var err error + var ms scopedstatsd.MetricScopes + ms.Gauge, err = scopeFromName(conf.VeneurMetricsScopes.Gauge) + if err != nil { + return ms, err + } + ms.Count, err = scopeFromName(conf.VeneurMetricsScopes.Counter) + if err != nil { + return ms, err + } + ms.Histogram, err = scopeFromName(conf.VeneurMetricsScopes.Histogram) + if err != nil { + return ms, err + } + return ms, nil +} + // NewFromConfig creates a new veneur server from a configuration // specification and sets up the passed logger according to the // configuration. @@ -201,11 +296,16 @@ func NewFromConfig(logger *logrus.Logger, conf Config) (*Server, error) { } stats.Namespace = "veneur." - ret.Statsd = stats + scopes, err := scopesFromConfig(conf) + if err != nil { + return ret, err + } + ret.Statsd = scopedstatsd.NewClient(stats, conf.VeneurMetricsAdditionalTags, scopes) ret.SpanChan = make(chan *ssf.SSFSpan, conf.SpanChannelCapacity) ret.TraceClient, err = trace.NewChannelClient(ret.SpanChan, trace.ReportStatistics(stats, 1*time.Second, []string{"ssf_format:internal"}), + normalizeSpans(conf), ) if err != nil { return ret, err diff --git a/server_sinks_test.go b/server_sinks_test.go index 9a582cb44..55c4769d7 100644 --- a/server_sinks_test.go +++ b/server_sinks_test.go @@ -28,13 +28,13 @@ func TestFlushTracesBySink(t *testing.T) { cases := []TestCase{ { Name: "Success", - ProtobufFile: filepath.Join("fixtures", "protobuf", "trace.pb"), - JSONFile: filepath.Join("fixtures", "tracing_agent", "spans", "trace.pb.json"), + ProtobufFile: filepath.Join("testdata", "protobuf", "trace.pb"), + JSONFile: filepath.Join("testdata", "tracing_agent", "spans", "trace.pb.json"), }, { Name: "Critical", - ProtobufFile: filepath.Join("fixtures", "protobuf", "trace_critical.pb"), - JSONFile: filepath.Join("fixtures", "tracing_agent", "spans", "trace_critical.pb.json"), + ProtobufFile: filepath.Join("testdata", "protobuf", "trace_critical.pb"), + JSONFile: filepath.Join("testdata", "tracing_agent", "spans", "trace_critical.pb.json"), }, } diff --git a/server_test.go b/server_test.go index 2481593ae..654ce2c1a 100644 --- a/server_test.go +++ b/server_test.go @@ -469,7 +469,7 @@ func readTestKeysCerts() (map[string]string, error) { "serverkey.pem", } for _, fileName := range pemFileNames { - b, err := ioutil.ReadFile(filepath.Join("fixtures", fileName)) + b, err := ioutil.ReadFile(filepath.Join("testdata", fileName)) if err != nil { return nil, err } diff --git a/sinks/splunk/splunk.go b/sinks/splunk/splunk.go index 2d5783e4c..6769fa662 100644 --- a/sinks/splunk/splunk.go +++ b/sinks/splunk/splunk.go @@ -374,9 +374,17 @@ func (sss *splunkSpanSink) makeHTTPRequest(req *http.Request, cancel func()) { dec := json.NewDecoder(resp.Body) err := dec.Decode(&parsed) if err != nil { - sss.log.WithError(err). - WithField("http_status_code", resp.StatusCode). - Warn("Could not parse response from splunk HEC") + entry := sss.log.WithError(err). + WithFields(logrus.Fields{ + "http_status_code": resp.StatusCode, + "endpoint": req.URL.String(), + }) + if sss.log.Level >= logrus.DebugLevel { + body, _ := ioutil.ReadAll(dec.Buffered()) + entry = entry.WithField("response_body", string(body)) + } + entry.Warn("Could not parse response from splunk HEC") + return } cause = "error" diff --git a/sinks/splunk/splunk_internal_test.go b/sinks/splunk/splunk_internal_test.go index 1bbc2ce07..cbe11787a 100644 --- a/sinks/splunk/splunk_internal_test.go +++ b/sinks/splunk/splunk_internal_test.go @@ -1,6 +1,8 @@ package splunk import ( + "net/http" + "net/http/httptest" "strconv" "testing" "time" @@ -23,8 +25,14 @@ func TestWorkerCount(t *testing.T) { workerProcs := out t.Run(strconv.Itoa(nWorkers), func(t *testing.T) { t.Parallel() - logger := logrus.StandardLogger() - sink, err := NewSplunkSpanSink("http://example.com", "00000000-0000-0000-0000-000000000000", + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + })) + defer ts.Close() + + logger := logrus.New() + logger.SetLevel(logrus.DebugLevel) + sink, err := NewSplunkSpanSink(ts.URL, "00000000-0000-0000-0000-000000000000", "test-host", "", logger, time.Duration(0), time.Duration(0), 100, nWorkers, 10, 10*time.Millisecond, 0) sss := sink.(*splunkSpanSink) defer sss.Stop() diff --git a/sinks/splunk/splunk_test.go b/sinks/splunk/splunk_test.go index 7d197d81b..affef57a5 100644 --- a/sinks/splunk/splunk_test.go +++ b/sinks/splunk/splunk_test.go @@ -64,9 +64,15 @@ func jsonEndpoint(t testing.TB, ch chan<- splunk.Event) http.Handler { }) } +func testLogger() *logrus.Logger { + logger := logrus.New() + logger.SetLevel(logrus.DebugLevel) + return logger +} + func TestSpanIngestBatch(t *testing.T) { const nToFlush = 10 - logger := logrus.StandardLogger() + logger := testLogger() ch := make(chan splunk.Event, nToFlush) ts := httptest.NewServer(jsonEndpoint(t, ch)) @@ -140,7 +146,7 @@ func TestSpanIngestBatch(t *testing.T) { func TestTimeout(t *testing.T) { const nToFlush = 10 - logger := logrus.StandardLogger() + logger := testLogger() ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { time.Sleep(time.Duration(100 * time.Millisecond)) @@ -202,7 +208,7 @@ const benchmarkCapacity = 100 const benchmarkWorkers = 3 func BenchmarkBatchIngest(b *testing.B) { - logger := logrus.StandardLogger() + logger := testLogger() // set up a null responder that we can flush to: ts := httptest.NewServer(jsonEndpoint(b, nil)) @@ -248,7 +254,7 @@ func BenchmarkBatchIngest(b *testing.B) { func TestSampling(t *testing.T) { const nToFlush = 1000 - logger := logrus.StandardLogger() + logger := testLogger() ch := make(chan splunk.Event, nToFlush) ts := httptest.NewServer(jsonEndpoint(t, ch)) @@ -316,7 +322,7 @@ func TestSampling(t *testing.T) { func TestSamplingIndicators(t *testing.T) { const nToFlush = 100 - logger := logrus.StandardLogger() + logger := testLogger() ch := make(chan splunk.Event, nToFlush) ts := httptest.NewServer(jsonEndpoint(t, ch)) @@ -384,7 +390,7 @@ func TestSamplingIndicators(t *testing.T) { func TestClosedIngestionEndpoint(t *testing.T) { const nToFlush = 100 - logger := logrus.StandardLogger() + logger := testLogger() ch := make(chan splunk.Event, nToFlush) ts := httptest.NewServer(jsonEndpoint(t, ch)) diff --git a/fixtures/xray_segment.json b/sinks/xray/testdata/xray_segment.json similarity index 100% rename from fixtures/xray_segment.json rename to sinks/xray/testdata/xray_segment.json diff --git a/sinks/xray/xray_test.go b/sinks/xray/xray_test.go index 2283d29d8..5e191f9bd 100644 --- a/sinks/xray/xray_test.go +++ b/sinks/xray/xray_test.go @@ -28,7 +28,7 @@ func TestConstructor(t *testing.T) { func TestIngestSpans(t *testing.T) { // Load up a fixture to compare the output to what we get over UDP - reader, err := os.Open(filepath.Join("..", "..", "fixtures", "xray_segment.json")) + reader, err := os.Open(filepath.Join("testdata", "xray_segment.json")) assert.NoError(t, err) defer reader.Close() fixtureSegment, err := ioutil.ReadAll(reader) @@ -99,7 +99,7 @@ func TestIngestSpans(t *testing.T) { func TestSampleSpans(t *testing.T) { // Load up a fixture to compare the output to what we get over UDP - reader, err := os.Open(filepath.Join("..", "..", "fixtures", "xray_segment.json")) + reader, err := os.Open(filepath.Join("testdata", "xray_segment.json")) assert.NoError(t, err) defer reader.Close() fixtureSegment, err := ioutil.ReadAll(reader) diff --git a/ssf/sample.pb.go b/ssf/sample.pb.go index 8e830691e..da568acd6 100644 --- a/ssf/sample.pb.go +++ b/ssf/sample.pb.go @@ -87,6 +87,34 @@ func (SSFSample_Status) EnumDescriptor() ([]byte, []int) { return fileDescriptor_7ef0544ca34aff6f, []int{0, 1} } +type SSFSample_Scope int32 + +const ( + SSFSample_DEFAULT SSFSample_Scope = 0 + SSFSample_LOCAL SSFSample_Scope = 1 + SSFSample_GLOBAL SSFSample_Scope = 2 +) + +var SSFSample_Scope_name = map[int32]string{ + 0: "DEFAULT", + 1: "LOCAL", + 2: "GLOBAL", +} + +var SSFSample_Scope_value = map[string]int32{ + "DEFAULT": 0, + "LOCAL": 1, + "GLOBAL": 2, +} + +func (x SSFSample_Scope) String() string { + return proto.EnumName(SSFSample_Scope_name, int32(x)) +} + +func (SSFSample_Scope) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_7ef0544ca34aff6f, []int{0, 2} +} + // SSFSample is similar of a StatsD-style, point in time metric. It has a Metric // type, a name, a value and a timestamp. Additionally it can contain a message, // a status, a sample rate, a map of tags as string keys and values and a unit @@ -104,6 +132,14 @@ type SSFSample struct { SampleRate float32 `protobuf:"fixed32,7,opt,name=sample_rate,json=sampleRate,proto3" json:"sample_rate,omitempty"` Tags map[string]string `protobuf:"bytes,8,rep,name=tags,proto3" json:"tags,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` Unit string `protobuf:"bytes,9,opt,name=unit,proto3" json:"unit,omitempty"` + // scope indicates to an SSF endpoint what it should do with a metric: + // + // - DEFAULT (or absent) - aggregate counters and gauges locally, + // handle histograms and sets globally. + // - LOCAL - aggregate all metrics locally. + // - GLOBAL - aggregate all metrics globally. + // + Scope SSFSample_Scope `protobuf:"varint,10,opt,name=scope,proto3,enum=ssf.SSFSample_Scope" json:"scope,omitempty"` } func (m *SSFSample) Reset() { *m = SSFSample{} } @@ -202,6 +238,13 @@ func (m *SSFSample) GetUnit() string { return "" } +func (m *SSFSample) GetScope() SSFSample_Scope { + if m != nil { + return m.Scope + } + return SSFSample_DEFAULT +} + // SSFSpan is the primary unit of reporting in SSF. It embeds a set of // SSFSamples, as well as start/stop time stamps and a parent ID // (which allows assembling a span lineage for distributed tracing @@ -388,6 +431,7 @@ func (m *SSFSpan) GetName() string { func init() { proto.RegisterEnum("ssf.SSFSample_Metric", SSFSample_Metric_name, SSFSample_Metric_value) proto.RegisterEnum("ssf.SSFSample_Status", SSFSample_Status_name, SSFSample_Status_value) + proto.RegisterEnum("ssf.SSFSample_Scope", SSFSample_Scope_name, SSFSample_Scope_value) proto.RegisterType((*SSFSample)(nil), "ssf.SSFSample") proto.RegisterMapType((map[string]string)(nil), "ssf.SSFSample.TagsEntry") proto.RegisterType((*SSFSpan)(nil), "ssf.SSFSpan") @@ -397,44 +441,47 @@ func init() { func init() { proto.RegisterFile("ssf/sample.proto", fileDescriptor_7ef0544ca34aff6f) } var fileDescriptor_7ef0544ca34aff6f = []byte{ - // 583 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x53, 0xcd, 0x6e, 0x13, 0x3d, - 0x14, 0xcd, 0x8c, 0x93, 0xc9, 0xcc, 0x4d, 0x9b, 0xcf, 0xb2, 0xfa, 0x21, 0x03, 0x55, 0x88, 0xc2, - 0x82, 0x08, 0x41, 0x90, 0xca, 0x82, 0x8a, 0x5d, 0x28, 0x21, 0x84, 0xd2, 0x44, 0xf2, 0x4c, 0xd4, - 0x65, 0x65, 0x32, 0x6e, 0x35, 0xa2, 0x33, 0x19, 0xd9, 0x6e, 0xa5, 0xbe, 0x05, 0x8f, 0xc2, 0x63, - 0xb0, 0xec, 0x92, 0x25, 0x6a, 0x17, 0xbc, 0x06, 0xb2, 0x9d, 0x64, 0xf8, 0x5b, 0xb1, 0xf3, 0xb9, - 0xe7, 0xc8, 0xba, 0xe7, 0x9e, 0x7b, 0x01, 0x2b, 0x75, 0xfa, 0x4c, 0xf1, 0xbc, 0x3c, 0x17, 0x83, - 0x52, 0x2e, 0xf5, 0x92, 0x20, 0xa5, 0x4e, 0x7b, 0xdf, 0x11, 0x44, 0x71, 0xfc, 0x26, 0xb6, 0x04, - 0x79, 0x0a, 0x41, 0x2e, 0xb4, 0xcc, 0x16, 0xd4, 0xeb, 0x7a, 0xfd, 0xf6, 0xde, 0xff, 0x03, 0xa5, - 0x4e, 0x07, 0x1b, 0x7e, 0x70, 0x64, 0x49, 0xb6, 0x12, 0x11, 0x02, 0xf5, 0x82, 0xe7, 0x82, 0xfa, - 0x5d, 0xaf, 0x1f, 0x31, 0xfb, 0x26, 0x3b, 0xd0, 0xb8, 0xe4, 0xe7, 0x17, 0x82, 0xa2, 0xae, 0xd7, - 0xf7, 0x99, 0x03, 0x64, 0x17, 0x22, 0x9d, 0xe5, 0x42, 0x69, 0x9e, 0x97, 0xb4, 0xde, 0xf5, 0xfa, - 0x88, 0x55, 0x05, 0x42, 0xa1, 0x99, 0x0b, 0xa5, 0xf8, 0x99, 0xa0, 0x0d, 0xfb, 0xd5, 0x1a, 0x9a, - 0x86, 0x94, 0xe6, 0xfa, 0x42, 0xd1, 0xe0, 0xaf, 0x0d, 0xc5, 0x96, 0x64, 0x2b, 0x11, 0x79, 0x00, - 0x2d, 0x67, 0xf1, 0x44, 0x72, 0x2d, 0x68, 0xd3, 0xb6, 0x00, 0xae, 0xc4, 0xb8, 0x16, 0xe4, 0x09, - 0xd4, 0x35, 0x3f, 0x53, 0x34, 0xec, 0xa2, 0x7e, 0x6b, 0x8f, 0xfe, 0xf6, 0x5b, 0xc2, 0xcf, 0xd4, - 0xa8, 0xd0, 0xf2, 0x8a, 0x59, 0x95, 0xf1, 0x77, 0x51, 0x64, 0x9a, 0x46, 0xce, 0x9f, 0x79, 0xdf, - 0x7b, 0x01, 0xd1, 0x46, 0x46, 0x30, 0xa0, 0x8f, 0xe2, 0xca, 0x0e, 0x2b, 0x62, 0xe6, 0x59, 0xd9, - 0x77, 0x33, 0x71, 0xe0, 0xa5, 0xbf, 0xef, 0xf5, 0x5e, 0x43, 0xe0, 0xc6, 0x47, 0x5a, 0xd0, 0x3c, - 0x98, 0xcd, 0xa7, 0xc9, 0x88, 0xe1, 0x1a, 0x89, 0xa0, 0x31, 0x1e, 0xce, 0xc7, 0x23, 0xec, 0x91, - 0x6d, 0x88, 0xde, 0x4e, 0xe2, 0x64, 0x36, 0x66, 0xc3, 0x23, 0xec, 0x93, 0x26, 0xa0, 0x78, 0x94, - 0x60, 0x44, 0x00, 0x82, 0x38, 0x19, 0x26, 0xf3, 0x18, 0xd7, 0x7b, 0xfb, 0x10, 0x38, 0xcf, 0x24, - 0x00, 0x7f, 0x76, 0x88, 0x6b, 0xe6, 0xb7, 0xe3, 0x21, 0x9b, 0x4e, 0xa6, 0x63, 0xec, 0x91, 0x2d, - 0x08, 0x0f, 0xd8, 0x24, 0x99, 0x1c, 0x0c, 0xdf, 0x63, 0xdf, 0x50, 0xf3, 0xe9, 0xe1, 0x74, 0x76, - 0x3c, 0xc5, 0xa8, 0xf7, 0x19, 0x41, 0xd3, 0x58, 0x2d, 0x79, 0x61, 0x06, 0x7e, 0x29, 0xa4, 0xca, - 0x96, 0x85, 0xed, 0xbd, 0xc1, 0xd6, 0x90, 0xdc, 0x85, 0x50, 0x4b, 0xbe, 0x10, 0x27, 0x59, 0x6a, - 0x2d, 0x20, 0xd6, 0xb4, 0x78, 0x92, 0x92, 0x36, 0xf8, 0x59, 0x6a, 0x63, 0x45, 0xcc, 0xcf, 0x52, - 0x72, 0x1f, 0xa2, 0x92, 0x4b, 0x51, 0x68, 0xa3, 0x75, 0x99, 0x86, 0xae, 0x30, 0x49, 0xc9, 0x23, - 0xf8, 0x4f, 0x69, 0x2e, 0xf5, 0x49, 0x15, 0x7b, 0xc3, 0x4a, 0xda, 0xb6, 0x9c, 0x6c, 0xb2, 0x7f, - 0x08, 0xdb, 0xa2, 0x48, 0x7f, 0x92, 0x05, 0x56, 0xb6, 0x25, 0x8a, 0xb4, 0x12, 0xed, 0x40, 0x43, - 0x48, 0xb9, 0x94, 0x36, 0xd1, 0x90, 0x39, 0x60, 0x5c, 0x28, 0x21, 0x2f, 0xb3, 0x85, 0xa0, 0xa1, - 0x5b, 0x9b, 0x15, 0x24, 0x7d, 0xb3, 0x50, 0x66, 0xd6, 0x8a, 0x82, 0x4d, 0xba, 0xfd, 0x6b, 0xd2, - 0x6c, 0x4d, 0x93, 0xc7, 0xab, 0x85, 0x68, 0x59, 0xd9, 0x9d, 0x8d, 0xac, 0xe4, 0xc5, 0x1f, 0xeb, - 0xb0, 0x0b, 0x51, 0x56, 0xa4, 0xd9, 0x82, 0xeb, 0xa5, 0xa4, 0x5b, 0xb6, 0x93, 0xaa, 0xb0, 0x39, - 0x86, 0xed, 0xea, 0x18, 0xfe, 0x79, 0x59, 0xde, 0xd5, 0xc3, 0x08, 0xc3, 0x2b, 0xfa, 0xe5, 0xa6, - 0xe3, 0x5d, 0xdf, 0x74, 0xbc, 0x6f, 0x37, 0x1d, 0xef, 0xd3, 0x6d, 0xa7, 0x76, 0x7d, 0xdb, 0xa9, - 0x7d, 0xbd, 0xed, 0xd4, 0x3e, 0x04, 0xf6, 0x84, 0x9f, 0xff, 0x08, 0x00, 0x00, 0xff, 0xff, 0x17, - 0x14, 0x05, 0x19, 0xd6, 0x03, 0x00, 0x00, + // 628 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x54, 0x41, 0x6f, 0xd3, 0x30, + 0x14, 0x6e, 0x92, 0x26, 0x4d, 0x5e, 0xb7, 0x62, 0x59, 0x03, 0x19, 0x98, 0x4a, 0x55, 0x0e, 0x54, + 0x03, 0x8a, 0x34, 0x0e, 0x4c, 0xdc, 0xba, 0xad, 0x2b, 0x65, 0x5d, 0x2b, 0x39, 0xa9, 0x76, 0x9c, + 0x4c, 0xe3, 0x4d, 0x11, 0x6b, 0x1a, 0xd9, 0xde, 0xa4, 0xfd, 0x0b, 0x7e, 0x0a, 0x3f, 0x81, 0x23, + 0xc7, 0x1d, 0x39, 0xa2, 0xed, 0x8f, 0x20, 0xdb, 0x5d, 0x03, 0x83, 0x13, 0x37, 0x7f, 0xef, 0xfb, + 0xfa, 0xf2, 0xbe, 0xe7, 0xcf, 0x05, 0x24, 0xe5, 0xe9, 0x1b, 0xc9, 0xe6, 0xc5, 0x39, 0xef, 0x16, + 0x62, 0xa1, 0x16, 0xd8, 0x93, 0xf2, 0xb4, 0xfd, 0xad, 0x0a, 0x51, 0x1c, 0x1f, 0xc4, 0x86, 0xc0, + 0xaf, 0x21, 0x98, 0x73, 0x25, 0xb2, 0x19, 0x71, 0x5a, 0x4e, 0xa7, 0xb1, 0xfd, 0xb0, 0x2b, 0xe5, + 0x69, 0x77, 0xc5, 0x77, 0x8f, 0x0c, 0x49, 0x97, 0x22, 0x8c, 0xa1, 0x9a, 0xb3, 0x39, 0x27, 0x6e, + 0xcb, 0xe9, 0x44, 0xd4, 0x9c, 0xf1, 0x06, 0xf8, 0x97, 0xec, 0xfc, 0x82, 0x13, 0xaf, 0xe5, 0x74, + 0x5c, 0x6a, 0x01, 0xde, 0x84, 0x48, 0x65, 0x73, 0x2e, 0x15, 0x9b, 0x17, 0xa4, 0xda, 0x72, 0x3a, + 0x1e, 0x2d, 0x0b, 0x98, 0x40, 0x6d, 0xce, 0xa5, 0x64, 0x67, 0x9c, 0xf8, 0xa6, 0xd5, 0x1d, 0xd4, + 0x03, 0x49, 0xc5, 0xd4, 0x85, 0x24, 0xc1, 0x3f, 0x07, 0x8a, 0x0d, 0x49, 0x97, 0x22, 0xfc, 0x0c, + 0xea, 0xd6, 0xe2, 0x89, 0x60, 0x8a, 0x93, 0x9a, 0x19, 0x01, 0x6c, 0x89, 0x32, 0xc5, 0xf1, 0x2b, + 0xa8, 0x2a, 0x76, 0x26, 0x49, 0xd8, 0xf2, 0x3a, 0xf5, 0x6d, 0x72, 0xaf, 0x5b, 0xc2, 0xce, 0x64, + 0x3f, 0x57, 0xe2, 0x8a, 0x1a, 0x95, 0xf6, 0x77, 0x91, 0x67, 0x8a, 0x44, 0xd6, 0x9f, 0x3e, 0xe3, + 0x2d, 0xf0, 0xe5, 0x6c, 0x51, 0x70, 0x02, 0x66, 0xa0, 0x8d, 0xfb, 0x03, 0x69, 0x8e, 0x5a, 0xc9, + 0x93, 0x77, 0x10, 0xad, 0x5a, 0x62, 0x04, 0xde, 0x67, 0x7e, 0x65, 0x16, 0x1b, 0x51, 0x7d, 0x2c, + 0x57, 0x65, 0xf7, 0x67, 0xc1, 0x7b, 0x77, 0xc7, 0x69, 0xef, 0x43, 0x60, 0x57, 0x8d, 0xeb, 0x50, + 0xdb, 0x9b, 0x4c, 0xc7, 0x49, 0x9f, 0xa2, 0x0a, 0x8e, 0xc0, 0x1f, 0xf4, 0xa6, 0x83, 0x3e, 0x72, + 0xf0, 0x3a, 0x44, 0x1f, 0x86, 0x71, 0x32, 0x19, 0xd0, 0xde, 0x11, 0x72, 0x71, 0x0d, 0xbc, 0xb8, + 0x9f, 0x20, 0x0f, 0x03, 0x04, 0x71, 0xd2, 0x4b, 0xa6, 0x31, 0xaa, 0xb6, 0x77, 0x20, 0xb0, 0xfb, + 0xc1, 0x01, 0xb8, 0x93, 0x43, 0x54, 0xd1, 0xdd, 0x8e, 0x7b, 0x74, 0x3c, 0x1c, 0x0f, 0x90, 0x83, + 0xd7, 0x20, 0xdc, 0xa3, 0xc3, 0x64, 0xb8, 0xd7, 0x1b, 0x21, 0x57, 0x53, 0xd3, 0xf1, 0xe1, 0x78, + 0x72, 0x3c, 0x46, 0x5e, 0xfb, 0x25, 0xf8, 0xc6, 0x88, 0xae, 0xee, 0xf7, 0x0f, 0x7a, 0xd3, 0x51, + 0x62, 0x3f, 0x3f, 0x9a, 0x68, 0xb5, 0xa3, 0x3f, 0x33, 0x18, 0x4d, 0x76, 0xf5, 0x2f, 0xdb, 0x5f, + 0x3d, 0xa8, 0xe9, 0x05, 0x14, 0x2c, 0xd7, 0x37, 0x79, 0xc9, 0x85, 0xcc, 0x16, 0xb9, 0x31, 0xea, + 0xd3, 0x3b, 0x88, 0x1f, 0x43, 0xa8, 0x04, 0x9b, 0xf1, 0x93, 0x2c, 0x35, 0x7e, 0x3d, 0x5a, 0x33, + 0x78, 0x98, 0xe2, 0x06, 0xb8, 0x59, 0x6a, 0xf2, 0xe2, 0x51, 0x37, 0x4b, 0xf1, 0x53, 0x88, 0x0a, + 0x26, 0x78, 0xae, 0xb4, 0xd6, 0x86, 0x25, 0xb4, 0x85, 0x61, 0x8a, 0x5f, 0xc0, 0x03, 0xa9, 0x98, + 0x50, 0x27, 0x65, 0x9e, 0x7c, 0x23, 0x69, 0x98, 0x72, 0xb2, 0x0a, 0xd5, 0x73, 0x58, 0xe7, 0x79, + 0xfa, 0x9b, 0x2c, 0x30, 0xb2, 0x35, 0x9e, 0xa7, 0xa5, 0x68, 0x03, 0x7c, 0x2e, 0xc4, 0x42, 0x98, + 0xa8, 0x84, 0xd4, 0x02, 0xed, 0x42, 0x72, 0x71, 0x99, 0xcd, 0x38, 0x09, 0x6d, 0x1e, 0x97, 0x10, + 0x77, 0x74, 0x52, 0xf5, 0xc5, 0x48, 0x02, 0x26, 0x42, 0x8d, 0x3f, 0xef, 0x9f, 0xde, 0xd1, 0x78, + 0x6b, 0x99, 0xb4, 0xba, 0x91, 0x3d, 0x5a, 0xc9, 0x0a, 0x96, 0xff, 0x95, 0xb3, 0x4d, 0x88, 0xb2, + 0x3c, 0xcd, 0x66, 0x4c, 0x2d, 0x04, 0x59, 0x33, 0x93, 0x94, 0x85, 0xd5, 0x2b, 0x5b, 0x2f, 0x5f, + 0xd9, 0x7f, 0x27, 0xeb, 0x63, 0x35, 0x8c, 0x10, 0xec, 0x92, 0xef, 0x37, 0x4d, 0xe7, 0xfa, 0xa6, + 0xe9, 0xfc, 0xbc, 0x69, 0x3a, 0x5f, 0x6e, 0x9b, 0x95, 0xeb, 0xdb, 0x66, 0xe5, 0xc7, 0x6d, 0xb3, + 0xf2, 0x29, 0x30, 0xff, 0x0d, 0x6f, 0x7f, 0x05, 0x00, 0x00, 0xff, 0xff, 0x2b, 0x10, 0x26, 0x15, + 0x2f, 0x04, 0x00, 0x00, } func (m *SSFSample) Marshal() (dAtA []byte, err error) { @@ -514,6 +561,11 @@ func (m *SSFSample) MarshalTo(dAtA []byte) (int, error) { i = encodeVarintSample(dAtA, i, uint64(len(m.Unit))) i += copy(dAtA[i:], m.Unit) } + if m.Scope != 0 { + dAtA[i] = 0x50 + i++ + i = encodeVarintSample(dAtA, i, uint64(m.Scope)) + } return i, nil } @@ -676,6 +728,9 @@ func (m *SSFSample) Size() (n int) { if l > 0 { n += 1 + l + sovSample(uint64(l)) } + if m.Scope != 0 { + n += 1 + sovSample(uint64(m.Scope)) + } return n } @@ -1078,6 +1133,25 @@ func (m *SSFSample) Unmarshal(dAtA []byte) error { } m.Unit = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 10: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Scope", wireType) + } + m.Scope = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSample + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Scope |= SSFSample_Scope(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipSample(dAtA[iNdEx:]) diff --git a/ssf/sample.proto b/ssf/sample.proto index 6b60fc66b..7d5f46aa7 100644 --- a/ssf/sample.proto +++ b/ssf/sample.proto @@ -21,6 +21,12 @@ message SSFSample { UNKNOWN = 3; } + enum Scope { + DEFAULT = 0; + LOCAL = 1; + GLOBAL = 2; + } + // The underlying type of the metric Metric metric = 1; @@ -34,6 +40,15 @@ message SSFSample { float sample_rate = 7; map tags = 8; string unit = 9; + + // scope indicates to an SSF endpoint what it should do with a metric: + // + // - DEFAULT (or absent) - aggregate counters and gauges locally, + // handle histograms and sets globally. + // - LOCAL - aggregate all metrics locally. + // - GLOBAL - aggregate all metrics globally. + // + Scope scope = 10; } // SSFSpan is the primary unit of reporting in SSF. It embeds a set of diff --git a/ssf/samples.go b/ssf/samples.go index 1c5d4eef2..ab41829e1 100644 --- a/ssf/samples.go +++ b/ssf/samples.go @@ -60,6 +60,25 @@ func Timestamp(ts time.Time) SampleOption { } } +// SampleScope is a slightly more ergonomic representation of the +// internal type SSFSample_Scope. +type SampleScope SSFSample_Scope + +const ( + DefaultScope SampleScope = SampleScope(SSFSample_DEFAULT) + Local SampleScope = SampleScope(SSFSample_LOCAL) + Global SampleScope = SampleScope(SSFSample_GLOBAL) +) + +// Scope is a SampleOption that sets the scope of a metric to be +// either "global" (i.e., aggregated on a central node), or "local" +// (i.e., aggregated exclusively on the central node). +func Scope(scope SampleScope) SampleOption { + return func(s *SSFSample) { + s.Scope = SSFSample_Scope(scope) + } +} + // SampleRate sets the rate at which a measurement is sampled. The // rate is a number on the interval (0..1] (1 means that the value is // not sampled). Any numbers outside this interval result in no change diff --git a/stripe-build.yaml b/stripe-build.yaml new file mode 100644 index 000000000..b8c5f15a4 --- /dev/null +++ b/stripe-build.yaml @@ -0,0 +1,7 @@ +# See go/stripe-build for more information about customizing this file +--- +henson_tarball: + docker: {} +network: + restrict_network: true + http_proxy_report_only: true diff --git a/tdigest/histo_test.go b/tdigest/histo_test.go index f4a6c6363..85d42d16e 100644 --- a/tdigest/histo_test.go +++ b/tdigest/histo_test.go @@ -126,7 +126,7 @@ func encodedGob(t *testing.T) ([]byte, error) { } func deserializeGob(t *testing.T, fname string) []byte { - f, err := os.Open("oldgob.base64") + f, err := os.Open("testdata/oldgob.base64") require.NoError(t, err) buf, err := ioutil.ReadAll(base64.NewDecoder(base64.StdEncoding, f)) require.NoError(t, err) diff --git a/tdigest/oldgob.base64 b/tdigest/testdata/oldgob.base64 similarity index 100% rename from tdigest/oldgob.base64 rename to tdigest/testdata/oldgob.base64 diff --git a/fixtures/aws/PutObject/2016/10/14/1476481302.tsv.gz b/testdata/aws/PutObject/2016/10/14/1476481302.tsv.gz similarity index 100% rename from fixtures/aws/PutObject/2016/10/14/1476481302.tsv.gz rename to testdata/aws/PutObject/2016/10/14/1476481302.tsv.gz diff --git a/fixtures/cacert.pem b/testdata/cacert.pem similarity index 100% rename from fixtures/cacert.pem rename to testdata/cacert.pem diff --git a/fixtures/clientcert_correct.pem b/testdata/clientcert_correct.pem similarity index 100% rename from fixtures/clientcert_correct.pem rename to testdata/clientcert_correct.pem diff --git a/fixtures/clientcert_wrong.pem b/testdata/clientcert_wrong.pem similarity index 100% rename from fixtures/clientcert_wrong.pem rename to testdata/clientcert_wrong.pem diff --git a/fixtures/clientkey.pem b/testdata/clientkey.pem similarity index 100% rename from fixtures/clientkey.pem rename to testdata/clientkey.pem diff --git a/fixtures/consul/health_service_one.json b/testdata/consul/health_service_one.json similarity index 100% rename from fixtures/consul/health_service_one.json rename to testdata/consul/health_service_one.json diff --git a/fixtures/consul/health_service_two.json b/testdata/consul/health_service_two.json similarity index 100% rename from fixtures/consul/health_service_two.json rename to testdata/consul/health_service_two.json diff --git a/fixtures/consul/health_service_zero.json b/testdata/consul/health_service_zero.json similarity index 100% rename from fixtures/consul/health_service_zero.json rename to testdata/consul/health_service_zero.json diff --git a/fixtures/datadog_trace.json b/testdata/datadog_trace.json similarity index 100% rename from fixtures/datadog_trace.json rename to testdata/datadog_trace.json diff --git a/fixtures/import.deflate b/testdata/import.deflate similarity index 100% rename from fixtures/import.deflate rename to testdata/import.deflate diff --git a/fixtures/import.uncompressed b/testdata/import.uncompressed similarity index 100% rename from fixtures/import.uncompressed rename to testdata/import.uncompressed diff --git a/fixtures/protobuf/span-with-operation-062017.pb b/testdata/protobuf/span-with-operation-062017.pb similarity index 100% rename from fixtures/protobuf/span-with-operation-062017.pb rename to testdata/protobuf/span-with-operation-062017.pb diff --git a/fixtures/protobuf/trace.pb b/testdata/protobuf/trace.pb similarity index 100% rename from fixtures/protobuf/trace.pb rename to testdata/protobuf/trace.pb diff --git a/fixtures/protobuf/trace_critical.pb b/testdata/protobuf/trace_critical.pb similarity index 100% rename from fixtures/protobuf/trace_critical.pb rename to testdata/protobuf/trace_critical.pb diff --git a/fixtures/servercert.pem b/testdata/servercert.pem similarity index 100% rename from fixtures/servercert.pem rename to testdata/servercert.pem diff --git a/fixtures/serverkey.pem b/testdata/serverkey.pem similarity index 100% rename from fixtures/serverkey.pem rename to testdata/serverkey.pem diff --git a/fixtures/tracing_agent/spans/trace.pb.json b/testdata/tracing_agent/spans/trace.pb.json similarity index 100% rename from fixtures/tracing_agent/spans/trace.pb.json rename to testdata/tracing_agent/spans/trace.pb.json diff --git a/fixtures/tracing_agent/spans/trace_critical.pb.json b/testdata/tracing_agent/spans/trace_critical.pb.json similarity index 100% rename from fixtures/tracing_agent/spans/trace_critical.pb.json rename to testdata/tracing_agent/spans/trace_critical.pb.json diff --git a/trace/client.go b/trace/client.go index 5908294c2..e1254737a 100644 --- a/trace/client.go +++ b/trace/client.go @@ -57,14 +57,15 @@ type Client struct { flushBackends []flushNotifier // Parameters adjusted by client initialization: - backendParams *backendParams - nBackends uint - cap uint - cancel context.CancelFunc - flush func(context.Context) - report func(context.Context) - records chan *recordOp - spans chan<- *ssf.SSFSpan + backendParams *backendParams + nBackends uint + cap uint + cancel context.CancelFunc + flush func(context.Context) + report func(context.Context) + records chan *recordOp + spans chan<- *ssf.SSFSpan + sampleNormalizer func(*ssf.SSFSample) // statistics: failedFlushes int64 @@ -285,6 +286,20 @@ func ParallelBackends(nBackends uint) ClientParam { } } +// NormalizeSamples takes a function that gets run on every SSFSample +// reported as part of a span. This allows conditionally adjusting +// tags or scopes on metrics that might exceed cardinality limits. +// +// Note that the normalizer gets run on Samples every time the +// trace.Report function is called. This happen more than once, +// depending on the error handling behavior of the reporting program. +func NormalizeSamples(normalizer func(*ssf.SSFSample)) ClientParam { + return func(cl *Client) error { + cl.sampleNormalizer = normalizer + return nil + } +} + func newFlushNofifier(backend ClientBackend) flushNotifier { fb := flushNotifier{backend: backend} if _, ok := backend.(FlushableClientBackend); ok { @@ -471,6 +486,13 @@ func Record(cl *Client, span *ssf.SSFSpan, done chan<- error) error { return ErrNoClient } + // fixup any samples: + if cl.sampleNormalizer != nil { + for _, sample := range span.Metrics { + cl.sampleNormalizer(sample) + } + } + op := &recordOp{span: span, result: done} select { case cl.spans <- span: diff --git a/trace/client_test.go b/trace/client_test.go index 83f609f1d..ad52758bf 100644 --- a/trace/client_test.go +++ b/trace/client_test.go @@ -479,3 +479,29 @@ func TestDropStatistics(t *testing.T) { close(done) close(blockNext) } + +func TestNormalize(t *testing.T) { + received := make(chan *ssf.SSFSpan, 1) + + normalizer := func(sample *ssf.SSFSample) { + sample.Scope = ssf.SSFSample_Scope(ssf.Global) + sample.Tags["woo_yay"] = "blort" + } + cl, err := trace.NewBackendClient(testbackend.NewBackend(received), + trace.Capacity(5), + trace.NormalizeSamples(normalizer)) + require.NoError(t, err) + + span := trace.StartTrace("hi there") + span.Add(ssf.Gauge("whee.gauge", 20, map[string]string{})) + span.Add(ssf.Count("whee.counter", 20, map[string]string{}, ssf.Scope(ssf.Local))) + go mustRecord(t, cl, span) + + out := <-received + for _, sample := range out.Metrics { + assert.Equal(t, ssf.SSFSample_Scope(ssf.Global), sample.Scope, + "sample: %v", sample) + assert.Equal(t, "blort", sample.Tags["woo_yay"], + "sample: %v", sample) + } +} diff --git a/worker.go b/worker.go index 9f6d75ca9..354c8ed7c 100644 --- a/worker.go +++ b/worker.go @@ -7,11 +7,11 @@ import ( "sync/atomic" "time" - "github.com/DataDog/datadog-go/statsd" "github.com/sirupsen/logrus" "github.com/stripe/veneur/protocol" "github.com/stripe/veneur/samplers" "github.com/stripe/veneur/samplers/metricpb" + "github.com/stripe/veneur/scopedstatsd" "github.com/stripe/veneur/sinks" "github.com/stripe/veneur/ssf" "github.com/stripe/veneur/trace" @@ -38,7 +38,7 @@ type Worker struct { traceClient *trace.Client logger *logrus.Logger wm WorkerMetrics - stats *statsd.Client + stats scopedstatsd.Client } // IngestUDP on a Worker feeds the metric into the worker's PacketChan. @@ -233,7 +233,7 @@ func (wm WorkerMetrics) appendExportedMetric(res []*metricpb.Metric, exp metricE } // NewWorker creates, and returns a new Worker object. -func NewWorker(id int, cl *trace.Client, logger *logrus.Logger, stats *statsd.Client) *Worker { +func NewWorker(id int, cl *trace.Client, logger *logrus.Logger, stats scopedstatsd.Client) *Worker { return &Worker{ id: id, PacketChan: make(chan samplers.UDPMetric, 32), @@ -246,7 +246,7 @@ func NewWorker(id int, cl *trace.Client, logger *logrus.Logger, stats *statsd.Cl traceClient: cl, logger: logger, wm: NewWorkerMetrics(), - stats: stats, + stats: scopedstatsd.Ensure(stats), } } @@ -471,16 +471,16 @@ type EventWorker struct { mutex *sync.Mutex samples []ssf.SSFSample traceClient *trace.Client - stats *statsd.Client + stats scopedstatsd.Client } // NewEventWorker creates an EventWorker ready to collect events and service checks. -func NewEventWorker(cl *trace.Client, stats *statsd.Client) *EventWorker { +func NewEventWorker(cl *trace.Client, stats scopedstatsd.Client) *EventWorker { return &EventWorker{ sampleChan: make(chan ssf.SSFSample), mutex: &sync.Mutex{}, traceClient: cl, - stats: stats, + stats: scopedstatsd.Ensure(stats), } } @@ -523,13 +523,13 @@ type SpanWorker struct { // cumulative time spent per sink, in nanoseconds cumulativeTimes []int64 traceClient *trace.Client - statsd *statsd.Client + statsd scopedstatsd.Client capCount int64 emptySSFCount int64 } // NewSpanWorker creates a SpanWorker ready to collect events and service checks. -func NewSpanWorker(sinks []sinks.SpanSink, cl *trace.Client, statsd *statsd.Client, spanChan <-chan *ssf.SSFSpan, commonTags map[string]string) *SpanWorker { +func NewSpanWorker(sinks []sinks.SpanSink, cl *trace.Client, statsd scopedstatsd.Client, spanChan <-chan *ssf.SSFSpan, commonTags map[string]string) *SpanWorker { tags := make([]map[string]string, len(sinks)) for i, sink := range sinks { tags[i] = map[string]string{ @@ -544,7 +544,7 @@ func NewSpanWorker(sinks []sinks.SpanSink, cl *trace.Client, statsd *statsd.Clie commonTags: commonTags, cumulativeTimes: make([]int64, len(sinks)), traceClient: cl, - statsd: statsd, + statsd: scopedstatsd.Ensure(statsd), } }