Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrite metrics ingestion path and integrate with just the central veneur. #578

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
*.o
*.a
*.so
.idea/

# Folders
_obj
Expand Down
6 changes: 4 additions & 2 deletions forward_grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"testing"
"time"

"github.com/stripe/veneur/sinks/channel"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stripe/veneur/samplers"
Expand Down Expand Up @@ -181,7 +183,7 @@ func forwardGRPCTestMetrics() []*samplers.UDPMetric {
// after passing through a proxy.
func TestE2EForwardingGRPCMetrics(t *testing.T) {
ch := make(chan []samplers.InterMetric)
sink, _ := NewChannelMetricSink(ch)
sink, _ := channel.NewChannelMetricSink(ch)

ff := newForwardGRPCFixture(t, localConfig(), sink)
defer ff.stop()
Expand Down Expand Up @@ -224,7 +226,7 @@ func TestE2EForwardingGRPCMetrics(t *testing.T) {
}

assert.ElementsMatch(t, expectedNames, actualNames,
"The global Veneur didn't flush the right metrics")
"The global Veneur didn't flush the right metrics.\nEXPECTED: %v\nACTUAL: %v", expectedNames, actualNames)
close(done)
}()
ff.local.Flush(context.TODO())
Expand Down
6 changes: 4 additions & 2 deletions forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"testing"
"time"

