Nextflow Development - Outputs, Scatter, and Gather

Objectives
  • Gain an understanding of how to structure nextflow published outputs
  • Gain an understanding of how to do scatter & gather processes

Environment Setup

Set up an interactive shell to run our Nextflow workflow:

srun --pty -p prod_short --mem 8GB --mincpus 2 -t 0-2:00 bash

Load the required modules to run Nextflow:

module load nextflow/23.04.1
module load singularity/3.7.3

Set the singularity cache environment variable:

export NXF_SINGULARITY_CACHEDIR=/config/binaries/singularity/containers_devel/nextflow

Singularity images downloaded by workflow executions will now be stored in this directory.

You may want to include these, or other environmental variables, in your .bashrc file (or alternate) that is loaded when you log in so you don’t need to export variables every session. A complete list of environment variables can be found here.

The training data can be cloned from:

git clone https://github.com/nextflow-io/training.git

RNA-seq Workflow and Module Files

Previously, we created three Nextflow files and one config file:

├── nextflow.config
├── rnaseq.nf
├── modules.nf
└── modules
    └── trimgalore.nf
  • rnaseq.nf: main workflow script where parameters are defined and processes were called.
#!/usr/bin/env nextflow

params.reads = "/scratch/users/.../training/nf-training/data/ggal/*_{1,2}.fq"
params.transcriptome_file = "/scratch/users/.../training/nf-training/data/ggal/transcriptome.fa"

reads_ch = Channel.fromFilePairs("$params.reads")

include { INDEX } from './modules.nf'
include { QUANTIFICATION as QT } from './modules.nf'
include { FASTQC as FASTQC_one } from './modules.nf'
include { FASTQC as FASTQC_two } from './modules.nf'
include { MULTIQC } from './modules.nf'
include { TRIMGALORE } from './modules/trimgalore.nf'

workflow {
  index_ch = INDEX(params.transcriptome_file)
  quant_ch = QT(index_ch, reads_ch)
  fastqc_ch = FASTQC_one(reads_ch)
  trimgalore_out_ch = TRIMGALORE(reads_ch).reads
  fastqc_cleaned_ch = FASTQC_two(trimgalore_out_ch)
  multiqc_ch = MULTIQC(quant_ch, fastqc_ch)
}
  • modules.nf: script containing the majority of modules, including INDEX, QUANTIFICATION, FASTQC, and MULTIQC
process INDEX {
    container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-salmon-1.10.1--h7e5ed60_0.img"

    input:
    path transcriptome

    output:
    path "salmon_idx"

    script:
    """
    salmon index --threads $task.cpus -t $transcriptome -i salmon_idx
    """
}

process QUANTIFICATION {
    container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-salmon-1.10.1--h7e5ed60_0.img"

    input:
    path salmon_index
    tuple val(sample_id), path(reads)

    output:
    path "$sample_id"

    script:
    """
    salmon quant --threads $task.cpus --libType=U \
    -i $salmon_index -1 ${reads[0]} -2 ${reads[1]} -o $sample_id
    """
}

process FASTQC {
    container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-fastqc-0.12.1--hdfd78af_0.img"

    input:
    tuple val(sample_id), path(reads)

    output:
    path "fastqc_${sample_id}_logs"

    script:
    """
    mkdir fastqc_${sample_id}_logs
    fastqc -o fastqc_${sample_id}_logs -f fastq -q ${reads}
    """
}

process MULTIQC {
    publishDir params.outdir, mode:'copy'
    container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-multiqc-1.21--pyhdfd78af_0.img"

    input:
    path quantification
    path fastqc

    output:
    path "*.html"

    script:
    """
    multiqc . --filename $quantification
    """
}
  • modules/trimgalore.nf: script inside a modules folder, containing only the TRIMGALORE process
process TRIMGALORE {
  container '/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-trim-galore-0.6.6--0.img' 

  input:
    tuple val(sample_id), path(reads)
  
  output:
    tuple val(sample_id), path("*{3prime,5prime,trimmed,val}*.fq.gz"), emit: reads
    tuple val(sample_id), path("*report.txt")                        , emit: log     , optional: true
    tuple val(sample_id), path("*unpaired*.fq.gz")                   , emit: unpaired, optional: true
    tuple val(sample_id), path("*.html")                             , emit: html    , optional: true
    tuple val(sample_id), path("*.zip")                              , emit: zip     , optional: true

  script:
    """
    trim_galore \\
      --paired \\
      --gzip \\
      ${reads[0]} \\
      ${reads[1]}
    """
}
  • nextflow.config: config file that enables singularity
