From c080256ee5e94f3760b77e431170505013bee982 Mon Sep 17 00:00:00 2001 From: gk-kindred <118979108+gk-kindred@users.noreply.github.com> Date: Mon, 12 Aug 2024 15:46:39 +1000 Subject: [PATCH] fix: messenger publish order and hang state (#102) * fix: ordering of on_commit kafka messages * fix: hanging of messenger due to unexpected message format * chore: use debug level logging for consumer messages --- .../src/kafka/service.rs | 51 ++++++++++++------- .../src/services/inbound_service.rs | 45 ++++++++++------ 2 files changed, 61 insertions(+), 35 deletions(-) diff --git a/packages/talos_messenger_actions/src/kafka/service.rs b/packages/talos_messenger_actions/src/kafka/service.rs index 9f5eea16..3c51c021 100644 --- a/packages/talos_messenger_actions/src/kafka/service.rs +++ b/packages/talos_messenger_actions/src/kafka/service.rs @@ -1,7 +1,8 @@ use std::sync::Arc; use async_trait::async_trait; -use log::{error, info}; +use futures_util::future::join_all; +use log::{debug, error, info}; use tokio::sync::mpsc; use talos_messenger_core::{ @@ -31,31 +32,43 @@ where info!("Running Kafka Publisher service!!"); loop { tokio::select! 
{ - Some(actions) = self.rx_actions_channel.recv() => { - let MessengerCommitActions {version, commit_actions, headers } = actions; + actions_result = self.rx_actions_channel.recv() => { - if let Some(publish_actions_for_type) = commit_actions.get(&self.publisher.get_publish_type().to_string()){ - match get_actions_deserialised::>(publish_actions_for_type) { - Ok(actions) => { + match actions_result { + Some(actions) => { - let total_len = actions.len() as u32; + let MessengerCommitActions {version, commit_actions, headers } = actions; - let headers_cloned = headers.clone(); - for action in actions { - let publisher = self.publisher.clone(); - let headers = headers_cloned.clone(); - // Publish the message - tokio::spawn(async move { - publisher.send(version, action, headers, total_len ).await; - }); + if let Some(publish_actions_for_type) = commit_actions.get(&self.publisher.get_publish_type().to_string()){ + match get_actions_deserialised::>(publish_actions_for_type) { + Ok(actions) => { + let total_len = actions.len() as u32; + + let headers_cloned = headers.clone(); + + let publish_vec = actions.into_iter().map(|action| { + let publisher = self.publisher.clone(); + let headers = headers_cloned.clone(); + async move { + publisher.send(version, action, headers, total_len ).await; + } + }); + join_all(publish_vec).await; + }, + Err(err) => { + error!("Failed to deserialise for version={version} key={} for data={:?} with error={:?}",&self.publisher.get_publish_type(), err.data, err.reason ) + }, } - }, - Err(err) => { - error!("Failed to deserialise for version={version} key={} for data={:?} with error={:?}",&self.publisher.get_publish_type(), err.data, err.reason ) - }, + } + }, + None=> { + debug!("No actions to process..") } + } + + } } } diff --git a/packages/talos_messenger_core/src/services/inbound_service.rs b/packages/talos_messenger_core/src/services/inbound_service.rs index fb4b20c2..8ff5c3f1 100644 --- 
a/packages/talos_messenger_core/src/services/inbound_service.rs +++ b/packages/talos_messenger_core/src/services/inbound_service.rs @@ -75,11 +75,11 @@ where // Call prune method on suffix. let _ = self.suffix.prune_till_index(index_to_prune); - let commit_offset = version + 1; - debug!("[Commit] Updating tpl to version .. {commit_offset}"); - let _ = self.message_receiver.update_offset_to_commit(commit_offset as i64); + // let commit_offset = version + 1; + // debug!("[Commit] Updating tpl to version .. {commit_offset}"); + // let _ = self.message_receiver.update_offset_to_commit(commit_offset as i64); - self.message_receiver.commit_async(); + // self.message_receiver.commit_async(); } } _ => {} @@ -145,14 +145,14 @@ where loop { tokio::select! { // 1. Consume message. - Ok(Some(msg)) = self.message_receiver.consume_message() => { + // Ok(Some(msg)) = self.message_receiver.consume_message() => { + reciever_result = self.message_receiver.consume_message() => { - // 2. Add/update to suffix. - match msg { + match reciever_result { // 2.1 For CM - Install messages on the version - ChannelMessage::Candidate(candidate) => { - + Ok(Some(ChannelMessage::Candidate(candidate))) => { let version = candidate.message.version; + debug!("Candidate version received is {version}"); if version > 0 { // insert item to suffix let _ = self.suffix.insert(version, candidate.message.into()); @@ -176,10 +176,9 @@ where } else { warn!("Version 0 will not be inserted into suffix.") } - }, // 2.2 For DM - Update the decision with outcome + safepoint. 
- ChannelMessage::Decision(decision) => { + Ok(Some(ChannelMessage::Decision(decision))) => { let version = decision.message.get_candidate_version(); info!("[Decision Message] Version received = {} and {}", decision.decision_version, version); @@ -190,21 +189,35 @@ where self.process_next_actions().await?; }, + Ok(None) => { + info!("No message to process.."); + }, + Err(error) => { + // Catch the error propagated, and if it has a version, mark the item as completed. + if let Some(version) = error.version { + if let Some(item_to_update) = self.suffix.get_mut(version){ + item_to_update.item.set_state(SuffixItemState::Complete(SuffixItemCompleteStateReason::ErrorProcessing)); + } + } + error!("error consuming message....{:?}", error); + }, } - } // Receive feedback from publisher. - Some(feedback) = self.rx_feedback_channel.recv() => { - match feedback { - MessengerChannelFeedback::Error(version, key, message_error) => { + feedback_result = self.rx_feedback_channel.recv() => { + match feedback_result { + Some(MessengerChannelFeedback::Error(version, key, message_error)) => { error!("Failed to process version={version} with error={message_error:?}"); self.handle_action_failed(version, &key); }, - MessengerChannelFeedback::Success(version, key) => { + Some(MessengerChannelFeedback::Success(version, key)) => { info!("Successfully processed version={version} with action_key={key}"); self.handle_action_success(version, &key); }, + None => { + debug!("No feedback message to process.."); + } } // Process the next items with commit actions self.process_next_actions().await?