forked from rusenask/casbin-go-cloud-watcher
-
Notifications
You must be signed in to change notification settings - Fork 0
/
watcher.go
393 lines (349 loc) · 12.1 KB
/
watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
/*
Package watcher provides an implementation of [persist.WatcherEx], supporting
various pub/sub systems by leveraging the gocloud [pubsub] package.
For more details about the pub/sub systems supported by gocloud, please refer to
https://gocloud.dev/howto/pubsub/.
*/
package watcher
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"runtime"
"sync"
"time"
"github.com/casbin/casbin/v2"
"github.com/casbin/casbin/v2/model"
"github.com/casbin/casbin/v2/persist"
"gocloud.dev/pubsub"
)
// Compile-time check that *Watcher satisfies persist.WatcherEx.
var _ persist.WatcherEx = (*Watcher)(nil)
// ErrNotConnected is returned when an update has to be published but the
// watcher has no open pubsub topic.
var ErrNotConnected = errors.New("pubsub not connected, cannot dispatch update message")
// Watcher is a Casbin policy watcher built on a gocloud pubsub topic and
// subscription. It publishes local policy changes to the topic and passes
// every message received on the subscription to the registered callback, so
// that nodes can keep their policies in sync.
type Watcher struct {
	// url is the pubsub URL; it is used to open both the topic and the
	// subscription.
	url string
	// callbackFunc is called with the body of each received message.
	// A nil value means no callback is registered.
	callbackFunc func(string)
	// connMu is the mutex for the pubsub connections; it is also held while
	// callbackFunc is read or replaced.
	connMu *sync.RWMutex
	// ctx is the context handed in at construction; it is reused for
	// sending messages.
	ctx context.Context
	// topic is the pubsub topic updates are published to.
	topic *pubsub.Topic
	// sub is the pubsub subscription updates are received from.
	sub *pubsub.Subscription
	// opt holds the watcher options.
	opt Option
}
// UpdateType names the kind of policy change carried by a MSG.
type UpdateType string

// The update types understood by the watcher. Each value is the string that
// travels in the "method" field of a serialized MSG.
const (
	Update                        = UpdateType("Update")
	UpdateForAddPolicy            = UpdateType("UpdateForAddPolicy")
	UpdateForRemovePolicy         = UpdateType("UpdateForRemovePolicy")
	UpdateForRemoveFilteredPolicy = UpdateType("UpdateForRemoveFilteredPolicy")
	UpdateForSavePolicy           = UpdateType("UpdateForSavePolicy")
	UpdateForAddPolicies          = UpdateType("UpdateForAddPolicies")
	UpdateForRemovePolicies       = UpdateType("UpdateForRemovePolicies")
	UpdateForUpdatePolicy         = UpdateType("UpdateForUpdatePolicy")
	UpdateForUpdatePolicies       = UpdateType("UpdateForUpdatePolicies")
)

// MSG is the payload of a pub/sub message. It states which kind of update
// happened and carries the details of the policy change. Optional fields are
// omitted from the JSON encoding when they hold their zero value.
type MSG struct {
	// Method is the type of update.
	Method UpdateType `json:"method"`
	// ID is the unique ID of the watcher instance that sent the message.
	ID string `json:"id"`
	// Sec is the section of the policy being updated.
	Sec string `json:"sec,omitempty"`
	// Ptype is the type of policy being updated.
	Ptype string `json:"ptype,omitempty"`
	// OldRules is the previous state of the policy rules.
	OldRules [][]string `json:"old_rules,omitempty"`
	// NewRules is the new state of the policy rules.
	NewRules [][]string `json:"new_rules,omitempty"`
	// FieldIndex is the index of the field being updated.
	FieldIndex int `json:"field_index,omitempty"`
	// FieldValues holds the values of the field being updated.
	FieldValues []string `json:"field_values,omitempty"`
}

// MarshalBinary implements encoding.BinaryMarshaler by encoding the message
// as JSON.
func (m MSG) MarshalBinary() ([]byte, error) {
	return json.Marshal(m)
}

