Tutorial:How to run CWL with Nextflow
0
5
Entering edit mode
4.8 years ago
steve ★ 3.5k

And you may find yourself with a bunch of CWL's

And you may find yourself wanting to use a Nextflow feature

And you may ask yourself, well How did I get here?


As the debate between CWL and Nextflow rages on, and the efforts to make them cross-compatible have languished, one will inevitably come to a point in their life where it would be helpful to be able to use both Nextflow and CWL at the same time. In this tutorial, I will show a very basic method to run your CWL's inside a Nextflow pipeline.

The full demonstration can be found here: https://github.com/stevekm/nextflow-demos/tree/master/CWL

This tutorial will demonstrate how to use these handy Nextflow features with your CWL pipeline:

  • pre-filtering of task inputs to prevent 'bad' samples from getting through

  • allowing one or more samples in the pipeline to fail and still continue execution with the remaining samples

  • dynamically changing the tasks to be executed based on input parameters

  • automatic HTML execution report generation and workflow graph visualization

Our demo pipeline will process four "samples", create a custom text file for each one containing a message, then create an archive for each message. At the end, all of the archives that pass all pipeline steps will be gathered for batch processing. SPOILER ALERT: some samples will be "bad" and won't make it all the way through the pipeline.

The pipeline will have two modes of operation: 'zip' mode and 'tar' mode, corresponding to the archive format to be used.

Software

The software used will be

  • Nextflow version 20.01.0
  • cwlref-runner 1.0
  • cwltool 2.0.20200126090152
  • jq 1.5 (for building CWL inputs)
  • Graphviz (for visualizing the DAG)

You can install them all with these commands:

$ conda install -y \
bioconda::nextflow=20.01.0 \
jq=1.5 \
anaconda::graphviz

$ pip install cwltool==2.0.20200126090152 cwlref-runner==1.0

CWL

We will use three CWL files in a directory called cwl:

echo.cwl:

#!/usr/bin/env cwl-runner

cwlVersion: v1.0
class: CommandLineTool
baseCommand: echo
stdout: message.txt
inputs:
  message:
    type: string
    inputBinding:
      position: 1
outputs:
  message_output:
    type: stdout

tar.cwl:

#!/usr/bin/env cwl-runner

cwlVersion: v1.0
class: CommandLineTool
baseCommand: [tar, -czf]
inputs:
  archive_output_file:
    type: string
    inputBinding:
      position: 1
  archive_input_file:
    type: File
    inputBinding:
      position: 2
outputs:
  archive:
    type: File
    outputBinding:
      glob: $(inputs.archive_output_file)

zip.cwl:

#!/usr/bin/env cwl-runner

cwlVersion: v1.0
class: CommandLineTool
baseCommand: zip
inputs:
  archive_output_file:
    type: string
    inputBinding:
      position: 1
  archive_input_file:
    type: File
    inputBinding:
      position: 2
outputs:
  archive:
    type: File
    outputBinding:
      glob: $(inputs.archive_output_file)

Nextflow

Our Nextflow pipeline script will look like this:

main.nf:

params.cwl_dir = "cwl"
params.archive_type = "zip"
def cwl_dir = new File("${params.cwl_dir}").getCanonicalPath()

// make a file with some contents
process create_message {
    tag "${sampleID}"

    input:
    val(sampleID) from Channel.from(['Sample1', 'Sample2', 'Sample3', 'Sample4'])

    output:
    set val(sampleID), file("${output_file}") into messages, messages2

    script:
    output_file = "message.txt"
    """
    jq -n --arg message "hello this is ${sampleID}" '{"message":\$message}' > input.json
    cwl-runner "${cwl_dir}/echo.cwl" input.json
    """
}

// print to console so we know we got the file and its message
process print_message {
    tag "${sampleID}"
    echo true

    input:
    set val(sampleID), file(message_txt) from messages

    output:
    set val(sampleID), file(message_txt) into printed_messages

    script:
    """
    printf "Got message for sample ${sampleID} from file ${message_txt}: %s\n" "\$(cat ${message_txt})"
    """
}

