From 544be43ca0d146269b5b6f6e2da204bc09febfc9 Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Wed, 11 Dec 2024 15:51:07 -0600 Subject: [PATCH 1/2] Fix race condition when resuming an aborted run Signed-off-by: Ben Sherman --- .../nextflow/processor/TaskProcessor.groovy | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy index 4b5fbf2791..e849a91b9e 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy @@ -803,16 +803,18 @@ class TaskProcessor { while( true ) { hash = HashBuilder.defaultHasher().putBytes(hash.asBytes()).putInt(tries).hash() - Path resumeDir = null + Path workDir = null boolean exists = false try { final entry = session.cache.getTaskEntry(hash, this) - resumeDir = entry ? FileHelper.asPath(entry.trace.getWorkDir()) : null - if( resumeDir ) - exists = resumeDir.exists() + workDir = entry + ? FileHelper.asPath(entry.trace.getWorkDir()) + : task.getWorkDirFor(hash) + if( workDir ) + exists = workDir.exists() - log.trace "[${safeTaskName(task)}] Cacheable folder=${resumeDir?.toUriString()} -- exists=$exists; try=$tries; shouldTryCache=$shouldTryCache; entry=$entry" - final cached = shouldTryCache && exists && entry.trace.isCompleted() && checkCachedOutput(task.clone(), resumeDir, hash, entry) + log.trace "[${safeTaskName(task)}] Cacheable folder=${workDir?.toUriString()} -- exists=$exists; try=$tries; shouldTryCache=$shouldTryCache; entry=$entry" + final cached = shouldTryCache && exists && entry && entry.trace.isCompleted() && checkCachedOutput(task.clone(), workDir, hash, entry) if( cached ) break } @@ -826,10 +828,7 @@ class TaskProcessor { } final lock = lockManager.acquire(hash) - final workDir = task.getWorkDirFor(hash) try { - if( resumeDir != workDir ) - exists = workDir.exists() if( !exists && !workDir.mkdirs() ) throw new IOException("Unable to create directory=$workDir -- check file system permissions") } From 4cff6a80c7e5fa01d6c21a6e299a1a4e95c8114a Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Fri, 13 Dec 2024 12:18:24 -0600 Subject: [PATCH 2/2] remove unnecessary check Signed-off-by: Ben Sherman --- .../src/main/groovy/nextflow/processor/TaskProcessor.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy index e849a91b9e..9c264b7125 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy @@ -829,7 +829,7 @@ class TaskProcessor { final lock = lockManager.acquire(hash) try { - if( !exists && !workDir.mkdirs() ) + if( !workDir.mkdirs() ) throw new IOException("Unable to create directory=$workDir -- check file system permissions") } finally {