-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnexus_from_munster.py
136 lines (116 loc) · 4.03 KB
/
nexus_from_munster.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# -*- coding: utf-8 -*-
import sys
import string
import MySQLdb
# Character written into the matrix for readings flagged 'zw' or 'zz';
# it is declared as the "missing" character in the NEXUS FORMAT block.
MISSING = "-"
# Character written where a witness has no entry at all for a variant unit;
# it is declared as the "gap" character in the NEXUS FORMAT block.
GAP = "?"
# Pool of single-character state symbols that have not been handed out yet.
# Symbols are consumed from the front: a-z, then A-Z, then 0-9.
_possible_symbols = list(string.ascii_letters) + list(string.digits)
# Remembers which symbol each input has already been assigned.
_sym_map = {}


def get_symbol(x):
    """
    Return an appropriate symbol to use for MrBayes, always returning the
    same symbol for the same input.

    The first time an input is seen it is given the next unused symbol from
    the module-level pool; later calls with an equal input return that same
    symbol.

    Raises IndexError if a new input is seen after every symbol in the pool
    has already been assigned.
    """
    try:
        return _sym_map[x]
    except KeyError:
        pass

    if not _possible_symbols:
        # Fail with an explanation rather than list.pop's bare
        # "pop from empty list".
        raise IndexError(
            "Ran out of symbols: cannot assign a symbol to {!r} because all "
            "{} available symbols are already in use".format(x, len(_sym_map)))

    sym = _possible_symbols.pop(0)
    _sym_map[x] = sym
    return sym
def _read_matrix(cur, table, book, perc):
    """
    Query the variant units and each witness's readings through ``cur``.

    Returns a tuple ``(nchar, witnesses, symbols, matrix)``: the number of
    variant units, the names of the witnesses that were kept, the set of
    state symbols used, and one formatted matrix row per kept witness.
    """
    if book:
        cur.execute("SELECT id FROM {}_ed_vus WHERE BOOK=%s ORDER BY id".format(table), (book, ))
    else:
        cur.execute("SELECT id FROM {}_ed_vus ORDER BY id".format(table))
    vus = sorted(x[0] for x in cur.fetchall())
    target = len(vus) * perc / 100.0
    print("Including only witnesses extant in {} ({}%) variant units".format(target, perc))

    cur.execute("SELECT DISTINCT(witness) FROM {}_ed_map".format(table))
    candidates = [x[0] for x in cur.fetchall()]

    kept = []
    symbols = set()
    matrix = []
    print()
    for i, wit in enumerate(candidates):
        sys.stdout.write("\r{}/{}: {} ".format(i + 1, len(candidates), wit))
        sys.stdout.flush()
        cur.execute("SELECT vu_id, ident FROM {}_ed_map WHERE witness = %s".format(table),
                    (wit, ))
        wit_map = {}
        for vu_id, ident in cur.fetchall():
            # zw: the textual evidence does not allow to cite the witness for
            #     only one of the variants (equivalent of the double arrow in
            #     the ECM apparatus).
            # zz: lacuna
            if ident in ('zw', 'zz'):
                label = MISSING
            else:
                label = get_symbol(ident)
                symbols.add(label)
            wit_map[vu_id] = label

        # One character per selected variant unit, in variant unit order.
        stripe = [wit_map.get(vu, GAP) for vu in vus]
        this_count = sum(1 for char in stripe if char not in (GAP, MISSING))

        # A witness that reaches the threshold exactly is kept (so perc=100
        # keeps fully extant witnesses), but one with no extant readings at
        # all is always dropped, even when perc is 0.
        if this_count and this_count >= target:
            kept.append(wit)
            matrix.append("{} {}".format(wit, ''.join(stripe)))
        else:
            print("Deleting witness {} - it is only extant in {} variant unit(s)".format(wit, this_count))

    return len(vus), kept, symbols, matrix


def nexus(host, db, user, password, table, book, perc, filename):
    """
    Connect to the mysql db, loop through what we find and write the result
    to ``filename`` as NEXUS data.

    host, db, user, password: MySQL connection details.
    table: table name prefix; "<table>_ed_vus" and "<table>_ed_map" are read.
    book: if truthy, only variant units whose BOOK column equals this value
          are used; otherwise all variant units are used.
    perc: percentage of the selected variant units in which a witness must
          be extant (neither gap nor missing) to be included in the output.
    filename: path of the file to write.
    """
    conn = MySQLdb.connect(host=host, user=user, passwd=password, db=db, charset='utf8')
    try:
        cur = conn.cursor()
        nchar, witnesses, symbols, matrix = _read_matrix(cur, table, book, perc)
    finally:
        # Release the connection even if a query fails part-way through.
        conn.close()

    nexus_data = """#nexus
BEGIN Taxa;
DIMENSIONS ntax={};
TAXLABELS
{}
;
END;
BEGIN Characters;
DIMENSIONS nchar={};
FORMAT
datatype=STANDARD
missing={}
gap={}
symbols="{}"
;
MATRIX
{}
;
END;
""".format(len(witnesses),
           "\n".join(witnesses),
           nchar,
           MISSING,
           GAP,
           ' '.join(sorted(symbols)),
           '\n'.join(matrix))

    with open(filename, 'w') as fh:
        fh.write(nexus_data)
def main(argv=None):
    """
    Parse the command line and write the requested NEXUS file.

    argv: optional list of argument strings to parse. When None (the
          default) argparse reads the arguments from sys.argv.

    Exits via argparse (SystemExit) if the arguments are invalid, including
    an --extant_perc value outside the range 0-100.
    """
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('-u', '--mysql-user', required=True, help='User to connect to mysql with')
    parser.add_argument('-p', '--mysql-password', required=True, help='Password to connect to mysql with')
    parser.add_argument('-s', '--mysql-host', required=True, help='Host to connect to')
    parser.add_argument('-d', '--mysql-db', required=True, help='Database to connect to')
    parser.add_argument('-t', '--table', required=True, help='Table name to get data from')
    parser.add_argument('-b', '--book', default=0, type=int, help='Restrict to the specified book number')
    parser.add_argument('-e', '--extant_perc', default=0, type=int, help='Percentage of variant units a witness must attest to be included')
    parser.add_argument('output_file', help='Filename to save nexus data to')
    args = parser.parse_args(argv)

    # Reject impossible percentages up front, before any database work.
    if not 0 <= args.extant_perc <= 100:
        parser.error("--extant_perc must be between 0 and 100 (got {})".format(args.extant_perc))

    nexus(args.mysql_host,
          args.mysql_db,
          args.mysql_user,
          args.mysql_password,
          args.table,
          args.book,
          args.extant_perc,
          args.output_file)
    # Finish the line left open by the "\r" progress output.
    print()


if __name__ == "__main__":
    main()