forked from 23andMe/yhaplo
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpath.py
181 lines (141 loc) · 6.27 KB
/
path.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# David Poznik
# 2016.06.08
# path.py
#
# Defines the Path class.
#----------------------------------------------------------------------
from __future__ import absolute_import
from collections import deque
from .snp import SNP
class Path(object):
'''
An instance of this class represents a path through a tree.
It stores the next node to visit, a list of SNPs observed in the derived state,
the most derived SNP observed, and the number of ancestral alleles encountered.
'''
def __init__(self, node):
self.node = node
self.derSNPlist = list()
self.mostDerivedSNP = None
self.numAncestral = 0
self.initPushThroughVars()
def initPushThroughVars(self):
'''
initializes variables that track progress subsequent to pushing through
a branch with 1 ancestral and 0 derived alleles
'''
self.nodeWhenPushedThrough = None
self.mostDerivedSNPWhenPushedThrough = None
self.numAncSincePushThrough = 0
self.numDerSincePushThrough = 0
def setPushThroughVars(self):
'set memory of pushthough state to current state'
self.nodeWhenPushedThrough = self.node
self.mostDerivedSNPWhenPushedThrough = self.mostDerivedSNP
def updatePushThroughVars(self, numAncestral, numDerived):
'update pushthrough state with data from most recent branch assessment'
self.numAncSincePushThrough += numAncestral
self.numDerSincePushThrough += numDerived
if self.numDerSincePushThrough > self.numAncSincePushThrough:
self.initPushThroughVars()
def copyAllAttributesOtherThanNode(self, other):
'copies all attributes of another path, other than its node'
self.derSNPlist = list(other.derSNPlist)
self.mostDerivedSNP = other.mostDerivedSNP
self.numAncestral = other.numAncestral
self.nodeWhenPushedThrough = other.nodeWhenPushedThrough
self.mostDerivedSNPWhenPushedThrough = other.mostDerivedSNPWhenPushedThrough
self.numAncSincePushThrough = other.numAncSincePushThrough
self.numDerSincePushThrough = other.numDerSincePushThrough
def __str__(self):
return '%d %d\n%s\n%s\n' % (self.numAncestral, self.numDerived,
self.nodeString, self.snpString)
# properties
#----------------------------------------------------------------------
@property
def hasPushedThrough(self):
'whether or not this path has pushed through a branch with 1 ancestral and 0 derived'
return self.nodeWhenPushedThrough is not None
@property
def nodeString(self):
'string concatenation of nodes visited'
return ' '.join([node.label for node in self.node.backTracePath()])
@property
def numDerived(self):
'number of derived SNPs in the list'
return len(self.derSNPlist)
@property
def snpString(self):
'string concatenation of derived SNPs observed'
return ' '.join([snp.label for snp in self.derSNPlist])
# regular methods
#----------------------------------------------------------------------
def betterThan(self, other):
'evaluates whether this path is better than another'
return (other is None
or self.numDerived > other.numDerived
or (self.numDerived == other.numDerived
and self.numAncestral < other.numAncestral))
def fork(self, nodeList):
'''
returns a deque of paths, each of which is identical to self
but with a new current node
'''
pathDeque = deque()
for node in nodeList:
path = Path(node)
path.copyAllAttributesOtherThanNode(self)
pathDeque.append(path)
return pathDeque
def revertIfPushedThroughTooFar(self):
'''
if the path has pushed through a branch with 1 ancestral and 0 derived
and, after doing so, it has encountered just one derived allele and a nonzero
number of ancestral alleles, revert the path to its state before pushing through
'''
if (self.hasPushedThrough
and self.numAncSincePushThrough > 0
and self.numDerSincePushThrough == 1):
self.node = self.nodeWhenPushedThrough
del(self.derSNPlist[-1])
self.mostDerivedSNP = self.mostDerivedSNPWhenPushedThrough
self.numAncestral -= self.numAncSincePushThrough
self.initPushThroughVars()
def updateWithBranchAssessment(self, ancSNPlist, derSNPlist):
'''
extends derived SNP list, sets most derived SNP, and adds number of
ancestral alleles seen. also, manages tracking of whether or not path
has pushed through an (anc,der)==(1,0) branch
'''
numAncestral, numDerived = len(ancSNPlist), len(derSNPlist)
self.numAncestral += numAncestral
self.derSNPlist.extend(derSNPlist)
if derSNPlist:
self.mostDerivedSNP = SNP.mostHighlyRankedMarkerOnList(derSNPlist)
if self.hasPushedThrough:
self.updatePushThroughVars(numAncestral, numDerived)
elif (numAncestral, numDerived) == (1, 0):
self.setPushThroughVars()
# static methods
#----------------------------------------------------------------------
@staticmethod
def createPathDeque(nodeList):
'returns a deque of paths, with the node of each initialized from the given list'
pathDeque = deque()
for node in nodeList:
pathDeque.append(Path(node))
return pathDeque
@staticmethod
def postProcessPathListAndSelectBest(pathList):
'post-processes each path in list and returns the best one'
for path in pathList:
path.revertIfPushedThroughTooFar()
return Path.bestPathInList(pathList)
@staticmethod
def bestPathInList(pathList):
'selects the best from a list of paths'
bestPath = None
for path in pathList:
if path.betterThan(bestPath):
bestPath = path
return bestPath