diff --git a/Cargo.lock b/Cargo.lock
index 7851c9560..58d4796cd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -19935,7 +19935,6 @@ dependencies = [
  "async-channel 1.9.0",
  "async-trait",
  "bincode",
- "chronicle",
  "clap",
  "convert_case 0.6.0",
  "futures",
@@ -19945,7 +19944,6 @@ dependencies = [
  "polkadot-sdk",
  "serde",
  "serde_json",
- "tc-subxt",
  "time-primitives",
  "timechain-runtime",
  "tokio",
diff --git a/node/Cargo.toml b/node/Cargo.toml
index 646bfed33..b716b6bf2 100644
--- a/node/Cargo.toml
+++ b/node/Cargo.toml
@@ -101,10 +101,8 @@ polkadot-sdk = { workspace = true, features = [
 jsonrpsee = { version = "0.22.1", features = [ "server" ] }
 
 # node's local dependencies
-chronicle = { path = "../chronicle", optional = true }
 time-primitives = { path = "../primitives" }
 timechain-runtime = { path = "../runtime" }
-tc-subxt = { path = "../tc-subxt", optional = true }
 
 # additional command line interfaces
 #try-runtime-core = { git = "https://github.com/paritytech/try-runtime-cli", tag = "v0.7.0", optional = true }
diff --git a/node/src/chronicle/mod.rs b/node/src/chronicle/mod.rs
deleted file mode 100644
index de8c03256..000000000
--- a/node/src/chronicle/mod.rs
+++ /dev/null
@@ -1,56 +0,0 @@
-use anyhow::Result;
-use sc_client_api::{BlockchainEvents, HeaderBackend};
-use sc_network::request_responses::IncomingRequest;
-use sc_network::{NetworkRequest, NetworkSigner};
-use sc_transaction_pool_api::OffchainTransactionPoolFactory;
-use sp_api::ProvideRuntimeApi;
-use sp_runtime::traits::Block;
-use std::sync::Arc;
-use time_primitives::{
-    BlockHash, MembersApi, NetworksApi, ShardsApi, SubmitTransactionApi, TasksApi,
-};
-
-mod network;
-mod runtime;
-
-pub use network::protocol_config;
-
-pub struct ChronicleParams<B: Block, C, R, N> {
-    pub client: Arc<C>,
-    pub runtime: Arc<R>,
-    pub tx_pool: OffchainTransactionPoolFactory<B>,
-    pub network: Option<(N, async_channel::Receiver<IncomingRequest>)>,
-    pub config: chronicle::ChronicleConfig,
-}
-
-pub async fn run_node_with_chronicle<B, C, R, N>(params: ChronicleParams<B, C, R, N>) -> Result<()>
-where
-    B: Block<Hash = BlockHash>,
-    C: BlockchainEvents<B> + HeaderBackend<B> + 'static,
-    R: ProvideRuntimeApi<B> + Send + Sync + 'static,
-    R::Api: MembersApi<B> + NetworksApi<B> + ShardsApi<B> + TasksApi<B> + SubmitTransactionApi<B>,
-    N: NetworkRequest + NetworkSigner + Send + Sync + 'static,
-{
-    let (network, net_request) = if let Some((network, incoming)) = params.network {
-        network::create_substrate_network(network, incoming).await?
-    } else {
-        chronicle::create_iroh_network(params.config.network_config()).await?
-    };
-
-    let tx_client = tc_subxt::SubxtClient::get_client(&params.config.timechain_url).await?;
-    let tx_submitter = runtime::SubstrateTxSubmitter::new(
-        params.tx_pool.clone(),
-        params.client.clone(),
-        params.runtime.clone(),
-        tx_client,
-    );
-    let subxt_client = tc_subxt::SubxtClient::with_keyfile(
-        &params.config.timechain_url,
-        &params.config.timechain_keyfile,
-        tx_submitter,
-    )
-    .await?;
-    let substrate = runtime::Substrate::new(params.client, params.runtime, subxt_client);
-
-    chronicle::run_chronicle(params.config, network, net_request, substrate).await
-}
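
Note: the removed `run_node_with_chronicle` picks chronicle's transport at startup — if the node handed over its Substrate network handle plus the inbound request channel it adapts those, otherwise it falls back to chronicle's iroh networking — and both branches must yield the same shared `Network` handle and incoming-message stream. A dependency-free sketch of that selection pattern (the `Transport` trait and both types here are hypothetical, not part of the chronicle API):

```rust
use std::sync::Arc;

// Stand-in for chronicle's `Network` trait object.
trait Transport {
    fn name(&self) -> &'static str;
}

struct SubstrateTransport;
struct IrohTransport;

impl Transport for SubstrateTransport {
    fn name(&self) -> &'static str {
        "substrate request-response"
    }
}

impl Transport for IrohTransport {
    fn name(&self) -> &'static str {
        "iroh"
    }
}

// Mirrors the `if let Some(..) = params.network { .. } else { .. }` shape:
// both branches must produce the same trait-object type.
fn select_transport(substrate: Option<SubstrateTransport>) -> Arc<dyn Transport> {
    match substrate {
        Some(t) => Arc::new(t),
        None => Arc::new(IrohTransport),
    }
}

fn main() {
    let transport = select_transport(None);
    println!("using {}", transport.name());
}
```
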
diff --git a/node/src/chronicle/network.rs b/node/src/chronicle/network.rs
deleted file mode 100644
index 00fd8211a..000000000
--- a/node/src/chronicle/network.rs
+++ /dev/null
@@ -1,132 +0,0 @@
-use anyhow::Result;
-use chronicle::{Message, Network, PeerId, PROTOCOL_NAME};
-use futures::channel::oneshot;
-use futures::stream::BoxStream;
-use futures::{Future, Stream, StreamExt};
-use sc_network::config::{IncomingRequest, RequestResponseConfig};
-use sc_network::multiaddr::multihash::MultihashGeneric as Multihash;
-use sc_network::request_responses::OutgoingResponse;
-use sc_network::{IfDisconnected, NetworkRequest, NetworkSigner, PublicKey};
-use std::pin::Pin;
-use std::sync::Arc;
-use std::task::{Context, Poll};
-use std::time::Duration;
-
-pub fn protocol_config(tx: async_channel::Sender<IncomingRequest>) -> RequestResponseConfig {
-    RequestResponseConfig {
-        name: PROTOCOL_NAME.into(),
-        fallback_names: vec![],
-        max_request_size: 1024 * 1024,
-        max_response_size: 0,
-        request_timeout: Duration::from_secs(3),
-        inbound_queue: Some(tx),
-    }
-}
-
-pub struct SubstrateNetwork<N> {
-    network: N,
-    peer_id: PeerId,
-}
-
-impl<N> SubstrateNetwork<N>
-where
-    N: NetworkRequest + NetworkSigner,
-{
-    pub fn new(network: N) -> Result<Self> {
-        let public_key = network.sign_with_local_identity([])?.public_key;
-        let peer_id = public_key.clone().try_into_ed25519()?.to_bytes();
-        Ok(Self { network, peer_id })
-    }
-}
-
-impl<N: NetworkRequest + Send + Sync> Network for SubstrateNetwork<N> {
-    fn peer_id(&self) -> PeerId {
-        self.peer_id
-    }
-
-    fn send(
-        &self,
-        peer_id: PeerId,
-        msg: Message,
-    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
-        let bytes = bincode::serialize(&msg).unwrap();
-        let (tx, rx) = oneshot::channel();
-        let peer_id = sc_network::PeerId::from_public_key(
-            &sc_network::config::ed25519::PublicKey::try_from_bytes(&peer_id).unwrap().into(),
-        );
-        self.network.start_request(
-            peer_id,
-            PROTOCOL_NAME.into(),
-            bytes,
-            None,
-            tx,
-            IfDisconnected::TryConnect,
-        );
-        Box::pin(async move {
-            let response = rx.await??;
-            Ok(bincode::deserialize(&response.0)?)
-        })
-    }
-}
-
-fn parse_peer_id(peer: sc_network::PeerId) -> Option<PeerId> {
-    let mh = Multihash::from(peer);
-    if mh.code() != 0 {
-        return None;
-    }
-    let p = PublicKey::try_decode_protobuf(mh.digest()).ok()?;
-    let p = p.try_into_ed25519().ok()?;
-    Some(p.to_bytes())
-}
-
-pub struct SubstrateNetworkAdapter(async_channel::Receiver<IncomingRequest>);
-
-impl SubstrateNetworkAdapter {
-    pub fn new(rx: async_channel::Receiver<IncomingRequest>) -> Self {
-        Self(rx)
-    }
-}
-
-impl Stream for SubstrateNetworkAdapter {
-    type Item = (PeerId, Message);
-
-    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        loop {
-            match Pin::new(&mut self.0).poll_next(cx) {
-                Poll::Ready(Some(IncomingRequest {
-                    peer,
-                    payload,
-                    pending_response,
-                })) => {
-                    // Don't try to do anything other than reply immediately as
-                    // substrate will close the substream.
-                    let _ = pending_response.send(OutgoingResponse {
-                        result: Ok(vec![]),
-                        reputation_changes: vec![],
-                        sent_feedback: None,
-                    });
-                    let Some(peer) = parse_peer_id(peer) else {
-                        tracing::info!("invalid peer id");
-                        continue;
-                    };
-                    if let Ok(msg) = bincode::deserialize(&payload) {
-                        return Poll::Ready(Some((peer, msg)));
-                    } else {
-                        tracing::info!("invalid message");
-                    }
-                },
-                Poll::Ready(None) => return Poll::Ready(None),
-                Poll::Pending => return Poll::Pending,
-            }
-        }
-    }
-}
-
-pub async fn create_substrate_network<N: NetworkRequest + NetworkSigner + Send + Sync + 'static>(
-    network: N,
-    incoming: async_channel::Receiver<IncomingRequest>,
-) -> Result<(Arc<dyn Network>, BoxStream<'static, (PeerId, Message)>)> {
-    let network = Arc::new(SubstrateNetwork::new(network)?) as Arc<dyn Network>;
-    let incoming = SubstrateNetworkAdapter::new(incoming).boxed();
-    Ok((network, incoming))
-}
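
Note: on the wire, the removed adapter frames everything with bincode — outgoing messages are `bincode::serialize`d into the request body, incoming payloads are `bincode::deserialize`d back, and the reply is an immediately sent empty `OutgoingResponse` (hence `max_response_size: 0`). A small round-trip sketch with a stand-in message type (the real chronicle `Message` is defined in the chronicle crate; this struct is purely illustrative):

```rust
use serde::{Deserialize, Serialize};

// Illustrative stand-in; chronicle's real `Message` lives elsewhere.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct DemoMessage {
    shard_id: u64,
    payload: Vec<u8>,
}

fn main() -> anyhow::Result<()> {
    let msg = DemoMessage { shard_id: 7, payload: vec![1, 2, 3] };

    // What `SubstrateNetwork::send` does before calling `start_request`.
    let bytes = bincode::serialize(&msg)?;

    // What `SubstrateNetworkAdapter::poll_next` does with an incoming payload.
    let decoded: DemoMessage = bincode::deserialize(&bytes)?;
    assert_eq!(msg, decoded);
    Ok(())
}
```
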
diff --git a/node/src/chronicle/runtime.rs b/node/src/chronicle/runtime.rs
deleted file mode 100644
index 69072efac..000000000
--- a/node/src/chronicle/runtime.rs
+++ /dev/null
@@ -1,283 +0,0 @@
-use anyhow::Result;
-use async_trait::async_trait;
-use futures::stream::{self, Stream, StreamExt};
-use sc_client_api::{BlockchainEvents, HeaderBackend};
-use sc_transaction_pool_api::OffchainTransactionPoolFactory;
-use sp_api::{ApiExt, ApiRef, ProvideRuntimeApi};
-use sp_core::H256;
-use sp_runtime::traits::{Block, Header};
-use std::marker::PhantomData;
-use std::pin::Pin;
-use std::sync::Arc;
-use tc_subxt::{
-    OnlineClient, PolkadotConfig, StreamOfResults, SubxtClient, TxProgress, TxSubmitter,
-};
-use time_primitives::{
-    AccountId, BlockHash, BlockNumber, Commitment, MemberStatus, MembersApi, NetworkId,
-    NetworksApi, PeerId, ProofOfKnowledge, PublicKey, Runtime, ShardId, ShardStatus, ShardsApi,
-    SubmitTransactionApi, TaskDescriptor, TaskError, TaskExecution, TaskId, TaskResult, TasksApi,
-    TssSignature,
-};
-
-pub struct Substrate<B: Block, C, R> {
-    _block: PhantomData<B>,
-    client: Arc<C>,
-    runtime: Arc<R>,
-    subxt_client: SubxtClient,
-}
-
-impl<B, C, R> Substrate<B, C, R>
-where
-    B: Block<Hash = BlockHash>,
-    C: HeaderBackend<B> + BlockchainEvents<B> + 'static,
-    R: ProvideRuntimeApi<B> + Send + Sync + 'static,
-    R::Api: NetworksApi<B> + MembersApi<B> + ShardsApi<B> + TasksApi<B> + SubmitTransactionApi<B>,
-{
-    fn best_block(&self) -> B::Hash {
-        self.client.info().best_hash
-    }
-
-    pub fn new(client: Arc<C>, runtime: Arc<R>, subxt_client: SubxtClient) -> Self {
-        Self {
-            _block: PhantomData,
-            client,
-            runtime,
-            subxt_client,
-        }
-    }
-
-    fn runtime_api(&self) -> ApiRef<'_, R::Api> {
-        self.runtime.runtime_api()
-    }
-}
-
-impl<B: Block, C, R> Clone for Substrate<B, C, R> {
-    fn clone(&self) -> Self {
-        Self {
-            _block: self._block,
-            client: self.client.clone(),
-            runtime: self.runtime.clone(),
-            subxt_client: self.subxt_client.clone(),
-        }
-    }
-}
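
Note: `Substrate` implements `Clone` by hand rather than via `#[derive(Clone)]`; the derive would add `Clone` bounds on the generic parameters even though the fields are only `Arc`s (plus a `PhantomData` and the already-`Clone` `SubxtClient`), which clone cheaply regardless. A dependency-free sketch of the same pattern, using a hypothetical `Holder` type:

```rust
use std::marker::PhantomData;
use std::sync::Arc;

// Hypothetical example; `#[derive(Clone)]` here would demand `C: Clone` and `R: Clone`.
struct Holder<B, C, R> {
    _block: PhantomData<B>,
    client: Arc<C>,
    runtime: Arc<R>,
}

// Manual impl only clones the `Arc`s, so no bounds on B, C, R are needed.
impl<B, C, R> Clone for Holder<B, C, R> {
    fn clone(&self) -> Self {
        Self {
            _block: PhantomData,
            client: self.client.clone(),
            runtime: self.runtime.clone(),
        }
    }
}

struct NotClone; // deliberately not Clone

fn main() {
    let holder: Holder<(), NotClone, NotClone> = Holder {
        _block: PhantomData,
        client: Arc::new(NotClone),
        runtime: Arc::new(NotClone),
    };
    let _copy = holder.clone();
    println!("cloned holder without C: Clone or R: Clone");
}
```
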
-
-#[async_trait]
-impl<B, C, R> Runtime for Substrate<B, C, R>
-where
-    B: Block<Hash = BlockHash>,
-    C: HeaderBackend<B> + BlockchainEvents<B> + 'static,
-    R: ProvideRuntimeApi<B> + Send + Sync + 'static,
-    R::Api: NetworksApi<B> + MembersApi<B> + ShardsApi<B> + TasksApi<B> + SubmitTransactionApi<B>,
-{
-    fn public_key(&self) -> &PublicKey {
-        self.subxt_client.public_key()
-    }
-
-    fn account_id(&self) -> &AccountId {
-        self.subxt_client.account_id()
-    }
-
-    fn finality_notification_stream(
-        &self,
-    ) -> Pin<Box<dyn Stream<Item = (BlockHash, BlockNumber)> + Send + 'static>> {
-        let stream = self.client.finality_notification_stream();
-        stream
-            .map(|notification| {
-                let block_hash = notification.header.hash();
-                let block_number = notification.header.number().to_string().parse().unwrap();
-                (block_hash, block_number)
-            })
-            .boxed()
-    }
-
-    async fn get_shards(&self, block: BlockHash, account: &AccountId) -> Result<Vec<ShardId>> {
-        Ok(self.runtime_api().get_shards(block, account)?)
-    }
-
-    async fn get_shard_members(
-        &self,
-        block: BlockHash,
-        shard_id: ShardId,
-    ) -> Result<Vec<(AccountId, MemberStatus)>> {
-        Ok(self.runtime_api().get_shard_members(block, shard_id)?)
-    }
-
-    async fn get_shard_threshold(&self, block: BlockHash, shard_id: ShardId) -> Result<u16> {
-        Ok(self.runtime_api().get_shard_threshold(block, shard_id)?)
-    }
-
-    async fn get_shard_status(&self, block: BlockHash, shard_id: ShardId) -> Result<ShardStatus> {
-        Ok(self.runtime_api().get_shard_status(block, shard_id)?)
-    }
-
-    async fn get_shard_commitment(
-        &self,
-        block: BlockHash,
-        shard_id: ShardId,
-    ) -> Result<Commitment> {
-        Ok(self.runtime_api().get_shard_commitment(block, shard_id)?)
-    }
-
-    async fn submit_commitment(
-        &self,
-        shard_id: ShardId,
-        commitment: Commitment,
-        proof_of_knowledge: ProofOfKnowledge,
-    ) -> Result<()> {
-        self.subxt_client
-            .submit_commitment(shard_id, commitment, proof_of_knowledge)
-            .await
-    }
-
-    async fn submit_online(&self, shard_id: ShardId) -> Result<()> {
-        self.subxt_client.submit_online(shard_id).await
-    }
-
-    async fn get_shard_tasks(
-        &self,
-        block: BlockHash,
-        shard_id: ShardId,
-    ) -> Result<Vec<TaskExecution>> {
-        Ok(self.runtime_api().get_shard_tasks(block, shard_id)?)
-    }
-
-    async fn get_task(&self, block: BlockHash, task_id: TaskId) -> Result<Option<TaskDescriptor>> {
-        Ok(self.runtime_api().get_task(block, task_id)?)
-    }
-
-    async fn get_task_signature(&self, task_id: TaskId) -> Result<Option<TssSignature>> {
-        Ok(self.runtime_api().get_task_signature(self.best_block(), task_id)?)
-    }
-
-    async fn get_gateway(&self, network: NetworkId) -> Result<Option<Vec<u8>>> {
-        Ok(self.runtime_api().get_gateway(self.best_block(), network)?)
-    }
-
-    async fn submit_task_hash(&self, task_id: TaskId, hash: Vec<u8>) -> Result<()> {
-        self.subxt_client.submit_task_hash(task_id, hash).await
-    }
-
-    async fn submit_task_result(&self, task_id: TaskId, result: TaskResult) -> Result<()> {
-        self.subxt_client.submit_task_result(task_id, result).await
-    }
-
-    async fn submit_task_error(&self, task_id: TaskId, error: TaskError) -> Result<()> {
-        self.subxt_client.submit_task_error(task_id, error).await
-    }
-
-    async fn submit_task_signature(&self, task_id: TaskId, signature: TssSignature) -> Result<()> {
-        self.subxt_client.submit_task_signature(task_id, signature).await
-    }
-
-    async fn get_member_peer_id(
-        &self,
-        block: BlockHash,
-        account: &AccountId,
-    ) -> Result<Option<PeerId>> {
-        Ok(self.runtime_api().get_member_peer_id(block, account)?)
-    }
-
-    async fn get_heartbeat_timeout(&self) -> Result<BlockNumber> {
-        Ok(self.runtime_api().get_heartbeat_timeout(self.best_block())?)
-    }
-
-    async fn get_min_stake(&self) -> Result<u128> {
-        Ok(self.runtime_api().get_min_stake(self.best_block())?)
-    }
-
-    async fn submit_register_member(
-        &self,
-        network: NetworkId,
-        peer_id: PeerId,
-        stake_amount: u128,
-    ) -> Result<()> {
-        self.subxt_client.submit_register_member(network, peer_id, stake_amount).await
-    }
-
-    async fn submit_heartbeat(&self) -> Result<()> {
-        self.subxt_client.submit_heartbeat().await
-    }
-
-    async fn get_network(&self, network_id: NetworkId) -> Result<Option<(String, String)>> {
-        Ok(self.runtime_api().get_network(self.best_block(), network_id)?)
-    }
-}
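
Note: in this impl every read goes through `runtime_api()` at either a caller-supplied block hash or `best_block()`, while every write is delegated to the `SubxtClient`. A hedged sketch of how a consumer generic over `time_primitives::Runtime` might use the read side, reacting to each finalized block (the function and its variable names are illustrative, not part of chronicle):

```rust
use anyhow::Result;
use futures::StreamExt;
use time_primitives::{AccountId, Runtime};

// Illustrative consumer: log the shards assigned to `account` at every finalized block.
async fn watch_shards<R: Runtime>(runtime: &R, account: &AccountId) -> Result<()> {
    let mut finality = runtime.finality_notification_stream();
    while let Some((block_hash, block_number)) = finality.next().await {
        let shards = runtime.get_shards(block_hash, account).await?;
        tracing::info!("block {block_number}: member of {} shard(s)", shards.len());
    }
    Ok(())
}
```
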
-
-pub struct SubstrateTxSubmitter<B: Block, C, R> {
-    _marker: PhantomData<B>,
-    client: Arc<C>,
-    pool: OffchainTransactionPoolFactory<B>,
-    runtime: Arc<R>,
-    tx_client: OnlineClient<PolkadotConfig>,
-}
-
-impl<B: Block, C, R> Clone for SubstrateTxSubmitter<B, C, R> {
-    fn clone(&self) -> Self {
-        Self {
-            _marker: self._marker,
-            client: self.client.clone(),
-            pool: self.pool.clone(),
-            runtime: self.runtime.clone(),
-            tx_client: self.tx_client.clone(),
-        }
-    }
-}
-
-impl<B, C, R> SubstrateTxSubmitter<B, C, R>
-where
-    B: Block<Hash = BlockHash>,
-    C: HeaderBackend<B> + BlockchainEvents<B> + 'static,
-    R: ProvideRuntimeApi<B> + Send + Sync + 'static,
-    R::Api: SubmitTransactionApi<B>,
-{
-    pub fn new(
-        pool: OffchainTransactionPoolFactory<B>,
-        client: Arc<C>,
-        runtime: Arc<R>,
-        tx_client: OnlineClient<PolkadotConfig>,
-    ) -> Self {
-        Self {
-            _marker: PhantomData,
-            client,
-            pool,
-            runtime,
-            tx_client,
-        }
-    }
-
-    fn best_block(&self) -> B::Hash {
-        self.client.info().best_hash
-    }
-
-    fn runtime_api(&self) -> ApiRef<'_, R::Api> {
-        let mut runtime = self.runtime.runtime_api();
-        runtime.register_extension(self.pool.offchain_transaction_pool(self.best_block()));
-        runtime
-    }
-}
-
-#[async_trait]
-impl<B, C, R> TxSubmitter for SubstrateTxSubmitter<B, C, R>
-where
-    B: Block<Hash = BlockHash>,
-    C: HeaderBackend<B> + BlockchainEvents<B> + 'static,
-    R: ProvideRuntimeApi<B> + Send + Sync + 'static,
-    R::Api: SubmitTransactionApi<B>,
-{
-    async fn submit(&self, tx: Vec<u8>) -> Result<TxProgress> {
-        self.runtime_api()
-            .submit_transaction(self.best_block(), tx)
-            .map_err(|_| anyhow::anyhow!("Error submitting transaction to runtime"))?
-            .map_err(|_| anyhow::anyhow!("Error submitting transaction onchain"))?;
-        let dummy_hash = H256::repeat_byte(0x01);
-        let dummy_stream = stream::iter(vec![]);
-        let empty_progress = TxProgress::new(
-            StreamOfResults::new(Box::pin(dummy_stream)),
-            self.tx_client.clone(),
-            dummy_hash,
-        );
-        Ok(empty_progress)
-    }
-}
diff --git a/node/src/cli.rs b/node/src/cli.rs
index c7cb6cad9..5d1ff9a2b 100644
--- a/node/src/cli.rs
+++ b/node/src/cli.rs
@@ -24,41 +24,6 @@ pub struct Cli {
     #[allow(missing_docs)]
     #[clap(flatten)]
     pub storage_monitor: sc_storage_monitor::StorageMonitorParams,
-
-    #[allow(missing_docs)]
-    #[clap(flatten)]
-    pub chronicle: Option<ChronicleArgs>,
-}
-
-#[derive(Debug, clap::Parser)]
-/// workaround for
-#[group(requires_all = ["network_id", "target_url", "target_keyfile", "timechain_keyfile"], multiple = true)]
-pub struct ChronicleArgs {
-    /// The network to be used from Analog Connector.
-    #[arg(required = false)]
-    #[clap(long)]
-    pub network_id: time_primitives::NetworkId,
-    /// The secret to use for p2p networking.
-    #[clap(long)]
-    pub network_keyfile: Option<std::path::PathBuf>,
-    /// The port to bind to for p2p networking.
-    #[clap(long)]
-    pub bind_port: Option<u16>,
-    /// Enables iroh networking.
-    #[clap(long)]
-    pub enable_iroh: bool,
-    /// The address of Analog Connector.
-    #[arg(required = false)]
-    #[clap(long)]
-    pub target_url: String,
-    /// key file for connector wallet
-    #[arg(required = false)]
-    #[clap(long)]
-    pub target_keyfile: std::path::PathBuf,
-    /// keyfile having an account with funds for timechain.
-    #[arg(required = false)]
-    #[clap(long)]
-    pub timechain_keyfile: std::path::PathBuf,
 }
 
 /// Possible subcommands of the main binary.
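
Note: the removed `ChronicleArgs` relies on a clap workaround for optional argument groups — the flattened struct is wrapped in `Option`, each mandatory field is declared `#[arg(required = false)]`, and `#[group(requires_all = ..., multiple = true)]` re-imposes the requirement only once any flag from the group is given. A minimal sketch of the same pattern with a hypothetical two-flag group (flag names are illustrative):

```rust
use clap::Parser;

#[derive(Debug, Parser)]
struct Cli {
    /// Unrelated top-level flag.
    #[clap(long)]
    verbose: bool,

    #[clap(flatten)]
    remote: Option<RemoteArgs>,
}

/// Optional group: pass none of these flags, or pass both.
#[derive(Debug, clap::Parser)]
#[group(requires_all = ["url", "token"], multiple = true)]
struct RemoteArgs {
    /// Remote endpoint (hypothetical flag).
    #[arg(required = false)]
    #[clap(long)]
    url: String,

    /// Access token (hypothetical flag).
    #[arg(required = false)]
    #[clap(long)]
    token: String,
}

fn main() {
    let cli = Cli::parse();
    println!("{cli:?}");
}
```
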
diff --git a/node/src/main.rs b/node/src/main.rs
index 7e2227dd1..b41a4d675 100644
--- a/node/src/main.rs
+++ b/node/src/main.rs
@@ -2,7 +2,6 @@
 mod chain_spec;
-mod chronicle;
 #[macro_use]
 mod service;
 mod benchmarking;
diff --git a/node/src/service.rs b/node/src/service.rs
index 3bec6580c..edd4362e5 100644
--- a/node/src/service.rs
+++ b/node/src/service.rs
@@ -234,7 +234,6 @@ pub struct NewFullBase {
 pub fn new_full_base<N: NetworkBackend<Block, <Block as BlockT>::Hash>>(
     config: Configuration,
     disable_hardware_benchmarks: bool,
-    chronicle_args: Option<cli::ChronicleArgs>,
     with_startup_data: impl FnOnce(
         &sc_consensus_babe::BabeBlockImport<Block, FullClient, FullGrandpaBlockImport>,
         &sc_consensus_babe::BabeLink<Block>,
@@ -290,12 +289,6 @@ pub fn new_full_base<N: NetworkBackend<Block, <Block as BlockT>::Hash>>(
     );
     net_config.add_notification_protocol(grandpa_protocol_config);
 
-    // registering time p2p protocol
-
-    let (protocol_tx, protocol_rx) = async_channel::bounded(10);
-
-    net_config.add_request_response_protocol(crate::chronicle::protocol_config(protocol_tx));
-
     let warp_sync = Arc::new(sc_consensus_grandpa::warp_proof::NetworkProvider::new(
         backend.clone(),
         import_setup.1.shared_authority_set().clone(),
@@ -479,33 +472,6 @@ pub fn new_full_base<N: NetworkBackend<Block, <Block as BlockT>::Hash>>(
     );
     }
 
-    {
-        if let Some(args) = chronicle_args {
-            let config = chronicle::ChronicleConfig {
-                network_id: args.network_id,
-                network_port: args.bind_port,
-                network_keyfile: args.network_keyfile,
-                timechain_url: "ws://127.0.0.1:9944".into(),
-                timechain_keyfile: args.timechain_keyfile,
-                target_url: args.target_url,
-                target_keyfile: args.target_keyfile,
-            };
-            let network = if args.enable_iroh { None } else { Some((network, protocol_rx)) };
-            let params = crate::chronicle::ChronicleParams {
-                client: client.clone(),
-                runtime: client.clone(),
-                tx_pool: OffchainTransactionPoolFactory::new(transaction_pool.clone()),
-                network,
-                config,
-            };
-            task_manager
-                .spawn_essential_handle()
-                .spawn_blocking("chronicle", None, async move {
-                    crate::chronicle::run_node_with_chronicle(params).await.unwrap()
-                });
-        }
-    }
-
     if enable_offchain_worker {
         task_manager.spawn_handle().spawn(
             "offchain-workers-runner",
@@ -547,7 +513,6 @@ pub fn new_full(config: Configuration, cli: cli::Cli) -> Result<TaskManager, ServiceError> {
         new_full_base::<sc_network::NetworkWorker<Block, <Block as BlockT>::Hash>>(
             config,
             cli.no_hardware_benchmarks,
-            cli.chronicle,
             |_, _| (),
         )
         .map(|NewFullBase { task_manager, .. }| task_manager)?
@@ -556,7 +521,6 @@ pub fn new_full(config: Configuration, cli: cli::Cli) -> Result<TaskManager, ServiceError> {
         new_full_base::<sc_network::Litep2pNetworkBackend>(
             config,
             cli.no_hardware_benchmarks,
-            cli.chronicle,
             |_, _| (),
         )
         .map(|NewFullBase { task_manager, .. }| task_manager)?