Nextflow Development - Outputs, Scatter, and Gather
- Gain an understanding of how to structure nextflow published outputs
- Gain an understanding of how to do scatter & gather processes
Environment Setup
Set up an interactive shell to run our Nextflow workflow:
srun --pty -p prod_short --mem 8GB --mincpus 2 -t 0-2:00 bash
Load the required modules to run Nextflow:
module load nextflow/23.04.1 module load singularity/3.7.3
Set the singularity cache environment variable:
export NXF_SINGULARITY_CACHEDIR=/config/binaries/singularity/containers_devel/nextflow
Singularity images downloaded by workflow executions will now be stored in this directory.
You may want to include these, or other environmental variables, in your .bashrc
file (or alternate) that is loaded when you log in so you don’t need to export variables every session. A complete list of environment variables can be found here.
The training data can be cloned from:
git clone https://github.com/nextflow-io/training.git
RNA-seq Workflow and Module Files
Previously, we created three Nextflow files and one config file:
├── nextflow.config
├── rnaseq.nf
├── modules.nf
└── modules └── trimgalore.nf
rnaseq.nf
: main workflow script where parameters are defined and processes were called.
#!/usr/bin/env nextflow
params.reads = "/scratch/users/.../training/nf-training/data/ggal/*_{1,2}.fq"
params.transcriptome_file = "/scratch/users/.../training/nf-training/data/ggal/transcriptome.fa"
reads_ch = Channel.fromFilePairs("$params.reads")
include { INDEX } from './modules.nf'
include { QUANTIFICATION as QT } from './modules.nf'
include { FASTQC as FASTQC_one } from './modules.nf'
include { FASTQC as FASTQC_two } from './modules.nf'
include { MULTIQC } from './modules.nf'
include { TRIMGALORE } from './modules/trimgalore.nf'
workflow {
index_ch = INDEX(params.transcriptome_file)
quant_ch = QT(index_ch, reads_ch)
fastqc_ch = FASTQC_one(reads_ch)
trimgalore_out_ch = TRIMGALORE(reads_ch).reads
fastqc_cleaned_ch = FASTQC_two(trimgalore_out_ch)
multiqc_ch = MULTIQC(quant_ch, fastqc_ch) }
modules.nf
: script containing the majority of modules, includingINDEX
,QUANTIFICATION
,FASTQC
, andMULTIQC
process INDEX {
container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-salmon-1.10.1--h7e5ed60_0.img"
input:
path transcriptome
output:
path "salmon_idx"
script:
"""
salmon index --threads $task.cpus -t $transcriptome -i salmon_idx
"""
}
process QUANTIFICATION {
container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-salmon-1.10.1--h7e5ed60_0.img"
input:
path salmon_index
tuple val(sample_id), path(reads)
output:
path "$sample_id"
script:
"""
salmon quant --threads $task.cpus --libType=U \
-i $salmon_index -1 ${reads[0]} -2 ${reads[1]} -o $sample_id
"""
}
process FASTQC {
container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-fastqc-0.12.1--hdfd78af_0.img"
input:
tuple val(sample_id), path(reads)
output:
path "fastqc_${sample_id}_logs"
script:
"""
mkdir fastqc_${sample_id}_logs
fastqc -o fastqc_${sample_id}_logs -f fastq -q ${reads}
"""
}
process MULTIQC {
publishDir params.outdir, mode:'copy'
container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-multiqc-1.21--pyhdfd78af_0.img"
input:
path quantification
path fastqc
output:
path "*.html"
script:
"""
multiqc . --filename $quantification
""" }
modules/trimgalore.nf
: script inside amodules
folder, containing only theTRIMGALORE
process
process TRIMGALORE {
container '/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-trim-galore-0.6.6--0.img'
input:
tuple val(sample_id), path(reads)
output:
tuple val(sample_id), path("*{3prime,5prime,trimmed,val}*.fq.gz"), emit: reads
tuple val(sample_id), path("*report.txt") , emit: log , optional: true
tuple val(sample_id), path("*unpaired*.fq.gz") , emit: unpaired, optional: true
tuple val(sample_id), path("*.html") , emit: html , optional: true
tuple val(sample_id), path("*.zip") , emit: zip , optional: true
script:
"""
trim_galore \\
--paired \\
--gzip \\
${reads[0]} \\
${reads[1]}
""" }
nextflow.config
: config file that enables singularity
singularity {
enabled = true
autoMounts = true
cacheDir = "/config/binaries/singularity/containers_devel/nextflow" }
Run the pipeline, specifying --outdir
:
>>> nextflow run rnaseq.nf --outdir output
N E X T F L O W ~ version 23.04.1
Launching `rnaseq.nf` [soggy_jennings] DSL2 - revision: 87afc1d98d
executor > local (16)
[93/d37ef0] process > INDEX [100%] 1 of 1 ✔
[b3/4c4d9c] process > QT (1) [100%] 3 of 3 ✔
[d0/173a6e] process > FASTQC_one (3) [100%] 3 of 3 ✔
[58/0b8af2] process > TRIMGALORE (3) [100%] 3 of 3 ✔
[c6/def175] process > FASTQC_two (3) [100%] 3 of 3 ✔ [e0/bcf904] process > MULTIQC (3) [100%] 3 of 3 ✔
8.1. Organise outputs
The output declaration block defines the channels used by the process to send out the results produced. However, this output only stays in the work/
directory if there is no publishDir
directive specified.
Given each task is being executed in separate temporary work/ folder (e.g., work/f1/850698…), you may want to save important, non-intermediary, and/or final files in a results folder.
To store our workflow result files, you need to explicitly mark them using the directive publishDir
in the process that’s creating the files. For example:
process MULTIQC {
publishDir params.outdir, mode:'copy'
container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-multiqc-1.21--pyhdfd78af_0.img"
input:
path quantification
path fastqc
output:
path "*.html"
script:
"""
multiqc . --filename $quantification
""" }
The above example will copy all html
files created by the MULTIQC process into the directory path specified in the params.outdir
8.1.1. Store outputs matching a glob pattern
You can use more than one publishDir
to keep different outputs in separate directories. For each directive specify a different glob pattern
using the pattern option to store into each directory only the files that match the provided pattern.
For example:
reads_ch = Channel.fromFilePairs('data/ggal/*_{1,2}.fq')
process FOO {
publishDir "results/bam", pattern: "*.bam"
publishDir "results/bai", pattern: "*.bai"
input:
tuple val(sample_id), path(sample_id_paths)
output:
tuple val(sample_id), path("*.bam")
tuple val(sample_id), path("*.bai")
script:
"""
echo your_command_here --sample $sample_id_paths > ${sample_id}.bam
echo your_command_here --sample $sample_id_paths > ${sample_id}.bai
""" }
Exercise
Use publishDir
and pattern
to keep the outputs from the trimgalore.nf
into separate directories.
process TRIMGALORE {
container '/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-trim-galore-0.6.6--0.img'
publishDir "$params.outdir/report", mode: "copy", pattern:"*report.txt"
publishDir "$params.outdir/trimmed_fastq", mode: "copy", pattern:"*fq.gz"
input:
tuple val(sample_id), path(reads)
output:
tuple val(sample_id), path("*{3prime,5prime,trimmed,val}*.fq.gz"), emit: reads
tuple val(sample_id), path("*report.txt") , emit: log , optional: true
tuple val(sample_id), path("*unpaired*.fq.gz") , emit: unpaired, optional: true
tuple val(sample_id), path("*.html") , emit: html , optional: true
tuple val(sample_id), path("*.zip") , emit: zip , optional: true
script:
"""
trim_galore \\
--paired \\
--gzip \\
${reads[0]} \\
${reads[1]}
""" }
Output should now look like
>>> tree ./output
./output
├── gut.html
├── liver.html
├── lung.html
├── report
│ ├── gut_1.fq_trimming_report.txt
│ ├── gut_2.fq_trimming_report.txt
│ ├── liver_1.fq_trimming_report.txt
│ ├── liver_2.fq_trimming_report.txt
│ ├── lung_1.fq_trimming_report.txt
│ └── lung_2.fq_trimming_report.txt
└── trimmed_fastq
├── gut_1_val_1.fq.gz
├── gut_2_val_2.fq.gz
├── liver_1_val_1.fq.gz
├── liver_2_val_2.fq.gz
├── lung_1_val_1.fq.gz
└── lung_2_val_2.fq.gz
2 directories, 15 files
8.1.2. Store outputs renaming files or in a sub-directory
The publishDir
directive also allow the use of saveAs
option to give each file a name of your choice, providing a custom rule as a closure.
process foo {
publishDir 'results', saveAs: { filename -> "foo_$filename" }
output:
path '*.txt'
'''
touch this.txt
touch that.txt
''' }
The same pattern can be used to store specific files in separate directories depending on the actual name.
process foo {
publishDir 'results', saveAs: { filename -> filename.endsWith(".zip") ? "zips/$filename" : filename }
output:
path '*'
'''
touch this.txt
touch that.zip
''' }
Exercise
Modify the MULTIQC
output with saveAs
such that resulting folder is as follow:
./output
├── MultiQC
│ ├── multiqc_gut.html
│ ├── multiqc_liver.html
│ └── multiqc_lung.html
├── report
│ ├── gut_1.fq_trimming_report.txt
│ ├── gut_2.fq_trimming_report.txt
│ ├── liver_1.fq_trimming_report.txt
│ ├── liver_2.fq_trimming_report.txt
│ ├── lung_1.fq_trimming_report.txt
│ └── lung_2.fq_trimming_report.txt
└── trimmed_fastq
├── gut_1_val_1.fq.gz
├── gut_2_val_2.fq.gz
├── liver_1_val_1.fq.gz
├── liver_2_val_2.fq.gz
├── lung_1_val_1.fq.gz
└── lung_2_val_2.fq.gz
3 directories, 15 files
You need to remove existing output folder/files if you want to have a clean output. By default, nextflow will overwrite existing files, and keep all the remaining files in the same specified output directory.
process MULTIQC {
publishDir params.outdir, mode:'copy', saveAs: { filename -> filename.endsWith(".html") ? "MultiQC/multiqc_$filename" : filename }
container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-multiqc-1.21--pyhdfd78af_0.img"
input:
path quantification
path fastqc
output:
path "*.html"
script:
"""
multiqc . --filename $quantification
""" }
Challenge
Modify all the processes in rnaseq.nf
such that we will have the following output structure
./output
├── gut
│ ├── QC
│ │ ├── fastqc_gut_logs
│ │ │ ├── gut_1_fastqc.html
│ │ │ ├── gut_1_fastqc.zip
│ │ │ ├── gut_2_fastqc.html
│ │ │ └── gut_2_fastqc.zip
│ │ └── gut.html
│ ├── report
│ │ ├── gut_1.fq_trimming_report.txt
│ │ └── gut_2.fq_trimming_report.txt
│ └── trimmed_fastq
│ ├── gut_1_val_1.fq.gz
│ └── gut_2_val_2.fq.gz
├── liver
│ ├── QC
│ │ ├── fastqc_liver_logs
│ │ │ ├── liver_1_fastqc.html
│ │ │ ├── liver_1_fastqc.zip
│ │ │ ├── liver_2_fastqc.html
│ │ │ └── liver_2_fastqc.zip
│ │ └── liver.html
│ ├── report
│ │ ├── liver_1.fq_trimming_report.txt
│ │ └── liver_2.fq_trimming_report.txt
│ └── trimmed_fastq
│ ├── liver_1_val_1.fq.gz
│ └── liver_2_val_2.fq.gz
└── lung
├── QC
│ ├── fastqc_lung_logs
│ │ ├── lung_1_fastqc.html
│ │ ├── lung_1_fastqc.zip
│ │ ├── lung_2_fastqc.html
│ │ └── lung_2_fastqc.zip
│ └── lung.html
├── report
│ ├── lung_1.fq_trimming_report.txt
│ └── lung_2.fq_trimming_report.txt
└── trimmed_fastq
├── lung_1_val_1.fq.gz
└── lung_2_val_2.fq.gz
15 directories, 27 files
process FASTQC {
publishDir "$params.outdir/$sample_id/QC", mode:'copy'
container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-fastqc-0.12.1--hdfd78af_0.img"
input:
tuple val(sample_id), path(reads)
output:
path "fastqc_${sample_id}_logs"
script:
"""
mkdir fastqc_${sample_id}_logs
fastqc -o fastqc_${sample_id}_logs -f fastq -q ${reads}
"""
}
process MULTIQC {
//publishDir params.outdir, mode:'copy', saveAs: { filename -> filename.endsWith(".html") ? "MultiQC/multiqc_$filename" : filename }
publishDir "$params.outdir/$quantification/QC", mode:'copy'
container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-multiqc-1.21--pyhdfd78af_0.img"
input:
path quantification
path fastqc
output:
path "*.html"
script:
"""
multiqc . --filename $quantification
"""
}
process TRIMGALORE {
container '/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-trim-galore-0.6.6--0.img'
publishDir "${params.outdir}/${sample_id}/report", mode: "copy", pattern:"*report.txt"
publishDir "${params.outdir}/${sample_id}/trimmed_fastq", mode: "copy", pattern:"*fq.gz"
input:
tuple val(sample_id), path(reads)
output:
tuple val(sample_id), path("*{3prime,5prime,trimmed,val}*.fq.gz"), emit: reads
tuple val(sample_id), path("*report.txt") , emit: log , optional: true
tuple val(sample_id), path("*unpaired*.fq.gz") , emit: unpaired, optional: true
tuple val(sample_id), path("*.html") , emit: html , optional: true
tuple val(sample_id), path("*.zip") , emit: zip , optional: true
script:
"""
trim_galore \\
--paired \\
--gzip \\
${reads[0]} \\
${reads[1]}
""" }
8.2 Scatter
The scatter
operation involves distributing large input data into smaller chunks that can be analysed across multiple processes in parallel.
One very simple example of native scatter
is how nextflow handles Channel factories with the Channel.fromPath
or Channel.fromFilePairs
method, where multiple input data is processed in parallel.
params.reads = "/scratch/users/.../training/nf-training/data/ggal/*_{1,2}.fq"
reads_ch = Channel.fromFilePairs("$params.reads")
include { FASTQC as FASTQC_one } from './modules.nf'
workflow {
fastqc_ch = FASTQC_one(reads_ch) }
From the above snippet from our rnaseq.nf
, we will get three execution of FASTQC_one for each pairs of our input data.
Other than natively splitting execution by input data, Nextflow also provides operators to scatter existing input data for various benefits, such as faster processing. For example:
- splitText
- splitFasta
- splitFastq
- map with from or fromList
- flatten
8.2.1 Process per file chunk
Exercise
params.infile = "/data/reference/bed_files/Agilent_CRE_v2/S30409818_Covered_MERGED.bed"
params.size = 100000
process count_line {
debug true
input:
file x
script:
"""
wc -l $x
"""
}
workflow {
Channel.fromPath(params.infile) \
| splitText(by: params.size, file: true) \
| count_line }
Exercise
params.infile = "/scratch/users/rlupat/nfWorkshop/dev1/training/nf-training/data/ggal/*_{1,2}.fq"
params.size = 1000
workflow {
Channel.fromFilePairs(params.infile, flat: true) \
| splitFastq(by: params.size, pe: true, file: true) \
| view() }
8.2.1 Process per file range
Exercise
Channel.from(1..22) \
| map { chr -> ["sample${chr}", file("${chr}.indels.vcf"), file("${chr}.vcf")] } \ | view()
>> nextflow run test_scatter.nf
[sample1, /scratch/users/${users}/1.indels.vcf, /scratch/users/${users}/1.vcf]
[sample2, /scratch/users/${users}/2.indels.vcf, /scratch/users/${users}/2.vcf]
[sample3, /scratch/users/${users}/3.indels.vcf, /scratch/users/${users}/3.vcf]
[sample4, /scratch/users/${users}/4.indels.vcf, /scratch/users/${users}/4.vcf]
[sample5, /scratch/users/${users}/5.indels.vcf, /scratch/users/${users}/5.vcf]
[sample6, /scratch/users/${users}/6.indels.vcf, /scratch/users/${users}/6.vcf]
[sample7, /scratch/users/${users}/7.indels.vcf, /scratch/users/${users}/7.vcf]
[sample8, /scratch/users/${users}/8.indels.vcf, /scratch/users/${users}/8.vcf]
[sample9, /scratch/users/${users}/9.indels.vcf, /scratch/users/${users}/9.vcf]
[sample10, /scratch/users${users}/10.indels.vcf, /scratch/users${users}/10.vcf]
[sample11, /scratch/users${users}/11.indels.vcf, /scratch/users${users}/11.vcf]
[sample12, /scratch/users${users}/12.indels.vcf, /scratch/users${users}/12.vcf]
[sample13, /scratch/users${users}/13.indels.vcf, /scratch/users${users}/13.vcf]
[sample14, /scratch/users${users}/14.indels.vcf, /scratch/users${users}/14.vcf]
[sample15, /scratch/users${users}/15.indels.vcf, /scratch/users${users}/15.vcf]
[sample16, /scratch/users${users}/16.indels.vcf, /scratch/users${users}/16.vcf]
[sample17, /scratch/users${users}/17.indels.vcf, /scratch/users${users}/17.vcf]
[sample18, /scratch/users${users}/18.indels.vcf, /scratch/users${users}/18.vcf]
[sample19, /scratch/users${users}/19.indels.vcf, /scratch/users${users}/19.vcf]
[sample20, /scratch/users${users}/20.indels.vcf, /scratch/users${users}/20.vcf]
[sample21, /scratch/users${users}/21.indels.vcf, /scratch/users${users}/21.vcf] [sample22, /scratch/users${users}/22.indels.vcf, /scratch/users${users}/22.vcf]
Exercise
params.infile = "/data/reference/bed_files/Agilent_CRE_v2/S30409818_Covered_MERGED.bed"
params.size = 100000
process split_bed_by_chr {
debug true
input:
path bed
val chr
output:
path "*.bed"
script:
"""
grep ^${chr}\t ${bed} > ${chr}.bed
"""
}
workflow {
split_bed_by_chr(params.infile, Channel.from(1..22)) | view()
}
Challenge
How do we include chr X
and Y
into the above split by chromosome?
workflow {
split_bed_by_chr(params.infile, Channel.from(1..22,'X','Y').flatten()) | view() }
8.3 Gather
The gather
operation consolidates results from parallel computations (can be from scatter
) into a centralized process for aggregation and further processing.
Some of the Nextflow provided operators that facilitate this gather
operation, include:
8.3.1. Process all outputs altogether
Exercise
params.infile = "/data/reference/bed_files/Agilent_CRE_v2/S30409818_Covered_MERGED.bed"
params.size = 100000
process split_bed_by_chr {
debug true
input:
path bed
val chr
output:
path "*.bed"
script:
"""
grep ^${chr}\t ${bed} > ${chr}.bed
"""
}
workflow {
split_bed_by_chr(params.infile, Channel.from(1..22,'X','Y').flatten()) | collect | view() }
8.3.2. Collect outputs into a file
Exercise
params.infile = "/data/reference/bed_files/Agilent_CRE_v2/S30409818_Covered_MERGED.bed"
params.size = 100000
process split_bed_by_chr {
debug true
input:
path bed
val chr
output:
path "*.bed"
script:
"""
grep ^${chr}\t ${bed} > ${chr}.bed
"""
}
workflow {
split_bed_by_chr(params.infile, Channel.from(1..22,'X','Y').flatten()) | collectFile(name: 'merged.bed', newLine:true) | view() }
Exercise
workflow {
Channel.fromPath("/scratch/users/rlupat/nfWorkshop/dev1/training/nf-training/data/ggal/*_1.fq", checkIfExists: true) \
| collectFile(name: 'combined_1.fq', newLine:true) \
| view }