Skip to content

Commit

Permalink
feat(observability): support otel tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
cchenggit committed Nov 28, 2024
1 parent 9f55161 commit c45a38e
Show file tree
Hide file tree
Showing 41 changed files with 1,454 additions and 99 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ jobs:
run: |
mkdir -p test/output/otel
sudo chmod 777 -R test/output/otel
make deps
make test-deps
sleep 3
docker compose -f test/docker-compose.yml logs
Expand Down
7 changes: 3 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
DIR := $(shell pwd)

export WEBHOOKX_TEST_OTEL_COLLECTOR_OUTPUT_PATH=$(DIR)/test/output/otel

LDFLAGS = --ldflags "\
-X github.com/webhookx-io/webhookx/config.COMMIT=`git rev-parse --verify --short HEAD` \
-X github.com/webhookx-io/webhookx/config.VERSION=`git tag -l --points-at HEAD | head -n 1`"

.PHONY: clean build install generate test test-coverage test-integration \
test-integration-coverage goreleaser migrate-create deps
test-integration-coverage goreleaser migrate-create test-deps

clean:
go clean
Expand All @@ -22,7 +21,7 @@ install:
generate:
go generate ./...

deps:
test-deps:
mkdir -p test/output/otel
docker compose -f test/docker-compose.yml up -d

Expand All @@ -42,4 +41,4 @@ goreleaser:
goreleaser release --snapshot --clean

migrate-create:
migrate create -ext sql -dir db/migrations -seq -digits 1 $(message)
migrate create -ext sql -dir db/migrations -seq -digits 1 $(message)
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ WebhookX is an open-source webhooks gateway for message receiving, processing, a
## Installation

```shell
$ docker compose up
docker compose up
```

```shell
$ curl http://localhost:8080
curl http://localhost:8080
```


Expand All @@ -48,7 +48,7 @@ $ curl http://localhost:8080
> **Endpoint** represents the event's destination.
```
$ curl -X POST http://localhost:8080/workspaces/default/endpoints \
curl -X POST http://localhost:8080/workspaces/default/endpoints \
--header 'Content-Type: application/json' \
--data '{
"request": {
Expand All @@ -69,7 +69,7 @@ $ curl -X POST http://localhost:8080/workspaces/default/endpoints \
> **Source** represents the ingress of events
```
$ curl -X POST http://localhost:8080/workspaces/default/sources \
curl -X POST http://localhost:8080/workspaces/default/sources \
--header 'accept: application/json' \
--header 'Content-Type: application/json' \
--data '{
Expand All @@ -81,7 +81,7 @@ $ curl -X POST http://localhost:8080/workspaces/default/sources \
#### 3. Send an event to the Proxy (port 8081)

