diff --git a/core/docker-compose.fakeuds.yml b/core/docker-compose.fakeuds.yml index e3e630bb0..cd10aaafc 100644 --- a/core/docker-compose.fakeuds.yml +++ b/core/docker-compose.fakeuds.yml @@ -1,4 +1,3 @@ -version: '3.2' services: fakeuds: image: postgres:11-alpine diff --git a/core/docker-compose.yml b/core/docker-compose.yml index 04bbbc7a0..3a682ac1a 100644 --- a/core/docker-compose.yml +++ b/core/docker-compose.yml @@ -1,4 +1,3 @@ -version: '3.2' services: core: build: @@ -15,8 +14,10 @@ services: driver: "json-file" restart: on-failure depends_on: - - glowroot-central - - rabbitmq + glowroot-central: + condition: service_started + rabbitmq: + condition: service_healthy rabbitmq: image: rabbitmq:management env_file: @@ -25,6 +26,13 @@ services: - "${RABBITMQ_PORT}:5672" - "${RABBITMQ_ADMIN_PORT}:15672" restart: on-failure + healthcheck: + # rabbitmq server crashes if any rabbitmq-diagnostics cmd is run very soon + # after starting, so can't check too aggressively here + test: rabbitmq-diagnostics -q check_running + interval: 30s + timeout: 10s + retries: 3 cassandra: image: cassandra restart: on-failure diff --git a/emap-setup/emap_runner/docker/docker_runner.py b/emap-setup/emap_runner/docker/docker_runner.py index cf4673790..c00b27b32 100644 --- a/emap-setup/emap_runner/docker/docker_runner.py +++ b/emap-setup/emap_runner/docker/docker_runner.py @@ -86,6 +86,7 @@ def docker_compose_paths(self) -> List[Path]: Path(self.emap_dir, "core", "docker-compose.fakeuds.yml"), Path(self.emap_dir, "hl7-reader", "docker-compose.yml"), Path(self.emap_dir, "waveform-reader", "docker-compose.yml"), + Path(self.emap_dir, "waveform-generator", "docker-compose.yml"), ] # allow for hoover and to be optional compose path if "hoover" in self.config["repositories"]: diff --git a/hl7-reader/docker-compose.yml b/hl7-reader/docker-compose.yml index 7a0aaaec9..13be59d35 100644 --- a/hl7-reader/docker-compose.yml +++ b/hl7-reader/docker-compose.yml @@ -1,4 +1,3 @@ -version: '3.2' services: hl7-reader: build: diff --git a/waveform-generator/config.json b/waveform-generator/config.json new file mode 100644 index 000000000..f99e0ebab --- /dev/null +++ b/waveform-generator/config.json @@ -0,0 +1,24 @@ +{ + "transactions": { + "slowThresholdMillis": 300, + "profilingIntervalMillis": 1000, + "captureThreadStats": true + }, + "instrumentation": [ + { + "className": "uk.ac.ucl.rits.inform.datasources.waveform_generator.Hl7Generator", + "methodName": "generateMessages", + "methodParameterTypes": [ + ".." + ], + "captureKind": "transaction", + "transactionType": "Web", + "transactionNameTemplate": "mainLoop", + "transactionSlowThresholdMillis": 500, + "alreadyInTransactionBehavior": "capture-new-transaction", + "traceEntryMessageTemplate": "{{this}}.{{methodName}}({{0}}, {{1}}, {{2}})", + "timerName": "mainLoop" + } + ] +} + diff --git a/waveform-generator/docker-compose.yml b/waveform-generator/docker-compose.yml new file mode 100644 index 000000000..433b3f5c7 --- /dev/null +++ b/waveform-generator/docker-compose.yml @@ -0,0 +1,19 @@ +services: + waveform-generator: + build: + context: .. + dockerfile: waveform-reader/Dockerfile + target: waveform_generator + args: + HTTP_PROXY: ${HTTP_PROXY} + http_proxy: ${http_proxy} + HTTPS_PROXY: ${HTTPS_PROXY} + https_proxy: ${https_proxy} + SERVICE_SRC_DIR: waveform-generator + env_file: + - ../../config/waveform-reader-config-envs + logging: + driver: "json-file" + restart: "no" + depends_on: + - waveform-reader diff --git a/waveform-generator/glowroot.properties b/waveform-generator/glowroot.properties new file mode 100644 index 000000000..b6b225131 --- /dev/null +++ b/waveform-generator/glowroot.properties @@ -0,0 +1,2 @@ +agent.id=waveform-generator +collector.address=http://glowroot-central:8181 diff --git a/waveform-generator/pom.xml b/waveform-generator/pom.xml new file mode 100644 index 000000000..60d0724d3 --- /dev/null +++ b/waveform-generator/pom.xml @@ -0,0 +1,143 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 2.6.3 + + + uk.ac.ucl.rits.inform + waveform-generator + jar + 2.7 + Waveform Synthetic Generator + Service to generate synthetic HL7 messages and send them to the generator as a test + + + UTF-8 + 2.6.3 + 17 + 10.3.1 + 3.3.0 + 3.2.1 + 3.0.1u2 + uk.ac.ucl.rits.inform.datasources.waveform_generator.Application + 2.0.1.Final + 4.7.1 + 3.28.0-GA + 1.2.8 + + + + + org.springframework.boot + spring-boot-starter-amqp + + + + org.projectlombok + lombok + provided + + + + org.springframework.boot + spring-boot-starter-test + test + + + org.junit.vintage + junit-vintage-engine + + + + + + org.junit.jupiter + junit-jupiter-engine + test + + + + + com.google.code.findbugs + annotations + ${com.google.code.findbugs.annotations.version} + + + + + org.springframework.boot + spring-boot-configuration-processor + true + + + + + + waveform-generator + + + org.springframework.boot + spring-boot-maven-plugin + + + + + org.apache.maven.plugins + maven-checkstyle-plugin + ${checkstyle.plugin.version} + + ../emap-checker.xml + true + true + false + + + + com.puppycrawl.tools + checkstyle + ${checkstyle.version} + + + + + validate + validate + + check + + + + + + + de.qaware.maven + go-offline-maven-plugin + ${go-offline-maven-plugin.version} + + + maven-compiler-plugin + ${maven-compiler-plugin.version} + + ${java.version} + ${java.version} + ${java.version} + true + + + + + + + + org.apache.maven.plugins + maven-surefire-report-plugin + ${maven-surefire-plugin.version} + + + + + diff --git a/waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/Application.java b/waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/Application.java new file mode 100644 index 000000000..1dc379671 --- /dev/null +++ b/waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/Application.java @@ -0,0 +1,29 @@ +package uk.ac.ucl.rits.inform.datasources.waveform_generator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.scheduling.annotation.EnableScheduling; + +/** + * Spring application entry point. + * @author Jeremy Stein + */ +@SpringBootApplication(scanBasePackages = { + "uk.ac.ucl.rits.inform.datasources.waveform_generator", + }) +@EnableScheduling +public class Application { + private final Logger logger = LoggerFactory.getLogger(Application.class); + + /** + * @param args command line args + */ + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + } + + + +} diff --git a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7Generator.java b/waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/Hl7Generator.java similarity index 73% rename from waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7Generator.java rename to waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/Hl7Generator.java index a862e7afa..af74a6da3 100644 --- a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7Generator.java +++ b/waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/Hl7Generator.java @@ -1,21 +1,23 @@ -package uk.ac.ucl.rits.inform.datasources.waveform; +package uk.ac.ucl.rits.inform.datasources.waveform_generator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Profile; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; +import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.time.Instant; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; @Component -@Profile("hl7gen") public class Hl7Generator { private final Logger logger = LoggerFactory.getLogger(Hl7Generator.class); @@ -44,19 +46,23 @@ public void setComputedDefaults() { } } -// private final WaveformOperations waveformOperations; -// -// public Hl7Generator(WaveformOperations waveformOperations) { -// this.waveformOperations = waveformOperations; -// } + private final Hl7TcpClientFactory hl7TcpClientFactory; /** - * Every one minute post a simulated batch of one minute's worth of data. - * Assume 30 patients, each with a 300Hz and a 50Hz machine. - * @throws InterruptedException dnowioinqwdnq + * @param hl7TcpClientFactory for sending generated messages + */ + public Hl7Generator(Hl7TcpClientFactory hl7TcpClientFactory) { + this.hl7TcpClientFactory = hl7TcpClientFactory; + } + + + /** + * Every one minute post a simulated batch of one minute's worth of data (times warp factor). + * Assume a given number of patients, each with a 300Hz and a 50Hz machine. + * @throws IOException on networking error */ @Scheduled(fixedRate = 60 * 1000) - public void generateMessages() throws InterruptedException { + public void generateMessages() throws IOException { var start = Instant.now(); // Usually if this method runs every N seconds, you would want to generate N // seconds worth of data. However, for non-live tests such as validation runs, @@ -67,9 +73,17 @@ public void generateMessages() throws InterruptedException { boolean shouldExit = false; for (int warpIdx = 0; warpIdx < warpFactor; warpIdx++) { List synthMsgs = makeSyntheticWaveformMsgsAllPatients(startDatetime, numPatients, numMillis); -// waveformOperations.sendSyntheticHl7Messages(synthMsgs); - // XXX: send to TCP port!!! - logger.info("Ready to send {} HL7 messages to a TCP port somewhere!", synthMsgs.size()); + logger.info("Sending {} HL7 messages", synthMsgs.size()); + + try (Hl7TcpClient tcpClient = hl7TcpClientFactory.createTcpClient()) { + for (var msgStr : synthMsgs) { + byte[] messageBytes = msgStr.getBytes(StandardCharsets.UTF_8); + logger.info("About to send message of size {} bytes", messageBytes.length); + logger.trace("Message = {}", messageBytes); + tcpClient.sendMessage(messageBytes); + } + } + startDatetime = startDatetime.plus(numMillis, ChronoUnit.MILLIS); if (endDatetime != null && startDatetime.isAfter(endDatetime)) { shouldExit = true; @@ -106,14 +120,15 @@ private List makeSyntheticWaveformMsgs(String locationId, for (long overallSampleIdx = 0; overallSampleIdx < numSamples;) { long microsAfterStart = overallSampleIdx * 1000_000 / samplingRate; Instant messageStartTime = startTime.plus(microsAfterStart, ChronoUnit.MICROS); - String messageId = String.format("%s_message%05d", locationId, overallSampleIdx); + String timeStr = DateTimeFormatter.ofPattern("HHmmss").format(startTime.atOffset(ZoneOffset.UTC)); + String messageId = String.format("%s_t%s_msg%05d", locationId, timeStr, overallSampleIdx); // XXX: make this into real HL7 StringBuilder hl7Template = new StringBuilder(); - hl7Template.append("/samplingRate|").append(samplingRate); - hl7Template.append("/locationId|").append(locationId); - hl7Template.append("/messageStartTime|").append(messageStartTime); - hl7Template.append("/messageId|").append(messageId); + hl7Template.append("/samplingRate/").append(samplingRate).append("/"); + hl7Template.append("/locationId/").append(locationId).append("/"); + hl7Template.append("/messageStartTime/").append(messageStartTime).append("/"); + hl7Template.append("/messageId/").append(messageId).append("/"); var values = new ArrayList(); long samplesPerMessage = samplingRate * millisPerMessage / 1000; @@ -123,8 +138,8 @@ private List makeSyntheticWaveformMsgs(String locationId, // a sine wave between maxValue and -maxValue values.add(2 * maxValue * Math.sin(overallSampleIdx * 0.01) - maxValue); } - String valuesAsStr = values.stream().map(Object::toString).collect(Collectors.joining(",")); - hl7Template.append("/values|").append(valuesAsStr); + String valuesAsStr = values.stream().map(d -> String.format("%.3f", d)).collect(Collectors.joining(",")); + hl7Template.append("/values/").append(valuesAsStr).append("/"); allMessages.add(hl7Template.toString()); } @@ -139,10 +154,9 @@ private List makeSyntheticWaveformMsgs(String locationId, * @param numPatients number of patients to generate for * @param numMillis length of observation period to generate data for * @return list of HL7 messages - * @throws InterruptedException . */ public List makeSyntheticWaveformMsgsAllPatients( - Instant startTime, long numPatients, long numMillis) throws InterruptedException { + Instant startTime, long numPatients, long numMillis) { List waveformMsgs = new ArrayList<>(); for (int p = 0; p < numPatients; p++) { var machine1Str = String.format("P%03d_mach1", p); @@ -154,7 +168,7 @@ public List makeSyntheticWaveformMsgsAllPatients( waveformMsgs.addAll(makeSyntheticWaveformMsgs( machine2Str, 300, numMillis, startTime, millisPerMessage)); int sizeAfter = waveformMsgs.size(); - logger.debug("JES: Patient {}, sending {} messages", p, sizeAfter - sizeBefore); + logger.debug("Patient {}, generated {} messages", p, sizeAfter - sizeBefore); } return waveformMsgs; diff --git a/waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/Hl7TcpClient.java b/waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/Hl7TcpClient.java new file mode 100644 index 000000000..528c3135a --- /dev/null +++ b/waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/Hl7TcpClient.java @@ -0,0 +1,41 @@ +package uk.ac.ucl.rits.inform.datasources.waveform_generator; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.Socket; + +public class Hl7TcpClient implements AutoCloseable { + private final Socket socket; + + /** + * Connect over tcp. + * + * @param tcpHost connect to host + * @param tcpPort connect to port + * @throws IOException on connect error + */ + public Hl7TcpClient(String tcpHost, int tcpPort) throws IOException { + this.socket = new Socket(tcpHost, tcpPort); + } + + /** + * Send a message with CRLF. + * + * @param msg the byte array of the message to send, without the terminating CRLF + * @throws IOException tcp error + */ + public void sendMessage(byte[] msg) throws IOException { + OutputStream os = socket.getOutputStream(); + os.write(msg); + os.write("\r\n".getBytes()); + } + + /** + * Close connection. + * @throws IOException tcp error + */ + public void close() throws IOException { + socket.getOutputStream().close(); + socket.close(); + } +} diff --git a/waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/Hl7TcpClientFactory.java b/waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/Hl7TcpClientFactory.java new file mode 100644 index 000000000..484e71c3f --- /dev/null +++ b/waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/Hl7TcpClientFactory.java @@ -0,0 +1,23 @@ +package uk.ac.ucl.rits.inform.datasources.waveform_generator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.io.IOException; + +@Component +public class Hl7TcpClientFactory { + private final Logger logger = LoggerFactory.getLogger(Hl7TcpClientFactory.class); + + @Value("${waveform.hl7.send_host}") + private String tcpHost; + + @Value("${waveform.hl7.send_port}") + private int tcpPort; + + public Hl7TcpClient createTcpClient() throws IOException { + return new Hl7TcpClient(tcpHost, tcpPort); + } +} diff --git a/waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/package-info.java b/waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/package-info.java new file mode 100644 index 000000000..3d9bd6680 --- /dev/null +++ b/waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/package-info.java @@ -0,0 +1,4 @@ +/** + * Generate synthetic waveform messages for testing purposes. + */ +package uk.ac.ucl.rits.inform.datasources.waveform_generator; diff --git a/waveform-reader/Dockerfile b/waveform-reader/Dockerfile index e24bca443..e86294980 100644 --- a/waveform-reader/Dockerfile +++ b/waveform-reader/Dockerfile @@ -6,29 +6,31 @@ RUN apt update; apt install -yy zip # Set up the Maven proxy settings COPY docker/set_mvn_proxy.sh /app/ # Download and extract glowroot -WORKDIR /app/waveform-reader +ARG SERVICE_SRC_DIR +WORKDIR /app/$SERVICE_SRC_DIR RUN curl -s https://api.github.com/repos/glowroot/glowroot/releases/latest \ | grep browser_download_url | grep "\.zip" | grep -v "central" | cut -d '"' -f 4 \ | xargs curl -L -o glowroot.zip; unzip glowroot.zip -COPY waveform-reader/glowroot.properties glowroot/glowroot.properties -COPY waveform-reader/config.json glowroot/config.json +COPY $SERVICE_SRC_DIR/glowroot.properties glowroot/glowroot.properties +COPY $SERVICE_SRC_DIR/config.json glowroot/config.json # Copy pom and checkstyle, then all generate resources COPY emap-checker.xml /app/ COPY emap-interchange/pom.xml /app/emap-interchange/ RUN source /app/set_mvn_proxy.sh; mvn dependency:go-offline -f /app/emap-interchange/pom.xml -COPY waveform-reader/pom.xml /app/waveform-reader/ -RUN source /app/set_mvn_proxy.sh; mvn de.qaware.maven:go-offline-maven-plugin:resolve-dependencies -f /app/waveform-reader/pom.xml +COPY $SERVICE_SRC_DIR/pom.xml /app/$SERVICE_SRC_DIR/ +RUN source /app/set_mvn_proxy.sh; mvn de.qaware.maven:go-offline-maven-plugin:resolve-dependencies -f /app/$SERVICE_SRC_DIR/pom.xml # Install emap-interchange COPY emap-interchange/src/ /app/emap-interchange/src/ RUN source /app/set_mvn_proxy.sh; mvn install -f /app/emap-interchange/pom.xml # Install waveform-reader -WORKDIR /app/waveform-reader -COPY waveform-reader/src/ /app/waveform-reader/src/ -RUN source /app/set_mvn_proxy.sh; mvn install -Dmaven.test.skip=true -Dstart-class=uk.ac.ucl.rits.inform.datasources.waveform.Application +WORKDIR /app/$SERVICE_SRC_DIR +COPY $SERVICE_SRC_DIR/src/ /app/$SERVICE_SRC_DIR/src/ FROM emap_java_base AS waveform_reader -CMD ["java", "-javaagent:./glowroot/glowroot.jar", "-Dspring.profiles.active=hl7reader", "-jar", "./target/waveform-reader.jar"] +RUN source /app/set_mvn_proxy.sh; mvn install -Dmaven.test.skip=true -Dstart-class=uk.ac.ucl.rits.inform.datasources.waveform.Application +CMD ["java", "-javaagent:./glowroot/glowroot.jar", "-jar", "./target/waveform-reader.jar"] -FROM emap_java_base AS waveform_hl7_generator -CMD ["java", "-javaagent:./glowroot/glowroot.jar", "-Dspring.profiles.active=hl7gen", "-jar", "./target/waveform-reader.jar"] +FROM emap_java_base AS waveform_generator +RUN source /app/set_mvn_proxy.sh; mvn install -Dmaven.test.skip=true -Dstart-class=uk.ac.ucl.rits.inform.datasources.waveform_generator.Application +CMD ["java", "-javaagent:./glowroot/glowroot.jar", "-jar", "./target/waveform-generator.jar"] diff --git a/waveform-reader/config.json b/waveform-reader/config.json index 4b385d63e..14e7ae522 100644 --- a/waveform-reader/config.json +++ b/waveform-reader/config.json @@ -6,8 +6,9 @@ }, "instrumentation": [ { - "className": "uk.ac.ucl.rits.inform.datasources.waveform.Application", - "methodName": "mainLoop", + + "className": "uk.ac.ucl.rits.inform.datasources.waveform.Hl7ListenerConfig", + "methodName": "handler", "methodParameterTypes": [ ".." ], diff --git a/waveform-reader/docker-compose.yml b/waveform-reader/docker-compose.yml index 310d48944..1b3d60543 100644 --- a/waveform-reader/docker-compose.yml +++ b/waveform-reader/docker-compose.yml @@ -1,4 +1,3 @@ -version: '3.4' services: waveform-reader: build: @@ -10,6 +9,7 @@ services: http_proxy: ${http_proxy} HTTPS_PROXY: ${HTTPS_PROXY} https_proxy: ${https_proxy} + SERVICE_SRC_DIR: waveform-reader env_file: - ../../config/waveform-reader-config-envs ports: @@ -19,22 +19,7 @@ services: restart: "no" depends_on: # Uses services from core, orchestrate using the EMAP setup package - - glowroot-central - - rabbitmq - waveform-hl7-generator: - build: - context: .. - dockerfile: waveform-reader/Dockerfile - target: waveform_hl7_generator - args: - HTTP_PROXY: ${HTTP_PROXY} - http_proxy: ${http_proxy} - HTTPS_PROXY: ${HTTPS_PROXY} - https_proxy: ${https_proxy} - env_file: - - ../../config/waveform-reader-config-envs - logging: - driver: "json-file" - restart: "no" - depends_on: - - waveform-reader + glowroot-central: + condition: service_started + rabbitmq: + condition: service_healthy diff --git a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ListenerConfig.java b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ListenerConfig.java index 371f4907b..9ac2985f6 100644 --- a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ListenerConfig.java +++ b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ListenerConfig.java @@ -6,12 +6,12 @@ import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Profile; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter; import org.springframework.integration.ip.tcp.connection.DefaultTcpNetConnectionSupport; import org.springframework.integration.ip.tcp.connection.TcpNetConnection; import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory; +import org.springframework.integration.ip.tcp.serializer.TcpCodecs; import org.springframework.messaging.Message; import java.net.Socket; @@ -22,7 +22,6 @@ * Listen on a TCP port for incoming HL7 messages. */ @Configuration -@Profile("hl7reader") public class Hl7ListenerConfig { private final Logger logger = LoggerFactory.getLogger(Hl7ListenerConfig.class); @@ -43,6 +42,12 @@ public TcpNetServerConnectionFactory serverConnectionFactory( @Value("${waveform.hl7.listen_port}") int listenPort, @Value("${waveform.hl7.source_address_allow_list}") List sourceAddressAllowList) { TcpNetServerConnectionFactory connFactory = new TcpNetServerConnectionFactory(listenPort); + connFactory.setSoSendBufferSize(10 * 1024 * 1024); + connFactory.setSoReceiveBufferSize(10 * 1024 * 1024); + connFactory.setSoTimeout(10_000); + connFactory.setSoTcpNoDelay(false); + connFactory.setSoKeepAlive(true); + connFactory.setDeserializer(TcpCodecs.crlf(5_000_000)); connFactory.setTcpNetConnectionSupport(new DefaultTcpNetConnectionSupport() { @Override public TcpNetConnection createNewConnection( @@ -53,7 +58,8 @@ public TcpNetConnection createNewConnection( String connectionFactoryName) { TcpNetConnection conn = super.createNewConnection(socket, server, lookupHost, applicationEventPublisher, connectionFactoryName); String sourceAddress = conn.getHostAddress(); - if (sourceAddressAllowList.contains(sourceAddress)) { + if (sourceAddressAllowList.contains(sourceAddress) + || sourceAddressAllowList.contains("ALL")) { logger.info("connection accepted from {}:{}", sourceAddress, conn.getPort()); } else { logger.warn("CONNECTION REFUSED from {}:{}, allowlist = {}", sourceAddress, conn.getPort(), sourceAddressAllowList); @@ -84,11 +90,12 @@ public TcpReceivingChannelAdapter inbound(TcpNetServerConnectionFactory connecti * @throws InterruptedException . */ @ServiceActivator(inputChannel = "hl7Stream") - public void handler(Message msg) throws InterruptedException { - byte[] asBytes = (byte[]) msg.getPayload(); + public void handler(Message msg) throws InterruptedException { + byte[] asBytes = msg.getPayload(); String asStr = new String(asBytes, StandardCharsets.UTF_8); - // XXX: parse message from HL7 to interchange message, send to publisher + logger.trace("received message = {}", asStr); + logger.info("MESSAGE of size {}", asStr.length()); + // parse message from HL7 to interchange message, send to publisher hl7ParseAndSend.parseAndSend(asStr); - logger.info("MESSAGE {}", asStr); } } diff --git a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ParseAndSend.java b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ParseAndSend.java index 973c320ec..f88fdf383 100644 --- a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ParseAndSend.java +++ b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ParseAndSend.java @@ -1,6 +1,7 @@ package uk.ac.ucl.rits.inform.datasources.waveform; -import org.springframework.context.annotation.Profile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import uk.ac.ucl.rits.inform.interchange.InterchangeValue; import uk.ac.ucl.rits.inform.interchange.visit_observations.WaveformMessage; @@ -8,12 +9,13 @@ import java.time.Instant; import java.util.Arrays; import java.util.List; +import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @Component -@Profile("hl7reader") public class Hl7ParseAndSend { + private final Logger logger = LoggerFactory.getLogger(Hl7ParseAndSend.class); private final WaveformOperations waveformOperations; @@ -21,34 +23,40 @@ public class Hl7ParseAndSend { this.waveformOperations = waveformOperations; } + private String getReMatch(String inputText, String field) { + Pattern pat = Pattern.compile("/" + field + "/(.*?)/"); + Matcher mat = pat.matcher(inputText); + mat.find(); + return mat.group(1); + } + private WaveformMessage parseHl7(String messageAsStr) { // XXX: Need real HL7 parsing! - Pattern srPat = Pattern.compile("/samplingRate|(.*)/"); - Pattern loPat = Pattern.compile("/locationId|(.*)/"); - Pattern stPat = Pattern.compile("/messageStartTime|(.*)/"); - Pattern idPat = Pattern.compile("/messageId|(.*)/"); - Pattern vaPat = Pattern.compile("/values|(.*)/"); - String samplingRateStr = srPat.matcher(messageAsStr).group(1); - String locationId = loPat.matcher(messageAsStr).group(1); - String messageStartTimeStr = stPat.matcher(messageAsStr).group(1); - String messageId = idPat.matcher(messageAsStr).group(1); - String valuesStr = vaPat.matcher(messageAsStr).group(1); + logger.info("Parsing message of size {}", messageAsStr.length()); + String samplingRateStr = getReMatch(messageAsStr, "samplingRate"); + String locationId = getReMatch(messageAsStr, "locationId"); + String messageStartTimeStr = getReMatch(messageAsStr, "messageStartTime"); + String messageId = getReMatch(messageAsStr, "messageId"); + String valuesStr = getReMatch(messageAsStr, "values"); WaveformMessage waveformMessage = new WaveformMessage(); waveformMessage.setSamplingRate(Long.parseLong(samplingRateStr)); waveformMessage.setLocationString(locationId); waveformMessage.setObservationTime(Instant.parse(messageStartTimeStr)); waveformMessage.setSourceMessageId(messageId); - String[] valuesAsString = valuesStr.split(","); - List values = Arrays.stream(valuesAsString).map(Double::parseDouble).collect(Collectors.toList()); + String[] valuesArray = valuesStr.split(","); + logger.trace("valuesArray = {} (length = {})", valuesArray, valuesArray.length); + List values = Arrays.stream(valuesArray).map(Double::parseDouble).collect(Collectors.toList()); waveformMessage.setNumericValues(new InterchangeValue<>(values)); + logger.debug("waveform message contains {} numerical values", values.size()); + logger.trace("output interchange waveform message = {}", waveformMessage); return waveformMessage; } /** * Parse and publish an HL7 message. * @param messageAsStr One HL7 message as a string - * @throws InterruptedException . + * @throws InterruptedException if publisher send is interrupted */ public void parseAndSend(String messageAsStr) throws InterruptedException { WaveformMessage msg = parseHl7(messageAsStr); diff --git a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/WaveformOperations.java b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/WaveformOperations.java index 4fd113706..39799f083 100644 --- a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/WaveformOperations.java +++ b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/WaveformOperations.java @@ -2,13 +2,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Component; import uk.ac.ucl.rits.inform.interchange.messaging.Publisher; import uk.ac.ucl.rits.inform.interchange.visit_observations.WaveformMessage; @Component -@Profile("hl7reader") public class WaveformOperations { private final Logger logger = LoggerFactory.getLogger(getClass()); @@ -20,7 +18,6 @@ public WaveformOperations(Publisher publisher) { private void publishMessage(Publisher publisher, String messageId, WaveformMessage m) throws InterruptedException { - // logger.debug("Message = {}", m.toString()); publisher.submit(m, messageId, messageId, () -> { logger.debug("Successful ACK for message with ID {}", messageId); });