Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unified parallel GBWT construction #4221

Merged
merged 11 commits into from
Feb 9, 2024
32 changes: 32 additions & 0 deletions src/gbwt_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,38 @@ std::string compose_short_path_name(const gbwt::GBWT& gbwt_index, gbwt::size_typ

//------------------------------------------------------------------------------

void copy_reference_samples(const gbwt::GBWT& source, gbwt::GBWT& destination) {
if (source.tags.contains(gbwtgraph::REFERENCE_SAMPLE_LIST_GBWT_TAG)) {
// Reference samples tag is not copied automatically.
destination.tags.set(
gbwtgraph::REFERENCE_SAMPLE_LIST_GBWT_TAG,
source.tags.get(gbwtgraph::REFERENCE_SAMPLE_LIST_GBWT_TAG)
);
}
}

void copy_reference_samples(const PathHandleGraph& source, gbwt::GBWT& destination) {
std::vector<std::string> reference_samples;
source.for_each_path_of_sense(PathSense::REFERENCE, [&](const path_handle_t& path) {
std::string sample_name = source.get_sample_name(path);
if (sample_name != PathMetadata::NO_SAMPLE_NAME) {
reference_samples.push_back(sample_name);
}
});

if (!reference_samples.empty()) {
std::string reference_samples_tag = reference_samples.front();
for (size_t i = 1; i < reference_samples.size(); i++) {
reference_samples_tag += gbwtgraph::REFERENCE_SAMPLE_LIST_SEPARATOR + reference_samples[i];
}
destination.tags.set(
gbwtgraph::REFERENCE_SAMPLE_LIST_GBWT_TAG, reference_samples_tag
);
}
}

//------------------------------------------------------------------------------

gbwt::GBWT get_gbwt(const std::vector<gbwt::vector_type>& paths) {
gbwt::size_type node_width = 1, total_length = 0;
for (auto& path : paths) {
Expand Down
13 changes: 13 additions & 0 deletions src/gbwt_helper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,9 @@ struct RebuildParameters {
///
/// NOTE: Threads may be reordered if there are multiple jobs. Old thread ids are
/// no longer valid after rebuilding the GBWT.
///
/// NOTE: This could use the ConstructionJob / MetadataBuilder scheme for
/// parallelization, but it would change the interface.
gbwt::GBWT rebuild_gbwt(const gbwt::GBWT& gbwt_index,
const std::vector<RebuildJob>& jobs,
const std::unordered_map<nid_t, size_t>& node_to_job,
Expand Down Expand Up @@ -241,6 +244,16 @@ std::string compose_short_path_name(const gbwt::GBWT& gbwt_index, gbwt::size_typ

//------------------------------------------------------------------------------

/// Copies the reference sample tag from the source GBWT index to the destination GBWT index.
void copy_reference_samples(const gbwt::GBWT& source, gbwt::GBWT& destination);

/// Copies reference samples from the source graph to the destination GBWT index.
/// Every sample with at least one reference path in the source graph is considered
/// a reference sample.
void copy_reference_samples(const PathHandleGraph& source, gbwt::GBWT& destination);

//------------------------------------------------------------------------------

/// Transform the paths into a GBWT index. Primarily for testing.
gbwt::GBWT get_gbwt(const std::vector<gbwt::vector_type>& paths);

Expand Down
123 changes: 98 additions & 25 deletions src/index_registry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,42 @@ vector<string> vcf_contigs(const string& filename) {
return return_val;
}

// Returns a guess for an appropriate number of parallel GBWT construction jobs.
// This is intended for GBWT recipes where the construction uses an external
// mechanism for parallelism.
size_t guess_parallel_gbwt_jobs(size_t node_count, size_t haplotype_count, size_t available_memory, size_t batch_size) {

// Memory usage of the GBWT construction itself.
size_t bytes_per_node = 135 * std::max(std::log10(haplotype_count + 1), 1.0);
// Construction buffers typically use 3-4 bytes per node, and the builder has two buffers.
size_t bytes_per_job = batch_size * 8;

size_t jobs = 1;
size_t max_jobs = get_thread_count();
// We assume that the largest chromosome is 10% of the genome.
size_t job_size = std::max(node_count / 10, size_t(1));
size_t memory_usage = bytes_per_node * job_size + bytes_per_job;

while (jobs < max_jobs) {
// We assume that the next chromosome is 5% smaller than the previous one.
job_size = std::max(size_t(job_size * 0.95), size_t(1));
memory_usage += bytes_per_node * job_size + bytes_per_job;
if (memory_usage > available_memory) {
break;
}
jobs++;
}

return jobs;
}

// Returns the in-memory size of an XG index in bytes, using the same approach as
// sdsl::size_in_bytes().
size_t xg_index_size(const xg::XG& index) {
sdsl::nullstream ns;
return index.serialize_and_measure(ns);
}

/*********************
* Indexing helper functions
***********************/
Expand Down Expand Up @@ -2691,30 +2727,44 @@ IndexRegistry VGIndexes::get_vg_index_registry() {
std::function<bool(const path_handle_t&)> path_filter = [&xg_index](const path_handle_t& path) {
return !Paths::is_alt(xg_index->get_path_name(path));
};

cover = std::move(gbwtgraph::local_haplotypes(*xg_index, *gbwt_index,
IndexingParameters::giraffe_gbwt_downsample,
IndexingParameters::downsample_context_length,
IndexingParameters::gbwt_insert_batch_size,
IndexingParameters::gbwt_sampling_interval,
true, // Also include named paths from the graph
&path_filter,
IndexingParameters::verbosity >= IndexingParameters::Debug));

gbwtgraph::PathCoverParameters parameters;
parameters.num_paths = IndexingParameters::giraffe_gbwt_downsample;
parameters.context = IndexingParameters::downsample_context_length;
// TODO: See path cover for handling graphs with large components.
parameters.batch_size = IndexingParameters::gbwt_insert_batch_size;
parameters.sample_interval = IndexingParameters::gbwt_sampling_interval;
std::int64_t available_memory = plan->target_memory_usage() - xg_index_size(*xg_index) - sdsl::size_in_bytes(*gbwt_index);
parameters.parallel_jobs = guess_parallel_gbwt_jobs(
xg_index->get_node_count(),
parameters.num_paths,
std::max(available_memory, std::int64_t(0)),
parameters.batch_size
);
parameters.show_progress = (IndexingParameters::verbosity >= IndexingParameters::Debug);
if (IndexingParameters::verbosity >= IndexingParameters::Debug) {
std::cerr << "[IndexRegistry]: Running " << parameters.parallel_jobs << " jobs in parallel" << std::endl;
}
cover = std::move(gbwtgraph::local_haplotypes(*xg_index, *gbwt_index, parameters, true, &path_filter));
// Reference samples tag is not copied automatically.
copy_reference_samples(*gbwt_index, cover);
}
else {
// Augment the GBWT with a path cover of components without haplotypes.
if (IndexingParameters::verbosity != IndexingParameters::None) {
cerr << "[IndexRegistry]: Not enough haplotypes; augmenting the full GBWT instead." << endl;
cerr << "[IndexRegistry]: Not too many haplotypes; augmenting the full GBWT instead." << endl;
}

gbwt::DynamicGBWT dynamic_index(*gbwt_index);
gbwt_index.reset();
gbwtgraph::augment_gbwt(*xg_index, dynamic_index,
IndexingParameters::path_cover_depth,
IndexingParameters::downsample_context_length,
IndexingParameters::gbwt_insert_batch_size,
IndexingParameters::gbwt_sampling_interval,
IndexingParameters::verbosity >= IndexingParameters::Debug);
// Note that augmenting a GBWT is always single-threaded.
gbwtgraph::PathCoverParameters parameters;
parameters.num_paths = IndexingParameters::path_cover_depth;
parameters.context = IndexingParameters::downsample_context_length;
parameters.batch_size = IndexingParameters::gbwt_insert_batch_size;
parameters.sample_interval = IndexingParameters::gbwt_sampling_interval;
parameters.show_progress = (IndexingParameters::verbosity >= IndexingParameters::Debug);
gbwtgraph::augment_gbwt(*xg_index, dynamic_index, parameters);
cover = std::move(gbwt::GBWT(dynamic_index));
}

Expand Down Expand Up @@ -2764,14 +2814,25 @@ IndexRegistry VGIndexes::get_vg_index_registry() {
};

// make a GBWT from a greedy path cover
gbwt::GBWT cover = gbwtgraph::path_cover_gbwt(*xg_index,
IndexingParameters::path_cover_depth,
IndexingParameters::downsample_context_length,
std::max<gbwt::size_type>(IndexingParameters::gbwt_insert_batch_size, 20 * max_comp_size), // buffer size recommendation from Jouni
IndexingParameters::gbwt_sampling_interval,
true, // Also include named paths from the graph
&path_filter,
IndexingParameters::verbosity >= IndexingParameters::Debug);
gbwtgraph::PathCoverParameters parameters;
parameters.num_paths = IndexingParameters::path_cover_depth;
parameters.context = IndexingParameters::downsample_context_length;
parameters.batch_size = std::max<gbwt::size_type>(IndexingParameters::gbwt_insert_batch_size, 20 * max_comp_size);
parameters.sample_interval = IndexingParameters::gbwt_sampling_interval;
std::int64_t available_memory = plan->target_memory_usage() - xg_index_size(*xg_index);
parameters.parallel_jobs = guess_parallel_gbwt_jobs(
xg_index->get_node_count(),
parameters.num_paths,
std::max(available_memory, std::int64_t(0)),
parameters.batch_size
);
parameters.show_progress = (IndexingParameters::verbosity >= IndexingParameters::Debug);
if (IndexingParameters::verbosity >= IndexingParameters::Debug) {
std::cerr << "[IndexRegistry]: Running " << parameters.parallel_jobs << " jobs in parallel" << std::endl;
}
gbwt::GBWT cover = gbwtgraph::path_cover_gbwt(*xg_index, parameters, true, &path_filter);
// Determine reference samples from reference paths.
copy_reference_samples(*xg_index, cover);

save_gbwt(cover, output_name, IndexingParameters::verbosity == IndexingParameters::Debug);

Expand Down Expand Up @@ -3853,8 +3914,20 @@ IndexRegistry VGIndexes::get_vg_index_registry() {
params.batch_size = IndexingParameters::gbwt_insert_batch_size;
params.sample_interval = IndexingParameters::gbwt_sampling_interval;
params.max_node_length = IndexingParameters::max_node_size;
// TODO: Here we assume that the GFA file contains 100 haplotypes. If there are more,
// we could safely launch more jobs. Using 600 as the divisor would be a better
// estimate of the number of segments, but we chop long segments to 32 bp nodes.
params.parallel_jobs = guess_parallel_gbwt_jobs(
std::max(get_file_size(gfa_filename) / 300, std::int64_t(1)),
100,
plan->target_memory_usage(),
params.batch_size
);
params.show_progress = IndexingParameters::verbosity == IndexingParameters::Debug;

if (IndexingParameters::verbosity >= IndexingParameters::Debug) {
std::cerr << "[IndexRegistry]: Running " << params.parallel_jobs << " jobs in parallel" << std::endl;
}

// jointly generate the GBWT and record sequences
unique_ptr<gbwt::GBWT> gbwt_index;
unique_ptr<gbwtgraph::SequenceSource> seq_source;
Expand Down
Loading
Loading