Skip to content

Commit

Permalink
Split categorical measurement events into the question/answer pairs
Browse files Browse the repository at this point in the history
  • Loading branch information
ChaoPang committed Oct 2, 2024
1 parent 20ff415 commit b9c8cfc
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 2 deletions.
2 changes: 2 additions & 0 deletions src/cehrbert_data/const/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,5 @@
UNKNOWN_CONCEPT = "[UNKNOWN]"
CONCEPT = "concept"
CONCEPT_ANCESTOR = "concept_ancestor"
# Prefixes prepended to the two halves of a categorical measurement's
# standard_concept_id when it is split into a question event and an answer event.
MEASUREMENT_QUESTION_PREFIX = "Question:"
MEASUREMENT_ANSWER_PREFIX = "Answer:"
31 changes: 29 additions & 2 deletions src/cehrbert_data/decorators/clinical_event_decorator.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
from cehrbert_data.const.common import MEASUREMENT, CATEGORICAL_MEASUREMENT
from ..const.common import (
MEASUREMENT,
CATEGORICAL_MEASUREMENT,
MEASUREMENT_QUESTION_PREFIX,
MEASUREMENT_ANSWER_PREFIX
)
from pyspark.sql import DataFrame, functions as F, Window as W, types as T

from .patient_event_decorator_base import PatientEventDecorator
Expand Down Expand Up @@ -135,7 +140,29 @@ def _decorate(self, patient_events: DataFrame):
if "concept_value" not in patient_events.schema.fieldNames():
patient_events = patient_events.withColumn("concept_value", F.lit(0.0))

# Split each categorical measurement's standard_concept_id on "-" into a question part (item 0)
# and an answer part (item 1); each part becomes its own prefixed event row
categorical_measurement_events = (
patient_events.where("domain = 'categorical_measurement'")
.withColumn("measurement_components", F.split("standard_concept_id", "-"))
)

categorical_measurement_events_question = categorical_measurement_events.withColumn(
"standard_concept_id",
F.concat(F.lit(MEASUREMENT_QUESTION_PREFIX), F.col("measurement_components").getItem(0))
).drop("measurement_components")

categorical_measurement_events_answer = categorical_measurement_events.withColumn(
"standard_concept_id",
F.concat(F.lit(MEASUREMENT_ANSWER_PREFIX), F.col("measurement_components").getItem(1))
).drop("measurement_components")

other_events = patient_events.where("domain != 'categorical_measurement'")

# (cohort_member_id, person_id, standard_concept_id, date, datetime, visit_occurrence_id, domain,
# concept_value, visit_rank_order, visit_segment, priority, date_in_week,
# concept_value_mask, mlm_skip_value, age)
return patient_events
return other_events.unionByName(
categorical_measurement_events_question
).unionByName(
categorical_measurement_events_answer
)

0 comments on commit b9c8cfc

Please sign in to comment.