Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Setup script changes and misc bug fixes #64

Merged
merged 17 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ public class WaveformProcessor {

@Value("${core.waveform.retention_hours}")
private int retentionTimeHours;
@Value("${core.waveform.is_synthetic_data}")
private boolean isSyntheticData;
@Value("${core.waveform.is_non_current_test_data}")
private boolean isNonCurrentTestData;

/**
* @param visitObservationController visit observation controller
Expand Down Expand Up @@ -61,8 +61,8 @@ public void processMessage(final WaveformMessage msg, final Instant storedFrom)
public void deleteOldWaveformData() {
logger.info("deleteOldWaveformData: Checking for old waveform data for deletion");
Instant baselineDatetime;
if (isSyntheticData) {
// while still in proof of concept, use the current data (which may be for a
if (isNonCurrentTestData) {
// while testing, use the current data (which may be for a
// date far from the present) as a reference for when to apply retention cutoff date from.
// ie. assume the time of the most recent data is "now"
baselineDatetime = waveformController.mostRecentObservationDatatime();
Expand All @@ -74,8 +74,6 @@ public void deleteOldWaveformData() {
} else {
baselineDatetime = Instant.now();
}
// probably want to round to the nearest day so we do all the day at once,
// rather than little and often
Instant cutoff = baselineDatetime.minus(retentionTimeHours, ChronoUnit.HOURS);
logger.info("deleteOldWaveformData: baseline = {}, cutoff = {}", baselineDatetime, cutoff);
int numDeleted = waveformController.deleteOldWaveformData(cutoff);
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@ core.rabbitmq.listen_queues = hl7Queue,databaseExtracts,extensionProjects,wavefo
# Data older than this is liable to be deleted to keep overall disk usage small.
# In production we will want to have this longer (more like 7 days)
core.waveform.retention_hours = 1
core.waveform.is_synthetic_data = 1
core.waveform.is_non_current_test_data = 0

spring.rabbitmq.listener.simple.acknowledge-mode=manual
158 changes: 113 additions & 45 deletions docs/dev/features/waveform_hf_data.md
Original file line number Diff line number Diff line change
@@ -1,26 +1,83 @@
# Waveform (high-frequency) data


## Feature overview

Waveform data is data such as ECG traces and ventilator pressure readings that is sampled multiple times a second.

## User data requirements

Data requirements differ depending on the project and cover concerns such as
how far back the data needs to go, how up-to-date it needs to be, and quality issues like
how many gaps it can tolerate.

One project is interested in looking at 30 second snapshots before and after making some adjustment
to the patient's care. (eg. change ventilator settings)

In the future they may wish to look at a period of ~12 hours, to see if secretions
are building up gradually over time and maybe some intervention is needed.

How long do we need to keep waveform data for? The data is very large, so being able to delete data older than
a configurable time period has been implemented to mitigate storage problems. I foresee 1-7 days being a useful
value.
This is implemented as a Spring scheduled task in the core processor. Notably this is the first
background database operation in Emap, so it could happen alongside regular Emap processing.
This will produce significant "churn" in the database; I'm assuming postgres is designed to handle this, but
it's something to consider if any performance problems appear.

How live does it have to be? Our standard guarantee is no more than a minute out of date,
but in practice it's typically 20 seconds. We have aimed for similar.

## Config options added

Core:
- `core.waveform.retention_hours` periodically delete data more than this many hours old
- `core.waveform.is_non_current_test_data` for testing only - when deciding which data to delete/retain, if set to true,
then treat the "now" point as the most recent observation date in the waveform table, rather than the actual
current time. Purpose is to avoid test data getting immediately deleted because it's too old, which could happen
if we have a fixed set of test data with observation dates way in the past.

Waveform Generator:
- `waveform.hl7.send_host`, `waveform.hl7.send_port` - the host and port to send the generated data to
- `test.synthetic.num_patients` - number of different patients (locations) to generate data for
- `test.synthetic.start_datetime` observation date to start generating data from or null to simulate live data
(see comment in code for more)
- `test.synthetic.end_datetime` if not null, exit the generator when observation date hits this date
- `test.synthetic.warp_factor` How many times real time to generate data at. Live mode implies warp factor = 1.

Waveform Reader:
- `waveform.hl7.listen_port` port inside the container to listen on for waveform HL7 generator
- `waveform.hl7.source_address_allow_list` comma-separated list of source IP addresses to accept connections from.
If the listen contains the value "ALL", then all source IP addresses are allowed.
- `waveform.hl7.test_dump_file` If specified, read messages from this file and then exit - intended for validation

## Design details

### Emap DB design

As we understand it, there is no mechanism for correcting or updating waveform data, so we may just not have audit tables at all,
as nothing would ever be put into them. Does this remove the need for the valid_from / stored_from columns?
A key part of the waveform data table design is the use of SQL arrays, without which the storage would be
extremely inefficient. This way we can store up to 3000 data points in a single database row.

We will still need an observation date, which in a less storage critical table would be identical to the valid_from date.
Removing that duplication could be worth it, even if we lose some semantic tidiness.
There is one set of metadata per row regardless of array size,
so increasing the array size increases the storage efficiency.

stored_from might still be useful to know when that item got written to the database, even if we never intend to unstore it.
But there is a tradeoff with timeliness. Since we don't append to arrays once written, we have to wait for all
data points to arrive before we can write them in one go. 3000 points at 300Hz is 10 seconds worth of data.
Having to wait for too much data would mean that our aim to make Emap no more than 10-20 seconds out of date would
no longer hold.
(see also https://github.com/UCLH-DHCT/emap/issues/65 )

TODO: the actual DB design. See the Core processor logic issue for more discussion on this.
As far as we know, there is no mechanism for correcting or updating waveform data,
so the audit table will always be empty.
Although we do delete old data for storage capacity reasons, moving it to the audit table in this case
would defeat the whole purpose of its deletion!

Stream metadata is stored in the `visit_observation_type` table, as it is for visit observations.
Waveform data semantically could have gone in the `visit_observation` table if it weren't for the
storage efficiency problems this would cause.


### Core processor logic (orphan data problem)

See issue https://github.com/UCLH-DHCT/emap/issues/36

The HL7 messages that Emap has been ingesting up to this point tend to be highly redundant.
Every message contains basic patient demographic/encounter data.
This means that if we receive messages out of order, keeping the database in a usable state at all intermediate points is not too hard.
Expand Down Expand Up @@ -68,13 +125,6 @@ the middle of the night, when waveform data will still be coming in.
Other solution is to fix it up later when the feed comes back, but that involves a lot of continuously rechecking stuff,
and we will have been giving out wrong information in the meantime. And then we will need that audit table!

### User requirements

What data do our users require? What sort of queries will they be making?

How live does it have to be? I'm guessing 10 minutes is ok, 60 minutes isn't.

How long do we need to keep live data for? Can we delete data older than eg. 7 days? This could mitigate storage problems.

### Performance monitoring

Expand All @@ -88,55 +138,73 @@ Performance metrics that we need to watch:

### Storage efficiency

My initial tests have assumed that there will be 30 patients generating data from one 50Hz and one 300Hz waveform source at all times.
My initial tests assumed that there will be 30 patients generating data from one 50Hz and one 300Hz waveform source at all times.
(In actual fact it's going to be more like 5-10 patients
ventilated at any one time, but the ventilator has more than two data streams)

At this rate of data flow, my very naive DB implementation results in ~100GB of backend postgres disk usage being generated per day
- clearly far too much if we're aiming for the UDS to stay under 1TB, although that figure may be quite out of date!
At this rate of data flow, my very naive DB implementation using 1 data point per row
resulted in ~100GB of backend postgres disk usage being generated per day - clearly
far too much if we're aiming for the UDS to stay under 1TB (which used to be the limit we were observing), given
that there could be three or more Emap instances in the DB at once (two lives and at least one in dev)

You can calculate a theoretical minimum:
30 * 86400 * 350 = 907 million data points per day.
30 * 86400 * (300 + 50) = 907 million data points per day.
I don't know what numerical type the original data uses, but assuming 8 bytes per data point, that's **~8GB per day**.
If we keep only the last 7 days of data, that caps it at **~60GB overall**.
Will need to allow for some row metadata, the underlying DB having to be padded/aligned/whatever, and it will be a bit more.
If we keep only the last 7 days of data, that caps it at **~60GB** per instance of Emap.
Allowing for some row metadata, the underlying DB to be padded/aligned/whatever, and it will be a bit more.
Am assuming compression is impossible.

Using SQL arrays is likely to significantly reduce the data storage needed vs the naive implementation.
Using SQL arrays vastly improves the actual efficiency vs 1 per row.

#### Further improvements

See issue https://github.com/UCLH-DHCT/emap/issues/62 for a discussion of further improvements.

### HL7 ingress

There is a piece of software in the hospital called Smartlinx, which can apparently be fairly easily configured to stream HL7 waveform data in our direction.
Looking at Elise's code for performing dumps of waveform data, it seems to be setting up a server, which Smartlinx then connects to.
There is a piece of software in the hospital called Smartlinx which collects data from little dongles
which plug into the back of ventilators and bedside monitors.
It can be easily configured to stream HL7 waveform data in our direction. However, a large concern is that we avoid
jeopardizing the existing flow of (non-waveform) data that goes from Smartlinx to Epic.

Therefore we will need to have our own Smartlinx server (with accompanying software licence) to run it on, so we
are separate from the existing one.

So far, it has only been used to provide one patient's waveform data at a time, and only for short periods. It's
unknown how it would cope if it were sending *all* patients' data to us.

To receive waveform data from Smartlinx, you must listen on a TCP port which Smartlinx then connects to.
This seems rather unusual to me! We should be validating the source IP address at the very least if this is how it has to work.

- Can Smartlinx replay missed messages if we go down?
- Does Smartlinx support/require acknowledgement messages?
- Will we need to do our own buffering? Can we do the same thing the IDS does (whatever that is)?
Can Smartlinx replay missed messages if we go down? No.

Does Smartlinx support/require acknowledgement messages? No.

HL7 messages from bedside monitor have ~40 measurements per message; ventilators ~1-10 (50Hz); ECGs (300Hz) not known.
Will we need to do our own buffering? Can we do the same sort of thing the IDS does?
Maybe. See issue https://github.com/UCLH-DHCT/emap/issues/48 for why we might have to implement some sort of buffering.

HL7 messages are not particularly space efficient: messages from bedside monitors have ~40 measurements per message;
ventilators have ~1-10 per message (50Hz); ECGs (300Hz) not known.
So this will be a high volume of text relative to the actual data.
Although this inefficiency might keep a CPU core on the GAE fairly busy, at least it won't have any effect on the Emap queue and DB.
Although this inefficiency might keep a CPU core or two on the GAE fairly busy, at least it won't have any effect on the Emap queue and DB.
Since we plan to use SQL arrays in the DB, and Emap doesn't have a mechanism to collate multiple incoming interchange messages,
we want each interchange message to result in (at least) one DB row being written in its final form (updating an SQL array is likely not efficient).
Therefore I plan to collect up about a second's worth of data for a given patient/machine and send that as one interchange message, so it can become a single row in the DB.
we want each interchange message to result in one DB waveform row being created.
Therefore I collect up to 3000 data points in memory for each patient+data stream, collate it and send as a single
interchange message, so it can become a single row in the DB.

This could mean having some sort of buffer for the HL7 reader that stores pending data. Probably post-parsing.
The HL7 messages contain two timestamps. The "capsule axon" time (observation time?), and the server time (in MSH?).
I've forgotten the difference, but Elise knows.
The local time on the ventilators that has to be set manually twice a year to account for DST is not in the HL7 messages.

Or you could just wait for 1 second of HL7 messages to come in - which will span many patients/machines of course - and
process them in a batch, in memory, assigning all data points for the same patient/machine to the same message, then send them all.
This avoids the need for storing pending data, but could mean the data is chopped up into slightly uneven fragments (consider
what happens if machine type A likes to batch up 5 second's worth of messages, and machine type B likes to drip feed them).
Also, this will be 1 second of message receipt, not of observation time!
### Pre-release validation

Speaking of timestamps, the HL7 messages contain two of them. The "capsule axon" time, and the server time.
I've forgotten the difference, but Elise knows. The local time on the ventilators that has to be set manually twice a year to account for DST is not in the HL7 messages.
This requires the ability to replay waveform HL7 messages. We currently do this using a text HL7 dump file.

### Pre-release validation
An alternative would be to maintain a test stream of real or synthetic messages, but it would have to be continuously
updated to store (say) the last 7 days of messages, so that it overlaps with the validation period, which is by default
the last 7 days.

This assumes we have the ability to replay waveform HL7 messages.
We could keep a test stream of real or synthetic messages, but it would have to be continously updated to store (say) the last 7 days of messages,
otherwise this would lose some of the benefits of the validation process.
As a fallback, you could perform waveform validation live, but this would mean a 7 day validation period would take 7 days to run,
Or you could perform waveform validation live, but this would mean a 7 day validation period would take 7 days to run,
and you'd have to run this separately from the main Emap validation process.

Things you could check in validation:
Expand Down
8 changes: 6 additions & 2 deletions emap-setup/emap_runner/global_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,12 @@ def _substitute_vars(self, env_file: "EnvironmentFile") -> None:
return None

def get_first(self, key: str, section: str) -> str:
"""Get the first value of a key within a section of this global
configuration. If it cannot be found then use the top-level"""
"""
Search the config for the given key in the following order:
- In the section given by arg `section`
- In any of the sections in possible_sections
- At the top level
"""

if section in self and key in self[section]:
"""
Expand Down
Loading
Loading