Skip to content

Commit

Permalink
CDA/FHIR connector components (#83)
Browse files Browse the repository at this point in the history
* Added connector modules

* Fix typo

* Added processing of io connectors in pipelines

* Refactored CDA related processing in use case to connectors

* Added tests

* Added CdsFhirConnector

* Updated use case functions and tests

* WIP connector usage in pipelines and components

* Fix model import name in docs

* Update Bundle validator method to dynamically import nested resource types

* Update CdsFhirConnector input method validations

* Add create method to CdsFhirData

* Fixed CdsResponse should return list of actions

* Added tests

* Added pipeline tests

* Changed .add() -> .add_node() to make more explicit and use convention of BaseObject and base.py in modules

* Update documentation to reflect changes in this PR
  • Loading branch information
jenniferjiangkells authored Oct 24, 2024
1 parent 3efcb92 commit 378e6b2
Show file tree
Hide file tree
Showing 56 changed files with 1,975 additions and 386 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ nlp_pipeline = Pipeline[Document]()

# Add TextPreProcessor component
preprocessor = TextPreProcessor(tokenizer="spacy")
nlp_pipeline.add(preprocessor)
nlp_pipeline.add_node(preprocessor)

# Add Model component (assuming we have a pre-trained model)
model = Model(model_path="path/to/pretrained/model")
nlp_pipeline.add(model)
nlp_pipeline.add_node(model)

# Add TextPostProcessor component
postprocessor = TextPostProcessor(
Expand All @@ -60,7 +60,7 @@ postprocessor = TextPostProcessor(
"high blood pressure": "hypertension"
}
)
nlp_pipeline.add(postprocessor)
nlp_pipeline.add_node(postprocessor)

# Build the pipeline
nlp = nlp_pipeline.build()
Expand Down
4 changes: 2 additions & 2 deletions docs/api/component.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Component

::: healthchain.pipeline.components.basecomponent
::: healthchain.pipeline.components.base
::: healthchain.pipeline.components.preprocessors
::: healthchain.pipeline.components.models
::: healthchain.pipeline.components.model
::: healthchain.pipeline.components.postprocessors
5 changes: 5 additions & 0 deletions docs/api/connectors.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Connectors

::: healthchain.io.base
::: healthchain.io.cdaconnector
::: healthchain.io.cdsfhirconnector
2 changes: 1 addition & 1 deletion docs/api/pipeline.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# Pipeline

::: healthchain.pipeline.basepipeline
::: healthchain.pipeline.base
20 changes: 12 additions & 8 deletions docs/cookbook/cds_sandbox.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
# Build a CDS sandbox

A CDS sandbox which uses `gpt-4o` to summarise patient information from synthetically generated FHIR resources received from the `patient-view` CDS hook.
A CDS sandbox which uses `gpt-4o` to summarise patient information from synthetically generated FHIR resources received from the `patient-view` CDS hook. [NEEDS UPDATING]

```python
import healthchain as hc

from healthchain.use_cases import ClinicalDecisionSupport
from healthchain.data_generators import CdsDataGenerator
from healthchain.models import Card, CdsFhirData, CDSRequest
from healthchain.models import CdsFhirData, CDSRequest, CDSResponse

from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate
Expand Down Expand Up @@ -37,12 +37,16 @@ class CdsSandbox(ClinicalDecisionSupport):
return data

@hc.api
def my_service(self, request: CDSRequest) -> List[Card]:
def my_service(self, request: CDSRequest) -> CDSResponse:
result = self.chain.invoke(str(request.prefetch))
return Card(
summary="Patient summary",
indicator="info",
source={"label": "openai"},
detail=result,
return CDSResponse(
cards=[
Card(
summary="Patient summary",
indicator="info",
source={"label": "openai"},
detail=result,
)
]
)
```
38 changes: 4 additions & 34 deletions docs/cookbook/notereader_sandbox.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ from healthchain.models import (
class NotereaderSandbox(ClinicalDocumentation):
def __init__(self):
self.cda_path = "./resources/uclh_cda.xml"
self.pipeline = MedicalCodingPipeline.load("./resources/models/medcat_model.zip")

@hc.ehr(workflow="sign-note-inpatient")
def load_data_in_client(self) -> CcdData:
Expand All @@ -27,38 +28,7 @@ class NotereaderSandbox(ClinicalDocumentation):
return CcdData(cda_xml=xml_string)

@hc.api
def my_service(self, ccd_data: CcdData) -> CcdData:

# Apply extraction method from ccd_data.note

new_problem = ProblemConcept(
code="38341003",
code_system="2.16.840.1.113883.6.96",
code_system_name="SNOMED CT",
display_name="Hypertension",
)
new_allergy = AllergyConcept(
code="70618",
code_system="2.16.840.1.113883.6.96",
code_system_name="SNOMED CT",
display_name="Allergy to peanuts",
)
new_medication = MedicationConcept(
code="197361",
code_system="2.16.840.1.113883.6.88",
code_system_name="RxNorm",
display_name="Lisinopril 10 MG Oral Tablet",
dosage=Quantity(value=10, unit="mg"),
route=Concept(
code="26643006",
code_system="2.16.840.1.113883.6.96",
code_system_name="SNOMED CT",
display_name="Oral",
),
)
ccd_data.problems = [new_problem]
ccd_data.allergies = [new_allergy]
ccd_data.medications = [new_medication]

return ccd_data
def my_service(self, request: CdaRequest) -> CdaResponse:
response = self.pipeline(request)
return response
```
80 changes: 48 additions & 32 deletions docs/quickstart.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,36 +6,34 @@ After [installing HealthChain](installation.md), get up to speed quickly with th

### Pipeline 🛠️

The `Pipeline` module in HealthChain provides a flexible way to build and manage processing pipelines for NLP and ML tasks that can easily interface with
parsers and connectors to integrate with electronic health record (EHR) systems.
HealthChain Pipelines provide a flexible way to build and manage processing pipelines for NLP and ML tasks that can easily integrate with electronic health record (EHR) systems.

You can build pipelines with three different approaches:

#### 1. Build Your Own Pipeline with Inline Functions

This is the most flexible approach, ideal for quick experiments and prototyping. Initialize a pipeline type hinted with the container type you want to process, then add components to your pipeline with the `@add` decorator.
This is the most flexible approach, ideal for quick experiments and prototyping. Initialize a pipeline type hinted with the container type you want to process, then add components to your pipeline with the `@add_node` decorator.

Compile the pipeline with `.build()` to use it.

```python
from healthchain.pipeline import Pipeline
from healthchain.io.containers import Document
from healthchain.io import Document

nlp_pipeline = Pipeline[Document]()

@nlp_pipeline.add
@nlp_pipeline.add_node
def tokenize(doc: Document) -> Document:
doc.tokens = doc.text.split()
return doc

@nlp_pipeline.add
@nlp_pipeline.add_node
def pos_tag(doc: Document) -> Document:
# Dummy POS tagging
doc.pos_tags = ["NOUN" if token[0].isupper() else "VERB" for token in doc.tokens]
return doc

# Build and use the pipeline
nlp = nlp_pipeline.build()

doc = Document("Patient has a fracture of the left femur.")
doc = nlp(doc)

Expand All @@ -46,51 +44,74 @@ print(doc.pos_tags)
# ['NOUN', 'VERB', 'VERB', 'VERB', 'VERB', 'VERB']
```

#### 2. Build Your Own Pipeline with Components and Models
#### 2. Build Your Own Pipeline with Components, Models, and Connectors

Components are stateful - they're classes instead of functions. They can be useful for grouping related processing steps together, or wrapping specific models.
Components are stateful - they're classes instead of functions. They can be useful for grouping related processing steps together, setting configurations, or wrapping specific model loading steps.

HealthChain comes with a few pre-built components, but you can also easily add your own. You can find more details on the [Components](./reference/pipeline/component.md) and [Models](./reference/pipeline/models/models.md) documentation pages.

Add components to your pipeline with the `.add()` method and compile with `.build()`.
Add components to your pipeline with the `.add_node()` method and compile with `.build()`.

```python
from healthchain.pipeline import Pipeline
from healthchain.io.containers import Document
from healthchain.pipeline.components import TextPreProcessor, Model, TextPostProcessor
from healthchain.io import Document

pipeline = Pipeline[Document]()

pipeline.add(TextPreProcessor())
pipeline.add(Model(model_path="path/to/model"))
pipeline.add(TextPostProcessor())
pipeline.add_node(TextPreProcessor())
pipeline.add_node(Model(model_path="path/to/model"))
pipeline.add_node(TextPostProcessor())

pipe = pipeline.build()

doc = Document("Patient presents with hypertension.")
doc = pipe(doc)
output = pipe(doc)
```

Let's go one step further! You can use [Connectors](./reference/pipeline/connectors/connectors.md) to work directly with [CDA](https://www.hl7.org.uk/standards/hl7-standards/cda-clinical-document-architecture/) and [FHIR](https://hl7.org/fhir/) data received from healthcare system APIs. Add Connectors to your pipeline with the `.add_input()` and `.add_output()` methods.

```python
from healthchain.pipeline import Pipeline
from healthchain.pipeline.components import Model
from healthchain.io import CdaConnector
from healthchain.models import CdaRequest

pipeline = Pipeline()
cda_connector = CdaConnector()

pipeline.add_input(cda_connector)
pipeline.add_node(Model(model_path="path/to/model"))
pipeline.add_output(cda_connector)

pipe = pipeline.build()

cda_data = CdaRequest(document="<CDA XML content>")
output = pipe(cda_data)
```

#### 3. Use Prebuilt Pipelines

Prebuilt pipelines are pre-configured collections of `Components` and `Models`. They are configured for specific use cases, offering the highest level of abstraction. This is the easiest way to get started if you already know the use case you want to build for.
Prebuilt pipelines are pre-configured collections of Components, Models, and Connectors. They are built for specific use cases, offering the highest level of abstraction. This is the easiest way to get started if you already know the use case you want to build for.

For a full list of available prebuilt pipelines and details on how to configure and customize them, see the [Pipelines](./reference/pipeline/pipeline.md) documentation page.

```python
from healthchain.pipeline import MedicalCodingPipeline
from healthchain.models import CdaRequest

pipeline = MedicalCodingPipeline.load("./path/to/model")

doc = Document("Patient diagnosed with myocardial infarction.")
doc = pipeline(doc)
cda_data = CdaRequest(document="<CDA XML content>")
output = pipeline(cda_data)
```

### Sandbox 🧪
Once you've built your pipeline, you might want to experiment with how you want your pipeline to interact with different health systems. A sandbox helps you stage and test the end-to-end workflow of your pipeline application where real-time EHR integrations are involved.
Once you've built your pipeline, you might want to experiment with how it interacts with different healthcare systems. A sandbox helps you stage and test the end-to-end workflow of your pipeline application where real-time EHR integrations are involved.

Running a sandbox will start a `FastAPI` server with standardized API endpoints and create a sandboxed environment for you to interact with your application.
Running a sandbox will start a [FastAPI](https://fastapi.tiangolo.com/) server with pre-defined standardized endpoints and create a sandboxed environment for you to interact with your application.

To create a sandbox, initialize a class that inherits from a type of `UseCase` and decorate it with the `@hc.sandbox` decorator.
To create a sandbox, initialize a class that inherits from a type of [UseCase](./reference/sandbox/use_cases/use_cases.md) and decorate it with the `@hc.sandbox` decorator.

Every sandbox also requires a **client** function marked by `@hc.ehr` and a **service** function marked by `@hc.api`. A **workflow** must be specified when creating an EHR client.

Expand All @@ -101,6 +122,7 @@ import healthchain as hc

from healthchain.use_cases import ClinicalDocumentation
from healthchain.pipeline import MedicalCodingPipeline
from healthchain.models import CdaRequest, CdaResponse, CcdData

@hc.sandbox
class MyCoolSandbox(ClinicalDocumentation):
Expand All @@ -117,9 +139,9 @@ class MyCoolSandbox(ClinicalDocumentation):
return CcdData(cda_xml=xml_string)

@hc.api
def my_service(self, ccd_data: CcdData) -> CcdData:
def my_service(self, request: CdaRequest) -> CdaResponse:
# Run your pipeline
results = self.pipeline(ccd_data)
results = self.pipeline(request)
return results

if __name__ == "__main__":
Expand All @@ -137,27 +159,21 @@ healthchain run my_sandbox.py

This will start a server by default at `http://127.0.0.1:8000`, and you can interact with the exposed endpoints at `/docs`. Data generated from your sandbox runs is saved at `./output/` by default.

Then run:

```bash
cd streamlist_demo
streamlit run app.py
```

## Utilities ⚙️
### Data Generator

You can use the data generator to generate synthetic data for your sandbox runs.

The `.generate()` is dependent on use case and workflow. For example, `CdsDataGenerator` will generate synthetic [FHIR](https://hl7.org/fhir/) data suitable for the workflow specified by the use case.

We're currently working on generating synthetic [CDA](https://www.hl7.org.uk/standards/hl7-standards/cda-clinical-document-architecture/) data. If you're interested in contributing, please [reach out](https://discord.gg/UQC6uAepUz)!
We're working on generating synthetic [CDA](https://www.hl7.org.uk/standards/hl7-standards/cda-clinical-document-architecture/) data. If you're interested in contributing, please [reach out](https://discord.gg/UQC6uAepUz)!

[(Full Documentation on Data Generators)](./reference/utilities/data_generator.md)

=== "Within client"
```python
import healthchain as hc

from healthchain.use_cases import ClinicalDecisionSupport
from healthchain.models import CdsFhirData
from healthchain.data_generators import CdsDataGenerator
Expand Down
2 changes: 1 addition & 1 deletion docs/reference/pipeline/component.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Components are the building blocks of the healthchain pipeline. They are designe
You can create your own custom components by extending the `BaseComponent` class and implementing the `__call__` method.

```python
from healthchain.pipeline.basecomponent import BaseComponent
from healthchain.pipeline.base import BaseComponent

class MyCustomComponent(BaseComponent):
def __init__(self, **kwargs):
Expand Down
55 changes: 55 additions & 0 deletions docs/reference/pipeline/connectors/cdaconnector.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# CDA Connector

The `CdaConnector` handles Clinical Document Architecture (CDA) documents, serving as both an input and output connector in the pipeline. It parses CDA documents, extracting free-text notes and relevant structured clinical data into a `Document` object, and can return an annotated CDA document as output.

This connector is particularly useful for clinical documentation improvement (CDI) workflows where CDA documents need to be processed and updated with additional structured data.

[(Full Documentation on Clinical Documentation)](../../sandbox/use_cases/clindoc.md)

## Usage

```python
from healthchain.io import CdaConnector, Document
from healthchain.models import CdaRequest
from healthchain.pipeline import Pipeline

# Create a pipeline with CdaConnector
pipeline = Pipeline()

cda_connector = CdaConnector()
pipeline.add_input(cda_connector)
pipeline.add_output(cda_connector)

# Example CDA request
cda_request = CdaRequest(document="<CDA XML content>")

# Example 1: Simple pipeline execution
pipe = pipeline.build()
cda_response = pipe(cda_request)
print(cda_response)
# Output: CdaResponse(document='<Annotated CDA XML content>')

# Example 2: Accessing CDA data inside a pipeline node
@pipeline.add_node
def example_pipeline_node(document: Document) -> Document:
print(document.ccd_data)
return document

pipe = pipeline.build()
cda_response = pipe(cda_request)
# Output: CcdData object...
```

## Accessing data inside your pipeline

Data parsed from the CDA document is stored in the `Document.ccd_data` attribute as a `CcdData` object, as shown in the example above.

[(CcdData Reference)](../../../api/data_models.md#healthchain.models.data.ccddata.CcdData)

## Configuration

The `overwrite` parameter in the `CdaConnector` constructor determines whether existing data in the document should be overwritten. This can be useful for readability with very long CDA documents when the receiving system does not require the full document.

```python
cda_connector = CdaConnector(overwrite=True)
```
Loading

0 comments on commit 378e6b2

Please sign in to comment.