Skip to content

Commit

Permalink
feat(observability): optimize review request changes
Browse files Browse the repository at this point in the history
feat(observability): correct config

feat(observability): fix otelsql tracing missing
  • Loading branch information
cchenggit committed Nov 28, 2024
1 parent 164487f commit 79ef232
Show file tree
Hide file tree
Showing 41 changed files with 656 additions and 1,170 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
DIR := $(shell pwd)
export WEBHOOKX_TEST_OTEL_COLLECTOR_OUTPUT_PATH=$(DIR)/test/output/otel

LDFLAGS = --ldflags "\
-X github.com/webhookx-io/webhookx/config.COMMIT=`git rev-parse --verify --short HEAD` \
Expand All @@ -21,7 +22,6 @@ generate:
go generate ./...

test-deps:
export WEBHOOKX_TEST_OTEL_COLLECTOR_OUTPUT_PATH=$(DIR)/test/output/otel
mkdir -p test/output/otel
docker compose -f test/docker-compose.yml up -d

Expand All @@ -41,4 +41,4 @@ goreleaser:
goreleaser release --snapshot --clean

migrate-create:
migrate create -ext sql -dir db/migrations -seq -digits 1 $(message)
migrate create -ext sql -dir db/migrations -seq -digits 1 $(message)
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ WebhookX is an open-source webhooks gateway for message receiving, processing, a
## Installation

```shell
$ docker compose up
docker compose up
```

```shell
$ curl http://localhost:8080
curl http://localhost:8080
```


Expand All @@ -48,7 +48,7 @@ $ curl http://localhost:8080
> **Endpoint** represents the event's destination.
```
$ curl -X POST http://localhost:8080/workspaces/default/endpoints \
curl -X POST http://localhost:8080/workspaces/default/endpoints \
--header 'Content-Type: application/json' \
--data '{
"request": {
Expand All @@ -69,7 +69,7 @@ $ curl -X POST http://localhost:8080/workspaces/default/endpoints \
> **Source** represents the ingress of events
```
$ curl -X POST http://localhost:8080/workspaces/default/sources \
curl -X POST http://localhost:8080/workspaces/default/sources \
--header 'accept: application/json' \
--header 'Content-Type: application/json' \
--data '{
Expand All @@ -81,7 +81,7 @@ $ curl -X POST http://localhost:8080/workspaces/default/sources \
#### 3. Send an event to the Proxy (port 8081)

