Building With Resume

A guide to using the resume function in Nextflow to modularly build pipelines
Nextflow
Bioinformatics
Author

LW Pembleton

Published

November 13, 2024

~Photo by James Wainscoat on Unsplash~

Build With Resume

The resume feature has to be one of the most powerful tools in Nextflow. It not only lets you recover an interrupted pipeline without redoing the whole thing, but it’s also super handy for writing and testing pipelines. “Build with resume” is my go-to approach for developing pipelines. I build them one process at a time, check channel outputs and published files, then add the next process and run it again using the -resume flag. This way, I can pick up from the last process instead of starting from scratch every time, and progressively add 🏗️ and check ✅ each process.

So, I thought I’d share a step-by-step example for anyone interested. You can find all the files in my GitHub repository.

Rather than using some irrelevant “Hello World” 👋 example, I’m going with a trimmed-down read mapping 🧬 and variant-calling pipeline. Hopefully, this feels a bit more meaningful!

Note

This example assumes you already have Docker and Nextflow set up on your system.

Assuming you have downloaded the GitHub repo you will have the relevant process modules and associated config file. What we are going to therefore focus on is the workflow. So lets get going 🛫

Step 1: Read in Input Files

We’ll start by reading a CSV samplesheet to define our input. When I first starting learning Nextflow I really struggled with the fact that all examples would just scan directories for FASTQ files. It just didn’t compute 🤔 for me — I have always use samplesheets or lists as my input. Seriously, how often are all your samples in one folder, and don’t we usually want some metadata with each sample? So, for those out there like me let’s go with a CSV samplesheet format (just like most mature Nextflow pipelines out there). We’ll use the splitCsv operator and map to split each row into a tuple containing the sample ID and read file paths, then set that as a channel called input_reads.

Here’s what it looks like 👇

main.nf
// Check mandatory parameters
if (params.input) { csv_file = file(params.input) } else { exit 1, 'Input samplesheet not specified!' }
if (params.reference == null) error "Please specify a reference genome fasta file with --reference"

def reference =  file(params.reference)

/*
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    IMPORT LOCAL MODULES
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*/

include { FASTP } from './modules/local/fastp'
include { SAMTOOLS_FAIDX } from './modules/local/samtools_faidx'
include { BWA_INDEX } from './modules/local/bwa_index'
include { BWA_MEM } from './modules/local/bwa_mem'
include { BCFTOOLS_CALL } from './modules/local/bcftools_call'


workflow {

    // Read in samplesheet and convert into input sample channel
    Channel.fromPath(params.input) \
        | splitCsv(header:true) \
        | map { row-> tuple(row.sample_id, file(row.fastq_1), file(row.fastq_2)) } \
        | set {input_reads}

}

Now, we could run this first step, but we wouldn’t see any output. Let’s spice it up and add a view() statement to check out 👀 our new input_reads channel:

main.nf
// Check mandatory parameters
if (params.input) { csv_file = file(params.input) } else { exit 1, 'Input samplesheet not specified!' }
if (params.reference == null) error "Please specify a reference genome fasta file with --reference"

def reference =  file(params.reference)

/*
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    IMPORT LOCAL MODULES
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*/

include { FASTP } from './modules/local/fastp'
include { SAMTOOLS_FAIDX } from './modules/local/samtools_faidx'
include { BWA_INDEX } from './modules/local/bwa_index'
include { BWA_MEM } from './modules/local/bwa_mem'
include { BCFTOOLS_CALL } from './modules/local/bcftools_call'


workflow {

    // Read in samplesheet and convert into input sample channel
    Channel.fromPath(params.input) \
        | splitCsv(header:true) \
        | map { row-> tuple(row.sample_id, file(row.fastq_1), file(row.fastq_2)) } \
        | set {input_reads}
  
  input_reads.view()

}

Let’s run it with the included samplesheet and example files 👇

nextflow run main.nf --input docs/example_data/samplesheet.csv --reference ./docs/example_data/GCF_020520425.1_BTI_SOV_V1_chr1-2_genomic_50Mb_trimmed.fna

Assuming everything worked, you should see your input_reads channel printed to the console—along with the colourful nextflow console output (thanks, Phil! 🥳).

Step 2: Add the First Process

Now, let’s add our first process: quality control (QC) with fastp to preprocess the reads.

main.nf
// Check mandatory parameters
if (params.input) { csv_file = file(params.input) } else { exit 1, 'Input samplesheet not specified!' }
if (params.reference == null) error "Please specify a reference genome fasta file with --reference"

def reference =  file(params.reference)

/*
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    IMPORT LOCAL MODULES
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*/

include { FASTP } from './modules/local/fastp'
include { SAMTOOLS_FAIDX } from './modules/local/samtools_faidx'
include { BWA_INDEX } from './modules/local/bwa_index'
include { BWA_MEM } from './modules/local/bwa_mem'
include { BCFTOOLS_CALL } from './modules/local/bcftools_call'


workflow {

    // Read in samplesheet and convert into input sample channel
    Channel.fromPath(params.input) \
        | splitCsv(header:true) \
        | map { row-> tuple(row.sample_id, file(row.fastq_1), file(row.fastq_2)) } \
        | set {input_reads}
  
  //input_reads.view()
  
  // QC preprocessing of reads
    FASTP(input_reads)
    FASTP.out.reads.view()

}

