Skip to content

Commit

Permalink
feat(observability): add metrics support (#49)
Browse files Browse the repository at this point in the history
### Summary

Integrated metric, and the OpenTelemetry implementation.

#### Runtime metrics

- `webhookx.runtime.num_goroutine`
- `webhookx.runtime.alloc_bytes`
- `webhookx.runtime.sys_bytes`
- `webhookx.runtime.mallocs`
- `webhookx.runtime.frees`
- `webhookx.runtime.heap_objects`
- `webhookx.runtime.pause_total_ns`
- `webhookx.runtime.num_gc`

#### Request metrics

- `webhookx.request.total`
- `webhookx.request.duration`

#### Event metrics

- `webhookx.event.total`
- `webhookx.event.persisted`
- `webhookx.event.pending`

#### Attempt metrics

- `webhookx.attempt.total`
- `webhookx.attempt.response.duration`
- `webhookx.attempt.pending`
- `webhookx.attempt.failed`
  • Loading branch information
vm-001 authored Oct 23, 2024
1 parent a8309a8 commit 117929a
Show file tree
Hide file tree
Showing 35 changed files with 1,068 additions and 182 deletions.
29 changes: 8 additions & 21 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ jobs:
with:
go-version-file: go.mod

- name: start dependencies
run: |
mkdir -p test/output/otel
sudo chmod 777 -R test/output/otel
make deps
sleep 3
docker compose -f test/docker-compose.yml logs
- name: install
run: make install

Expand All @@ -60,27 +68,6 @@ jobs:
fail_ci_if_error: true
flags: integration

services:
postgres:
image: postgres:13
env:
POSTGRES_USER: webhookx
POSTGRES_DB: webhookx
POSTGRES_HOST_AUTH_METHOD: trust
ports:
- 5432:5432
options: --health-cmd pg_isready --health-interval 3s --health-timeout 5s --health-retries 3

redis:
image: redis:6
ports:
- 6379:6379

httpbin:
image: kennethreitz/httpbin
ports:
- 9999:80

test-docker-compose:
runs-on: ubuntu-latest

Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ coverage.*
webhookx
*.log
dist/
test/output
20 changes: 11 additions & 9 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,41 +1,43 @@
DIR := $(shell pwd)

export WEBHOOKX_TEST_OTEL_COLLECTOR_OUTPUT_PATH=$(DIR)/test/output/otel

LDFLAGS = --ldflags "\
-X github.com/webhookx-io/webhookx/config.COMMIT=`git rev-parse --verify --short HEAD` \
-X github.com/webhookx-io/webhookx/config.VERSION=`git tag -l --points-at HEAD | head -n 1`"

.PHONY: clean
.PHONY: clean build install generate test test-coverage test-integration \
test-integration-coverage goreleaser migrate-create deps

clean:
go clean
go clean -testcache

.PHONY: build
build:
CGO_ENABLED=0 go build ${LDFLAGS}

.PHONY: install
install:
go install ${LDFLAGS}

.PHONY: generate
generate:
go generate ./...

.PHONY: test
deps:
mkdir -p test/output/otel
docker compose -f test/docker-compose.yml up -d

test: clean
go test $$(go list ./... | grep -v /test/)

.PHONY: test-coverage
test-coverage: clean
go test $$(go list ./... | grep -v /test/) -coverprofile=coverage.txt

.PHONY: test-integration
test-integration: clean
go test -p 1 -v ./test/...

.PHONY: test-integration-coverage
test-integration-coverage: clean
go test -p 1 -v ./test/... --coverpkg ./... -coverprofile=coverage.txt

.PHONY: goreleaser
goreleaser:
goreleaser release --snapshot --clean

Expand Down
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@ WebhookX is an open-source webhooks gateway for message receiving, processing, a
- **Plugins:**
- `webhookx-signature`: signing outbound requests with HMAC(SHA-256) by adding `Webhookx-Signature` and `Webhookx-Timestamp` to request header.
- `transformer`(WIP): transform request before sending outbound requests.

- **Observability:** Metrics and Tracing(WIP).


## Roadmap

- [ ] Data retention policy
- [ ] Insight admin APIs
- [ ] Observability(o11y) including tracing and metrics
- [ ] Declarative configuration management

#### Inbound
Expand Down
44 changes: 27 additions & 17 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/webhookx-io/webhookx/mcache"
"github.com/webhookx-io/webhookx/pkg/cache"
"github.com/webhookx-io/webhookx/pkg/log"
"github.com/webhookx-io/webhookx/pkg/metrics"
"github.com/webhookx-io/webhookx/pkg/taskqueue"
"github.com/webhookx-io/webhookx/proxy"
"github.com/webhookx-io/webhookx/worker"
Expand Down Expand Up @@ -41,6 +42,7 @@ type Application struct {
dispatcher *dispatcher.Dispatcher
cache cache.Cache
bus *eventbus.EventBus
metrics *metrics.Metrics

admin *admin.Admin
gateway *proxy.Gateway
Expand Down Expand Up @@ -72,7 +74,7 @@ func (app *Application) initialize() error {
app.log = zap.S()

// cache
client := cfg.RedisConfig.GetClient()
client := cfg.Redis.GetClient()
app.cache = cache.NewRedisCache(client)

mcache.Set(mcache.NewMCache(&mcache.Options{
Expand All @@ -83,44 +85,49 @@ func (app *Application) initialize() error {

app.bus = eventbus.NewEventBus(
app.NodeID(),
cfg.DatabaseConfig.GetDSN(),
cfg.Database.GetDSN(),
app.log)
registerEventHandler(app.bus)

// db
db, err := db.NewDB(&cfg.DatabaseConfig)
db, err := db.NewDB(&cfg.Database)
if err != nil {
return err
}
app.db = db

app.metrics, err = metrics.New(cfg.Metrics)
if err != nil {
return err
}

// queue
queue := taskqueue.NewRedisQueue(taskqueue.RedisTaskQueueOptions{
Client: client,
}, app.log)
}, app.log, app.metrics)
app.queue = queue

app.dispatcher = dispatcher.NewDispatcher(log.Sugar(), queue, db)
app.dispatcher = dispatcher.NewDispatcher(log.Sugar(), queue, db, app.metrics)

// worker
if cfg.WorkerConfig.Enabled {
if cfg.Worker.Enabled {
opts := worker.WorkerOptions{
PoolSize: int(cfg.WorkerConfig.Pool.Size),
PoolConcurrency: int(cfg.WorkerConfig.Pool.Concurrency),
PoolSize: int(cfg.Worker.Pool.Size),
PoolConcurrency: int(cfg.Worker.Pool.Concurrency),
}
deliverer := deliverer.NewHTTPDeliverer(&cfg.WorkerConfig.Deliverer)
app.worker = worker.NewWorker(opts, db, deliverer, queue)
deliverer := deliverer.NewHTTPDeliverer(&cfg.Worker.Deliverer)
app.worker = worker.NewWorker(opts, db, deliverer, queue, app.metrics)
}

// admin
if cfg.AdminConfig.IsEnabled() {
if cfg.Admin.IsEnabled() {
handler := api.NewAPI(cfg, db, app.dispatcher).Handler()
app.admin = admin.NewAdmin(cfg.AdminConfig, handler)
app.admin = admin.NewAdmin(cfg.Admin, handler)
}

// gateway
if cfg.ProxyConfig.IsEnabled() {
app.gateway = proxy.NewGateway(&cfg.ProxyConfig, db, app.dispatcher)
if cfg.Proxy.IsEnabled() {
app.gateway = proxy.NewGateway(&cfg.Proxy, db, app.dispatcher, app.metrics)
}

return nil
Expand Down Expand Up @@ -196,15 +203,18 @@ func (app *Application) Stop() error {
}()

_ = app.bus.Stop()
if app.metrics != nil {
_ = app.metrics.Stop()
}
// TODO: timeout
if app.admin != nil {
app.admin.Stop()
_ = app.admin.Stop()
}
if app.gateway != nil {
app.gateway.Stop()
_ = app.gateway.Stop()
}
if app.worker != nil {
app.worker.Stop()
_ = app.worker.Stop()
}

app.started = false
Expand Down
6 changes: 3 additions & 3 deletions cmd/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func newMigrationsResetCmd() *cobra.Command {
return errors.New("canceled")
}
}
m := migrator.New(&cfg.DatabaseConfig)
m := migrator.New(&cfg.Database)
fmt.Println("resetting database...")
if err := m.Reset(); err != nil {
return err
Expand Down Expand Up @@ -48,7 +48,7 @@ func newMigrationsCmd() *cobra.Command {
Short: "Print the migration status",
Long: ``,
RunE: func(cmd *cobra.Command, args []string) error {
m := migrator.New(&cfg.DatabaseConfig)
m := migrator.New(&cfg.Database)
version, dirty, err := m.Status()
if err != nil {
return err
Expand All @@ -68,7 +68,7 @@ func newMigrationsCmd() *cobra.Command {
Short: "Run any new migrations",
Long: ``,
RunE: func(cmd *cobra.Command, args []string) error {
m := migrator.New(&cfg.DatabaseConfig)
m := migrator.New(&cfg.Database)
if err := m.Up(); err != nil && !errors.Is(err, migrate.ErrNoChange) {
return err
}
Expand Down
12 changes: 12 additions & 0 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,15 @@ proxy:
port: 6379
password:
database: 0

#------------------------------------------------------------------------------
# METRICS
#------------------------------------------------------------------------------
metrics:
attributes: # global attributes for each metric
env: prod
#exports: [ opentelemetry ] # list of enabled vendor exports. supported value are opentelemetry
opentelemetry:
push_interval: 10 # interval(in seconds) at which metrics are sent to the OpenTelemetry Collector
protocol: http/protobuf # supported value are http/protobuf, grpc
endpoint: http://localhost:4318/v1/metrics # http/protobuf(http://localhost:4318/v1/metrics), grpc(localhost:4317)
26 changes: 15 additions & 11 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ var (
var cfg Config

type Config struct {
Log LogConfig `yaml:"log" envconfig:"LOG"`
DatabaseConfig DatabaseConfig `yaml:"database" envconfig:"DATABASE"`
RedisConfig RedisConfig `yaml:"redis" envconfig:"REDIS"`
AdminConfig AdminConfig `yaml:"admin" envconfig:"ADMIN"`
ProxyConfig ProxyConfig `yaml:"proxy" envconfig:"PROXY"`
WorkerConfig WorkerConfig `yaml:"worker" envconfig:"WORKER"`
Log LogConfig `yaml:"log" envconfig:"LOG"`
Database DatabaseConfig `yaml:"database" envconfig:"DATABASE"`
Redis RedisConfig `yaml:"redis" envconfig:"REDIS"`
Admin AdminConfig `yaml:"admin" envconfig:"ADMIN"`
Proxy ProxyConfig `yaml:"proxy" envconfig:"PROXY"`
Worker WorkerConfig `yaml:"worker" envconfig:"WORKER"`
Metrics MetricsConfig `yaml:"metrics" envconfig:"METRICS"`
}

func (cfg Config) String() string {
Expand All @@ -38,19 +39,22 @@ func (cfg Config) Validate() error {
if err := cfg.Log.Validate(); err != nil {
return err
}
if err := cfg.DatabaseConfig.Validate(); err != nil {
if err := cfg.Database.Validate(); err != nil {
return err
}
if err := cfg.RedisConfig.Validate(); err != nil {
if err := cfg.Redis.Validate(); err != nil {
return err
}
if err := cfg.AdminConfig.Validate(); err != nil {
if err := cfg.Admin.Validate(); err != nil {
return err
}
if err := cfg.ProxyConfig.Validate(); err != nil {
if err := cfg.Proxy.Validate(); err != nil {
return err
}
if err := cfg.WorkerConfig.Validate(); err != nil {
if err := cfg.Worker.Validate(); err != nil {
return err
}
if err := cfg.Metrics.Validate(); err != nil {
return err
}

Expand Down
62 changes: 62 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,68 @@ func TestProxyConfig(t *testing.T) {
}
}

func TestMetricsConfig(t *testing.T) {
tests := []struct {
desc string
cfg MetricsConfig
expectedValidateErr error
}{
{
desc: "sanity",
cfg: MetricsConfig{
Attributes: nil,
Exports: nil,
OpenTelemetry: Opentelemetry{
PushInterval: 1,
Protocol: "http/protobuf",
},
},
expectedValidateErr: nil,
},
{
desc: "invalid export",
cfg: MetricsConfig{
Attributes: nil,
Exports: []Export{"unknown"},
OpenTelemetry: Opentelemetry{
PushInterval: 1,
Protocol: "http/protobuf",
},
},
expectedValidateErr: errors.New("invalid export: unknown"),
},
{
desc: "invalid export",
cfg: MetricsConfig{
Attributes: nil,
Exports: nil,
OpenTelemetry: Opentelemetry{
PushInterval: 1,
Protocol: "unknown",
},
},
expectedValidateErr: errors.New("invalid protocol: unknown"),
},
{
desc: "invalid PushInterval",
cfg: MetricsConfig{
Attributes: nil,
Exports: nil,
OpenTelemetry: Opentelemetry{
PushInterval: 61,
Protocol: "http/protobuf",
},
},
expectedValidateErr: errors.New("interval must be in the range [1, 60]"),
},
}

for _, test := range tests {
actualValidateErr := test.cfg.Validate()
assert.Equal(t, test.expectedValidateErr, actualValidateErr, "expected %v got %v", test.expectedValidateErr, actualValidateErr)
}
}

func TestConfig(t *testing.T) {
cfg, err := Init()
assert.Nil(t, err)
Expand Down
Loading

0 comments on commit 117929a

Please sign in to comment.