Move startup logging and channel setup into the main.nf workflow block, group
the pipeline parameter defaults in a params {} scope, and choose the executor
queue size through per-step config files selected with includeConfig.

The step check in nextflow.config compares params.step as a string, as main.nf
already does, so a step value of "1" and a step value of 1 select the same
queue config.

diff --git a/src/configs/queue-basecalling.config b/src/configs/queue-basecalling.config
new file mode 100644
index 0000000000000000000000000000000000000000..d4e6df1ae52a63d6ad0aac0e4d6b26b55960f8d4
--- /dev/null
+++ b/src/configs/queue-basecalling.config
@@ -0,0 +1 @@
+params.queue_size = 1
\ No newline at end of file
diff --git a/src/configs/queue-default.config b/src/configs/queue-default.config
new file mode 100644
index 0000000000000000000000000000000000000000..90330102f5cd6bbae4206dab8a9429f65dd27677
--- /dev/null
+++ b/src/configs/queue-default.config
@@ -0,0 +1 @@
+params.queue_size = 5
\ No newline at end of file
diff --git a/src/main.nf b/src/main.nf
index e0a983f8e20e276b1d5a7be9892be70fa1a83b45..06e3012936f7326cf82f1f293c385b296d3c4265 100755
--- a/src/main.nf
+++ b/src/main.nf
@@ -1,95 +1,92 @@
-// Make this pipeline a nextflow 2 implementation
-nextflow.enable.dsl=2
-
-if (params.step.toString() == "1") {
-    log.info """
-=================================================================
-STEP 1 - OXFORD NANOPORE DNA SEQUENCING BASECALLING AND ALIGNMENT
-=================================================================
-basecall files path containing : ${params.basecall_path}
-basecall speed (basecall only) : ${params.basecall_speed}
-basecall modifications (basecall only) : ${params.basecall_mods}
-basecall config : ${params.basecall_config}
-basecall read trimming option : ${params.basecall_trim}
-basecall quality score threshold for basecalling : ${params.qscore_thresh}
-basecall demultiplexing : ${params.basecall_demux}
-trim barcodes during demultiplexing : ${params.trim_barcode}
-submission output file prefix : ${params.prefix}
-GPU device for submission : ${params.gpu_devices}
-Output directory : ${params.out_dir}
-=================================================================
-"""
-} else if (params.step.toString() == "2_from_step_1" || params.step.toString() == "2_from_minknow") {
-    log.info """
-======================================
-STEP 2 - FILTERING AND QUALITY CONTROL
-======================================
-Input directory (output dir from step 1) : ${params.steps_2_and_3_input_directory}
-Basecall quality score threshold : ${params.qscore_thresh}
-MAPQ filtering threshold : ${params.mapq}
-Min number of mapped reads per sample/barcode : ${params.min_mapped_reads_thresh}
-BAM files barcoded? : ${params.is_barcoded}
-======================================
-"""
-} else if (params.step.toString() == "3") {
-    log.info """
-===============================================
-STEP 3 - METHYLATION CALLING AND MULTIQC REPORT
-===============================================
-Input directory (input dir from step 2) : ${params.steps_2_and_3_input_directory}
-MultiQC configuration file : ${params.multiqc_config}
-===============================================
-"""
-} else {
-    println "ERROR: You must set parameter --step to '1' or '2_from_step_1' or '2_from_minknow' or '3'. Please refer to documentation at: https://github.com/bernardo-heberle/DCNL_NANOPORE_PIPELINE"
-    System.exit(1)
-}
-
-// Import Workflows
+// Import sub-workflows
 include {BASECALLING} from './sub_workflows/BASECALLING'
 include {FILTERING_AND_QC_FROM_STEP_1} from './sub_workflows/FILTERING_AND_QC_FROM_STEP_1.nf'
 include {FILTERING_AND_QC_FROM_MINKNOW} from './sub_workflows/FILTERING_AND_QC_FROM_MINKNOW.nf'
 include {MODKIT_AND_MULTIQC} from './sub_workflows/MODKIT_AND_MULTIQC.nf'
 
