Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
fix: block handling during certificate generation (#471)
Browse files Browse the repository at this point in the history
  • Loading branch information
atanmarko authored Mar 13, 2024
1 parent 96e862f commit a5299c8
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 9 deletions.
35 changes: 28 additions & 7 deletions crates/topos-sequencer-subnet-runtime/src/certification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@ use std::sync::Arc;
use tokio::sync::Mutex;
use topos_core::uci::{Certificate, CertificateId, SubnetId};
use topos_sequencer_subnet_client::{BlockInfo, SubnetEvent};
use tracing::debug;

pub struct Certification {
/// Last known certificate id for subnet
pub last_certificate_id: Option<CertificateId>,
/// Latest BLOCK_HISTORY_LENGTH blocks kept in memory
pub finalized_blocks: LinkedList<BlockInfo>,
/// Subnet id for which certificates are generated
pub subnet_id: SubnetId,
/// Type of verifier used
Expand All @@ -19,6 +18,10 @@ pub struct Certification {
signing_key: Vec<u8>,
/// Optional synchronization from particular block number
pub start_block: Option<u64>,
/// Blocks received from subnet, not yet certified. We keep them in memory until we can
/// generate certificate for them. They are kept as linked list to maintain
/// order of blocks, latest received blocks are at the end of the list
finalized_blocks: LinkedList<BlockInfo>,
}

impl Debug for Certification {
Expand Down Expand Up @@ -52,8 +55,11 @@ impl Certification {
let subnet_id = self.subnet_id;
let mut generated_certificates = Vec::new();

// Keep account of blocks with generated certificates so that we can remove them from
// finalized blocks
let mut certified_blocks: Vec<u64> = Vec::with_capacity(self.finalized_blocks.len());

// For every block, create one certificate
// This will change after MVP
for block_info in &self.finalized_blocks {
// Parse target subnets from events
let mut target_subnets: HashSet<SubnetId> = HashSet::new();
Expand Down Expand Up @@ -95,6 +101,7 @@ impl Certification {
.update_signature(self.get_signing_key())
.map_err(Error::CertificateSigningError)?;
generated_certificates.push(certificate);
certified_blocks.push(block_info.number);
}

// Check for inconsistencies
Expand Down Expand Up @@ -126,7 +133,24 @@ impl Certification {
}

// Remove processed blocks
self.finalized_blocks.clear();
for processed_block_number in certified_blocks {
let front_block_number = self.finalized_blocks.front().map(|front| front.number);

if front_block_number.is_some() {
if Some(processed_block_number) == front_block_number {
debug!(
"Block {processed_block_number} processed and removed from the block list"
);
self.finalized_blocks.pop_front();
} else {
panic!(
"Block history is inconsistent, this should not happen! \
processed_block_number: {processed_block_number}, front_number: {:?}",
front_block_number
);
}
}
}

Ok(generated_certificates)
}
Expand All @@ -138,8 +162,5 @@ impl Certification {
/// Expand short block history. Remove older blocks
pub fn append_blocks(&mut self, blocks: Vec<BlockInfo>) {
self.finalized_blocks.extend(blocks);
while self.finalized_blocks.len() > Self::BLOCK_HISTORY_LENGTH {
self.finalized_blocks.pop_front();
}
}
}
6 changes: 5 additions & 1 deletion crates/topos-sequencer-subnet-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ pub mod proxy;

use crate::proxy::{SubnetRuntimeProxyCommand, SubnetRuntimeProxyEvent};

// Optimal Size of event channel is yet to be determined. Now just putting a number
const EVENT_SUBSCRIBER_CHANNEL_SIZE: usize = 64;

#[derive(Debug, Error)]
pub enum Error {
#[error("Peers error: {err}")]
Expand Down Expand Up @@ -103,7 +106,8 @@ impl SubnetRuntimeProxyWorker {
signing_key: Vec<u8>,
) -> Result<Self, Error> {
let runtime_proxy = SubnetRuntimeProxy::spawn_new(config, signing_key)?;
let (events_sender, events_rcv) = mpsc::channel::<SubnetRuntimeProxyEvent>(256);
let (events_sender, events_rcv) =
mpsc::channel::<SubnetRuntimeProxyEvent>(EVENT_SUBSCRIBER_CHANNEL_SIZE);
let commands;
{
let mut runtime_proxy = runtime_proxy.lock().await;
Expand Down
2 changes: 1 addition & 1 deletion crates/topos-sequencer-subnet-runtime/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ impl SubnetRuntimeProxy {
certification.clone(),
block
).await {
error!("Failed to process next block: {}", e);
error!("Failed to process next block: {}, exit block production!", e);
break None;
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/topos-tce-proxy/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ impl TceClientBuilder {
let tce_endpoint = tce_endpoint.clone();
let tce_grpc_client = tce_grpc_client.clone();
let context_backoff = context.clone();
// TODO: Push certificates to the TCE one by one
certificate_to_send.push_back(async move {
debug!("Submitting certificate {} to the TCE using backoff strategy...", &tce_endpoint);
let cert = cert.clone();
Expand Down

0 comments on commit a5299c8

Please sign in to comment.