Skip to content

Commit

Permalink
[simple]try to solve #13.
Browse files Browse the repository at this point in the history
  • Loading branch information
vistart committed Jun 28, 2024
1 parent 4134c20 commit 4a12752
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 5 deletions.
12 changes: 8 additions & 4 deletions workflow/simple/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@ type Workflow[TInput, TOutput any] struct {
transits *Transits
muLoggers sync.RWMutex
loggers *Loggers
ctx context.Context
close context.CancelFunc
WorkflowInterface[TInput, TOutput]
}

Expand Down Expand Up @@ -475,10 +477,7 @@ func (d *Workflow[TInput, TOutput]) BuildWorkflow(ctx context.Context) error {
// After this method is executed, all input, output channels and transits will be deleted.
// Note, please do not call this method during workflow execution, otherwise it will lead to unpredictable consequences.
func (d *Workflow[TInput, TOutput]) CloseWorkflow() {
d.muChannels.Lock()
defer d.muChannels.Unlock()
d.channels.close()
d.channels = nil
d.close()
}

// Execute the workflow.
Expand All @@ -492,6 +491,11 @@ func (d *Workflow[TInput, TOutput]) CloseWorkflow() {
func (d *Workflow[TInput, TOutput]) Execute(root context.Context, input *TInput) *TOutput {
// The sub-context is introduced and has a cancellation handler, making it easy to terminate the entire process
// at any time.
select {
case <-d.ctx.Done():
return nil
default:
}
ctx, cancel := context.WithCancelCause(root)

// Record the context and cancellation handler, so they can be called at the appropriate time.
Expand Down
8 changes: 7 additions & 1 deletion workflow/simple/dag_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,19 @@

package simple

import "context"

// Option defines the option used to instantiate a Workflow.
// If an error occurs during instantiation, it needs to be reported. If no errors occurred, `nil` is returned.
type Option[TInput, TOutput any] func(d *Workflow[TInput, TOutput]) error

// NewWorkflow instantiates a workflow.
func NewWorkflow[TInput, TOutput any](options ...Option[TInput, TOutput]) (*Workflow[TInput, TOutput], error) {
dag := &Workflow[TInput, TOutput]{}
ctx, cancel := context.WithCancel(context.Background())
dag := &Workflow[TInput, TOutput]{
ctx: ctx,
close: cancel,
}
for _, option := range options {
err := option(dag)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions workflow/simple/dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func TestSimpleWorkflowValueTypeError_Error(t *testing.T) {
input := "input"
assert.Nil(t, f.Execute(context.Background(), &input))
assert.Nil(t, f.RunOnce(context.Background(), &input))
assert.Nil(t, f.Execute(context.Background(), &input))
}

// NewWorkflowTwoParallelTransitsWithLogger defines a workflow with logger.
Expand Down

0 comments on commit 4a12752

Please sign in to comment.