-
Notifications
You must be signed in to change notification settings - Fork 0
/
sysd.go
301 lines (259 loc) · 7.25 KB
/
sysd.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
package sysd
import (
"context"
"errors"
"fmt"
"log"
"sync"
"time"
)
// Defaults that New applies to every Systemd it creates.
const (
	// GracefulShutdownTimeout is the default amount of time WaitForAppsStop
	// waits for the apps to return before giving up.
	GracefulShutdownTimeout time.Duration = 20 * time.Second

	// StatusCheckInterval is the default period between two rounds of
	// App.Status checks.
	StatusCheckInterval time.Duration = 5 * time.Second
)
// Predefined failure policies. Both are shared pointers: Retry and
// RetryTimeout modify their receiver in place, so calling them on one of these
// values changes the policy for every app that uses it.
var (
	// OnFailureRestart restarts an app whose status check fails. It is
	// configured with a retry count of 3 and a retry timeout of 5 seconds.
	OnFailureRestart = &OnFailure{name: "restart", retry: 3, retryTimeout: 5 * time.Second}

	// OnFailureIgnore ignores the failure of an app.
	OnFailureIgnore = &OnFailure{name: "ignore"}
)

// Sentinel errors returned by Systemd methods.
var (
	// ErrAppAlreadyExists is returned by Add when an app with the same name
	// has already been registered.
	ErrAppAlreadyExists = errors.New("app already exists")

	// ErrAppNotExists is returned when no app with the requested name is
	// registered.
	ErrAppNotExists = errors.New("app not exists")
)
// OnFailure describes the action to take when an app fails, together with the
// retry settings used when the app is started. It is a struct rather than a
// plain enum value; the package provides the predefined policies
// OnFailureRestart and OnFailureIgnore.
type OnFailure struct {
// name identifies the policy; Equal and String are based on it.
name string
// retry bounds the number of start attempts made by startWithRetry.
retry int
// retryTimeout is the pause that follows a failed start attempt.
retryTimeout time.Duration
}
// Equal reports whether o and target describe the same failure policy.
//
// Policies are compared by name only; the retry count and retry timeout are
// ignored. Nil values are handled instead of panicking: two nil policies are
// equal, and a nil policy is never equal to a non-nil one.
func (o *OnFailure) Equal(target *OnFailure) bool {
	if o == nil || target == nil {
		return o == target
	}
	return o.name == target.name
}
// String implements fmt.Stringer by returning the policy name.
func (o *OnFailure) String() string {
	return o.name
}
// Retry sets the retry count of the policy and returns the same policy so
// that calls can be chained.
//
// The receiver is modified in place; calling Retry on one of the shared
// predefined policies changes it for every user of that policy.
func (o *OnFailure) Retry(retry int) *OnFailure {
	o.retry = retry
	return o
}
// RetryTimeout sets the pause that follows a failed start attempt and returns
// the same policy so that calls can be chained.
//
// The receiver is modified in place; calling RetryTimeout on one of the shared
// predefined policies changes it for every user of that policy.
func (o *OnFailure) RetryTimeout(retryTimeout time.Duration) *OnFailure {
	o.retryTimeout = retryTimeout
	return o
}
// App is the contract an application must satisfy to be supervised by Systemd.
type App interface {
// Start starts the app and should block until the context is cancelled.
// When the app is restarted after a failed status check, the context it
// receives makes IsRestored report true.
Start(ctx context.Context) error
// Status reports the health of the app. A non-nil error is treated as a
// failure and triggers the OnFailure policy configured for the app.
Status(ctx context.Context) error
// Name returns the name of the app. Add uses it as the key under which
// the app is registered, so two apps with the same name cannot coexist.
Name() string
}
// appItem is the record Systemd keeps for each registered app.
type appItem struct {
// App is the registered application; embedding it exposes Start, Status
// and Name directly on the item.
App
// name is the value of App.Name() captured when the app was added.
name string
// onFailure is the policy applied when the app's status check fails.
onFailure *OnFailure
// priority orders start-up: sortByPriority arranges items in ascending
// priority before they are started.
priority int
}
// Systemd supervises a set of apps: it starts them, periodically checks their
// status and applies a failure policy when a check fails.
//
// NOTE(review): no synchronization is visible in this file. The status watcher
// goroutine ranges over and deletes from apps while Start is running, so
// calling Add or the SetApp* methods concurrently with Start looks unsafe -
// confirm before relying on it.
type Systemd struct {
// apps holds the registered apps keyed by name; Add creates it lazily.
apps map[string]appItem
// defaultOnFailure is the policy given to apps at the moment they are added.
defaultOnFailure *OnFailure
// logger receives the Info and Error messages emitted by Systemd.
logger *logger
// graceFullShutdownTimeout bounds how long WaitForAppsStop waits for apps.
graceFullShutdownTimeout time.Duration
// statusCheckInterval is the period of the status watcher's ticker.
statusCheckInterval time.Duration
}
// New returns a Systemd initialised with the package defaults: the standard
// library's default logger, OnFailureRestart as the default failure policy,
// StatusCheckInterval and GracefulShutdownTimeout. No app is registered yet.
func New() *Systemd {
	s := new(Systemd)
	s.logger = &logger{l: log.Default()}
	s.defaultOnFailure = OnFailureRestart
	s.statusCheckInterval = StatusCheckInterval
	s.graceFullShutdownTimeout = GracefulShutdownTimeout
	return s
}
// Add registers app under the name returned by app.Name().
//
// The app receives the current default failure policy and priority 0. The
// name is read exactly once, so the map key, the stored name and the log
// message always agree even if Name is not a pure getter.
//
// Add returns ErrAppAlreadyExists (and logs an error) when an app with the
// same name is already registered, and nil otherwise.
func (s *Systemd) Add(app App) error {
	// The zero value of the map is nil; create it on first use.
	if s.apps == nil {
		s.apps = make(map[string]appItem)
	}

	name := app.Name()
	if _, exists := s.apps[name]; exists {
		s.logger.Error("app %q is already exist in systemd stack", name)
		return ErrAppAlreadyExists
	}

	s.apps[name] = appItem{
		App:       app,
		name:      name,
		onFailure: s.defaultOnFailure,
		priority:  0,
	}
	return nil
}
// SetLogger replaces the logger used by the service with one that wraps l.
func (s *Systemd) SetLogger(l Logger) {
	wrapped := logger{l: l}
	s.logger = &wrapped
}
// SetGraceFulShutdownTimeout sets how long WaitForAppsStop waits for the apps
// to stop before it gives up.
func (s *Systemd) SetGraceFulShutdownTimeout(t time.Duration) {
	s.graceFullShutdownTimeout = t
}
// SetStatusCheckInterval sets the period between two rounds of status checks.
// The status watcher reads the value when it starts, so a change made while
// Start is running does not affect the watcher that is already active.
func (s *Systemd) SetStatusCheckInterval(t time.Duration) {
	s.statusCheckInterval = t
}
// SetDefaultOnFailure sets the failure policy assigned to apps added from now
// on. Apps that are already registered keep the policy they were given.
func (s *Systemd) SetDefaultOnFailure(onFailure *OnFailure) {
	s.defaultOnFailure = onFailure
}
// SetAppOnFailure sets the failure policy of the app registered as appName.
//
// It returns ErrAppNotExists when no such app is registered and nil once the
// policy has been stored.
func (s *Systemd) SetAppOnFailure(appName string, onFailure *OnFailure) error {
	app, ok := s.apps[appName]
	if !ok {
		return ErrAppNotExists
	}
	// Map values are copies: update the copy and write it back.
	app.onFailure = onFailure
	s.apps[appName] = app
	return nil
}
// SetAppPriority sets the start-up priority of the app registered as appName.
// Apps are started in ascending priority order.
//
// It returns ErrAppNotExists when no such app is registered and nil once the
// priority has been stored.
func (s *Systemd) SetAppPriority(appName string, priority int) error {
	app, ok := s.apps[appName]
	if !ok {
		return ErrAppNotExists
	}
	// Map values are copies: update the copy and write it back.
	app.priority = priority
	s.apps[appName] = app
	return nil
}
// Start launches every registered app and then supervises them.
//
// Apps are handed to startApp in ascending priority order. Each app runs in
// its own goroutine, so the priority only determines the launch order. A
// status watcher goroutine is started alongside the apps.
//
// Start blocks until one of the following happens:
//   - ctx is cancelled: Start waits for the apps to stop, bounded by the
//     graceful shutdown timeout, and returns nil;
//   - an error other than context.Canceled is reported by an app: Start
//     returns that error.
//
// The apps and the watcher run with a context derived from ctx that is
// cancelled when Start returns. This way they are also told to stop when
// Start returns an error, instead of being left running unsupervised.
func (s *Systemd) Start(ctx context.Context) error {
	runCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	errs := make(chan error, len(s.apps))
	var wg sync.WaitGroup

	// Map iteration order is random; collect the apps and order them.
	apps := make([]appItem, 0, len(s.apps))
	for _, app := range s.apps {
		apps = append(apps, app)
	}
	sortByPriority(apps)

	for _, app := range apps {
		s.startApp(runCtx, app, &wg, errs)
	}
	go s.watchForStatus(runCtx, &wg, errs)

	for {
		select {
		case <-runCtx.Done():
			// Give the apps a chance to stop before returning.
			s.WaitForAppsStop(&wg)
			return nil
		case err := <-errs:
			// Cancellation errors are part of a normal shutdown.
			if !errors.Is(err, context.Canceled) {
				return err
			}
		}
	}
}
// sortByPriority reorders apps in place so that priorities are ascending.
//
// It is a simple exchange sort: every position is compared with each element
// after it and the two are swapped when the later one has a strictly lower
// priority. The sort is not stable, so apps with equal priorities may change
// their relative order.
func sortByPriority(apps []appItem) {
	for i := range apps {
		tail := apps[i+1:]
		for k := range tail {
			if tail[k].priority < apps[i].priority {
				apps[i], tail[k] = tail[k], apps[i]
			}
		}
	}
}
// startApp runs app in a new goroutine that is tracked by wg.
//
// The goroutine logs the start and then calls startWithRetry; a non-nil result
// is sent on errs. A panic raised while starting the app is recovered and
// forwarded on errs too: unchanged when the panic value is an error, otherwise
// formatted with %v. On the panic path wg is released before the error is
// sent.
func (s *Systemd) startApp(ctx context.Context, app appItem, wg *sync.WaitGroup, errs chan error) {
	wg.Add(1)
	go func() {
		defer func() {
			wg.Done()
			r := recover()
			if r == nil {
				return
			}
			err, isErr := r.(error)
			if !isErr {
				err = fmt.Errorf("%v", r)
			}
			errs <- err
		}()

		s.logger.Info("Starting app: %q", app.Name())
		// Start the app, retrying as configured by its failure policy.
		err := startWithRetry(ctx, app)
		if err != nil {
			errs <- err
		}
	}()
}
// startWithRetry calls app.Start until it succeeds, the attempts allowed by
// the app's failure policy are used up, or ctx is cancelled.
//
// The policy's retry count is the total number of attempts. At least one
// attempt is always made, so an app whose policy has a retry count of zero
// (such as OnFailureIgnore) or no policy at all is still started once.
// Between two attempts the function waits for the policy's retry timeout; the
// wait is abandoned as soon as ctx is cancelled, and there is no wait after
// the last attempt.
//
// It returns nil as soon as one attempt succeeds, otherwise the error of the
// last attempt.
func startWithRetry(ctx context.Context, app appItem) error {
	attempts := 1
	var pause time.Duration
	if policy := app.onFailure; policy != nil {
		if policy.retry > attempts {
			attempts = policy.retry
		}
		pause = policy.retryTimeout
	}

	var err error
	for i := 0; i < attempts; i++ {
		if err = app.Start(ctx); err == nil {
			return nil
		}
		// Do not wait after the final attempt or once shutdown has begun.
		if i == attempts-1 || ctx.Err() != nil {
			break
		}
		timer := time.NewTimer(pause)
		select {
		case <-ctx.Done():
			timer.Stop()
			return err
		case <-timer.C:
		}
	}
	return err
}
// WaitForAppsStop blocks until every goroutine tracked by wg has finished or
// the graceful shutdown timeout has elapsed, whichever comes first, and logs
// which of the two happened.
func (s *Systemd) WaitForAppsStop(wg *sync.WaitGroup) {
	stopped := waitForGroup(wg)
	deadline := time.After(s.graceFullShutdownTimeout)

	select {
	case <-stopped:
		s.logger.Info("All apps stopped")
	case <-deadline:
		s.logger.Error("Shutdown timeout, forcefully stopping apps")
	}
}
// watchForStatus periodically checks the status of every registered app until
// ctx is cancelled, then returns.
//
// On cancellation ctx.Err() is offered on errs without blocking, so the
// watcher can always terminate even when nobody reads errs any more. A
// non-positive status check interval disables the periodic checks instead of
// letting time.NewTicker panic; the watcher then only waits for cancellation.
func (s *Systemd) watchForStatus(ctx context.Context, wg *sync.WaitGroup, errs chan error) {
	// A nil channel never becomes ready, which disables the tick case.
	var tick <-chan time.Time
	if s.statusCheckInterval > 0 {
		ticker := time.NewTicker(s.statusCheckInterval)
		defer ticker.Stop()
		tick = ticker.C
	}

	for {
		select {
		case <-ctx.Done():
			select {
			case errs <- ctx.Err():
			default:
			}
			return
		case <-tick:
			s.checkAppStatuses(ctx, wg, errs)
		}
	}
}