"github.com/stripe/veneur/sinks/channel"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -100,7 +102,7 @@ func (ff *forwardFixture) IngestMetric(m *samplers.UDPMetric) {
func TestE2EForwardingIndicatorMetrics(t *testing.T) {
t.Parallel()
ch := make(chan []samplers.InterMetric)
sink, _ := NewChannelMetricSink(ch)
sink, _ := channel.NewChannelMetricSink(ch)
cfg := localConfig()
cfg.IndicatorSpanTimerName = "indicator.span.timer"
ffx := newForwardingFixture(t, cfg, nil, sink)
Expand Down Expand Up @@ -143,7 +145,7 @@ func TestE2EForwardingIndicatorMetrics(t *testing.T) {
func TestE2EForwardMetric(t *testing.T) {
t.Parallel()
ch := make(chan []samplers.InterMetric)
sink, _ := NewChannelMetricSink(ch)
sink, _ := channel.NewChannelMetricSink(ch)
cfg := localConfig()
cfg.IndicatorSpanTimerName = "indicator.span.timer"
ffx := newForwardingFixture(t, cfg, nil, sink)
Expand Down
106 changes: 58 additions & 48 deletions importsrv/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
package importsrv

import (
"errors"
"fmt"
"net"
"time"

"github.com/stripe/veneur/metricingester"

"github.com/golang/protobuf/ptypes/empty"
"github.com/segmentio/fasthash/fnv1a"
"golang.org/x/net/context" // This can be replace with "context" after Go 1.8 support is dropped
"google.golang.org/grpc"

Expand All @@ -25,9 +27,10 @@ const (
responseDurationMetric = "import.response_duration_ns"
)

// MetricIngester reads metrics from protobufs
type MetricIngester interface {
IngestMetrics([]*metricpb.Metric)
// metricsIngester is the contract expected of objects to which metrics will be submitted.
type metricsIngester interface {
Ingest(metricingester.Metric) error
Merge(metricingester.Digest) error
}

// Server wraps a gRPC server and implements the forwardrpc.Forward service.
Expand All @@ -36,8 +39,8 @@ type MetricIngester interface {
// should always be routed to the same MetricIngester.
type Server struct {
*grpc.Server
metricOuts []MetricIngester
opts *options
ingester metricsIngester
opts *options
}

type options struct {
Expand All @@ -50,11 +53,11 @@ type Option func(*options)

// New creates an unstarted Server with the input MetricIngester's to send
// output to.
func New(metricOuts []MetricIngester, opts ...Option) *Server {
func New(ingester metricsIngester, opts ...Option) *Server {
res := &Server{
Server: grpc.NewServer(),
metricOuts: metricOuts,
opts: &options{},
Server: grpc.NewServer(),
ingester: ingester,
opts: &options{},
}

for _, opt := range opts {
Expand Down Expand Up @@ -85,41 +88,63 @@ func (s *Server) Serve(addr string) error {

// Static maps of tags used in the SendMetrics handler
var (
grpcTags = map[string]string{"protocol": "grpc"}
responseGroupTags = map[string]string{
"protocol": "grpc",
"part": "group",
}
grpcTags = map[string]string{"protocol": "grpc"}
responseSendTags = map[string]string{
"protocol": "grpc",
"part": "send",
}
)

// SendMetrics takes a list of metrics and hashes each one (based on the
// metric key) to a specific metric ingester.
// SendMetrics accepts a batch of metrics for importing.
func (s *Server) SendMetrics(ctx context.Context, mlist *forwardrpc.MetricList) (*empty.Empty, error) {
span, _ := trace.StartSpanFromContext(ctx, "veneur.opentracing.importsrv.handle_send_metrics")
span.SetTag("protocol", "grpc")
defer span.ClientFinish(s.opts.traceClient)

dests := make([][]*metricpb.Metric, len(s.metricOuts))

// group metrics by their destination
groupStart := time.Now()
for _, m := range mlist.Metrics {
workerIdx := s.hashMetric(m) % uint32(len(dests))
dests[workerIdx] = append(dests[workerIdx], m)
}
span.Add(ssf.Timing(responseDurationMetric, time.Since(groupStart), time.Nanosecond, responseGroupTags))

// send each set of metrics to its destination. Since this is typically
// implemented with channels, batching the metrics together avoids
// repeated channel send operations
sendStart := time.Now()
for i, ms := range dests {
if len(ms) > 0 {
s.metricOuts[i].IngestMetrics(ms)
for _, m := range mlist.Metrics {
hostname := m.GetHostname()
tags := m.GetTags()
switch v := m.GetValue().(type) {
case *metricpb.Metric_Gauge:
s.ingester.Ingest(metricingester.NewGauge(m.Name, v.Gauge.GetValue(), tags, 1.0, hostname))
case *metricpb.Metric_Counter:
s.ingester.Ingest(metricingester.NewCounter(m.Name, v.Counter.GetValue(), tags, 1.0, hostname))
case *metricpb.Metric_Set:
s.ingester.Merge(metricingester.NewSetDigest(m.Name, v.Set, tags, hostname))
case *metricpb.Metric_Histogram:
// Scope is a legacy concept used to designate whether a metric needed to be emitted locally
// or aggregated globally.
//
// The presence of a hostname now encodes the same concept.
//
// However, histograms have a special "MixedScope" that emits percentiles globally and
// min/max/count values locally.
//
// We need a special metric type to represent this: the MixedHistogram.
//
// We also need to distinguish between the behavior before all metrics were forwarded
// to the behavior after. The before case is represented by Scope_Mixed, which makes that host's
// "min/max/count" not get emitted by the central veneur, since the local veneur is emitting it
// still. Scoped_MixedGlobal makes the central veneur emit the min/max/count.
//
// TODO(clin): After we completely migrate to the new veneur, delete support for this latter distinction!
switch m.GetScope() {
case metricpb.Scope_Mixed:
s.ingester.Merge(
metricingester.NewMixedHistogramDigest(m.Name, v.Histogram, tags, hostname, false),
)
case metricpb.Scope_MixedGlobal:
s.ingester.Merge(
metricingester.NewMixedHistogramDigest(m.Name, v.Histogram, tags, hostname, true),
)
case metricpb.Scope_Local, metricpb.Scope_Global:
s.ingester.Merge(
metricingester.NewHistogramDigest(m.Name, v.Histogram, tags, hostname),
)
}
case nil:
return nil, errors.New("can't import a metric with a nil value")
}
}

Expand All @@ -130,18 +155,3 @@ func (s *Server) SendMetrics(ctx context.Context, mlist *forwardrpc.MetricList)

return &empty.Empty{}, nil
}

// hashMetric returns a 32-bit hash from the input metric based on its name,
// type, and tags.
//
// The fnv1a package is used as opposed to fnv from the standard library, as
// it avoids allocations by not using the hash.Hash interface and by avoiding
// string to []byte conversions.
func (s *Server) hashMetric(m *metricpb.Metric) uint32 {
h := fnv1a.HashString32(m.Name)
h = fnv1a.AddString32(h, m.Type.String())
for _, tag := range m.Tags {
h = fnv1a.AddString32(h, tag)
}
return h
}
Loading