From 75530bbe52c59bf50ed5ce17b12d9a0bc1d28515 Mon Sep 17 00:00:00 2001 From: Hanshuo Tan Date: Tue, 6 Aug 2024 17:14:43 +1000 Subject: [PATCH 1/2] asynchronously send request --- go.mod | 2 ++ go.sum | 2 ++ pkg/backends/otlp/backend.go | 51 ++++++++++++++++++++---------------- 3 files changed, 32 insertions(+), 23 deletions(-) diff --git a/go.mod b/go.mod index 4744e245..a12a0888 100644 --- a/go.mod +++ b/go.mod @@ -32,6 +32,8 @@ require ( stathat.com/c/consistent v1.0.0 ) +require golang.org/x/sync v0.6.0 + require ( github.com/PuerkitoBio/purell v1.1.1 // indirect github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect diff --git a/go.sum b/go.sum index 08a5d428..d48a5f67 100644 --- a/go.sum +++ b/go.sum @@ -403,6 +403,8 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= +golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/pkg/backends/otlp/backend.go b/pkg/backends/otlp/backend.go index beab08cf..e28274d2 100644 --- a/pkg/backends/otlp/backend.go +++ b/pkg/backends/otlp/backend.go @@ -13,6 +13,7 @@ import ( "github.com/sirupsen/logrus" "github.com/spf13/viper" "go.uber.org/multierr" + "golang.org/x/sync/errgroup" "github.com/atlassian/gostatsd/pkg/stats" @@ -139,7 +140,7 @@ func (bd *Backend) SendMetricsAsync(ctx context.Context, mm *gostatsd.MetricMap, bd.batchesRetried.SendIfChanged(statser, "backend.retried", nil) }() - group := newGroups(bd.metricsPerBatch) + currentGroup := newGroups(bd.metricsPerBatch) mm.Counters.Each(func(name, _ string, cm gostatsd.Counter) { resources, attributes := data.SplitMetricTagsByKeysAndConvert(cm.Tags, bd.resourceKeys) @@ -164,8 +165,8 @@ func (bd *Backend) SendMetricsAsync(ctx context.Context, mm *gostatsd.MetricMap, ), ) - group.insert(bd.is, resources, rate) - group.insert(bd.is, resources, m) + currentGroup.insert(bd.is, resources, rate) + currentGroup.insert(bd.is, resources, m) }) mm.Gauges.Each(func(name, _ string, gm gostatsd.Gauge) { @@ -181,7 +182,7 @@ func (bd *Backend) SendMetricsAsync(ctx context.Context, mm *gostatsd.MetricMap, ), ) - group.insert(bd.is, resources, m) + currentGroup.insert(bd.is, resources, m) }) mm.Sets.Each(func(name, _ string, sm gostatsd.Set) { @@ -197,7 +198,7 @@ func (bd *Backend) SendMetricsAsync(ctx context.Context, mm *gostatsd.MetricMap, ), ) - group.insert(bd.is, resources, m) + currentGroup.insert(bd.is, resources, m) }) mm.Timers.Each(func(name, _ string, t gostatsd.Timer) { @@ -214,7 +215,7 @@ func (bd *Backend) SendMetricsAsync(ctx context.Context, mm *gostatsd.MetricMap, } else { btags.Insert("le", strconv.FormatFloat(float64(boundry), 'f', -1, 64)) } - group.insert( + currentGroup.insert( bd.is, resources, data.NewMetric(fmt.Sprintf("%s.histogram", name)).SetGauge( @@ -248,7 +249,7 @@ func (bd *Backend) SendMetricsAsync(ctx context.Context, mm *gostatsd.MetricMap, if calc.discarded { continue } - group.insert( + currentGroup.insert( bd.is, resources, data.NewMetric(fmt.Sprintf("%s.%s", name, calc.suffix)).SetGauge( @@ -262,7 +263,7 @@ func (bd *Backend) SendMetricsAsync(ctx context.Context, mm *gostatsd.MetricMap, } for _, pct := range t.Percentiles { - group.insert(bd.is, resources, data.NewMetric(fmt.Sprintf("%s.%s", name, pct.Str)).SetGauge( + currentGroup.insert(bd.is, resources, data.NewMetric(fmt.Sprintf("%s.%s", name, pct.Str)).SetGauge( data.NewGauge(data.NewNumberDataPoint( uint64(t.Timestamp), data.WithNumberDataPointMap(attributes), @@ -280,27 +281,32 @@ func (bd *Backend) SendMetricsAsync(ctx context.Context, mm *gostatsd.MetricMap, if len(t.Histogram) != 0 { opts = append(opts, data.WithHistogramDataPointCumulativeBucketValues(t.Histogram)) } - group.insert(bd.is, resources, data.NewMetric(name).SetHistogram( + currentGroup.insert(bd.is, resources, data.NewMetric(name).SetHistogram( data.NewHistogram(data.NewHistogramDataPoint(uint64(t.Timestamp), opts...)), )) } }) - var errs error - for _, b := range group.batches { + eg, ectx := errgroup.WithContext(ctx) + for _, b := range currentGroup.batches { atomic.AddUint64(&bd.batchesCreated, 1) - err := bd.postMetrics(ctx, b) - if err != nil { - bd.logger.WithError(err).WithFields(logrus.Fields{ - "endpoint": bd.metricsEndpoint, - }).Error("Issues trying to submit data") - errs = multierr.Append(errs, err) - } else { - atomic.AddUint64(&bd.batchesSent, 1) - atomic.AddUint64(&bd.seriesSent, uint64(b.lenMetrics())) - } + func(g group) { + eg.Go(func() error { + err := bd.postMetrics(ectx, b) + if err != nil { + bd.logger.WithError(err).WithFields(logrus.Fields{ + "endpoint": bd.metricsEndpoint, + }).Error("Issues trying to submit data") + atomic.AddUint64(&bd.batchesDropped, 1) + } else { + atomic.AddUint64(&bd.batchesSent, 1) + atomic.AddUint64(&bd.seriesSent, uint64(b.lenMetrics())) + } + return err + }) + }(b) } - cb(multierr.Errors(errs)) + cb(multierr.Errors(eg.Wait())) } func (c *Backend) postMetrics(ctx context.Context, batch group) error { @@ -314,7 +320,6 @@ func (c *Backend) postMetrics(ctx context.Context, batch group) error { resourceMetrics := batch.values() req, err = data.NewMetricsRequest(ctx, c.metricsEndpoint, resourceMetrics) if err != nil { - atomic.AddUint64(&c.batchesDropped, 1) return err } From fa18514887854caa22df4cacd4a3a2f8eee14086 Mon Sep 17 00:00:00 2001 From: Hanshuo Tan Date: Wed, 7 Aug 2024 01:31:48 +1000 Subject: [PATCH 2/2] compress --- pkg/backends/otlp/backend.go | 4 +++- pkg/backends/otlp/backend_test.go | 1 + pkg/backends/otlp/config.go | 3 +++ pkg/backends/otlp/config_test.go | 2 ++ .../otlp/internal/data/metric_request.go | 18 +++++++++++++++++- .../otlp/internal/data/metric_request_test.go | 1 + pkg/backends/otlp/internal/data/request.go | 11 ++++++++++- 7 files changed, 37 insertions(+), 3 deletions(-) diff --git a/pkg/backends/otlp/backend.go b/pkg/backends/otlp/backend.go index e28274d2..f165da27 100644 --- a/pkg/backends/otlp/backend.go +++ b/pkg/backends/otlp/backend.go @@ -51,6 +51,7 @@ type Backend struct { client *http.Client requestsBufferSem chan struct{} maxRetries int + CompressPayload bool // metricsPerBatch is the maximum number of metrics to send in a single batch. metricsPerBatch int @@ -85,6 +86,7 @@ func NewClientFromViper(v *viper.Viper, logger logrus.FieldLogger, pool *transpo logger: logger, requestsBufferSem: make(chan struct{}, cfg.MaxRequests), maxRetries: cfg.MaxRetries, + CompressPayload: cfg.CompressPayload, metricsPerBatch: cfg.MetricsPerBatch, }, nil } @@ -318,7 +320,7 @@ func (c *Backend) postMetrics(ctx context.Context, batch group) error { ) resourceMetrics := batch.values() - req, err = data.NewMetricsRequest(ctx, c.metricsEndpoint, resourceMetrics) + req, err = data.NewMetricsRequest(ctx, c.metricsEndpoint, resourceMetrics, c.CompressPayload) if err != nil { return err } diff --git a/pkg/backends/otlp/backend_test.go b/pkg/backends/otlp/backend_test.go index 9e8ef98d..5c92dab8 100644 --- a/pkg/backends/otlp/backend_test.go +++ b/pkg/backends/otlp/backend_test.go @@ -362,6 +362,7 @@ func TestBackendSendAsyncMetrics(t *testing.T) { v := viper.New() v.Set("otlp.metrics_endpoint", fmt.Sprintf("%s/%s", s.URL, "v1/metrics")) v.Set("otlp.logs_endpoint", fmt.Sprintf("%s/%s", s.URL, "v1/logs")) + v.Set("otlp.compress_payload", false) if tc.enableHistograms { v.Set("otlp.conversion", ConversionAsHistogram) } diff --git a/pkg/backends/otlp/config.go b/pkg/backends/otlp/config.go index 82466227..a332aa5a 100644 --- a/pkg/backends/otlp/config.go +++ b/pkg/backends/otlp/config.go @@ -29,6 +29,8 @@ type Config struct { MaxRequests int `mapstructure:"max_requests"` // MaxRetries (Optional, default: 3) is the maximum number of retries to send a batch MaxRetries int `mapstructure:"max_retries"` + // CompressPayload (Optional, default: true) is used to enable payload compression + CompressPayload bool `mapstructure:"compress_payload"` // MetricsPerBatch (Optional, default: 1000) is the maximum number of metrics to send in a single batch. MetricsPerBatch int `mapstructure:"metrics_per_batch"` // ResourceKeys (Optional) is used to extract values from provided tags @@ -62,6 +64,7 @@ func newDefaultConfig() *Config { Transport: "default", MaxRequests: runtime.NumCPU() * 2, MaxRetries: 3, + CompressPayload: true, MetricsPerBatch: defaultMetricsPerBatch, Conversion: ConversionAsGauge, UserAgent: "gostatsd", diff --git a/pkg/backends/otlp/config_test.go b/pkg/backends/otlp/config_test.go index 3287e289..15819f63 100644 --- a/pkg/backends/otlp/config_test.go +++ b/pkg/backends/otlp/config_test.go @@ -30,6 +30,7 @@ func TestNewConfig(t *testing.T) { v.SetDefault("otlp.logs_endpoint", "http://local/v1/logs") v.SetDefault("otlp.max_requests", 1) v.SetDefault("otlp.max_retries", 3) + v.SetDefault("otlp.compress_payload", true) v.SetDefault("otlp.metrics_per_batch", 999) return v }(), @@ -38,6 +39,7 @@ func TestNewConfig(t *testing.T) { LogsEndpoint: "http://local/v1/logs", MaxRequests: 1, MaxRetries: 3, + CompressPayload: true, MetricsPerBatch: 999, Conversion: "AsGauge", Transport: "default", diff --git a/pkg/backends/otlp/internal/data/metric_request.go b/pkg/backends/otlp/internal/data/metric_request.go index 077b4929..60ae89f2 100644 --- a/pkg/backends/otlp/internal/data/metric_request.go +++ b/pkg/backends/otlp/internal/data/metric_request.go @@ -1,6 +1,8 @@ package data import ( + "bytes" + "compress/gzip" "context" "net/http" @@ -13,7 +15,7 @@ type metricsRequest struct { raw *v1export.ExportMetricsServiceRequest } -func NewMetricsRequest(ctx context.Context, endpoint string, metrics []ResourceMetrics) (*http.Request, error) { +func NewMetricsRequest(ctx context.Context, endpoint string, metrics []ResourceMetrics, compressPayload bool) (*http.Request, error) { mr := metricsRequest{ raw: &v1export.ExportMetricsServiceRequest{ ResourceMetrics: make([]*v1metrics.ResourceMetrics, 0, len(metrics)), @@ -29,5 +31,19 @@ func NewMetricsRequest(ctx context.Context, endpoint string, metrics []ResourceM return nil, err } + if compressPayload { + var b bytes.Buffer + w := gzip.NewWriter(&b) + if _, err = w.Write(buf); err != nil { + return nil, err + } + + if err = w.Close(); err != nil { + return nil, err + } + + return createProtobufRequest(ctx, endpoint, b.Bytes(), withHeader("Content-Encoding", "gzip")) + } + return createProtobufRequest(ctx, endpoint, buf) } diff --git a/pkg/backends/otlp/internal/data/metric_request_test.go b/pkg/backends/otlp/internal/data/metric_request_test.go index 3e44386d..dc94338b 100644 --- a/pkg/backends/otlp/internal/data/metric_request_test.go +++ b/pkg/backends/otlp/internal/data/metric_request_test.go @@ -15,6 +15,7 @@ func TestMetricsRequest(t *testing.T) { context.Background(), "not-a-valid-url", []ResourceMetrics{NewResourceMetrics(NewResource())}, + false, ) assert.NoError(t, err, "Must not error creating request") assert.NotNil(t, req, "Must have a valid request") diff --git a/pkg/backends/otlp/internal/data/request.go b/pkg/backends/otlp/internal/data/request.go index 0891a572..170c783a 100644 --- a/pkg/backends/otlp/internal/data/request.go +++ b/pkg/backends/otlp/internal/data/request.go @@ -6,7 +6,13 @@ import ( "net/http" ) -func createProtobufRequest(ctx context.Context, endpoint string, buf []byte) (*http.Request, error) { +func withHeader(key, value string) func(*http.Request) { + return func(req *http.Request) { + req.Header.Set(key, value) + } +} + +func createProtobufRequest(ctx context.Context, endpoint string, buf []byte, option ...func(*http.Request)) (*http.Request, error) { req, err := http.NewRequestWithContext( ctx, http.MethodPost, @@ -18,5 +24,8 @@ func createProtobufRequest(ctx context.Context, endpoint string, buf []byte) (*h } req.Header.Set("Content-Type", RequestContentTypeProtobuf) + for _, opt := range option { + opt(req) + } return req, nil }