// UnmarshalBinary implements encoding.BinaryUnmarshaler by decoding JSON data
// into the receiver.
func (m *MSG) UnmarshalBinary(data []byte) error {
	return json.Unmarshal(data, m)
}
// DefaultCallback returns the default callback function that the watcher will
// call when the policy in DB has been changed by other instances.
//
// The returned function decodes the message body as a MSG and applies the
// described change to the enforcer e. Decoding errors, unknown update types,
// messages that lack the rules their update type needs, and enforcer errors
// are logged; nothing is returned to the caller.
func DefaultCallback(e casbin.IEnforcer) func(string) {
	return func(s string) {
		m := &MSG{}
		if err := m.UnmarshalBinary([]byte(s)); err != nil {
			log.Printf("[go-cloud-watcher] failed to unmarshal msg: %v\n", err)
			return
		}

		res, err := applyMSG(e, m)
		if err != nil {
			log.Printf("[go-cloud-watcher] failed to update policy: %v\n", err)
		}
		if !res {
			log.Println("[go-cloud-watcher] callback update policy failed")
		}
	}
}

// applyMSG applies the policy change described by m to the enforcer e and
// reports the enforcer's result.
func applyMSG(e casbin.IEnforcer, m *MSG) (bool, error) {
	switch m.Method {
	case Update, UpdateForSavePolicy:
		return true, e.LoadPolicy()
	case UpdateForAddPolicy:
		rule, err := firstRule(m, m.NewRules, "new_rules")
		if err != nil {
			return false, err
		}
		return e.SelfAddPolicy(m.Sec, m.Ptype, rule)
	case UpdateForAddPolicies:
		return e.SelfAddPolicies(m.Sec, m.Ptype, m.NewRules)
	case UpdateForRemovePolicy:
		// The removed rule travels in NewRules, matching what
		// Watcher.UpdateForRemovePolicy sends.
		rule, err := firstRule(m, m.NewRules, "new_rules")
		if err != nil {
			return false, err
		}
		return e.SelfRemovePolicy(m.Sec, m.Ptype, rule)
	case UpdateForRemoveFilteredPolicy:
		return e.SelfRemoveFilteredPolicy(m.Sec, m.Ptype, m.FieldIndex, m.FieldValues...)
	case UpdateForRemovePolicies:
		return e.SelfRemovePolicies(m.Sec, m.Ptype, m.NewRules)
	case UpdateForUpdatePolicy:
		oldRule, err := firstRule(m, m.OldRules, "old_rules")
		if err != nil {
			return false, err
		}
		newRule, err := firstRule(m, m.NewRules, "new_rules")
		if err != nil {
			return false, err
		}
		return e.SelfUpdatePolicy(m.Sec, m.Ptype, oldRule, newRule)
	case UpdateForUpdatePolicies:
		return e.SelfUpdatePolicies(m.Sec, m.Ptype, m.OldRules, m.NewRules)
	default:
		return false, fmt.Errorf("unknown update type: %s", m.Method)
	}
}

