Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
chore: rename sequencer to certificate producer
Browse files Browse the repository at this point in the history
  • Loading branch information
dvdplm committed Feb 16, 2024
1 parent 7030341 commit 0209422
Show file tree
Hide file tree
Showing 34 changed files with 181 additions and 158 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "topos-sequencer-subnet-client"
name = "topos-certificate-producer-subnet-client"
version = "0.1.0"
edition = "2021"

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "topos-sequencer-subnet-runtime"
name = "topos-certificate-producer-subnet-runtime"
version = "0.1.0"
edition = "2021"

Expand Down Expand Up @@ -29,7 +29,7 @@ tracing-opentelemetry.workspace = true
opentelemetry.workspace = true

topos-core = { workspace = true, features = ["uci"] }
topos-sequencer-subnet-client = { package = "topos-sequencer-subnet-client", path = "../topos-sequencer-subnet-client" }
topos-certificate-producer-subnet-client = { package = "topos-certificate-producer-subnet-client", path = "../topos-certificate-producer-subnet-client" }
topos-crypto = {package = "topos-crypto", path = "../topos-crypto"}

[dev-dependencies]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use std::collections::{HashSet, LinkedList};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use tokio::sync::Mutex;
use topos_certificate_producer_subnet_client::{BlockInfo, SubnetEvent};
use topos_core::uci::{Certificate, CertificateId, SubnetId};
use topos_sequencer_subnet_client::{BlockInfo, SubnetEvent};

