Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Offload publishing to separate jobs #5618

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/Session.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package nextflow

import nextflow.processor.PublishOffloadManager

import java.nio.file.Files
import java.nio.file.Path
Expand Down Expand Up @@ -293,6 +294,12 @@ class Session implements ISession {

FilePorter getFilePorter() { filePorter }

boolean publishOffload
jorgee marked this conversation as resolved.
Show resolved Hide resolved

private PublishOffloadManager publishOffloadManager

/**
 * @return the publish offload manager held by this session; may be {@code null}
 *         when no manager has been created
 */
PublishOffloadManager getPublishOffloadManager() {
    return this.publishOffloadManager
}
jorgee marked this conversation as resolved.
Show resolved Hide resolved

/**
* Creates a new session with an 'empty' (default) configuration
*/
Expand Down Expand Up @@ -394,6 +401,21 @@ class Session implements ISession {
// -- file porter config
this.filePorter = new FilePorter(this)

this.publishOffload = config.publishOffload as boolean

if ( this.publishOffload ) {
// -- publish offload manager config
log.warn("Publish offload flag enabled. Creating Offload Manager")
this.publishOffloadManager = new PublishOffloadManager(this)
}

}

/**
 * Initialises the publish offload manager, but only when the
 * {@code publishOffload} flag is set on this session.
 */
void startPublishOffloadManager() {
    // nothing to do unless offloading has been switched on
    if( !this.publishOffload )
        return

    log.debug("Starting Publish offload manager")
    // the manager may be absent even with the flag set, hence the safe navigation
    this.publishOffloadManager?.init()
}

protected Path cloudCachePath(Map cloudcache, Path workDir) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,22 @@ class ExecutorFactory {
return result
}

/**
 * Resolves the name of the executor configured for the given session.
 *
 * The lookup order is: {@code process.executor} (converted to string), then
 * {@code executor} when it is itself a string, then {@code executor.name}
 * when that is a string, and finally {@code DEFAULT_EXECUTOR}.
 *
 * @param session The session whose {@code config} map is inspected
 * @return The resolved executor name
 */
@CompileDynamic
static String getDefaultExecutorName(Session session) {
// process-level setting takes precedence over the top-level `executor` scope
def result = session.config.process?.executor?.toString()
if( !result ) {
if (session.config.executor instanceof String) {
return session.config.executor
} else if (session.config.executor?.name instanceof String) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In principle else is not needed

return session.config.executor.name
} else {
return DEFAULT_EXECUTOR
}
}
return result
}


