Nextflow Channels and Processes


Objectives
  • Gain an understanding of Nextflow channels and processes
  • Gain an understanding of Nextflow syntax
  • Read data of different types into a Nextflow workflow
  • Create Nextflow processes consisting of multiple scripting languages

3.1.1. Download data

Download the training data to your work directory:

git clone https://github.com/nextflow-io/training.git

3.1.2. Channels and channel factories

Channels are a key data structure of Nextflow, used to pass data between processes.

Queue Channels

A queue channel connects two processes or operators, and is implicitly created by process outputs, or using channel factories such as Channel.of or Channel.fromPath.

The training/nf-training/snippet.nf script creates a three-element channel consisting of the strings "1", "2", and "3". This is assigned to a channel called ch. To create the channel, the Channel.of channel factory is used, which can create a channel from arguments such as strings or integers. The view() operator can be used to view the contents of the channel.

#!/usr/bin/env nextflow

ch = Channel.of("1","2","3")
ch.view()

When you run the script training/nf-training/snippet.nf, something similar to the following will be returned:

nextflow run training/nf-training/snippet.nf
 N E X T F L O W   ~  version 24.10.5

Launching `training/nf-training/snippet.nf` [intergalactic_rutherford] DSL2 - revision: 59a79bc0dd

1
2
3

Now, modify the script training/nf-training/snippet.nf to the following:

#!/usr/bin/env nextflow

ch1 = Channel.of(1, 2, 3)
ch2 = Channel.of(1)

process SUM {
    input:
    val x
    val y

    output:
    stdout

    script:
    """
    echo \$(($x+$y))
    """
}

workflow {
    SUM(ch1, ch2).view()
}

We now have a workflow that creates two queue channels, ch1 and ch2, that are input into the SUM process inside the workflow function. The SUM process sums the two inputs and prints the standard output result using the view() channel operator.

nextflow run training/nf-training/snippet.nf

After running the script, you may notice the only output is 2. In the output log, we can also see that one task for the SUM process has been launched, as indicated by 1 of 1 ✔.

 N E X T F L O W   ~  version 24.10.5

Launching `training/nf-training/snippet.nf` [cheeky_shannon] DSL2 - revision: 508a8e352b

executor >  local (1)
[37/0561fd] SUM (1) | 1 of 1 ✔
2

Since ch1 and ch2 are queue channels, the single element of ch2 has been consumed when it was initially passed to the SUM process with the first element of ch1. Even though there are other elements to be consumed in ch1, no new process instances will be launched. This is because a process waits until it receives an input value from all the channels declared as an input. The channel values are consumed serially one after another and the first empty channel causes the process execution to stop, even though there are values in other channels.

Value Channels

A value channel differs from a queue channel in that it is bound to a single value, and it can be read unlimited times without consuming its contents. To use the single element in ch2 multiple times, you can use the Channel.value channel factory.

Modify ch2 to the following: ch2 = Channel.value(1) and run the script.

nextflow run training/nf-training/snippet.nf
 N E X T F L O W   ~  version 24.10.5

Launching `training/nf-training/snippet.nf` [curious_payne] DSL2 - revision: acd5299c29

executor >  local (3)
[ec/62956a] SUM (2) | 3 of 3 ✔
2

4

3

Now that ch2 has been read in as a value channel, its value can be read unlimited times without consuming its contents. We can also see in the output log that three separate tasks have been executed and completed, as indicated by 3 of 3 ✔

In many situations, Nextflow will implicitly convert variables into value channels when they are input into a process. Modify the invocation of the SUM process in the workflow function to the following: SUM(ch1, 1).view() and run the script. Here we are directly using the integer 1 as an input to the SUM process.

nextflow run training/nf-training/snippet.nf
 N E X T F L O W   ~  version 24.10.5

Launching `training/nf-training/snippet.nf` [astonishing_baekeland] DSL2 - revision: dd0fb5d771

executor >  local (3)
[c3/15235d] SUM (1) | 3 of 3 ✔
3

4

2

This integer has been automatically cast as a value channel, allowing it to be used multiple times for each of the elements in ch1, without its contents being consumed. Again, in the output logs we see that three separate tasks have been executed successfully.

3.1.3. Processes

In Nextflow, a process is the basic computing task to execute functions (i.e., custom scripts or tools).

The process definition starts with the keyword process, followed by the process name, commly written in upper case by convention, and finally the process body delimited by curly brackets.

The process body can contain many definition blocks:

process < name > {
    [ directives ] 

    input: 
    < process inputs >

    output: 
    < process outputs >

    [script|shell|exec]: 
    """
    < user script to be executed >
    """
}
  • Directives are optional declarations of settings such as cpus, time, executor, container.
  • input: the expected names and qualifiers of variables into the process
  • output: the expected names and qualifiers of variables output from the process
  • script: defines the command to be executed by the process

Inside the script block, all $ characters need to be escaped with a \. This is true for both referencing Bash variables created inside the script block (ie. echo \$z) as well as performing Bash commands (ie. echo \$(($x+$y))), but not when referencing Nextflow variables declared in the input (ie. $x+$y).

