Skip to content

Commit

Permalink
perf: add mcache (#46)
Browse files Browse the repository at this point in the history
### Summary

Add MCache(multiple levels cache) to improve performance.

### MCache Implementations

The MCache uses LRU as the L1 cache and redis as the L2 cache. To invalidate the L1 cache of different WebhookX nodes, it uses Notify / Listen to broadcast events.

https://www.postgresql.org/docs/current/sql-notify.html
https://www.postgresql.org/docs/current/sql-listen.html

---------

Signed-off-by: Yusheng Li <[email protected]>
Co-authored-by: webhookx-x <[email protected]>
  • Loading branch information
vm-001 and webhookx-x authored Oct 21, 2024
1 parent cacd346 commit 44453a0
Show file tree
Hide file tree
Showing 25 changed files with 605 additions and 131 deletions.
50 changes: 45 additions & 5 deletions app/app.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package app

import (
"context"
"encoding/json"
"errors"
"github.com/webhookx-io/webhookx/admin"
"github.com/webhookx-io/webhookx/admin/api"
"github.com/webhookx-io/webhookx/config"
"github.com/webhookx-io/webhookx/db"
"github.com/webhookx-io/webhookx/dispatcher"
"github.com/webhookx-io/webhookx/eventbus"
"github.com/webhookx-io/webhookx/mcache"
"github.com/webhookx-io/webhookx/pkg/cache"
"github.com/webhookx-io/webhookx/pkg/log"
"github.com/webhookx-io/webhookx/pkg/taskqueue"
Expand All @@ -15,6 +19,7 @@ import (
"github.com/webhookx-io/webhookx/worker/deliverer"
"go.uber.org/zap"
"sync"
"time"
)

var (
Expand All @@ -35,6 +40,7 @@ type Application struct {
queue taskqueue.TaskQueue
dispatcher *dispatcher.Dispatcher
cache cache.Cache
bus *eventbus.EventBus

admin *admin.Admin
gateway *proxy.Gateway
Expand Down Expand Up @@ -65,24 +71,35 @@ func (app *Application) initialize() error {
zap.ReplaceGlobals(log)
app.log = zap.S()

// cache
client := cfg.RedisConfig.GetClient()
app.cache = cache.NewRedisCache(client)

mcache.Set(mcache.NewMCache(&mcache.Options{
L1Size: 1000,
L1TTL: time.Second * 10,
L2: app.cache,
}))

app.bus = eventbus.NewEventBus(
app.NodeID(),
cfg.DatabaseConfig.GetDSN(),
app.log)
registerEventHandler(app.bus)

// db
db, err := db.NewDB(&cfg.DatabaseConfig)
if err != nil {
return err
}
app.db = db

client := cfg.RedisConfig.GetClient()

// queue
queue := taskqueue.NewRedisQueue(taskqueue.RedisTaskQueueOptions{
Client: client,
}, app.log)
app.queue = queue

// cache
app.cache = cache.NewRedisCache(client)

app.dispatcher = dispatcher.NewDispatcher(log.Sugar(), queue, db)

// worker
Expand All @@ -109,10 +126,29 @@ func (app *Application) initialize() error {
return nil
}

func registerEventHandler(bus *eventbus.EventBus) {
bus.Subscribe(eventbus.EventInvalidation, func(data []byte) {
maps := make(map[string]interface{})
if err := json.Unmarshal(data, &maps); err != nil {
return
}
if cacheKey, ok := maps["cache_key"]; ok {
err := mcache.Invalidate(context.TODO(), cacheKey.(string))
if err != nil {
zap.S().Errorf("failed to invalidate cache: key=%s %v", cacheKey, err)
}
}
})
}

func (app *Application) DB() *db.DB {
return app.db
}

func (app *Application) NodeID() string {
return config.NODE
}

// Start starts application
func (app *Application) Start() error {
app.mux.Lock()
Expand All @@ -122,6 +158,9 @@ func (app *Application) Start() error {
return ErrApplicationStarted
}

if err := app.bus.Start(); err != nil {
return err
}
if app.admin != nil {
app.admin.Start()
}
Expand Down Expand Up @@ -156,6 +195,7 @@ func (app *Application) Stop() error {
app.log.Infof("stopped")
}()

_ = app.bus.Stop()
// TODO: timeout
if app.admin != nil {
app.admin.Stop()
Expand Down
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package config
import (
"encoding/json"
"github.com/creasty/defaults"
uuid "github.com/satori/go.uuid"
"github.com/webhookx-io/webhookx/pkg/envconfig"
"gopkg.in/yaml.v3"
"os"
Expand All @@ -11,6 +12,7 @@ import (
var (
VERSION = "dev"
COMMIT = "unknown"
NODE = uuid.NewV4().String()
)

var cfg Config
Expand Down
6 changes: 2 additions & 4 deletions config/database.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package config

import (
"database/sql"
"fmt"
)

Expand All @@ -13,15 +12,14 @@ type DatabaseConfig struct {
Database string `yaml:"database" default:"webhookx"`
}

func (cfg DatabaseConfig) GetDB() (*sql.DB, error) {
dsn := fmt.Sprintf("postgres://%s:%s@%s:%d/%s?sslmode=disable",
func (cfg DatabaseConfig) GetDSN() string {
return fmt.Sprintf("postgres://%s:%s@%s:%d/%s?sslmode=disable",
cfg.Username,
cfg.Password,
cfg.Host,
cfg.Port,
cfg.Database,
)
return sql.Open("postgres", dsn)
}

func (cfg DatabaseConfig) Validate() error {
Expand Down
24 changes: 24 additions & 0 deletions constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package constants

import (
"github.com/webhookx-io/webhookx/config"
"strings"
"time"
)

Expand All @@ -26,6 +27,29 @@ const (
RequeueInterval = time.Second * 60
)

type CacheKey string

func (c CacheKey) Build(id string) string {
var sb strings.Builder
sb.WriteString(Namespace)
sb.WriteString(":")
sb.WriteString(string(c))
sb.WriteString(":")
sb.WriteString(id)
return sb.String()
}

const (
Namespace string = "webhookx"
EventCacheKey CacheKey = "events"
EndpointCacheKey CacheKey = "endpoints"
SourceCacheKey CacheKey = "sources"
WorkspaceCacheKey CacheKey = "workspaces"
AttemptCacheKey CacheKey = "attempts"
PluginCacheKey CacheKey = "plugins"
AttemptDetailCacheKey CacheKey = "attempt_details"
)

var (
DefaultResponseHeaders = map[string]string{
"Content-Type": "application/json",
Expand Down
10 changes: 9 additions & 1 deletion db/dao/attempt_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
sq "github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx"
"github.com/webhookx-io/webhookx/constants"
"github.com/webhookx-io/webhookx/db/entities"
"github.com/webhookx-io/webhookx/pkg/types"
)
Expand All @@ -22,8 +23,15 @@ type AttemptResult struct {
}

func NewAttemptDao(db *sqlx.DB, workspace bool) AttemptDAO {
opts := Options{
Table: "attempts",
EntityName: "Attempt",
Workspace: workspace,
CachePropagate: false,
CacheKey: constants.AttemptCacheKey,
}
return &attemptDao{
DAO: NewDAO[entities.Attempt]("attempts", db, workspace),
DAO: NewDAO[entities.Attempt](db, opts),
}
}

Expand Down
10 changes: 9 additions & 1 deletion db/dao/attempt_detail_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dao

import (
"context"
"github.com/webhookx-io/webhookx/constants"
"time"

"github.com/jmoiron/sqlx"
Expand All @@ -13,8 +14,15 @@ type attemptDetailDao struct {
}

func NewAttemptDetailDao(db *sqlx.DB, workspace bool) AttemptDetailDAO {
opts := Options{
Table: "attempt_details",
EntityName: "AttemptDetail",
Workspace: workspace,
CachePropagate: false,
CacheKey: constants.AttemptDetailCacheKey,
}
return &attemptDetailDao{
DAO: NewDAO[entities.AttemptDetail]("attempt_details", db, workspace),
DAO: NewDAO[entities.AttemptDetail](db, opts),
}
}

Expand Down
Loading

0 comments on commit 44453a0

Please sign in to comment.