/*
Package goasync is a helper framework for writing asynchronous code in Go.
Its primary goal is to reduce the amount of boilerplate code one has to
write to do concurrent tasks.
TLDR: If you don't care about my journey through Go's concurrency model and
just want to know how this library works, skip down to The Task API.
Prior Art
https://github.com/chebyrash/promise
https://github.com/fanliao/go-promise
https://github.com/capitalone/go-future-context
https://github.com/rafaeldias/async
Other Reading
http://www.golangpatterns.info/concurrency/futures
https://stackoverflow.com/questions/35926173/implementing-promise-with-channels-in-go
https://medium.com/strava-engineering/futures-promises-in-the-land-of-golang-1453f4807945
https://www.reddit.com/r/golang/comments/3qhgsr/futurespromises_in_the_land_of_golang
Async Functions in Go
Sure, you can prefix any function call with `go` and it will run asynchronously,
but this isn't the full story. More often than not you have to deal with
the results of that function call, including an error handling strategy, and at
the very least some method of ensuring it actually runs to completion,
not to mention cancelation.
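To make that concrete, here is a minimal sketch of the bookkeeping a bare `go` call leaves to you. The names `slowDouble` and `runAsync` are hypothetical and exist only for this illustration.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// slowDouble is a hypothetical stand-in for any function that returns a
// value and an error.
func slowDouble(n int) (int, error) {
	if n < 0 {
		return 0, errors.New("negative input")
	}
	return n * 2, nil
}

// runAsync runs slowDouble in a goroutine. A bare `go slowDouble(n)` would
// discard both return values, so they are captured in variables, and a
// WaitGroup makes sure the goroutine has finished before they are read.
func runAsync(n int) (int, error) {
	var (
		wg  sync.WaitGroup
		res int
		err error
	)
	wg.Add(1)
	go func() {
		defer wg.Done()
		res, err = slowDouble(n)
	}()
	wg.Wait() // without this the caller could return before the goroutine ran
	return res, err
}

func main() {
	fmt.Println(runAsync(21))
	fmt.Println(runAsync(-1))
}
```

Waiting straight away like this makes the call synchronous again, of course; the point is only to show how much has to be added before the result, the error and completion are all accounted for.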
What is a pipeline?
"Informally, a pipeline is a series of stages connected by channels,
where each stage is a group of goroutines running the same function."
https://blog.golang.org/pipelines
If you are writing some sort of server/service (which is what Go is really
awesome at doing), you might have the same async pipeline run per request.
In that case you can construct all of this once and it's not really a major issue.
But when you need to create many different pipelines, or pipelines that have a
stage that runs many different functions in goroutines, then a different solution
is required. This is what I consider to be the basic async function:
	func fooAsync(bar <-chan string) (<-chan string, <-chan error) {
		resolver := make(chan string, 1)
		rejector := make(chan error, 1)
		go func() {
			v, err := doSomething(<-bar)
			if err != nil {
				rejector <- err
			} else {
				resolver <- v
			}
		}()
		return resolver, rejector
	}
OK, so let's pull that apart:
* Let's say it's idiomatic to add the `Async` suffix to any function name,
much like you might prefix a function name with `Must` to denote it might panic.
This follows other languages such as C#.
* Any inputs to an async function must be channels. That way, when an input is
the output of another async function, its channel can simply be passed in and
read inside the goroutine, without blocking anyone else.
* Similarly all returned values from an async function must be channels.
Effectively we have what might be similar to a JavaScript promise but broken
into 2 parts, the resolver and the rejector.
* Resolvers and rejectors should be buffered so that a value can be sent
before anything has been set up to read the channels.
* A resolver or rejector will only ever send a single value to the channel.
* A resolver or rejector can only be read once. A simple trick to "tee" a channel
so that two consumers can each read the value:
	ch2 := make(chan string, 1)
	ch3 := make(chan string, 1)
	go func() {
		v := <-ch1
		ch2 <- v
		ch3 <- v
	}()
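The tee trick can be wrapped in a small helper. This is only a sketch: `tee` is a hypothetical name, and it relies on the convention stated above that the source channel delivers exactly one value.

```go
package main

import "fmt"

// tee reads the single value sent on src and copies it to two new buffered
// channels, so that two independent consumers can each read it once.
// It assumes src delivers exactly one value.
func tee(src <-chan string) (<-chan string, <-chan string) {
	a := make(chan string, 1)
	b := make(chan string, 1)
	go func() {
		v := <-src
		a <- v // both channels are buffered, so neither send blocks
		b <- v
	}()
	return a, b
}

func main() {
	src := make(chan string, 1)
	src <- "hello"
	a, b := tee(src)
	fmt.Println(<-a, <-b)
}
```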
Adding Cancelation
	func fooAsync(bar <-chan string) (<-chan string, <-chan error, chan<- struct{}) {
		// keep bidirectional channels locally, the goroutine must be able
		// to send on resolver/rejector and receive from stopper
		resolver := make(chan string, 1)
		rejector := make(chan error, 1)
		stopper := make(chan struct{}, 1)
		go func() {
			var value string
			// create the timeout once, otherwise every loop iteration restarts it
			timeout := time.After(5 * time.Second)
			for {
				select {
				case <-stopper:
					return
				case <-timeout:
					resolver <- value
					return
				case b := <-bar:
					v, err := doSomething(b)
					if err != nil {
						rejector <- err
						return
					}
					value = value + v
				}
			}
		}()
		return resolver, rejector, stopper
	}
* So we have just written roughly two dozen lines of code and only one of them
actually does anything of any importance.
* While there are some cases where having the resolver, rejector & stopper
separate makes sense, more often than not it gets hard to keep track of the
pipeline with so many variables. What if fooAsync above wanted to see the error
of bar? You would have to pass that in too.
* Why not use context.Context? There is plenty of reading about that:
https://dave.cheney.net/2017/08/20/context-isnt-for-cancellation
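One way to tame the number of variables is to bundle the three channels into a single value. The `future` type and `countAsync` function below are hypothetical names used only for this sketch; they are not part of this package's API.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// future is a hypothetical bundle of the resolver, rejector and stopper
// channels, so that callers only have one value to pass around.
type future struct {
	Resolver <-chan int
	Rejector <-chan error
	stopper  chan struct{}
	once     sync.Once
}

// Stop asks the goroutine behind the future to stop. The sync.Once guards
// against closing the channel twice, which would panic.
func (f *future) Stop() {
	f.once.Do(func() { close(f.stopper) })
}

// countAsync counts ticks until it is stopped, then resolves with the count.
func countAsync(interval time.Duration) *future {
	resolver := make(chan int, 1)
	rejector := make(chan error, 1) // unused here; real work would send failures on it
	f := &future{Resolver: resolver, Rejector: rejector, stopper: make(chan struct{})}
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		count := 0
		for {
			select {
			case <-f.stopper:
				resolver <- count
				return
			case <-ticker.C:
				count++
			}
		}
	}()
	return f
}

func main() {
	f := countAsync(10 * time.Millisecond)
	time.Sleep(55 * time.Millisecond)
	f.Stop()
	select {
	case n := <-f.Resolver:
		fmt.Println("ticks counted before stop:", n)
	case err := <-f.Rejector:
		fmt.Println("error:", err)
	}
}
```

Unlike the example above, this sketch resolves with a partial result when it is stopped; returning without sending anything would be an equally reasonable choice.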
Awaiting in Go
Your friend is `select`; consider it to be the replacement for the
keyword `await` used in other languages.
	resolver, rejector := barAsync()
	select {
	case v := <-resolver:
		fmt.Println("we got a value", v)
	case err := <-rejector:
		fmt.Println("we got an error", err)
	}
Await Any: This is how you might wait for "any" of the results
from a collection of async calls.
	resolver1, rejector1 := fooAsync()
	resolver2, rejector2 := barAsync()
	resolver3, rejector3 := bazAsync()
	var value string
	var err error
	select {
	case v := <-resolver1:
		value = v
	case v := <-resolver2:
		value = v
	case v := <-resolver3:
		value = v
	case e := <-rejector1:
		err = e
	case e := <-rejector2:
		err = e
	case e := <-rejector3:
		err = e
	}
	if err != nil {
		panic(err)
	}
	fmt.Println("we got a value", value)
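A select statement needs every case written out at compile time, which is where much of this repetition comes from. When the number of calls is only known at run time, the standard library's `reflect.Select` can be used instead. The sketch below assumes the single-value resolver/rejector convention described earlier; `awaitAny` and `errBoom` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// errBoom is a placeholder error used by the demo in main.
var errBoom = errors.New("boom")

// awaitAny waits for the first value to arrive on any resolver or rejector
// and returns it as a value or an error respectively.
func awaitAny(resolvers []<-chan string, rejectors []<-chan error) (string, error) {
	cases := make([]reflect.SelectCase, 0, len(resolvers)+len(rejectors))
	for _, ch := range resolvers {
		cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)})
	}
	for _, ch := range rejectors {
		cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)})
	}
	chosen, recv, ok := reflect.Select(cases)
	if !ok {
		return "", errors.New("channel closed before sending a value")
	}
	if chosen < len(resolvers) {
		return recv.String(), nil // one of the resolvers fired
	}
	err, _ := recv.Interface().(error) // one of the rejectors fired
	return "", err
}

func main() {
	r1, e1 := make(chan string, 1), make(chan error, 1)
	r2, e2 := make(chan string, 1), make(chan error, 1)
	e2 <- errBoom // pretend only the second call has finished, with an error
	v, err := awaitAny([]<-chan string{r1, r2}, []<-chan error{e1, e2})
	fmt.Println(v, err)
}
```

Reflection trades compile-time type checking for flexibility, so this is a design choice rather than a strict improvement over the hand-written select.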
Await All: This is how you might wait for "all" of the results
from a collection of async calls.
	resolver1, rejector1 := fooAsync()
	resolver2, rejector2 := barAsync()
	resolver3, rejector3 := bazAsync()
	values := []string{}
	errs := []error{}
	// 3 and not 6: each call sends on exactly one of its two channels
	for i := 0; i < 3; i++ {
		select {
		case v := <-resolver1:
			values = append(values, v)
		case v := <-resolver2:
			values = append(values, v)
		case v := <-resolver3:
			values = append(values, v)
		case e := <-rejector1:
			errs = append(errs, e)
		case e := <-rejector2:
			errs = append(errs, e)
		case e := <-rejector3:
			errs = append(errs, e)
		}
	}
	if len(errs) > 0 {
		for _, err := range errs {
			fmt.Println("oh no an error", err)
		}
		panic("we got errors")
	}
	for _, value := range values {
		fmt.Println("we got a value", value)
	}
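The same waiting logic can be written once for any number of calls by fanning every resolver/rejector pair into a single channel. This is a sketch under the single-value convention described earlier; `outcome`, `awaitAll` and `errBoom` are hypothetical names, and the two slices are assumed to have the same length.

```go
package main

import (
	"errors"
	"fmt"
)

// errBoom is a placeholder error used by the demo in main.
var errBoom = errors.New("boom")

// outcome carries either the value or the error of one async call.
type outcome struct {
	value string
	err   error
}

// awaitAll waits until every call has either resolved or rejected.
// Results are collected in completion order, not in argument order.
func awaitAll(resolvers []<-chan string, rejectors []<-chan error) ([]string, []error) {
	outcomes := make(chan outcome, len(resolvers))
	for i := range resolvers {
		// one goroutine per call forwards whichever channel fires
		go func(res <-chan string, rej <-chan error) {
			select {
			case v := <-res:
				outcomes <- outcome{value: v}
			case e := <-rej:
				outcomes <- outcome{err: e}
			}
		}(resolvers[i], rejectors[i])
	}
	var values []string
	var errs []error
	for range resolvers { // exactly one outcome per call
		o := <-outcomes
		if o.err != nil {
			errs = append(errs, o.err)
		} else {
			values = append(values, o.value)
		}
	}
	return values, errs
}

func main() {
	r1, e1 := make(chan string, 1), make(chan error, 1)
	r2, e2 := make(chan string, 1), make(chan error, 1)
	r1 <- "first result"
	e2 <- errBoom
	values, errs := awaitAll([]<-chan string{r1, r2}, []<-chan error{e1, e2})
	fmt.Println(values, errs)
}
```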
Chaining Async Functions into a Pipeline
Normally you will want to chain additional actions so that each one starts as
soon as possible. Usually those functions will consume the outputs from
the previous functions. So you can just pass a channel into the next function
and it will wait on the channel inside its own goroutine.
	aCh := make(chan string, 1)
	aCh <- "aFile"
	aTag, aErr := BuildDockerImageAsync(aCh)
	bCh := make(chan string, 1)
	bCh <- "bFile"
	bTag, bErr := BuildDockerImageAsync(bCh)
	cCh := make(chan string, 1)
	cCh <- "cFile"
	cTag, cErr := BuildDockerImageAsync(cCh)
	aPubDone, aPubErr := PublishDockerImageAsync(aTag)
	bPubDone, bPubErr := PublishDockerImageAsync(bTag)
	cPubDone, cPubErr := PublishDockerImageAsync(cTag)
	aDeployDone, aDeployErr := DeployDockerImageAsync(aPubDone)
	bDeployDone, bDeployErr := DeployDockerImageAsync(bPubDone)
	cDeployDone, cDeployErr := DeployDockerImageAsync(cPubDone)
	done := make(chan struct{}, 1)
	go func() {
		defer close(done)
		for i := 0; i < 3; i++ {
			select {
			case <-aDeployDone:
			case <-bDeployDone:
			case <-cDeployDone:
			}
		}
	}()
	var err error
	select {
	case <-done:
	case e := <-aErr:
		err = e
	case e := <-bErr:
		err = e
	case e := <-cErr:
		err = e
	case e := <-aPubErr:
		err = e
	case e := <-bPubErr:
		err = e
	case e := <-cPubErr:
		err = e
	case e := <-aDeployErr:
		err = e
	case e := <-bDeployErr:
		err = e
	case e := <-cDeployErr:
		err = e
	}
	if err != nil {
		panic(err)
	}
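The stages in a pipeline like this all share the same shape, so a single helper can wrap any synchronous function as a stage. The sketch below is an assumption about how such stages might be written: `stageAsync`, `build` and `publish` are hypothetical names, and the placeholder functions do no real Docker work.

```go
package main

import "fmt"

// stageAsync wraps a synchronous function as a pipeline stage that reads its
// input from a channel and reports on a resolver or a rejector.
func stageAsync(in <-chan string, fn func(string) (string, error)) (<-chan string, <-chan error) {
	resolver := make(chan string, 1)
	rejector := make(chan error, 1)
	go func() {
		v, err := fn(<-in) // blocks here until the previous stage resolves
		if err != nil {
			rejector <- err
			return
		}
		resolver <- v
	}()
	return resolver, rejector
}

// build and publish are hypothetical placeholders for real work.
func build(file string) (string, error)  { return "image-of-" + file, nil }
func publish(tag string) (string, error) { return "registry.example/" + tag, nil }

func main() {
	files := make(chan string, 1)
	files <- "aFile"
	tag, buildErr := stageAsync(files, build)
	url, pubErr := stageAsync(tag, publish)
	select {
	case u := <-url:
		fmt.Println("published", u)
	case err := <-buildErr:
		fmt.Println("build failed:", err)
	case err := <-pubErr:
		fmt.Println("publish failed:", err)
	}
}
```

Note that in this sketch, if an upstream stage rejects, nothing is ever sent on the downstream stage's input, so that goroutine keeps waiting. A production pipeline would need a way to stop it.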
Just write a Synchronous API and let the consumer use it Asynchronously if they want
My issue with this is the boilerplate code one has to write to do this.
Whatever happened to DRY? I believe the methodologies outlined by this
library strike a reasonable balance between writing idiomatic Go and going
insane repeating yourself everywhere.
What's more, this is just one way things can be done. If you need something more
powerful for a particular use case then the full power of Go's concurrency model
is still there; this library doesn't take any of that away.
Up to this point all I have shown you is how to do some stuff with vanilla Go,
and I hope that it not only illustrates some of my frustrations with the
language but also acts as a useful reference to go back to when you need to do
something more complex.
The Task API
Here is an example of `fooAsync` but this time it uses https://github.com/brad-jones/goasync/task
	func fooAsync(bar *task.Task) *task.Task {
		return task.New(func(t *task.Internal) {
			// normally you would call this at the start of any long running loop
			if t.ShouldStop() {
				return
			}
			res, err := bar.Result()
			if err != nil {
				t.Reject(err) // or you could do something based on the error
				return
			}
			v, err := intToString(res.(int))
			if err != nil {
				t.Reject(err)
			} else {
				t.Resolve(v)
			}
		})
	}
	t := fooAsync(task.Resolved(1))
	v, err := t.Result()
	castedV := v.(string)
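To give a feel for what a task type with this shape has to do internally, here is a stripped-down, self-contained sketch. It is not the implementation used by this library: `miniTask`, `miniInternal` and `newMiniTask` are hypothetical names, and the behaviour of resolving with nil when the body returns without a result is an assumption made for the sketch.

```go
package main

import (
	"fmt"
	"strconv"
	"sync"
)

// miniTask is a hypothetical stand-in for a task type.
type miniTask struct {
	done     chan struct{} // closed once value or err has been set
	stop     chan struct{} // closed when someone asks the task to stop
	stopOnce sync.Once
	setOnce  sync.Once
	value    interface{}
	err      error
}

// miniInternal is the handle passed to the task body.
type miniInternal struct{ task *miniTask }

// Resolve and Reject record the outcome; only the first call has any effect.
func (i *miniInternal) Resolve(v interface{}) {
	i.task.setOnce.Do(func() { i.task.value = v; close(i.task.done) })
}

func (i *miniInternal) Reject(err error) {
	i.task.setOnce.Do(func() { i.task.err = err; close(i.task.done) })
}

// ShouldStop reports, without blocking, whether a stop has been requested.
func (i *miniInternal) ShouldStop() bool {
	select {
	case <-i.task.stop:
		return true
	default:
		return false
	}
}

// newMiniTask starts fn in its own goroutine.
func newMiniTask(fn func(*miniInternal)) *miniTask {
	tk := &miniTask{done: make(chan struct{}), stop: make(chan struct{})}
	go func() {
		in := &miniInternal{task: tk}
		fn(in)
		in.Resolve(nil) // no-op if fn already resolved or rejected
	}()
	return tk
}

// Stop cooperatively asks the task to stop; it is safe to call repeatedly.
func (tk *miniTask) Stop() { tk.stopOnce.Do(func() { close(tk.stop) }) }

// Result blocks until the task has resolved or rejected.
func (tk *miniTask) Result() (interface{}, error) {
	<-tk.done
	return tk.value, tk.err
}

func main() {
	tk := newMiniTask(func(i *miniInternal) {
		if i.ShouldStop() {
			return
		}
		i.Resolve(strconv.Itoa(42))
	})
	v, err := tk.Result()
	fmt.Println(v.(string), err)
}
```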
The Await API
Tasks can be awaited using https://github.com/brad-jones/goasync/await
	values, errs := await.All(task1, task2, task3)
	values, err := await.AllOrError(task1, task2, task3)
	value, err := await.Any(task1, task2, task3)
The awaiters that return early (before all tasks are complete), such as Any, will
cooperatively stop the remaining tasks, so cancelation happens automatically.
If you do not care for cancelation and wish to have the awaiter return as soon
as possible you may use the `Fast` awaiters.
`Fast` awaiters will still ask any remaining tasks to
cooperatively stop but they do so asynchronously.
	value, err := await.FastAny(task1, task2, task3)
Or perhaps you might like to use a timeout.
	value, err := await.AnyWithTimeout(5 * time.Second, task1, task2, task3)
Not Type Safe
Due to Go's lack of generics the only sane way this package can be created is
by the use of the `interface{}` type. This means that all values that are
returned from a task's `Result()` method or an awaiter must be type-asserted to
the correct type by the caller.
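A single-value type assertion such as `v.(string)` panics when the dynamic type is something else. The two-value form lets the caller handle that case instead. A minimal sketch, where `asString` is a hypothetical helper and not part of this package:

```go
package main

import "fmt"

// asString converts a result of unknown type to a string, reporting failure
// instead of panicking when the dynamic type is not string.
func asString(v interface{}) (string, error) {
	s, ok := v.(string)
	if !ok {
		return "", fmt.Errorf("expected string, got %T", v)
	}
	return s, nil
}

func main() {
	var result interface{} = "hello" // e.g. a value returned by a task
	s, err := asString(result)
	fmt.Println(s, err)

	_, err = asString(42)
	fmt.Println(err)
}
```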
*/
package goasync