From a6b338bdd228c58511407d2a743093daf48e15c0 Mon Sep 17 00:00:00 2001 From: Simon Paitrault Date: Thu, 30 Nov 2023 19:55:03 +0100 Subject: [PATCH] feat: remove task manager channels Signed-off-by: Simon Paitrault --- crates/topos-tce-broadcast/Cargo.toml | 3 - .../src/double_echo/mod.rs | 26 +-- crates/topos-tce-broadcast/src/lib.rs | 5 +- .../mod.rs | 0 .../task.rs | 0 .../src/task_manager_channels/mod.rs | 157 ------------------ .../src/task_manager_channels/task.rs | 103 ------------ crates/topos/Cargo.toml | 1 - 8 files changed, 2 insertions(+), 293 deletions(-) rename crates/topos-tce-broadcast/src/{task_manager_futures => task_manager}/mod.rs (100%) rename crates/topos-tce-broadcast/src/{task_manager_futures => task_manager}/task.rs (100%) delete mode 100644 crates/topos-tce-broadcast/src/task_manager_channels/mod.rs delete mode 100644 crates/topos-tce-broadcast/src/task_manager_channels/task.rs diff --git a/crates/topos-tce-broadcast/Cargo.toml b/crates/topos-tce-broadcast/Cargo.toml index 5709d24c4..c0a880ccc 100644 --- a/crates/topos-tce-broadcast/Cargo.toml +++ b/crates/topos-tce-broadcast/Cargo.toml @@ -33,9 +33,6 @@ rand.workspace = true hex.workspace = true topos-test-sdk = { path = "../topos-test-sdk/" } -[features] -task-manager-channels = [] - [[bench]] name = "double_echo" path = "benches/double_echo.rs" diff --git a/crates/topos-tce-broadcast/src/double_echo/mod.rs b/crates/topos-tce-broadcast/src/double_echo/mod.rs index 67721444f..c07429b88 100644 --- a/crates/topos-tce-broadcast/src/double_echo/mod.rs +++ b/crates/topos-tce-broadcast/src/double_echo/mod.rs @@ -76,14 +76,13 @@ impl DoubleEcho { } } - #[cfg(not(feature = "task-manager-channels"))] pub fn spawn_task_manager( &mut self, task_manager_message_receiver: mpsc::Receiver, ) -> mpsc::Receiver<(CertificateId, TaskStatus)> { let (task_completion_sender, task_completion_receiver) = mpsc::channel(2048); - let (task_manager, shutdown_receiver) = crate::task_manager_futures::TaskManager::new( + let (task_manager, shutdown_receiver) = crate::task_manager::TaskManager::new( task_manager_message_receiver, task_completion_sender, self.subscriptions.clone(), @@ -100,29 +99,6 @@ impl DoubleEcho { task_completion_receiver } - #[cfg(feature = "task-manager-channels")] - pub fn spawn_task_manager( - &mut self, - task_manager_message_receiver: mpsc::Receiver, - ) -> mpsc::Receiver<(CertificateId, TaskStatus)> { - let (task_completion_sender, task_completion_receiver) = mpsc::channel(2048); - - let (task_manager, shutdown_receiver) = crate::task_manager_channels::TaskManager::new( - task_manager_message_receiver, - task_completion_sender, - self.subscriptions.clone(), - self.event_sender.clone(), - self.validator_id, - self.message_signer.clone(), - self.params.clone(), - self.validator_store.clone(), - ); - - tokio::spawn(task_manager.run(shutdown_receiver)); - - task_completion_receiver - } - /// DoubleEcho main loop /// - Listen for shutdown signal /// - Read new messages from command_receiver diff --git a/crates/topos-tce-broadcast/src/lib.rs b/crates/topos-tce-broadcast/src/lib.rs index c05466ed0..5e4969769 100644 --- a/crates/topos-tce-broadcast/src/lib.rs +++ b/crates/topos-tce-broadcast/src/lib.rs @@ -58,10 +58,7 @@ mod constant; pub mod double_echo; pub mod sampler; -#[cfg(feature = "task-manager-channels")] -pub mod task_manager_channels; -#[cfg(not(feature = "task-manager-channels"))] -pub mod task_manager_futures; +pub mod task_manager; #[cfg(test)] mod tests; diff --git a/crates/topos-tce-broadcast/src/task_manager_futures/mod.rs b/crates/topos-tce-broadcast/src/task_manager/mod.rs similarity index 100% rename from crates/topos-tce-broadcast/src/task_manager_futures/mod.rs rename to crates/topos-tce-broadcast/src/task_manager/mod.rs diff --git a/crates/topos-tce-broadcast/src/task_manager_futures/task.rs b/crates/topos-tce-broadcast/src/task_manager/task.rs similarity index 100% rename from crates/topos-tce-broadcast/src/task_manager_futures/task.rs rename to crates/topos-tce-broadcast/src/task_manager/task.rs diff --git a/crates/topos-tce-broadcast/src/task_manager_channels/mod.rs b/crates/topos-tce-broadcast/src/task_manager_channels/mod.rs deleted file mode 100644 index 3a8f70ad5..000000000 --- a/crates/topos-tce-broadcast/src/task_manager_channels/mod.rs +++ /dev/null @@ -1,157 +0,0 @@ -use std::collections::HashMap; -use std::sync::Arc; -use tce_transport::{ProtocolEvents, ReliableBroadcastParams}; -use tokio::{spawn, sync::mpsc}; -use topos_core::{types::ValidatorId, uci::CertificateId}; -use topos_crypto::messages::MessageSigner; -use tracing::warn; - -pub mod task; - -use crate::double_echo::broadcast_state::BroadcastState; -use crate::sampler::SubscriptionsView; -use crate::TaskStatus; -use crate::{constant, DoubleEchoCommand}; -use task::{Task, TaskContext}; -use topos_metrics::{ - CERTIFICATE_PROCESSING_FROM_API_TOTAL, CERTIFICATE_PROCESSING_FROM_GOSSIP_TOTAL, - CERTIFICATE_PROCESSING_TOTAL, -}; -use topos_tce_storage::validator::ValidatorStore; - -/// The TaskManager is responsible for receiving messages from the network and distributing them -/// among tasks. These tasks are either created if none for a certain CertificateID exists yet, -/// or existing tasks will receive the messages. -pub struct TaskManager { - pub message_receiver: mpsc::Receiver, - pub task_completion_receiver: mpsc::Receiver<(CertificateId, TaskStatus)>, - pub task_completion_sender: mpsc::Sender<(CertificateId, TaskStatus)>, - pub notify_task_completion: mpsc::Sender<(CertificateId, TaskStatus)>, - pub subscriptions: SubscriptionsView, - pub event_sender: mpsc::Sender, - pub tasks: HashMap, - pub buffered_messages: HashMap>, - pub validator_id: ValidatorId, - pub message_signer: Arc, - pub thresholds: ReliableBroadcastParams, - pub shutdown_sender: mpsc::Sender<()>, - pub validator_store: Arc, -} - -impl TaskManager { - #[allow(clippy::too_many_arguments)] - pub fn new( - message_receiver: mpsc::Receiver, - notify_task_completion: mpsc::Sender<(CertificateId, TaskStatus)>, - subscriptions: SubscriptionsView, - event_sender: mpsc::Sender, - validator_id: ValidatorId, - message_signer: Arc, - thresholds: ReliableBroadcastParams, - validator_store: Arc, - ) -> (Self, mpsc::Receiver<()>) { - let (task_completion_sender, task_completion_receiver) = - mpsc::channel(*constant::BROADCAST_TASK_COMPLETION_CHANNEL_SIZE); - let (shutdown_sender, shutdown_receiver) = mpsc::channel(1); - - ( - Self { - message_receiver, - task_completion_receiver, - task_completion_sender, - notify_task_completion, - subscriptions, - event_sender, - tasks: HashMap::new(), - buffered_messages: Default::default(), - validator_id, - message_signer, - thresholds, - shutdown_sender, - validator_store, - }, - shutdown_receiver, - ) - } - - pub async fn run(mut self, mut shutdown_receiver: mpsc::Receiver<()>) { - loop { - tokio::select! { - biased; - - Some(msg) = self.message_receiver.recv() => { - match msg { - DoubleEchoCommand::Echo { certificate_id, .. } | DoubleEchoCommand::Ready{ certificate_id, .. } => { - if let Some(task_context) = self.tasks.get(&certificate_id) { - _ = task_context.sink.send(msg).await; - } else { - self.buffered_messages.entry(certificate_id).or_default().push(msg); - } - } - DoubleEchoCommand::Broadcast { ref cert, need_gossip } => { - match self.tasks.entry(cert.id) { - std::collections::hash_map::Entry::Vacant(entry) => { - let broadcast_state = BroadcastState::new( - cert.clone(), - self.validator_id, - self.thresholds.echo_threshold, - self.thresholds.ready_threshold, - self.thresholds.delivery_threshold, - self.event_sender.clone(), - self.subscriptions.clone(), - need_gossip, - self.message_signer.clone(), - ); - - let (task, task_context) = Task::new(cert.id, self.task_completion_sender.clone(), broadcast_state, self.validator_store.clone()); - - spawn(task.run()); - - CERTIFICATE_PROCESSING_TOTAL.inc(); - if need_gossip { - CERTIFICATE_PROCESSING_FROM_API_TOTAL.inc(); - } else { - CERTIFICATE_PROCESSING_FROM_GOSSIP_TOTAL.inc(); - } - - if let Some(messages) = self.buffered_messages.remove(&cert.id) { - let sink = task_context.sink.clone(); - spawn(async move { - for msg in messages { - _ = sink.send(msg).await; - } - }); - } - - entry.insert(task_context); - } - std::collections::hash_map::Entry::Occupied(_) => {}, - } - } - } - } - - Some((certificate_id, status)) = self.task_completion_receiver.recv() => { - self.tasks.remove(&certificate_id); - let _ = self.notify_task_completion.send((certificate_id, status)).await; - } - - _ = shutdown_receiver.recv() => { - warn!("Task Manager shutting down"); - - for task in self.tasks.iter() { - task.1.shutdown_sender.send(()).await.unwrap(); - } - - break; - } - } - } - } -} - -impl Drop for TaskManager { - fn drop(&mut self) { - _ = self.shutdown_sender.try_send(()); - } -} diff --git a/crates/topos-tce-broadcast/src/task_manager_channels/task.rs b/crates/topos-tce-broadcast/src/task_manager_channels/task.rs deleted file mode 100644 index 452beb315..000000000 --- a/crates/topos-tce-broadcast/src/task_manager_channels/task.rs +++ /dev/null @@ -1,103 +0,0 @@ -use std::sync::Arc; -use tokio::sync::mpsc; - -use crate::double_echo::broadcast_state::{BroadcastState, Status}; -use crate::DoubleEchoCommand; -use crate::TaskStatus; -use topos_core::uci::CertificateId; -use topos_tce_storage::errors::StorageError; -use topos_tce_storage::store::WriteStore; -use topos_tce_storage::validator::ValidatorStore; -use topos_tce_storage::CertificatePositions; - -#[derive(Debug, Clone)] -pub struct TaskContext { - pub sink: mpsc::Sender, - pub shutdown_sender: mpsc::Sender<()>, -} - -pub struct Task { - pub validator_store: Arc, - pub message_receiver: mpsc::Receiver, - pub certificate_id: CertificateId, - pub completion_sender: mpsc::Sender<(CertificateId, TaskStatus)>, - pub broadcast_state: BroadcastState, - pub shutdown_receiver: mpsc::Receiver<()>, -} - -impl Task { - pub fn new( - certificate_id: CertificateId, - completion_sender: mpsc::Sender<(CertificateId, TaskStatus)>, - broadcast_state: BroadcastState, - validator_store: Arc, - ) -> (Self, TaskContext) { - let (message_sender, message_receiver) = mpsc::channel(1024); - let (shutdown_sender, shutdown_receiver) = mpsc::channel(1); - - let task_context = TaskContext { - sink: message_sender, - shutdown_sender, - }; - - let task = Task { - message_receiver, - certificate_id, - completion_sender, - broadcast_state, - shutdown_receiver, - validator_store, - }; - - (task, task_context) - } - - pub async fn persist(&self) -> Result { - let certificate_delivered = self.broadcast_state.into_delivered(); - - self.validator_store - .insert_certificate_delivered(&certificate_delivered) - .await - } - - pub(crate) async fn run(mut self) { - loop { - tokio::select! { - Some(msg) = self.message_receiver.recv() => { - match msg { - DoubleEchoCommand::Echo { validator_id, .. } => { - if let Some(Status::DeliveredWithReadySent) = - self.broadcast_state.apply_echo(validator_id) - { - let _ = self - .completion_sender - .send((self.certificate_id, TaskStatus::Success)) - .await; - - break; - } - } - DoubleEchoCommand::Ready { validator_id, .. } => { - if let Some(Status::DeliveredWithReadySent) = - self.broadcast_state.apply_ready(validator_id) - { - let _ = self - .completion_sender - .send((self.certificate_id, TaskStatus::Success)) - .await; - - break; - } - } - _ => {} - } - } - - _ = self.shutdown_receiver.recv() => { - println!("Received shutdown, shutting down task {:?}", self.certificate_id); - break; - } - } - } - } -} diff --git a/crates/topos/Cargo.toml b/crates/topos/Cargo.toml index 328a02267..3d9798e14 100644 --- a/crates/topos/Cargo.toml +++ b/crates/topos/Cargo.toml @@ -70,4 +70,3 @@ predicates = "3.0.3" [features] default = [] -broadcast_via_channels = ["default", "topos-tce-broadcast/task-manager-channels"]