Skip to content

Commit

Permalink
Enforce restrictions on metric sizes. (#988)
Browse files Browse the repository at this point in the history
  • Loading branch information
arnavdugar-stripe authored Sep 9, 2022
1 parent bdc2568 commit 3239494
Show file tree
Hide file tree
Showing 5 changed files with 499 additions and 114 deletions.
35 changes: 20 additions & 15 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,13 @@ import (
)

type Config struct {
Aggregates []string `yaml:"aggregates"`
BlockProfileRate int `yaml:"block_profile_rate"`
CountUniqueTimeseries bool `yaml:"count_unique_timeseries"`
Debug bool `yaml:"debug"`
EnableProfiling bool `yaml:"enable_profiling"`
ExtendTags []string `yaml:"extend_tags"`
Features struct {
DiagnosticsMetricsEnabled bool `yaml:"diagnostics_metrics_enabled"`
EnableMetricSinkRouting bool `yaml:"enable_metric_sink_routing"`
ProxyProtocol string `yaml:"proxy_protocol"`
} `yaml:"features"`
Aggregates []string `yaml:"aggregates"`
BlockProfileRate int `yaml:"block_profile_rate"`
CountUniqueTimeseries bool `yaml:"count_unique_timeseries"`
Debug bool `yaml:"debug"`
EnableProfiling bool `yaml:"enable_profiling"`
ExtendTags []string `yaml:"extend_tags"`
Features Features `yaml:"features"`
FlushOnShutdown bool `yaml:"flush_on_shutdown"`
FlushWatchdogMissedFlushes int `yaml:"flush_watchdog_missed_flushes"`
ForwardAddress string `yaml:"forward_address"`
Expand Down Expand Up @@ -66,6 +62,12 @@ type Config struct {
} `yaml:"veneur_metrics_scopes"`
}

// Features holds the feature flags read from the "features" section of the
// Veneur YAML configuration. It was extracted from an anonymous struct inside
// Config so the flags can be referenced by type elsewhere in the codebase.
type Features struct {
// DiagnosticsMetricsEnabled presumably toggles emission of internal
// diagnostics metrics — not exercised in this view; TODO confirm at call sites.
DiagnosticsMetricsEnabled bool `yaml:"diagnostics_metrics_enabled"`
// EnableMetricSinkRouting gates per-sink metric routing: when true, the
// flusher forwards a metric only to sinks named in metric.Sinks and applies
// the sink's tag-stripping and size-limit filtering before flushing.
EnableMetricSinkRouting bool `yaml:"enable_metric_sink_routing"`
// ProxyProtocol names the proxy protocol to use; the set of valid values is
// not visible here — TODO confirm against the proxy configuration code.
ProxyProtocol string `yaml:"proxy_protocol"`
}

type HttpConfig struct {
// Enables /config/json and /config/yaml endpoints for displaying the current
// configuration. Entries of type util.StringSecret will be redacted unless
Expand All @@ -91,8 +93,11 @@ type SourceConfig struct {
}

type SinkConfig struct {
Kind string `yaml:"kind"`
Name string `yaml:"name"`
Config interface{} `yaml:"config"`
StripTags []matcher.TagMatcher `yaml:"strip_tags"`
Kind string `yaml:"kind"`
Name string `yaml:"name"`
Config interface{} `yaml:"config"`
MaxNameLength int `yaml:"max_name_length"`
MaxTagLength int `yaml:"max_tag_length"`
MaxTags int `yaml:"max_tags"`
StripTags []matcher.TagMatcher `yaml:"strip_tags"`
}
136 changes: 88 additions & 48 deletions flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (s *Server) Flush(ctx context.Context) {
s.reportGlobalReceivedProtocolMetrics()
}

// If there's nothing to flush, don't bother calling the plugins and stuff.
// Return early if there's nothing to flush.
if len(finalMetrics) == 0 {
return
}
Expand All @@ -125,59 +125,99 @@ func (s *Server) Flush(ctx context.Context) {
for _, sink := range s.metricSinks {
wg.Add(1)
go func(sink internalMetricSink) {
filteredMetrics := finalMetrics
if s.Config.Features.EnableMetricSinkRouting {
sinkName := sink.sink.Name()
filteredMetrics = []samplers.InterMetric{}
for _, metric := range finalMetrics {
_, ok := metric.Sinks[sinkName]
if !ok {
continue
}
filteredTags := []string{}
if len(sink.stripTags) == 0 {
filteredTags = metric.Tags
} else {
tagLoop:
for _, tag := range metric.Tags {
for _, tagMatcher := range sink.stripTags {
if tagMatcher.Match(tag) {
continue tagLoop
}
}
filteredTags = append(filteredTags, tag)
s.flushSink(ctx, sink, finalMetrics)
wg.Done()
}(sink)
}
}

func (s *Server) flushSink(
ctx context.Context, sink internalMetricSink, metrics []samplers.InterMetric,
) {
flushStart := time.Now()

skippedCount := int64(0)
maxNameLengthCount := int64(0)
maxTagsCount := int64(0)
maxTagLengthCount := int64(0)
flushedCount := int64(0)

filteredMetrics := metrics
if s.Config.Features.EnableMetricSinkRouting {
sinkName := sink.sink.Name()
filteredMetrics = []samplers.InterMetric{}
metricLoop:
for _, metric := range metrics {
_, ok := metric.Sinks[sinkName]
if !ok {
skippedCount += 1
continue metricLoop
}
if sink.maxNameLength != 0 && len(metric.Name) > sink.maxNameLength {
maxNameLengthCount += 1
continue metricLoop
}
filteredTags := []string{}
if len(sink.stripTags) == 0 && sink.maxTagLength == 0 {
filteredTags = metric.Tags
} else {
tagLoop:
for _, tag := range metric.Tags {
for _, tagMatcher := range sink.stripTags {
if tagMatcher.Match(tag) {
continue tagLoop
}
}
metric.Tags = filteredTags
filteredMetrics = append(filteredMetrics, metric)
if sink.maxTagLength != 0 && len(tag) > sink.maxTagLength {
maxTagLengthCount += 1
continue metricLoop
}
filteredTags = append(filteredTags, tag)
}
}
flushStart := time.Now()
flushResult, err := sink.sink.Flush(span.Attach(ctx), filteredMetrics)
flushCompleteMessageFields := logrus.Fields{
"sink_name": sink.sink.Name(),
"sink_kind": sink.sink.Kind(),
"flushed": flushResult.MetricsFlushed,
"skipped": flushResult.MetricsSkipped,
"dropped": flushResult.MetricsDropped,
"duration_s": fmt.Sprintf("%.2f", time.Since(flushStart).Seconds()),
if sink.maxTags != 0 && len(filteredTags) > sink.maxTags {
maxTagsCount += 1
continue metricLoop
}
if err == nil {
s.logger.WithFields(flushCompleteMessageFields).WithField("success", true).Info(sinks.FlushCompleteMessage)
} else {
s.logger.WithFields(flushCompleteMessageFields).WithField("success", false).WithError(err).Warn(sinks.FlushCompleteMessage)
}
span.Add(ssf.Timing(
sinks.MetricKeyMetricFlushDuration,
time.Since(flushStart),
time.Millisecond,
map[string]string{
"sink_name": sink.sink.Name(),
"sink_kind": sink.sink.Kind(),
}))
wg.Done()
}(sink)
metric.Tags = filteredTags
flushedCount += 1
filteredMetrics = append(filteredMetrics, metric)
}
}
sinkNameTag := "sink_name:" + sink.sink.Name()
sinkKindTag := "sink_kind:" + sink.sink.Kind()
s.Statsd.Count("flushed_metrics", skippedCount, []string{
sinkNameTag, sinkKindTag, "status:skipped", "veneurglobalonly:true",
}, 1)
s.Statsd.Count("flushed_metrics", maxNameLengthCount, []string{
sinkNameTag, sinkKindTag, "status:max_name_length", "veneurglobalonly:true",
}, 1)
s.Statsd.Count("flushed_metrics", maxTagsCount, []string{
sinkNameTag, sinkKindTag, "status:max_tags", "veneurglobalonly:true",
}, 1)
s.Statsd.Count("flushed_metrics", maxTagLengthCount, []string{
sinkNameTag, sinkKindTag, "status:max_tag_length", "veneurglobalonly:true",
}, 1)
s.Statsd.Count("flushed_metrics", flushedCount, []string{
sinkNameTag, sinkKindTag, "status:flushed", "veneurglobalonly:true",
}, 1)
flushResult, err := sink.sink.Flush(ctx, filteredMetrics)
flushCompleteMessageFields := logrus.Fields{
"sink_name": sink.sink.Name(),
"sink_kind": sink.sink.Kind(),
"flushed": flushResult.MetricsFlushed,
"skipped": flushResult.MetricsSkipped,
"dropped": flushResult.MetricsDropped,
"duration_s": fmt.Sprintf("%.2f", time.Since(flushStart).Seconds()),
}
if err == nil {
s.logger.WithFields(flushCompleteMessageFields).WithField("success", true).Info(sinks.FlushCompleteMessage)
} else {
s.logger.WithFields(flushCompleteMessageFields).WithField("success", false).WithError(err).Warn(sinks.FlushCompleteMessage)
}
s.Statsd.Timing(
sinks.MetricKeyMetricFlushDuration, time.Since(flushStart),
[]string{sinkNameTag, sinkKindTag}, 1.0)
}

func (s *Server) tallyTimeseries() int64 {
Expand Down
Loading

0 comments on commit 3239494

Please sign in to comment.