From a10544cd4e5ea60e163f730e283bee2ba09fe0e6 Mon Sep 17 00:00:00 2001 From: Yusheng Li Date: Tue, 19 Nov 2024 17:34:44 +0800 Subject: [PATCH 01/11] feat(config): add database parameters, max_pool_size, and max_lifetime configuration (#57) --- app/app.go | 4 ++++ config.yml | 4 ++++ config/database.go | 19 +++++++++++++------ db/db.go | 8 ++++---- test/cfg/cfg_test.go | 42 ++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 67 insertions(+), 10 deletions(-) create mode 100644 test/cfg/cfg_test.go diff --git a/app/app.go b/app/app.go index 243b29e..0fc1e12 100644 --- a/app/app.go +++ b/app/app.go @@ -156,6 +156,10 @@ func (app *Application) NodeID() string { return config.NODE } +func (app *Application) Config() *config.Config { + return app.cfg +} + // Start starts application func (app *Application) Start() error { app.mux.Lock() diff --git a/config.yml b/config.yml index 9c821bf..e9a4bc8 100644 --- a/config.yml +++ b/config.yml @@ -13,6 +13,10 @@ database: username: webhookx password: database: webhookx + parameters: 'application_name=webhookx&sslmode=disable&connect_timeout=10' # The connection uri parameters. + # See https://www.postgresql.org/docs/current/libpq-connect.html + max_pool_size: 40 # The maximum number of connections + max_lifetime: 1800 # The maximum lifetime (in seconds) of a connection redis: host: localhost diff --git a/config/database.go b/config/database.go index 02752bb..25105bb 100644 --- a/config/database.go +++ b/config/database.go @@ -5,21 +5,28 @@ import ( ) type DatabaseConfig struct { - Host string `yaml:"host" default:"localhost"` - Port uint32 `yaml:"port" default:"5432"` - Username string `yaml:"username" default:"webhookx"` - Password string `yaml:"password" default:""` - Database string `yaml:"database" default:"webhookx"` + Host string `yaml:"host" default:"localhost"` + Port uint32 `yaml:"port" default:"5432"` + Username string `yaml:"username" default:"webhookx"` + Password string `yaml:"password" default:""` + Database string `yaml:"database" default:"webhookx"` + Parameters string `yaml:"parameters" default:"application_name=webhookx&sslmode=disable&connect_timeout=10"` + MaxPoolSize uint32 `yaml:"max_pool_size" default:"40" envconfig:"MAX_POOL_SIZE"` + MaxLifetime uint32 `yaml:"max_life_time" default:"1800" envconfig:"MAX_LIFETIME"` } func (cfg DatabaseConfig) GetDSN() string { - return fmt.Sprintf("postgres://%s:%s@%s:%d/%s?sslmode=disable", + dsn := fmt.Sprintf("postgres://%s:%s@%s:%d/%s", cfg.Username, cfg.Password, cfg.Host, cfg.Port, cfg.Database, ) + if len(cfg.Parameters) > 0 { + dsn = fmt.Sprintf("%s?%s", dsn, cfg.Parameters) + } + return dsn } func (cfg DatabaseConfig) Validate() error { diff --git a/db/db.go b/db/db.go index 8ed9568..01c8297 100644 --- a/db/db.go +++ b/db/db.go @@ -10,6 +10,7 @@ import ( "github.com/webhookx-io/webhookx/db/dao" "github.com/webhookx-io/webhookx/db/transaction" "go.uber.org/zap" + "time" ) type DB struct { @@ -33,10 +34,9 @@ type DB struct { func initSqlxDB(cfg *config.DatabaseConfig) (*sqlx.DB, error) { db, err := sql.Open("postgres", cfg.GetDSN()) - // db.SetMaxOpenConns(100) - // db.SetMaxIdleConns(100) - // db.SetConnMaxLifetime(time.Hour) - // db.SetConnMaxIdleTime(time.Hour) + db.SetMaxOpenConns(int(cfg.MaxPoolSize)) + db.SetMaxIdleConns(int(cfg.MaxPoolSize)) + db.SetConnMaxLifetime(time.Second * time.Duration(cfg.MaxLifetime)) if err != nil { return nil, err } diff --git a/test/cfg/cfg_test.go b/test/cfg/cfg_test.go new file mode 100644 index 0000000..c79f92e --- /dev/null +++ b/test/cfg/cfg_test.go @@ -0,0 +1,42 @@ +package cfg + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/stretchr/testify/assert" + "github.com/webhookx-io/webhookx/app" + "github.com/webhookx-io/webhookx/test/helper" + "github.com/webhookx-io/webhookx/utils" + "strings" + "testing" +) + +var _ = Describe("Configuration", Ordered, func() { + + var app *app.Application + + BeforeAll(func() { + app = utils.Must(helper.Start(map[string]string{ + "WEBHOOKX_DATABASE_MAX_POOL_SIZE": "0", + "WEBHOOKX_DATABASE_MAX_LIFETIME": "3600", + "WEBHOOKX_DATABASE_PARAMETERS": "application_name=foo&sslmode=disable&connect_timeout=30", + })) + }) + + AfterAll(func() { + app.Stop() + }) + + It("database configuration", func() { + assert.EqualValues(GinkgoT(), 0, app.Config().Database.MaxPoolSize) + assert.EqualValues(GinkgoT(), 3600, app.Config().Database.MaxLifetime) + assert.Equal(GinkgoT(), "application_name=foo&sslmode=disable&connect_timeout=30", app.Config().Database.Parameters) + assert.True(GinkgoT(), strings.HasSuffix(app.Config().Database.GetDSN(), app.Config().Database.Parameters)) + }) + +}) + +func TestConfiguration(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Configuration Suite") +} From b6101999e673fdb2c6209c82d654d9f855c09794 Mon Sep 17 00:00:00 2001 From: Yusheng Li Date: Tue, 19 Nov 2024 23:54:53 +0800 Subject: [PATCH 02/11] chore(deps): bump opentelemetry from 1.31.0 to 1.32.0 (#59) --- go.mod | 24 ++++++++++++------------ go.sum | 48 ++++++++++++++++++++++++------------------------ 2 files changed, 36 insertions(+), 36 deletions(-) diff --git a/go.mod b/go.mod index 7d31df3..f7e69cb 100644 --- a/go.mod +++ b/go.mod @@ -21,12 +21,12 @@ require ( github.com/segmentio/ksuid v1.0.4 github.com/spf13/cobra v1.8.1 github.com/stretchr/testify v1.9.0 - go.opentelemetry.io/otel v1.31.0 - go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.31.0 - go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.31.0 - go.opentelemetry.io/otel/metric v1.31.0 - go.opentelemetry.io/otel/sdk v1.31.0 - go.opentelemetry.io/otel/sdk/metric v1.31.0 + go.opentelemetry.io/otel v1.32.0 + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.32.0 + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.32.0 + go.opentelemetry.io/otel/metric v1.32.0 + go.opentelemetry.io/otel/sdk v1.32.0 + go.opentelemetry.io/otel/sdk/metric v1.32.0 go.uber.org/mock v0.4.0 go.uber.org/zap v1.27.0 gopkg.in/yaml.v3 v3.0.1 @@ -46,7 +46,7 @@ require ( github.com/google/go-cmp v0.6.0 // indirect github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134 // indirect github.com/google/uuid v1.6.0 // indirect - github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect @@ -56,17 +56,17 @@ require ( github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/spf13/pflag v1.0.5 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.55.0 // indirect - go.opentelemetry.io/otel/trace v1.31.0 // indirect + go.opentelemetry.io/otel/trace v1.32.0 // indirect go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.28.0 // indirect golang.org/x/net v0.30.0 // indirect - golang.org/x/sys v0.26.0 // indirect - golang.org/x/text v0.19.0 // indirect + golang.org/x/sys v0.27.0 // indirect + golang.org/x/text v0.20.0 // indirect golang.org/x/tools v0.25.0 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20241007155032-5fefd90f89a9 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 // indirect google.golang.org/grpc v1.67.1 // indirect google.golang.org/protobuf v1.35.1 // indirect ) diff --git a/go.sum b/go.sum index 8594817..2c71c28 100644 --- a/go.sum +++ b/go.sum @@ -71,8 +71,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 h1:asbCHRVmodnJTuQ3qamDwqVOIjwqUPTYmYuemVOx+Ys= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0/go.mod h1:ggCgvZ2r7uOoQjOyu2Y1NhHmEPPzzuhWgcza5M1Ji1I= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0 h1:ad0vkEBuk23VJzZR9nkLVG0YAoN9coASF1GusYX6AlU= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0/go.mod h1:igFoXX2ELCW06bol23DWPB5BEWfZISOzSP5K2sbLea0= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -135,20 +135,20 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.55.0 h1:ZIg3ZT/aQ7AfKqdwp7ECpOK6vHqquXXuyTjIO8ZdmPs= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.55.0/go.mod h1:DQAwmETtZV00skUwgD6+0U89g80NKsJE3DCKeLLPQMI= -go.opentelemetry.io/otel v1.31.0 h1:NsJcKPIW0D0H3NgzPDHmo0WW6SptzPdqg/L1zsIm2hY= -go.opentelemetry.io/otel v1.31.0/go.mod h1:O0C14Yl9FgkjqcCZAsE053C13OaddMYr/hz6clDkEJE= -go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.31.0 h1:FZ6ei8GFW7kyPYdxJaV2rgI6M+4tvZzhYsQ2wgyVC08= -go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.31.0/go.mod h1:MdEu/mC6j3D+tTEfvI15b5Ci2Fn7NneJ71YMoiS3tpI= -go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.31.0 h1:ZsXq73BERAiNuuFXYqP4MR5hBrjXfMGSO+Cx7qoOZiM= -go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.31.0/go.mod h1:hg1zaDMpyZJuUzjFxFsRYBoccE86tM9Uf4IqNMUxvrY= -go.opentelemetry.io/otel/metric v1.31.0 h1:FSErL0ATQAmYHUIzSezZibnyVlft1ybhy4ozRPcF2fE= -go.opentelemetry.io/otel/metric v1.31.0/go.mod h1:C3dEloVbLuYoX41KpmAhOqNriGbA+qqH6PQ5E5mUfnY= -go.opentelemetry.io/otel/sdk v1.31.0 h1:xLY3abVHYZ5HSfOg3l2E5LUj2Cwva5Y7yGxnSW9H5Gk= -go.opentelemetry.io/otel/sdk v1.31.0/go.mod h1:TfRbMdhvxIIr/B2N2LQW2S5v9m3gOQ/08KsbbO5BPT0= -go.opentelemetry.io/otel/sdk/metric v1.31.0 h1:i9hxxLJF/9kkvfHppyLL55aW7iIJz4JjxTeYusH7zMc= -go.opentelemetry.io/otel/sdk/metric v1.31.0/go.mod h1:CRInTMVvNhUKgSAMbKyTMxqOBC0zgyxzW55lZzX43Y8= -go.opentelemetry.io/otel/trace v1.31.0 h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HYdmJys= -go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A= +go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U= +go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.32.0 h1:j7ZSD+5yn+lo3sGV69nW04rRR0jhYnBwjuX3r0HvnK0= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.32.0/go.mod h1:WXbYJTUaZXAbYd8lbgGuvih0yuCfOFC5RJoYnoLcGz8= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.32.0 h1:t/Qur3vKSkUCcDVaSumWF2PKHt85pc7fRvFuoVT8qFU= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.32.0/go.mod h1:Rl61tySSdcOJWoEgYZVtmnKdA0GeKrSqkHC1t+91CH8= +go.opentelemetry.io/otel/metric v1.32.0 h1:xV2umtmNcThh2/a/aCP+h64Xx5wsj8qqnkYZktzNa0M= +go.opentelemetry.io/otel/metric v1.32.0/go.mod h1:jH7CIbbK6SH2V2wE16W05BHCtIDzauciCRLoc/SyMv8= +go.opentelemetry.io/otel/sdk v1.32.0 h1:RNxepc9vK59A8XsgZQouW8ue8Gkb4jpWtJm9ge5lEG4= +go.opentelemetry.io/otel/sdk v1.32.0/go.mod h1:LqgegDBjKMmb2GC6/PrTnteJG39I8/vJCAP9LlJXEjU= +go.opentelemetry.io/otel/sdk/metric v1.32.0 h1:rZvFnvmvawYb0alrYkjraqJq0Z4ZUJAiyYCU9snn1CU= +go.opentelemetry.io/otel/sdk/metric v1.32.0/go.mod h1:PWeZlq0zt9YkYAp3gjKZ0eicRYvOh1Gd+X99x6GHpCQ= +go.opentelemetry.io/otel/trace v1.32.0 h1:WIC9mYrXf8TmY/EXuULKc8hR17vE+Hjv2cssQDe03fM= +go.opentelemetry.io/otel/trace v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8= go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0= go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= @@ -165,18 +165,18 @@ golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw= golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U= golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4= golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= -golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= -golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= -golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= +golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.20.0 h1:gK/Kv2otX8gz+wn7Rmb3vT96ZwuoxnQlY+HlJVj7Qug= +golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4= golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U= golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.25.0 h1:oFU9pkj/iJgs+0DT+VMHrx+oBKs/LJMV+Uvg78sl+fE= golang.org/x/tools v0.25.0/go.mod h1:/vtpO8WL1N9cQC3FN5zPqb//fRXskFHbLKk4OW1Q7rg= -google.golang.org/genproto/googleapis/api v0.0.0-20241007155032-5fefd90f89a9 h1:T6rh4haD3GVYsgEfWExoCZA2o2FmbNyKpTuAxbEFPTg= -google.golang.org/genproto/googleapis/api v0.0.0-20241007155032-5fefd90f89a9/go.mod h1:wp2WsuBYj6j8wUdo3ToZsdxxixbvQNAHqVJrTgi5E5M= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9 h1:QCqS/PdaHTSWGvupk2F/ehwHtGc0/GYkT+3GAcR1CCc= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI= +google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28 h1:M0KvPgPmDZHPlbRbaNU1APr28TvwvvdUPlSv7PUvy8g= +google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28/go.mod h1:dguCy7UOdZhTvLzDyt15+rOrawrpM4q7DD9dQ1P11P4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 h1:XVhgTWWV3kGQlwJHR3upFWZeTsei6Oks1apkZSeonIE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI= google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E= google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= From 939fb9bcc3062bee702b245c904aa94d0f70c139 Mon Sep 17 00:00:00 2001 From: Yusheng Li Date: Wed, 20 Nov 2024 16:01:45 +0800 Subject: [PATCH 03/11] release 0.3.0 (#58) --- docker-compose.yml | 4 ++-- openapi.yml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index e820ec9..42f9414 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,6 +1,6 @@ services: webhookx-migration: - image: "webhookx/webhookx:0.2.1" + image: "webhookx/webhookx:0.3.0" container_name: webhookx-migration environment: WEBHOOKX_DATABASE_HOST: webhookx-database @@ -13,7 +13,7 @@ services: condition: service_healthy webhookx: - image: "webhookx/webhookx:0.2.1" + image: "webhookx/webhookx:0.3.0" container_name: webhookx environment: WEBHOOKX_DATABASE_HOST: webhookx-database diff --git a/openapi.yml b/openapi.yml index 5ec2f72..ce92361 100644 --- a/openapi.yml +++ b/openapi.yml @@ -8,7 +8,7 @@ info: license: name: Apache 2.0 url: https://www.apache.org/licenses/LICENSE-2.0.html - version: 0.2.1 + version: 0.3.0 servers: - url: http://localhost:8080 From 4730008749d74875d9810cde088751100a099833 Mon Sep 17 00:00:00 2001 From: Yusheng Li Date: Wed, 20 Nov 2024 16:16:14 +0800 Subject: [PATCH 04/11] chore: remove go generate (#60) --- .goreleaser.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.goreleaser.yml b/.goreleaser.yml index edae829..f92708a 100644 --- a/.goreleaser.yml +++ b/.goreleaser.yml @@ -3,7 +3,6 @@ version: 2 before: hooks: - go mod tidy - - go generate ./... builds: - env: From 540aed4f3152cb50c3103d93bbbd44c43af21d91 Mon Sep 17 00:00:00 2001 From: Yusheng Li Date: Wed, 20 Nov 2024 17:40:52 +0800 Subject: [PATCH 05/11] fix: add missing envs (#61) --- docker-compose.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docker-compose.yml b/docker-compose.yml index 42f9414..e6925d6 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -25,6 +25,8 @@ services: WEBHOOKX_ADMIN_LISTEN: 0.0.0.0:8080 WEBHOOKX_WORKER_ENABLED: true WEBHOOKX_PROXY_LISTEN: 0.0.0.0:8081 + WEBHOOKX_PROXY_QUEUE_REDIS_HOST: redis + WEBHOOKX_PROXY_QUEUE_REDIS_PORT: 6379 ports: - "8080:8080" - "8081:8081" From 5474323705c8cb956d3f3d870a810ece488936e9 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 20 Nov 2024 17:41:15 +0800 Subject: [PATCH 06/11] chore(deps): bump github.com/onsi/ginkgo/v2 from 2.20.2 to 2.21.0 (#55) Bumps [github.com/onsi/ginkgo/v2](https://github.com/onsi/ginkgo) from 2.20.2 to 2.21.0. - [Release notes](https://github.com/onsi/ginkgo/releases) - [Changelog](https://github.com/onsi/ginkgo/blob/master/CHANGELOG.md) - [Commits](https://github.com/onsi/ginkgo/compare/v2.20.2...v2.21.0) --- updated-dependencies: - dependency-name: github.com/onsi/ginkgo/v2 dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- go.mod | 6 +++--- go.sum | 12 ++++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index f7e69cb..7756d76 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/jmoiron/sqlx v1.4.0 github.com/lib/pq v1.10.9 - github.com/onsi/ginkgo/v2 v2.20.2 + github.com/onsi/ginkgo/v2 v2.21.0 github.com/onsi/gomega v1.34.2 github.com/pkg/errors v0.9.1 github.com/redis/go-redis/v9 v9.6.2 @@ -44,7 +44,7 @@ require ( github.com/go-playground/universal-translator v0.18.1 // indirect github.com/go-task/slim-sprig/v3 v3.0.0 // indirect github.com/google/go-cmp v0.6.0 // indirect - github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134 // indirect + github.com/google/pprof v0.0.0-20241029153458-d1b30febd7db // indirect github.com/google/uuid v1.6.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect @@ -64,7 +64,7 @@ require ( golang.org/x/net v0.30.0 // indirect golang.org/x/sys v0.27.0 // indirect golang.org/x/text v0.20.0 // indirect - golang.org/x/tools v0.25.0 // indirect + golang.org/x/tools v0.26.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 // indirect google.golang.org/grpc v1.67.1 // indirect diff --git a/go.sum b/go.sum index 2c71c28..f0cba8d 100644 --- a/go.sum +++ b/go.sum @@ -65,8 +65,8 @@ github.com/golang-migrate/migrate/v4 v4.18.1 h1:JML/k+t4tpHCpQTCAD62Nu43NUFzHY4C github.com/golang-migrate/migrate/v4 v4.18.1/go.mod h1:HAX6m3sQgcdO81tdjn5exv20+3Kb13cmGli1hrD6hks= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134 h1:c5FlPPgxOn7kJz3VoPLkQYQXGBS3EklQ4Zfi57uOuqQ= -github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= +github.com/google/pprof v0.0.0-20241029153458-d1b30febd7db h1:097atOisP2aRj7vFgYQBbFN4U4JNXUNYpxael3UzMyo= +github.com/google/pprof v0.0.0-20241029153458-d1b30febd7db/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= @@ -104,8 +104,8 @@ github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0= github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y= github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= -github.com/onsi/ginkgo/v2 v2.20.2 h1:7NVCeyIWROIAheY21RLS+3j2bb52W0W82tkberYytp4= -github.com/onsi/ginkgo/v2 v2.20.2/go.mod h1:K9gyxPIlb+aIvnZ8bd9Ak+YP18w3APlR+5coaZoE2ag= +github.com/onsi/ginkgo/v2 v2.21.0 h1:7rg/4f3rB88pb5obDgNZrNHrQ4e6WpjonchcpuBRnZM= +github.com/onsi/ginkgo/v2 v2.21.0/go.mod h1:7Du3c42kxCUegi0IImZ1wUQzMBVecgIHjR1C+NkhLQo= github.com/onsi/gomega v1.34.2 h1:pNCwDkzrsv7MS9kpaQvVb1aVLahQXyJ/Tv5oAZMI3i8= github.com/onsi/gomega v1.34.2/go.mod h1:v1xfxRgk0KIsG+QOdm7p8UosrOzPYRo60fd3B/1Dukc= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= @@ -171,8 +171,8 @@ golang.org/x/text v0.20.0 h1:gK/Kv2otX8gz+wn7Rmb3vT96ZwuoxnQlY+HlJVj7Qug= golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4= golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U= golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= -golang.org/x/tools v0.25.0 h1:oFU9pkj/iJgs+0DT+VMHrx+oBKs/LJMV+Uvg78sl+fE= -golang.org/x/tools v0.25.0/go.mod h1:/vtpO8WL1N9cQC3FN5zPqb//fRXskFHbLKk4OW1Q7rg= +golang.org/x/tools v0.26.0 h1:v/60pFQmzmT9ExmjDv2gGIfi3OqfKoEP6I5+umXlbnQ= +golang.org/x/tools v0.26.0/go.mod h1:TPVVj70c7JJ3WCazhD8OdXcZg/og+b9+tH/KxylGwH0= google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28 h1:M0KvPgPmDZHPlbRbaNU1APr28TvwvvdUPlSv7PUvy8g= google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28/go.mod h1:dguCy7UOdZhTvLzDyt15+rOrawrpM4q7DD9dQ1P11P4= google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 h1:XVhgTWWV3kGQlwJHR3upFWZeTsei6Oks1apkZSeonIE= From cbcadbaccbeb8a06a71ad08e47655a7050053ecf Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 20 Nov 2024 17:41:41 +0800 Subject: [PATCH 07/11] chore(deps): bump go.uber.org/mock from 0.4.0 to 0.5.0 (#53) Bumps [go.uber.org/mock](https://github.com/uber/mock) from 0.4.0 to 0.5.0. - [Release notes](https://github.com/uber/mock/releases) - [Changelog](https://github.com/uber-go/mock/blob/main/CHANGELOG.md) - [Commits](https://github.com/uber/mock/compare/v0.4.0...v0.5.0) --- updated-dependencies: - dependency-name: go.uber.org/mock dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 7756d76..daeb3c2 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,7 @@ require ( go.opentelemetry.io/otel/metric v1.32.0 go.opentelemetry.io/otel/sdk v1.32.0 go.opentelemetry.io/otel/sdk/metric v1.32.0 - go.uber.org/mock v0.4.0 + go.uber.org/mock v0.5.0 go.uber.org/zap v1.27.0 gopkg.in/yaml.v3 v3.0.1 ) diff --git a/go.sum b/go.sum index f0cba8d..ce138d5 100644 --- a/go.sum +++ b/go.sum @@ -155,8 +155,8 @@ go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= -go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU= -go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= +go.uber.org/mock v0.5.0 h1:KAMbZvZPyBPWgD14IrIQ38QCyjwpvVVV6K/bHl1IwQU= +go.uber.org/mock v0.5.0/go.mod h1:ge71pBPLYDk7QIi1LupWxdAykm7KIEFchiOqd6z7qMM= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= From dfd7d314a3cf990225ef0e1b27f8a072919f0143 Mon Sep 17 00:00:00 2001 From: cuisongliu Date: Wed, 20 Nov 2024 17:42:20 +0800 Subject: [PATCH 08/11] chore: upgrade go version to 1.23 (#50) --- Dockerfile | 2 +- go.mod | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index f2f833d..bcc2e16 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.22 AS build-env +FROM golang:1.23 AS build-env WORKDIR /go/src/webhookx-io/webhookx diff --git a/go.mod b/go.mod index daeb3c2..27c1b40 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/webhookx-io/webhookx -go 1.22.0 +go 1.23 require ( github.com/Masterminds/squirrel v1.5.4 From d1eb94278c8eb367b466615a36dd403be1086daa Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 20 Nov 2024 17:45:46 +0800 Subject: [PATCH 09/11] chore(deps): bump github.com/redis/go-redis/v9 from 9.6.2 to 9.7.0 (#52) Bumps [github.com/redis/go-redis/v9](https://github.com/redis/go-redis) from 9.6.2 to 9.7.0. - [Release notes](https://github.com/redis/go-redis/releases) - [Changelog](https://github.com/redis/go-redis/blob/master/CHANGELOG.md) - [Commits](https://github.com/redis/go-redis/compare/v9.6.2...v9.7.0) --- updated-dependencies: - dependency-name: github.com/redis/go-redis/v9 dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 27c1b40..c135075 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/onsi/ginkgo/v2 v2.21.0 github.com/onsi/gomega v1.34.2 github.com/pkg/errors v0.9.1 - github.com/redis/go-redis/v9 v9.6.2 + github.com/redis/go-redis/v9 v9.7.0 github.com/satori/go.uuid v1.2.0 github.com/segmentio/ksuid v1.0.4 github.com/spf13/cobra v1.8.1 diff --git a/go.sum b/go.sum index ce138d5..baed6ef 100644 --- a/go.sum +++ b/go.sum @@ -117,8 +117,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/redis/go-redis/v9 v9.6.2 h1:w0uvkRbc9KpgD98zcvo5IrVUsn0lXpRMuhNgiHDJzdk= -github.com/redis/go-redis/v9 v9.6.2/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA= +github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E= +github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= From 9f551610edd8c4aa31f548dff85ef9f741e0af7f Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 20 Nov 2024 17:46:04 +0800 Subject: [PATCH 10/11] chore(deps): bump github.com/onsi/gomega from 1.34.2 to 1.35.1 (#54) Bumps [github.com/onsi/gomega](https://github.com/onsi/gomega) from 1.34.2 to 1.35.1. - [Release notes](https://github.com/onsi/gomega/releases) - [Changelog](https://github.com/onsi/gomega/blob/master/CHANGELOG.md) - [Commits](https://github.com/onsi/gomega/compare/v1.34.2...v1.35.1) --- updated-dependencies: - dependency-name: github.com/onsi/gomega dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index c135075..5af91fe 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/jmoiron/sqlx v1.4.0 github.com/lib/pq v1.10.9 github.com/onsi/ginkgo/v2 v2.21.0 - github.com/onsi/gomega v1.34.2 + github.com/onsi/gomega v1.35.1 github.com/pkg/errors v0.9.1 github.com/redis/go-redis/v9 v9.7.0 github.com/satori/go.uuid v1.2.0 diff --git a/go.sum b/go.sum index baed6ef..343e6b2 100644 --- a/go.sum +++ b/go.sum @@ -106,8 +106,8 @@ github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= github.com/onsi/ginkgo/v2 v2.21.0 h1:7rg/4f3rB88pb5obDgNZrNHrQ4e6WpjonchcpuBRnZM= github.com/onsi/ginkgo/v2 v2.21.0/go.mod h1:7Du3c42kxCUegi0IImZ1wUQzMBVecgIHjR1C+NkhLQo= -github.com/onsi/gomega v1.34.2 h1:pNCwDkzrsv7MS9kpaQvVb1aVLahQXyJ/Tv5oAZMI3i8= -github.com/onsi/gomega v1.34.2/go.mod h1:v1xfxRgk0KIsG+QOdm7p8UosrOzPYRo60fd3B/1Dukc= +github.com/onsi/gomega v1.35.1 h1:Cwbd75ZBPxFSuZ6T+rN/WCb/gOc6YgFBXLlZLhC7Ds4= +github.com/onsi/gomega v1.35.1/go.mod h1:PvZbdDc8J6XJEpDK4HCuRBm8a6Fzp9/DmhC9C7yFlog= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug= From c45a38e5d823df00dc5dc13dd11a78af137b993d Mon Sep 17 00:00:00 2001 From: cchenggit <654031023@qq.com> Date: Fri, 29 Nov 2024 01:22:52 +0800 Subject: [PATCH 11/11] feat(observability): support otel tracing --- .github/workflows/test.yml | 2 +- Makefile | 7 +- README.md | 12 +- admin/admin.go | 1 - admin/api/api.go | 10 +- admin/api/middleware.go | 5 +- app/app.go | 20 ++- config.yml | 13 ++ config/config.go | 13 +- config/config_test.go | 39 +++++ config/database.go | 50 +++++- config/redis.go | 1 + config/tracing.go | 23 +++ db/dao/attempt_dao.go | 10 +- db/dao/attempt_detail_dao.go | 13 +- db/dao/dao.go | 57 ++++++- db/db.go | 24 ++- dispatcher/dispatcher.go | 11 +- go.mod | 19 ++- go.sum | 18 ++ pkg/queue/redis/redis.go | 19 +++ pkg/taskqueue/redis.go | 21 ++- pkg/tracing/opentelemetry.go | 122 +++++++++++++ pkg/tracing/tracing.go | 105 ++++++++++++ proxy/gateway.go | 39 ++++- proxy/middlewares/recovery.go | 9 + test/admin/endpoints_test.go | 1 + test/cmd/version_test.go | 5 +- test/helper/helper.go | 78 +++++++-- test/metrics/opentelemetry_test.go | 2 + test/otel-collector-config.yml | 1 + test/tracing/admin_test.go | 188 ++++++++++++++++++++ test/tracing/ginkgo_test.go | 12 ++ test/tracing/proxy_test.go | 266 +++++++++++++++++++++++++++++ test/tracing/types.go | 103 +++++++++++ test/tracing/worker_test.go | 168 ++++++++++++++++++ test/worker/requeue_test.go | 4 +- worker/deliverer/deliverer.go | 3 +- worker/deliverer/http.go | 12 +- worker/deliverer/http_test.go | 9 +- worker/worker.go | 38 +++-- 41 files changed, 1454 insertions(+), 99 deletions(-) create mode 100644 config/tracing.go create mode 100644 pkg/tracing/opentelemetry.go create mode 100644 pkg/tracing/tracing.go create mode 100644 test/tracing/admin_test.go create mode 100644 test/tracing/ginkgo_test.go create mode 100644 test/tracing/proxy_test.go create mode 100644 test/tracing/types.go create mode 100644 test/tracing/worker_test.go diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index f273869..843f287 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -51,7 +51,7 @@ jobs: run: | mkdir -p test/output/otel sudo chmod 777 -R test/output/otel - make deps + make test-deps sleep 3 docker compose -f test/docker-compose.yml logs diff --git a/Makefile b/Makefile index e2c6e60..c7ff581 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,4 @@ DIR := $(shell pwd) - export WEBHOOKX_TEST_OTEL_COLLECTOR_OUTPUT_PATH=$(DIR)/test/output/otel LDFLAGS = --ldflags "\ @@ -7,7 +6,7 @@ LDFLAGS = --ldflags "\ -X github.com/webhookx-io/webhookx/config.VERSION=`git tag -l --points-at HEAD | head -n 1`" .PHONY: clean build install generate test test-coverage test-integration \ - test-integration-coverage goreleaser migrate-create deps + test-integration-coverage goreleaser migrate-create test-deps clean: go clean @@ -22,7 +21,7 @@ install: generate: go generate ./... -deps: +test-deps: mkdir -p test/output/otel docker compose -f test/docker-compose.yml up -d @@ -42,4 +41,4 @@ goreleaser: goreleaser release --snapshot --clean migrate-create: - migrate create -ext sql -dir db/migrations -seq -digits 1 $(message) + migrate create -ext sql -dir db/migrations -seq -digits 1 $(message) \ No newline at end of file diff --git a/README.md b/README.md index c52c25c..885262a 100644 --- a/README.md +++ b/README.md @@ -33,11 +33,11 @@ WebhookX is an open-source webhooks gateway for message receiving, processing, a ## Installation ```shell -$ docker compose up +docker compose up ``` ```shell -$ curl http://localhost:8080 +curl http://localhost:8080 ``` @@ -48,7 +48,7 @@ $ curl http://localhost:8080 > **Endpoint** represents the event's destination. ``` -$ curl -X POST http://localhost:8080/workspaces/default/endpoints \ +curl -X POST http://localhost:8080/workspaces/default/endpoints \ --header 'Content-Type: application/json' \ --data '{ "request": { @@ -69,7 +69,7 @@ $ curl -X POST http://localhost:8080/workspaces/default/endpoints \ > **Source** represents the ingress of events ``` -$ curl -X POST http://localhost:8080/workspaces/default/sources \ +curl -X POST http://localhost:8080/workspaces/default/sources \ --header 'accept: application/json' \ --header 'Content-Type: application/json' \ --data '{ @@ -81,7 +81,7 @@ $ curl -X POST http://localhost:8080/workspaces/default/sources \ #### 3. Send an event to the Proxy (port 8081) ``` -$ curl -X POST http://localhost:8081 \ +curl -X POST http://localhost:8081 \ --header 'Content-Type: application/json' \ --data '{ "event_type": "charge.succeeded", @@ -96,7 +96,7 @@ $ curl -X POST http://localhost:8081 \ > Attempt represents an event delivery attempt, and contains inspection information of a delivery. ``` -$ curl http://localhost:8080/workspaces/default/attempts +curl http://localhost:8080/workspaces/default/attempts ```
diff --git a/admin/admin.go b/admin/admin.go index 183239a..7440d50 100644 --- a/admin/admin.go +++ b/admin/admin.go @@ -21,7 +21,6 @@ func NewAdmin(cfg config.AdminConfig, handler http.Handler) *Admin { WriteTimeout: 60 * time.Second, ReadTimeout: 60 * time.Second, - // TODO: expose more to be configurable } diff --git a/admin/api/api.go b/admin/api/api.go index fa3d7de..49de1b6 100644 --- a/admin/api/api.go +++ b/admin/api/api.go @@ -8,6 +8,8 @@ import ( "github.com/webhookx-io/webhookx/db/query" "github.com/webhookx-io/webhookx/dispatcher" "github.com/webhookx-io/webhookx/pkg/errs" + "github.com/webhookx-io/webhookx/pkg/tracing" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.uber.org/zap" "net/http" "strconv" @@ -22,14 +24,16 @@ type API struct { log *zap.SugaredLogger DB *db.DB dispatcher *dispatcher.Dispatcher + tracer *tracing.Tracer } -func NewAPI(cfg *config.Config, db *db.DB, dispatcher *dispatcher.Dispatcher) *API { +func NewAPI(cfg *config.Config, db *db.DB, dispatcher *dispatcher.Dispatcher, tracer *tracing.Tracer) *API { return &API{ cfg: cfg, log: zap.S(), DB: db, dispatcher: dispatcher, + tracer: tracer, } } @@ -94,7 +98,9 @@ func (api *API) assert(err error) { // Handler returns a http.Handler func (api *API) Handler() http.Handler { r := mux.NewRouter() - + if api.tracer != nil { + r.Use(otelhttp.NewMiddleware("api.admin")) + } r.Use(panicRecovery) r.Use(api.contextMiddleware) diff --git a/admin/api/middleware.go b/admin/api/middleware.go index 781b6c6..aabc987 100644 --- a/admin/api/middleware.go +++ b/admin/api/middleware.go @@ -4,13 +4,14 @@ import ( "encoding/json" "errors" "fmt" + "net/http" + "runtime" + "github.com/gorilla/mux" "github.com/webhookx-io/webhookx/db/entities" "github.com/webhookx-io/webhookx/db/errs" "github.com/webhookx-io/webhookx/pkg/ucontext" "go.uber.org/zap" - "net/http" - "runtime" ) func (api *API) contextMiddleware(next http.Handler) http.Handler { diff --git a/app/app.go b/app/app.go index 0fc1e12..5880d24 100644 --- a/app/app.go +++ b/app/app.go @@ -15,6 +15,7 @@ import ( "github.com/webhookx-io/webhookx/pkg/log" "github.com/webhookx-io/webhookx/pkg/metrics" "github.com/webhookx-io/webhookx/pkg/taskqueue" + "github.com/webhookx-io/webhookx/pkg/tracing" "github.com/webhookx-io/webhookx/proxy" "github.com/webhookx-io/webhookx/worker" "github.com/webhookx-io/webhookx/worker/deliverer" @@ -47,6 +48,7 @@ type Application struct { admin *admin.Admin gateway *proxy.Gateway worker *worker.Worker + tracer *tracing.Tracer } func NewApplication(cfg *config.Config) (*Application, error) { @@ -89,6 +91,14 @@ func (app *Application) initialize() error { app.log) registerEventHandler(app.bus) + // tracing + tracer, err := tracing.New(&cfg.Tracing) + if err != nil { + return err + } + + app.tracer = tracer + // db db, err := db.NewDB(&cfg.Database) if err != nil { @@ -116,18 +126,18 @@ func (app *Application) initialize() error { PoolConcurrency: int(cfg.Worker.Pool.Concurrency), } deliverer := deliverer.NewHTTPDeliverer(&cfg.Worker.Deliverer) - app.worker = worker.NewWorker(opts, db, deliverer, queue, app.metrics) + app.worker = worker.NewWorker(opts, db, deliverer, queue, app.metrics, tracer) } // admin if cfg.Admin.IsEnabled() { - handler := api.NewAPI(cfg, db, app.dispatcher).Handler() + handler := api.NewAPI(cfg, db, app.dispatcher, app.tracer).Handler() app.admin = admin.NewAdmin(cfg.Admin, handler) } // gateway if cfg.Proxy.IsEnabled() { - app.gateway = proxy.NewGateway(&cfg.Proxy, db, app.dispatcher, app.metrics) + app.gateway = proxy.NewGateway(&cfg.Proxy, db, app.dispatcher, app.metrics, app.tracer) } return nil @@ -221,6 +231,10 @@ func (app *Application) Stop() error { _ = app.worker.Stop() } + if app.tracer != nil { + _ = app.tracer.Stop() + } + app.started = false app.stop <- struct{}{} diff --git a/config.yml b/config.yml index e9a4bc8..b24bfc9 100644 --- a/config.yml +++ b/config.yml @@ -77,3 +77,16 @@ metrics: push_interval: 10 # interval(in seconds) at which metrics are sent to the OpenTelemetry Collector protocol: http/protobuf # supported value are http/protobuf, grpc endpoint: http://localhost:4318/v1/metrics # http/protobuf(http://localhost:4318/v1/metrics), grpc(localhost:4317) + +#------------------------------------------------------------------------------ +# TRACING +#------------------------------------------------------------------------------ +tracing: + enabled: false + service_name: WebhookX + attributes: # global attributes for each trace + env: prod + sampling_rate: 1.0 + opentelemetry: + protocol: http/protobuf # supported value are http/protobuf, grpc + endpoint: http://localhost:4318/v1/traces # http/protobuf(http://localhost:4318/v1/traces), grpc(localhost:4317) \ No newline at end of file diff --git a/config/config.go b/config/config.go index 7d12d97..41c51e6 100644 --- a/config/config.go +++ b/config/config.go @@ -25,6 +25,7 @@ type Config struct { Proxy ProxyConfig `yaml:"proxy" envconfig:"PROXY"` Worker WorkerConfig `yaml:"worker" envconfig:"WORKER"` Metrics MetricsConfig `yaml:"metrics" envconfig:"METRICS"` + Tracing TracingConfig `yaml:"tracing" envconfig:"TRACING"` } func (cfg Config) String() string { @@ -58,6 +59,10 @@ func (cfg Config) Validate() error { return err } + if err := cfg.Tracing.Validate(); err != nil { + return err + } + return nil } @@ -70,7 +75,7 @@ func Init() (*Config, error) { if err != nil { return nil, err } - + cfg.injectTracingEnabled() return &cfg, nil } @@ -94,6 +99,10 @@ func InitWithFile(filename string) (*Config, error) { if err != nil { return nil, err } - + cfg.injectTracingEnabled() return &cfg, nil } + +func (cfg *Config) injectTracingEnabled() { + cfg.Database.SetTracingEnabled(cfg.Tracing.Enabled) +} diff --git a/config/config_test.go b/config/config_test.go index 73b3c13..1b02ff9 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -225,6 +225,45 @@ func TestMetricsConfig(t *testing.T) { } } +func TestTracingConfig(t *testing.T) { + tests := []struct { + desc string + cfg TracingConfig + expectedValidateErr error + }{ + { + desc: "sanity", + cfg: TracingConfig{ + Enabled: true, + ServiceName: "WebhookX", + SamplingRate: 0, + Opentelemetry: Opentelemetry{ + Protocol: "http/protobuf", + Endpoint: "http://localhost:4318/v1/traces", + }, + }, + expectedValidateErr: nil, + }, + { + desc: "invalid sampling rate", + cfg: TracingConfig{ + Enabled: true, + ServiceName: "WebhookX", + SamplingRate: 1.1, + Opentelemetry: Opentelemetry{ + Protocol: "http/protobuf", + Endpoint: "http://localhost:4318/v1/traces", + }, + }, + expectedValidateErr: errors.New("invalid sampling rate, must be [0,1]"), + }, + } + for _, test := range tests { + actualValidateErr := test.cfg.Validate() + assert.Equal(t, test.expectedValidateErr, actualValidateErr, "expected %v got %v", test.expectedValidateErr, actualValidateErr) + } +} + func TestConfig(t *testing.T) { cfg, err := Init() assert.Nil(t, err) diff --git a/config/database.go b/config/database.go index 25105bb..8f0f958 100644 --- a/config/database.go +++ b/config/database.go @@ -1,18 +1,22 @@ package config import ( + "database/sql" "fmt" + "github.com/XSAM/otelsql" + "time" ) type DatabaseConfig struct { - Host string `yaml:"host" default:"localhost"` - Port uint32 `yaml:"port" default:"5432"` - Username string `yaml:"username" default:"webhookx"` - Password string `yaml:"password" default:""` - Database string `yaml:"database" default:"webhookx"` - Parameters string `yaml:"parameters" default:"application_name=webhookx&sslmode=disable&connect_timeout=10"` - MaxPoolSize uint32 `yaml:"max_pool_size" default:"40" envconfig:"MAX_POOL_SIZE"` - MaxLifetime uint32 `yaml:"max_life_time" default:"1800" envconfig:"MAX_LIFETIME"` + Host string `yaml:"host" default:"localhost"` + Port uint32 `yaml:"port" default:"5432"` + Username string `yaml:"username" default:"webhookx"` + Password string `yaml:"password" default:""` + Database string `yaml:"database" default:"webhookx"` + Parameters string `yaml:"parameters" default:"application_name=webhookx&sslmode=disable&connect_timeout=10"` + MaxPoolSize uint32 `yaml:"max_pool_size" default:"40" envconfig:"MAX_POOL_SIZE"` + MaxLifetime uint32 `yaml:"max_life_time" default:"1800" envconfig:"MAX_LIFETIME"` + tracingEnabled bool } func (cfg DatabaseConfig) GetDSN() string { @@ -35,3 +39,33 @@ func (cfg DatabaseConfig) Validate() error { } return nil } + +func (cfg *DatabaseConfig) SetTracingEnabled(enabled bool) { + cfg.tracingEnabled = enabled +} + +func (cfg *DatabaseConfig) InitSqlDB() (*sql.DB, error) { + var driverName = "postgres" + var err error + if cfg.tracingEnabled { + driverName, err = otelsql.Register(driverName, + otelsql.WithSpanOptions(otelsql.SpanOptions{ + OmitConnResetSession: true, + OmitConnPrepare: true, + OmitConnectorConnect: true, + OmitRows: true, + })) + if err != nil { + return nil, err + } + } + + db, err := sql.Open(driverName, cfg.GetDSN()) + if err != nil { + return nil, err + } + db.SetMaxOpenConns(int(cfg.MaxPoolSize)) + db.SetMaxIdleConns(int(cfg.MaxPoolSize)) + db.SetConnMaxLifetime(time.Second * time.Duration(cfg.MaxLifetime)) + return db, nil +} diff --git a/config/redis.go b/config/redis.go index 610b68c..60cf808 100644 --- a/config/redis.go +++ b/config/redis.go @@ -10,6 +10,7 @@ type RedisConfig struct { Port uint32 `yaml:"port" default:"6379"` Password string `yaml:"password" default:""` Database uint32 `yaml:"database" default:"0"` + // fixme: pool property } diff --git a/config/tracing.go b/config/tracing.go new file mode 100644 index 0000000..908f7e6 --- /dev/null +++ b/config/tracing.go @@ -0,0 +1,23 @@ +package config + +import ( + "errors" +) + +type TracingConfig struct { + Enabled bool `yaml:"enabled" default:"false"` + Attributes Map `yaml:"attributes"` + Opentelemetry Opentelemetry `yaml:"opentelemetry"` + ServiceName string `yaml:"service_name" default:"WebhookX" envconfig:"SERVICE_NAME"` + SamplingRate float64 `yaml:"sampling_rate" default:"1.0" envconfig:"SAMPLING_RATE"` +} + +func (cfg TracingConfig) Validate() error { + if !cfg.Enabled { + return nil + } + if cfg.SamplingRate > 1 || cfg.SamplingRate < 0 { + return errors.New("invalid sampling rate, must be [0,1]") + } + return nil +} diff --git a/db/dao/attempt_dao.go b/db/dao/attempt_dao.go index 26ff796..755ce13 100644 --- a/db/dao/attempt_dao.go +++ b/db/dao/attempt_dao.go @@ -2,11 +2,14 @@ package dao import ( "context" - sq "github.com/Masterminds/squirrel" + "fmt" "github.com/jmoiron/sqlx" + sq "github.com/Masterminds/squirrel" "github.com/webhookx-io/webhookx/constants" "github.com/webhookx-io/webhookx/db/entities" + "github.com/webhookx-io/webhookx/pkg/tracing" "github.com/webhookx-io/webhookx/pkg/types" + "go.opentelemetry.io/otel/trace" ) type attemptDao struct { @@ -61,6 +64,11 @@ func (dao *attemptDao) UpdateStatusBatch(ctx context.Context, status entities.At } func (dao *attemptDao) UpdateErrorCode(ctx context.Context, id string, status entities.AttemptStatus, code entities.AttemptErrorCode) error { + if tracer := tracing.TracerFromContext(ctx); tracer != nil { + tracingCtx, span := tracer.Start(ctx, fmt.Sprintf("dao.%s.updateErrorCode", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() + ctx = tracingCtx + } _, err := dao.update(ctx, id, map[string]interface{}{ "status": status, "error_code": code, diff --git a/db/dao/attempt_detail_dao.go b/db/dao/attempt_detail_dao.go index 23b92d5..7ca0b1d 100644 --- a/db/dao/attempt_detail_dao.go +++ b/db/dao/attempt_detail_dao.go @@ -2,11 +2,13 @@ package dao import ( "context" - "github.com/webhookx-io/webhookx/constants" - "time" - + "fmt" "github.com/jmoiron/sqlx" + "github.com/webhookx-io/webhookx/constants" "github.com/webhookx-io/webhookx/db/entities" + "github.com/webhookx-io/webhookx/pkg/tracing" + "go.opentelemetry.io/otel/trace" + "time" ) type attemptDetailDao struct { @@ -27,6 +29,11 @@ func NewAttemptDetailDao(db *sqlx.DB, workspace bool) AttemptDetailDAO { } func (dao *attemptDetailDao) Upsert(ctx context.Context, attemptDetail *entities.AttemptDetail) error { + if tracer := tracing.TracerFromContext(ctx); tracer != nil { + tracingCtx, span := tracer.Start(ctx, fmt.Sprintf("dao.%s.upsert", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() + ctx = tracingCtx + } now := time.Now() values := []interface{}{attemptDetail.ID, attemptDetail.RequestHeaders, attemptDetail.RequestBody, attemptDetail.ResponseHeaders, attemptDetail.ResponseBody, now, now, attemptDetail.WorkspaceId} diff --git a/db/dao/dao.go b/db/dao/dao.go index 34d6cd0..0b71d73 100644 --- a/db/dao/dao.go +++ b/db/dao/dao.go @@ -6,23 +6,24 @@ import ( "encoding/json" "errors" "fmt" + "github.com/jmoiron/sqlx" + sq "github.com/Masterminds/squirrel" "github.com/webhookx-io/webhookx/config" "github.com/webhookx-io/webhookx/constants" - "github.com/webhookx-io/webhookx/eventbus" - "github.com/webhookx-io/webhookx/mcache" - "reflect" - "strings" - "time" - - sq "github.com/Masterminds/squirrel" - "github.com/jmoiron/sqlx" "github.com/webhookx-io/webhookx/db/errs" "github.com/webhookx-io/webhookx/db/query" "github.com/webhookx-io/webhookx/db/transaction" + "github.com/webhookx-io/webhookx/eventbus" + "github.com/webhookx-io/webhookx/mcache" + "github.com/webhookx-io/webhookx/pkg/tracing" "github.com/webhookx-io/webhookx/pkg/types" "github.com/webhookx-io/webhookx/pkg/ucontext" "github.com/webhookx-io/webhookx/utils" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" + "reflect" + "strings" + "time" ) var ( @@ -92,6 +93,11 @@ func (dao *DAO[T]) UnsafeDB(ctx context.Context) Queryable { } func (dao *DAO[T]) Get(ctx context.Context, id string) (entity *T, err error) { + if tracer := tracing.TracerFromContext(ctx); tracer != nil { + tracingCtx, span := tracer.Start(ctx, fmt.Sprintf("dao.%s.get", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() + ctx = tracingCtx + } builder := psql.Select("*").From(dao.opts.Table).Where(sq.Eq{"id": id}) if dao.workspace { wid := ucontext.GetWorkspaceID(ctx) @@ -124,6 +130,11 @@ func (dao *DAO[T]) selectByField(ctx context.Context, field string, value string } func (dao *DAO[T]) Delete(ctx context.Context, id string) (bool, error) { + if tracer := tracing.TracerFromContext(ctx); tracer != nil { + tracingCtx, span := tracer.Start(ctx, fmt.Sprintf("dao.%s.delete", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() + ctx = tracingCtx + } builder := psql.Delete(dao.opts.Table).Where(sq.Eq{"id": id}) if dao.workspace { wid := ucontext.GetWorkspaceID(ctx) @@ -146,6 +157,11 @@ func (dao *DAO[T]) Delete(ctx context.Context, id string) (bool, error) { } func (dao *DAO[T]) Page(ctx context.Context, q query.Queryer) (list []*T, total int64, err error) { + if tracer := tracing.TracerFromContext(ctx); tracer != nil { + tracingCtx, span := tracer.Start(ctx, fmt.Sprintf("dao.%s.page", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() + ctx = tracingCtx + } total, err = dao.Count(ctx, q.WhereMap()) if err != nil { return @@ -155,6 +171,11 @@ func (dao *DAO[T]) Page(ctx context.Context, q query.Queryer) (list []*T, total } func (dao *DAO[T]) Count(ctx context.Context, where map[string]interface{}) (total int64, err error) { + if tracer := tracing.TracerFromContext(ctx); tracer != nil { + tracingCtx, span := tracer.Start(ctx, fmt.Sprintf("dao.%s.count", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() + ctx = tracingCtx + } builder := psql.Select("COUNT(*)").From(dao.opts.Table) if len(where) > 0 { builder = builder.Where(where) @@ -170,6 +191,11 @@ func (dao *DAO[T]) Count(ctx context.Context, where map[string]interface{}) (tot } func (dao *DAO[T]) List(ctx context.Context, q query.Queryer) (list []*T, err error) { + if tracer := tracing.TracerFromContext(ctx); tracer != nil { + tracingCtx, span := tracer.Start(ctx, fmt.Sprintf("dao.%s.list", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() + ctx = tracingCtx + } builder := psql.Select("*").From(dao.opts.Table) where := q.WhereMap() if len(where) > 0 { @@ -214,6 +240,11 @@ func travel(entity interface{}, fn func(field reflect.StructField, value reflect } func (dao *DAO[T]) Insert(ctx context.Context, entity *T) error { + if tracer := tracing.TracerFromContext(ctx); tracer != nil { + tracingCtx, span := tracer.Start(ctx, fmt.Sprintf("dao.%s.insert", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() + ctx = tracingCtx + } columns := make([]string, 0) values := make([]interface{}, 0) travel(entity, func(f reflect.StructField, v reflect.Value) { @@ -238,6 +269,11 @@ func (dao *DAO[T]) Insert(ctx context.Context, entity *T) error { } func (dao *DAO[T]) BatchInsert(ctx context.Context, entities []*T) error { + if tracer := tracing.TracerFromContext(ctx); tracer != nil { + tracingCtx, span := tracer.Start(ctx, fmt.Sprintf("dao.%s.batch_insert", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() + ctx = tracingCtx + } if len(entities) == 0 { return nil } @@ -306,6 +342,11 @@ func (dao *DAO[T]) update(ctx context.Context, id string, maps map[string]interf } func (dao *DAO[T]) Update(ctx context.Context, entity *T) error { + if tracer := tracing.TracerFromContext(ctx); tracer != nil { + tracingCtx, span := tracer.Start(ctx, fmt.Sprintf("dao.%s.update", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() + ctx = tracingCtx + } var id string builder := psql.Update(dao.opts.Table) travel(entity, func(f reflect.StructField, v reflect.Value) { diff --git a/db/db.go b/db/db.go index 01c8297..3289ce6 100644 --- a/db/db.go +++ b/db/db.go @@ -2,15 +2,15 @@ package db import ( "context" - "database/sql" "fmt" "github.com/jmoiron/sqlx" "github.com/pkg/errors" "github.com/webhookx-io/webhookx/config" "github.com/webhookx-io/webhookx/db/dao" "github.com/webhookx-io/webhookx/db/transaction" + "github.com/webhookx-io/webhookx/pkg/tracing" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" - "time" ) type DB struct { @@ -32,22 +32,12 @@ type DB struct { PluginsWS dao.PluginDAO } -func initSqlxDB(cfg *config.DatabaseConfig) (*sqlx.DB, error) { - db, err := sql.Open("postgres", cfg.GetDSN()) - db.SetMaxOpenConns(int(cfg.MaxPoolSize)) - db.SetMaxIdleConns(int(cfg.MaxPoolSize)) - db.SetConnMaxLifetime(time.Second * time.Duration(cfg.MaxLifetime)) - if err != nil { - return nil, err - } - return sqlx.NewDb(db, "postgres"), nil -} - func NewDB(cfg *config.DatabaseConfig) (*DB, error) { - sqlxDB, err := initSqlxDB(cfg) + sqlDB, err := cfg.InitSqlDB() if err != nil { return nil, err } + sqlxDB := sqlx.NewDb(sqlDB, "postgres") db := &DB{ DB: sqlxDB, @@ -75,6 +65,12 @@ func (db *DB) Ping() error { } func (db *DB) TX(ctx context.Context, fn func(ctx context.Context) error) error { + if tracer := tracing.TracerFromContext(ctx); tracer != nil { + tracingCtx, span := tracer.Start(ctx, "db.transaction", trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() + ctx = tracingCtx + } + tx, err := db.DB.Beginx() if err != nil { return err diff --git a/dispatcher/dispatcher.go b/dispatcher/dispatcher.go index 7d8cfc5..35d73ed 100644 --- a/dispatcher/dispatcher.go +++ b/dispatcher/dispatcher.go @@ -8,8 +8,10 @@ import ( "github.com/webhookx-io/webhookx/model" "github.com/webhookx-io/webhookx/pkg/metrics" "github.com/webhookx-io/webhookx/pkg/taskqueue" + "github.com/webhookx-io/webhookx/pkg/tracing" "github.com/webhookx-io/webhookx/pkg/types" "github.com/webhookx-io/webhookx/utils" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "time" ) @@ -45,6 +47,11 @@ func (d *Dispatcher) DispatchBatch(ctx context.Context, events []*entities.Event } func (d *Dispatcher) dispatchBatch(ctx context.Context, events []*entities.Event) (int, error) { + if tracer := tracing.TracerFromContext(ctx); tracer != nil { + tracingCtx, span := tracer.Start(ctx, "dispatcher.dispatch", trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() + ctx = tracingCtx + } if len(events) == 0 { return 0, nil } @@ -79,7 +86,7 @@ func (d *Dispatcher) dispatchBatch(ctx context.Context, events []*entities.Event return d.db.Attempts.BatchInsert(ctx, attempts) }) if err == nil { - go d.sendToQueue(context.TODO(), attempts) + go d.sendToQueue(context.WithoutCancel(ctx), attempts) } return n, err } @@ -112,7 +119,7 @@ func (d *Dispatcher) DispatchEndpoint(ctx context.Context, event *entities.Event return err } - d.sendToQueue(context.TODO(), attempts) + d.sendToQueue(ctx, attempts) return nil } diff --git a/go.mod b/go.mod index 5af91fe..3fc23bf 100644 --- a/go.mod +++ b/go.mod @@ -21,18 +21,32 @@ require ( github.com/segmentio/ksuid v1.0.4 github.com/spf13/cobra v1.8.1 github.com/stretchr/testify v1.9.0 + go.opentelemetry.io/contrib/propagators/autoprop v0.57.0 go.opentelemetry.io/otel v1.32.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.32.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.32.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.32.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.32.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.32.0 go.opentelemetry.io/otel/metric v1.32.0 go.opentelemetry.io/otel/sdk v1.32.0 go.opentelemetry.io/otel/sdk/metric v1.32.0 go.uber.org/mock v0.5.0 go.uber.org/zap v1.27.0 + google.golang.org/grpc v1.67.1 gopkg.in/yaml.v3 v3.0.1 ) require ( + github.com/felixge/httpsnoop v1.0.4 // indirect + go.opentelemetry.io/contrib/propagators/aws v1.32.0 // indirect + go.opentelemetry.io/contrib/propagators/b3 v1.32.0 // indirect + go.opentelemetry.io/contrib/propagators/jaeger v1.32.0 // indirect + go.opentelemetry.io/contrib/propagators/ot v1.32.0 // indirect +) + +require ( + github.com/XSAM/otelsql v0.35.0 github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect @@ -55,8 +69,8 @@ require ( github.com/leodido/go-urn v1.4.0 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/spf13/pflag v1.0.5 // indirect - go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.55.0 // indirect - go.opentelemetry.io/otel/trace v1.32.0 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.55.0 + go.opentelemetry.io/otel/trace v1.32.0 go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect @@ -67,6 +81,5 @@ require ( golang.org/x/tools v0.26.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 // indirect - google.golang.org/grpc v1.67.1 // indirect google.golang.org/protobuf v1.35.1 // indirect ) diff --git a/go.sum b/go.sum index 343e6b2..6705985 100644 --- a/go.sum +++ b/go.sum @@ -8,6 +8,8 @@ github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERo github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrdtl/UvroE= github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= +github.com/XSAM/otelsql v0.35.0 h1:nMdbU/XLmBIB6qZF61uDqy46E0LVA4ZgF/FCNw8Had4= +github.com/XSAM/otelsql v0.35.0/go.mod h1:wO028mnLzmBpstK8XPsoeRLl/kgt417yjAwOGDIptTc= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= @@ -135,12 +137,28 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.55.0 h1:ZIg3ZT/aQ7AfKqdwp7ECpOK6vHqquXXuyTjIO8ZdmPs= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.55.0/go.mod h1:DQAwmETtZV00skUwgD6+0U89g80NKsJE3DCKeLLPQMI= +go.opentelemetry.io/contrib/propagators/autoprop v0.57.0 h1:bNPJOdT5154XxzeFmrh8R+PXnV4t3TZEczy8gHEpcpg= +go.opentelemetry.io/contrib/propagators/autoprop v0.57.0/go.mod h1:Tb0j0mK+QatKdCxCKPN7CSzc7kx/q34/KaohJx/N96s= +go.opentelemetry.io/contrib/propagators/aws v1.32.0 h1:NELzr8bW7a7aHVZj5gaep1PfkvoSCGx+1qNGZx/uhhU= +go.opentelemetry.io/contrib/propagators/aws v1.32.0/go.mod h1:XKMrzHNka3eOA+nGEcNKYVL9s77TAhkwQEynYuaRFnQ= +go.opentelemetry.io/contrib/propagators/b3 v1.32.0 h1:MazJBz2Zf6HTN/nK/s3Ru1qme+VhWU5hm83QxEP+dvw= +go.opentelemetry.io/contrib/propagators/b3 v1.32.0/go.mod h1:B0s70QHYPrJwPOwD1o3V/R8vETNOG9N3qZf4LDYvA30= +go.opentelemetry.io/contrib/propagators/jaeger v1.32.0 h1:K/fOyTMD6GELKTIJBaJ9k3ppF2Njt8MeUGBOwfaWXXA= +go.opentelemetry.io/contrib/propagators/jaeger v1.32.0/go.mod h1:ISE6hda//MTWvtngG7p4et3OCngsrTVfl7c6DjN17f8= +go.opentelemetry.io/contrib/propagators/ot v1.32.0 h1:Poy02A4wOZubHyd2hpHPDgZW+rn6EIq0vCwTZJ6Lmu8= +go.opentelemetry.io/contrib/propagators/ot v1.32.0/go.mod h1:cbhaURV+VR3NIMarzDYZU1RDEkXG1fNd1WMP1XCcGkY= go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U= go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.32.0 h1:j7ZSD+5yn+lo3sGV69nW04rRR0jhYnBwjuX3r0HvnK0= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.32.0/go.mod h1:WXbYJTUaZXAbYd8lbgGuvih0yuCfOFC5RJoYnoLcGz8= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.32.0 h1:t/Qur3vKSkUCcDVaSumWF2PKHt85pc7fRvFuoVT8qFU= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.32.0/go.mod h1:Rl61tySSdcOJWoEgYZVtmnKdA0GeKrSqkHC1t+91CH8= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.32.0 h1:IJFEoHiytixx8cMiVAO+GmHR6Frwu+u5Ur8njpFO6Ac= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.32.0/go.mod h1:3rHrKNtLIoS0oZwkY2vxi+oJcwFRWdtUyRII+so45p8= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.32.0 h1:9kV11HXBHZAvuPUZxmMWrH8hZn/6UnHX4K0mu36vNsU= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.32.0/go.mod h1:JyA0FHXe22E1NeNiHmVp7kFHglnexDQ7uRWDiiJ1hKQ= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.32.0 h1:cMyu9O88joYEaI47CnQkxO1XZdpoTF9fEnW2duIddhw= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.32.0/go.mod h1:6Am3rn7P9TVVeXYG+wtcGE7IE1tsQ+bP3AuWcKt/gOI= go.opentelemetry.io/otel/metric v1.32.0 h1:xV2umtmNcThh2/a/aCP+h64Xx5wsj8qqnkYZktzNa0M= go.opentelemetry.io/otel/metric v1.32.0/go.mod h1:jH7CIbbK6SH2V2wE16W05BHCtIDzauciCRLoc/SyMv8= go.opentelemetry.io/otel/sdk v1.32.0 h1:RNxepc9vK59A8XsgZQouW8ue8Gkb4jpWtJm9ge5lEG4= diff --git a/pkg/queue/redis/redis.go b/pkg/queue/redis/redis.go index e94ae33..b1bd833 100644 --- a/pkg/queue/redis/redis.go +++ b/pkg/queue/redis/redis.go @@ -7,7 +7,9 @@ import ( "github.com/webhookx-io/webhookx/constants" "github.com/webhookx-io/webhookx/pkg/metrics" "github.com/webhookx-io/webhookx/pkg/queue" + "github.com/webhookx-io/webhookx/pkg/tracing" "github.com/webhookx-io/webhookx/utils" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "strconv" "strings" @@ -54,6 +56,11 @@ func NewRedisQueue(opts RedisQueueOptions, logger *zap.SugaredLogger, metrics *m } func (q *RedisQueue) Enqueue(ctx context.Context, message *queue.Message) error { + if tracer := tracing.TracerFromContext(ctx); tracer != nil { + tracingCtx, span := tracer.Start(ctx, "redis.queue.enqueue", trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() + ctx = tracingCtx + } args := &redis.XAddArgs{ Stream: q.stream, ID: "*", @@ -87,6 +94,12 @@ func toMessage(values map[string]interface{}) *queue.Message { } func (q *RedisQueue) Dequeue(ctx context.Context, opt *queue.Options) ([]*queue.Message, error) { + if tracer := tracing.TracerFromContext(ctx); tracer != nil { + tracingCtx, span := tracer.Start(ctx, "redis.queue.dequeue", trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() + ctx = tracingCtx + } + var count int64 = 1 if opt != nil && opt.Count != 0 { count = opt.Count @@ -128,6 +141,12 @@ func (q *RedisQueue) Dequeue(ctx context.Context, opt *queue.Options) ([]*queue. } func (q *RedisQueue) Delete(ctx context.Context, messages []*queue.Message) error { + if tracer := tracing.TracerFromContext(ctx); tracer != nil { + tracingCtx, span := tracer.Start(ctx, "redis.queue.delete", trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() + ctx = tracingCtx + } + ids := make([]string, 0, len(messages)) for _, message := range messages { ids = append(ids, message.ID) diff --git a/pkg/taskqueue/redis.go b/pkg/taskqueue/redis.go index 9614764..d91e238 100644 --- a/pkg/taskqueue/redis.go +++ b/pkg/taskqueue/redis.go @@ -6,7 +6,9 @@ import ( "github.com/redis/go-redis/v9" "github.com/webhookx-io/webhookx/constants" "github.com/webhookx-io/webhookx/pkg/metrics" + "github.com/webhookx-io/webhookx/pkg/tracing" "github.com/webhookx-io/webhookx/utils" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "time" ) @@ -91,6 +93,12 @@ func NewRedisQueue(opts RedisTaskQueueOptions, logger *zap.SugaredLogger, metric } func (q *RedisTaskQueue) Add(ctx context.Context, tasks []*TaskMessage) error { + if tracer := tracing.TracerFromContext(ctx); tracer != nil { + tracingCtx, span := tracer.Start(ctx, "taskqueue.redis.add", trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() + ctx = tracingCtx + } + members := make([]redis.Z, 0, len(tasks)) strs := make([]interface{}, 0, len(tasks)*2) for _, task := range tasks { @@ -113,6 +121,12 @@ func (q *RedisTaskQueue) Add(ctx context.Context, tasks []*TaskMessage) error { } func (q *RedisTaskQueue) Get(ctx context.Context, opts *GetOptions) ([]*TaskMessage, error) { + if tracer := tracing.TracerFromContext(ctx); tracer != nil { + tracingCtx, span := tracer.Start(ctx, "taskqueue.redis.get", trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() + ctx = tracingCtx + } + keys := []string{q.queue, q.queueData, q.invisibleQueue} argv := []interface{}{ opts.Count, @@ -142,6 +156,12 @@ func (q *RedisTaskQueue) Get(ctx context.Context, opts *GetOptions) ([]*TaskMess } func (q *RedisTaskQueue) Delete(ctx context.Context, task *TaskMessage) error { + if tracer := tracing.TracerFromContext(ctx); tracer != nil { + tracingCtx, span := tracer.Start(ctx, "taskqueue.redis.delete", trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() + ctx = tracingCtx + } + q.log.Debugf("[redis-queue]: delete task %s", task.ID) pipeline := q.c.Pipeline() pipeline.HDel(ctx, q.queueData, task.ID) @@ -150,7 +170,6 @@ func (q *RedisTaskQueue) Delete(ctx context.Context, task *TaskMessage) error { _, err := pipeline.Exec(ctx) return err } - func (q *RedisTaskQueue) Size(ctx context.Context) (int64, error) { return q.c.ZCard(ctx, q.queue).Result() } diff --git a/pkg/tracing/opentelemetry.go b/pkg/tracing/opentelemetry.go new file mode 100644 index 0000000..c646dda --- /dev/null +++ b/pkg/tracing/opentelemetry.go @@ -0,0 +1,122 @@ +package tracing + +import ( + "context" + "fmt" + "github.com/webhookx-io/webhookx/config" + "go.opentelemetry.io/contrib/propagators/autoprop" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.26.0" + "go.opentelemetry.io/otel/trace" + "google.golang.org/grpc/encoding/gzip" + "io" + "net" + "net/url" + "time" +) + +const instrumentationName = "github.com/webhookx-io/webhookx" + +func SetupOTEL(o *config.TracingConfig) (trace.TracerProvider, io.Closer, error) { + var err error + var exporter *otlptrace.Exporter + + if o.Opentelemetry.Protocol == config.OtlpProtocolHTTP { + exporter, err = setupHTTPExporter(o.Opentelemetry) + } else if o.Opentelemetry.Protocol == config.OtlpProtocolGRPC { + exporter, err = setupGRPCExporter(o.Opentelemetry) + } + + if err != nil { + return nil, nil, fmt.Errorf("failed to setup exporter: %w", err) + } + + attr := []attribute.KeyValue{ + semconv.ServiceNameKey.String(o.ServiceName), + semconv.ServiceVersionKey.String(config.VERSION), + } + + for k, v := range o.Attributes { + attr = append(attr, attribute.String(k, v)) + } + + res, err := resource.New( + context.Background(), + resource.WithAttributes(attr...), + resource.WithFromEnv(), + ) + if err != nil { + return nil, nil, fmt.Errorf("failed to build resource: %w", err) + } + + tracerProvider := sdktrace.NewTracerProvider( + sdktrace.WithSampler(sdktrace.TraceIDRatioBased(o.SamplingRate)), + sdktrace.WithResource(res), + sdktrace.WithBatcher(exporter), + ) + otel.SetTracerProvider(tracerProvider) + + otel.SetTextMapPropagator(autoprop.NewTextMapPropagator()) + return tracerProvider, &tpCloser{provider: tracerProvider}, err +} + +func setupHTTPExporter(c config.Opentelemetry) (*otlptrace.Exporter, error) { + endpoint, err := url.Parse(c.Endpoint) + if err != nil { + return nil, fmt.Errorf("invalid collector endpoint %q: %w", c.Endpoint, err) + } + + opts := []otlptracehttp.Option{ + otlptracehttp.WithEndpoint(endpoint.Host), + otlptracehttp.WithCompression(otlptracehttp.GzipCompression), + } + + if endpoint.Scheme == "http" { + opts = append(opts, otlptracehttp.WithInsecure()) + } + + if endpoint.Path != "" { + opts = append(opts, otlptracehttp.WithURLPath(endpoint.Path)) + } + + return otlptrace.New(context.Background(), otlptracehttp.NewClient(opts...)) +} + +func setupGRPCExporter(c config.Opentelemetry) (*otlptrace.Exporter, error) { + host, port, err := net.SplitHostPort(c.Endpoint) + if err != nil { + return nil, fmt.Errorf("invalid collector endpoint %q: %w", c.Endpoint, err) + } + + opts := []otlptracegrpc.Option{ + otlptracegrpc.WithEndpoint(fmt.Sprintf("%s:%s", host, port)), + otlptracegrpc.WithCompressor(gzip.Name), + otlptracegrpc.WithInsecure(), + } + + return otlptrace.New(context.Background(), otlptracegrpc.NewClient(opts...)) +} + +type tpCloser struct { + provider trace.TracerProvider +} + +func (t *tpCloser) Close() error { + if t == nil { + return nil + } + + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second)) + defer cancel() + + if pr, ok := t.provider.(*sdktrace.TracerProvider); ok { + return pr.Shutdown(ctx) + } + return nil +} diff --git a/pkg/tracing/tracing.go b/pkg/tracing/tracing.go new file mode 100644 index 0000000..abbdfd4 --- /dev/null +++ b/pkg/tracing/tracing.go @@ -0,0 +1,105 @@ +package tracing + +import ( + "context" + "github.com/webhookx-io/webhookx/config" + "go.opentelemetry.io/otel/trace" + "io" +) + +var internalTracer *Tracer + +func New(conf *config.TracingConfig) (*Tracer, error) { + if !conf.Enabled { + return nil, nil + } + + tr, closer, err := SetupOTEL(conf) + if err != nil { + return nil, err + } + + internalTracer = NewTracer(tr, closer) + return internalTracer, nil +} + +func TracerFromContext(ctx context.Context) *Tracer { + if !trace.SpanContextFromContext(ctx).IsValid() { + return nil + } + + span := trace.SpanFromContext(ctx) + if span != nil && span.TracerProvider() != nil { + tracerProvider := span.TracerProvider() + tracer := tracerProvider.Tracer(instrumentationName) + tr, ok := tracer.(*Tracer) + if ok { + return tr + } else { + if internalTracer == nil { + internalTracer = NewTracer(tracerProvider, &tpCloser{tracerProvider}) + } + return internalTracer + } + } + + return nil +} + +type Span struct { + trace.Span + + tracerProvider *TracerProvider +} + +func (s Span) TracerProvider() trace.TracerProvider { + return s.tracerProvider +} + +type TracerProvider struct { + trace.TracerProvider + + tracer *Tracer +} + +func (t TracerProvider) Tracer(name string, options ...trace.TracerOption) trace.Tracer { + if name == instrumentationName { + return t.tracer + } + + return t.TracerProvider.Tracer(name, options...) +} + +type Tracer struct { + trace.Tracer + io.Closer +} + +func NewTracer(tracerProvider trace.TracerProvider, closer io.Closer) *Tracer { + return &Tracer{ + Tracer: tracerProvider.Tracer(instrumentationName), + Closer: closer, + } +} + +func (t *Tracer) Start(ctx context.Context, spanName string, opts ...trace.SpanStartOption) (context.Context, trace.Span) { + if t == nil { + return ctx, nil + } + + spanCtx, span := t.Tracer.Start(ctx, spanName, opts...) + + wrappedSpan := &Span{Span: span, tracerProvider: &TracerProvider{tracer: t}} + + return trace.ContextWithSpan(spanCtx, wrappedSpan), wrappedSpan +} + +func (t *Tracer) Stop() error { + if t == nil { + return nil + } + if t.Closer != nil { + return t.Closer.Close() + } + return nil +} diff --git a/proxy/gateway.go b/proxy/gateway.go index 1080233..575a29a 100644 --- a/proxy/gateway.go +++ b/proxy/gateway.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "github.com/gorilla/mux" "github.com/webhookx-io/webhookx/config" "github.com/webhookx-io/webhookx/constants" @@ -15,11 +16,16 @@ import ( "github.com/webhookx-io/webhookx/pkg/queue" "github.com/webhookx-io/webhookx/pkg/queue/redis" "github.com/webhookx-io/webhookx/pkg/schedule" + "github.com/webhookx-io/webhookx/pkg/tracing" "github.com/webhookx-io/webhookx/pkg/types" "github.com/webhookx-io/webhookx/pkg/ucontext" "github.com/webhookx-io/webhookx/proxy/middlewares" "github.com/webhookx-io/webhookx/proxy/router" "github.com/webhookx-io/webhookx/utils" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" + "go.opentelemetry.io/otel/attribute" + semconv "go.opentelemetry.io/otel/semconv/v1.26.0" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "net/http" "os" @@ -46,15 +52,21 @@ type Gateway struct { queue queue.Queue metrics *metrics.Metrics + tracer *tracing.Tracer } -func NewGateway(cfg *config.ProxyConfig, db *db.DB, dispatcher *dispatcher.Dispatcher, metrics *metrics.Metrics) *Gateway { +func NewGateway(cfg *config.ProxyConfig, db *db.DB, dispatcher *dispatcher.Dispatcher, metrics *metrics.Metrics, tracer *tracing.Tracer) *Gateway { var q queue.Queue switch cfg.Queue.Type { case "redis": - q, _ = redis.NewRedisQueue(redis.RedisQueueOptions{ + rq, err := redis.NewRedisQueue(redis.RedisQueueOptions{ Client: cfg.Queue.Redis.GetClient(), }, zap.S(), metrics) + if err != nil { + zap.S().Warnf("[proxy] failed to create redis queue: %v", err) + } else { + q = rq + } } gw := &Gateway{ @@ -65,6 +77,7 @@ func NewGateway(cfg *config.ProxyConfig, db *db.DB, dispatcher *dispatcher.Dispa dispatcher: dispatcher, queue: q, metrics: metrics, + tracer: tracer, } r := mux.NewRouter() @@ -72,10 +85,15 @@ func NewGateway(cfg *config.ProxyConfig, db *db.DB, dispatcher *dispatcher.Dispa if metrics.Enabled { r.Use(middlewares.NewMetricsMiddleware(metrics).Handle) } + if gw.tracer != nil { + r.Use(otelhttp.NewMiddleware("api.proxy")) + } r.PathPrefix("/").HandlerFunc(gw.Handle) + var handler http.Handler = r + gw.s = &http.Server{ - Handler: r, + Handler: handler, Addr: cfg.Listen, ReadTimeout: time.Duration(cfg.TimeoutRead) * time.Second, @@ -104,7 +122,9 @@ func (gw *Gateway) buildRouter() { } func (gw *Gateway) Handle(w http.ResponseWriter, r *http.Request) { - source, _ := gw.router.Execute(r).(*entities.Source) + var source *entities.Source + + source, _ = gw.router.Execute(r).(*entities.Source) if source == nil { exit(w, 404, `{"message": "not found"}`, nil) return @@ -115,6 +135,17 @@ func (gw *Gateway) Handle(w http.ResponseWriter, r *http.Request) { }) r = r.WithContext(ctx) + if gw.tracer != nil { + tracingCtx, span := gw.tracer.Start(r.Context(), "proxy.handle", trace.WithSpanKind(trace.SpanKindServer)) + span.SetAttributes(attribute.String("router.id", source.ID)) + span.SetAttributes(attribute.String("router.name", utils.PointerValue(source.Name))) + span.SetAttributes(attribute.String("router.workspaceId", source.WorkspaceId)) + span.SetAttributes(attribute.String("router.async", fmt.Sprint(source.Async))) + span.SetAttributes(semconv.HTTPRoute(source.Path)) + defer span.End() + r = r.WithContext(tracingCtx) + } + var event entities.Event r.Body = http.MaxBytesReader(w, r.Body, gw.cfg.MaxRequestBodySize) if err := json.NewDecoder(r.Body).Decode(&event); err != nil { diff --git a/proxy/middlewares/recovery.go b/proxy/middlewares/recovery.go index 0740ee8..edf9719 100644 --- a/proxy/middlewares/recovery.go +++ b/proxy/middlewares/recovery.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "github.com/webhookx-io/webhookx/db/dao" + "github.com/webhookx-io/webhookx/db/errs" "github.com/webhookx-io/webhookx/pkg/types" "go.uber.org/zap" "net/http" @@ -31,6 +32,14 @@ func PanicRecovery(h http.Handler) http.Handler { return } + if e, ok := err.(*errs.DBError); ok { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(400) + bytes, _ := json.Marshal(types.ErrorResponse{Message: e.Error()}) + _, _ = w.Write(bytes) + return + } + buf := make([]byte, 2048) n := runtime.Stack(buf, false) buf = buf[:n] diff --git a/test/admin/endpoints_test.go b/test/admin/endpoints_test.go index 3146cd3..7633523 100644 --- a/test/admin/endpoints_test.go +++ b/test/admin/endpoints_test.go @@ -3,6 +3,7 @@ package admin import ( "context" "fmt" + "github.com/go-resty/resty/v2" . "github.com/onsi/ginkgo/v2" "github.com/stretchr/testify/assert" diff --git a/test/cmd/version_test.go b/test/cmd/version_test.go index eac644f..7d68512 100644 --- a/test/cmd/version_test.go +++ b/test/cmd/version_test.go @@ -1,11 +1,12 @@ package cmd import ( + "os/exec" + "testing" + . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/stretchr/testify/assert" - "os/exec" - "testing" ) var _ = Describe("version", Ordered, func() { diff --git a/test/helper/helper.go b/test/helper/helper.go index 5e6eda9..ebce511 100644 --- a/test/helper/helper.go +++ b/test/helper/helper.go @@ -3,11 +3,8 @@ package helper import ( "bufio" "context" - "os" - "path" - "regexp" - "time" - + "crypto/rand" + "encoding/hex" "github.com/creasty/defaults" "github.com/go-resty/resty/v2" "github.com/webhookx-io/webhookx/app" @@ -16,24 +13,25 @@ import ( "github.com/webhookx-io/webhookx/db/entities" "github.com/webhookx-io/webhookx/db/migrator" "github.com/webhookx-io/webhookx/utils" + "os" + "path" + "regexp" + "time" ) -var cfg *config.Config - var ( - OtelCollectorMetricsFile = "/tmp/otel/metrics.json" + OtelCollectorTracesFile = "../output/otel/traces.json" + OtelCollectorMetricsFile = "../output/otel/metrics.json" ) +var cfg *config.Config + func init() { var err error cfg, err = config.Init() if err != nil { panic(err) } - - if v := os.Getenv("WEBHOOKX_TEST_OTEL_COLLECTOR_OUTPUT_PATH"); v != "" { - OtelCollectorMetricsFile = path.Join(v, "metrics.json") - } } var defaultEnvs = map[string]string{ @@ -212,17 +210,25 @@ func TruncateFile(filename string) { func FileLine(filename string, n int) (string, error) { file, err := os.Open(filename) if err != nil { - return "", err + panic(err) } defer file.Close() scanner := bufio.NewScanner(file) + + const maxCapacity = 1024 * 1024 + buf := make([]byte, maxCapacity) + scanner.Buffer(buf, maxCapacity) + for i := 1; scanner.Scan(); i++ { s := scanner.Text() if i == n { return s, nil } } + if err := scanner.Err(); err != nil { + panic(err) + } return "", nil } @@ -233,12 +239,20 @@ func FileCountLine(filename string) (int, error) { return 0, err } defer file.Close() + scanner := bufio.NewScanner(file) + + const maxCapacity = 1024 * 1024 + buf := make([]byte, maxCapacity) + scanner.Buffer(buf, maxCapacity) n := 0 for scanner.Scan() { scanner.Text() n++ } + if err := scanner.Err(); err != nil { + panic(err) + } return n, nil } @@ -254,12 +268,19 @@ func FileHasLine(filename string, regex string) (bool, error) { return false, err } scanner := bufio.NewScanner(file) + + const maxCapacity = 2 * 1024 * 1024 + buf := make([]byte, maxCapacity) + scanner.Buffer(buf, maxCapacity) for scanner.Scan() { line := scanner.Text() if r.MatchString(line) { return true, nil } } + if err := scanner.Err(); err != nil { + panic(err) + } return false, nil } @@ -300,3 +321,34 @@ func DefaultEvent() *entities.Event { return &entity } + +func PathExist(_path string) bool { + _, err := os.Stat(_path) + if err != nil && os.IsNotExist(err) { + return false + } + return true +} + +func InitOtelOutput() { + if v := os.Getenv("WEBHOOKX_TEST_OTEL_COLLECTOR_OUTPUT_PATH"); v != "" { + OtelCollectorMetricsFile = path.Join(v, "metrics.json") + OtelCollectorTracesFile = path.Join(v, "traces.json") + } + + if !PathExist(OtelCollectorTracesFile) { + os.Create(OtelCollectorTracesFile) + } + if !PathExist(OtelCollectorMetricsFile) { + os.Create(OtelCollectorMetricsFile) + } +} + +func GenerateTraceID() string { + traceID := make([]byte, 16) + _, err := rand.Read(traceID) + if err != nil { + panic(err) + } + return hex.EncodeToString(traceID) +} diff --git a/test/metrics/opentelemetry_test.go b/test/metrics/opentelemetry_test.go index 246739e..10c1a35 100644 --- a/test/metrics/opentelemetry_test.go +++ b/test/metrics/opentelemetry_test.go @@ -33,6 +33,7 @@ var _ = Describe("opentelemetry", Ordered, func() { entitiesConfig.Endpoints[1].Request.Timeout = 1 entitiesConfig.Sources[0].Async = true helper.InitDB(true, &entitiesConfig) + helper.InitOtelOutput() proxyClient = helper.ProxyClient() var err error app, err = helper.Start(map[string]string{ @@ -122,6 +123,7 @@ var _ = Describe("opentelemetry", Ordered, func() { BeforeAll(func() { var err error + helper.InitOtelOutput() app, err = helper.Start(map[string]string{ "WEBHOOKX_METRICS_ATTRIBUTES": `{"env": "prod"}`, "WEBHOOKX_METRICS_EXPORTS": "opentelemetry", diff --git a/test/otel-collector-config.yml b/test/otel-collector-config.yml index 03df3bd..9a2f3c7 100644 --- a/test/otel-collector-config.yml +++ b/test/otel-collector-config.yml @@ -8,6 +8,7 @@ receivers: processors: batch: + timeout: 0 # regardless of batch size, data will be sent immediately exporters: debug: diff --git a/test/tracing/admin_test.go b/test/tracing/admin_test.go new file mode 100644 index 0000000..dee3b77 --- /dev/null +++ b/test/tracing/admin_test.go @@ -0,0 +1,188 @@ +package tracing + +import ( + "encoding/json" + "fmt" + "github.com/go-resty/resty/v2" + . "github.com/onsi/ginkgo/v2" + "github.com/stretchr/testify/assert" + "github.com/webhookx-io/webhookx/admin/api" + "github.com/webhookx-io/webhookx/app" + "github.com/webhookx-io/webhookx/db/entities" + "github.com/webhookx-io/webhookx/test/helper" + "github.com/webhookx-io/webhookx/utils" + "time" +) + +var _ = Describe("tracing admin", Ordered, func() { + endpoints := map[string]string{ + "http/protobuf": "http://localhost:4318/v1/traces", + "grpc": "localhost:4317", + } + for protocol, address := range endpoints { + Context(protocol, func() { + var app *app.Application + var proxyClient *resty.Client + var adminClient *resty.Client + entitiesConfig := helper.EntitiesConfig{ + Endpoints: []*entities.Endpoint{helper.DefaultEndpoint()}, + Sources: []*entities.Source{helper.DefaultSource()}, + } + entitiesConfig.Sources[0].Async = true + var gotScopeNames map[string]bool + var gotSpanAttributes map[string]map[string]string + + BeforeAll(func() { + helper.InitOtelOutput() + helper.InitDB(true, &entitiesConfig) + proxyClient = helper.ProxyClient() + adminClient = helper.AdminClient() + + envs := map[string]string{ + "WEBHOOKX_ADMIN_LISTEN": "0.0.0.0:8080", + "WEBHOOKX_PROXY_LISTEN": "0.0.0.0:8081", + "WEBHOOKX_TRACING_ENABLED": "true", + "WEBHOOKX_TRACING_SERVICE_NAME": "WebhookX", // env splite by _ + "WEBHOOKX_TRACING_SAMPLING_RATE": "1.0", + "WEBHOOKX_TRACING_OPENTELEMETRY_PROTOCOL": protocol, + "WEBHOOKX_TRACING_OPENTELEMETRY_ENDPOINT": address, + } + + app = utils.Must(helper.Start(envs)) + gotScopeNames = make(map[string]bool) + gotSpanAttributes = make(map[string]map[string]string) + }) + + AfterAll(func() { + app.Stop() + }) + + It("sanity", func() { + traceID := helper.GenerateTraceID() + expectedScopeNames := []string{ + "github.com/XSAM/otelsql", + "github.com/webhookx-io/webhookx", + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp", + } + entrypoint := map[string]string{ + "http.method": "GET", + "http.scheme": "http", + "http.target": "/workspaces/default/attempts", + "http.status_code": "200", + "http.response_content_length": "*", + "user_agent.original": "*", + "net.host.name": "localhost", + "net.host.port": "8080", + "net.protocol.version": "*", + "net.sock.peer.addr": "*", + "net.sock.peer.port": "*", + } + + expectedScopeSpans := map[string]map[string]string{ + "api.admin": entrypoint, + "sql.conn.query": { + "db.statement": "SELECT * FROM attempts WHERE ws_id = $1 ORDER BY id DESC LIMIT 20 OFFSET 0", + }, + "dao.attempts.page": {}, + "dao.attempts.count": {}, + "dao.attempts.list": {}, + } + + // wait for export + proxyFunc := func() bool { + resp, err := proxyClient.R(). + SetBody(`{ + "event_type": "foo.bar", + "data": { + "key": "value" + } + }`).Post("/") + return err == nil && resp.StatusCode() == 200 + } + assert.Eventually(GinkgoT(), proxyFunc, time.Second*5, time.Second) + // make more tracing data + for i := 0; i < 20; i++ { + go proxyFunc() + } + + n, err := helper.FileCountLine(helper.OtelCollectorTracesFile) + assert.Nil(GinkgoT(), err) + n++ + + assert.Eventually(GinkgoT(), func() bool { + resp, err := adminClient.R(). + SetHeader("traceparent", fmt.Sprintf("00-%s-0000000000000001-01", traceID)). + SetResult(api.Pagination[*entities.Attempt]{}). + Get("/workspaces/default/attempts?page_no=1") + result := resp.Result().(*api.Pagination[*entities.Attempt]) + return err == nil && resp.StatusCode() == 200 && len(result.Data) == 20 + }, time.Second*5, time.Second) + + time.Sleep(time.Second * 10) + + assert.Eventually(GinkgoT(), func() bool { + line, err := helper.FileLine(helper.OtelCollectorTracesFile, n) + if err != nil || line == "" { + fmt.Printf("read empty line %d", n) + fmt.Println("") + return false + } + n++ + var trace ExportedTrace + err = json.Unmarshal([]byte(line), &trace) + if err != nil { + fmt.Printf("unmarshal err %v", err) + fmt.Println("") + return false + } + + if len(trace.ResourceSpans) == 0 { + fmt.Printf("no resource spans") + return false + } + scopeNames, spanAttrs := trace.filterSpansByTraceID(traceID) + for k, v := range scopeNames { + gotScopeNames[k] = v + } + for k, v := range spanAttrs { + gotSpanAttributes[k] = v + } + + for _, expectedScopeName := range expectedScopeNames { + if !gotScopeNames[expectedScopeName] { + fmt.Printf("scope %s not exist", expectedScopeName) + fmt.Println("") + return false + } + } + + for spanName, expectedAttributes := range expectedScopeSpans { + gotAttributes, ok := gotSpanAttributes[spanName] + if !ok { + fmt.Printf("span %s not exist", spanName) + fmt.Println() + return false + } + + if len(expectedAttributes) > 0 { + for k, v := range expectedAttributes { + if _, ok := gotAttributes[k]; !ok { + fmt.Printf("expected span %s attribute %s not exist", spanName, k) + fmt.Println("") + return false + } + valMatch := (v == "*" || gotAttributes[k] == v) + if !valMatch { + fmt.Printf("expected span %s attribute %s value not match: %s", spanName, k, v) + fmt.Println("") + return false + } + } + } + } + return true + }, time.Second*30, time.Second) + }) + }) + } +}) diff --git a/test/tracing/ginkgo_test.go b/test/tracing/ginkgo_test.go new file mode 100644 index 0000000..2bb31c0 --- /dev/null +++ b/test/tracing/ginkgo_test.go @@ -0,0 +1,12 @@ +package tracing + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "testing" +) + +func TestTracing(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Tracing Suite") +} diff --git a/test/tracing/proxy_test.go b/test/tracing/proxy_test.go new file mode 100644 index 0000000..7a9a748 --- /dev/null +++ b/test/tracing/proxy_test.go @@ -0,0 +1,266 @@ +package tracing + +import ( + "encoding/json" + "fmt" + "github.com/go-resty/resty/v2" + . "github.com/onsi/ginkgo/v2" + "github.com/stretchr/testify/assert" + "github.com/webhookx-io/webhookx/app" + "github.com/webhookx-io/webhookx/config" + "github.com/webhookx-io/webhookx/db/entities" + "github.com/webhookx-io/webhookx/test/helper" + "github.com/webhookx-io/webhookx/utils" + "time" +) + +var _ = Describe("tracing proxy", Ordered, func() { + endpoints := map[string]string{ + "grpc": "localhost:4317", + "http/protobuf": "http://localhost:4318/v1/traces", + } + for _, protocol := range []string{"grpc", "http/protobuf"} { + Context(protocol, func() { + var app *app.Application + var proxyClient *resty.Client + + entitiesConfig := helper.EntitiesConfig{ + Endpoints: []*entities.Endpoint{helper.DefaultEndpoint()}, + Sources: []*entities.Source{helper.DefaultSource()}, + } + + BeforeAll(func() { + helper.InitOtelOutput() + helper.InitDB(true, &entitiesConfig) + proxyClient = helper.ProxyClient() + + envs := map[string]string{ + "WEBHOOKX_PROXY_LISTEN": "0.0.0.0:8081", + "WEBHOOKX_TRACING_SERVICE_NAME": "WebhookX", // env splite by _ + "WEBHOOKX_TRACING_ENABLED": "true", + "WEBHOOKX_TRACING_SAMPLING_RATE": "1.0", + "WEBHOOKX_TRACING_ATTRIBUTES": `{"env":"test"}`, + "WEBHOOKX_TRACING_OPENTELEMETRY_PROTOCOL": protocol, + "WEBHOOKX_TRACING_OPENTELEMETRY_ENDPOINT": endpoints[protocol], + } + app = utils.Must(helper.Start(envs)) + + }) + + AfterAll(func() { + app.Stop() + }) + + It("sanity "+protocol, func() { + var traceID = helper.GenerateTraceID() + n, err := helper.FileCountLine(helper.OtelCollectorTracesFile) + assert.Nil(GinkgoT(), err) + n++ + fmt.Println("start line " + fmt.Sprint(n)) + + expectedScopeNames := []string{ + "github.com/XSAM/otelsql", + "github.com/webhookx-io/webhookx", + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp", + } + + entrypoint := map[string]string{ + "http.method": "POST", + "http.scheme": "http", + "http.target": "/", + "http.status_code": "200", + "http.request_content_length": "*", + "http.response_content_length": "*", + "user_agent.original": "*", + "net.host.name": "localhost", + "net.host.port": "8081", + "net.protocol.version": "*", + "net.sock.peer.addr": "*", + "net.sock.peer.port": "*", + } + router := map[string]string{ + "router.id": "*", + "router.name": "*", + "router.workspaceId": "*", + "http.route": "/", + } + expectedScopeSpans := map[string]map[string]string{ + "api.proxy": entrypoint, + "proxy.handle": router, + "dispatcher.dispatch": {}, + "dao.endpoints.list": {}, + "db.transaction": {}, + "dao.attempts.batch_insert": {}, + "taskqueue.redis.add": {}, + "sql.conn.exec": { + "db.statement": "UPDATE attempts SET status = $1 WHERE id IN ($2)", + }, + "sql.conn.query": { + "db.statement": "INSERT INTO attempts (id,event_id,endpoint_id,status,attempt_number,scheduled_at,attempted_at,trigger_mode,exhausted,error_code,request,response,ws_id) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13) RETURNING *", + }, + } + + // wait for export + proxyFunc := func() bool { + resp, err := proxyClient.R(). + SetBody(`{ + "event_type": "foo.bar", + "data": { + "key": "value" + } + }`). + SetHeader("traceparent", fmt.Sprintf("00-%s-0000000000000001-01", traceID)). + Post("/") + return err == nil && resp.StatusCode() == 200 + } + assert.Eventually(GinkgoT(), proxyFunc, time.Second*5, time.Second) + + // make more tracing data + time.Sleep(time.Second * 3) + gotScopeNames := make(map[string]bool) + gotSpanAttributes := make(map[string]map[string]string) + assert.Eventually(GinkgoT(), func() bool { + line, err := helper.FileLine(helper.OtelCollectorTracesFile, n) + if err != nil || line == "" { + fmt.Printf("read empty line %d", n) + fmt.Println("") + proxyFunc() + return false + } + n++ + + var trace ExportedTrace + err = json.Unmarshal([]byte(line), &trace) + if err != nil { + return false + } + + if len(trace.ResourceSpans) == 0 { + return false + } + + scopeNames, spanAttrs := trace.filterSpansByTraceID(traceID) + for k, v := range scopeNames { + gotScopeNames[k] = v + } + for k, v := range spanAttrs { + gotSpanAttributes[k] = v + } + + for _, expectedScopeName := range expectedScopeNames { + if !gotScopeNames[expectedScopeName] { + fmt.Printf("scope %s not exist", expectedScopeName) + fmt.Println("") + return false + } + } + + for spanName, expectedAttributes := range expectedScopeSpans { + gotAttributes, ok := gotSpanAttributes[spanName] + if !ok { + fmt.Printf("span %s not exist", spanName) + fmt.Println() + return false + } + + if len(expectedAttributes) > 0 { + for k, v := range expectedAttributes { + if _, ok := gotAttributes[k]; !ok { + fmt.Printf("expected span %s attribute %s not exist", spanName, k) + fmt.Println("") + return false + } + valMatch := (v == "*" || gotAttributes[k] == v) + if !valMatch { + fmt.Printf("expected span %s attribute %s value not match: %s", spanName, k, v) + fmt.Println("") + return false + } + } + } + } + return true + }, time.Second*30, time.Second) + }) + }) + } + + Context("SDK configuration by env", func() { + var app *app.Application + var proxyClient *resty.Client + + entitiesConfig := helper.EntitiesConfig{ + Endpoints: []*entities.Endpoint{helper.DefaultEndpoint()}, + Sources: []*entities.Source{helper.DefaultSource()}, + } + entitiesConfig.Sources[0].Async = false + + BeforeAll(func() { + var err error + helper.InitOtelOutput() + helper.InitDB(true, &entitiesConfig) + proxyClient = helper.ProxyClient() + + app, err = helper.Start(map[string]string{ + "WEBHOOKX_PROXY_LISTEN": "0.0.0.0:8081", + "WEBHOOKX_TRACING_SERVICE_NAME": "WebhookX", // env splite by _ + "WEBHOOKX_TRACING_SAMPLING_RATE": "1", + "WEBHOOKX_TRACING_ATTRIBUTES": `{"env":"test"}`, + "WEBHOOKX_TRACING_OPENTELEMETRY_PROTOCOL": string(config.OtlpProtocolHTTP), + "WEBHOOKX_TRACING_OPENTELEMETRY_ENDPOINT": "http://localhost:4318/v1/traces", + "OTEL_RESOURCE_ATTRIBUTES": "service.version=0.3", + "OTEL_SERVICE_NAME": "WebhookX-Test", // env override + }) + assert.Nil(GinkgoT(), err) + }) + + AfterAll(func() { + app.Stop() + }) + + It("sanity", func() { + n, err := helper.FileCountLine(helper.OtelCollectorTracesFile) + assert.Nil(GinkgoT(), err) + n++ + assert.Eventually(GinkgoT(), func() bool { + resp, err := proxyClient.R(). + SetBody(`{ + "event_type": "foo.bar", + "data": { + "key": "value" + } + }`). + SetQueryParam("test", "true"). + Post("/") + return err == nil && resp.StatusCode() == 200 + }, time.Second*5, time.Second) + + expected := map[string]string{"service.name": "WebhookX-Test", "service.version": "0.3", "env": "test"} + assert.Eventually(GinkgoT(), func() bool { + line, err := helper.FileLine(helper.OtelCollectorTracesFile, n) + if err != nil || line == "" { + return false + } + n++ + var req ExportedTrace + _ = json.Unmarshal([]byte(line), &req) + attributesMap := make(map[string]string) + for _, resourceSpan := range req.ResourceSpans { + for _, attr := range resourceSpan.Resource.Attributes { + if attr.Value.StringValue != nil { + attributesMap[attr.Key] = *attr.Value.StringValue + } + } + } + for name, expectVal := range expected { + if val, ok := attributesMap[name]; !ok || val != expectVal { + fmt.Printf("expected attribute %s not exist or value %s not match", name, val) + fmt.Println("") + return false + } + } + return true + }, time.Second*30, time.Second) + }) + }) +}) diff --git a/test/tracing/types.go b/test/tracing/types.go new file mode 100644 index 0000000..99a76b7 --- /dev/null +++ b/test/tracing/types.go @@ -0,0 +1,103 @@ +package tracing + +import "fmt" + +type ExportedTrace struct { + ResourceSpans []struct { + Resource struct { + Attributes []struct { + Key string `json:"key"` + Value struct { + ArrayValue *struct { + Values []struct { + StringValue string `json:"stringValue"` + } `json:"values"` + } `json:"arrayValue,omitempty"` + IntValue *string `json:"intValue,omitempty"` + StringValue *string `json:"stringValue,omitempty"` + } `json:"value"` + } `json:"attributes"` + } `json:"resource"` + SchemaURL string `json:"schemaUrl"` + ScopeSpans []struct { + Scope struct { + Name string `json:"name"` + Version *string `json:"version,omitempty"` + } `json:"scope"` + Spans []struct { + Attributes []struct { + Key string `json:"key"` + Value struct { + IntValue *string `json:"intValue,omitempty"` + StringValue *string `json:"stringValue,omitempty"` + ArrayValue *struct { + Values []struct { + StringValue string `json:"stringValue"` + } `json:"values"` + } `json:"arrayValue,omitempty"` + } `json:"value"` + } `json:"attributes,omitempty"` + EndTimeUnixNano string `json:"endTimeUnixNano"` + Flags int `json:"flags"` + Kind int `json:"kind"` + Name string `json:"name"` + ParentSpanID string `json:"parentSpanId"` + SpanID string `json:"spanId"` + StartTimeUnixNano string `json:"startTimeUnixNano"` + Status struct { + } `json:"status"` + TraceID string `json:"traceId"` + } `json:"spans"` + } `json:"scopeSpans"` + } `json:"resourceSpans"` +} + +func (t *ExportedTrace) filterSpansByTraceID(traceID string) (scopeNames map[string]bool, spanAttrs map[string]map[string]string) { + scopeNames = make(map[string]bool) + spanAttrs = make(map[string]map[string]string) + for _, resourceSpan := range t.ResourceSpans { + scopeSpans := resourceSpan.ScopeSpans + for _, scopeSpan := range scopeSpans { + scopeNames[scopeSpan.Scope.Name] = true + for _, span := range scopeSpan.Spans { + if span.TraceID != traceID { + continue + } + attributes := make(map[string]string) + for _, attr := range span.Attributes { + if attr.Value.StringValue != nil { + attributes[attr.Key] = *attr.Value.StringValue + } else if attr.Value.IntValue != nil { + attributes[attr.Key] = *attr.Value.IntValue + } else if attr.Value.ArrayValue != nil { + if len(attr.Value.ArrayValue.Values) == 1 { + attributes[attr.Key] = attr.Value.ArrayValue.Values[0].StringValue + } else { + var values []string + for _, v := range attr.Value.ArrayValue.Values { + values = append(values, v.StringValue) + } + attributes[attr.Key] = fmt.Sprintf("[%s]", values) + } + } + } + spanAttrs[span.Name] = attributes + } + } + } + return +} + +func (t *ExportedTrace) getTraceIDBySpanName(spanName string) string { + for _, resourceSpan := range t.ResourceSpans { + scopeSpans := resourceSpan.ScopeSpans + for _, scopeSpan := range scopeSpans { + for _, span := range scopeSpan.Spans { + if span.Name == spanName { + return span.TraceID + } + } + } + } + return "" +} diff --git a/test/tracing/worker_test.go b/test/tracing/worker_test.go new file mode 100644 index 0000000..c0038d5 --- /dev/null +++ b/test/tracing/worker_test.go @@ -0,0 +1,168 @@ +package tracing + +import ( + "encoding/json" + "fmt" + "github.com/go-resty/resty/v2" + . "github.com/onsi/ginkgo/v2" + "github.com/stretchr/testify/assert" + "github.com/webhookx-io/webhookx/app" + "github.com/webhookx-io/webhookx/db/entities" + "github.com/webhookx-io/webhookx/test/helper" + "github.com/webhookx-io/webhookx/utils" + "time" +) + +var _ = Describe("tracing worker", Ordered, func() { + endpoints := map[string]string{ + "http/protobuf": "http://localhost:4318/v1/traces", + "grpc": "localhost:4317", + } + for protocol, address := range endpoints { + Context(protocol, func() { + var app *app.Application + var proxyClient *resty.Client + entitiesConfig := helper.EntitiesConfig{ + Endpoints: []*entities.Endpoint{helper.DefaultEndpoint()}, + Sources: []*entities.Source{helper.DefaultSource()}, + } + entitiesConfig.Sources[0].Async = false + + BeforeAll(func() { + helper.InitOtelOutput() + helper.InitDB(true, &entitiesConfig) + proxyClient = helper.ProxyClient() + envs := map[string]string{ + "WEBHOOKX_PROXY_LISTEN": "0.0.0.0:8081", + "WEBHOOKX_TRACING_ENABLED": "true", + "WEBHOOKX_WORKER_ENABLED": "true", // env splite by _ + "WEBHOOKX_TRACING_SERVICE_NAME": "WebhookX", // env splite by _ + "WEBHOOKX_TRACING_SAMPLING_RATE": "1.0", + "WEBHOOKX_TRACING_OPENTELEMETRY_PROTOCOL": protocol, + "WEBHOOKX_TRACING_OPENTELEMETRY_ENDPOINT": address, + } + + app = utils.Must(helper.Start(envs)) + }) + + AfterAll(func() { + app.Stop() + }) + + It("sanity", func() { + expectedScopeNames := []string{ + "github.com/XSAM/otelsql", + "github.com/webhookx-io/webhookx", + } + expectedScopeSpans := map[string]map[string]string{ + "taskqueue.redis.get": {}, + "worker.handle": {}, + "dao.endpoints.get": {}, + "dao.plugins.list": {}, + "dao.events.get": {}, + "worker.deliver": {}, + "dao.attempt_details.upsert": {}, + "taskqueue.redis.delete": {}, + "sql.conn.query": { + "db.statement": "SELECT * FROM plugins WHERE enabled = $1 AND endpoint_id = $2", + }, + "sql.conn.exec": { + "db.statement": "INSERT INTO attempt_details (id, request_headers, request_body, response_headers, response_body, created_at, updated_at, ws_id) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) \n\t\tON CONFLICT (id) DO UPDATE SET \n\t\trequest_headers = EXCLUDED.request_headers, \n\t\trequest_body = EXCLUDED.request_body, \n\t\tresponse_headers = EXCLUDED.response_headers, \n\t\tresponse_body = EXCLUDED.response_body, \n\t\tupdated_at = EXCLUDED.updated_at", + }, + } + + n, err := helper.FileCountLine(helper.OtelCollectorTracesFile) + assert.Nil(GinkgoT(), err) + n++ + + // wait for export + proxyFunc := func() bool { + resp, err := proxyClient.R(). + SetBody(`{ + "event_type": "foo.bar", + "data": { + "key": "value" + } + }`).Post("/") + return err == nil && resp.StatusCode() == 200 + } + assert.Eventually(GinkgoT(), proxyFunc, time.Second*5, time.Second) + + time.Sleep(time.Second * 3) + + gotScopeNames := make(map[string]bool) + gotSpanAttributes := make(map[string]map[string]string) + + assert.Eventually(GinkgoT(), func() bool { + line, err := helper.FileLine(helper.OtelCollectorTracesFile, n) + if err != nil || line == "" { + fmt.Printf("read empty line %d", n) + fmt.Println("") + proxyFunc() + return false + } + n++ + var trace ExportedTrace + err = json.Unmarshal([]byte(line), &trace) + if err != nil { + fmt.Printf("unmarshal err %v", err) + return false + } + + if len(trace.ResourceSpans) == 0 { + fmt.Printf("no resource spans") + return false + } + + // make sure worker handle full trace + traceID := trace.getTraceIDBySpanName("worker.handle") + if traceID == "" { + fmt.Printf("trace id not exist") + return false + } + scopeNames, spanAttrs := trace.filterSpansByTraceID(traceID) + for k, v := range scopeNames { + gotScopeNames[k] = v + } + for k, v := range spanAttrs { + gotSpanAttributes[k] = v + } + + for _, expectedScopeName := range expectedScopeNames { + if !gotScopeNames[expectedScopeName] { + fmt.Printf("scope %s not exist", expectedScopeName) + fmt.Println("") + return false + } + } + + for spanName, expectedAttributes := range expectedScopeSpans { + gotAttributes, ok := gotSpanAttributes[spanName] + if !ok { + fmt.Printf("span %s not exist", spanName) + fmt.Println() + return false + } + + if len(expectedAttributes) > 0 { + for k, v := range expectedAttributes { + if _, ok := gotAttributes[k]; !ok { + fmt.Printf("expected span %s attribute %s not exist", spanName, k) + fmt.Println("") + return false + } + valMatch := (v == "*" || gotAttributes[k] == v) + if !valMatch { + fmt.Printf("expected span %s attribute %s value not match: %s", spanName, k, v) + fmt.Println("") + return false + } + } + } + } + return true + }, time.Second*30, time.Second) + }) + }) + } +}) diff --git a/test/worker/requeue_test.go b/test/worker/requeue_test.go index afb29da..be89040 100644 --- a/test/worker/requeue_test.go +++ b/test/worker/requeue_test.go @@ -10,6 +10,7 @@ import ( "github.com/webhookx-io/webhookx/db/entities" "github.com/webhookx-io/webhookx/db/query" "github.com/webhookx-io/webhookx/pkg/metrics" + "github.com/webhookx-io/webhookx/pkg/tracing" "github.com/webhookx-io/webhookx/test/helper" "github.com/webhookx-io/webhookx/test/mocks" "github.com/webhookx-io/webhookx/utils" @@ -26,6 +27,7 @@ var _ = Describe("processRequeue", Ordered, func() { var w *worker.Worker var ctrl *gomock.Controller var queue *mocks.MockTaskQueue + var tracer *tracing.Tracer endpoint := helper.DefaultEndpoint() BeforeAll(func() { @@ -42,7 +44,7 @@ var _ = Describe("processRequeue", Ordered, func() { assert.NoError(GinkgoT(), err) w = worker.NewWorker(worker.WorkerOptions{ RequeueJobInterval: time.Second, - }, db, deliverer.NewHTTPDeliverer(&config.WorkerDeliverer{}), queue, metrics) + }, db, deliverer.NewHTTPDeliverer(&config.WorkerDeliverer{}), queue, metrics, tracer) // data ws := utils.Must(db.Workspaces.GetDefault(context.TODO())) diff --git a/worker/deliverer/deliverer.go b/worker/deliverer/deliverer.go index 3186060..24ddd95 100644 --- a/worker/deliverer/deliverer.go +++ b/worker/deliverer/deliverer.go @@ -1,13 +1,14 @@ package deliverer import ( + "context" "fmt" "net/http" "time" ) type Deliverer interface { - Deliver(req *Request) (res *Response) + Deliver(ctx context.Context, req *Request) (res *Response) } type Request struct { diff --git a/worker/deliverer/http.go b/worker/deliverer/http.go index a2126c0..9485faa 100644 --- a/worker/deliverer/http.go +++ b/worker/deliverer/http.go @@ -5,6 +5,8 @@ import ( "context" "github.com/webhookx-io/webhookx/config" "github.com/webhookx-io/webhookx/constants" + "github.com/webhookx-io/webhookx/pkg/tracing" + "go.opentelemetry.io/otel/trace" "io" "net/http" "time" @@ -18,7 +20,6 @@ type HTTPDeliverer struct { func NewHTTPDeliverer(cfg *config.WorkerDeliverer) *HTTPDeliverer { client := &http.Client{} - return &HTTPDeliverer{ defaultTimeout: time.Duration(cfg.Timeout) * time.Millisecond, client: client, @@ -32,13 +33,18 @@ func timing(fn func()) time.Duration { return time.Duration(stop.UnixNano() - start.UnixNano()) } -func (d *HTTPDeliverer) Deliver(req *Request) (res *Response) { +func (d *HTTPDeliverer) Deliver(ctx context.Context, req *Request) (res *Response) { + if tracer := tracing.TracerFromContext(ctx); tracer != nil { + tracingCtx, span := tracer.Start(ctx, "worker.deliver", trace.WithSpanKind(trace.SpanKindClient)) + defer span.End() + ctx = tracingCtx + } timeout := req.Timeout if timeout == 0 { timeout = d.defaultTimeout } - ctx, cancel := context.WithTimeout(context.Background(), timeout) + ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() res = &Response{ diff --git a/worker/deliverer/http_test.go b/worker/deliverer/http_test.go index d6e7e85..90418dd 100644 --- a/worker/deliverer/http_test.go +++ b/worker/deliverer/http_test.go @@ -4,10 +4,11 @@ import ( "context" "encoding/json" "errors" - "github.com/stretchr/testify/assert" - "github.com/webhookx-io/webhookx/config" "testing" "time" + + "github.com/stretchr/testify/assert" + "github.com/webhookx-io/webhookx/config" ) func Test(t *testing.T) { @@ -26,7 +27,7 @@ func Test(t *testing.T) { }, } - res := deliverer.Deliver(req) + res := deliverer.Deliver(context.Background(), req) assert.NoError(t, res.Error) assert.Equal(t, res.StatusCode, 200) data := make(map[string]interface{}) @@ -49,7 +50,7 @@ func Test(t *testing.T) { Timeout: time.Microsecond * 1, } - res := deliverer.Deliver(req) + res := deliverer.Deliver(context.Background(), req) assert.NotNil(t, res.Error) assert.True(t, errors.Is(res.Error, context.DeadlineExceeded)) }) diff --git a/worker/worker.go b/worker/worker.go index f9826ae..5d532be 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -4,24 +4,25 @@ import ( "context" "errors" "github.com/webhookx-io/webhookx/constants" + "github.com/webhookx-io/webhookx/db" + "github.com/webhookx-io/webhookx/db/dao" + "github.com/webhookx-io/webhookx/db/entities" "github.com/webhookx-io/webhookx/mcache" + "github.com/webhookx-io/webhookx/model" "github.com/webhookx-io/webhookx/pkg/metrics" "github.com/webhookx-io/webhookx/pkg/plugin" plugintypes "github.com/webhookx-io/webhookx/pkg/plugin/types" "github.com/webhookx-io/webhookx/pkg/pool" "github.com/webhookx-io/webhookx/pkg/schedule" "github.com/webhookx-io/webhookx/pkg/taskqueue" - "runtime" - "time" - - "github.com/webhookx-io/webhookx/db" - "github.com/webhookx-io/webhookx/db/dao" - "github.com/webhookx-io/webhookx/db/entities" - "github.com/webhookx-io/webhookx/model" + "github.com/webhookx-io/webhookx/pkg/tracing" "github.com/webhookx-io/webhookx/pkg/types" "github.com/webhookx-io/webhookx/utils" "github.com/webhookx-io/webhookx/worker/deliverer" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" + "runtime" + "time" ) type Worker struct { @@ -35,6 +36,7 @@ type Worker struct { queue taskqueue.TaskQueue deliverer deliverer.Deliverer DB *db.DB + tracer *tracing.Tracer pool *pool.Pool metrics *metrics.Metrics } @@ -51,7 +53,8 @@ func NewWorker( db *db.DB, deliverer deliverer.Deliverer, queue taskqueue.TaskQueue, - metrics *metrics.Metrics) *Worker { + metrics *metrics.Metrics, + tracer *tracing.Tracer) *Worker { opts.RequeueJobBatch = utils.DefaultIfZero(opts.RequeueJobBatch, constants.RequeueBatch) opts.RequeueJobInterval = utils.DefaultIfZero(opts.RequeueJobInterval, constants.RequeueInterval) @@ -69,6 +72,7 @@ func NewWorker( DB: db, pool: pool.NewPool(opts.PoolSize, opts.PoolConcurrency), metrics: metrics, + tracer: tracer, } return worker @@ -87,7 +91,14 @@ func (w *Worker) run() { return case <-ticker.C: for { - ctx := context.TODO() + // TODO: start trace with task context + ctx := context.Background() + if w.tracer != nil { + start := time.Now() + tracingCtx, span := w.tracer.Start(ctx, "worker.fetch", trace.WithSpanKind(trace.SpanKindConsumer), trace.WithTimestamp(start)) + defer span.End() + ctx = tracingCtx + } tasks, err := w.queue.Get(ctx, options) if err != nil { w.log.Errorf("[worker] failed to get tasks from queue: %v", err) @@ -101,6 +112,13 @@ func (w *Worker) run() { var errs []error for _, task := range tasks { err = w.pool.SubmitFn(time.Second*5, func() { + // TODO: start trace with task Context + if w.tracer != nil { + start := time.Now() + tracingCtx, span := w.tracer.Start(ctx, "worker.handle", trace.WithSpanKind(trace.SpanKindClient), trace.WithTimestamp(start)) + defer span.End() + ctx = tracingCtx + } task.Data = &model.MessageData{} err = task.UnmarshalData(task.Data) if err != nil { @@ -268,7 +286,7 @@ func (w *Worker) handleTask(ctx context.Context, task *taskqueue.TaskMessage) er // deliver the request startAt := time.Now() - response := w.deliverer.Deliver(request) + response := w.deliverer.Deliver(ctx, request) finishAt := time.Now() if response.Error != nil {