diff --git a/modules/nextflow/src/main/groovy/nextflow/Session.groovy b/modules/nextflow/src/main/groovy/nextflow/Session.groovy index f394245259..5fc6c436f4 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Session.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Session.groovy @@ -23,6 +23,7 @@ import java.nio.file.Paths import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.ExecutorService import java.util.concurrent.Executors +import java.util.concurrent.TimeoutException import com.google.common.hash.HashCode import groovy.transform.CompileDynamic @@ -689,8 +690,17 @@ class Session implements ISession { try { log.trace "Session > destroying" // shutdown thread pools - finalizePoolManager?.shutdown(aborted) - publishPoolManager?.shutdown(aborted) + try { + finalizePoolManager?.shutdown(aborted) + publishPoolManager?.shutdown(aborted) + } + catch( TimeoutException e ) { + final ignoreErrors = config.navigate('workflow.output.ignoreErrors', false) + if( !ignoreErrors ) + throw new AbortOperationException("Timed out while waiting to publish outputs") + else + log.warn e.message + } // invoke shutdown callbacks shutdown0() log.trace "Session > after cleanup" diff --git a/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolHelper.groovy b/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolHelper.groovy index 12c736bd60..edb4b083cc 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolHelper.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolHelper.groovy @@ -20,6 +20,7 @@ package nextflow.util import java.util.concurrent.ExecutorService import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeoutException import groovy.transform.CompileStatic import groovy.util.logging.Slf4j @@ -45,10 +46,8 @@ class ThreadPoolHelper { break final delta = System.currentTimeMillis()-t0 - if( delta > max ) { - log.warn(exitMsg) - break - } + if( delta > max ) + throw new TimeoutException(exitMsg) // log to console every 10 minutes (120 * 5 sec) if( count % 120 == 0 ) { diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchExecutor.groovy b/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchExecutor.groovy index 1ce8875521..23b9bf9c35 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchExecutor.groovy +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchExecutor.groovy @@ -18,6 +18,7 @@ package nextflow.cloud.aws.batch import java.nio.file.Path import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeoutException import com.amazonaws.services.batch.AWSBatch import com.amazonaws.services.batch.model.AWSBatchException @@ -320,7 +321,12 @@ class AwsBatchExecutor extends Executor implements ExtensionPoint, TaskArrayExec reaper.shutdown() final waitMsg = "[AWS BATCH] Waiting jobs reaper to complete (%d jobs to be terminated)" final exitMsg = "[AWS BATCH] Exiting before jobs reaper thread pool complete -- Some jobs may not be terminated" - ThreadPoolHelper.await(reaper, Duration.of('60min'), waitMsg, exitMsg) + try { + ThreadPoolHelper.await(reaper, Duration.of('60min'), waitMsg, exitMsg) + } + catch( TimeoutException e ) { + log.warn e.message + } } @Override diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Client.java b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Client.java index 9661afd657..25aadbb2c8 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Client.java +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Client.java @@ -660,18 +660,4 @@ public void uploadDirectory(File source, S3Path target) { String getObjectKmsKeyId(String bucketName, String key) { return getObjectMetadata(bucketName,key).getSSEAwsKmsKeyId(); } - - protected void showdownTransferPool(boolean hard) { - log.debug("Initiating transfer manager shutdown (hard={})", hard); - if( hard ) { - transferPool.shutdownNow(); - } - else { - // await pool completion - transferPool.shutdown(); - final String waitMsg = "[AWS S3] Waiting files transfer to complete (%d files)"; - final String exitMsg = "[AWS S3] Exiting before FileTransfer thread pool complete -- Some files maybe lost"; - ThreadPoolHelper.await(transferPool, Duration.of("1h"), waitMsg, exitMsg); - } - } } diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3OutputStream.java b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3OutputStream.java index 56454b186a..eed9e3cda7 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3OutputStream.java +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3OutputStream.java @@ -665,22 +665,4 @@ static synchronized ExecutorService getOrCreateExecutor(int maxThreads) { return executorSingleton; } - /** - * Shutdown the executor and clear the singleton - */ - static void shutdownExecutor(boolean hard) { - if( hard ) { - executorSingleton.shutdownNow(); - } - else { - executorSingleton.shutdown(); - log.trace("Uploader await completion"); - final String waitMsg = "[AWS S3] Waiting stream uploader to complete (%d files)"; - final String exitMsg = "[AWS S3] Exiting before stream uploader thread pool complete -- Some files maybe lost"; - ThreadPoolHelper.await(executorSingleton, Duration.of("1h") ,waitMsg, exitMsg); - log.trace("Uploader shutdown completed"); - executorSingleton = null; - } - } - }