Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generate real HL7 in the synth generator. Parse real HL7 in reader #51

Merged
merged 7 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions .github/workflows/emap-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ jobs:
emap-interchange: ${{ steps.filter.outputs.emap-interchange }}
core: ${{ steps.filter.outputs.core }}
hl7-reader: ${{ steps.filter.outputs.hl7-reader }}
waveform-reader: ${{ steps.filter.outputs.waveform-reader }}
steps:
- uses: actions/checkout@v3
- uses: dorny/paths-filter@v2
Expand All @@ -44,6 +45,11 @@ jobs:
- 'emap-interchange/**'
- 'emap-star/**'
- 'hl7-reader/**'
waveform-reader:
- '.github/**'
- 'emap-checker.xml'
- 'emap-interchange/**'
- 'waveform-reader/**'
emap-star-tests:
needs: [filter]
runs-on: ubuntu-latest
Expand Down Expand Up @@ -138,3 +144,26 @@ jobs:
if: success() || failure()
with:
report_paths: '**/target/surefire-reports/TEST-*.xml'
waveform-reader-tests:
needs: [filter]
runs-on: ubuntu-latest
if: needs.filter.outputs.waveform-reader == 'true'
steps:
- uses: actions/checkout@v3
- name: Set up java
uses: actions/setup-java@v3
with:
java-version: '17'
distribution: 'temurin'
cache: 'maven'
- name: Build emap-interchange
working-directory: emap-interchange
run: mvn clean install
- name: Run waveform-reader tests
working-directory: waveform-reader
run: mvn clean test
- name: Publish Test Report
uses: mikepenz/action-junit-report@v2
if: success() || failure()
with:
report_paths: '**/target/surefire-reports/TEST-*.xml'
6 changes: 6 additions & 0 deletions waveform-generator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
<version>1.12.0</version>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package uk.ac.ucl.rits.inform.datasources.waveform_generator;

import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.text.StringSubstitutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
Expand All @@ -10,11 +12,15 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;

@Component
Expand Down Expand Up @@ -98,21 +104,75 @@ public void generateMessages() throws IOException {
}
}

private String applyHl7Template(long samplingRate, String locationId, Instant observationDatetime,
String messageId, List<ImmutablePair<String, List<Double>>> valuesByStreamId) {
// lines in HL7 messages must be CR terminated
final String templateStr = """
MSH|^~\\&|DATACAPTOR||||${messageDatetime}||ORU^R01|${messageId}|P|2.3||||||UNICODE UTF-8|\r\
PID|\r\
PV1||I|${locationId}|\r\
OBR|||||||${obsDatetime}|||${locationId}|||${locationId}|\r\
""";
final String obxTemplate = """
OBX|${obxI}|${dataType}|${streamId}||${valuesAsStr}||||||F||20|${obsDatetime}|\r\
""";
ZoneId hospitalTimezone = ZoneId.of("Europe/London");
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmss.SSSZZ");
String obsDatetime = formatter.format(observationDatetime.atZone(hospitalTimezone));
// go for something vaguely realistic
int milliSecondDelay = new Random().nextInt(50, 150);
Instant messageDatetime = observationDatetime.plusMillis(milliSecondDelay);
String messageDatetimeStr = formatter.format(messageDatetime.atZone(hospitalTimezone));
Map<String, String> parameters = new HashMap<>();
parameters.put("locationId", locationId);
parameters.put("obsDatetime", obsDatetime);
parameters.put("messageDatetime", messageDatetimeStr);
parameters.put("messageId", messageId);

StringSubstitutor stringSubstitutor = new StringSubstitutor(parameters);
StringBuilder obrMsg = new StringBuilder(stringSubstitutor.replace(templateStr));
for (int obxI = 0; obxI < valuesByStreamId.size(); obxI++) {
var streamValuePair = valuesByStreamId.get(obxI);
List<Double> values = streamValuePair.getRight();
String valuesAsStr = values.stream().map(d -> String.format("%.3f", d)).collect(Collectors.joining("^"));
String dataType;
if (values.size() == 1) {
dataType = "NM";
} else if (values.size() > 1) {
dataType = "NA";
} else {
logger.error("Empty value array, why?");
dataType = "";
}
parameters.put("obxI", Integer.toString(obxI + 1));
parameters.put("streamId", streamValuePair.getLeft());
parameters.put("dataType", dataType);
parameters.put("valuesAsStr", valuesAsStr);
obrMsg.append(stringSubstitutor.replace(obxTemplate));
}
String obrMsgStr = obrMsg.toString();
if (obrMsgStr.contains("\n")) {
throw new RuntimeException("HL7 message contains LF char; lines must be CR terminated");
}
return obrMsgStr;
}

