And you may find yourself with a bunch of CWLs
And you may find yourself wanting to use a Nextflow feature
And you may ask yourself, well, how did I get here?
As the debate between CWL and Nextflow rages on, and efforts to make them cross-compatible have languished, you will inevitably reach a point where it would be helpful to use both Nextflow and CWL at the same time. In this tutorial, I will show a very basic method for running your CWL files inside a Nextflow pipeline.
The full demonstration can be found here: https://github.com/stevekm/nextflow-demos/tree/master/CWL
This tutorial will demonstrate how to use these handy Nextflow features with your CWL pipeline:
- pre-filtering of task inputs to prevent 'bad' samples from getting through
- allowing one or more samples in the pipeline to fail and still continue execution with the remaining samples
- dynamically changing the tasks to be executed based on input parameters
- automatic HTML execution report generation and workflow graph visualization
Our demo pipeline will process four "samples", create a custom text file for each one containing a message, then create an archive for each message. At the end, all of the archives that pass all pipeline steps will be gathered for batch processing. SPOILER ALERT: some samples will be "bad" and won't make it all the way through the pipeline.
The pipeline will have two modes of operation: 'zip' mode and 'tar' mode, corresponding to the archive format to be used.
Software
The software used will be:
- Nextflow version 20.01.0
- cwlref-runner 1.0
- cwltool 2.0.20200126090152
- jq 1.5 (for building CWL inputs)
- Graphviz (for visualizing the DAG)
You can install them all with these commands:
$ conda install -y \
    bioconda::nextflow=20.01.0 \
    jq=1.5 \
    anaconda::graphviz
$ pip install cwltool==2.0.20200126090152 cwlref-runner==1.0
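Before going further, it can help to confirm that each tool is actually on your PATH. The following is a minimal sketch, not part of the demo repository: the function name `check_tools` is made up for illustration, and `dot` is assumed to be the executable that your Graphviz install provides.

```shell
# check_tools: hypothetical helper, not part of the demo repository.
# Prints each missing command and returns non-zero if any is missing.
check_tools() {
    local missing=0
    local tool
    for tool in "$@"; do
        if ! command -v "$tool" >/dev/null 2>&1; then
            echo "missing: $tool"
            missing=1
        fi
    done
    return "$missing"
}

# The tools this tutorial relies on; 'dot' is the Graphviz executable.
if check_tools nextflow cwl-runner cwltool jq dot; then
    echo "all tools found"
fi
```

If anything is reported as missing, re-run the install commands above in the same environment you plan to launch Nextflow from.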
CWL
We will use three CWL files in a directory called cwl:
echo.cwl:
#!/usr/bin/env cwl-runner
cwlVersion: v1.0
class: CommandLineTool
baseCommand: echo
stdout: message.txt
inputs:
  message:
    type: string
    inputBinding:
      position: 1
outputs:
  message_output:
    type: stdout
tar.cwl:
#!/usr/bin/env cwl-runner
cwlVersion: v1.0
class: CommandLineTool
baseCommand: [tar, -czf]
inputs:
  archive_output_file:
    type: string
    inputBinding:
      position: 1
  archive_input_file:
    type: File
    inputBinding:
      position: 2
outputs:
  archive:
    type: File
    outputBinding:
      glob: $(inputs.archive_output_file)
zip.cwl:
#!/usr/bin/env cwl-runner
cwlVersion: v1.0
class: CommandLineTool
baseCommand: zip
inputs:
  archive_output_file:
    type: string
    inputBinding:
      position: 1
  archive_input_file:
    type: File
    inputBinding:
      position: 2
outputs:
  archive:
    type: File
    outputBinding:
      glob: $(inputs.archive_output_file)
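Each of these tools can be tried on its own before wiring it into Nextflow, since cwl-runner takes a tool description followed by a job file of input values. Here is a rough sketch that writes a job file for echo.cwl by hand and runs the tool only if cwl-runner and the CWL file are actually present. The file name `echo-job.json` is an arbitrary choice for this example.

```shell
# Write a job file supplying the 'message' input that echo.cwl declares.
# 'echo-job.json' is an arbitrary name chosen for this sketch.
cat > echo-job.json <<'EOF'
{"message": "hello this is Sample1"}
EOF

# Run the tool directly when cwl-runner and cwl/echo.cwl are available.
# Per echo.cwl, the echoed text is captured in message.txt.
if command -v cwl-runner >/dev/null 2>&1 && [ -f cwl/echo.cwl ]; then
    cwl-runner cwl/echo.cwl echo-job.json
    cat message.txt
else
    echo "skipping: cwl-runner or cwl/echo.cwl not found"
fi
```

The same pattern should apply to tar.cwl and zip.cwl, except that their job files need an `archive_output_file` string and an `archive_input_file` entry of class File.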
Nextflow
Our Nextflow pipeline script will look like this:
main.nf:
params.cwl_dir = "cwl"
params.archive_type = "zip"
def cwl_dir = new File("${params.cwl_dir}").getCanonicalPath()
// make a file with some contents
process create_message {
    tag "${sampleID}"
    input:
    val(sampleID) from Channel.from(['Sample1', 'Sample2', 'Sample3', 'Sample4'])
    output:
    set val(sampleID), file("${output_file}") into messages, messages2
    script:
    output_file = "message.txt"
    """
    jq -n --arg message "hello this is ${sampleID}" '{"message":\$message}' > input.json
    cwl-runner "${cwl_dir}/echo.cwl" input.json
    """
}
// print to console so we know we got the file and its message
process print_message {
    tag "${sampleID}"
    echo true
    input:
    set val(sampleID), file(message_txt) from messages
    output:
    set val(sampleID), file(message_txt) into printed_messages
    script:
    """
    printf "Got message for sample ${sampleID} from file ${message_txt}: %s\n" "\$(cat ${message_txt})"
    """
}
// don't process anything called "Sample4"
good_samples = Channel.create()
bad_samples = Channel.create()
printed_messages.choice(good_samples, bad_samples){ items ->
    def sampleID = items[0]
    def message_txt = items[1]
    def output = 1 // bad by default; index 1 routes to bad_samples
    if (sampleID != "Sample4") output = 0 // index 0 routes to good_samples
    return(output)
}
bad_samples.subscribe{ sampleID, message_txt ->
    println "WARNING: bad sample was filtered out: ${sampleID}"
}
// alternate task execution based on input parameters
// NOTE: this conditional can also be implemented less verbosely under the 'script' directive
if ( "${params.archive_type}" == "zip" ) {
    // create a zip archive from the message file
    process zip_message {
        tag "${sampleID}"
        input:
        set val(sampleID), file(message_txt) from good_samples
        output:
        set val(sampleID), val("${params.archive_type}"), file("${output_file}") into archived_messages
        script:
        output_file = "${sampleID}.message.zip"
        // zip foo.txt.zip foo.txt
        """
        jq -n --arg archive_output_file "${output_file}" \
        --arg archive_input_file "${message_txt}" \
        '{"archive_output_file":\$archive_output_file, \
        "archive_input_file": {"class": "File", "path":\$archive_input_file} }' \
        > input.json
        cwl-runner "${cwl_dir}/zip.cwl" input.json
        """
    }
} else if ( "${params.archive_type}" == "tar" ) {
    // create a tar.gz archive from the message file
    process tar_message {
        tag "${sampleID}"
        input:
        set val(sampleID), file(message_txt) from good_samples
        output:
        set val(sampleID), val("${params.archive_type}"), file("${output_file}") into archived_messages
        script:
        output_file = "${sampleID}.message.tar.gz"
        // tar -czf foo.txt.tar.gz foo.txt
        """
        jq -n --arg archive_output_file "${output_file}" \
        --arg archive_input_file "${message_txt}" \
        '{"archive_output_file":\$archive_output_file, \
        "archive_input_file": {"class": "File", "path":\$archive_input_file} }' \
        > input.json
        cwl-runner "${cwl_dir}/tar.cwl" input.json
        """
    }
}
// Sample2 is destined for failure!
process please_dont_break {
    tag "${sampleID}"
    echo true
    errorStrategy "ignore"
    input:
    set val(sampleID), val(archive_type), file(archive_file) from archived_messages
    output:
    set val(archive_type), file(archive_file) into successful_messages
    script:
    """
    if [ "${sampleID}" == "Sample2" ]; then
        echo ">>> ERROR: ${sampleID} has failed!"
        exit 1
    fi
    """
}
// group all the items by archive type
successful_messages.groupTuple().set { grouped_messages }
process gather_files {
    echo true
    publishDir "output"
    input:
    set val(archive_type), file(items: '*') from grouped_messages
    output:
    file(items)
    script:
    """
    echo "Got these files of type ${archive_type}:"
    echo "${items}"
    """
}
Usage
The following command and output will show how to run the pipeline:
$ nextflow run main.nf --archive_type "zip" \
    -with-report nextflow.html \
    -with-timeline timeline.html \
    -with-trace trace.txt \
    -with-dag dag.png
N E X T F L O W ~ version 20.01.0
Launching `main.nf` [fabulous_davinci] - revision: d4abb2c6f2
WARN: Task runtime metrics are not reported when using macOS without a container engine
[e0/e3e9bd] Submitted process > create_message (Sample3)
[f2/38e6cc] Submitted process > create_message (Sample2)
[a2/23c8f1] Submitted process > create_message (Sample1)
[0a/998b23] Submitted process > create_message (Sample4)
[1a/70bd87] Submitted process > print_message (Sample1)
[3c/491c5a] Submitted process > print_message (Sample4)
[20/d29772] Submitted process > print_message (Sample3)
[4c/17f557] Submitted process > print_message (Sample2)
Got message for sample Sample1 from file message.txt: hello this is Sample1
[d9/fc52b1] Submitted process > zip_message (Sample1)
Got message for sample Sample3 from file message.txt: hello this is Sample3
[a4/cd6d3e] Submitted process > zip_message (Sample3)
Got message for sample Sample2 from file message.txt: hello this is Sample2
[4a/6def52] Submitted process > zip_message (Sample2)
Got message for sample Sample4 from file message.txt: hello this is Sample4
WARNING: bad sample was filtered out: Sample4
[35/a7329d] Submitted process > please_dont_break (Sample1)
[f9/141d00] Submitted process > please_dont_break (Sample2)
[f9/52510a] Submitted process > please_dont_break (Sample3)
[f9/141d00] NOTE: Process `please_dont_break (Sample2)` terminated with an error exit status (1) -- Error is ignored
[81/934e7e] Submitted process > gather_files (1)
Got these files of type zip:
Sample1.message.zip Sample3.message.zip
We can see that four samples were fed into the pipeline. However, 'Sample4' was detected as "bad" and was removed before it could break anything. 'Sample2' was also bad and did manage to break a pipeline step, but the pipeline was still able to run to completion despite the failure. Ultimately, only two samples passed all pipeline steps and made it to the end for processing.
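To double-check which samples survived, one option is to list the sample IDs found in the published directory. This is only a sketch: it assumes the archives land in output (the publishDir set in main.nf) and keep the `<sampleID>.message.<extension>` naming used by the archive tasks, and `passed_samples` is a hypothetical helper name.

```shell
# passed_samples: hypothetical helper; prints one sample ID per archive
# found in the given directory (defaults to ./output).
passed_samples() {
    dir="${1:-output}"
    for f in "$dir"/*.message.*; do
        # Skip the unexpanded glob when nothing matches.
        [ -e "$f" ] || continue
        name=$(basename "$f")
        # Strip everything from '.message.' onward to recover the ID.
        echo "${name%%.message.*}"
    done
}

passed_samples output
```

Going by the log above, I would expect this to list Sample1 and Sample3 after the zip run.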
You will notice the command line argument --archive_type "zip", which tells the pipeline to output .zip items via the zip_message workflow task. We can change this to 'tar' to get the output in a .tar.gz file instead:
$ nextflow run main.nf --archive_type "tar" \
    -with-report nextflow.html \
    -with-timeline timeline.html \
    -with-trace trace.txt \
    -with-dag dag.png
N E X T F L O W ~ version 20.01.0
Launching `main.nf` [cheesy_cajal] - revision: 4f2aadde6f
WARN: Task runtime metrics are not reported when using macOS without a container engine
[b3/cb2636] Submitted process > create_message (Sample4)
[02/244a33] Submitted process > create_message (Sample3)
[8d/46923c] Submitted process > create_message (Sample2)
[0e/c16d9a] Submitted process > create_message (Sample1)
[1d/fc9e2c] Submitted process > print_message (Sample1)
[54/305899] Submitted process > print_message (Sample4)
[5c/a2e684] Submitted process > print_message (Sample2)
[f6/b099ca] Submitted process > print_message (Sample3)
Got message for sample Sample1 from file message.txt: hello this is Sample1
[d5/18dd54] Submitted process > tar_message (Sample1)
Got message for sample Sample4 from file message.txt: hello this is Sample4
WARNING: bad sample was filtered out: Sample4
Got message for sample Sample2 from file message.txt: hello this is Sample2
Got message for sample Sample3 from file message.txt: hello this is Sample3
[8c/bdda32] Submitted process > tar_message (Sample2)
[0b/d23025] Submitted process > tar_message (Sample3)
[4a/52b3c9] Submitted process > please_dont_break (Sample1)
[b8/ae2d6a] Submitted process > please_dont_break (Sample2)
[c7/27beb0] Submitted process > please_dont_break (Sample3)
[b8/ae2d6a] NOTE: Process `please_dont_break (Sample2)` terminated with an error exit status (1) -- Error is ignored
[7e/2c3675] Submitted process > gather_files (1)
Got these files of type tar:
Sample1.message.tar.gz Sample3.message.tar.gz
This time, we got files for the same set of samples, but they were passed through the tar_message task, which produced .tar.gz files.
You will also notice some extra items output in the current directory: trace.txt, nextflow.html, timeline.html, and dag.png. These are logs, reports, and visualizations of the workflow that was executed.
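The trace file is a tab-separated table, so it can be queried from the shell. The sketch below prints the name of every task whose status is FAILED, which is what I would expect to see for please_dont_break (Sample2). It finds the `name` and `status` columns by their header text, which assumes the default trace fields, and `failed_tasks` is a hypothetical helper name.

```shell
# failed_tasks: hypothetical helper; prints the 'name' of each row in a
# Nextflow trace file whose 'status' column reads FAILED.
failed_tasks() {
    awk -F '\t' '
        NR == 1 {
            # Find the columns by header text instead of fixed position.
            for (i = 1; i <= NF; i++) {
                if ($i == "name")   name_col = i
                if ($i == "status") status_col = i
            }
            next
        }
        name_col && status_col && $status_col == "FAILED" { print $name_col }
    ' "$1"
}

# Only query the trace file if a run has actually produced one.
if [ -f trace.txt ]; then
    failed_tasks trace.txt
fi
```

Looking columns up by header text keeps the helper working if the trace file is configured with a different set or order of fields, as long as `name` and `status` are among them.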
Conclusion
Using these basic methods, you can use Nextflow to run your CWL pipelines.