// dont process anything called "Sample4"
good_samples = Channel.create()
bad_samples = Channel.create()

printed_messages.choice(good_samples, bad_samples){ items ->
    def sampleID = items[0]
    def message_txt = items[1]
    def output = 1 // bad by default
    if (sampleID != "Sample4") output = 0
    return(output)
}

bad_samples.subscribe{ sampleID, message_txt ->
    println "WARNING: bad sample was filtered out: ${sampleID}"
}

// alternate task execution based on input parameters
// NOTE: this conditional can also be implemented less verbosely under the 'script' directive
if ( "${params.archive_type}" == "zip" ) {
    // create a zip archive from the message file
    process zip_message {
        tag "${sampleID}"

        input:
        set val(sampleID), file(message_txt) from good_samples

        output:
        set val(sampleID), val("${params.archive_type}"), file("${output_file}") into archived_messages

        script:
        output_file = "${sampleID}.message.zip"
        // zip foo.txt.zip foo.txt
        """
        jq -n --arg archive_output_file "${output_file}" \
        --arg archive_input_file "${message_txt}" \
        '{"archive_output_file":\$archive_output_file, \
        "archive_input_file": {"class": "File", "path":\$archive_input_file} }' \
        > input.json

        cwl-runner "${cwl_dir}/zip.cwl" input.json
        """
    }
} else if ( "${params.archive_type}" == "tar" ) {
    process tar_message {
        tag "${sampleID}"

        input:
        set val(sampleID), file(message_txt) from good_samples

        output:
        set val(sampleID), val("${params.archive_type}"), file("${output_file}") into archived_messages

        script:
        output_file = "${sampleID}.message.tar.gz"
        // tar -czf foo.txt.tar.gz foo.txt
        """
        jq -n --arg archive_output_file "${output_file}" \
        --arg archive_input_file "${message_txt}" \
        '{"archive_output_file":\$archive_output_file, \
        "archive_input_file": {"class": "File", "path":\$archive_input_file} }' \
        > input.json

        cwl-runner "${cwl_dir}/tar.cwl" input.json
        """
    }
}

// Sample2 is destined for failure!
process please_dont_break {
    tag "${sampleID}"
    echo true
    errorStrategy "ignore"

    input:
    set val(sampleID), val(archive_type), file(archive_file) from archived_messages

    output:
    set val(archive_type), file(archive_file) into successful_messages

    script:
    """
    if [ "${sampleID}" == "Sample2" ]; then
        echo ">>> ERROR: ${sampleID} has failed!"
        exit 1
    fi
    """
}

// group all the items by archive type
successful_messages.groupTuple().set { grouped_messages }

process gather_files {
    echo true

    publishDir "output"

    input:
    set val(archive_type), file(items: '*') from grouped_messages

    output:
    file(items)

    script:
    """
    echo "Got these files of type ${archive_type}:"
    echo "${items}"
    """
}

Usage

The following command and output will show how to run the pipeline:

$ nextflow run main.nf --archive_type "zip" \
    -with-report nextflow.html \
    -with-timeline timeline.html \
    -with-trace trace.txt \
    -with-dag dag.png

N E X T F L O W  ~  version 20.01.0
Launching `main.nf` [fabulous_davinci] - revision: d4abb2c6f2
WARN: Task runtime metrics are not reported when using macOS without a container engine
[e0/e3e9bd] Submitted process > create_message (Sample3)
[f2/38e6cc] Submitted process > create_message (Sample2)
[a2/23c8f1] Submitted process > create_message (Sample1)
[0a/998b23] Submitted process > create_message (Sample4)
[1a/70bd87] Submitted process > print_message (Sample1)
[3c/491c5a] Submitted process > print_message (Sample4)
[20/d29772] Submitted process > print_message (Sample3)
[4c/17f557] Submitted process > print_message (Sample2)
Got message for sample Sample1 from file message.txt: hello this is Sample1
[d9/fc52b1] Submitted process > zip_message (Sample1)
Got message for sample Sample3 from file message.txt: hello this is Sample3
[a4/cd6d3e] Submitted process > zip_message (Sample3)
Got message for sample Sample2 from file message.txt: hello this is Sample2
[4a/6def52] Submitted process > zip_message (Sample2)
Got message for sample Sample4 from file message.txt: hello this is Sample4
WARNING: bad sample was filtered out: Sample4
[35/a7329d] Submitted process > please_dont_break (Sample1)
[f9/141d00] Submitted process > please_dont_break (Sample2)
[f9/52510a] Submitted process > please_dont_break (Sample3)
[f9/141d00] NOTE: Process `please_dont_break (Sample2)` terminated with an error exit status (1) -- Error is ignored
[81/934e7e] Submitted process > gather_files (1)
Got these files of type zip:
Sample1.message.zip Sample3.message.zip

