-
Notifications
You must be signed in to change notification settings - Fork 29
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Notification filtering #278
Conversation
9dc8c8e
to
b4c7594
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is very confusing implementation that doesn't make sense without a lot of additional comments. I'd try a different approach to filtering because this implementation will be unmaintainable in a month. Maybe inverse the logic.
@@ -58,9 +48,18 @@ func (c *Controller) handleWS(req *web.Request) (*web.Response, error) { | |||
return util.NewJSONResponse(http.StatusGone, nil) | |||
} | |||
|
|||
notificationQueue, lastKnownRevision, notificationsList, err := c.registerConsumer(ctx, revisionKnownToProxy, user) | |||
user, ok := web.UserFromContext(req.Context()) | |||
if !ok { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why move the user check down here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is not used above. I have moved it to the place where it is needed for a first time.
@@ -100,6 +100,9 @@ var ( | |||
|
|||
// ErrConcurrentResourceModification error returned when concurrent resource updates are happening | |||
ErrConcurrentResourceModification = errors.New("another resource update happened concurrently. Please reattempt the update") | |||
|
|||
// ErrInvalidNotificationRevision provided notification revision is not valid, must return http status GONE | |||
ErrInvalidNotificationRevision = errors.New("notification revision is not valid") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor - I'd rather remove all these util.Err*
because they are not util errors. One option is to move all errors and their handling in an errors package or move every error in its respective package (e.g. storage.ErrConcurrentModification
, notifications.ErrInvalidRevision
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have created an issue for that - #281 as it could be done in a different PR.
@@ -30,6 +36,10 @@ import ( | |||
type notificationStorage interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know that's not part of this PR, but this smells of bad design... a private interface for the Postgres storage only to make the tests work
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree. This interface will be removed when the storage supports sorting by a specific property and limit of returned results (in sql - order by and limit).
storage/interfaces.go
Outdated
} | ||
|
||
// NotificationFilterFunc filters recipients for a given notifications | ||
type NotificationFilterFunc func(recipients []*types.Platform, notification *types.Notification) (filteredRecipients []*types.Platform) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe ReceiversFilterFunc
storage/postgres/notificator.go
Outdated
} | ||
|
||
func (n *Notificator) replaceQueueWithMissingNotificationsQueue(queue storage.NotificationQueue, lastKnownRevision, lastKnownRevisionToSM int64, platform *types.Platform) (storage.NotificationQueue, error) { | ||
if lastKnownRevision > lastKnownRevisionToSM { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move this check outside as it's unrelated to this function
pkg/types/notification.go
Outdated
@@ -35,6 +35,9 @@ const ( | |||
|
|||
// DELETED represents a notification type for deleting a resource | |||
DELETED OperationType = "DELETED" | |||
|
|||
// INVALIDREVISION revision with invalid value | |||
INVALIDREVISION int64 = -1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
strange for this to be in the types package. I'd rather have the previous state
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is used in the storage/postgres and api/notifications. It will be strange for any of them to import the other. It cannot be in storage as well as postgres must not import storage. This seemed the most appropriate as InvalidRevision is some value of a property of notification similar to the operation types (CREATED, DELETED ...).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can it be InvalidNotificationRevision
then, because types.InvalidRevision
doesn't mean much
} | ||
|
||
if n.queueSize < len(filteredMissedNotification) { | ||
log.C(n.ctx).Debugf("too many missed notifications %d", len(filteredMissedNotification)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why? it should just queue them
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, but if it can't queue them (queues have limited size) the proxy will do a full resync.
storage/postgres/notificator.go
Outdated
return nil, err | ||
} | ||
filteredMissedNotification := make([]*types.Notification, 0, len(missedNotifications)) | ||
var recipients []*types.Platform |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove this row
storage/postgres/notificator.go
Outdated
filteredMissedNotification := make([]*types.Notification, 0, len(missedNotifications)) | ||
var recipients []*types.Platform | ||
for _, notification := range missedNotifications { | ||
recipients = n.filterRecipients([]*types.Platform{platform}, notification) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is very weird... it says "filter recepients" that uses "notifications filter" but filters platforms and after that this filtering doesn't use the returned result. The problem is that you're using the same function to do 2 different things.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hopefully with the rename of NotificationFilterFunc
to ReceiversFilterFunc
it might be more clear. The idea is to filter given recipients for a specific notification by returning the filtered recipients. This way the one who is writing the filter can process the notification once for all recipients. filterRecipients
function iterates over all registered filter functions and applies them to the initial recipients and given notification.
func (n *Notificator) filterRecipients(recipients []*types.Platform, notification *types.Notification) []*types.Platform { | ||
for _, filter := range n.notificationFilters { | ||
recipients = filter(recipients, notification) | ||
if len(recipients) == 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't understand why this works this way - filter returns an empty slice if any filter decides that no platform should receive it and the same list if all filters decide that at least one platform should receive it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an optimisation - if a filter in the chain of filters returns 0 recipients then it is not necessary to call the other filters as there are no platforms to filter. See also comment above.
@@ -24,22 +22,14 @@ const ( | |||
LastKnownRevisionHeader = "last_notification_revision" | |||
LastKnownRevisionQueryParam = "last_notification_revision" | |||
|
|||
unknownRevision int64 = -1 | |||
noRevision int64 = 0 | |||
noRevision int64 = 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Look like this constant is not needed.
If the revision is Unknown the flow is the same.
In the producer: https://github.com/Peripli/service-broker-proxy/blob/master/pkg/sbproxy/notifications/producer.go#L43
The value for InvalidRevision is returned when it is unknown.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SM sends 0 as last notification when there are no notifications in the database. This 0 is received by the proxy and if its connection drops it should resend it as last known revision. In this case SM sends 410 (see line 47 and 79). Idea to improve this is not to send the 0 at all (both by the SM and proxy). But this requires changes in the proxy as well. It should be done in a different PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -69,3 +79,38 @@ func (ns *notificationStorageImpl) GetNotification(ctx context.Context, id strin | |||
} | |||
return notificationObj.(*types.Notification), nil | |||
} | |||
|
|||
func (ns *notificationStorageImpl) ListNotifications(ctx context.Context, platformID string, from, to int64) ([]*types.Notification, error) { | |||
// TODO: replace with less than or equal operator when available |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid TODOs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They were from a previous PR. I will remove them.
return nil, err | ||
} | ||
notificationsList := objectList.(*types.Notifications) | ||
// TODO: Should be done in the database with order by |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The same - create a ticket/issues for the desired/missing functionalities and clean the TODOs.
storage/interfaces.go
Outdated
// When consumer wants to stop listening for notifications it must unregister the notification queue. | ||
RegisterConsumer(userContext *web.UserContext) (NotificationQueue, int64, error) | ||
RegisterConsumer(platform *types.Platform, lastKnownRevision int64) (NotificationQueue, int64, error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor - platform should be consumer
Notification filtering
Motivation
Some notifications might not be necessary for some platforms.
For example if a broker must not be registered in a specific platform then a filter can be introduced on /v1/service_brokers which filters the broker. But the creation of the broker will create a notification for all platforms. A way of filtering this notification must be introduced.
Approach
Introduce notification filters - functions which filter the recipients of a notification.
Pull Request status