Skip to content

Commit

Permalink
Improve message dispatching & small refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
jjj-vtm authored Dec 18, 2024
1 parent 053eef5 commit a6af717
Showing 1 changed file with 53 additions and 40 deletions.
93 changes: 53 additions & 40 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ impl UPClientMqtt {
tokio::spawn(async move {
while let Some(msg_opt) = message_stream.next().await {
let Some(msg) = msg_opt else {
//TODO: None means that the connection is dropped. This should be handled correctly.
trace!("Received empty message from stream.");
continue;
};
Expand All @@ -354,58 +355,70 @@ impl UPClientMqtt {
.get_int(mqtt::PropertyCode::SubscriptionIdentifier);

// Get attributes from mqtt header.
let uattributes = {
match UPClientMqtt::get_uattributes_from_mqtt_properties(msg.properties()) {
Ok(uattributes) => uattributes,
Err(e) => {
warn!("Unable to get UAttributes from mqtt properties: {}", e);
continue;
let umessage = if msg.properties().is_empty() {
protobuf::Message::parse_from_bytes(msg.payload()).unwrap()
} else {
let uattributes = {
match UPClientMqtt::get_uattributes_from_mqtt_properties(msg.properties()) {
Ok(uattributes) => uattributes,
Err(e) => {
warn!("Unable to get UAttributes from mqtt properties: {}", e);
continue;
}
}
};
UMessage {
attributes: Some(uattributes).into(),
payload: Some(Bytes::copy_from_slice(msg.payload())),
..Default::default()
}
};

let payload = msg.payload();
let upayload = payload.to_vec();

// Create UMessage from UAttributes and UPayload.
let umessage = UMessage {
attributes: Some(uattributes).into(),
payload: Some(upayload.into()),
..Default::default()
};

let topic_map_read = topic_map.read().await;
let subscription_map_read = subscription_map.read().await;

// If subscription ID is present, only notify listeners for that subscription.
if let Some(sub_id) = sub_id {
let Some(sub_topic) = subscription_map_read.get(&sub_id) else {
trace!(
"Received message with subscription id that is not registered: {}",
sub_id
);
continue;
let listeners = if let Some(sub_id) = sub_id {
let sub_topic = {
let subscription_map_read = subscription_map.read().await;

let Some(sub_topic) = subscription_map_read.get(&sub_id) else {
trace!(
"Received message with subscription id that is not registered: {}",
sub_id
);
continue;
};
sub_topic.clone()
};

let Some(listeners) = topic_map_read.get(sub_topic) else {
trace!("No listeners registered for topic: {}", sub_topic);
continue;
let owned_listeners = {
let topic_map_read = topic_map.read().await;

let Some(listeners) = topic_map_read.get(&sub_topic) else {
trace!("No listeners registered for topic: {}", sub_topic);
continue;
};
listeners.clone()
};

for listener in listeners.iter() {
listener.on_receive(umessage.clone()).await;
}
owned_listeners
} else {
// Filter the topic map for topics that match the received topic, including wildcards.
let topics_iter = topic_map_read
.iter()
.filter(|(key, _)| UPClientMqtt::compare_topic(topic, key));
let listeners = {
let topic_map_read = topic_map.read().await;

topic_map_read
.iter()
.filter(|(key, _)| UPClientMqtt::compare_topic(topic, key))
.flat_map(|(_topic, listener)| listener.to_owned())
.collect()
};
listeners
};

for (_topic, listeners) in topics_iter {
for listener in listeners.iter() {
listener.on_receive(umessage.clone()).await;
}
}
for listener in listeners {
let msg = umessage.clone();
tokio::spawn(async move {
listener.on_receive(msg.clone()).await;
});
}
}
})
Expand Down

0 comments on commit a6af717

Please sign in to comment.