singularity {
  enabled = true
  autoMounts = true
  cacheDir = "/config/binaries/singularity/containers_devel/nextflow"
}

Run the pipeline, specifying --outdir:

>>> nextflow run rnaseq.nf --outdir output
N E X T F L O W  ~  version 23.04.1
Launching `rnaseq.nf` [soggy_jennings] DSL2 - revision: 87afc1d98d
executor >  local (16)
[93/d37ef0] process > INDEX          [100%] 1 of 1 ✔
[b3/4c4d9c] process > QT (1)         [100%] 3 of 3 ✔
[d0/173a6e] process > FASTQC_one (3) [100%] 3 of 3 ✔
[58/0b8af2] process > TRIMGALORE (3) [100%] 3 of 3 ✔
[c6/def175] process > FASTQC_two (3) [100%] 3 of 3 ✔
[e0/bcf904] process > MULTIQC (3)    [100%] 3 of 3 ✔

8.1. Organise outputs

The output declaration block defines the channels used by the process to send out the results produced. However, this output only stays in the work/ directory if there is no publishDir directive specified.

Given each task is being executed in separate temporary work/ folder (e.g., work/f1/850698…), you may want to save important, non-intermediary, and/or final files in a results folder.

To store our workflow result files, you need to explicitly mark them using the directive publishDir in the process that’s creating the files. For example:

process MULTIQC {
    publishDir params.outdir, mode:'copy'
    container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-multiqc-1.21--pyhdfd78af_0.img"

    input:
    path quantification
    path fastqc

    output:
    path "*.html"

    script:
    """
    multiqc . --filename $quantification
    """
}

The above example will copy all html files created by the MULTIQC process into the directory path specified in the params.outdir

8.1.1. Store outputs matching a glob pattern

You can use more than one publishDir to keep different outputs in separate directories. For each directive specify a different glob pattern using the pattern option to store into each directory only the files that match the provided pattern.

For example:

reads_ch = Channel.fromFilePairs('data/ggal/*_{1,2}.fq')

process FOO {
    publishDir "results/bam", pattern: "*.bam"
    publishDir "results/bai", pattern: "*.bai"

    input:
    tuple val(sample_id), path(sample_id_paths)

    output:
    tuple val(sample_id), path("*.bam")
    tuple val(sample_id), path("*.bai")

    script:
    """
    echo your_command_here --sample $sample_id_paths > ${sample_id}.bam
    echo your_command_here --sample $sample_id_paths > ${sample_id}.bai
    """
}

Exercise

Use publishDir and pattern to keep the outputs from the trimgalore.nf into separate directories.

process TRIMGALORE {
  container '/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-trim-galore-0.6.6--0.img' 
  publishDir "$params.outdir/report", mode: "copy", pattern:"*report.txt"
  publishDir "$params.outdir/trimmed_fastq", mode: "copy", pattern:"*fq.gz"

  input:
    tuple val(sample_id), path(reads)
  
  output:
    tuple val(sample_id), path("*{3prime,5prime,trimmed,val}*.fq.gz"), emit: reads
    tuple val(sample_id), path("*report.txt")                        , emit: log     , optional: true
    tuple val(sample_id), path("*unpaired*.fq.gz")                   , emit: unpaired, optional: true
    tuple val(sample_id), path("*.html")                             , emit: html    , optional: true
    tuple val(sample_id), path("*.zip")                              , emit: zip     , optional: true

  script:
    """
    trim_galore \\
      --paired \\
      --gzip \\
      ${reads[0]} \\
      ${reads[1]}
    """
}

Output should now look like

