Nextflow Development - Channel Operators
- Gain an understanding of Nextflow channel operators
Environment Setup
Set up an interactive shell to run our Nextflow workflow:
srun --pty -p prod_short --mem 8GB --mincpus 2 -t 0-2:00 bash
Load the required modules to run Nextflow:
module load nextflow/23.04.1 module load singularity/3.7.3
Set the singularity cache environment variable:
export NXF_SINGULARITY_CACHEDIR=/config/binaries/singularity/containers_devel/nextflow
Singularity images downloaded by workflow executions will now be stored in this directory.
You may want to include these, or other environmental variables, in your .bashrc
file (or alternate) that is loaded when you log in so you don’t need to export variables every session. A complete list of environment variables can be found here.
The training data can be cloned from:
git clone https://github.com/nextflow-io/training.git
RNA-seq Workflow and Module Files
Previously, we created three Nextflow files and one config file:
├── nextflow.config
├── rnaseq.nf
├── modules.nf
└── modules └── trimgalore.nf
rnaseq.nf
: main workflow script where parameters are defined and processes were called.
#!/usr/bin/env nextflow
params.reads = "/scratch/users/.../training/nf-training/data/ggal/*_{1,2}.fq"
params.transcriptome_file = "/scratch/users/.../training/nf-training/data/ggal/transcriptome.fa"
reads_ch = Channel.fromFilePairs("$params.reads")
include { INDEX } from './modules.nf'
include { QUANTIFICATION as QT } from './modules.nf'
include { FASTQC as FASTQC_one } from './modules.nf'
include { FASTQC as FASTQC_two } from './modules.nf'
include { MULTIQC } from './modules.nf'
include { TRIMGALORE } from './modules/trimgalore.nf'
workflow {
index_ch = INDEX(params.transcriptome_file)
quant_ch = QT(index_ch, reads_ch)
fastqc_ch = FASTQC_one(reads_ch)
trimgalore_out_ch = TRIMGALORE(reads_ch).reads
fastqc_cleaned_ch = FASTQC_two(trimgalore_out_ch)
multiqc_ch = MULTIQC(quant_ch, fastqc_ch) }
modules.nf
: script containing the majority of modules, includingINDEX
,QUANTIFICATION
,FASTQC
, andMULTIQC
process INDEX {
container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-salmon-1.10.1--h7e5ed60_0.img"
input:
path transcriptome
output:
path "salmon_idx"
script:
"""
salmon index --threads $task.cpus -t $transcriptome -i salmon_idx
"""
}
process QUANTIFICATION {
container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-salmon-1.10.1--h7e5ed60_0.img"
input:
path salmon_index
tuple val(sample_id), path(reads)
output:
path "$sample_id"
script:
"""
salmon quant --threads $task.cpus --libType=U \
-i $salmon_index -1 ${reads[0]} -2 ${reads[1]} -o $sample_id
"""
}
process FASTQC {
container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-fastqc-0.12.1--hdfd78af_0.img"
input:
tuple val(sample_id), path(reads)
output:
path "fastqc_${sample_id}_logs"
script:
"""
mkdir fastqc_${sample_id}_logs
fastqc -o fastqc_${sample_id}_logs -f fastq -q ${reads}
"""
}
process MULTIQC {
publishDir params.outdir, mode:'copy'
container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-multiqc-1.21--pyhdfd78af_0.img"
input:
path quantification
path fastqc
output:
path "*.html"
script:
"""
multiqc . --filename $quantification
""" }
modules/trimgalore.nf
: script inside amodules
folder, containing only theTRIMGALORE
process
process TRIMGALORE {
container '/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-trim-galore-0.6.6--0.img'
input:
tuple val(sample_id), path(reads)
output:
tuple val(sample_id), path("*{3prime,5prime,trimmed,val}*.fq.gz"), emit: reads
tuple val(sample_id), path("*report.txt") , emit: log , optional: true
tuple val(sample_id), path("*unpaired*.fq.gz") , emit: unpaired, optional: true
tuple val(sample_id), path("*.html") , emit: html , optional: true
tuple val(sample_id), path("*.zip") , emit: zip , optional: true
script:
"""
trim_galore \\
--paired \\
--gzip \\
${reads[0]} \\
${reads[1]}
""" }
nextflow.config
: config file that enables singularity
singularity {
enabled = true
autoMounts = true
cacheDir = "/config/binaries/singularity/containers_devel/nextflow" }
Run the pipeline, specifying --outdir
:
>>> nextflow run rnaseq.nf --outdir output
N E X T F L O W ~ version 23.04.1
Launching `rnaseq.nf` [soggy_jennings] DSL2 - revision: 87afc1d98d
executor > local (16)
[93/d37ef0] process > INDEX [100%] 1 of 1 ✔
[b3/4c4d9c] process > QT (1) [100%] 3 of 3 ✔
[d0/173a6e] process > FASTQC_one (3) [100%] 3 of 3 ✔
[58/0b8af2] process > TRIMGALORE (3) [100%] 3 of 3 ✔
[c6/def175] process > FASTQC_two (3) [100%] 3 of 3 ✔ [e0/bcf904] process > MULTIQC (3) [100%] 3 of 3 ✔
6.1.1 map
The map operator applies a mapping function to each item in a channel. This function is expressed using the Groovy closure { }
.
Channel
.of('hello', 'world')
.map { word ->
def word_size = word.size()
[word, word_size]
} .view()
In this example, a channel containing the strings hello
and world
is created.
Inside the map
operator, the local variable word
is declared, and used to represent each input value that is passed to the function, ie. each element in the channel, hello
and world
.
The map
operator ‘loops’ through each element in the channel and assigns that element to the local varialbe word
. A new local variable word_size
is defined inside the map
function, and calculates the length of the string using size()
. Finally, a tuple is returned, where the first element is the string represented by the local word
variable, and the second element is the length of the string, represented by the local word_size
variable.
Output:
[hello, 5] [world, 5]
For our RNA-seq pipeline, let’s first create separate transcriptome files for each organ: lung.transcriptome.fa
, liver.transcriptome.fa
, gut.transcriptome.fa
cp "/scratch/users/.../training/nf-training/data/ggal/transcriptome.fa" "/scratch/users/.../training/nf-training/data/ggal/lung.transcriptome.fa"
cp "/scratch/users/.../training/nf-training/data/ggal/transcriptome.fa" "/scratch/users/.../training/nf-training/data/ggal/liver.transcriptome.fa" mv "/scratch/users/.../training/nf-training/data/ggal/transcriptome.fa" "/scratch/users/.../training/nf-training/data/ggal/gut.transcriptome.fa"
Ensure transcriptome.fa
no longer exists:
>>> ls /scratch/users/.../training/nf-training/data/ggal/
gut_1.fq
gut_2.fq
gut.transcriptome.fa
liver_1.fq
liver_2.fq
liver.transcriptome.fa
lung_1.fq
lung_2.fq lung.transcriptome.fa
Exercise
Currently in the rnaseq.nf
script, we define the transcriptome_file
parameter to be a single file.
params.transcriptome_file = "/scratch/users/.../training/nf-training/data/ggal/transcriptome.fa"
Set the transcriptome_file
parameter to match for all three .fa
files using a glob path matcher.
Use the fromPath
channel factory to read in the transcriptome files, and the map
operator to create a tuple where the first element is the sample (organ type) of the .fa
, and the second element is the path of the .fa
file. Assign the final output to be a channel called transcriptome_ch
.
The getSimpleName()
Groovy method can be used extract the sample name from our .fa
file, for example:
def sample = fasta.getSimpleName()
Use the view()
channel operator to view the transcriptome_ch
channel. The expected output:
[lung, /scratch/users/.../training/nf-training/data/ggal/lung.transcriptome.fa]
[liver, /scratch/users/.../training/nf-training/data/ggal/liver.transcriptome.fa] [gut, /scratch/users/.../training/nf-training/data/ggal/gut.transcriptome.fa]
The transcriptome_file
parameter is defined using *
, using glob to match for all three .fa
files. The fromPath
channel factory is used to read the .fa
files, and the map
operator is used to create the tuple.
In the map
function, the variable file
was chosen to represent each element that is passed to the function. The function emits a tuple where the first element is the sample name, returned by the getSimpleName()
method, and the second element is the .fa
file path.
params.transcriptome_file = "/scratch/users/.../nf-training/data/ggal/*.fa"
transcriptome_ch = Channel.fromPath("$params.transcriptome_file")
.map { fasta ->
def sample = fasta.getSimpleName()
[sample, fasta]
} .view()
Challenge
Modify the INDEX
process to match the input structure of transcriptome_ch
. Modify the output of INDEX
so that a tuple is emitted, where the first elememt is the value of the grouping key, and the second element is the path of the salmon_idx
folder.
Index the transcriptome_ch
using the INDEX
process. Emit the output as index_ch
.
The input is now defined to be a tuple of two elements, where the first element is the grouping key and the second element is the path of the transcriptome file.
process INDEX {
container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-salmon-1.10.1--h7e5ed60_0.img"
input:
tuple val(sample_id), path(transcriptome)
output:
tuple val(sample_id), path("salmon_idx")
script:
"""
salmon index --threads $task.cpus -t $transcriptome -i salmon_idx
""" }
Inside the workflow block, transcriptome_ch
is used as input into the INDEX
process. The process outputs are emitted as index_ch
workflow {
index_ch = INDEX(transcriptome_ch)
index_ch.view() }
The index_ch
channel is now a tuple where the first element is the grouping key, and the second element is the path to the salmon index folder.
>>> nextflow run rnaseq.nf
N E X T F L O W ~ version 23.04.1
Launching `rnaseq.nf` [dreamy_linnaeus] DSL2 - revision: b4ec1d02bd
[21/91088a] process > INDEX (3) [100%] 3 of 3
[liver, /scratch/users/.../work/06/f0a54ba9191cce9f73f5a97bfb7bea/salmon_idx]
[lung, /scratch/users/.../work/60/e84b1b1f06c43c8cf69a5c621d5a41/salmon_idx] [gut, /scratch/users/.../work/21/91088aafb553cb4b933bc2b3493f33/salmon_idx]
Copy the new INDEX
process into modules.nf
. In the workflow block of rnaseq.nf
, use transcriptome_ch
as the input to the process INDEX
.
6.1.2 combine
The combine
operator produces the cross product (ie. outer product) combinations of two source channels.
For example: The words
channel is combined with the numbers
channel, emitting a channel where each element of numbers
is paired with each element of words
.
numbers = Channel.of(1, 2, 3)
words = Channel.of('hello', 'ciao')
numbers.combine(words).view()
Output:
[1, hello]
[2, hello]
[3, hello]
[1, ciao]
[2, ciao] [3, ciao]
The by
option can be used to combine items that share a matching key. This value is zero-based, and represents the index or list of indices for the grouping key. The emitted tuple will consist of multiple elements.
For example: source
and target
are channels consisting of multiple tuples, where the first element of each tuple represents the grouping key. Since indexing is zero-based, by
is set to 0
to represent the first element of the tuple.
source = Channel.of( [1, 'alpha'], [2, 'beta'] )
target = Channel.of( [1, 'x'], [1, 'y'], [1, 'z'], [2, 'p'], [2, 'q'], [2, 't'] )
source.combine(target, by: 0).view()
Each value within the source
and target
channels are separate elements, resulting in the emitted tuple each containing 3 elements:
[1, alpha, x]
[1, alpha, y]
[1, alpha, z]
[2, beta, p]
[2, beta, q] [2, beta, t]
Exercise
In our RNA-seq pipeline, create a channel quant_inputs_ch
that contains the reads_ch
combined with the index_ch
via a matching key. The emitted channel should contain three elements, where the first element is the grouping key, the second element is the path to the salmon index folder, and the third element is a list of the .fq
pairs.
The expected output:
[liver, /scratch/users/.../work/cf/42458b80e050a466d62baf99d0c1cf/salmon_idx, [/scratch/users/.../training/nf-training/data/ggal/liver_1.fq, /scratch/users/.../training/nf-training/data/ggal/liver_2.fq]]
[lung, /scratch/users/.../work/64/90a77a5f1ed5a0000f6620fd1fab9a/salmon_idx, [/scratch/users/.../training/nf-training/data/ggal/lung_1.fq, /scratch/users/.../training/nf-training/data/ggal/lung_2.fq]] [gut, /scratch/users/.../work/37/352b00bfb71156a9250150428ddf1d/salmon_idx, [/scratch/users/.../training/nf-training/data/ggal/gut_1.fq, /scratch/users/.../training/nf-training/data/ggal/gut_2.fq]]
Use quant_inputs_ch
as the input for the QT
process within the workflow block.
Modify the process such that the input will be a tuple consisting of three elements, where the first element is the grouping key, the second element is the salmon index and the third element is the list of .fq
reads. Also modify the output of the QT
process to emit a tuple of two elements, where the first element is the grouping key and the second element is the $sample_id
folder. Emit the process output as quant_ch
in the workflow
block.
The reads_ch
is combined with the index_ch
using the combine
channel operator with by: 0
, and is assigned to the channel quant_inputs_ch
. The new quant_inputs_ch
channel is input into the QT
process.
workflow {
index_ch = INDEX(transcriptome_ch)
quant_inputs_ch = index_ch.combine(reads_ch, by: 0)
quant_ch = QT(quant_inputs_ch) }
In te QT
process, the input
has been modified to be a tuple of three elements - the first element is the grouping key, the second element is the path to the salmon index, and the third element is the list of .fq
reads.
process QUANTIFICATION {
container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-salmon-1.10.1--h7e5ed60_0.img"
input:
tuple val(sample_id), path(salmon_index), path(reads)
output:
tuple val(sample_id), path("$sample_id")
script:
"""
salmon quant --threads $task.cpus --libType=U \
-i $salmon_index -1 ${reads[0]} -2 ${reads[1]} -o $sample_id
""" }
6.1.3 groupTuple
The groupTuple
operator collects tuples into groups based on a similar grouping key, emitting a new tuple for each distinct key. The groupTuple
differs from the combine
operator in that it is performed on one input channel, and the matching values are emitted as a list.
Channel.of( [1, 'A'], [1, 'B'], [2, 'C'], [3, 'B'], [1, 'C'], [2, 'A'], [3, 'D'] )
.groupTuple() .view()
Output:
[1, [A, B, C]]
[2, [C, A]] [3, [B, D]]
By default, the first element of each tuple is used as the grouping key. The by
option can be used to specify a different index. For example, to group by the second element of each tuple:
Channel.of( [1, 'A'], [1, 'B'], [2, 'C'], [3, 'B'], [1, 'C'], [2, 'A'], [3, 'D'] )
.groupTuple(by: 1)
.view()
[[1, 2], A]
[[1, 3], B]
[[2, 1], C] [[3], D]
In the workflow script rnaseq.nf
we defined the reads
parameter to be multiple paired .fq
files that are created into a channel using the fromFilePairs
channel factory. This created a tuple where the first element is a unique grouping key, created automatically based on similarities in file name, and the second element contains the list of paired files.
#!/usr/bin/env nextflow
params.reads = "/scratch/users/.../nf-training/data/ggal/*_{1,2}.fq"
reads_ch = Channel.fromFilePairs("$params.reads")
Now, move the /scratch/users/.../nf-training/data/ggal/lung_2.fq
file into another directory so the folder contains one lung .fq
file:
>>> mv /scratch/users/.../training/nf-training/data/ggal/lung_2.fq .
>>> ls /scratch/users/.../training/nf-training/data/ggal
gut_1.fq
gut_2.fq
gut.transcriptome.fa
liver_1.fq
liver_2.fq
liver.transcriptome.fa
lung_1.fq lung.transcriptome.fa
Exercise
Use the fromPath
channel factory to read all .fq
files as separate elements.
Then, use map
to create a mapping function that returns a tuple, where the first element is the grouping key, and the second element is the .fq
file(s).
Then, use groupTuple()
to create channels containing both single and paired .fq
files. Within the groupTuple()
operator, set sort: true
, which orders the groups numerically, ensuring the first .fq
is first.
Expected output:
[lung, [/scratch/users/.../training/nf-training/data/ggal/lung_1.fq]]
[gut, [/scratch/users/.../training/nf-training/data/ggal/gut_1.fq, /scratch/users/.../training/nf-training/data/ggal/gut_2.fq]] [liver, [/scratch/users/.../training/nf-training/data/ggal/liver_1.fq, /scratch/users/.../training/nf-training/data/ggal/liver_2.fq]]
Inside the map
function, the following can be used to extract the sample name from the .fq
files. file
is the local variable defined inside the function that represents each .fq
file. The getName()
method will return the file name without the full path, and replaceAll
is used to remove the _2.fq
and _1.fq
file suffixes.
def group_key = file.getName().replaceAll(/_2.fq/,'').replaceAll(/_1.fq/,'')
For a full list of Nextflow file attributes, see here.
The fromPath
channel is used to read all .fq
files separately. The map
function is then used to create a two-element tuple where the first element is a grouping key and the second element is the list of .fq
file(s).
reads_ch = Channel.fromPath("/home/sli/nextflow_training/training/nf-training/data/ggal/*.fq")
.map { file ->
def group_key = file.getName().replaceAll(/_2.fq/,'').replaceAll(/_1.fq/,'')
[group_key, file]
}
.groupTuple(sort: true) .view()
Now, run the workflow up to the combine
step. The quant_inputs_ch
should now consist of:
[liver, /scratch/users/.../work/cf/42458b80e050a466d62baf99d0c1cf/salmon_idx, [/scratch/users/.../nf-training/data/ggal/liver_1.fq, /scratch/users/.../nf-training/data/ggal/liver_2.fq]]
[lung, /scratch/users/.../work/64/90a77a5f1ed5a0000f6620fd1fab9a/salmon_idx, [/scratch/users/.../nf-training/data/ggal/lung_1.fq]] [gut, /scratch/users/.../work/37/352b00bfb71156a9250150428ddf1d/salmon_idx, [/scratch/users/.../nf-training/data/ggal/gut_1.fq, /scratch/users/.../nf-training/data/ggal/gut_2.fq]]
6.1.4 flatten
The flatten operator flattens each item from a source channel and emits the elements separately. Deeply nested inputs are also flattened.
Channel.of( [1, [2, 3]], 4, [5, [6]] )
.flatten() .view()
Output:
1
2
3
4
5 6
Within the script block of the QUANTIFICATION
process in the RNA-seq pipeline, we are assuming the reads are paired, and specify -1 ${reads[0]} -2 ${reads[1]}
as inputs to salmon quant
.
process QUANTIFICATION {
container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-salmon-1.10.1--h7e5ed60_0.img"
input:
tuple val(sample_id), path(salmon_index), path(reads)
output:
tuple val(sample_id) path("$sample_id")
script:
"""
salmon quant --threads $task.cpus --libType=U \
-i $salmon_index -1 ${reads[0]} -2 ${reads[1]} -o $sample_id
""" }
Now that the input reads
can be either single or paired, the QUANTIFICATION
process needs to be modified to allow for either input type. This can be done using the flatten()
operator, and conditional script statements. Additionally, the size()
method can be used to calculate the size of a list.
The script block can be changed to the following:
script:
def input_reads = [reads]
if( input_reads.flatten().size() == 1 )
"""
salmon quant --threads $task.cpus --libType=U \
-i $salmon_index -r $reads -o $sample_id
"""
else
"""
salmon quant --threads $task.cpus --libType=U \\
-i $salmon_index -1 ${reads[0]} -2 ${reads[1]} -o $sample_id """
First, a new variable input_reads
is defined, which consists of the reads
input being converted into a list. This has to be done since Nextflow will automatically convert a list of length 1 into a path within process. If the size()
method was used on a path
type input, it will return the size of the file in bytes, and not the list size. Therefore, all inputs must first be converted into a list in order to correctly caculate the number of files.
def input_reads = [reads]
For reads
that are already in a list (ie. paired reads), this will nest the input into another list, for example:
[ [ file1, file2 ] ]
If the size()
operator is used on this input, it will always return 1
since the encompassing list only contains one element. Therefore, the flatten()
operator has to be used to emit the files as separate elements.
The final definition to obtain the number of files in reads
becomes:
input_reads.flatten().size()
For single reads, the input to salmon quant
becomes -r $reads
Exercise
Currently the TRIMGALORE
process only accounts for paired reads.
process TRIMGALORE {
container '/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-trim-galore-0.6.6--0.img'
input:
tuple val(sample_id), path(reads)
output:
tuple val(sample_id), path("*{3prime,5prime,trimmed,val}*.fq.gz"), emit: reads
tuple val(sample_id), path("*report.txt") , emit: log , optional: true
tuple val(sample_id), path("*unpaired*.fq.gz") , emit: unpaired, optional: true
tuple val(sample_id), path("*.html") , emit: html , optional: true
tuple val(sample_id), path("*.zip") , emit: zip , optional: true
script:
"""
trim_galore \\
--paired \\
--gzip \\
${reads[0]} \\
${reads[1]}
""" }
Modify the process such that both single and paired reads can be used. For single reads, the following script block can be used:
"""
trim_galore \\
--gzip \\
$reads """
process TRIMGALORE {
container '/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-trim-galore-0.6.6--0.img'
input:
tuple val(sample_id), path(reads)
output:
tuple val(sample_id), path("*{3prime,5prime,trimmed,val}*.fq.gz"), emit: reads
tuple val(sample_id), path("*report.txt") , emit: log , optional: true
tuple val(sample_id), path("*unpaired*.fq.gz") , emit: unpaired, optional: true
tuple val(sample_id), path("*.html") , emit: html , optional: true
tuple val(sample_id), path("*.zip") , emit: zip , optional: true
script:
def input_reads = [reads]
if( input_reads.flatten().size() == 1 )
"""
trim_galore \\
--gzip \\
$reads
"""
else
"""
trim_galore \\
--paired \\
--gzip \\
${reads[0]} \\
${reads[1]}
"""
}
Extension
Modify the FASTQC
process such that the output is a tuple where the first element is the grouping key, and the second element is the path to the fastqc logs.
Modify the MULTIQC
process such that the output is a tuple where the first element is the grouping key, and the second element is the path to the generated html
file.
Finally, run the entire workflow, specifying an --outdir
. The workflow block should look like this:
workflow {
index_ch = INDEX(transcriptome_ch)
quant_inputs_ch = index_ch.combine(reads_ch, by: 0)
quant_ch = QT(quant_inputs_ch)
trimgalore_out_ch = TRIMGALORE(reads_ch).reads
fastqc_ch = FASTQC_one(reads_ch)
fastqc_cleaned_ch = FASTQC_two(trimgalore_out_ch)
multiqc_ch = MULTIQC(quant_ch, fastqc_ch) }
The output
block of both processes have been modified to be tuples containing a grouping key.
process FASTQC {
container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-fastqc-0.12.1--hdfd78af_0.img"
input:
tuple val(sample_id), path(reads)
output:
tuple val(sample_id), path("fastqc_${sample_id}_logs")
script:
"""
mkdir fastqc_${sample_id}_logs
fastqc -o fastqc_${sample_id}_logs -f fastq -q ${reads}
"""
}
process MULTIQC {
publishDir params.outdir, mode:'copy'
container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-multiqc-1.21--pyhdfd78af_0.img"
input:
tuple val(sample_id), path(quantification)
tuple val(sample_id), path(fastqc)
output:
tuple val(sample_id), path("*.html")
script:
"""
multiqc . --filename $quantification
""" }
This workshop is adapted from Fundamentals Training, Advanced Training, Developer Tutorials, Nextflow Patterns materials from Nextflow, nf-core nf-core tools documentation and nf-validation