-
Notifications
You must be signed in to change notification settings - Fork 8
/
state_test.go
89 lines (75 loc) · 3.36 KB
/
state_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package workflow_test
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/luno/workflow"
"github.com/luno/workflow/adapters/memrecordstore"
"github.com/luno/workflow/adapters/memrolescheduler"
"github.com/luno/workflow/adapters/memstreamer"
"github.com/luno/workflow/adapters/memtimeoutstore"
)
func TestInternalState(t *testing.T) {
b := workflow.NewBuilder[string, status]("example")
b.AddStep(StatusStart, func(ctx context.Context, r *workflow.Run[string, status]) (status, error) {
return StatusMiddle, nil
}, StatusMiddle)
b.AddStep(StatusMiddle,
func(ctx context.Context, r *workflow.Run[string, status]) (status, error) {
return StatusEnd, nil
},
StatusEnd,
).WithOptions(
workflow.ParallelCount(3),
)
b.AddTimeout(StatusInitiated, workflow.DurationTimerFunc[string, status](time.Hour), func(ctx context.Context, r *workflow.Run[string, status], now time.Time) (status, error) {
return StatusCompleted, nil
}, StatusCompleted)
ctx := context.Background()
b.AddConnector(
"consume-other-stream",
memstreamer.NewConnector(nil),
func(ctx context.Context, w *workflow.Workflow[string, status], e *workflow.ConnectorEvent) error {
return nil
},
).WithOptions(
workflow.ParallelCount(2),
)
recordStore := memrecordstore.New()
wf := b.Build(
memstreamer.New(),
recordStore,
memrolescheduler.New(),
workflow.WithTimeoutStore(memtimeoutstore.New()),
)
require.Equal(t, map[string]workflow.State{}, wf.States())
wf.Run(ctx)
t.Cleanup(wf.Stop)
time.Sleep(time.Second)
require.Equal(t, map[string]workflow.State{
"middle-consumer-1-of-3": workflow.StateRunning,
"middle-consumer-2-of-3": workflow.StateRunning,
"middle-consumer-3-of-3": workflow.StateRunning,
"start-consumer-1-of-1": workflow.StateRunning,
"initiated-timeout-auto-inserter-consumer": workflow.StateRunning,
"initiated-timeout-consumer": workflow.StateRunning,
"consume-other-stream-connector-to-example-consumer-1-of-2": workflow.StateRunning,
"consume-other-stream-connector-to-example-consumer-2-of-2": workflow.StateRunning,
"outbox-consumer-1-of-1": workflow.StateRunning,
"example-delete-consumer": workflow.StateRunning,
}, wf.States())
wf.Stop()
require.Equal(t, map[string]workflow.State{
"middle-consumer-1-of-3": workflow.StateShutdown,
"middle-consumer-2-of-3": workflow.StateShutdown,
"middle-consumer-3-of-3": workflow.StateShutdown,
"start-consumer-1-of-1": workflow.StateShutdown,
"initiated-timeout-auto-inserter-consumer": workflow.StateShutdown,
"initiated-timeout-consumer": workflow.StateShutdown,
"consume-other-stream-connector-to-example-consumer-1-of-2": workflow.StateShutdown,
"consume-other-stream-connector-to-example-consumer-2-of-2": workflow.StateShutdown,
"outbox-consumer-1-of-1": workflow.StateShutdown,
"example-delete-consumer": workflow.StateShutdown,
}, wf.States())
}