>>> tree ./output
./output
├── gut.html
├── liver.html
├── lung.html
├── report
│   ├── gut_1.fq_trimming_report.txt
│   ├── gut_2.fq_trimming_report.txt
│   ├── liver_1.fq_trimming_report.txt
│   ├── liver_2.fq_trimming_report.txt
│   ├── lung_1.fq_trimming_report.txt
│   └── lung_2.fq_trimming_report.txt
└── trimmed_fastq
    ├── gut_1_val_1.fq.gz
    ├── gut_2_val_2.fq.gz
    ├── liver_1_val_1.fq.gz
    ├── liver_2_val_2.fq.gz
    ├── lung_1_val_1.fq.gz
    └── lung_2_val_2.fq.gz

2 directories, 15 files

8.1.2. Store outputs renaming files or in a sub-directory

The publishDir directive also allow the use of saveAs option to give each file a name of your choice, providing a custom rule as a closure.

process foo {
  publishDir 'results', saveAs: { filename -> "foo_$filename" }

  output: 
  path '*.txt'

  '''
  touch this.txt
  touch that.txt
  '''
}

The same pattern can be used to store specific files in separate directories depending on the actual name.

process foo {
  publishDir 'results', saveAs: { filename -> filename.endsWith(".zip") ? "zips/$filename" : filename }

  output: 
  path '*'

  '''
  touch this.txt
  touch that.zip
  '''
}

Exercise

Modify the MULTIQC output with saveAs such that resulting folder is as follow:

./output
├── MultiQC
│   ├── multiqc_gut.html
│   ├── multiqc_liver.html
│   └── multiqc_lung.html
├── report
│   ├── gut_1.fq_trimming_report.txt
│   ├── gut_2.fq_trimming_report.txt
│   ├── liver_1.fq_trimming_report.txt
│   ├── liver_2.fq_trimming_report.txt
│   ├── lung_1.fq_trimming_report.txt
│   └── lung_2.fq_trimming_report.txt
└── trimmed_fastq
    ├── gut_1_val_1.fq.gz
    ├── gut_2_val_2.fq.gz
    ├── liver_1_val_1.fq.gz
    ├── liver_2_val_2.fq.gz
    ├── lung_1_val_1.fq.gz
    └── lung_2_val_2.fq.gz

3 directories, 15 files
Warning

You need to remove existing output folder/files if you want to have a clean output. By default, nextflow will overwrite existing files, and keep all the remaining files in the same specified output directory.

process MULTIQC {
    publishDir params.outdir, mode:'copy', saveAs: { filename -> filename.endsWith(".html") ? "MultiQC/multiqc_$filename" : filename }
    container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-multiqc-1.21--pyhdfd78af_0.img"

    input:
    path quantification
    path fastqc

    output:
    path "*.html"

    script:
    """
    multiqc . --filename $quantification
    """
}

Challenge

Modify all the processes in rnaseq.nf such that we will have the following output structure

./output
├── gut
│   ├── QC
│   │   ├── fastqc_gut_logs
│   │   │   ├── gut_1_fastqc.html
│   │   │   ├── gut_1_fastqc.zip
│   │   │   ├── gut_2_fastqc.html
│   │   │   └── gut_2_fastqc.zip
│   │   └── gut.html
│   ├── report
│   │   ├── gut_1.fq_trimming_report.txt
│   │   └── gut_2.fq_trimming_report.txt
│   └── trimmed_fastq
│       ├── gut_1_val_1.fq.gz
│       └── gut_2_val_2.fq.gz
├── liver
│   ├── QC
│   │   ├── fastqc_liver_logs
│   │   │   ├── liver_1_fastqc.html
│   │   │   ├── liver_1_fastqc.zip
│   │   │   ├── liver_2_fastqc.html
│   │   │   └── liver_2_fastqc.zip
│   │   └── liver.html
│   ├── report
│   │   ├── liver_1.fq_trimming_report.txt
│   │   └── liver_2.fq_trimming_report.txt
│   └── trimmed_fastq
│       ├── liver_1_val_1.fq.gz
│       └── liver_2_val_2.fq.gz
└── lung
    ├── QC
    │   ├── fastqc_lung_logs
    │   │   ├── lung_1_fastqc.html
    │   │   ├── lung_1_fastqc.zip
    │   │   ├── lung_2_fastqc.html
    │   │   └── lung_2_fastqc.zip
    │   └── lung.html
    ├── report
    │   ├── lung_1.fq_trimming_report.txt
    │   └── lung_2.fq_trimming_report.txt
    └── trimmed_fastq
        ├── lung_1_val_1.fq.gz
        └── lung_2_val_2.fq.gz

