From cdca40801c8ef625983db442b5b788b11b05851d Mon Sep 17 00:00:00 2001
From: Ryan Collins
Date: Sat, 11 Feb 2023 07:54:32 -0500
Subject: [PATCH] Final update before PR

---
 wdl/FilterOutlierSamples.wdl   |   2 +
 wdl/IdentifyOutlierSamples.wdl | 110 +++++++++++++++++++++++++++------
 2 files changed, 93 insertions(+), 19 deletions(-)

diff --git a/wdl/FilterOutlierSamples.wdl b/wdl/FilterOutlierSamples.wdl
index 93be211e6..3c54fce5d 100644
--- a/wdl/FilterOutlierSamples.wdl
+++ b/wdl/FilterOutlierSamples.wdl
@@ -18,6 +18,7 @@ workflow FilterOutlierSamples {
     Boolean plot_counts = false
     Array[String]? sample_subset_prefixes  # if provided, will identify outliers separately within each subset
     Array[String]? sample_subset_lists  # if provided, will identify outliers separately within each subset
+    Int samples_per_shard = 5000
     String sv_pipeline_docker
     String sv_base_mini_docker
     String linux_docker
@@ -49,6 +50,7 @@ workflow FilterOutlierSamples {
       bcftools_preprocessing_options = bcftools_preprocessing_options,
       plot_counts = plot_counts,
       sample_subsets = sample_subsets,
+      samples_per_shard = samples_per_shard,
       sv_pipeline_docker = sv_pipeline_docker,
       sv_base_mini_docker = sv_base_mini_docker,
       linux_docker = linux_docker,
diff --git a/wdl/IdentifyOutlierSamples.wdl b/wdl/IdentifyOutlierSamples.wdl
index 83cbe10d5..273670023 100644
--- a/wdl/IdentifyOutlierSamples.wdl
+++ b/wdl/IdentifyOutlierSamples.wdl
@@ -3,6 +3,7 @@ version 1.0
 import "Structs.wdl"
 import "PlotSVCountsPerSample.wdl" as plot_svcounts
 import "Utils.wdl" as util
+import "TasksMakeCohortVcf.wdl" as cohort_utils
 import "CollectQcVcfWide.wdl" as qc_utils
 import "FilterOutlierSamplesPostMinGQ.wdl" as legacy

@@ -19,6 +20,7 @@ workflow IdentifyOutlierSamples {
     String? bcftools_preprocessing_options
     Boolean plot_counts = false
     Array[Pair[String, File]]? sample_subsets  # if provided, will identify outliers separately within each subset. Expected format is an array of pairs, where pair.left is the subset name and pair.right is a text file with all relevant sample IDs
+    Int samples_per_shard = 5000
     String sv_pipeline_docker
     String sv_base_mini_docker
     String linux_docker
@@ -34,15 +36,14 @@ workflow IdentifyOutlierSamples {

   String prefix = if (defined(vcf_identifier)) then "~{name}_~{vcf_identifier}" else name

-  if (!defined(sample_subsets)) {
-    call util.GetSampleIdsFromVcf as GetSamplesList {
-      input:
-        vcf = vcfs[0],
-        sv_base_mini_docker = sv_base_mini_docker,
-        runtime_attr_override = runtime_attr_ids_from_vcf
-    }
+  call util.GetSampleIdsFromVcf as GetSamplesList {
+    input:
+      vcf = vcfs[0],
+      sv_base_mini_docker = sv_base_mini_docker,
+      runtime_attr_override = runtime_attr_ids_from_vcf
   }
-  Array[Pair[String, File]] subsets_to_eval = select_first([sample_subsets, [("ALL", select_all([GetSamplesList.out_file]))]])
+  Array[Pair[String, File]] default_subsets = [("ALL", GetSamplesList.out_file)]
+  Array[Pair[String, File]] subsets_to_eval = select_first([sample_subsets, default_subsets])

   # Collect SV counts for each VCF in parallel unless sv_counts is provided
   if (!defined(sv_counts)) {
@@ -71,16 +72,41 @@ workflow IdentifyOutlierSamples {
       }
     }

-    # Combine counts across all VCFs
-    call legacy.CombineCounts as Combine {
+    # Combine counts across all VCFs (scattered over sample chunks)
+    call cohort_utils.SplitUncompressed as ShardSamples {
       input:
-        svcounts = CountPerVcf.sv_counts,
-        prefix = prefix,
-        sv_pipeline_docker = sv_pipeline_docker,
-        runtime_attr_override = runtime_attr_combine_counts
+        whole_file = GetSamplesList.out_file,
+        lines_per_shard = samples_per_shard,
+        shard_prefix = prefix,
+        shuffle_file = true,
+        random_seed = 2023,
+        sv_pipeline_docker = sv_pipeline_docker
+    }
+    scatter ( sample_shard in ShardSamples.shards ) {
+      call SubsetCounts as SubsetPreCombine {
+        input:
+          svcounts = CountPerVcf.sv_counts,
+          samples_list = sample_shard,
+          outfile = "${prefix}.shard.counts.tsv",
+          linux_docker = linux_docker,
+          runtime_attr_override = runtime_attr_subset_counts
+      }
+      call legacy.CombineCounts as CombineShard {
+        input:
+          svcounts = [SubsetPreCombine.counts_subset],
+          prefix = prefix,
+          sv_pipeline_docker = sv_pipeline_docker,
+          runtime_attr_override = runtime_attr_combine_counts
+      }
+    }
+    call CatCounts as Combine {
+      input:
+        svcounts = CombineShard.summed_svcounts,
+        outfile = "${prefix}.merged.counts.tsv",
+        linux_docker = linux_docker
     }
   }
-  File final_counts = select_first([sv_counts, Combine.summed_svcounts])
+  File final_counts = select_first([sv_counts, Combine.merged_counts])

   # If a precomputed outlier table is provided, directly apply those cutoffs
   if (defined(outlier_cutoff_table)) {
@@ -101,7 +127,7 @@
     scatter ( subset_info in subsets_to_eval ) {
       call SubsetCounts {
         input:
-          svcounts = final_counts,
+          svcounts = [final_counts],
           samples_list = subset_info.right,
           outfile = "${prefix}.${subset_info.left}.counts.tsv",
           linux_docker = linux_docker,
@@ -249,7 +275,7 @@ task IdentifyOutliersByCutoffTable {
 # Restrict a file of SV counts per sample to a subset of samples
 task SubsetCounts {
   input {
-    File svcounts
+    Array[File] svcounts
     File samples_list
     String outfile
     String linux_docker
@@ -273,8 +299,10 @@ task SubsetCounts {
   command <<<

     set -euo pipefail
-    head -n1 ~{svcounts} > ~{outfile}
-    fgrep -wf ~{samples_list} ~{svcounts} >> ~{outfile}
+    head -n1 ~{svcounts[0]} > ~{outfile}
+    for file in ~{sep=" " svcounts}; do
+      fgrep -wf ~{samples_list} "$file" >> ~{outfile} || true  # a file may contain none of the listed samples; don't fail under set -e
+    done

   >>>

@@ -290,6 +318,50 @@ task SubsetCounts {
 }


+# Naive concatenation of multiple counts files while accounting for the header
+task CatCounts {
+  input {
+    Array[File] svcounts
+    String outfile
+    String linux_docker
+    RuntimeAttr? runtime_attr_override
+  }
+
+  RuntimeAttr default_attr = object {
+    cpu_cores: 1,
+    mem_gb: 3.75,
+    disk_gb: 10,
+    boot_disk_gb: 10,
+    preemptible_tries: 3,
+    max_retries: 1
+  }
+  RuntimeAttr runtime_attr = select_first([runtime_attr_override, default_attr])
+
+  output {
+    File merged_counts = "${outfile}"
+  }
+
+  command <<<
+
+    set -euo pipefail
+    head -n1 ~{svcounts[0]} > ~{outfile}
+    for file in ~{sep=" " svcounts}; do
+      sed '1d' "$file" >> ~{outfile}
+    done
+
+  >>>
+
+  runtime {
+    cpu: select_first([runtime_attr.cpu_cores, default_attr.cpu_cores])
+    memory: select_first([runtime_attr.mem_gb, default_attr.mem_gb]) + " GiB"
+    disks: "local-disk " + select_first([runtime_attr.disk_gb, default_attr.disk_gb]) + " HDD"
+    bootDiskSizeGb: select_first([runtime_attr.boot_disk_gb, default_attr.boot_disk_gb])
+    docker: linux_docker
+    preemptible: select_first([runtime_attr.preemptible_tries, default_attr.preemptible_tries])
+    maxRetries: select_first([runtime_attr.max_retries, default_attr.max_retries])
+  }
+
+}

 # Merge outlier sample lists across algorithms
 task CatOutliers {
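
Reviewer note: for anyone who wants to sanity-check the new merge step outside of Cromwell, below is a minimal bash sketch of the header-aware concatenation that CatCounts performs. The shard filenames are hypothetical stand-ins for the per-shard CombineShard.summed_svcounts outputs; this script is illustration only and is not part of the patch.

    #!/usr/bin/env bash
    # Hypothetical dry run of the CatCounts command block (illustration only).
    # shard*.counts.tsv stand in for the per-shard CombineShard.summed_svcounts files.
    set -euo pipefail

    outfile="merged.counts.tsv"
    shards=(shard1.counts.tsv shard2.counts.tsv shard3.counts.tsv)

    # keep the header from the first shard only
    head -n1 "${shards[0]}" > "$outfile"

    # append every shard with its header line stripped
    for file in "${shards[@]}"; do
      sed '1d' "$file" >> "$outfile"
    done

Since each CombineShard output covers a disjoint chunk of samples and carries an identical header line, keeping only the first copy of the header reproduces the table the old single CombineCounts call emitted, up to row order (the sample list is shuffled before sharding).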