Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Overwrite published outputs only if they are stale #4729

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ package nextflow.processor
import java.nio.file.FileAlreadyExistsException
import java.nio.file.FileSystem
import java.nio.file.FileSystems
import java.nio.file.FileVisitResult
import java.nio.file.Files
import java.nio.file.LinkOption
import java.nio.file.NoSuchFileException
import java.nio.file.Path
import java.nio.file.PathMatcher
import java.nio.file.SimpleFileVisitor
import java.nio.file.attribute.BasicFileAttributes
import java.time.temporal.ChronoUnit
import java.util.concurrent.ExecutorService

Expand Down Expand Up @@ -337,10 +340,10 @@ class PublishDir {
}

if( inProcess ) {
safeProcessFile(source, destination)
safeProcessPath(source, destination)
}
else {
threadPool.submit({ safeProcessFile(source, destination) } as Runnable)
threadPool.submit({ safeProcessPath(source, destination) } as Runnable)
}

}
Expand All @@ -363,9 +366,23 @@ class PublishDir {
throw new IllegalArgumentException("Not a valid publish target path: `$target` [${target?.class?.name}]")
}

protected void safeProcessFile(Path source, Path target) {
protected void safeProcessPath(Path source, Path target) {
try {
retryableProcessFile(source, target)
// publish each file in the directory tree
if( Files.isDirectory(source) ) {
Files.walkFileTree(source, new SimpleFileVisitor<Path>() {
FileVisitResult visitFile(Path sourceFile, BasicFileAttributes attrs) {
final targetFile = target.resolve(source.relativize(sourceFile).toString())
retryableProcessFile(sourceFile, targetFile)
FileVisitResult.CONTINUE
}
})
}

// otherwise publish file directly
else {
retryableProcessFile(source, target)
}
}
catch( Throwable e ) {
final msg = "Failed to publish file: ${source.toUriString()}; to: ${target.toUriString()} [${mode.toString().toLowerCase()}] -- See log file for details"
Expand Down Expand Up @@ -395,7 +412,7 @@ class PublishDir {
.build()
Failsafe
.with( retryPolicy )
.get({it-> processFile(source, target)})
.get { it -> processFile(source, target) }
}

protected void processFile( Path source, Path destination ) {
Expand All @@ -413,9 +430,10 @@ class PublishDir {

// make sure destination and source does not overlap
// see https://github.com/nextflow-io/nextflow/issues/2177
if( !sameRealPath && checkSourcePathConflicts(destination))
if( !sameRealPath && checkSourcePathConflicts(destination) )
return

// overwrite only if explicitly enabled or destination is stale
if( !sameRealPath && shouldOverwrite(source, destination) ) {
FileHelper.deletePath(destination)
processFileImpl(source, destination)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,48 @@ class PublishDirTest extends Specification {

}

def 'should overwrite published files only if they are stale' () {

given:
def session = new Session()
def folder = Files.createTempDirectory('nxf')
def sourceDir = folder.resolve('work-dir')
def targetDir = folder.resolve('pub-dir')
sourceDir.mkdir()
sourceDir.resolve('file1.txt').text = 'aaa'
sourceDir.resolve('file2.bam').text = 'bbb'
targetDir.mkdir()
targetDir.resolve('file1.txt').text = 'aaa'
targetDir.resolve('file2.bam').text = 'bbb (old)'

def task = new TaskRun(workDir: sourceDir, config: new TaskConfig(), name: 'foo')

when:
def outputs = [
sourceDir.resolve('file1.txt'),
sourceDir.resolve('file2.bam')
] as Set
def publisher = new PublishDir(path: targetDir, mode: 'copy', overwrite: 'deep')
and:
def timestamp1 = targetDir.resolve('file1.txt').lastModified()
def timestamp2 = targetDir.resolve('file2.bam').lastModified()
and:
publisher.apply( outputs, task )
and:
[email protected](false)

then:
timestamp1 == targetDir.resolve('file1.txt').lastModified()
timestamp2 != targetDir.resolve('file2.bam').lastModified()

targetDir.resolve('file1.txt').text == 'aaa'
targetDir.resolve('file2.bam').text == 'bbb'

cleanup:
folder?.deleteDir()

}

def 'should apply saveAs closure' () {

given:
Expand Down
29 changes: 29 additions & 0 deletions modules/nf-commons/src/main/nextflow/file/ETagAwareFile.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2013-2023, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package nextflow.file

/**
* Defines the interface for files that have an ETag
*
* @author Ben Sherman <[email protected]>
*/
interface ETagAwareFile {

String getETag()

}
7 changes: 7 additions & 0 deletions modules/nf-commons/src/main/nextflow/util/HashBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
Expand All @@ -44,6 +45,7 @@
import nextflow.ISession;
import nextflow.extension.Bolts;
import nextflow.extension.FilesEx;
import nextflow.file.ETagAwareFile;
import nextflow.file.FileHolder;
import nextflow.io.SerializableMarker;
import org.slf4j.Logger;
Expand Down Expand Up @@ -413,6 +415,11 @@ static private Hasher hashFileMetadata( Hasher hasher, Path file, BasicFileAttri
*/
static private Hasher hashFileContent( Hasher hasher, Path path ) {

// use etag if available
if( path instanceof ETagAwareFile )
hasher.putBytes(((ETagAwareFile)path).getETag().getBytes(StandardCharsets.UTF_8));

// otherwise compute checksum manually
OutputStream output = Funnels.asOutputStream(hasher);
try {
Files.copy(path, output);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
*/
package nextflow.cloud.aws

import nextflow.cloud.aws.nio.S3FileSystemProvider
import groovy.transform.CompileStatic
import nextflow.cloud.aws.nio.S3FileSystemProvider
import nextflow.file.FileHelper
import nextflow.plugin.BasePlugin
import org.pf4j.PluginWrapper
Expand Down
11 changes: 10 additions & 1 deletion plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Path.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import nextflow.file.ETagAwareFile;
import nextflow.file.TagAwareFile;
import static com.google.common.collect.Iterables.concat;
import static com.google.common.collect.Iterables.filter;
import static com.google.common.collect.Iterables.transform;
import static java.lang.String.format;

public class S3Path implements Path, TagAwareFile {
public class S3Path implements Path, ETagAwareFile, TagAwareFile {

public static final String PATH_SEPARATOR = "/";
/**
Expand Down Expand Up @@ -566,6 +567,14 @@ public String getStorageClass() {
return storageClass;
}

@Override
public String getETag() {
return fileSystem
.getClient()
.getObjectMetadata(getBucket(), getKey())
.getETag();
}

// ~ helpers methods

private static Function<String, String> strip(final String ... strs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class PublishDirS3Test extends Specification {
when:
spy.apply1(source, true)
then:
1 * spy.safeProcessFile(source, _) >> { sourceFile, s3File ->
1 * spy.safeProcessPath(source, _) >> { sourceFile, s3File ->
assert s3File instanceof S3Path
assert (s3File as S3Path).getTagsList().find{ it.getKey()=='FOO'}.value == 'this'
assert (s3File as S3Path).getTagsList().find{ it.getKey()=='BAR'}.value == 'that'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class AzFileAttributes implements BasicFileAttributes {

private String objectId

private String etag

static AzFileAttributes root() {
new AzFileAttributes(size: 0, objectId: '/', directory: true)
}
Expand All @@ -60,6 +62,7 @@ class AzFileAttributes implements BasicFileAttributes {
updateTime = time(props.getLastModified())
directory = client.blobName.endsWith('/')
size = props.getBlobSize()
etag = props.getETag()

// Support for Azure Data Lake Storage Gen2 with hierarchical namespace enabled
final meta = props.getMetadata()
Expand All @@ -75,6 +78,7 @@ class AzFileAttributes implements BasicFileAttributes {
creationTime = time(item.properties.getCreationTime())
updateTime = time(item.properties.getLastModified())
size = item.properties.getContentLength()
etag = item.properties.getETag()
}
}

Expand Down Expand Up @@ -150,6 +154,10 @@ class AzFileAttributes implements BasicFileAttributes {
return objectId
}

String getETag() {
return etag
}

@Override
boolean equals( Object obj ) {
if( this.class != obj?.class ) return false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import com.azure.storage.blob.models.BlobItem
import groovy.transform.CompileStatic
import groovy.transform.EqualsAndHashCode
import groovy.transform.PackageScope
import nextflow.file.ETagAwareFile

/**
* Implements Azure path object
Expand All @@ -37,7 +38,7 @@ import groovy.transform.PackageScope
*/
@CompileStatic
@EqualsAndHashCode(includes = 'fs,path,directory', includeFields = true)
class AzPath implements Path {
class AzPath implements Path, ETagAwareFile {

private AzFileSystem fs

Expand Down Expand Up @@ -333,4 +334,9 @@ class AzPath implements Path {
return result
}

@Override
String getETag() {
return attributes.getETag()
}

}
Loading