Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add veneur.flush.unique_timeseries_daily_cumulative metric with hotpatch #831

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,20 @@ func (s *Server) Flush(ctx context.Context) {

if s.CountUniqueTimeseries {
s.Statsd.Count("flush.unique_timeseries_total", s.tallyTimeseries(), []string{fmt.Sprintf("global_veneur:%t", !s.IsLocal())}, 1.0)
s.Statsd.Count("flush.unique_timeseries_daily_cumulative", int64(s.cumulativeTimeseries.Estimate()), []string{"veneurglobalonly:true"}, 1.0)

t := time.Now()

// Ensure that the cumulative daily count is reset around midnight.
// It will reset at most 6 times between 00:00:00 and 00:01:00, which is
// an acceptable level of precision for this estimate.
// (Skipping at most five flush cycles' worth of data in the cumulative count
// will be unnoticeable)
// The alternative would require using synchronization such as a time.Ticker
// and a lock around the HLL, which is overkill and fiddly.
if t.Minute() == 0 && t.Hour() == 0 {
s.cumulativeTimeseries = hyperloglog.New()
}
}

samples := s.EventWorker.Flush()
Expand Down Expand Up @@ -140,6 +154,7 @@ func (s *Server) tallyTimeseries() int64 {
w.uniqueMTS = hyperloglog.New()
w.uniqueMTSMtx.Unlock()
}
s.cumulativeTimeseries.Merge(allTimeseries)
return int64(allTimeseries.Estimate())
}

Expand Down
6 changes: 6 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ import (
"github.com/stripe/veneur/v14/ssf"
"github.com/stripe/veneur/v14/trace"
"github.com/stripe/veneur/v14/trace/metrics"

"github.com/axiomhq/hyperloglog"
)

// VERSION stores the current veneur version.
Expand Down Expand Up @@ -148,6 +150,8 @@ type Server struct {

stuckIntervals int
lastFlushUnix int64

cumulativeTimeseries *hyperloglog.Sketch
}

type GlobalListeningPerProtocolMetrics struct {
Expand Down Expand Up @@ -314,6 +318,8 @@ func NewFromConfig(logger *logrus.Logger, conf Config) (*Server, error) {
}
ret.HistogramAggregates.Count = len(conf.Aggregates)

ret.cumulativeTimeseries = hyperloglog.New()

var err error
ret.interval, err = conf.ParseInterval()
if err != nil {
Expand Down
22 changes: 10 additions & 12 deletions sinks/signalfx/signalfx.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,28 +435,25 @@ METRICLOOP: // Convenience label so that inner nested loops and `continue` easil
}
}

// metric-specified API key, if present, should override the common dimension
metricKey := ""
metricVaryByOverride := false

// Metric-specified API key, if present, should override the common dimension
metricOverrodeVaryBy := false
if sfx.varyBy != "" {
if _, ok := dims[sfx.varyBy]; ok {
metricOverrodeVaryBy = true
if val, ok := dims[sfx.varyBy]; ok {
metricKey = val
metricVaryByOverride = true
}
}

// Copy common dimensions, except for sfx.varyBy
// Copy common dimensions
for k, v := range sfx.commonDimensions {
if metricOverrodeVaryBy && k == sfx.varyBy {
continue
}
dims[k] = v
}

if sfx.varyBy != "" {
if val, ok := dims[sfx.varyBy]; ok {
metricKey = val
}
// re-copy metric-specified API key, if present
if metricVaryByOverride {
dims[sfx.varyBy] = metricKey
}

for k := range sfx.excludedTags {
Expand All @@ -469,6 +466,7 @@ METRICLOOP: // Convenience label so that inner nested loops and `continue` easil
case samplers.GaugeMetric:
point = sfxclient.GaugeF(metric.Name, dims, metric.Value)
case samplers.CounterMetric:
// TODO I am not certain if this should be a Counter or a Cumulative
point = sfxclient.Counter(metric.Name, dims, int64(metric.Value))
case samplers.StatusMetric:
countStatusMetrics++
Expand Down
40 changes: 0 additions & 40 deletions sinks/signalfx/signalfx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -779,43 +779,3 @@ LOOP:
assert.Subset(t, actualPerTagClients, expectedPerTagClients, "The expected values should be a subset of the actual values")

}

func TestSignalFxVaryByOverride(t *testing.T) {
derived := newDerivedProcessor()

varyByTagKey := "vary_by"
commonDimensions := map[string]string{"vary_by": "bar"}
defaultFakeSink := NewFakeSink()
customFakeSinkFoo := NewFakeSink()
customFakeSinkBar := NewFakeSink()
perTagClients := make(map[string]DPClient)
perTagClients["foo"] = customFakeSinkFoo
perTagClients["bar"] = customFakeSinkBar

sink, err := NewSignalFxSink("host", "glooblestoots", commonDimensions, logrus.New(), defaultFakeSink, varyByTagKey, perTagClients, nil, nil, derived, 0, "", false, time.Second, "", "", nil)

assert.NoError(t, err)

interMetrics := []samplers.InterMetric{samplers.InterMetric{
Name: "a.b.c",
Timestamp: 1476119058,
Value: float64(100),
Tags: []string{
"vary_by:foo",
},
Type: samplers.GaugeMetric,
},
samplers.InterMetric{
Name: "a.b.d",
Timestamp: 1476119059,
Value: float64(100),
Tags: []string{},
Type: samplers.GaugeMetric,
}}

sink.Flush(context.TODO(), interMetrics)

assert.Equal(t, 0, len(defaultFakeSink.points))
assert.Equal(t, 1, len(customFakeSinkFoo.points))
assert.Equal(t, 1, len(customFakeSinkBar.points))
}