```
$ curl -X POST http://localhost:8081 \
curl -X POST http://localhost:8081 \
--header 'Content-Type: application/json' \
--data '{
"event_type": "charge.succeeded",
Expand All @@ -96,7 +96,7 @@ $ curl -X POST http://localhost:8081 \
> Attempt represents an event delivery attempt, and contains inspection information of a delivery.
```
$ curl http://localhost:8080/workspaces/default/attempts
curl http://localhost:8080/workspaces/default/attempts
```

<details>
Expand Down
13 changes: 3 additions & 10 deletions admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,19 @@ package admin

import (
"context"
"github.com/webhookx-io/webhookx/config"
"go.uber.org/zap"
"net/http"
"os"
"time"

"github.com/webhookx-io/webhookx/config"
"github.com/webhookx-io/webhookx/pkg/middlewares"
"go.uber.org/zap"
)

// Admin is an HTTP Server
type Admin struct {
s *http.Server
}

func NewAdmin(cfg config.AdminConfig, handler http.Handler, observabilityManager *middlewares.ObservabilityManager) *Admin {
if observabilityManager.Enabled {
chain := observabilityManager.BuildChain(context.Background(), "admin")
handler = chain.Then(handler)
}

func NewAdmin(cfg config.AdminConfig, handler http.Handler) *Admin {
s := &http.Server{
Handler: handler,
Addr: cfg.Listen,
Expand Down
18 changes: 11 additions & 7 deletions admin/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@ package api

import (
"encoding/json"
"net/http"
"strconv"

"github.com/gorilla/mux"
"github.com/webhookx-io/webhookx/config"
"github.com/webhookx-io/webhookx/db"
"github.com/webhookx-io/webhookx/db/query"
"github.com/webhookx-io/webhookx/dispatcher"
"github.com/webhookx-io/webhookx/pkg/errs"
"github.com/webhookx-io/webhookx/pkg/middlewares"
"github.com/webhookx-io/webhookx/pkg/tracing"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.uber.org/zap"
"net/http"
"strconv"
)

const (
Expand All @@ -24,14 +24,16 @@ type API struct {
log *zap.SugaredLogger
DB *db.DB
dispatcher *dispatcher.Dispatcher
tracer *tracing.Tracer
}

func NewAPI(cfg *config.Config, db *db.DB, dispatcher *dispatcher.Dispatcher) *API {
func NewAPI(cfg *config.Config, db *db.DB, dispatcher *dispatcher.Dispatcher, tracer *tracing.Tracer) *API {
return &API{
cfg: cfg,
log: zap.S(),
DB: db,
dispatcher: dispatcher,
tracer: tracer,
}
}

Expand Down Expand Up @@ -96,8 +98,10 @@ func (api *API) assert(err error) {
// Handler returns a http.Handler
func (api *API) Handler() http.Handler {
r := mux.NewRouter()

r.Use(middlewares.PanicRecovery)
if api.tracer != nil {
r.Use(otelhttp.NewMiddleware("api.admin"))
}
r.Use(panicRecovery)
r.Use(api.contextMiddleware)

r.HandleFunc("/", api.Index).Methods("GET")
Expand Down
44 changes: 20 additions & 24 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@ import (
"context"
"encoding/json"
"errors"
"sync"

"time"

"github.com/webhookx-io/webhookx/admin"
"github.com/webhookx-io/webhookx/admin/api"
"github.com/webhookx-io/webhookx/config"
Expand All @@ -18,12 +14,14 @@ import (
"github.com/webhookx-io/webhookx/pkg/cache"
"github.com/webhookx-io/webhookx/pkg/log"
"github.com/webhookx-io/webhookx/pkg/metrics"
"github.com/webhookx-io/webhookx/pkg/middlewares"
"github.com/webhookx-io/webhookx/pkg/taskqueue"
"github.com/webhookx-io/webhookx/pkg/tracing"
"github.com/webhookx-io/webhookx/proxy"
"github.com/webhookx-io/webhookx/worker"
"github.com/webhookx-io/webhookx/worker/deliverer"
"go.uber.org/zap"
"sync"
"time"
)

var (
Expand All @@ -47,10 +45,10 @@ type Application struct {
bus *eventbus.EventBus
metrics *metrics.Metrics

admin *admin.Admin
gateway *proxy.Gateway
worker *worker.Worker
observabilityManager *middlewares.ObservabilityManager
admin *admin.Admin
gateway *proxy.Gateway
worker *worker.Worker
tracer *tracing.Tracer
}

func NewApplication(cfg *config.Config) (*Application, error) {
Expand Down Expand Up @@ -93,6 +91,14 @@ func (app *Application) initialize() error {
app.log)
registerEventHandler(app.bus)

// tracing
tracer, err := tracing.New(&cfg.Tracing)
if err != nil {
return err
}

app.tracer = tracer

// db
db, err := db.NewDB(&cfg.Database)
if err != nil {
Expand All @@ -113,32 +119,25 @@ func (app *Application) initialize() error {

app.dispatcher = dispatcher.NewDispatcher(log.Sugar(), queue, db, app.metrics)

observabilityManager, err := middlewares.NewObservabilityManager(&cfg.Tracing)
if err != nil {
return err
}
app.observabilityManager = observabilityManager

// worker
if cfg.Worker.Enabled {
opts := worker.WorkerOptions{
PoolSize: int(cfg.Worker.Pool.Size),
PoolConcurrency: int(cfg.Worker.Pool.Concurrency),
}
deliverer := deliverer.NewHTTPDeliverer(&cfg.Worker.Deliverer)
tracer := app.observabilityManager.Tracer()
app.worker = worker.NewWorker(opts, db, deliverer, queue, app.metrics, tracer)
}

// admin
if cfg.Admin.IsEnabled() {
handler := api.NewAPI(cfg, db, app.dispatcher).Handler()
app.admin = admin.NewAdmin(cfg.Admin, handler, app.observabilityManager)
handler := api.NewAPI(cfg, db, app.dispatcher, app.tracer).Handler()
app.admin = admin.NewAdmin(cfg.Admin, handler)
}

// gateway
if cfg.Proxy.IsEnabled() {
app.gateway = proxy.NewGateway(&cfg.Proxy, db, app.dispatcher, app.metrics, app.observabilityManager)
app.gateway = proxy.NewGateway(&cfg.Proxy, db, app.dispatcher, app.metrics, app.tracer)
}

return nil
Expand Down Expand Up @@ -228,11 +227,8 @@ func (app *Application) Stop() error {
_ = app.worker.Stop()
}

if app.observabilityManager != nil {
err := app.observabilityManager.Close()
if err != nil {
app.log.Infof("failed to call observability close: %v", err)
}
if app.tracer != nil {
_ = app.tracer.Stop()
}

app.started = false
Expand Down
29 changes: 14 additions & 15 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

log:
file: /dev/stdout
level: info # supported values are debug, info, warn, and error.
format: text # supported values are text and json
level: info # supported values are debug, info, warn, and error.
format: text # supported values are text and json

database:
host: localhost
Expand All @@ -20,6 +20,8 @@ redis:
password:
database: 0



#------------------------------------------------------------------------------
# ADMIN
#------------------------------------------------------------------------------
Expand All @@ -36,24 +38,24 @@ worker:
deliverer:
timeout: 60000
pool:
size: 10000 # pool size, default to 10000.
concurrency: 0 # pool concurrency, default to 100 * CPUs
size: 10000 # pool size, default to 10000.
concurrency: 0 # pool concurrency, default to 100 * CPUs

#------------------------------------------------------------------------------
# PROXY
#------------------------------------------------------------------------------
proxy:
#listen: 127.0.0.1:8081
timeout_read: 10 # read timeout (in seconds), 0 indicates unlimited.
timeout_write: 60 # write timeout (in seconds), 0 indicates unlimited.
timeout_read: 10 # read timeout (in seconds), 0 indicates unlimited.
timeout_write: 60 # write timeout (in seconds), 0 indicates unlimited.
max_request_body_size: 1048576
response:
code: 200
content-type: application/json
body: '{"message": "OK"}'

queue:
type: redis # supported values are redis, off
type: redis # supported values are redis, off
redis:
host: localhost
port: 6379
Expand All @@ -72,18 +74,15 @@ metrics:
protocol: http/protobuf # supported value are http/protobuf, grpc
endpoint: http://localhost:4318/v1/metrics # http/protobuf(http://localhost:4318/v1/metrics), grpc(localhost:4317)

#------------------------------------------------------------------------------
# TRACING
#------------------------------------------------------------------------------
tracing:
enabled: false
service_name: WebhookX
attributes: # global attributes for each trace
env: prod
captured_request_headers: # optional,array, additional headers sent with traces by the exporter to the collector
captured_response_headers: # optional,array, additional headers send with traces by the exporter to the collector
safe_query_params: # optional,array, the list of query parameters to not redact
sampling_rate: 1.0
opentelemetry:
protocal: http/protobuf # supported value are http/protobuf, grpc
endpoint: http://localhost:4318/v1/traces # http/protobuf(http://localhost:4318/v1/traces), grpc(localhost:4317)
headers: # custom headers to be sent with the request
foo: bar
baz: buz
protocol: http/protobuf # supported value are http/protobuf, grpc
endpoint: http://localhost:4318/v1/traces # http/protobuf(http://localhost:4318/v1/traces), grpc(localhost:4317)
11 changes: 7 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ package config

import (
"encoding/json"
"os"

"github.com/creasty/defaults"
uuid "github.com/satori/go.uuid"
"github.com/webhookx-io/webhookx/pkg/envconfig"
"gopkg.in/yaml.v3"
"os"
)

var (
Expand Down Expand Up @@ -76,7 +75,7 @@ func Init() (*Config, error) {
if err != nil {
return nil, err
}

cfg.injectTracingEnabled()
return &cfg, nil
}

Expand All @@ -100,6 +99,10 @@ func InitWithFile(filename string) (*Config, error) {
if err != nil {
return nil, err
}

cfg.injectTracingEnabled()
return &cfg, nil
}

func (cfg *Config) injectTracingEnabled() {
cfg.Database.SetTracingEnabled(cfg.Tracing.Enabled)
}
Loading

0 comments on commit 79ef232

Please sign in to comment.