Skip to content

Commit

Permalink
Merge branch 'master' into prudhvi.upgrade_datadog
Browse files Browse the repository at this point in the history
  • Loading branch information
prudhvi authored May 27, 2019
2 parents c40e1f8 + b15940a commit 78c8a7c
Show file tree
Hide file tree
Showing 53 changed files with 806 additions and 196 deletions.
51 changes: 0 additions & 51 deletions .travis.yml

This file was deleted.

2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
* A flush "watchdog" controlled by the config setting `flush_watchdog_missed_flushes`. If veneur has not started this many flushes, the watchdog panics and terminates veneur (so it can be restarted by process supervision). Thanks, [antifuchs](https://github.com/antifuchs)!
* Splunk sink: Trace-related IDs are now represented in hexadecimal for cross-tool compatibility and a small byte savings. Thanks, [myndzi](https://github.com/myndzi)
* Splunk sink: Indicator spans are now tagged with `"partial":true` when they would otherwise have been sampled, distinguishing between partial and full traces. Thanks, [myndzi](https://github.com/myndzi)
* New configuration options `veneur_metrics_scopes` and `veneur_metrics_additional_tags`, which allow configuring veneur such that it aggregates its own metrics globally (rather than reporting a set of internal metrics per instance/container/etc). Thanks, [antifuchs](https://github.com/antifuchs)!
* New SSF `sample` field: `scope`. This field lets clients tell Veneur what to do with the sample - it corresponds exactly to the `veneurglobalonly` and `veneurlocalonly` tags that metrics can hold. Thanks, [antifuchs](https://github.com/antifuchs)!

## Updated

Expand Down
14 changes: 11 additions & 3 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,15 @@ type Config struct {
TraceLightstepNumClients int `yaml:"trace_lightstep_num_clients"`
TraceLightstepReconnectPeriod string `yaml:"trace_lightstep_reconnect_period"`
TraceMaxLengthBytes int `yaml:"trace_max_length_bytes"`
XrayAddress string `yaml:"xray_address"`
XrayAnnotationTags []string `yaml:"xray_annotation_tags"`
XraySamplePercentage int `yaml:"xray_sample_percentage"`
VeneurMetricsAdditionalTags []string `yaml:"veneur_metrics_additional_tags"`
VeneurMetricsScopes struct {
Counter string `yaml:"counter"`
Gauge string `yaml:"gauge"`
Histogram string `yaml:"histogram"`
Set string `yaml:"set"`
Status string `yaml:"status"`
} `yaml:"veneur_metrics_scopes"`
XrayAddress string `yaml:"xray_address"`
XrayAnnotationTags []string `yaml:"xray_annotation_tags"`
XraySamplePercentage int `yaml:"xray_sample_percentage"`
}
10 changes: 5 additions & 5 deletions consul_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type ConsulOneRoundTripper struct {
func (rt *ConsulOneRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
rec := httptest.NewRecorder()
if strings.HasPrefix(req.URL.Path, "/v1/health/service/") {
resp, _ := ioutil.ReadFile("fixtures/consul/health_service_one.json")
resp, _ := ioutil.ReadFile("testdata/consul/health_service_one.json")
rec.Write(resp)
rec.Code = http.StatusOK
rt.HealthGotCalled = true
Expand All @@ -38,12 +38,12 @@ func (rt *ConsulChangingRoundTripper) RoundTrip(req *http.Request) (*http.Respon
var resp []byte
if rt.Count == 2 {
// On the second invocation, return zero hosts!
resp, _ = ioutil.ReadFile("fixtures/consul/health_service_zero.json")
resp, _ = ioutil.ReadFile("testdata/consul/health_service_zero.json")
} else if rt.Count == 1 {
// On the second invocation, return two hosts!
resp, _ = ioutil.ReadFile("fixtures/consul/health_service_two.json")
resp, _ = ioutil.ReadFile("testdata/consul/health_service_two.json")
} else {
resp, _ = ioutil.ReadFile("fixtures/consul/health_service_one.json")
resp, _ = ioutil.ReadFile("testdata/consul/health_service_one.json")
}
rec.Write(resp)
rec.Code = http.StatusOK
Expand All @@ -52,7 +52,7 @@ func (rt *ConsulChangingRoundTripper) RoundTrip(req *http.Request) (*http.Respon
} else if req.URL.Path == "/v1/health/service/traceServiceName" {
// These don't count. Since we make different calls, we'll return some junk
// for tracing and leave forwarding to it's own thing.
resp, _ := ioutil.ReadFile("fixtures/consul/health_service_one.json")
resp, _ := ioutil.ReadFile("testdata/consul/health_service_one.json")
rec.Write(resp)
rec.Code = http.StatusOK
}
Expand Down
6 changes: 6 additions & 0 deletions docs/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,9 @@ concerning the run time of this process to veneur.

`veneur-emit` has a bunch more options, check out the usage for it
with `go run ./cmd/veneur-emit/main.go -help`!

## Adding test data files

When your tests depend on data files, put them into a `testdata`
directory next to the test file, and use relative paths to refer to
them.
35 changes: 35 additions & 0 deletions example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,41 @@ aggregates:
- "max"
- "count"

# Metrics that Veneur reports about its own operation. Each of the
# entries here can have the value "global", "local", "default" and ""
# ("default" and "" mean the same thing). Setting
# this to any value other than the default will make all metrics
# of that type have the following behavior:
#
# - "default"/"": scope remains unchanged
# - "global": scope for "default"-scoped metrics of that type will be
# changed to global, so they get forwarded to a global veneur node.
# - "local": scope for "default"-scoped metrics of that type will be
# changed to local, so they get reported from the local veneur node
# only.
#
# When this is unset in configuration, the default values for all
# metric types are "", indicating that veneur will use the default
# scope for each of the metrics it reports.
veneur_metrics_scopes:
counter: local

# changing the setting for "gauge" to "global" is not recommended,
# as the global aggregation method for gauges is "last write wins".
gauge: local

histogram: global

set: global

status: local


# Tags supplied here will be attached to all metrics that veneur
# reports about its own operation.
veneur_metrics_additional_tags:
- "veneur_internal_metric:true"

# == DEPRECATED ==

# This configuration has been replaced by datadog_flush_max_per_body.
Expand Down
36 changes: 27 additions & 9 deletions forward_grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,10 @@ func TestE2EForwardingGRPCMetrics(t *testing.T) {
}
done := make(chan struct{})
go func() {
metrics := <-ch
defer close(done)

expectedNames := []string{
expected := map[string]bool{}
for _, name := range []string{
testGRPCMetric("histogram.50percentile"),
testGRPCMetric("histogram.75percentile"),
testGRPCMetric("histogram.99percentile"),
Expand All @@ -216,17 +217,34 @@ func TestE2EForwardingGRPCMetrics(t *testing.T) {
testGRPCMetric("counter"),
testGRPCMetric("gauge"),
testGRPCMetric("set"),
} {
expected[name] = false
}

actualNames := make([]string, len(metrics))
for i, metric := range metrics {
actualNames[i] = metric.Name
metrics:
for {
metrics := <-ch

for _, metric := range metrics {
_, ok := expected[metric.Name]
if !ok {
t.Errorf("unexpected metric %q", metric.Name)
continue
}
expected[metric.Name] = true
}
for name, got := range expected {
if !got {
// we have more metrics to read:
t.Logf("metric %q still missing", name)
continue metrics
}
}
// if there had been metrics to read, we'd
// have restarted the loop:
return
}

assert.ElementsMatch(t, expectedNames, actualNames,
"The global Veneur didn't flush the right metrics: expectedNames=%v actualNames=%v",
expectedNames, actualNames)
close(done)
}()
ff.local.Flush(context.TODO())
ff.global.Flush(context.TODO())
Expand Down
12 changes: 6 additions & 6 deletions http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,20 +127,20 @@ func testServerImport(t *testing.T, filename string, contentEncoding string) {
func TestServerImportCompressed(t *testing.T) {
// Test that the global veneur instance can handle
// requests that provide compressed metrics
testServerImport(t, filepath.Join("fixtures", "import.deflate"), "deflate")
testServerImport(t, filepath.Join("testdata", "import.deflate"), "deflate")
}

func TestServerImportUncompressed(t *testing.T) {
// Test that the global veneur instance can handle
// requests that provide uncompressed metrics
testServerImport(t, filepath.Join("fixtures", "import.uncompressed"), "")
testServerImport(t, filepath.Join("testdata", "import.uncompressed"), "")
}

func TestServerImportGzip(t *testing.T) {
// Test that the global veneur instance
// returns a 400 for gzipped-input

f, err := os.Open(filepath.Join("fixtures", "import.uncompressed"))
f, err := os.Open(filepath.Join("testdata", "import.uncompressed"))
assert.NoError(t, err, "Error reading response fixture")
defer f.Close()

Expand Down Expand Up @@ -171,7 +171,7 @@ func TestServerImportCompressedInvalid(t *testing.T) {

//TODO(aditya) test that the metrics are properly reported

f, err := os.Open(filepath.Join("fixtures", "import.uncompressed"))
f, err := os.Open(filepath.Join("testdata", "import.uncompressed"))
assert.NoError(t, err, "Error reading response fixture")
defer f.Close()

Expand All @@ -196,7 +196,7 @@ func TestServerImportUncompressedInvalid(t *testing.T) {

//TODO(aditya) test that the metrics are properly reported

f, err := os.Open(filepath.Join("fixtures", "import.deflate"))
f, err := os.Open(filepath.Join("testdata", "import.deflate"))
assert.NoError(t, err, "Error reading response fixture")
defer f.Close()

Expand Down Expand Up @@ -380,7 +380,7 @@ func testServerImportHelper(t *testing.T, data interface{}) {

func BenchmarkNewSortableJSONMetrics(b *testing.B) {
const numWorkers = 100
filename := filepath.Join("fixtures", "import.deflate")
filename := filepath.Join("testdata", "import.deflate")
contentEncoding := "deflate"

f, err := os.Open(filename)
Expand Down
2 changes: 1 addition & 1 deletion plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestGlobalServerS3PluginFlush(t *testing.T) {

client := &s3Mock.MockS3Client{}
client.SetPutObject(func(input *s3.PutObjectInput) (*s3.PutObjectOutput, error) {
f, err := os.Open(path.Join("fixtures", "aws", "PutObject", "2016", "10", "14", "1476481302.tsv.gz"))
f, err := os.Open(path.Join("testdata", "aws", "PutObject", "2016", "10", "14", "1476481302.tsv.gz"))
assert.NoError(t, err)
defer f.Close()

Expand Down
4 changes: 2 additions & 2 deletions plugins/s3/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestS3Post(t *testing.T) {
}()

client := &s3Mock.MockS3Client{}
f, err := os.Open(path.Join("..", "..", "fixtures", "aws", "PutObject", "2016", "10", "13", "1476370612.tsv.gz"))
f, err := os.Open(path.Join("testdata", "aws", "PutObject", "2016", "10", "13", "1476370612.tsv.gz"))
assert.NoError(t, err)
defer f.Close()

Expand Down Expand Up @@ -124,7 +124,7 @@ func TestS3Path(t *testing.T) {
func TestS3PostNoCredentials(t *testing.T) {
s3p := &S3Plugin{Logger: log, Svc: nil}

f, err := os.Open(path.Join("..", "..", "fixtures", "aws", "PutObject", "2016", "10", "07", "1475863542.json"))
f, err := os.Open(path.Join("testdata", "aws", "PutObject", "2016", "10", "07", "1475863542.json"))
assert.NoError(t, err)
defer f.Close()

Expand Down
4 changes: 2 additions & 2 deletions proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ func (rt *ConsulTwoMetricRoundTripper) RoundTrip(req *http.Request) (*http.Respo

rec := httptest.NewRecorder()
if req.URL.Path == "/v1/health/service/forwardServiceName" {
resp, _ := ioutil.ReadFile("fixtures/consul/health_service_two.json")
resp, _ := ioutil.ReadFile("testdata/consul/health_service_two.json")
rec.Write(resp)
rec.Code = http.StatusOK
} else if req.URL.Path == "/v1/health/service/traceServiceName" {
resp, _ := ioutil.ReadFile("fixtures/consul/health_service_two.json")
resp, _ := ioutil.ReadFile("testdata/consul/health_service_two.json")
rec.Write(resp)
rec.Code = http.StatusOK
} else if req.URL.Path == "/api/v1/series" {
Expand Down
2 changes: 1 addition & 1 deletion regression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestNoTagName(t *testing.T) {
}

func TestOperation(t *testing.T) {
pbFile := filepath.Join("fixtures", "protobuf", "span-with-operation-062017.pb")
pbFile := filepath.Join("testdata", "protobuf", "span-with-operation-062017.pb")
pb, err := os.Open(pbFile)
assert.NoError(t, err)
defer pb.Close()
Expand Down
6 changes: 6 additions & 0 deletions samplers/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,12 @@ func ParseMetricSSF(metric *ssf.SSFSample) (UDPMetric, error) {
default:
ret.Value = float64(metric.Value)
}
switch metric.Scope {
case ssf.SSFSample_LOCAL:
ret.Scope = LocalOnly
case ssf.SSFSample_GLOBAL:
ret.Scope = GlobalOnly
}
ret.SampleRate = metric.SampleRate
tempTags := make([]string, 0, len(metric.Tags))
for key, value := range metric.Tags {
Expand Down
Loading

0 comments on commit 78c8a7c

Please sign in to comment.