Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
ckienle committed Oct 23, 2023
2 parents d6552c9 + 4fb1c55 commit dbda948
Show file tree
Hide file tree
Showing 29 changed files with 376 additions and 90 deletions.
2 changes: 1 addition & 1 deletion java/compat_impl/edge/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<parent>
<groupId>org.eclipse.tahu</groupId>
<artifactId>tahu</artifactId>
<version>1.0.5</version>
<version>1.0.6</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion java/compat_impl/host/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<parent>
<groupId>org.eclipse.tahu</groupId>
<artifactId>tahu</artifactId>
<version>1.0.5</version>
<version>1.0.6</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package org.eclipse.tahu.host;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.eclipse.tahu.exception.TahuException;
Expand All @@ -24,6 +25,7 @@
import org.eclipse.tahu.message.model.Message;
import org.eclipse.tahu.message.model.Metric;
import org.eclipse.tahu.message.model.SparkplugDescriptor;
import org.eclipse.tahu.message.model.SparkplugMeta;
import org.eclipse.tahu.model.MqttServerDefinition;
import org.eclipse.tahu.mqtt.MqttClientId;
import org.eclipse.tahu.mqtt.MqttServerName;
Expand Down Expand Up @@ -97,8 +99,9 @@ public static void main(String[] arg) {

public SparkplugHostApplication() {
try {
hostApplication =
new HostApplication(this, HOST_ID, mqttServerDefinitions, null, new SparkplugBPayloadDecoder());
hostApplication = new HostApplication(this, HOST_ID,
new ArrayList<>(Arrays.asList(SparkplugMeta.SPARKPLUG_B_TOPIC_PREFIX + "/#")),
mqttServerDefinitions, null, new SparkplugBPayloadDecoder());
} catch (Exception e) {
logger.error("Failed to create the HostApplication", e);
}
Expand Down
2 changes: 1 addition & 1 deletion java/examples/device_timestamp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<parent>
<groupId>org.eclipse.tahu</groupId>
<artifactId>tahu-examples</artifactId>
<version>1.0.5</version>
<version>1.0.6</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion java/examples/edge_node_control/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<parent>
<groupId>org.eclipse.tahu</groupId>
<artifactId>tahu-examples</artifactId>
<version>1.0.5</version>
<version>1.0.6</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion java/examples/host_file/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<parent>
<groupId>org.eclipse.tahu</groupId>
<artifactId>tahu-examples</artifactId>
<version>1.0.5</version>
<version>1.0.6</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion java/examples/listener/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<parent>
<groupId>org.eclipse.tahu</groupId>
<artifactId>tahu-examples</artifactId>
<version>1.0.5</version>
<version>1.0.6</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion java/examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

<groupId>org.eclipse.tahu</groupId>
<artifactId>tahu-examples</artifactId>
<version>1.0.5</version>
<version>1.0.6</version>
<packaging>pom</packaging>

<name>Eclipse Tahu</name>
Expand Down
2 changes: 1 addition & 1 deletion java/examples/raspberry_pi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<parent>
<groupId>org.eclipse.tahu</groupId>
<artifactId>tahu-examples</artifactId>
<version>1.0.5</version>
<version>1.0.6</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion java/examples/records/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<parent>
<groupId>org.eclipse.tahu</groupId>
<artifactId>tahu-examples</artifactId>
<version>1.0.5</version>
<version>1.0.6</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion java/examples/simple/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<parent>
<groupId>org.eclipse.tahu</groupId>
<artifactId>tahu-examples</artifactId>
<version>1.0.5</version>
<version>1.0.6</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion java/examples/udt/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<parent>
<groupId>org.eclipse.tahu</groupId>
<artifactId>tahu-examples</artifactId>
<version>1.0.5</version>
<version>1.0.6</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion java/lib/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<parent>
<groupId>org.eclipse.tahu</groupId>
<artifactId>tahu</artifactId>
<version>1.0.5</version>
<version>1.0.6</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ private SparkplugBProto.Payload.PropertySet.Builder convertPropertySet(PropertyS
break;
case Unknown:
default:
logger.error("Unknown PropertyDataType: {}", value.getType());
logger.error("Unsupported PropertyDataType: '{}' for the '{}' property", value.getType(), key);
throw new Exception("Failed to convert value " + value.getType());
}
}
Expand Down Expand Up @@ -767,7 +767,8 @@ private SparkplugBProto.Payload.Metric.Builder setMetricValue(SparkplugBProto.Pa
break;
case Unknown:
default:
logger.error("Unsupported MetricDataType: {}", metric.getDataType());
logger.error("Unsupported MetricDataType: {} for the {} metric", metric.getDataType(),
metric.getName());
throw new Exception("Failed to encode");

}
Expand Down
86 changes: 55 additions & 31 deletions java/lib/core/src/main/java/org/eclipse/tahu/mqtt/TahuClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -486,8 +486,8 @@ public int subscribe(String topic, int qos) throws TahuException {
if (client != null) {
if (client.isConnected()) {
try {
logger.debug("{}: on connection to {} - Attempting to subscribe on topic {} with QoS={}",
getClientId(), getMqttServerName(), topic, qos);
logger.debug("{}: server {} - Attempting to subscribe on topic {} with QoS={}", getClientId(),
getMqttServerName(), topic, qos);
IMqttToken token = client.subscribe(topic, qos);
logger.trace("{}: Waiting for subscription on {}", getClientId(), topic);
token.waitForCompletion();
Expand All @@ -498,14 +498,14 @@ public int subscribe(String topic, int qos) throws TahuException {
if (grantedQos != null && grantedQos.length == 1) {
return grantedQos[0];
} else {
String errorMessage = getClientId() + ": on connection to " + getMqttServerName()
String errorMessage = getClientId() + ": server " + getMqttServerName()
+ " - Failed to subscribe to " + topic;
logger.error(errorMessage);
throw new TahuException(TahuErrorCode.NOT_AUTHORIZED, errorMessage);
}
} catch (MqttException e) {
logger.error(getClientId() + ": on connection to " + getMqttServerName()
+ " - Failed to subscribe to " + topic);
logger.error(getClientId() + ": server " + getMqttServerName() + " - Failed to subscribe to "
+ topic);
throw new TahuException(TahuErrorCode.INTERNAL_ERROR, e);
}
}
Expand Down Expand Up @@ -593,6 +593,11 @@ public void unsubscribe(String topic) throws TahuException {
@Override
public void connectionLost(Throwable cause) {
logger.debug("{}: MQTT connectionLost() to {} :: {}", getClientId(), getMqttServerName(), getMqttServerUrl());
if (logger.isTraceEnabled()) {
if (client != null) {
client.getDebug().dumpClientDebug();
}
}

// reset the timers if needed
if (getDisconnectTime() == null) {
Expand Down Expand Up @@ -1249,37 +1254,56 @@ public void connectComplete(boolean reconnect, String serverURI) {

String topicStr = Arrays.toString(topics);
String qosStr = Arrays.toString(qosLevels);
logger.debug("{}: on connection to {} - Attempting to subscribe on topic {} with QoS={}",
getClientId(), getMqttServerName(), topicStr, qosStr);
logger.debug("{}: server {} - Attempting to subscribe on topic {} with QoS={}", getClientId(),
getMqttServerName(), topicStr, qosStr);
try {
IMqttToken token = client.subscribe(topics, qosLevels);
logger.trace("{}: Waiting for subscription on {}", getClientId(), topicStr);
token.waitForCompletion();
logger.trace("{}: Done waiting for subscription on {}", getClientId(), topicStr);
int[] grantedQos = token.getGrantedQos();
if (Arrays.equals(qosLevels, grantedQos)) {
logger.debug("{}: on connection to {} - Successfully subscribed on {} on QoS={}",
getClientId(), getMqttServerName(), topicStr, qosStr);
} else {
try {
logger.error("{}: on connection to {} - Failed to subscribe on {} - forcing disconnect",
getClientId(), getMqttServerName(), topicStr);
client.subscribe(topics, qosLevels, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
int[] grantedQos = asyncActionToken.getGrantedQos();
if (Arrays.equals(qosLevels, grantedQos)) {
logger.debug("{}: server {} - Successfully subscribed on {} on QoS={}",
getClientId(), getMqttServerName(), topicStr, qosStr);
} else {
try {
String grantedQosStr = Arrays.toString(grantedQos);
logger.error("{}: server {} - Failed subscribe on {} granted QoS {} != {}",
getClientId(), getMqttServerName(), topicStr, qosStr, grantedQosStr);

// FIXME - remove This sleep is necessary due to:
// https://github.com/eclipse/paho.mqtt.java/issues/850
Thread.sleep(1000);
// FIXME - remove This sleep is necessary due to:
// https://github.com/eclipse/paho.mqtt.java/issues/850
Thread.sleep(1000);

// Force the disconnect and return
client.disconnectForcibly(0, 1, false);
return;
} catch (Exception e) {
logger.error("{}: on connection to {} - Failed to disconnect on failed subscription",
getClientId(), getMqttServerName(), e);
break;
synchronized (clientLock) {
// Force the disconnect and return
client.disconnectForcibly(0, 1, false);
}
return;
} catch (Exception e) {
logger.error(
"{}: server {} - Failed disconnect on failed subscribe granted QoS",
getClientId(), getMqttServerName(), e);
}
}
}
}

@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
synchronized (clientLock) {
try {
logger.error("{}: server {} - Failed to subscribe on {}",
getClientId(), getMqttServerName(), topicStr);
client.disconnectForcibly(0, 1, false);
} catch (MqttException e) {
logger.error("{}: server {} - Failed disconnect on failed subscribe",
getClientId(), getMqttServerName(), e);
}
}
}

});
} catch (MqttException e) {
logger.error("{}: on connection to {} - Failed to subscribe on {} with QoS={}", getClientId(),
logger.error("{}: server {} - Failed to subscribe on {} with QoS={}", getClientId(),
getMqttServerName(), topicStr, qosStr, e);
break;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed Materials - Property of Cirrus Link Solutions
* Copyright (c) 2022 Cirrus Link Solutions LLC - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited
* Proprietary and confidential
*/
package org.eclipse.tahu.test;

import static org.assertj.core.api.Assertions.fail;

import java.util.Date;

import org.eclipse.tahu.SparkplugInvalidTypeException;
import org.eclipse.tahu.message.PayloadDecoder;
import org.eclipse.tahu.message.SparkplugBPayloadDecoder;
import org.eclipse.tahu.message.SparkplugBPayloadEncoder;
import org.eclipse.tahu.message.model.SparkplugBPayload;
import org.eclipse.tahu.message.model.SparkplugBPayload.SparkplugBPayloadBuilder;
import org.testng.annotations.Test;

/**
* Sparkplug Test class for encoding and decoding sparkplug payloads
*/
public class SequenceTest {

public SequenceTest() {
}

@Test
public void testEnDeCode() throws SparkplugInvalidTypeException {
unit(null);
unit(0L);
unit(1L);
}

private void unit(Long seq) {
Date currentTime = new Date(0L);

// Encode
SparkplugBPayloadEncoder encoder = new SparkplugBPayloadEncoder();
PayloadDecoder<SparkplugBPayload> decoder = new SparkplugBPayloadDecoder();
try {
SparkplugBPayload initialPayload =
new SparkplugBPayloadBuilder().setTimestamp(currentTime).setSeq(seq).createPayload();
byte[] bytes = encoder.getBytes(initialPayload, false);
SparkplugBPayload decodedPayload = decoder.buildFromByteArray(bytes, null);
System.out.println("Initial: " + initialPayload);
System.out.println(seq + ": " + bytesToHex(bytes));
System.out.println("Decoded: " + decodedPayload);
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}

private final static char[] hexArray = "0123456789ABCDEF".toCharArray();

private static String bytesToHex(byte[] bytes) {
char[] hexChars = new char[bytes.length * 3];
int v;
for (int j = 0; j < bytes.length; j++) {
v = bytes[j] & 0xFF;
hexChars[j * 3] = hexArray[v >>> 4];
hexChars[j * 3 + 1] = hexArray[v & 0x0F];
hexChars[j * 3 + 2] = 0x20; // space separator
}
return new String(hexChars);
}
}
2 changes: 1 addition & 1 deletion java/lib/edge/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<parent>
<groupId>org.eclipse.tahu</groupId>
<artifactId>tahu</artifactId>
<version>1.0.5</version>
<version>1.0.6</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
Loading

0 comments on commit dbda948

Please sign in to comment.