From 108d7f8d9b91f61f8bf66f57c63233c3cac3ee01 Mon Sep 17 00:00:00 2001 From: Arnav Dugar <87779081+arnavdugar-stripe@users.noreply.github.com> Date: Thu, 15 Sep 2022 08:35:04 -0700 Subject: [PATCH] Remove legacy forwarding logic. (#991) --- config.go | 6 +- example.yaml | 4 - flusher.go | 142 +++------------------------------ flusher_test.go | 3 - server.go | 43 ++-------- server_test.go | 75 +++++++++-------- testdata/http_test_config.json | 4 +- testdata/http_test_config.yaml | 2 - 8 files changed, 56 insertions(+), 223 deletions(-) diff --git a/config.go b/config.go index 1008143a3..3a0b278ec 100644 --- a/config.go +++ b/config.go @@ -18,7 +18,6 @@ type Config struct { FlushOnShutdown bool `yaml:"flush_on_shutdown"` FlushWatchdogMissedFlushes int `yaml:"flush_watchdog_missed_flushes"` ForwardAddress string `yaml:"forward_address"` - ForwardUseGrpc bool `yaml:"forward_use_grpc"` GrpcAddress string `yaml:"grpc_address"` GrpcListenAddresses []util.Url `yaml:"grpc_listen_addresses"` Hostname string `yaml:"hostname"` @@ -63,9 +62,8 @@ type Config struct { } type Features struct { - DiagnosticsMetricsEnabled bool `yaml:"diagnostics_metrics_enabled"` - EnableMetricSinkRouting bool `yaml:"enable_metric_sink_routing"` - ProxyProtocol string `yaml:"proxy_protocol"` + DiagnosticsMetricsEnabled bool `yaml:"diagnostics_metrics_enabled"` + EnableMetricSinkRouting bool `yaml:"enable_metric_sink_routing"` } type HttpConfig struct { diff --git a/example.yaml b/example.yaml index 4ca9d81e8..c433f3c64 100644 --- a/example.yaml +++ b/example.yaml @@ -52,10 +52,6 @@ tls_authority_certificate: "" #forward_address: "veneur.example.com" forward_address: "" -# Whether or not to forward to an upstream Veneur over gRPC. If this is false -# or unset, HTTP will be used. -forward_use_grpc: false - # How often to flush. When flushing to Datadog, changing this # value when you've already emitted metrics will break your time # series data. diff --git a/flusher.go b/flusher.go index 1aa8d631f..f0428f39c 100644 --- a/flusher.go +++ b/flusher.go @@ -3,7 +3,6 @@ package veneur import ( "context" "fmt" - "net/http" "reflect" "runtime" "strings" @@ -14,7 +13,6 @@ import ( "github.com/axiomhq/hyperloglog" "github.com/sirupsen/logrus" "github.com/stripe/veneur/v14/forwardrpc" - vhttp "github.com/stripe/veneur/v14/http" "github.com/stripe/veneur/v14/samplers" "github.com/stripe/veneur/v14/samplers/metricpb" "github.com/stripe/veneur/v14/sinks" @@ -82,18 +80,10 @@ func (s *Server) Flush(ctx context.Context) { if s.IsLocal() { wg.Add(1) - switch s.proxyProtocol { - case ProxyProtocolGrpcStream, ProxyProtocolGrpcSingle: - go func() { - s.forwardGRPC(span.Attach(ctx), tempMetrics, s.proxyProtocol) - wg.Done() - }() - case ProxyProtocolRest: - go func() { - s.flushForward(span.Attach(ctx), tempMetrics) - wg.Done() - }() - } + go func() { + s.forward(span.Attach(ctx), tempMetrics) + wg.Done() + }() } else { s.reportGlobalMetricsFlushCounts(ms) s.reportGlobalReceivedProtocolMetrics() @@ -448,105 +438,6 @@ func (s *Server) reportGlobalReceivedProtocolMetrics() { s.Statsd.Count(perProtocolTotalMetricName, ssfGrpcTotal, []string{"veneurglobalonly:true", "protocol:" + SSF_GRPC.String()}, 1.0) } -func (s *Server) flushForward(ctx context.Context, wms []WorkerMetrics) { - span, _ := trace.StartSpanFromContext(ctx, "") - defer span.ClientFinish(s.TraceClient) - jmLength := 0 - for _, wm := range wms { - jmLength += len(wm.globalCounters) - jmLength += len(wm.globalGauges) - jmLength += len(wm.histograms) - jmLength += len(wm.sets) - jmLength += len(wm.timers) - } - - jsonMetrics := make([]samplers.JSONMetric, 0, jmLength) - exportStart := time.Now() - for _, wm := range wms { - for _, count := range wm.globalCounters { - jm, err := count.Export() - if err != nil { - s.logger.WithFields(logrus.Fields{ - logrus.ErrorKey: err, - "type": "counter", - "name": count.Name, - }).Error("Could not export metric") - continue - } - jsonMetrics = append(jsonMetrics, jm) - } - for _, gauge := range wm.globalGauges { - jm, err := gauge.Export() - if err != nil { - s.logger.WithFields(logrus.Fields{ - logrus.ErrorKey: err, - "type": "gauge", - "name": gauge.Name, - }).Error("Could not export metric") - continue - } - jsonMetrics = append(jsonMetrics, jm) - } - for _, histo := range wm.histograms { - jm, err := histo.Export() - if err != nil { - s.logger.WithFields(logrus.Fields{ - logrus.ErrorKey: err, - "type": "histogram", - "name": histo.Name, - }).Error("Could not export metric") - continue - } - jsonMetrics = append(jsonMetrics, jm) - } - for _, set := range wm.sets { - jm, err := set.Export() - if err != nil { - s.logger.WithFields(logrus.Fields{ - logrus.ErrorKey: err, - "type": "set", - "name": set.Name, - }).Error("Could not export metric") - continue - } - jsonMetrics = append(jsonMetrics, jm) - } - for _, timer := range wm.timers { - jm, err := timer.Export() - if err != nil { - s.logger.WithFields(logrus.Fields{ - logrus.ErrorKey: err, - "type": "timer", - "name": timer.Name, - }).Error("Could not export metric") - continue - } - // the exporter doesn't know that these two are "different" - jm.Type = "timer" - jsonMetrics = append(jsonMetrics, jm) - } - } - s.Statsd.TimeInMilliseconds("forward.duration_ns", float64(time.Since(exportStart).Nanoseconds()), []string{"part:export"}, 1.0) - s.Statsd.Count("forward.post_metrics_total", int64(len(jsonMetrics)), nil, 1.0) - if len(jsonMetrics) == 0 { - s.logger.Debug("Nothing to forward, skipping.") - return - } - - // the error has already been logged (if there was one), so we only care - // about the success case - endpoint := fmt.Sprintf("%s/import", s.ForwardAddr) - if vhttp.PostHelper( - span.Attach(ctx), s.HTTPClient, s.TraceClient, http.MethodPost, endpoint, - jsonMetrics, "forward", true, nil, s.logger) == nil { - s.logger.WithFields(logrus.Fields{ - "metrics": len(jsonMetrics), - "endpoint": endpoint, - "forwardAddr": s.ForwardAddr, - }).Info("Completed forward to upstream Veneur") - } -} - func (s *Server) flushTraces(ctx context.Context) { s.ssfInternalMetrics.Range(func(keyI, valueI interface{}) bool { key, ok := keyI.(string) @@ -585,12 +476,11 @@ func (s *Server) flushTraces(ctx context.Context) { s.SpanWorker.Flush() } -// forwardGRPC forwards all input metrics to a downstream Veneur, over gRPC. -func (s *Server) forwardGRPC( - ctx context.Context, wms []WorkerMetrics, protocol ProxyProtocol, +// forward forwards all input metrics to a downstream Veneur, over gRPC. +func (s *Server) forward( + ctx context.Context, wms []WorkerMetrics, ) { span, _ := trace.StartSpanFromContext(ctx, "") - span.SetTag("protocol", "grpc") defer span.ClientFinish(s.TraceClient) exportStart := time.Now() @@ -617,19 +507,13 @@ func (s *Server) forwardGRPC( entry := s.logger.WithFields(logrus.Fields{ "metrics": len(metrics), "destination": s.ForwardAddr, - "protocol": "grpc", "grpcstate": s.grpcForwardConn.GetState().String(), }) c := forwardrpc.NewForwardClient(s.grpcForwardConn) grpcStart := time.Now() - var err error - if protocol == ProxyProtocolGrpcSingle { - err = ForwardGrpcSingle(ctx, c, metrics) - } else { - err = ForwardGrpcStream(ctx, c, metrics) - } + err := forwardGrpc(ctx, c, metrics) if err != nil { if ctx.Err() != nil { // We exceeded the deadline of the flush context. @@ -655,15 +539,7 @@ func (s *Server) forwardGRPC( ) } -func ForwardGrpcSingle( - ctx context.Context, client forwardrpc.ForwardClient, - metrics []*metricpb.Metric, -) error { - _, err := client.SendMetrics(ctx, &forwardrpc.MetricList{Metrics: metrics}) - return err -} - -func ForwardGrpcStream( +func forwardGrpc( ctx context.Context, client forwardrpc.ForwardClient, metrics []*metricpb.Metric, ) error { diff --git a/flusher_test.go b/flusher_test.go index f2b2795ce..5717ed833 100644 --- a/flusher_test.go +++ b/flusher_test.go @@ -111,7 +111,6 @@ func TestServerFlushGRPC(t *testing.T) { localCfg := localConfig() localCfg.ForwardAddress = testServer.Addr().String() - localCfg.ForwardUseGrpc = true local := setupVeneurServer(t, localCfg, nil, nil, nil, nil) defer local.Shutdown() @@ -171,7 +170,6 @@ func TestServerFlushGRPCTimeout(t *testing.T) { localCfg := localConfig() localCfg.Interval = time.Duration(20 * time.Microsecond) localCfg.ForwardAddress = testServer.Addr().String() - localCfg.ForwardUseGrpc = true local := setupVeneurServer(t, localCfg, nil, nil, nil, cl) defer local.Shutdown() @@ -197,7 +195,6 @@ func TestServerFlushGRPCBadAddress(t *testing.T) { localCfg := localConfig() localCfg.ForwardAddress = "bad-address:123" - localCfg.ForwardUseGrpc = true local := setupVeneurServer(t, localCfg, nil, sink, nil, nil) defer local.Shutdown() diff --git a/server.go b/server.go index e21af9ea9..a4d237422 100644 --- a/server.go +++ b/server.go @@ -101,15 +101,6 @@ type ServerConfig struct { HttpCustomHandlers HttpCustomHandlers } -type ProxyProtocol int64 - -const ( - ProxyProtocolUnknown ProxyProtocol = iota - ProxyProtocolRest - ProxyProtocolGrpcSingle - ProxyProtocolGrpcStream -) - // A Server is the actual veneur instance that will be run. type Server struct { Config Config @@ -134,8 +125,7 @@ type Server struct { HttpCustomHandlers HttpCustomHandlers - ForwardAddr string - proxyProtocol ProxyProtocol + ForwardAddr string StatsdListenAddrs []net.Addr SSFListenAddrs []net.Addr @@ -681,22 +671,6 @@ func NewFromConfig(config ServerConfig) (*Server, error) { ret.httpQuit = true } - switch conf.Features.ProxyProtocol { - case "grpc-stream": - ret.proxyProtocol = ProxyProtocolGrpcStream - case "grpc-single": - ret.proxyProtocol = ProxyProtocolGrpcSingle - case "json": - ret.proxyProtocol = ProxyProtocolRest - default: - // TODO(arnavdugar): Remove conf.ForwardUseGrpc. - if conf.ForwardUseGrpc { - ret.proxyProtocol = ProxyProtocolGrpcSingle - } else { - ret.proxyProtocol = ProxyProtocolRest - } - } - ret.sources, err = ret.createSources(logger, &conf, config.SourceTypes) if err != nil { return nil, err @@ -841,15 +815,12 @@ func (s *Server) Start() { } // Initialize a gRPC connection for forwarding - if s.proxyProtocol == ProxyProtocolGrpcSingle || - s.proxyProtocol == ProxyProtocolGrpcStream { - var err error - s.grpcForwardConn, err = grpc.Dial(s.ForwardAddr, grpc.WithInsecure()) - if err != nil { - s.logger.WithError(err).WithFields(logrus.Fields{ - "forwardAddr": s.ForwardAddr, - }).Fatal("Failed to initialize a gRPC connection for forwarding") - } + var err error + s.grpcForwardConn, err = grpc.Dial(s.ForwardAddr, grpc.WithInsecure()) + if err != nil { + s.logger.WithError(err).WithFields(logrus.Fields{ + "forwardAddr": s.ForwardAddr, + }).Fatal("Failed to initialize a gRPC connection for forwarding") } // Flush every Interval forever! diff --git a/server_test.go b/server_test.go index e6b5d0eb7..cc661d841 100644 --- a/server_test.go +++ b/server_test.go @@ -2,13 +2,12 @@ package veneur import ( "bytes" - "compress/zlib" "context" "crypto/tls" "crypto/x509" - "encoding/json" "flag" "fmt" + "io" "io/ioutil" "log" "math/rand" @@ -22,12 +21,15 @@ import ( "testing" "time" + "github.com/golang/mock/gomock" "github.com/golang/protobuf/proto" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/stripe/veneur/v14/forwardrpc" "github.com/stripe/veneur/v14/protocol" "github.com/stripe/veneur/v14/samplers" + "github.com/stripe/veneur/v14/samplers/metricpb" "github.com/stripe/veneur/v14/sinks" "github.com/stripe/veneur/v14/sinks/blackhole" "github.com/stripe/veneur/v14/ssf" @@ -36,6 +38,8 @@ import ( "github.com/stripe/veneur/v14/trace/metrics" "github.com/stripe/veneur/v14/util" "github.com/zenazn/goji/graceful" + "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/emptypb" ) const ε = .00002 @@ -308,6 +312,9 @@ func TestGlobalServerFlush(t *testing.T) { // TestLocalServerMixedMetrics ensures that stuff tagged as local only or local parts of mixed // scope metrics are sent directly to sinks while global metrics are forwarded. func TestLocalServerMixedMetrics(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + var HistogramValues = []float64{1.0, 2.0, 7.0, 8.0, 100.0} // Number of events observed (in 50ms interval) @@ -334,42 +341,31 @@ func TestLocalServerMixedMetrics(t *testing.T) { // This represents the global veneur instance, which receives request from // the local veneur instances, aggregates the data, and sends it to the remote API // (e.g. Datadog) - globalTD := make(chan *tdigest.MergingDigest) - globalVeneur := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - assert.Equal(t, r.URL.Path, "/import", "Global veneur should receive request on /import path") - - zr, err := zlib.NewReader(r.Body) - if err != nil { - t.Fatal(err) - } - - type requestItem struct { - Name string `json:"name"` - Tags interface{} `json:"tags"` - Tagstring string `json:"tagstring"` - Type string `json:"type"` - Value []byte `json:"value"` - } - - var metrics []requestItem - - err = json.NewDecoder(zr).Decode(&metrics) - if err != nil { - t.Fatal(err) - } - - assert.Equal(t, 1, len(metrics), "incorrect number of elements in the flushed series") - - td := tdigest.NewMerging(100, false) - err = td.GobDecode(metrics[0].Value) - assert.NoError(t, err, "Should not have encountered error in decoding gob stream") - globalTD <- td - w.WriteHeader(http.StatusAccepted) - })) - defer globalVeneur.Close() + globalVeneur := grpc.NewServer() + server := forwardrpc.NewMockForwardServer(ctrl) + forwardrpc.RegisterForwardServer(globalVeneur, server) + listener, err := net.Listen("tcp", "0.0.0.0:0") + assert.NoError(t, err) + defer globalVeneur.GracefulStop() + go globalVeneur.Serve(listener) + + metricsChannel := make(chan *metricpb.Metric, 1) + server.EXPECT().SendMetricsV2(gomock.Any()).AnyTimes(). + Do(func(server forwardrpc.Forward_SendMetricsV2Server) { + for { + metric, err := server.Recv() + if err == io.EOF { + break + } + assert.NoError(t, err) + metricsChannel <- metric + } + close(metricsChannel) + server.SendAndClose(&emptypb.Empty{}) + }) config := localConfig() - config.ForwardAddress = globalVeneur.URL + config.ForwardAddress = listener.Addr().String() f := newFixture(t, config, nil, nil) defer f.Close() @@ -401,10 +397,13 @@ func TestLocalServerMixedMetrics(t *testing.T) { }) } - f.server.Flush(context.TODO()) + f.server.Flush(context.Background()) + + metric := <-metricsChannel + assert.Equal(t, metricpb.Type_Histogram, metric.Type) // the global veneur instance should get valid data - td := <-globalTD + td := tdigest.NewMergingFromData(metric.GetHistogram().TDigest) assert.Equal(t, expectedMetrics["a.b.c.min"], td.Min(), "Minimum value is incorrect") assert.Equal(t, expectedMetrics["a.b.c.max"], td.Max(), "Maximum value is incorrect") diff --git a/testdata/http_test_config.json b/testdata/http_test_config.json index 0f7d04385..bbb3a69b6 100644 --- a/testdata/http_test_config.json +++ b/testdata/http_test_config.json @@ -11,13 +11,11 @@ "ExtendTags": null, "Features": { "DiagnosticsMetricsEnabled": false, - "EnableMetricSinkRouting": false, - "ProxyProtocol": "" + "EnableMetricSinkRouting": false }, "FlushOnShutdown": false, "FlushWatchdogMissedFlushes": 0, "ForwardAddress": "", - "ForwardUseGrpc": false, "GrpcAddress": "", "GrpcListenAddresses": null, "Hostname": "", diff --git a/testdata/http_test_config.yaml b/testdata/http_test_config.yaml index 9a6beff7a..88ee44c05 100644 --- a/testdata/http_test_config.yaml +++ b/testdata/http_test_config.yaml @@ -10,11 +10,9 @@ extend_tags: [] features: diagnostics_metrics_enabled: false enable_metric_sink_routing: false - proxy_protocol: "" flush_on_shutdown: false flush_watchdog_missed_flushes: 0 forward_address: "" -forward_use_grpc: false grpc_address: "" grpc_listen_addresses: [] hostname: ""