Skip to content

Commit

Permalink
Add a metric for counting metrics received.
Browse files Browse the repository at this point in the history
  • Loading branch information
arnavdugar-stripe committed Aug 29, 2022
1 parent b86033b commit 0a2ac07
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 9 deletions.
1 change: 1 addition & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Config struct {
Features struct {
DiagnosticsMetricsEnabled bool `yaml:"diagnostics_metrics_enabled"`
EnableMetricSinkRouting bool `yaml:"enable_metric_sink_routing"`
EnableSourceMetricCount bool `yaml:"enable_source_metric_count"`
MigrateMetricSinks bool `yaml:"migrate_metric_sinks"`
MigrateSpanSinks bool `yaml:"migrate_span_sinks"`
ProxyProtocol string `yaml:"proxy_protocol"`
Expand Down
42 changes: 33 additions & 9 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,14 +333,23 @@ func scopesFromConfig(conf Config) (scopedstatsd.MetricScopes, error) {
}

type ingest struct {
server *Server
tags []string
sourceName string
server *Server
tags []string
}

var _ sources.Ingest = &ingest{}

func (ingest *ingest) IngestMetric(metric *samplers.UDPMetric) {
metric.Tags = append(metric.Tags, ingest.tags...)

if ingest.server.Config.Features.EnableSourceMetricCount {
ingest.server.Statsd.Count(
"source.metrics_count", int64(1), []string{
"protocol:udp",
"source:" + ingest.sourceName,
}, 1.0)
}
ingest.server.ingestMetric(metric)
}

Expand All @@ -356,8 +365,15 @@ func (ingest *ingest) IngestMetricProto(metric *metricpb.Metric) {
for _, tag := range metric.Tags {
h = fnv1a.AddString32(h, tag)
}

workerIndex := h % uint32(len(ingest.server.Workers))

if ingest.server.Config.Features.EnableSourceMetricCount {
ingest.server.Statsd.Count(
"source.metrics_count", int64(1), []string{
"protocol:proto",
"source:" + ingest.sourceName,
}, 1.0)
}
ingest.server.Workers[workerIndex].ImportMetricChan <- metric
}

Expand Down Expand Up @@ -514,20 +530,27 @@ func NewFromConfig(config ServerConfig) (*Server, error) {
}
ret.HistogramAggregates.Count = len(conf.Aggregates)

stats, err := statsd.New(conf.StatsAddress, statsd.WithoutTelemetry(), statsd.WithMaxMessagesPerPayload(4096))
statsClient, err := statsd.New(
conf.StatsAddress,
statsd.WithAggregationInterval(conf.Interval),
statsd.WithChannelMode(),
statsd.WithChannelModeBufferSize(64),
statsd.WithClientSideAggregation(),
statsd.WithMaxMessagesPerPayload(4096),
statsd.WithoutTelemetry())
if err != nil {
return ret, err
}
stats.Namespace = "veneur."
statsClient.Namespace = "veneur."

scopes, err := scopesFromConfig(conf)
if err != nil {
return ret, err
}
ret.Statsd = scopedstatsd.NewClient(stats, conf.VeneurMetricsAdditionalTags, scopes)
ret.Statsd = scopedstatsd.NewClient(statsClient, conf.VeneurMetricsAdditionalTags, scopes)

ret.TraceClient, err = trace.NewChannelClient(ret.SpanChan,
trace.ReportStatistics(stats, 1*time.Second, []string{"ssf_format:internal"}),
trace.ReportStatistics(statsClient, 1*time.Second, []string{"ssf_format:internal"}),
normalizeSpans(conf),
)
if err != nil {
Expand Down Expand Up @@ -1374,8 +1397,9 @@ func (s *Server) Serve() {
for _, source := range s.sources {
go func(source internalSource) {
source.source.Start(&ingest{
server: s,
tags: source.tags,
sourceName: source.source.Name(),
server: s,
tags: source.tags,
})
done <- struct{}{}
}(source)
Expand Down

0 comments on commit 0a2ac07

Please sign in to comment.