Skip to content

Commit

Permalink
Merge pull request #51 from UCLH-DHCT/jeremy/hf-data
Browse files Browse the repository at this point in the history
Generate real HL7 in the synth generator. Parse real HL7 in reader.
  • Loading branch information
jeremyestein authored Aug 27, 2024
2 parents ddbfc4e + fd45fed commit 3b901f3
Show file tree
Hide file tree
Showing 13 changed files with 506 additions and 51 deletions.
29 changes: 29 additions & 0 deletions .github/workflows/emap-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ jobs:
emap-interchange: ${{ steps.filter.outputs.emap-interchange }}
core: ${{ steps.filter.outputs.core }}
hl7-reader: ${{ steps.filter.outputs.hl7-reader }}
waveform-reader: ${{ steps.filter.outputs.waveform-reader }}
steps:
- uses: actions/checkout@v3
- uses: dorny/paths-filter@v2
Expand All @@ -44,6 +45,11 @@ jobs:
- 'emap-interchange/**'
- 'emap-star/**'
- 'hl7-reader/**'
waveform-reader:
- '.github/**'
- 'emap-checker.xml'
- 'emap-interchange/**'
- 'waveform-reader/**'
emap-star-tests:
needs: [filter]
runs-on: ubuntu-latest
Expand Down Expand Up @@ -138,3 +144,26 @@ jobs:
if: success() || failure()
with:
report_paths: '**/target/surefire-reports/TEST-*.xml'
waveform-reader-tests:
needs: [filter]
runs-on: ubuntu-latest
if: needs.filter.outputs.waveform-reader == 'true'
steps:
- uses: actions/checkout@v3
- name: Set up java
uses: actions/setup-java@v3
with:
java-version: '17'
distribution: 'temurin'
cache: 'maven'
- name: Build emap-interchange
working-directory: emap-interchange
run: mvn clean install
- name: Run waveform-reader tests
working-directory: waveform-reader
run: mvn clean test
- name: Publish Test Report
uses: mikepenz/action-junit-report@v2
if: success() || failure()
with:
report_paths: '**/target/surefire-reports/TEST-*.xml'
6 changes: 6 additions & 0 deletions waveform-generator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
<version>1.12.0</version>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package uk.ac.ucl.rits.inform.datasources.waveform_generator;

import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.text.StringSubstitutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
Expand All @@ -10,11 +12,15 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;

@Component
Expand Down Expand Up @@ -98,21 +104,75 @@ public void generateMessages() throws IOException {
}
}

private String applyHl7Template(long samplingRate, String locationId, Instant observationDatetime,
String messageId, List<ImmutablePair<String, List<Double>>> valuesByStreamId) {
// lines in HL7 messages must be CR terminated
final String templateStr = """
MSH|^~\\&|DATACAPTOR||||${messageDatetime}||ORU^R01|${messageId}|P|2.3||||||UNICODE UTF-8|\r\
PID|\r\
PV1||I|${locationId}|\r\
OBR|||||||${obsDatetime}|||${locationId}|||${locationId}|\r\
""";
final String obxTemplate = """
OBX|${obxI}|${dataType}|${streamId}||${valuesAsStr}||||||F||20|${obsDatetime}|\r\
""";
ZoneId hospitalTimezone = ZoneId.of("Europe/London");
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmss.SSSZZ");
String obsDatetime = formatter.format(observationDatetime.atZone(hospitalTimezone));
// go for something vaguely realistic
int milliSecondDelay = new Random().nextInt(50, 150);
Instant messageDatetime = observationDatetime.plusMillis(milliSecondDelay);
String messageDatetimeStr = formatter.format(messageDatetime.atZone(hospitalTimezone));
Map<String, String> parameters = new HashMap<>();
parameters.put("locationId", locationId);
parameters.put("obsDatetime", obsDatetime);
parameters.put("messageDatetime", messageDatetimeStr);
parameters.put("messageId", messageId);

StringSubstitutor stringSubstitutor = new StringSubstitutor(parameters);
StringBuilder obrMsg = new StringBuilder(stringSubstitutor.replace(templateStr));
for (int obxI = 0; obxI < valuesByStreamId.size(); obxI++) {
var streamValuePair = valuesByStreamId.get(obxI);
List<Double> values = streamValuePair.getRight();
String valuesAsStr = values.stream().map(d -> String.format("%.3f", d)).collect(Collectors.joining("^"));
String dataType;
if (values.size() == 1) {
dataType = "NM";
} else if (values.size() > 1) {
dataType = "NA";
} else {
logger.error("Empty value array, why?");
dataType = "";
}
parameters.put("obxI", Integer.toString(obxI + 1));
parameters.put("streamId", streamValuePair.getLeft());
parameters.put("dataType", dataType);
parameters.put("valuesAsStr", valuesAsStr);
obrMsg.append(stringSubstitutor.replace(obxTemplate));
}
String obrMsgStr = obrMsg.toString();
if (obrMsgStr.contains("\n")) {
throw new RuntimeException("HL7 message contains LF char; lines must be CR terminated");
}
return obrMsgStr;
}