-// Define initial files and channels
-if (params.step.toString() == "1") {
-    if (params.prefix == "None") {
-        fast5_path = Channel.fromPath("${params.basecall_path}/**.fast5").map{file -> tuple(file.parent.toString().split("/")[-3] + "_" + file.simpleName.split('_')[0] + "_" + file.simpleName.split('_')[-3..-2].join("_"), file) }.groupTuple()
-        pod5_path = Channel.fromPath("${params.basecall_path}/**.pod5").map{file -> tuple(file.parent.toString().split("/")[-3] + "_" + file.simpleName.split('_')[0] + "_" + file.simpleName.split('_')[-3..-2].join("_"), file) }.groupTuple()
+// Main workflow logic
+workflow {
+    // Log execution parameters
+    if (params.step.toString() == "1") {
+        log.info """
+        =================================================================
+        STEP 1 - OXFORD NANOPORE DNA SEQUENCING BASECALLING AND ALIGNMENT
+        =================================================================
+        basecall files path containing : ${params.basecall_path}
+        basecall speed (basecall only) : ${params.basecall_speed}
+        basecall modifications (basecall only) : ${params.basecall_mods}
+        basecall config : ${params.basecall_config}
+        basecall read trimming option : ${params.basecall_trim}
+        basecall quality score threshold for basecalling : ${params.qscore_thresh}
+        basecall demultiplexing : ${params.basecall_demux}
+        trim barcodes during demultiplexing : ${params.trim_barcode}
+        submission output file prefix : ${params.prefix}
+        GPU device for submission : ${params.gpu_devices}
+        Output directory : ${params.out_dir}
+        =================================================================
+        """
+    } else if (params.step.toString() == "2_from_step_1" || params.step.toString() == "2_from_minknow") {
+        log.info """
+        ======================================
+        STEP 2 - FILTERING AND QUALITY CONTROL
+        ======================================
+        Input directory (output dir from step 1) : ${params.steps_2_and_3_input_directory}
+        Basecall quality score threshold : ${params.qscore_thresh}
+        MAPQ filtering threshold : ${params.mapq}
+        Min number of mapped reads per sample/barcode : ${params.min_mapped_reads_thresh}
+        BAM files barcoded? : ${params.is_barcoded}
+        ======================================
+        """
+    } else if (params.step.toString() == "3") {
+        log.info """
+        ===============================================
+        STEP 3 - METHYLATION CALLING AND MULTIQC REPORT
+        ===============================================
+        Input directory (input dir from step 2) : ${params.steps_2_and_3_input_directory}
+        MultiQC configuration file : ${params.multiqc_config}
+        ===============================================
+        """
     } else {
-        fast5_path = Channel.fromPath("${params.basecall_path}/**.fast5").map{file -> tuple("${params.prefix}_" + file.parent.toString().split("/")[-2] + "_" + file.simpleName.split('_')[0] + "_" + file.simpleName.split('_')[-3..-2].join("_"), file) }.groupTuple()
-        pod5_path = Channel.fromPath("${params.basecall_path}/**.pod5").map{file -> tuple("${params.prefix}_" + file.parent.toString().split("/")[-2] + "_" + file.simpleName.split('_')[0] + "_" + file.simpleName.split('_')[-3..-2].join("_"), file) }.groupTuple()
+        println "ERROR: You must set parameter --step to '1' or '2_from_step_1' or '2_from_minknow' or '3'. Please refer to documentation at: https://github.com/bernardo-heberle/DCNL_NANOPORE_PIPELINE"
+        System.exit(1)
     }
-    ref = file(params.ref)
-    quality_score = Channel.value(params.qscore_thresh)
-    basecall_speed = Channel.value(params.basecall_speed)
-    basecall_mods = Channel.value(params.basecall_mods)
-    basecall_config = Channel.value(params.basecall_config)
-    basecall_trim = Channel.value(params.basecall_trim)
-    basecall_compute = Channel.value(params.basecall_compute)
-    trim_barcode = Channel.value(params.trim_barcode)
-    devices = Channel.value(params.gpu_devices)
-} else if (params.step.toString() == "2_from_step_1") {
-    total_bams = Channel.fromPath("${params.steps_2_and_3_input_directory}/basecalling_output/*.bam").map {file -> tuple(file.baseName, file) }.toSortedList( { a, b -> a[0] <=> b[0] } ).flatten().buffer(size:2)
-    txts = Channel.fromPath("${params.steps_2_and_3_input_directory}/basecalling_output/*.txt").toSortedList( { a, b -> a.baseName <=> b.baseName } ).flatten()
-    mapq = Channel.value(params.mapq)
-    quality_score = Channel.value(params.qscore_thresh)
-} else if (params.step.toString() == "2_from_minknow") {
-    input_dir = Channel.fromPath("${params.steps_2_and_3_input_directory}/")
-    mapq = Channel.value(params.mapq)
-    quality_score = Channel.value(params.qscore_thresh)
-} else if (params.step.toString() == "3") {
-    filtered_bams = Channel.fromPath("${params.steps_2_and_3_input_directory}/bam_filtering/*-Filtered*.bam").map {file -> tuple(file.baseName, file) }.toSortedList( { a, b -> a[0] <=> b[0] } ).flatten().buffer(size:2)
-    filtered_bais = Channel.fromPath("${params.steps_2_and_3_input_directory}/bam_filtering/*-Filtered*.bam.bai").toSortedList( { a, b -> a.baseName <=> b.baseName } ).flatten()
-    num_reads = Channel.fromPath("${params.steps_2_and_3_input_directory}/intermediate_qc_reports/number_of_reads/*")
-    read_length = Channel.fromPath("${params.steps_2_and_3_input_directory}/intermediate_qc_reports/read_length/*")
-    quality_thresholds = Channel.fromPath("${params.steps_2_and_3_input_directory}/intermediate_qc_reports/quality_score_thresholds/*")
-    multiqc_config = Channel.fromPath(params.multiqc_config)
-    multiqc_input = Channel.fromPath("${params.steps_2_and_3_input_directory}/multiqc_input/**", type: "file")
-}
-
-// Main logic
-workflow {
+    // Set initial files and channels
+    if (params.step.toString() == "1") {
+        if (params.prefix == "None") {
+            fast5_path = Channel.fromPath("${params.basecall_path}/**.fast5").map{file -> tuple(file.parent.toString().split("/")[-3] + "_" + file.simpleName.split('_')[0] + "_" + file.simpleName.split('_')[-3..-2].join("_"), file) }.groupTuple()
+            pod5_path = Channel.fromPath("${params.basecall_path}/**.pod5").map{file -> tuple(file.parent.toString().split("/")[-3] + "_" + file.simpleName.split('_')[0] + "_" + file.simpleName.split('_')[-3..-2].join("_"), file) }.groupTuple()
+        } else {
+            fast5_path = Channel.fromPath("${params.basecall_path}/**.fast5").map{file -> tuple("${params.prefix}_" + file.parent.toString().split("/")[-2] + "_" + file.simpleName.split('_')[0] + "_" + file.simpleName.split('_')[-3..-2].join("_"), file) }.groupTuple()
+            pod5_path = Channel.fromPath("${params.basecall_path}/**.pod5").map{file -> tuple("${params.prefix}_" + file.parent.toString().split("/")[-2] + "_" + file.simpleName.split('_')[0] + "_" + file.simpleName.split('_')[-3..-2].join("_"), file) }.groupTuple()
+        }
+        ref = file(params.ref)
+        quality_score = Channel.value(params.qscore_thresh)
+        basecall_speed = Channel.value(params.basecall_speed)
+        basecall_mods = Channel.value(params.basecall_mods)
+        basecall_config = Channel.value(params.basecall_config)
+        basecall_trim = Channel.value(params.basecall_trim)
+        // basecall_compute = Channel.value(params.basecall_compute)
+        trim_barcode = Channel.value(params.trim_barcode)
+        devices = Channel.value(params.gpu_devices)
+    } else if (params.step.toString() == "2_from_step_1") {
+        total_bams = Channel.fromPath("${params.steps_2_and_3_input_directory}/basecalling_output/*.bam").map {file -> tuple(file.baseName, file) }.toSortedList( { a, b -> a[0] <=> b[0] } ).flatten().buffer(size:2)
+        txts = Channel.fromPath("${params.steps_2_and_3_input_directory}/basecalling_output/*.txt").toSortedList( { a, b -> a.baseName <=> b.baseName } ).flatten()
+        mapq = Channel.value(params.mapq)
+        quality_score = Channel.value(params.qscore_thresh)
+    } else if (params.step.toString() == "2_from_minknow") {
+        input_dir = Channel.fromPath("${params.steps_2_and_3_input_directory}/")
+        mapq = Channel.value(params.mapq)
+        quality_score = Channel.value(params.qscore_thresh)
+    } else if (params.step.toString() == "3") {
+        filtered_bams = Channel.fromPath("${params.steps_2_and_3_input_directory}/bam_filtering/*-Filtered*.bam").map {file -> tuple(file.baseName, file) }.toSortedList( { a, b -> a[0] <=> b[0] } ).flatten().buffer(size:2)
+        filtered_bais = Channel.fromPath("${params.steps_2_and_3_input_directory}/bam_filtering/*-Filtered*.bam.bai").toSortedList( { a, b -> a.baseName <=> b.baseName } ).flatten()
+        num_reads = Channel.fromPath("${params.steps_2_and_3_input_directory}/intermediate_qc_reports/number_of_reads/*")
+        read_length = Channel.fromPath("${params.steps_2_and_3_input_directory}/intermediate_qc_reports/read_length/*")
+        quality_thresholds = Channel.fromPath("${params.steps_2_and_3_input_directory}/intermediate_qc_reports/quality_score_thresholds/*")
+        multiqc_config = Channel.fromPath(params.multiqc_config)
+        multiqc_input = Channel.fromPath("${params.steps_2_and_3_input_directory}/multiqc_input/**", type: "file")
+    }
+    // Run steps
     if (params.step.toString() == "1") {
         BASECALLING(pod5_path, fast5_path, basecall_speed, basecall_mods, basecall_config, basecall_trim, quality_score, trim_barcode, devices, ref)
     } else if (params.step.toString() == "2_from_step_1") {
diff --git a/src/nextflow.config b/src/nextflow.config
index 3b104aad4ae276d2ccf00a6a157a83e55fa3b0be..4c73a4929bcd3e803111cc62fc3a6ea3f2e5b546 100755
--- a/src/nextflow.config
+++ b/src/nextflow.config
@@ -3,51 +3,53 @@
 // Pipeline parameter default values, can be modified by user when calling
 // pipeline on command line (e.g. --ont_reads_fq sample_1.fastq)
 
-// Input reference fasta file
-params.ref = 'None'
-// Step of pipeline to execute
-params.step = 'None'
-// Output directory for pipeline results
-params.out_dir = "output_directory/"
-// directory of basecalling data
-params.basecall_path = 'None'
-// MAPQ filtering threshold for bam files, 0 for no filtering
-params.mapq = "10"
-// Quality score threshold
-params.qscore_thresh = "9"
-// Desired basecall speed
-params.basecall_speed = "hac"
-// Desired basecaller modifications
-params.basecall_mods = false
-// Threshold for mapped reasds
-params.min_mapped_reads_thresh = 500
-// Desired basecall configuration
-params.basecall_config = "None"
-// Type of read trimming during basecalling ("all", "primers", "adapters", "none")
-params.basecall_trim = "none"
-// Basecalling demultiplexing
-params.basecall_demux = false
-// CPU vs GPU basecalling
-params.basecall_compute = "gpu"
-// Trim barcodes (only counts if demultiplexing is enabled)
-params.trim_barcode = "True"
-// Add prefix to all output files
-params.prefix = "None"
-// Which GPU devices to use for basecalling?
-params.gpu_devices = "all"
-// Previous results
-params.steps_2_and_3_input_directory = "None"
-// MultiQC config
-params.multiqc_config = "None"
-// Are the files from MinKNOW barcoded or not
-params.is_barcoded = true
-
-// Set queue size for the executor
-if (params.step == 1) {
-    queue_size = 1
-} else {
-    queue_size = 5
+params {
+    // Input reference fasta file
+    ref = 'None'
+    // Step of pipeline to execute
+    step = 'None'
+    // Output directory for pipeline results
+    out_dir = "output_directory/"
+    // directory of basecalling data
+    basecall_path = 'None'
+    // MAPQ filtering threshold for bam files, 0 for no filtering
+    mapq = "10"
+    // Quality score threshold
+    qscore_thresh = "9"
+    // Desired basecall speed
+    basecall_speed = "hac"
+    // Desired basecaller modifications
+    basecall_mods = false
+    // Threshold for mapped reads
+    min_mapped_reads_thresh = 500
+    // Desired basecall configuration
+    basecall_config = "None"
+    // Type of read trimming during basecalling ("all", "primers", "adapters", "none")
+    basecall_trim = "none"
+    // Basecalling demultiplexing
+    basecall_demux = false
+    // CPU vs GPU basecalling
+    basecall_compute = "gpu"
+    // Trim barcodes (only counts if demultiplexing is enabled)
+    trim_barcode = "True"
+    // Add prefix to all output files
+    prefix = "None"
+    // Which GPU devices to use for basecalling?
+    gpu_devices = "all"
+    // Previous results
+    steps_2_and_3_input_directory = "None"
+    // MultiQC config
+    multiqc_config = "None"
+    // Are the files from MinKNOW barcoded or not
+    is_barcoded = true
 }
+
+// queue_size depends on the step; compare as a string (as main.nf does) so "1" and 1 both match
+includeConfig ({
+    if (params.step.toString() == "1") { return './configs/queue-basecalling.config' }
+    else { return './configs/queue-default.config' }
+}())
+
 process {
     // Define local cpu execution
     withLabel: cpu {
@@ -58,14 +60,15 @@ process {
         executor='local'
         containerOptions = '--nv'
     }
-    // Define the singularity container for every process
-    //container = "library://joaochrusciel/nanopore/ont_methylation:2024-10-18"
+    // Define the container for every process
     container = "./images/debian-nanopore.sif"
 }
+
 executor {
     name = 'local'
-    queueSize = queue_size
+    queueSize = params.queue_size
 }
+
 apptainer {
     enabled = true
     pullTimeout = '60m'