15 directories, 27 files
process FASTQC {
    publishDir "$params.outdir/$sample_id/QC", mode:'copy'
    container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-fastqc-0.12.1--hdfd78af_0.img"

    input:
    tuple val(sample_id), path(reads)

    output:
    path "fastqc_${sample_id}_logs"

    script:
    """
    mkdir fastqc_${sample_id}_logs
    fastqc -o fastqc_${sample_id}_logs -f fastq -q ${reads}
    """
}

process MULTIQC {
    //publishDir params.outdir, mode:'copy', saveAs: { filename -> filename.endsWith(".html") ? "MultiQC/multiqc_$filename" : filename }
    publishDir "$params.outdir/$quantification/QC", mode:'copy'
    container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-multiqc-1.21--pyhdfd78af_0.img"

    input:
    path quantification
    path fastqc

    output:
    path "*.html"

    script:
    """
    multiqc . --filename $quantification
    """
}

process TRIMGALORE {
  container '/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-trim-galore-0.6.6--0.img'
  publishDir "${params.outdir}/${sample_id}/report", mode: "copy", pattern:"*report.txt"
  publishDir "${params.outdir}/${sample_id}/trimmed_fastq", mode: "copy", pattern:"*fq.gz"

  input:
    tuple val(sample_id), path(reads)

  output:
    tuple val(sample_id), path("*{3prime,5prime,trimmed,val}*.fq.gz"), emit: reads
    tuple val(sample_id), path("*report.txt")                        , emit: log     , optional: true
    tuple val(sample_id), path("*unpaired*.fq.gz")                   , emit: unpaired, optional: true
    tuple val(sample_id), path("*.html")                             , emit: html    , optional: true
    tuple val(sample_id), path("*.zip")                              , emit: zip     , optional: true

  script:
    """
    trim_galore \\
      --paired \\
      --gzip \\
      ${reads[0]} \\
      ${reads[1]}
    """
}

8.2 Scatter

The scatter operation involves distributing large input data into smaller chunks that can be analysed across multiple processes in parallel.

One very simple example of native scatter is how nextflow handles Channel factories with the Channel.fromPath or Channel.fromFilePairs method, where multiple input data is processed in parallel.

params.reads = "/scratch/users/.../training/nf-training/data/ggal/*_{1,2}.fq"
reads_ch = Channel.fromFilePairs("$params.reads")

include { FASTQC as FASTQC_one } from './modules.nf'

workflow {
  fastqc_ch = FASTQC_one(reads_ch)
}

From the above snippet from our rnaseq.nf, we will get three execution of FASTQC_one for each pairs of our input data.

Other than natively splitting execution by input data, Nextflow also provides operators to scatter existing input data for various benefits, such as faster processing. For example:

8.2.1 Process per file chunk

Exercise

params.infile = "/data/reference/bed_files/Agilent_CRE_v2/S30409818_Covered_MERGED.bed"
params.size = 100000

process count_line {
  debug true
  input: 
  file x

  script:
  """
  wc -l $x 
  """
}

workflow {
  Channel.fromPath(params.infile) \
    | splitText(by: params.size, file: true) \
    | count_line
}

Exercise

params.infile = "/scratch/users/rlupat/nfWorkshop/dev1/training/nf-training/data/ggal/*_{1,2}.fq"
params.size = 1000

workflow {
  Channel.fromFilePairs(params.infile, flat: true) \
    | splitFastq(by: params.size, pe: true, file: true) \
    | view()
}

8.2.1 Process per file range

Exercise

Channel.from(1..22) \
   | map { chr -> ["sample${chr}", file("${chr}.indels.vcf"), file("${chr}.vcf")] } \
   | view()
>> nextflow run test_scatter.nf