// firstRule returns rules[0], or an error naming the missing field when the
// message carries no rule. Messages come from the network, so an empty list
// must not be indexed blindly.
func firstRule(m *MSG, rules [][]string, field string) ([]string, error) {
	if len(rules) == 0 {
		return nil, fmt.Errorf("%s message has no %s entry", m.Method, field)
	}
	return rules[0], nil
}
// New creates a watcher with the zero-value Option. It is shorthand for
// NewWithOption(ctx, url, Option{}).
//
// Parameters:
//   - ctx: the context for pubsub connections
//   - url: the pubsub url (e.g. "kafka://my-topic")
//
// Returns:
//   - Watcher: the new watcher instance
//   - error: the error if the watcher cannot be created
func New(ctx context.Context, url string) (*Watcher, error) {
	opt := Option{}
	return NewWithOption(ctx, url, opt)
}
// NewWithOption creates a watcher configured with opt and opens its pubsub
// connections. A runtime finalizer is registered on the watcher before the
// connections are opened.
//
// Parameters:
//   - ctx: the context for pubsub connections
//   - url: the pubsub url (e.g. "kafka://my-topic")
//   - opt: the watcher option
//
// Returns:
//   - Watcher: the new watcher instance; it is returned even when error is
//     non-nil
//   - error: the error if the connections cannot be opened
func NewWithOption(ctx context.Context, url string, opt Option) (*Watcher, error) {
	w := &Watcher{opt: opt, connMu: new(sync.RWMutex), url: url}
	runtime.SetFinalizer(w, finalizer)

	if err := w.initializeConnections(ctx); err != nil {
		return w, err
	}
	return w, nil
}
// SetUpdateCallback registers the function the watcher calls with the body of
// each message received from other instances, i.e. when the policy in DB has
// been changed elsewhere. A classic callback is Enforcer.LoadPolicy().
// The previous callback, if any, is replaced. It always returns nil.
func (w *Watcher) SetUpdateCallback(callbackFunc func(string)) error {
	w.connMu.Lock()
	defer w.connMu.Unlock()

	w.callbackFunc = callbackFunc
	return nil
}
// initializeConnections stores ctx on the watcher, opens the pubsub topic for
// w.url and then subscribes to updates. It holds connMu for writing for its
// whole duration. The topic is stored only when it was opened successfully.
func (w *Watcher) initializeConnections(ctx context.Context) error {
	w.connMu.Lock()
	defer w.connMu.Unlock()

	w.ctx = ctx

	opened, openErr := pubsub.OpenTopic(ctx, w.url)
	if openErr != nil {
		return openErr
	}
	w.topic = opened

	// The subscription is opened last; its error, if any, is the result.
	return w.subscribeToUpdates(ctx)
}
// subscribeToUpdates opens a subscription for w.url, stores it on the watcher
// and starts a goroutine that receives messages, hands each one to
// executeCallback and then acknowledges it.
//
// The method writes w.sub without locking; initializeConnections calls it
// while holding connMu.
//
// The receive goroutine ends on the first receive error. When ctx was
// cancelled the exit is silent; any other error is logged first.
func (w *Watcher) subscribeToUpdates(ctx context.Context) error {
	sub, err := pubsub.OpenSubscription(ctx, w.url)
	if err != nil {
		return fmt.Errorf("failed to open updates subscription, error: %w", err)
	}
	w.sub = sub

	go func() {
		for {
			msg, err := sub.Receive(ctx)
			if err != nil {
				// Cancellation is the expected way to stop; use errors.Is
				// rather than == to compare against the sentinel.
				if !errors.Is(ctx.Err(), context.Canceled) {
					log.Printf("Error while receiving an update message: %s\n", err)
				}
				return
			}
			w.executeCallback(msg)
			msg.Ack()
		}
	}()

	return nil
}
// executeCallback passes the body of msg, converted to a string, to the
// registered callback. The callback is read under the read lock and then run
// in its own goroutine, so this method does not wait for it to finish. It
// does nothing when no callback is registered.
func (w *Watcher) executeCallback(msg *pubsub.Message) {
	w.connMu.RLock()
	cb := w.callbackFunc
	w.connMu.RUnlock()

	if cb == nil {
		return
	}
	go cb(string(msg.Body))
}
// Update publishes a generic "Update" message so that other instances run
// their update callback and synchronize their policy. It is usually called
// after changing the policy in DB, like Enforcer.SavePolicy(),
// Enforcer.AddPolicy(), Enforcer.RemovePolicy(), etc.
//
// It returns ErrNotConnected when the watcher has no topic, otherwise the
// result of sending the message.
func (w *Watcher) Update() error {
	w.connMu.RLock()
	defer w.connMu.RUnlock()

	if w.topic == nil {
		return ErrNotConnected
	}

	msg := MSG{Method: Update, ID: w.GetLocalID()}
	return w.notifyMessage(&msg)
}
// Close stops and releases the watcher by running the same cleanup routine
// that is registered as its finalizer. After Close the callback function will
// not be called any more.
func (w *Watcher) Close() {
	finalizer(w)
}
// UpdateForAddPolicy publishes an "UpdateForAddPolicy" message so that other
// instances can synchronize their policy. It is called after a policy is
// added via Enforcer.AddPolicy(), Enforcer.AddNamedPolicy(),
// Enforcer.AddGroupingPolicy() and Enforcer.AddNamedGroupingPolicy().
//
// params is the added rule; it is sent as the single entry of NewRules.
func (w *Watcher) UpdateForAddPolicy(sec string, ptype string, params ...string) error {
	msg := MSG{
		Method:   UpdateForAddPolicy,
		ID:       w.GetLocalID(),
		Sec:      sec,
		Ptype:    ptype,
		NewRules: [][]string{params},
	}
	return w.notifyMessage(&msg)
}
// UpdateForRemovePolicy publishes an "UpdateForRemovePolicy" message so that
// other instances can synchronize their policy. It is called after a policy
// is removed by Enforcer.RemovePolicy(), Enforcer.RemoveNamedPolicy(),
// Enforcer.RemoveGroupingPolicy() and Enforcer.RemoveNamedGroupingPolicy().
//
// params is the removed rule; it is sent as the single entry of NewRules.
func (w *Watcher) UpdateForRemovePolicy(sec string, ptype string, params ...string) error {
	msg := MSG{
		Method:   UpdateForRemovePolicy,
		ID:       w.GetLocalID(),
		Sec:      sec,
		Ptype:    ptype,
		NewRules: [][]string{params},
	}
	return w.notifyMessage(&msg)
}
// UpdateForRemoveFilteredPolicy publishes an "UpdateForRemoveFilteredPolicy"
// message so that other instances can synchronize their policy. It is called
// after Enforcer.RemoveFilteredPolicy(), Enforcer.RemoveFilteredNamedPolicy(),
// Enforcer.RemoveFilteredGroupingPolicy() and
// Enforcer.RemoveFilteredNamedGroupingPolicy().
//
// fieldIndex and fieldValues are forwarded unchanged in the message.
func (w *Watcher) UpdateForRemoveFilteredPolicy(sec string, ptype string, fieldIndex int, fieldValues ...string) error {
	msg := MSG{
		Method:      UpdateForRemoveFilteredPolicy,
		ID:          w.GetLocalID(),
		Sec:         sec,
		Ptype:       ptype,
		FieldIndex:  fieldIndex,
		FieldValues: fieldValues,
	}
	return w.notifyMessage(&msg)
}
// UpdateForSavePolicy publishes an "UpdateForSavePolicy" message so that
// other instances can synchronize their policy. It is called after
// Enforcer.SavePolicy().
//
// The model argument is not used; the message carries only the method and
// the local watcher ID.
func (w *Watcher) UpdateForSavePolicy(model model.Model) error {
	msg := MSG{
		Method: UpdateForSavePolicy,
		ID:     w.GetLocalID(),
	}
	return w.notifyMessage(&msg)
}
// UpdateForAddPolicies publishes an "UpdateForAddPolicies" message so that
// other instances can synchronize their policy. It is called after
// Enforcer.AddPolicies(), Enforcer.AddNamedPolicies(),
// Enforcer.AddGroupingPolicies() and Enforcer.AddNamedGroupingPolicies().
//
// rules are the added rules; they are sent as NewRules.
func (w *Watcher) UpdateForAddPolicies(sec string, ptype string, rules ...[]string) error {
	msg := MSG{
		Method:   UpdateForAddPolicies,
		ID:       w.GetLocalID(),
		Sec:      sec,
		Ptype:    ptype,
		NewRules: rules,
	}
	return w.notifyMessage(&msg)
}
// UpdateForRemovePolicies publishes an "UpdateForRemovePolicies" message so
// that other instances can synchronize their policy. It is called after
// Enforcer.RemovePolicies(), Enforcer.RemoveNamedPolicies(),
// Enforcer.RemoveGroupingPolicies() and Enforcer.RemoveNamedGroupingPolicies().
//
// rules are the removed rules; they are sent as NewRules.
func (w *Watcher) UpdateForRemovePolicies(sec string, ptype string, rules ...[]string) error {
	msg := MSG{
		Method:   UpdateForRemovePolicies,
		ID:       w.GetLocalID(),
		Sec:      sec,
		Ptype:    ptype,
		NewRules: rules,
	}
	return w.notifyMessage(&msg)
}
// UpdateForUpdatePolicy publishes an "UpdateForUpdatePolicy" message so that
// other instances can synchronize their policy. It is called after
// Enforcer.UpdatePolicy().
//
// oldRule and newRule are sent as the single entries of OldRules and
// NewRules respectively.
func (w *Watcher) UpdateForUpdatePolicy(sec string, ptype string, oldRule, newRule []string) error {
	msg := MSG{
		Method:   UpdateForUpdatePolicy,
		ID:       w.GetLocalID(),
		Sec:      sec,
		Ptype:    ptype,
		OldRules: [][]string{oldRule},
		NewRules: [][]string{newRule},
	}
	return w.notifyMessage(&msg)
}
// UpdateForUpdatePolicies publishes an "UpdateForUpdatePolicies" message so
// that other instances can synchronize their policy. It is called after
// Enforcer.UpdatePolicies().
//
// oldRules and newRules are forwarded unchanged in the message.
func (w *Watcher) UpdateForUpdatePolicies(sec string, ptype string, oldRules, newRules [][]string) error {
	msg := MSG{
		Method:   UpdateForUpdatePolicies,
		ID:       w.GetLocalID(),
		Sec:      sec,
		Ptype:    ptype,
		OldRules: oldRules,
		NewRules: newRules,
	}
	return w.notifyMessage(&msg)
}
// finalizer is the destructor for Watcher. Under the write lock it drops the
// topic reference, shuts the subscription down (waiting at most 10 seconds)
// and clears the callback. A failed subscription shutdown is only logged.
// Running it again on an already released watcher has nothing left to do.
func finalizer(w *Watcher) {
	w.connMu.Lock()
	defer w.connMu.Unlock()

	shutdownCtx, stop := context.WithTimeout(context.Background(), 10*time.Second)
	defer stop()

	// NOTE(review): the topic is dropped without calling Shutdown on it —
	// confirm this is intentional.
	w.topic = nil

	if sub := w.sub; sub != nil {
		if err := sub.Shutdown(shutdownCtx); err != nil {
			log.Printf("Subscription shutdown failed, error: %s\n", err)
		}
		w.sub = nil
	}

	w.callbackFunc = nil
}
// notifyMessage serializes msg and sends it to the topic using the context
// stored on the watcher. When verbose mode is on, the sent body is logged.
//
// Parameters:
//   - msg: the message to send
//
// Returns:
//   - error: ErrNotConnected when the watcher has no topic (for example after
//     Close), otherwise the marshalling or send error, if any.
func (w *Watcher) notifyMessage(msg *MSG) error {
	// Only Update checks the topic before calling in; the UpdateFor* methods
	// come straight here, so guard against a nil topic instead of panicking.
	// connMu is deliberately not taken: Update already holds it for reading
	// when it calls this method, and a sync.RWMutex read lock must not be
	// acquired recursively.
	topic := w.topic
	if topic == nil {
		return ErrNotConnected
	}

	msgBody, err := msg.MarshalBinary()
	if err != nil {
		return err
	}

	if err := topic.Send(w.ctx, &pubsub.Message{Body: msgBody}); err != nil {
		return err
	}

	if w.GetVerbose() {
		log.Printf("[go-cloud-watcher] send message: %s\n", string(msgBody))
	}
	return nil
}