Fix race condition when resuming an aborted run #5600

Open · wants to merge 2 commits into master

Conversation

@bentsherman (Member)

Closes #5595 (Resume from crashed head job caused process and its downstream dependency to run at the same time)

When a Nextflow run is aborted for any reason, Nextflow will try to kill all jobs and save them to the cache as failed tasks, so that on a resumed run, the new jobs will use new task directories.

However, if Nextflow is unable to complete this cleanup for any reason, such as an abrupt crash, the task cache might not be updated. On a resumed run, the new task will be re-executed in the same directory as before:

Path resumeDir = null
boolean exists = false
try {
    final entry = session.cache.getTaskEntry(hash, this)
    resumeDir = entry ? FileHelper.asPath(entry.trace.getWorkDir()) : null
    if( resumeDir )
        exists = resumeDir.exists()

    log.trace "[${safeTaskName(task)}] Cacheable folder=${resumeDir?.toUriString()} -- exists=$exists; try=$tries; shouldTryCache=$shouldTryCache; entry=$entry"
    final cached = shouldTryCache && exists && entry.trace.isCompleted() && checkCachedOutput(task.clone(), resumeDir, hash, entry)
    if( cached )
        break
}
catch (Throwable t) {
    log.warn1("[${safeTaskName(task)}] Unable to resume cached task -- See log file for details", causedBy: t)
}
if( exists ) {
    tries++
    continue
}

What I think happens here is:

  1. the cache entry is missing
  2. Nextflow doesn't check whether the task dir exists
  3. Nextflow proceeds to re-use the task dir with a new task

This can cause a race condition: if the begin and exit files (.command.begin and .exitcode) are still present, Nextflow could submit a job, detect the begin and exit files left by the previous job, mark the job as completed (as long as the required outputs are still present), and launch downstream tasks on truncated data.

I think this can only happen on grid executors because the GridTaskHandler checks the begin/exit files before polling the scheduler. The cloud executors poll their API first, and the local executor doesn't check these files at all.
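
To make the failure mode concrete, here is a minimal sketch, in the spirit of the grid handler's check order but not the actual GridTaskHandler code (the looksCompleted helper and its reading of .exitcode are illustrative assumptions):

import java.nio.file.Files
import java.nio.file.Path

// Illustrative only: a handler that trusts the exit file in the work directory
// cannot distinguish a file written by the job it just submitted from one left
// behind by a job started in a previous, aborted run.
boolean looksCompleted(Path workDir) {
    final exitFile = workDir.resolve('.exitcode')
    if( !Files.exists(exitFile) )
        return false
    // a leftover .exitcode with a parseable code makes the task appear finished
    return Files.readString(exitFile).trim().isInteger()
}

If the work directory is reused, this check succeeds as soon as the old files are seen, regardless of what the newly submitted job is actually doing.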

It should be possible to replicate this issue on a grid executor by cancelling a run with -disable-jobs-cancellation and allowing the jobs to complete before resuming the run. Incidentally, that is an unsafe property of that CLI option.

I'm trying to modify the local executor to simulate the issue, but I haven't been able to replicate it yet.


@bentsherman (Member Author)

An alternative solution would be for the task to always delete .command.begin and .exitcode as a precaution. However, I think we generally assume that a task should never reuse an existing work directory, since the presence of old outputs could cause other problems; hence the current approach.
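
A hedged sketch of that alternative, assuming a hypothetical helper that would run before (re)submitting the task (the file names match the ones discussed in this thread):

import java.nio.file.Files
import java.nio.file.Path

// Hypothetical precaution: remove leftover control files so artifacts from a
// previous run cannot be mistaken for the status of the job about to be submitted.
void cleanStaleControlFiles(Path workDir) {
    ['.command.begin', '.exitcode'].each { name ->
        Files.deleteIfExists(workDir.resolve(name))
    }
}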

@bentsherman marked this pull request as ready for review on December 13, 2024, 15:46
@pditommaso (Member)

I'm not sure I understand: on what resource does the race condition happen?

@bentsherman (Member Author)

Maybe it's not so much a race condition as it is a cache corruption

Nextflow re-executes a task in the same directory as before, and because the previous .command.begin and .exitcode are present, Nextflow immediately thinks the task is finished.

@pditommaso (Member)

Basically, we are saying the task reports 0 in the .exitcode file?

@bentsherman (Member Author)

The task completes with exit code 0 because, for example, Nextflow was previously killed before it was able to cancel the job. So the job runs to completion and writes the exit file. But there is no cache entry for this task, so Nextflow assumes the task directory is safe to use.

@pditommaso (Member)

ok, got it

Comment on lines -810 to +817

-    resumeDir = entry ? FileHelper.asPath(entry.trace.getWorkDir()) : null
-    if( resumeDir )
-        exists = resumeDir.exists()
+    workDir = entry
+        ? FileHelper.asPath(entry.trace.getWorkDir())
+        : task.getWorkDirFor(hash)
+    if( workDir )
+        exists = workDir.exists()

-    log.trace "[${safeTaskName(task)}] Cacheable folder=${resumeDir?.toUriString()} -- exists=$exists; try=$tries; shouldTryCache=$shouldTryCache; entry=$entry"
-    final cached = shouldTryCache && exists && entry.trace.isCompleted() && checkCachedOutput(task.clone(), resumeDir, hash, entry)
+    log.trace "[${safeTaskName(task)}] Cacheable folder=${workDir?.toUriString()} -- exists=$exists; try=$tries; shouldTryCache=$shouldTryCache; entry=$entry"
+    final cached = shouldTryCache && exists && entry && entry.trace.isCompleted() && checkCachedOutput(task.clone(), workDir, hash, entry)
Member

This is one of those pieces of code where the fewer the changes the better, both to make it simple to review and to keep the history readable.

Member Author

I know. This is about as simple as it gets while keeping the intent clear.

  1. check for a cache entry and its work dir, or compute the work dir if there is no cache entry
  2. check whether the work dir exists
  3. check for cached outputs if the cache entry and work dir exist and the task completed

and further down:

  4. if the outputs are cached then use them
  5. otherwise, if the work dir exists then use a new work dir
  6. otherwise, create the work dir and use it (see the sketch after this list)
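
A hedged pseudo-Groovy sketch of those six steps, written as a standalone helper. The names entry, FileHelper, checkCachedOutput and getWorkDirFor are taken from the snippets in this PR, but the signature and the null-return convention for "retry with a new dir" are simplified assumptions:

// Returns the directory to use: the cached dir when outputs can be reused,
// a freshly created dir otherwise, or null to tell the caller to bump the
// hash and try a different directory.
Path resolveWorkDir(entry, task, hash) {
    // 1. work dir from the cache entry, or computed from the hash when the entry is missing
    final workDir = entry
        ? FileHelper.asPath(entry.trace.getWorkDir())
        : task.getWorkDirFor(hash)
    // 2. check whether it exists on disk
    final exists = workDir.exists()
    // 3-4. entry, directory and completed trace all line up -- reuse the cached outputs
    if( exists && entry && entry.trace.isCompleted() && checkCachedOutput(task.clone(), workDir, hash, entry) )
        return workDir
    // 5. the directory exists but is not a valid cache hit -- caller retries with a new dir
    if( exists )
        return null
    // 6. otherwise create the directory and use it
    workDir.mkdirs()
    return workDir
}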

Member

OK, getting better, but why add entry && in the cached condition? If the intent is to apply the same logic when the entry is missing, shouldn't the condition remain the same?

Member Author

In the previous logic, it wouldn't check for cached outputs if the cache entry was missing. That is also how it behaves here. The entry && check is required to prevent a null pointer exception on entry.trace.isCompleted().
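
For clarity, a minimal example of the short-circuit evaluation being relied on here (entry stands in for the cache entry object):

def entry = null
// without the entry && guard, entry.trace.isCompleted() would throw a NullPointerException;
// with it, the whole expression short-circuits to false when the entry is missing
assert ( entry && entry.trace.isCompleted() ) == false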

Signed-off-by: Ben Sherman <[email protected]>
Comment on lines 830 to +832

     final lock = lockManager.acquire(hash)
-    final workDir = task.getWorkDirFor(hash)
     try {
-        if( resumeDir != workDir )
-            exists = workDir.exists()
-        if( !exists && !workDir.mkdirs() )
+        if( !workDir.mkdirs() )
Member Author

I've been wondering about this lock over the task directory. I think the purpose is to prevent two tasks from using the same directory. But in that case maybe it should be over the previous try-catch block?

It should prevent two tasks from checking the same directory at the same time, because that is how a task determines whether to use the directory or try a different one.

Member

It may. However, what I'm thinking is that, to solve this issue, we should assume a new task run always uses a newly created directory. I'm not sure this logic satisfies that.

Member Author

Currently this lock is useless: if two tasks request the same directory, the lock will serialize the mkdirs() call but won't actually cause the second task to move to a different directory.
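
A hedged sketch of what a more useful locking scheme might look like, with the existence check and the directory creation performed under the same lock. lockManager and getWorkDirFor come from the snippet above; the wrapper method name, the release() call, the exception message, and the null-return convention are assumptions:

Path acquireNewWorkDir(task, hash) {
    final lock = lockManager.acquire(hash)
    try {
        final workDir = task.getWorkDirFor(hash)
        // the exists() check and mkdirs() now happen while holding the lock for this hash,
        // so two tasks racing on the same hash cannot both claim the same directory
        if( workDir.exists() )
            return null    // caller bumps the hash and tries a different directory
        if( !workDir.mkdirs() )
            throw new IOException("Unable to create directory=$workDir")    // message illustrative
        return workDir
    }
    finally {
        lock.release()    // release method name assumed
    }
}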
