Skip to content

Commit

Permalink
Ensure job is killed when exception in task status check (#5561)
Browse files Browse the repository at this point in the history

Signed-off-by: jorgee <[email protected]>
Signed-off-by: Paolo Di Tommaso <[email protected]>
Co-authored-by: Paolo Di Tommaso <[email protected]>
  • Loading branch information
jorgee and pditommaso authored Dec 13, 2024
1 parent 8e2056e commit 9eefd20
Show file tree
Hide file tree
Showing 19 changed files with 64 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class CachedTaskHandler extends TaskHandler {
}

@Override
void kill() {
protected void killTask() {
throw new UnsupportedOperationException()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ class GridTaskHandler extends TaskHandler implements FusionAwareTask {
}

@Override
void kill() {
protected void killTask() {
if( batch ) {
batch.collect(executor, jobId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class NopeTaskHandler extends TaskHandler {
}

@Override
void kill() { }
protected void killTask() { }

}

Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class StoredTaskHandler extends TaskHandler {
}

@Override
void kill() {
protected void killTask() {
throw new UnsupportedOperationException()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ class LocalTaskHandler extends TaskHandler implements FusionAwareTask {
* Force the submitted job to quit
*/
@Override
void kill() {
protected void killTask() {
if( !process ) return
final pid = ProcessHelper.pid(process)
log.trace("Killing process with pid: ${pid}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class NativeTaskHandler extends TaskHandler {
}

@Override
void kill() {
protected void killTask() {
if( result ) result.cancel(true)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
* Terminates the current task execution
*/
@Override
void kill() {
protected void killTask() {
if( cleanupDisabled() )
return

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package nextflow.processor
import static nextflow.processor.TaskStatus.*

import java.nio.file.NoSuchFileException
import java.util.concurrent.atomic.AtomicBoolean

import groovy.transform.CompileDynamic
import groovy.transform.CompileStatic
Expand All @@ -37,6 +38,8 @@ import nextflow.trace.TraceRecord
@CompileStatic
abstract class TaskHandler {

private AtomicBoolean killed = new AtomicBoolean()

protected TaskHandler(TaskRun task) {
this.task = task
}
Expand Down Expand Up @@ -77,10 +80,22 @@ abstract class TaskHandler {
abstract boolean checkIfCompleted()

/**
* Force the submitted job to quit
* Template method implementing the termination of a task execution.
* This is not meant to be invoked directly. See also {@link #kill()}
*/
abstract void kill()
abstract protected void killTask()

/**
 * Request the termination of the job managed by this handler.
 *
 * Only the first invocation delegates to {@link #killTask()}; the
 * {@code killed} flag is set atomically before delegating, so any
 * later invocation returns without doing anything.
 *
 * @see #killTask()
 */
void kill() {
    // flip the flag and learn, in one atomic step, whether someone got here first
    final alreadyKilled = killed.getAndSet(true)
    if( alreadyKilled )
        return
    killTask()
}

/**
* Submit the task for execution.
*
Expand Down Expand Up @@ -300,4 +315,5 @@ abstract class TaskHandler {
final workflowId = env.get("TOWER_WORKFLOW_ID")
return workflowId ? "tw-${workflowId}-${name}" : name
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@

package nextflow.processor

import nextflow.cloud.CloudSpotTerminationException
import nextflow.exception.FailedGuardException
import nextflow.exception.ProcessEvalException
import nextflow.exception.ProcessException
import nextflow.exception.ProcessRetryableException

import static nextflow.processor.TaskProcessor.*

import java.util.concurrent.ExecutorService
Expand All @@ -34,10 +40,9 @@ import nextflow.exception.ProcessSubmitTimeoutException
import nextflow.executor.BatchCleanup
import nextflow.executor.GridTaskHandler
import nextflow.util.Duration
import nextflow.util.SysHelper
import nextflow.util.Threads
import nextflow.util.Throttle
import static nextflow.util.SysHelper.dumpThreads

/**
* Monitors the queued tasks waiting for their termination
*
Expand Down Expand Up @@ -465,7 +470,7 @@ class TaskPollingMonitor implements TaskMonitor {
}

protected dumpCurrentThreads() {
log.trace "Current running threads:\n${dumpThreads()}"
log.trace "Current running threads:\n${SysHelper.dumpThreads()}"
}

protected void dumpRunningQueue() {
Expand Down Expand Up @@ -573,6 +578,9 @@ class TaskPollingMonitor implements TaskMonitor {
checkTaskStatus(handler)
}
catch (Throwable error) {
// At this point NF assumes the job is not running, but an error raised while monitoring may leave the job running (#5516).
// In that case, NF needs to make sure the job is killed.
handler.kill()
handleException(handler, error)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,13 +515,13 @@ class K8sTaskHandlerTest extends Specification {
def handler = Spy(new K8sTaskHandler(client:client, podName: POD_NAME))

when:
handler.kill()
handler.killTask()
then:
1 * handler.cleanupDisabled() >> false
1 * client.podDelete(POD_NAME) >> null

when:
handler.kill()
handler.killTask()
then:
1 * handler.cleanupDisabled() >> true
0 * client.podDelete(POD_NAME) >> null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,4 +263,18 @@ class TaskHandlerTest extends Specification {
[:] | "job_1"
[TOWER_WORKFLOW_ID: '1234'] | "tw-1234-job_1"
}

// Verifies that kill() delegates to killTask() on the first call only.
def 'should not kill task twice'() {
given:
// Spy on TaskHandler so that calls to killTask() can be counted and stubbed
def handler = Spy(TaskHandler)
when:
// first kill request
handler.kill()
then:
// killTask() is expected exactly once; it is stubbed with an empty closure
1 * handler.killTask() >> {}

when:
// second kill request on the same handler
handler.kill()
then:
// no further killTask() invocation is expected
0 * handler.killTask()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class TaskPollingMonitorTest extends Specification {
then:
1 * session.disableJobsCancellation >> true
and:
0 * handler.kill() >> null
0 * handler.killTask() >> null
0 * session.notifyTaskComplete(handler) >> null
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,6 @@ class MockTaskHandler extends TaskHandler {
}

@Override
void kill() { }
protected void killTask() { }

}
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler<String,Job
* {@inheritDoc}
*/
@Override
void kill() {
protected void killTask() {
assert jobId
log.trace "[AWS BATCH] Process `${task.lazyName()}` - killing job=$jobId"
final targetId = normaliseJobId(jobId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -881,23 +881,23 @@ class AwsBatchTaskHandlerTest extends Specification {

when:
handler.@jobId = 'job1'
handler.kill()
handler.killTask()
then:
1 * executor.shouldDeleteJob('job1') >> true
and:
1 * handler.terminateJob('job1') >> null

when:
handler.@jobId = 'job1:task2'
handler.kill()
handler.killTask()
then:
1 * executor.shouldDeleteJob('job1') >> true
and:
1 * handler.terminateJob('job1') >> null

when:
handler.@jobId = 'job1:task2'
handler.kill()
handler.killTask()
then:
1 * executor.shouldDeleteJob('job1') >> false
and:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ class AzBatchTaskHandler extends TaskHandler implements FusionAwareTask {
}

@Override
void kill() {
protected void killTask() {
if( !taskKey )
return
batchService.terminate(taskKey)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {
}

@Override
void kill() {
protected void killTask() {
if( isActive() ) {
log.trace "[GOOGLE BATCH] Process `${task.lazyName()}` - deleting job name=$jobId"
if( executor.shouldDeleteJob(jobId) )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ class GoogleLifeSciencesTaskHandler extends TaskHandler {
}

@Override
void kill() {
protected void killTask() {
if( !operation ) return
log.debug "[GLS] Killing task > $task.name - Pipeline Id: $pipelineId"
helper.cancelOperation(operation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ class GoogleBatchTaskHandlerTest extends Specification {

when:
handler.@jobId = 'job1'
handler.kill()
handler.killTask()
then:
handler.isActive() >> false
0 * executor.shouldDeleteJob('job1') >> true
Expand All @@ -568,7 +568,7 @@ class GoogleBatchTaskHandlerTest extends Specification {

when:
handler.@jobId = 'job1'
handler.kill()
handler.killTask()
then:
handler.isActive() >> true
1 * executor.shouldDeleteJob('job1') >> true
Expand All @@ -577,7 +577,7 @@ class GoogleBatchTaskHandlerTest extends Specification {

when:
handler.@jobId = 'job1'
handler.kill()
handler.killTask()
then:
handler.isActive() >> true
1 * executor.shouldDeleteJob('job1') >> false
Expand Down

0 comments on commit 9eefd20

Please sign in to comment.