Skip to content

Commit

Permalink
Merge branch 'master' into update-syntax-docs
Browse files Browse the repository at this point in the history
  • Loading branch information
bentsherman authored Dec 16, 2024
2 parents 63b404a + 8b54181 commit e9a9e1a
Show file tree
Hide file tree
Showing 26 changed files with 143 additions and 46 deletions.
4 changes: 2 additions & 2 deletions docs/conda.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ name: my-env
channels:
- conda-forge
- bioconda
- defaults
dependencies:
- star=2.5.4a
- bwa=0.7.15
Expand Down Expand Up @@ -105,7 +104,8 @@ Conda environment files can also be used to install Python packages from the [Py
```yaml
name: my-env-2
channels:
- defaults
- conda-forge
- bioconda
dependencies:
- pip
- pip:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class ContainerHandler {
if( normalizedImageName.startsWith('docker://') && config.canRunOciImage() )
return normalizedImageName
final requiresCaching = normalizedImageName =~ IMAGE_URL_PREFIX
if( ContainerInspectMode.active() && requiresCaching )
if( ContainerInspectMode.dryRun() && requiresCaching )
return imageName
final result = requiresCaching ? createSingularityCache(this.config, normalizedImageName) : normalizedImageName
return Escape.path(result)
Expand All @@ -82,7 +82,7 @@ class ContainerHandler {
if( normalizedImageName.startsWith('docker://') && config.canRunOciImage() )
return normalizedImageName
final requiresCaching = normalizedImageName =~ IMAGE_URL_PREFIX
if( ContainerInspectMode.active() && requiresCaching )
if( ContainerInspectMode.dryRun() && requiresCaching )
return imageName
final result = requiresCaching ? createApptainerCache(this.config, normalizedImageName) : normalizedImageName
return Escape.path(result)
Expand All @@ -94,7 +94,7 @@ class ContainerHandler {
// if the imagename starts with '/' it's an absolute path
// otherwise we assume it's in a remote registry and pull it from there
final requiresCaching = !imageName.startsWith('/')
if( ContainerInspectMode.active() && requiresCaching )
if( ContainerInspectMode.dryRun() && requiresCaching )
return imageName
final result = requiresCaching ? createCharliecloudCache(this.config, normalizedImageName) : normalizedImageName
return Escape.path(result)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,17 @@ import groovy.transform.CompileStatic
@CompileStatic
class ContainerInspectMode {

private static Boolean active
private static Boolean dryRun

static boolean active() { return active==true }
static boolean active() { return dryRun!=null }

static void activate(boolean value) { active = value }
static boolean dryRun() { return dryRun==true }

static void activate(boolean dryRun) {
this.dryRun = dryRun
}

static void reset() {
dryRun = null
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class CachedTaskHandler extends TaskHandler {
}

@Override
void kill() {
protected void killTask() {
throw new UnsupportedOperationException()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ class GridTaskHandler extends TaskHandler implements FusionAwareTask {
}

@Override
void kill() {
protected void killTask() {
if( batch ) {
batch.collect(executor, jobId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class NopeTaskHandler extends TaskHandler {
}

@Override
void kill() { }
protected void killTask() { }

}

Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class StoredTaskHandler extends TaskHandler {
}

@Override
void kill() {
protected void killTask() {
throw new UnsupportedOperationException()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ class LocalTaskHandler extends TaskHandler implements FusionAwareTask {
* Force the submitted job to quit
*/
@Override
void kill() {
protected void killTask() {
if( !process ) return
final pid = ProcessHelper.pid(process)
log.trace("Killing process with pid: ${pid}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class NativeTaskHandler extends TaskHandler {
}

@Override
void kill() {
protected void killTask() {
if( result ) result.cancel(true)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
* Terminates the current task execution
*/
@Override
void kill() {
protected void killTask() {
if( cleanupDisabled() )
return

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package nextflow.processor
import static nextflow.processor.TaskStatus.*

import java.nio.file.NoSuchFileException
import java.util.concurrent.atomic.AtomicBoolean

import groovy.transform.CompileDynamic
import groovy.transform.CompileStatic
Expand All @@ -37,6 +38,8 @@ import nextflow.trace.TraceRecord
@CompileStatic
abstract class TaskHandler {

private AtomicBoolean killed = new AtomicBoolean()

protected TaskHandler(TaskRun task) {
this.task = task
}
Expand Down Expand Up @@ -77,10 +80,22 @@ abstract class TaskHandler {
abstract boolean checkIfCompleted()

/**
* Force the submitted job to quit
* Template method implementing the termination of a task execution.
* This is not mean to be invoked directly. See also {@link #kill()}
*/
abstract void kill()
abstract protected void killTask()

/**
* Kill a job execution.
*
* @see #killTask()
*/
void kill() {
if (!killed.getAndSet(true)) {
killTask()
}
}

/**
* Submit the task for execution.
*
Expand Down Expand Up @@ -300,4 +315,5 @@ abstract class TaskHandler {
final workflowId = env.get("TOWER_WORKFLOW_ID")
return workflowId ? "tw-${workflowId}-${name}" : name
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@

package nextflow.processor

import nextflow.cloud.CloudSpotTerminationException
import nextflow.exception.FailedGuardException
import nextflow.exception.ProcessEvalException
import nextflow.exception.ProcessException
import nextflow.exception.ProcessRetryableException

import static nextflow.processor.TaskProcessor.*

import java.util.concurrent.ExecutorService
Expand All @@ -34,10 +40,9 @@ import nextflow.exception.ProcessSubmitTimeoutException
import nextflow.executor.BatchCleanup
import nextflow.executor.GridTaskHandler
import nextflow.util.Duration
import nextflow.util.SysHelper
import nextflow.util.Threads
import nextflow.util.Throttle
import static nextflow.util.SysHelper.dumpThreads

/**
* Monitors the queued tasks waiting for their termination
*
Expand Down Expand Up @@ -465,7 +470,7 @@ class TaskPollingMonitor implements TaskMonitor {
}

protected dumpCurrentThreads() {
log.trace "Current running threads:\n${dumpThreads()}"
log.trace "Current running threads:\n${SysHelper.dumpThreads()}"
}

protected void dumpRunningQueue() {
Expand Down Expand Up @@ -573,6 +578,9 @@ class TaskPollingMonitor implements TaskMonitor {
checkTaskStatus(handler)
}
catch (Throwable error) {
// At this point NF assumes job is not running, but there could be errors at monitoring that could leave a job running (#5516).
// In this case, NF needs to ensure the job is killed.
handler.kill()
handleException(handler, error)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,20 +92,20 @@ class CondaCacheTest extends Specification {
def hash = CondaCache.sipHash(ENV)
ENV.text = '''
channels:
- conda-forge
- bioconda
- defaults
dependencies:
# Default bismark
- star=2.5.4a
- bwa=0.7.15
- bwa=0.7.15
'''
.stripIndent(true) // https://issues.apache.org/jira/browse/GROOVY-9423
when:
def prefix = cache.condaPrefixPath(ENV.toString())
then:
1 * cache.isYamlFilePath(ENV.toString())
1 * cache.getCacheDir() >> BASE
prefix.toString() == "/conda/envs/env-${hash}-9416240708c49c4e627414b46a743664"
prefix.toString() == "/conda/envs/env-${hash}-64874f9dc9e7be788384bccef357a4f4"

cleanup:
folder?.deleteDir()
Expand All @@ -119,15 +119,15 @@ class CondaCacheTest extends Specification {
def BASE = Paths.get('/conda/envs')
def ENV = Files.createTempFile('test','.yml')
def hash = CondaCache.sipHash(ENV)
ENV.text = '''
ENV.text = '''
name: my-env-1.1
channels:
- conda-forge
- bioconda
- defaults
dependencies:
# Default bismark
- star=2.5.4a
- bwa=0.7.15
- bwa=0.7.15
'''
.stripIndent(true)

Expand All @@ -136,7 +136,7 @@ class CondaCacheTest extends Specification {
then:
1 * cache.isYamlFilePath(ENV.toString())
1 * cache.getCacheDir() >> BASE
prefix.toString() == "/conda/envs/env-${hash}-e7fafe40ca966397a2c0d9bed7181aa7"
prefix.toString() == "/conda/envs/env-${hash}-5b5c72e839d0c7dcabb5d06607c205fc"

}

Expand All @@ -150,7 +150,7 @@ class CondaCacheTest extends Specification {
def hash = CondaCache.sipHash(ENV)
ENV.text = '''
star=2.5.4a
bwa=0.7.15
bwa=0.7.15
multiqc=1.2.3
'''
.stripIndent(true) // https://issues.apache.org/jira/browse/GROOVY-9423
Expand All @@ -161,7 +161,7 @@ class CondaCacheTest extends Specification {
1 * cache.isYamlFilePath(ENV.toString())
1 * cache.isTextFilePath(ENV.toString())
1 * cache.getCacheDir() >> BASE
prefix.toString() == "/conda/envs/env-${hash}-8a4aa7db8ddb8ce4eb4d450d4814a437"
prefix.toString() == "/conda/envs/env-${hash}-85371202d8820331ff19ae89c0595497"

cleanup:
folder?.deleteDir()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2013-2024, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package nextflow.container.inspect

import spock.lang.Specification

/**
*
* @author Paolo Di Tommaso <[email protected]>
*/
class ContainerInspectModeTest extends Specification {

def 'should validate activate and dry-run' () {
expect:
!ContainerInspectMode.active()
!ContainerInspectMode.dryRun()

when:
ContainerInspectMode.activate(false)
then:
ContainerInspectMode.active()
!ContainerInspectMode.dryRun()

when:
ContainerInspectMode.activate(true)
then:
ContainerInspectMode.active()
ContainerInspectMode.dryRun()

when:
ContainerInspectMode.reset()
then:
!ContainerInspectMode.active()
!ContainerInspectMode.dryRun()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -515,13 +515,13 @@ class K8sTaskHandlerTest extends Specification {
def handler = Spy(new K8sTaskHandler(client:client, podName: POD_NAME))

when:
handler.kill()
handler.killTask()
then:
1 * handler.cleanupDisabled() >> false
1 * client.podDelete(POD_NAME) >> null

when:
handler.kill()
handler.killTask()
then:
1 * handler.cleanupDisabled() >> true
0 * client.podDelete(POD_NAME) >> null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,4 +263,18 @@ class TaskHandlerTest extends Specification {
[:] | "job_1"
[TOWER_WORKFLOW_ID: '1234'] | "tw-1234-job_1"
}

def 'should not kill task twice'() {
given:
def handler = Spy(TaskHandler)
when:
handler.kill()
then:
1 * handler.killTask() >> {}

when:
handler.kill()
then:
0 * handler.killTask()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class TaskPollingMonitorTest extends Specification {
then:
1 * session.disableJobsCancellation >> true
and:
0 * handler.kill() >> null
0 * handler.killTask() >> null
0 * session.notifyTaskComplete(handler) >> null
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,6 @@ class MockTaskHandler extends TaskHandler {
}

@Override
void kill() { }
protected void killTask() { }

}
Loading

0 comments on commit e9a9e1a

Please sign in to comment.