```
$ curl -X POST http://localhost:8081 \
curl -X POST http://localhost:8081 \
--header 'Content-Type: application/json' \
--data '{
"event_type": "charge.succeeded",
Expand All @@ -96,7 +96,7 @@ $ curl -X POST http://localhost:8081 \
> Attempt represents an event delivery attempt, and contains inspection information of a delivery.
```
$ curl http://localhost:8080/workspaces/default/attempts
curl http://localhost:8080/workspaces/default/attempts
```

<details>
Expand Down
1 change: 0 additions & 1 deletion admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ func NewAdmin(cfg config.AdminConfig, handler http.Handler) *Admin {

WriteTimeout: 60 * time.Second,
ReadTimeout: 60 * time.Second,

// TODO: expose more to be configurable
}

Expand Down
10 changes: 8 additions & 2 deletions admin/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/webhookx-io/webhookx/db/query"
"github.com/webhookx-io/webhookx/dispatcher"
"github.com/webhookx-io/webhookx/pkg/errs"
"github.com/webhookx-io/webhookx/pkg/tracing"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.uber.org/zap"
"net/http"
"strconv"
Expand All @@ -22,14 +24,16 @@ type API struct {
log *zap.SugaredLogger
DB *db.DB
dispatcher *dispatcher.Dispatcher
tracer *tracing.Tracer
}

func NewAPI(cfg *config.Config, db *db.DB, dispatcher *dispatcher.Dispatcher) *API {
func NewAPI(cfg *config.Config, db *db.DB, dispatcher *dispatcher.Dispatcher, tracer *tracing.Tracer) *API {
return &API{
cfg: cfg,
log: zap.S(),
DB: db,
dispatcher: dispatcher,
tracer: tracer,
}
}

Expand Down Expand Up @@ -94,7 +98,9 @@ func (api *API) assert(err error) {
// Handler returns a http.Handler
func (api *API) Handler() http.Handler {
r := mux.NewRouter()

if api.tracer != nil {
r.Use(otelhttp.NewMiddleware("api.admin"))
}
r.Use(panicRecovery)
r.Use(api.contextMiddleware)

Expand Down
5 changes: 3 additions & 2 deletions admin/api/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
"encoding/json"
"errors"
"fmt"
"net/http"
"runtime"

"github.com/gorilla/mux"
"github.com/webhookx-io/webhookx/db/entities"
"github.com/webhookx-io/webhookx/db/errs"
"github.com/webhookx-io/webhookx/pkg/ucontext"
"go.uber.org/zap"
"net/http"
"runtime"
)

func (api *API) contextMiddleware(next http.Handler) http.Handler {
Expand Down
20 changes: 17 additions & 3 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/webhookx-io/webhookx/pkg/log"
"github.com/webhookx-io/webhookx/pkg/metrics"
"github.com/webhookx-io/webhookx/pkg/taskqueue"
"github.com/webhookx-io/webhookx/pkg/tracing"
"github.com/webhookx-io/webhookx/proxy"
"github.com/webhookx-io/webhookx/worker"
"github.com/webhookx-io/webhookx/worker/deliverer"
Expand Down Expand Up @@ -47,6 +48,7 @@ type Application struct {
admin *admin.Admin
gateway *proxy.Gateway
worker *worker.Worker
tracer *tracing.Tracer
}

func NewApplication(cfg *config.Config) (*Application, error) {
Expand Down Expand Up @@ -89,6 +91,14 @@ func (app *Application) initialize() error {
app.log)
registerEventHandler(app.bus)

// tracing
tracer, err := tracing.New(&cfg.Tracing)
if err != nil {
return err
}

app.tracer = tracer

// db
db, err := db.NewDB(&cfg.Database)
if err != nil {
Expand Down Expand Up @@ -116,18 +126,18 @@ func (app *Application) initialize() error {
PoolConcurrency: int(cfg.Worker.Pool.Concurrency),
}
deliverer := deliverer.NewHTTPDeliverer(&cfg.Worker.Deliverer)
app.worker = worker.NewWorker(opts, db, deliverer, queue, app.metrics)
app.worker = worker.NewWorker(opts, db, deliverer, queue, app.metrics, tracer)
}

// admin
if cfg.Admin.IsEnabled() {
handler := api.NewAPI(cfg, db, app.dispatcher).Handler()
handler := api.NewAPI(cfg, db, app.dispatcher, app.tracer).Handler()
app.admin = admin.NewAdmin(cfg.Admin, handler)
}

// gateway
if cfg.Proxy.IsEnabled() {
app.gateway = proxy.NewGateway(&cfg.Proxy, db, app.dispatcher, app.metrics)
app.gateway = proxy.NewGateway(&cfg.Proxy, db, app.dispatcher, app.metrics, app.tracer)
}

return nil
Expand Down Expand Up @@ -221,6 +231,10 @@ func (app *Application) Stop() error {
_ = app.worker.Stop()
}

if app.tracer != nil {
_ = app.tracer.Stop()
}

app.started = false
app.stop <- struct{}{}

Expand Down
13 changes: 13 additions & 0 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,16 @@ metrics:
push_interval: 10 # interval(in seconds) at which metrics are sent to the OpenTelemetry Collector
protocol: http/protobuf # supported value are http/protobuf, grpc
endpoint: http://localhost:4318/v1/metrics # http/protobuf(http://localhost:4318/v1/metrics), grpc(localhost:4317)

#------------------------------------------------------------------------------
# TRACING
#------------------------------------------------------------------------------
tracing:
enabled: false
service_name: WebhookX
attributes: # global attributes for each trace
env: prod
sampling_rate: 1.0
opentelemetry:
protocol: http/protobuf # supported value are http/protobuf, grpc
endpoint: http://localhost:4318/v1/traces # http/protobuf(http://localhost:4318/v1/traces), grpc(localhost:4317)
13 changes: 11 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Config struct {
Proxy ProxyConfig `yaml:"proxy" envconfig:"PROXY"`
Worker WorkerConfig `yaml:"worker" envconfig:"WORKER"`
Metrics MetricsConfig `yaml:"metrics" envconfig:"METRICS"`
Tracing TracingConfig `yaml:"tracing" envconfig:"TRACING"`
}

func (cfg Config) String() string {
Expand Down Expand Up @@ -58,6 +59,10 @@ func (cfg Config) Validate() error {
return err
}

if err := cfg.Tracing.Validate(); err != nil {
return err
}

return nil
}

Expand All @@ -70,7 +75,7 @@ func Init() (*Config, error) {
if err != nil {
return nil, err
}

cfg.injectTracingEnabled()
return &cfg, nil
}

Expand All @@ -94,6 +99,10 @@ func InitWithFile(filename string) (*Config, error) {
if err != nil {
return nil, err
}

cfg.injectTracingEnabled()
return &cfg, nil
}

func (cfg *Config) injectTracingEnabled() {
cfg.Database.SetTracingEnabled(cfg.Tracing.Enabled)
}
39 changes: 39 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,45 @@ func TestMetricsConfig(t *testing.T) {
}
}

func TestTracingConfig(t *testing.T) {
tests := []struct {
desc string
cfg TracingConfig
expectedValidateErr error
}{
{
desc: "sanity",
cfg: TracingConfig{
Enabled: true,
ServiceName: "WebhookX",
SamplingRate: 0,
Opentelemetry: Opentelemetry{
Protocol: "http/protobuf",
Endpoint: "http://localhost:4318/v1/traces",
},
},
expectedValidateErr: nil,
},
{
desc: "invalid sampling rate",
cfg: TracingConfig{
Enabled: true,
ServiceName: "WebhookX",
SamplingRate: 1.1,
Opentelemetry: Opentelemetry{
Protocol: "http/protobuf",
Endpoint: "http://localhost:4318/v1/traces",
},
},
expectedValidateErr: errors.New("invalid sampling rate, must be [0,1]"),
},
}
for _, test := range tests {
actualValidateErr := test.cfg.Validate()
assert.Equal(t, test.expectedValidateErr, actualValidateErr, "expected %v got %v", test.expectedValidateErr, actualValidateErr)
}
}

func TestConfig(t *testing.T) {
cfg, err := Init()
assert.Nil(t, err)
Expand Down
50 changes: 42 additions & 8 deletions config/database.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
package config

import (
"database/sql"
"fmt"
"github.com/XSAM/otelsql"
"time"
)

type DatabaseConfig struct {
Host string `yaml:"host" default:"localhost"`
Port uint32 `yaml:"port" default:"5432"`
Username string `yaml:"username" default:"webhookx"`
Password string `yaml:"password" default:""`
Database string `yaml:"database" default:"webhookx"`
Parameters string `yaml:"parameters" default:"application_name=webhookx&sslmode=disable&connect_timeout=10"`
MaxPoolSize uint32 `yaml:"max_pool_size" default:"40" envconfig:"MAX_POOL_SIZE"`
MaxLifetime uint32 `yaml:"max_life_time" default:"1800" envconfig:"MAX_LIFETIME"`
Host string `yaml:"host" default:"localhost"`
Port uint32 `yaml:"port" default:"5432"`
Username string `yaml:"username" default:"webhookx"`
Password string `yaml:"password" default:""`
Database string `yaml:"database" default:"webhookx"`
Parameters string `yaml:"parameters" default:"application_name=webhookx&sslmode=disable&connect_timeout=10"`
MaxPoolSize uint32 `yaml:"max_pool_size" default:"40" envconfig:"MAX_POOL_SIZE"`
MaxLifetime uint32 `yaml:"max_life_time" default:"1800" envconfig:"MAX_LIFETIME"`
tracingEnabled bool
}

func (cfg DatabaseConfig) GetDSN() string {
Expand All @@ -35,3 +39,33 @@ func (cfg DatabaseConfig) Validate() error {
}
return nil
}

func (cfg *DatabaseConfig) SetTracingEnabled(enabled bool) {
cfg.tracingEnabled = enabled
}

func (cfg *DatabaseConfig) InitSqlDB() (*sql.DB, error) {
var driverName = "postgres"
var err error
if cfg.tracingEnabled {
driverName, err = otelsql.Register(driverName,
otelsql.WithSpanOptions(otelsql.SpanOptions{
OmitConnResetSession: true,
OmitConnPrepare: true,
OmitConnectorConnect: true,
OmitRows: true,
}))
if err != nil {
return nil, err
}
}

db, err := sql.Open(driverName, cfg.GetDSN())
if err != nil {
return nil, err
}
db.SetMaxOpenConns(int(cfg.MaxPoolSize))
db.SetMaxIdleConns(int(cfg.MaxPoolSize))
db.SetConnMaxLifetime(time.Second * time.Duration(cfg.MaxLifetime))
return db, nil
}
1 change: 1 addition & 0 deletions config/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type RedisConfig struct {
Port uint32 `yaml:"port" default:"6379"`
Password string `yaml:"password" default:""`
Database uint32 `yaml:"database" default:"0"`

// fixme: pool property
}

Expand Down
Loading

0 comments on commit c45a38e

Please sign in to comment.