From 895b07b63de873f4a621fd24dc8d3d8a189002f6 Mon Sep 17 00:00:00 2001 From: Nir Ben-Rei Date: Thu, 11 Aug 2022 15:12:31 +0300 Subject: [PATCH] I320494 introduce redis (#781) introduce redis to sm --- .github/workflows/go.yml | 16 ++- api/api.go | 5 +- api/filters/rate_limiter_filter.go | 12 +- api/rate_limiter.go | 24 +++- application.yml | 2 +- config/config.go | 3 + deployment/cf/manifest.yml | 1 + go.mod | 3 +- go.sum | 5 +- pkg/env/cf.go | 25 ++++ pkg/env/cf_test.go | 49 ++++++++ pkg/sm/sm.go | 26 +++- storage/cache/config.go | 20 +++ test/common/application.yml | 5 + test/common/test_context.go | 16 ++- test/rate_limiter_test/rate_limiter_test.go | 133 +++++++++++--------- 16 files changed, 272 insertions(+), 73 deletions(-) create mode 100644 storage/cache/config.go diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index d320cb5fd..f8057cfa3 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -174,7 +174,21 @@ jobs: ports: - 5432:5432 options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5 - + redis: + image: redis + env: + # The hostname used to communicate with the Redis service container + REDIS_HOST: localhost + # The default Redis port + REDIS_PORT: 6379 + # Set health checks to wait until redis has started + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + - 6379:6379 steps: - name: Set up Go 1.x uses: actions/setup-go@v2 diff --git a/api/api.go b/api/api.go index ffcb5bdef..da82fcf62 100644 --- a/api/api.go +++ b/api/api.go @@ -25,6 +25,8 @@ import ( "github.com/Peripli/service-manager/operations" "github.com/Peripli/service-manager/pkg/agents" "github.com/Peripli/service-manager/pkg/env" + "github.com/go-redis/redis" + "sync" "github.com/Peripli/service-manager/pkg/query" @@ -96,6 +98,7 @@ func (s *Settings) Validate() error { } type Options struct { + RedisClient *redis.Client Repository storage.TransactionalRepository APISettings *Settings OperationSettings *operations.Settings @@ -109,7 +112,7 @@ type Options struct { // New returns the minimum set of REST APIs needed for the Service Manager func New(ctx context.Context, e env.Environment, options *Options) (*web.API, error) { - rateLimiters, err := initRateLimiters(options) + rateLimiters, err := initRateLimiters(ctx, options) if err != nil { return nil, err } diff --git a/api/filters/rate_limiter_filter.go b/api/filters/rate_limiter_filter.go index f4bbd7cb0..348dca302 100644 --- a/api/filters/rate_limiter_filter.go +++ b/api/filters/rate_limiter_filter.go @@ -25,13 +25,15 @@ type RateLimiterMiddleware struct { middleware *stdlib.Middleware pathPrefix string method string + rate limiter.Rate } -func NewRateLimiterMiddleware(middleware *stdlib.Middleware, pathPrefix string, method string) RateLimiterMiddleware { +func NewRateLimiterMiddleware(middleware *stdlib.Middleware, pathPrefix string, method string, rate limiter.Rate) RateLimiterMiddleware { return RateLimiterMiddleware{ middleware, pathPrefix, method, + rate, } } @@ -116,8 +118,14 @@ func (rl *RateLimiterFilter) Run(request *web.Request, next web.Handler) (*web.R if rlm.method != "" && strings.ToUpper(rlm.method) != strings.ToUpper(request.Method) { continue } - limiterContext, err := rlm.middleware.Limiter.Get(request.Context(), userContext.Name) + method := "all-methods" + if rlm.method != "" { + method = rlm.method + } + key := method + ":" + rlm.pathPrefix + ":" + rlm.rate.Formatted + ":" + userContext.Name + limiterContext, err := rlm.middleware.Limiter.Get(request.Context(), key) if err != nil { + log.C(request.Context()).Errorf("failed to get limiter context with key %s: %v", key, err) return nil, err } diff --git a/api/rate_limiter.go b/api/rate_limiter.go index afc7a9783..0b11bd833 100644 --- a/api/rate_limiter.go +++ b/api/rate_limiter.go @@ -1,11 +1,14 @@ package api import ( + "context" "fmt" "github.com/Peripli/service-manager/api/filters" + "github.com/Peripli/service-manager/pkg/log" "github.com/ulule/limiter" "github.com/ulule/limiter/drivers/middleware/stdlib" "github.com/ulule/limiter/drivers/store/memory" + "github.com/ulule/limiter/drivers/store/redis" "net/http" "path" "strings" @@ -93,19 +96,36 @@ func parseRateLimiterConfiguration(input string) ([]RateLimiterConfiguration, er return configurations, nil } -func initRateLimiters(options *Options) ([]filters.RateLimiterMiddleware, error) { +func initRateLimiters(ctx context.Context, options *Options) ([]filters.RateLimiterMiddleware, error) { var rateLimiters []filters.RateLimiterMiddleware if !options.APISettings.RateLimitingEnabled { return nil, nil } + + var redisStore limiter.Store + var err error + if options.RedisClient != nil { + redisStore, err = redis.NewStore(options.RedisClient) + if err != nil { + log.C(ctx).Errorf("failed to initialize redis store: %v", err) + return nil, err + } + } else { + log.C(ctx).Info("redis client is not initialized. creating in memory store for rate limiting") + } + configurations, err := parseRateLimiterConfiguration(options.APISettings.RateLimit) if err != nil { return nil, err } for _, configuration := range configurations { + store := redisStore + if store == nil { + store = memory.NewStore() + } rateLimiters = append( rateLimiters, - filters.NewRateLimiterMiddleware(stdlib.NewMiddleware(limiter.New(memory.NewStore(), configuration.rate)), configuration.pathPrefix, configuration.method), + filters.NewRateLimiterMiddleware(stdlib.NewMiddleware(limiter.New(store, configuration.rate)), configuration.pathPrefix, configuration.method, configuration.rate), ) } return rateLimiters, nil diff --git a/application.yml b/application.yml index f6ee2745d..5d52fe9a9 100644 --- a/application.yml +++ b/application.yml @@ -23,7 +23,7 @@ storage: encryption_key: ejHjRNHbS0NaqARSRvnweVV9zcmhQEa8 skip_ssl_validation: false max_idle_connections: 5 - max_open_connections: 30 + max_open_connections: 30 api: token_issuer_url: http://localhost:8080/uaa client_id: cf diff --git a/config/config.go b/config/config.go index dc81041ba..e478dd1fc 100644 --- a/config/config.go +++ b/config/config.go @@ -19,6 +19,7 @@ package config import ( "fmt" "github.com/Peripli/service-manager/pkg/agents" + "github.com/Peripli/service-manager/storage/cache" "github.com/Peripli/service-manager/pkg/multitenancy" @@ -39,6 +40,7 @@ import ( // Settings is used to setup the Service Manager type Settings struct { Server *server.Settings + Cache *cache.Settings Storage *storage.Settings Log *log.Settings API *api.Settings @@ -59,6 +61,7 @@ func AddPFlags(set *pflag.FlagSet) { // DefaultSettings returns the default values for configuring the Service Manager func DefaultSettings() *Settings { return &Settings{ + Cache: cache.DefaultSettings(), Server: server.DefaultSettings(), Storage: storage.DefaultSettings(), Log: log.DefaultSettings(), diff --git a/deployment/cf/manifest.yml b/deployment/cf/manifest.yml index f19f2cb80..185cb94dd 100644 --- a/deployment/cf/manifest.yml +++ b/deployment/cf/manifest.yml @@ -10,6 +10,7 @@ applications: GOPACKAGENAME: github.com/Peripli/service-manager API_TOKEN_ISSUER_URL: https://uaa.dev.cfdev.sh HTTPCLIENT_SKIP_SSL_VALIDATION: true + CACHE_NAME: STORAGE_ENCRYPTION_KEY: ejHjRNHbS0NaqARSRvnweVV9zcmhQEa8 STORAGE_NAME: STORAGE_SKIP_SSL_VALIDATION: false diff --git a/go.mod b/go.mod index cad286c74..2516dd809 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( github.com/gavv/httpexpect v0.0.0-20170820080527-c44a6d7bb636 github.com/gavv/monotime v0.0.0-20190418164738-30dba4353424 // indirect github.com/gbrlsnchs/jwt v0.5.0 + github.com/go-redis/redis v6.15.9+incompatible github.com/gobwas/glob v0.2.3 github.com/gofrs/uuid v3.1.0+incompatible github.com/golang-migrate/migrate v3.5.4+incompatible @@ -43,7 +44,7 @@ require ( github.com/morikuni/aec v1.0.0 // indirect github.com/moul/http2curl v1.0.0 // indirect github.com/onrik/logrus v0.4.1 - github.com/onsi/ginkgo v1.16.4 + github.com/onsi/ginkgo v1.16.5 github.com/onsi/gomega v1.19.0 github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.0.1 // indirect diff --git a/go.sum b/go.sum index f4a9ba9f2..3cdca930f 100644 --- a/go.sum +++ b/go.sum @@ -120,6 +120,8 @@ github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2 github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= +github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg= +github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= @@ -295,8 +297,9 @@ github.com/onrik/logrus v0.4.1/go.mod h1:qfe9NeZVAJfIxviw3cYkZo3kvBtLoPRJriAO8zl github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= -github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc= github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= github.com/onsi/ginkgo/v2 v2.1.3 h1:e/3Cwtogj0HA+25nMP1jCMDIf8RtRYbGwGGuBIFztkc= github.com/onsi/ginkgo/v2 v2.1.3/go.mod h1:vw5CSIxN1JObi/U8gcbwft7ZxR2dgaR70JSE3/PpL4c= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= diff --git a/pkg/env/cf.go b/pkg/env/cf.go index c9e462cee..db4a334b9 100644 --- a/pkg/env/cf.go +++ b/pkg/env/cf.go @@ -51,10 +51,35 @@ func setCFOverrides(env Environment) error { return err } + cacheEnabled := cast.ToBool(env.Get("cache.enabled")) + if cacheEnabled { + if err := useRedisBinding(env, cfEnv); err != nil { + return err + } + } } return nil } +func useRedisBinding(env Environment, cfEnv *cfenv.App) error { + redisServiceName := cast.ToString(env.Get("cache.name")) + if redisServiceName == "" { + log.D().Warning("No Redis service name found") + return nil + } + service, err := cfEnv.Services.WithName(redisServiceName) + if err != nil { + errorMessage := fmt.Sprintf("could not find service with name %s: %v", redisServiceName, err) + log.D().Error(errorMessage) + return fmt.Errorf(errorMessage) + } + + env.Set("cache.hostname", service.Credentials["hostname"]) + env.Set("cache.password", service.Credentials["password"]) + env.Set("cache.port", service.Credentials["port"]) + return nil +} + func setPostgresSSL(env Environment, credentials map[string]interface{}) error { if sslRootCert, hasRootCert := credentials["sslrootcert"]; hasRootCert { filename := "./root.crt" diff --git a/pkg/env/cf_test.go b/pkg/env/cf_test.go index 011ff6364..a0fa0f3a0 100644 --- a/pkg/env/cf_test.go +++ b/pkg/env/cf_test.go @@ -49,6 +49,31 @@ const VCAP_SERVICES_VALUE = `{ "postgresql": [ ], "volume_mounts": [] } + ], + "redis-cache": [ + { + "binding_guid": "8ed389ef-735c-4b6e-a85b-6fbe5fb5bbb1", + "binding_name": null, + "credentials": { + "cluster_mode": false, + "hostname": "localhost", + "password": "1234", + "port": 3283, + "tls": true, + "uri": "rediss://no-user-name-for-redis:1234@localhost:3283" + }, + "instance_guid": "b78e9e08-b7fd-41db-beab-8cd510d53889", + "instance_name": "redis-test-standard-2", + "label": "redis-cache", + "name": "redis-test", + "plan": "standard", + "provider": null, + "syslog_drain_url": null, + "tags": [ + "cache" + ], + "volume_mounts": [] + } ] }` @@ -64,6 +89,12 @@ var _ = Describe("CF Env", func() { Expect(os.Setenv("STORAGE_NAME", "smdb")).ShouldNot(HaveOccurred()) Expect(os.Unsetenv("STORAGE_URI")).ShouldNot(HaveOccurred()) + Expect(os.Setenv("CACHE_NAME", "redis-test")).ShouldNot(HaveOccurred()) + Expect(os.Setenv("CACHE_ENABLED", "true")).ShouldNot(HaveOccurred()) + Expect(os.Setenv("CACHE_PORT", "6666")).ShouldNot(HaveOccurred()) + Expect(os.Setenv("CACHE_HOST", "localhost")).ShouldNot(HaveOccurred()) + Expect(os.Setenv("CACHE_PASSWORD", "1234")).ShouldNot(HaveOccurred()) + environment, err = New(context.TODO(), EmptyFlagSet()) Expect(err).ShouldNot(HaveOccurred()) }) @@ -72,6 +103,10 @@ var _ = Describe("CF Env", func() { Expect(os.Unsetenv("VCAP_APPLICATION")).ShouldNot(HaveOccurred()) Expect(os.Unsetenv("VCAP_SERVICES")).ShouldNot(HaveOccurred()) Expect(os.Unsetenv("STORAGE_NAME")).ShouldNot(HaveOccurred()) + Expect(os.Unsetenv("CACHE_ENABLED")).ShouldNot(HaveOccurred()) + Expect(os.Unsetenv("CACHE_PORT")).ShouldNot(HaveOccurred()) + Expect(os.Unsetenv("CACHE_HOST")).ShouldNot(HaveOccurred()) + Expect(os.Unsetenv("CACHE_PASSWORD")).ShouldNot(HaveOccurred()) }) Describe("Set CF environment values", func() { @@ -95,6 +130,20 @@ var _ = Describe("CF Env", func() { }) }) + Context("when cache.enabled is missing from environment", func() { + It("returns no error", func() { + Expect(os.Unsetenv("CACHE_ENABLED")).ShouldNot(HaveOccurred()) + Expect(setCFOverrides(environment)).ShouldNot(HaveOccurred()) + Expect(environment.Get("cache.enabled")).Should(BeNil()) + }) + }) + Context("when cache.enabled is true", func() { + It("shouldn't return error if cache name is missing", func() { + Expect(os.Unsetenv("CACHE_NAME")).ShouldNot(HaveOccurred()) + Expect(setCFOverrides(environment)).ShouldNot(HaveOccurred()) + Expect(environment.Get("cache.name")).Should(BeNil()) + }) + }) Context("when storage with name storage.name is missing from VCAP_SERVICES", func() { It("returns error", func() { diff --git a/pkg/sm/sm.go b/pkg/sm/sm.go index fd5e0c3f2..4341b80d4 100644 --- a/pkg/sm/sm.go +++ b/pkg/sm/sm.go @@ -18,12 +18,15 @@ package sm import ( "context" + "crypto/tls" "database/sql" "errors" "fmt" secFilters "github.com/Peripli/service-manager/pkg/security/filters" + osbc "github.com/kubernetes-sigs/go-open-service-broker-client/v2" "math" "net/http" + "strconv" "sync" "time" @@ -58,14 +61,14 @@ import ( _ "github.com/Kount/pq-timeouts" "github.com/Peripli/service-manager/api/filters" "github.com/Peripli/service-manager/pkg/web" - osbc "github.com/kubernetes-sigs/go-open-service-broker-client/v2" + "github.com/go-redis/redis" ) // ServiceManagerBuilder type is an extension point that allows adding additional filters, plugins and // controllers before running ServiceManager. type ServiceManagerBuilder struct { *web.API - + RedisClient *redis.Client Storage *storage.InterceptableTransactionalRepository Notificator storage.Notificator NotificationCleaner *storage.NotificationCleaner @@ -106,6 +109,23 @@ func New(ctx context.Context, cancel context.CancelFunc, e env.Environment, cfg util.HandleInterrupts(ctx, cancel) + // Setup cache + var redisClient *redis.Client + if cfg.Cache.Enabled { + var tlsConfig *tls.Config + if cfg.Cache.TLSEnabled { + tlsConfig = &tls.Config{MinVersion: tls.VersionTLS12} + } + redisClient = redis.NewClient(&redis.Options{ + Addr: cfg.Cache.Host + ":" + strconv.Itoa(cfg.Cache.Port), + Password: cfg.Cache.Password, + MaxRetries: cfg.Cache.MaxRetries, + //MinRetryBackoff: minRetryBackoff, + //MaxRetryBackoff: maxRetryBackoff, + TLSConfig: tlsConfig, + }) + } + // Setup storage log.C(ctx).Info("Setting up Service Manager storage...") smStorage := &postgres.Storage{ @@ -137,6 +157,7 @@ func New(ctx context.Context, cancel context.CancelFunc, e env.Environment, cfg } apiOptions := &api.Options{ + RedisClient: redisClient, Repository: interceptableRepository, APISettings: cfg.API, OperationSettings: cfg.Operations, @@ -208,6 +229,7 @@ func New(ctx context.Context, cancel context.CancelFunc, e env.Environment, cfg OSBClientProvider: osbClientProvider, encryptingRepository: encryptingRepository, APIOptions: apiOptions, + RedisClient: redisClient, } smb.RegisterPlugins(osb.NewCatalogFilterByVisibilityPlugin(interceptableRepository)) diff --git a/storage/cache/config.go b/storage/cache/config.go new file mode 100644 index 000000000..137c00f70 --- /dev/null +++ b/storage/cache/config.go @@ -0,0 +1,20 @@ +package cache + +// Settings type to be loaded from the environment +type Settings struct { + Enabled bool `mapstructure:"enabled" description:"true if cache is enabled"` + Port int `mapstructure:"port" description:"port for redis-cache"` + Host string `mapstructure:"host" description:"host for redis-cache"` + Password string `mapstructure:"password" description:"password for redis-cache"` + TLSEnabled bool `mapstructure:"tls_enabled" description:"true if tls is enabled"` + MaxRetries int `mapstructure:"max_retries" description:"maximum number of retries before giving up"` +} + +// DefaultSettings returns default values for cache settings +func DefaultSettings() *Settings { + return &Settings{ + Enabled: false, + TLSEnabled: true, + MaxRetries: 0, + } +} diff --git a/test/common/application.yml b/test/common/application.yml index 69044761b..77d43adf9 100644 --- a/test/common/application.yml +++ b/test/common/application.yml @@ -15,6 +15,11 @@ websocket: log: level: info format: text +cache: + port: 6379 + host: localhost + password: "" + tls_enabled: false storage: uri: postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable encryption_key: ejHjRNHbS0NaqARSRvnweVV9zcmhQEa8 diff --git a/test/common/test_context.go b/test/common/test_context.go index 5fa08cd46..aff0b07dc 100644 --- a/test/common/test_context.go +++ b/test/common/test_context.go @@ -21,6 +21,7 @@ import ( "encoding/base64" "flag" "fmt" + "github.com/go-redis/redis" "math/rand" "net" "net/http" @@ -223,6 +224,7 @@ type TestContext struct { // In the end requesting brokers with this SMWithOAuthForTenant *SMExpect SMWithBasic *SMExpect + RedisClient *redis.Client SMRepository storage.TransactionalRepository SMScheduler *operations.Scheduler TestPlatform *types.Platform @@ -486,7 +488,7 @@ func (tcb *TestContextBuilder) BuildWithListener(listener net.Listener, cleanup } wg := &sync.WaitGroup{} - smServer, smRepository, smScheduler, maintainer, smConfig := newSMServer(environment, wg, tcb.smExtensions, listener) + smServer, smRepository, smRedis, smScheduler, maintainer, smConfig := newSMServer(environment, wg, tcb.smExtensions, listener) tcb.Servers[SMServer] = smServer SM := httpexpect.New(ginkgo.GinkgoT(), smServer.URL()) @@ -510,6 +512,7 @@ func (tcb *TestContextBuilder) BuildWithListener(listener net.Listener, cleanup SMWithOAuth: &SMExpect{Expect: SMWithOAuth}, SMWithOAuthForTenant: &SMExpect{Expect: SMWithOAuthForTenant}, Servers: tcb.Servers, + RedisClient: smRedis, SMRepository: smRepository, SMScheduler: smScheduler, HttpClient: tcb.HttpClient, @@ -596,7 +599,7 @@ func NewSMListener() (net.Listener, error) { return nil, fmt.Errorf("unable to create sm listener: %s", err) } -func newSMServer(smEnv env.Environment, wg *sync.WaitGroup, fs []func(ctx context.Context, smb *sm.ServiceManagerBuilder, env env.Environment) error, listener net.Listener) (*testSMServer, storage.TransactionalRepository, *operations.Scheduler, *operations.Maintainer, *config.Settings) { +func newSMServer(smEnv env.Environment, wg *sync.WaitGroup, fs []func(ctx context.Context, smb *sm.ServiceManagerBuilder, env env.Environment) error, listener net.Listener) (*testSMServer, storage.TransactionalRepository, *redis.Client, *operations.Scheduler, *operations.Maintainer, *config.Settings) { ctx, cancel := context.WithCancel(context.Background()) cfg, err := config.New(smEnv) @@ -635,7 +638,7 @@ func newSMServer(smEnv env.Environment, wg *sync.WaitGroup, fs []func(ctx contex return &testSMServer{ cancel: cancel, Server: testServer, - }, smb.Storage, scheduler, smb.OperationMaintainer, cfg + }, smb.Storage, smb.RedisClient, scheduler, smb.OperationMaintainer, cfg } func (ctx *TestContext) RegisterBrokerWithCatalogAndLabels(catalog SBCatalog, brokerData Object, expectedStatus int) *BrokerUtils { @@ -871,6 +874,9 @@ func (ctx *TestContext) CleanupAdditionalResources() { ctx.SMWithOAuth.DELETE(web.ServiceBrokersURL).Expect() ctx.CleanupPlatforms() + if ctx.RedisClient != nil { + ctx.ResetRateLimiter(ctx.RedisClient) + } serversToDelete := make([]string, 0) for serverName, server := range ctx.Servers { if serverName != SMServer && serverName != OauthServer && serverName != TenantOauthServer { @@ -930,3 +936,7 @@ func (ctx *TestContext) CloseWebSocket(conn *websocket.Conn) { } } } + +func (ctx *TestContext) ResetRateLimiter(client *redis.Client) { + client.FlushDB() +} diff --git a/test/rate_limiter_test/rate_limiter_test.go b/test/rate_limiter_test/rate_limiter_test.go index 18862e7f8..6a410f2d7 100644 --- a/test/rate_limiter_test/rate_limiter_test.go +++ b/test/rate_limiter_test/rate_limiter_test.go @@ -67,11 +67,12 @@ var _ = Describe("Service Manager Rate Limiter", func() { filterContext.UserName = userName return userName } - var newRateLimiterEnv = func(limit string, customizer func(set *pflag.FlagSet)) { - ctx = common.NewTestContextBuilderWithSecurity().WithEnvPreExtensions(func(set *pflag.FlagSet) { + var newRateLimiterEnv = func(limit string, redisEnabled bool, customizer func(set *pflag.FlagSet)) *common.TestContext { + return common.NewTestContextBuilderWithSecurity().WithEnvPreExtensions(func(set *pflag.FlagSet) { Expect(set.Set("api.rate_limit", limit)).ToNot(HaveOccurred()) Expect(set.Set("api.rate_limiting_enabled", "true")).ToNot(HaveOccurred()) Expect(set.Set("api.rate_limit_exclude_paths", web.OperationsURL)).ToNot(HaveOccurred()) + Expect(set.Set("cache.enabled", strconv.FormatBool(redisEnabled))).ToNot(HaveOccurred()) if customizer != nil { customizer(set) } @@ -111,49 +112,21 @@ var _ = Describe("Service Manager Rate Limiter", func() { osbURL = "/v1/osb/" + brokerID } - BeforeEach(func() { - newRateLimiterEnv("20-M", nil) - registerBroker() - }) - - AfterEach(func() { - ctx.Cleanup() - filterContext.UserName = "" - }) - - Describe("rate limiter", func() { - - Context("request is authorized", func() { - - When("basic auth (global Platform)", func() { - It("doesn't limit basic auth requests", func() { - bulkRequest(ctx.SMWithBasic, osbURL+"/v2/catalog", 21) - }) - }) - - When("endpoint is public", func() { - BeforeEach(func() { - bulkRequest(ctx.SMWithOAuth, "/v1/info", 21) - }) - It("doesn't limit public endpoints", func() { - expectNonLimitedRequest(ctx.SMWithOAuth, "/v1/info") - }) - }) - - When("endpoint is excluded", func() { - BeforeEach(func() { - bulkRequest(ctx.SMWithOAuth, web.OperationsURL, 20) - }) - It("doesn't limit excluded paths", func() { - expectNonLimitedRequest(ctx.SMWithOAuth, web.OperationsURL) - }) - }) + redisEnabledValues := []bool{true, false} + for i := range redisEnabledValues { + Describe("rate limiter", func() { + redisEnabled := redisEnabledValues[i] When("doing too many requests", func() { BeforeEach(func() { + ctx = newRateLimiterEnv("20-M", redisEnabled, nil) changeClientIdentifier() bulkRequest(ctx.SMWithOAuth, web.ServiceBrokersURL, 20) }) + AfterEach(func() { + ctx.Cleanup() + filterContext.UserName = "" + }) It("does limit", func() { expectLimitedRequest(ctx.SMWithOAuth, web.ServiceBrokersURL) }) @@ -163,22 +136,9 @@ var _ = Describe("Service Manager Rate Limiter", func() { }) }) - When("exclude client configured", func() { - BeforeEach(func() { - client := changeClientIdentifier() - newRateLimiterEnv("2-M,2-M", func(set *pflag.FlagSet) { - Expect(set.Set("api.rate_limit_exclude_clients", client)).ToNot(HaveOccurred()) - }) - bulkRequest(ctx.SMWithOAuth, web.PlatformsURL, 2) - }) - It("doesn't limit", func() { - expectNonLimitedRequest(ctx.SMWithOAuth, web.PlatformsURL) - }) - }) - When("two global limiters configured", func() { BeforeEach(func() { - newRateLimiterEnv("2-S,3-M", nil) + ctx = newRateLimiterEnv("2-S,3-M", redisEnabled, nil) changeClientIdentifier() // Exhaust seconds limiter, minute limiter drop from 3 to 1 bulkRequest(ctx.SMWithOAuth, web.ServiceBrokersURL, 2) @@ -188,6 +148,9 @@ var _ = Describe("Service Manager Rate Limiter", func() { // Expecting second limiter will reset, but minute should be still exhausted time.Sleep(3 * time.Second) }) + AfterEach(func() { + ctx.Cleanup() + }) It("does limit using secondary limiter", func() { expectLimitedRequest(ctx.SMWithOAuth, web.ServiceBrokersURL) }) @@ -200,10 +163,13 @@ var _ = Describe("Service Manager Rate Limiter", func() { When("limiter for global and for specific path configured", func() { BeforeEach(func() { - newRateLimiterEnv("10-M,5-M:"+web.PlatformsURL, nil) + ctx = newRateLimiterEnv("10-M,5-M:"+web.PlatformsURL, redisEnabled, nil) changeClientIdentifier() bulkRequest(ctx.SMWithOAuth, web.PlatformsURL, 5) }) + AfterEach(func() { + ctx.Cleanup() + }) It("apply request limit on specific path", func() { expectNonLimitedRequest(ctx.SMWithOAuth, web.ServiceBrokersURL) expectLimitedRequest(ctx.SMWithOAuth, web.PlatformsURL) @@ -212,10 +178,13 @@ var _ = Describe("Service Manager Rate Limiter", func() { When("limiter for specific path configured", func() { BeforeEach(func() { - newRateLimiterEnv("5-M:"+web.PlatformsURL, nil) + ctx = newRateLimiterEnv("5-M:"+web.PlatformsURL, redisEnabled, nil) changeClientIdentifier() bulkRequest(ctx.SMWithOAuth, web.PlatformsURL, 5) }) + AfterEach(func() { + ctx.Cleanup() + }) It("apply request limit on specific path", func() { expectLimitedRequest(ctx.SMWithOAuth, web.PlatformsURL) }) @@ -230,7 +199,7 @@ var _ = Describe("Service Manager Rate Limiter", func() { When("limiter for /v1/service_instances path and POST method is configured", func() { var servicePlanID string BeforeEach(func() { - newRateLimiterEnv("6-M:"+web.ServiceInstancesURL+",5-M:"+web.ServiceInstancesURL+":POST", nil) + ctx = newRateLimiterEnv("6-M:"+web.ServiceInstancesURL+",5-M:"+web.ServiceInstancesURL+":POST", redisEnabled, nil) registerBroker() servicePlanID = ctx.SMWithOAuth.List(web.ServicePlansURL).Element(0).Object().Value("id").String().Raw() changeClientIdentifier() @@ -247,6 +216,9 @@ var _ = Describe("Service Manager Rate Limiter", func() { Status(http.StatusCreated) } }) + AfterEach(func() { + ctx.Cleanup() + }) It("apply request limit on /v1/service_instances path and POST method", func() { requestBody := common.Object{ "name": "test-instance", @@ -267,11 +239,15 @@ var _ = Describe("Service Manager Rate Limiter", func() { When("limiter for multiple paths configured", func() { BeforeEach(func() { - newRateLimiterEnv("10-M,5-M:"+web.PlatformsURL+",2-M:"+web.ServicePlansURL, nil) + ctx = newRateLimiterEnv("10-M,5-M:"+web.PlatformsURL+",2-M:"+web.ServicePlansURL, redisEnabled, nil) changeClientIdentifier() // Counters: 10 (global), 5 (platforms), 2 (plans) bulkRequest(ctx.SMWithOAuth, web.PlatformsURL, 5) // 5,0,2 }) + AfterEach(func() { + changeClientIdentifier() + ctx.Cleanup() + }) It("apply request limit", func() { expectNonLimitedRequest(ctx.SMWithOAuth, web.ServiceBrokersURL) // 4,0,2 expectLimitedRequest(ctx.SMWithOAuth, web.PlatformsURL) // 3,0,2 @@ -284,7 +260,7 @@ var _ = Describe("Service Manager Rate Limiter", func() { When("limiter configured with global rate and multiple for specific path", func() { BeforeEach(func() { - newRateLimiterEnv("100-M,2-S:"+web.PlatformsURL+",4-M:"+web.PlatformsURL, nil) + ctx = newRateLimiterEnv("100-M,2-S:"+web.PlatformsURL+",4-M:"+web.PlatformsURL, redisEnabled, nil) changeClientIdentifier() // Exhaust seconds limiter, minute limiter drop from 4 to 2 bulkRequest(ctx.SMWithOAuth, web.PlatformsURL, 2) @@ -294,6 +270,9 @@ var _ = Describe("Service Manager Rate Limiter", func() { // Expecting second limiter will reset, but minute should be still exhausted time.Sleep(3 * time.Second) }) + AfterEach(func() { + ctx.Cleanup() + }) It("apply request limit using secondary limiter", func() { expectLimitedRequest(ctx.SMWithOAuth, web.PlatformsURL) }) @@ -303,6 +282,42 @@ var _ = Describe("Service Manager Rate Limiter", func() { expectNonLimitedRequest(ctx.SMWithOAuth, web.PlatformsURL) }) }) + + Context("request is authorized", func() { + BeforeEach(func() { + ctx = newRateLimiterEnv("20-M", redisEnabled, nil) + registerBroker() + changeClientIdentifier() + bulkRequest(ctx.SMWithOAuth, web.ServiceBrokersURL, 20) + }) + AfterEach(func() { + ctx.Cleanup() + filterContext.UserName = "" + }) + When("basic auth (global Platform)", func() { + It("doesn't limit basic auth requests", func() { + bulkRequest(ctx.SMWithBasic, osbURL+"/v2/catalog", 21) + }) + }) + + When("endpoint is public", func() { + BeforeEach(func() { + bulkRequest(ctx.SMWithOAuth, "/v1/info", 21) + }) + It("doesn't limit public endpoints", func() { + expectNonLimitedRequest(ctx.SMWithOAuth, "/v1/info") + }) + }) + + When("endpoint is excluded", func() { + BeforeEach(func() { + bulkRequest(ctx.SMWithOAuth, web.OperationsURL, 20) + }) + It("doesn't limit excluded paths", func() { + expectNonLimitedRequest(ctx.SMWithOAuth, web.OperationsURL) + }) + }) + }) }) - }) + } })