Skip to content

Commit

Permalink
Initialize.
Browse files Browse the repository at this point in the history
  • Loading branch information
TDMedina committed Jan 17, 2022
0 parents commit 7958934
Show file tree
Hide file tree
Showing 6 changed files with 590 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
.idea/
data.csv
__pycache__/
149 changes: 149 additions & 0 deletions admissions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
"""HFVC - Admission Objects."""

from datetime import timedelta
import pandas as pd


ADMISSION_TYPES = {
"ELCV": "elective CV",
"ELXX": "elective misc.",
"EMCV": "emergency CV",
"EMHF": "emergency HF",
"EMXX": "emergency misc."
}


class Admission:
def __init__(self, patient_id, admission_type, index, date, length_of_stay):
self.patient_id = patient_id
self.index = index
self.type = admission_type
self.date = date
self.length_of_stay = length_of_stay

def __repr__(self):
string = (f'Admission(patient_id="{self.patient_id}", type="{self.type}", '
f'index={self.index}, date={repr(self.date)}, '
f'length_of_stay={repr(self.length_of_stay)})')
return string

def __str__(self):
string = (f"Patient {self.patient_id} {ADMISSION_TYPES[self.type]} "
f"admission #{self.index}: {self.date.strftime('%Y-%m-%d')}, "
f"{self.length_of_stay.days} day(s)")
return string

def __hash__(self):
return hash(f"{self.patient_id},{self.type},{self.index}")

@staticmethod
def convert_from_DatedValue(pid, dated_value):
"""DatedValue(name, value, date)"""
_, code, _, index = dated_value.name.replace("LOS", "_LOS").split("_")
index = int(index[1:])
return Admission(pid, code, index, dated_value.date, timedelta(dated_value.value))


class AdmissionList:
def __init__(self, admissions=None):
self.ELCV = []
self.ELXX = []
self.EMCV = []
self.EMHF = []
self.EMXX = []

if admissions is not None:
self.assign_admissions(admissions)

def __len__(self):
size = self.get_counts(False)
return size

def assign_admissions(self, admissions):
for admission in admissions:
self.__getattribute__(admission.type).append(admission)

def get_all(self, container="dict"):
admits = {"ELCV": self.ELCV,
"ELXX": self.ELXX,
"EMCV": self.EMCV,
"EMHF": self.EMHF,
"EMXX": self.EMXX}
match container.lower():
case "list":
admits = [admit for admit_list in admits.values()
for admit in admit_list]
admits.sort(key=lambda admit: admit.date)
case "lol":
admits = [[admit_type, admit] for admit_type in admits
for admit in admits[admit_type]]
case "tuples" | "tuple" | "tup":
admits = [(admit_type, admit) for admit_type in admits
for admit in admits[admit_type]]
case _:
pass
return admits

# def list_all(self):
# admit_list = self.get_all()
# admit_list = [(key, admit) for key in admit_list
# for admit in admit_list[key]]
# admit_list.sort(key=lambda x: x[1].date)
# return admit_list

def show_all(self):
string = ""
for admit_type, admits in self.get_all().items():
if not admits:
continue
string += f"{admit_type}:\n"
string += "\n".join([f" {str(admit)}" for admit in admits])
string += "\n"
print(string)

def get_counts(self, per_type=True):
if per_type:
counts = {admit_type: len(admits)
for admit_type, admits in self.get_all().items()}
else:
counts = len(self.get_all("list"))
return counts

def show_counts(self):
string = ""
for admit_type, count in self.get_counts().items():
if count == 0:
continue
string += f"{admit_type}: {count}\n"
print(string)
# return string

def show_timeline(self):
for entry in sorted(self.get_all("tuples"), key=lambda x: x[1].date):
print(f"{entry[0]}: {entry[1]}")

