Skip to content

Commit

Permalink
Add cache configuration per endpoint in fallback cache (#443)
Browse files Browse the repository at this point in the history
* Add cache configuration per endpoint in fallback cache

1. We are using existing 'methodName' context already being set and used in EPG
2. Create initialization function to be called as contructor options
3. New configuration defaults to NO-OP if config is not set or empty else is enforced

* Addressed comments

* TEst

* Add context key setters and getters

* Move this to 3.4.26 release

* Addressed comments

* derp
  • Loading branch information
ViddyCat authored May 29, 2020
1 parent 02cd50b commit fc58226
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 15 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Changelog

## v3.4.26 (unreleased)
- Nothing changed yet.
- Add cache configuration per endpoint in fallback cache

## v3.4.25 (2020-05-05)
- Add the RateLimit error.
Expand Down
73 changes: 59 additions & 14 deletions connectors/cache/fallback.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,37 @@ func WithSkipWriteInvalidateEntities(entities ...dosa.DomainObject) Options {
}
}

// contextKey used by SetCacheableEndpoints to set endpoint names
type contextKey string

var (
// contextEndpoint allows users to pass in calling endpoint name
contextEndpoint contextKey = "endpoint"
// endpointActiveStatus marks the endpoint as active
endpointActiveStatus bool = true
)

// SetContextEndpoint set endpoint in context
func SetContextEndpoint(ctx context.Context, endpoint string) context.Context {
return context.WithValue(ctx, contextEndpoint, endpoint)
}

// GetContextEndpoint get endpoint from context
func GetContextEndpoint(ctx context.Context) string {
endpoint, _ := ctx.Value(contextEndpoint).(string)
return endpoint
}

// SetCacheableEndpoints sets cacheable endpoints
func SetCacheableEndpoints(endpoints ...string) Options {
return func(c *Connector) error {
for _, endpoint := range endpoints {
c.cacheableEndpointStatus[endpoint] = endpointActiveStatus
}
return nil
}
}