/**
* Make synthetic HL7 messages for a single patient and single machine, max one second per message.
* @param locationId where the data originates from (machine/bed location)
* @param streamId identifier for the stream
* @param samplingRate in samples per second
* @param numMillis number of milliseconds to produce data for
* @param locationId where the data originates from (machine/bed location)
* @param startTime observation time of the beginning of the period that the messages are to cover
* @param millisPerMessage max time per message (will split into multiple if needed)
* @return all messages
*/
private List<String> makeSyntheticWaveformMsgs(String locationId,
final long samplingRate,
final long numMillis,
final Instant startTime,
final long millisPerMessage
private List<String> makeSyntheticWaveformMsgs(final String locationId,
final String streamId,
final long samplingRate,
final long numMillis,
final Instant startTime,
final long millisPerMessage
) {
List<String> allMessages = new ArrayList<>();
final long numSamples = numMillis * samplingRate / 1000;
Expand All @@ -123,13 +183,6 @@ private List<String> makeSyntheticWaveformMsgs(String locationId,
String timeStr = DateTimeFormatter.ofPattern("HHmmss").format(startTime.atOffset(ZoneOffset.UTC));
String messageId = String.format("%s_t%s_msg%05d", locationId, timeStr, overallSampleIdx);

// XXX: make this into real HL7
StringBuilder hl7Template = new StringBuilder();
hl7Template.append("/samplingRate/").append(samplingRate).append("/");
hl7Template.append("/locationId/").append(locationId).append("/");
hl7Template.append("/messageStartTime/").append(messageStartTime).append("/");
hl7Template.append("/messageId/").append(messageId).append("/");

var values = new ArrayList<Double>();
long samplesPerMessage = samplingRate * millisPerMessage / 1000;
for (long valueIdx = 0;
Expand All @@ -138,10 +191,12 @@ private List<String> makeSyntheticWaveformMsgs(String locationId,
// a sine wave between maxValue and -maxValue
values.add(2 * maxValue * Math.sin(overallSampleIdx * 0.01) - maxValue);
}
String valuesAsStr = values.stream().map(d -> String.format("%.3f", d)).collect(Collectors.joining(","));
hl7Template.append("/values/").append(valuesAsStr).append("/");

allMessages.add(hl7Template.toString());
// Only one stream ID per HL7 message for the time being
List<ImmutablePair<String, List<Double>>> valuesByStreamId = new ArrayList<>();
valuesByStreamId.add(new ImmutablePair<>(streamId, values));
String fullHl7message = applyHl7Template(samplingRate, locationId, messageStartTime, messageId, valuesByStreamId);
allMessages.add(fullHl7message);
}
return allMessages;
}
Expand All @@ -159,14 +214,15 @@ public List<String> makeSyntheticWaveformMsgsAllPatients(
Instant startTime, long numPatients, long numMillis) {
List<String> waveformMsgs = new ArrayList<>();
for (int p = 0; p < numPatients; p++) {
var machine1Str = String.format("P%03d_mach1", p);
var machine2Str = String.format("P%03d_mach2", p);
var location = String.format("Bed%03d", p);
String streamId1 = "52912";
String streamId2 = "52913";
final long millisPerMessage = 10000;
int sizeBefore = waveformMsgs.size();
waveformMsgs.addAll(makeSyntheticWaveformMsgs(
machine1Str, 50, numMillis, startTime, millisPerMessage));
location, streamId1, 50, numMillis, startTime, millisPerMessage));
waveformMsgs.addAll(makeSyntheticWaveformMsgs(
machine2Str, 300, numMillis, startTime, millisPerMessage));
location, streamId2, 300, numMillis, startTime, millisPerMessage));
int sizeAfter = waveformMsgs.size();
logger.debug("Patient {}, generated {} messages", p, sizeAfter - sizeBefore);
}
Expand Down
12 changes: 12 additions & 0 deletions waveform-reader/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@
</exclusions>
</dependency>

<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>test</scope>
</dependency>


<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
Expand Down Expand Up @@ -120,6 +127,11 @@
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>


</dependencies>
<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory;
import org.springframework.integration.ip.tcp.serializer.TcpCodecs;
import org.springframework.messaging.Message;
import uk.ac.ucl.rits.inform.datasources.waveform.hl7parse.Hl7ParseException;

import java.net.Socket;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -87,14 +88,13 @@ public TcpReceivingChannelAdapter inbound(TcpNetServerConnectionFactory connecti
/**
* Message handler. Source IP check has passed if we get here. No reply is expected.
* @param msg the incoming message
* @throws InterruptedException .
* @throws InterruptedException if publisher send is interrupted
* @throws Hl7ParseException .
*/
@ServiceActivator(inputChannel = "hl7Stream")
public void handler(Message<byte[]> msg) throws InterruptedException {
public void handler(Message<byte[]> msg) throws InterruptedException, Hl7ParseException {
byte[] asBytes = msg.getPayload();
String asStr = new String(asBytes, StandardCharsets.UTF_8);
logger.trace("received message = {}", asStr);
logger.info("MESSAGE of size {}", asStr.length());
// parse message from HL7 to interchange message, send to publisher
hl7ParseAndSend.parseAndSend(asStr);
}
Expand Down
Loading

0 comments on commit 3b901f3

Please sign in to comment.