process SUM {
    input:
    val x
    val y

    output:
    stdout

    script:
    """
    z='SUM'
    echo \$z
    echo \$(($x+$y))
    """
}

By default, the process command is interpreted as a Bash script. However, any other scripting language can be used by simply starting the script with the corresponding Shebang declaration. To reference Python variables created inside the Python script, no $ is required. For example:

process PYSTUFF {
    output:
    stdout

    script:
    """
    #!/usr/bin/env python

    x = 'Hello'
    y = 'world!'
    print(x,y)
    """
}

workflow {
    PYSTUFF().view()
}

Vals

The val qualifier allows any data type to be received as input to the process. In the example below, the num queue channel is created from integers 1 and 2, and string Hello, and input into the BASICEXAMPLE process, where it is declared with the qualifier val and assigned to the local variable x. Within this process, this input is referred to and accessed locally by the specified variable name x, prepended with $.

num = Channel.of(1, 2, "Hello")

process BASICEXAMPLE {
    input:
    val x

    output:
    stdout

    script:
    """
    echo process job $x
    """
}

workflow {
    BASICEXAMPLE(num).view()
}

In the above example the process is executed three times, for each element in the channel num. Thus, it results in an output similar to the one shown below:

 N E X T F L O W   ~  version 24.10.5

Launching `training/nf-training/snippet.nf` [jovial_austin] DSL2 - revision: 9227a67fbc

process job 1
process job 2
process job Hello

The val qualifier can also be used to specify the process output.

num = Channel.of(1, 2, "Hello")

process BASICEXAMPLE {
    input:
    val x

    output:
    val x

    script:
    """
    echo process job $x > file.txt
    """
}

workflow {
    BASICEXAMPLE(num).view()
}

In this example, each element of num is printed to a file file.txt. The same input value is then returned as the output. Its contents are printed using the view() channel operator.

 N E X T F L O W   ~  version 24.10.5

Launching `training/nf-training/snippet.nf` [jovial_austin] DSL2 - revision: 9227a67fbc

executor >  local (3)
[52/d66b4d] process > BASICEXAMPLE (3) [100%] 3 of 3 ✔
2
1
Hello

Paths

The path qualifier allows the handling of files inside a process. When a new instance of a process is executed, a new process execution directory will be created just for that process.

When the path qualifier is specified as the input, Nextflow will stage the file inside the process execution directory (eg. creating a symbolic link to the file), allowing it to be accessed by the script using the specified name in the input declaration, without having to provide the full file path.

Let’s first create a folder called lesson3.1 and move into it:

mkdir ./lesson3.1 && cd $_

Now, take a look inside our data folder. This folder contains multiple .fq files, along with a .fa file.

ls ../training/nf-training/data/ggal
gut_1.fq  gut_2.fq  liver_1.fq  liver_2.fq  lung_1.fq  lung_2.fq  transcriptome.fa

Save the following code block as a new file foo.nf inside lesson3.1:

ch_reads = Channel.fromPath('../training/nf-training/data/ggal/*.fq') 

process FOO {
    input:
    path fq

    output:
    stdout

    script:
    """
    ls $fq
    """
}

workflow {
    FOO(ch_reads).view()
}

In this example, the wildcard character * is used to match for .fq files, and assigned to ch_reads. This queue channel is input to the process FOO. In the input declaration of the process body, the file is referred to as fq, which has been declared with the path qualifier. The script block then lists the path of the Nextflow variable fq, which is prepended with $, and this standard output is returned.

nextflow run foo.nf

When the script is ran, the FOO process is executed six times, once for each element in the ch_reads channel, consuming its contents.

 N E X T F L O W   ~  version 24.10.5

Launching `foo.nf` [stoic_williams] DSL2 - revision: 58ab4e5e92

executor >  local (6)
[1f/a01f11] FOO (6) | 6 of 6 ✔
gut_2.fq

lung_1.fq

liver_1.fq

liver_2.fq

gut_1.fq

lung_2.fq

Note that here, the full path name is not printed, just the file name. This is because the path qualifier will stage each execution of the process separately inside an execution directory. Each process execution directory is saved inside a work folder, which is automatically create by Nextflow.

You will notice that inside your work directory, there are six folders, one corresponding to each .fq file inside the ch_reads queue channel. By default, only one execution directory will be printed – in this example it is 1f/a01f11.

Inside the FOO execution directory (ie. ./work/1f/a01f11...), the input file has been staged (symbolically linked) under just the file name, allowing the script to access the file within the execution directory without requiring the full path.

>>> ls -l ./work/1f/a01f11...
gut_2.fq -> /.../training/nf-training/data/ggal/gut_2.fq

Similarly, the path qualifier can also be used to specify one or more files that will be output by the process.

Modify your foo.nf script to the following:

ch_reads = Channel.fromPath('../training/nf-training/data/ggal/*.fq') 

