From 360fe564186dab5bb607bf2eb5a55ee4fdd9553c Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Wed, 2 Oct 2024 18:33:01 +0100 Subject: [PATCH 01/17] Python setup script - allow independent switching of hoover, hl7 adt and hl7 waveform when running validation. Some refactoring to make testing better. --- emap-setup/emap_runner/runner.py | 49 ++++++++++------- .../validation/validation_runner.py | 4 +- emap-setup/tests/test_runner.py | 53 ++++++++++--------- 3 files changed, 59 insertions(+), 47 deletions(-) diff --git a/emap-setup/emap_runner/runner.py b/emap-setup/emap_runner/runner.py index a843c6db0..d10c714f4 100644 --- a/emap-setup/emap_runner/runner.py +++ b/emap-setup/emap_runner/runner.py @@ -1,6 +1,7 @@ import argparse from pathlib import Path +from typing import Any from emap_runner.parser import Parser from emap_runner.utils import TimeWindow @@ -100,20 +101,28 @@ def create_parser() -> Parser: action="store_true", ) - validation_source_group = validation_parser.add_mutually_exclusive_group() - validation_source_group.add_argument( - "--only-hl7", - dest="use_only_hl7_reader", - help="Use only the hl7-reader service (no hoover)", - default=False, - action="store_true", + validation_parser.add_argument( + "--use-hl7-reader", + dest="use_hl7_reader", + action=argparse.BooleanOptionalAction, + default=True, + help="Enable/disable the main HL7 ADT reader", ) - validation_source_group.add_argument( - "--only-hoover", - dest="use_only_hoover", - help="Use only the hoover service (no hl7-reader)", + + validation_parser.add_argument( + "--use-waveform-synth", + dest="use_waveform_synth", + action=argparse.BooleanOptionalAction, default=False, - action="store_true", + help="Enable/disable synthetic waveform generator", + ) + + validation_parser.add_argument( + "--use-hoover", + dest="use_hoover", + action=argparse.BooleanOptionalAction, + default=True, + help="Enable/disable the hoover service", ) config_parser = subparsers.add_parser("config", help="Configuration operations") @@ -188,10 +197,10 @@ def docker(self) -> None: return None - def validation(self) -> None: + def validation(self) -> ValidationRunner: """Run a validation run of EMAP""" # allow for hoover not to be defined in global config - use_hoover = ("hoover" in self.global_config["repositories"]) and (not self.args.use_only_hl7_reader) + use_hoover = ("hoover" in self.global_config["repositories"]) and self.args.use_hoover runner = ValidationRunner( docker_runner=DockerRunner(project_dir=Path.cwd(), config=self.global_config), @@ -199,17 +208,17 @@ def validation(self) -> None: start_date=self.args.start_date, end_date=self.args.end_date ), should_build=not self.args.skip_build, - use_hl7_reader=not self.args.use_only_hoover, + use_hl7_reader=self.args.use_hl7_reader, use_hoover=use_hoover, + use_waveform_synth=self.args.use_waveform_synth, ) - runner.run() + return runner - return None - - def run(self, method_name: str) -> None: + def run(self) -> Any: """Call a method of this runner instance defined by its name""" + method_name = self.args.subcommand if hasattr(self, method_name): return getattr(self, method_name)() @@ -227,7 +236,7 @@ def main(): exit(f"Configuration file {args.filename} not found. Exiting") runner = EMAPRunner(args=args, config=GlobalConfiguration(args.filename)) - runner.run(args.subcommand) + runner.run() return None diff --git a/emap-setup/emap_runner/validation/validation_runner.py b/emap-setup/emap_runner/validation/validation_runner.py index f219094ee..f513eb73a 100644 --- a/emap-setup/emap_runner/validation/validation_runner.py +++ b/emap-setup/emap_runner/validation/validation_runner.py @@ -20,14 +20,14 @@ def __init__( should_build: bool = True, use_hl7_reader: bool = True, use_hoover: bool = True, - use_waveform_reader: bool = True, + use_waveform_synth: bool = False, ): """Validation runner that will be run over a time window""" self.should_build = should_build self.use_hl7_reader = use_hl7_reader self.use_hoover = use_hoover - self.use_waveform_reader = use_waveform_reader + self.use_waveform_synth = use_waveform_synth self.start_time = None self.timeout = timedelta(hours=10) diff --git a/emap-setup/tests/test_runner.py b/emap-setup/tests/test_runner.py index f2a7f0a33..491f24b56 100644 --- a/emap-setup/tests/test_runner.py +++ b/emap-setup/tests/test_runner.py @@ -1,3 +1,5 @@ +from unittest.mock import patch, MagicMock + import pytest from os import mkdir @@ -15,19 +17,22 @@ from emap_runner.utils import TimeWindow from .utils import work_in_tmp_directory -config_path = Path( +config_path_only_docs = Path( dirname(abspath(__file__)), "data", "test-global-configuration-only-docs.yaml" ) +config_path_all = Path( + dirname(abspath(__file__)), "data", "test-global-configuration.yaml" +) @work_in_tmp_directory(to_copy=None) def test_clone_then_clean_repos(): parser = create_parser() - config = GlobalConfiguration(config_path) + config = GlobalConfiguration(config_path_only_docs) runner = EMAPRunner(args=parser.parse_args(["setup", "-i"]), config=config) - runner.run("setup") + runner.run() # Ensure that the cloned directory exists assert exists("emap_documentation") @@ -35,7 +40,7 @@ def test_clone_then_clean_repos(): # and can be cleaned runner = EMAPRunner(args=parser.parse_args(["setup", "-c"]), config=config) - runner.run("setup") + runner.run() assert not exists("emap_documentation") @@ -50,7 +55,7 @@ def test_default_time_window(): assert window.start.toordinal() == date.today().toordinal() - 7 -@work_in_tmp_directory(to_copy=[config_path]) +@work_in_tmp_directory(to_copy=[config_path_only_docs]) def test_time_window_is_set(): config = GlobalConfiguration(Path("test-global-configuration-only-docs.yaml")) @@ -73,31 +78,29 @@ def test_time_window_is_set(): assert line.strip().endswith("T00:00:00.00Z") -def test_validation_source_arguments_cannot_be_both_only_hl7_and_hoover(): - """Test that both hl7 and hoover can't be used as only-X arguments""" - - parser = create_parser() - - with pytest.raises(SystemExit): - _ = parser.parse_args(["validation", "--only-hl7", "--only-hoover"]) - +@pytest.mark.parametrize( + "args_list,use_hl7,use_hoover,use_waveform", + [ + ([], True, True, False), + (["--no-use-hoover"], True, False, False), + (["--no-use-hl7"], False, True, False), + (["--use-waveform-synth"], True, True, True), -def test_validation_source_arguments_set_correct_runner_attributes(): + ]) +def test_validation_source_arguments_set_correct_runner_attributes(args_list, use_hl7, use_hoover, use_waveform): """Test source parser arguments translate to correct runner attributes""" - parser = create_parser() - args = parser.parse_args(["validation", "--only-hl7"]) - global_config = GlobalConfiguration(config_path) - - runner = ValidationRunner( - docker_runner=DockerRunner(project_dir=Path.cwd(), config=global_config), - time_window=TimeWindow(args.start_date, args.end_date), - use_hl7_reader=not args.use_only_hoover, - use_hoover=not args.use_only_hl7_reader, - ) + args = parser.parse_args(["validation", *args_list]) + global_config = GlobalConfiguration(config_path_all) - assert runner.use_hl7_reader and not runner.use_hoover + with patch.object(ValidationRunner, 'run', new_callable=MagicMock) as mock_obj: + emap_runner = EMAPRunner(args=args, config=global_config) + validation_runner = emap_runner.run() + mock_obj.assert_called_once() + assert validation_runner.use_hl7_reader == use_hl7 + assert validation_runner.use_hoover == use_hoover + assert validation_runner.use_waveform_synth == use_waveform @pytest.mark.parametrize("queue_length,expected", [(0, True), (1, False)]) From ecba32f0808e0eee40381d94eef5c7b2afc19631 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Thu, 3 Oct 2024 16:18:18 +0100 Subject: [PATCH 02/17] BooleanOptionalAction doesn't exist on python 3.8, do it ourselves --- emap-setup/emap_runner/runner.py | 47 ++++++++++++++++---------------- 1 file changed, 24 insertions(+), 23 deletions(-) diff --git a/emap-setup/emap_runner/runner.py b/emap-setup/emap_runner/runner.py index d10c714f4..375546995 100644 --- a/emap-setup/emap_runner/runner.py +++ b/emap-setup/emap_runner/runner.py @@ -101,29 +101,10 @@ def create_parser() -> Parser: action="store_true", ) - validation_parser.add_argument( - "--use-hl7-reader", - dest="use_hl7_reader", - action=argparse.BooleanOptionalAction, - default=True, - help="Enable/disable the main HL7 ADT reader", - ) - - validation_parser.add_argument( - "--use-waveform-synth", - dest="use_waveform_synth", - action=argparse.BooleanOptionalAction, - default=False, - help="Enable/disable synthetic waveform generator", - ) - - validation_parser.add_argument( - "--use-hoover", - dest="use_hoover", - action=argparse.BooleanOptionalAction, - default=True, - help="Enable/disable the hoover service", - ) + # add use-FOO and no-use-FOO options for enabling/disabling various services in validation + add_boolean_optional_action(validation_parser, "hl7-reader", True, "the main HL7 ADT reader") + add_boolean_optional_action(validation_parser, "hoover", True, "the hoover service") + add_boolean_optional_action(validation_parser, "waveform-synth", False, "synthetic waveform generator") config_parser = subparsers.add_parser("config", help="Configuration operations") config_parser.add_argument( @@ -144,6 +125,26 @@ def create_parser() -> Parser: return parser +# BooleanOptionalAction doesn't exist on python 3.8, do it ourselves +def add_boolean_optional_action(parser: Parser, name: str, enabled_by_default, help_str): + mutex = parser.add_mutually_exclusive_group() + dest_name = "use_{}".format(name.replace("-", "_")) + mutex.add_argument( + f"--use-{name}", + dest=dest_name, + action='store_true', + default=enabled_by_default, + help=f"Enable {help_str}", + ) + mutex.add_argument( + f"--no-use-{name}", + dest=dest_name, + action='store_false', + default=enabled_by_default, + help=f"Disable {help_str}", + ) + + class EMAPRunner: def __init__(self, args: argparse.Namespace, config: GlobalConfiguration): From 9902814501b4971be44f73006fe57b112ce8d1bc Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Thu, 3 Oct 2024 16:44:51 +0100 Subject: [PATCH 03/17] synth vs reader confusion --- emap-setup/emap_runner/validation/validation_runner.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/emap-setup/emap_runner/validation/validation_runner.py b/emap-setup/emap_runner/validation/validation_runner.py index f513eb73a..3930d3499 100644 --- a/emap-setup/emap_runner/validation/validation_runner.py +++ b/emap-setup/emap_runner/validation/validation_runner.py @@ -156,7 +156,9 @@ def _run_emap(self) -> None: output_filename=f"{self.log_file_prefix}_hoover.txt", ) - if self.use_waveform_reader: + # This needs further thought: it only brings up the reader for now (which may be + # reading from a file), probably need to have separate options for file vs synth? + if self.use_waveform_synth: self.docker.run( "up --exit-code-from waveform-reader waveform-reader", output_filename=f"{self.log_file_prefix}_waveform-reader.txt", From e178b6d335e9100718d37a393dbc854266ea233d Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Thu, 3 Oct 2024 19:05:39 +0100 Subject: [PATCH 04/17] Don't parse values as numeric unless they are in fact numeric --- .../datasources/waveform/Hl7ParseAndSend.java | 20 ++++++++++++++----- .../src/test/resources/hl7/test1.hl7 | 1 + 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ParseAndSend.java b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ParseAndSend.java index 13a2c8ed4..bd3a6cc8c 100644 --- a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ParseAndSend.java +++ b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ParseAndSend.java @@ -20,6 +20,7 @@ import java.util.Arrays; import java.util.List; import java.util.Optional; +import java.util.Set; @Component public class Hl7ParseAndSend { @@ -78,11 +79,6 @@ List parseHl7(String messageAsStr) throws Hl7ParseException { Instant obsDatetime = Instant.from(ta); String streamId = obx.getField(3); - String allPointsStr = obx.getField(5); - if (allPointsStr.contains("~")) { - throw new Hl7ParseException("must only be 1 repeat in OBX-5"); - } - List points = Arrays.stream(allPointsStr.split("\\^")).map(Double::parseDouble).toList(); Optional metadataOpt = sourceMetadata.getStreamMetadata(streamId); if (metadataOpt.isEmpty()) { @@ -100,6 +96,20 @@ List parseHl7(String messageAsStr) throws Hl7ParseException { String mappedStreamDescription = metadata.mappedStreamDescription(); String unit = metadata.unit(); + // non-numerical types won't be able to go in the waveform table, but it's possible + // we might need them as a VisitObservation + String hl7Type = obx.getField(2); + if (!Set.of("NM", "NA").contains(hl7Type)) { + logger.warn("Skipping stream {} with type {}, not numerical", streamId, hl7Type); + continue; + } + String allPointsStr = obx.getField(5); + if (allPointsStr.contains("~")) { + throw new Hl7ParseException("must only be 1 repeat in OBX-5"); + } + + List points = Arrays.stream(allPointsStr.split("\\^")).map(Double::parseDouble).toList(); + String messageIdSpecific = String.format("%s_%d_%d", messageIdBase, obrI, obxI); logger.debug("location {}, time {}, messageId {}, value count = {}", locationId, obsDatetime, messageIdSpecific, points.size()); diff --git a/waveform-reader/src/test/resources/hl7/test1.hl7 b/waveform-reader/src/test/resources/hl7/test1.hl7 index 91f672cd3..0c12b91c4 100644 --- a/waveform-reader/src/test/resources/hl7/test1.hl7 +++ b/waveform-reader/src/test/resources/hl7/test1.hl7 @@ -9,3 +9,4 @@ OBR|||||||20240731142108.111+0100|||UCHT03ICURM08|||UCHT03ICURM08| OBX|1|NA|51911||42.40^43.40^44.40^45.40||||||F||20|20240731142108.111+0100| OBX|2|NA|52921||42.50^43.50^44.5^45.5^46.5||||||F||20|20240731142108.111+0100| OBX|3|NA|4242||42.60||||||F||20|20240731142108.111+0100| +OBX|4|ST|52921||oopsnotanumber||||||F||20|20240731142108.111+0100| From 387f9d4824b80002336158e5f71a0b8716f3c91b Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Fri, 4 Oct 2024 13:31:22 +0100 Subject: [PATCH 05/17] Log, skip and continue if HL7 messages cause parsing errors --- .../datasources/waveform/Hl7ParseAndSend.java | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ParseAndSend.java b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ParseAndSend.java index bd3a6cc8c..307f12c26 100644 --- a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ParseAndSend.java +++ b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ParseAndSend.java @@ -144,12 +144,20 @@ private WaveformMessage waveformMessageFromValues( /** * Parse an HL7 message and store the resulting WaveformMessage in the queue awaiting collation. + * If HL7 is invalid or in a form that the ad hoc parser can't handle, log error and skip. * @param messageAsStr One HL7 message as a string - * @throws Hl7ParseException if HL7 is invalid or in a form that the ad hoc parser can't handle * @throws WaveformCollator.CollationException if the data has a logical error that prevents collation */ - public void parseAndQueue(String messageAsStr) throws Hl7ParseException, WaveformCollator.CollationException { - List msgs = parseHl7(messageAsStr); + public void parseAndQueue(String messageAsStr) throws WaveformCollator.CollationException { + List msgs; + try { + msgs = parseHl7(messageAsStr); + } catch (Hl7ParseException e) { + logger.error("HL7 parsing failed, first 100 chars: {}\nstacktrace {}", + messageAsStr.substring(0, 100), + e.getStackTrace()); + return; + } logger.trace("HL7 message generated {} Waveform messages, sending for collation", msgs.size()); waveformCollator.addMessages(msgs); From c9f49fd6fb9bbb0bbc460884fa5a87ee7bed7473 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Fri, 4 Oct 2024 14:31:28 +0100 Subject: [PATCH 06/17] Logic for checking empty queues seems to be inverted --- emap-setup/emap_runner/validation/validation_runner.py | 4 ++-- emap-setup/tests/test_runner.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/emap-setup/emap_runner/validation/validation_runner.py b/emap-setup/emap_runner/validation/validation_runner.py index 3930d3499..14f061110 100644 --- a/emap-setup/emap_runner/validation/validation_runner.py +++ b/emap-setup/emap_runner/validation/validation_runner.py @@ -217,10 +217,10 @@ def _has_populated_queues(self) -> bool: "exec rabbitmq rabbitmqctl -q list_queues", output_lines=output_lines ) - return self._stdout_rabbitmq_lines_have_zero_length_queues(output_lines) + return not self._stdout_rabbitmq_queues_all_zero_length(output_lines) @staticmethod - def _stdout_rabbitmq_lines_have_zero_length_queues(lines: List[str]) -> bool: + def _stdout_rabbitmq_queues_all_zero_length(lines: List[str]) -> bool: """ Do a set of output lines generated from querying the rabbitmq queues indicate that all queues are empty? diff --git a/emap-setup/tests/test_runner.py b/emap-setup/tests/test_runner.py index 491f24b56..b0eecc40f 100644 --- a/emap-setup/tests/test_runner.py +++ b/emap-setup/tests/test_runner.py @@ -114,5 +114,5 @@ def test_zero_length_queues(queue_length, expected): "hl7Queue 0\n", ] - method = ValidationRunner._stdout_rabbitmq_lines_have_zero_length_queues + method = ValidationRunner._stdout_rabbitmq_queues_all_zero_length assert method(lines) == expected From 68b8ad1b8bb1219bed6f908c29a6ac5a435a3e2b Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Fri, 4 Oct 2024 14:35:30 +0100 Subject: [PATCH 07/17] microsecond/second mixup meant that timeout was never reached --- emap-setup/emap_runner/validation/validation_runner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/emap-setup/emap_runner/validation/validation_runner.py b/emap-setup/emap_runner/validation/validation_runner.py index 14f061110..c90ee1e1f 100644 --- a/emap-setup/emap_runner/validation/validation_runner.py +++ b/emap-setup/emap_runner/validation/validation_runner.py @@ -197,8 +197,8 @@ def _exceeded_timeout(self) -> bool: @property def _elapsed_time(self) -> timedelta: - """Seconds elapsed since the runner started""" - return timedelta(microseconds=time() - self.start_time) + """Time elapsed since the runner started""" + return timedelta(seconds=time() - self.start_time) @property def _has_populated_queues(self) -> bool: From 21c425983cdda8cc3e6c133dcd24007028308762 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Fri, 4 Oct 2024 14:48:21 +0100 Subject: [PATCH 08/17] Handle missing segments without crashing --- .../rits/inform/datasources/waveform/hl7parse/Hl7Message.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/hl7parse/Hl7Message.java b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/hl7parse/Hl7Message.java index 664ca6b22..0a32ea43c 100644 --- a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/hl7parse/Hl7Message.java +++ b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/hl7parse/Hl7Message.java @@ -95,7 +95,7 @@ public String getField(String segmentName, int field1Index) throws Hl7ParseExcep } private Hl7Segment getSingleHl7Segment(String segmentName, int field1Index) throws Hl7ParseException { - List seg = segmentsBySegmentName.get(segmentName); + List seg = getSegments(segmentName); if (seg.size() != 1) { throw new Hl7ParseException( String.format("getField(%s, %d) can only be called on single segments seg, got size %d", @@ -105,7 +105,7 @@ private Hl7Segment getSingleHl7Segment(String segmentName, int field1Index) thro } public List getSegments(String segmentName) { - return segmentsBySegmentName.get(segmentName); + return segmentsBySegmentName.getOrDefault(segmentName, new ArrayList<>()); } } From c5a2465a0d753db853db5a3a61894b449785dc03 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Fri, 4 Oct 2024 14:58:47 +0100 Subject: [PATCH 09/17] String.substring needs to be in bounds --- .../ucl/rits/inform/datasources/waveform/Hl7ParseAndSend.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ParseAndSend.java b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ParseAndSend.java index 307f12c26..519ebc0fb 100644 --- a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ParseAndSend.java +++ b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ParseAndSend.java @@ -154,7 +154,7 @@ public void parseAndQueue(String messageAsStr) throws WaveformCollator.Collation msgs = parseHl7(messageAsStr); } catch (Hl7ParseException e) { logger.error("HL7 parsing failed, first 100 chars: {}\nstacktrace {}", - messageAsStr.substring(0, 100), + messageAsStr.substring(0, Math.min(100, messageAsStr.length())), e.getStackTrace()); return; } From 10474e0cfaa71a7d65d832ad785a69e502472a55 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Fri, 4 Oct 2024 15:32:41 +0100 Subject: [PATCH 10/17] Translate more assorted parsing errors to Hl7ParseException for better handling. --- .../inform/datasources/waveform/Hl7FromFile.java | 7 +++++-- .../datasources/waveform/Hl7ParseAndSend.java | 10 ++++++++-- .../datasources/waveform/hl7parse/Hl7Segment.java | 14 ++++++++++++-- 3 files changed, 25 insertions(+), 6 deletions(-) diff --git a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7FromFile.java b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7FromFile.java index e083c7703..d157410eb 100644 --- a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7FromFile.java +++ b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7FromFile.java @@ -56,8 +56,11 @@ void readOnceAndQueue(File hl7DumpFile) throws Hl7ParseException, WaveformCollat } List messages = readFromFile(hl7DumpFile); logger.info("Read {} HL7 messages from test dump file", messages.size()); - for (var m: messages) { - hl7ParseAndSend.parseAndQueue(m); + for (int mi = 0; mi < messages.size(); mi++) { + hl7ParseAndSend.parseAndQueue(messages.get(mi)); + if (mi % 100 == 0) { + logger.info("handled {} messages out of {}", mi + 1, messages.size()); + } } logger.info("Queued {} HL7 messages from test dump file", messages.size()); } diff --git a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ParseAndSend.java b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ParseAndSend.java index 519ebc0fb..fb6aed15d 100644 --- a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ParseAndSend.java +++ b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ParseAndSend.java @@ -12,6 +12,7 @@ import uk.ac.ucl.rits.inform.interchange.InterchangeValue; import uk.ac.ucl.rits.inform.interchange.visit_observations.WaveformMessage; +import java.time.DateTimeException; import java.time.Instant; import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoUnit; @@ -75,8 +76,13 @@ List parseHl7(String messageAsStr) throws Hl7ParseException { logger.trace("Parsing datetime {}", obsDatetimeStr); DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmss.SSSZZ"); - TemporalAccessor ta = formatter.parse(obsDatetimeStr); - Instant obsDatetime = Instant.from(ta); + Instant obsDatetime; + try { + TemporalAccessor ta = formatter.parse(obsDatetimeStr); + obsDatetime = Instant.from(ta); + } catch (DateTimeException e) { + throw (Hl7ParseException) new Hl7ParseException("Datetime parsing failed").initCause(e); + } String streamId = obx.getField(3); diff --git a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/hl7parse/Hl7Segment.java b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/hl7parse/Hl7Segment.java index b917a0fe2..7e1661cee 100644 --- a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/hl7parse/Hl7Segment.java +++ b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/hl7parse/Hl7Segment.java @@ -33,8 +33,18 @@ public Hl7Segment(String segment) { logger.trace("Segment: name = {}, fields = {}", this.segmentName, this.fields); } - public String getField(int field1Index) { - return fields[field1Index]; + /** + * Get unparsed contents of HL7 field for this segment. + * @param field1Index The HL7 field index within the segment, starting at 1 + * @return unparsed string contents of the field + * @throws Hl7ParseException if field does not exist + */ + public String getField(int field1Index) throws Hl7ParseException { + try { + return fields[field1Index]; + } catch (ArrayIndexOutOfBoundsException e) { + throw (Hl7ParseException) new Hl7ParseException("non existent field").initCause(e); + } } public void addChildSegment(Hl7Segment seg) { From 4b966782a0cdbc89d83991b01daba98e6ba6bbf1 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Mon, 7 Oct 2024 15:07:33 +0100 Subject: [PATCH 11/17] Don't use waveform generator in validation --- emap-setup/emap_runner/runner.py | 4 ++-- .../emap_runner/validation/validation_runner.py | 11 ++++++----- emap-setup/tests/test_runner.py | 4 ++-- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/emap-setup/emap_runner/runner.py b/emap-setup/emap_runner/runner.py index 375546995..04317aef0 100644 --- a/emap-setup/emap_runner/runner.py +++ b/emap-setup/emap_runner/runner.py @@ -104,7 +104,7 @@ def create_parser() -> Parser: # add use-FOO and no-use-FOO options for enabling/disabling various services in validation add_boolean_optional_action(validation_parser, "hl7-reader", True, "the main HL7 ADT reader") add_boolean_optional_action(validation_parser, "hoover", True, "the hoover service") - add_boolean_optional_action(validation_parser, "waveform-synth", False, "synthetic waveform generator") + add_boolean_optional_action(validation_parser, "waveform", False, "waveform reader") config_parser = subparsers.add_parser("config", help="Configuration operations") config_parser.add_argument( @@ -211,7 +211,7 @@ def validation(self) -> ValidationRunner: should_build=not self.args.skip_build, use_hl7_reader=self.args.use_hl7_reader, use_hoover=use_hoover, - use_waveform_synth=self.args.use_waveform_synth, + use_waveform=self.args.use_waveform, ) runner.run() return runner diff --git a/emap-setup/emap_runner/validation/validation_runner.py b/emap-setup/emap_runner/validation/validation_runner.py index c90ee1e1f..27b75636a 100644 --- a/emap-setup/emap_runner/validation/validation_runner.py +++ b/emap-setup/emap_runner/validation/validation_runner.py @@ -20,14 +20,14 @@ def __init__( should_build: bool = True, use_hl7_reader: bool = True, use_hoover: bool = True, - use_waveform_synth: bool = False, + use_waveform: bool = False, ): """Validation runner that will be run over a time window""" self.should_build = should_build self.use_hl7_reader = use_hl7_reader self.use_hoover = use_hoover - self.use_waveform_synth = use_waveform_synth + self.use_waveform = use_waveform self.start_time = None self.timeout = timedelta(hours=10) @@ -156,9 +156,10 @@ def _run_emap(self) -> None: output_filename=f"{self.log_file_prefix}_hoover.txt", ) - # This needs further thought: it only brings up the reader for now (which may be - # reading from a file), probably need to have separate options for file vs synth? - if self.use_waveform_synth: + # Assume for the time being that this validation option is only to control whether + # the waveform *reader* is brought up, which will be reading from an HL7 dump file + # and therefore doesn't require the waveform-generator. + if self.use_waveform: self.docker.run( "up --exit-code-from waveform-reader waveform-reader", output_filename=f"{self.log_file_prefix}_waveform-reader.txt", diff --git a/emap-setup/tests/test_runner.py b/emap-setup/tests/test_runner.py index b0eecc40f..3ed78ade9 100644 --- a/emap-setup/tests/test_runner.py +++ b/emap-setup/tests/test_runner.py @@ -84,7 +84,7 @@ def test_time_window_is_set(): ([], True, True, False), (["--no-use-hoover"], True, False, False), (["--no-use-hl7"], False, True, False), - (["--use-waveform-synth"], True, True, True), + (["--use-waveform"], True, True, True), ]) def test_validation_source_arguments_set_correct_runner_attributes(args_list, use_hl7, use_hoover, use_waveform): @@ -100,7 +100,7 @@ def test_validation_source_arguments_set_correct_runner_attributes(args_list, us mock_obj.assert_called_once() assert validation_runner.use_hl7_reader == use_hl7 assert validation_runner.use_hoover == use_hoover - assert validation_runner.use_waveform_synth == use_waveform + assert validation_runner.use_waveform == use_waveform @pytest.mark.parametrize("queue_length,expected", [(0, True), (1, False)]) From 2d5872d9e08830d5fd0dd514e3210ccb3a00c501 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Mon, 7 Oct 2024 15:54:06 +0100 Subject: [PATCH 12/17] Reproduce issue #25 --- emap-setup/tests/test_runner.py | 30 +++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/emap-setup/tests/test_runner.py b/emap-setup/tests/test_runner.py index 3ed78ade9..85ad7a1ab 100644 --- a/emap-setup/tests/test_runner.py +++ b/emap-setup/tests/test_runner.py @@ -14,7 +14,7 @@ ) from emap_runner.docker.docker_runner import DockerRunner from emap_runner.global_config import GlobalConfiguration -from emap_runner.utils import TimeWindow +from emap_runner.utils import TimeWindow, EMAPRunnerException from .utils import work_in_tmp_directory config_path_only_docs = Path( @@ -45,6 +45,34 @@ def test_clone_then_clean_repos(): assert not exists("emap_documentation") +@work_in_tmp_directory(to_copy=None) +def test_double_clone(): + """ + Reproduce issue #25 + """ + parser = create_parser() + config = GlobalConfiguration(config_path_only_docs) + + args = ["setup", "-i"] + runner = EMAPRunner(args=parser.parse_args(args), config=config) + runner.run() + # Make some un-pushed changes to newly cloned repo + file_to_keep = Path('emap_documentation/my_favourite_file.txt') + with open(file_to_keep, 'w') as fh: + fh.write("cheese") + + assert file_to_keep.exists() + + # try to clone again, should get error + with pytest.raises(EMAPRunnerException, match='already existed'): + runner = EMAPRunner(args=parser.parse_args(args), config=config) + runner.run() + + # my local changes are still there + assert file_to_keep.exists() + + + def test_default_time_window(): parser = create_parser() From e0d9c999be8f67347bb17e27efdb7d2d71830f53 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Mon, 7 Oct 2024 16:10:49 +0100 Subject: [PATCH 13/17] Fix issue #25 - don't delete existing repo without warning when running `setup -i` --- emap-setup/emap_runner/runner.py | 1 - 1 file changed, 1 deletion(-) diff --git a/emap-setup/emap_runner/runner.py b/emap-setup/emap_runner/runner.py index 04317aef0..67914a7f2 100644 --- a/emap-setup/emap_runner/runner.py +++ b/emap-setup/emap_runner/runner.py @@ -157,7 +157,6 @@ def setup(self) -> None: repos = self.global_config.extract_repositories(branch_name=self.args.branch) if self.args.init: - repos.clean(print_warnings=False) repos.clone() elif self.args.update: From fd9d98792d62b15dfc46ffd45ca3076ac498baca Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Mon, 7 Oct 2024 18:48:00 +0100 Subject: [PATCH 14/17] Bring design doc more up to date. Still needs more work though. --- docs/dev/features/waveform_hf_data.md | 84 +++++++++++++++++-------- emap-setup/emap_runner/global_config.py | 8 ++- 2 files changed, 63 insertions(+), 29 deletions(-) diff --git a/docs/dev/features/waveform_hf_data.md b/docs/dev/features/waveform_hf_data.md index c8d83ca04..38d994631 100644 --- a/docs/dev/features/waveform_hf_data.md +++ b/docs/dev/features/waveform_hf_data.md @@ -3,24 +3,41 @@ ## Feature overview +## Config options added + +- retention time +- ??? ## Design details ### Emap DB design -As we understand it, there is no mechanism for correcting or updating waveform data, so we may just not have audit tables at all, -as nothing would ever be put into them. Does this remove the need for the valid_from / stored_from columns? +A key part of the waveform data table design is the use of SQL arrays, without which the storage would be +extremely inefficient. This way we can store up to 3000 data points in a single database row. + +There is one set of metadata per row regardless of array size, +so increasing the array size increases the storage efficiency. -We will still need an observation date, which in a less storage critical table would be identical to the valid_from date. -Removing that duplication could be worth it, even if we lose some semantic tidiness. +But there is a tradeoff with timeliness. Since we don't append to arrays once written, we have to wait for all +data points to arrive before we can write them in one go. 3000 points at 300Hz is 10 seconds worth of data. +Having to wait for too much data would mean that our aim to make Emap no more than 10-20 seconds out of date would +no longer hold. +(see also https://github.com/UCLH-DHCT/emap/issues/65 ) -stored_from might still be useful to know when that item got written to the database, even if we never intend to unstore it. +As far as we know, there is no mechanism for correcting or updating waveform data, +so the audit table will always be empty. +Although we do delete old data for storage capacity reasons, moving it to the audit table in this case +would defeat the whole purpose of its deletion! -TODO: the actual DB design. See the Core processor logic issue for more discussion on this. +Stream metadata is stored in the `visit_observation_type` table, as it is for visit observations. +Waveform data semantically could have gone in the `visit_observation` table if it weren't for the +storage efficiency problems this would cause. ### Core processor logic (orphan data problem) +See issue https://github.com/UCLH-DHCT/emap/issues/36 + The HL7 messages that Emap has been ingesting up to this point tend to be highly redundant. Every message contains basic patient demographic/encounter data. This means that if we receive messages out of order, keeping the database in a usable state at all intermediate points is not too hard. @@ -88,9 +105,12 @@ Performance metrics that we need to watch: ### Storage efficiency -My initial tests have assumed that there will be 30 patients generating data from one 50Hz and one 300Hz waveform source at all times. +My initial tests assumed that there will be 30 patients generating data from one 50Hz and one 300Hz waveform source at all times. +(In actual fact it's going to be more like 5-10 patients +ventilated at any one time, but the ventilator has more than two data streams) -At this rate of data flow, my very naive DB implementation results in ~100GB of backend postgres disk usage being generated per day +At this rate of data flow, my very naive DB implementation using 1 data point per row +resulted in ~100GB of backend postgres disk usage being generated per day - clearly far too much if we're aiming for the UDS to stay under 1TB, although that figure may be quite out of date! You can calculate a theoretical minimum: @@ -100,40 +120,50 @@ If we keep only the last 7 days of data, that caps it at **~60GB overall**. Will need to allow for some row metadata, the underlying DB having to be padded/aligned/whatever, and it will be a bit more. Am assuming compression is impossible. -Using SQL arrays is likely to significantly reduce the data storage needed vs the naive implementation. +Using SQL arrays vastly improves the actual efficiency vs 1 per row. +#### Further improvements + +See issue https://github.com/UCLH-DHCT/emap/issues/62 for a discussion of further improvements. ### HL7 ingress -There is a piece of software in the hospital called Smartlinx, which can apparently be fairly easily configured to stream HL7 waveform data in our direction. -Looking at Elise's code for performing dumps of waveform data, it seems to be setting up a server, which Smartlinx then connects to. +There is a piece of software in the hospital called Smartlinx which collates data from little dongles +which plug into the back of ventilators and bedside monitors. +It can be easily configured to stream HL7 waveform data in our direction. However, a large concern is that we avoid +jeopardizing the existing flow of (non-waveform) data that goes from Smartlinx to Epic. + +Therefore we will need to have our own Smartlinx licence and the server to run it on, so we are separate from the +existing one. + +So far, it has only been set up to provide one patient's waveform data at a time, and only for short periods. It's +unknown how it would cope if it were sending *all* patients' data to us. + +To receive waveform data from Smartlinx, you must listen on a TCP port which Smartlinx then connects to. This seems rather unusual to me! We should be validating the source IP address at the very least if this is how it has to work. - - Can Smartlinx replay missed messages if we go down? - - Does Smartlinx support/require acknowledgement messages? - - Will we need to do our own buffering? Can we do the same thing the IDS does (whatever that is)? +Can Smartlinx replay missed messages if we go down? No. + +Does Smartlinx support/require acknowledgement messages? No. + +Will we need to do our own buffering? Can we do the same thing the IDS does (whatever that is)? +Maybe. See issue https://github.com/UCLH-DHCT/emap/issues/48 for why we might have to implement some sort of buffering. HL7 messages from bedside monitor have ~40 measurements per message; ventilators ~1-10 (50Hz); ECGs (300Hz) not known. So this will be a high volume of text relative to the actual data. Although this inefficiency might keep a CPU core on the GAE fairly busy, at least it won't have any effect on the Emap queue and DB. Since we plan to use SQL arrays in the DB, and Emap doesn't have a mechanism to collate multiple incoming interchange messages, -we want each interchange message to result in (at least) one DB row being written in its final form (updating an SQL array is likely not efficient). -Therefore I plan to collect up about a second's worth of data for a given patient/machine and send that as one interchange message, so it can become a single row in the DB. - -This could mean having some sort of buffer for the HL7 reader that stores pending data. Probably post-parsing. - -Or you could just wait for 1 second of HL7 messages to come in - which will span many patients/machines of course - and -process them in a batch, in memory, assigning all data points for the same patient/machine to the same message, then send them all. -This avoids the need for storing pending data, but could mean the data is chopped up into slightly uneven fragments (consider -what happens if machine type A likes to batch up 5 second's worth of messages, and machine type B likes to drip feed them). -Also, this will be 1 second of message receipt, not of observation time! +we want each interchange message to result in one DB waveform row. +Therefore I collect up to 3000 data points in memory for each patient+data stream, collate it and send as a single +interchange message, so it can become a single row in the DB. -Speaking of timestamps, the HL7 messages contain two of them. The "capsule axon" time, and the server time. -I've forgotten the difference, but Elise knows. The local time on the ventilators that has to be set manually twice a year to account for DST is not in the HL7 messages. +The HL7 messages contain two timestamps. The "capsule axon" time (observation time?), and the server time (in MSH?). +I've forgotten the difference, but Elise knows. +The local time on the ventilators that has to be set manually twice a year to account for DST is not in the HL7 messages. ### Pre-release validation -This assumes we have the ability to replay waveform HL7 messages. +This requires the ability to replay waveform HL7 messages. We can currently do this using a text HL7 dump file. We could keep a test stream of real or synthetic messages, but it would have to be continously updated to store (say) the last 7 days of messages, otherwise this would lose some of the benefits of the validation process. As a fallback, you could perform waveform validation live, but this would mean a 7 day validation period would take 7 days to run, diff --git a/emap-setup/emap_runner/global_config.py b/emap-setup/emap_runner/global_config.py index bc2b147a0..0e53033f5 100644 --- a/emap-setup/emap_runner/global_config.py +++ b/emap-setup/emap_runner/global_config.py @@ -138,8 +138,12 @@ def _substitute_vars(self, env_file: "EnvironmentFile") -> None: return None def get_first(self, key: str, section: str) -> str: - """Get the first value of a key within a section of this global - configuration. If it cannot be found then use the top-level""" + """ + Search the config for the given key in the following order: + - In the section given by arg `section` + - In any of the sections in possible_sections + - At the top level + """ if section in self and key in self[section]: """ From b7819da7acf670c9fade710dc642a244c211da39 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Tue, 8 Oct 2024 11:11:09 +0100 Subject: [PATCH 15/17] Address review feedback --- emap-setup/emap_runner/runner.py | 7 +++--- .../validation/validation_runner.py | 25 ++++++------------- 2 files changed, 11 insertions(+), 21 deletions(-) diff --git a/emap-setup/emap_runner/runner.py b/emap-setup/emap_runner/runner.py index 67914a7f2..ac42db116 100644 --- a/emap-setup/emap_runner/runner.py +++ b/emap-setup/emap_runner/runner.py @@ -199,8 +199,9 @@ def docker(self) -> None: def validation(self) -> ValidationRunner: """Run a validation run of EMAP""" - # allow for hoover not to be defined in global config - use_hoover = ("hoover" in self.global_config["repositories"]) and self.args.use_hoover + # user should explicitly switch off hoover if not defined in global config + if self.args.use_hoover and "hoover" not in self.global_config["repositories"]: + raise ValueError("hoover requested but is missing from repositories in global config") runner = ValidationRunner( docker_runner=DockerRunner(project_dir=Path.cwd(), config=self.global_config), @@ -209,7 +210,7 @@ def validation(self) -> ValidationRunner: ), should_build=not self.args.skip_build, use_hl7_reader=self.args.use_hl7_reader, - use_hoover=use_hoover, + use_hoover=self.args.use_hoover, use_waveform=self.args.use_waveform, ) runner.run() diff --git a/emap-setup/emap_runner/validation/validation_runner.py b/emap-setup/emap_runner/validation/validation_runner.py index 27b75636a..49c2e8f91 100644 --- a/emap-setup/emap_runner/validation/validation_runner.py +++ b/emap-setup/emap_runner/validation/validation_runner.py @@ -1,6 +1,6 @@ from subprocess import Popen from datetime import date, timedelta -from time import time, sleep +import time from typing import Union, List from pathlib import Path @@ -29,7 +29,6 @@ def __init__( self.use_hoover = use_hoover self.use_waveform = use_waveform - self.start_time = None self.timeout = timedelta(hours=10) self.docker = docker_runner @@ -174,33 +173,23 @@ def _wait_for_queue_to_empty(self) -> None: If it's still going after 10 hours something's gone very wrong and we should give up """ - self.start_time = time() + start_time_monotonic = time.monotonic() while self._has_populated_queues: - - sleep(120) - - if self._exceeded_timeout: + time.sleep(120) + elapsed_time = timedelta(seconds=time.monotonic() - start_time_monotonic) + if elapsed_time > self.timeout: self._save_logs_and_stop() raise ValidationRunnerException( f"Waiting for queue timed out. Elapsed time " - f"({self._elapsed_time}) > timeout ({self.timeout})" + f"({elapsed_time}) > timeout ({self.timeout})" ) # exits too keenly from databaseExtracts queue, adding in a wait period - sleep(600) + time.sleep(600) return None - @property - def _exceeded_timeout(self) -> bool: - return self._elapsed_time > self.timeout - - @property - def _elapsed_time(self) -> timedelta: - """Time elapsed since the runner started""" - return timedelta(seconds=time() - self.start_time) - @property def _has_populated_queues(self) -> bool: """Are there queues that are still populated? From 498a49540f56b5cd588698ea1feac18b8f5f55d9 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Tue, 8 Oct 2024 17:28:54 +0100 Subject: [PATCH 16/17] Add tests for recently changed code --- .../validation/validation_runner.py | 14 ++-- emap-setup/tests/test_runner.py | 75 +++++++++++++++---- 2 files changed, 69 insertions(+), 20 deletions(-) diff --git a/emap-setup/emap_runner/validation/validation_runner.py b/emap-setup/emap_runner/validation/validation_runner.py index 49c2e8f91..b0b7ed116 100644 --- a/emap-setup/emap_runner/validation/validation_runner.py +++ b/emap-setup/emap_runner/validation/validation_runner.py @@ -13,6 +13,10 @@ class ValidationRunnerException(EMAPRunnerException): class ValidationRunner: + timeout = timedelta(hours=10) + wait_secs = 120 + final_wait_secs = 600 + def __init__( self, docker_runner: "DockerRunner", @@ -29,8 +33,6 @@ def __init__( self.use_hoover = use_hoover self.use_waveform = use_waveform - self.timeout = timedelta(hours=10) - self.docker = docker_runner self.time_window = time_window @@ -176,17 +178,17 @@ def _wait_for_queue_to_empty(self) -> None: start_time_monotonic = time.monotonic() while self._has_populated_queues: - time.sleep(120) + time.sleep(ValidationRunner.wait_secs) elapsed_time = timedelta(seconds=time.monotonic() - start_time_monotonic) - if elapsed_time > self.timeout: + if elapsed_time > ValidationRunner.timeout: self._save_logs_and_stop() raise ValidationRunnerException( f"Waiting for queue timed out. Elapsed time " - f"({elapsed_time}) > timeout ({self.timeout})" + f"({elapsed_time}) > timeout ({ValidationRunner.timeout})" ) # exits too keenly from databaseExtracts queue, adding in a wait period - time.sleep(600) + time.sleep(ValidationRunner.final_wait_secs) return None diff --git a/emap-setup/tests/test_runner.py b/emap-setup/tests/test_runner.py index 85ad7a1ab..e44cf8e7c 100644 --- a/emap-setup/tests/test_runner.py +++ b/emap-setup/tests/test_runner.py @@ -1,16 +1,17 @@ -from unittest.mock import patch, MagicMock +from unittest.mock import patch, MagicMock, PropertyMock import pytest from os import mkdir from os.path import dirname, abspath, exists from pathlib import Path -from datetime import date +from datetime import date, timedelta from emap_runner.runner import create_parser, EMAPRunner from emap_runner.validation.validation_runner import ( ValidationRunner, TemporaryEnvironmentState, + ValidationRunnerException, ) from emap_runner.docker.docker_runner import DockerRunner from emap_runner.global_config import GlobalConfiguration @@ -107,29 +108,75 @@ def test_time_window_is_set(): @pytest.mark.parametrize( - "args_list,use_hl7,use_hoover,use_waveform", + "args_list,hoover_in_config,exp_use_hl7,exp_use_hoover,exp_use_waveform,exp_raises", [ - ([], True, True, False), - (["--no-use-hoover"], True, False, False), - (["--no-use-hl7"], False, True, False), - (["--use-waveform"], True, True, True), + ([], True, True, True, False, False), + ([], False, True, True, False, True), # hoover requested but not present in repos + (["--no-use-hoover"], True, True, False, False, False), + (["--no-use-hoover"], False, True, False, False, False), + (["--no-use-hl7"], True, False, True, False, False), + (["--use-waveform"], True, True, True, True, False), ]) -def test_validation_source_arguments_set_correct_runner_attributes(args_list, use_hl7, use_hoover, use_waveform): +def test_validation_source_arguments_set_correct_runner_attributes(args_list, + hoover_in_config, + exp_use_hl7, exp_use_hoover, exp_use_waveform, exp_raises): """Test source parser arguments translate to correct runner attributes""" parser = create_parser() args = parser.parse_args(["validation", *args_list]) - global_config = GlobalConfiguration(config_path_all) + global_config = GlobalConfiguration(config_path_all if hoover_in_config else config_path_only_docs) with patch.object(ValidationRunner, 'run', new_callable=MagicMock) as mock_obj: emap_runner = EMAPRunner(args=args, config=global_config) - validation_runner = emap_runner.run() - mock_obj.assert_called_once() - assert validation_runner.use_hl7_reader == use_hl7 - assert validation_runner.use_hoover == use_hoover - assert validation_runner.use_waveform == use_waveform + if exp_raises: + with pytest.raises(ValueError, match="hoover requested but is missing"): + validation_runner = emap_runner.run() + else: + validation_runner = emap_runner.run() + mock_obj.assert_called_once() + assert validation_runner.use_hl7_reader == exp_use_hl7 + assert validation_runner.use_hoover == exp_use_hoover + assert validation_runner.use_waveform == exp_use_waveform + + +@pytest.mark.parametrize( + "num_trues,timeout_seconds,expect_raises", + [ + (0, 1, False), + (1, 1, False), + (2, 1, True), + ]) +def test_validation_timeout(num_trues, timeout_seconds, expect_raises): + parser = create_parser() + args = parser.parse_args(["validation", "--use-waveform"]) + global_config = GlobalConfiguration(config_path_all) + # simulate having to go through the check and wait loop a variable number of times + def mock_has_populated_queues(num_trues: int): + call_count = 0 + + def inner_mock_has_populated_queues(): + nonlocal call_count + call_count += 1 + return call_count <= num_trues + return inner_mock_has_populated_queues + + with patch.multiple(ValidationRunner, timeout=timedelta(seconds=timeout_seconds), wait_secs=0.6, final_wait_secs=0): + with patch.object(ValidationRunner, '_run_emap', new_callable=MagicMock) as run_emap: + with patch.object(ValidationRunner, '_has_populated_queues', new_callable=PropertyMock) as populated_queues: + with patch.object(ValidationRunner, '_save_logs_and_stop', new_callable=MagicMock) as stop: + populated_queues.side_effect = mock_has_populated_queues(num_trues) + # run the whole validation run, with some mocks + emap_runner = EMAPRunner(args=args, config=global_config) + if expect_raises: + with pytest.raises(ValidationRunnerException, match="Waiting for queue timed out"): + emap_runner.run() + else: + emap_runner.run() + run_emap.assert_called_once() + stop.assert_called_once() + populated_queues.assert_called() @pytest.mark.parametrize("queue_length,expected", [(0, True), (1, False)]) def test_zero_length_queues(queue_length, expected): From d2b0c8fccfb9d7156242f39f385a748d4074ccb5 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Wed, 9 Oct 2024 14:14:15 +0100 Subject: [PATCH 17/17] Update docs and rename property for clarity --- .../dataprocessors/WaveformProcessor.java | 10 +- .../src/main/resources/application.properties | 2 +- docs/dev/features/waveform_hf_data.md | 92 +++++++++++++------ 3 files changed, 70 insertions(+), 34 deletions(-) diff --git a/core/src/main/java/uk/ac/ucl/rits/inform/datasinks/emapstar/dataprocessors/WaveformProcessor.java b/core/src/main/java/uk/ac/ucl/rits/inform/datasinks/emapstar/dataprocessors/WaveformProcessor.java index b31a33a1c..073666652 100644 --- a/core/src/main/java/uk/ac/ucl/rits/inform/datasinks/emapstar/dataprocessors/WaveformProcessor.java +++ b/core/src/main/java/uk/ac/ucl/rits/inform/datasinks/emapstar/dataprocessors/WaveformProcessor.java @@ -27,8 +27,8 @@ public class WaveformProcessor { @Value("${core.waveform.retention_hours}") private int retentionTimeHours; - @Value("${core.waveform.is_synthetic_data}") - private boolean isSyntheticData; + @Value("${core.waveform.is_non_current_test_data}") + private boolean isNonCurrentTestData; /** * @param visitObservationController visit observation controller @@ -61,8 +61,8 @@ public void processMessage(final WaveformMessage msg, final Instant storedFrom) public void deleteOldWaveformData() { logger.info("deleteOldWaveformData: Checking for old waveform data for deletion"); Instant baselineDatetime; - if (isSyntheticData) { - // while still in proof of concept, use the current data (which may be for a + if (isNonCurrentTestData) { + // while testing, use the current data (which may be for a // date far from the present) as a reference for when to apply retention cutoff date from. // ie. assume the time of the most recent data is "now" baselineDatetime = waveformController.mostRecentObservationDatatime(); @@ -74,8 +74,6 @@ public void deleteOldWaveformData() { } else { baselineDatetime = Instant.now(); } - // probably want to round to the nearest day so we do all the day at once, - // rather than little and often Instant cutoff = baselineDatetime.minus(retentionTimeHours, ChronoUnit.HOURS); logger.info("deleteOldWaveformData: baseline = {}, cutoff = {}", baselineDatetime, cutoff); int numDeleted = waveformController.deleteOldWaveformData(cutoff); diff --git a/core/src/main/resources/application.properties b/core/src/main/resources/application.properties index 4e62f9fae..6769be9aa 100644 --- a/core/src/main/resources/application.properties +++ b/core/src/main/resources/application.properties @@ -25,6 +25,6 @@ core.rabbitmq.listen_queues = hl7Queue,databaseExtracts,extensionProjects,wavefo # Data older than this is liable to be deleted to keep overall disk usage small. # In production we will want to have this longer (more like 7 days) core.waveform.retention_hours = 1 -core.waveform.is_synthetic_data = 1 +core.waveform.is_non_current_test_data = 0 spring.rabbitmq.listener.simple.acknowledge-mode=manual diff --git a/docs/dev/features/waveform_hf_data.md b/docs/dev/features/waveform_hf_data.md index 38d994631..ba2ea4ea4 100644 --- a/docs/dev/features/waveform_hf_data.md +++ b/docs/dev/features/waveform_hf_data.md @@ -1,12 +1,52 @@ # Waveform (high-frequency) data +Waveform data is data such as ECG traces and ventilator pressure readings that is sampled multiple times a second. -## Feature overview +## User data requirements + +Data requirements differ depending on the project and cover concerns such as +how far back the data needs to go, how up-to-date it needs to be, and quality issues like +how many gaps it can tolerate. + +One project is interested in looking at 30 second snapshots before and after making some adjustment +to the patient's care. (eg. change ventilator settings) + +In the future they may wish to look at a period of ~12 hours, to see if secretions +are building up gradually over time and maybe some intervention is needed. + +How long do we need to keep waveform data for? The data is very large, so being able to delete data older than +a configurable time period has been implemented to mitigate storage problems. I foresee 1-7 days being a useful +value. +This is implemented as a Spring scheduled task in the core processor. Notably this is the first +background database operation in Emap, so it could happen alongside regular Emap processing. +This will produce significant "churn" in the database; I'm assuming postgres is designed to handle this, but +it's something to consider if any performance problems appear. + +How live does it have to be? Our standard guarantee is no more than a minute out of date, +but in practice it's typically 20 seconds. We have aimed for similar. ## Config options added -- retention time -- ??? +Core: + - `core.waveform.retention_hours` periodically delete data more than this many hours old + - `core.waveform.is_non_current_test_data` for testing only - when deciding which data to delete/retain, if set to true, + then treat the "now" point as the most recent observation date in the waveform table, rather than the actual + current time. Purpose is to avoid test data getting immediately deleted because it's too old, which could happen + if we have a fixed set of test data with observation dates way in the past. + +Waveform Generator: + - `waveform.hl7.send_host`, `waveform.hl7.send_port` - the host and port to send the generated data to + - `test.synthetic.num_patients` - number of different patients (locations) to generate data for + - `test.synthetic.start_datetime` observation date to start generating data from or null to simulate live data + (see comment in code for more) + - `test.synthetic.end_datetime` if not null, exit the generator when observation date hits this date + - `test.synthetic.warp_factor` How many times real time to generate data at. Live mode implies warp factor = 1. + +Waveform Reader: + - `waveform.hl7.listen_port` port inside the container to listen on for waveform HL7 generator + - `waveform.hl7.source_address_allow_list` comma-separated list of source IP addresses to accept connections from. + If the listen contains the value "ALL", then all source IP addresses are allowed. + - `waveform.hl7.test_dump_file` If specified, read messages from this file and then exit - intended for validation ## Design details @@ -85,13 +125,6 @@ the middle of the night, when waveform data will still be coming in. Other solution is to fix it up later when the feed comes back, but that involves a lot of continuously rechecking stuff, and we will have been giving out wrong information in the meantime. And then we will need that audit table! -### User requirements - -What data do our users require? What sort of queries will they be making? - -How live does it have to be? I'm guessing 10 minutes is ok, 60 minutes isn't. - -How long do we need to keep live data for? Can we delete data older than eg. 7 days? This could mitigate storage problems. ### Performance monitoring @@ -110,14 +143,15 @@ My initial tests assumed that there will be 30 patients generating data from one ventilated at any one time, but the ventilator has more than two data streams) At this rate of data flow, my very naive DB implementation using 1 data point per row -resulted in ~100GB of backend postgres disk usage being generated per day -- clearly far too much if we're aiming for the UDS to stay under 1TB, although that figure may be quite out of date! +resulted in ~100GB of backend postgres disk usage being generated per day - clearly +far too much if we're aiming for the UDS to stay under 1TB (which used to be the limit we were observing), given +that there could be three or more Emap instances in the DB at once (two lives and at least one in dev) You can calculate a theoretical minimum: -30 * 86400 * 350 = 907 million data points per day. +30 * 86400 * (300 + 50) = 907 million data points per day. I don't know what numerical type the original data uses, but assuming 8 bytes per data point, that's **~8GB per day**. -If we keep only the last 7 days of data, that caps it at **~60GB overall**. -Will need to allow for some row metadata, the underlying DB having to be padded/aligned/whatever, and it will be a bit more. +If we keep only the last 7 days of data, that caps it at **~60GB** per instance of Emap. +Allowing for some row metadata, the underlying DB to be padded/aligned/whatever, and it will be a bit more. Am assuming compression is impossible. Using SQL arrays vastly improves the actual efficiency vs 1 per row. @@ -128,15 +162,15 @@ See issue https://github.com/UCLH-DHCT/emap/issues/62 for a discussion of furthe ### HL7 ingress -There is a piece of software in the hospital called Smartlinx which collates data from little dongles +There is a piece of software in the hospital called Smartlinx which collects data from little dongles which plug into the back of ventilators and bedside monitors. It can be easily configured to stream HL7 waveform data in our direction. However, a large concern is that we avoid jeopardizing the existing flow of (non-waveform) data that goes from Smartlinx to Epic. -Therefore we will need to have our own Smartlinx licence and the server to run it on, so we are separate from the -existing one. +Therefore we will need to have our own Smartlinx server (with accompanying software licence) to run it on, so we +are separate from the existing one. -So far, it has only been set up to provide one patient's waveform data at a time, and only for short periods. It's +So far, it has only been used to provide one patient's waveform data at a time, and only for short periods. It's unknown how it would cope if it were sending *all* patients' data to us. To receive waveform data from Smartlinx, you must listen on a TCP port which Smartlinx then connects to. @@ -146,14 +180,15 @@ Can Smartlinx replay missed messages if we go down? No. Does Smartlinx support/require acknowledgement messages? No. -Will we need to do our own buffering? Can we do the same thing the IDS does (whatever that is)? +Will we need to do our own buffering? Can we do the same sort of thing the IDS does? Maybe. See issue https://github.com/UCLH-DHCT/emap/issues/48 for why we might have to implement some sort of buffering. -HL7 messages from bedside monitor have ~40 measurements per message; ventilators ~1-10 (50Hz); ECGs (300Hz) not known. +HL7 messages are not particularly space efficient: messages from bedside monitors have ~40 measurements per message; +ventilators have ~1-10 per message (50Hz); ECGs (300Hz) not known. So this will be a high volume of text relative to the actual data. -Although this inefficiency might keep a CPU core on the GAE fairly busy, at least it won't have any effect on the Emap queue and DB. +Although this inefficiency might keep a CPU core or two on the GAE fairly busy, at least it won't have any effect on the Emap queue and DB. Since we plan to use SQL arrays in the DB, and Emap doesn't have a mechanism to collate multiple incoming interchange messages, -we want each interchange message to result in one DB waveform row. +we want each interchange message to result in one DB waveform row being created. Therefore I collect up to 3000 data points in memory for each patient+data stream, collate it and send as a single interchange message, so it can become a single row in the DB. @@ -163,10 +198,13 @@ The local time on the ventilators that has to be set manually twice a year to ac ### Pre-release validation -This requires the ability to replay waveform HL7 messages. We can currently do this using a text HL7 dump file. -We could keep a test stream of real or synthetic messages, but it would have to be continously updated to store (say) the last 7 days of messages, -otherwise this would lose some of the benefits of the validation process. -As a fallback, you could perform waveform validation live, but this would mean a 7 day validation period would take 7 days to run, +This requires the ability to replay waveform HL7 messages. We currently do this using a text HL7 dump file. + +An alternative would be to maintain a test stream of real or synthetic messages, but it would have to be continuously +updated to store (say) the last 7 days of messages, so that it overlaps with the validation period, which is by default +the last 7 days. + +Or you could perform waveform validation live, but this would mean a 7 day validation period would take 7 days to run, and you'd have to run this separately from the main Emap validation process. Things you could check in validation: