diff --git a/config.go b/config.go index 3a0b278ec..5b772885b 100644 --- a/config.go +++ b/config.go @@ -64,6 +64,7 @@ type Config struct { type Features struct { DiagnosticsMetricsEnabled bool `yaml:"diagnostics_metrics_enabled"` EnableMetricSinkRouting bool `yaml:"enable_metric_sink_routing"` + EnableSourceMetricCount bool `yaml:"enable_source_metric_count"` } type HttpConfig struct { diff --git a/server.go b/server.go index a4d237422..44769fa74 100644 --- a/server.go +++ b/server.go @@ -326,14 +326,23 @@ func scopesFromConfig(conf Config) (scopedstatsd.MetricScopes, error) { } type ingest struct { - server *Server - tags []string + sourceName string + server *Server + tags []string } var _ sources.Ingest = &ingest{} func (ingest *ingest) IngestMetric(metric *samplers.UDPMetric) { metric.Tags = append(metric.Tags, ingest.tags...) + + if ingest.server.Config.Features.EnableSourceMetricCount { + ingest.server.Statsd.Count( + "source.metrics_count", int64(1), []string{ + "protocol:udp", + "source:" + ingest.sourceName, + }, 1.0) + } ingest.server.ingestMetric(metric) } @@ -349,8 +358,15 @@ func (ingest *ingest) IngestMetricProto(metric *metricpb.Metric) { for _, tag := range metric.Tags { h = fnv1a.AddString32(h, tag) } - workerIndex := h % uint32(len(ingest.server.Workers)) + + if ingest.server.Config.Features.EnableSourceMetricCount { + ingest.server.Statsd.Count( + "source.metrics_count", int64(1), []string{ + "protocol:proto", + "source:" + ingest.sourceName, + }, 1.0) + } ingest.server.Workers[workerIndex].ImportMetricChan <- metric } @@ -510,20 +526,27 @@ func NewFromConfig(config ServerConfig) (*Server, error) { } ret.HistogramAggregates.Count = len(conf.Aggregates) - stats, err := statsd.New(conf.StatsAddress, statsd.WithoutTelemetry(), statsd.WithMaxMessagesPerPayload(4096)) + statsClient, err := statsd.New( + conf.StatsAddress, + statsd.WithAggregationInterval(conf.Interval), + statsd.WithChannelMode(), + statsd.WithChannelModeBufferSize(64), + statsd.WithClientSideAggregation(), + statsd.WithMaxMessagesPerPayload(4096), + statsd.WithoutTelemetry()) if err != nil { return ret, err } - stats.Namespace = "veneur." + statsClient.Namespace = "veneur." scopes, err := scopesFromConfig(conf) if err != nil { return ret, err } - ret.Statsd = scopedstatsd.NewClient(stats, conf.VeneurMetricsAdditionalTags, scopes) + ret.Statsd = scopedstatsd.NewClient(statsClient, conf.VeneurMetricsAdditionalTags, scopes) ret.TraceClient, err = trace.NewChannelClient(ret.SpanChan, - trace.ReportStatistics(stats, 1*time.Second, []string{"ssf_format:internal"}), + trace.ReportStatistics(statsClient, 1*time.Second, []string{"ssf_format:internal"}), normalizeSpans(conf), ) if err != nil { @@ -1343,8 +1366,9 @@ func (s *Server) Serve() { for _, source := range s.sources { go func(source internalSource) { source.source.Start(&ingest{ - server: s, - tags: source.tags, + sourceName: source.source.Name(), + server: s, + tags: source.tags, }) done <- struct{}{} }(source) diff --git a/testdata/http_test_config.json b/testdata/http_test_config.json index bbb3a69b6..7e661876c 100644 --- a/testdata/http_test_config.json +++ b/testdata/http_test_config.json @@ -11,7 +11,8 @@ "ExtendTags": null, "Features": { "DiagnosticsMetricsEnabled": false, - "EnableMetricSinkRouting": false + "EnableMetricSinkRouting": false, + "EnableSourceMetricCount": false }, "FlushOnShutdown": false, "FlushWatchdogMissedFlushes": 0, diff --git a/testdata/http_test_config.yaml b/testdata/http_test_config.yaml index 88ee44c05..4d79d2e75 100644 --- a/testdata/http_test_config.yaml +++ b/testdata/http_test_config.yaml @@ -10,6 +10,7 @@ extend_tags: [] features: diagnostics_metrics_enabled: false enable_metric_sink_routing: false + enable_source_metric_count: false flush_on_shutdown: false flush_watchdog_missed_flushes: 0 forward_address: ""