diff --git a/flusher.go b/flusher.go index 5ae92da37..83166dd92 100644 --- a/flusher.go +++ b/flusher.go @@ -11,6 +11,9 @@ import ( "sync/atomic" "time" + "github.com/stripe/veneur/internal/metricsClient" + "github.com/stripe/veneur/trace/metrics" + "github.com/sirupsen/logrus" "github.com/stripe/veneur/forwardrpc" vhttp "github.com/stripe/veneur/http" @@ -19,7 +22,6 @@ import ( "github.com/stripe/veneur/sinks" "github.com/stripe/veneur/ssf" "github.com/stripe/veneur/trace" - "github.com/stripe/veneur/trace/metrics" "google.golang.org/grpc/status" ) @@ -31,9 +33,12 @@ func (s *Server) Flush(ctx context.Context) { mem := &runtime.MemStats{} runtime.ReadMemStats(mem) + // Just an example of usage for comparison purposes. In practice, we'd inject this into server and not + // construct it in the hot path. Will undo before land. + mc := metricsClient.NewSSFDirectClient(s.TraceClient) s.Statsd.Gauge("worker.span_chan.total_elements", float64(len(s.SpanChan)), nil, 1.0) s.Statsd.Gauge("worker.span_chan.total_capacity", float64(cap(s.SpanChan)), nil, 1.0) - s.Statsd.Gauge("gc.number", float64(mem.NumGC), nil, 1.0) + mc.Gauge("gc.number", float32(mem.NumGC), nil) s.Statsd.Gauge("gc.pause_total_ns", float64(mem.PauseTotalNs), nil, 1.0) s.Statsd.Gauge("mem.heap_alloc_bytes", float64(mem.HeapAlloc), nil, 1.0) @@ -457,9 +462,10 @@ func (s *Server) forwardGRPC(ctx context.Context, wms []WorkerMetrics) { metrics = append(metrics, wm.ForwardableMetrics(s.TraceClient)...) } + // again, just a sample integration + mc := metricsClient.NewSSFAddingClient(span) + mc.Timing("forward.duration_ns", time.Since(exportStart), map[string]string{"part": "export"}) span.Add( - ssf.Timing("forward.duration_ns", time.Since(exportStart), - time.Nanosecond, map[string]string{"part": "export"}), ssf.Gauge("forward.metrics_total", float32(len(metrics)), nil), // Maintain compatibility with metrics used in HTTP-based forwarding ssf.Count("forward.post_metrics_total", float32(len(metrics)), nil), diff --git a/internal/metricsClient/client.go b/internal/metricsClient/client.go new file mode 100644 index 000000000..7fe3222fc --- /dev/null +++ b/internal/metricsClient/client.go @@ -0,0 +1,65 @@ +// package metricsClient exports a metrics client capable +package metricsClient + +import ( + "time" + + "github.com/stripe/veneur/trace" + "github.com/stripe/veneur/trace/metrics" + + "github.com/stripe/veneur/ssf" +) + +// NewSSFAddingClient returns a SSFAddingClient that attaches metrics to an object +// that supports the Sampler interface. +// +// This exists for compatability with the existing convention of adding metrics +// to a Trace or SSFSample object which is later submitted in batch. +// +// Unlike the normal metric client, SSFAddingClient assumes the client will submit +// the sampler. +func NewSSFAddingClient(sampler Sampler) SSFAddingClient { + return SSFAddingClient{sampler} +} + +type SSFAddingClient struct { + adder Sampler +} + +func (s SSFAddingClient) Count(name string, incr float32, tags map[string]string) { + s.adder.Add(ssf.Count(name, incr, tags)) +} + +func (s SSFAddingClient) Gauge(name string, value float32, tags map[string]string) { + s.adder.Add(ssf.Gauge(name, value, tags)) +} + +func (s SSFAddingClient) Timing(name string, duration time.Duration, tags map[string]string) { + s.adder.Add(ssf.Timing(name, duration, time.Nanosecond, tags)) +} + +type Sampler interface { + Add(...*ssf.SSFSample) +} + +// NewSSFDirectClient returns a SSFDirectClient that submits metrics directly using +// the trace client. +func NewSSFDirectClient(tc *trace.Client) SSFDirectClient { + return SSFDirectClient{tc} +} + +type SSFDirectClient struct { + tc *trace.Client +} + +func (s SSFDirectClient) Count(name string, incr float32, tags map[string]string) { + metrics.ReportOne(s.tc, ssf.Count(name, incr, tags)) +} + +func (s SSFDirectClient) Gauge(name string, value float32, tags map[string]string) { + metrics.ReportOne(s.tc, ssf.Gauge(name, value, tags)) +} + +func (s SSFDirectClient) Timing(name string, duration time.Duration, tags map[string]string) { + metrics.ReportOne(s.tc, ssf.Timing(name, duration, time.Nanosecond, tags)) +}