Alright and now lets get into the magic 🪄 of resume. Instead of re-running everything (admittedly it was only a small channel factory process) lets resume from where we left off.

nextflow run main.nf --input docs/example_data/samplesheet.csv --reference ./docs/example_data/GCF_020520425.1_BTI_SOV_V1_chr1-2_genomic_50Mb_trimmed.fna -resume

Success the fastp process has been added to our pipeline 🥳

Step 3: Map Reads to Reference

Next, let’s map the reads to the reference sequence with bwa-mem, but we’ll need to index the reference first. We’ll add the BWA_INDEX and BWA_MEM processes for this.

main.nf
// Check mandatory parameters
if (params.input) { csv_file = file(params.input) } else { exit 1, 'Input samplesheet not specified!' }
if (params.reference == null) error "Please specify a reference genome fasta file with --reference"

def reference =  file(params.reference)

/*
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    IMPORT LOCAL MODULES
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*/

include { FASTP } from './modules/local/fastp'
include { SAMTOOLS_FAIDX } from './modules/local/samtools_faidx'
include { BWA_INDEX } from './modules/local/bwa_index'
include { BWA_MEM } from './modules/local/bwa_mem'
include { BCFTOOLS_CALL } from './modules/local/bcftools_call'


workflow {

    // Read in samplesheet and convert into input sample channel
    Channel.fromPath(params.input) \
        | splitCsv(header:true) \
        | map { row-> tuple(row.sample_id, file(row.fastq_1), file(row.fastq_2)) } \
        | set {input_reads}
  
  //input_reads.view()
  
  // QC preprocessing of reads
    FASTP(input_reads)
    //FASTP.out.reads.view()
    
    // Create reference index for BWA
    BWA_INDEX(reference)

    // Map reads to reference
    BWA_MEM(FASTP.out.reads, BWA_INDEX.out.index)
    BWA_MEM.out.bam.view()


}

Run the pipeline again with the -resume statement 👇

nextflow run main.nf --input docs/example_data/samplesheet.csv --reference ./docs/example_data/GCF_020520425.1_BTI_SOV_V1_chr1-2_genomic_50Mb_trimmed.fna -resume

In the console output you should see that it lists the previous FASTP process as cached.

Step 4: Call Variants

Time for variant calling! We’ll use a bcftools process and index our reference again with samtools.

Home stretch now, time to call variants! For this we will use a bcftools process, but we will also need to index our reference again, this time with samtools.

main.nf
// Check mandatory parameters
if (params.input) { csv_file = file(params.input) } else { exit 1, 'Input samplesheet not specified!' }
if (params.reference == null) error "Please specify a reference genome fasta file with --reference"

def reference =  file(params.reference)

/*
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    IMPORT LOCAL MODULES
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*/

include { FASTP } from './modules/local/fastp'
include { SAMTOOLS_FAIDX } from './modules/local/samtools_faidx'
include { BWA_INDEX } from './modules/local/bwa_index'
include { BWA_MEM } from './modules/local/bwa_mem'
include { BCFTOOLS_CALL } from './modules/local/bcftools_call'


workflow {

    // Read in samplesheet and convert into input sample channel
    Channel.fromPath(params.input) \
        | splitCsv(header:true) \
        | map { row-> tuple(row.sample_id, file(row.fastq_1), file(row.fastq_2)) } \
        | set {input_reads}
  
  //input_reads.view()
  
  // QC preprocessing of reads
    FASTP(input_reads)
    //FASTP.out.reads.view()
    
    // Create reference index for BWA
    BWA_INDEX(reference)

    // Map reads to reference
    BWA_MEM(FASTP.out.reads, BWA_INDEX.out.index)
    //BWA_MEM.out.bam.view()

  // Create reference index for bcftools
    SAMTOOLS_FAIDX(reference)
    

    //BWA_MEM.out.bam.map { tuple -> tuple[1] }.collect().view()
    // Call variants
    BCFTOOLS_CALL(BWA_MEM.out.bam.map { tuple -> tuple[1] }.collect(), BWA_MEM.out.bai.map { tuple -> tuple[1] }.collect(), reference, SAMTOOLS_FAIDX.out.fai)


}

Curious 🤔 about the map operator here? It gathers the relevant BAM and BAI components from the output tuples of multiple samples into input channels for BCFTOOLS_CALL. You can uncomment BWA_MEM.out.bam.view() and BWA_MEM.out.bam.map { tuple -> tuple[1] }.collect().view() to compare the outputs.

Tip

If you ever accidentally run your pipeline without the -resume flag and get that dreaded feeling that you have just lost all the progress on previous processes. No need to worry you can run it again with the -resume flag and and the session ID of the previous run you want to resume from.

e.g. nextflow run main.nf -resume <session-id>

Don’t know what the session ID is, just run nextflow log

Hopefully, this example of modularly building a Nextflow pipeline and using the -resume feature has been helpful! It’s a game-changer for big pipelines or when testing with larger datasets where processes take longer.

Alright Nextflow champions, Happy developing, coding, hacking, or whatever you like to call it!

Note

The example sequencing data found in the related repository and used in this example is from Spinacia oleracea NCBI bioproject [PRJNA724923](https://www.ncbi.nlm.nih.gov/bioproject/PRJNA724923). To keep it small I pre-aligned the samples to chromosome 1 & 2 and then subsetted the mapped reads to form the example fastq files and example genome reference.