// NewConnector creates a fallback cache connector
func NewConnector(origin, fallback dosa.Connector, scope metrics.Scope, entities []dosa.DomainObject, options ...Options) *Connector {
c := newConnector(origin, fallback, scope, encoding.NewGobEncoder(), entities)
Expand All @@ -76,12 +107,14 @@ func NewConnector(origin, fallback dosa.Connector, scope metrics.Scope, entities
func newConnector(origin, fallback dosa.Connector, scope metrics.Scope, encoder encoding.Encoder, entities []dosa.DomainObject) *Connector {
bc := base.Connector{Next: origin}
set := createCachedEntitiesSet(entities)
cacheableEndpointStatus := make(map[string]bool)
return &Connector{
Connector: bc,
fallback: fallback,
encoder: encoder,
cacheableEntities: set,
stats: scope,
Connector: bc,
fallback: fallback,
encoder: encoder,
cacheableEntities: set,
cacheableEndpointStatus: cacheableEndpointStatus,
stats: scope,
}
}

Expand All @@ -91,6 +124,7 @@ type Connector struct {
fallback dosa.Connector
encoder encoding.Encoder
cacheableEntities map[string]bool
cacheableEndpointStatus map[string]bool
skipWriteInvalidateEntitiesMap map[string]bool
mux sync.Mutex
stats metrics.Scope
Expand All @@ -100,7 +134,7 @@ type Connector struct {

// Upsert removes (invalidates) the entry from the fallback if the entity is not in the skipWriteInvalidateEntitiesMap
func (c *Connector) Upsert(ctx context.Context, ei *dosa.EntityInfo, values map[string]dosa.FieldValue) error {
if c.isCacheable(ei) {
if c.isCacheable(ctx, ei) {
w := func() error {
return c.removeValueFromFallback(ctx, ei, createCacheKey(ei, values))
}
Expand All @@ -117,7 +151,7 @@ func (c *Connector) Read(ctx context.Context, ei *dosa.EntityInfo, keys map[stri
populateValuesWithKeys(keys, source)
}
// If we are not caching for this entity, just return
if !c.isCacheable(ei) {
if !c.isCacheable(ctx, ei) {
return source, sourceErr
}

Expand Down Expand Up @@ -169,7 +203,7 @@ func (c *Connector) write(ctx context.Context, ei *dosa.EntityInfo, keys map[str
// Range returns range from origin, reverts to fallback if origin fails
func (c *Connector) Range(ctx context.Context, ei *dosa.EntityInfo, columnConditions map[string][]*dosa.Condition, minimumFields []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) {
sourceRows, sourceToken, sourceErr := c.Next.Range(ctx, ei, columnConditions, dosa.All(), token, limit)
if !c.isCacheable(ei) || dosa.ErrorIsNotFound(sourceErr) {
if !c.isCacheable(ctx, ei) || dosa.ErrorIsNotFound(sourceErr) {
return sourceRows, sourceToken, sourceErr
}
cacheKey := rangeQuery{
Expand Down Expand Up @@ -230,7 +264,7 @@ func (c *Connector) MultiRead(ctx context.Context, ei *dosa.EntityInfo, keys []m
}
}

if dosa.ErrorIsNotFound(sourceErr) || !c.isCacheable(ei) {
if dosa.ErrorIsNotFound(sourceErr) || !c.isCacheable(ctx, ei) {
return source, sourceErr
}

Expand Down Expand Up @@ -289,7 +323,7 @@ func (c *Connector) MultiRead(ctx context.Context, ei *dosa.EntityInfo, keys []m

// Remove deletes an entry
func (c *Connector) Remove(ctx context.Context, ei *dosa.EntityInfo, keys map[string]dosa.FieldValue) error {
if c.isCacheable(ei) {
if c.isCacheable(ctx, ei) {
w := func() error {
return c.removeValueFromFallback(ctx, ei, createCacheKey(ei, keys))
}
Expand All @@ -301,7 +335,7 @@ func (c *Connector) Remove(ctx context.Context, ei *dosa.EntityInfo, keys map[st

// MultiUpsert deletes the entries getting upserted from the fallback if the entity is not in the skipWriteInvalidateEntitiesMap
func (c *Connector) MultiUpsert(ctx context.Context, ei *dosa.EntityInfo, multiValues []map[string]dosa.FieldValue) (result []error, err error) {
if c.isCacheable(ei) {
if c.isCacheable(ctx, ei) {
w := func() error {
for _, values := range multiValues {
_ = c.removeValueFromFallback(ctx, ei, createCacheKey(ei, values))
Expand All @@ -315,7 +349,7 @@ func (c *Connector) MultiUpsert(ctx context.Context, ei *dosa.EntityInfo, multiV

// MultiRemove deletes multiple entries from the fallback
func (c *Connector) MultiRemove(ctx context.Context, ei *dosa.EntityInfo, multiKeys []map[string]dosa.FieldValue) (result []error, err error) {
if c.isCacheable(ei) {
if c.isCacheable(ctx, ei) {
w := func() error {
for _, keys := range multiKeys {
_ = c.removeValueFromFallback(ctx, ei, createCacheKey(ei, keys))
Expand Down Expand Up @@ -413,8 +447,19 @@ func (c *Connector) shouldSkipInvalidateCacheOnWrite(ei *dosa.EntityInfo) bool {
return c.skipWriteInvalidateEntitiesMap[ei.Def.Name]
}

func (c *Connector) isCacheable(ei *dosa.EntityInfo) bool {
return c.cacheableEntities[ei.Def.Name]
func (c *Connector) isCacheable(ctx context.Context, ei *dosa.EntityInfo) bool {
return c.cacheableEntities[ei.Def.Name] && c.isEndpointCacheable(ctx)
}

func (c *Connector) isEndpointCacheable(ctx context.Context) bool {
// Cacheable endpoints not set via. SetCacheableEndpoints
// return true to default behaviour
if len(c.cacheableEndpointStatus) == 0 {
return true
}

endpoint := GetContextEndpoint(ctx)
return c.cacheableEndpointStatus[endpoint]
}

func createCacheMapFromEntites(entities []dosa.DomainObject) map[string]bool {
Expand Down
17 changes: 17 additions & 0 deletions connectors/cache/fallback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1136,6 +1136,23 @@ func TestCacheableEntities(t *testing.T) {
assert.Len(t, set, 1)
}

// Test setting cacheable endpoints
func TestCacheableEndpoints(t *testing.T) {
c := NewConnector(memory.NewConnector(), memory.NewConnector(), nil, nil)
assert.Len(t, c.cacheableEndpointStatus, 0)

endpoints := []string{"getEaterPromotions", "getPromotionsForStores"}
c = NewConnector(memory.NewConnector(), memory.NewConnector(), nil, nil, SetCacheableEndpoints(endpoints...))
assert.Len(t, c.cacheableEndpointStatus, 2)

// Test context key setters and getters
for _, endpoint := range endpoints {
ctx := context.Background()
ctx = SetContextEndpoint(ctx, endpoint)
assert.Equal(t, GetContextEndpoint(ctx), endpoint)
}
}

func TestWriteKeyValueToFallback(t *testing.T) {
connector := NewConnector(memory.NewConnector(), memory.NewConnector(), nil, nil)
err := connector.writeKeyValueToFallback(context.TODO(), testEi, "a", nil)
Expand Down

0 comments on commit fc58226

Please sign in to comment.