Skip to content

Commit

Permalink
fix minor issue
Browse files Browse the repository at this point in the history
Signed-off-by: jorgee <[email protected]>
  • Loading branch information
jorgee committed Dec 20, 2024
1 parent 832834f commit ac04d04
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 15 deletions.
4 changes: 2 additions & 2 deletions modules/nextflow/src/main/groovy/nextflow/Session.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ class Session implements ISession {

FilePorter getFilePorter() { filePorter }

int publishOffloadBatchSize
private int publishOffloadBatchSize

private PublishOffloadManager publishOffloadManager

Expand Down Expand Up @@ -406,7 +406,7 @@ class Session implements ISession {
if ( this.publishOffloadBatchSize ) {
// -- publish offload manager config
log.warn("Publish offload flag enabled. Creating Offload Manager")
this.publishOffloadManager = new PublishOffloadManager(this)
this.publishOffloadManager = new PublishOffloadManager(this, publishOffloadBatchSize)
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,17 @@ class PublishOffloadManager {
static final String PUBLISH_FUNCTION = 'nxf_publish'
private Session session
private PublishTaskProcessor publishProcessor
private List<String> commands = new LinkedList<String>();
private boolean closed = false;
private List<String> commands = new LinkedList<String>()
private boolean closed = false
private int batchSize
/**
* Unique offloaded index number
*/
final protected AtomicInteger indexCount = new AtomicInteger()

PublishOffloadManager(Session session) {
PublishOffloadManager(Session session, int batchSize) {
this.session = session;
this.batchSize = batchSize;
}
@PackageScope
TaskProcessor getPublishProcessor(){ publishProcessor }
Expand All @@ -64,7 +66,7 @@ class PublishOffloadManager {
}

private boolean checkOffload(Path source, Path destination, String executor){
return session.publishOffloadBatchSize > 0 && source.scheme in SUPPORTED_SCHEMES[executor] && destination.scheme in SUPPORTED_SCHEMES[executor];
return this.batchSize > 0 && source.scheme in SUPPORTED_SCHEMES[executor] && destination.scheme in SUPPORTED_SCHEMES[executor];
}

private void invokeProcessor(inputValue) {
Expand All @@ -77,12 +79,12 @@ class PublishOffloadManager {
publishProcessor.invokeTask(args.toArray())
}

private synchronized boolean tryOffload(String command, Path origin, Path destination, PublishRetryConfig retryConfig, boolean failonError){
private synchronized boolean tryOffload(String command, Path origin, Path destination, PublishRetryConfig retryConfig, boolean failOnError){
if (checkOffload(origin, destination, publishProcessor.executor.name)) {
final id = indexCount.incrementAndGet()
runningPublications.put(id, Nextflow.tuple(origin, destination, failonError))
runningPublications.put(id, Nextflow.tuple(origin, destination, failOnError))
commands.add(generateExecutionCommand(id, command, origin, destination, retryConfig))
if (commands.size() == session.publishOffloadBatchSize){
if (commands.size() == this.batchSize){
invokeProcessor(commands.join(";"))
commands.clear()
}
Expand Down Expand Up @@ -112,20 +114,20 @@ class PublishOffloadManager {
}
}

boolean tryMoveOffload(Path origin, Path destination, PublishRetryConfig retryConfig, boolean failonError) {
boolean tryMoveOffload(Path origin, Path destination, PublishRetryConfig retryConfig, boolean failOnError) {
String command = 'mv'
if ( useS5cmd() ) {
command = 's5cmd mv'
}
tryOffload(command, origin, destination, retryConfig, failonError)
tryOffload(command, origin, destination, retryConfig, failOnError)
}

boolean tryCopyOffload(Path origin, Path destination, PublishRetryConfig retryConfig, boolean failonError) {
boolean tryCopyOffload(Path origin, Path destination, PublishRetryConfig retryConfig, boolean failOnError) {
String command = 'cp'
if ( useS5cmd() ) {
command = 's5cmd cp'
}
tryOffload(command, origin, destination, retryConfig, failonError)
tryOffload(command, origin, destination, retryConfig, failOnError)
}

private PublishTaskProcessor createProcessor( String name, BodyDef body){
Expand Down Expand Up @@ -183,8 +185,8 @@ class PublishTaskProcessor extends TaskProcessor{
if (result.size() == 2) {
final id = result[0] as Integer
final tuple = manager.runningPublications.remove(id)
final exitcode = result[1] as Integer
if( exitcode == 0 ){
final exitCode = result[1] as Integer
if( exitCode == 0 ){
session.notifyFilePublish((Path) tuple.get(0), (Path) tuple.get(1))
} else {
if (tuple.get(2) as Boolean) {
Expand Down

0 comments on commit ac04d04

Please sign in to comment.