Go Flow is a lightweight workflow framework based on state machines, designed to simplify the creation and management of workflows in Go.
go get github.com/basenana/go-flow
You can create a workflow directly from functions:
// Build a flow out of three function-backed tasks, registered on the builder
// in the order task-1, task-2, task-3.
builder := flow.NewFlowBuilder("sample-flow-01")

first := flow.NewFuncTask("task-1", func(_ context.Context) error {
	fmt.Println("do something in task 1")
	return nil
})
builder.Task(first)

second := flow.NewFuncTask("task-2", func(_ context.Context) error {
	fmt.Println("do something in task 2")
	return nil
})
builder.Task(second)

third := flow.NewFuncTask("task-3", func(_ context.Context) error {
	fmt.Println("do something in task 3")
	return nil
})
builder.Task(third)

// Finish yields the flow that is handed to a runner.
pipeline := builder.Finish()

// The error returned by Start is deliberately discarded to keep the example
// short; real code should check it.
_ = flow.NewRunner(pipeline).Start(context.TODO())
You can implement a custom observer to track the status of flows and tasks, enabling operations such as persistence and logging.
// storageObserver is an in-memory flow.Observer that remembers the most recent
// flow and task carried by the update events it receives.
//
// Both maps must be initialised before the observer is used: Handle writes to
// them directly, and writing to a nil map panics.
type storageObserver struct {
	mu   sync.Mutex            // guards flow and task
	flow map[string]*flow.Flow // latest flow, keyed by flow ID
	task map[string]flow.Task  // latest task, keyed by task name
}

// Handle stores the flow carried by event under its ID and, when the event
// also carries a task, stores that task under its name. One log line is
// printed per stored item; each line is newline-terminated so that successive
// updates do not run together in the output.
func (s *storageObserver) Handle(event flow.UpdateEvent) {
	s.mu.Lock()
	defer s.mu.Unlock()

	fmt.Printf("update flow %s\n", event.Flow.ID)
	s.flow[event.Flow.ID] = event.Flow

	// An event without a task only updates the flow.
	if event.Task == nil {
		return
	}
	fmt.Printf("update flow %s task %s\n", event.Flow.ID, event.Task.GetName())
	s.task[event.Task.GetName()] = event.Task
}

// Compile-time check that *storageObserver satisfies flow.Observer.
var _ flow.Observer = (*storageObserver)(nil)
Register the observer in the builder:
builder := flow.NewFlowBuilder("sample-flow-01").Observer(&storageObserver{
flow: make(map[string]*flow.Flow),
task: make(map[string]flow.Task),
})
For more complex tasks, workflows based on DAGs are also supported:
// Use the DAG coordinator instead of the builder's default one.
builder := flow.NewFlowBuilder("dag-flow-01").Coordinator(flow.NewDAGCoordinator())

// task-1 is registered together with a director whose OnSucceed field names
// "task-3" and whose OnFailed field names "task-fail".
entry := flow.NewFuncTask("task-1", func(_ context.Context) error {
	fmt.Println("do something in task 1")
	return nil
})
next := flow.NextTask{
	OnSucceed: "task-3",
	OnFailed:  "task-fail",
}
builder.Task(flow.WithDirector(entry, next))

// The remaining tasks are registered without a director.
builder.Task(flow.NewFuncTask("task-2", func(_ context.Context) error {
	fmt.Println("do something in task 2")
	return nil
}))
builder.Task(flow.NewFuncTask("task-3", func(_ context.Context) error {
	fmt.Println("do something in task 3")
	return nil
}))
builder.Task(flow.NewFuncTask("task-fail", func(_ context.Context) error {
	fmt.Println("ops")
	return nil
}))

// The error returned by Start is deliberately discarded to keep the example
// short; real code should check it.
_ = flow.NewRunner(builder.Finish()).Start(context.TODO())
You can also define your own Task types and corresponding Executors:
// MyTask is a user-defined task type: it embeds flow.BasicTask and adds a set
// of string parameters.
type MyTask struct {
	flow.BasicTask

	parameters map[string]string
}

// NewMyTask builds a *MyTask with the given name and parameters and returns it
// as a flow.Task.
func NewMyTask(name string, parameters map[string]string) flow.Task {
	base := flow.BasicTask{Name: name}
	return &MyTask{
		BasicTask:  base,
		parameters: parameters,
	}
}
// MyExecutor is a stateless flow.Executor that only knows how to run *MyTask
// values.
type MyExecutor struct{}

// Compile-time check that *MyExecutor satisfies flow.Executor.
var _ flow.Executor = (*MyExecutor)(nil)

// Setup does nothing and always returns nil.
func (m *MyExecutor) Setup(ctx context.Context) error {
	return nil
}

// Exec prints the name of task when it is a *MyTask and returns nil. Any other
// task type is rejected with an error.
func (m *MyExecutor) Exec(ctx context.Context, flow *flow.Flow, task flow.Task) error {
	if mine, isMine := task.(*MyTask); isMine {
		fmt.Printf("exec my task: %s\n", mine.Name)
		return nil
	}
	return fmt.Errorf("not my task")
}

// Teardown does nothing and always returns nil.
func (m *MyExecutor) Teardown(ctx context.Context) error {
	return nil
}
Load the custom Tasks into the Flow:
// Attach the custom executor to the builder.
builder := flow.NewFlowBuilder("my-flow-01").Executor(&MyExecutor{})

// Register the custom tasks in order, each with its own empty parameter map.
for _, name := range []string{"my-task-1", "my-task-2", "my-task-3"} {
	builder.Task(NewMyTask(name, make(map[string]string)))
}

// The error returned by Start is deliberately discarded to keep the example
// short; real code should check it.
_ = flow.NewRunner(builder.Finish()).Start(context.TODO())