Commit 79f474d

wip

Dmitry-Antipov committed Jul 9, 2024
1 parent 1f10fdd commit 79f474d
Showing 4 changed files with 127 additions and 54 deletions.
16 changes: 8 additions & 8 deletions src/Snakefiles/8-hicPipeline.sm
@@ -154,7 +154,7 @@ rule prepare_reference_mashmap:
     input:
         unitigs_hpc = '8-hicPipeline/unitigs.hpc.fasta',
         graph_hpc = '8-hicPipeline/unitigs.hpc.noseq.gfa',
-        rukki_tsv = '8-hicPipeline/rukki.paths.tsv'
+        rukki_tsv = '8-hicPipeline/prescaf_rukki.paths.tsv'
     output:
         path_mashmap = '8-hicPipeline/paths2ref.mashmap'
     log:
@@ -432,7 +432,7 @@ cat > ./transform_bwa.sh <<EOF
 #!/bin/sh
 set -e
 
-{params.SAMTOOLS} view ../{input.bwa_mapping} | {PYTHON} {VERKKO}/scripts/parse_sam_pairs.py > ../{output.byread_mapping}
+{params.SAMTOOLS} view -q 1 ../{input.bwa_mapping} | {PYTHON} {VERKKO}/scripts/parse_sam_pairs.py > ../{output.byread_mapping}
 EOF
 
 chmod +x ./transform_bwa.sh
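The added `-q 1` makes samtools discard MAPQ-0 (multi-mapping) alignments before they are piped into parse_sam_pairs.py. A minimal pysam sketch of an equivalent filter; the file names are placeholders, not pipeline outputs:

    import pysam

    # Keep only alignments with mapping quality >= 1, mirroring `samtools view -q 1`.
    with pysam.AlignmentFile("hic.bam", "rb") as src, \
         pysam.AlignmentFile("hic.filtered.bam", "wb", template=src) as dst:
        for read in src:
            if read.mapping_quality >= 1:
                dst.write(read)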
@@ -551,12 +551,12 @@ rule HiC_rdnascaff:
     log:
         err = '8-hicPipeline/hic_scaff.err'
     threads:
-        int(config['fhc_n_cpus'])
+        int(config['shc_n_cpus'])
     resources:
         job_id = 1,
-        n_cpus = int(config['fhc_n_cpus']),
-        mem_gb = lambda wildcards, input, attempt: getMemoryRequest(attempt, 'fhc'),
-        time_h = lambda wildcards, input, attempt: getTimeRequest(attempt, 'fhc')
+        n_cpus = int(config['shc_n_cpus']),
+        mem_gb = lambda wildcards, input, attempt: getMemoryRequest(attempt, 'shc'),
+        time_h = lambda wildcards, input, attempt: getTimeRequest(attempt, 'shc')
     shell:
         '''
 cd 8-hicPipeline
@@ -567,8 +567,8 @@ set -e
 {PYTHON} {VERKKO}/scripts/rdna_scaff.py .
 
 
-cp ../{output.scaff_tsv_path} ../{input.final_tsv_path}
-cp ../{output.scaff_gaf_path} ../{input.final_gaf_path}
+cp ../{output.scaff_tsv_path} ../{output.final_tsv_path}
+cp ../{output.scaff_gaf_path} ../{output.final_gaf_path}
 
 cp ../{input.tsv_path} ../{rules.runRukkiHIC.output.final_paths_tsv}
 EOF
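The rule's resource requests now come from the new `shc` class. getMemoryRequest and getTimeRequest are defined elsewhere in the Snakefiles; the sketch below is only an assumption about their shape (attempt-scaled lookups of the config values), not the actual implementation:

    config = {'shc_mem_gb': '128', 'shc_time_h': '24'}   # stand-in for Snakemake's config

    # Hypothetical: grow the request with each retry attempt.
    def getMemoryRequest(attempt, module):
        return attempt * int(config[f'{module}_mem_gb'])

    def getTimeRequest(attempt, module):
        return attempt * int(config[f'{module}_time_h'])

    assert getMemoryRequest(2, 'shc') == 256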
5 changes: 4 additions & 1 deletion src/scripts/parse_sam_pairs.py
@@ -36,7 +36,10 @@ def print_results(names):
 
 for line in input_stream:
     line=line.split()
-    if len(line) < 4:
+    if len(line) < 5:
         continue
+    #TODO: remove later
+    if line[4] == "0":
+        continue
     if name == "":
         name = line[0]
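Field 5 of a SAM record (index 4 after split) is the mapping quality, so the length guard moves from 4 to 5 and MAPQ-0 lines, i.e. multi-mapping reads, are now skipped at parse time, matching the `samtools view -q 1` filter above. A self-contained sketch of the same idea:

    import sys

    # Drop SAM lines whose MAPQ column (index 4) is "0", as the patched script does.
    for line in sys.stdin:
        fields = line.split()
        if len(fields) < 5 or fields[4] == "0":
            continue
        print(fields[0], fields[2], fields[3])   # QNAME, RNAME, POS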
146 changes: 102 additions & 44 deletions src/scripts/scaffolding/scaffold_graph.py
@@ -100,10 +100,11 @@ def __init__(self, rukki_paths, telomere_locations_file, hic_alignment_file, mat
         self.logger = logger_wrap.UpdatedAdapter(logger, self.__class__.__name__)
         self.rukki_paths = rukki_paths
         self.uncompressed_lens = gf.get_lengths(uncompressed_fasta)
 
-        all_connections, unique_connections = self.get_connections_bam(hic_alignment_file, True)
-
         self.multiplicities = rukki_paths.getEdgeMultiplicities()
+        interesting_nodes = self.getInterestingNodes(self.uncompressed_lens)
+        self.logger.info(f"Total nodes {len(G.nodes)} interesting nodes {len(interesting_nodes)}")
+        all_connections, unique_connections = self.get_connections_bam(hic_alignment_file, interesting_nodes, True)
 
         #upd_G - graph with telomeric nodes
         self.tel_nodes, self.upd_G = gf.get_telomeric_nodes(telomere_locations_file, G)
         self.logger.debug("Telomeric nodes")
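Note the reordering above: getEdgeMultiplicities now runs before the Hi-C alignments are parsed, because the new getInterestingNodes helper (added further down in this file) reads self.multiplicities, and its result is handed to get_connections_bam so alignments on uninteresting nodes can be dropped immediately.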
@@ -493,11 +494,12 @@ def generateScaffolds(self):
             #debugging part
             for or_path_id in scf:
                 nor_path_id = or_path_id.strip('-+')
-                if nor_path_id in self.haploids and len(scf) > 1:
+                if nor_path_id in self.haploids and len(scf) > 12:
                     self.logger.info (f"scaffold part {nor_path_id} is haploid")
             for i in range (0, len(scf) - 1):
-                if (scf[i].strip('-+'), scf[i+1].strip('-+')) in self.dangerous_swaps:
-                    self.logger.warning (f"consecutive pair, {scf[i]} {scf[i+1]} did signficant changes on hi-c counts based on {self.dangerous_swaps[(scf[i].strip('-+'), scf[i+1].strip('-+'))]}")
+                swap_info = (scf[i].strip('-+'), scf[i+1].strip('-+'), scf[i][-1] + scf[i+1][-1])
+                if (swap_info) in self.dangerous_swaps:
+                    self.logger.warning (f"consecutive pair, {swap_info} did signficant changes on hi-c counts based on {self.dangerous_swaps[swap_info]}")
             path_str = "\t".join([largest_id, ",".join(scf_path), largest_label])
             final_paths.addPath(path_str)
             telo_end = False
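dangerous_swaps is now keyed by (path1, path2, orientation pair) instead of just the path pair, so a swap recorded for one relative orientation no longer triggers a warning for a scaffold that joins the same paths the other way. A small sketch of the keying convention, assuming oriented ids that end in '+' or '-':

    # The key holds both bare path ids plus the two orientation characters.
    first, second = "utig4-5+", "utig4-9-"      # hypothetical oriented ids
    swap_info = (first.strip('-+'), second.strip('-+'), first[-1] + second[-1])
    assert swap_info == ("utig4-5", "utig4-9", "+-")
    dangerous_swaps = {swap_info: "telomeres"}  # as fixOrientation records it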
@@ -563,7 +565,7 @@ def get_connections(self, alignment_file, use_multimappers:bool):
                 res[(node_s, node_f)].append([int(second_coords[j]), int(first_coords[i]), weight])
         return res
 
-    def get_connections_bam(self, bam_filename, use_multimappers:bool):
+    def get_connections_bam(self, bam_filename, interesting_names, use_multimappers:bool):
         res = {}
         unique_storage = {}
         #A01660:39:HNYM7DSX3:1:1101:1696:29982 utig4-73 utig4-1056 1 16949880 78591191
@@ -573,9 +575,8 @@ def get_connections_bam(self, bam_filename, use_multimappers:bool):
         all_pairs = 0
 
         created_pair = 0
-        created_dist = 0
-        increased_dist = 0
-
+        total_inserted = 0
+        total_compressed = 0
         bamfile = pysam.AlignmentFile(bam_filename, "rb")
         cur_name = ""
         reads = []
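The retired created_dist/increased_dist counters tracked the old dict-of-coordinates storage; res[names_pair] now holds a flat list of (coord1, coord2, weight) triples (see below), which is cheaper per entry than a dict keyed by coordinate pairs. A rough way to see the container-overhead difference; exact numbers vary by Python build:

    import sys

    # Container overhead only (getsizeof does not follow references).
    as_dict = {(i, i + 1): 1 for i in range(1000)}
    as_list = [(i, i + 1, 1) for i in range(1000)]
    print(sys.getsizeof(as_dict), sys.getsizeof(as_list))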
@@ -584,11 +585,30 @@
 
         for cur_read in bamfile:
             if (total_reads % 10000000 == 0):
+
+                if (total_reads % 100000000 == 0):
+                    before_compressed = total_compressed
+                    self.logger.debug("Starting map compression...")
+                    for names_pair in res:
+                        if len(res[names_pair]) > 1000:
+                            names_sorted = sorted(res[names_pair])
+                            compressed = []
+                            slen = len(names_sorted)
+                            ii = 0
+                            while ii < slen:
+                                jj = ii + 1
+                                cur_w = names_sorted[ii][2]
+                                while jj < slen and names_sorted[jj][0] == names_sorted[ii][0] and names_sorted[jj][1] == names_sorted[ii][1]:
+                                    cur_w += names_sorted[jj][2]
+                                    jj += 1
+                                    total_compressed += 1
+                                compressed.append((names_sorted[ii][0], names_sorted[ii][1], cur_w))
+                                ii = jj
+                            res[names_pair] = compressed
+                    self.logger.debug(f"On current iteration compressed {total_compressed - before_compressed}")
                 self.logger.debug (f"Processed {total_reads} alignment strings")
                 self.logger.debug (f"Of them unique vaild unique pairs {unique_pairs}, total pairs {all_pairs} total valid {valid_pairs} ")
                 self.logger.debug (f"Current memory usage {(psutil.virtual_memory().used / 1024 / 1024 / 1024):.2f} GB")
-                self.logger.debug (f"Created new pairs {created_pair} new mappings {created_dist} increased without creation {increased_dist - created_dist}")
+                self.logger.debug (f"Created new pairs {created_pair} total inserted pairs = {total_inserted} total compressed = {total_compressed}")
                 #gc.collect()
                 #self.logger.debug (f"Current memory usage after GC {psutil.virtual_memory().used / 1024 / 1024 / 1024}GB")
                 #self.logger.debug (f"Mem usage of main map {asizeof.asizeof(res) / 1024 / 1024 / 1024}GB")
@@ -613,6 +633,7 @@ def get_connections_bam(self, bam_filename, use_multimappers:bool):
 
             names = [[prev_read.reference_name], [cur_read.reference_name]]
             coords = [[prev_read.reference_start], [cur_read.reference_start]]
+            quals = [prev_read.mapping_quality, cur_read.mapping_quality]
             # names = read.reference_name
             valid = True
             if use_multimappers:
@@ -636,45 +657,50 @@
                 elif read.mapping_quality == 0:
                     valid = False
                 i += 1
 
-            lname0 = len(names[0])
-            lname1 = len(names[1])
-            if valid:
-                weight = self.INT_NORMALIZATION // (len(names[0]) * len(names[1]))
 
+            filtered_names = [[], []]
+            filtered_coords = [[], []]
+            for i in range(0, 2):
+                lname = len(names[i])
+                for j in range (0, lname):
+                    #utig4-XXXX; ints are much faster than strings
+                    if int(names[i][j][6:]) in interesting_names:
+                        filtered_names[i].append(names[i][j])
+                        filtered_coords[i].append(coords[i][j])
+
+            lname0 = len(filtered_names[0])
+            lname1 = len(filtered_names[1])
+            #self.logger.debug (f" {valid} {lname0} {lname1}")
+            if valid and lname0 > 0 and lname1 > 0:
                 valid_pairs += 1
+                weight = self.INT_NORMALIZATION // (lname0 * lname1)
                 for i in range (0, lname0):
-                    node_f_len = self.uncompressed_lens[names[0][i]]
-                    #memory opt
-                    if node_f_len < ScaffoldGraph.SHORT_INGORED_NODE or (coords[0][i] > ScaffoldGraph.NEAR_PATH_END and node_f_len - coords[0][i] > ScaffoldGraph.NEAR_PATH_END):
+                    node_f_len = self.uncompressed_lens[filtered_names[0][i]]
+                    #TODO: remove check, this memory opt is already in hic_prefilter
+                    if node_f_len < ScaffoldGraph.SHORT_INGORED_NODE or (filtered_coords[0][i] > ScaffoldGraph.NEAR_PATH_END and node_f_len - filtered_coords[0][i] > ScaffoldGraph.NEAR_PATH_END):
                         continue
                     for j in range (0, lname1):
-                        node_s_len = self.uncompressed_lens[names[1][j]]
-                        if node_s_len < ScaffoldGraph.SHORT_INGORED_NODE or (coords[1][j] > ScaffoldGraph.NEAR_PATH_END and node_s_len - coords[1][j] > ScaffoldGraph.NEAR_PATH_END):
+                        node_s_len = self.uncompressed_lens[filtered_names[1][j]]
+                        if node_s_len < ScaffoldGraph.SHORT_INGORED_NODE or (filtered_coords[1][j] > ScaffoldGraph.NEAR_PATH_END and node_s_len - filtered_coords[1][j] > ScaffoldGraph.NEAR_PATH_END):
                             continue
-                        if (names[0][i] < names[1][j]):
-                            names_pair = (names[0][i], names[1][j])
-                            pre_coords_pair = (coords[0][i], coords[1][j])
+                        if (filtered_names[0][i] < filtered_names[1][j]):
+                            names_pair = (filtered_names[0][i], filtered_names[1][j])
+                            pre_coords_pair = (filtered_coords[0][i], filtered_coords[1][j])
                         else :
-                            names_pair = (names[1][j], names[0][i])
-                            pre_coords_pair = (coords[1][j], coords[0][i])
+                            names_pair = (filtered_names[1][j], filtered_names[0][i])
+                            pre_coords_pair = (filtered_coords[1][j], filtered_coords[0][i])
                         #To round to closest integer and not always down
                         coords_pair = tuple(((c + self.APPROXIMATE_COORDS_HALF) // self.APPROXIMATE_COORDS) * self.APPROXIMATE_COORDS for c in pre_coords_pair)
                         if not names_pair in res:
-                            res[names_pair] = {}
+                            res[names_pair] = []
                             created_pair += 1
-                        if not (coords_pair in res[names_pair]):
-                            created_dist += 1
-                            res[names_pair][coords_pair] = 0
-                        res[names_pair][coords_pair] += weight
-                        increased_dist += 1
-                        if weight == self.INT_NORMALIZATION:
-                            if not names_pair in unique_storage:
-                                unique_storage[names_pair] = {}
-                            if not coords_pair in unique_storage[names_pair]:
-                                unique_storage[names_pair][coords_pair] = 0
-                            unique_storage[names_pair][coords_pair] += self.INT_NORMALIZATION
+                        res[names_pair].append((coords_pair[0], coords_pair[1], weight))
+                        total_inserted += 1
+
+
+                        if quals[0] > 0 and quals[1] > 0:
+                            if not names_pair in unique_storage:
+                                unique_storage[names_pair] = []
+                            unique_storage[names_pair].append((coords_pair[0], coords_pair[1], weight))
                         #TODO: possibly require node1 < node2?
                         #next = (coords[0][i], coords[1][j], weight)
                         #res[(names[0][i], names[1][j])].append(next)
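Each read pair still contributes a total weight of INT_NORMALIZATION, split evenly over all placement combinations of its two mates; what changed is that only combinations on interesting nodes survive, and a pair now reaches unique_storage when both mates have MAPQ > 0 rather than when the full weight was undivided. A toy illustration of the weighting, with INT_NORMALIZATION assumed to be 1000 for the example:

    INT_NORMALIZATION = 1000   # assumed value, for illustration only

    # One mate maps to 2 places, the other to 1: each of the 2 combinations
    # carries half of the normalized weight.
    weight = INT_NORMALIZATION // (2 * 1)
    assert weight == 500
    # A uniquely mapped pair keeps the full weight:
    assert INT_NORMALIZATION // (1 * 1) == INT_NORMALIZATION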
@@ -721,7 +747,7 @@ def fixOrientation(self, path_ids, scores):
                 incorrect_pair = "".join(incorrect_pair)
                 #just logging
                 if scores[incorrect_pair] >= ScaffoldGraph.MIN_LINKS* self.INT_NORMALIZATION and scores[incorrect_pair] > scores[correct_pair]:
-                    self.dangerous_swaps[(path_ids[0], path_ids[1])] = "telomeres"
+                    self.dangerous_swaps[(path_ids[0], path_ids[1], correct_pair)] = "telomeres"
                     self.logger.debug(f"Dangerous telomeric tuning pair {path_ids}, i {i} scores {scores}, tels {self.scaffold_graph.nodes[path_ids[i]+'+']['telomere']} from {incorrect_pair} to {correct_pair}")
                     self.logger.debug (f"moving {incorrect_pair} to {correct_pair}")
                 if self.rukki_paths.getLength(path_ids[i]) <= self.NEAR_PATH_END:
@@ -766,7 +792,7 @@ def fixOrientation(self, path_ids, scores):
                 incorrect_pair = "".join(incorrect_pair)
                 #just logging
                 if scores[incorrect_pair] >= ScaffoldGraph.MIN_LINKS * self.INT_NORMALIZATION and scores[incorrect_pair] > scores[correct_pair]:
-                    self.dangerous_swaps[(path_ids[0], path_ids[1])] = "connectivity"
+                    self.dangerous_swaps[(path_ids[0], path_ids[1], correct_pair)] = "connectivity"
                     self.logger.debug(f"Dangerous connectivity tuning pair {path_ids}, i {i} scores {scores}from {incorrect_pair} to {correct_pair}")
                 if self.rukki_paths.getLength(path_ids[i]) <= self.NEAR_PATH_END:
                     scores[correct_pair] += scores[incorrect_pair]
@@ -802,13 +828,20 @@ def getNodePairConnections(self, pair, orientations, connections, shift_before,
 
         #stored only half
         #TODO: not optimal
+        if rc_pair in connections:
+            for conn in connections[rc_pair]:
+                cons.append((conn[1], conn[0], conn[2]))
+        if pair in connections:
+            for conn in connections[pair]:
+                cons.append(conn)
+        '''
         if rc_pair in connections:
            for coords, w in connections[rc_pair].items():
                cons.append((coords[1], coords[0], w))
        if pair in connections:
            for coords, w in connections[pair].items():
                cons.append((coords[0], coords[1], w))
+        '''
         for conn in cons:
             in_homo = False
             for i in range (0, len(intervals[0])):
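Connections are stored once per unordered pair (smaller name first), so reading them for the reversed pair means swapping the two coordinate columns; the dict-based version of this lookup is kept above only as a commented-out reference. A minimal sketch of the symmetric lookup over (coord1, coord2, weight) triples:

    # Hypothetical data: stored only under the lexicographically smaller pair.
    connections = {("utig4-1", "utig4-2"): [(100, 900, 5)]}

    def lookup(connections, pair):
        cons = [c for c in connections.get(pair, [])]
        rc_pair = (pair[1], pair[0])
        cons += [(c2, c1, w) for (c1, c2, w) in connections.get(rc_pair, [])]
        return cons

    assert lookup(connections, ("utig4-2", "utig4-1")) == [(900, 100, 5)]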
@@ -857,7 +890,32 @@ def getNodePairConnections(self, pair, orientations, connections, shift_before,
         #self.logger.debug (f"Ignored intervals {intervals}")
         return scores
 
+
+    def getInterestingNodes(self, lens):
+        interesting = set()
+        for path_id in self.rukki_paths.getPathIds():
+            #TODO possibly reconsider this condition; /2 because of compressed/uncompressed
+            if self.rukki_paths.getLength(path_id) < ScaffoldGraph.MIN_PATH_TO_SCAFFOLD /2 :
+                continue
+            total_len = 0
+            for or_node in self.rukki_paths.getPathById(path_id):
+                nor_node = or_node.strip('-+')
+                if nor_node in lens and lens[nor_node] > ScaffoldGraph.SHORT_INGORED_NODE and or_node in self.multiplicities and self.multiplicities[or_node] == 1:
+                    total_len += lens[nor_node]
+            before = 0
+            after = total_len
+            for or_node in self.rukki_paths.getPathById(path_id):
+                nor_node = or_node.strip('-+')
+                if nor_node in lens and lens[nor_node] > ScaffoldGraph.SHORT_INGORED_NODE and or_node in self.multiplicities and self.multiplicities[or_node] == 1:
+                    after -= lens[nor_node]
+                    if before < ScaffoldGraph.NEAR_PATH_END or after < ScaffoldGraph.NEAR_PATH_END:
+                        #utig4-, optimizing for speed
+                        # print (f"Interesting node {nor_node}")
+                        # print (f"Interesting node {int(nor_node[5:])}")
+                        # exit()
+                        interesting.add(int(nor_node[6:]))
+                    before += lens[nor_node]
+        return frozenset(interesting)
+
     #return scores for each of the orientations, ++, -+, +-, --,
     def getPathPairConnections(self, path_ids, connections, lens):
         #from start to occurance of node exclusive, do not care about multiplicity > 1 (possibly should filter)
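getInterestingNodes returns the numeric suffixes of long, unique nodes that sit within NEAR_PATH_END of either end of a sufficiently long rukki path; get_connections_bam then tests int membership against this frozenset instead of comparing strings on every alignment. The naming convention it relies on, in miniature:

    # "utig4-" is 6 characters, so the numeric id starts at index 6.
    node = "utig4-1056"
    interesting = frozenset({73, 1056})
    assert int(node[6:]) == 1056
    assert int(node[6:]) in interesting   # int lookups are cheaper than string lookups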
14 changes: 13 additions & 1 deletion src/verkko.sh
@@ -284,9 +284,13 @@ ahc_time_h=48
 
 # fast things in hic pipeline
 fhc_n_cpus=8
-fhc_mem_gb=160
+fhc_mem_gb=16
 fhc_time_h=24
 
+# hic scaffolding pipeline
+shc_n_cpus=8
+shc_mem_gb=128
+shc_time_h=24
 
 
 #
@@ -534,6 +538,8 @@ while [ $# -gt 0 ] ; do
 elif [ "$opt" = "--cns-run" ] ; then cns_n_cpus=$1; cns_mem_gb=$2; cns_time_h=$3; shift; shift; shift;
 elif [ "$opt" = "--ahc-run" ] ; then ahc_n_cpus=$1; ahc_mem_gb=$2; ahc_time_h=$3; shift; shift; shift;
 elif [ "$opt" = "--fhc-run" ] ; then fhc_n_cpus=$1; fhc_mem_gb=$2; fhc_time_h=$3; shift; shift; shift;
+elif [ "$opt" = "--shc-run" ] ; then shc_n_cpus=$1; shc_mem_gb=$2; shc_time_h=$3; shift; shift; shift;
+
 
 #
 # unknown options
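Like the other per-stage resource flags, --shc-run consumes three positional values (n_cpus, mem_gb, time_h); for example, `--shc-run 8 128 24` (hypothetical values) requests 8 CPUs, 128 GB and 24 hours for the Hi-C scaffolding job.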
@@ -921,6 +927,7 @@ if [ "x$help" = "xhelp" -o "x$errors" != "x" ] ; then
 echo "    --cns-run"
 echo "    --ahc-run"
 echo "    --fhc-run"
+echo "    --shc-run"
 echo ""
 echo "ADVANCED MODULE PARAMETERS (expert users):"
 echo "HiFi read correction:"
@@ -1185,6 +1192,11 @@ echo >> ${outd}/verkko.yml "# fast hic stuff"
 echo >> ${outd}/verkko.yml "fhc_n_cpus: '${fhc_n_cpus}'"
 echo >> ${outd}/verkko.yml "fhc_mem_gb: '${fhc_mem_gb}'"
 echo >> ${outd}/verkko.yml "fhc_time_h: '${fhc_time_h}'"
+echo >> ${outd}/verkko.yml ""
+echo >> ${outd}/verkko.yml "#hi-c scaffolding itself"
+echo >> ${outd}/verkko.yml "shc_n_cpus: '${shc_n_cpus}'"
+echo >> ${outd}/verkko.yml "shc_mem_gb: '${shc_mem_gb}'"
+echo >> ${outd}/verkko.yml "shc_time_h: '${shc_time_h}'"
 echo >> ${outd}/verkko.yml "# This is the end."

