forked from nickzren/human-gene-ontology
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprocess.py
209 lines (166 loc) · 8.09 KB
/
process.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
import os
import pandas as pd
import networkx as nx
import obonet
import csv
import gzip
HUMAN_TAX_ID = 9606
INPUT_FILE = {
"GENE_INFO": "data/input/protein_coding_gene.csv",
"GENE2GO": "data/input/gene2go.gz",
"GO_OBO": "data/input/go-basic.obo"
}
INPUT_COLUMN = {
"GENE2GO": ['tax_id', 'GeneID', 'GO_ID', 'Evidence', 'Qualifier']
}
INPUT_DTYPE = {
'tax_id': 'Int64', 'GeneID': 'Int64', 'Qualifier': str
}
OUTPUT_DIR = "data/output/csv"
REMOVE_SUBSETS = {'goantislim_grouping', 'gocheck_do_not_annotate', 'gocheck_do_not_manually_annotate'}
EXPERIMENTAL_CODES = {'EXP', 'IDA', 'IPI', 'IMP', 'IGI', 'IEP'}
PROPAGATE_ALONG = {'is_a', 'part_of'}
GO_DOMAINS = {'biological_process', 'molecular_function', 'cellular_component'}
def load_gene_data_from_csv(file_path):
gene_ids_set = set()
with open(file_path, 'r') as file:
reader = csv.DictReader(file)
for row in reader:
gene_ids_set.add(row['GeneID'])
return gene_ids_set
def join_list_with_pipe(lst):
return '|'.join(map(str, lst))
def load_filtered_dataframe_iteratively(file_key):
gene_ids_set = load_gene_data_from_csv(INPUT_FILE["GENE_INFO"])
path = INPUT_FILE[file_key]
selected_data = []
with gzip.open(path, 'rt') as f:
header_line = ''
while not header_line.startswith('#'):
header_line = f.readline().strip()
adjusted_header = header_line.replace('#', '').split('\t')
col_indices = [adjusted_header.index(col) for col in INPUT_COLUMN[file_key]]
tax_id_index = adjusted_header.index('tax_id')
gene_id_index = adjusted_header.index('GeneID')
for line in f:
split_line = line.strip().split('\t')
if split_line[tax_id_index] == str(HUMAN_TAX_ID):
gene_id = split_line[gene_id_index]
if gene_id in gene_ids_set:
selected_data.append([split_line[i] for i in col_indices])
return pd.DataFrame(selected_data, columns=INPUT_COLUMN[file_key])
def read_go_to_graph():
with open(INPUT_FILE["GO_OBO"]) as read_file:
graph = obonet.read_obo(read_file)
graph.remove_nodes_from({
node for node, data in graph.nodes(data=True)
if REMOVE_SUBSETS & set(data.get('subset', []))
})
graph.remove_edges_from([
(u, v, key) for u, v, key in graph.edges(data=False, keys=True)
if key not in PROPAGATE_ALONG
])
assert nx.is_directed_acyclic_graph(graph)
return graph
def go_graph_to_dataframe(graph):
rows = [(node, data['name'], data['namespace']) for node, data in graph.nodes(data=True)]
go_df = pd.DataFrame(rows, columns=['go_id', 'go_name', 'go_domain'])
return go_df.sort_values('go_id')
def setup_annotation_keys(graph):
keys = ('direct_annotations', 'direct_not_annotations', 'inferred_annotations')
for node in graph:
graph.nodes[node].update({key: set() for key in keys})
def process_annotations(graph, goa_df):
not_qualifier = goa_df['Qualifier'].apply(lambda qualifier: qualifier.startswith('NOT'))
# Filtering direct annotations (where Qualifier is not 'NOT')
direct_annotations = goa_df.loc[~not_qualifier, ['GO_ID', 'GeneID']]
direct_not_annotations = goa_df.loc[not_qualifier, ['GO_ID', 'GeneID']]
for go_id, gene in direct_annotations.itertuples(index=False):
if go_id in graph:
graph.nodes[go_id]['direct_annotations'].add(gene)
for go_id, gene in direct_not_annotations.itertuples(index=False):
if go_id in graph:
graph.nodes[go_id]['direct_not_annotations'].add(gene)
def propagate_direct_annotations(graph):
for node in nx.topological_sort(graph):
data = graph.nodes[node]
data['inferred_annotations'].difference_update(data['direct_not_annotations'])
data['inferred_annotations'].update(data['direct_annotations'])
# Propagate to successor nodes
for successor_node in graph.successors(node):
graph.nodes[successor_node]['inferred_annotations'].update(data['inferred_annotations'])
def annotate_and_propagate(graph, goa_df):
graph = graph.copy()
setup_annotation_keys(graph)
process_annotations(graph, goa_df)
propagate_direct_annotations(graph)
return graph
def create_annotations_df(graph_annot, annotation_key, exclude_keys=None):
exclude_keys = exclude_keys or []
annotations = [
(node, gene) for node, genes in graph_annot.nodes(data=annotation_key)
for gene in genes if not any(gene in graph_annot.nodes[node][key] for key in exclude_keys)
]
return pd.DataFrame(annotations, columns=['go_id', 'GeneID'])
def merge_gene_info(annotations_df, gene_df):
return annotations_df.merge(gene_df, on='GeneID')
def aggregate_gene_ids(annotations_df):
return annotations_df.groupby('go_id').agg({
'GeneID': lambda x: join_list_with_pipe(sorted(set(x)))
}).reset_index()
def extract_annotation_df(graph_annot, go_df):
direct_df = create_annotations_df(graph_annot, 'direct_annotations')
inferred_df = create_annotations_df(graph_annot, 'inferred_annotations', ['direct_annotations'])
direct_agg = aggregate_gene_ids(direct_df)
inferred_agg = aggregate_gene_ids(inferred_df)
final_df = direct_agg.merge(inferred_agg, on='go_id', how='outer', suffixes=('_direct', '_inferred'))
final_df = go_df.merge(final_df, on='go_id', how='right')
final_df = final_df[['go_id', 'go_name', 'go_domain', 'GeneID_direct', 'GeneID_inferred']]
return final_df
def create_node_csv_files(annotation_df):
for domain in GO_DOMAINS:
domain_df = annotation_df[annotation_df['go_domain'] == domain].rename(
columns={'go_id': 'identifier', 'go_name': 'name'}
)[['identifier', 'name']]
formatted_domain = ''.join(word.capitalize() for word in domain.split('_'))
node_file_name = f'node_{formatted_domain}.csv'
path = os.path.join(OUTPUT_DIR, node_file_name)
domain_df.to_csv(path, index=False)
def create_edge_csv_files(annotation_df, gene2go_df):
gene2go_evidence_df = gene2go_df[gene2go_df['Evidence'].isin(EXPERIMENTAL_CODES)]
gene_go_identifier_set = set(gene2go_evidence_df.apply(lambda row: f"{row['GeneID']}_{row['GO_ID']}", axis=1))
files = {}
for domain in GO_DOMAINS:
formatted_domain = ''.join(word.capitalize() for word in domain.split('_'))
relationship_verb = 'enables' if domain == 'molecular_function' else 'locatedIn' if domain == 'cellular_component' else 'involvedIn'
file_path = os.path.join(OUTPUT_DIR, f'edge_Gene_{relationship_verb}_{formatted_domain}.csv')
files[domain] = open(file_path, 'w', newline='')
writers = {domain: csv.writer(file) for domain, file in files.items()}
for writer in writers.values():
writer.writerow(['source_id', 'target_id', 'type', 'experimental'])
for _, row in annotation_df.iterrows():
go_id = row['go_id']
direct_gene_ids = extract_gene_ids(row['GeneID_direct'])
inferred_gene_ids = extract_gene_ids(row['GeneID_inferred'])
domain = row['go_domain']
if direct_gene_ids:
is_experimental = [str(gene_id) + '_' + go_id in gene_go_identifier_set for gene_id in direct_gene_ids]
for i, gene_id in enumerate(direct_gene_ids):
writers[domain].writerow([gene_id, go_id, 'direct', is_experimental[i]])
if inferred_gene_ids:
for gene_id in inferred_gene_ids:
writers[domain].writerow([gene_id, go_id, 'inferred', False])
for file in files.values():
file.close()
def extract_gene_ids(gene_ids_str):
return [int(gene_id) for gene_id in gene_ids_str.split('|')] if not pd.isna(gene_ids_str) and gene_ids_str else []
def main():
gene2go_df = load_filtered_dataframe_iteratively("GENE2GO")
go_graph = read_go_to_graph()
go_df = go_graph_to_dataframe(go_graph)
graph_annot = annotate_and_propagate(go_graph, gene2go_df)
annotation_df = extract_annotation_df(graph_annot, go_df)
create_node_csv_files(annotation_df)
create_edge_csv_files(annotation_df, gene2go_df)
if __name__ == "__main__":
main()