Nextflow Development - Channel Operators

Objectives
  • Gain an understanding of Nextflow channel operators

Environment Setup

Set up an interactive shell to run our Nextflow workflow:

srun --pty -p prod_short --mem 8GB --mincpus 2 -t 0-2:00 bash

Load the required modules to run Nextflow:

module load nextflow/23.04.1
module load singularity/3.7.3

Set the singularity cache environment variable:

export NXF_SINGULARITY_CACHEDIR=/config/binaries/singularity/containers_devel/nextflow

Singularity images downloaded by workflow executions will now be stored in this directory.

You may want to include these, or other environment variables, in your .bashrc file (or equivalent) that is loaded when you log in, so you don't need to export the variables every session. A complete list of Nextflow environment variables can be found in the Nextflow documentation.
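For example, appending the export line above to your ~/.bashrc (assuming the same cache directory used in this workshop) makes the variable available in every new session:

export NXF_SINGULARITY_CACHEDIR=/config/binaries/singularity/containers_devel/nextflow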

The training data can be cloned from:

git clone https://github.com/nextflow-io/training.git

RNA-seq Workflow and Module Files

Previously, we created three Nextflow files and one config file:

├── nextflow.config
├── rnaseq.nf
├── modules.nf
└── modules
    └── trimgalore.nf
  • rnaseq.nf: main workflow script where parameters are defined and processes are called.
#!/usr/bin/env nextflow

params.reads = "/scratch/users/.../training/nf-training/data/ggal/*_{1,2}.fq"
params.transcriptome_file = "/scratch/users/.../training/nf-training/data/ggal/transcriptome.fa"

reads_ch = Channel.fromFilePairs("$params.reads")

include { INDEX } from './modules.nf'
include { QUANTIFICATION as QT } from './modules.nf'
include { FASTQC as FASTQC_one } from './modules.nf'
include { FASTQC as FASTQC_two } from './modules.nf'
include { MULTIQC } from './modules.nf'
include { TRIMGALORE } from './modules/trimgalore.nf'

workflow {
  index_ch = INDEX(params.transcriptome_file)
  quant_ch = QT(index_ch, reads_ch)
  fastqc_ch = FASTQC_one(reads_ch)
  trimgalore_out_ch = TRIMGALORE(reads_ch).reads
  fastqc_cleaned_ch = FASTQC_two(trimgalore_out_ch)
  multiqc_ch = MULTIQC(quant_ch, fastqc_ch)
}
  • modules.nf: script containing the majority of modules, including INDEX, QUANTIFICATION, FASTQC, and MULTIQC
process INDEX {
    container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-salmon-1.10.1--h7e5ed60_0.img"

    input:
    path transcriptome

    output:
    path "salmon_idx"

    script:
    """
    salmon index --threads $task.cpus -t $transcriptome -i salmon_idx
    """
}

process QUANTIFICATION {
    container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-salmon-1.10.1--h7e5ed60_0.img"

    input:
    path salmon_index
    tuple val(sample_id), path(reads)

    output:
    path "$sample_id"

    script:
    """
    salmon quant --threads $task.cpus --libType=U \
    -i $salmon_index -1 ${reads[0]} -2 ${reads[1]} -o $sample_id
    """
}

process FASTQC {
    container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-fastqc-0.12.1--hdfd78af_0.img"

    input:
    tuple val(sample_id), path(reads)

    output:
    path "fastqc_${sample_id}_logs"

    script:
    """
    mkdir fastqc_${sample_id}_logs
    fastqc -o fastqc_${sample_id}_logs -f fastq -q ${reads}
    """
}

process MULTIQC {
    publishDir params.outdir, mode:'copy'
    container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-multiqc-1.21--pyhdfd78af_0.img"

    input:
    path quantification
    path fastqc

    output:
    path "*.html"

    script:
    """
    multiqc . --filename $quantification
    """
}
  • modules/trimgalore.nf: script inside a modules folder, containing only the TRIMGALORE process
process TRIMGALORE {
  container '/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-trim-galore-0.6.6--0.img' 

  input:
    tuple val(sample_id), path(reads)
  
  output:
    tuple val(sample_id), path("*{3prime,5prime,trimmed,val}*.fq.gz"), emit: reads
    tuple val(sample_id), path("*report.txt")                        , emit: log     , optional: true
    tuple val(sample_id), path("*unpaired*.fq.gz")                   , emit: unpaired, optional: true
    tuple val(sample_id), path("*.html")                             , emit: html    , optional: true
    tuple val(sample_id), path("*.zip")                              , emit: zip     , optional: true

  script:
    """
    trim_galore \\
      --paired \\
      --gzip \\
      ${reads[0]} \\
      ${reads[1]}
    """
}
  • nextflow.config: config file that enables singularity
singularity {
  enabled = true
  autoMounts = true
  cacheDir = "/config/binaries/singularity/containers_devel/nextflow"
}

Run the pipeline, specifying --outdir:

>>> nextflow run rnaseq.nf --outdir output
N E X T F L O W  ~  version 23.04.1
Launching `rnaseq.nf` [soggy_jennings] DSL2 - revision: 87afc1d98d
executor >  local (16)
[93/d37ef0] process > INDEX          [100%] 1 of 1 ✔
[b3/4c4d9c] process > QT (1)         [100%] 3 of 3 ✔
[d0/173a6e] process > FASTQC_one (3) [100%] 3 of 3 ✔
[58/0b8af2] process > TRIMGALORE (3) [100%] 3 of 3 ✔
[c6/def175] process > FASTQC_two (3) [100%] 3 of 3 ✔
[e0/bcf904] process > MULTIQC (3)    [100%] 3 of 3 ✔

6.1.1 map

The map operator applies a mapping function to each item emitted by a channel. The function is expressed as a Groovy closure { }.

Channel
    .of('hello', 'world')
    .map { word -> 
        def word_size = word.size()
        [word, word_size] 
    }
    .view()

In this example, a channel containing the strings hello and world is created.

Inside the map operator, the local variable word is declared and used to represent each input value passed to the function, i.e. each element in the channel: hello and world.

The map operator ‘loops’ through each element in the channel and assigns that element to the local variable word. A new local variable word_size is defined inside the map function, which calculates the length of the string using size(). Finally, a tuple is returned, where the first element is the string represented by the local word variable, and the second element is the length of the string, represented by the local word_size variable.

Output:

[hello, 5]
[world, 5]

For our RNA-seq pipeline, let’s first create separate transcriptome files for each organ: lung.transcriptome.fa, liver.transcriptome.fa, gut.transcriptome.fa

cp "/scratch/users/.../training/nf-training/data/ggal/transcriptome.fa" "/scratch/users/.../training/nf-training/data/ggal/lung.transcriptome.fa"
cp "/scratch/users/.../training/nf-training/data/ggal/transcriptome.fa" "/scratch/users/.../training/nf-training/data/ggal/liver.transcriptome.fa"
mv "/scratch/users/.../training/nf-training/data/ggal/transcriptome.fa" "/scratch/users/.../training/nf-training/data/ggal/gut.transcriptome.fa"

Ensure transcriptome.fa no longer exists:

>>> ls /scratch/users/.../training/nf-training/data/ggal/
gut_1.fq
gut_2.fq
gut.transcriptome.fa
liver_1.fq
liver_2.fq
liver.transcriptome.fa
lung_1.fq
lung_2.fq
lung.transcriptome.fa

Exercise

Currently in the rnaseq.nf script, we define the transcriptome_file parameter to be a single file.

params.transcriptome_file = "/scratch/users/.../training/nf-training/data/ggal/transcriptome.fa"

Set the transcriptome_file parameter to match all three .fa files using a glob pattern.

Use the fromPath channel factory to read in the transcriptome files, and the map operator to create a tuple where the first element is the sample (organ type) of the .fa, and the second element is the path of the .fa file. Assign the final output to be a channel called transcriptome_ch.

The getSimpleName() Groovy method can be used to extract the sample name from our .fa file, for example:

def sample = fasta.getSimpleName()

Use the view() channel operator to view the transcriptome_ch channel. The expected output:

[lung, /scratch/users/.../training/nf-training/data/ggal/lung.transcriptome.fa]
[liver, /scratch/users/.../training/nf-training/data/ggal/liver.transcriptome.fa]
[gut, /scratch/users/.../training/nf-training/data/ggal/gut.transcriptome.fa]

The transcriptome_file parameter is defined with a * glob pattern to match all three .fa files. The fromPath channel factory is used to read the .fa files, and the map operator is used to create the tuple.

In the map function, the variable fasta was chosen to represent each element passed to the function. The function emits a tuple where the first element is the sample name, returned by the getSimpleName() method, and the second element is the .fa file path.

params.transcriptome_file = "/scratch/users/.../nf-training/data/ggal/*.fa"

transcriptome_ch = Channel.fromPath("$params.transcriptome_file")
    .map { fasta ->
        def sample = fasta.getSimpleName()
        [sample, fasta]
    }
    .view()


Challenge

Modify the INDEX process to match the input structure of transcriptome_ch. Modify the output of INDEX so that a tuple is emitted, where the first element is the value of the grouping key, and the second element is the path of the salmon_idx folder.

Index the transcriptome_ch using the INDEX process. Emit the output as index_ch.

The input is now defined to be a tuple of two elements, where the first element is the grouping key and the second element is the path of the transcriptome file.

process INDEX {
    container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-salmon-1.10.1--h7e5ed60_0.img"

    input:
    tuple val(sample_id), path(transcriptome)

    output:
    tuple val(sample_id), path("salmon_idx")

    script:
    """
    salmon index --threads $task.cpus -t $transcriptome -i salmon_idx
    """
}

Inside the workflow block, transcriptome_ch is used as input to the INDEX process. The process output is emitted as index_ch.

workflow {
  index_ch = INDEX(transcriptome_ch)
  index_ch.view()
}

The index_ch channel is now a tuple where the first element is the grouping key, and the second element is the path to the salmon index folder.

>>> nextflow run rnaseq.nf
N E X T F L O W  ~  version 23.04.1
Launching `rnaseq.nf` [dreamy_linnaeus] DSL2 - revision: b4ec1d02bd
[21/91088a] process > INDEX (3) [100%] 3 of 3
[liver, /scratch/users/.../work/06/f0a54ba9191cce9f73f5a97bfb7bea/salmon_idx]
[lung, /scratch/users/.../work/60/e84b1b1f06c43c8cf69a5c621d5a41/salmon_idx]
[gut, /scratch/users/.../work/21/91088aafb553cb4b933bc2b3493f33/salmon_idx]

Copy the new INDEX process into modules.nf. In the workflow block of rnaseq.nf, use transcriptome_ch as the input to the process INDEX.


6.1.2 combine

The combine operator produces the cross product (i.e. outer product) of two source channels.

For example: the words channel is combined with the numbers channel, emitting a channel where each element of numbers is paired with each element of words.

numbers = Channel.of(1, 2, 3)
words = Channel.of('hello', 'ciao')

numbers.combine(words).view()

Output:

[1, hello]
[2, hello]
[3, hello]
[1, ciao]
[2, ciao]
[3, ciao]

The by option can be used to combine items that share a matching key. Its value is zero-based and represents the index, or list of indices, of the grouping key. Each emitted tuple contains the matching key followed by the remaining elements from both channels.

For example: source and target are channels consisting of multiple tuples, where the first element of each tuple represents the grouping key. Since indexing is zero-based, by is set to 0 to represent the first element of the tuple.

source = Channel.of( [1, 'alpha'], [2, 'beta'] )
target = Channel.of( [1, 'x'], [1, 'y'], [1, 'z'], [2, 'p'], [2, 'q'], [2, 't'] )

source.combine(target, by: 0).view()

Each value within the source and target channels is a separate element, so each emitted tuple contains three elements:

[1, alpha, x]
[1, alpha, y]
[1, alpha, z]
[2, beta, p]
[2, beta, q]
[2, beta, t]

Exercise

In our RNA-seq pipeline, create a channel quant_inputs_ch that contains the reads_ch combined with the index_ch via a matching key. The emitted channel should contain three elements, where the first element is the grouping key, the second element is the path to the salmon index folder, and the third element is a list of the .fq pairs.

The expected output:

[liver, /scratch/users/.../work/cf/42458b80e050a466d62baf99d0c1cf/salmon_idx, [/scratch/users/.../training/nf-training/data/ggal/liver_1.fq, /scratch/users/.../training/nf-training/data/ggal/liver_2.fq]]
[lung, /scratch/users/.../work/64/90a77a5f1ed5a0000f6620fd1fab9a/salmon_idx, [/scratch/users/.../training/nf-training/data/ggal/lung_1.fq, /scratch/users/.../training/nf-training/data/ggal/lung_2.fq]]
[gut, /scratch/users/.../work/37/352b00bfb71156a9250150428ddf1d/salmon_idx, [/scratch/users/.../training/nf-training/data/ggal/gut_1.fq, /scratch/users/.../training/nf-training/data/ggal/gut_2.fq]]

Use quant_inputs_ch as the input for the QT process within the workflow block.

Modify the process such that the input will be a tuple consisting of three elements, where the first element is the grouping key, the second element is the salmon index and the third element is the list of .fq reads. Also modify the output of the QT process to emit a tuple of two elements, where the first element is the grouping key and the second element is the $sample_id folder. Emit the process output as quant_ch in the workflow block.

The index_ch is combined with the reads_ch using the combine channel operator with by: 0, and the result is assigned to the channel quant_inputs_ch. The new quant_inputs_ch channel is input into the QT process.

workflow {
  index_ch = INDEX(transcriptome_ch)

  quant_inputs_ch = index_ch.combine(reads_ch, by: 0)
  quant_ch = QT(quant_inputs_ch)
}

In the QUANTIFICATION process, the input has been modified to be a tuple of three elements: the first element is the grouping key, the second element is the path to the salmon index, and the third element is the list of .fq reads.

process QUANTIFICATION {
    container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-salmon-1.10.1--h7e5ed60_0.img"

    input:
    tuple val(sample_id), path(salmon_index), path(reads)

    output:
    tuple val(sample_id), path("$sample_id")

    script:
    """
    salmon quant --threads $task.cpus --libType=U \
    -i $salmon_index -1 ${reads[0]} -2 ${reads[1]} -o $sample_id
    """
}


6.1.3 groupTuple

The groupTuple operator collects tuples into groups based on a shared grouping key, emitting a new tuple for each distinct key. groupTuple differs from the combine operator in that it operates on a single input channel, and the matching values are emitted as a list.

Channel.of( [1, 'A'], [1, 'B'], [2, 'C'], [3, 'B'], [1, 'C'], [2, 'A'], [3, 'D'] )
    .groupTuple()
    .view()

Output:

[1, [A, B, C]]
[2, [C, A]]
[3, [B, D]]

By default, the first element of each tuple is used as the grouping key. The by option can be used to specify a different index. For example, to group by the second element of each tuple:

Channel.of( [1, 'A'], [1, 'B'], [2, 'C'], [3, 'B'], [1, 'C'], [2, 'A'], [3, 'D'] )
    .groupTuple(by: 1)
    .view()

Output:

[[1, 2], A]
[[1, 3], B]
[[2, 1], C]
[[3], D]


In the workflow script rnaseq.nf we defined the reads parameter as multiple paired .fq files, which are read into a channel using the fromFilePairs channel factory. This creates a tuple where the first element is a unique grouping key, generated automatically from the shared portion of the file names, and the second element is the list of paired files.

#!/usr/bin/env nextflow

params.reads = "/scratch/users/.../nf-training/data/ggal/*_{1,2}.fq"

reads_ch = Channel.fromFilePairs("$params.reads")

Now, move the /scratch/users/.../nf-training/data/ggal/lung_2.fq file into another directory so the folder contains only one lung .fq file:

>>> mv /scratch/users/.../training/nf-training/data/ggal/lung_2.fq .
>>> ls /scratch/users/.../training/nf-training/data/ggal
gut_1.fq
gut_2.fq
gut.transcriptome.fa
liver_1.fq
liver_2.fq
liver.transcriptome.fa
lung_1.fq
lung.transcriptome.fa

Exercise

Use the fromPath channel factory to read all .fq files as separate elements.

Then, use map to create a mapping function that returns a tuple, where the first element is the grouping key (the sample name), and the second element is the .fq file.

Then, use groupTuple() to group the files by sample, so that each emitted tuple contains either a single .fq file or a pair. Within the groupTuple() operator, set sort: true, which sorts the grouped files and ensures the _1.fq file appears before the _2.fq file.

Expected output:

[lung, [/scratch/users/.../training/nf-training/data/ggal/lung_1.fq]]
[gut, [/scratch/users/.../training/nf-training/data/ggal/gut_1.fq, /scratch/users/.../training/nf-training/data/ggal/gut_2.fq]]
[liver, [/scratch/users/.../training/nf-training/data/ggal/liver_1.fq, /scratch/users/.../training/nf-training/data/ggal/liver_2.fq]]

Inside the map function, the following can be used to extract the sample name from the .fq files. file is the local variable defined inside the function that represents each .fq file. The getName() method will return the file name without the full path, and replaceAll is used to remove the _2.fq and _1.fq file suffixes.

def group_key = file.getName().replaceAll(/_2.fq/,'').replaceAll(/_1.fq/,'')

For a full list of Nextflow file attributes, see the Nextflow documentation.

The fromPath channel factory is used to read all the .fq files separately. The map function then creates a two-element tuple where the first element is the grouping key and the second element is the .fq file; groupTuple() collects the files belonging to each key into a list.

reads_ch = Channel.fromPath("/scratch/users/.../training/nf-training/data/ggal/*.fq")
  .map { file ->
    def group_key = file.getName().replaceAll(/_2.fq/,'').replaceAll(/_1.fq/,'')
    [group_key, file]
  }
  .groupTuple(sort: true)
  .view()

Now, run the workflow up to the combine step. The quant_inputs_ch should now consist of:

[liver, /scratch/users/.../work/cf/42458b80e050a466d62baf99d0c1cf/salmon_idx, [/scratch/users/.../nf-training/data/ggal/liver_1.fq, /scratch/users/.../nf-training/data/ggal/liver_2.fq]]
[lung, /scratch/users/.../work/64/90a77a5f1ed5a0000f6620fd1fab9a/salmon_idx, [/scratch/users/.../nf-training/data/ggal/lung_1.fq]]
[gut, /scratch/users/.../work/37/352b00bfb71156a9250150428ddf1d/salmon_idx, [/scratch/users/.../nf-training/data/ggal/gut_1.fq, /scratch/users/.../nf-training/data/ggal/gut_2.fq]]


6.1.4 flatten

The flatten operator flattens each item from a source channel and emits the elements separately. Deeply nested inputs are also flattened.

Channel.of( [1, [2, 3]], 4, [5, [6]] )
    .flatten()
    .view()

Output:

1
2
3
4
5
6


Within the script block of the QUANTIFICATION process in the RNA-seq pipeline, we are assuming the reads are paired, and specify -1 ${reads[0]} -2 ${reads[1]} as inputs to salmon quant.

process QUANTIFICATION {
    container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-salmon-1.10.1--h7e5ed60_0.img"

    input:
    tuple val(sample_id), path(salmon_index), path(reads)

    output:
    tuple val(sample_id), path("$sample_id")

    script:
    """
    salmon quant --threads $task.cpus --libType=U \
    -i $salmon_index -1 ${reads[0]} -2 ${reads[1]} -o $sample_id
    """
}

Now that the input reads can be either single or paired, the QUANTIFICATION process needs to be modified to handle either input type. This can be done using flatten() and conditional script statements. Additionally, the size() method can be used to calculate the number of elements in a list.

The script block can be changed to the following:

    script:
    def input_reads = [reads]
    if( input_reads.flatten().size() == 1 )
        """
        salmon quant --threads $task.cpus --libType=U \
        -i $salmon_index -r $reads -o $sample_id
        """
    else 
        """
        salmon quant --threads $task.cpus --libType=U \\
        -i $salmon_index -1 ${reads[0]} -2 ${reads[1]} -o $sample_id
        """

First, a new variable input_reads is defined, which wraps the reads input in a list. This has to be done because Nextflow automatically converts a list of length 1 into a path within a process. If the size() method were used on a path-type input, it would return the size of the file in bytes, not the list size. Therefore, all inputs must first be converted into a list in order to correctly calculate the number of files.

def input_reads = [reads]

For reads that are already in a list (i.e. paired reads), this will nest the input inside another list, for example:

[ [ file1, file2 ] ]

If the size() method is used on this input, it will always return 1, since the encompassing list only contains one element. Therefore, flatten() has to be used to emit the files as separate elements.

The final definition to obtain the number of files in reads becomes:

input_reads.flatten().size()
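As a quick check, the same logic can be verified with a minimal Groovy sketch (hypothetical file names, not part of the pipeline):

def single = 'lung_1.fq'                   // a single read: one element
def paired = ['gut_1.fq', 'gut_2.fq']      // paired reads: a list of two elements

assert [single].flatten().size() == 1      // selects the single-end branch
assert [paired].flatten().size() == 2      // selects the paired-end branch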

For single reads, the input to salmon quant becomes -r $reads


Exercise

Currently the TRIMGALORE process only accounts for paired reads.

process TRIMGALORE {
    container '/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-trim-galore-0.6.6--0.img' 

    input:
        tuple val(sample_id), path(reads)
    
    output:
        tuple val(sample_id), path("*{3prime,5prime,trimmed,val}*.fq.gz"), emit: reads
        tuple val(sample_id), path("*report.txt")                        , emit: log     , optional: true
        tuple val(sample_id), path("*unpaired*.fq.gz")                   , emit: unpaired, optional: true
        tuple val(sample_id), path("*.html")                             , emit: html    , optional: true
        tuple val(sample_id), path("*.zip")                              , emit: zip     , optional: true

    script:
        """
        trim_galore \\
        --paired \\
        --gzip \\
        ${reads[0]} \\
        ${reads[1]}
        """
}

Modify the process such that both single and paired reads can be used. For single reads, the following script block can be used:

"""
trim_galore \\
  --gzip \\
  $reads
"""
process TRIMGALORE {
  container '/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-trim-galore-0.6.6--0.img' 

  input:
    tuple val(sample_id), path(reads)
  
  output:
    tuple val(sample_id), path("*{3prime,5prime,trimmed,val}*.fq.gz"), emit: reads
    tuple val(sample_id), path("*report.txt")                        , emit: log     , optional: true
    tuple val(sample_id), path("*unpaired*.fq.gz")                   , emit: unpaired, optional: true
    tuple val(sample_id), path("*.html")                             , emit: html    , optional: true
    tuple val(sample_id), path("*.zip")                              , emit: zip     , optional: true

  script:
  def input_reads = [reads]

  if( input_reads.flatten().size() == 1 )
    """
    trim_galore \\
      --gzip \\
      $reads
    """
  else
    """
    trim_galore \\
      --paired \\
      --gzip \\
      ${reads[0]} \\
      ${reads[1]}
    """

}

Extension

Modify the FASTQC process such that the output is a tuple where the first element is the grouping key, and the second element is the path to the fastqc logs.

Modify the MULTIQC process such that the output is a tuple where the first element is the grouping key, and the second element is the path to the generated html file.

Finally, run the entire workflow, specifying an --outdir. The workflow block should look like this:

workflow {
  index_ch = INDEX(transcriptome_ch)

  quant_inputs_ch = index_ch.combine(reads_ch, by: 0)
  quant_ch = QT(quant_inputs_ch)

  trimgalore_out_ch = TRIMGALORE(reads_ch).reads

  fastqc_ch = FASTQC_one(reads_ch)
  fastqc_cleaned_ch = FASTQC_two(trimgalore_out_ch)
  multiqc_ch = MULTIQC(quant_ch, fastqc_ch)
}

The output blocks of both processes have been modified to emit tuples containing a grouping key.

process FASTQC {
    container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-fastqc-0.12.1--hdfd78af_0.img"

    input:
    tuple val(sample_id), path(reads)

    output:
    tuple val(sample_id), path("fastqc_${sample_id}_logs")

    script:
    """
    mkdir fastqc_${sample_id}_logs
    fastqc -o fastqc_${sample_id}_logs -f fastq -q ${reads}
    """
}

process MULTIQC {
    publishDir params.outdir, mode:'copy'
    container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-multiqc-1.21--pyhdfd78af_0.img"

    input:
    tuple val(sample_id), path(quantification)
    tuple val(sample_id_fastqc), path(fastqc) // renamed to avoid re-declaring sample_id from the first input

    output:
    tuple val(sample_id), path("*.html")

    script:
    """
    multiqc . --filename $quantification
    """
}
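With these changes in place, the full workflow can be rerun as before, specifying the output directory:

nextflow run rnaseq.nf --outdir output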

This workshop is adapted from the Fundamentals Training, Advanced Training, Developer Tutorials, and Nextflow Patterns materials from Nextflow, the nf-core tools documentation, and nf-validation.