diff --git a/README.md b/README.md
index 47834be970c7b3e85fedd6048e2b7ff4253f5c18..fbe7ba175fee69f822a77d2f02309b150c4e2e99 100755
--- a/README.md
+++ b/README.md
@@ -119,6 +119,12 @@ The default values for all parameters are set in `src/nexflow.config`. Please no
 ```
 
+```txt
+--samtools_threads
+
+Number of threads passed with `-@` to the samtools commands used throughout the pipeline (sort, view, index, flagstat, merge, depth). Default: 12.
+```
+
 ### Step 1: Basecalling
 
 Many of the parameters for this step are based on dorado basecaller, see their [documentation](https://github.com/nanoporetech/dorado) to understand it better.
 
@@ -189,6 +195,12 @@ Many of the parameters for this step are based on dorado basecaller, see their [
 
 ```
 
+```txt
+--fast5pod5_threads
+
+Number of threads used by `pod5 convert fast5` when converting FAST5 input to POD5 before basecalling. Default: 12.
+```
+
 ### Step 2: Alignment Filtering and Quality Control
 
 ```txt
@@ -223,6 +235,32 @@ Many of the parameters for this step are based on dorado basecaller, see their [
 ```
 
+> **Note:** The following parameters can greatly influence memory usage. If you are running into `Out of Memory` (OOM) errors, consider lowering one or more of the default values. Conversely, you can trade memory for CPU overhead by increasing them.
+
+```txt
+--modkit_threads
+
+Number of threads used by `modkit extract`, `modkit pileup`, and `modkit summary`. Default: 8.
+```
+
+```txt
+--modkit_extract_qsize
+
+Queue size (`--queue-size`) for `modkit extract`; larger values keep more records buffered in memory. Default: 1000.
+```
+
+```txt
+--modkit_extract_isize
+
+Interval size (`--interval-size`) for `modkit extract`. Default: 10000.
+```
+
+```txt
+--modkit_pileup_isize
+
+Interval size (`--interval-size`) for `modkit pileup` and `modkit summary`. Default: 10000.
+```
+
 ### Parameter Files
 
 The pipeline also supports running [pre-configured parameter files](https://www.nextflow.io/docs/latest/cli.html#pipeline-parameters). The currently supported analyses are under the `parameters/` subdir and can be used via the option `-params-file "./parameters//.yaml"` in `nextflow run`. All such files make assumptions about the type of data to be used and where they are being stored.
 
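To make the memory note above actionable, here is a minimal sketch of a custom config that dials the new knobs down on a memory-constrained machine. Only the parameter names come from this patch; the file name `lowmem.config` and the values are illustrative. It could be passed with `nextflow run src/main.nf -c lowmem.config ...`.

```groovy
// lowmem.config -- hypothetical override file; the values below are illustrative, not tuned defaults
params {
    samtools_threads     = 4
    modkit_threads       = 4
    modkit_extract_qsize = 250    // fewer records buffered by `modkit extract`
    modkit_extract_isize = 5000   // smaller genome intervals per batch
    modkit_pileup_isize  = 5000
}
```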
diff --git a/src/main.nf b/src/main.nf
index d38bd5e9a79f76d9d661ec903de132c9c37a42e1..725fff1bc3d4d263664138142732f4a938c68769 100755
--- a/src/main.nf
+++ b/src/main.nf
@@ -51,6 +51,9 @@ workflow {
         System.exit(1)
     }
     // Set initial files and channels
+    // always used
+    samtools_threads = Channel.value(params.samtools_threads)
+    // step conditionals
     if (params.step.toString() == "1") {
         if (params.prefix == "None") {
             fast5_path = Channel.fromPath("${params.basecall_path}/**.fast5").map{file -> tuple(file.parent.toString().split("/")[-3] + "_" + file.simpleName.split('_')[0] + "_" + file.simpleName.split('_')[-3..-2].join("_"), file) }.groupTuple()
@@ -68,6 +71,7 @@ workflow {
         trimmed_barcodes = Channel.value(params.trimmed_barcodes)
         gpu_devices = Channel.value(params.gpu_devices)
         reference_file = file(params.reference_file)
+        fast5pod5_threads = Channel.value(params.fast5pod5_threads)
     } else if (params.step.toString() == "2_from_step_1") {
         bam_files = Channel.fromPath("${params.steps_2_and_3_input_directory}/basecalling_output/*.bam").map {file -> tuple(file.baseName, file) }.toSortedList( { a, b -> a[0] <=> b[0] } ).flatten().buffer(size:2)
         txt_files = Channel.fromPath("${params.steps_2_and_3_input_directory}/basecalling_output/*.txt").toSortedList( { a, b -> a.baseName <=> b.baseName } ).flatten()
@@ -85,15 +89,19 @@
         quality_thresholds = Channel.fromPath("${params.steps_2_and_3_input_directory}/intermediate_qc_reports/quality_score_thresholds/*")
         multiqc_config = Channel.fromPath(params.multiqc_config)
         multiqc_input = Channel.fromPath("${params.steps_2_and_3_input_directory}/multiqc_input/**", type: "file")
+        modkit_threads = Channel.value(params.modkit_threads)
+        modkit_extract_qsize = Channel.value(params.modkit_extract_qsize)
+        modkit_extract_isize = Channel.value(params.modkit_extract_isize)
+        modkit_pileup_isize = Channel.value(params.modkit_pileup_isize)
     }
     // Run steps
     if (params.step.toString() == "1") {
-        BASECALLING(pod5_path, fast5_path, basecall_speed, basecall_mods, basecall_config, basecall_trim, qscore_thresh, barcoding_kit, trimmed_barcodes, gpu_devices, reference_file)
+        BASECALLING(pod5_path, fast5_path, basecall_speed, basecall_mods, basecall_config, basecall_trim, qscore_thresh, barcoding_kit, trimmed_barcodes, gpu_devices, reference_file, fast5pod5_threads, samtools_threads)
     } else if (params.step.toString() == "2_from_step_1") {
-        FILTERING_AND_QC_FROM_STEP_1(bam_files, txt_files, mapq, qscore_thresh)
+        FILTERING_AND_QC_FROM_STEP_1(bam_files, txt_files, mapq, qscore_thresh, samtools_threads)
     } else if (params.step.toString() == "2_from_minknow") {
-        FILTERING_AND_QC_FROM_MINKNOW(input_dir, mapq, qscore_thresh)
+        FILTERING_AND_QC_FROM_MINKNOW(input_dir, mapq, qscore_thresh, samtools_threads)
     } else if (params.step.toString()== "3") {
-        MODKIT_AND_MULTIQC(filtered_bams, filtered_bais, num_reads, read_length, quality_thresholds, multiqc_config, multiqc_input)
+        MODKIT_AND_MULTIQC(filtered_bams, filtered_bais, num_reads, read_length, quality_thresholds, multiqc_config, multiqc_input, samtools_threads, modkit_threads, modkit_extract_isize, modkit_pileup_isize, modkit_extract_qsize)
     }
 }
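A note on the `Channel.value(params.…)` wrappers introduced above: a value channel can be read any number of times, so one thread-count value can feed every task of every process it is passed to, whereas a plain queue channel would be exhausted after a single task. A minimal, self-contained sketch of that behaviour (the process name `EXAMPLE_TASK` is made up for illustration):

```groovy
nextflow.enable.dsl = 2

// EXAMPLE_TASK is a hypothetical process used only to show how a value channel is reused.
process EXAMPLE_TASK {
    input:
        val sample
        val threads

    output:
        stdout

    script:
        """
        echo "processing ${sample} with ${threads} threads"
        """
}

workflow {
    samples = Channel.of('a', 'b', 'c')   // queue channel: each item feeds one task
    threads = Channel.value(4)            // value channel: re-read by all three tasks
    EXAMPLE_TASK(samples, threads)
    EXAMPLE_TASK.out.view()
}
```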
diff --git a/src/modules/basecall.nf b/src/modules/basecall.nf
index a59fb78e23dd716409903a82698300168ba710cf..ffe1b9ef230b1165dfc73c974fe0acd3f263fe29 100755
--- a/src/modules/basecall.nf
+++ b/src/modules/basecall.nf
@@ -4,6 +4,7 @@ process FAST5_to_POD5 {
 
     input:
         tuple val(id), path(fast5)
+        val fast5pod5_threads
 
     output:
         tuple val("${id}"), path("*.pod5")
@@ -13,7 +14,7 @@ process FAST5_to_POD5 {
         pod5 convert fast5 *.fast5 \
             --output . \
             --one-to-one . \
-            --threads 12
+            --threads ${fast5pod5_threads}
         """
 }
 
@@ -32,6 +33,7 @@ process BASECALL {
         val trimmed_barcodes
         val gpu_devices
         path reference_file
+        val samtools_threads
 
     output:
         path ("*.bam"), emit: bam
@@ -66,7 +68,7 @@ process BASECALL {
         fi
 
         echo "Basecalling completed, sorting bams..."
-        samtools sort -@ 12 "${id}.bam" -o "${id}_sorted.bam"
+        samtools sort -@ ${samtools_threads} "${id}.bam" -o "${id}_sorted.bam"
 
         rm "${id}.bam"
         mv "${id}_sorted.bam" "${id}.bam"
@@ -82,7 +84,7 @@ process BASECALL {
 
         echo "Demultiplexing completed, sorting barcode files..."
         cd ./demux_data/
         for file in *; do
-            samtools sort -@ 12 "\$file" -o "${id}_\$file"
+            samtools sort -@ ${samtools_threads} "\$file" -o "${id}_\$file"
             rm "\$file"
         done
diff --git a/src/modules/calculate_coverage.nf b/src/modules/calculate_coverage.nf
index d8802045f1989dc5005f870d41fc0b2074c9460e..2df76961d63efce31c269f5d069bb21519d718ce 100755
--- a/src/modules/calculate_coverage.nf
+++ b/src/modules/calculate_coverage.nf
@@ -4,6 +4,7 @@ process CALCULATE_COVERAGE {
     input:
         tuple val(id), path(bam)
         path bai
+        val samtools_threads
 
     output:
         path "*coverage*"
@@ -11,7 +12,7 @@ process CALCULATE_COVERAGE {
     script:
         """
        echo -e "Chromosome\\t${id}" > ${id}.coverage.tsv
-        samtools depth -a ${bam} | awk '
+        samtools depth -@ ${samtools_threads} -a ${bam} | awk '
        {
            sum[\$1] += \$3;
            count[\$1]++;
diff --git a/src/modules/convert_input_from_minknow.nf b/src/modules/convert_input_from_minknow.nf
index 97f978eaebf1a40389d202bd8e113a0fbe3f22a2..fd508790464e15b589465a7a6159c5a4accc9e47 100755
--- a/src/modules/convert_input_from_minknow.nf
+++ b/src/modules/convert_input_from_minknow.nf
@@ -4,6 +4,7 @@ process CONVERT_INPUT_FROM_MINKNOW_BARCODED {
 
     input:
         path input
+        val samtools_threads
 
     output:
         path ("*.bam"), emit: bam
@@ -32,7 +33,7 @@ process CONVERT_INPUT_FROM_MINKNOW_BARCODED {
                 # Merge BAM files using samtools
                 barcode_name=\$(basename "\$barcode_dir")
                 output_bam="./\${parent_dir_two_above}_\${barcode_name}.bam"
-                samtools merge "./\$output_bam" "\${bam_files[@]}"
+                samtools merge -@ ${samtools_threads} "./\$output_bam" "\${bam_files[@]}"
                 # Generate summary with dorado
                 dorado summary "\$output_bam" > "./\${parent_dir_two_above}_\${barcode_name}.txt"
             fi
@@ -48,6 +49,7 @@ process CONVERT_INPUT_FROM_MINKNOW_NOT_BARCODED {
 
     input:
         path input
+        val samtools_threads
 
     output:
         path ("*.bam"), emit: bam
@@ -73,7 +75,7 @@ process CONVERT_INPUT_FROM_MINKNOW_NOT_BARCODED {
         if [ \${#bam_files[@]} -gt 0 ]; then
             # Merge BAM files using samtools
             output_bam="./\${parent_dir}.bam"
-            samtools merge "./\$output_bam" "\${bam_files[@]}"
+            samtools merge -@ ${samtools_threads} "./\$output_bam" "\${bam_files[@]}"
             # Generate summary with dorado
             dorado summary "\$output_bam" > "./\${parent_dir}.txt"
         fi
diff --git a/src/modules/filter_bam.nf b/src/modules/filter_bam.nf
index 86d54e196fae064742f8b9af1ad961a2d436454d..0292686b686b0c333b38a573471f17da3ac9163d 100755
--- a/src/modules/filter_bam.nf
+++ b/src/modules/filter_bam.nf
@@ -7,6 +7,7 @@ process FILTER_BAM {
         tuple val(id), path(bam)
         path txt
         val mapq
+        val samtools_threads
 
     output:
         env (id), emit: id, optional: true
@@ -22,14 +23,15 @@ process FILTER_BAM {
     script:
         """
        cp "${bam}" "${id}-Unfiltered.bam"
-        samtools index "${id}-Unfiltered.bam"
-        samtools flagstat "${id}-Unfiltered.bam" > "${id}-Unfiltered.flagstat"
+        samtools index -@ ${samtools_threads} "${id}-Unfiltered.bam"
+        samtools flagstat -@ ${samtools_threads} "${id}-Unfiltered.bam" > "${id}-Unfiltered.flagstat"
        samtools idxstats "${id}-Unfiltered.bam" > "${id}-Unfiltered.idxstat"
-        samtools view -b -q ${mapq} -F 2304 -@ 12 ${bam} > "intermediate.bam"
-        samtools sort -@ 12 "intermediate.bam" -o "${id}-Filtered_primary_mapq_${mapq}.bam"
-        samtools index "${id}-Filtered_primary_mapq_${mapq}.bam"
-        samtools flagstat "${id}-Filtered_primary_mapq_${mapq}.bam" > "${id}-Filtered_primary_mapq_${mapq}.flagstat"
+        ## Keep primary alignments only: -F 2304 drops secondary (256) and supplementary (2048) reads
+        samtools view -b -q ${mapq} -F 2304 -@ ${samtools_threads} ${bam} > "intermediate.bam"
+        samtools sort -@ ${samtools_threads} "intermediate.bam" -o "${id}-Filtered_primary_mapq_${mapq}.bam"
+        samtools index -@ ${samtools_threads} "${id}-Filtered_primary_mapq_${mapq}.bam"
+        samtools flagstat -@ ${samtools_threads} "${id}-Filtered_primary_mapq_${mapq}.bam" > "${id}-Filtered_primary_mapq_${mapq}.flagstat"
        samtools idxstats "${id}-Filtered_primary_mapq_${mapq}.bam" > "${id}-Filtered_primary_mapq_${mapq}.idxstat"
        var=\$(awk 'NR==8 {print \$1}' "${id}-Filtered_primary_mapq_${mapq}.flagstat")
        ## If flagstat file says there are no mapped reads then delete bam and bai files.
        ## Since we passed optional on the output statement lines the script should still run fine even
"${id}-Unfiltered.bam" > "${id}-Unfiltered.idxstat" + samtools view -b -q ${mapq} -F 2304 -@ ${samtools_threads} ${bam} > "intermediate.bam" + samtools sort -@ ${samtools_threads} "intermediate.bam" -o "${id}-Filtered_primary_mapq_${mapq}.bam" + samtools --nthreads ${samtools_threads} index "${id}-Filtered_primary_mapq_${mapq}.bam" + samtools --nthreads ${samtools_threads} flagstat "${id}-Filtered_primary_mapq_${mapq}.bam" > "${id}-Filtered_primary_mapq_${mapq}.flagstat" + samtools --nthreads ${samtools_threads} idxstats "${id}-Filtered_primary_mapq_${mapq}.bam" > "${id}-Filtered_primary_mapq_${mapq}.idxstat" var=\$(awk 'NR==8 {print \$1}' "${id}-Filtered_primary_mapq_${mapq}.flagstat") ## If flagstat file says there are no mapped reads then delete bam and bai files. ## Since we passed optional on the output statement lines the script should still run fine even diff --git a/src/modules/modkit.nf b/src/modules/modkit.nf index d5a1a9da7165b85c6b8f7384be697ae5f4ab7625..d9549373aec63153ac06499c28b6cb393f5075df 100755 --- a/src/modules/modkit.nf +++ b/src/modules/modkit.nf @@ -5,6 +5,9 @@ process MODKIT { input: tuple val(id), path(bam) path bai + val modkit_threads + val modkit_isize + val modkit_extract_qsize output: path "*" @@ -13,13 +16,13 @@ process MODKIT { """ echo "starting modkit" ## Make Methylation TSV file - modkit extract full --bgzf --queue-size 1000 -t 4 -i 10000 --mapped-only "${bam}" "${id}_modkit_output.tsv" + modkit extract full --bgzf --queue-size ${modkit_extract_qsize} --threads ${modkit_threads} --interval-size ${modkit_isize} --mapped-only "${bam}" "${id}_modkit_output.tsv" echo "extract successful" - ## Make Methylation Table - modkit pileup -t 4 -i 10000 --chunk-size 4 "${bam}" "${id}_modkit_pileup.bed" --log-filepath "${id}_modkit_pileup.log" + ## Make Methylation Table (--chunk-size is a multiplier of threads, 1.5x by default) + modkit pileup --threads ${modkit_threads} --interval-size ${modkit_isize} "${bam}" "${id}_modkit_pileup.bed" --log-filepath "${id}_modkit_pileup.log" echo "pileup successful" ## Make Summary - modkit summary -i 10000 ${bam} > "${id}_modkit_summary.txt" + modkit summary --threads ${modkit_threads} --interval-size ${modkit_isize} ${bam} > "${id}_modkit_summary.txt" echo "summary successful" """ } diff --git a/src/nextflow.config b/src/nextflow.config index 41d22397149db2cd972c6cbea2069d6c26aa7051..b8f59cc3097ab5041b91bc43d259df29fe5a3f5d 100755 --- a/src/nextflow.config +++ b/src/nextflow.config @@ -42,6 +42,15 @@ params { multiqc_config = "./references/multiqc_config.yaml" // Are the files from MinKNOW barcoded or not is_barcoded = true + // fast5 to pod5 + fast5pod5_threads = 12 + // samtools + samtools_threads = 12 + // modkit + modkit_threads = 8 + modkit_extract_qsize = 1000 + modkit_extract_isize = 10000 + modkit_pileup_isize = 10000 } // queue_size depends on the step diff --git a/src/sub_workflows/BASECALLING.nf b/src/sub_workflows/BASECALLING.nf index d0da7561ceac238996978022a3be59de33b1bb39..989fe92cf78a1244de8f24d08aafa3f937a843a5 100755 --- a/src/sub_workflows/BASECALLING.nf +++ b/src/sub_workflows/BASECALLING.nf @@ -13,16 +13,17 @@ workflow BASECALLING { trimmed_barcodes gpu_devices reference_file + fast5pod5_threads + samtools_threads main: - FAST5_to_POD5(fast5_path) - pod5_path = FAST5_to_POD5.out.mix(pod5_path) - - BASECALL(pod5_path, basecall_speed, basecall_mods, basecall_config, basecall_trim, qscore_thresh, barcoding_kit, trimmed_barcodes, gpu_devices, reference_file) + FAST5_to_POD5(fast5_path, fast5pod5_threads) + 
diff --git a/src/sub_workflows/BASECALLING.nf b/src/sub_workflows/BASECALLING.nf
index d0da7561ceac238996978022a3be59de33b1bb39..989fe92cf78a1244de8f24d08aafa3f937a843a5 100755
--- a/src/sub_workflows/BASECALLING.nf
+++ b/src/sub_workflows/BASECALLING.nf
@@ -13,16 +13,17 @@ workflow BASECALLING {
         trimmed_barcodes
         gpu_devices
         reference_file
+        fast5pod5_threads
+        samtools_threads
 
     main:
-        FAST5_to_POD5(fast5_path)
-        pod5_path = FAST5_to_POD5.out.mix(pod5_path)
-
-        BASECALL(pod5_path, basecall_speed, basecall_mods, basecall_config, basecall_trim, qscore_thresh, barcoding_kit, trimmed_barcodes, gpu_devices, reference_file)
+        FAST5_to_POD5(fast5_path, fast5pod5_threads)
+        pod5_path = FAST5_to_POD5.out.mix(pod5_path)
+        BASECALL(pod5_path, basecall_speed, basecall_mods, basecall_config, basecall_trim, qscore_thresh, barcoding_kit, trimmed_barcodes, gpu_devices, reference_file, samtools_threads)
         bams = BASECALL.out.bam.toSortedList { a, b -> a[0] <=> b[0] }.flatten().buffer(size: 2)
         txts = BASECALL.out.txt.toSortedList { a, b -> a.baseName <=> b.baseName }.flatten()
 
     emit:
-      bam_files = bams
-      txt_files = txts
+        bam_files = bams
+        txt_files = txts
 }
diff --git a/src/sub_workflows/FILTERING_AND_QC_FROM_MINKNOW.nf b/src/sub_workflows/FILTERING_AND_QC_FROM_MINKNOW.nf
index 8746425d75754589c16876e2bf9a54a144b93251..aa1b109f5f6a5595006ed4b39159d798c1cd997d 100755
--- a/src/sub_workflows/FILTERING_AND_QC_FROM_MINKNOW.nf
+++ b/src/sub_workflows/FILTERING_AND_QC_FROM_MINKNOW.nf
@@ -9,19 +9,21 @@ workflow FILTERING_AND_QC_FROM_MINKNOW {
         input
         mapq
         qscore_thresh
+        samtools_threads
 
     main:
         if (params.is_barcoded == true) {
-            CONVERT_INPUT_FROM_MINKNOW_BARCODED(input)
+            CONVERT_INPUT_FROM_MINKNOW_BARCODED(input, samtools_threads)
             converted_input = CONVERT_INPUT_FROM_MINKNOW_BARCODED.out
         } else if (params.is_barcoded == false) {
-            CONVERT_INPUT_FROM_MINKNOW_NOT_BARCODED(input)
+            CONVERT_INPUT_FROM_MINKNOW_NOT_BARCODED(input, samtools_threads)
             converted_input = CONVERT_INPUT_FROM_MINKNOW_NOT_BARCODED.out
         }
         FILTER_BAM(
             converted_input.bam.flatten().map { file -> tuple(file.baseName, file) }.toSortedList { a, b -> a[0] <=> b[0] }.flatten().buffer(size: 2),
             converted_input.txt.toSortedList { a, b -> a.baseName <=> b.baseName }.flatten(),
             mapq,
+            samtools_threads
         )
         PYCOQC_NO_FILTER(
             FILTER_BAM.out.id,
diff --git a/src/sub_workflows/FILTERING_AND_QC_FROM_STEP_1.nf b/src/sub_workflows/FILTERING_AND_QC_FROM_STEP_1.nf
index 7edef55d1362d871c655a174e1e7517478a5042e..a069de3b6aabc6edea8bfbb78c409b3152d54ae4 100755
--- a/src/sub_workflows/FILTERING_AND_QC_FROM_STEP_1.nf
+++ b/src/sub_workflows/FILTERING_AND_QC_FROM_STEP_1.nf
@@ -10,9 +10,10 @@ workflow FILTERING_AND_QC_FROM_STEP_1 {
         txt_files
         mapq
         qscore_thresh
+        samtools_threads
 
     main:
-        FILTER_BAM(bam_files, txt_files, mapq)
+        FILTER_BAM(bam_files, txt_files, mapq, samtools_threads)
         PYCOQC_NO_FILTER(
             FILTER_BAM.out.id,
             FILTER_BAM.out.total_bam,
diff --git a/src/sub_workflows/MODKIT_AND_MULTIQC.nf b/src/sub_workflows/MODKIT_AND_MULTIQC.nf
index 35294c78f20ee2aef614636017fef98b5f09af10..f2c5812c1adf3029e89092de1008ece107135eae 100755
--- a/src/sub_workflows/MODKIT_AND_MULTIQC.nf
+++ b/src/sub_workflows/MODKIT_AND_MULTIQC.nf
@@ -13,11 +13,16 @@ workflow MODKIT_AND_MULTIQC {
         quality_thresholds
         multiqc_config
         multiqc_input
+        samtools_threads
+        modkit_threads
+        modkit_extract_isize
+        modkit_pileup_isize
+        modkit_extract_qsize
 
     main:
-        CALCULATE_COVERAGE(filtered_bams, filtered_bais)
+        CALCULATE_COVERAGE(filtered_bams, filtered_bais, samtools_threads)
         MERGE_COVERAGE(CALCULATE_COVERAGE.out.collect())
         MERGE_QC_REPORT(num_reads.collect(), read_length.collect(), quality_thresholds.collect())
         MULTIQC(multiqc_input.concat(MERGE_QC_REPORT.out.flatten()).collect(), MERGE_COVERAGE.out.mqc, multiqc_config)
-        MODKIT(filtered_bams, filtered_bais)
+        MODKIT(filtered_bams, filtered_bais, modkit_threads, modkit_extract_isize, modkit_pileup_isize, modkit_extract_qsize)
 }
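Finally, the `toSortedList { ... }.flatten().buffer(size: 2)` chains used in `src/main.nf`, `BASECALLING.nf`, and `FILTERING_AND_QC_FROM_MINKNOW.nf` can look opaque: they turn unordered `(id, file)` tuples into a deterministically ordered stream of pairs. A minimal sketch of the idiom (the sample ids and file names are made up):

```groovy
nextflow.enable.dsl = 2

workflow {
    // Unordered (id, file) pairs, as they might come out of a process.
    Channel.of(['s2', 'b.bam'], ['s1', 'a.bam'], ['s3', 'c.bam'])
        .toSortedList { a, b -> a[0] <=> b[0] }   // one sorted list of [id, file] pairs
        .flatten()                                // s1, a.bam, s2, b.bam, s3, c.bam
        .buffer(size: 2)                          // re-pair: [s1, a.bam], [s2, b.bam], ...
        .view()
}
```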