Multiple subworkflow executions #5162
-
I would like to execute a subworkflow, that bundles multiple processes, for each element in an input channel, as with a process. However I can only get all elments from that channel as a list into the subworkflow. The general structure looks something like this:
My problem is, that I take as "input" all of the elements in "channel", but I would like to execute "subw" with every element seperately. Thanks for your help! |
Beta Was this translation helpful? Give feedback.
Replies: 3 comments 4 replies
-
Have you tried out the |
Beta Was this translation helpful? Give feedback.
-
It is not possible to execute a subworkflow "for ecah value" in a channel. A workflow can only consume the entire channel and delegate to processes and operators. Although I'm not 100% grokking what you're trying to do, it should be possible using operators, so I would study the operators to see how you can use them. Aaron's suggestion with |
Beta Was this translation helpful? Give feedback.
-
Thanks both of you for your answers! process unzip {
input:
tuple val(name), path(file)
output:
tuple val(name), path("${file.baseName}")
"""
gunzip -c ${file} > ${file.baseName}
"""
}
workflow subw {
take:
input
main:
// Here we get all the files with the .txt extension, while keeping the original tuple structure
// Transpose is needed if the following process can only handle one input per call
input.map { it -> tuple(it[0], it[1].findAll { (it.name.endsWith("gz")) }) }.filter{ !it[1].isEmpty() }.transpose().set{ zipped }
unzip(zipped)
// Make sure, that the second position in the tuple is a list
unzip.out.map { it -> tuple(it[0], [it[1]].flatten()) }.set { unzip_out }
// ...
// Concat two channels
txt_channel.concat(unzip_out.map { it -> tuple(it[0], it[1].findAll { (it.name.endsWith(".txt")) }) }).filter { !it[1].isEmpty() }.groupTuple().map { it -> tuple(it[0], it[1].flatten()) }.set { txt_all }
// ...
}
workflow {
// This can be done for multiple directories, then leading to [["dirName1", ["file1", "file2", ...]], ["dirName2", [...]], ...]
channel = Channel.of([dirName, files(dirPath + "/*")])
// Filter directories without files present
channel.filter { it -> !it[1].isEmpty() }.set { channel }
subw(channel)
} |
Beta Was this translation helpful? Give feedback.
I see. It is difficult to find a solution without precise background information, but I have an idea that might help you.
After filtering, you could group the files according to their path so that files from the same directory are in the same group.
You could therefore expand your channel beforehand and write your files in tuples.
input = input.map { it -> [it.getParent(), it] }
would add tuples to your channel elements and extend the file path. In the next step, you could now group the same file paths usinggroupTuple
.input = input.groupTuple(by: 0)
.Now you would have your files sorted by their path. For further steps you could make use of the file path. For example, in a process direc…