-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path__init__.py
220 lines (184 loc) · 7.82 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Dict, Optional
from random import shuffle
import json
import pandas as pd
class MedicalCodingSequencer(ABC):
    """ Abstract Base Class for medical coding sequencers

    Both ``sequence`` and ``serialize`` are abstract, so a subclass has to
    override the two of them before it can be instantiated.
    """

    def __init__(self):
        super().__init__()

    @abstractmethod
    def sequence(self):
        """ Order the sequencer's data. The base implementation does nothing and returns None. """

    @abstractmethod
    def serialize(self, output_file):
        """ Serialize the sequencer. The base implementation does nothing and returns None. """
        return None
class TemporalRecord:
    """ A single instance of a timestamped record """

    # strftime/strptime pattern for the timestamp part of the serialized form.
    # Shared by serialize() and read() so the two cannot drift apart.
    _TIMESTAMP_FORMAT = '%Y-%m-%d_%H:%M:%S.%f'

    def __init__(self, timestamp: datetime, code: Any):
        """ Constructor

        Params
        ======
        timestamp: datetime - When the code was recorded
        code: Any - The recorded code. It is rendered with str formatting when serialized.
        """
        self.timestamp = timestamp
        self.code = code

    def serialize(self) -> str:
        """ String representation of the format (timestamp, code) """
        stamp = self.timestamp.strftime(self._TIMESTAMP_FORMAT)
        return f'({stamp}, {self.code})'

    @staticmethod
    def read(input_str) -> 'TemporalRecord':
        """ Reads the string representation back into a TemporalRecord object

        Params
        ======
        input_str: str - String of the format produced by serialize(): (timestamp, code)

        Returns
        =======
        TemporalRecord whose code is always a str, whatever type was originally serialized

        Raises
        ======
        ValueError - The timestamp part does not match the expected format
        IndexError - There is no ', ' separating the timestamp from the code
        """
        # Drop the surrounding parentheses, then split on the FIRST ', ' only:
        # the timestamp never contains that separator, but the code may.
        parts = input_str[1:-1].split(', ', 1)
        timestamp = datetime.strptime(parts[0], TemporalRecord._TIMESTAMP_FORMAT)
        code = parts[1]
        return TemporalRecord(timestamp, code)