We can see that there were 4 samples input in the pipeline. However, 'Sample4' was detected as being "bad" and was removed from the pipeline before it could break anything. On the other hand, 'Sample2' was also bad and managed to break a pipeline step, however the pipeline was able to continue to completion despite the failure. Ultimately, only two samples passed all pipeline steps and made it to the end for processing.

You will notice the command line argument --archive_type "zip", which tells the pipeline to output .zip items via the zip_message workflow task. We can change this to 'tar' to get the output in a .tar.gz file instead:

$ nextflow run main.nf --archive_type "tar" \
    -with-report nextflow.html \
    -with-timeline timeline.html \
    -with-trace trace.txt \
    -with-dag dag.png

N E X T F L O W  ~  version 20.01.0
Launching `main.nf` [cheesy_cajal] - revision: 4f2aadde6f
WARN: Task runtime metrics are not reported when using macOS without a container engine
[b3/cb2636] Submitted process > create_message (Sample4)
[02/244a33] Submitted process > create_message (Sample3)
[8d/46923c] Submitted process > create_message (Sample2)
[0e/c16d9a] Submitted process > create_message (Sample1)
[1d/fc9e2c] Submitted process > print_message (Sample1)
[54/305899] Submitted process > print_message (Sample4)
[5c/a2e684] Submitted process > print_message (Sample2)
[f6/b099ca] Submitted process > print_message (Sample3)
Got message for sample Sample1 from file message.txt: hello this is Sample1
[d5/18dd54] Submitted process > tar_message (Sample1)
Got message for sample Sample4 from file message.txt: hello this is Sample4
WARNING: bad sample was filtered out: Sample4
Got message for sample Sample2 from file message.txt: hello this is Sample2
Got message for sample Sample3 from file message.txt: hello this is Sample3
[8c/bdda32] Submitted process > tar_message (Sample2)
[0b/d23025] Submitted process > tar_message (Sample3)
[4a/52b3c9] Submitted process > please_dont_break (Sample1)
[b8/ae2d6a] Submitted process > please_dont_break (Sample2)
[c7/27beb0] Submitted process > please_dont_break (Sample3)
[b8/ae2d6a] NOTE: Process `please_dont_break (Sample2)` terminated with an error exit status (1) -- Error is ignored
[7e/2c3675] Submitted process > gather_files (1)
Got these files of type tar:
Sample1.message.tar.gz Sample3.message.tar.gz

This time, we got files for the same set of samples, but they were passed through the tar_message which produced .tar.gz files.

You will also notice some extra items output in the current directory; trace.txt, nextflow.html, timeline.html, and dag.png. These are logs, reports, and visualizations of the workflow that was executed. They will look something like this:


Screen Shot 2020-02-18 at 12 36 17 PM

Screen Shot 2020-02-16 at 5 31 28 PM

dag


Conclusion

Using these basic methods, you can use Nextflow to run your CWL pipelines.

cwl nextflow • 3.6k views
ADD COMMENT

Login before adding your answer.

Traffic: 2023 users visited in the last hour
Help About
FAQ
Access RSS
API
Stats

Use of this site constitutes acceptance of our User Agreement and Privacy Policy.

Powered by the version 2.3.6