diff --git a/flusher.go b/flusher.go
index ea8631ae2..33c6f730f 100644
--- a/flusher.go
+++ b/flusher.go
@@ -44,6 +44,20 @@ func (s *Server) Flush(ctx context.Context) {
 
 	if s.CountUniqueTimeseries {
 		s.Statsd.Count("flush.unique_timeseries_total", s.tallyTimeseries(), []string{fmt.Sprintf("global_veneur:%t", !s.IsLocal())}, 1.0)
+		s.Statsd.Count("flush.unique_timeseries_daily_cumulative", int64(s.cumulativeTimeseries.Estimate()), []string{"veneurglobalonly:true"}, 1.0)
+
+		t := time.Now()
+
+		// Ensure that the cumulative daily count is reset around midnight.
+		// It will reset at most 6 times between 00:00:00 and 00:01:00, which is
+		// an acceptable level of precision for this estimate.
+		// (Skipping at most five flush cycles' worth of data in the cumulative count
+		// will be unnoticeable.)
+		// The alternative would require synchronization such as a time.Ticker
+		// and a lock around the HLL, which is overkill and fiddly.
+		if t.Minute() == 0 && t.Hour() == 0 {
+			s.cumulativeTimeseries = hyperloglog.New()
+		}
 	}
 
 	samples := s.EventWorker.Flush()
@@ -140,6 +154,7 @@ func (s *Server) tallyTimeseries() int64 {
 		w.uniqueMTS = hyperloglog.New()
 		w.uniqueMTSMtx.Unlock()
 	}
+	s.cumulativeTimeseries.Merge(allTimeseries)
 	return int64(allTimeseries.Estimate())
 }
 
diff --git a/server.go b/server.go
index 0cbc69bbd..0651e5c94 100644
--- a/server.go
+++ b/server.go
@@ -58,6 +58,8 @@ import (
 	"github.com/stripe/veneur/v14/ssf"
 	"github.com/stripe/veneur/v14/trace"
 	"github.com/stripe/veneur/v14/trace/metrics"
+
+	"github.com/axiomhq/hyperloglog"
 )
 
 // VERSION stores the current veneur version.
@@ -148,6 +150,8 @@ type Server struct {
 
 	stuckIntervals int
 	lastFlushUnix  int64
+
+	cumulativeTimeseries *hyperloglog.Sketch
 }
 
 type GlobalListeningPerProtocolMetrics struct {
@@ -314,6 +318,8 @@ func NewFromConfig(logger *logrus.Logger, conf Config) (*Server, error) {
 	}
 	ret.HistogramAggregates.Count = len(conf.Aggregates)
 
+	ret.cumulativeTimeseries = hyperloglog.New()
+
 	var err error
 	ret.interval, err = conf.ParseInterval()
 	if err != nil {
diff --git a/sinks/signalfx/signalfx.go b/sinks/signalfx/signalfx.go
index 2b7f91c5b..3f1413f1b 100644
--- a/sinks/signalfx/signalfx.go
+++ b/sinks/signalfx/signalfx.go
@@ -435,28 +435,25 @@ METRICLOOP: // Convenience label so that inner nested loops and `continue` easil
 			}
 		}
 
+		// metric-specified API key, if present, should override the common dimension
 		metricKey := ""
+		metricVaryByOverride := false
 
-		// Metric-specified API key, if present, should override the common dimension
-		metricOverrodeVaryBy := false
 		if sfx.varyBy != "" {
-			if _, ok := dims[sfx.varyBy]; ok {
-				metricOverrodeVaryBy = true
+			if val, ok := dims[sfx.varyBy]; ok {
+				metricKey = val
+				metricVaryByOverride = true
 			}
 		}
 
-		// Copy common dimensions, except for sfx.varyBy
+		// Copy common dimensions
 		for k, v := range sfx.commonDimensions {
-			if metricOverrodeVaryBy && k == sfx.varyBy {
-				continue
-			}
 			dims[k] = v
 		}
 
-		if sfx.varyBy != "" {
-			if val, ok := dims[sfx.varyBy]; ok {
-				metricKey = val
-			}
+		// re-copy metric-specified API key, if present
+		if metricVaryByOverride {
+			dims[sfx.varyBy] = metricKey
 		}
 
 		for k := range sfx.excludedTags {
@@ -469,6 +466,7 @@ METRICLOOP: // Convenience label so that inner nested loops and `continue` easil
 		case samplers.GaugeMetric:
 			point = sfxclient.GaugeF(metric.Name, dims, metric.Value)
 		case samplers.CounterMetric:
+			// TODO I am not certain if this should be a Counter or a Cumulative
 			point = sfxclient.Counter(metric.Name, dims, int64(metric.Value))
 		case samplers.StatusMetric:
 			countStatusMetrics++
diff --git a/sinks/signalfx/signalfx_test.go b/sinks/signalfx/signalfx_test.go
index adcb5118f..597b00730 100644
--- a/sinks/signalfx/signalfx_test.go
+++ b/sinks/signalfx/signalfx_test.go
@@ -779,43 +779,3 @@ LOOP:
 
 	assert.Subset(t, actualPerTagClients, expectedPerTagClients, "The expected values should be a subset of the actual values")
 }
-
-func TestSignalFxVaryByOverride(t *testing.T) {
-	derived := newDerivedProcessor()
-
-	varyByTagKey := "vary_by"
-	commonDimensions := map[string]string{"vary_by": "bar"}
-	defaultFakeSink := NewFakeSink()
-	customFakeSinkFoo := NewFakeSink()
-	customFakeSinkBar := NewFakeSink()
-	perTagClients := make(map[string]DPClient)
-	perTagClients["foo"] = customFakeSinkFoo
-	perTagClients["bar"] = customFakeSinkBar
-
-	sink, err := NewSignalFxSink("host", "glooblestoots", commonDimensions, logrus.New(), defaultFakeSink, varyByTagKey, perTagClients, nil, nil, derived, 0, "", false, time.Second, "", "", nil)
-
-	assert.NoError(t, err)
-
-	interMetrics := []samplers.InterMetric{samplers.InterMetric{
-		Name:      "a.b.c",
-		Timestamp: 1476119058,
-		Value:     float64(100),
-		Tags: []string{
-			"vary_by:foo",
-		},
-		Type: samplers.GaugeMetric,
-	},
-		samplers.InterMetric{
-			Name:      "a.b.d",
-			Timestamp: 1476119059,
-			Value:     float64(100),
-			Tags:      []string{},
-			Type:      samplers.GaugeMetric,
-		}}
-
-	sink.Flush(context.TODO(), interMetrics)
-
-	assert.Equal(t, 0, len(defaultFakeSink.points))
-	assert.Equal(t, 1, len(customFakeSinkFoo.points))
-	assert.Equal(t, 1, len(customFakeSinkBar.points))
-}