Skip to content

Commit

Permalink
I320494 introduce redis (#781)
Browse files Browse the repository at this point in the history
introduce redis to sm
  • Loading branch information
nirbenrey authored Aug 11, 2022
1 parent 967ea6b commit 895b07b
Show file tree
Hide file tree
Showing 16 changed files with 272 additions and 73 deletions.
16 changes: 15 additions & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,21 @@ jobs:
ports:
- 5432:5432
options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5

redis:
image: redis
env:
# The hostname used to communicate with the Redis service container
REDIS_HOST: localhost
# The default Redis port
REDIS_PORT: 6379
# Set health checks to wait until redis has started
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 6379:6379
steps:
- name: Set up Go 1.x
uses: actions/setup-go@v2
Expand Down
5 changes: 4 additions & 1 deletion api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"github.com/Peripli/service-manager/operations"
"github.com/Peripli/service-manager/pkg/agents"
"github.com/Peripli/service-manager/pkg/env"
"github.com/go-redis/redis"

"sync"

"github.com/Peripli/service-manager/pkg/query"
Expand Down Expand Up @@ -96,6 +98,7 @@ func (s *Settings) Validate() error {
}

type Options struct {
RedisClient *redis.Client
Repository storage.TransactionalRepository
APISettings *Settings
OperationSettings *operations.Settings
Expand All @@ -109,7 +112,7 @@ type Options struct {
// New returns the minimum set of REST APIs needed for the Service Manager
func New(ctx context.Context, e env.Environment, options *Options) (*web.API, error) {

rateLimiters, err := initRateLimiters(options)
rateLimiters, err := initRateLimiters(ctx, options)
if err != nil {
return nil, err
}
Expand Down
12 changes: 10 additions & 2 deletions api/filters/rate_limiter_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ type RateLimiterMiddleware struct {
middleware *stdlib.Middleware
pathPrefix string
method string
rate limiter.Rate
}

func NewRateLimiterMiddleware(middleware *stdlib.Middleware, pathPrefix string, method string) RateLimiterMiddleware {
func NewRateLimiterMiddleware(middleware *stdlib.Middleware, pathPrefix string, method string, rate limiter.Rate) RateLimiterMiddleware {
return RateLimiterMiddleware{
middleware,
pathPrefix,
method,
rate,
}
}

Expand Down Expand Up @@ -116,8 +118,14 @@ func (rl *RateLimiterFilter) Run(request *web.Request, next web.Handler) (*web.R
if rlm.method != "" && strings.ToUpper(rlm.method) != strings.ToUpper(request.Method) {
continue
}
limiterContext, err := rlm.middleware.Limiter.Get(request.Context(), userContext.Name)
method := "all-methods"
if rlm.method != "" {
method = rlm.method
}
key := method + ":" + rlm.pathPrefix + ":" + rlm.rate.Formatted + ":" + userContext.Name
limiterContext, err := rlm.middleware.Limiter.Get(request.Context(), key)
if err != nil {
log.C(request.Context()).Errorf("failed to get limiter context with key %s: %v", key, err)
return nil, err
}

Expand Down
24 changes: 22 additions & 2 deletions api/rate_limiter.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package api

import (
"context"
"fmt"
"github.com/Peripli/service-manager/api/filters"
"github.com/Peripli/service-manager/pkg/log"
"github.com/ulule/limiter"
"github.com/ulule/limiter/drivers/middleware/stdlib"
"github.com/ulule/limiter/drivers/store/memory"
"github.com/ulule/limiter/drivers/store/redis"
"net/http"
"path"
"strings"
Expand Down Expand Up @@ -93,19 +96,36 @@ func parseRateLimiterConfiguration(input string) ([]RateLimiterConfiguration, er
return configurations, nil
}

func initRateLimiters(options *Options) ([]filters.RateLimiterMiddleware, error) {
func initRateLimiters(ctx context.Context, options *Options) ([]filters.RateLimiterMiddleware, error) {
var rateLimiters []filters.RateLimiterMiddleware
if !options.APISettings.RateLimitingEnabled {
return nil, nil
}

var redisStore limiter.Store
var err error
if options.RedisClient != nil {
redisStore, err = redis.NewStore(options.RedisClient)
if err != nil {
log.C(ctx).Errorf("failed to initialize redis store: %v", err)
return nil, err
}
} else {
log.C(ctx).Info("redis client is not initialized. creating in memory store for rate limiting")
}

configurations, err := parseRateLimiterConfiguration(options.APISettings.RateLimit)
if err != nil {
return nil, err
}
for _, configuration := range configurations {
store := redisStore
if store == nil {
store = memory.NewStore()
}
rateLimiters = append(
rateLimiters,
filters.NewRateLimiterMiddleware(stdlib.NewMiddleware(limiter.New(memory.NewStore(), configuration.rate)), configuration.pathPrefix, configuration.method),
filters.NewRateLimiterMiddleware(stdlib.NewMiddleware(limiter.New(store, configuration.rate)), configuration.pathPrefix, configuration.method, configuration.rate),
)
}
return rateLimiters, nil
Expand Down
2 changes: 1 addition & 1 deletion application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ storage:
encryption_key: ejHjRNHbS0NaqARSRvnweVV9zcmhQEa8
skip_ssl_validation: false
max_idle_connections: 5
max_open_connections: 30
max_open_connections: 30
api:
token_issuer_url: http://localhost:8080/uaa
client_id: cf
Expand Down
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package config
import (
"fmt"
"github.com/Peripli/service-manager/pkg/agents"
"github.com/Peripli/service-manager/storage/cache"

"github.com/Peripli/service-manager/pkg/multitenancy"

Expand All @@ -39,6 +40,7 @@ import (
// Settings is used to setup the Service Manager
type Settings struct {
Server *server.Settings
Cache *cache.Settings
Storage *storage.Settings
Log *log.Settings
API *api.Settings
Expand All @@ -59,6 +61,7 @@ func AddPFlags(set *pflag.FlagSet) {
// DefaultSettings returns the default values for configuring the Service Manager
func DefaultSettings() *Settings {
return &Settings{
Cache: cache.DefaultSettings(),
Server: server.DefaultSettings(),
Storage: storage.DefaultSettings(),
Log: log.DefaultSettings(),
Expand Down
1 change: 1 addition & 0 deletions deployment/cf/manifest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ applications:
GOPACKAGENAME: github.com/Peripli/service-manager
API_TOKEN_ISSUER_URL: https://uaa.dev.cfdev.sh
HTTPCLIENT_SKIP_SSL_VALIDATION: true
CACHE_NAME: <redis_instance_name>
STORAGE_ENCRYPTION_KEY: ejHjRNHbS0NaqARSRvnweVV9zcmhQEa8
STORAGE_NAME: <postgre_instance_name>
STORAGE_SKIP_SSL_VALIDATION: false
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/gavv/httpexpect v0.0.0-20170820080527-c44a6d7bb636
github.com/gavv/monotime v0.0.0-20190418164738-30dba4353424 // indirect
github.com/gbrlsnchs/jwt v0.5.0
github.com/go-redis/redis v6.15.9+incompatible
github.com/gobwas/glob v0.2.3
github.com/gofrs/uuid v3.1.0+incompatible
github.com/golang-migrate/migrate v3.5.4+incompatible
Expand All @@ -43,7 +44,7 @@ require (
github.com/morikuni/aec v1.0.0 // indirect
github.com/moul/http2curl v1.0.0 // indirect
github.com/onrik/logrus v0.4.1
github.com/onsi/ginkgo v1.16.4
github.com/onsi/ginkgo v1.16.5
github.com/onsi/gomega v1.19.0
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.1 // indirect
Expand Down
5 changes: 4 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg=
github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
Expand Down Expand Up @@ -295,8 +297,9 @@ github.com/onrik/logrus v0.4.1/go.mod h1:qfe9NeZVAJfIxviw3cYkZo3kvBtLoPRJriAO8zl
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY=
github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc=
github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
github.com/onsi/ginkgo/v2 v2.1.3 h1:e/3Cwtogj0HA+25nMP1jCMDIf8RtRYbGwGGuBIFztkc=
github.com/onsi/ginkgo/v2 v2.1.3/go.mod h1:vw5CSIxN1JObi/U8gcbwft7ZxR2dgaR70JSE3/PpL4c=
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
Expand Down
25 changes: 25 additions & 0 deletions pkg/env/cf.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,35 @@ func setCFOverrides(env Environment) error {
return err
}

cacheEnabled := cast.ToBool(env.Get("cache.enabled"))
if cacheEnabled {
if err := useRedisBinding(env, cfEnv); err != nil {
return err
}
}
}
return nil
}

func useRedisBinding(env Environment, cfEnv *cfenv.App) error {
redisServiceName := cast.ToString(env.Get("cache.name"))
if redisServiceName == "" {
log.D().Warning("No Redis service name found")
return nil
}
service, err := cfEnv.Services.WithName(redisServiceName)
if err != nil {
errorMessage := fmt.Sprintf("could not find service with name %s: %v", redisServiceName, err)
log.D().Error(errorMessage)
return fmt.Errorf(errorMessage)
}

env.Set("cache.hostname", service.Credentials["hostname"])
env.Set("cache.password", service.Credentials["password"])
env.Set("cache.port", service.Credentials["port"])
return nil
}

func setPostgresSSL(env Environment, credentials map[string]interface{}) error {
if sslRootCert, hasRootCert := credentials["sslrootcert"]; hasRootCert {
filename := "./root.crt"
Expand Down
49 changes: 49 additions & 0 deletions pkg/env/cf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,31 @@ const VCAP_SERVICES_VALUE = `{ "postgresql": [
],
"volume_mounts": []
}
],
"redis-cache": [
{
"binding_guid": "8ed389ef-735c-4b6e-a85b-6fbe5fb5bbb1",
"binding_name": null,
"credentials": {
"cluster_mode": false,
"hostname": "localhost",
"password": "1234",
"port": 3283,
"tls": true,
"uri": "rediss://no-user-name-for-redis:1234@localhost:3283"
},
"instance_guid": "b78e9e08-b7fd-41db-beab-8cd510d53889",
"instance_name": "redis-test-standard-2",
"label": "redis-cache",
"name": "redis-test",
"plan": "standard",
"provider": null,
"syslog_drain_url": null,
"tags": [
"cache"
],
"volume_mounts": []
}
]
}`

Expand All @@ -64,6 +89,12 @@ var _ = Describe("CF Env", func() {
Expect(os.Setenv("STORAGE_NAME", "smdb")).ShouldNot(HaveOccurred())
Expect(os.Unsetenv("STORAGE_URI")).ShouldNot(HaveOccurred())

Expect(os.Setenv("CACHE_NAME", "redis-test")).ShouldNot(HaveOccurred())
Expect(os.Setenv("CACHE_ENABLED", "true")).ShouldNot(HaveOccurred())
Expect(os.Setenv("CACHE_PORT", "6666")).ShouldNot(HaveOccurred())
Expect(os.Setenv("CACHE_HOST", "localhost")).ShouldNot(HaveOccurred())
Expect(os.Setenv("CACHE_PASSWORD", "1234")).ShouldNot(HaveOccurred())

environment, err = New(context.TODO(), EmptyFlagSet())
Expect(err).ShouldNot(HaveOccurred())
})
Expand All @@ -72,6 +103,10 @@ var _ = Describe("CF Env", func() {
Expect(os.Unsetenv("VCAP_APPLICATION")).ShouldNot(HaveOccurred())
Expect(os.Unsetenv("VCAP_SERVICES")).ShouldNot(HaveOccurred())
Expect(os.Unsetenv("STORAGE_NAME")).ShouldNot(HaveOccurred())
Expect(os.Unsetenv("CACHE_ENABLED")).ShouldNot(HaveOccurred())
Expect(os.Unsetenv("CACHE_PORT")).ShouldNot(HaveOccurred())
Expect(os.Unsetenv("CACHE_HOST")).ShouldNot(HaveOccurred())
Expect(os.Unsetenv("CACHE_PASSWORD")).ShouldNot(HaveOccurred())
})

Describe("Set CF environment values", func() {
Expand All @@ -95,6 +130,20 @@ var _ = Describe("CF Env", func() {

})
})
Context("when cache.enabled is missing from environment", func() {
It("returns no error", func() {
Expect(os.Unsetenv("CACHE_ENABLED")).ShouldNot(HaveOccurred())
Expect(setCFOverrides(environment)).ShouldNot(HaveOccurred())
Expect(environment.Get("cache.enabled")).Should(BeNil())
})
})
Context("when cache.enabled is true", func() {
It("shouldn't return error if cache name is missing", func() {
Expect(os.Unsetenv("CACHE_NAME")).ShouldNot(HaveOccurred())
Expect(setCFOverrides(environment)).ShouldNot(HaveOccurred())
Expect(environment.Get("cache.name")).Should(BeNil())
})
})

Context("when storage with name storage.name is missing from VCAP_SERVICES", func() {
It("returns error", func() {
Expand Down
26 changes: 24 additions & 2 deletions pkg/sm/sm.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@ package sm

import (
"context"
"crypto/tls"
"database/sql"
"errors"
"fmt"
secFilters "github.com/Peripli/service-manager/pkg/security/filters"
osbc "github.com/kubernetes-sigs/go-open-service-broker-client/v2"
"math"
"net/http"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -58,14 +61,14 @@ import (
_ "github.com/Kount/pq-timeouts"
"github.com/Peripli/service-manager/api/filters"
"github.com/Peripli/service-manager/pkg/web"
osbc "github.com/kubernetes-sigs/go-open-service-broker-client/v2"
"github.com/go-redis/redis"
)

// ServiceManagerBuilder type is an extension point that allows adding additional filters, plugins and
// controllers before running ServiceManager.
type ServiceManagerBuilder struct {
*web.API

RedisClient *redis.Client
Storage *storage.InterceptableTransactionalRepository
Notificator storage.Notificator
NotificationCleaner *storage.NotificationCleaner
Expand Down Expand Up @@ -106,6 +109,23 @@ func New(ctx context.Context, cancel context.CancelFunc, e env.Environment, cfg

util.HandleInterrupts(ctx, cancel)

// Setup cache
var redisClient *redis.Client
if cfg.Cache.Enabled {
var tlsConfig *tls.Config
if cfg.Cache.TLSEnabled {
tlsConfig = &tls.Config{MinVersion: tls.VersionTLS12}
}
redisClient = redis.NewClient(&redis.Options{
Addr: cfg.Cache.Host + ":" + strconv.Itoa(cfg.Cache.Port),
Password: cfg.Cache.Password,
MaxRetries: cfg.Cache.MaxRetries,
//MinRetryBackoff: minRetryBackoff,
//MaxRetryBackoff: maxRetryBackoff,
TLSConfig: tlsConfig,
})
}

// Setup storage
log.C(ctx).Info("Setting up Service Manager storage...")
smStorage := &postgres.Storage{
Expand Down Expand Up @@ -137,6 +157,7 @@ func New(ctx context.Context, cancel context.CancelFunc, e env.Environment, cfg
}

apiOptions := &api.Options{
RedisClient: redisClient,
Repository: interceptableRepository,
APISettings: cfg.API,
OperationSettings: cfg.Operations,
Expand Down Expand Up @@ -208,6 +229,7 @@ func New(ctx context.Context, cancel context.CancelFunc, e env.Environment, cfg
OSBClientProvider: osbClientProvider,
encryptingRepository: encryptingRepository,
APIOptions: apiOptions,
RedisClient: redisClient,
}

smb.RegisterPlugins(osb.NewCatalogFilterByVisibilityPlugin(interceptableRepository))
Expand Down
Loading

0 comments on commit 895b07b

Please sign in to comment.