diff --git a/crates/topos-tce-broadcast/src/double_echo/mod.rs b/crates/topos-tce-broadcast/src/double_echo/mod.rs index ddcf9af76..a9c50336d 100644 --- a/crates/topos-tce-broadcast/src/double_echo/mod.rs +++ b/crates/topos-tce-broadcast/src/double_echo/mod.rs @@ -17,7 +17,7 @@ use crate::event::ProtocolEvents; use crate::{DoubleEchoCommand, SubscriptionsView}; use std::collections::HashSet; use std::sync::Arc; -use tokio::sync::{broadcast, mpsc, oneshot}; +use tokio::sync::{broadcast, mpsc, oneshot, Mutex}; use tokio_util::sync::CancellationToken; use topos_config::tce::broadcast::ReliableBroadcastParams; use topos_core::{types::ValidatorId, uci::CertificateId}; @@ -49,6 +49,7 @@ pub struct DoubleEcho { /// List of approved validators through smart contract and/or genesis pub validators: HashSet, pub validator_store: Arc, + pub delivered_certificates: Arc>>, pub broadcast_sender: broadcast::Sender, pub task_manager_cancellation: CancellationToken, @@ -85,6 +86,7 @@ impl DoubleEcho { }, shutdown, validator_store, + delivered_certificates: Arc::new(Mutex::new(HashSet::new())), broadcast_sender, task_manager_cancellation: CancellationToken::new(), } @@ -102,6 +104,7 @@ impl DoubleEcho { self.params.clone(), self.message_signer.clone(), self.validator_store.clone(), + self.delivered_certificates.clone(), self.broadcast_sender.clone(), ); @@ -159,6 +162,10 @@ impl DoubleEcho { continue; } + if let Some(cert_id) = self.delivered_certificates.lock().await.get(&certificate_id) { + debug!("ECHO message received for already delivered certificate: {}", cert_id); + continue; + } self.handle_echo(certificate_id, validator_id, signature).await }, DoubleEchoCommand::Ready { certificate_id, validator_id, signature } => { @@ -176,7 +183,10 @@ impl DoubleEcho { debug!("READY message signature cannot be verified from: {}", e); continue; } - + if let Some(cert_id) = self.delivered_certificates.lock().await.get(&certificate_id) { + debug!("READY message received for already delivered certificate: {}", cert_id); + continue; + } self.handle_ready(certificate_id, validator_id, signature).await }, } diff --git a/crates/topos-tce-broadcast/src/task_manager/mod.rs b/crates/topos-tce-broadcast/src/task_manager/mod.rs index 5492797bf..009c79f69 100644 --- a/crates/topos-tce-broadcast/src/task_manager/mod.rs +++ b/crates/topos-tce-broadcast/src/task_manager/mod.rs @@ -2,12 +2,12 @@ use crate::event::ProtocolEvents; use futures::stream::FuturesUnordered; use futures::Future; use futures::StreamExt; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::future::IntoFuture; use std::pin::Pin; use std::sync::Arc; use std::time::Duration; -use tokio::sync::broadcast; +use tokio::sync::{broadcast, Mutex}; use tokio::{spawn, sync::mpsc}; use tokio_util::sync::CancellationToken; use topos_config::tce::broadcast::ReliableBroadcastParams; @@ -52,6 +52,7 @@ pub struct TaskManager { pub thresholds: ReliableBroadcastParams, pub validator_id: ValidatorId, pub validator_store: Arc, + pub delivered_certficiates: Arc>>, pub broadcast_sender: broadcast::Sender, pub latest_pending_id: PendingCertificateId, } @@ -66,6 +67,7 @@ impl TaskManager { thresholds: ReliableBroadcastParams, message_signer: Arc, validator_store: Arc, + delivered_certficiates: Arc>>, broadcast_sender: broadcast::Sender, ) -> Self { Self { @@ -79,6 +81,7 @@ impl TaskManager { message_signer, thresholds, validator_store, + delivered_certficiates, broadcast_sender, latest_pending_id: 0, } @@ -174,8 +177,8 @@ impl TaskManager { if let TaskStatus::Success = status { trace!("Task for certificate {} finished successfully", certificate_id); self.tasks.remove(&certificate_id); + self.delivered_certficiates.lock().await.insert(certificate_id); DOUBLE_ECHO_ACTIVE_TASKS_COUNT.dec(); - } else { error!("Task for certificate {} finished unsuccessfully", certificate_id); } diff --git a/crates/topos-tce-broadcast/src/tests/task_manager.rs b/crates/topos-tce-broadcast/src/tests/task_manager.rs index 672da59f7..10b116e12 100644 --- a/crates/topos-tce-broadcast/src/tests/task_manager.rs +++ b/crates/topos-tce-broadcast/src/tests/task_manager.rs @@ -31,6 +31,7 @@ async fn can_start( let (broadcast_sender, _) = broadcast::channel(1); let shutdown = CancellationToken::new(); let validator_id = ValidatorId::default(); + let delivered_certificates = Default::default(); let thresholds = topos_config::tce::broadcast::ReliableBroadcastParams { echo_threshold: 1, ready_threshold: 1, @@ -45,6 +46,7 @@ async fn can_start( thresholds, message_signer, validator_store, + delivered_certificates, broadcast_sender, );