def filter_admissions(self, admit_type="", date_range=None, as_AdmissionList=False):
admit_list = self.get_all("list")
match admit_type.upper():
case "":
pass
case "EMHF" | "EMCV" | "EMXX" | "ELCV" | "ELXX":
admit_list = self.__getattribute__(admit_type.upper())
case "EM" | "EMERGENCY":
admit_list = [x for x in admit_list if x.type.startswith("EM")]
case "EL" | "ELECTIVE":
admit_list = [x for x in admit_list if x.type.startswith("EL")]
case "HF" | "HEART FAILURE":
admit_list = [x for x in admit_list if x.type.endswith("HF")]
case "CV" | "CARDIOVASCULAR":
admit_list = [x for x in admit_list if x.type.endswith("CV")]
case "XX" | "OTHER" | "MISC":
admit_list = [x for x in admit_list if x.type.endswith("XX")]
case _:
raise ValueError("Unknown admission type.")
if date_range is not None:
admit_list = [x for x in admit_list
if date_range[0] <= x.date < date_range[1]]
if as_AdmissionList:
admit_list = AdmissionList(admit_list)
return admit_list
192 changes: 192 additions & 0 deletions health_markov.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Heart Failure Virtual Consultation - Financial Analysis.
Created on Mon Jan 10 16:25:26 2022
@author: T.D. Medina
"""

from datetime import datetime
import pandas as pd
from admissions import Admission, AdmissionList
from patients import Patient, PatientDatabase
from prescriptions import PrescriptionList
from utilities import DatedValue


class HFVCDataManager:
def __init__(self):
pass

@staticmethod
def read_data(data_file):
table = pd.read_csv(data_file, delimiter="\t")
return table

@staticmethod
def convert_unix_dates(table):
for col in ["CHLDate_BL", "LDLDate_BL", "HDLDate_BL", "TGSDate_BL", "GLCDate_BL"]:
table[col] = [datetime.fromtimestamp(stamp * 86400) if not pd.isna(stamp) else pd.NaT
for stamp in table[col]]

@staticmethod
def convert_dates(table):
for col in table:
if "date" not in col.lower():
continue
table[col] = pd.to_datetime(table[col], dayfirst=True)
table[col] = [date.to_pydatetime().date() for date in table[col]]

@staticmethod
def convert_bools(table):
for col in table:
if "flag" not in col.lower() or "date" in col.lower():
continue
table[col] = table[col].astype("boolean")

@staticmethod
def rename_columns(table):
mapping = {
"Patient ID": "patient_id",
"male": "sex",
"birth_date": "date_of_birth",
"RIP_flag": "deceased",
"death_date": "date_of_death",
"DaysFU": "follow_up_duration",
"FollowUpDate": "follow_up_date",
}
table.rename(columns=mapping, inplace=True)

@staticmethod
def convert_sex(table):
conversion = {0: "female", 1: "male"}
table.rename(columns={"male": "sex"}, inplace=True)
table["sex"] = [conversion[x] for x in table["sex"]]

@staticmethod
def make_dated_values(patient_data, fields=None):
if fields is None:
fields = list(
next((patient for patient in patient_data.values())).keys()
)
for field in sorted(fields):
if "LOS_n" in field:
date = field.replace("LOS", "Date")
elif field.endswith("_BL") and not field.endswith("Date_BL"):
date = field.replace("_BL", "Date_BL")
else:
continue
if date not in fields:
continue

for patient_id in patient_data:
patient_data[patient_id][field] = DatedValue(
field,
patient_data[patient_id][field],
patient_data[patient_id][date]
)
del patient_data[patient_id][date]
return

# @staticmethod
# def separate_MED_classes(patient_data):
# for patient, fields in patient_data.items():
# og_meds = fields["MED_classes_BL"]
# if pd.isna(og_meds.value):
# meds = [pd.NA]
# else:
# meds = og_meds.value.strip("[]").split(",")
# meds = [med.strip('"') for med in meds]
# meds = sorted([med for med in meds if med])
# patient_data[patient]["medication_classes"] = DatedValue(
# name="medication_classes",
# value=meds,
# date=og_meds.date
# )
# del patient_data[patient]["MED_classes_BL"]

@staticmethod
def separate_MED_scripts(patient_data):
for patient_id, patient in patient_data.items():
if pd.isna(patient["MED_script_BL"].value):
prescriptions = pd.NA
else:
prescriptions = PrescriptionList._import_from_json_string(
patient_id,
patient["MED_script_BL"].value
)
patient_data[patient_id]["prescriptions"] = prescriptions
del patient_data[patient_id]["MED_script_BL"]
del patient_data[patient_id]["MED_classes_BL"]

@staticmethod
def group_admissions(patient_data):
for patient, fields in patient_data.items():

admissions = []
remove = []
for field, value in fields.items():
if not field.startswith("ADM_"):
continue
if not (pd.isna(value.value) and pd.isna(value.date)):
admission = Admission.convert_from_DatedValue(patient, value)
admissions.append(admission)
remove.append(field)
for field in remove:
del patient_data[patient][field]

patient_data[patient]["admissions"] = AdmissionList(admissions)

@staticmethod
def group_DGN_flags(patient_data):
for patient, fields in patient_data.items():
dgns = {field: value for field, value in fields.items() if field.startswith("DGN_")}
for dgn in dgns:
del patient_data[patient][dgn]
patient_data[patient]["dgn_flags"] = dgns

@staticmethod
def group_flags(patient_data):
for patient, fields in patient_data.items():
flags = {field: value for field, value in fields.items() if field.lower().endswith("flag_bl")}
for flag in flags:
del patient_data[patient][flag]
patient_data[patient]["other_flags"] = flags

@staticmethod
def group_bl_metrics(patient_data):
for patient, fields in patient_data.items():
bls = {field: value for field, value in fields.items() if field.lower().endswith("_bl")}
for bl in bls:
del patient_data[patient][bl]
patient_data[patient]["metrics"] = bls

@classmethod
def import_data(cls, filepath):
data = cls.read_data(filepath)
cls.convert_unix_dates(data)
cls.convert_dates(data)
cls.convert_bools(data)
cls.rename_columns(data)
cls.convert_sex(data)
data = data.to_dict(orient="records")
data = {patient["patient_id"]: patient for patient in data}
cls.make_dated_values(data)
cls.separate_MED_scripts(data)
cls.group_admissions(data)
cls.group_DGN_flags(data)
cls.group_flags(data)
cls.group_bl_metrics(data)
data = PatientDatabase({patient_id: Patient._patient_from_dict(patient)
for patient_id, patient in data.items()})
return data


def main():
data = HFVCDataManager.import_data("data.csv")
return data


if __name__ == "__main__":
dataset = main()
Loading

0 comments on commit 7958934

Please sign in to comment.