[sample1, /scratch/users/${users}/1.indels.vcf, /scratch/users/${users}/1.vcf]
[sample2, /scratch/users/${users}/2.indels.vcf, /scratch/users/${users}/2.vcf]
[sample3, /scratch/users/${users}/3.indels.vcf, /scratch/users/${users}/3.vcf]
[sample4, /scratch/users/${users}/4.indels.vcf, /scratch/users/${users}/4.vcf]
[sample5, /scratch/users/${users}/5.indels.vcf, /scratch/users/${users}/5.vcf]
[sample6, /scratch/users/${users}/6.indels.vcf, /scratch/users/${users}/6.vcf]
[sample7, /scratch/users/${users}/7.indels.vcf, /scratch/users/${users}/7.vcf]
[sample8, /scratch/users/${users}/8.indels.vcf, /scratch/users/${users}/8.vcf]
[sample9, /scratch/users/${users}/9.indels.vcf, /scratch/users/${users}/9.vcf]
[sample10, /scratch/users${users}/10.indels.vcf, /scratch/users${users}/10.vcf]
[sample11, /scratch/users${users}/11.indels.vcf, /scratch/users${users}/11.vcf]
[sample12, /scratch/users${users}/12.indels.vcf, /scratch/users${users}/12.vcf]
[sample13, /scratch/users${users}/13.indels.vcf, /scratch/users${users}/13.vcf]
[sample14, /scratch/users${users}/14.indels.vcf, /scratch/users${users}/14.vcf]
[sample15, /scratch/users${users}/15.indels.vcf, /scratch/users${users}/15.vcf]
[sample16, /scratch/users${users}/16.indels.vcf, /scratch/users${users}/16.vcf]
[sample17, /scratch/users${users}/17.indels.vcf, /scratch/users${users}/17.vcf]
[sample18, /scratch/users${users}/18.indels.vcf, /scratch/users${users}/18.vcf]
[sample19, /scratch/users${users}/19.indels.vcf, /scratch/users${users}/19.vcf]
[sample20, /scratch/users${users}/20.indels.vcf, /scratch/users${users}/20.vcf]
[sample21, /scratch/users${users}/21.indels.vcf, /scratch/users${users}/21.vcf]
[sample22, /scratch/users${users}/22.indels.vcf, /scratch/users${users}/22.vcf]

Exercise

params.infile = "/data/reference/bed_files/Agilent_CRE_v2/S30409818_Covered_MERGED.bed"
params.size = 100000

process split_bed_by_chr {
  debug true

  input:
  path bed
  val chr

  output:
  path "*.bed"

  script:
  """
  grep ^${chr}\t ${bed} > ${chr}.bed
  """
}

workflow {
    split_bed_by_chr(params.infile, Channel.from(1..22)) | view()
}

Challenge

How do we include chr X and Y into the above split by chromosome?

workflow {
    split_bed_by_chr(params.infile, Channel.from(1..22,'X','Y').flatten()) | view()
}

8.3 Gather

The gather operation consolidates results from parallel computations (can be from scatter) into a centralized process for aggregation and further processing.

Some of the Nextflow provided operators that facilitate this gather operation, include:

8.3.1. Process all outputs altogether

Exercise

params.infile = "/data/reference/bed_files/Agilent_CRE_v2/S30409818_Covered_MERGED.bed"
params.size = 100000

process split_bed_by_chr {
  debug true

  input:
  path bed
  val chr

  output:
  path "*.bed"

  script:
  """
  grep ^${chr}\t ${bed} > ${chr}.bed
  """
}

workflow {
    split_bed_by_chr(params.infile, Channel.from(1..22,'X','Y').flatten()) | collect | view()
}

8.3.2. Collect outputs into a file

Exercise

params.infile = "/data/reference/bed_files/Agilent_CRE_v2/S30409818_Covered_MERGED.bed"
params.size = 100000

process split_bed_by_chr {
  debug true

  input:
  path bed
  val chr

  output:
  path "*.bed"

  script:
  """
  grep ^${chr}\t ${bed} > ${chr}.bed
  """
}

workflow {
    split_bed_by_chr(params.infile, Channel.from(1..22,'X','Y').flatten()) | collectFile(name: 'merged.bed', newLine:true) | view()
}

Exercise

workflow {
  Channel.fromPath("/scratch/users/rlupat/nfWorkshop/dev1/training/nf-training/data/ggal/*_1.fq", checkIfExists: true) \
    | collectFile(name: 'combined_1.fq', newLine:true) \
    | view
}