From 117929a659c381a213bd0859bda5745615ddfbf4 Mon Sep 17 00:00:00 2001 From: Yusheng Li Date: Wed, 23 Oct 2024 10:29:46 +0800 Subject: [PATCH] feat(observability): add metrics support (#49) ### Summary Integrated metric, and the OpenTelemetry implementation. #### Runtime metrics - `webhookx.runtime.num_goroutine` - `webhookx.runtime.alloc_bytes` - `webhookx.runtime.sys_bytes` - `webhookx.runtime.mallocs` - `webhookx.runtime.frees` - `webhookx.runtime.heap_objects` - `webhookx.runtime.pause_total_ns` - `webhookx.runtime.num_gc` #### Request metrics - `webhookx.request.total` - `webhookx.request.duration` #### Event metrics - `webhookx.event.total` - `webhookx.event.persisted` - `webhookx.event.pending` #### Attempt metrics - `webhookx.attempt.total` - `webhookx.attempt.response.duration` - `webhookx.attempt.pending` - `webhookx.attempt.failed` --- .github/workflows/test.yml | 29 +-- .gitignore | 1 + Makefile | 20 +- README.md | 3 +- app/app.go | 44 +++-- cmd/migrations.go | 6 +- config.yml | 12 ++ config/config.go | 26 +-- config/config_test.go | 62 +++++++ config/metrics.go | 53 ++++++ config/types.go | 9 + dispatcher/dispatcher.go | 75 ++++---- go.mod | 36 +++- go.sum | 93 ++++++---- pkg/metrics/metrics.go | 87 +++++++++ pkg/metrics/opentelemetry.go | 103 +++++++++++ pkg/metrics/types.go | 111 +++++++++++ pkg/queue/queue.go | 1 + pkg/queue/redis/redis.go | 36 +++- pkg/taskqueue/queue.go | 1 + pkg/taskqueue/redis.go | 30 ++- proxy/models.go => pkg/types/types.go | 2 +- proxy/gateway.go | 24 ++- proxy/middlewares/metrics.go | 26 +++ .../recovery.go} | 7 +- test/README.md | 8 +- test/cache/cache_test.go | 2 +- test/docker-compose.yml | 34 ++++ test/helper/helper.go | 30 ++- test/metrics/opentelemetry_test.go | 174 ++++++++++++++++++ test/metrics/types.go | 36 ++++ test/otel-collector-config.yml | 38 ++++ test/test.go | 2 +- test/worker/requeue_test.go | 5 +- worker/worker.go | 24 ++- 35 files changed, 1068 insertions(+), 182 deletions(-) create mode 100644 config/metrics.go create mode 100644 config/types.go create mode 100644 pkg/metrics/metrics.go create mode 100644 pkg/metrics/opentelemetry.go create mode 100644 pkg/metrics/types.go rename proxy/models.go => pkg/types/types.go (89%) create mode 100644 proxy/middlewares/metrics.go rename proxy/{middleware.go => middlewares/recovery.go} (82%) create mode 100644 test/docker-compose.yml create mode 100644 test/metrics/opentelemetry_test.go create mode 100644 test/metrics/types.go create mode 100644 test/otel-collector-config.yml diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 68c7918..f273869 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -47,6 +47,14 @@ jobs: with: go-version-file: go.mod + - name: start dependencies + run: | + mkdir -p test/output/otel + sudo chmod 777 -R test/output/otel + make deps + sleep 3 + docker compose -f test/docker-compose.yml logs + - name: install run: make install @@ -60,27 +68,6 @@ jobs: fail_ci_if_error: true flags: integration - services: - postgres: - image: postgres:13 - env: - POSTGRES_USER: webhookx - POSTGRES_DB: webhookx - POSTGRES_HOST_AUTH_METHOD: trust - ports: - - 5432:5432 - options: --health-cmd pg_isready --health-interval 3s --health-timeout 5s --health-retries 3 - - redis: - image: redis:6 - ports: - - 6379:6379 - - httpbin: - image: kennethreitz/httpbin - ports: - - 9999:80 - test-docker-compose: runs-on: ubuntu-latest diff --git a/.gitignore b/.gitignore index a65e08a..e4f3f7f 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ coverage.* webhookx *.log dist/ +test/output diff --git a/Makefile b/Makefile index a7bcbe2..e2c6e60 100644 --- a/Makefile +++ b/Makefile @@ -1,41 +1,43 @@ +DIR := $(shell pwd) + +export WEBHOOKX_TEST_OTEL_COLLECTOR_OUTPUT_PATH=$(DIR)/test/output/otel + LDFLAGS = --ldflags "\ -X github.com/webhookx-io/webhookx/config.COMMIT=`git rev-parse --verify --short HEAD` \ -X github.com/webhookx-io/webhookx/config.VERSION=`git tag -l --points-at HEAD | head -n 1`" -.PHONY: clean +.PHONY: clean build install generate test test-coverage test-integration \ + test-integration-coverage goreleaser migrate-create deps + clean: go clean go clean -testcache -.PHONY: build build: CGO_ENABLED=0 go build ${LDFLAGS} -.PHONY: install install: go install ${LDFLAGS} -.PHONY: generate generate: go generate ./... -.PHONY: test +deps: + mkdir -p test/output/otel + docker compose -f test/docker-compose.yml up -d + test: clean go test $$(go list ./... | grep -v /test/) -.PHONY: test-coverage test-coverage: clean go test $$(go list ./... | grep -v /test/) -coverprofile=coverage.txt -.PHONY: test-integration test-integration: clean go test -p 1 -v ./test/... -.PHONY: test-integration-coverage test-integration-coverage: clean go test -p 1 -v ./test/... --coverpkg ./... -coverprofile=coverage.txt -.PHONY: goreleaser goreleaser: goreleaser release --snapshot --clean diff --git a/README.md b/README.md index 5a40428..c52c25c 100644 --- a/README.md +++ b/README.md @@ -15,14 +15,13 @@ WebhookX is an open-source webhooks gateway for message receiving, processing, a - **Plugins:** - `webhookx-signature`: signing outbound requests with HMAC(SHA-256) by adding `Webhookx-Signature` and `Webhookx-Timestamp` to request header. - `transformer`(WIP): transform request before sending outbound requests. - +- **Observability:** Metrics and Tracing(WIP). ## Roadmap - [ ] Data retention policy - [ ] Insight admin APIs -- [ ] Observability(o11y) including tracing and metrics - [ ] Declarative configuration management #### Inbound diff --git a/app/app.go b/app/app.go index e83d53a..243b29e 100644 --- a/app/app.go +++ b/app/app.go @@ -13,6 +13,7 @@ import ( "github.com/webhookx-io/webhookx/mcache" "github.com/webhookx-io/webhookx/pkg/cache" "github.com/webhookx-io/webhookx/pkg/log" + "github.com/webhookx-io/webhookx/pkg/metrics" "github.com/webhookx-io/webhookx/pkg/taskqueue" "github.com/webhookx-io/webhookx/proxy" "github.com/webhookx-io/webhookx/worker" @@ -41,6 +42,7 @@ type Application struct { dispatcher *dispatcher.Dispatcher cache cache.Cache bus *eventbus.EventBus + metrics *metrics.Metrics admin *admin.Admin gateway *proxy.Gateway @@ -72,7 +74,7 @@ func (app *Application) initialize() error { app.log = zap.S() // cache - client := cfg.RedisConfig.GetClient() + client := cfg.Redis.GetClient() app.cache = cache.NewRedisCache(client) mcache.Set(mcache.NewMCache(&mcache.Options{ @@ -83,44 +85,49 @@ func (app *Application) initialize() error { app.bus = eventbus.NewEventBus( app.NodeID(), - cfg.DatabaseConfig.GetDSN(), + cfg.Database.GetDSN(), app.log) registerEventHandler(app.bus) // db - db, err := db.NewDB(&cfg.DatabaseConfig) + db, err := db.NewDB(&cfg.Database) if err != nil { return err } app.db = db + app.metrics, err = metrics.New(cfg.Metrics) + if err != nil { + return err + } + // queue queue := taskqueue.NewRedisQueue(taskqueue.RedisTaskQueueOptions{ Client: client, - }, app.log) + }, app.log, app.metrics) app.queue = queue - app.dispatcher = dispatcher.NewDispatcher(log.Sugar(), queue, db) + app.dispatcher = dispatcher.NewDispatcher(log.Sugar(), queue, db, app.metrics) // worker - if cfg.WorkerConfig.Enabled { + if cfg.Worker.Enabled { opts := worker.WorkerOptions{ - PoolSize: int(cfg.WorkerConfig.Pool.Size), - PoolConcurrency: int(cfg.WorkerConfig.Pool.Concurrency), + PoolSize: int(cfg.Worker.Pool.Size), + PoolConcurrency: int(cfg.Worker.Pool.Concurrency), } - deliverer := deliverer.NewHTTPDeliverer(&cfg.WorkerConfig.Deliverer) - app.worker = worker.NewWorker(opts, db, deliverer, queue) + deliverer := deliverer.NewHTTPDeliverer(&cfg.Worker.Deliverer) + app.worker = worker.NewWorker(opts, db, deliverer, queue, app.metrics) } // admin - if cfg.AdminConfig.IsEnabled() { + if cfg.Admin.IsEnabled() { handler := api.NewAPI(cfg, db, app.dispatcher).Handler() - app.admin = admin.NewAdmin(cfg.AdminConfig, handler) + app.admin = admin.NewAdmin(cfg.Admin, handler) } // gateway - if cfg.ProxyConfig.IsEnabled() { - app.gateway = proxy.NewGateway(&cfg.ProxyConfig, db, app.dispatcher) + if cfg.Proxy.IsEnabled() { + app.gateway = proxy.NewGateway(&cfg.Proxy, db, app.dispatcher, app.metrics) } return nil @@ -196,15 +203,18 @@ func (app *Application) Stop() error { }() _ = app.bus.Stop() + if app.metrics != nil { + _ = app.metrics.Stop() + } // TODO: timeout if app.admin != nil { - app.admin.Stop() + _ = app.admin.Stop() } if app.gateway != nil { - app.gateway.Stop() + _ = app.gateway.Stop() } if app.worker != nil { - app.worker.Stop() + _ = app.worker.Stop() } app.started = false diff --git a/cmd/migrations.go b/cmd/migrations.go index 7680491..f769f75 100644 --- a/cmd/migrations.go +++ b/cmd/migrations.go @@ -20,7 +20,7 @@ func newMigrationsResetCmd() *cobra.Command { return errors.New("canceled") } } - m := migrator.New(&cfg.DatabaseConfig) + m := migrator.New(&cfg.Database) fmt.Println("resetting database...") if err := m.Reset(); err != nil { return err @@ -48,7 +48,7 @@ func newMigrationsCmd() *cobra.Command { Short: "Print the migration status", Long: ``, RunE: func(cmd *cobra.Command, args []string) error { - m := migrator.New(&cfg.DatabaseConfig) + m := migrator.New(&cfg.Database) version, dirty, err := m.Status() if err != nil { return err @@ -68,7 +68,7 @@ func newMigrationsCmd() *cobra.Command { Short: "Run any new migrations", Long: ``, RunE: func(cmd *cobra.Command, args []string) error { - m := migrator.New(&cfg.DatabaseConfig) + m := migrator.New(&cfg.Database) if err := m.Up(); err != nil && !errors.Is(err, migrate.ErrNoChange) { return err } diff --git a/config.yml b/config.yml index 681100f..9c821bf 100644 --- a/config.yml +++ b/config.yml @@ -61,3 +61,15 @@ proxy: port: 6379 password: database: 0 + +#------------------------------------------------------------------------------ +# METRICS +#------------------------------------------------------------------------------ +metrics: + attributes: # global attributes for each metric + env: prod + #exports: [ opentelemetry ] # list of enabled vendor exports. supported value are opentelemetry + opentelemetry: + push_interval: 10 # interval(in seconds) at which metrics are sent to the OpenTelemetry Collector + protocol: http/protobuf # supported value are http/protobuf, grpc + endpoint: http://localhost:4318/v1/metrics # http/protobuf(http://localhost:4318/v1/metrics), grpc(localhost:4317) diff --git a/config/config.go b/config/config.go index 0d73219..7d12d97 100644 --- a/config/config.go +++ b/config/config.go @@ -18,12 +18,13 @@ var ( var cfg Config type Config struct { - Log LogConfig `yaml:"log" envconfig:"LOG"` - DatabaseConfig DatabaseConfig `yaml:"database" envconfig:"DATABASE"` - RedisConfig RedisConfig `yaml:"redis" envconfig:"REDIS"` - AdminConfig AdminConfig `yaml:"admin" envconfig:"ADMIN"` - ProxyConfig ProxyConfig `yaml:"proxy" envconfig:"PROXY"` - WorkerConfig WorkerConfig `yaml:"worker" envconfig:"WORKER"` + Log LogConfig `yaml:"log" envconfig:"LOG"` + Database DatabaseConfig `yaml:"database" envconfig:"DATABASE"` + Redis RedisConfig `yaml:"redis" envconfig:"REDIS"` + Admin AdminConfig `yaml:"admin" envconfig:"ADMIN"` + Proxy ProxyConfig `yaml:"proxy" envconfig:"PROXY"` + Worker WorkerConfig `yaml:"worker" envconfig:"WORKER"` + Metrics MetricsConfig `yaml:"metrics" envconfig:"METRICS"` } func (cfg Config) String() string { @@ -38,19 +39,22 @@ func (cfg Config) Validate() error { if err := cfg.Log.Validate(); err != nil { return err } - if err := cfg.DatabaseConfig.Validate(); err != nil { + if err := cfg.Database.Validate(); err != nil { return err } - if err := cfg.RedisConfig.Validate(); err != nil { + if err := cfg.Redis.Validate(); err != nil { return err } - if err := cfg.AdminConfig.Validate(); err != nil { + if err := cfg.Admin.Validate(); err != nil { return err } - if err := cfg.ProxyConfig.Validate(); err != nil { + if err := cfg.Proxy.Validate(); err != nil { return err } - if err := cfg.WorkerConfig.Validate(); err != nil { + if err := cfg.Worker.Validate(); err != nil { + return err + } + if err := cfg.Metrics.Validate(); err != nil { return err } diff --git a/config/config_test.go b/config/config_test.go index 5f92912..73b3c13 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -163,6 +163,68 @@ func TestProxyConfig(t *testing.T) { } } +func TestMetricsConfig(t *testing.T) { + tests := []struct { + desc string + cfg MetricsConfig + expectedValidateErr error + }{ + { + desc: "sanity", + cfg: MetricsConfig{ + Attributes: nil, + Exports: nil, + OpenTelemetry: Opentelemetry{ + PushInterval: 1, + Protocol: "http/protobuf", + }, + }, + expectedValidateErr: nil, + }, + { + desc: "invalid export", + cfg: MetricsConfig{ + Attributes: nil, + Exports: []Export{"unknown"}, + OpenTelemetry: Opentelemetry{ + PushInterval: 1, + Protocol: "http/protobuf", + }, + }, + expectedValidateErr: errors.New("invalid export: unknown"), + }, + { + desc: "invalid export", + cfg: MetricsConfig{ + Attributes: nil, + Exports: nil, + OpenTelemetry: Opentelemetry{ + PushInterval: 1, + Protocol: "unknown", + }, + }, + expectedValidateErr: errors.New("invalid protocol: unknown"), + }, + { + desc: "invalid PushInterval", + cfg: MetricsConfig{ + Attributes: nil, + Exports: nil, + OpenTelemetry: Opentelemetry{ + PushInterval: 61, + Protocol: "http/protobuf", + }, + }, + expectedValidateErr: errors.New("interval must be in the range [1, 60]"), + }, + } + + for _, test := range tests { + actualValidateErr := test.cfg.Validate() + assert.Equal(t, test.expectedValidateErr, actualValidateErr, "expected %v got %v", test.expectedValidateErr, actualValidateErr) + } +} + func TestConfig(t *testing.T) { cfg, err := Init() assert.Nil(t, err) diff --git a/config/metrics.go b/config/metrics.go new file mode 100644 index 0000000..67bd7e2 --- /dev/null +++ b/config/metrics.go @@ -0,0 +1,53 @@ +package config + +import ( + "fmt" + "slices" +) + +type MetricsConfig struct { + Attributes Map `yaml:"attributes" envconfig:"ATTRIBUTES"` + Exports []Export `yaml:"exports" envconfig:"EXPORTS"` + OpenTelemetry Opentelemetry `yaml:"opentelemetry" envconfig:"OPENTELEMETRY"` +} + +func (cfg *MetricsConfig) Validate() error { + if err := cfg.OpenTelemetry.Validate(); err != nil { + return err + } + for _, export := range cfg.Exports { + if !slices.Contains([]Export{ExportOpenTelemetry}, export) { + return fmt.Errorf("invalid export: %s", export) + } + } + return nil +} + +type Export string + +const ( + ExportOpenTelemetry Export = "opentelemetry" +) + +type OtlpProtocol string + +const ( + OtlpProtocolGRPC OtlpProtocol = "grpc" + OtlpProtocolHTTP OtlpProtocol = "http/protobuf" +) + +type Opentelemetry struct { + PushInterval uint32 `yaml:"push_interval" default:"10"` + Protocol OtlpProtocol `yaml:"protocol" envconfig:"PROTOCOL" default:"http/protobuf"` + Endpoint string `yaml:"endpoint" envconfig:"ENDPOINT" default:"http://localhost:4318/v1/metrics"` +} + +func (cfg Opentelemetry) Validate() error { + if cfg.PushInterval < 1 || cfg.PushInterval > 60 { + return fmt.Errorf("interval must be in the range [1, 60]") + } + if !slices.Contains([]OtlpProtocol{OtlpProtocolGRPC, OtlpProtocolHTTP}, cfg.Protocol) { + return fmt.Errorf("invalid protocol: %s", cfg.Protocol) + } + return nil +} diff --git a/config/types.go b/config/types.go new file mode 100644 index 0000000..137335b --- /dev/null +++ b/config/types.go @@ -0,0 +1,9 @@ +package config + +import "encoding/json" + +type Map map[string]string + +func (m *Map) Decode(value string) error { + return json.Unmarshal([]byte(value), m) +} diff --git a/dispatcher/dispatcher.go b/dispatcher/dispatcher.go index 58f8716..7d8cfc5 100644 --- a/dispatcher/dispatcher.go +++ b/dispatcher/dispatcher.go @@ -6,6 +6,7 @@ import ( "github.com/webhookx-io/webhookx/db/entities" "github.com/webhookx-io/webhookx/db/query" "github.com/webhookx-io/webhookx/model" + "github.com/webhookx-io/webhookx/pkg/metrics" "github.com/webhookx-io/webhookx/pkg/taskqueue" "github.com/webhookx-io/webhookx/pkg/types" "github.com/webhookx-io/webhookx/utils" @@ -15,76 +16,72 @@ import ( // Dispatcher is Event Dispatcher type Dispatcher struct { - log *zap.SugaredLogger - queue taskqueue.TaskQueue - db *db.DB + log *zap.SugaredLogger + queue taskqueue.TaskQueue + db *db.DB + metrics *metrics.Metrics } -func NewDispatcher(log *zap.SugaredLogger, queue taskqueue.TaskQueue, db *db.DB) *Dispatcher { +func NewDispatcher(log *zap.SugaredLogger, queue taskqueue.TaskQueue, db *db.DB, metrics *metrics.Metrics) *Dispatcher { dispatcher := &Dispatcher{ - log: log, - queue: queue, - db: db, + log: log, + queue: queue, + db: db, + metrics: metrics, } return dispatcher } func (d *Dispatcher) Dispatch(ctx context.Context, event *entities.Event) error { - endpoints, err := d.listSubscribedEndpoint(ctx, event.WorkspaceId, event.EventType) - if err != nil { - return err - } - - attempts := fanout(event, endpoints, entities.AttemptTriggerModeInitial) - if len(attempts) == 0 { - return d.db.Events.Insert(ctx, event) - } + return d.DispatchBatch(ctx, []*entities.Event{event}) +} - err = d.db.TX(ctx, func(ctx context.Context) error { - err := d.db.Events.Insert(ctx, event) - if err != nil { - return err - } - return d.db.Attempts.BatchInsert(ctx, attempts) - }) - if err != nil { - return err +func (d *Dispatcher) DispatchBatch(ctx context.Context, events []*entities.Event) error { + n, err := d.dispatchBatch(ctx, events) + if d.metrics.Enabled && err == nil { + d.metrics.EventPersistCounter.Add(float64(n)) } - - go d.sendToQueue(context.TODO(), attempts) - - return nil + return err } -func (d *Dispatcher) DispatchBatch(ctx context.Context, events []*entities.Event) error { +func (d *Dispatcher) dispatchBatch(ctx context.Context, events []*entities.Event) (int, error) { if len(events) == 0 { - return nil + return 0, nil } - eventAttempts := make(map[string][]*entities.Attempt) + maps := make(map[string][]*entities.Attempt) for _, event := range events { endpoints, err := d.listSubscribedEndpoint(ctx, event.WorkspaceId, event.EventType) if err != nil { - return err + return 0, err + } + if len(endpoints) != 0 { + maps[event.ID] = fanout(event, endpoints, entities.AttemptTriggerModeInitial) } - eventAttempts[event.ID] = fanout(event, endpoints, entities.AttemptTriggerModeInitial) + } + + if len(maps) == 0 { + ids, err := d.db.Events.BatchInsertIgnoreConflict(ctx, events) + return len(ids), err } attempts := make([]*entities.Attempt, 0) + n := 0 err := d.db.TX(ctx, func(ctx context.Context) error { ids, err := d.db.Events.BatchInsertIgnoreConflict(ctx, events) if err != nil { return err } + n = len(ids) for _, id := range ids { - attempts = append(attempts, eventAttempts[id]...) + attempts = append(attempts, maps[id]...) } return d.db.Attempts.BatchInsert(ctx, attempts) }) - - go d.sendToQueue(context.TODO(), attempts) - - return err + if err == nil { + go d.sendToQueue(context.TODO(), attempts) + } + return n, err } func fanout(event *entities.Event, endpoints []*entities.Endpoint, mode entities.AttemptTriggerMode) []*entities.Attempt { diff --git a/go.mod b/go.mod index 2b2d2be..7d31df3 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.22.0 require ( github.com/Masterminds/squirrel v1.5.4 github.com/creasty/defaults v1.8.0 + github.com/go-kit/kit v0.13.0 github.com/go-playground/validator/v10 v10.22.1 github.com/go-resty/resty/v2 v2.15.3 github.com/golang-migrate/migrate/v4 v4.18.1 @@ -20,37 +21,52 @@ require ( github.com/segmentio/ksuid v1.0.4 github.com/spf13/cobra v1.8.1 github.com/stretchr/testify v1.9.0 + go.opentelemetry.io/otel v1.31.0 + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.31.0 + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.31.0 + go.opentelemetry.io/otel/metric v1.31.0 + go.opentelemetry.io/otel/sdk v1.31.0 + go.opentelemetry.io/otel/sdk/metric v1.31.0 go.uber.org/mock v0.4.0 go.uber.org/zap v1.27.0 gopkg.in/yaml.v3 v3.0.1 ) require ( + github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect - github.com/davecgh/go-spew v1.1.1 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/gabriel-vasile/mimetype v1.4.5 // indirect github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect github.com/go-task/slim-sprig/v3 v3.0.0 // indirect github.com/google/go-cmp v0.6.0 // indirect - github.com/google/pprof v0.0.0-20240827171923-fa2c70bbbfe5 // indirect + github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect - github.com/kr/pretty v0.3.0 // indirect github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect github.com/leodido/go-urn v1.4.0 // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/spf13/pflag v1.0.5 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.55.0 // indirect + go.opentelemetry.io/otel/trace v1.31.0 // indirect + go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/crypto v0.27.0 // indirect - golang.org/x/net v0.29.0 // indirect - golang.org/x/sys v0.25.0 // indirect - golang.org/x/text v0.18.0 // indirect - golang.org/x/tools v0.24.0 // indirect - gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect + golang.org/x/crypto v0.28.0 // indirect + golang.org/x/net v0.30.0 // indirect + golang.org/x/sys v0.26.0 // indirect + golang.org/x/text v0.19.0 // indirect + golang.org/x/tools v0.25.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20241007155032-5fefd90f89a9 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9 // indirect + google.golang.org/grpc v1.67.1 // indirect + google.golang.org/protobuf v1.35.1 // indirect ) diff --git a/go.sum b/go.sum index 9d61ef6..8594817 100644 --- a/go.sum +++ b/go.sum @@ -6,18 +6,22 @@ github.com/Masterminds/squirrel v1.5.4 h1:uUcX/aBc8O7Fg9kaISIUsHXdKuqehiXAMQTYX8 github.com/Masterminds/squirrel v1.5.4/go.mod h1:NNaOrjSoIDfDA40n7sr2tPNZRfjzjA400rg+riTZj10= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= +github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrdtl/UvroE= +github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= -github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/creasty/defaults v1.8.0 h1:z27FJxCAa0JKt3utc0sCImAEb+spPucmKoOdLHvHYKk= github.com/creasty/defaults v1.8.0/go.mod h1:iGzKe6pbEHnpMPtfDXZEr0NVxWnPTjb1bbDy08fPzYM= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dhui/dktest v0.4.3 h1:wquqUxAFdcUgabAVLvSCOKOlag5cIZuaOjYIBOWdsR0= @@ -34,6 +38,9 @@ github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2 github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/gabriel-vasile/mimetype v1.4.5 h1:J7wGKdGu33ocBOhGy0z653k/lFKLFDPJMG8Gql0kxn4= github.com/gabriel-vasile/mimetype v1.4.5/go.mod h1:ibHel+/kbxn9x2407k1izTA1S81ku1z/DlgOW2QE0M4= +github.com/go-kit/kit v0.13.0 h1:OoneCcHKHQ03LfBpoQCUfCluwd2Vt3ohz+kvbJneZAU= +github.com/go-kit/kit v0.13.0/go.mod h1:phqEHMMUbyrCFCTgH48JueqrM3md2HcAZ8N3XE4FKDg= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= @@ -58,10 +65,14 @@ github.com/golang-migrate/migrate/v4 v4.18.1 h1:JML/k+t4tpHCpQTCAD62Nu43NUFzHY4C github.com/golang-migrate/migrate/v4 v4.18.1/go.mod h1:HAX6m3sQgcdO81tdjn5exv20+3Kb13cmGli1hrD6hks= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/pprof v0.0.0-20240827171923-fa2c70bbbfe5 h1:5iH8iuqE5apketRbSFBy+X1V0o+l+8NF1avt4HWl7cA= -github.com/google/pprof v0.0.0-20240827171923-fa2c70bbbfe5/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= +github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134 h1:c5FlPPgxOn7kJz3VoPLkQYQXGBS3EklQ4Zfi57uOuqQ= +github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 h1:asbCHRVmodnJTuQ3qamDwqVOIjwqUPTYmYuemVOx+Ys= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0/go.mod h1:ggCgvZ2r7uOoQjOyu2Y1NhHmEPPzzuhWgcza5M1Ji1I= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -73,12 +84,8 @@ github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2 github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/jmoiron/sqlx v1.4.0 h1:1PLqN7S1UYp5t4SrVVnt4nUVNemrDAtxlulVe+Qgm3o= github.com/jmoiron/sqlx v1.4.0/go.mod h1:ZrZ7UsYB/weZdl2Bxg6jCRO9c3YHl8r3ahlKmRT4JLY= -github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= -github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= -github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= -github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 h1:SOEGU9fKiNWd/HOJuq6+3iTQz8KNCLtVX6idSoTLdUw= @@ -107,13 +114,13 @@ github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQ github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/redis/go-redis/v9 v9.6.2 h1:w0uvkRbc9KpgD98zcvo5IrVUsn0lXpRMuhNgiHDJzdk= github.com/redis/go-redis/v9 v9.6.2/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA= -github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= -github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= -github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= +github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= @@ -126,14 +133,24 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 h1:TT4fX+nBOA/+LUkobKGW1ydGcn+G3vRw9+g5HwCphpk= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0/go.mod h1:L7UH0GbB0p47T4Rri3uHjbpCFYrVrwc1I25QhNPiGK8= -go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw= -go.opentelemetry.io/otel v1.29.0/go.mod h1:N/WtXPs1CNCUEx+Agz5uouwCba+i+bJGFicT8SR4NP8= -go.opentelemetry.io/otel/metric v1.29.0 h1:vPf/HFWTNkPu1aYeIsc98l4ktOQaL6LeSoeV2g+8YLc= -go.opentelemetry.io/otel/metric v1.29.0/go.mod h1:auu/QWieFVWx+DmQOUMgj0F8LHWdgalxXqvp7BII/W8= -go.opentelemetry.io/otel/trace v1.29.0 h1:J/8ZNK4XgR7a21DZUAsbF8pZ5Jcw1VhACmnYt39JTi4= -go.opentelemetry.io/otel/trace v1.29.0/go.mod h1:eHl3w0sp3paPkYstJOmAimxhiFXPg+MMTlEh3nsQgWQ= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.55.0 h1:ZIg3ZT/aQ7AfKqdwp7ECpOK6vHqquXXuyTjIO8ZdmPs= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.55.0/go.mod h1:DQAwmETtZV00skUwgD6+0U89g80NKsJE3DCKeLLPQMI= +go.opentelemetry.io/otel v1.31.0 h1:NsJcKPIW0D0H3NgzPDHmo0WW6SptzPdqg/L1zsIm2hY= +go.opentelemetry.io/otel v1.31.0/go.mod h1:O0C14Yl9FgkjqcCZAsE053C13OaddMYr/hz6clDkEJE= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.31.0 h1:FZ6ei8GFW7kyPYdxJaV2rgI6M+4tvZzhYsQ2wgyVC08= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.31.0/go.mod h1:MdEu/mC6j3D+tTEfvI15b5Ci2Fn7NneJ71YMoiS3tpI= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.31.0 h1:ZsXq73BERAiNuuFXYqP4MR5hBrjXfMGSO+Cx7qoOZiM= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.31.0/go.mod h1:hg1zaDMpyZJuUzjFxFsRYBoccE86tM9Uf4IqNMUxvrY= +go.opentelemetry.io/otel/metric v1.31.0 h1:FSErL0ATQAmYHUIzSezZibnyVlft1ybhy4ozRPcF2fE= +go.opentelemetry.io/otel/metric v1.31.0/go.mod h1:C3dEloVbLuYoX41KpmAhOqNriGbA+qqH6PQ5E5mUfnY= +go.opentelemetry.io/otel/sdk v1.31.0 h1:xLY3abVHYZ5HSfOg3l2E5LUj2Cwva5Y7yGxnSW9H5Gk= +go.opentelemetry.io/otel/sdk v1.31.0/go.mod h1:TfRbMdhvxIIr/B2N2LQW2S5v9m3gOQ/08KsbbO5BPT0= +go.opentelemetry.io/otel/sdk/metric v1.31.0 h1:i9hxxLJF/9kkvfHppyLL55aW7iIJz4JjxTeYusH7zMc= +go.opentelemetry.io/otel/sdk/metric v1.31.0/go.mod h1:CRInTMVvNhUKgSAMbKyTMxqOBC0zgyxzW55lZzX43Y8= +go.opentelemetry.io/otel/trace v1.31.0 h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HYdmJys= +go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A= +go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0= +go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= @@ -144,24 +161,28 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= -golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= -golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= -golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= -golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= -golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= -golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= -golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw= +golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U= +golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4= +golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= +golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= +golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= +golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U= golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= -golang.org/x/tools v0.24.0 h1:J1shsA93PJUEVaUSaay7UXAyE8aimq3GW0pjlolpa24= -golang.org/x/tools v0.24.0/go.mod h1:YhNqVBIfWHdzvTLs0d8LCuMhkKUgSUKldakyV7W/WDQ= -google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= -google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +golang.org/x/tools v0.25.0 h1:oFU9pkj/iJgs+0DT+VMHrx+oBKs/LJMV+Uvg78sl+fE= +golang.org/x/tools v0.25.0/go.mod h1:/vtpO8WL1N9cQC3FN5zPqb//fRXskFHbLKk4OW1Q7rg= +google.golang.org/genproto/googleapis/api v0.0.0-20241007155032-5fefd90f89a9 h1:T6rh4haD3GVYsgEfWExoCZA2o2FmbNyKpTuAxbEFPTg= +google.golang.org/genproto/googleapis/api v0.0.0-20241007155032-5fefd90f89a9/go.mod h1:wp2WsuBYj6j8wUdo3ToZsdxxixbvQNAHqVJrTgi5E5M= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9 h1:QCqS/PdaHTSWGvupk2F/ehwHtGc0/GYkT+3GAcR1CCc= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI= +google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E= +google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= +google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= +google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= -gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go new file mode 100644 index 0000000..2cea523 --- /dev/null +++ b/pkg/metrics/metrics.go @@ -0,0 +1,87 @@ +package metrics + +import ( + "context" + "github.com/go-kit/kit/metrics" + "github.com/webhookx-io/webhookx/config" + "github.com/webhookx-io/webhookx/pkg/schedule" + "go.uber.org/zap" + "runtime" + "time" +) + +type Metrics struct { + ctx context.Context + cancel context.CancelFunc + + Enabled bool + Interval time.Duration + + // runtime metrics + + RuntimeGoroutine metrics.Gauge + RuntimeAlloc metrics.Gauge + RuntimeSys metrics.Gauge + RuntimeMallocs metrics.Gauge + RuntimeFrees metrics.Gauge + RuntimeHeapObjects metrics.Gauge + RuntimePauseTotalNs metrics.Gauge + RuntimeGC metrics.Gauge + + // worker metrics + + AttemptTotalCounter metrics.Counter + AttemptFailedCounter metrics.Counter + AttemptPendingGauge metrics.Gauge + AttemptResponseDurationHistogram metrics.Histogram + + // proxy metrics + + ProxyRequestCounter metrics.Counter + ProxyRequestDurationHistogram metrics.Histogram + + // events metrics + EventTotalCounter metrics.Counter + EventPersistCounter metrics.Counter + EventPendingGauge metrics.Gauge +} + +func (m *Metrics) Stop() error { + m.cancel() + return nil +} + +func New(cfg config.MetricsConfig) (*Metrics, error) { + ctx, cancel := context.WithCancel(context.Background()) + m := &Metrics{ + ctx: ctx, + cancel: cancel, + Enabled: len(cfg.Exports) > 0, + } + + if len(cfg.Exports) > 0 { + m.Interval = time.Second * time.Duration(cfg.OpenTelemetry.PushInterval) + err := SetupOpentelemetry(m.ctx, cfg.Attributes, cfg.OpenTelemetry, m) + if err != nil { + return nil, err + } + schedule.Schedule(m.ctx, m.collectRuntimeStats, m.Interval) + zap.S().Infof("enabled metric exports: %v", cfg.Exports) + } + + return m, nil +} + +func (m *Metrics) collectRuntimeStats() { + m.RuntimeGoroutine.Set(float64(runtime.NumGoroutine())) + + var stats runtime.MemStats + runtime.ReadMemStats(&stats) + m.RuntimeAlloc.Set(float64(stats.Alloc)) + m.RuntimeSys.Set(float64(stats.Sys)) + m.RuntimeMallocs.Set(float64(stats.Mallocs)) + m.RuntimeFrees.Set(float64(stats.Frees)) + m.RuntimeHeapObjects.Set(float64(stats.HeapObjects)) + m.RuntimePauseTotalNs.Set(float64(stats.PauseTotalNs)) + m.RuntimeGC.Set(float64(stats.NumGC)) +} diff --git a/pkg/metrics/opentelemetry.go b/pkg/metrics/opentelemetry.go new file mode 100644 index 0000000..c6c234c --- /dev/null +++ b/pkg/metrics/opentelemetry.go @@ -0,0 +1,103 @@ +package metrics + +import ( + "context" + "fmt" + "github.com/webhookx-io/webhookx/config" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" + "go.opentelemetry.io/otel/semconv/v1.26.0" + "time" +) + +const ( + prefix = "webhookx." +) + +func newHTTPExporter(endpoint string) (metric.Exporter, error) { + opts := []otlpmetrichttp.Option{ + otlpmetrichttp.WithEndpointURL(endpoint), + } + return otlpmetrichttp.New(context.Background(), opts...) +} + +func newGRPCExporter(endpoint string) (metric.Exporter, error) { + opts := []otlpmetricgrpc.Option{ + otlpmetricgrpc.WithEndpoint(endpoint), + otlpmetricgrpc.WithInsecure(), + } + return otlpmetricgrpc.New(context.Background(), opts...) +} + +func SetupOpentelemetry(ctx context.Context, attributes map[string]string, cfg config.Opentelemetry, metrics *Metrics) error { + var err error + var exporter metric.Exporter + switch cfg.Protocol { + case config.OtlpProtocolHTTP: + exporter, err = newHTTPExporter(cfg.Endpoint) + case config.OtlpProtocolGRPC: + exporter, err = newGRPCExporter(cfg.Endpoint) + } + if err != nil { + return fmt.Errorf("failed to setup exporter: %v", err) + } + + // custom attributes + attrs := make([]attribute.KeyValue, len(attributes)) + for name, value := range attributes { + attrs = append(attrs, attribute.String(name, value)) + } + + res, err := resource.New(ctx, + resource.WithAttributes(semconv.ServiceNameKey.String("webhookx")), + resource.WithAttributes(semconv.ServiceVersionKey.String(config.VERSION)), + resource.WithFromEnv(), + resource.WithAttributes(attrs...), + ) + if err != nil { + return fmt.Errorf("failed to build resource: %w", err) + } + + opts := []metric.PeriodicReaderOption{ + metric.WithInterval(time.Second * time.Duration(cfg.PushInterval)), + } + + meterProvider := metric.NewMeterProvider( + metric.WithResource(res), + metric.WithReader(metric.NewPeriodicReader(exporter, opts...)), + ) + otel.SetMeterProvider(meterProvider) + + meter := otel.Meter("github.com/webhookx-io/webhookx") + + // proxy metrics + metrics.ProxyRequestCounter = NewCounter(meter, prefix+"request.total", "") + metrics.ProxyRequestDurationHistogram = NewHistogram(meter, prefix+"request.duration", "", "s") + + // runtime metrics + metrics.RuntimeGoroutine = NewGauge(meter, prefix+"runtime.num_goroutine", "") + metrics.RuntimeAlloc = NewGauge(meter, prefix+"runtime.alloc_bytes", "") + metrics.RuntimeSys = NewGauge(meter, prefix+"runtime.sys_bytes", "") + metrics.RuntimeMallocs = NewGauge(meter, prefix+"runtime.mallocs", "") + metrics.RuntimeFrees = NewGauge(meter, prefix+"runtime.frees", "") + metrics.RuntimeHeapObjects = NewGauge(meter, prefix+"runtime.heap_objects", "") + metrics.RuntimePauseTotalNs = NewGauge(meter, prefix+"runtime.pause_total_ns", "") + metrics.RuntimeGC = NewGauge(meter, prefix+"runtime.num_gc", "") + + // worker metrics + metrics.AttemptTotalCounter = NewCounter(meter, prefix+"attempt.total", "") + metrics.AttemptFailedCounter = NewCounter(meter, prefix+"attempt.failed", "") + metrics.AttemptPendingGauge = NewGauge(meter, prefix+"attempt.pending", "") + metrics.AttemptResponseDurationHistogram = NewHistogram(meter, prefix+"attempt.response.duration", "", "s") + + // event metrics + metrics.EventTotalCounter = NewCounter(meter, prefix+"event.total", "") + metrics.EventPersistCounter = NewCounter(meter, prefix+"event.persisted", "") + metrics.EventPendingGauge = NewGauge(meter, prefix+"event.pending", "") + + return nil +} diff --git a/pkg/metrics/types.go b/pkg/metrics/types.go new file mode 100644 index 0000000..4b640aa --- /dev/null +++ b/pkg/metrics/types.go @@ -0,0 +1,111 @@ +package metrics + +import ( + "context" + "github.com/go-kit/kit/metrics" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +type LabelValues []string + +func (lvs LabelValues) With(labelValues ...string) LabelValues { + if len(labelValues)%2 != 0 { + labelValues = append(labelValues, "unknown") + } + return append(lvs, labelValues...) +} + +func (lvs LabelValues) ToLabels() []attribute.KeyValue { + labels := make([]attribute.KeyValue, len(lvs)/2) + for i := 0; i < len(labels); i++ { + labels[i] = attribute.String(lvs[2*i], lvs[2*i+1]) + } + return labels +} + +type Counter struct { + lvs LabelValues + c metric.Float64Counter +} + +func (c *Counter) With(labelValues ...string) metrics.Counter { + return &Counter{ + lvs: c.lvs.With(labelValues...), + c: c.c, + } +} + +func (c *Counter) Add(delta float64) { + c.c.Add(context.Background(), delta, metric.WithAttributes(c.lvs.ToLabels()...)) +} + +func NewCounter(meter metric.Meter, name string, desc string) *Counter { + c, _ := meter.Float64Counter( + name, + metric.WithDescription(desc), + metric.WithUnit("1"), + ) + return &Counter{ + c: c, + } +} + +type Gauge struct { + lvs LabelValues + g metric.Float64Gauge +} + +func NewGauge(meter metric.Meter, name string, desc string) *Gauge { + g, _ := meter.Float64Gauge( + name, + metric.WithDescription(desc), + metric.WithUnit("1"), + ) + return &Gauge{ + g: g, + } +} + +func (g *Gauge) With(labelValues ...string) metrics.Gauge { + return &Gauge{ + lvs: g.lvs.With(labelValues...), + g: g.g, + } +} + +func (g *Gauge) Add(delta float64) { + g.g.Record(context.Background(), delta, metric.WithAttributes(g.lvs.ToLabels()...)) +} + +func (g *Gauge) Set(delta float64) { + g.g.Record(context.Background(), delta, metric.WithAttributes(g.lvs.ToLabels()...)) +} + +type Histogram struct { + lvs LabelValues + h metric.Float64Histogram +} + +func NewHistogram(meter metric.Meter, name string, desc string, unit string) *Histogram { + h, _ := meter.Float64Histogram( + name, + metric.WithDescription(desc), + metric.WithUnit(unit), + metric.WithExplicitBucketBoundaries(.005, .01, .025, .05, .075, .1, .25, .5, .75, 1, 2.5, 5, 7.5, 10), + ) + return &Histogram{ + h: h, + } +} + +func (h *Histogram) With(labelValues ...string) metrics.Histogram { + return &Histogram{ + lvs: h.lvs.With(labelValues...), + h: h.h, + } +} + +func (h *Histogram) Observe(value float64) { + h.h.Record(context.Background(), value, metric.WithAttributes(h.lvs.ToLabels()...)) +} diff --git a/pkg/queue/queue.go b/pkg/queue/queue.go index 2e47d62..b5144e1 100644 --- a/pkg/queue/queue.go +++ b/pkg/queue/queue.go @@ -22,4 +22,5 @@ type Queue interface { Enqueue(ctx context.Context, message *Message) error Dequeue(ctx context.Context, opts *Options) ([]*Message, error) Delete(ctx context.Context, message []*Message) error + Size(ctx context.Context) (int64, error) } diff --git a/pkg/queue/redis/redis.go b/pkg/queue/redis/redis.go index ddffe12..e94ae33 100644 --- a/pkg/queue/redis/redis.go +++ b/pkg/queue/redis/redis.go @@ -5,6 +5,7 @@ import ( "errors" "github.com/redis/go-redis/v9" "github.com/webhookx-io/webhookx/constants" + "github.com/webhookx-io/webhookx/pkg/metrics" "github.com/webhookx-io/webhookx/pkg/queue" "github.com/webhookx-io/webhookx/utils" "go.uber.org/zap" @@ -19,8 +20,9 @@ type RedisQueue struct { consumer string visibilityTimeout time.Duration - c *redis.Client - log *zap.SugaredLogger + c *redis.Client + log *zap.SugaredLogger + metrics *metrics.Metrics } type RedisQueueOptions struct { @@ -32,7 +34,7 @@ type RedisQueueOptions struct { Client *redis.Client } -func NewRedisQueue(opts RedisQueueOptions, logger *zap.SugaredLogger) (queue.Queue, error) { +func NewRedisQueue(opts RedisQueueOptions, logger *zap.SugaredLogger, metrics *metrics.Metrics) (queue.Queue, error) { q := &RedisQueue{ stream: utils.DefaultIfZero(opts.StreamName, constants.QueueRedisQueueName), group: utils.DefaultIfZero(opts.GroupName, constants.QueueRedisGroupName), @@ -40,9 +42,14 @@ func NewRedisQueue(opts RedisQueueOptions, logger *zap.SugaredLogger) (queue.Que visibilityTimeout: utils.DefaultIfZero(opts.VisibilityTimeout, constants.QueueRedisVisibilityTimeout), c: opts.Client, log: logger, + metrics: metrics, } go q.process() + if metrics.Enabled { + go q.monitoring() + } + return q, nil } @@ -50,11 +57,7 @@ func (q *RedisQueue) Enqueue(ctx context.Context, message *queue.Message) error args := &redis.XAddArgs{ Stream: q.stream, ID: "*", - Values: map[string]interface{}{ - "data": message.Data, - "time": message.Time.UnixMilli(), - "ws_id": message.WorkspaceID, - }, + Values: []interface{}{"data", message.Data, "time", message.Time.UnixMilli(), "ws_id", message.WorkspaceID}, } res := q.c.XAdd(ctx, args) if res.Err() != nil { @@ -139,6 +142,10 @@ func (q *RedisQueue) Delete(ctx context.Context, messages []*queue.Message) erro return nil } +func (q *RedisQueue) Size(ctx context.Context) (int64, error) { + return q.c.XLen(ctx, q.stream).Result() +} + func (q *RedisQueue) createConsumerGroup() { res := q.c.XGroupCreateMkStream(context.TODO(), q.stream, q.group, "0") if res.Err() == nil { @@ -193,3 +200,16 @@ func (q *RedisQueue) process() { }() } + +func (q *RedisQueue) monitoring() { + ticker := time.NewTicker(q.metrics.Interval) + defer ticker.Stop() + for range ticker.C { + size, err := q.Size(context.TODO()) + if err != nil { + q.log.Errorf("failed to get redis queue size: %v", err) + continue + } + q.metrics.EventPendingGauge.Set(float64(size)) + } +} diff --git a/pkg/taskqueue/queue.go b/pkg/taskqueue/queue.go index ce8bcbd..65a86c7 100644 --- a/pkg/taskqueue/queue.go +++ b/pkg/taskqueue/queue.go @@ -33,4 +33,5 @@ type TaskQueue interface { Add(ctx context.Context, tasks []*TaskMessage) error Get(ctx context.Context, opts *GetOptions) (tasks []*TaskMessage, err error) Delete(ctx context.Context, task *TaskMessage) error + Size(ctx context.Context) (int64, error) } diff --git a/pkg/taskqueue/redis.go b/pkg/taskqueue/redis.go index d211392..9614764 100644 --- a/pkg/taskqueue/redis.go +++ b/pkg/taskqueue/redis.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/redis/go-redis/v9" "github.com/webhookx-io/webhookx/constants" + "github.com/webhookx-io/webhookx/pkg/metrics" "github.com/webhookx-io/webhookx/utils" "go.uber.org/zap" "time" @@ -57,8 +58,9 @@ type RedisTaskQueue struct { queueData string visibilityTimeout time.Duration - c *redis.Client - log *zap.SugaredLogger + c *redis.Client + log *zap.SugaredLogger + metrics *metrics.Metrics } type RedisTaskQueueOptions struct { @@ -69,7 +71,7 @@ type RedisTaskQueueOptions struct { Client *redis.Client } -func NewRedisQueue(opts RedisTaskQueueOptions, logger *zap.SugaredLogger) *RedisTaskQueue { +func NewRedisQueue(opts RedisTaskQueueOptions, logger *zap.SugaredLogger, metrics *metrics.Metrics) *RedisTaskQueue { q := &RedisTaskQueue{ queue: utils.DefaultIfZero(opts.QueueName, constants.TaskQueueName), invisibleQueue: utils.DefaultIfZero(opts.InvisibleQueueName, constants.TaskQueueInvisibleQueueName), @@ -77,8 +79,14 @@ func NewRedisQueue(opts RedisTaskQueueOptions, logger *zap.SugaredLogger) *Redis queueData: utils.DefaultIfZero(opts.QueueDataName, constants.TaskQueueDataName), c: opts.Client, log: logger, + metrics: metrics, } q.process() + + if metrics.Enabled { + go q.monitoring() + } + return q } @@ -143,6 +151,9 @@ func (q *RedisTaskQueue) Delete(ctx context.Context, task *TaskMessage) error { return err } +func (q *RedisTaskQueue) Size(ctx context.Context) (int64, error) { + return q.c.ZCard(ctx, q.queue).Result() +} // process re-enqueue invisible tasks that reach the visibility timeout func (q *RedisTaskQueue) process() { @@ -165,3 +176,16 @@ func (q *RedisTaskQueue) process() { } }() } + +func (q *RedisTaskQueue) monitoring() { + ticker := time.NewTicker(q.metrics.Interval) + defer ticker.Stop() + for range ticker.C { + size, err := q.Size(context.TODO()) + if err != nil { + q.log.Errorf("failed to get task queue size: %v", err) + continue + } + q.metrics.AttemptPendingGauge.Set(float64(size)) + } +} diff --git a/proxy/models.go b/pkg/types/types.go similarity index 89% rename from proxy/models.go rename to pkg/types/types.go index 3945141..268a871 100644 --- a/proxy/models.go +++ b/pkg/types/types.go @@ -1,4 +1,4 @@ -package proxy +package types type ErrorResponse struct { Message string `json:"message"` diff --git a/proxy/gateway.go b/proxy/gateway.go index 7c5e2af..1080233 100644 --- a/proxy/gateway.go +++ b/proxy/gateway.go @@ -11,11 +11,13 @@ import ( "github.com/webhookx-io/webhookx/db/entities" "github.com/webhookx-io/webhookx/db/query" "github.com/webhookx-io/webhookx/dispatcher" + "github.com/webhookx-io/webhookx/pkg/metrics" "github.com/webhookx-io/webhookx/pkg/queue" "github.com/webhookx-io/webhookx/pkg/queue/redis" "github.com/webhookx-io/webhookx/pkg/schedule" "github.com/webhookx-io/webhookx/pkg/types" "github.com/webhookx-io/webhookx/pkg/ucontext" + "github.com/webhookx-io/webhookx/proxy/middlewares" "github.com/webhookx-io/webhookx/proxy/router" "github.com/webhookx-io/webhookx/utils" "go.uber.org/zap" @@ -42,16 +44,17 @@ type Gateway struct { dispatcher *dispatcher.Dispatcher - queue queue.Queue + queue queue.Queue + metrics *metrics.Metrics } -func NewGateway(cfg *config.ProxyConfig, db *db.DB, dispatcher *dispatcher.Dispatcher) *Gateway { +func NewGateway(cfg *config.ProxyConfig, db *db.DB, dispatcher *dispatcher.Dispatcher, metrics *metrics.Metrics) *Gateway { var q queue.Queue switch cfg.Queue.Type { case "redis": q, _ = redis.NewRedisQueue(redis.RedisQueueOptions{ Client: cfg.Queue.Redis.GetClient(), - }, zap.S()) + }, zap.S(), metrics) } gw := &Gateway{ @@ -61,10 +64,14 @@ func NewGateway(cfg *config.ProxyConfig, db *db.DB, dispatcher *dispatcher.Dispa db: db, dispatcher: dispatcher, queue: q, + metrics: metrics, } r := mux.NewRouter() - r.Use(panicRecovery) + r.Use(middlewares.PanicRecovery) + if metrics.Enabled { + r.Use(middlewares.NewMetricsMiddleware(metrics).Handle) + } r.PathPrefix("/").HandlerFunc(gw.Handle) gw.s = &http.Server{ @@ -116,7 +123,7 @@ func (gw *Gateway) Handle(w http.ResponseWriter, r *http.Request) { http.Error(w, http.StatusText(code), code) return } - utils.JsonResponse(400, w, ErrorResponse{ + utils.JsonResponse(400, w, types.ErrorResponse{ Message: err.Error(), }) return @@ -126,7 +133,7 @@ func (gw *Gateway) Handle(w http.ResponseWriter, r *http.Request) { event.IngestedAt = types.Time{Time: time.Now()} event.WorkspaceId = source.WorkspaceId if err := event.Validate(); err != nil { - utils.JsonResponse(400, w, ErrorResponse{ + utils.JsonResponse(400, w, types.ErrorResponse{ Message: "Request Validation", Error: err, }) @@ -139,6 +146,9 @@ func (gw *Gateway) Handle(w http.ResponseWriter, r *http.Request) { exit(w, 500, `{"message": "internal error"}`, nil) return } + if gw.metrics.Enabled { + gw.metrics.EventTotalCounter.Add(1) + } if source.Response != nil { exit(w, source.Response.Code, source.Response.Body, headers{"Content-Type": source.Response.ContentType}) @@ -217,7 +227,7 @@ func (gw *Gateway) listenQueue() { case <-gw.ctx.Done(): return default: - ctx := context.Background() + ctx := context.TODO() messages, err := gw.queue.Dequeue(ctx, opts) if err != nil { gw.log.Warnf("[proxy] [queue] failed to dequeue: %v", err) diff --git a/proxy/middlewares/metrics.go b/proxy/middlewares/metrics.go new file mode 100644 index 0000000..4c651b2 --- /dev/null +++ b/proxy/middlewares/metrics.go @@ -0,0 +1,26 @@ +package middlewares + +import ( + "github.com/webhookx-io/webhookx/pkg/metrics" + "net/http" + "time" +) + +type MetricsMiddleware struct { + metrics *metrics.Metrics +} + +func NewMetricsMiddleware(metrics *metrics.Metrics) *MetricsMiddleware { + return &MetricsMiddleware{ + metrics: metrics, + } +} + +func (m *MetricsMiddleware) Handle(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + m.metrics.ProxyRequestCounter.Add(1) + start := time.Now() + next.ServeHTTP(w, r) + m.metrics.ProxyRequestDurationHistogram.Observe(time.Since(start).Seconds()) + }) +} diff --git a/proxy/middleware.go b/proxy/middlewares/recovery.go similarity index 82% rename from proxy/middleware.go rename to proxy/middlewares/recovery.go index 7529f5d..0740ee8 100644 --- a/proxy/middleware.go +++ b/proxy/middlewares/recovery.go @@ -1,16 +1,17 @@ -package proxy +package middlewares import ( "encoding/json" "errors" "fmt" "github.com/webhookx-io/webhookx/db/dao" + "github.com/webhookx-io/webhookx/pkg/types" "go.uber.org/zap" "net/http" "runtime" ) -func panicRecovery(h http.Handler) http.Handler { +func PanicRecovery(h http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { defer func() { if e := recover(); e != nil { @@ -25,7 +26,7 @@ func panicRecovery(h http.Handler) http.Handler { if errors.Is(err, dao.ErrConstraintViolation) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(400) - bytes, _ := json.Marshal(ErrorResponse{Message: err.Error()}) + bytes, _ := json.Marshal(types.ErrorResponse{Message: err.Error()}) w.Write(bytes) return } diff --git a/test/README.md b/test/README.md index 60d116f..fa02d09 100644 --- a/test/README.md +++ b/test/README.md @@ -1,14 +1,14 @@ # Integration Tests -1. starts httpbin +1. starts dependencies -``` -docker run -p 9999:80 kennethreitz/httpbin +```shell +make deps ``` 2. runs integration tests -``` +```shell make test-integration ``` diff --git a/test/cache/cache_test.go b/test/cache/cache_test.go index 0aa6d55..9f61dc3 100644 --- a/test/cache/cache_test.go +++ b/test/cache/cache_test.go @@ -18,7 +18,7 @@ var _ = Describe("cache", Ordered, func() { BeforeAll(func() { cfg, err := config.Init() assert.NoError(GinkgoT(), err) - redisCache = cache.NewRedisCache(cfg.RedisConfig.GetClient()) + redisCache = cache.NewRedisCache(cfg.Redis.GetClient()) }) It("sanity", func() { diff --git a/test/docker-compose.yml b/test/docker-compose.yml new file mode 100644 index 0000000..3d2568f --- /dev/null +++ b/test/docker-compose.yml @@ -0,0 +1,34 @@ +services: + postgres: + image: postgres:13 + environment: + POSTGRES_DB: webhookx + POSTGRES_USER: webhookx + POSTGRES_HOST_AUTH_METHOD: trust + healthcheck: + test: [ "CMD-SHELL", "pg_isready -d $${POSTGRES_DB} -U $${POSTGRES_USER}" ] + interval: 3s + timeout: 5s + retries: 3 + ports: + - 5432:5432 + + redis: + image: redis:6.2 + command: "--appendonly yes --appendfsync everysec" + ports: + - 6379:6379 + + httpbin: + image: kennethreitz/httpbin + ports: + - 9999:80 + + otel-collector: + image: otel/opentelemetry-collector-contrib + volumes: + - ./otel-collector-config.yml:/etc/otelcol-contrib/config.yaml + - ./output/otel:/tmp/otel + ports: + - 4317:4317 + - 4318:4318 diff --git a/test/helper/helper.go b/test/helper/helper.go index f92f2e4..5e6eda9 100644 --- a/test/helper/helper.go +++ b/test/helper/helper.go @@ -4,6 +4,7 @@ import ( "bufio" "context" "os" + "path" "regexp" "time" @@ -19,12 +20,20 @@ import ( var cfg *config.Config +var ( + OtelCollectorMetricsFile = "/tmp/otel/metrics.json" +) + func init() { var err error cfg, err = config.Init() if err != nil { panic(err) } + + if v := os.Getenv("WEBHOOKX_TEST_OTEL_COLLECTOR_OUTPUT_PATH"); v != "" { + OtelCollectorMetricsFile = path.Join(v, "metrics.json") + } } var defaultEnvs = map[string]string{ @@ -90,7 +99,7 @@ func DB() *db.DB { if err != nil { return nil } - db, err := db.NewDB(&cfg.DatabaseConfig) + db, err := db.NewDB(&cfg.Database) if err != nil { return nil } @@ -114,7 +123,7 @@ func InitDB(truncated bool, entities *EntitiesConfig) *db.DB { } } - db, err := db.NewDB(&cfg.DatabaseConfig) + db, err := db.NewDB(&cfg.Database) if err != nil { panic(err) } @@ -185,7 +194,7 @@ func ResetDB() error { return err } - migrator := migrator.New(&cfg.DatabaseConfig) + migrator := migrator.New(&cfg.Database) err = migrator.Reset() if err != nil { return err @@ -218,6 +227,21 @@ func FileLine(filename string, n int) (string, error) { return "", nil } +func FileCountLine(filename string) (int, error) { + file, err := os.Open(filename) + if err != nil { + return 0, err + } + defer file.Close() + scanner := bufio.NewScanner(file) + n := 0 + for scanner.Scan() { + scanner.Text() + n++ + } + return n, nil +} + func FileHasLine(filename string, regex string) (bool, error) { file, err := os.Open(filename) if err != nil { diff --git a/test/metrics/opentelemetry_test.go b/test/metrics/opentelemetry_test.go new file mode 100644 index 0000000..246739e --- /dev/null +++ b/test/metrics/opentelemetry_test.go @@ -0,0 +1,174 @@ +package metrics + +import ( + "encoding/json" + "fmt" + "github.com/go-resty/resty/v2" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/stretchr/testify/assert" + "github.com/webhookx-io/webhookx/app" + "github.com/webhookx-io/webhookx/db/entities" + "github.com/webhookx-io/webhookx/test/helper" + "testing" + "time" +) + +var _ = Describe("opentelemetry", Ordered, func() { + endpoints := map[string]string{ + "http/protobuf": "http://localhost:4318/v1/metrics", + "grpc": "localhost:4317", + } + + for _, protocol := range []string{"http/protobuf", "grpc"} { + Context(protocol, func() { + var proxyClient *resty.Client + var app *app.Application + + BeforeAll(func() { + entitiesConfig := helper.EntitiesConfig{ + Endpoints: []*entities.Endpoint{helper.DefaultEndpoint(), helper.DefaultEndpoint()}, + Sources: []*entities.Source{helper.DefaultSource()}, + } + entitiesConfig.Endpoints[1].Request.Timeout = 1 + entitiesConfig.Sources[0].Async = true + helper.InitDB(true, &entitiesConfig) + proxyClient = helper.ProxyClient() + var err error + app, err = helper.Start(map[string]string{ + "WEBHOOKX_ADMIN_LISTEN": "0.0.0.0:8080", + "WEBHOOKX_PROXY_LISTEN": "0.0.0.0:8081", + "WEBHOOKX_WORKER_ENABLED": "true", + "WEBHOOKX_METRICS_EXPORTS": "opentelemetry", + "WEBHOOKX_METRICS_OPENTELEMETRY_PUSHINTERVAL": "5", + "WEBHOOKX_METRICS_OPENTELEMETRY_PROTOCOL": protocol, + "WEBHOOKX_METRICS_OPENTELEMETRY_ENDPOINT": endpoints[protocol], + }) + assert.Nil(GinkgoT(), err) + }) + + AfterAll(func() { + app.Stop() + }) + + It("sanity", func() { + assert.Eventually(GinkgoT(), func() bool { + resp, err := proxyClient.R(). + SetBody(`{ + "event_type": "foo.bar", + "data": { + "key": "value" + } + }`).Post("/") + return err == nil && resp.StatusCode() == 200 + }, time.Second*5, time.Second) + + expected := []string{ + "webhookx.runtime.num_goroutine", + "webhookx.runtime.alloc_bytes", + "webhookx.runtime.sys_bytes", + "webhookx.runtime.mallocs", + "webhookx.runtime.frees", + "webhookx.runtime.heap_objects", + "webhookx.runtime.pause_total_ns", + "webhookx.runtime.num_gc", + + "webhookx.request.total", + "webhookx.request.duration", + + "webhookx.event.total", + "webhookx.event.persisted", + "webhookx.event.pending", + + "webhookx.attempt.total", + "webhookx.attempt.response.duration", + "webhookx.attempt.pending", + "webhookx.attempt.failed", + } + + n, err := helper.FileCountLine(helper.OtelCollectorMetricsFile) + assert.Nil(GinkgoT(), err) + n++ + uploaded := make(map[string]bool) + assert.Eventually(GinkgoT(), func() bool { + line, err := helper.FileLine(helper.OtelCollectorMetricsFile, n) + if err != nil || line == "" { + return false + } + n++ + var req ExportRequest + _ = json.Unmarshal([]byte(line), &req) + for _, resourceMetrics := range req.ResourceMetrics { + for _, scopeMetrics := range resourceMetrics.ScopeMetrics { + for _, metrics := range scopeMetrics.Metrics { + uploaded[metrics.Name] = true + } + } + } + for _, name := range expected { + if !uploaded[name] { + fmt.Println("missing metric: " + name) + return false + } + } + return true + }, time.Second*40, time.Second) + }) + }) + } + + Context("SDK configuration by env", func() { + var app *app.Application + + BeforeAll(func() { + var err error + app, err = helper.Start(map[string]string{ + "WEBHOOKX_METRICS_ATTRIBUTES": `{"env": "prod"}`, + "WEBHOOKX_METRICS_EXPORTS": "opentelemetry", + "WEBHOOKX_METRICS_OPENTELEMETRY_PROTOCOL": "grpc", + "WEBHOOKX_METRICS_OPENTELEMETRY_ENDPOINT": "localhost:4317", + "OTEL_RESOURCE_ATTRIBUTES": "key1=value1,key2=value2", + "WEBHOOKX_METRICS_OPENTELEMETRY_PUSHINTERVAL": "5", + }) + assert.Nil(GinkgoT(), err) + }) + + AfterAll(func() { + app.Stop() + }) + + It("sanity", func() { + n, err := helper.FileCountLine(helper.OtelCollectorMetricsFile) + assert.Nil(GinkgoT(), err) + n++ + expected := []string{"service.name", "service.version", "env", "key1", "key2"} + assert.Eventually(GinkgoT(), func() bool { + line, err := helper.FileLine(helper.OtelCollectorMetricsFile, n) + if err != nil || line == "" { + return false + } + n++ + var req ExportRequest + _ = json.Unmarshal([]byte(line), &req) + attributesMap := make(map[string]bool) + for _, resourceMetrics := range req.ResourceMetrics { + for _, attr := range resourceMetrics.Resource.Attributes { + attributesMap[attr.Key] = true + } + } + for _, name := range expected { + if !attributesMap[name] { + fmt.Println("missing attribute: " + name) + return false + } + } + return true + }, time.Second*40, time.Second) + }) + }) +}) + +func TestMetrics(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Metrics opentelemetry Suite") +} diff --git a/test/metrics/types.go b/test/metrics/types.go new file mode 100644 index 0000000..b0234d0 --- /dev/null +++ b/test/metrics/types.go @@ -0,0 +1,36 @@ +package metrics + +import ( + "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric/metricdata" +) + +type ResourceMetrics struct { + Resource Resource `json:"resource,omitempty"` + ScopeMetrics []ScopeMetrics `json:"scopeMetrics"` +} + +type Resource struct { + Attributes []KeyValue `json:"attributes"` +} + +type KeyValue struct { + Key string `json:"key"` + Value interface{} `json:"value"` +} + +type ScopeMetrics struct { + Scope instrumentation.Scope `json:"scope"` + Metrics []Metrics `json:"metrics"` +} + +type Metrics struct { + Name string `json:"name"` + Description string `json:"description"` + Unit string `json:"unit"` + Data metricdata.Aggregation `json:"data"` +} + +type ExportRequest struct { + ResourceMetrics []ResourceMetrics `json:"resourceMetrics"` +} diff --git a/test/otel-collector-config.yml b/test/otel-collector-config.yml new file mode 100644 index 0000000..03df3bd --- /dev/null +++ b/test/otel-collector-config.yml @@ -0,0 +1,38 @@ +receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + http: + endpoint: 0.0.0.0:4318 + +processors: + batch: + +exporters: + debug: + verbosity: detailed + sampling_initial: 5 + sampling_thereafter: 200 + file/metrics: + path: /tmp/otel/metrics.json + file/traces: + path: /tmp/otel/traces.json + +extensions: + health_check: + pprof: + zpages: + endpoint: "0.0.0.0:55679" + +service: + extensions: [pprof, zpages, health_check] + pipelines: + metrics: + receivers: [otlp] + processors: [batch] + exporters: [debug, file/metrics] + traces: + receivers: [otlp] + processors: [batch] + exporters: [debug, file/traces] diff --git a/test/test.go b/test/test.go index c2f4efb..0bf7424 100644 --- a/test/test.go +++ b/test/test.go @@ -22,7 +22,7 @@ func (s *BasicSuite) ResetDatabase() error { return err } - migrator := migrator.New(&cfg.DatabaseConfig) + migrator := migrator.New(&cfg.Database) err = migrator.Reset() if err != nil { return err diff --git a/test/worker/requeue_test.go b/test/worker/requeue_test.go index 20b2d03..afb29da 100644 --- a/test/worker/requeue_test.go +++ b/test/worker/requeue_test.go @@ -9,6 +9,7 @@ import ( "github.com/webhookx-io/webhookx/db" "github.com/webhookx-io/webhookx/db/entities" "github.com/webhookx-io/webhookx/db/query" + "github.com/webhookx-io/webhookx/pkg/metrics" "github.com/webhookx-io/webhookx/test/helper" "github.com/webhookx-io/webhookx/test/mocks" "github.com/webhookx-io/webhookx/utils" @@ -37,9 +38,11 @@ var _ = Describe("processRequeue", Ordered, func() { queue.EXPECT().Delete(gomock.Any(), gomock.Any()).AnyTimes() queue.EXPECT().Add(gomock.Any(), gomock.Any()).Times(10) + metrics, err := metrics.New(config.MetricsConfig{}) + assert.NoError(GinkgoT(), err) w = worker.NewWorker(worker.WorkerOptions{ RequeueJobInterval: time.Second, - }, db, deliverer.NewHTTPDeliverer(&config.WorkerDeliverer{}), queue) + }, db, deliverer.NewHTTPDeliverer(&config.WorkerDeliverer{}), queue, metrics) // data ws := utils.Must(db.Workspaces.GetDefault(context.TODO())) diff --git a/worker/worker.go b/worker/worker.go index 40369ba..f9826ae 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -5,6 +5,7 @@ import ( "errors" "github.com/webhookx-io/webhookx/constants" "github.com/webhookx-io/webhookx/mcache" + "github.com/webhookx-io/webhookx/pkg/metrics" "github.com/webhookx-io/webhookx/pkg/plugin" plugintypes "github.com/webhookx-io/webhookx/pkg/plugin/types" "github.com/webhookx-io/webhookx/pkg/pool" @@ -35,6 +36,7 @@ type Worker struct { deliverer deliverer.Deliverer DB *db.DB pool *pool.Pool + metrics *metrics.Metrics } type WorkerOptions struct { @@ -44,18 +46,29 @@ type WorkerOptions struct { PoolConcurrency int } -func NewWorker(opts WorkerOptions, db *db.DB, deliverer deliverer.Deliverer, queue taskqueue.TaskQueue) *Worker { +func NewWorker( + opts WorkerOptions, + db *db.DB, + deliverer deliverer.Deliverer, + queue taskqueue.TaskQueue, + metrics *metrics.Metrics) *Worker { + opts.RequeueJobBatch = utils.DefaultIfZero(opts.RequeueJobBatch, constants.RequeueBatch) opts.RequeueJobInterval = utils.DefaultIfZero(opts.RequeueJobInterval, constants.RequeueInterval) opts.PoolSize = utils.DefaultIfZero(opts.PoolSize, 10000) opts.PoolConcurrency = utils.DefaultIfZero(opts.PoolConcurrency, runtime.NumCPU()*100) + + ctx, cancel := context.WithCancel(context.Background()) worker := &Worker{ + ctx: ctx, + cancel: cancel, opts: opts, queue: queue, log: zap.S(), deliverer: deliverer, DB: db, pool: pool.NewPool(opts.PoolSize, opts.PoolConcurrency), + metrics: metrics, } return worker @@ -125,7 +138,6 @@ func (w *Worker) run() { func (w *Worker) Start() error { go w.run() - w.ctx, w.cancel = context.WithCancel(context.Background()) schedule.Schedule(w.ctx, w.processRequeue, w.opts.RequeueJobInterval) w.log.Infof("[worker] created pool(size=%d, concurrency=%d)", w.opts.PoolSize, w.opts.PoolConcurrency) w.log.Info("[worker] started") @@ -268,6 +280,14 @@ func (w *Worker) handleTask(ctx context.Context, task *taskqueue.TaskMessage) er result.AttemptedAt = types.NewTime(startAt) result.Exhausted = data.Attempt >= len(endpoint.Retry.Config.Attempts) + if w.metrics.Enabled { + w.metrics.AttemptTotalCounter.Add(1) + if result.Status == entities.AttemptStatusFailure { + w.metrics.AttemptFailedCounter.Add(1) + } + w.metrics.AttemptResponseDurationHistogram.Observe(response.Latancy.Seconds()) + } + err = w.DB.Attempts.UpdateDelivery(ctx, task.ID, result) if err != nil { return err