From a6af71781f0843a205f57aa0f5499fe88f4caabb Mon Sep 17 00:00:00 2001 From: Jan Jongen Date: Wed, 18 Dec 2024 18:51:40 +0100 Subject: [PATCH] Improve message dispatching & small refactoring --- src/lib.rs | 93 +++++++++++++++++++++++++++++++----------------------- 1 file changed, 53 insertions(+), 40 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 4d82a18..23fcd6f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -345,6 +345,7 @@ impl UPClientMqtt { tokio::spawn(async move { while let Some(msg_opt) = message_stream.next().await { let Some(msg) = msg_opt else { + //TODO: None means that the connection is dropped. This should be handled correctly. trace!("Received empty message from stream."); continue; }; @@ -354,58 +355,70 @@ impl UPClientMqtt { .get_int(mqtt::PropertyCode::SubscriptionIdentifier); // Get attributes from mqtt header. - let uattributes = { - match UPClientMqtt::get_uattributes_from_mqtt_properties(msg.properties()) { - Ok(uattributes) => uattributes, - Err(e) => { - warn!("Unable to get UAttributes from mqtt properties: {}", e); - continue; + let umessage = if msg.properties().is_empty() { + protobuf::Message::parse_from_bytes(msg.payload()).unwrap() + } else { + let uattributes = { + match UPClientMqtt::get_uattributes_from_mqtt_properties(msg.properties()) { + Ok(uattributes) => uattributes, + Err(e) => { + warn!("Unable to get UAttributes from mqtt properties: {}", e); + continue; + } } + }; + UMessage { + attributes: Some(uattributes).into(), + payload: Some(Bytes::copy_from_slice(msg.payload())), + ..Default::default() } }; - let payload = msg.payload(); - let upayload = payload.to_vec(); - - // Create UMessage from UAttributes and UPayload. - let umessage = UMessage { - attributes: Some(uattributes).into(), - payload: Some(upayload.into()), - ..Default::default() - }; - - let topic_map_read = topic_map.read().await; - let subscription_map_read = subscription_map.read().await; - // If subscription ID is present, only notify listeners for that subscription. - if let Some(sub_id) = sub_id { - let Some(sub_topic) = subscription_map_read.get(&sub_id) else { - trace!( - "Received message with subscription id that is not registered: {}", - sub_id - ); - continue; + let listeners = if let Some(sub_id) = sub_id { + let sub_topic = { + let subscription_map_read = subscription_map.read().await; + + let Some(sub_topic) = subscription_map_read.get(&sub_id) else { + trace!( + "Received message with subscription id that is not registered: {}", + sub_id + ); + continue; + }; + sub_topic.clone() }; - let Some(listeners) = topic_map_read.get(sub_topic) else { - trace!("No listeners registered for topic: {}", sub_topic); - continue; + let owned_listeners = { + let topic_map_read = topic_map.read().await; + + let Some(listeners) = topic_map_read.get(&sub_topic) else { + trace!("No listeners registered for topic: {}", sub_topic); + continue; + }; + listeners.clone() }; - for listener in listeners.iter() { - listener.on_receive(umessage.clone()).await; - } + owned_listeners } else { // Filter the topic map for topics that match the received topic, including wildcards. - let topics_iter = topic_map_read - .iter() - .filter(|(key, _)| UPClientMqtt::compare_topic(topic, key)); + let listeners = { + let topic_map_read = topic_map.read().await; + + topic_map_read + .iter() + .filter(|(key, _)| UPClientMqtt::compare_topic(topic, key)) + .flat_map(|(_topic, listener)| listener.to_owned()) + .collect() + }; + listeners + }; - for (_topic, listeners) in topics_iter { - for listener in listeners.iter() { - listener.on_receive(umessage.clone()).await; - } - } + for listener in listeners { + let msg = umessage.clone(); + tokio::spawn(async move { + listener.on_receive(msg.clone()).await; + }); } } })