class TemporalSequencer(MedicalCodingSequencer):
    """ Converts a list of TemporalRecords (not necessarily ordered) into an ordered Temporal Sequence """

    # Maps each supported shuffle level to the strftime format that truncates a
    # timestamp to that precision; records whose truncated timestamps are equal
    # belong to the same shuffle group.
    _shuffle_dict = {
        'd': '%Y-%m-%d',
        'H': '%Y-%m-%d_%H',
        'M': '%Y-%m-%d_%H:%M',
        'S': '%Y-%m-%d_%H:%M:%S',
        'f': '%Y-%m-%d_%H:%M:%S.%f',
    }

    def __init__(self, metadata: Optional[Dict] = None, sep: str = '\t'):
        """ Constructor

        Params
        ======
        metadata: dict - Metadata about this container object. For example, the container object could be a patient,
                         and metadata may include person_id. Must be JSON serializable.
        sep: str - Separator character. Default: tab

        Raises
        ======
        ValueError - metadata is neither None nor a dict
        """
        self.data = list()
        if metadata is None:
            self.metadata = dict()
        elif isinstance(metadata, dict):
            self.metadata = metadata
        else:
            raise ValueError(f'metadata must be a dict or None, got {type(metadata).__name__}')
        self.sep = sep
        # Remember how the data was last sequenced so serialize() can tell
        # whether the current order already matches what the caller asks for.
        self._shuffle_level = None
        self._reverse = False
        self._sequenced = False
        super().__init__()

    def add_data(self, timestamp: datetime, code: Any):
        """ Adds timestamp and code to the data to be sequenced """
        self.data.append(TemporalRecord(timestamp, code))
        self._sequenced = False

    def add_temporal_record(self, ts: TemporalRecord):
        """ Adds TemporalRecord to the data to be sequenced """
        self.data.append(ts)
        self._sequenced = False

    def sequence(self, shuffle_level: str = None, reverse=False):
        """ Sequences the data in place

        Params
        ======
        shuffle_level: str - Indicates which time unit to shuffle records on. None - no shuffling. Otherwise,
                             specified using the letter of the matching strftime code. For example, use 'd' to
                             shuffle all codes recorded on the same day. Supported values:
                                'd' - day
                                'H' - hour
                                'M' - minute
                                'S' - second
                                'f' - microsecond
                                None - No shuffling
        reverse: bool - True to sort with most recent records first. Default: False

        Raises
        ======
        ValueError - shuffle_level is not None and not one of the supported values
        """
        if shuffle_level is not None and (
                not isinstance(shuffle_level, str) or shuffle_level not in TemporalSequencer._shuffle_dict):
            supported = ', '.join(repr(k) for k in TemporalSequencer._shuffle_dict)
            raise ValueError(f'shuffle_level must be None or one of {supported}; got {shuffle_level!r}')

        self._shuffle_level = shuffle_level
        self._reverse = bool(reverse)

        # First, sort strictly by timestamp
        self.data.sort(key=lambda record: record.timestamp, reverse=reverse)

        if shuffle_level is not None:
            # Shuffle records that occur at the same time level. After the sort,
            # records of the same level are adjacent, so one pass is enough.
            fmt = TemporalSequencer._shuffle_dict[shuffle_level]
            shuffled = list()
            group_key = None
            group = list()
            for record in self.data:
                # Convert the datetime to a string of a certain precision as a crude way of ignoring lower precision
                record_key = record.timestamp.strftime(fmt)
                if record_key != group_key:
                    # The previous group is complete: shuffle it and emit it
                    shuffle(group)
                    shuffled.extend(group)
                    # Start keeping track of the new group of records
                    group_key = record_key
                    group = list()
                group.append(record)
            # Emit the final group too; no later key change will ever flush it,
            # so without this step its records would be dropped.
            shuffle(group)
            shuffled.extend(group)
            self.data = shuffled

        self._sequenced = True

    def serialize(self, shuffle_level: str = None, reverse=False):
        """ Sequences the data (if needed) and serializes to a string representation

        The data is re-sequenced when it has changed since the last sequencing, or when
        shuffle_level or reverse differ from the values used for the last sequencing.

        Params
        ======
        shuffle_level: str - Indicates which time unit to shuffle records on. See the
                             documentation for the sequence method for more details.
        reverse: bool - True to sort with most recent records first. Default: False

        Returns
        =======
        String serialization of temporal coding sequence: the JSON metadata followed by
        one serialized record per field, all joined by the separator
        """
        order_is_stale = (
            not self._sequenced
            or self._shuffle_level != shuffle_level
            or self._reverse != bool(reverse)
        )
        if order_is_stale:
            self.sequence(shuffle_level, reverse)

        if self.metadata is None:
            self.metadata = dict()
        header = json.dumps(self.metadata)
        records = self.sep.join([r.serialize() for r in self.data])
        return header + self.sep + records

    @staticmethod
    def read(input_str: str, sep='\t'):
        """ Reads the serialized temporal coding sequence back into a TemporalSequencer object

        The records keep the order they have in input_str, and the returned object is marked
        as already sequenced. Codes are read back as strings.

        Params
        ======
        input_str: str - Input string to read. A trailing line break (as left by reading a line
                         from a file) is ignored.
        sep: str - Separator character

        Returns
        =======
        TemporalSequencer object
        """
        fields = input_str.rstrip('\r\n').split(sep)
        metadata = json.loads(fields[0])
        ts = TemporalSequencer(metadata=metadata, sep=sep)
        for field in fields[1:]:
            # serialize() always writes a separator after the metadata, so a
            # sequence without records ends with one empty field; skip it.
            if not field:
                continue
            ts.add_temporal_record(TemporalRecord.read(field))
        ts._sequenced = True
        return ts

    @staticmethod
    def _split_codes(cell) -> list:
        """ Splits one cell of the codes column into a list of whitespace-stripped code strings """
        if not isinstance(cell, str):
            # Empty cells are read as missing values: they carry no codes
            if pd.isna(cell):
                return []
            # A lone numeric code may be read as a number rather than text;
            # render integral floats without the trailing '.0'
            if isinstance(cell, float) and cell.is_integer():
                cell = int(cell)
            cell = str(cell)
        return [code.strip() for code in cell.split(',')]

    @staticmethod
    def read_excel(file_in: str, col_pid, col_time, col_codes, file_out, sheet_name: Optional[str] = None) \
            -> Dict[Any, 'TemporalSequencer']:
        """ Reads in an excel file, generates a dictionary of TemporalSequences and writes their
        serializations to file_out, one sequence per line. Expects a file with a format like:

            patient_id      timestamp               code
            42              2000-01-01 00:00:00     313217, 320218
            42              2000-01-01 00:01:00     314159

        Params
        ------
        file_in: str - excel file to read
        col_pid: str - column name with patient identifier
        col_time: str - column name with timestamp
        col_codes: str - column name with codes or other data to be sequenced. A cell can hold
                         several codes separated by commas; empty cells are skipped.
        file_out: str - path of the text file the serialized sequences are written to
        sheet_name: str or None - name of excel sheet to read. None reads pandas' default sheet.

        Returns
        -------
        Dictionary with patient IDs as keys and TemporalSequencer objects as values
        """
        # Only forward sheet_name when it was given: pandas treats an explicit
        # sheet_name=None as "read every sheet" and returns a dict of frames.
        read_kwargs = dict() if sheet_name is None else {'sheet_name': sheet_name}
        df = pd.read_excel(file_in, **read_kwargs)

        # tolist() converts the ids to plain Python values, which keeps the
        # metadata JSON serializable
        pids = df[col_pid].unique().tolist()
        pt_seqs = dict()
        for pid in pids:
            pt_records = df[df[col_pid] == pid]
            ts = TemporalSequencer(metadata={'pat_id': pid})
            for timestamp, cell in zip(pt_records[col_time], pt_records[col_codes]):
                # icd_codes column can contain multiple ICD codes separated by comma
                for icd_code in TemporalSequencer._split_codes(cell):
                    ts.add_data(timestamp, icd_code)
            ts.sequence()
            pt_seqs[pid] = ts

        with open(file_out, 'w') as f:
            serialized = [x.serialize() for x in pt_seqs.values()]
            f.write('\n'.join(serialized))

        return pt_seqs