process FOO {
    input:
    path fq

    output:
    path "*.txt"

    script:
    """
    echo $fq > sample.txt
    """
}

workflow {
    ch_output = FOO(ch_reads)
    ch_output.view()
}

In this example, we are now printing the fq file to a file sample_used.txt. The output is declared with the path qualifier and specified using the wildcard *, which will match any output files containing the .txt extension. The path to sample_used.txt will returned and assigned to ch_output, and can be viewed with .view()

nextflow run foo.nf
 N E X T F L O W   ~  version 24.10.5

Launching `foo.nf` [awesome_poitras] DSL2 - revision: 7b830769cb

executor >  local (6)
[d5/840b58] process > FOO (4) [100%] 6 of 6 ✔
/.../work/50/da6e3380c47504e1b52f2e552183eb/sample.txt
/.../work/ac/ad3e12673a826f7aead7445e477fb1/sample.txt
/.../work/ac/4df2c1ab0eb1fc84661efd6589d8a2/sample.txt
/.../work/db/90d554944f53b3d0390a01736e27c5/sample.txt
/.../work/fb/0a44e3b46e892469144094494b1a4d/sample.txt
/.../work/d5/840b58fde765a97379edec43426c05/sample.txt

Tuples

To define paired/grouped input and output information, the tuple qualifier can be used. The input and output declarations for tuples must be declared with a tuple qualifier followed by the definition of each element in the tuple.

In the example below, ch_reads is a channel created using the fromFilePairs channel factory, which automatically creates a tuple from file pairs.

ch_reads = Channel.fromFilePairs("../training/nf-training/data/ggal/*_{1,2}.fq")
ch_reads.view()

This created tuple consists of two elements – the first element is always the grouping key of the matching pair (based on similarities in the file name), and the second is a list of paths to each file.

[gut, [/.../training/nf-training/data/ggal/gut_1.fq, /.../training/nf-training/data/ggal/gut_2.fq]]
[liver, [/.../training/nf-training/data/ggal/liver_1.fq, /.../training/nf-training/data/ggal/liver_2.fq]]
[lung, [/.../training/nf-training/data/ggal/lung_1.fq, /.../training/nf-training/data/ggal/lung_2.fq]]

To input a tuple into a process, the tuple qualifier must be used in the input block. Below, the first element of the tuple (ie. the grouping key) is declared with the val qualifier, and the second element of the tuple is declared with the path qualifier. The FOO process then prints the .fq file paths to a file called sample.txt, and returns that output file as a tuple containing the same grouping key, declared with val, and the output file created inside the process, declared with path.

ch_reads = Channel.fromFilePairs("../training/nf-training/data/ggal/*_{1,2}.fq")

process FOO {
    input:
    tuple val(sample_id), path(sample_id_paths)

    output:
    tuple val(sample_id), path('sample.txt')

    script:
    """
    echo $sample_id_paths > sample.txt
    """
}

workflow {
    sample_ch = FOO(ch_reads)
    sample_ch.view()
}

Update foo.nf and run the script.

nextflow run foo.nf
 N E X T F L O W   ~  version 24.10.5

Launching `foo.nf` [deadly_hypatia] DSL2 - revision: 59fe4396c3

executor >  local (3)
[6e/125990] FOO (2) | 3 of 3 ✔
[lung, /.../work/3e/d17f681f95541b56e9b3561f2623b8/sample.txt]
[gut, /.../work/42/bec200096a897ff70a1a0e2d9afd44/sample.txt]
[liver, /.../work/6e/125990d7d8506c8d41f312e2e500ad/sample.txt]

Here, the FOO process is executed three times in parallel, so there’s no guarantee of a particular execution order. Therefore, if the script was ran again, the final result may be printed out in a different order:

nextflow run foo.nf
 N E X T F L O W   ~  version 24.10.5

Launching `foo.nf` [mighty_einstein] DSL2 - revision: 59fe4396c3

executor >  local (3)
[eb/bf2f06] FOO (2) [100%] 3 of 3 ✔
[gut, /.../work/67/baae976fa0d5e83ff175ce35c0405c/sample.txt]
[liver, /.../work/eb/bf2f0695d7e3aa57e61ca509c3594c/sample.txt]
[lung, /.../work/47/3dd9f920214bf0a1c1539fd8001a21/sample.txt]

The use of the tuple qualifier is especially important when the output of a process is being used as an input into another process. This qualifier allows sample metadata information to be stored, critical in ensuring the correct inputs are being used for downstream processes.

Key points
  • When a queue channel is input into a process, each element will be serially consumed until the channel is empty
  • Value channels can be used unlimited times without consuming its contents
  • $ characters need to be escaped with \ when referencing Bash variables and functions, while Nextflow variables do not
  • The scripting language within a process can be altered by starting the script with the desired Shebang declaration


Next Chapter: Creating an RNAseq Workflow



This workshop is adapted from Fundamentals Training, Advanced Training, Developer Tutorials, and Nextflow Patterns materials from Nextflow and nf-core*^