From 79589349f92da709150b58abb336d961d89a7539 Mon Sep 17 00:00:00 2001
From: TDMedina
Date: Mon, 17 Jan 2022 16:40:33 +0000
Subject: [PATCH] Initialize.

---
 .gitignore       |   3 +
 admissions.py    | 190 ++++++++++++++++++++++++++++++++++++++
 health_markov.py | 235 +++++++++++++++++++++++++++++++++++++++++++++++
 patients.py      | 181 ++++++++++++++++++++++++++++++++++++
 prescriptions.py |  76 +++++++++++++++
 utilities.py     |  28 ++++++
 6 files changed, 713 insertions(+)
 create mode 100644 .gitignore
 create mode 100644 admissions.py
 create mode 100644 health_markov.py
 create mode 100644 patients.py
 create mode 100644 prescriptions.py
 create mode 100644 utilities.py

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..6a252d8
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+.idea/
+data.csv
+__pycache__/
diff --git a/admissions.py b/admissions.py
new file mode 100644
index 0000000..83cd6be
--- /dev/null
+++ b/admissions.py
@@ -0,0 +1,190 @@
+"""HFVC - Admission Objects."""
+
+from datetime import timedelta
+import pandas as pd
+
+
+ADMISSION_TYPES = {
+    "ELCV": "elective CV",
+    "ELXX": "elective misc.",
+    "EMCV": "emergency CV",
+    "EMHF": "emergency HF",
+    "EMXX": "emergency misc."
+}
+
+
+class Admission:
+    """A single hospital admission of one patient."""
+
+    def __init__(self, patient_id, admission_type, index, date, length_of_stay):
+        self.patient_id = patient_id
+        self.index = index
+        self.type = admission_type
+        self.date = date
+        self.length_of_stay = length_of_stay
+
+    def __repr__(self):
+        string = (f'Admission(patient_id="{self.patient_id}", type="{self.type}", '
+                  f'index={self.index}, date={repr(self.date)}, '
+                  f'length_of_stay={repr(self.length_of_stay)})')
+        return string
+
+    def __str__(self):
+        string = (f"Patient {self.patient_id} {ADMISSION_TYPES[self.type]} "
+                  f"admission #{self.index}: {self.date.strftime('%Y-%m-%d')}, "
+                  f"{self.length_of_stay.days} day(s)")
+        return string
+
+    def _key(self):
+        """Return the fields that identify an admission."""
+        return self.patient_id, self.type, self.index
+
+    def __eq__(self, other):
+        # Paired with __hash__ so that equal admissions also hash equally.
+        if not isinstance(other, Admission):
+            return NotImplemented
+        return self._key() == other._key()
+
+    def __hash__(self):
+        return hash(self._key())
+
+    @staticmethod
+    def convert_from_DatedValue(pid, dated_value):
+        """Build an Admission from a DatedValue(name, value, date).
+
+        The name must split into four "_"-separated parts once "LOS" is
+        replaced by "_LOS": the second part is the type code and the fourth,
+        minus its first character, the index. The value is the stay in days.
+        """
+        _, code, _, index = dated_value.name.replace("LOS", "_LOS").split("_")
+        index = int(index[1:])
+        return Admission(pid, code, index, dated_value.date, timedelta(dated_value.value))
+
+
+class AdmissionList:
+    """Admissions of one patient, held in one list per admission type."""
+
+    def __init__(self, admissions=None):
+        self.ELCV = []
+        self.ELXX = []
+        self.EMCV = []
+        self.EMHF = []
+        self.EMXX = []
+
+        if admissions is not None:
+            self.assign_admissions(admissions)
+
+    def __len__(self):
+        return self.get_counts(False)
+
+    def assign_admissions(self, admissions):
+        """Append each admission to the list named by its type code."""
+        for admission in admissions:
+            getattr(self, admission.type).append(admission)
+
+    def get_all(self, container="dict"):
+        """Return all admissions in the requested container.
+
+        "list" gives a flat list sorted by date; "lol" gives unsorted
+        [type, admission] lists; "tuples"/"tuple"/"tup" gives unsorted
+        (type, admission) tuples; anything else gives the type-keyed dict.
+        """
+        admits = {"ELCV": self.ELCV,
+                  "ELXX": self.ELXX,
+                  "EMCV": self.EMCV,
+                  "EMHF": self.EMHF,
+                  "EMXX": self.EMXX}
+        match container.lower():
+            case "list":
+                admits = [admit for admit_list in admits.values()
+                          for admit in admit_list]
+                admits.sort(key=lambda admit: admit.date)
+            case "lol":
+                admits = [[admit_type, admit] for admit_type in admits
+                          for admit in admits[admit_type]]
+            case "tuples" | "tuple" | "tup":
+                admits = [(admit_type, admit) for admit_type in admits
+                          for admit in admits[admit_type]]
+            case _:
+                pass
+        return admits
+
+    # def list_all(self):
+    #     admit_list = self.get_all()
+    #     admit_list = [(key, admit) for key in admit_list
+    #                   for admit in admit_list[key]]
+    #     admit_list.sort(key=lambda x: x[1].date)
+    #     return admit_list
+
+    def show_all(self):
+        """Print every admission, grouped under its type code."""
+        string = ""
+        for admit_type, admits in self.get_all().items():
+            if not admits:
+                continue
+            string += f"{admit_type}:\n"
+            string += "\n".join([f" {str(admit)}" for admit in admits])
+            string += "\n"
+        print(string)
+
+    def get_counts(self, per_type=True):
+        """Return admission counts per type (dict) or in total (int)."""
+        if per_type:
+            counts = {admit_type: len(admits)
+                      for admit_type, admits in self.get_all().items()}
+        else:
+            # Summing list lengths avoids building and sorting a flat list.
+            counts = sum(len(admits) for admits in self.get_all().values())
+        return counts
+
+    def show_counts(self):
+        """Print the non-zero admission counts per type."""
+        string = ""
+        for admit_type, count in self.get_counts().items():
+            if count == 0:
+                continue
+            string += f"{admit_type}: {count}\n"
+        print(string)
+        # return string
+
+    def show_timeline(self):
+        """Print all admissions in date order, prefixed by type code."""
+        for entry in sorted(self.get_all("tuples"), key=lambda x: x[1].date):
+            print(f"{entry[0]}: {entry[1]}")
+
+    def filter_admissions(self, admit_type="", date_range=None, as_AdmissionList=False):
+        """Return admissions matching a type selector and/or date range.
+
+        admit_type is case-insensitive: "" keeps everything, a type code
+        keeps that type, and "EM"/"EMERGENCY", "EL"/"ELECTIVE",
+        "HF"/"HEART FAILURE", "CV"/"CARDIOVASCULAR", "XX"/"OTHER"/"MISC"
+        keep a group. date_range is (start, end) with end exclusive.
+        Returns a new date-sorted list, or an AdmissionList when
+        as_AdmissionList is true. Raises ValueError for unknown types.
+        """
+        admit_list = self.get_all("list")
+        match admit_type.upper():
+            case "":
+                pass
+            case "EMHF" | "EMCV" | "EMXX" | "ELCV" | "ELXX" as code:
+                # Filtering the flat list (instead of returning self.<code>)
+                # keeps the result date-sorted and never exposes internal state.
+                admit_list = [x for x in admit_list if x.type == code]
+            case "EM" | "EMERGENCY":
+                admit_list = [x for x in admit_list if x.type.startswith("EM")]
+            case "EL" | "ELECTIVE":
+                admit_list = [x for x in admit_list if x.type.startswith("EL")]
+            case "HF" | "HEART FAILURE":
+                admit_list = [x for x in admit_list if x.type.endswith("HF")]
+            case "CV" | "CARDIOVASCULAR":
+                admit_list = [x for x in admit_list if x.type.endswith("CV")]
+            case "XX" | "OTHER" | "MISC":
+                admit_list = [x for x in admit_list if x.type.endswith("XX")]
+            case _:
+                raise ValueError("Unknown admission type.")
+        if date_range is not None:
+            admit_list = [x for x in admit_list
+                          if date_range[0] <= x.date < date_range[1]]
+        if as_AdmissionList:
+            admit_list = AdmissionList(admit_list)
+        return admit_list
diff --git a/health_markov.py b/health_markov.py
new file mode 100644
index 0000000..44823f4
--- /dev/null
+++ b/health_markov.py
@@ -0,0 +1,235 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""Heart Failure Virtual Consultation - Financial Analysis.
+
+Created on Mon Jan 10 16:25:26 2022
+
+@author: T.D. Medina
+"""
+
+from datetime import datetime, timedelta
+import pandas as pd
+from admissions import Admission, AdmissionList
+from patients import Patient, PatientDatabase
+from prescriptions import PrescriptionList
+from utilities import DatedValue
+
+
+class HFVCDataManager:
+    """Static helpers that turn the raw data file into a PatientDatabase."""
+
+    def __init__(self):
+        pass
+
+    @staticmethod
+    def read_data(data_file):
+        """Read the tab-delimited data file into a DataFrame."""
+        table = pd.read_csv(data_file, delimiter="\t")
+        return table
+
+    @staticmethod
+    def convert_unix_dates(table):
+        """Convert day-count lab date columns to datetimes, in place.
+
+        Values are read as days since 1970-01-01. Plain epoch arithmetic is
+        used because datetime.fromtimestamp applies the local timezone,
+        which can shift the resulting calendar date.
+        """
+        epoch = datetime(1970, 1, 1)
+        for col in ["CHLDate_BL", "LDLDate_BL", "HDLDate_BL", "TGSDate_BL", "GLCDate_BL"]:
+            table[col] = [pd.NaT if pd.isna(stamp)
+                          else epoch + timedelta(days=float(stamp))
+                          for stamp in table[col]]
+
+    @staticmethod
+    def convert_dates(table):
+        """Parse every column with "date" in its name (day first), in place."""
+        for col in table:
+            if "date" not in col.lower():
+                continue
+            table[col] = pd.to_datetime(table[col], dayfirst=True)
+            table[col] = [date.to_pydatetime().date() for date in table[col]]
+
+    @staticmethod
+    def convert_bools(table):
+        """Cast columns named "...flag..." (but not "...date...") to boolean."""
+        for col in table:
+            if "flag" not in col.lower() or "date" in col.lower():
+                continue
+            table[col] = table[col].astype("boolean")
+
+    @staticmethod
+    def rename_columns(table):
+        """Rename raw column headers to the attribute names used by Patient."""
+        mapping = {
+            "Patient ID": "patient_id",
+            "male": "sex",
+            "birth_date": "date_of_birth",
+            "RIP_flag": "deceased",
+            "death_date": "date_of_death",
+            "DaysFU": "follow_up_duration",
+            "FollowUpDate": "follow_up_date",
+        }
+        table.rename(columns=mapping, inplace=True)
+
+    @staticmethod
+    def convert_sex(table):
+        """Recode the 0/1 "male" indicator as "female"/"male" in "sex".
+
+        Any other value, including a missing one, becomes pd.NA instead of
+        raising KeyError.
+        """
+        conversion = {0: "female", 1: "male"}
+        table.rename(columns={"male": "sex"}, inplace=True)
+        table["sex"] = [conversion.get(x, pd.NA) for x in table["sex"]]
+
+    @staticmethod
+    def make_dated_values(patient_data, fields=None):
+        """Pair value fields with their date fields as DatedValue, in place.
+
+        A field containing "LOS_n" pairs with the same name with "LOS"
+        replaced by "Date"; a field ending in "_BL" pairs with the same name
+        with "_BL" replaced by "Date_BL". Paired date fields are removed from
+        each record. fields defaults to the keys of the first patient record.
+        """
+        if fields is None:
+            if not patient_data:
+                # Nothing to derive field names from.
+                return
+            fields = list(next(iter(patient_data.values())).keys())
+        for field in sorted(fields):
+            if "LOS_n" in field:
+                date = field.replace("LOS", "Date")
+            elif field.endswith("_BL") and not field.endswith("Date_BL"):
+                date = field.replace("_BL", "Date_BL")
+            else:
+                continue
+            if date not in fields:
+                continue
+
+            for patient_id in patient_data:
+                patient_data[patient_id][field] = DatedValue(
+                    field,
+                    patient_data[patient_id][field],
+                    patient_data[patient_id][date]
+                )
+                del patient_data[patient_id][date]
+
+    # @staticmethod
+    # def separate_MED_classes(patient_data):
+    #     for patient, fields in patient_data.items():
+    #         og_meds = fields["MED_classes_BL"]
+    #         if pd.isna(og_meds.value):
+    #             meds = [pd.NA]
+    #         else:
+    #             meds = og_meds.value.strip("[]").split(",")
+    #             meds = [med.strip('"') for med in meds]
+    #             meds = sorted([med for med in meds if med])
+    #         patient_data[patient]["medication_classes"] = DatedValue(
+    #             name="medication_classes",
+    #             value=meds,
+    #             date=og_meds.date
+    #         )
+    #         del patient_data[patient]["MED_classes_BL"]
+
+    @staticmethod
+    def separate_MED_scripts(patient_data):
+        """Replace "MED_script_BL" with a parsed "prescriptions" entry.
+
+        A missing script becomes pd.NA. The raw "MED_script_BL" and
+        "MED_classes_BL" entries are removed from each record.
+        """
+        for patient_id, patient in patient_data.items():
+            if pd.isna(patient["MED_script_BL"].value):
+                prescriptions = pd.NA
+            else:
+                prescriptions = PrescriptionList._import_from_json_string(
+                    patient_id,
+                    patient["MED_script_BL"].value
+                )
+            patient_data[patient_id]["prescriptions"] = prescriptions
+            del patient_data[patient_id]["MED_script_BL"]
+            del patient_data[patient_id]["MED_classes_BL"]
+
+    @staticmethod
+    def group_admissions(patient_data):
+        """Collect each record's "ADM_" fields into an "admissions" entry.
+
+        A field becomes an Admission only when both its value and its date
+        are present; every "ADM_" field is removed from the record.
+        """
+        for patient, fields in patient_data.items():
+
+            admissions = []
+            remove = []
+            for field, value in fields.items():
+                if not field.startswith("ADM_"):
+                    continue
+                # A half-filled record cannot be converted: a missing stay
+                # breaks timedelta() and a missing date breaks date sorting.
+                if not (pd.isna(value.value) or pd.isna(value.date)):
+                    admission = Admission.convert_from_DatedValue(patient, value)
+                    admissions.append(admission)
+                remove.append(field)
+            for field in remove:
+                del patient_data[patient][field]
+
+            patient_data[patient]["admissions"] = AdmissionList(admissions)
+
+    @staticmethod
+    def group_DGN_flags(patient_data):
+        """Move each record's "DGN_" fields into a "dgn_flags" dict."""
+        for patient, fields in patient_data.items():
+            dgns = {field: value for field, value in fields.items() if field.startswith("DGN_")}
+            for dgn in dgns:
+                del patient_data[patient][dgn]
+            patient_data[patient]["dgn_flags"] = dgns
+
+    @staticmethod
+    def group_flags(patient_data):
+        """Move fields ending in "flag_bl" (any case) into "other_flags"."""
+        for patient, fields in patient_data.items():
+            flags = {field: value for field, value in fields.items() if field.lower().endswith("flag_bl")}
+            for flag in flags:
+                del patient_data[patient][flag]
+            patient_data[patient]["other_flags"] = flags
+
+    @staticmethod
+    def group_bl_metrics(patient_data):
+        """Move the remaining fields ending in "_bl" (any case) into "metrics"."""
+        for patient, fields in patient_data.items():
+            bls = {field: value for field, value in fields.items() if field.lower().endswith("_bl")}
+            for bl in bls:
+                del patient_data[patient][bl]
+            patient_data[patient]["metrics"] = bls
+
+    @classmethod
+    def import_data(cls, filepath):
+        """Run the full import pipeline and return a PatientDatabase."""
+        data = cls.read_data(filepath)
+        cls.convert_unix_dates(data)
+        cls.convert_dates(data)
+        cls.convert_bools(data)
+        cls.rename_columns(data)
+        cls.convert_sex(data)
+        data = data.to_dict(orient="records")
+        data = {patient["patient_id"]: patient for patient in data}
+        cls.make_dated_values(data)
+        cls.separate_MED_scripts(data)
+        cls.group_admissions(data)
+        cls.group_DGN_flags(data)
+        cls.group_flags(data)
+        cls.group_bl_metrics(data)
+        data = PatientDatabase({patient_id: Patient._patient_from_dict(patient)
+                                for patient_id, patient in data.items()})
+        return data
+
+
+def main():
+    """Import the dataset from "data.csv"."""
+    data = HFVCDataManager.import_data("data.csv")
+    return data
+
+
+if __name__ == "__main__":
+    dataset = main()
diff --git a/patients.py b/patients.py
new file mode 100644
index 0000000..1938086
--- /dev/null
+++ b/patients.py
@@ -0,0 +1,181 @@
+"""HFVC - Patient Objects."""
+
+from datetime import datetime, timedelta
+from warnings import warn
+import pandas as pd
+from admissions import AdmissionList
+from utilities import NamedDate
+
+
+class PatientDatabase:
+    """Dict-like collection of Patient objects keyed by patient ID."""
+
+    def __init__(self, patients=None):
+        self.patients = patients
+        if self.patients is None:
+            self.patients = {}
+
+    def __str__(self):
+        string = f"PatientDatabase(size={self.size})"
+        return string
+
+    def __len__(self):
+        size = len(self.patients)
+        return size
+
+    def __contains__(self, item):
+        return item in self.patients
+
+    def __getitem__(self, key):
+        return self.patients[key]
+
+    def __setitem__(self, key, value):
+        if not isinstance(value, Patient):
+            raise TypeError("Value to add is not Patient object.")
+        self.patients[key] = value
+
+    def __iter__(self):
+        """Iterate over the Patient objects (not the IDs)."""
+        # A fresh iterator per call keeps nested loops independent and
+        # reflects patients added through __setitem__.
+        return iter(self.patients.values())
+
+    def keys(self):
+        return self.patients.keys()
+
+    def values(self):
+        return self.patients.values()
+
+    def items(self):
+        return self.patients.items()
+
+    @property
+    def size(self):
+        return self.__len__()
+
+
+class Patient:
+    """One patient's demographics, dates, flags, metrics and admissions."""
+
+    def __init__(self, patient_id, MRN, patient_type, sex, date_of_birth,
+                 min_clinic_date, follow_up_date, follow_up_duration,
+                 deceased=False, date_of_death=pd.NaT, prescriptions=None,
+                 metrics=None, dgn_flags=None, other_flags=None, admissions=None,
+                 **kwargs):
+        self.id = patient_id
+        self.MRN = MRN
+        self.type = patient_type
+        self.sex = sex
+
+        self.min_clinic_date = min_clinic_date
+        self.follow_up_date = follow_up_date
+        self.follow_up_duration = follow_up_duration
+        self.admissions = admissions
+        if admissions is None:
+            self.admissions = AdmissionList()
+
+        self.prescriptions = prescriptions
+        self.metrics = metrics
+        self.dgn_flags = dgn_flags
+        self.other_flags = other_flags
+
+        self.date_of_birth = date_of_birth
+        self.deceased = deceased
+        self.date_of_death = date_of_death
+
+        # self.stage = self.determine_stage()
+
+    def __repr__(self):
+        string = f"Patient(ID={self.id})"
+        return string
+
+    def _is_deceased(self):
+        """Return True only for a known, true flag; pd.NA counts as False."""
+        return not pd.isna(self.deceased) and bool(self.deceased)
+
+    @property
+    def age(self):
+        """Age in years, rounded to 2 decimals; age at death if deceased."""
+        if self._is_deceased():
+            warn("Warning: Patient is deceased. Age is age at death.")
+            end = self.date_of_death
+        else:
+            end = datetime.today().date()
+        # 365.25 days per year absorbs leap days, which a flat 365 would
+        # otherwise accumulate as extra age.
+        age = round((end - self.date_of_birth).days / 365.25, 2)
+        return age
+
+    @staticmethod
+    def _patient_from_dict(p_dict):
+        return Patient(**p_dict)
+
+    def make_timeline(self, filter_admission_type=""):
+        """Return [label, event] pairs for dated events, sorted by date.
+
+        Covers birth, min clinic date, follow-up date, death and the
+        admissions matching filter_admission_type; undated events are dropped.
+        """
+        events = [
+            ["Birth", NamedDate("Birth", self.date_of_birth)],
+            ["Min Clinic Date", NamedDate("Min Clinic Date", self.min_clinic_date)],
+            ["Follow-Up Date", NamedDate("Follow-Up Date", self.follow_up_date)],
+            ["Death", NamedDate("Death", self.date_of_death)]
+        ]
+        admissions = self.admissions.filter_admissions(filter_admission_type, None, True)
+        events += admissions.get_all("lol")
+        events = [x for x in events if not pd.isna(x[1].date)]
+        events.sort(key=lambda x: x[1].date)
+        return events
+
+    def show_timeline(self, filter_admission_type=""):
+        """Print the timeline, one "label: event" line per entry."""
+        timeline = self.make_timeline(filter_admission_type)
+        timeline = [(x[0], x[1].date) if isinstance(x[1], NamedDate)
+                    else (x[0], x[1]) for x in timeline]
+        for entry in timeline:
+            print(f"{entry[0]}: {entry[1]}")
+
+    def determine_stage(self, reference_date=None):
+        """Return the patient's stage (1-5) as of reference_date.
+
+        5: deceased on or before reference_date. Otherwise 3/4 when the
+        Stage B flag is in effect and 1/2 when it is not; the higher number
+        of each pair applies when an emergency admission is dated from 365
+        days before reference_date up to and including it.
+        reference_date defaults to today, evaluated on every call.
+        """
+        if reference_date is None:
+            # A datetime.today() default in the signature would be frozen at
+            # import time.
+            reference_date = datetime.today().date()
+        date_range = [reference_date - timedelta(365), reference_date + timedelta(1)]
+        stage_b = self.other_flags["StageB_FLAG_BL"]
+        if self._is_deceased() and self.date_of_death <= reference_date:
+            return 5
+        flagged = not pd.isna(stage_b.value) and bool(stage_b.value)
+        # NOTE(review): a set flag with a missing date matches neither date
+        # test; it is treated as in effect here - confirm with the data owner.
+        in_effect = flagged and (pd.isna(stage_b.date)
+                                 or stage_b.date <= reference_date)
+        recent_emergency = bool(self.admissions.filter_admissions("EM", date_range))
+        if in_effect:
+            return 4 if recent_emergency else 3
+        return 2 if recent_emergency else 1
+
+    def make_staged_timeline(self, filter_admission_type=""):
+        """Return the timeline with a non-decreasing stage added per entry."""
+        timeline = self.make_timeline(filter_admission_type)
+        stage = 1
+        for i, event in enumerate(timeline):
+            stage = max(stage, self.determine_stage(event[1].date))
+            timeline[i].append(stage)
+        return timeline
+
+    def show_staged_timeline(self):
+        """Print the staged timeline of all admissions and named dates."""
+        timeline = self.make_staged_timeline()
+        timeline = [(x[0], x[1].date, x[2]) if isinstance(x[1], NamedDate)
+                    else (x[0], x[1], x[2]) for x in timeline]
+        for entry in timeline:
+            print(f"{entry[0]}: {entry[1]} - Stage {entry[2]}")
diff --git a/prescriptions.py b/prescriptions.py
new file mode 100644
index 0000000..b5efb05
--- /dev/null
+++ b/prescriptions.py
@@ -0,0 +1,76 @@
+"""HFVC - Prescription Objects."""
+
+from datetime import datetime
+import json
+
+
+class Prescription:
+    """A single prescribed medication."""
+
+    def __init__(self, trade, generic, dose, freq, daily_dose, unit,
+                 drug_classes=None, **kwargs):
+        self.trade_name = trade
+        self.generic_name = generic
+        self.dose = dose
+        self.frequency = freq
+        self.daily_dose = daily_dose
+        self.unit = unit
+        self.drug_classes = drug_classes
+        # "class" is a Python keyword, so that key can only arrive via kwargs.
+        if "class" in kwargs:
+            self.drug_classes = kwargs["class"]
+
+    def __repr__(self):
+        string = "Prescription("
+        string += ", ".join([f"{name}={attr}" for name, attr in self.__dict__.items()])
+        string += ")"
+        return string
+
+    def __str__(self):
+        if self.trade_name:
+            string = f"Prescription({self.trade_name})"
+        else:
+            string = f"Prescription({self.generic_name})"
+        return string
+
+
+class PrescriptionList:
+    """A dated set of prescriptions for one patient."""
+
+    def __init__(self, patient_id, ID, date, drug_classes, script_type, prescriptions):
+        self.patient_ID = patient_id
+        self.ID = ID
+        self.date = date
+        self.drug_classes = drug_classes
+        self.type = script_type
+        self.prescriptions = prescriptions
+
+    def __str__(self):
+        string = (f"PrescriptionList(patient_ID={self.patient_ID}, "
+                  f"drug_classes={self.drug_classes})")
+        return string
+
+    def __len__(self):
+        return len(self.prescriptions)
+
+    @staticmethod
+    def _import_from_json_string(patient_id, json_string):
+        """Parse a JSON script into a PrescriptionList.
+
+        Reads the keys "id", "date" (YYYY-MM-DD), "class", "type" and "meds";
+        each "meds" entry supplies the keyword arguments of a Prescription.
+        """
+        json_dict = json.loads(json_string)
+        prescription_list = PrescriptionList(
+            patient_id=patient_id,
+            ID=json_dict["id"],
+            date=datetime.strptime(json_dict["date"], "%Y-%m-%d").date(),
+            drug_classes=sorted(json_dict["class"]),
+            script_type=json_dict["type"],
+            prescriptions=[Prescription(**med) for med in json_dict["meds"]]
+        )
+        return prescription_list
+
+    def all_medications(self):
+        """Return a list of (trade name, generic name) tuples."""
+        return [(med.trade_name, med.generic_name) for med in self.prescriptions]
diff --git a/utilities.py b/utilities.py
new file mode 100644
index 0000000..8b9dc87
--- /dev/null
+++ b/utilities.py
@@ -0,0 +1,28 @@
+"""HFVC - Utility Objects."""
+
+import pandas as pd
+
+
+class DatedValue:
+    """A named value paired with a date."""
+
+    def __init__(self, name, value, date):
+        self.name = name
+        self.value = value
+        self.date = date
+
+    def __repr__(self):
+        string = (f'{self.__class__.__name__}(name="{self.name}", value={self.value}, '
+                  f'date={repr(self.date)})')
+        return string
+
+    def __str__(self):
+        string = f"{self.name}({self.value}, {self.date})"
+        return string
+
+
+class NamedDate(DatedValue):
+    """A DatedValue that carries only a name and a date (value is pd.NA)."""
+
+    def __init__(self, name, date):
+        super().__init__(name=name, value=pd.NA, date=date)