pub struct Certification {
/// Last known certificate id for subnet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub enum Error {
#[error("subnet client error: {source}")]
SubnetError {
#[from]
source: topos_sequencer_subnet_client::Error,
source: topos_certificate_producer_subnet_client::Error,
},

#[error("Unable to retrieve key error: {source}")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::{mpsc, oneshot};
use tokio::time::Duration;
use topos_certificate_producer_subnet_client::{
self, BlockInfo, SubnetClient, SubnetClientListener,
};
use topos_core::api::grpc::checkpoints::TargetStreamPosition;
use topos_core::uci::{Certificate, CertificateId, SubnetId};
use topos_sequencer_subnet_client::{self, BlockInfo, SubnetClient, SubnetClientListener};
use tracing::{debug, error, field, info, info_span, instrument, warn, Instrument, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;

Expand Down Expand Up @@ -102,7 +104,7 @@ impl SubnetRuntimeProxy {
let runtime_proxy = runtime_proxy.clone();
let subnet_contract_address = subnet_contract_address.clone();
tokio::spawn(async move {
// If the `start_block` sequencer parameter is provided, first block retrieved from blockchain (for genesis certificate)
// If the `start_block` CP parameter is provided, first block retrieved from blockchain (for genesis certificate)
// will be `start_block`. `default_block_sync_start` is hence `start_block`-1
// as first block retrieved from subnet node is `latest_acquired_subnet_block_number` + 1
let default_block_sync_start: i128 = config
Expand All @@ -128,7 +130,7 @@ impl SubnetRuntimeProxy {
certificate_and_position
);
// If tce source head position is provided, continue synchronizing from it
// If the `start_block` sequencer parameter is provided and tce source head is missing,
// If the `start_block` CP parameter is provided and tce source head is missing,
// we should start synchronizing from that block instead of genesis
// If neither tce source head position nor start_block parameters are provided,
// sync should start form -1, so that first fetched is subnet genesis block
Expand All @@ -155,7 +157,7 @@ impl SubnetRuntimeProxy {
// Establish the connection with the Subnet
let subnet_listener: Option<SubnetClientListener> = tokio::select! {
// Create subnet client
Ok(client) = topos_sequencer_subnet_client::connect_to_subnet_listener_with_retry(
Ok(client) = topos_certificate_producer_subnet_client::connect_to_subnet_listener_with_retry(
ws_runtime_endpoint.as_str(),
subnet_contract_address.as_str(),
) => {
Expand Down Expand Up @@ -290,7 +292,7 @@ impl SubnetRuntimeProxy {
// Establish the connection with the Subnet
let mut subnet_client: Option<SubnetClient> = tokio::select! {
// Create subnet client
Ok(client) = topos_sequencer_subnet_client::connect_to_subnet_with_retry(
Ok(client) = topos_certificate_producer_subnet_client::connect_to_subnet_with_retry(
http_runtime_endpoint.as_ref(),
Some(signing_key.clone()),
subnet_contract_address.as_str(),
Expand Down Expand Up @@ -361,10 +363,14 @@ impl SubnetRuntimeProxy {
info!("Block {} processed", next_block);
Ok(())
}
Err(topos_sequencer_subnet_client::Error::BlockNotAvailable(block_number)) => {
Err(topos_certificate_producer_subnet_client::Error::BlockNotAvailable(
block_number,
)) => {
warn!("New block {block_number} not yet available, trying again soon");
Err(Error::SubnetError {
source: topos_sequencer_subnet_client::Error::BlockNotAvailable(block_number),
source: topos_certificate_producer_subnet_client::Error::BlockNotAvailable(
block_number,
),
})
}
Err(e) => {
Expand Down Expand Up @@ -568,25 +574,26 @@ impl SubnetRuntimeProxy {
info!("Connecting to subnet to query for checkpoints...");
let http_runtime_endpoint = self.config.http_endpoint.as_ref();
// Create subnet client
let subnet_client = match topos_sequencer_subnet_client::connect_to_subnet_with_retry(
http_runtime_endpoint,
None, // We do not need actual key here as we are just reading state
self.config.subnet_contract_address.as_str(),
)
.await
{
Ok(subnet_client) => {
info!(
"Connected to subnet node to acquire checkpoints {}",
http_runtime_endpoint
);
subnet_client
}
Err(e) => {
error!("Unable to connect to the subnet node to get checkpoints: {e}");
return Err(Error::SubnetError { source: e });
}
};
let subnet_client =
match topos_certificate_producer_subnet_client::connect_to_subnet_with_retry(
http_runtime_endpoint,
None, // We do not need actual key here as we are just reading state
self.config.subnet_contract_address.as_str(),
)
.await
{
Ok(subnet_client) => {
info!(
"Connected to subnet node to acquire checkpoints {}",
http_runtime_endpoint
);
subnet_client
}
Err(e) => {
error!("Unable to connect to the subnet node to get checkpoints: {e}");
return Err(Error::SubnetError { source: e });
}
};

match subnet_client.get_checkpoints(&self.config.subnet_id).await {
Ok(checkpoints) => {
Expand All @@ -611,25 +618,26 @@ impl SubnetRuntimeProxy {
) -> Result<SubnetId, Error> {
info!("Connecting to subnet to query for subnet id...");
// Create subnet client
let subnet_client = match topos_sequencer_subnet_client::connect_to_subnet_with_retry(
http_endpoint,
None, // We do not need actual key here as we are just reading state
contract_address,
)
.await
{
Ok(subnet_client) => {
info!(
"Connected to subnet node to acquire subnet id {}",
http_endpoint
);
subnet_client
}
Err(e) => {
error!("Unable to connect to the subnet node to get subnet id: {e}");
return Err(Error::SubnetError { source: e });
}
};
let subnet_client =
match topos_certificate_producer_subnet_client::connect_to_subnet_with_retry(
http_endpoint,
None, // We do not need actual key here as we are just reading state
contract_address,
)
.await
{
Ok(subnet_client) => {
info!(
"Connected to subnet node to acquire subnet id {}",
http_endpoint
);
subnet_client
}
Err(e) => {
error!("Unable to connect to the subnet node to get subnet id: {e}");
return Err(Error::SubnetError { source: e });
}
};

match subnet_client.get_subnet_id().await {
Ok(subnet_id) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,29 @@ use ethers::{
middleware::SignerMiddleware,
providers::{Http, Middleware, Provider},
signers::{LocalWallet, Signer},
types::{Block, H256},
types::{Block, H160, H256},
};
use rstest::*;
use serial_test::serial;
use std::collections::HashSet;
use std::process::{Child, Command};
use std::str::FromStr;
use std::sync::Arc;
use test_log::test;
use tokio::sync::Mutex;
use topos_certificate_producer_subnet_runtime::proxy::{
SubnetRuntimeProxyCommand, SubnetRuntimeProxyEvent,
};
use topos_core::uci::{Certificate, CertificateId, SubnetId, SUBNET_ID_LENGTH};
use topos_sequencer_subnet_runtime::proxy::{SubnetRuntimeProxyCommand, SubnetRuntimeProxyEvent};
use tracing::{error, info, warn, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;

mod common;
use crate::common::subnet_test_data::generate_test_private_key;
use topos_certificate_producer_subnet_runtime::{
SubnetRuntimeProxyConfig, SubnetRuntimeProxyWorker,
};
use topos_core::api::grpc::checkpoints::TargetStreamPosition;
use topos_sequencer_subnet_runtime::{SubnetRuntimeProxyConfig, SubnetRuntimeProxyWorker};

use topos_test_sdk::constants::*;

Expand Down Expand Up @@ -497,7 +502,7 @@ async fn test_subnet_node_get_block_info(
) -> Result<(), Box<dyn std::error::Error>> {
//Context with subnet
let context = context_running_subnet_node.await;
match topos_sequencer_subnet_client::SubnetClientListener::new(
match topos_certificate_producer_subnet_client::SubnetClientListener::new(
&context.jsonrpc_ws(),
&("0x".to_string() + &hex::encode(context.i_topos_core.address())),
)
Expand Down Expand Up @@ -545,7 +550,8 @@ async fn test_create_runtime() -> Result<(), Box<dyn std::error::Error>> {
test_private_key,
)
.await?;
let runtime_proxy = topos_sequencer_subnet_runtime::testing::get_runtime(&runtime_proxy_worker);
let runtime_proxy =
topos_certificate_producer_subnet_runtime::testing::get_runtime(&runtime_proxy_worker);
let runtime_proxy = runtime_proxy.lock().await;
info!("New runtime proxy created:{:?}", &runtime_proxy);
Ok(())
Expand Down Expand Up @@ -687,7 +693,7 @@ async fn test_subnet_certificate_get_checkpoints_call(
let subnet_jsonrpc_http = context.jsonrpc();

// Get checkpoints when contract is empty
let subnet_client = topos_sequencer_subnet_client::SubnetClient::new(
let subnet_client = topos_certificate_producer_subnet_client::SubnetClient::new(
&subnet_jsonrpc_http,
Some(hex::decode(TEST_SECRET_ETHEREUM_KEY).unwrap()),
&subnet_smart_contract_address,
Expand Down Expand Up @@ -805,7 +811,7 @@ async fn test_subnet_id_call(
let subnet_jsonrpc_http = context.jsonrpc();

// Create subnet client
let subnet_client = topos_sequencer_subnet_client::SubnetClient::new(
let subnet_client = topos_certificate_producer_subnet_client::SubnetClient::new(
&subnet_jsonrpc_http,
Some(hex::decode(TEST_SECRET_ETHEREUM_KEY).unwrap()),
&subnet_smart_contract_address,
Expand Down Expand Up @@ -902,7 +908,7 @@ async fn test_subnet_send_token_processing(
.i_erc20_messaging
.send_token(
TARGET_SUBNET_ID_2.into(),
TOKEN_SYMBOL.into(),
H160::from_str(TOKEN_SYMBOL).unwrap(),
"00000000000000000000000000000000000000AA".parse()?,
U256::from(2),
)
Expand Down Expand Up @@ -1408,7 +1414,7 @@ async fn test_subnet_multiple_send_token_in_a_block(
if let Err(e) = i_erc20_messaging
.send_token(
target_subnet.into(),
TOKEN_SYMBOL.into(),
H160::from_str(TOKEN_SYMBOL).unwrap(),
"00000000000000000000000000000000000000AA".parse().unwrap(),
U256::from(i),
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "topos-sequencer"
name = "topos-certificate-producer"
description = "Implementation of the Topos protocol"
version = "0.1.0"
edition = "2021"
Expand All @@ -20,6 +20,5 @@ opentelemetry.workspace = true
topos-crypto.workspace = true
topos-wallet = { path = "../topos-wallet" }
topos-core = { workspace = true, features = ["uci"] }
topos-sequencer-subnet-runtime = { package = "topos-sequencer-subnet-runtime", path = "../topos-sequencer-subnet-runtime" }
topos-certificate-producer-subnet-runtime = { package = "topos-certificate-producer-subnet-runtime", path = "../topos-certificate-producer-subnet-runtime" }
topos-tce-proxy = { package = "topos-tce-proxy", path = "../topos-tce-proxy" }

Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
//!
//! Application logic glue
//!
use crate::SequencerConfiguration;
use crate::CertificateProducerConfiguration;
use opentelemetry::trace::FutureExt;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use topos_sequencer_subnet_runtime::proxy::{SubnetRuntimeProxyCommand, SubnetRuntimeProxyEvent};
use topos_sequencer_subnet_runtime::SubnetRuntimeProxyWorker;
use topos_certificate_producer_subnet_runtime::proxy::{
SubnetRuntimeProxyCommand, SubnetRuntimeProxyEvent,
};
use topos_certificate_producer_subnet_runtime::SubnetRuntimeProxyWorker;
use topos_tce_proxy::{worker::TceProxyWorker, TceProxyCommand, TceProxyEvent};
use tracing::{debug, error, info, info_span, warn, Instrument, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;

/// Top-level transducer sequencer app context & driver (alike)
/// Top-level transducer certificate producer app context & driver (alike)
///
/// Implements <...Host> traits for network and Api, listens for protocol events in events
/// (store is not active component).
Expand All @@ -20,7 +22,7 @@ use tracing_opentelemetry::OpenTelemetrySpanExt;
/// config+data as input and runs app returning data as output
///
pub struct AppContext {
pub config: SequencerConfiguration,
pub config: CertificateProducerConfiguration,
pub subnet_runtime_proxy_worker: SubnetRuntimeProxyWorker,
pub tce_proxy_worker: TceProxyWorker,
}
Expand All @@ -33,7 +35,7 @@ pub enum AppContextStatus {
impl AppContext {
/// Factory
pub fn new(
config: SequencerConfiguration,
config: CertificateProducerConfiguration,
runtime_proxy_worker: SubnetRuntimeProxyWorker,
tce_proxy_worker: TceProxyWorker,
) -> Self {
Expand Down Expand Up @@ -63,15 +65,14 @@ impl AppContext {
debug!("tce_proxy_worker.next_event(): {:?}", &tce_evt);
match tce_evt {
TceProxyEvent::TceServiceFailure | TceProxyEvent::WatchCertificatesChannelFailed => {
// Unrecoverable failure in interaction with the TCE. Sequencer needs to be restarted
// Unrecoverable failure in interaction with the TCE. Certificate Producer needs to be restarted
warn!(
"Unrecoverable failure in sequencer <-> tce interaction. Shutting down sequencer \
sequencer..."
"Unrecoverable failure in Certificate Producer <-> TCE interaction. Shutting down Certificate Producer."
);
if let Err(e) = self.shutdown().await {
warn!("Error happened during shutdown: {e:?}");
}
warn!("Shutdown finished, restarting sequencer...");
warn!("Shutdown finished, restarting Certificate Producer...");
return AppContextStatus::Restarting;
},
_ => self.on_tce_proxy_event(tce_evt).await,
Expand All @@ -80,11 +81,11 @@ impl AppContext {

// Shutdown signal
_ = shutdown.0.cancelled() => {
info!("Shutting down Sequencer app context...");
info!("Shutting down Certificate Producer app context...");
if let Err(e) = self.shutdown().await {
error!("Error shutting down Sequencer app context: {e}");
error!("Error shutting down Certificate Producer app context: {e}");
}
// Drop the sender to notify the Sequencer termination
// Drop the sender to notify the Certificate Producer termination
drop(shutdown.1);
return AppContextStatus::Finished;
}
Expand All @@ -100,7 +101,7 @@ impl AppContext {
block_number: _,
ctx,
} => {
let span = info_span!("Sequencer app context");
let span = info_span!("Certificate Producer app context");
span.set_parent(ctx);
if let Err(e) = self
.tce_proxy_worker
Expand All @@ -123,7 +124,7 @@ impl AppContext {

async fn on_tce_proxy_event(&mut self, evt: TceProxyEvent) {
if let TceProxyEvent::NewDeliveredCerts { certificates, ctx } = evt {
let span = info_span!("Sequencer app context");
let span = info_span!("Certificate Producer app context");
span.set_parent(ctx);
async {
// New certificates acquired from TCE
Expand Down
Loading

0 comments on commit 0209422

Please sign in to comment.