From 18fe342b428e65c5cbd39a060a5a5d8a6a96f86b Mon Sep 17 00:00:00 2001 From: ValMobBIllich <120391217+ValMobBIllich@users.noreply.github.com> Date: Tue, 29 Oct 2024 13:46:51 +0100 Subject: [PATCH] Enable encrypted MQTT/ connection with Azure EventGrid Enable/Disable SubscriptionIds depending on broker capabilities. Azure EventGrid does not support SubscriptionIds even though they are specified in the MQTTv5 spec. Co-authored-by: Pete LeVasseur --- .gitignore | 2 + Cargo.toml | 2 +- README.md | 17 +++- examples/encrypted_publisher_example.rs | 92 ++++++++++++++++++++ examples/encrypted_subscriber_example.rs | 105 +++++++++++++++++++++++ examples/publisher_example.rs | 22 ++++- examples/subscriber_example.rs | 24 +++++- src/lib.rs | 65 +++++++++++--- src/transport.rs | 13 +-- 9 files changed, 309 insertions(+), 33 deletions(-) create mode 100644 examples/encrypted_publisher_example.rs create mode 100644 examples/encrypted_subscriber_example.rs diff --git a/.gitignore b/.gitignore index dd87ceb..94cbff5 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,8 @@ target/ # More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html Cargo.lock +.cargo/ + # These are backup files generated by rustfmt **/*.rs.bk diff --git a/Cargo.toml b/Cargo.toml index 078e966..b03c5d7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,7 +46,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = { version = "1.0" } tokio = { version = "1.38", features = ["full"] } tokio-macros = { version = "2.3" } -up-rust = { git = "https://github.com/eclipse-uprotocol/up-rust", rev = "da722852004f657fa8d0282369fcfccb0ccda112" } +up-rust = "0.2.0" url = { version = "2.5" } uuid = { version = "1.7", features = ["v8"] } diff --git a/README.md b/README.md index 3faf04f..1b1cc4b 100644 --- a/README.md +++ b/README.md @@ -19,9 +19,22 @@ cargo test ### Running the Examples -First, ensure you have a local MQTT broker running, such as [Mosquitto](https://github.com/eclipse/mosquitto). +1. Start an MQTT broker (e.g. mosquitto) + +2. Set up your environment (for example with a config file at .cargo/config.toml) + +Make sure to set these parameters: +```toml +[env] +MQTT_PROTOCOL = "'mqtt' or 'mqtts'" +MQTT_PORT = "8883 for ssl encrypted mqtt" +MQTT_HOSTNAME = "the hostname/ url of the broker" +KEY_STORE = "the .pem file location corresponding to an ssl certificate (if using mqtts)" +PRIVATE_KEY_PW = "the password to the .pem file (if using mqtts)" +CLIENT_NAME = "the name of the eventgrid client (if using mqtts)" +``` -Then start the following two examples from your repo root directory. +3. Start the following two examples from your repo root directory. ```bash cargo run --example publisher_example diff --git a/examples/encrypted_publisher_example.rs b/examples/encrypted_publisher_example.rs new file mode 100644 index 0000000..de5f680 --- /dev/null +++ b/examples/encrypted_publisher_example.rs @@ -0,0 +1,92 @@ +/******************************************************************************** + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +use std::{env, str::FromStr, time::SystemTime}; + +use env_logger::{Builder, Target}; +use log::LevelFilter; +use paho_mqtt::SslOptionsBuilder; +use up_client_mqtt5_rust::{MqttConfig, MqttProtocol, UPClientMqtt, UPClientMqttType}; +use up_rust::{UMessageBuilder, UPayloadFormat, UStatus, UTransport, UUri, UUID}; + +#[tokio::main] +async fn main() -> Result<(), UStatus> { + Builder::new() + .target(Target::Stdout) // Logs to stdout + .filter(None, LevelFilter::Trace) // Default level + .init(); + + // Set the protocol type (Mqtts for encrypted mqtt) + let protocol = MqttProtocol::Mqtts; + + // Build the ssl options (only needed if protocol is Mqtts!) + let ssl_options = Some( + SslOptionsBuilder::new() + .key_store(env::var("KEY_STORE").expect("KEY_STORE env variable not found")) + .expect("Certificate file not found.") + .private_key_password( + env::var("PRIVATE_KEY_PW").expect("PRIVATE_KEY_PW env variable not found"), + ) + .enable_server_cert_auth(false) + .finalize(), + ); + // If the mqtt broker has a specific username attached to the ssl certificate, it must be included in the config + let user_name = env::var("CLIENT_NAME") + .expect("CLIENT_NAME env variable not found") + .to_string(); + + let config = MqttConfig { + mqtt_protocol: protocol, + mqtt_hostname: env::var("MQTT_HOSTNAME") + .expect("MQTT_HOSTNAME env variable not found") + .to_string(), + mqtt_port: 8883, + max_buffered_messages: 100, + max_subscriptions: 100, + session_expiry_interval: 3600, + ssl_options, + username: user_name, + }; + + let client = UPClientMqtt::new( + config, + UUID::build(), + "Vehicle_B".to_string(), + UPClientMqttType::Device, + ) + .await?; + + let source = + UUri::from_str("//Vehicle_B/A8000/2/8A50").expect("Failed to create source filter"); + + loop { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + let current_time = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs(); + let message = UMessageBuilder::publish(source.clone()) + .build_with_payload( + current_time.to_string(), + UPayloadFormat::UPAYLOAD_FORMAT_TEXT, + ) + .expect("Failed to build message"); + + println!( + "Sending message: {} to source: {}", + current_time, + source.to_uri(false) + ); + client.send(message).await?; + } +} diff --git a/examples/encrypted_subscriber_example.rs b/examples/encrypted_subscriber_example.rs new file mode 100644 index 0000000..da8724d --- /dev/null +++ b/examples/encrypted_subscriber_example.rs @@ -0,0 +1,105 @@ +/******************************************************************************** + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +use std::{ + env, + str::{self, FromStr}, + sync::Arc, +}; + +use async_trait::async_trait; +use env_logger::{Builder, Target}; +use log::LevelFilter; +use paho_mqtt::SslOptionsBuilder; +use up_client_mqtt5_rust::{MqttConfig, MqttProtocol, UPClientMqtt, UPClientMqttType}; +use up_rust::{UListener, UMessage, UStatus, UTransport, UUri, UUID}; + +const WILDCARD_ENTITY_ID: u32 = 0x0000_FFFF; +const WILDCARD_ENTITY_VERSION: u32 = 0x0000_00FF; +const WILDCARD_RESOURCE_ID: u32 = 0x0000_FFFF; + +struct PrintlnListener {} + +#[async_trait] +impl UListener for PrintlnListener { + async fn on_receive(&self, message: UMessage) { + let msg_payload = message.payload.unwrap(); + let msg_str: &str = str::from_utf8(&msg_payload).unwrap(); + println!("Received message: {msg_str}"); + } +} + +#[tokio::main] +async fn main() -> Result<(), UStatus> { + Builder::new() + .target(Target::Stdout) // Logs to stdout + .filter(None, LevelFilter::Trace) // Default level + .init(); + + // Set the protocol type ("mqtts" for encrypted mqtt) + let protocol = MqttProtocol::Mqtts; + + // Build the ssl options (only needed if protocol is Mqtts!) + let ssl_options = Some( + SslOptionsBuilder::new() + .key_store(env::var("KEY_STORE").expect("KEY_STORE env variable not found")) + .expect("Certificate file not found.") + .private_key_password( + env::var("PRIVATE_KEY_PW").expect("PRIVATE_KEY_PW env variable not found"), + ) + .enable_server_cert_auth(false) + .finalize(), + ); + // If the mqtt broker has a specific username attached to the ssl certificate, it must be included in the config + let user_name = env::var("CLIENT_NAME") + .expect("CLIENT_NAME env variable not found") + .to_string(); + + // Build the configuration for the connection + let config = MqttConfig { + mqtt_protocol: protocol, + mqtt_hostname: env::var("MQTT_HOSTNAME") + .expect("MQTT_HOSTNAME env variable not found") + .to_string(), + mqtt_port: 8883, + max_buffered_messages: 100, + max_subscriptions: 100, + session_expiry_interval: 3600, + ssl_options, + username: user_name, + }; + + let client = UPClientMqtt::new( + config, + UUID::build(), + "Vehicle_B".to_string(), + UPClientMqttType::Device, + ) + .await?; + + let listener = Arc::new(PrintlnListener {}); + let source_filter = UUri::from_str(&format!( + "//Vehicle_B/{WILDCARD_ENTITY_ID:X}/{WILDCARD_ENTITY_VERSION:X}/{WILDCARD_RESOURCE_ID:X}" + )) + .expect("Failed to create source filter"); + + println!("Subscribing to: {}", source_filter.to_uri(false)); + + client + .register_listener(&source_filter, None, listener.clone()) + .await?; + + loop { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } +} diff --git a/examples/publisher_example.rs b/examples/publisher_example.rs index bbb085d..1c43ce0 100644 --- a/examples/publisher_example.rs +++ b/examples/publisher_example.rs @@ -13,18 +13,34 @@ use std::{str::FromStr, time::SystemTime}; -use up_client_mqtt5_rust::{MqttConfig, UPClientMqtt, UPClientMqttType}; +use env_logger::{Builder, Target}; +use log::LevelFilter; +use up_client_mqtt5_rust::{MqttConfig, MqttProtocol, UPClientMqtt, UPClientMqttType}; use up_rust::{UMessageBuilder, UPayloadFormat, UStatus, UTransport, UUri, UUID}; #[tokio::main] async fn main() -> Result<(), UStatus> { + Builder::new() + .target(Target::Stdout) // Logs to stdout + .filter(None, LevelFilter::Trace) // Default level + .init(); + + // Set the protocol type ("mqtt" for unencrypted mqtt) + let protocol = MqttProtocol::Mqtt; + + // no need to build ssl options since we are using unencrypted mqtt, username is arbitrary + let ssl_options = None; + let user_name = "eclipse_testuser".to_string(); + let config = MqttConfig { + mqtt_protocol: protocol, mqtt_hostname: "localhost".to_string(), - mqtt_port: "1883".to_string(), + mqtt_port: 1883, max_buffered_messages: 100, max_subscriptions: 100, session_expiry_interval: 3600, - ssl_options: None, + ssl_options, + username: user_name, }; let client = UPClientMqtt::new( diff --git a/examples/subscriber_example.rs b/examples/subscriber_example.rs index 4f3b11c..fa0d60c 100644 --- a/examples/subscriber_example.rs +++ b/examples/subscriber_example.rs @@ -17,7 +17,9 @@ use std::{ }; use async_trait::async_trait; -use up_client_mqtt5_rust::{MqttConfig, UPClientMqtt, UPClientMqttType}; +use env_logger::{Builder, Target}; +use log::LevelFilter; +use up_client_mqtt5_rust::{MqttConfig, MqttProtocol, UPClientMqtt, UPClientMqttType}; use up_rust::{UListener, UMessage, UStatus, UTransport, UUri, UUID}; const WILDCARD_ENTITY_ID: u32 = 0x0000_FFFF; @@ -37,19 +39,33 @@ impl UListener for PrintlnListener { #[tokio::main] async fn main() -> Result<(), UStatus> { + Builder::new() + .target(Target::Stdout) // Logs to stdout + .filter(None, LevelFilter::Trace) // Default level + .init(); + + // Set the protocol type ("mqtt" for unencrypted mqtt) + let protocol = MqttProtocol::Mqtt; + + // no need to build ssl options since we are using unencrypted mqtt, username is arbitrary + let ssl_options = None; + let user_name = "eclipse_testuser".to_string(); + let config = MqttConfig { + mqtt_protocol: protocol, mqtt_hostname: "localhost".to_string(), - mqtt_port: "1883".to_string(), + mqtt_port: 1883, max_buffered_messages: 100, max_subscriptions: 100, session_expiry_interval: 3600, - ssl_options: None, + ssl_options, + username: user_name, }; let client = UPClientMqtt::new( config, UUID::build(), - "Vehicle_A".to_string(), + "Vehicle_B".to_string(), UPClientMqttType::Device, ) .await?; diff --git a/src/lib.rs b/src/lib.rs index a26f9fc..1e36caf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,8 +22,10 @@ use async_channel::Receiver; use async_trait::async_trait; use bytes::Bytes; use futures::stream::StreamExt; -use log::{info, trace, warn}; -use paho_mqtt::{self as mqtt, AsyncReceiver, Message, MQTT_VERSION_5, QOS_1}; +use log::{debug, info, trace, warn}; +use paho_mqtt::{ + self as mqtt, AsyncReceiver, Message, Properties, SslOptions, MQTT_VERSION_5, QOS_1, +}; use protobuf::MessageDyn; use tokio::{sync::RwLock, task::JoinHandle}; use up_rust::{ @@ -80,6 +82,22 @@ pub trait MockableMqttClient: Sync + Send { pub struct AsyncMqttClient { inner_mqtt_client: Arc, + sub_identifier_available: bool, +} + +fn check_subscription_identifier_available_in_response(props: &Properties) -> bool { + let property = props.get(paho_mqtt::PropertyCode::SubscriptionIdentifiersAvailable); + if let Some(property) = property { + let property_value = property.get_byte(); + if let Some(value) = property_value { + if value != 1 { + debug!("Subscription Identifier not supported by broker"); + return false; + } + } + } + debug!("Subscription Identifier supported by broker"); + true } // Create a set of poperties with a single Subscription ID @@ -103,10 +121,9 @@ impl MockableMqttClient for AsyncMqttClient { where Self: Sized, { - let mqtt_protocol = if config.ssl_options.is_some() { - "mqtts" - } else { - "mqtt" + let mqtt_protocol = match config.mqtt_protocol { + MqttProtocol::Mqtt => "mqtt", + MqttProtocol::Mqtts => "mqtts", }; let mqtt_uri = format!( @@ -128,22 +145,28 @@ impl MockableMqttClient for AsyncMqttClient { let message_stream = mqtt_cli.get_stream(100); - // TODO: Integrate ssl options when connecting, may need a username, etc. - let conn_opts = mqtt::ConnectOptionsBuilder::with_mqtt_version(MQTT_VERSION_5) + let conn_opts = + mqtt::ConnectOptionsBuilder::with_mqtt_version(MQTT_VERSION_5) .clean_start(false) .properties(mqtt::properties![mqtt::PropertyCode::SessionExpiryInterval => config.session_expiry_interval]) + .ssl_options(config.ssl_options.or_else(|| Some(SslOptions::default())).unwrap()) + .user_name(config.username) .finalize(); - mqtt_cli.connect(conn_opts).await.map_err(|e| { + let token = mqtt_cli.connect(conn_opts).await.map_err(|e| { UStatus::fail_with_code( UCode::INTERNAL, format!("Unable to connect to mqtt broker: {e:?}"), ) })?; + let sub_identifier_available = + check_subscription_identifier_available_in_response(token.properties()); + Ok(( Self { inner_mqtt_client: Arc::new(mqtt_cli), + sub_identifier_available, }, message_stream, )) @@ -173,8 +196,18 @@ impl MockableMqttClient for AsyncMqttClient { /// * `topic` - Topic to subscribe to. async fn subscribe(&self, topic: &str, id: i32) -> Result<(), UStatus> { // QOS 1 - Delivered and received at least once + let sub_id_prop = if self.sub_identifier_available { + debug!( + "Subcription identifier supported by broker. Subscribe with subscription id {}", + id + ); + Some(sub_id(id)) + } else { + None + }; + self.inner_mqtt_client - .subscribe_with_options(topic, QOS_1, None, sub_id(id)) + .subscribe_with_options(topic, QOS_1, None, sub_id_prop) .await .map_err(|e| { UStatus::fail_with_code( @@ -207,8 +240,10 @@ impl MockableMqttClient for AsyncMqttClient { /// Configuration for the mqtt client. pub struct MqttConfig { + /// Schema of the mqtt broker (mqtt or mqtts) + pub mqtt_protocol: MqttProtocol, /// Port of the mqtt broker to connect to. - pub mqtt_port: String, + pub mqtt_port: u16, /// Hostname of the mqtt broker. pub mqtt_hostname: String, /// Max buffered messages for the mqtt client. @@ -219,6 +254,8 @@ pub struct MqttConfig { pub session_expiry_interval: i32, /// Optional SSL options for the mqtt connection. pub ssl_options: Option, + /// Username + pub username: String, } /// UP Client for mqtt. @@ -245,6 +282,12 @@ pub enum UPClientMqttType { Cloud, } +/// Type of MQTT protocol +pub enum MqttProtocol { + Mqtt, + Mqtts, +} + impl UPClientMqtt { /// Create a new UPClientMqtt. /// diff --git a/src/transport.rs b/src/transport.rs index 9f47655..65e4e96 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -37,7 +37,7 @@ impl UTransport for UPClientMqtt { let topic = self.to_mqtt_topic_string(src_uri, sink_uri); // Extract payload from umessage to send - let payload = message.payload.clone(); + let payload = message.payload; self.send_message(&topic, attributes, payload).await } @@ -63,17 +63,6 @@ impl UTransport for UPClientMqtt { self.remove_listener(&topic, listener).await } - - async fn receive( - &self, - _source_filter: &UUri, - _sink_filter: Option<&UUri>, - ) -> Result { - Err(UStatus::fail_with_code( - UCode::UNIMPLEMENTED, - "This method is not implemented for mqtt. Use register_listener instead.", - )) - } } #[cfg(test)]