-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathcases_browser.py
executable file
·142 lines (121 loc) · 4.79 KB
/
cases_browser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
#!/usr/bin/env python3
"""
Commandline browser of a flowCat dataset.
Can output lists of case ids and other properties to ease generating
static configurations (i.e. ones that are not regenerated on each run),
which makes searching for bugs much easier.
"""
import argparse
import cmd
from flowcat import configuration, utils
from flowcat.dataset import case_dataset
def tokenize(arg):
    """Split a query line into filter tokens and an optional save path.

    Expected form: ``<type>::<value> [<type>::<value> ...] [ > <path>]``.
    Tokens are separated by whitespace; the meaning of each ``<type>`` is
    left to the caller.

    Args:
        arg: Raw query string as typed by the user.

    Returns:
        A ``(tokens, save)`` tuple. ``tokens`` is a list of
        ``{"type": ..., "query": ...}`` dicts in input order. ``save`` is the
        text following ``" > "`` with surrounding whitespace removed, or
        ``None`` when no (non-blank) target was given.

    Raises:
        RuntimeError: If ``" > "`` occurs more than once, or if a token does
            not contain the ``::`` separator.
    """
    parts = arg.split(" > ")
    if len(parts) > 2:
        raise RuntimeError(
            f"expected at most one ' > ' redirect, got {len(parts) - 1}")

    search = parts[0]
    # A blank target is treated like a missing one so callers can keep
    # testing ``if save:``.
    save = (parts[1].strip() or None) if len(parts) == 2 else None

    tokens = []
    for query in search.split():
        # partition keeps everything after the first "::" as the value, so a
        # value may itself contain "::".
        kind, separator, value = query.partition("::")
        if not separator:
            raise RuntimeError(
                f"malformed query token {query!r}, expected <type>::<value>")
        tokens.append({
            "type": kind,
            "query": value,
        })
    return tokens, save
def info_case(case):
    """Print a one-line summary of *case* to stdout.

    The line holds the case's id, group, infiltration and sureness followed
    by the number of entries in ``case.filepaths``.
    """
    sample_count = len(case.filepaths)
    print(
        f"{case.id} {case.group} {case.infiltration} {case.sureness} "
        f"{sample_count} samples"
    )
class Interpreter(cmd.Cmd):
    """Interactive shell for exploring a case dataset.

    The ``ls`` and ``sm`` commands accept whitespace-separated
    ``<type>::<value>`` filters, applied in the order given, optionally
    followed by `` > <path>`` to save the result as JSON. Filter types:

        g::<group>   case.group equals <group>
        ig::<float>  case.infiltration is greater than <float>
        il::<float>  case.infiltration is less than <float>
        s::<int>     case.sureness equals <int>
        p::<int>     len(case.filepaths) equals <int>
        dg::<date>   case.date is on or after <date>
        dl::<date>   case.date is on or before <date>

    Dates are parsed with ``utils.str_to_date``.
    """

    intro = "Explore and extract dataset information."
    prompt = "> "

    def __init__(self, path, *args, **kwargs):
        """Load the case collection found at *path*.

        Remaining arguments are forwarded to ``cmd.Cmd``.
        """
        super().__init__(*args, **kwargs)
        path = utils.URLPath(path)
        self.data = case_dataset.CaseCollection.from_path(path)

    @staticmethod
    def _filter_cases(cases, queries):
        """Return the members of *cases* that satisfy every token in *queries*.

        Args:
            cases: List of case objects.
            queries: List of ``{"type": ..., "query": ...}`` dicts.

        Returns:
            The filtered list. Tokens with an unknown type are reported on
            stdout and otherwise ignored.

        Raises:
            ValueError: If a numeric filter value cannot be converted.
        """
        for query in queries:
            kind, value = query["type"], query["query"]
            if kind == "g":
                cases = [c for c in cases if c.group == value]
            elif kind == "ig":
                lower = float(value)
                cases = [c for c in cases if c.infiltration > lower]
            elif kind == "il":
                upper = float(value)
                cases = [c for c in cases if c.infiltration < upper]
            elif kind == "s":
                # Previously compared against c.infiltration, which made this
                # filter a duplicate of an exact infiltration match.
                sureness = int(value)
                cases = [c for c in cases if c.sureness == sureness]
            elif kind == "p":
                count = int(value)
                cases = [c for c in cases if len(c.filepaths) == count]
            elif kind == "dg":
                date_min = utils.str_to_date(value)
                cases = [c for c in cases if c.date >= date_min]
            elif kind == "dl":
                date_max = utils.str_to_date(value)
                cases = [c for c in cases if c.date <= date_max]
            else:
                print("Invalid type ", kind)
        return cases

    def _run_query(self, arg):
        """Return ``(cases, save)`` for the query *arg*, or ``None`` if it is bad.

        Errors from parsing or value conversion are printed rather than
        raised, because an exception escaping a ``do_*`` method would
        terminate ``cmdloop``.
        """
        try:
            queries, save = tokenize(arg)
            cases = self._filter_cases(list(self.data.data), queries)
        except (RuntimeError, IndexError, ValueError) as err:
            print(f"Invalid query {arg!r}: {err!r}")
            return None
        return cases, save

    def do_info(self, arg):
        "Output basic information on dataset"
        print(self.data)

    def do_ls(self, arg):
        """List group sizes, or the cases matching a query.

        Without arguments, print the number of cases per group. With a
        query, print the first 10 matching cases and the total count;
        append ' > <path>' to save the matching case ids as JSON.
        """
        if not arg:
            for name, count in self.data.group_count.items():
                print(f"{name}: {count} cases")
            return

        result = self._run_query(arg)
        if result is None:
            return
        cases, save = result

        for case in cases[:10]:
            info_case(case)
        print(f"Total {len(cases)}")
        if save:
            print(f"Saving labels to {save}")
            utils.save_json([c.id for c in cases], save)

    def do_sm(self, arg):
        """Show selected markers per tube for the cases matching a query.

        Without arguments all cases are used. Append ' > <path>' to save
        the markers as JSON.
        """
        cases, save = list(self.data.data), None
        if arg:
            result = self._run_query(arg)
            if result is None:
                return
            cases, save = result

        selected_markers = {
            tube: case_dataset.get_selected_markers(cases, tube)
            for tube in self.data.tubes
        }
        for tube, markers in selected_markers.items():
            print(f"{tube}: {', '.join(markers)}")
        if save:
            print(f"Saving markers to {save}")
            utils.save_json(selected_markers, save)
if __name__ == "__main__":
    # Take the dataset location from the command line and start the
    # interactive shell on it.
    cli = argparse.ArgumentParser()
    cli.add_argument("cases")
    options = cli.parse_args()
    Interpreter(options.cases).cmdloop()