Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
feat: add in-memory cert delivered for faster lookup
Browse files Browse the repository at this point in the history
  • Loading branch information
gruberb committed Apr 17, 2024
1 parent 1d652f1 commit f050355
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 5 deletions.
14 changes: 12 additions & 2 deletions crates/topos-tce-broadcast/src/double_echo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::event::ProtocolEvents;
use crate::{DoubleEchoCommand, SubscriptionsView};
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
use tokio_util::sync::CancellationToken;
use topos_config::tce::broadcast::ReliableBroadcastParams;
use topos_core::{types::ValidatorId, uci::CertificateId};
Expand Down Expand Up @@ -49,6 +49,7 @@ pub struct DoubleEcho {
/// List of approved validators through smart contract and/or genesis
pub validators: HashSet<ValidatorId>,
pub validator_store: Arc<ValidatorStore>,
pub delivered_certificates: Arc<Mutex<HashSet<CertificateId>>>,
pub broadcast_sender: broadcast::Sender<CertificateDeliveredWithPositions>,

pub task_manager_cancellation: CancellationToken,
Expand Down Expand Up @@ -85,6 +86,7 @@ impl DoubleEcho {
},
shutdown,
validator_store,
delivered_certificates: Arc::new(Mutex::new(HashSet::new())),
broadcast_sender,
task_manager_cancellation: CancellationToken::new(),
}
Expand All @@ -102,6 +104,7 @@ impl DoubleEcho {
self.params.clone(),
self.message_signer.clone(),
self.validator_store.clone(),
self.delivered_certificates.clone(),
self.broadcast_sender.clone(),
);

Expand Down Expand Up @@ -159,6 +162,10 @@ impl DoubleEcho {
continue;
}

if let Some(cert_id) = self.delivered_certificates.lock().await.get(&certificate_id) {
debug!("ECHO message received for already delivered certificate: {}", cert_id);
continue;
}
self.handle_echo(certificate_id, validator_id, signature).await
},
DoubleEchoCommand::Ready { certificate_id, validator_id, signature } => {
Expand All @@ -176,7 +183,10 @@ impl DoubleEcho {
debug!("READY message signature cannot be verified from: {}", e);
continue;
}

if let Some(cert_id) = self.delivered_certificates.lock().await.get(&certificate_id) {
debug!("READY message received for already delivered certificate: {}", cert_id);
continue;
}
self.handle_ready(certificate_id, validator_id, signature).await
},
}
Expand Down
9 changes: 6 additions & 3 deletions crates/topos-tce-broadcast/src/task_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ use crate::event::ProtocolEvents;
use futures::stream::FuturesUnordered;
use futures::Future;
use futures::StreamExt;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::future::IntoFuture;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::sync::{broadcast, Mutex};
use tokio::{spawn, sync::mpsc};
use tokio_util::sync::CancellationToken;
use topos_config::tce::broadcast::ReliableBroadcastParams;
Expand Down Expand Up @@ -52,6 +52,7 @@ pub struct TaskManager {
pub thresholds: ReliableBroadcastParams,
pub validator_id: ValidatorId,
pub validator_store: Arc<ValidatorStore>,
pub delivered_certficiates: Arc<Mutex<HashSet<CertificateId>>>,
pub broadcast_sender: broadcast::Sender<CertificateDeliveredWithPositions>,
pub latest_pending_id: PendingCertificateId,
}
Expand All @@ -66,6 +67,7 @@ impl TaskManager {
thresholds: ReliableBroadcastParams,
message_signer: Arc<MessageSigner>,
validator_store: Arc<ValidatorStore>,
delivered_certficiates: Arc<Mutex<HashSet<CertificateId>>>,
broadcast_sender: broadcast::Sender<CertificateDeliveredWithPositions>,
) -> Self {
Self {
Expand All @@ -79,6 +81,7 @@ impl TaskManager {
message_signer,
thresholds,
validator_store,
delivered_certficiates,
broadcast_sender,
latest_pending_id: 0,
}
Expand Down Expand Up @@ -174,8 +177,8 @@ impl TaskManager {
if let TaskStatus::Success = status {
trace!("Task for certificate {} finished successfully", certificate_id);
self.tasks.remove(&certificate_id);
self.delivered_certficiates.lock().await.insert(certificate_id);
DOUBLE_ECHO_ACTIVE_TASKS_COUNT.dec();

} else {
error!("Task for certificate {} finished unsuccessfully", certificate_id);
}
Expand Down
2 changes: 2 additions & 0 deletions crates/topos-tce-broadcast/src/tests/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ async fn can_start(
let (broadcast_sender, _) = broadcast::channel(1);
let shutdown = CancellationToken::new();
let validator_id = ValidatorId::default();
let delivered_certificates = Default::default();
let thresholds = topos_config::tce::broadcast::ReliableBroadcastParams {
echo_threshold: 1,
ready_threshold: 1,
Expand All @@ -45,6 +46,7 @@ async fn can_start(
thresholds,
message_signer,
validator_store,
delivered_certificates,
broadcast_sender,
);

Expand Down

0 comments on commit f050355

Please sign in to comment.