/**
* Make synthetic HL7 messages for a single patient and single machine, max one second per message.
* @param locationId where the data originates from (machine/bed location)
* @param streamId identifier for the stream
* @param samplingRate in samples per second
* @param numMillis number of milliseconds to produce data for
* @param locationId where the data originates from (machine/bed location)
* @param startTime observation time of the beginning of the period that the messages are to cover
* @param millisPerMessage max time per message (will split into multiple if needed)
* @return all messages
*/
private List<String> makeSyntheticWaveformMsgs(String locationId,
final long samplingRate,
final long numMillis,
final Instant startTime,
final long millisPerMessage
private List<String> makeSyntheticWaveformMsgs(final String locationId,
final String streamId,
final long samplingRate,
final long numMillis,
final Instant startTime,
final long millisPerMessage
) {
List<String> allMessages = new ArrayList<>();
final long numSamples = numMillis * samplingRate / 1000;
Expand All @@ -123,13 +183,6 @@ private List<String> makeSyntheticWaveformMsgs(String locationId,
String timeStr = DateTimeFormatter.ofPattern("HHmmss").format(startTime.atOffset(ZoneOffset.UTC));
String messageId = String.format("%s_t%s_msg%05d", locationId, timeStr, overallSampleIdx);

// XXX: make this into real HL7
StringBuilder hl7Template = new StringBuilder();
hl7Template.append("/samplingRate/").append(samplingRate).append("/");
hl7Template.append("/locationId/").append(locationId).append("/");
hl7Template.append("/messageStartTime/").append(messageStartTime).append("/");
hl7Template.append("/messageId/").append(messageId).append("/");

var values = new ArrayList<Double>();
long samplesPerMessage = samplingRate * millisPerMessage / 1000;
for (long valueIdx = 0;
Expand All @@ -138,10 +191,12 @@ private List<String> makeSyntheticWaveformMsgs(String locationId,
// a sine wave between maxValue and -maxValue
values.add(2 * maxValue * Math.sin(overallSampleIdx * 0.01) - maxValue);
}
String valuesAsStr = values.stream().map(d -> String.format("%.3f", d)).collect(Collectors.joining(","));
hl7Template.append("/values/").append(valuesAsStr).append("/");

allMessages.add(hl7Template.toString());
// Only one stream ID per HL7 message for the time being
List<ImmutablePair<String, List<Double>>> valuesByStreamId = new ArrayList<>();
valuesByStreamId.add(new ImmutablePair<>(streamId, values));
String fullHl7message = applyHl7Template(samplingRate, locationId, messageStartTime, messageId, valuesByStreamId);
allMessages.add(fullHl7message);
}
return allMessages;
}
Expand All @@ -159,14 +214,15 @@ public List<String> makeSyntheticWaveformMsgsAllPatients(
Instant startTime, long numPatients, long numMillis) {
List<String> waveformMsgs = new ArrayList<>();
for (int p = 0; p < numPatients; p++) {
var machine1Str = String.format("P%03d_mach1", p);
var machine2Str = String.format("P%03d_mach2", p);
var location = String.format("Bed%03d", p);
String streamId1 = "52912";
String streamId2 = "52913";
final long millisPerMessage = 10000;
int sizeBefore = waveformMsgs.size();
waveformMsgs.addAll(makeSyntheticWaveformMsgs(
machine1Str, 50, numMillis, startTime, millisPerMessage));
location, streamId1, 50, numMillis, startTime, millisPerMessage));
waveformMsgs.addAll(makeSyntheticWaveformMsgs(
machine2Str, 300, numMillis, startTime, millisPerMessage));
location, streamId2, 300, numMillis, startTime, millisPerMessage));
int sizeAfter = waveformMsgs.size();
logger.debug("Patient {}, generated {} messages", p, sizeAfter - sizeBefore);
}
Expand Down
12 changes: 12 additions & 0 deletions waveform-reader/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@
</exclusions>
</dependency>

<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>test</scope>
</dependency>


<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
Expand Down Expand Up @@ -120,6 +127,11 @@
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>


</dependencies>
<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory;
import org.springframework.integration.ip.tcp.serializer.TcpCodecs;
import org.springframework.messaging.Message;
import uk.ac.ucl.rits.inform.datasources.waveform.hl7parse.Hl7ParseException;

import java.net.Socket;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -87,14 +88,13 @@ public TcpReceivingChannelAdapter inbound(TcpNetServerConnectionFactory connecti
/**
* Message handler. Source IP check has passed if we get here. No reply is expected.
* @param msg the incoming message
* @throws InterruptedException .
* @throws InterruptedException if publisher send is interrupted
* @throws Hl7ParseException .
*/
@ServiceActivator(inputChannel = "hl7Stream")
public void handler(Message<byte[]> msg) throws InterruptedException {
public void handler(Message<byte[]> msg) throws InterruptedException, Hl7ParseException {
byte[] asBytes = msg.getPayload();
String asStr = new String(asBytes, StandardCharsets.UTF_8);
logger.trace("received message = {}", asStr);
logger.info("MESSAGE of size {}", asStr.length());
// parse message from HL7 to interchange message, send to publisher
hl7ParseAndSend.parseAndSend(asStr);
}
Expand Down
Loading
Loading