Skip to content

Commit

Permalink
Fixed merge placeholders
Browse files Browse the repository at this point in the history
  • Loading branch information
pditommaso committed Dec 9, 2013
2 parents e7c0ec5 + 79fc667 commit de385f7
Show file tree
Hide file tree
Showing 77 changed files with 722 additions and 470 deletions.
17 changes: 10 additions & 7 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
*/

apply plugin: 'gradle-one-jar'
<<<<<<< HEAD
version = '0.5.3'
=======
version = '0.5.2'
>>>>>>> upstream

allprojects {
apply plugin: 'java'
apply plugin: 'groovy'

repositories {
Expand All @@ -42,12 +45,6 @@ allprojects {
}
}

compileJava {
options.compilerArgs << '-XDignore.symbol.file'
options.fork = true // may not needed on 1.8
options.forkOptions.executable = 'javac' // may not needed on 1.8
}

configurations {
oneJarLib
dnanexus.extendsFrom runtime
Expand Down Expand Up @@ -75,6 +72,12 @@ dependencies {
}
}

sourceSets.main.java.srcDirs = []
sourceSets.main.groovy.srcDirs = ['src/main/java', 'src/main/groovy']

compileGroovy {
options.compilerArgs = ['-XDignore.symbol.file']
}

subprojects {
apply plugin: 'groovy'
Expand Down
2 changes: 1 addition & 1 deletion examples/blast.nf
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ params.db = "$HOME/tools/blast-db/pdb/pdb"
params.query = "$HOME/sample.fa"
params.chunkSize = 1

DB=params.db
DB= file(params.db)
seq = channel()

inputFile = file(params.query)
Expand Down
5 changes: 3 additions & 2 deletions examples/small_parallel.nf
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
params.query = "$HOME/sample.fa"
params.db = "$HOME/tools/blast-db/pdb/pdb"

db = file(params.db)
fasta = channel()
queryFile = file(params.query)
queryFile.chunkFasta {
Expand All @@ -15,7 +16,7 @@ process blast {
file top_hits

"""
blastp -db ${params.db} -query query.fa -outfmt 6 > blast_result
blastp -db ${db} -query query.fa -outfmt 6 > blast_result
cat blast_result | head -n 10 | cut -f 2 > top_hits
"""
}
Expand All @@ -28,7 +29,7 @@ process extract {
output:
file sequences

"blastdbcmd -db ${params.db} -entry_batch $top_hits > sequences"
"blastdbcmd -db ${db} -entry_batch $top_hits > sequences"
}

process all(merge:true) {
Expand Down
2 changes: 1 addition & 1 deletion examples/small_pipeline.nf
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ process extractTopHits {
output:
file sequences

"blastdbcmd -db ${params.db} -entry_batch $top_hits > sequences"
"blastdbcmd -db ${db} -entry_batch $top_hits > sequences"
}

process align {
Expand Down
12 changes: 12 additions & 0 deletions src/main/groovy/nextflow/Const.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,29 @@ class Const {
/**
* The application version
*/
<<<<<<< HEAD
static final String APP_VER = "0.5.3"
=======
static final String APP_VER = "0.5.2"
>>>>>>> upstream

/**
* The app build time as linux/unix timestamp
*/
<<<<<<< HEAD
static final long APP_TIMESTAMP = 1386613774413
=======
static final long APP_TIMESTAMP = 1382739512876
>>>>>>> upstream

/**
* The app build number
*/
<<<<<<< HEAD
static final int APP_BUILDNUM = 1112
=======
static final int APP_BUILDNUM = 1138
>>>>>>> upstream

/**
* The date time formatter string
Expand Down
63 changes: 55 additions & 8 deletions src/main/groovy/nextflow/Session.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import groovyx.gpars.group.NonDaemonPGroup
import groovyx.gpars.group.PGroup
import groovyx.gpars.util.PoolUtils
import jsr166y.Phaser
import nextflow.processor.TaskScheduler
import nextflow.processor.TaskDispatcher
/**
*
* @author Paolo Di Tommaso <[email protected]>
Expand All @@ -43,9 +43,9 @@ class Session {
final List<DataflowProcessor> allProcessors = []

/**
* The scheduler monitoring the tasks execution
* Dispatch tasks for executions
*/
final TaskScheduler scheduler
final TaskDispatcher dispatcher

/**
* Holds the configuration object
Expand Down Expand Up @@ -143,7 +143,7 @@ class Session {
pgroup = new NonDaemonPGroup( config.poolSize as int )
Dataflow.activeParallelGroup.set(pgroup)

scheduler = new TaskScheduler(this)
dispatcher = new TaskDispatcher(this)

log.debug ">>> phaser register (session)"
phaser.register()
Expand All @@ -152,10 +152,6 @@ class Session {
@PackageScope
def getPhaser() { phaser }

def void start() {
scheduler.start()
}

/**
* Await the termination of all processors
*/
Expand Down Expand Up @@ -196,6 +192,57 @@ class Session {
}


public int getQueueSize( String execName, int defValue ) {
// creating the running tasks queue
def size = null

// make sure that the *executor* is a map object
// it could also be a plain string (when it specifies just the its name)
if( config.executor instanceof Map ){
if( execName ) {
size = config.executor?."$execName"?.queueSize
}

if( !size && config.executor?.queueSize ) {
size = config.executor?.queueSize
}
}


if( !size ) {
size = defValue
log.debug "Undefined executor queueSize property runnable queue size -- fallback default value: $size"
}

return size
}

public long getPollInterval( String execName, long defValue = 1_000 ) {
// creating the running tasks queue
def result = null

// make sure that the *executor* is a map object
// it could also be a plain string (when it specifies just the its name)
if( config.executor instanceof Map ){
if( execName ) {
result = config.executor?."$execName"?.pollInterval
}

if( !result && config.executor?.pollInterval ) {
result = config.executor?.pollInterval
}
}


if( !result ) {
result = defValue
log.debug "Undefined executor queueSize property runnable queue size -- fallback on num of available processors-1: $result"
}

return result
}


// /**
// * Create a table report of all executed or running tasks
// *
Expand Down
6 changes: 3 additions & 3 deletions src/main/groovy/nextflow/ast/ProcessDefTransformImpl.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ class ProcessDefTransformImpl implements ASTTransformation {

def methodCall = expression as MethodCallExpression
def methodName = methodCall.getMethodAsString()
log.debug "convert > input method: $methodName"
log.trace "convert > input method: $methodName"

if( methodName in ['val','env','file','each'] ) {
//this methods require a special prefix '__in_'
Expand All @@ -249,15 +249,15 @@ class ProcessDefTransformImpl implements ASTTransformation {
}

def void convertOutputMethod( Expression expression ) {
log.debug "convert > output expression: $expression"
log.trace "convert > output expression: $expression"

if( !(expression instanceof MethodCallExpression) ) {
return
}

def methodCall = expression as MethodCallExpression
def methodName = methodCall.getMethodAsString()
log.debug "convert > output method: $methodName"
log.trace "convert > output method: $methodName"

if( methodName in ['val','file'] ) {
// prefix the method name with the string '__out_'
Expand Down
55 changes: 55 additions & 0 deletions src/main/groovy/nextflow/executor/AbstractExecutor.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import java.nio.file.Path

import groovy.io.FileType
import groovy.util.logging.Slf4j
import nextflow.Session
import nextflow.exception.MissingFileException
import nextflow.processor.FileHolder
import nextflow.processor.FileInParam
import nextflow.processor.FileOutParam
import nextflow.processor.TaskConfig
import nextflow.processor.TaskHandler
import nextflow.processor.TaskMonitor
import nextflow.processor.TaskRun
/**
* Declares methods have to be implemented by a generic
Expand All @@ -25,12 +27,65 @@ abstract class AbstractExecutor {
*/
TaskConfig taskConfig

/**
* The current session object
*/
Session session

/**
* The executor simple name
*/
String name

/**
* The queue holder that keep track of all tasks for this executor.
*/
private TaskMonitor monitor


/**
* Let to post initialize the executor
*/
def void init() {
if( !monitor ) {
monitor = session.dispatcher.getOrCreateMonitor(this.class) { createTaskMonitor() }
}
}

/**
* @return Create a new instance of the {@code TaskQueueHolder} component
*/
abstract protected TaskMonitor createTaskMonitor()

/**
* @return A reference to the current {@code #queueHolder} object
*/
TaskMonitor getTaskMonitor() { monitor }

/**
* @return Create a new {@code TaskHandler} to manage the scheduling
* actions for this task
*/
abstract TaskHandler createTaskHandler(TaskRun task)

/**
* Submit a task for execution
*
* @param task
* @return
*/
TaskHandler submitTask( TaskRun task ) {
def handler = createTaskHandler(task)
monitor.put(handler)
try {
handler.submit()
}
catch( Exception e ) {
monitor.remove(handler)
}
return handler
}


/**
* Collect the file(s) with the name specified, produced by the execution
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ class BashWrapperBuilder {
* 4 - un-stage e.g. copy back the result files to the working folder
*/

def ENDL = '\n'
final ENDL = '\n'
def wrapper = new StringBuilder()
wrapper << '#!/bin/bash -Eeu' << ENDL
wrapper << 'trap onexit 1 2 3 15 ERR' << ENDL
Expand Down
27 changes: 17 additions & 10 deletions src/main/groovy/nextflow/executor/GridExecutors.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import nextflow.exception.InvalidExitException
import nextflow.processor.FileInParam
import nextflow.processor.TaskConfig
import nextflow.processor.TaskHandler
import nextflow.processor.TaskPollingMonitor
import nextflow.processor.TaskMonitor
import nextflow.processor.TaskRun
import nextflow.util.CmdLineHelper
import org.apache.commons.io.IOUtils
Expand All @@ -42,6 +44,19 @@ import org.apache.commons.io.IOUtils
@Slf4j
abstract class AbstractGridExecutor extends AbstractExecutor {

/**
* Create a a queue holder for this executor
* @return
*/
def TaskMonitor createTaskMonitor() {
final queueSize = session.getQueueSize(name, 50)
final pollInterval = session.getPollInterval(name, 1_000)
log.debug "Creating executor queue with size: $queueSize; poll-interval: $pollInterval"

return new TaskPollingMonitor(session, queueSize, pollInterval) .start()
}


/*
* Prepare and launch the task in the underlying execution platform
*/
Expand Down Expand Up @@ -204,7 +219,7 @@ class GridTaskHandler extends TaskHandler {
}
// save the JobId in the
this.jobId = executor.parseJobId(result)
this.status = Status.RUNNING
this.status = Status.SUBMITTED
}
catch( Exception e ) {
task.exitCode = exitStatus
Expand Down Expand Up @@ -241,11 +256,7 @@ class GridTaskHandler extends TaskHandler {
@Override
boolean checkIfRunning() {

if( !isNew() ) {
return true
}

if( isNew() && startFile.exists() ) {
if( isSubmitted() && startFile.exists() ) {
status = Status.RUNNING
return true
}
Expand All @@ -256,10 +267,6 @@ class GridTaskHandler extends TaskHandler {
@Override
boolean checkIfTerminated() {

if( isTerminated() ) {
return true
}

if( isRunning() && exitFile.exists() ) {

// finalize the task
Expand Down
Loading

0 comments on commit de385f7

Please sign in to comment.