Nextflow Development - Outputs, Scatter, and Gather
- Gain an understanding of how to structure nextflow published outputs
- Gain an understanding of how to do scatter & gather processes
Environment Setup
Set up an interactive shell to run our Nextflow workflow:
srun --pty -p prod_short --mem 8GB --mincpus 2 -t 0-2:00 bash
Load the required modules to run Nextflow:
module load nextflow/23.04.1
module load singularity/3.7.3
Set the singularity cache environment variable:
export NXF_SINGULARITY_CACHEDIR=/config/binaries/singularity/containers_devel/nextflow
Singularity images downloaded by workflow executions will now be stored in this directory.
You may want to include these, or other environment variables, in your .bashrc file (or an alternative) that is loaded when you log in, so you don't need to export the variables every session. A complete list of environment variables can be found here.
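As a sketch, the setup above could be appended to your ~/.bashrc (the module versions and cache path are the ones used in this training environment):

```shell
# Append the training-environment setup to ~/.bashrc so it is applied
# on every login (module versions/paths as used in this workshop)
cat >> ~/.bashrc <<'EOF'
module load nextflow/23.04.1
module load singularity/3.7.3
export NXF_SINGULARITY_CACHEDIR=/config/binaries/singularity/containers_devel/nextflow
EOF
```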
The training data can be cloned from:
git clone https://github.com/nextflow-io/training.git
RNA-seq Workflow and Module Files
Previously, we created three Nextflow files and one config file:
├── nextflow.config
├── rnaseq.nf
├── modules.nf
└── modules
└── trimgalore.nf
rnaseq.nf: main workflow script where parameters are defined and processes are called.
#!/usr/bin/env nextflow
params.reads = "/scratch/users/.../training/nf-training/data/ggal/*_{1,2}.fq"
params.transcriptome_file = "/scratch/users/.../training/nf-training/data/ggal/transcriptome.fa"
reads_ch = Channel.fromFilePairs("$params.reads")
include { INDEX } from './modules.nf'
include { QUANTIFICATION as QT } from './modules.nf'
include { FASTQC as FASTQC_one } from './modules.nf'
include { FASTQC as FASTQC_two } from './modules.nf'
include { MULTIQC } from './modules.nf'
include { TRIMGALORE } from './modules/trimgalore.nf'
workflow {
index_ch = INDEX(params.transcriptome_file)
quant_ch = QT(index_ch, reads_ch)
fastqc_ch = FASTQC_one(reads_ch)
trimgalore_out_ch = TRIMGALORE(reads_ch).reads
fastqc_cleaned_ch = FASTQC_two(trimgalore_out_ch)
multiqc_ch = MULTIQC(quant_ch, fastqc_ch)
}
modules.nf: script containing the majority of the modules, including INDEX, QUANTIFICATION, FASTQC, and MULTIQC
process INDEX {
container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-salmon-1.10.1--h7e5ed60_0.img"
input:
path transcriptome
output:
path "salmon_idx"
script:
"""
salmon index --threads $task.cpus -t $transcriptome -i salmon_idx
"""
}
process QUANTIFICATION {
container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-salmon-1.10.1--h7e5ed60_0.img"
input:
path salmon_index
tuple val(sample_id), path(reads)
output:
path "$sample_id"
script:
"""
salmon quant --threads $task.cpus --libType=U \
-i $salmon_index -1 ${reads[0]} -2 ${reads[1]} -o $sample_id
"""
}
process FASTQC {
container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-fastqc-0.12.1--hdfd78af_0.img"
input:
tuple val(sample_id), path(reads)
output:
path "fastqc_${sample_id}_logs"
script:
"""
mkdir fastqc_${sample_id}_logs
fastqc -o fastqc_${sample_id}_logs -f fastq -q ${reads}
"""
}
process MULTIQC {
publishDir params.outdir, mode:'copy'
container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-multiqc-1.21--pyhdfd78af_0.img"
input:
path quantification
path fastqc
output:
path "*.html"
script:
"""
multiqc . --filename $quantification
"""
}
modules/trimgalore.nf: script inside a modules folder, containing only the TRIMGALORE process
process TRIMGALORE {
container '/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-trim-galore-0.6.6--0.img'
input:
tuple val(sample_id), path(reads)
output:
tuple val(sample_id), path("*{3prime,5prime,trimmed,val}*.fq.gz"), emit: reads
tuple val(sample_id), path("*report.txt") , emit: log , optional: true
tuple val(sample_id), path("*unpaired*.fq.gz") , emit: unpaired, optional: true
tuple val(sample_id), path("*.html") , emit: html , optional: true
tuple val(sample_id), path("*.zip") , emit: zip , optional: true
script:
"""
trim_galore \\
--paired \\
--gzip \\
${reads[0]} \\
${reads[1]}
"""
}
nextflow.config: config file that enables singularity
singularity {
enabled = true
autoMounts = true
cacheDir = "/config/binaries/singularity/containers_devel/nextflow"
}
Run the pipeline, specifying --outdir:
>>> nextflow run rnaseq.nf --outdir output
N E X T F L O W ~ version 23.04.1
Launching `rnaseq.nf` [soggy_jennings] DSL2 - revision: 87afc1d98d
executor > local (16)
[93/d37ef0] process > INDEX [100%] 1 of 1 ✔
[b3/4c4d9c] process > QT (1) [100%] 3 of 3 ✔
[d0/173a6e] process > FASTQC_one (3) [100%] 3 of 3 ✔
[58/0b8af2] process > TRIMGALORE (3) [100%] 3 of 3 ✔
[c6/def175] process > FASTQC_two (3) [100%] 3 of 3 ✔
[e0/bcf904] process > MULTIQC (3) [100%] 3 of 3 ✔
8.1. Organise outputs
The output declaration block defines the channels used by the process to send out the results it produces. However, these outputs only stay in the work/ directory if no publishDir directive is specified.
Given that each task is executed in its own temporary work/ directory (e.g., work/f1/850698…), you may want to save important, non-intermediate, and/or final files to a results folder.
To store your workflow result files, you need to explicitly mark them using the publishDir directive in the process that creates the files. For example:
process MULTIQC {
publishDir params.outdir, mode:'copy'
container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-multiqc-1.21--pyhdfd78af_0.img"
input:
path quantification
path fastqc
output:
path "*.html"
script:
"""
multiqc . --filename $quantification
"""
}
The above example will copy all HTML files created by the MULTIQC process into the directory path specified by params.outdir.
8.1.1. Store outputs matching a glob pattern
You can use more than one publishDir directive to keep different outputs in separate directories. For each directive, specify a different glob pattern using the pattern option so that each directory receives only the files that match the provided pattern.
For example:
reads_ch = Channel.fromFilePairs('data/ggal/*_{1,2}.fq')
process FOO {
publishDir "results/bam", pattern: "*.bam"
publishDir "results/bai", pattern: "*.bai"
input:
tuple val(sample_id), path(sample_id_paths)
output:
tuple val(sample_id), path("*.bam")
tuple val(sample_id), path("*.bai")
script:
"""
echo your_command_here --sample $sample_id_paths > ${sample_id}.bam
echo your_command_here --sample $sample_id_paths > ${sample_id}.bai
"""
}
Exercise
Use publishDir with pattern to store the outputs from trimgalore.nf in separate directories.
process TRIMGALORE {
container '/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-trim-galore-0.6.6--0.img'
publishDir "$params.outdir/report", mode: "copy", pattern:"*report.txt"
publishDir "$params.outdir/trimmed_fastq", mode: "copy", pattern:"*fq.gz"
input:
tuple val(sample_id), path(reads)
output:
tuple val(sample_id), path("*{3prime,5prime,trimmed,val}*.fq.gz"), emit: reads
tuple val(sample_id), path("*report.txt") , emit: log , optional: true
tuple val(sample_id), path("*unpaired*.fq.gz") , emit: unpaired, optional: true
tuple val(sample_id), path("*.html") , emit: html , optional: true
tuple val(sample_id), path("*.zip") , emit: zip , optional: true
script:
"""
trim_galore \\
--paired \\
--gzip \\
${reads[0]} \\
${reads[1]}
"""
}
The output should now look like:
>>> tree ./output
./output
├── gut.html
├── liver.html
├── lung.html
├── report
│ ├── gut_1.fq_trimming_report.txt
│ ├── gut_2.fq_trimming_report.txt
│ ├── liver_1.fq_trimming_report.txt
│ ├── liver_2.fq_trimming_report.txt
│ ├── lung_1.fq_trimming_report.txt
│ └── lung_2.fq_trimming_report.txt
└── trimmed_fastq
├── gut_1_val_1.fq.gz
├── gut_2_val_2.fq.gz
├── liver_1_val_1.fq.gz
├── liver_2_val_2.fq.gz
├── lung_1_val_1.fq.gz
└── lung_2_val_2.fq.gz
2 directories, 15 files
8.1.2. Store outputs renaming files or in a sub-directory
The publishDir directive also allows the use of the saveAs option to give each file a name of your choice, by providing a custom rule as a closure.
process foo {
publishDir 'results', saveAs: { filename -> "foo_$filename" }
output:
path '*.txt'
'''
touch this.txt
touch that.txt
'''
}
The same pattern can be used to store specific files in separate directories depending on the actual name.
process foo {
publishDir 'results', saveAs: { filename -> filename.endsWith(".zip") ? "zips/$filename" : filename }
output:
path '*'
'''
touch this.txt
touch that.zip
'''
}
Exercise
Modify the MULTIQC output with saveAs such that the resulting folder is as follows:
./output
├── MultiQC
│ ├── multiqc_gut.html
│ ├── multiqc_liver.html
│ └── multiqc_lung.html
├── report
│ ├── gut_1.fq_trimming_report.txt
│ ├── gut_2.fq_trimming_report.txt
│ ├── liver_1.fq_trimming_report.txt
│ ├── liver_2.fq_trimming_report.txt
│ ├── lung_1.fq_trimming_report.txt
│ └── lung_2.fq_trimming_report.txt
└── trimmed_fastq
├── gut_1_val_1.fq.gz
├── gut_2_val_2.fq.gz
├── liver_1_val_1.fq.gz
├── liver_2_val_2.fq.gz
├── lung_1_val_1.fq.gz
└── lung_2_val_2.fq.gz
3 directories, 15 files
You need to remove the existing output folder/files if you want a clean output. By default, Nextflow will overwrite existing files and keep any remaining files in the same specified output directory.
process MULTIQC {
publishDir params.outdir, mode:'copy', saveAs: { filename -> filename.endsWith(".html") ? "MultiQC/multiqc_$filename" : filename }
container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-multiqc-1.21--pyhdfd78af_0.img"
input:
path quantification
path fastqc
output:
path "*.html"
script:
"""
multiqc . --filename $quantification
"""
}
Challenge
Modify all the processes in rnaseq.nf such that we get the following output structure:
./output
├── gut
│ ├── QC
│ │ ├── fastqc_gut_logs
│ │ │ ├── gut_1_fastqc.html
│ │ │ ├── gut_1_fastqc.zip
│ │ │ ├── gut_2_fastqc.html
│ │ │ └── gut_2_fastqc.zip
│ │ └── gut.html
│ ├── report
│ │ ├── gut_1.fq_trimming_report.txt
│ │ └── gut_2.fq_trimming_report.txt
│ └── trimmed_fastq
│ ├── gut_1_val_1.fq.gz
│ └── gut_2_val_2.fq.gz
├── liver
│ ├── QC
│ │ ├── fastqc_liver_logs
│ │ │ ├── liver_1_fastqc.html
│ │ │ ├── liver_1_fastqc.zip
│ │ │ ├── liver_2_fastqc.html
│ │ │ └── liver_2_fastqc.zip
│ │ └── liver.html
│ ├── report
│ │ ├── liver_1.fq_trimming_report.txt
│ │ └── liver_2.fq_trimming_report.txt
│ └── trimmed_fastq
│ ├── liver_1_val_1.fq.gz
│ └── liver_2_val_2.fq.gz
└── lung
├── QC
│ ├── fastqc_lung_logs
│ │ ├── lung_1_fastqc.html
│ │ ├── lung_1_fastqc.zip
│ │ ├── lung_2_fastqc.html
│ │ └── lung_2_fastqc.zip
│ └── lung.html
├── report
│ ├── lung_1.fq_trimming_report.txt
│ └── lung_2.fq_trimming_report.txt
└── trimmed_fastq
├── lung_1_val_1.fq.gz
└── lung_2_val_2.fq.gz
15 directories, 27 files
process FASTQC {
publishDir "$params.outdir/$sample_id/QC", mode:'copy'
container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-fastqc-0.12.1--hdfd78af_0.img"
input:
tuple val(sample_id), path(reads)
output:
path "fastqc_${sample_id}_logs"
script:
"""
mkdir fastqc_${sample_id}_logs
fastqc -o fastqc_${sample_id}_logs -f fastq -q ${reads}
"""
}
process MULTIQC {
//publishDir params.outdir, mode:'copy', saveAs: { filename -> filename.endsWith(".html") ? "MultiQC/multiqc_$filename" : filename }
publishDir "$params.outdir/$quantification/QC", mode:'copy'
container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-multiqc-1.21--pyhdfd78af_0.img"
input:
path quantification
path fastqc
output:
path "*.html"
script:
"""
multiqc . --filename $quantification
"""
}
process TRIMGALORE {
container '/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-trim-galore-0.6.6--0.img'
publishDir "${params.outdir}/${sample_id}/report", mode: "copy", pattern:"*report.txt"
publishDir "${params.outdir}/${sample_id}/trimmed_fastq", mode: "copy", pattern:"*fq.gz"
input:
tuple val(sample_id), path(reads)
output:
tuple val(sample_id), path("*{3prime,5prime,trimmed,val}*.fq.gz"), emit: reads
tuple val(sample_id), path("*report.txt") , emit: log , optional: true
tuple val(sample_id), path("*unpaired*.fq.gz") , emit: unpaired, optional: true
tuple val(sample_id), path("*.html") , emit: html , optional: true
tuple val(sample_id), path("*.zip") , emit: zip , optional: true
script:
"""
trim_galore \\
--paired \\
--gzip \\
${reads[0]} \\
${reads[1]}
"""
}
8.2 Scatter
The scatter operation involves distributing large input data into smaller chunks that can be analysed across multiple processes in parallel.
One very simple example of native scatter is how Nextflow handles channel factories such as Channel.fromPath or Channel.fromFilePairs, where multiple input files are processed in parallel.
params.reads = "/scratch/users/.../training/nf-training/data/ggal/*_{1,2}.fq"
reads_ch = Channel.fromFilePairs("$params.reads")
include { FASTQC as FASTQC_one } from './modules.nf'
workflow {
fastqc_ch = FASTQC_one(reads_ch)
}
From the above snippet of our rnaseq.nf, we will get three executions of FASTQC_one, one for each pair of our input files.
Beyond natively splitting execution across input files, Nextflow also provides operators to scatter existing input data for various benefits, such as faster processing. For example:
- splitText
- splitFasta
- splitFastq
- map with from or fromList
- flatten
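As an illustration, a minimal sketch of splitText (the input path is hypothetical): it reads one file and re-emits it as fixed-size chunks, each of which becomes a separate task for any downstream process.

```nextflow
// Sketch: scatter a text file into 3-line chunk files
// ('data/lines.txt' is a hypothetical input)
workflow {
    Channel.fromPath('data/lines.txt')
        | splitText(by: 3, file: true)   // emit each 3-line chunk as a file
        | view
}
```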
8.2.1 Process per file chunk
Exercise
params.infile = "/data/reference/bed_files/Agilent_CRE_v2/S30409818_Covered_MERGED.bed"
params.size = 100000
process count_line {
debug true
input:
file x
script:
"""
wc -l $x
"""
}
workflow {
Channel.fromPath(params.infile) \
| splitText(by: params.size, file: true) \
| count_line
}
Exercise
params.infile = "/scratch/users/rlupat/nfWorkshop/dev1/training/nf-training/data/ggal/*_{1,2}.fq"
params.size = 1000
workflow {
Channel.fromFilePairs(params.infile, flat: true) \
| splitFastq(by: params.size, pe: true, file: true) \
| view()
}
8.2.2 Process per file range
Exercise
Channel.from(1..22) \
| map { chr -> ["sample${chr}", file("${chr}.indels.vcf"), file("${chr}.vcf")] } \
| view()
>>> nextflow run test_scatter.nf
[sample1, /scratch/users/${users}/1.indels.vcf, /scratch/users/${users}/1.vcf]
[sample2, /scratch/users/${users}/2.indels.vcf, /scratch/users/${users}/2.vcf]
[sample3, /scratch/users/${users}/3.indels.vcf, /scratch/users/${users}/3.vcf]
[sample4, /scratch/users/${users}/4.indels.vcf, /scratch/users/${users}/4.vcf]
[sample5, /scratch/users/${users}/5.indels.vcf, /scratch/users/${users}/5.vcf]
[sample6, /scratch/users/${users}/6.indels.vcf, /scratch/users/${users}/6.vcf]
[sample7, /scratch/users/${users}/7.indels.vcf, /scratch/users/${users}/7.vcf]
[sample8, /scratch/users/${users}/8.indels.vcf, /scratch/users/${users}/8.vcf]
[sample9, /scratch/users/${users}/9.indels.vcf, /scratch/users/${users}/9.vcf]
[sample10, /scratch/users/${users}/10.indels.vcf, /scratch/users/${users}/10.vcf]
[sample11, /scratch/users/${users}/11.indels.vcf, /scratch/users/${users}/11.vcf]
[sample12, /scratch/users/${users}/12.indels.vcf, /scratch/users/${users}/12.vcf]
[sample13, /scratch/users/${users}/13.indels.vcf, /scratch/users/${users}/13.vcf]
[sample14, /scratch/users/${users}/14.indels.vcf, /scratch/users/${users}/14.vcf]
[sample15, /scratch/users/${users}/15.indels.vcf, /scratch/users/${users}/15.vcf]
[sample16, /scratch/users/${users}/16.indels.vcf, /scratch/users/${users}/16.vcf]
[sample17, /scratch/users/${users}/17.indels.vcf, /scratch/users/${users}/17.vcf]
[sample18, /scratch/users/${users}/18.indels.vcf, /scratch/users/${users}/18.vcf]
[sample19, /scratch/users/${users}/19.indels.vcf, /scratch/users/${users}/19.vcf]
[sample20, /scratch/users/${users}/20.indels.vcf, /scratch/users/${users}/20.vcf]
[sample21, /scratch/users/${users}/21.indels.vcf, /scratch/users/${users}/21.vcf]
[sample22, /scratch/users/${users}/22.indels.vcf, /scratch/users/${users}/22.vcf]
Exercise
params.infile = "/data/reference/bed_files/Agilent_CRE_v2/S30409818_Covered_MERGED.bed"
params.size = 100000
process split_bed_by_chr {
debug true
input:
path bed
val chr
output:
path "*.bed"
script:
"""
grep -P "^${chr}\\t" ${bed} > ${chr}.bed
"""
}
workflow {
split_bed_by_chr(params.infile, Channel.from(1..22)) | view()
}
Challenge
How do we include chr X and Y in the above split by chromosome?
workflow {
split_bed_by_chr(params.infile, Channel.from(1..22,'X','Y').flatten()) | view()
}
8.3 Gather
The gather operation consolidates results from parallel computations (for example, from a scatter) into a centralised process for aggregation and further processing.
Some of the Nextflow operators that facilitate this gather operation include collect and collectFile:
8.3.1. Process all outputs altogether
Exercise
params.infile = "/data/reference/bed_files/Agilent_CRE_v2/S30409818_Covered_MERGED.bed"
params.size = 100000
process split_bed_by_chr {
debug true
input:
path bed
val chr
output:
path "*.bed"
script:
"""
grep -P "^${chr}\\t" ${bed} > ${chr}.bed
"""
}
workflow {
split_bed_by_chr(params.infile, Channel.from(1..22,'X','Y').flatten()) | collect | view()
}
8.3.2. Collect outputs into a file
Exercise
params.infile = "/data/reference/bed_files/Agilent_CRE_v2/S30409818_Covered_MERGED.bed"
params.size = 100000
process split_bed_by_chr {
debug true
input:
path bed
val chr
output:
path "*.bed"
script:
"""
grep -P "^${chr}\\t" ${bed} > ${chr}.bed
"""
}
workflow {
split_bed_by_chr(params.infile, Channel.from(1..22,'X','Y').flatten()) | collectFile(name: 'merged.bed', newLine:true) | view()
}
Exercise
workflow {
Channel.fromPath("/scratch/users/rlupat/nfWorkshop/dev1/training/nf-training/data/ggal/*_1.fq", checkIfExists: true) \
| collectFile(name: 'combined_1.fq', newLine:true) \
| view
}
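collectFile can also route entries into different files by returning a [file name, content] pair from a closure. A sketch using inline toy values (not part of the training data):

```nextflow
// Sketch: gather per-sample entries into one file per sample
workflow {
    Channel.of(['gut', 'seq1'], ['liver', 'seq2'], ['gut', 'seq3'])
        | collectFile(newLine: true) { sample, entry ->
            ["${sample}.txt", entry]   // entries sharing a name are appended to that file
          }
        | view
}
```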