From 25dea0172110f5cc338f4e8a2460e2644d88dbe9 Mon Sep 17 00:00:00 2001 From: Chad Kienle Date: Fri, 11 Aug 2023 11:04:24 -0700 Subject: [PATCH 1/9] Upticked develop pom versions to v1.0.6-SNAPSHOT Signed-off-by: Chad Kienle --- java/compat_impl/edge/pom.xml | 2 +- java/compat_impl/host/pom.xml | 2 +- java/examples/device_timestamp/pom.xml | 2 +- java/examples/edge_node_control/pom.xml | 2 +- java/examples/host_file/pom.xml | 2 +- java/examples/listener/pom.xml | 2 +- java/examples/pom.xml | 2 +- java/examples/raspberry_pi/pom.xml | 2 +- java/examples/records/pom.xml | 2 +- java/examples/simple/pom.xml | 2 +- java/examples/udt/pom.xml | 2 +- java/lib/core/pom.xml | 2 +- java/lib/edge/pom.xml | 2 +- java/lib/host/pom.xml | 2 +- java/pom.xml | 2 +- 15 files changed, 15 insertions(+), 15 deletions(-) diff --git a/java/compat_impl/edge/pom.xml b/java/compat_impl/edge/pom.xml index 1d297031..5b085113 100644 --- a/java/compat_impl/edge/pom.xml +++ b/java/compat_impl/edge/pom.xml @@ -18,7 +18,7 @@ org.eclipse.tahu tahu - 1.0.5 + 1.0.6-SNAPSHOT ../../pom.xml diff --git a/java/compat_impl/host/pom.xml b/java/compat_impl/host/pom.xml index acf28b2c..9a3bc78b 100644 --- a/java/compat_impl/host/pom.xml +++ b/java/compat_impl/host/pom.xml @@ -18,7 +18,7 @@ org.eclipse.tahu tahu - 1.0.5 + 1.0.6-SNAPSHOT ../../pom.xml diff --git a/java/examples/device_timestamp/pom.xml b/java/examples/device_timestamp/pom.xml index a96ea4bd..16293f44 100644 --- a/java/examples/device_timestamp/pom.xml +++ b/java/examples/device_timestamp/pom.xml @@ -18,7 +18,7 @@ org.eclipse.tahu tahu-examples - 1.0.5 + 1.0.6-SNAPSHOT ../pom.xml diff --git a/java/examples/edge_node_control/pom.xml b/java/examples/edge_node_control/pom.xml index 0a1fcc04..d3551f61 100644 --- a/java/examples/edge_node_control/pom.xml +++ b/java/examples/edge_node_control/pom.xml @@ -18,7 +18,7 @@ org.eclipse.tahu tahu-examples - 1.0.5 + 1.0.6-SNAPSHOT ../pom.xml diff --git a/java/examples/host_file/pom.xml b/java/examples/host_file/pom.xml index 8d67a231..17a2a11a 100644 --- a/java/examples/host_file/pom.xml +++ b/java/examples/host_file/pom.xml @@ -18,7 +18,7 @@ org.eclipse.tahu tahu-examples - 1.0.5 + 1.0.6-SNAPSHOT ../pom.xml diff --git a/java/examples/listener/pom.xml b/java/examples/listener/pom.xml index 87820422..55da4398 100644 --- a/java/examples/listener/pom.xml +++ b/java/examples/listener/pom.xml @@ -18,7 +18,7 @@ org.eclipse.tahu tahu-examples - 1.0.5 + 1.0.6-SNAPSHOT ../pom.xml diff --git a/java/examples/pom.xml b/java/examples/pom.xml index 725d72b9..3e1519d0 100644 --- a/java/examples/pom.xml +++ b/java/examples/pom.xml @@ -29,7 +29,7 @@ org.eclipse.tahu tahu-examples - 1.0.5 + 1.0.6-SNAPSHOT pom Eclipse Tahu diff --git a/java/examples/raspberry_pi/pom.xml b/java/examples/raspberry_pi/pom.xml index b4f44ae4..6da58429 100644 --- a/java/examples/raspberry_pi/pom.xml +++ b/java/examples/raspberry_pi/pom.xml @@ -18,7 +18,7 @@ org.eclipse.tahu tahu-examples - 1.0.5 + 1.0.6-SNAPSHOT ../pom.xml diff --git a/java/examples/records/pom.xml b/java/examples/records/pom.xml index ed017e6d..279b8eaf 100644 --- a/java/examples/records/pom.xml +++ b/java/examples/records/pom.xml @@ -18,7 +18,7 @@ org.eclipse.tahu tahu-examples - 1.0.5 + 1.0.6-SNAPSHOT ../pom.xml diff --git a/java/examples/simple/pom.xml b/java/examples/simple/pom.xml index 0118f1d8..19faa95c 100644 --- a/java/examples/simple/pom.xml +++ b/java/examples/simple/pom.xml @@ -18,7 +18,7 @@ org.eclipse.tahu tahu-examples - 1.0.5 + 1.0.6-SNAPSHOT ../pom.xml diff --git a/java/examples/udt/pom.xml b/java/examples/udt/pom.xml index d2d2c3c7..4e24f0af 100644 --- a/java/examples/udt/pom.xml +++ b/java/examples/udt/pom.xml @@ -18,7 +18,7 @@ org.eclipse.tahu tahu-examples - 1.0.5 + 1.0.6-SNAPSHOT ../pom.xml diff --git a/java/lib/core/pom.xml b/java/lib/core/pom.xml index eaff7ba8..4213fbd5 100644 --- a/java/lib/core/pom.xml +++ b/java/lib/core/pom.xml @@ -18,7 +18,7 @@ org.eclipse.tahu tahu - 1.0.5 + 1.0.6-SNAPSHOT ../../pom.xml diff --git a/java/lib/edge/pom.xml b/java/lib/edge/pom.xml index 123b8b4c..b7314bcc 100644 --- a/java/lib/edge/pom.xml +++ b/java/lib/edge/pom.xml @@ -18,7 +18,7 @@ org.eclipse.tahu tahu - 1.0.5 + 1.0.6-SNAPSHOT ../../pom.xml diff --git a/java/lib/host/pom.xml b/java/lib/host/pom.xml index 0571e1a1..5cbe103f 100644 --- a/java/lib/host/pom.xml +++ b/java/lib/host/pom.xml @@ -18,7 +18,7 @@ org.eclipse.tahu tahu - 1.0.5 + 1.0.6-SNAPSHOT ../../pom.xml diff --git a/java/pom.xml b/java/pom.xml index 485920b6..bce0af22 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -31,7 +31,7 @@ org.eclipse.tahu tahu - 1.0.5 + 1.0.6-SNAPSHOT pom Eclipse Tahu From 42a7055bcf6657941141bd9c3fddb4949e53bba2 Mon Sep 17 00:00:00 2001 From: Chad Kienle Date: Tue, 22 Aug 2023 15:58:46 -0700 Subject: [PATCH 2/9] Fixed inbound msg lockup when subscribing in connectComplete callback --- .../org/eclipse/tahu/mqtt/TahuClient.java | 86 ++++++++++++------- 1 file changed, 55 insertions(+), 31 deletions(-) diff --git a/java/lib/core/src/main/java/org/eclipse/tahu/mqtt/TahuClient.java b/java/lib/core/src/main/java/org/eclipse/tahu/mqtt/TahuClient.java index d4498cbb..56a8724e 100644 --- a/java/lib/core/src/main/java/org/eclipse/tahu/mqtt/TahuClient.java +++ b/java/lib/core/src/main/java/org/eclipse/tahu/mqtt/TahuClient.java @@ -486,8 +486,8 @@ public int subscribe(String topic, int qos) throws TahuException { if (client != null) { if (client.isConnected()) { try { - logger.debug("{}: on connection to {} - Attempting to subscribe on topic {} with QoS={}", - getClientId(), getMqttServerName(), topic, qos); + logger.debug("{}: server {} - Attempting to subscribe on topic {} with QoS={}", getClientId(), + getMqttServerName(), topic, qos); IMqttToken token = client.subscribe(topic, qos); logger.trace("{}: Waiting for subscription on {}", getClientId(), topic); token.waitForCompletion(); @@ -498,14 +498,14 @@ public int subscribe(String topic, int qos) throws TahuException { if (grantedQos != null && grantedQos.length == 1) { return grantedQos[0]; } else { - String errorMessage = getClientId() + ": on connection to " + getMqttServerName() + String errorMessage = getClientId() + ": server " + getMqttServerName() + " - Failed to subscribe to " + topic; logger.error(errorMessage); throw new TahuException(TahuErrorCode.NOT_AUTHORIZED, errorMessage); } } catch (MqttException e) { - logger.error(getClientId() + ": on connection to " + getMqttServerName() - + " - Failed to subscribe to " + topic); + logger.error(getClientId() + ": server " + getMqttServerName() + " - Failed to subscribe to " + + topic); throw new TahuException(TahuErrorCode.INTERNAL_ERROR, e); } } @@ -593,6 +593,11 @@ public void unsubscribe(String topic) throws TahuException { @Override public void connectionLost(Throwable cause) { logger.debug("{}: MQTT connectionLost() to {} :: {}", getClientId(), getMqttServerName(), getMqttServerUrl()); + if (logger.isTraceEnabled()) { + if (client != null) { + client.getDebug().dumpClientDebug(); + } + } // reset the timers if needed if (getDisconnectTime() == null) { @@ -1249,37 +1254,56 @@ public void connectComplete(boolean reconnect, String serverURI) { String topicStr = Arrays.toString(topics); String qosStr = Arrays.toString(qosLevels); - logger.debug("{}: on connection to {} - Attempting to subscribe on topic {} with QoS={}", - getClientId(), getMqttServerName(), topicStr, qosStr); + logger.debug("{}: server {} - Attempting to subscribe on topic {} with QoS={}", getClientId(), + getMqttServerName(), topicStr, qosStr); try { - IMqttToken token = client.subscribe(topics, qosLevels); - logger.trace("{}: Waiting for subscription on {}", getClientId(), topicStr); - token.waitForCompletion(); - logger.trace("{}: Done waiting for subscription on {}", getClientId(), topicStr); - int[] grantedQos = token.getGrantedQos(); - if (Arrays.equals(qosLevels, grantedQos)) { - logger.debug("{}: on connection to {} - Successfully subscribed on {} on QoS={}", - getClientId(), getMqttServerName(), topicStr, qosStr); - } else { - try { - logger.error("{}: on connection to {} - Failed to subscribe on {} - forcing disconnect", - getClientId(), getMqttServerName(), topicStr); + client.subscribe(topics, qosLevels, null, new IMqttActionListener() { + @Override + public void onSuccess(IMqttToken asyncActionToken) { + int[] grantedQos = asyncActionToken.getGrantedQos(); + if (Arrays.equals(qosLevels, grantedQos)) { + logger.debug("{}: server {} - Successfully subscribed on {} on QoS={}", + getClientId(), getMqttServerName(), topicStr, qosStr); + } else { + try { + String grantedQosStr = Arrays.toString(grantedQos); + logger.error("{}: server {} - Failed subscribe on {} granted QoS {} != {}", + getClientId(), getMqttServerName(), topicStr, qosStr, grantedQosStr); - // FIXME - remove This sleep is necessary due to: - // https://github.com/eclipse/paho.mqtt.java/issues/850 - Thread.sleep(1000); + // FIXME - remove This sleep is necessary due to: + // https://github.com/eclipse/paho.mqtt.java/issues/850 + Thread.sleep(1000); - // Force the disconnect and return - client.disconnectForcibly(0, 1, false); - return; - } catch (Exception e) { - logger.error("{}: on connection to {} - Failed to disconnect on failed subscription", - getClientId(), getMqttServerName(), e); - break; + synchronized (clientLock) { + // Force the disconnect and return + client.disconnectForcibly(0, 1, false); + } + return; + } catch (Exception e) { + logger.error( + "{}: server {} - Failed disconnect on failed subscribe granted QoS", + getClientId(), getMqttServerName(), e); + } + } } - } + + @Override + public void onFailure(IMqttToken asyncActionToken, Throwable exception) { + synchronized (clientLock) { + try { + logger.error("{}: server {} - Failed to subscribe on {}", + getClientId(), getMqttServerName(), topicStr); + client.disconnectForcibly(0, 1, false); + } catch (MqttException e) { + logger.error("{}: server {} - Failed disconnect on failed subscribe", + getClientId(), getMqttServerName(), e); + } + } + } + + }); } catch (MqttException e) { - logger.error("{}: on connection to {} - Failed to subscribe on {} with QoS={}", getClientId(), + logger.error("{}: server {} - Failed to subscribe on {} with QoS={}", getClientId(), getMqttServerName(), topicStr, qosStr, e); break; } From 9ba0869525a55db4628380c055193bda7296f041 Mon Sep 17 00:00:00 2001 From: Chad Kienle Date: Wed, 6 Sep 2023 10:24:31 -0700 Subject: [PATCH 3/9] Upticked versions of js libraries sparkplug-payload to 1.0.3, sparkplug-client to 3.2.4 --- javascript/core/sparkplug-client/README.md | 1 + javascript/core/sparkplug-client/package-lock.json | 4 ++-- javascript/core/sparkplug-client/package.json | 4 ++-- javascript/core/sparkplug-payload/README.md | 1 + javascript/core/sparkplug-payload/package-lock.json | 2 +- javascript/core/sparkplug-payload/package.json | 2 +- 6 files changed, 8 insertions(+), 6 deletions(-) diff --git a/javascript/core/sparkplug-client/README.md b/javascript/core/sparkplug-client/README.md index 963f8378..ec4649b1 100644 --- a/javascript/core/sparkplug-client/README.md +++ b/javascript/core/sparkplug-client/README.md @@ -433,6 +433,7 @@ client.on('close', function () { * 3.2.1 Updated License and repo links, cleaned up logging. * 3.2.2 Bug Fixes * 3.2.3 Bug Fixes, added typescript +* 3.2.4 Updated sparkplug-payload dependency version. ## License diff --git a/javascript/core/sparkplug-client/package-lock.json b/javascript/core/sparkplug-client/package-lock.json index 7ae7868a..5305f065 100644 --- a/javascript/core/sparkplug-client/package-lock.json +++ b/javascript/core/sparkplug-client/package-lock.json @@ -1,12 +1,12 @@ { "name": "sparkplug-client", - "version": "3.2.3", + "version": "3.2.4", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "sparkplug-client", - "version": "3.2.3", + "version": "3.2.4", "license": "EPL-2.0", "dependencies": { "debug": "^4.3.4", diff --git a/javascript/core/sparkplug-client/package.json b/javascript/core/sparkplug-client/package.json index 84ac525e..cf95033d 100644 --- a/javascript/core/sparkplug-client/package.json +++ b/javascript/core/sparkplug-client/package.json @@ -1,6 +1,6 @@ { "name": "sparkplug-client", - "version": "3.2.3", + "version": "3.2.4", "description": "A client module for MQTT communication using the Sparkplug specification from Cirrus Link Solutions", "main": "index.js", "scripts": { @@ -34,7 +34,7 @@ "debug": "^4.3.4", "mqtt": "^4.2.8", "pako": "^2.0.4", - "sparkplug-payload": "^1.0.2" + "sparkplug-payload": "^1.0.3" }, "devDependencies": { "@types/debug": "^4.1.7", diff --git a/javascript/core/sparkplug-payload/README.md b/javascript/core/sparkplug-payload/README.md index 0dcd6cd4..2e774653 100644 --- a/javascript/core/sparkplug-payload/README.md +++ b/javascript/core/sparkplug-payload/README.md @@ -52,6 +52,7 @@ var decoded = sparkplug.decodePayload(encoded); * 1.0.0 Initial release * 1.0.1 Bug fixes * 1.0.2 Bug fixes, added typescript +* 1.0.3 Bug fixes ## License diff --git a/javascript/core/sparkplug-payload/package-lock.json b/javascript/core/sparkplug-payload/package-lock.json index 5e6cb31d..9c15361b 100644 --- a/javascript/core/sparkplug-payload/package-lock.json +++ b/javascript/core/sparkplug-payload/package-lock.json @@ -1,6 +1,6 @@ { "name": "sparkplug-payload", - "version": "1.0.2", + "version": "1.0.3", "lockfileVersion": 1, "requires": true, "dependencies": { diff --git a/javascript/core/sparkplug-payload/package.json b/javascript/core/sparkplug-payload/package.json index 4d6a76f2..3f10e9d8 100644 --- a/javascript/core/sparkplug-payload/package.json +++ b/javascript/core/sparkplug-payload/package.json @@ -1,6 +1,6 @@ { "name": "sparkplug-payload", - "version": "1.0.2", + "version": "1.0.3", "description": "A library for encoding and decoding Sparkplug payloads", "main": "index.js", "scripts": { From f72805e1160c747de90201f6d8586766b97eb4f6 Mon Sep 17 00:00:00 2001 From: wes-johnson Date: Thu, 21 Sep 2023 15:22:31 -0700 Subject: [PATCH 4/9] Modified Host Applications to support configurable subscriptions and added null checks --- .../tahu/host/SparkplugHostApplication.java | 7 +- .../org/eclipse/tahu/test/SequenceTest.java | 69 ++++++++++++++++ .../eclipse/tahu/host/HostApplication.java | 81 ++++++++++++------- .../tahu/host/manager/MetricManager.java | 10 ++- 4 files changed, 134 insertions(+), 33 deletions(-) create mode 100644 java/lib/core/src/test/java/org/eclipse/tahu/test/SequenceTest.java diff --git a/java/compat_impl/host/src/main/java/org/eclipse/tahu/host/SparkplugHostApplication.java b/java/compat_impl/host/src/main/java/org/eclipse/tahu/host/SparkplugHostApplication.java index e28bf250..ba9f9010 100644 --- a/java/compat_impl/host/src/main/java/org/eclipse/tahu/host/SparkplugHostApplication.java +++ b/java/compat_impl/host/src/main/java/org/eclipse/tahu/host/SparkplugHostApplication.java @@ -14,6 +14,7 @@ package org.eclipse.tahu.host; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.eclipse.tahu.exception.TahuException; @@ -24,6 +25,7 @@ import org.eclipse.tahu.message.model.Message; import org.eclipse.tahu.message.model.Metric; import org.eclipse.tahu.message.model.SparkplugDescriptor; +import org.eclipse.tahu.message.model.SparkplugMeta; import org.eclipse.tahu.model.MqttServerDefinition; import org.eclipse.tahu.mqtt.MqttClientId; import org.eclipse.tahu.mqtt.MqttServerName; @@ -97,8 +99,9 @@ public static void main(String[] arg) { public SparkplugHostApplication() { try { - hostApplication = - new HostApplication(this, HOST_ID, mqttServerDefinitions, null, new SparkplugBPayloadDecoder()); + hostApplication = new HostApplication(this, HOST_ID, + new ArrayList<>(Arrays.asList(SparkplugMeta.SPARKPLUG_B_TOPIC_PREFIX + "/#")), + mqttServerDefinitions, null, new SparkplugBPayloadDecoder()); } catch (Exception e) { logger.error("Failed to create the HostApplication", e); } diff --git a/java/lib/core/src/test/java/org/eclipse/tahu/test/SequenceTest.java b/java/lib/core/src/test/java/org/eclipse/tahu/test/SequenceTest.java new file mode 100644 index 00000000..31c87eaa --- /dev/null +++ b/java/lib/core/src/test/java/org/eclipse/tahu/test/SequenceTest.java @@ -0,0 +1,69 @@ +/* + * Licensed Materials - Property of Cirrus Link Solutions + * Copyright (c) 2022 Cirrus Link Solutions LLC - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited + * Proprietary and confidential + */ +package org.eclipse.tahu.test; + +import static org.assertj.core.api.Assertions.fail; + +import java.util.Date; + +import org.eclipse.tahu.SparkplugInvalidTypeException; +import org.eclipse.tahu.message.PayloadDecoder; +import org.eclipse.tahu.message.SparkplugBPayloadDecoder; +import org.eclipse.tahu.message.SparkplugBPayloadEncoder; +import org.eclipse.tahu.message.model.SparkplugBPayload; +import org.eclipse.tahu.message.model.SparkplugBPayload.SparkplugBPayloadBuilder; +import org.testng.annotations.Test; + +/** + * Sparkplug Test class for encoding and decoding sparkplug payloads + */ +public class SequenceTest { + + public SequenceTest() { + } + + @Test + public void testEnDeCode() throws SparkplugInvalidTypeException { + unit(null); + unit(0L); + unit(1L); + } + + private void unit(Long seq) { + Date currentTime = new Date(0L); + + // Encode + SparkplugBPayloadEncoder encoder = new SparkplugBPayloadEncoder(); + PayloadDecoder decoder = new SparkplugBPayloadDecoder(); + try { + SparkplugBPayload initialPayload = + new SparkplugBPayloadBuilder().setTimestamp(currentTime).setSeq(seq).createPayload(); + byte[] bytes = encoder.getBytes(initialPayload, false); + SparkplugBPayload decodedPayload = decoder.buildFromByteArray(bytes, null); + System.out.println("Initial: " + initialPayload); + System.out.println(seq + ": " + bytesToHex(bytes)); + System.out.println("Decoded: " + decodedPayload); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + private final static char[] hexArray = "0123456789ABCDEF".toCharArray(); + + private static String bytesToHex(byte[] bytes) { + char[] hexChars = new char[bytes.length * 3]; + int v; + for (int j = 0; j < bytes.length; j++) { + v = bytes[j] & 0xFF; + hexChars[j * 3] = hexArray[v >>> 4]; + hexChars[j * 3 + 1] = hexArray[v & 0x0F]; + hexChars[j * 3 + 2] = 0x20; // space separator + } + return new String(hexChars); + } +} diff --git a/java/lib/host/src/main/java/org/eclipse/tahu/host/HostApplication.java b/java/lib/host/src/main/java/org/eclipse/tahu/host/HostApplication.java index 5cbeca01..8d5ddb3e 100644 --- a/java/lib/host/src/main/java/org/eclipse/tahu/host/HostApplication.java +++ b/java/lib/host/src/main/java/org/eclipse/tahu/host/HostApplication.java @@ -43,37 +43,50 @@ public class HostApplication implements CommandPublisher { private final String hostId; private final RandomStartupDelay randomStartupDelay; private final String stateTopic; - + private final List sparkplugSubscriptons; private final TahuHostCallback tahuHostCallback; - private final List mqttServerDefinitions; private final Map tahuClients = new HashMap<>(); - public HostApplication(HostApplicationEventHandler eventHandler, String hostId, + public HostApplication(HostApplicationEventHandler eventHandler, String hostId, List sparkplugSubscriptons, List mqttServerDefinitions, RandomStartupDelay randomStartupDelay, PayloadDecoder payloadDecoder) { logger.info("Creating the Host Application"); - this.hostId = hostId; + if (hostId != null) { + this.hostId = hostId; + this.stateTopic = SparkplugMeta.SPARKPLUG_TOPIC_HOST_STATE_PREFIX + "/" + hostId; + } else { + this.hostId = null; + this.stateTopic = null; + } + this.sparkplugSubscriptons = sparkplugSubscriptons; this.mqttServerDefinitions = mqttServerDefinitions; this.randomStartupDelay = randomStartupDelay; - this.stateTopic = SparkplugMeta.SPARKPLUG_TOPIC_HOST_STATE_PREFIX + "/" + hostId; SequenceReorderManager sequenceReorderManager = SequenceReorderManager.getInstance(); sequenceReorderManager.init(eventHandler, this, payloadDecoder, 5000L); this.tahuHostCallback = new TahuHostCallback(eventHandler, this, sequenceReorderManager, payloadDecoder); } - public HostApplication(HostApplicationEventHandler eventHandler, String hostId, TahuHostCallback tahuHostCallback, - Map tahuClients, RandomStartupDelay randomStartupDelay) { + public HostApplication(HostApplicationEventHandler eventHandler, String hostId, List sparkplugSubscriptons, + TahuHostCallback tahuHostCallback, Map tahuClients, + RandomStartupDelay randomStartupDelay) { logger.info("Creating the Host Application"); - this.hostId = hostId; + if (hostId != null) { + this.hostId = hostId; + this.stateTopic = SparkplugMeta.SPARKPLUG_TOPIC_HOST_STATE_PREFIX + "/" + hostId; + } else { + this.hostId = null; + this.stateTopic = null; + } + + this.sparkplugSubscriptons = sparkplugSubscriptons; this.tahuHostCallback = tahuHostCallback; this.mqttServerDefinitions = null; this.tahuClients.putAll(tahuClients); this.randomStartupDelay = randomStartupDelay; - this.stateTopic = SparkplugMeta.SPARKPLUG_TOPIC_HOST_STATE_PREFIX + "/" + hostId; } public void start() { @@ -110,21 +123,24 @@ private void startClient(TahuClient tahuClient) { tahuClient.setAutoReconnect(true); tahuClient.connect(); - // Subscribe to our own spBv1.0/STATE topic - logger.debug("PrimaryHostId is set. Subscribing on {}", stateTopic); - int grantedQos = tahuClient.subscribe(stateTopic, MqttOperatorDefs.QOS1); - if (grantedQos != 1) { - logger.error("Failed to subscribe to '{}'", stateTopic); - return; + // Subscribe to our own STATE topic + if (stateTopic != null) { + logger.debug("PrimaryHostId is set. Subscribing on {}", stateTopic); + int grantedQos = tahuClient.subscribe(stateTopic, MqttOperatorDefs.QOS1); + if (grantedQos != 1) { + logger.error("Failed to subscribe to '{}'", stateTopic); + return; + } } - // Subscribe to the spBv1.0 namespace - String topic = "spBv1.0/#"; - logger.debug("PrimaryHostId is set. Subscribing on {}", topic); - grantedQos = tahuClient.subscribe(topic, MqttOperatorDefs.QOS0); - if (grantedQos != 0) { - logger.error("Failed to subscribe to '{}'", topic); - return; + for (String subscriptionTopic : sparkplugSubscriptons) { + // Subscribe to the Sparkplug namespace(s) + logger.debug("Subscribing on {}", subscriptionTopic); + int grantedQos = tahuClient.subscribe(subscriptionTopic, MqttOperatorDefs.QOS0); + if (grantedQos != 0) { + logger.error("Failed to subscribe to '{}'", subscriptionTopic); + return; + } } // Pub @@ -144,14 +160,17 @@ public void shutdown() { // Unsubscribe // removeMqttClientSubscriptions(tahuClient, unsubscribe); - // Clean up spBv1.0/STATE subscriptions - logger.debug("Unsubscribing from {}", stateTopic); - tahuClient.unsubscribe(stateTopic); + if (stateTopic != null) { + // Clean up STATE subscriptions + logger.debug("Unsubscribing from {}", stateTopic); + tahuClient.unsubscribe(stateTopic); + } - // Clean up the Sparkplug subscription - String topic = "spBv1.0/#"; - logger.debug("Unsubscribing from {}", topic); - tahuClient.unsubscribe(topic); + for (String subscriptionTopic : sparkplugSubscriptons) { + // Clean up the Sparkplug subscription(s) + logger.debug("Unsubscribing from {}", subscriptionTopic); + tahuClient.unsubscribe(subscriptionTopic); + } // Shut down the client after the MQTT client is disconnected to prevent RejectedExecutionExceptions tahuHostCallback.shutdown(); @@ -175,6 +194,10 @@ public void shutdown() { } } + public String getHostId() { + return hostId; + } + @Override public void publishCommand(Topic topic, SparkplugBPayload payload) throws Exception { for (MqttServerName mqttServerName : tahuClients.keySet()) { diff --git a/java/lib/host/src/main/java/org/eclipse/tahu/host/manager/MetricManager.java b/java/lib/host/src/main/java/org/eclipse/tahu/host/manager/MetricManager.java index 98ef7dae..0df77a6d 100644 --- a/java/lib/host/src/main/java/org/eclipse/tahu/host/manager/MetricManager.java +++ b/java/lib/host/src/main/java/org/eclipse/tahu/host/manager/MetricManager.java @@ -48,11 +48,17 @@ public void putMetric(String metricName, HostMetric metric) { } public void updateValue(String metricName, Object value) { - metricMap.get(metricName).setValue(value); + HostMetric hostMetric = metricMap.get(metricName); + if (hostMetric != null) { + hostMetric.setValue(value); + } } public void setStale(String metricName, boolean stale) { - metricMap.get(metricName).setStale(stale); + HostMetric hostMetric = metricMap.get(metricName); + if (hostMetric != null) { + hostMetric.setStale(stale); + } } public void clearMetrics() { From 8b979467aea485e1b9271465d533d2529c8eb443 Mon Sep 17 00:00:00 2001 From: wes-johnson Date: Thu, 5 Oct 2023 13:43:38 -0700 Subject: [PATCH 5/9] Added EdgeNodeMetricMaps class for handling datatype lookups in Edge Nodes --- .../eclipse/tahu/edge/EdgeNodeMetricMaps.java | 152 ++++++++++++++++++ 1 file changed, 152 insertions(+) create mode 100644 java/lib/edge/src/main/java/org/eclipse/tahu/edge/EdgeNodeMetricMaps.java diff --git a/java/lib/edge/src/main/java/org/eclipse/tahu/edge/EdgeNodeMetricMaps.java b/java/lib/edge/src/main/java/org/eclipse/tahu/edge/EdgeNodeMetricMaps.java new file mode 100644 index 00000000..02480a41 --- /dev/null +++ b/java/lib/edge/src/main/java/org/eclipse/tahu/edge/EdgeNodeMetricMaps.java @@ -0,0 +1,152 @@ +/******************************************************************************** + * Copyright (c) 2023 Cirrus Link Solutions and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * SPDX-License-Identifier: EPL-2.0 + * + * Contributors: + * Cirrus Link Solutions - initial implementation + ********************************************************************************/ + +package org.eclipse.tahu.edge; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.eclipse.tahu.message.model.EdgeNodeDescriptor; +import org.eclipse.tahu.message.model.Metric; +import org.eclipse.tahu.message.model.MetricDataType; +import org.eclipse.tahu.message.model.SparkplugDescriptor; +import org.eclipse.tahu.message.model.Template; +import org.eclipse.tahu.model.MetricDataTypeMap; +import org.eclipse.tahu.model.MetricMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EdgeNodeMetricMaps { + + private static Logger logger = LoggerFactory.getLogger(EdgeNodeMetricMaps.class.getName()); + + private static Map instances; + + private final Map> allEdgeNodeMetricMaps; + + private final Object mapLock = new Object(); + + public static EdgeNodeMetricMaps getInstance(String agentName) { + if (instances == null) { + instances = new ConcurrentHashMap<>(); + } + if (instances.get(agentName) == null) { + instances.put(agentName, new EdgeNodeMetricMaps()); + } + return instances.get(agentName); + } + + private EdgeNodeMetricMaps() { + allEdgeNodeMetricMaps = new ConcurrentHashMap<>(); + } + + public void addMetric(EdgeNodeDescriptor edgeNodeDescriptor, SparkplugDescriptor sparkplugDescriptor, + String metricName, Metric metric) { + synchronized (mapLock) { + Map edgeNodeMetricMaps = + allEdgeNodeMetricMaps.computeIfAbsent(edgeNodeDescriptor, (k) -> new ConcurrentHashMap<>()); + MetricMap metricMap = edgeNodeMetricMaps.computeIfAbsent(sparkplugDescriptor, (k) -> new MetricMap()); + metricMap.addAlias(metricName, metric.getAlias(), metric.getDataType()); + + if (metric.getDataType() == MetricDataType.Template && metric.getValue() != null + && Template.class.isAssignableFrom(metric.getValue().getClass())) { + Template template = (Template) metric.getValue(); + for (Metric childMetric : template.getMetrics()) { + addMetric(edgeNodeDescriptor, sparkplugDescriptor, metricName + "/" + childMetric.getName(), + childMetric); + } + } + } + } + + public void clear() { + synchronized (mapLock) { + allEdgeNodeMetricMaps.clear(); + } + } + + public Long getAlias(EdgeNodeDescriptor edgeNodeDescriptor, SparkplugDescriptor sparkplugDescriptor, + String metricName) { + Map edgeNodeMetricMaps = allEdgeNodeMetricMaps.get(edgeNodeDescriptor); + if (edgeNodeMetricMaps != null) { + MetricMap metricMap = edgeNodeMetricMaps.get(sparkplugDescriptor); + if (metricMap != null) { + return metricMap.getAlias(metricName); + } else { + return null; + } + } else { + return null; + } + } + + public String getMetricName(EdgeNodeDescriptor edgeNodeDescriptor, SparkplugDescriptor sparkplugDescriptor, + long alias) { + Map edgeNodeMetricMaps = allEdgeNodeMetricMaps.get(edgeNodeDescriptor); + if (edgeNodeMetricMaps != null) { + MetricMap metricMap = edgeNodeMetricMaps.get(sparkplugDescriptor); + if (metricMap != null) { + return metricMap.getMetricName(alias); + } else { + return null; + } + } else { + return null; + } + } + + public boolean aliasExists(EdgeNodeDescriptor edgeNodeDescriptor, SparkplugDescriptor sparkplugDescriptor, + long alias) { + Map edgeNodeMetricMaps = allEdgeNodeMetricMaps.get(edgeNodeDescriptor); + if (edgeNodeMetricMaps != null && edgeNodeMetricMaps.get(sparkplugDescriptor) != null) { + MetricMap metricMap = edgeNodeMetricMaps.get(sparkplugDescriptor); + if (metricMap != null && metricMap.getMetricName(alias) != null) { + return true; + } else { + return false; + } + } else { + return false; + } + } + + public MetricDataTypeMap getMetricDataTypeMap(EdgeNodeDescriptor edgeNodeDescriptor, + SparkplugDescriptor sparkplugDescriptor) { + Map edgeNodeMetricMaps = allEdgeNodeMetricMaps.get(edgeNodeDescriptor); + if (edgeNodeMetricMaps != null && edgeNodeMetricMaps.get(sparkplugDescriptor) != null) { + return edgeNodeMetricMaps.get(sparkplugDescriptor).getMetricDataTypeMap(); + } else { + return null; + } + } + + public MetricDataType getDataType(EdgeNodeDescriptor edgeNodeDescriptor, SparkplugDescriptor sparkplugDescriptor, + String metricName) { + Map edgeNodeMetricMaps = allEdgeNodeMetricMaps.get(edgeNodeDescriptor); + if (edgeNodeMetricMaps != null && edgeNodeMetricMaps.get(sparkplugDescriptor) != null) { + return edgeNodeMetricMaps.get(sparkplugDescriptor).getMetricDataType(metricName); + } else { + return null; + } + } + + public MetricDataType getDataType(EdgeNodeDescriptor edgeNodeDescriptor, SparkplugDescriptor sparkplugDescriptor, + Long alias) { + Map edgeNodeMetricMaps = allEdgeNodeMetricMaps.get(edgeNodeDescriptor); + if (edgeNodeMetricMaps != null && edgeNodeMetricMaps.get(sparkplugDescriptor) != null) { + return edgeNodeMetricMaps.get(sparkplugDescriptor).getMetricDataType(alias); + } else { + return null; + } + } +} From 44c699ad217e7d565281a72b5b699679e6222585 Mon Sep 17 00:00:00 2001 From: wes-johnson Date: Wed, 11 Oct 2023 14:56:23 -0700 Subject: [PATCH 6/9] Added empty string check on Host ID in the HostApplication --- .../src/main/java/org/eclipse/tahu/host/HostApplication.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/lib/host/src/main/java/org/eclipse/tahu/host/HostApplication.java b/java/lib/host/src/main/java/org/eclipse/tahu/host/HostApplication.java index 8d5ddb3e..ee0acdba 100644 --- a/java/lib/host/src/main/java/org/eclipse/tahu/host/HostApplication.java +++ b/java/lib/host/src/main/java/org/eclipse/tahu/host/HostApplication.java @@ -74,7 +74,7 @@ public HostApplication(HostApplicationEventHandler eventHandler, String hostId, RandomStartupDelay randomStartupDelay) { logger.info("Creating the Host Application"); - if (hostId != null) { + if (hostId != null && !hostId.trim().isEmpty()) { this.hostId = hostId; this.stateTopic = SparkplugMeta.SPARKPLUG_TOPIC_HOST_STATE_PREFIX + "/" + hostId; } else { From 1c6df23afcd2aee1cbc46012fa2d6eebfb79ad9f Mon Sep 17 00:00:00 2001 From: Ilya Binshtok Date: Mon, 16 Oct 2023 14:49:51 -0500 Subject: [PATCH 7/9] [IMM-3853][IMM-3863] Improved logging for unsupported PropertyDataType and MetricDataType. --- .../org/eclipse/tahu/message/SparkplugBPayloadEncoder.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/java/lib/core/src/main/java/org/eclipse/tahu/message/SparkplugBPayloadEncoder.java b/java/lib/core/src/main/java/org/eclipse/tahu/message/SparkplugBPayloadEncoder.java index d2ebbdeb..45be9eee 100644 --- a/java/lib/core/src/main/java/org/eclipse/tahu/message/SparkplugBPayloadEncoder.java +++ b/java/lib/core/src/main/java/org/eclipse/tahu/message/SparkplugBPayloadEncoder.java @@ -244,7 +244,7 @@ private SparkplugBProto.Payload.PropertySet.Builder convertPropertySet(PropertyS break; case Unknown: default: - logger.error("Unknown PropertyDataType: {}", value.getType()); + logger.error("Unsupported PropertyDataType: '{}' for the '{}' property", value.getType(), key); throw new Exception("Failed to convert value " + value.getType()); } } @@ -767,7 +767,8 @@ private SparkplugBProto.Payload.Metric.Builder setMetricValue(SparkplugBProto.Pa break; case Unknown: default: - logger.error("Unsupported MetricDataType: {}", metric.getDataType()); + logger.error("Unsupported MetricDataType: {} for the {} metric", metric.getDataType(), + metric.getName()); throw new Exception("Failed to encode"); } From 99d8c65d8f9582e1c57add8ea1fbe379e3e01f61 Mon Sep 17 00:00:00 2001 From: wes-johnson Date: Mon, 16 Oct 2023 13:32:43 -0700 Subject: [PATCH 8/9] Ensured STATE status is only updated for the appropriate Sparkplug Host ID --- .../main/java/org/eclipse/tahu/host/HostApplication.java | 3 ++- .../java/org/eclipse/tahu/host/TahuHostCallback.java | 9 +++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/java/lib/host/src/main/java/org/eclipse/tahu/host/HostApplication.java b/java/lib/host/src/main/java/org/eclipse/tahu/host/HostApplication.java index ee0acdba..aba2bea6 100644 --- a/java/lib/host/src/main/java/org/eclipse/tahu/host/HostApplication.java +++ b/java/lib/host/src/main/java/org/eclipse/tahu/host/HostApplication.java @@ -66,7 +66,8 @@ public HostApplication(HostApplicationEventHandler eventHandler, String hostId, SequenceReorderManager sequenceReorderManager = SequenceReorderManager.getInstance(); sequenceReorderManager.init(eventHandler, this, payloadDecoder, 5000L); - this.tahuHostCallback = new TahuHostCallback(eventHandler, this, sequenceReorderManager, payloadDecoder); + this.tahuHostCallback = + new TahuHostCallback(eventHandler, this, sequenceReorderManager, payloadDecoder, hostId); } public HostApplication(HostApplicationEventHandler eventHandler, String hostId, List sparkplugSubscriptons, diff --git a/java/lib/host/src/main/java/org/eclipse/tahu/host/TahuHostCallback.java b/java/lib/host/src/main/java/org/eclipse/tahu/host/TahuHostCallback.java index 2542e2c1..bc1905d6 100644 --- a/java/lib/host/src/main/java/org/eclipse/tahu/host/TahuHostCallback.java +++ b/java/lib/host/src/main/java/org/eclipse/tahu/host/TahuHostCallback.java @@ -59,8 +59,11 @@ public class TahuHostCallback implements ClientCallback { private final PayloadDecoder payloadDecoder; + private final String hostId; + public TahuHostCallback(HostApplicationEventHandler eventHandler, CommandPublisher commandPublisher, - SequenceReorderManager sequenceReorderManager, PayloadDecoder payloadDecoder) { + SequenceReorderManager sequenceReorderManager, PayloadDecoder payloadDecoder, + String hostId) { this.eventHandler = eventHandler; this.commandPublisher = commandPublisher; if (sequenceReorderManager != null) { @@ -72,6 +75,7 @@ public TahuHostCallback(HostApplicationEventHandler eventHandler, CommandPublish this.sequenceReorderManager = null; } this.payloadDecoder = payloadDecoder; + this.hostId = hostId; this.sparkplugBExecutors = new ThreadPoolExecutor[DEFAULT_NUM_OF_THREADS]; for (int i = 0; i < DEFAULT_NUM_OF_THREADS; i++) { @@ -137,7 +141,8 @@ public void messageArrived(MqttServerName server, MqttServerUrl url, MqttClientI // This is a STATE message - handle as needed ObjectMapper mapper = new ObjectMapper(); StatePayload statePayload = mapper.readValue(new String(message.getPayload()), StatePayload.class); - if (!statePayload.isOnline()) { + if (hostId != null && !hostId.trim().isEmpty() && splitTopic[2].equals(hostId) + && !statePayload.isOnline()) { // Make sure this isn't an OFFLINE message logger.info( "This is a offline STATE message from {} - correcting with new online STATE message", From 4fb1c5527fe1950941632096c8904a13cdf0925d Mon Sep 17 00:00:00 2001 From: Chad Kienle Date: Mon, 23 Oct 2023 14:02:55 -0700 Subject: [PATCH 9/9] Upticked pom versions to v1.0.6 for release Signed-off-by: Chad Kienle --- java/compat_impl/edge/pom.xml | 2 +- java/compat_impl/host/pom.xml | 2 +- java/examples/device_timestamp/pom.xml | 2 +- java/examples/edge_node_control/pom.xml | 2 +- java/examples/host_file/pom.xml | 2 +- java/examples/listener/pom.xml | 2 +- java/examples/pom.xml | 2 +- java/examples/raspberry_pi/pom.xml | 2 +- java/examples/records/pom.xml | 2 +- java/examples/simple/pom.xml | 2 +- java/examples/udt/pom.xml | 2 +- java/lib/core/pom.xml | 2 +- java/lib/edge/pom.xml | 2 +- java/lib/host/pom.xml | 2 +- java/pom.xml | 2 +- 15 files changed, 15 insertions(+), 15 deletions(-) diff --git a/java/compat_impl/edge/pom.xml b/java/compat_impl/edge/pom.xml index 5b085113..98c03baf 100644 --- a/java/compat_impl/edge/pom.xml +++ b/java/compat_impl/edge/pom.xml @@ -18,7 +18,7 @@ org.eclipse.tahu tahu - 1.0.6-SNAPSHOT + 1.0.6 ../../pom.xml diff --git a/java/compat_impl/host/pom.xml b/java/compat_impl/host/pom.xml index 9a3bc78b..9b89c868 100644 --- a/java/compat_impl/host/pom.xml +++ b/java/compat_impl/host/pom.xml @@ -18,7 +18,7 @@ org.eclipse.tahu tahu - 1.0.6-SNAPSHOT + 1.0.6 ../../pom.xml diff --git a/java/examples/device_timestamp/pom.xml b/java/examples/device_timestamp/pom.xml index 16293f44..9596505f 100644 --- a/java/examples/device_timestamp/pom.xml +++ b/java/examples/device_timestamp/pom.xml @@ -18,7 +18,7 @@ org.eclipse.tahu tahu-examples - 1.0.6-SNAPSHOT + 1.0.6 ../pom.xml diff --git a/java/examples/edge_node_control/pom.xml b/java/examples/edge_node_control/pom.xml index d3551f61..a48b8e6d 100644 --- a/java/examples/edge_node_control/pom.xml +++ b/java/examples/edge_node_control/pom.xml @@ -18,7 +18,7 @@ org.eclipse.tahu tahu-examples - 1.0.6-SNAPSHOT + 1.0.6 ../pom.xml diff --git a/java/examples/host_file/pom.xml b/java/examples/host_file/pom.xml index 17a2a11a..68d8666e 100644 --- a/java/examples/host_file/pom.xml +++ b/java/examples/host_file/pom.xml @@ -18,7 +18,7 @@ org.eclipse.tahu tahu-examples - 1.0.6-SNAPSHOT + 1.0.6 ../pom.xml diff --git a/java/examples/listener/pom.xml b/java/examples/listener/pom.xml index 55da4398..7b80fe58 100644 --- a/java/examples/listener/pom.xml +++ b/java/examples/listener/pom.xml @@ -18,7 +18,7 @@ org.eclipse.tahu tahu-examples - 1.0.6-SNAPSHOT + 1.0.6 ../pom.xml diff --git a/java/examples/pom.xml b/java/examples/pom.xml index 3e1519d0..ab2b0ea3 100644 --- a/java/examples/pom.xml +++ b/java/examples/pom.xml @@ -29,7 +29,7 @@ org.eclipse.tahu tahu-examples - 1.0.6-SNAPSHOT + 1.0.6 pom Eclipse Tahu diff --git a/java/examples/raspberry_pi/pom.xml b/java/examples/raspberry_pi/pom.xml index 6da58429..d30b5a69 100644 --- a/java/examples/raspberry_pi/pom.xml +++ b/java/examples/raspberry_pi/pom.xml @@ -18,7 +18,7 @@ org.eclipse.tahu tahu-examples - 1.0.6-SNAPSHOT + 1.0.6 ../pom.xml diff --git a/java/examples/records/pom.xml b/java/examples/records/pom.xml index 279b8eaf..09b3ff77 100644 --- a/java/examples/records/pom.xml +++ b/java/examples/records/pom.xml @@ -18,7 +18,7 @@ org.eclipse.tahu tahu-examples - 1.0.6-SNAPSHOT + 1.0.6 ../pom.xml diff --git a/java/examples/simple/pom.xml b/java/examples/simple/pom.xml index 19faa95c..ad92bf36 100644 --- a/java/examples/simple/pom.xml +++ b/java/examples/simple/pom.xml @@ -18,7 +18,7 @@ org.eclipse.tahu tahu-examples - 1.0.6-SNAPSHOT + 1.0.6 ../pom.xml diff --git a/java/examples/udt/pom.xml b/java/examples/udt/pom.xml index 4e24f0af..7162483c 100644 --- a/java/examples/udt/pom.xml +++ b/java/examples/udt/pom.xml @@ -18,7 +18,7 @@ org.eclipse.tahu tahu-examples - 1.0.6-SNAPSHOT + 1.0.6 ../pom.xml diff --git a/java/lib/core/pom.xml b/java/lib/core/pom.xml index 4213fbd5..9e35082d 100644 --- a/java/lib/core/pom.xml +++ b/java/lib/core/pom.xml @@ -18,7 +18,7 @@ org.eclipse.tahu tahu - 1.0.6-SNAPSHOT + 1.0.6 ../../pom.xml diff --git a/java/lib/edge/pom.xml b/java/lib/edge/pom.xml index b7314bcc..dc03ad67 100644 --- a/java/lib/edge/pom.xml +++ b/java/lib/edge/pom.xml @@ -18,7 +18,7 @@ org.eclipse.tahu tahu - 1.0.6-SNAPSHOT + 1.0.6 ../../pom.xml diff --git a/java/lib/host/pom.xml b/java/lib/host/pom.xml index 5cbe103f..b6f775d3 100644 --- a/java/lib/host/pom.xml +++ b/java/lib/host/pom.xml @@ -18,7 +18,7 @@ org.eclipse.tahu tahu - 1.0.6-SNAPSHOT + 1.0.6 ../../pom.xml diff --git a/java/pom.xml b/java/pom.xml index bce0af22..cda51ca3 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -31,7 +31,7 @@ org.eclipse.tahu tahu - 1.0.6-SNAPSHOT + 1.0.6 pom Eclipse Tahu