Skip to content

Commit

Permalink
Remove legacy forwarding logic. (#991)
Browse files Browse the repository at this point in the history
  • Loading branch information
arnavdugar-stripe authored Sep 15, 2022
1 parent 3239494 commit 108d7f8
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 223 deletions.
6 changes: 2 additions & 4 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ type Config struct {
FlushOnShutdown bool `yaml:"flush_on_shutdown"`
FlushWatchdogMissedFlushes int `yaml:"flush_watchdog_missed_flushes"`
ForwardAddress string `yaml:"forward_address"`
ForwardUseGrpc bool `yaml:"forward_use_grpc"`
GrpcAddress string `yaml:"grpc_address"`
GrpcListenAddresses []util.Url `yaml:"grpc_listen_addresses"`
Hostname string `yaml:"hostname"`
Expand Down Expand Up @@ -63,9 +62,8 @@ type Config struct {
}

type Features struct {
DiagnosticsMetricsEnabled bool `yaml:"diagnostics_metrics_enabled"`
EnableMetricSinkRouting bool `yaml:"enable_metric_sink_routing"`
ProxyProtocol string `yaml:"proxy_protocol"`
DiagnosticsMetricsEnabled bool `yaml:"diagnostics_metrics_enabled"`
EnableMetricSinkRouting bool `yaml:"enable_metric_sink_routing"`
}

type HttpConfig struct {
Expand Down
4 changes: 0 additions & 4 deletions example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,6 @@ tls_authority_certificate: ""
#forward_address: "veneur.example.com"
forward_address: ""

# Whether or not to forward to an upstream Veneur over gRPC. If this is false
# or unset, HTTP will be used.
forward_use_grpc: false

# How often to flush. When flushing to Datadog, changing this
# value when you've already emitted metrics will break your time
# series data.
Expand Down
142 changes: 9 additions & 133 deletions flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package veneur
import (
"context"
"fmt"
"net/http"
"reflect"
"runtime"
"strings"
Expand All @@ -14,7 +13,6 @@ import (
"github.com/axiomhq/hyperloglog"
"github.com/sirupsen/logrus"
"github.com/stripe/veneur/v14/forwardrpc"
vhttp "github.com/stripe/veneur/v14/http"
"github.com/stripe/veneur/v14/samplers"
"github.com/stripe/veneur/v14/samplers/metricpb"
"github.com/stripe/veneur/v14/sinks"
Expand Down Expand Up @@ -82,18 +80,10 @@ func (s *Server) Flush(ctx context.Context) {

if s.IsLocal() {
wg.Add(1)
switch s.proxyProtocol {
case ProxyProtocolGrpcStream, ProxyProtocolGrpcSingle:
go func() {
s.forwardGRPC(span.Attach(ctx), tempMetrics, s.proxyProtocol)
wg.Done()
}()
case ProxyProtocolRest:
go func() {
s.flushForward(span.Attach(ctx), tempMetrics)
wg.Done()
}()
}
go func() {
s.forward(span.Attach(ctx), tempMetrics)
wg.Done()
}()
} else {
s.reportGlobalMetricsFlushCounts(ms)
s.reportGlobalReceivedProtocolMetrics()
Expand Down Expand Up @@ -448,105 +438,6 @@ func (s *Server) reportGlobalReceivedProtocolMetrics() {
s.Statsd.Count(perProtocolTotalMetricName, ssfGrpcTotal, []string{"veneurglobalonly:true", "protocol:" + SSF_GRPC.String()}, 1.0)
}

func (s *Server) flushForward(ctx context.Context, wms []WorkerMetrics) {
span, _ := trace.StartSpanFromContext(ctx, "")
defer span.ClientFinish(s.TraceClient)
jmLength := 0
for _, wm := range wms {
jmLength += len(wm.globalCounters)
jmLength += len(wm.globalGauges)
jmLength += len(wm.histograms)
jmLength += len(wm.sets)
jmLength += len(wm.timers)
}

jsonMetrics := make([]samplers.JSONMetric, 0, jmLength)
exportStart := time.Now()
for _, wm := range wms {
for _, count := range wm.globalCounters {
jm, err := count.Export()
if err != nil {
s.logger.WithFields(logrus.Fields{
logrus.ErrorKey: err,
"type": "counter",
"name": count.Name,
}).Error("Could not export metric")
continue
}
jsonMetrics = append(jsonMetrics, jm)
}
for _, gauge := range wm.globalGauges {
jm, err := gauge.Export()
if err != nil {
s.logger.WithFields(logrus.Fields{
logrus.ErrorKey: err,
"type": "gauge",
"name": gauge.Name,
}).Error("Could not export metric")
continue
}
jsonMetrics = append(jsonMetrics, jm)
}
for _, histo := range wm.histograms {
jm, err := histo.Export()
if err != nil {
s.logger.WithFields(logrus.Fields{
logrus.ErrorKey: err,
"type": "histogram",
"name": histo.Name,
}).Error("Could not export metric")
continue
}
jsonMetrics = append(jsonMetrics, jm)
}
for _, set := range wm.sets {
jm, err := set.Export()
if err != nil {
s.logger.WithFields(logrus.Fields{
logrus.ErrorKey: err,
"type": "set",
"name": set.Name,
}).Error("Could not export metric")
continue
}
jsonMetrics = append(jsonMetrics, jm)
}
for _, timer := range wm.timers {
jm, err := timer.Export()
if err != nil {
s.logger.WithFields(logrus.Fields{
logrus.ErrorKey: err,
"type": "timer",
"name": timer.Name,
}).Error("Could not export metric")
continue
}
// the exporter doesn't know that these two are "different"
jm.Type = "timer"
jsonMetrics = append(jsonMetrics, jm)
}
}
s.Statsd.TimeInMilliseconds("forward.duration_ns", float64(time.Since(exportStart).Nanoseconds()), []string{"part:export"}, 1.0)
s.Statsd.Count("forward.post_metrics_total", int64(len(jsonMetrics)), nil, 1.0)
if len(jsonMetrics) == 0 {
s.logger.Debug("Nothing to forward, skipping.")
return
}

// the error has already been logged (if there was one), so we only care
// about the success case
endpoint := fmt.Sprintf("%s/import", s.ForwardAddr)
if vhttp.PostHelper(
span.Attach(ctx), s.HTTPClient, s.TraceClient, http.MethodPost, endpoint,
jsonMetrics, "forward", true, nil, s.logger) == nil {
s.logger.WithFields(logrus.Fields{
"metrics": len(jsonMetrics),
"endpoint": endpoint,
"forwardAddr": s.ForwardAddr,
}).Info("Completed forward to upstream Veneur")
}
}

func (s *Server) flushTraces(ctx context.Context) {
s.ssfInternalMetrics.Range(func(keyI, valueI interface{}) bool {
key, ok := keyI.(string)
Expand Down Expand Up @@ -585,12 +476,11 @@ func (s *Server) flushTraces(ctx context.Context) {
s.SpanWorker.Flush()
}

// forwardGRPC forwards all input metrics to a downstream Veneur, over gRPC.
func (s *Server) forwardGRPC(
ctx context.Context, wms []WorkerMetrics, protocol ProxyProtocol,
// forward forwards all input metrics to a downstream Veneur, over gRPC.
func (s *Server) forward(
ctx context.Context, wms []WorkerMetrics,
) {
span, _ := trace.StartSpanFromContext(ctx, "")
span.SetTag("protocol", "grpc")
defer span.ClientFinish(s.TraceClient)

exportStart := time.Now()
Expand All @@ -617,19 +507,13 @@ func (s *Server) forwardGRPC(
entry := s.logger.WithFields(logrus.Fields{
"metrics": len(metrics),
"destination": s.ForwardAddr,
"protocol": "grpc",
"grpcstate": s.grpcForwardConn.GetState().String(),
})

c := forwardrpc.NewForwardClient(s.grpcForwardConn)

grpcStart := time.Now()
var err error
if protocol == ProxyProtocolGrpcSingle {
err = ForwardGrpcSingle(ctx, c, metrics)
} else {
err = ForwardGrpcStream(ctx, c, metrics)
}
err := forwardGrpc(ctx, c, metrics)
if err != nil {
if ctx.Err() != nil {
// We exceeded the deadline of the flush context.
Expand All @@ -655,15 +539,7 @@ func (s *Server) forwardGRPC(
)
}

func ForwardGrpcSingle(
ctx context.Context, client forwardrpc.ForwardClient,
metrics []*metricpb.Metric,
) error {
_, err := client.SendMetrics(ctx, &forwardrpc.MetricList{Metrics: metrics})
return err
}

func ForwardGrpcStream(
func forwardGrpc(
ctx context.Context, client forwardrpc.ForwardClient,
metrics []*metricpb.Metric,
) error {
Expand Down
3 changes: 0 additions & 3 deletions flusher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ func TestServerFlushGRPC(t *testing.T) {

localCfg := localConfig()
localCfg.ForwardAddress = testServer.Addr().String()
localCfg.ForwardUseGrpc = true
local := setupVeneurServer(t, localCfg, nil, nil, nil, nil)
defer local.Shutdown()

Expand Down Expand Up @@ -171,7 +170,6 @@ func TestServerFlushGRPCTimeout(t *testing.T) {
localCfg := localConfig()
localCfg.Interval = time.Duration(20 * time.Microsecond)
localCfg.ForwardAddress = testServer.Addr().String()
localCfg.ForwardUseGrpc = true
local := setupVeneurServer(t, localCfg, nil, nil, nil, cl)
defer local.Shutdown()

Expand All @@ -197,7 +195,6 @@ func TestServerFlushGRPCBadAddress(t *testing.T) {

localCfg := localConfig()
localCfg.ForwardAddress = "bad-address:123"
localCfg.ForwardUseGrpc = true

local := setupVeneurServer(t, localCfg, nil, sink, nil, nil)
defer local.Shutdown()
Expand Down
43 changes: 7 additions & 36 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,6 @@ type ServerConfig struct {
HttpCustomHandlers HttpCustomHandlers
}

type ProxyProtocol int64

const (
ProxyProtocolUnknown ProxyProtocol = iota
ProxyProtocolRest
ProxyProtocolGrpcSingle
ProxyProtocolGrpcStream
)

// A Server is the actual veneur instance that will be run.
type Server struct {
Config Config
Expand All @@ -134,8 +125,7 @@ type Server struct {

HttpCustomHandlers HttpCustomHandlers

ForwardAddr string
proxyProtocol ProxyProtocol
ForwardAddr string

StatsdListenAddrs []net.Addr
SSFListenAddrs []net.Addr
Expand Down Expand Up @@ -681,22 +671,6 @@ func NewFromConfig(config ServerConfig) (*Server, error) {
ret.httpQuit = true
}

switch conf.Features.ProxyProtocol {
case "grpc-stream":
ret.proxyProtocol = ProxyProtocolGrpcStream
case "grpc-single":
ret.proxyProtocol = ProxyProtocolGrpcSingle
case "json":
ret.proxyProtocol = ProxyProtocolRest
default:
// TODO(arnavdugar): Remove conf.ForwardUseGrpc.
if conf.ForwardUseGrpc {
ret.proxyProtocol = ProxyProtocolGrpcSingle
} else {
ret.proxyProtocol = ProxyProtocolRest
}
}

ret.sources, err = ret.createSources(logger, &conf, config.SourceTypes)
if err != nil {
return nil, err
Expand Down Expand Up @@ -841,15 +815,12 @@ func (s *Server) Start() {
}

// Initialize a gRPC connection for forwarding
if s.proxyProtocol == ProxyProtocolGrpcSingle ||
s.proxyProtocol == ProxyProtocolGrpcStream {
var err error
s.grpcForwardConn, err = grpc.Dial(s.ForwardAddr, grpc.WithInsecure())
if err != nil {
s.logger.WithError(err).WithFields(logrus.Fields{
"forwardAddr": s.ForwardAddr,
}).Fatal("Failed to initialize a gRPC connection for forwarding")
}
var err error
s.grpcForwardConn, err = grpc.Dial(s.ForwardAddr, grpc.WithInsecure())
if err != nil {
s.logger.WithError(err).WithFields(logrus.Fields{
"forwardAddr": s.ForwardAddr,
}).Fatal("Failed to initialize a gRPC connection for forwarding")
}

// Flush every Interval forever!
Expand Down
Loading

0 comments on commit 108d7f8

Please sign in to comment.