// checkAppStatuses runs one round of status checks and applies the failure
// policy of every app whose check returns an error.
//
// Policies are matched with Equal. A restart policy starts the app again with
// a context marked as restored. An ignore policy removes the app from the set
// of monitored apps. The WaitGroup counter is deliberately left alone in that
// case: the app's goroutine still calls Done when it returns, and lowering
// the counter here as well would drive it negative and panic.
func (s *Systemd) checkAppStatuses(ctx context.Context, wg *sync.WaitGroup, errs chan error) {
	for name, app := range s.apps {
		// Do not probe or restart apps once shutdown has begun.
		if ctx.Err() != nil {
			return
		}
		err := app.Status(ctx)
		if err == nil {
			continue
		}
		s.logger.Error("app %q status check failed: %v", app.Name(), err)

		policy := app.onFailure
		switch {
		case policy == nil:
			// No policy configured: the failure is only logged.
		case policy.Equal(OnFailureRestart):
			s.logger.Info("Restarting app %q", app.Name())
			s.startApp(restoredContext(ctx), app, wg, errs)
		case policy.Equal(OnFailureIgnore):
			s.logger.Info("Ignoring app %q failure", app.Name())
			// Deleting the current key while ranging over a map is allowed.
			delete(s.apps, name)
		}
	}
}
// waitForGroup adapts wg to a channel: the returned channel is closed once
// wg.Wait returns, which allows waiting for the group inside a select.
func waitForGroup(wg *sync.WaitGroup) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		wg.Wait()
	}()
	return done
}
// restoredTask is the context key under which the restored marker is stored.
// Being an unexported type, it cannot collide with keys of other packages.
type restoredTask struct{}

// restoredContext returns a child of ctx carrying the restored marker, which
// IsRestored detects.
func restoredContext(ctx context.Context) context.Context {
	var key restoredTask
	return context.WithValue(ctx, key, true)
}
// IsRestored reports whether ctx carries the marker attached by
// restoredContext. The status watcher passes such a context to an app that it
// restarts after a failed status check, so an App can call IsRestored inside
// Start to tell a restart from the initial start.
func IsRestored(ctx context.Context) bool {
	return ctx.Value(restoredTask{}) != nil
}