From 79f474db7daf88f8966c906b6a8d1a69b63de6fd Mon Sep 17 00:00:00 2001 From: Dmitry Antipov Date: Tue, 9 Jul 2024 18:00:40 -0400 Subject: [PATCH] wip --- src/Snakefiles/8-hicPipeline.sm | 16 +-- src/scripts/parse_sam_pairs.py | 5 +- src/scripts/scaffolding/scaffold_graph.py | 146 +++++++++++++++------- src/verkko.sh | 14 ++- 4 files changed, 127 insertions(+), 54 deletions(-) diff --git a/src/Snakefiles/8-hicPipeline.sm b/src/Snakefiles/8-hicPipeline.sm index 6cee91c6..6214da89 100644 --- a/src/Snakefiles/8-hicPipeline.sm +++ b/src/Snakefiles/8-hicPipeline.sm @@ -154,7 +154,7 @@ rule prepare_reference_mashmap: input: unitigs_hpc = '8-hicPipeline/unitigs.hpc.fasta', graph_hpc = '8-hicPipeline/unitigs.hpc.noseq.gfa', - rukki_tsv = '8-hicPipeline/rukki.paths.tsv' + rukki_tsv = '8-hicPipeline/prescaf_rukki.paths.tsv' output: path_mashmap = '8-hicPipeline/paths2ref.mashmap' log: @@ -432,7 +432,7 @@ cat > ./transform_bwa.sh < ../{output.byread_mapping} +{params.SAMTOOLS} view -q 1 ../{input.bwa_mapping} | {PYTHON} {VERKKO}/scripts/parse_sam_pairs.py > ../{output.byread_mapping} EOF chmod +x ./transform_bwa.sh @@ -551,12 +551,12 @@ rule HiC_rdnascaff: log: err = '8-hicPipeline/hic_scaff.err' threads: - int(config['fhc_n_cpus']) + int(config['shc_n_cpus']) resources: job_id = 1, - n_cpus = int(config['fhc_n_cpus']), - mem_gb = lambda wildcards, input, attempt: getMemoryRequest(attempt, 'fhc'), - time_h = lambda wildcards, input, attempt: getTimeRequest(attempt, 'fhc') + n_cpus = int(config['shc_n_cpus']), + mem_gb = lambda wildcards, input, attempt: getMemoryRequest(attempt, 'shc'), + time_h = lambda wildcards, input, attempt: getTimeRequest(attempt, 'shc') shell: ''' cd 8-hicPipeline @@ -567,8 +567,8 @@ set -e {PYTHON} {VERKKO}/scripts/rdna_scaff.py . -cp ../{output.scaff_tsv_path} ../{input.final_tsv_path} -cp ../{output.scaff_gaf_path} ../{input.final_gaf_path} +cp ../{output.scaff_tsv_path} ../{output.final_tsv_path} +cp ../{output.scaff_gaf_path} ../{output.final_gaf_path} cp ../{input.tsv_path} ../{rules.runRukkiHIC.output.final_paths_tsv} EOF diff --git a/src/scripts/parse_sam_pairs.py b/src/scripts/parse_sam_pairs.py index c7100626..77b4eb2e 100755 --- a/src/scripts/parse_sam_pairs.py +++ b/src/scripts/parse_sam_pairs.py @@ -36,7 +36,10 @@ def print_results(names): for line in input_stream: line=line.split() - if len(line) < 4: + if len(line) < 5: + continue + #TODO: remove later + if line[4] == "0": continue if name == "": name = line[0] diff --git a/src/scripts/scaffolding/scaffold_graph.py b/src/scripts/scaffolding/scaffold_graph.py index bf165ee0..75467ff0 100755 --- a/src/scripts/scaffolding/scaffold_graph.py +++ b/src/scripts/scaffolding/scaffold_graph.py @@ -100,10 +100,11 @@ def __init__(self, rukki_paths, telomere_locations_file, hic_alignment_file, mat self.logger = logger_wrap.UpdatedAdapter(logger, self.__class__.__name__) self.rukki_paths = rukki_paths self.uncompressed_lens = gf.get_lengths(uncompressed_fasta) - - all_connections, unique_connections = self.get_connections_bam(hic_alignment_file, True) - self.multiplicities = rukki_paths.getEdgeMultiplicities() + interesting_nodes = self.getInterestingNodes(self.uncompressed_lens) + self.logger.info(f"Total nodes {len(G.nodes)} interesting nodes {len(interesting_nodes)}") + all_connections, unique_connections = self.get_connections_bam(hic_alignment_file, interesting_nodes, True) + #upd_G - graph with telomeric nodes self.tel_nodes, self.upd_G = gf.get_telomeric_nodes(telomere_locations_file, G) self.logger.debug("Telomeric nodes") @@ -493,11 +494,12 @@ def generateScaffolds(self): #debugging part for or_path_id in scf: nor_path_id = or_path_id.strip('-+') - if nor_path_id in self.haploids and len(scf) > 1: + if nor_path_id in self.haploids and len(scf) > 12: self.logger.info (f"scaffold part {nor_path_id} is haploid") for i in range (0, len(scf) - 1): - if (scf[i].strip('-+'), scf[i+1].strip('-+')) in self.dangerous_swaps: - self.logger.warning (f"consecutive pair, {scf[i]} {scf[i+1]} did signficant changes on hi-c counts based on {self.dangerous_swaps[(scf[i].strip('-+'), scf[i+1].strip('-+'))]}") + swap_info = (scf[i].strip('-+'), scf[i+1].strip('-+'), scf[i][-1] + scf[i+1][-1]) + if (swap_info) in self.dangerous_swaps: + self.logger.warning (f"consecutive pair, {swap_info} did signficant changes on hi-c counts based on {self.dangerous_swaps[swap_info]}") path_str = "\t".join([largest_id, ",".join(scf_path), largest_label]) final_paths.addPath(path_str) telo_end = False @@ -563,7 +565,7 @@ def get_connections(self, alignment_file, use_multimappers:bool): res[(node_s, node_f)].append([int(second_coords[j]), int(first_coords[i]), weight]) return res - def get_connections_bam(self, bam_filename, use_multimappers:bool): + def get_connections_bam(self, bam_filename, interesting_names, use_multimappers:bool): res = {} unique_storage = {} #A01660:39:HNYM7DSX3:1:1101:1696:29982 utig4-73 utig4-1056 1 16949880 78591191 @@ -573,9 +575,8 @@ def get_connections_bam(self, bam_filename, use_multimappers:bool): all_pairs = 0 created_pair = 0 - created_dist = 0 - increased_dist = 0 - + total_inserted = 0 + total_compressed = 0 bamfile = pysam.AlignmentFile(bam_filename, "rb") cur_name = "" reads = [] @@ -584,11 +585,30 @@ def get_connections_bam(self, bam_filename, use_multimappers:bool): for cur_read in bamfile: if (total_reads % 10000000 == 0): - + if (total_reads % 100000000 == 0): + before_compressed = total_compressed + self.logger.debug("Starting map compression...") + for names_pair in res: + if len(res[names_pair]) > 1000: + names_sorted = sorted(res[names_pair]) + compressed = [] + slen = len(names_sorted) + ii = 0 + while ii < slen: + jj = ii + 1 + cur_w = names_sorted[ii][2] + while jj < slen and names_sorted[jj][0] == names_sorted[ii][0] and names_sorted[jj][1] == names_sorted[ii][1]: + cur_w += names_sorted[jj][2] + jj += 1 + total_compressed += 1 + compressed.append((names_sorted[ii][0], names_sorted[ii][1], cur_w)) + ii = jj + res[names_pair] = compressed + self.logger.debug(f"On current iteration compressed {total_compressed - before_compressed}") self.logger.debug (f"Processed {total_reads} alignment strings") self.logger.debug (f"Of them unique vaild unique pairs {unique_pairs}, total pairs {all_pairs} total valid {valid_pairs} ") self.logger.debug (f"Current memory usage {(psutil.virtual_memory().used / 1024 / 1024 / 1024):.2f} GB") - self.logger.debug (f"Created new pairs {created_pair} new mappings {created_dist} increased without creation {increased_dist - created_dist}") + self.logger.debug (f"Created new pairs {created_pair} total inserted pairs = {total_inserted} total compressed = {total_compressed}") #gc.collect() #self.logger.debug (f"Current memory usage after GC {psutil.virtual_memory().used / 1024 / 1024 / 1024}GB") #self.logger.debug (f"Mem usage of main map {asizeof.asizeof(res) / 1024 / 1024 / 1024}GB") @@ -613,6 +633,7 @@ def get_connections_bam(self, bam_filename, use_multimappers:bool): names = [[prev_read.reference_name], [cur_read.reference_name]] coords = [[prev_read.reference_start], [cur_read.reference_start]] + quals = [prev_read.mapping_quality, cur_read.mapping_quality] # names = read.reference_name valid = True if use_multimappers: @@ -636,45 +657,50 @@ def get_connections_bam(self, bam_filename, use_multimappers:bool): elif read.mapping_quality == 0: valid = False i += 1 - - lname0 = len(names[0]) - lname1 = len(names[1]) - if valid: + weight = self.INT_NORMALIZATION // (len(names[0]) * len(names[1])) + + filtered_names = [[], []] + filtered_coords = [[], []] + for i in range(0, 2): + lname = len(names[i]) + for j in range (0, lname): + #utig4-XXXX; ints are much faster than strings + if int(names[i][j][6:]) in interesting_names: + filtered_names[i].append(names[i][j]) + filtered_coords[i].append(coords[i][j]) + + lname0 = len(filtered_names[0]) + lname1 = len(filtered_names[1]) + #self.logger.debug (f" {valid} {lname0} {lname1}") + if valid and lname0 > 0 and lname1 > 0: valid_pairs += 1 - weight = self.INT_NORMALIZATION // (lname0 * lname1) for i in range (0, lname0): - node_f_len = self.uncompressed_lens[names[0][i]] - #memory opt - if node_f_len < ScaffoldGraph.SHORT_INGORED_NODE or (coords[0][i] > ScaffoldGraph.NEAR_PATH_END and node_f_len - coords[0][i] > ScaffoldGraph.NEAR_PATH_END): + node_f_len = self.uncompressed_lens[filtered_names[0][i]] + #TODO: remove check, this memory opt is already in hic_prefilter + if node_f_len < ScaffoldGraph.SHORT_INGORED_NODE or (filtered_coords[0][i] > ScaffoldGraph.NEAR_PATH_END and node_f_len - filtered_coords[0][i] > ScaffoldGraph.NEAR_PATH_END): continue for j in range (0, lname1): - node_s_len = self.uncompressed_lens[names[1][j]] - if node_s_len < ScaffoldGraph.SHORT_INGORED_NODE or (coords[1][j] > ScaffoldGraph.NEAR_PATH_END and node_s_len - coords[1][j] > ScaffoldGraph.NEAR_PATH_END): + node_s_len = self.uncompressed_lens[filtered_names[1][j]] + if node_s_len < ScaffoldGraph.SHORT_INGORED_NODE or (filtered_coords[1][j] > ScaffoldGraph.NEAR_PATH_END and node_s_len - filtered_coords[1][j] > ScaffoldGraph.NEAR_PATH_END): continue - if (names[0][i] < names[1][j]): - names_pair = (names[0][i], names[1][j]) - pre_coords_pair = (coords[0][i], coords[1][j]) + if (filtered_names[0][i] < filtered_names[1][j]): + names_pair = (filtered_names[0][i], filtered_names[1][j]) + pre_coords_pair = (filtered_coords[0][i], filtered_coords[1][j]) else : - names_pair = (names[1][j], names[0][i]) - pre_coords_pair = (coords[1][j], coords[0][i]) + names_pair = (filtered_names[1][j], filtered_names[0][i]) + pre_coords_pair = (filtered_coords[1][j], filtered_coords[0][i]) #To round to closest integer and not always down coords_pair = tuple(((c + self.APPROXIMATE_COORDS_HALF) // self.APPROXIMATE_COORDS) * self.APPROXIMATE_COORDS for c in pre_coords_pair) if not names_pair in res: - res[names_pair] = {} + res[names_pair] = [] created_pair += 1 - if not (coords_pair in res[names_pair]): - created_dist += 1 - res[names_pair][coords_pair] = 0 - res[names_pair][coords_pair] += weight - increased_dist += 1 - if weight == self.INT_NORMALIZATION: - if not names_pair in unique_storage: - unique_storage[names_pair] = {} - if not coords_pair in unique_storage[names_pair]: - unique_storage[names_pair][coords_pair] = 0 - unique_storage[names_pair][coords_pair] += self.INT_NORMALIZATION + res[names_pair].append((coords_pair[0], coords_pair[1], weight)) + total_inserted += 1 - + if quals[0] > 0 and quals[1] > 0: + if not names_pair in unique_storage: + unique_storage[names_pair] = [] + unique_storage[names_pair].append((coords_pair[0], coords_pair[1], weight)) #TODO: possibly require node1 < node2? #next = (coords[0][i], coords[1][j], weight) #res[(names[0][i], names[1][j])].append(next) @@ -721,7 +747,7 @@ def fixOrientation(self, path_ids, scores): incorrect_pair = "".join(incorrect_pair) #just logging if scores[incorrect_pair] >= ScaffoldGraph.MIN_LINKS* self.INT_NORMALIZATION and scores[incorrect_pair] > scores[correct_pair]: - self.dangerous_swaps[(path_ids[0], path_ids[1])] = "telomeres" + self.dangerous_swaps[(path_ids[0], path_ids[1], correct_pair)] = "telomeres" self.logger.debug(f"Dangerous telomeric tuning pair {path_ids}, i {i} scores {scores}, tels {self.scaffold_graph.nodes[path_ids[i]+'+']['telomere']} from {incorrect_pair} to {correct_pair}") self.logger.debug (f"moving {incorrect_pair} to {correct_pair}") if self.rukki_paths.getLength(path_ids[i]) <= self.NEAR_PATH_END: @@ -766,7 +792,7 @@ def fixOrientation(self, path_ids, scores): incorrect_pair = "".join(incorrect_pair) #just logging if scores[incorrect_pair] >= ScaffoldGraph.MIN_LINKS * self.INT_NORMALIZATION and scores[incorrect_pair] > scores[correct_pair]: - self.dangerous_swaps[(path_ids[0], path_ids[1])] = "connectivity" + self.dangerous_swaps[(path_ids[0], path_ids[1], correct_pair)] = "connectivity" self.logger.debug(f"Dangerous connectivity tuning pair {path_ids}, i {i} scores {scores}from {incorrect_pair} to {correct_pair}") if self.rukki_paths.getLength(path_ids[i]) <= self.NEAR_PATH_END: scores[correct_pair] += scores[incorrect_pair] @@ -802,13 +828,20 @@ def getNodePairConnections(self, pair, orientations, connections, shift_before, #stored only half #TODO: not optimal + if rc_pair in connections: + for conn in connections[rc_pair]: + cons.append((conn[1], conn[0], conn[2])) + if pair in connections: + for conn in connections[pair]: + cons.append(conn) + ''' if rc_pair in connections: for coords, w in connections[rc_pair].items(): cons.append((coords[1], coords[0], w)) if pair in connections: for coords, w in connections[pair].items(): cons.append((coords[0], coords[1], w)) - + ''' for conn in cons: in_homo = False for i in range (0, len(intervals[0])): @@ -857,7 +890,32 @@ def getNodePairConnections(self, pair, orientations, connections, shift_before, #self.logger.debug (f"Ignored intervals {intervals}") return scores - + def getInterestingNodes(self, lens): + interesting = set() + for path_id in self.rukki_paths.getPathIds(): + #TODO possibly reconsider this condition; /2 because of compressed/uncompressed + if self.rukki_paths.getLength(path_id) < ScaffoldGraph.MIN_PATH_TO_SCAFFOLD /2 : + continue + total_len = 0 + for or_node in self.rukki_paths.getPathById(path_id): + nor_node = or_node.strip('-+') + if nor_node in lens and lens[nor_node] > ScaffoldGraph.SHORT_INGORED_NODE and or_node in self.multiplicities and self.multiplicities[or_node] == 1: + total_len += lens[nor_node] + before = 0 + after = total_len + for or_node in self.rukki_paths.getPathById(path_id): + nor_node = or_node.strip('-+') + if nor_node in lens and lens[nor_node] > ScaffoldGraph.SHORT_INGORED_NODE and or_node in self.multiplicities and self.multiplicities[or_node] == 1: + after -= lens[nor_node] + if before < ScaffoldGraph.NEAR_PATH_END or after < ScaffoldGraph.NEAR_PATH_END: + #utig4-, optimizing for speed +# print (f"Interesting node {nor_node}") +# print (f"Interesting node {int(nor_node[5:])}") +# exit() + interesting.add(int(nor_node[6:])) + before += lens[nor_node] + return frozenset(interesting) + #return scores for each of the orientations, ++, -+, +-, --, def getPathPairConnections(self, path_ids, connections, lens): #from start to occurance of node exclusive, do not care about multiplicity > 1 (possibly should filter) diff --git a/src/verkko.sh b/src/verkko.sh index bdb59e56..03993b34 100755 --- a/src/verkko.sh +++ b/src/verkko.sh @@ -284,9 +284,13 @@ ahc_time_h=48 # fast things in hic pipeline fhc_n_cpus=8 -fhc_mem_gb=160 +fhc_mem_gb=16 fhc_time_h=24 +# hic scaffolding pipeline +shc_n_cpus=8 +shc_mem_gb=128 +shc_time_h=24 # @@ -534,6 +538,8 @@ while [ $# -gt 0 ] ; do elif [ "$opt" = "--cns-run" ] ; then cns_n_cpus=$1; cns_mem_gb=$2; cns_time_h=$3; shift; shift; shift; elif [ "$opt" = "--ahc-run" ] ; then ahc_n_cpus=$1; ahc_mem_gb=$2; ahc_time_h=$3; shift; shift; shift; elif [ "$opt" = "--fhc-run" ] ; then fhc_n_cpus=$1; fhc_mem_gb=$2; fhc_time_h=$3; shift; shift; shift; + elif [ "$opt" = "--shc-run" ] ; then shc_n_cpus=$1; shc_mem_gb=$2; shc_time_h=$3; shift; shift; shift; + # # unknown options @@ -921,6 +927,7 @@ if [ "x$help" = "xhelp" -o "x$errors" != "x" ] ; then echo " --cns-run" echo " --ahc-run" echo " --fhc-run" + echo " --shc-run" echo "" echo "ADVANCED MODULE PARAMETERS (expert users):" echo "HiFi read correction:" @@ -1185,6 +1192,11 @@ echo >> ${outd}/verkko.yml "# fast hic stuff" echo >> ${outd}/verkko.yml "fhc_n_cpus: '${fhc_n_cpus}'" echo >> ${outd}/verkko.yml "fhc_mem_gb: '${fhc_mem_gb}'" echo >> ${outd}/verkko.yml "fhc_time_h: '${fhc_time_h}'" +echo >> ${outd}/verkko.yml "" +echo >> ${outd}/verkko.yml "#hi-c scaffolding itself" +echo >> ${outd}/verkko.yml "shc_n_cpus: '${shc_n_cpus}'" +echo >> ${outd}/verkko.yml "shc_mem_gb: '${shc_mem_gb}'" +echo >> ${outd}/verkko.yml "shc_time_h: '${shc_time_h}' echo >> ${outd}/verkko.yml "# This is the end."