Skip to content

Commit

Permalink
Apply suggestions from review
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Sherman <[email protected]>
  • Loading branch information
bentsherman committed Dec 13, 2024
1 parent 19de4d3 commit c475e7a
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 26 deletions.
23 changes: 12 additions & 11 deletions modules/nextflow/src/main/groovy/nextflow/Session.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -682,23 +682,24 @@ class Session implements ISession {
if( !aborted ) {
joinAllOperators()
log.trace "Session > all operators finished"
final finalizerComplete = finalizePoolManager?.shutdown(false)
final publisherComplete = publishPoolManager?.shutdown(false)
if( !finalizerComplete || !publisherComplete ) {
final ignoreErrors = config.navigate('workflow.output.ignoreErrors')
if( !ignoreErrors )
throw new AbortOperationException("Timed out while waiting to publish outputs")
}
}
else {
finalizePoolManager?.shutdown(true)
publishPoolManager?.shutdown(true)
}
}

void destroy() {
try {
log.trace "Session > destroying"
// shutdown thread pools
try {
finalizePoolManager?.shutdown(aborted)
publishPoolManager?.shutdown(aborted)
}
catch( TimeoutException e ) {
final ignoreErrors = config.navigate('workflow.output.ignoreErrors', false)
if( !ignoreErrors )
throw new AbortOperationException("Timed out while waiting to publish outputs")
else
log.warn e.message
}
// invoke shutdown callbacks
shutdown0()
log.trace "Session > after cleanup"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import jdk.internal.vm.ThreadContainer
@Slf4j
class ThreadPoolHelper {

static boolean await(ExecutorService pool, Duration maxAwait, String waitMessage) {
static void await(ExecutorService pool, Duration maxAwait, String waitMessage, String exitMsg) {
final max = maxAwait.millis
final t0 = System.currentTimeMillis()
// wait for ongoing file transfer to complete
Expand All @@ -46,7 +46,7 @@ class ThreadPoolHelper {

final delta = System.currentTimeMillis()-t0
if( delta > max )
return false
throw new TimeoutException(exitMsg)

// log to console every 10 minutes (120 * 5 sec)
if( count % 120 == 0 ) {
Expand All @@ -59,7 +59,6 @@ class ThreadPoolHelper {
// increment the count
count++
}
return true
}

static protected int pending(ExecutorService pool) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,27 +109,24 @@ class ThreadPoolManager {
return result
}

boolean shutdown(ISession session) {
void shutdown(ISession session) {
final sess = (Session) session
return shutdown( sess != null && sess.aborted )
shutdown( sess != null && sess.aborted )
}

boolean shutdown(boolean hard) {
void shutdown(boolean hard) {
if( !executorService )
return true
return

if( hard ) {
executorService.shutdownNow()
return true
return
}

executorService.shutdown()
// wait for remaining threads to complete
final complete = ThreadPoolHelper.await(executorService, maxAwait, waitMsg)
if( !complete )
log.warn exitMsg
ThreadPoolHelper.await(executorService, maxAwait, waitMsg, exitMsg)
log.debug "Thread pool '$name' shutdown completed (hard=$hard)"
return complete
}

static ExecutorService create(String name, int maxThreads=0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,9 +319,13 @@ class AwsBatchExecutor extends Executor implements ExtensionPoint, TaskArrayExec
// start shutdown process
reaper.shutdown()
final waitMsg = "[AWS BATCH] Waiting jobs reaper to complete (%d jobs to be terminated)"
final complete = ThreadPoolHelper.await(reaper, Duration.of('60min'), waitMsg)
if( !complete )
log.warn "[AWS BATCH] Exiting before jobs reaper thread pool complete -- Some jobs may not be terminated"
final exitMsg = "[AWS BATCH] Exiting before jobs reaper thread pool complete -- Some jobs may not be terminated"
try {
ThreadPoolHelper.await(reaper, Duration.of('60min'), waitMsg, exitMsg)
}
catch( TimeoutException e ) {
log.warn e.message
}
}

@Override
Expand Down

0 comments on commit c475e7a

Please sign in to comment.