Skip to content

Commit

Permalink
Implement UTransport for MQTT v5 (#4)
Browse files Browse the repository at this point in the history
* Initial impl of send func

* Better implementation of send message, still missing topic split

* Initial update to 1.5.8 spec

* Fix clippy warnings

* Update cargo to reflect current rust toolchain version

* Address some PR comments

* ran cargo fmt

* First step towards removing hard coded protobuf field number values

* Update properties > attributes fn and add more tests

* Fix clippy warnings

* Add unit tests for lib

* Fix lib tests, add skeleton for transport tests

* add remaining tests, add examples, update readme

* Fix library to properly handle wildcard subscriptions without duplication of messages

* remove dead_code macro from upclient implementation

* resolved remaining unwraps that could lead to panics

* fixed clippy warnings, updated rust version to address test pipeline error

* Remove unecessary println statements

* Add reference to examples to show how to use UPClientMqtt

* Fix free subscription id bug

* Use channels rather than blocking callback thread for handling listeners

* removed async-std from cargo.toml

* remove subscriber_example comment

* refactored code to remove indentations and other minor improvements

* added newlines to test_cases to try to make inputs more readable

* added check to confirm when test is expected to hit an error

* Update up-rust to 0.1.0

* Fix tests and add uattributes version to mqtt props

* cargo fmt fix

* Improve logging for mqtt props to attributes
  • Loading branch information
devkelley authored Oct 4, 2024
1 parent 1bc487f commit 8af1a74
Show file tree
Hide file tree
Showing 7 changed files with 2,193 additions and 29 deletions.
13 changes: 10 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,25 @@ license = "Apache-2.0"
name = "up-client-mqtt5-rust"
readme = "README.md"
repository = "https://github.com/eclipse-uprotocol/up-client-mqtt5-rust"
rust-version = "1.72"
rust-version = "1.76"
version = "0.1.0"

[dependencies]
async-channel = {version = "1.6" }
async-trait = { version = "0.1" }
bytes = { version = "1.5" }
paho-mqtt = { version = "0.12.1" }
env_logger = { version = "0.10" }
futures = { version = "0.3" }
log = { version = "^0.4" }
paho-mqtt = { version = "0.12.3", features = ["vendored-ssl"] }
protobuf = { version = "3.3" }
rand = { version = "0.8" }
regex = { version = "1.10" }
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0" }
up-rust = { git = "https://github.com/eclipse-uprotocol/up-rust", version = "0.1.5" }
tokio = { version = "1.38", features = ["full"] }
tokio-macros = { version = "2.3" }
up-rust = { git = "https://github.com/eclipse-uprotocol/up-rust", rev = "da722852004f657fa8d0282369fcfccb0ccda112" }
url = { version = "2.5" }
uuid = { version = "1.7", features = ["v8"] }

Expand Down
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,35 @@ This library implements a uTransport client for MQTT5 in Rust following the uPro

To build the library, run `cargo build` in the project root directory. Tests can be run with `cargo test`. This library leverages the [up-rust](https://github.com/eclipse-uprotocol/up-rust/tree/main) library for data types and models specified by uProtocol.

### Running the Tests

To run the tests from the repo root directory, run
```bash
cargo test
```

### Running the Examples

First, ensure you have a local MQTT broker running, such as [Mosquitto](https://github.com/eclipse/mosquitto).

Then start the following two examples from your repo root directory.

```bash
cargo run --example publisher_example
```

```bash
cargo run --example subscriber_example
```

This shows an example of a UPMqttClient publishing from one device and a UPMqttClient subscribing to the publishing device to receive data.

### Using the Library

The library contains the following modules:

Package | [uProtocol spec](https://github.com/eclipse-uprotocol/uprotocol-spec) | Purpose
---|---|---
transport | [uP-L1 Specifications](https://github.com/eclipse-uprotocol/uprotocol-spec/blob/main/up-l1/README.adoc) | Implementation of MQTT5 uTransport client used for bidirectional point-2-point communication between uEs.

Please refer to the [publisher_example](/examples/publisher_example.rs) and [subscriber_example](/examples/subscriber_example.rs) examples to see how to initialize and use the [UPClientMqtt](/src/transport.rs) client.
61 changes: 61 additions & 0 deletions examples/publisher_example.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/********************************************************************************
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

use std::{str::FromStr, time::SystemTime};

use up_client_mqtt5_rust::{MqttConfig, UPClientMqtt, UPClientMqttType};
use up_rust::{UMessageBuilder, UPayloadFormat, UStatus, UTransport, UUri, UUID};

#[tokio::main]
async fn main() -> Result<(), UStatus> {
let config = MqttConfig {
mqtt_hostname: "localhost".to_string(),
mqtt_port: "1883".to_string(),
max_buffered_messages: 100,
max_subscriptions: 100,
session_expiry_interval: 3600,
ssl_options: None,
};

let client = UPClientMqtt::new(
config,
UUID::build(),
"Vehicle_B".to_string(),
UPClientMqttType::Device,
)
.await?;

let source =
UUri::from_str("//Vehicle_B/A8000/2/8A50").expect("Failed to create source filter");

loop {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let current_time = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs();
let message = UMessageBuilder::publish(source.clone())
.build_with_payload(
current_time.to_string(),
UPayloadFormat::UPAYLOAD_FORMAT_TEXT,
)
.expect("Failed to build message");

println!(
"Sending message: {} to source: {}",
current_time,
source.to_uri(false)
);
client.send(message).await?;
}
}
72 changes: 72 additions & 0 deletions examples/subscriber_example.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/********************************************************************************
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

use std::{
str::{self, FromStr},
sync::Arc,
};

use async_trait::async_trait;
use up_client_mqtt5_rust::{MqttConfig, UPClientMqtt, UPClientMqttType};
use up_rust::{UListener, UMessage, UStatus, UTransport, UUri, UUID};

const WILDCARD_ENTITY_ID: u32 = 0x0000_FFFF;
const WILDCARD_ENTITY_VERSION: u32 = 0x0000_00FF;
const WILDCARD_RESOURCE_ID: u32 = 0x0000_FFFF;

struct PrintlnListener {}

#[async_trait]
impl UListener for PrintlnListener {
async fn on_receive(&self, message: UMessage) {
let msg_payload = message.payload.unwrap();
let msg_str: &str = str::from_utf8(&msg_payload).unwrap();
println!("Received message: {msg_str}");
}
}

#[tokio::main]
async fn main() -> Result<(), UStatus> {
let config = MqttConfig {
mqtt_hostname: "localhost".to_string(),
mqtt_port: "1883".to_string(),
max_buffered_messages: 100,
max_subscriptions: 100,
session_expiry_interval: 3600,
ssl_options: None,
};

let client = UPClientMqtt::new(
config,
UUID::build(),
"Vehicle_A".to_string(),
UPClientMqttType::Device,
)
.await?;

let listener = Arc::new(PrintlnListener {});
let source_filter = UUri::from_str(&format!(
"//Vehicle_B/{WILDCARD_ENTITY_ID:X}/{WILDCARD_ENTITY_VERSION:X}/{WILDCARD_RESOURCE_ID:X}"
))
.expect("Failed to create source filter");

println!("Subscribing to: {}", source_filter.to_uri(false));

client
.register_listener(&source_filter, None, listener.clone())
.await?;

loop {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
[toolchain]
channel = "1.76"
channel = "1.77"
Loading

0 comments on commit 8af1a74

Please sign in to comment.