From 3239494bebb36c3c60edc331f6d2de803eee337a Mon Sep 17 00:00:00 2001 From: Arnav Dugar <87779081+arnavdugar-stripe@users.noreply.github.com> Date: Fri, 9 Sep 2022 13:12:55 -0700 Subject: [PATCH] Enforce restrictions on metric sizes. (#988) --- config.go | 35 ++-- flusher.go | 136 ++++++++++----- flusher_test.go | 424 ++++++++++++++++++++++++++++++++++++++++----- networking_test.go | 2 + server.go | 16 +- 5 files changed, 499 insertions(+), 114 deletions(-) diff --git a/config.go b/config.go index 8d46c7ae1..1008143a3 100644 --- a/config.go +++ b/config.go @@ -8,17 +8,13 @@ import ( ) type Config struct { - Aggregates []string `yaml:"aggregates"` - BlockProfileRate int `yaml:"block_profile_rate"` - CountUniqueTimeseries bool `yaml:"count_unique_timeseries"` - Debug bool `yaml:"debug"` - EnableProfiling bool `yaml:"enable_profiling"` - ExtendTags []string `yaml:"extend_tags"` - Features struct { - DiagnosticsMetricsEnabled bool `yaml:"diagnostics_metrics_enabled"` - EnableMetricSinkRouting bool `yaml:"enable_metric_sink_routing"` - ProxyProtocol string `yaml:"proxy_protocol"` - } `yaml:"features"` + Aggregates []string `yaml:"aggregates"` + BlockProfileRate int `yaml:"block_profile_rate"` + CountUniqueTimeseries bool `yaml:"count_unique_timeseries"` + Debug bool `yaml:"debug"` + EnableProfiling bool `yaml:"enable_profiling"` + ExtendTags []string `yaml:"extend_tags"` + Features Features `yaml:"features"` FlushOnShutdown bool `yaml:"flush_on_shutdown"` FlushWatchdogMissedFlushes int `yaml:"flush_watchdog_missed_flushes"` ForwardAddress string `yaml:"forward_address"` @@ -66,6 +62,12 @@ type Config struct { } `yaml:"veneur_metrics_scopes"` } +type Features struct { + DiagnosticsMetricsEnabled bool `yaml:"diagnostics_metrics_enabled"` + EnableMetricSinkRouting bool `yaml:"enable_metric_sink_routing"` + ProxyProtocol string `yaml:"proxy_protocol"` +} + type HttpConfig struct { // Enables /config/json and /config/yaml endpoints for displaying the current // configuration. Entries of type util.StringSecret will be redacted unless @@ -91,8 +93,11 @@ type SourceConfig struct { } type SinkConfig struct { - Kind string `yaml:"kind"` - Name string `yaml:"name"` - Config interface{} `yaml:"config"` - StripTags []matcher.TagMatcher `yaml:"strip_tags"` + Kind string `yaml:"kind"` + Name string `yaml:"name"` + Config interface{} `yaml:"config"` + MaxNameLength int `yaml:"max_name_length"` + MaxTagLength int `yaml:"max_tag_length"` + MaxTags int `yaml:"max_tags"` + StripTags []matcher.TagMatcher `yaml:"strip_tags"` } diff --git a/flusher.go b/flusher.go index 5f907d1b1..1aa8d631f 100644 --- a/flusher.go +++ b/flusher.go @@ -99,7 +99,7 @@ func (s *Server) Flush(ctx context.Context) { s.reportGlobalReceivedProtocolMetrics() } - // If there's nothing to flush, don't bother calling the plugins and stuff. + // Return early if there's nothing to flush. 
if len(finalMetrics) == 0 { return } @@ -125,59 +125,99 @@ func (s *Server) Flush(ctx context.Context) { for _, sink := range s.metricSinks { wg.Add(1) go func(sink internalMetricSink) { - filteredMetrics := finalMetrics - if s.Config.Features.EnableMetricSinkRouting { - sinkName := sink.sink.Name() - filteredMetrics = []samplers.InterMetric{} - for _, metric := range finalMetrics { - _, ok := metric.Sinks[sinkName] - if !ok { - continue - } - filteredTags := []string{} - if len(sink.stripTags) == 0 { - filteredTags = metric.Tags - } else { - tagLoop: - for _, tag := range metric.Tags { - for _, tagMatcher := range sink.stripTags { - if tagMatcher.Match(tag) { - continue tagLoop - } - } - filteredTags = append(filteredTags, tag) + s.flushSink(ctx, sink, finalMetrics) + wg.Done() + }(sink) + } +} + +func (s *Server) flushSink( + ctx context.Context, sink internalMetricSink, metrics []samplers.InterMetric, +) { + flushStart := time.Now() + + skippedCount := int64(0) + maxNameLengthCount := int64(0) + maxTagsCount := int64(0) + maxTagLengthCount := int64(0) + flushedCount := int64(0) + + filteredMetrics := metrics + if s.Config.Features.EnableMetricSinkRouting { + sinkName := sink.sink.Name() + filteredMetrics = []samplers.InterMetric{} + metricLoop: + for _, metric := range metrics { + _, ok := metric.Sinks[sinkName] + if !ok { + skippedCount += 1 + continue metricLoop + } + if sink.maxNameLength != 0 && len(metric.Name) > sink.maxNameLength { + maxNameLengthCount += 1 + continue metricLoop + } + filteredTags := []string{} + if len(sink.stripTags) == 0 && sink.maxTagLength == 0 { + filteredTags = metric.Tags + } else { + tagLoop: + for _, tag := range metric.Tags { + for _, tagMatcher := range sink.stripTags { + if tagMatcher.Match(tag) { + continue tagLoop } } - metric.Tags = filteredTags - filteredMetrics = append(filteredMetrics, metric) + if sink.maxTagLength != 0 && len(tag) > sink.maxTagLength { + maxTagLengthCount += 1 + continue metricLoop + } + filteredTags = append(filteredTags, tag) } } - flushStart := time.Now() - flushResult, err := sink.sink.Flush(span.Attach(ctx), filteredMetrics) - flushCompleteMessageFields := logrus.Fields{ - "sink_name": sink.sink.Name(), - "sink_kind": sink.sink.Kind(), - "flushed": flushResult.MetricsFlushed, - "skipped": flushResult.MetricsSkipped, - "dropped": flushResult.MetricsDropped, - "duration_s": fmt.Sprintf("%.2f", time.Since(flushStart).Seconds()), + if sink.maxTags != 0 && len(filteredTags) > sink.maxTags { + maxTagsCount += 1 + continue metricLoop } - if err == nil { - s.logger.WithFields(flushCompleteMessageFields).WithField("success", true).Info(sinks.FlushCompleteMessage) - } else { - s.logger.WithFields(flushCompleteMessageFields).WithField("success", false).WithError(err).Warn(sinks.FlushCompleteMessage) - } - span.Add(ssf.Timing( - sinks.MetricKeyMetricFlushDuration, - time.Since(flushStart), - time.Millisecond, - map[string]string{ - "sink_name": sink.sink.Name(), - "sink_kind": sink.sink.Kind(), - })) - wg.Done() - }(sink) + metric.Tags = filteredTags + flushedCount += 1 + filteredMetrics = append(filteredMetrics, metric) + } + } + sinkNameTag := "sink_name:" + sink.sink.Name() + sinkKindTag := "sink_kind:" + sink.sink.Kind() + s.Statsd.Count("flushed_metrics", skippedCount, []string{ + sinkNameTag, sinkKindTag, "status:skipped", "veneurglobalonly:true", + }, 1) + s.Statsd.Count("flushed_metrics", maxNameLengthCount, []string{ + sinkNameTag, sinkKindTag, "status:max_name_length", "veneurglobalonly:true", + }, 1) + 
s.Statsd.Count("flushed_metrics", maxTagsCount, []string{ + sinkNameTag, sinkKindTag, "status:max_tags", "veneurglobalonly:true", + }, 1) + s.Statsd.Count("flushed_metrics", maxTagLengthCount, []string{ + sinkNameTag, sinkKindTag, "status:max_tag_length", "veneurglobalonly:true", + }, 1) + s.Statsd.Count("flushed_metrics", flushedCount, []string{ + sinkNameTag, sinkKindTag, "status:flushed", "veneurglobalonly:true", + }, 1) + flushResult, err := sink.sink.Flush(ctx, filteredMetrics) + flushCompleteMessageFields := logrus.Fields{ + "sink_name": sink.sink.Name(), + "sink_kind": sink.sink.Kind(), + "flushed": flushResult.MetricsFlushed, + "skipped": flushResult.MetricsSkipped, + "dropped": flushResult.MetricsDropped, + "duration_s": fmt.Sprintf("%.2f", time.Since(flushStart).Seconds()), + } + if err == nil { + s.logger.WithFields(flushCompleteMessageFields).WithField("success", true).Info(sinks.FlushCompleteMessage) + } else { + s.logger.WithFields(flushCompleteMessageFields).WithField("success", false).WithError(err).Warn(sinks.FlushCompleteMessage) } + s.Statsd.Timing( + sinks.MetricKeyMetricFlushDuration, time.Since(flushStart), + []string{sinkNameTag, sinkKindTag}, 1.0) } func (s *Server) tallyTimeseries() int64 { diff --git a/flusher_test.go b/flusher_test.go index 6d1f3d44d..f2b2795ce 100644 --- a/flusher_test.go +++ b/flusher_test.go @@ -3,11 +3,14 @@ package veneur import ( "context" "net/url" + "sync" "testing" "time" + "github.com/golang/mock/gomock" "github.com/sirupsen/logrus" "github.com/stripe/veneur/v14/samplers" + "github.com/stripe/veneur/v14/scopedstatsd" "github.com/stripe/veneur/v14/sinks" "github.com/stripe/veneur/v14/ssf" "github.com/stripe/veneur/v14/trace" @@ -340,39 +343,51 @@ func TestTallyTimeseries(t *testing.T) { assert.Equal(t, int64(2), summary) } -func TestStripTags(t *testing.T) { - config := localConfig() - config.Features.EnableMetricSinkRouting = true - config.MetricSinks = []SinkConfig{{ - Kind: "channel", - Name: "channel", - StripTags: []matcher.TagMatcher{ - matcher.CreateTagMatcher(&matcher.TagMatcherConfig{ - Kind: "prefix", - Value: "foo", - })}, - }} - config.MetricSinkRouting = []SinkRoutingConfig{{ - Name: "default", - Match: []matcher.Matcher{{ - Name: matcher.CreateNameMatcher(&matcher.NameMatcherConfig{ - Kind: "any", - }), - Tags: []matcher.TagMatcher{}, - }}, - Sinks: SinkRoutingSinks{ - Matched: []string{"channel"}, - }, - }} +func TestFlush(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() channel := make(chan []samplers.InterMetric) + mockStatsd := scopedstatsd.NewMockClient(ctrl) server, err := NewFromConfig(ServerConfig{ Logger: logrus.New(), - Config: config, + Config: Config{ + Debug: true, + Features: Features{ + EnableMetricSinkRouting: true, + }, + Hostname: "localhost", + Interval: DefaultFlushInterval, + MetricSinkRouting: []SinkRoutingConfig{{ + Name: "default", + Match: []matcher.Matcher{{ + Name: matcher.CreateNameMatcher(&matcher.NameMatcherConfig{ + Kind: "any", + }), + Tags: []matcher.TagMatcher{}, + }}, + Sinks: SinkRoutingSinks{ + Matched: []string{"channel"}, + }, + }}, + MetricSinks: []SinkConfig{{ + Kind: "channel", + Name: "channel", + MaxNameLength: 11, + MaxTagLength: 11, + MaxTags: 2, + StripTags: []matcher.TagMatcher{ + matcher.CreateTagMatcher(&matcher.TagMatcherConfig{ + Kind: "prefix", + Value: "foo", + })}, + }}, + StatsAddress: "localhost:8125", + }, MetricSinkTypes: MetricSinkTypes{ "channel": { Create: func( - server *Server, s2 string, logger *logrus.Entry, config Config, + server 
*Server, name string, logger *logrus.Entry, config Config, sinkConfig MetricSinkConfig, ) (sinks.MetricSink, error) { sink, err := NewChannelMetricSink(channel) @@ -381,32 +396,349 @@ func TestStripTags(t *testing.T) { } return sink, nil }, - ParseConfig: func(s string, i interface{}) (MetricSinkConfig, error) { + ParseConfig: func( + name string, config interface{}, + ) (MetricSinkConfig, error) { return nil, nil }, }, }, }) assert.NoError(t, err) - go server.Start() - defer server.Shutdown() + server.Statsd = mockStatsd - server.Workers[0].PacketChan <- samplers.UDPMetric{ - MetricKey: samplers.MetricKey{ - Name: "test.metric", - Type: "counter", - JoinedTags: "foo:value1,bar:value2", - }, - Digest: 0, - Scope: samplers.LocalOnly, - Tags: []string{"foo:value1", "bar:value2"}, - Value: 1.0, - SampleRate: 1.0, - } + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + server.Start() + wg.Done() + }() + defer func() { + server.Shutdown() + wg.Wait() + }() + + mockStatsd.EXPECT(). + Count( + gomock.Not("flushed_metrics"), gomock.All(), gomock.All(), gomock.All()). + AnyTimes() + mockStatsd.EXPECT(). + Gauge( + gomock.Not("flushed_metrics"), gomock.All(), gomock.All(), gomock.All()). + AnyTimes() + mockStatsd.EXPECT(). + Timing( + gomock.Not("flushed_metrics"), gomock.All(), gomock.All(), gomock.All()). + AnyTimes() + + t.Run("WithStripTags", func(t *testing.T) { + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:skipped", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_name_length", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_tags", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_tag_length", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(1), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:flushed", + "veneurglobalonly:true", + }, 1.0) + + server.Workers[0].PacketChan <- samplers.UDPMetric{ + MetricKey: samplers.MetricKey{ + Name: "test.metric", + Type: "counter", + JoinedTags: "foo:value1,bar:value2", + }, + Digest: 0, + Scope: samplers.LocalOnly, + Tags: []string{"foo:value1", "bar:value2"}, + Value: 1.0, + SampleRate: 1.0, + } + + result := <-channel + if assert.Len(t, result, 1) { + assert.Equal(t, "test.metric", result[0].Name) + if assert.Len(t, result[0].Tags, 1) { + assert.Equal(t, "bar:value2", result[0].Tags[0]) + } + } + }) + + t.Run("WithMaxNameLength", func(t *testing.T) { + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:skipped", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(1), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_name_length", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_tags", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_tag_length", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", 
int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:flushed", + "veneurglobalonly:true", + }, 1.0) + + server.Workers[0].PacketChan <- samplers.UDPMetric{ + MetricKey: samplers.MetricKey{ + Name: "test.longmetric", + Type: "counter", + JoinedTags: "key1:value1,key2:value2", + }, + Digest: 0, + Scope: samplers.LocalOnly, + Tags: []string{"key1:value1", "key2:value2"}, + Value: 1.0, + SampleRate: 1.0, + } - result := <-channel - assert.Len(t, result, 1) - assert.Equal(t, "test.metric", result[0].Name) - assert.Len(t, result[0].Tags, 1) - assert.Equal(t, "bar:value2", result[0].Tags[0]) + result := <-channel + assert.Len(t, result, 0) + }) + + t.Run("WithMaxTags", func(t *testing.T) { + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:skipped", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_name_length", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(1), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_tags", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_tag_length", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:flushed", + "veneurglobalonly:true", + }, 1.0) + + server.Workers[0].PacketChan <- samplers.UDPMetric{ + MetricKey: samplers.MetricKey{ + Name: "test.metric", + Type: "counter", + JoinedTags: "key1:value1,key2:value2,key3:value3", + }, + Digest: 0, + Scope: samplers.LocalOnly, + Tags: []string{"key1:value1", "key2:value2", "key3:value3"}, + Value: 1.0, + SampleRate: 1.0, + } + + result := <-channel + assert.Len(t, result, 0) + }) + + t.Run("WithMaxTagsAndDroppedTag", func(t *testing.T) { + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:skipped", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_name_length", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_tags", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_tag_length", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(1), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:flushed", + "veneurglobalonly:true", + }, 1.0) + + server.Workers[0].PacketChan <- samplers.UDPMetric{ + MetricKey: samplers.MetricKey{ + Name: "test.metric", + Type: "counter", + JoinedTags: "key1:value1,key2:value2,key3:value3", + }, + Digest: 0, + Scope: samplers.LocalOnly, + Tags: []string{"foo:value1", "key2:value2", "key3:value3"}, + Value: 1.0, + SampleRate: 1.0, + } + + result := <-channel + if assert.Len(t, result, 1) { + assert.Equal(t, "test.metric", result[0].Name) + if assert.Len(t, result[0].Tags, 2) { + assert.Equal(t, "key2:value2", result[0].Tags[0]) + assert.Equal(t, "key3:value3", result[0].Tags[1]) + } + } + }) + + t.Run("WithMaxTagLength", func(t *testing.T) { + 
mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:skipped", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_name_length", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_tags", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(1), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_tag_length", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:flushed", + "veneurglobalonly:true", + }, 1.0) + + server.Workers[0].PacketChan <- samplers.UDPMetric{ + MetricKey: samplers.MetricKey{ + Name: "test.metric", + Type: "counter", + JoinedTags: "key1:value1,key2:value2,key3:value3", + }, + Digest: 0, + Scope: samplers.LocalOnly, + Tags: []string{"key1:long1", "key2:longvalue2", "key3:value3"}, + Value: 1.0, + SampleRate: 1.0, + } + + result := <-channel + assert.Len(t, result, 0) + }) + + t.Run("WithMaxTagLengthAndDroppedTag", func(t *testing.T) { + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:skipped", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_name_length", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_tags", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(0), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:max_tag_length", + "veneurglobalonly:true", + }, 1.0) + mockStatsd.EXPECT().Count("flushed_metrics", int64(1), []string{ + "sink_name:channel", + "sink_kind:channel", + "status:flushed", + "veneurglobalonly:true", + }, 1.0) + + server.Workers[0].PacketChan <- samplers.UDPMetric{ + MetricKey: samplers.MetricKey{ + Name: "test.metric", + Type: "counter", + JoinedTags: "key1:value1,key2:value2,key3:value3", + }, + Digest: 0, + Scope: samplers.LocalOnly, + Tags: []string{"foo:longvalue1", "key2:value2", "key3:value3"}, + Value: 1.0, + SampleRate: 1.0, + } + + result := <-channel + if assert.Len(t, result, 1) { + assert.Equal(t, "test.metric", result[0].Name) + if assert.Len(t, result[0].Tags, 2) { + assert.Equal(t, "key2:value2", result[0].Tags[0]) + assert.Equal(t, "key3:value3", result[0].Tags[1]) + } + } + }) } diff --git a/networking_test.go b/networking_test.go index 2ae114813..40e2b32bc 100644 --- a/networking_test.go +++ b/networking_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + "github.com/DataDog/datadog-go/statsd" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -58,6 +59,7 @@ func TestConnectUNIX(t *testing.T) { srv := &Server{ shutdown: make(chan struct{}), logger: logrus.NewEntry(logrus.New()), + Statsd: &statsd.NoOpClient{}, } source := SsfMetricsSource{ logger: srv.logger, diff --git a/server.go b/server.go index 338ad292c..e21af9ea9 100644 --- a/server.go +++ b/server.go @@ -121,7 +121,7 @@ type Server struct { SpanWorkerGoroutines int CountUniqueTimeseries bool - Statsd 
*scopedstatsd.ScopedClient + Statsd scopedstatsd.Client Hostname string Tags []string @@ -190,8 +190,11 @@ type internalSource struct { } type internalMetricSink struct { - sink sinks.MetricSink - stripTags []matcher.TagMatcher + sink sinks.MetricSink + maxNameLength int + maxTagLength int + maxTags int + stripTags []matcher.TagMatcher } type GlobalListeningPerProtocolMetrics struct { @@ -445,8 +448,11 @@ func (server *Server) createMetricSinks( return nil, err } sinks = append(sinks, internalMetricSink{ - sink: sink, - stripTags: sinkConfig.StripTags, + sink: sink, + maxNameLength: sinkConfig.MaxNameLength, + maxTagLength: sinkConfig.MaxTagLength, + maxTags: sinkConfig.MaxTags, + stripTags: sinkConfig.StripTags, }) } return sinks, nil
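
Usage note (not part of the patch): a minimal sketch of how the new per-sink limits could look in a veneur config file. The sink kind, name, and numeric limits below are hypothetical, and the example assumes the metric_sinks key and the kind/value form of matcher.TagMatcherConfig; a limit left at 0 (the default) disables that check.

    metric_sinks:
      - kind: datadog              # hypothetical sink kind
        name: primary              # hypothetical sink name
        max_name_length: 200       # drop metrics whose name is longer than 200 characters
        max_tags: 50               # drop metrics still carrying more than 50 tags after strip_tags
        max_tag_length: 200        # drop metrics with any single remaining tag longer than 200 characters
        strip_tags:
          - kind: prefix
            value: internal_       # hypothetical tag prefix to strip before the tag checks

Note that strip_tags is applied before the tag checks, so stripped tags count toward neither max_tags nor max_tag_length, and that the filtering block in flushSink (limits included) only runs when features.enable_metric_sink_routing is true.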
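
Observability note (also not part of the patch, but grounded in the flusher.go changes above): each flush now reports, per sink, a flushed_metrics counter tagged with sink_name, sink_kind, veneurglobalonly:true, and exactly one status tag:

    status:skipped          - the metric was not routed to this sink
    status:max_name_length  - the metric name exceeded the sink's max_name_length
    status:max_tags         - the metric still had more than max_tags tags after strip_tags
    status:max_tag_length   - a remaining tag exceeded the sink's max_tag_length
    status:flushed          - the metric passed every check and was handed to the sink

All five counts are emitted whenever a sink is flushed, even when a given count is zero. The per-sink flush duration is likewise reported through the statsd client as the sinks.MetricKeyMetricFlushDuration timing, tagged with sink_name and sink_kind, instead of being attached to the flush span as an SSF sample.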