From 1d73817406bed315d80da89f697a1eabfd9ef310 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Tue, 30 Jul 2024 10:32:18 +0100 Subject: [PATCH 1/3] In a seperate process, generate synthetic messages to simulate an HL7 message stream. Use a quick, ad hoc text format for the time being - convert to real HL7 later. This demonstrates the acceptable performance of sending large amounts of text over the network and then parsing it. Some hardening of the docker compose init process - add rabbitmq healthcheck. --- core/docker-compose.fakeuds.yml | 1 - core/docker-compose.yml | 14 +- .../emap_runner/docker/docker_runner.py | 1 + hl7-reader/docker-compose.yml | 1 - waveform-generator/config.json | 24 +++ waveform-generator/docker-compose.yml | 19 ++ waveform-generator/glowroot.properties | 2 + waveform-generator/pom.xml | 199 ++++++++++++++++++ .../waveform_generator/Application.java | 29 +++ .../waveform_generator}/Hl7Generator.java | 64 +++--- .../waveform_generator/Hl7TcpClient.java | 41 ++++ .../Hl7TcpClientFactory.java | 23 ++ .../waveform_generator/package-info.java | 4 + waveform-reader/Dockerfile | 24 ++- waveform-reader/config.json | 5 +- waveform-reader/docker-compose.yml | 25 +-- .../waveform/Hl7ListenerConfig.java | 21 +- .../datasources/waveform/Hl7ParseAndSend.java | 38 ++-- .../waveform/WaveformOperations.java | 3 - 19 files changed, 450 insertions(+), 88 deletions(-) create mode 100644 waveform-generator/config.json create mode 100644 waveform-generator/docker-compose.yml create mode 100644 waveform-generator/glowroot.properties create mode 100644 waveform-generator/pom.xml create mode 100644 waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/Application.java rename {waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform => waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator}/Hl7Generator.java (73%) create mode 100644 waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/Hl7TcpClient.java create mode 100644 waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/Hl7TcpClientFactory.java create mode 100644 waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/package-info.java diff --git a/core/docker-compose.fakeuds.yml b/core/docker-compose.fakeuds.yml index e3e630bb0..cd10aaafc 100644 --- a/core/docker-compose.fakeuds.yml +++ b/core/docker-compose.fakeuds.yml @@ -1,4 +1,3 @@ -version: '3.2' services: fakeuds: image: postgres:11-alpine diff --git a/core/docker-compose.yml b/core/docker-compose.yml index 04bbbc7a0..3a682ac1a 100644 --- a/core/docker-compose.yml +++ b/core/docker-compose.yml @@ -1,4 +1,3 @@ -version: '3.2' services: core: build: @@ -15,8 +14,10 @@ services: driver: "json-file" restart: on-failure depends_on: - - glowroot-central - - rabbitmq + glowroot-central: + condition: service_started + rabbitmq: + condition: service_healthy rabbitmq: image: rabbitmq:management env_file: @@ -25,6 +26,13 @@ services: - "${RABBITMQ_PORT}:5672" - "${RABBITMQ_ADMIN_PORT}:15672" restart: on-failure + healthcheck: + # rabbitmq server crashes if any rabbitmq-diagnostics cmd is run very soon + # after starting, so can't check too aggressively here + test: rabbitmq-diagnostics -q check_running + interval: 30s + timeout: 10s + retries: 3 cassandra: image: cassandra restart: on-failure diff --git a/emap-setup/emap_runner/docker/docker_runner.py b/emap-setup/emap_runner/docker/docker_runner.py index cf4673790..c00b27b32 100644 --- a/emap-setup/emap_runner/docker/docker_runner.py +++ b/emap-setup/emap_runner/docker/docker_runner.py @@ -86,6 +86,7 @@ def docker_compose_paths(self) -> List[Path]: Path(self.emap_dir, "core", "docker-compose.fakeuds.yml"), Path(self.emap_dir, "hl7-reader", "docker-compose.yml"), Path(self.emap_dir, "waveform-reader", "docker-compose.yml"), + Path(self.emap_dir, "waveform-generator", "docker-compose.yml"), ] # allow for hoover and to be optional compose path if "hoover" in self.config["repositories"]: diff --git a/hl7-reader/docker-compose.yml b/hl7-reader/docker-compose.yml index 7a0aaaec9..13be59d35 100644 --- a/hl7-reader/docker-compose.yml +++ b/hl7-reader/docker-compose.yml @@ -1,4 +1,3 @@ -version: '3.2' services: hl7-reader: build: diff --git a/waveform-generator/config.json b/waveform-generator/config.json new file mode 100644 index 000000000..f99e0ebab --- /dev/null +++ b/waveform-generator/config.json @@ -0,0 +1,24 @@ +{ + "transactions": { + "slowThresholdMillis": 300, + "profilingIntervalMillis": 1000, + "captureThreadStats": true + }, + "instrumentation": [ + { + "className": "uk.ac.ucl.rits.inform.datasources.waveform_generator.Hl7Generator", + "methodName": "generateMessages", + "methodParameterTypes": [ + ".." + ], + "captureKind": "transaction", + "transactionType": "Web", + "transactionNameTemplate": "mainLoop", + "transactionSlowThresholdMillis": 500, + "alreadyInTransactionBehavior": "capture-new-transaction", + "traceEntryMessageTemplate": "{{this}}.{{methodName}}({{0}}, {{1}}, {{2}})", + "timerName": "mainLoop" + } + ] +} + diff --git a/waveform-generator/docker-compose.yml b/waveform-generator/docker-compose.yml new file mode 100644 index 000000000..433b3f5c7 --- /dev/null +++ b/waveform-generator/docker-compose.yml @@ -0,0 +1,19 @@ +services: + waveform-generator: + build: + context: .. + dockerfile: waveform-reader/Dockerfile + target: waveform_generator + args: + HTTP_PROXY: ${HTTP_PROXY} + http_proxy: ${http_proxy} + HTTPS_PROXY: ${HTTPS_PROXY} + https_proxy: ${https_proxy} + SERVICE_SRC_DIR: waveform-generator + env_file: + - ../../config/waveform-reader-config-envs + logging: + driver: "json-file" + restart: "no" + depends_on: + - waveform-reader diff --git a/waveform-generator/glowroot.properties b/waveform-generator/glowroot.properties new file mode 100644 index 000000000..b6b225131 --- /dev/null +++ b/waveform-generator/glowroot.properties @@ -0,0 +1,2 @@ +agent.id=waveform-generator +collector.address=http://glowroot-central:8181 diff --git a/waveform-generator/pom.xml b/waveform-generator/pom.xml new file mode 100644 index 000000000..10bd87dd4 --- /dev/null +++ b/waveform-generator/pom.xml @@ -0,0 +1,199 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 2.6.3 + + + uk.ac.ucl.rits.inform + waveform-generator + jar + 2.7 + Waveform Synthetic Generator + Service to generate synthetic HL7 messages and send them to the generator as a test + + + UTF-8 + 2.6.3 + 17 + 2.7 + 10.3.1 + 3.3.0 + 3.2.1 + 3.0.1u2 + uk.ac.ucl.rits.inform.datasources.waveform.Application + 2.0.1.Final + 4.7.1 + 3.28.0-GA + 1.2.8 + + + + + uk.ac.ucl.rits.inform + emap-interchange + ${emap-interchange.version} + + + + uk.ac.ucl.rits.inform + emap-interchange + ${emap-interchange.version} + test-jar + test + + + + org.springframework.boot + spring-boot-starter-amqp + + + + org.springframework.integration + spring-integration-ip + + + + org.projectlombok + lombok + provided + + + + org.springframework.boot + spring-boot-starter-test + test + + + org.junit.vintage + junit-vintage-engine + + + + + + org.junit.jupiter + junit-jupiter-engine + test + + + + com.fasterxml.jackson.core + jackson-databind + + + + com.fasterxml.jackson.core + jackson-annotations + + + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + + + + com.fasterxml.jackson.module + jackson-module-jaxb-annotations + + + + com.fasterxml.jackson.dataformat + jackson-dataformat-yaml + test + + + + + com.google.code.findbugs + annotations + ${com.google.code.findbugs.annotations.version} + + + + + org.springframework.boot + spring-boot-configuration-processor + true + + + + + + waveform-generator + + + org.springframework.boot + spring-boot-maven-plugin + + + + + org.apache.maven.plugins + maven-checkstyle-plugin + ${checkstyle.plugin.version} + + ../emap-checker.xml + true + true + false + + + + com.puppycrawl.tools + checkstyle + ${checkstyle.version} + + + + + validate + validate + + check + + + + + + + de.qaware.maven + go-offline-maven-plugin + ${go-offline-maven-plugin.version} + + + + uk.ac.ucl.rits.inform + emap-interchange + ${emap-interchange.version} + MAIN + + + + + + maven-compiler-plugin + ${maven-compiler-plugin.version} + + ${java.version} + ${java.version} + ${java.version} + true + + + + + + + + org.apache.maven.plugins + maven-surefire-report-plugin + ${maven-surefire-plugin.version} + + + + + diff --git a/waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/Application.java b/waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/Application.java new file mode 100644 index 000000000..1dc379671 --- /dev/null +++ b/waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/Application.java @@ -0,0 +1,29 @@ +package uk.ac.ucl.rits.inform.datasources.waveform_generator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.scheduling.annotation.EnableScheduling; + +/** + * Spring application entry point. + * @author Jeremy Stein + */ +@SpringBootApplication(scanBasePackages = { + "uk.ac.ucl.rits.inform.datasources.waveform_generator", + }) +@EnableScheduling +public class Application { + private final Logger logger = LoggerFactory.getLogger(Application.class); + + /** + * @param args command line args + */ + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + } + + + +} diff --git a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7Generator.java b/waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/Hl7Generator.java similarity index 73% rename from waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7Generator.java rename to waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/Hl7Generator.java index a862e7afa..af74a6da3 100644 --- a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7Generator.java +++ b/waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/Hl7Generator.java @@ -1,21 +1,23 @@ -package uk.ac.ucl.rits.inform.datasources.waveform; +package uk.ac.ucl.rits.inform.datasources.waveform_generator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Profile; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; +import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.time.Instant; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; @Component -@Profile("hl7gen") public class Hl7Generator { private final Logger logger = LoggerFactory.getLogger(Hl7Generator.class); @@ -44,19 +46,23 @@ public void setComputedDefaults() { } } -// private final WaveformOperations waveformOperations; -// -// public Hl7Generator(WaveformOperations waveformOperations) { -// this.waveformOperations = waveformOperations; -// } + private final Hl7TcpClientFactory hl7TcpClientFactory; /** - * Every one minute post a simulated batch of one minute's worth of data. - * Assume 30 patients, each with a 300Hz and a 50Hz machine. - * @throws InterruptedException dnowioinqwdnq + * @param hl7TcpClientFactory for sending generated messages + */ + public Hl7Generator(Hl7TcpClientFactory hl7TcpClientFactory) { + this.hl7TcpClientFactory = hl7TcpClientFactory; + } + + + /** + * Every one minute post a simulated batch of one minute's worth of data (times warp factor). + * Assume a given number of patients, each with a 300Hz and a 50Hz machine. + * @throws IOException on networking error */ @Scheduled(fixedRate = 60 * 1000) - public void generateMessages() throws InterruptedException { + public void generateMessages() throws IOException { var start = Instant.now(); // Usually if this method runs every N seconds, you would want to generate N // seconds worth of data. However, for non-live tests such as validation runs, @@ -67,9 +73,17 @@ public void generateMessages() throws InterruptedException { boolean shouldExit = false; for (int warpIdx = 0; warpIdx < warpFactor; warpIdx++) { List synthMsgs = makeSyntheticWaveformMsgsAllPatients(startDatetime, numPatients, numMillis); -// waveformOperations.sendSyntheticHl7Messages(synthMsgs); - // XXX: send to TCP port!!! - logger.info("Ready to send {} HL7 messages to a TCP port somewhere!", synthMsgs.size()); + logger.info("Sending {} HL7 messages", synthMsgs.size()); + + try (Hl7TcpClient tcpClient = hl7TcpClientFactory.createTcpClient()) { + for (var msgStr : synthMsgs) { + byte[] messageBytes = msgStr.getBytes(StandardCharsets.UTF_8); + logger.info("About to send message of size {} bytes", messageBytes.length); + logger.trace("Message = {}", messageBytes); + tcpClient.sendMessage(messageBytes); + } + } + startDatetime = startDatetime.plus(numMillis, ChronoUnit.MILLIS); if (endDatetime != null && startDatetime.isAfter(endDatetime)) { shouldExit = true; @@ -106,14 +120,15 @@ private List makeSyntheticWaveformMsgs(String locationId, for (long overallSampleIdx = 0; overallSampleIdx < numSamples;) { long microsAfterStart = overallSampleIdx * 1000_000 / samplingRate; Instant messageStartTime = startTime.plus(microsAfterStart, ChronoUnit.MICROS); - String messageId = String.format("%s_message%05d", locationId, overallSampleIdx); + String timeStr = DateTimeFormatter.ofPattern("HHmmss").format(startTime.atOffset(ZoneOffset.UTC)); + String messageId = String.format("%s_t%s_msg%05d", locationId, timeStr, overallSampleIdx); // XXX: make this into real HL7 StringBuilder hl7Template = new StringBuilder(); - hl7Template.append("/samplingRate|").append(samplingRate); - hl7Template.append("/locationId|").append(locationId); - hl7Template.append("/messageStartTime|").append(messageStartTime); - hl7Template.append("/messageId|").append(messageId); + hl7Template.append("/samplingRate/").append(samplingRate).append("/"); + hl7Template.append("/locationId/").append(locationId).append("/"); + hl7Template.append("/messageStartTime/").append(messageStartTime).append("/"); + hl7Template.append("/messageId/").append(messageId).append("/"); var values = new ArrayList(); long samplesPerMessage = samplingRate * millisPerMessage / 1000; @@ -123,8 +138,8 @@ private List makeSyntheticWaveformMsgs(String locationId, // a sine wave between maxValue and -maxValue values.add(2 * maxValue * Math.sin(overallSampleIdx * 0.01) - maxValue); } - String valuesAsStr = values.stream().map(Object::toString).collect(Collectors.joining(",")); - hl7Template.append("/values|").append(valuesAsStr); + String valuesAsStr = values.stream().map(d -> String.format("%.3f", d)).collect(Collectors.joining(",")); + hl7Template.append("/values/").append(valuesAsStr).append("/"); allMessages.add(hl7Template.toString()); } @@ -139,10 +154,9 @@ private List makeSyntheticWaveformMsgs(String locationId, * @param numPatients number of patients to generate for * @param numMillis length of observation period to generate data for * @return list of HL7 messages - * @throws InterruptedException . */ public List makeSyntheticWaveformMsgsAllPatients( - Instant startTime, long numPatients, long numMillis) throws InterruptedException { + Instant startTime, long numPatients, long numMillis) { List waveformMsgs = new ArrayList<>(); for (int p = 0; p < numPatients; p++) { var machine1Str = String.format("P%03d_mach1", p); @@ -154,7 +168,7 @@ public List makeSyntheticWaveformMsgsAllPatients( waveformMsgs.addAll(makeSyntheticWaveformMsgs( machine2Str, 300, numMillis, startTime, millisPerMessage)); int sizeAfter = waveformMsgs.size(); - logger.debug("JES: Patient {}, sending {} messages", p, sizeAfter - sizeBefore); + logger.debug("Patient {}, generated {} messages", p, sizeAfter - sizeBefore); } return waveformMsgs; diff --git a/waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/Hl7TcpClient.java b/waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/Hl7TcpClient.java new file mode 100644 index 000000000..528c3135a --- /dev/null +++ b/waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/Hl7TcpClient.java @@ -0,0 +1,41 @@ +package uk.ac.ucl.rits.inform.datasources.waveform_generator; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.Socket; + +public class Hl7TcpClient implements AutoCloseable { + private final Socket socket; + + /** + * Connect over tcp. + * + * @param tcpHost connect to host + * @param tcpPort connect to port + * @throws IOException on connect error + */ + public Hl7TcpClient(String tcpHost, int tcpPort) throws IOException { + this.socket = new Socket(tcpHost, tcpPort); + } + + /** + * Send a message with CRLF. + * + * @param msg the byte array of the message to send, without the terminating CRLF + * @throws IOException tcp error + */ + public void sendMessage(byte[] msg) throws IOException { + OutputStream os = socket.getOutputStream(); + os.write(msg); + os.write("\r\n".getBytes()); + } + + /** + * Close connection. + * @throws IOException tcp error + */ + public void close() throws IOException { + socket.getOutputStream().close(); + socket.close(); + } +} diff --git a/waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/Hl7TcpClientFactory.java b/waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/Hl7TcpClientFactory.java new file mode 100644 index 000000000..484e71c3f --- /dev/null +++ b/waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/Hl7TcpClientFactory.java @@ -0,0 +1,23 @@ +package uk.ac.ucl.rits.inform.datasources.waveform_generator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.io.IOException; + +@Component +public class Hl7TcpClientFactory { + private final Logger logger = LoggerFactory.getLogger(Hl7TcpClientFactory.class); + + @Value("${waveform.hl7.send_host}") + private String tcpHost; + + @Value("${waveform.hl7.send_port}") + private int tcpPort; + + public Hl7TcpClient createTcpClient() throws IOException { + return new Hl7TcpClient(tcpHost, tcpPort); + } +} diff --git a/waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/package-info.java b/waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/package-info.java new file mode 100644 index 000000000..3d9bd6680 --- /dev/null +++ b/waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/package-info.java @@ -0,0 +1,4 @@ +/** + * Generate synthetic waveform messages for testing purposes. + */ +package uk.ac.ucl.rits.inform.datasources.waveform_generator; diff --git a/waveform-reader/Dockerfile b/waveform-reader/Dockerfile index e24bca443..e86294980 100644 --- a/waveform-reader/Dockerfile +++ b/waveform-reader/Dockerfile @@ -6,29 +6,31 @@ RUN apt update; apt install -yy zip # Set up the Maven proxy settings COPY docker/set_mvn_proxy.sh /app/ # Download and extract glowroot -WORKDIR /app/waveform-reader +ARG SERVICE_SRC_DIR +WORKDIR /app/$SERVICE_SRC_DIR RUN curl -s https://api.github.com/repos/glowroot/glowroot/releases/latest \ | grep browser_download_url | grep "\.zip" | grep -v "central" | cut -d '"' -f 4 \ | xargs curl -L -o glowroot.zip; unzip glowroot.zip -COPY waveform-reader/glowroot.properties glowroot/glowroot.properties -COPY waveform-reader/config.json glowroot/config.json +COPY $SERVICE_SRC_DIR/glowroot.properties glowroot/glowroot.properties +COPY $SERVICE_SRC_DIR/config.json glowroot/config.json # Copy pom and checkstyle, then all generate resources COPY emap-checker.xml /app/ COPY emap-interchange/pom.xml /app/emap-interchange/ RUN source /app/set_mvn_proxy.sh; mvn dependency:go-offline -f /app/emap-interchange/pom.xml -COPY waveform-reader/pom.xml /app/waveform-reader/ -RUN source /app/set_mvn_proxy.sh; mvn de.qaware.maven:go-offline-maven-plugin:resolve-dependencies -f /app/waveform-reader/pom.xml +COPY $SERVICE_SRC_DIR/pom.xml /app/$SERVICE_SRC_DIR/ +RUN source /app/set_mvn_proxy.sh; mvn de.qaware.maven:go-offline-maven-plugin:resolve-dependencies -f /app/$SERVICE_SRC_DIR/pom.xml # Install emap-interchange COPY emap-interchange/src/ /app/emap-interchange/src/ RUN source /app/set_mvn_proxy.sh; mvn install -f /app/emap-interchange/pom.xml # Install waveform-reader -WORKDIR /app/waveform-reader -COPY waveform-reader/src/ /app/waveform-reader/src/ -RUN source /app/set_mvn_proxy.sh; mvn install -Dmaven.test.skip=true -Dstart-class=uk.ac.ucl.rits.inform.datasources.waveform.Application +WORKDIR /app/$SERVICE_SRC_DIR +COPY $SERVICE_SRC_DIR/src/ /app/$SERVICE_SRC_DIR/src/ FROM emap_java_base AS waveform_reader -CMD ["java", "-javaagent:./glowroot/glowroot.jar", "-Dspring.profiles.active=hl7reader", "-jar", "./target/waveform-reader.jar"] +RUN source /app/set_mvn_proxy.sh; mvn install -Dmaven.test.skip=true -Dstart-class=uk.ac.ucl.rits.inform.datasources.waveform.Application +CMD ["java", "-javaagent:./glowroot/glowroot.jar", "-jar", "./target/waveform-reader.jar"] -FROM emap_java_base AS waveform_hl7_generator -CMD ["java", "-javaagent:./glowroot/glowroot.jar", "-Dspring.profiles.active=hl7gen", "-jar", "./target/waveform-reader.jar"] +FROM emap_java_base AS waveform_generator +RUN source /app/set_mvn_proxy.sh; mvn install -Dmaven.test.skip=true -Dstart-class=uk.ac.ucl.rits.inform.datasources.waveform_generator.Application +CMD ["java", "-javaagent:./glowroot/glowroot.jar", "-jar", "./target/waveform-generator.jar"] diff --git a/waveform-reader/config.json b/waveform-reader/config.json index 4b385d63e..14e7ae522 100644 --- a/waveform-reader/config.json +++ b/waveform-reader/config.json @@ -6,8 +6,9 @@ }, "instrumentation": [ { - "className": "uk.ac.ucl.rits.inform.datasources.waveform.Application", - "methodName": "mainLoop", + + "className": "uk.ac.ucl.rits.inform.datasources.waveform.Hl7ListenerConfig", + "methodName": "handler", "methodParameterTypes": [ ".." ], diff --git a/waveform-reader/docker-compose.yml b/waveform-reader/docker-compose.yml index 310d48944..1b3d60543 100644 --- a/waveform-reader/docker-compose.yml +++ b/waveform-reader/docker-compose.yml @@ -1,4 +1,3 @@ -version: '3.4' services: waveform-reader: build: @@ -10,6 +9,7 @@ services: http_proxy: ${http_proxy} HTTPS_PROXY: ${HTTPS_PROXY} https_proxy: ${https_proxy} + SERVICE_SRC_DIR: waveform-reader env_file: - ../../config/waveform-reader-config-envs ports: @@ -19,22 +19,7 @@ services: restart: "no" depends_on: # Uses services from core, orchestrate using the EMAP setup package - - glowroot-central - - rabbitmq - waveform-hl7-generator: - build: - context: .. - dockerfile: waveform-reader/Dockerfile - target: waveform_hl7_generator - args: - HTTP_PROXY: ${HTTP_PROXY} - http_proxy: ${http_proxy} - HTTPS_PROXY: ${HTTPS_PROXY} - https_proxy: ${https_proxy} - env_file: - - ../../config/waveform-reader-config-envs - logging: - driver: "json-file" - restart: "no" - depends_on: - - waveform-reader + glowroot-central: + condition: service_started + rabbitmq: + condition: service_healthy diff --git a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ListenerConfig.java b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ListenerConfig.java index 371f4907b..9ac2985f6 100644 --- a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ListenerConfig.java +++ b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ListenerConfig.java @@ -6,12 +6,12 @@ import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Profile; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter; import org.springframework.integration.ip.tcp.connection.DefaultTcpNetConnectionSupport; import org.springframework.integration.ip.tcp.connection.TcpNetConnection; import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory; +import org.springframework.integration.ip.tcp.serializer.TcpCodecs; import org.springframework.messaging.Message; import java.net.Socket; @@ -22,7 +22,6 @@ * Listen on a TCP port for incoming HL7 messages. */ @Configuration -@Profile("hl7reader") public class Hl7ListenerConfig { private final Logger logger = LoggerFactory.getLogger(Hl7ListenerConfig.class); @@ -43,6 +42,12 @@ public TcpNetServerConnectionFactory serverConnectionFactory( @Value("${waveform.hl7.listen_port}") int listenPort, @Value("${waveform.hl7.source_address_allow_list}") List sourceAddressAllowList) { TcpNetServerConnectionFactory connFactory = new TcpNetServerConnectionFactory(listenPort); + connFactory.setSoSendBufferSize(10 * 1024 * 1024); + connFactory.setSoReceiveBufferSize(10 * 1024 * 1024); + connFactory.setSoTimeout(10_000); + connFactory.setSoTcpNoDelay(false); + connFactory.setSoKeepAlive(true); + connFactory.setDeserializer(TcpCodecs.crlf(5_000_000)); connFactory.setTcpNetConnectionSupport(new DefaultTcpNetConnectionSupport() { @Override public TcpNetConnection createNewConnection( @@ -53,7 +58,8 @@ public TcpNetConnection createNewConnection( String connectionFactoryName) { TcpNetConnection conn = super.createNewConnection(socket, server, lookupHost, applicationEventPublisher, connectionFactoryName); String sourceAddress = conn.getHostAddress(); - if (sourceAddressAllowList.contains(sourceAddress)) { + if (sourceAddressAllowList.contains(sourceAddress) + || sourceAddressAllowList.contains("ALL")) { logger.info("connection accepted from {}:{}", sourceAddress, conn.getPort()); } else { logger.warn("CONNECTION REFUSED from {}:{}, allowlist = {}", sourceAddress, conn.getPort(), sourceAddressAllowList); @@ -84,11 +90,12 @@ public TcpReceivingChannelAdapter inbound(TcpNetServerConnectionFactory connecti * @throws InterruptedException . */ @ServiceActivator(inputChannel = "hl7Stream") - public void handler(Message msg) throws InterruptedException { - byte[] asBytes = (byte[]) msg.getPayload(); + public void handler(Message msg) throws InterruptedException { + byte[] asBytes = msg.getPayload(); String asStr = new String(asBytes, StandardCharsets.UTF_8); - // XXX: parse message from HL7 to interchange message, send to publisher + logger.trace("received message = {}", asStr); + logger.info("MESSAGE of size {}", asStr.length()); + // parse message from HL7 to interchange message, send to publisher hl7ParseAndSend.parseAndSend(asStr); - logger.info("MESSAGE {}", asStr); } } diff --git a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ParseAndSend.java b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ParseAndSend.java index 973c320ec..f88fdf383 100644 --- a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ParseAndSend.java +++ b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ParseAndSend.java @@ -1,6 +1,7 @@ package uk.ac.ucl.rits.inform.datasources.waveform; -import org.springframework.context.annotation.Profile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import uk.ac.ucl.rits.inform.interchange.InterchangeValue; import uk.ac.ucl.rits.inform.interchange.visit_observations.WaveformMessage; @@ -8,12 +9,13 @@ import java.time.Instant; import java.util.Arrays; import java.util.List; +import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @Component -@Profile("hl7reader") public class Hl7ParseAndSend { + private final Logger logger = LoggerFactory.getLogger(Hl7ParseAndSend.class); private final WaveformOperations waveformOperations; @@ -21,34 +23,40 @@ public class Hl7ParseAndSend { this.waveformOperations = waveformOperations; } + private String getReMatch(String inputText, String field) { + Pattern pat = Pattern.compile("/" + field + "/(.*?)/"); + Matcher mat = pat.matcher(inputText); + mat.find(); + return mat.group(1); + } + private WaveformMessage parseHl7(String messageAsStr) { // XXX: Need real HL7 parsing! - Pattern srPat = Pattern.compile("/samplingRate|(.*)/"); - Pattern loPat = Pattern.compile("/locationId|(.*)/"); - Pattern stPat = Pattern.compile("/messageStartTime|(.*)/"); - Pattern idPat = Pattern.compile("/messageId|(.*)/"); - Pattern vaPat = Pattern.compile("/values|(.*)/"); - String samplingRateStr = srPat.matcher(messageAsStr).group(1); - String locationId = loPat.matcher(messageAsStr).group(1); - String messageStartTimeStr = stPat.matcher(messageAsStr).group(1); - String messageId = idPat.matcher(messageAsStr).group(1); - String valuesStr = vaPat.matcher(messageAsStr).group(1); + logger.info("Parsing message of size {}", messageAsStr.length()); + String samplingRateStr = getReMatch(messageAsStr, "samplingRate"); + String locationId = getReMatch(messageAsStr, "locationId"); + String messageStartTimeStr = getReMatch(messageAsStr, "messageStartTime"); + String messageId = getReMatch(messageAsStr, "messageId"); + String valuesStr = getReMatch(messageAsStr, "values"); WaveformMessage waveformMessage = new WaveformMessage(); waveformMessage.setSamplingRate(Long.parseLong(samplingRateStr)); waveformMessage.setLocationString(locationId); waveformMessage.setObservationTime(Instant.parse(messageStartTimeStr)); waveformMessage.setSourceMessageId(messageId); - String[] valuesAsString = valuesStr.split(","); - List values = Arrays.stream(valuesAsString).map(Double::parseDouble).collect(Collectors.toList()); + String[] valuesArray = valuesStr.split(","); + logger.trace("valuesArray = {} (length = {})", valuesArray, valuesArray.length); + List values = Arrays.stream(valuesArray).map(Double::parseDouble).collect(Collectors.toList()); waveformMessage.setNumericValues(new InterchangeValue<>(values)); + logger.debug("waveform message contains {} numerical values", values.size()); + logger.trace("output interchange waveform message = {}", waveformMessage); return waveformMessage; } /** * Parse and publish an HL7 message. * @param messageAsStr One HL7 message as a string - * @throws InterruptedException . + * @throws InterruptedException if publisher send is interrupted */ public void parseAndSend(String messageAsStr) throws InterruptedException { WaveformMessage msg = parseHl7(messageAsStr); diff --git a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/WaveformOperations.java b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/WaveformOperations.java index 4fd113706..39799f083 100644 --- a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/WaveformOperations.java +++ b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/WaveformOperations.java @@ -2,13 +2,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Component; import uk.ac.ucl.rits.inform.interchange.messaging.Publisher; import uk.ac.ucl.rits.inform.interchange.visit_observations.WaveformMessage; @Component -@Profile("hl7reader") public class WaveformOperations { private final Logger logger = LoggerFactory.getLogger(getClass()); @@ -20,7 +18,6 @@ public WaveformOperations(Publisher publisher) { private void publishMessage(Publisher publisher, String messageId, WaveformMessage m) throws InterruptedException { - // logger.debug("Message = {}", m.toString()); publisher.submit(m, messageId, messageId, () -> { logger.debug("Successful ACK for message with ID {}", messageId); }); From 51af8d9d7424097e448cddd923a695979393c2a0 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Fri, 9 Aug 2024 10:34:50 +0100 Subject: [PATCH 2/3] Generator does not need interchange --- waveform-generator/pom.xml | 25 ------------------------- 1 file changed, 25 deletions(-) diff --git a/waveform-generator/pom.xml b/waveform-generator/pom.xml index 10bd87dd4..bdf4471bc 100644 --- a/waveform-generator/pom.xml +++ b/waveform-generator/pom.xml @@ -19,7 +19,6 @@ UTF-8 2.6.3 17 - 2.7 10.3.1 3.3.0 3.2.1 @@ -32,20 +31,6 @@ - - uk.ac.ucl.rits.inform - emap-interchange - ${emap-interchange.version} - - - - uk.ac.ucl.rits.inform - emap-interchange - ${emap-interchange.version} - test-jar - test - - org.springframework.boot spring-boot-starter-amqp @@ -163,16 +148,6 @@ de.qaware.maven go-offline-maven-plugin ${go-offline-maven-plugin.version} - - - - uk.ac.ucl.rits.inform - emap-interchange - ${emap-interchange.version} - MAIN - - - maven-compiler-plugin From cf878e6f0a0c82b86be9f07ac65fcfd8d33ec5eb Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Fri, 9 Aug 2024 10:41:31 +0100 Subject: [PATCH 3/3] Remove more unnecessary dependencies --- waveform-generator/pom.xml | 33 +-------------------------------- 1 file changed, 1 insertion(+), 32 deletions(-) diff --git a/waveform-generator/pom.xml b/waveform-generator/pom.xml index bdf4471bc..60d0724d3 100644 --- a/waveform-generator/pom.xml +++ b/waveform-generator/pom.xml @@ -23,7 +23,7 @@ 3.3.0 3.2.1 3.0.1u2 - uk.ac.ucl.rits.inform.datasources.waveform.Application + uk.ac.ucl.rits.inform.datasources.waveform_generator.Application 2.0.1.Final 4.7.1 3.28.0-GA @@ -36,11 +36,6 @@ spring-boot-starter-amqp - - org.springframework.integration - spring-integration-ip - - org.projectlombok lombok @@ -65,32 +60,6 @@ test - - com.fasterxml.jackson.core - jackson-databind - - - - com.fasterxml.jackson.core - jackson-annotations - - - - com.fasterxml.jackson.datatype - jackson-datatype-jsr310 - - - - com.fasterxml.jackson.module - jackson-module-jaxb-annotations - - - - com.fasterxml.jackson.dataformat - jackson-dataformat-yaml - test - - com.google.code.findbugs