void signalExecutors() {
for( Executor exec : executors.values() )
exec.signal()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,9 +409,9 @@ class PublishDir {

// create target dirs if required
makeDirs(destination.parent)

def offload = false
try {
processFileImpl(source, destination)
offload = processFileImpl(source, destination)
}
catch( FileAlreadyExistsException e ) {
// don't copy source path if target is identical, but still emit the publish event
Expand All @@ -425,11 +425,12 @@ class PublishDir {

if( !sameRealPath && shouldOverwrite(source, destination) ) {
FileHelper.deletePath(destination)
processFileImpl(source, destination)
offload = processFileImpl(source, destination)
}
}

notifyFilePublish(destination, source)
//Don't notify if file publication is offloaded. It will be notified after the offloaded job is finished.
if (!offload)
notifyFilePublish(destination, source)
}

private String real0(Path p) {
Expand Down Expand Up @@ -495,7 +496,7 @@ class PublishDir {
return sourceHash != targetHash
}

protected void processFileImpl( Path source, Path destination ) {
protected boolean processFileImpl( Path source, Path destination ) {
log.trace "publishing file: $source -[$mode]-> $destination"

if( !mode || mode == Mode.SYMLINK ) {
Expand All @@ -509,17 +510,26 @@ class PublishDir {
FilesEx.mklink(source, [hard:true], destination)
}
else if( mode == Mode.MOVE ) {
FileHelper.movePath(source, destination)
if ( session.getPublishOffloadManager()?.tryMoveOffload(source, destination) ){
return true
} else {
FileHelper.movePath(source, destination)
}
}
else if( mode == Mode.COPY ) {
FileHelper.copyPath(source, destination)
if ( session.getPublishOffloadManager()?.tryCopyOffload(source, destination) ){
return true
} else {
FileHelper.copyPath(source, destination)
}
}
else if( mode == Mode.COPY_NO_FOLLOW ) {
FileHelper.copyPath(source, destination, LinkOption.NOFOLLOW_LINKS)
}
else {
throw new IllegalArgumentException("Unknown file publish mode: ${mode}")
}
return false
}

protected void createPublishDir() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Copyright 2013-2024, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.processor

import groovy.util.logging.Slf4j
import nextflow.Nextflow
import nextflow.Session
import nextflow.executor.Executor
import nextflow.executor.ExecutorFactory
import nextflow.extension.FilesEx
import nextflow.fusion.FusionHelper
import nextflow.script.BaseScript
import nextflow.script.BodyDef
import nextflow.script.ProcessConfig
import nextflow.script.TokenValCall
import nextflow.util.ArrayTuple

import java.nio.file.Path

/**
 * Offloads {@code publishDir} copy/move operations to dedicated tasks instead of
 * performing them in the calling thread.
 *
 * Two internal processors (one for copy, one for move) are created by {@link #init()}.
 * Each publication request that passes {@link #checkOffload} is submitted as a task of
 * the matching processor and tracked in {@link #runningPublications} until the task
 * is finalized.
 */
@Slf4j
class PublishOffloadManager {

    /**
     * Publications submitted and not yet finalized, keyed by the task performing them.
     * The value is the tuple {@code (origin, destination)}.
     *
     * Entries are added inside the synchronized {@link #tryInvokeProcessor} but removed
     * by {@code PublishTaskProcessor.finalizeTask0} without holding this object's lock,
     * so the map itself has to be safe for concurrent access.
     */
    Map<TaskRun, ArrayTuple> runningPublications = Collections.synchronizedMap(new HashMap<TaskRun, ArrayTuple>(10))

    /** URI schemes that can be offloaded, keyed by executor name */
    static final Map SUPPORTED_SCHEMES = [awsbatch:['s3'], local:['file']]

    /** Container image set on the publish processors when s5cmd is used */
    static final String S5CMD_CONTAINER = 'jorgeejarquea/s5cmd_aws:0.0.1'

    Session session
    PublishTaskProcessor copyProcessor
    PublishTaskProcessor moveProcessor

    PublishOffloadManager(Session session) {
        this.session = session
    }

    /**
     * Creates the copy and move processors. The task script uses {@code s5cmd}
     * when {@link #useS5cmd()} holds and plain {@code cp}/{@code mv} otherwise.
     */
    void init() {
        if( useS5cmd() ) {
            this.copyProcessor = createProcessor( "publish_dir_copy_process", new BodyDef({"s5cmd cp $source $target"},'copy data process') )
            this.moveProcessor = createProcessor( "publish_dir_move_process", new BodyDef({"s5cmd mv $source $target"},'move data process') )
        }
        else {
            this.copyProcessor = createProcessor( "publish_dir_copy_process", new BodyDef({"cp $source $target"},'copy data process') )
            this.moveProcessor = createProcessor( "publish_dir_move_process", new BodyDef({"mv $source $target"},'move data process') )
        }
    }

    /**
     * @return {@code true} when offloading is enabled in the session and both paths
     *         use a scheme listed in {@link #SUPPORTED_SCHEMES} for the given executor
     */
    private boolean checkOffload(Path source, Path destination, String executor) {
        return session.publishOffload && source.scheme in SUPPORTED_SCHEMES[executor] && destination.scheme in SUPPORTED_SCHEMES[executor]
    }

    /**
     * Submits a publication task to the given processor when the operation can be offloaded.
     *
     * @param processor The copy or move processor; {@code null} when {@link #init()} has not run
     * @param origin The file to publish
     * @param destination The publication target
     * @return {@code true} when a task was submitted, {@code false} when the caller
     *         has to perform the operation itself
     */
    private synchronized boolean tryInvokeProcessor(TaskProcessor processor, Path origin, Path destination) {
        // processors only exist after init(): report "not offloaded" instead of failing
        // with a NullPointerException, so the caller falls back to a direct copy/move
        if( processor == null )
            return false
        if( !checkOffload(origin, destination, processor.executor.name) )
            return false

        final params = new TaskStartParams(TaskId.next(), processor.indexCount.incrementAndGet())
        log.debug("Creating task for file publication: ${origin.toUri().toString()} -> ${destination.toUri().toString()} " )
        // single input value: the (source, target) tuple declared in createProcessor
        final values = [ generateFileValues(origin, destination) ]
        processor.invokeTask([params, values] as Object[])
        // NOTE(review): the task is only registered after `invokeTask` returns; presumably it
        // cannot be finalized before this line is reached -- confirm
        runningPublications.put(processor.currentTask.get(), Nextflow.tuple(origin, destination))
        return true
    }

    private boolean isFusionEnabled() {
        return FusionHelper.isFusionEnabled(session)
    }

    /**
     * @return {@code true} when Fusion is disabled and the default executor is {@code awsbatch}
     */
    private boolean useS5cmd() {
        return !isFusionEnabled() && ExecutorFactory.getDefaultExecutorName(session) == 'awsbatch'
    }

    /**
     * Builds the {@code (source, target)} input tuple of the publication task:
     * Fusion container mount paths when Fusion is enabled, URI strings otherwise.
     */
    private ArrayTuple<String> generateFileValues(Path origin, Path destination) {
        if( isFusionEnabled() )
            return Nextflow.tuple(FusionHelper.toContainerMount(origin), FusionHelper.toContainerMount(destination))
        return Nextflow.tuple(FilesEx.toUriString(origin), FilesEx.toUriString(destination))
    }

    /**
     * @return {@code true} when the move was submitted as an offloaded task
     */
    boolean tryMoveOffload(Path origin, Path destination) {
        tryInvokeProcessor(moveProcessor, origin, destination)
    }

    /**
     * @return {@code true} when the copy was submitted as an offloaded task
     */
    boolean tryCopyOffload(Path origin, Path destination) {
        tryInvokeProcessor(copyProcessor, origin, destination)
    }

    /**
     * Creates a processor that runs the given script body with a
     * {@code tuple val(source), val(target)} input.
     *
     * @param name The process name, also used to look up process settings in the session config
     * @param body The script body to execute
     */
    private PublishTaskProcessor createProcessor( String name, BodyDef body ) {
        // Groovy assertions are always evaluated, so no further null check on `body` is needed
        assert body
        assert session.script
        log.debug("Creating processor $name")

        // -- the config object
        final processConfig = new ProcessConfig(session.script, name)
        if( useS5cmd() ) {
            processConfig.put('container', S5CMD_CONTAINER)
        }
        processConfig._in_tuple(new TokenValCall('source'), new TokenValCall('target'))

        // -- apply settings from config file to process config
        processConfig.applyConfig((Map)session.config.process, name, name, name)

        // -- get the executor for the given process config
        final execObj = session.executorFactory.getExecutor(name, processConfig, body, session)

        // -- create processor class
        return new PublishTaskProcessor( name, execObj, session, session.script, processConfig, body, this )
    }

}

/**
 * Task processor used for offloaded file publications. Instead of the regular
 * task finalization it emits the file-publish notification for the publication
 * that the finished task was carrying out.
 */
class PublishTaskProcessor extends TaskProcessor {

    /** The manager tracking the pending publications of this processor */
    PublishOffloadManager manager

    PublishTaskProcessor(String name, Executor executor, Session session, BaseScript baseScript, ProcessConfig processConfig, BodyDef bodyDef, PublishOffloadManager manager) {
        super(name, executor, session, baseScript, processConfig, bodyDef)
        this.manager = manager
    }

    /**
     * Removes the publication associated with the given task from the manager and
     * notifies the session that the file has been published.
     *
     * @param task The finished publication task
     */
    @Override
    void finalizeTask0(TaskRun task) {
        // the tuple is stored by the manager as (origin, destination)
        final tuple = manager.runningPublications.remove(task)
        // no pending publication recorded for this task: nothing to notify
        if( tuple == null )
            return
        // the notification takes the destination first and the source second,
        // the same argument order PublishDir uses for non-offloaded files
        session.notifyFilePublish((Path) tuple.get(1), (Path) tuple.get(0))
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2422,7 +2422,7 @@ class TaskProcessor {
* @param task The {@code TaskRun} instance to finalize
* @param producedFiles The map of files to be bind the outputs
*/
private void finalizeTask0( TaskRun task ) {
protected void finalizeTask0( TaskRun task ) {
log.trace "Finalize process > ${safeTaskName(task)}"

// -- bind output (files)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ class ScriptRunner {
protected run() {
log.debug "> Launching execution"
assert scriptParser, "Missing script instance to run"
session.startPublishOffloadManager()
// -- launch the script execution
scriptParser.runScript()
// -- normalise output
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2013-2024, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.processor

import nextflow.Session
import nextflow.script.BaseScript
import nextflow.script.ScriptBinding
import nextflow.script.ScriptFile
import spock.lang.Specification

import test.TestHelper


/**
 * Verifies that {@link PublishOffloadManager#init()} creates both the copy
 * and the move processor.
 */
class PublishOffloadManagerTest extends Specification {

    def 'should create task processor'() {
        given:
        def session = new Session()
        def binding = new ScriptBinding(session: session)
        def script = Stub(BaseScript)
        script.getBinding() >> binding
        // minimal pipeline script kept in an in-memory file system
        def tempDir = TestHelper.createInMemTempDir()
        def pipeline = tempDir.resolve('pipeline.nf')
        pipeline.text = 'println "hello"'
        session.init(new ScriptFile(pipeline))
        session.script = script
        def manager = new PublishOffloadManager(session)

        when:
        manager.init()

        then:
        manager.copyProcessor != null
        manager.moveProcessor != null

        cleanup:
        session.classesDir?.deleteDir()
    }

}
7 changes: 7 additions & 0 deletions tests/checks/publish-offload.nf/.checks
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
set -e
# run the pipeline with the config that enables publish offloading
cp .config nextflow.config

$NXF_RUN > stdout

# each process must have been submitted exactly once
[[ $(grep 'INFO' .nextflow.log | grep -c 'Submitted process > gen_data') == 1 ]] || false
[[ $(grep 'INFO' .nextflow.log | grep -c 'Submitted process > publish_dir_copy_process') == 1 ]] || false
# the generated file must be present in the publish directory
[[ -f publishDir/chunk_1 ]] || false
1 change: 1 addition & 0 deletions tests/checks/publish-offload.nf/.config
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
publishOffload = true
18 changes: 18 additions & 0 deletions tests/publish-offload.nf
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Directory the process outputs are published to
params.outdir = 'publishDir'

// Writes one file named `chunk_<i>` filled from /dev/urandom and publishes it
// to `params.outdir` using the `copy` mode.
process gen_data {
publishDir "${params.outdir}", mode: 'copy'
input:
val(i)
output:
path 'chunk_*'

script:
"""
dd if=/dev/urandom of=chunk_$i count=1 bs=10M
"""
}

// Runs `gen_data` once, with the single input value 1
workflow {
Channel.of(1) | gen_data
}
Loading