From e3d80a77e328714acb308dae8f6ac62390444ef6 Mon Sep 17 00:00:00 2001 From: Jouni Siren Date: Wed, 31 Jan 2024 17:01:05 -0800 Subject: [PATCH 01/10] Update to GBWTGraph with parallel path cover --- deps/gbwtgraph | 2 +- src/gbwt_helper.cpp | 32 ++++++++++++++++++++++ src/gbwt_helper.hpp | 11 ++++++++ src/index_registry.cpp | 52 ++++++++++++++++++++---------------- src/recombinator.cpp | 13 ++++----- src/subcommand/gbwt_main.cpp | 48 ++++++++++++++++----------------- src/transcriptome.hpp | 6 ++--- test/t/37_vg_gbwt.t | 4 +-- 8 files changed, 108 insertions(+), 60 deletions(-) diff --git a/deps/gbwtgraph b/deps/gbwtgraph index 958dc8d6a00..f5afdd0e8ca 160000 --- a/deps/gbwtgraph +++ b/deps/gbwtgraph @@ -1 +1 @@ -Subproject commit 958dc8d6a003f485bed5c0b841cd99abf284e046 +Subproject commit f5afdd0e8ca3c7d7800d15fc2c8ff4518b2f7bf0 diff --git a/src/gbwt_helper.cpp b/src/gbwt_helper.cpp index cdf59e4e928..bb7b786f7f9 100644 --- a/src/gbwt_helper.cpp +++ b/src/gbwt_helper.cpp @@ -532,6 +532,38 @@ std::string compose_short_path_name(const gbwt::GBWT& gbwt_index, gbwt::size_typ //------------------------------------------------------------------------------ +void copy_reference_samples(const gbwt::GBWT& source, gbwt::GBWT& destination) { + if (source.tags.contains(gbwtgraph::REFERENCE_SAMPLE_LIST_GBWT_TAG)) { + // Reference samples tag is not copied automatically. + destination.tags.set( + gbwtgraph::REFERENCE_SAMPLE_LIST_GBWT_TAG, + source.tags.get(gbwtgraph::REFERENCE_SAMPLE_LIST_GBWT_TAG) + ); + } +} + +void copy_reference_samples(const PathHandleGraph& source, gbwt::GBWT& destination) { + std::vector reference_samples; + source.for_each_path_of_sense(PathSense::REFERENCE, [&](const path_handle_t& path) { + std::string sample_name = source.get_sample_name(path); + if (sample_name != PathMetadata::NO_SAMPLE_NAME) { + reference_samples.push_back(sample_name); + } + }); + + if (!reference_samples.empty()) { + std::string reference_samples_tag = reference_samples.front(); + for (size_t i = 1; i < reference_samples.size(); i++) { + reference_samples_tag += gbwtgraph::REFERENCE_SAMPLE_LIST_SEPARATOR + reference_samples[i]; + } + destination.tags.set( + gbwtgraph::REFERENCE_SAMPLE_LIST_GBWT_TAG, reference_samples_tag + ); + } +} + +//------------------------------------------------------------------------------ + gbwt::GBWT get_gbwt(const std::vector& paths) { gbwt::size_type node_width = 1, total_length = 0; for (auto& path : paths) { diff --git a/src/gbwt_helper.hpp b/src/gbwt_helper.hpp index 1d8bf0e5db8..3dec9bcd444 100644 --- a/src/gbwt_helper.hpp +++ b/src/gbwt_helper.hpp @@ -182,6 +182,7 @@ struct RebuildParameters { gbwt::size_type sample_interval = gbwt::DynamicGBWT::SAMPLE_INTERVAL; }; +// TODO: Use the new parallelization scheme. /// Rebuild the GBWT by applying all provided mappings. Each mapping is a pair /// (original subpath, new subpath). If the original subpath is empty, the /// mapping is ignored. If there are multiple applicable mappings, the first one @@ -241,6 +242,16 @@ std::string compose_short_path_name(const gbwt::GBWT& gbwt_index, gbwt::size_typ //------------------------------------------------------------------------------ +/// Copies the reference sample tag from the source GBWT index to the destination GBWT index. +void copy_reference_samples(const gbwt::GBWT& source, gbwt::GBWT& destination); + +/// Copies reference samples from the source graph to the destination GBWT index. +/// Every sample with at least one reference path in the source graph is considered +/// a reference sample. +void copy_reference_samples(const PathHandleGraph& source, gbwt::GBWT& destination); + +//------------------------------------------------------------------------------ + /// Transform the paths into a GBWT index. Primarily for testing. gbwt::GBWT get_gbwt(const std::vector& paths); diff --git a/src/index_registry.cpp b/src/index_registry.cpp index 53f59fa5efb..5d453fa1cf3 100644 --- a/src/index_registry.cpp +++ b/src/index_registry.cpp @@ -2691,15 +2691,18 @@ IndexRegistry VGIndexes::get_vg_index_registry() { std::function path_filter = [&xg_index](const path_handle_t& path) { return !Paths::is_alt(xg_index->get_path_name(path)); }; - - cover = std::move(gbwtgraph::local_haplotypes(*xg_index, *gbwt_index, - IndexingParameters::giraffe_gbwt_downsample, - IndexingParameters::downsample_context_length, - IndexingParameters::gbwt_insert_batch_size, - IndexingParameters::gbwt_sampling_interval, - true, // Also include named paths from the graph - &path_filter, - IndexingParameters::verbosity >= IndexingParameters::Debug)); + + // TODO: Determine the number of parallel jobs. + gbwtgraph::PathCoverParameters parameters; + parameters.num_paths = IndexingParameters::path_cover_depth; + parameters.context = IndexingParameters::downsample_context_length; + // TODO: See path cover for handling graphs with large components. + parameters.batch_size = IndexingParameters::gbwt_insert_batch_size; + parameters.sample_interval = IndexingParameters::gbwt_sampling_interval; + parameters.show_progress = (IndexingParameters::verbosity >= IndexingParameters::Debug); + cover = std::move(gbwtgraph::local_haplotypes(*xg_index, *gbwt_index, parameters, true, &path_filter)); + // Reference samples tag is not copied automatically. + copy_reference_samples(*gbwt_index, cover); } else { // Augment the GBWT with a path cover of components without haplotypes. @@ -2709,12 +2712,13 @@ IndexRegistry VGIndexes::get_vg_index_registry() { gbwt::DynamicGBWT dynamic_index(*gbwt_index); gbwt_index.reset(); - gbwtgraph::augment_gbwt(*xg_index, dynamic_index, - IndexingParameters::path_cover_depth, - IndexingParameters::downsample_context_length, - IndexingParameters::gbwt_insert_batch_size, - IndexingParameters::gbwt_sampling_interval, - IndexingParameters::verbosity >= IndexingParameters::Debug); + gbwtgraph::PathCoverParameters parameters; + parameters.num_paths = IndexingParameters::path_cover_depth; + parameters.context = IndexingParameters::downsample_context_length; + parameters.batch_size = IndexingParameters::gbwt_insert_batch_size; + parameters.sample_interval = IndexingParameters::gbwt_sampling_interval; + parameters.show_progress = (IndexingParameters::verbosity >= IndexingParameters::Debug); + gbwtgraph::augment_gbwt(*xg_index, dynamic_index, parameters); cover = std::move(gbwt::GBWT(dynamic_index)); } @@ -2764,14 +2768,16 @@ IndexRegistry VGIndexes::get_vg_index_registry() { }; // make a GBWT from a greedy path cover - gbwt::GBWT cover = gbwtgraph::path_cover_gbwt(*xg_index, - IndexingParameters::path_cover_depth, - IndexingParameters::downsample_context_length, - std::max(IndexingParameters::gbwt_insert_batch_size, 20 * max_comp_size), // buffer size recommendation from Jouni - IndexingParameters::gbwt_sampling_interval, - true, // Also include named paths from the graph - &path_filter, - IndexingParameters::verbosity >= IndexingParameters::Debug); + // TODO: Determine the number of parallel jobs. + gbwtgraph::PathCoverParameters parameters; + parameters.num_paths = IndexingParameters::path_cover_depth; + parameters.context = IndexingParameters::downsample_context_length; + parameters.batch_size = std::max(IndexingParameters::gbwt_insert_batch_size, 20 * max_comp_size); + parameters.sample_interval = IndexingParameters::gbwt_sampling_interval; + parameters.show_progress = (IndexingParameters::verbosity >= IndexingParameters::Debug); + gbwt::GBWT cover = gbwtgraph::path_cover_gbwt(*xg_index, parameters, true, &path_filter); + // Determine reference samples from reference paths. + copy_reference_samples(*xg_index, cover); save_gbwt(cover, output_name, IndexingParameters::verbosity == IndexingParameters::Debug); diff --git a/src/recombinator.cpp b/src/recombinator.cpp index 533e73f8237..38130f95078 100644 --- a/src/recombinator.cpp +++ b/src/recombinator.cpp @@ -321,15 +321,15 @@ Haplotypes HaplotypePartitioner::partition_haplotypes(const Parameters& paramete }); size_t size_bound = this->gbz.graph.get_node_count() / parameters.approximate_jobs; gbwtgraph::ConstructionJobs jobs = gbwtgraph::gbwt_construction_jobs(this->gbz.graph, size_bound); - if (jobs.components != total_chains) { + if (jobs.components() != total_chains) { // TODO: Could we instead identify the components with multiple top-level chains // and skip them? std::string msg = "HaplotypePartitioner::partition_haplotypes(): there are " - + std::to_string(total_chains) + " top-level chains and " + std::to_string(jobs.components) + + std::to_string(total_chains) + " top-level chains and " + std::to_string(jobs.components()) + " weakly connected components; haplotype sampling cannot be used with this graph"; throw std::runtime_error(msg); } - result.header.top_level_chains = jobs.components; + result.header.top_level_chains = jobs.components(); result.header.construction_jobs = jobs.size(); result.chains.resize(result.components()); for (size_t chain_id = 0; chain_id < result.components(); chain_id++) { @@ -361,6 +361,7 @@ Haplotypes HaplotypePartitioner::partition_haplotypes(const Parameters& paramete } // Assign named and reference paths to jobs. + // TODO: Use gbwtgraph::assign_paths()? result.jobs_for_cached_paths.reserve(this->gbz.graph.named_paths.size()); for (size_t i = 0; i < this->gbz.graph.named_paths.size(); i++) { const gbwtgraph::NamedPath& path = this->gbz.graph.named_paths[i]; @@ -370,12 +371,12 @@ Haplotypes HaplotypePartitioner::partition_haplotypes(const Parameters& paramete continue; } nid_t node_id = gbwt::Node::id(path.from.first); - auto iter = jobs.node_to_job.find(node_id); - if (iter == jobs.node_to_job.end()) { + size_t job = jobs.job(node_id); + if (job >= jobs.size()) { std::string msg = "HaplotypePartitioner::partition_haplotypes(): cannot assign node " + std::to_string(node_id) + " to a job"; throw std::runtime_error(msg); } - result.jobs_for_cached_paths.push_back(iter->second); + result.jobs_for_cached_paths.push_back(job); } jobs = gbwtgraph::ConstructionJobs(); // Save memory. diff --git a/src/subcommand/gbwt_main.cpp b/src/subcommand/gbwt_main.cpp index 7971f72aa94..9190b2d0958 100644 --- a/src/subcommand/gbwt_main.cpp +++ b/src/subcommand/gbwt_main.cpp @@ -102,6 +102,17 @@ struct GBWTConfig { static size_t default_merge_jobs() { return std::min(static_cast(gbwt::MergeParameters::MERGE_JOBS), std::max(static_cast(1), static_cast(omp_get_max_threads() / 2))); } + + gbwtgraph::PathCoverParameters path_cover_parameters() const { + gbwtgraph::PathCoverParameters parameters; + parameters.num_paths = this->num_paths; + parameters.context = this->context_length; + parameters.batch_size = this->haplotype_indexer.gbwt_buffer_size * gbwt::MILLION; + parameters.sample_interval = this->haplotype_indexer.id_interval; + parameters.parallel_jobs = this->build_jobs; + parameters.show_progress = this->show_progress; + return parameters; + } }; struct GraphHandler { @@ -245,7 +256,7 @@ void help_gbwt(char** argv) { std::cerr << " --id-interval N store path ids at one out of N positions (default " << gbwt::DynamicGBWT::SAMPLE_INTERVAL << ")" << std::endl; std::cerr << std::endl; std::cerr << "Multithreading:" << std::endl; - std::cerr << " --num-jobs N use at most N parallel build jobs (for -v and -G; default " << GBWTConfig::default_build_jobs() << ")" << std::endl; + std::cerr << " --num-jobs N use at most N parallel build jobs (for -v, -G, -l, -P; default " << GBWTConfig::default_build_jobs() << ")" << std::endl; std::cerr << " --num-threads N use N parallel search threads (for -b and -r; default " << omp_get_max_threads() << ")" << std::endl; std::cerr << std::endl; std::cerr << "Step 1: GBWT construction (requires -o and one of { -v, -G, -Z, -E, A }):" << std::endl; @@ -1504,41 +1515,28 @@ void step_4_path_cover(GBWTHandler& gbwts, GraphHandler& graphs, GBWTConfig& con if (config.show_progress) { std::cerr << "Algorithm: greedy" << std::endl; } - gbwt::GBWT cover = gbwtgraph::path_cover_gbwt(*(graphs.path_graph), - config.num_paths, - config.context_length, - config.haplotype_indexer.gbwt_buffer_size * gbwt::MILLION, - config.haplotype_indexer.id_interval, - config.include_named_paths, - &path_filter, - config.show_progress); + gbwt::GBWT cover = gbwtgraph::path_cover_gbwt( + *(graphs.path_graph), config.path_cover_parameters(), + config.include_named_paths, &path_filter + ); + copy_reference_samples(*(graphs.path_graph), cover); gbwts.use(cover); } else if (config.path_cover == GBWTConfig::path_cover_augment) { if (config.show_progress) { std::cerr << "Algorithm: augment" << std::endl; } gbwts.use_dynamic(); - gbwtgraph::augment_gbwt(*(graphs.path_graph), - gbwts.dynamic, - config.num_paths, - config.context_length, - config.haplotype_indexer.gbwt_buffer_size * gbwt::MILLION, - config.haplotype_indexer.id_interval, - config.show_progress); + gbwtgraph::augment_gbwt(*(graphs.path_graph), gbwts.dynamic, config.path_cover_parameters()); } else { if (config.show_progress) { std::cerr << "Algorithm: local haplotypes" << std::endl; } gbwts.use_compressed(); - gbwt::GBWT cover = gbwtgraph::local_haplotypes(*(graphs.path_graph), - gbwts.compressed, - config.num_paths, - config.context_length, - config.haplotype_indexer.gbwt_buffer_size * gbwt::MILLION, - config.haplotype_indexer.id_interval, - config.include_named_paths, - &path_filter, - config.show_progress); + gbwt::GBWT cover = gbwtgraph::local_haplotypes( + *(graphs.path_graph), gbwts.compressed, config.path_cover_parameters(), + config.include_named_paths, &path_filter + ); + copy_reference_samples(gbwts.compressed, cover); gbwts.use(cover); } gbwts.unbacked(); // We modified the GBWT. diff --git a/src/transcriptome.hpp b/src/transcriptome.hpp index 7d27d1de5b7..90d3ba13bad 100644 --- a/src/transcriptome.hpp +++ b/src/transcriptome.hpp @@ -14,9 +14,9 @@ #include #include -#include "../vg.hpp" -#include "../types.hpp" -#include "../gbwt_helper.hpp" +#include "vg.hpp" +#include "types.hpp" +#include "gbwt_helper.hpp" namespace vg { diff --git a/test/t/37_vg_gbwt.t b/test/t/37_vg_gbwt.t index 20a5ad071e8..1a564b5fdf7 100644 --- a/test/t/37_vg_gbwt.t +++ b/test/t/37_vg_gbwt.t @@ -301,7 +301,7 @@ is $(vg gbwt -S xy.local.gbwt) 16 "local haplotypes: 16 samples" # Build GBZ from 16 paths of local haplotypes vg gbwt -x xy-alt.xg -g xy.local.gbz --gbz-format -l -n 16 -v small/xy2.vcf.gz is $? 0 "Local haplotypes GBZ construction" -is $(md5sum xy.local.gbz | cut -f 1 -d\ ) 65d2290f32c200ea57212cb7b71075b0 "GBZ was serialized correctly" +is $(md5sum xy.local.gbz | cut -f 1 -d\ ) b6540312514c4e70aa45fc65b4bd762c "GBZ was serialized correctly" rm -f xy.local.gg xy.local.gbwt xy.local.gbz @@ -324,7 +324,7 @@ is $? 0 "Augmented GBWTGraph construction" is $(md5sum augmented.gg | cut -f 1 -d\ ) 00429586246711abcf1367a97d3c468c "GBWTGraph was serialized correctly" is $(vg gbwt -c augmented.gbwt) 18 "augmented: 18 threads" is $(vg gbwt -C augmented.gbwt) 2 "augmented: 2 contigs" -is $(vg gbwt -H augmented.gbwt) 2 "augmented: 2 haplotypes" +is $(vg gbwt -H augmented.gbwt) 18 "augmented: 18 haplotypes" is $(vg gbwt -S augmented.gbwt) 17 "augmented: 17 samples" rm -f x.gbwt augmented.gg augmented.gbwt From 3a9ba216b9af83b98f38a9f6131bc97c60779cce Mon Sep 17 00:00:00 2001 From: Jouni Siren Date: Wed, 31 Jan 2024 17:59:04 -0800 Subject: [PATCH 02/10] Create a path cover from GBZ --- src/subcommand/gbwt_main.cpp | 39 ++++++++++++++++++++++++++---------- test/t/37_vg_gbwt.t | 27 ++++++++++++++++++++----- 2 files changed, 50 insertions(+), 16 deletions(-) diff --git a/src/subcommand/gbwt_main.cpp b/src/subcommand/gbwt_main.cpp index 9190b2d0958..94b16945984 100644 --- a/src/subcommand/gbwt_main.cpp +++ b/src/subcommand/gbwt_main.cpp @@ -123,6 +123,10 @@ struct GraphHandler { std::unique_ptr gbwt_graph = nullptr; graph_type in_use = graph_none; + // Returns a pointer to any stored `PathHandleGraph` or loads one according to + // the config if there is no such graph. + const PathHandleGraph* get_any_graph(const GBWTConfig& config); + // Load the `PathHandleGraph` specified in the config and release other graphs. // No effect if the handler already contains a `PathHandleGraph`. void get_graph(const GBWTConfig& config); @@ -305,7 +309,7 @@ void help_gbwt(char** argv) { std::cerr << " --set-tag K=V set a GBWT tag (may repeat)" << std::endl; std::cerr << " --set-reference X set sample X as the reference (may repeat)" << std::endl; std::cerr << std::endl; - std::cerr << "Step 4: Path cover GBWT construction (requires -o, -x, and one of { -a, -l, -P }):" << std::endl; + std::cerr << "Step 4: Path cover GBWT construction (requires an input graph, -o, and one of { -a, -l, -P }):" << std::endl; std::cerr << " -a, --augment-gbwt add a path cover of missing components (one input GBWT)" << std::endl; std::cerr << " -l, --local-haplotypes sample local haplotypes (one input GBWT)" << std::endl; std::cerr << " -P, --path-cover build a greedy path cover (no input GBWTs)" << std::endl; @@ -1013,8 +1017,11 @@ void validate_gbwt_config(GBWTConfig& config) { } if (config.path_cover != GBWTConfig::path_cover_none) { - if (!has_gbwt_output || config.graph_name.empty()) { - std::cerr << "error: [vg gbwt] path cover options require -x and output GBWT" << std::endl; + if (!has_gbwt_output || (config.graph_name.empty() && config.build != GBWTConfig::build_gbz && config.build != GBWTConfig::build_gbwtgraph)) { + // Path cover options needs a graph. We can use the provided graph or the GBZ/GBWTGraph + // we took as an input. In the latter case, we know that the corresponding GBWT has not + // been modified and the graph is hence safe to use. + std::cerr << "error: [vg gbwt] path cover options require an input graph and output GBWT" << std::endl; std::exit(EXIT_FAILURE); } if (config.path_cover == GBWTConfig::path_cover_greedy && !config.input_filenames.empty()) { @@ -1502,13 +1509,14 @@ void step_4_path_cover(GBWTHandler& gbwts, GraphHandler& graphs, GBWTConfig& con if (config.show_progress) { std::cerr << "Finding a " << config.num_paths << "-path cover with context length " << config.context_length << std::endl; } - - graphs.get_graph(config); + + // Select the appropriate graph. + const PathHandleGraph* graph = graphs.get_any_graph(config); // We need to drop paths that are alt allele paths and not pass them // through from a graph that has them to the synthesized GBWT. - std::function path_filter = [&graphs](const path_handle_t& path) { - return !Paths::is_alt(graphs.path_graph->get_path_name(path)); + std::function path_filter = [&graph](const path_handle_t& path) { + return !Paths::is_alt(graph->get_path_name(path)); }; if (config.path_cover == GBWTConfig::path_cover_greedy) { @@ -1516,24 +1524,24 @@ void step_4_path_cover(GBWTHandler& gbwts, GraphHandler& graphs, GBWTConfig& con std::cerr << "Algorithm: greedy" << std::endl; } gbwt::GBWT cover = gbwtgraph::path_cover_gbwt( - *(graphs.path_graph), config.path_cover_parameters(), + *graph, config.path_cover_parameters(), config.include_named_paths, &path_filter ); - copy_reference_samples(*(graphs.path_graph), cover); + copy_reference_samples(*graph, cover); gbwts.use(cover); } else if (config.path_cover == GBWTConfig::path_cover_augment) { if (config.show_progress) { std::cerr << "Algorithm: augment" << std::endl; } gbwts.use_dynamic(); - gbwtgraph::augment_gbwt(*(graphs.path_graph), gbwts.dynamic, config.path_cover_parameters()); + gbwtgraph::augment_gbwt(*graph, gbwts.dynamic, config.path_cover_parameters()); } else { if (config.show_progress) { std::cerr << "Algorithm: local haplotypes" << std::endl; } gbwts.use_compressed(); gbwt::GBWT cover = gbwtgraph::local_haplotypes( - *(graphs.path_graph), gbwts.compressed, config.path_cover_parameters(), + *graph, gbwts.compressed, config.path_cover_parameters(), config.include_named_paths, &path_filter ); copy_reference_samples(gbwts.compressed, cover); @@ -1710,6 +1718,15 @@ void step_8_threads(GBWTHandler& gbwts, GBWTConfig& config) { //---------------------------------------------------------------------------- +const PathHandleGraph* GraphHandler::get_any_graph(const GBWTConfig& config) { + if (this->in_use == GraphHandler::graph_gbz || this->in_use == GraphHandler::graph_gbwtgraph) { + return this->gbwt_graph.get(); + } else { + this->get_graph(config); + return this->path_graph.get(); + } +} + void GraphHandler::get_graph(const GBWTConfig& config) { if (this->in_use == graph_path) { return; diff --git a/test/t/37_vg_gbwt.t b/test/t/37_vg_gbwt.t index 1a564b5fdf7..6c11196fe7a 100644 --- a/test/t/37_vg_gbwt.t +++ b/test/t/37_vg_gbwt.t @@ -5,7 +5,7 @@ BASH_TAP_ROOT=../deps/bash-tap PATH=../bin:$PATH # for vg -plan tests 151 +plan tests 159 # Build vg graphs for two chromosomes @@ -298,9 +298,14 @@ is $(vg gbwt -C xy.local.gbwt) 2 "local haplotypes: 2 contigs" is $(vg gbwt -H xy.local.gbwt) 16 "local haplotypes: 16 haplotypes" is $(vg gbwt -S xy.local.gbwt) 16 "local haplotypes: 16 samples" -# Build GBZ from 16 paths of local haplotypes -vg gbwt -x xy-alt.xg -g xy.local.gbz --gbz-format -l -n 16 -v small/xy2.vcf.gz -is $? 0 "Local haplotypes GBZ construction" +# Build GBZ from 16 paths of local haplotypes with a single job +vg gbwt -x xy-alt.xg -g xy.local.gbz --gbz-format -l -n 16 --num-jobs 1 -v small/xy2.vcf.gz +is $? 0 "Local haplotypes GBZ construction (single job)" +is $(md5sum xy.local.gbz | cut -f 1 -d\ ) b6540312514c4e70aa45fc65b4bd762c "GBZ was serialized correctly" + +# As above, but with two parallel jobs +vg gbwt -x xy-alt.xg -g xy.local.gbz --gbz-format -l -n 16 --num-jobs 2 -v small/xy2.vcf.gz +is $? 0 "Local haplotypes GBZ construction (two jobs)" is $(md5sum xy.local.gbz | cut -f 1 -d\ ) b6540312514c4e70aa45fc65b4bd762c "GBZ was serialized correctly" rm -f xy.local.gg xy.local.gbwt xy.local.gbz @@ -316,6 +321,18 @@ is $(vg gbwt -S xy.local.gbwt) 17 "local haplotypes w/ paths: 17 samples" rm -f xy.local.gg xy.local.gbwt +# Build GBZ from a GFA and then build a local haplotype cover with reference paths from the GBZ +vg gbwt -G haplotype-sampling/micb-kir3dl1.gfa -g large.gbz --gbz-format +vg gbwt -Z large.gbz -l -n 16 --pass-paths -o large.local.gbwt +is $? 0 "Local haplotypes with reference paths from a larger GBZ" +is $(vg gbwt -c large.local.gbwt) 36 "local haplotypes w/ paths: 36 threads" +is $(vg gbwt -C large.local.gbwt) 2 "local haplotypes w/ paths: 2 contigs" +is $(vg gbwt -H large.local.gbwt) 18 "local haplotypes w/ paths: 18 haplotypes" +is $(vg gbwt -S large.local.gbwt) 18 "local haplotypes w/ paths: 18 samples" +is $(vg gbwt --tags large.local.gbwt | grep -c reference_samples) 1 "local haplotypes w/ paths: reference_samples set" + +rm -f large.gbz large.local.gbwt + # Build GBWTGraph from an augmented GBWT vg gbwt -x x.vg -o x.gbwt -v small/xy2.vcf.gz @@ -383,7 +400,7 @@ is $(wc -l < ref_paths.trans) 0 "ref paths: 0 translations" rm -f gfa.gbwt rm -f gfa2.gbwt gfa2.gg gfa2.trans gfa2.gbz rm -f ref_paths.gbwt ref_paths.gg ref_paths.trans -rm -f chopping.gbwt chopping.gg chopping.trans from_gbz.trans +rm -f chopping.gbwt chopping.gg chopping.gbz chopping.trans from_gbz.trans # Build a GBZ from a graph with a reference vg gbwt -g gfa.gbz --gbz-format -G graphs/gfa_with_reference.gfa From 3d811bfff3c708b94478e8212ec22b37416617ca Mon Sep 17 00:00:00 2001 From: Jouni Siren Date: Wed, 31 Jan 2024 17:59:55 -0800 Subject: [PATCH 03/10] Typo fix --- src/subcommand/convert_main.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/subcommand/convert_main.cpp b/src/subcommand/convert_main.cpp index 2e798e1f69c..7474f5a521e 100644 --- a/src/subcommand/convert_main.cpp +++ b/src/subcommand/convert_main.cpp @@ -491,7 +491,7 @@ void help_convert(char** argv) { << " Not compatible with other GFA output options or non-GBWT graphs." << endl << " --vg-algorithm Always use the VG GFA algorithm. Works with all options and graph types," << endl << " but can't preserve original GFA coordinates." << endl - << " --no-translation When using the GBWTGraph algorith, convert the graph directly to GFA." << endl + << " --no-translation When using the GBWTGraph algorithm, convert the graph directly to GFA." << endl << " Do not use the translation to preserve original coordinates." << endl << "alignment options:" << endl << " -G, --gam-to-gaf FILE convert GAM FILE to GAF" << endl From 5122d1b45e2f90e755fda096f5202d0fc8a41162 Mon Sep 17 00:00:00 2001 From: Jouni Siren Date: Thu, 1 Feb 2024 20:19:50 -0800 Subject: [PATCH 04/10] Use the new GBWTGraph functionality in vg haplotypes --- deps/gbwtgraph | 2 +- src/index_registry.cpp | 8 +++-- src/recombinator.cpp | 76 +++++++++++++++++++----------------------- 3 files changed, 41 insertions(+), 45 deletions(-) diff --git a/deps/gbwtgraph b/deps/gbwtgraph index f5afdd0e8ca..de61d340695 160000 --- a/deps/gbwtgraph +++ b/deps/gbwtgraph @@ -1 +1 @@ -Subproject commit f5afdd0e8ca3c7d7800d15fc2c8ff4518b2f7bf0 +Subproject commit de61d340695f64b8aa978816b195d6687e7fda5a diff --git a/src/index_registry.cpp b/src/index_registry.cpp index 5d453fa1cf3..6701470f26b 100644 --- a/src/index_registry.cpp +++ b/src/index_registry.cpp @@ -2692,9 +2692,9 @@ IndexRegistry VGIndexes::get_vg_index_registry() { return !Paths::is_alt(xg_index->get_path_name(path)); }; - // TODO: Determine the number of parallel jobs. + // FIXME: Determine the number of parallel jobs and set parameters.parallel_jobs. gbwtgraph::PathCoverParameters parameters; - parameters.num_paths = IndexingParameters::path_cover_depth; + parameters.num_paths = IndexingParameters::giraffe_gbwt_downsample; parameters.context = IndexingParameters::downsample_context_length; // TODO: See path cover for handling graphs with large components. parameters.batch_size = IndexingParameters::gbwt_insert_batch_size; @@ -2712,6 +2712,7 @@ IndexRegistry VGIndexes::get_vg_index_registry() { gbwt::DynamicGBWT dynamic_index(*gbwt_index); gbwt_index.reset(); + // Note that augmenting a GBWT is always single-threaded. gbwtgraph::PathCoverParameters parameters; parameters.num_paths = IndexingParameters::path_cover_depth; parameters.context = IndexingParameters::downsample_context_length; @@ -2768,7 +2769,7 @@ IndexRegistry VGIndexes::get_vg_index_registry() { }; // make a GBWT from a greedy path cover - // TODO: Determine the number of parallel jobs. + // FIXME: Determine the number of parallel jobs and set parameters.parallel_jobs. gbwtgraph::PathCoverParameters parameters; parameters.num_paths = IndexingParameters::path_cover_depth; parameters.context = IndexingParameters::downsample_context_length; @@ -3854,6 +3855,7 @@ IndexRegistry VGIndexes::get_vg_index_registry() { string output_name = plan->output_filepath(gbz_output); gbwtgraph::GFAParsingParameters params = get_best_gbwtgraph_gfa_parsing_parameters(); + // FIXME: Determine the number of parallel jobs and set params.parallel_jobs. // TODO: there's supposedly a heuristic to set batch size that could perform better than this global param, // but it would be kind of a pain to update it like we do the global param params.batch_size = IndexingParameters::gbwt_insert_batch_size; diff --git a/src/recombinator.cpp b/src/recombinator.cpp index 38130f95078..e3b144fffd0 100644 --- a/src/recombinator.cpp +++ b/src/recombinator.cpp @@ -220,7 +220,7 @@ void Haplotypes::TopLevelChain::simple_sds_load(std::istream& in) { void Haplotypes::TopLevelChain::load_old(std::istream& in) { this->offset = sdsl::simple_sds::load_value(in); this->job_id = sdsl::simple_sds::load_value(in); - this->contig_name = "chain_" + std::to_string(this->offset); + this->contig_name = "component_" + std::to_string(this->offset); size_t subchain_count = sdsl::simple_sds::load_value(in); this->subchains.resize(subchain_count); for (size_t i = 0; i < subchain_count; i++) { @@ -310,17 +310,20 @@ Haplotypes HaplotypePartitioner::partition_haplotypes(const Parameters& paramete Haplotypes result; result.header.k = this->minimizer_index.k(); - // Determine top-level chains. + // Determine GBWT construction jobs. double start = gbwt::readTimer(); if (this->verbosity >= Haplotypes::verbosity_basic) { std::cerr << "Determining construction jobs" << std::endl; } + size_t size_bound = this->gbz.graph.get_node_count() / parameters.approximate_jobs; + gbwtgraph::ConstructionJobs jobs = gbwtgraph::gbwt_construction_jobs(this->gbz.graph, size_bound); + result.header.construction_jobs = jobs.size(); + + // Determine the number of top-level chains and fill in basic information. size_t total_chains = 0; this->distance_index.for_each_child(this->distance_index.get_root(), [&](const handlegraph::net_handle_t&) { total_chains++; }); - size_t size_bound = this->gbz.graph.get_node_count() / parameters.approximate_jobs; - gbwtgraph::ConstructionJobs jobs = gbwtgraph::gbwt_construction_jobs(this->gbz.graph, size_bound); if (jobs.components() != total_chains) { // TODO: Could we instead identify the components with multiple top-level chains // and skip them? @@ -330,53 +333,47 @@ Haplotypes HaplotypePartitioner::partition_haplotypes(const Parameters& paramete throw std::runtime_error(msg); } result.header.top_level_chains = jobs.components(); - result.header.construction_jobs = jobs.size(); result.chains.resize(result.components()); for (size_t chain_id = 0; chain_id < result.components(); chain_id++) { result.chains[chain_id].offset = chain_id; result.chains[chain_id].job_id = result.jobs(); // Not assigned to any job yet. - result.chains[chain_id].contig_name = "chain_" + std::to_string(chain_id); // Placeholder name. + result.chains[chain_id].contig_name = "component_" + std::to_string(chain_id); // Placeholder name. } - // Assign chains to jobs and determine contig names. + // Assign chains to jobs and fill in contig names. auto chains_by_job = gbwtgraph::partition_chains(this->distance_index, this->gbz.graph, jobs); + // We do not use a path filter, because a GBZ graph should not contain alt paths. + auto contig_names = jobs.contig_names(this->gbz.graph); for (size_t job_id = 0; job_id < result.jobs(); job_id++) { for (auto& chain : chains_by_job[job_id]) { result.chains[chain.offset].job_id = job_id; - auto da = this->r_index.decompressDA(gbwtgraph::GBWTGraph::handle_to_node(chain.handle)); - for (gbwt::size_type seq_id : da) { - gbwt::size_type path_id = gbwt::Path::id(seq_id); - auto iter = this->gbz.graph.id_to_path.find(path_id); - if (iter != this->gbz.graph.id_to_path.end()) { - // This is a generic / reference path. Use its contig name for the chain. - gbwt::PathName path_name = this->gbz.index.metadata.path(path_id); - result.chains[chain.offset].contig_name = this->gbz.index.metadata.contig(path_name.contig); - if (this->verbosity >= Haplotypes::verbosity_debug) { - std::cerr << "Using contig name " << result.chains[chain.offset].contig_name << " for chain " << chain.offset << std::endl; - } - break; - } + nid_t node_id = this->gbz.graph.get_id(chain.handle); + size_t component_id = jobs.component(node_id); + if (component_id >= contig_names.size()) { + std::string msg = "HaplotypePartitioner::partition_haplotypes(): cannot map top-level chain " + std::to_string(chain.offset) + " to a component"; + throw std::runtime_error(msg); + } + result.chains[chain.offset].contig_name = contig_names[component_id]; + if (this->verbosity >= Haplotypes::verbosity_debug) { + std::cerr << "Using contig name " << contig_names[component_id] << " for chain " << chain.offset << std::endl; } } } // Assign named and reference paths to jobs. - // TODO: Use gbwtgraph::assign_paths()? - result.jobs_for_cached_paths.reserve(this->gbz.graph.named_paths.size()); - for (size_t i = 0; i < this->gbz.graph.named_paths.size(); i++) { - const gbwtgraph::NamedPath& path = this->gbz.graph.named_paths[i]; - if (path.from == gbwt::invalid_edge()) { - // Skip empty paths. - result.jobs_for_cached_paths.push_back(result.jobs()); - continue; - } - nid_t node_id = gbwt::Node::id(path.from.first); - size_t job = jobs.job(node_id); - if (job >= jobs.size()) { - std::string msg = "HaplotypePartitioner::partition_haplotypes(): cannot assign node " + std::to_string(node_id) + " to a job"; - throw std::runtime_error(msg); + result.jobs_for_cached_paths = std::vector(this->gbz.graph.named_paths.size(), result.jobs()); + // Again, we do not use a path filter, because a GBZ graph should not contain alt paths. + auto assignments = gbwtgraph::assign_paths(this->gbz.graph, jobs, nullptr, nullptr); + for (size_t job = 0; job < assignments.size(); job++) { + for (const path_handle_t& path : assignments[job]) { + // We know that GBWTGraph path handles for reference/generic paths are offsets in named_paths. + size_t path_id = handlegraph::as_integer(path); + if (path_id >= result.jobs_for_cached_paths.size()) { + std::string msg = "HaplotypePartitioner::partition_haplotypes(): path " + std::to_string(path_id) + " is not a reference/generic path"; + throw std::runtime_error(msg); + } + result.jobs_for_cached_paths[path_id] = job; } - result.jobs_for_cached_paths.push_back(job); } jobs = gbwtgraph::ConstructionJobs(); // Save memory. @@ -1171,6 +1168,7 @@ gbwt::GBWT Recombinator::generate_haplotypes(const Haplotypes& haplotypes, const } // Build partial indexes. + // FIXME we should use MetadataBuilder here double checkpoint = gbwt::readTimer(); if (this->verbosity >= Haplotypes::verbosity_basic) { std::cerr << "Running " << omp_get_max_threads() << " GBWT construction jobs in parallel" << std::endl; @@ -1213,18 +1211,14 @@ gbwt::GBWT Recombinator::generate_haplotypes(const Haplotypes& haplotypes, const std::cerr << "Finished the jobs in " << seconds << " seconds" << std::endl; } - // Merge the partial indexes. + // Merge the partial indexes and set the reference samples tag. checkpoint = gbwt::readTimer(); if (this->verbosity >= Haplotypes::verbosity_basic) { std::cerr << "Merging the partial indexes" << std::endl; } gbwt::GBWT merged(indexes); if (parameters.include_reference) { - // If we included reference paths, set the same samples as references in the output GBWT. - std::string reference_samples = this->gbz.index.tags.get(gbwtgraph::REFERENCE_SAMPLE_LIST_GBWT_TAG); - if (!reference_samples.empty()) { - merged.tags.set(gbwtgraph::REFERENCE_SAMPLE_LIST_GBWT_TAG, reference_samples); - } + copy_reference_samples(this->gbz.index, merged); } if (this->verbosity >= Haplotypes::verbosity_basic) { double seconds = gbwt::readTimer() - checkpoint; From 092049e94f17999549b88a4194ffc9c0b28e5463 Mon Sep 17 00:00:00 2001 From: Jouni Siren Date: Fri, 2 Feb 2024 16:09:06 -0800 Subject: [PATCH 05/10] Guess the number of parallel GBWT construction jobs in autoindex --- src/index_registry.cpp | 63 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 59 insertions(+), 4 deletions(-) diff --git a/src/index_registry.cpp b/src/index_registry.cpp index 6701470f26b..0fb91d06b09 100644 --- a/src/index_registry.cpp +++ b/src/index_registry.cpp @@ -335,6 +335,42 @@ vector vcf_contigs(const string& filename) { return return_val; } +// Returns a guess for an appropriate number of parallel GBWT construction jobs. +// This is intended for GBWT recipes where the construction uses an external +// mechanism for parallelism. +size_t guess_parallel_gbwt_jobs(size_t node_count, size_t haplotype_count, size_t available_memory, size_t batch_size) { + + // Memory usage of the GBWT construction itself. + size_t bytes_per_node = 100 * std::max(std::log10(haplotype_count + 1), 1.0); + + size_t jobs = 1; + size_t max_jobs = get_thread_count(); + // We assume that the largest chromosome is 10% of the genome. + size_t job_size = std::max(node_count / 10, size_t(1)); + size_t total_size = job_size; + + while (jobs < max_jobs) { + // We assume that the next chromosome is 5% smaller than the previous one. + job_size = std::max(size_t(job_size * 0.95), size_t(1)); + total_size += job_size; + // Construction buffers typically use 3-4 bytes per node, and the builder has two buffers. + size_t memory_usage = bytes_per_node * total_size + (jobs + 1) * batch_size * 8; + if (total_size > node_count || memory_usage > available_memory) { + break; + } + jobs++; + } + + return jobs; +} + +// Returns the in-memory size of an XG index in bytes, using the same approach as +// sdsl::size_in_bytes(). +size_t xg_index_size(const xg::XG& index) { + sdsl::nullstream ns; + return index.serialize_and_measure(ns); +} + /********************* * Indexing helper functions ***********************/ @@ -2692,13 +2728,19 @@ IndexRegistry VGIndexes::get_vg_index_registry() { return !Paths::is_alt(xg_index->get_path_name(path)); }; - // FIXME: Determine the number of parallel jobs and set parameters.parallel_jobs. gbwtgraph::PathCoverParameters parameters; parameters.num_paths = IndexingParameters::giraffe_gbwt_downsample; parameters.context = IndexingParameters::downsample_context_length; // TODO: See path cover for handling graphs with large components. parameters.batch_size = IndexingParameters::gbwt_insert_batch_size; parameters.sample_interval = IndexingParameters::gbwt_sampling_interval; + std::int64_t available_memory = plan->target_memory_usage() - xg_index_size(*xg_index) - sdsl::size_in_bytes(*gbwt_index); + parameters.parallel_jobs = guess_parallel_gbwt_jobs( + xg_index->get_node_count(), + parameters.num_paths, + std::max(available_memory, std::int64_t(0)), + parameters.batch_size + ); parameters.show_progress = (IndexingParameters::verbosity >= IndexingParameters::Debug); cover = std::move(gbwtgraph::local_haplotypes(*xg_index, *gbwt_index, parameters, true, &path_filter)); // Reference samples tag is not copied automatically. @@ -2707,7 +2749,7 @@ IndexRegistry VGIndexes::get_vg_index_registry() { else { // Augment the GBWT with a path cover of components without haplotypes. if (IndexingParameters::verbosity != IndexingParameters::None) { - cerr << "[IndexRegistry]: Not enough haplotypes; augmenting the full GBWT instead." << endl; + cerr << "[IndexRegistry]: Not too many haplotypes; augmenting the full GBWT instead." << endl; } gbwt::DynamicGBWT dynamic_index(*gbwt_index); @@ -2769,12 +2811,18 @@ IndexRegistry VGIndexes::get_vg_index_registry() { }; // make a GBWT from a greedy path cover - // FIXME: Determine the number of parallel jobs and set parameters.parallel_jobs. gbwtgraph::PathCoverParameters parameters; parameters.num_paths = IndexingParameters::path_cover_depth; parameters.context = IndexingParameters::downsample_context_length; parameters.batch_size = std::max(IndexingParameters::gbwt_insert_batch_size, 20 * max_comp_size); parameters.sample_interval = IndexingParameters::gbwt_sampling_interval; + std::int64_t available_memory = plan->target_memory_usage() - xg_index_size(*xg_index); + parameters.parallel_jobs = guess_parallel_gbwt_jobs( + xg_index->get_node_count(), + parameters.num_paths, + std::max(available_memory, std::int64_t(0)), + parameters.batch_size + ); parameters.show_progress = (IndexingParameters::verbosity >= IndexingParameters::Debug); gbwt::GBWT cover = gbwtgraph::path_cover_gbwt(*xg_index, parameters, true, &path_filter); // Determine reference samples from reference paths. @@ -3855,12 +3903,19 @@ IndexRegistry VGIndexes::get_vg_index_registry() { string output_name = plan->output_filepath(gbz_output); gbwtgraph::GFAParsingParameters params = get_best_gbwtgraph_gfa_parsing_parameters(); - // FIXME: Determine the number of parallel jobs and set params.parallel_jobs. // TODO: there's supposedly a heuristic to set batch size that could perform better than this global param, // but it would be kind of a pain to update it like we do the global param params.batch_size = IndexingParameters::gbwt_insert_batch_size; params.sample_interval = IndexingParameters::gbwt_sampling_interval; params.max_node_length = IndexingParameters::max_node_size; + // TODO: Here we assume that the GFA file contains 100 haplotypes. If there are more, + // we could safely launch more jobs. + params.parallel_jobs = guess_parallel_gbwt_jobs( + std::max(get_file_size(gfa_filename) / 600, std::int64_t(1)), + 100, + plan->target_memory_usage(), + params.batch_size + ); params.show_progress = IndexingParameters::verbosity == IndexingParameters::Debug; // jointly generate the GBWT and record sequences From 2f44164c1fb82911fe7c874de3e9c1df426e3ce4 Mon Sep 17 00:00:00 2001 From: Jouni Siren Date: Fri, 2 Feb 2024 16:54:33 -0800 Subject: [PATCH 06/10] Better guess for the number of GBWT construction jobs --- src/index_registry.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/index_registry.cpp b/src/index_registry.cpp index 0fb91d06b09..7738d6a56f8 100644 --- a/src/index_registry.cpp +++ b/src/index_registry.cpp @@ -342,20 +342,20 @@ size_t guess_parallel_gbwt_jobs(size_t node_count, size_t haplotype_count, size_ // Memory usage of the GBWT construction itself. size_t bytes_per_node = 100 * std::max(std::log10(haplotype_count + 1), 1.0); + // Construction buffers typically use 3-4 bytes per node, and the builder has two buffers. + size_t bytes_per_job = batch_size * 8; size_t jobs = 1; size_t max_jobs = get_thread_count(); // We assume that the largest chromosome is 10% of the genome. size_t job_size = std::max(node_count / 10, size_t(1)); - size_t total_size = job_size; + size_t memory_usage = bytes_per_node * job_size + bytes_per_job; while (jobs < max_jobs) { // We assume that the next chromosome is 5% smaller than the previous one. job_size = std::max(size_t(job_size * 0.95), size_t(1)); - total_size += job_size; - // Construction buffers typically use 3-4 bytes per node, and the builder has two buffers. - size_t memory_usage = bytes_per_node * total_size + (jobs + 1) * batch_size * 8; - if (total_size > node_count || memory_usage > available_memory) { + memory_usage += bytes_per_node * job_size + bytes_per_job; + if (memory_usage > available_memory) { break; } jobs++; From 91da44cf4eacddd9df88e8be9f29f50fb5377033 Mon Sep 17 00:00:00 2001 From: Jouni Siren Date: Fri, 2 Feb 2024 18:30:09 -0800 Subject: [PATCH 07/10] Better guess for the number of GBWT construction jobs --- src/index_registry.cpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/index_registry.cpp b/src/index_registry.cpp index 7738d6a56f8..a8dceef9a09 100644 --- a/src/index_registry.cpp +++ b/src/index_registry.cpp @@ -341,7 +341,7 @@ vector vcf_contigs(const string& filename) { size_t guess_parallel_gbwt_jobs(size_t node_count, size_t haplotype_count, size_t available_memory, size_t batch_size) { // Memory usage of the GBWT construction itself. - size_t bytes_per_node = 100 * std::max(std::log10(haplotype_count + 1), 1.0); + size_t bytes_per_node = 135 * std::max(std::log10(haplotype_count + 1), 1.0); // Construction buffers typically use 3-4 bytes per node, and the builder has two buffers. size_t bytes_per_job = batch_size * 8; @@ -3909,9 +3909,10 @@ IndexRegistry VGIndexes::get_vg_index_registry() { params.sample_interval = IndexingParameters::gbwt_sampling_interval; params.max_node_length = IndexingParameters::max_node_size; // TODO: Here we assume that the GFA file contains 100 haplotypes. If there are more, - // we could safely launch more jobs. + // we could safely launch more jobs. Using 600 as the divisor would be a better + // estimate of the number of segments, but we chop long segments to 32 bp nodes. params.parallel_jobs = guess_parallel_gbwt_jobs( - std::max(get_file_size(gfa_filename) / 600, std::int64_t(1)), + std::max(get_file_size(gfa_filename) / 300, std::int64_t(1)), 100, plan->target_memory_usage(), params.batch_size From 1ce37a574a45595359f4f4a9ae634f2ded09e4a1 Mon Sep 17 00:00:00 2001 From: Jouni Siren Date: Fri, 2 Feb 2024 20:03:44 -0800 Subject: [PATCH 08/10] Output the number of parallel jobs --- src/index_registry.cpp | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/index_registry.cpp b/src/index_registry.cpp index a8dceef9a09..83a9c1b4a58 100644 --- a/src/index_registry.cpp +++ b/src/index_registry.cpp @@ -2742,6 +2742,9 @@ IndexRegistry VGIndexes::get_vg_index_registry() { parameters.batch_size ); parameters.show_progress = (IndexingParameters::verbosity >= IndexingParameters::Debug); + if (IndexingParameters::verbosity >= IndexingParameters::Debug) { + std::cerr << "[IndexRegistry]: Running " << parameters.parallel_jobs << " jobs in parallel" << std::endl; + } cover = std::move(gbwtgraph::local_haplotypes(*xg_index, *gbwt_index, parameters, true, &path_filter)); // Reference samples tag is not copied automatically. copy_reference_samples(*gbwt_index, cover); @@ -2824,6 +2827,9 @@ IndexRegistry VGIndexes::get_vg_index_registry() { parameters.batch_size ); parameters.show_progress = (IndexingParameters::verbosity >= IndexingParameters::Debug); + if (IndexingParameters::verbosity >= IndexingParameters::Debug) { + std::cerr << "[IndexRegistry]: Running " << parameters.parallel_jobs << " jobs in parallel" << std::endl; + } gbwt::GBWT cover = gbwtgraph::path_cover_gbwt(*xg_index, parameters, true, &path_filter); // Determine reference samples from reference paths. copy_reference_samples(*xg_index, cover); @@ -3918,7 +3924,10 @@ IndexRegistry VGIndexes::get_vg_index_registry() { params.batch_size ); params.show_progress = IndexingParameters::verbosity == IndexingParameters::Debug; - + if (IndexingParameters::verbosity >= IndexingParameters::Debug) { + std::cerr << "[IndexRegistry]: Running " << params.parallel_jobs << " jobs in parallel" << std::endl; + } + // jointly generate the GBWT and record sequences unique_ptr gbwt_index; unique_ptr seq_source; From 6508e8d00ad14715548b4d77de50ceaec6b7b925 Mon Sep 17 00:00:00 2001 From: Jouni Siren Date: Tue, 6 Feb 2024 15:45:46 -0800 Subject: [PATCH 09/10] Use MetadataBuilder in vg haplotypes --- src/recombinator.cpp | 95 ++++++++++++++++++++++---------------------- src/recombinator.hpp | 2 +- 2 files changed, 49 insertions(+), 48 deletions(-) diff --git a/src/recombinator.cpp b/src/recombinator.cpp index e3b144fffd0..b1a038884d5 100644 --- a/src/recombinator.cpp +++ b/src/recombinator.cpp @@ -777,7 +777,7 @@ void HaplotypePartitioner::build_subchains(const gbwtgraph::TopLevelChain& chain * GBWT metadata will be set as following: * * * Sample name is "recombination". - * * Contig name is "chain_X", where X is the chain identifier. + * * Contig name is taken from the top-level chain. * * Haplotype identifier is set during construction. * * Fragment identifier is set as necessary. */ @@ -820,19 +820,25 @@ struct RecombinatorHaplotype { * take the prefix of the original original haplotype until the start * of the subchain. */ - void extend(sequence_type sequence, const Haplotypes::Subchain& subchain, const Recombinator& recombinator, gbwt::GBWTBuilder& builder); + void extend( + sequence_type sequence, const Haplotypes::Subchain& subchain, const Recombinator& recombinator, + gbwt::GBWTBuilder& builder, gbwtgraph::MetadataBuilder& metadata + ); // Takes an existing haplotype from the GBWT index and inserts it into /// the builder. This is intended for fragments that do not contain /// subchains crossed by the original haplotypes. The call will fail if /// `extend()` has been called. - void take(gbwt::size_type sequence_id, const Recombinator& recombinator, gbwt::GBWTBuilder& builder); + void take( + gbwt::size_type sequence_id, const Recombinator& recombinator, + gbwt::GBWTBuilder& builder, gbwtgraph::MetadataBuilder& metadata + ); // Extends the original haplotype from the latest `extend()` call until // the end, inserts it into the builder, and starts a new fragment. // The call will fail if `extend()` has not been called for this // fragment. - void finish(const Recombinator& recombinator, gbwt::GBWTBuilder& builder); + void finish(const Recombinator& recombinator, gbwt::GBWTBuilder& builder, gbwtgraph::MetadataBuilder& metadata); private: // Extends the haplotype over a unary path from a previous subchain. @@ -845,10 +851,13 @@ struct RecombinatorHaplotype { void suffix(const gbwt::GBWT& index); // Inserts the current fragment into the builder. - void insert(gbwt::GBWTBuilder& builder); + void insert(gbwt::GBWTBuilder& builder, gbwtgraph::MetadataBuilder& metadata); }; -void RecombinatorHaplotype::extend(sequence_type sequence, const Haplotypes::Subchain& subchain, const Recombinator& recombinator, gbwt::GBWTBuilder& builder) { +void RecombinatorHaplotype::extend( + sequence_type sequence, const Haplotypes::Subchain& subchain, const Recombinator& recombinator, + gbwt::GBWTBuilder& builder, gbwtgraph::MetadataBuilder& metadata +) { if (subchain.type == Haplotypes::Subchain::full_haplotype) { throw std::runtime_error("Haplotype::extend(): cannot extend a full haplotype"); } @@ -875,7 +884,7 @@ void RecombinatorHaplotype::extend(sequence_type sequence, const Haplotypes::Sub if (subchain.type == Haplotypes::Subchain::suffix) { this->position = curr; - this->finish(recombinator, builder); + this->finish(recombinator, builder, metadata); return; } @@ -891,7 +900,10 @@ void RecombinatorHaplotype::extend(sequence_type sequence, const Haplotypes::Sub this->position = curr; } -void RecombinatorHaplotype::take(gbwt::size_type sequence_id, const Recombinator& recombinator, gbwt::GBWTBuilder& builder) { +void RecombinatorHaplotype::take( + gbwt::size_type sequence_id, const Recombinator& recombinator, + gbwt::GBWTBuilder& builder, gbwtgraph::MetadataBuilder& metadata +) { if (!this->path.empty()) { throw std::runtime_error("Haplotype::take(): the current fragment is not empty"); } @@ -899,19 +911,19 @@ void RecombinatorHaplotype::take(gbwt::size_type sequence_id, const Recombinator throw std::runtime_error("Haplotype::take(): the GBWT index does not contain sequence " + std::to_string(sequence_id)); } this->path = recombinator.gbz.index.extract(sequence_id); - this->insert(builder); + this->insert(builder, metadata); this->fragment++; this->sequence_id = gbwt::invalid_sequence(); this->position = gbwt::invalid_edge(); this->path.clear(); } -void RecombinatorHaplotype::finish(const Recombinator& recombinator, gbwt::GBWTBuilder& builder) { +void RecombinatorHaplotype::finish(const Recombinator& recombinator, gbwt::GBWTBuilder& builder, gbwtgraph::MetadataBuilder& metadata) { if (this->position == gbwt::invalid_edge()) { throw std::runtime_error("Haplotype::finish(): there is no current position"); } this->suffix(recombinator.gbz.index); - this->insert(builder); + this->insert(builder, metadata); this->fragment++; this->sequence_id = gbwt::invalid_sequence(); this->position = gbwt::invalid_edge(); @@ -964,19 +976,9 @@ void RecombinatorHaplotype::suffix(const gbwt::GBWT& index) { } } -void RecombinatorHaplotype::insert(gbwt::GBWTBuilder& builder) { +void RecombinatorHaplotype::insert(gbwt::GBWTBuilder& builder, gbwtgraph::MetadataBuilder& metadata) { std::string sample_name = "recombination"; - gbwt::size_type sample_id = builder.index.metadata.sample(sample_name); - if (sample_id >= builder.index.metadata.samples()) { - builder.index.metadata.addSamples({ sample_name }); - } - - gbwt::size_type contig_id = builder.index.metadata.contig(this->contig_name); - if (contig_id >= builder.index.metadata.contigs()) { - builder.index.metadata.addContigs({ this->contig_name }); - } - - builder.index.metadata.addPath(sample_id, contig_id, this->id, this->fragment); + metadata.add_haplotype(sample_name, this->contig_name, this->id, this->fragment); builder.insert(this->path, true); } @@ -1020,24 +1022,18 @@ Recombinator::Recombinator(const gbwtgraph::GBZ& gbz, Verbosity verbosity) : //------------------------------------------------------------------------------ -void add_path(const gbwt::GBWT& source, gbwt::size_type path_id, gbwt::GBWTBuilder& builder) { +void add_path(const gbwt::GBWT& source, gbwt::size_type path_id, gbwt::GBWTBuilder& builder, gbwtgraph::MetadataBuilder& metadata) { // We know that sufficient metadata exists, because this is a cached path. gbwt::PathName path_name = source.metadata.path(path_id); - std::string sample_name = source.metadata.sample(path_name.sample); - path_name.sample = builder.index.metadata.sample(sample_name); - if (path_name.sample >= builder.index.metadata.samples()) { - builder.index.metadata.addSamples({ sample_name }); - } - std::string contig_name = source.metadata.contig(path_name.contig); - path_name.contig = builder.index.metadata.contig(contig_name); - if (path_name.contig >= builder.index.metadata.contigs()) { - builder.index.metadata.addContigs({ contig_name }); + if (sample_name == gbwtgraph::REFERENCE_PATH_SAMPLE_NAME) { + metadata.add_generic_path(contig_name); + } else { + // Reference samples will be copied later. + metadata.add_haplotype(sample_name, contig_name, path_name.phase, path_name.count); } - builder.index.metadata.addPath(path_name); - gbwt::vector_type path = source.extract(gbwt::Path::encode(path_id, false)); builder.insert(path, true); } @@ -1168,7 +1164,8 @@ gbwt::GBWT Recombinator::generate_haplotypes(const Haplotypes& haplotypes, const } // Build partial indexes. - // FIXME we should use MetadataBuilder here + // We use a separate MetadataBuilder for each job, because we don't know in advance + // how many fragments there will be for each generated haplotype. double checkpoint = gbwt::readTimer(); if (this->verbosity >= Haplotypes::verbosity_basic) { std::cerr << "Running " << omp_get_max_threads() << " GBWT construction jobs in parallel" << std::endl; @@ -1178,24 +1175,28 @@ gbwt::GBWT Recombinator::generate_haplotypes(const Haplotypes& haplotypes, const #pragma omp parallel for schedule(dynamic, 1) for (size_t job = 0; job < jobs.size(); job++) { gbwt::GBWTBuilder builder(sdsl::bits::length(this->gbz.index.sigma() - 1), parameters.buffer_size); - builder.index.addMetadata(); + gbwtgraph::MetadataBuilder metadata; Statistics job_statistics; + // Add named and reference paths. + for (auto path_id : reference_paths[job]) { + add_path(this->gbz.index, path_id, builder, metadata); + job_statistics.ref_paths++; + } // Add haplotypes for each chain. for (auto chain_id : jobs[job]) { try { - Statistics chain_statistics = this->generate_haplotypes(haplotypes.chains[chain_id], counts, builder, parameters, coverage); + Statistics chain_statistics = this->generate_haplotypes( + haplotypes.chains[chain_id], counts, builder, metadata, parameters, coverage + ); job_statistics.combine(chain_statistics); } catch (const std::runtime_error& e) { std::cerr << "error: [job " << job << "]: " << e.what() << std::endl; std::exit(EXIT_FAILURE); } } - // Add named and reference paths. - for (auto path_id : reference_paths[job]) { - add_path(this->gbz.index, path_id, builder); - job_statistics.ref_paths++; - } builder.finish(); + builder.index.addMetadata(); + builder.index.metadata = metadata.get_metadata(); indexes[job] = builder.index; #pragma omp critical { @@ -1415,7 +1416,7 @@ std::vector> select_haplotypes( Recombinator::Statistics Recombinator::generate_haplotypes(const Haplotypes::TopLevelChain& chain, const hash_map& kmer_counts, - gbwt::GBWTBuilder& builder, + gbwt::GBWTBuilder& builder, gbwtgraph::MetadataBuilder& metadata, const Parameters& parameters, double coverage ) const { @@ -1434,7 +1435,7 @@ Recombinator::Statistics Recombinator::generate_haplotypes(const Haplotypes::Top for (size_t haplotype = 0; haplotype < haplotypes.size(); haplotype++) { assert(!subchain.sequences.empty()); size_t seq = haplotype % subchain.sequences.size(); - haplotypes[haplotype].take(subchain.sequences[seq].first, *this, builder); + haplotypes[haplotype].take(subchain.sequences[seq].first, *this, builder, metadata); } statistics.full_haplotypes = 1; } else { @@ -1486,14 +1487,14 @@ Recombinator::Statistics Recombinator::generate_haplotypes(const Haplotypes::Top size_t selected = haplotype_to_selected[haplotype]; size_t seq_offset = selected_haplotypes[selected].first; statistics.score += selected_haplotypes[selected].second; - haplotypes[haplotype].extend(subchain.sequences[seq_offset], subchain, *this, builder); + haplotypes[haplotype].extend(subchain.sequences[seq_offset], subchain, *this, builder, metadata); } have_haplotypes = subchain.has_end(); statistics.subchains++; } if (have_haplotypes) { for (size_t haplotype = 0; haplotype < haplotypes.size(); haplotype++) { - haplotypes[haplotype].finish(*this, builder); + haplotypes[haplotype].finish(*this, builder, metadata); } } statistics.fragments = haplotypes.front().fragment; diff --git a/src/recombinator.hpp b/src/recombinator.hpp index 002e61f7460..0801e4421e1 100644 --- a/src/recombinator.hpp +++ b/src/recombinator.hpp @@ -521,7 +521,7 @@ class Recombinator { // Generate haplotypes for the given chain. Statistics generate_haplotypes(const Haplotypes::TopLevelChain& chain, const hash_map& kmer_counts, - gbwt::GBWTBuilder& builder, + gbwt::GBWTBuilder& builder, gbwtgraph::MetadataBuilder& metadata, const Parameters& parameters, double coverage) const; }; From 7ec174946a52ba79838ae349c48ebddbef995ca1 Mon Sep 17 00:00:00 2001 From: Jouni Siren Date: Tue, 6 Feb 2024 17:37:12 -0800 Subject: [PATCH 10/10] Clarity the status of rebuild_gbwt() --- src/gbwt_helper.hpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/gbwt_helper.hpp b/src/gbwt_helper.hpp index 3dec9bcd444..cfb02d301a0 100644 --- a/src/gbwt_helper.hpp +++ b/src/gbwt_helper.hpp @@ -182,7 +182,6 @@ struct RebuildParameters { gbwt::size_type sample_interval = gbwt::DynamicGBWT::SAMPLE_INTERVAL; }; -// TODO: Use the new parallelization scheme. /// Rebuild the GBWT by applying all provided mappings. Each mapping is a pair /// (original subpath, new subpath). If the original subpath is empty, the /// mapping is ignored. If there are multiple applicable mappings, the first one @@ -210,6 +209,9 @@ struct RebuildParameters { /// /// NOTE: Threads may be reordered if there are multiple jobs. Old thread ids are /// no longer valid after rebuilding the GBWT. +/// +/// NOTE: This could use the ConstructionJob / MetadataBuilder scheme for +/// parallelization, but it would change the interface. gbwt::GBWT rebuild_gbwt(const gbwt::GBWT& gbwt_index, const std::vector& jobs, const std::unordered_map& node_to_job,