Race condition in WorkflowJob output callback when using MultithreadedJobExecutor #2068

Open
AlexTate opened this issue Nov 12, 2024 · 0 comments

AlexTate (Contributor) commented Nov 12, 2024

Continuation of #2003

Expected Behavior

Scatter outputs should be collected only once for each scatter job.

Actual Behavior

The patch provided by @GlassOfWhiskey in PR #2051 does enforce the expected behavior, but, as they note, it doesn't address the root cause. Under certain conditions, ReceiveScatterOutput.receive_scatter_output() may still be called twice, within a narrow window of time, for the same scatter job output.

The root cause lies within WorkflowJob and the conditions it uses to decide whether .do_output_callback() should be called. This method is intended to be called from exactly one of WorkflowJob.receive_output() or WorkflowJob.job(), but when the race occurs it is called from both.

WorkflowJob.job() runs in one thread, while WorkflowJob.receive_output() is the callback bound to a work unit executed by one of the TaskQueue workers, i.e. it executes in a separate scatter job thread. The race unfolds as follows: the .receive_output() thread calls do_output_callback(), which sets self.did_callback = True only later in its body. While that thread is still inside the method, the .job() thread reads did_callback, sees that it is still False, and calls do_output_callback() as well.
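The interleaving described above is a classic check-then-act race. The sketch below reproduces it deterministically using a hypothetical `Job` class (not cwltool's actual `WorkflowJob`; `maybe_callback`, `pause` and `callback_count` are illustrative names). Two `threading.Event` objects force the "job" thread to read `did_callback` while the "worker" thread is still inside `do_output_callback()`.

```python
import threading


class Job:
    """Hypothetical stand-in for WorkflowJob; names are illustrative only."""

    def __init__(self):
        self.did_callback = False
        self.callback_count = 0  # how many times the output callback ran

    def do_output_callback(self, pause=None):
        # Simulate work done before the flag is set; `pause` lets the demo
        # hold this thread inside the method at a chosen point.
        if pause is not None:
            pause()
        self.callback_count += 1
        self.did_callback = True  # set only at the end: this is the race window

    def maybe_callback(self, pause=None):
        # Check-then-act: the read of the flag and the call are not atomic.
        if not self.did_callback:
            self.do_output_callback(pause)


def run_race():
    job = Job()
    entered = threading.Event()  # worker has entered do_output_callback()
    release = threading.Event()  # main thread lets the worker continue

    def pause():
        entered.set()
        release.wait()

    # The worker thread plays the role of WorkflowJob.receive_output().
    worker = threading.Thread(target=job.maybe_callback, args=(pause,))
    worker.start()
    entered.wait()
    # The main thread plays the role of WorkflowJob.job(): did_callback is still False.
    job.maybe_callback()
    release.set()
    worker.join()
    return job.callback_count


print(run_race())  # 2: the callback ran once per thread
```

In the real code the window is only a few instructions wide, which is why the duplicate call shows up intermittently rather than on every run.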

The relevant shared state is:

  • WorkflowJob.did_callback
  • WorkflowJob.steps, specifically which steps have completed == True
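One way to close the window, sketched here under the assumption that both code paths can share a single `threading.Lock` (this is not the fix adopted in cwltool, and `SafeJob`, `Step` and `_claim_callback` are hypothetical names), is to make the check and the claim one atomic step: whichever thread first finds all steps completed and `did_callback == False` sets the flag while holding the lock, then runs the callback outside it. The same lock guards the per-step `completed` flags, so marking a step complete cannot interleave with the readiness check.

```python
import threading


class Step:
    """Hypothetical stand-in for a WorkflowStep with a `completed` flag."""

    def __init__(self):
        self.completed = False


class SafeJob:
    """Hypothetical WorkflowJob-like class; the shared lock is an assumption."""

    def __init__(self, n_steps):
        self.steps = [Step() for _ in range(n_steps)]
        self.did_callback = False
        self.callback_count = 0
        self._lock = threading.Lock()  # guards did_callback and steps[*].completed

    def _claim_callback(self):
        # Atomically check readiness and claim the callback.
        # Returns True for exactly one caller over the job's lifetime.
        with self._lock:
            if self.did_callback or not all(s.completed for s in self.steps):
                return False
            self.did_callback = True
            return True

    def do_output_callback(self):
        self.callback_count += 1

    def receive_output(self, step_index):
        # Called from worker threads when a step finishes.
        with self._lock:
            self.steps[step_index].completed = True
        if self._claim_callback():
            self.do_output_callback()

    def job(self):
        # Called from the main thread; may or may not win the claim.
        if self._claim_callback():
            self.do_output_callback()


def run_many(n_steps=8, trials=100):
    counts = []
    for _ in range(trials):
        job = SafeJob(n_steps)
        threads = [threading.Thread(target=job.receive_output, args=(i,))
                   for i in range(n_steps)]
        for t in threads:
            t.start()
        job.job()  # races with the workers, as in the issue
        for t in threads:
            t.join()
        job.job()  # checking again after all workers finish is harmless
        counts.append(job.callback_count)
    return counts


print(set(run_many()))  # {1}: exactly one callback per job
```

Running the callback outside the lock keeps a slow or re-entrant output callback from blocking step completion in other threads.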

I wanted to confirm that both methods were executing the same callback, which is a little tricky because the nested functools.partial objects obscure the object to which each level is bound. If you unwind the callback chain, you'll see that it is the same in both branches:

WorkflowJob.receive_output()  # [workflow scatterletters_#]
	WorkflowStep.receive_output()  # [simple-simple-scatter.cwl#scatterletters]
		ReceiveScatterOutput.receive_scatter_output()
			WorkflowJob.receive_output()  # [workflow ]
				MultithreadedJobExecutor.output_callback()
WorkflowJob.job()  # [workflow scatterletters_#]
	WorkflowStep.receive_output()  # [simple-simple-scatter.cwl#scatterletters]
		ReceiveScatterOutput.receive_scatter_output()
			WorkflowJob.receive_output()  # [workflow ]
				MultithreadedJobExecutor.output_callback()
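Unwinding a chain like this by hand is error-prone. The helper below is a sketch that relies only on the standard `functools.partial` attributes (`func`, `args`, `keywords`) and on bound methods exposing `__self__`; it assumes each level passes the next callback as a callable argument of a partial, which may not match how cwltool wires every level. The `Job` class and `final_callback` in the usage example are hypothetical stand-ins.

```python
import functools


def unwind(callback, depth=0):
    """Return one line per level of a nested functools.partial callback chain.

    Assumption: the next callback in the chain is bound as a callable
    positional or keyword argument of a partial; other wiring is not followed.
    """
    fn, nested = callback, []
    # Strip partial layers, remembering any callables they bind as arguments.
    while isinstance(fn, functools.partial):
        nested.extend(a for a in fn.args if callable(a))
        nested.extend(v for v in fn.keywords.values() if callable(v))
        fn = fn.func
    owner = getattr(fn, "__self__", None)  # only bound methods have __self__
    name = getattr(fn, "__qualname__", repr(fn))
    label = f"  # [{owner.name}]" if hasattr(owner, "name") else ""
    lines = ["    " * depth + name + "()" + label]
    for inner in nested:
        lines.extend(unwind(inner, depth + 1))
    return lines


class Job:
    """Hypothetical object whose method forwards outputs to the next callback."""

    def __init__(self, name):
        self.name = name

    def receive_output(self, next_callback, outputs):
        next_callback(outputs)


def final_callback(outputs):
    print("final", outputs)


outer = Job("outer")
inner = Job("inner")
chain = functools.partial(
    inner.receive_output,
    functools.partial(outer.receive_output, final_callback),
)
print("\n".join(unwind(chain)))
# Job.receive_output()  # [inner]
#     Job.receive_output()  # [outer]
#         final_callback()
```

Printing the bound instance at each level (here via a `name` attribute) is what distinguishes two calls to the same method on different jobs, e.g. `[workflow scatterletters_#]` versus `[workflow ]` in the chain above.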

Workflow Code

See #2003
