From d0157d27a183d11041c47760d254c6f8a5350112 Mon Sep 17 00:00:00 2001 From: "keroroxx520@gmail.com" Date: Mon, 13 Jan 2025 00:05:39 +0800 Subject: [PATCH 1/9] feat: impl reqactor --- reqactor/Cargo.toml | 82 +++++++ reqactor/src/action.rs | 17 ++ reqactor/src/actor.rs | 483 ++++++++++++++++++++++++++++++++++++++++ reqactor/src/gateway.rs | 93 ++++++++ reqactor/src/lib.rs | 12 + 5 files changed, 687 insertions(+) create mode 100644 reqactor/Cargo.toml create mode 100644 reqactor/src/action.rs create mode 100644 reqactor/src/actor.rs create mode 100644 reqactor/src/gateway.rs create mode 100644 reqactor/src/lib.rs diff --git a/reqactor/Cargo.toml b/reqactor/Cargo.toml new file mode 100644 index 000000000..1e61e7e36 --- /dev/null +++ b/reqactor/Cargo.toml @@ -0,0 +1,82 @@ +[package] +name = "raiko-reqactor" +version = "0.1.0" +edition = "2021" + +[dependencies] + +# provers +sp1-driver = { path = "../provers/sp1/driver", optional = true } +risc0-driver = { path = "../provers/risc0/driver", optional = true } +sgx-prover = { path = "../provers/sgx/prover", optional = true } + +# raiko +raiko-lib = { workspace = true } +raiko-core = { workspace = true } +raiko-reqpool = { workspace = true } + +# alloy +alloy-rlp = { workspace = true } +alloy-rlp-derive = { workspace = true } +alloy-sol-types = { workspace = true } +alloy-primitives = { workspace = true } +alloy-rpc-types = { workspace = true } +alloy-provider = { workspace = true } +alloy-transport-http = { workspace = true } +alloy-consensus = { workspace = true } +alloy-network = { workspace = true } +alloy-rpc-client = { workspace = true } + +# crypto +kzg = { workspace = true } +kzg_traits = { workspace = true } + +# misc +anyhow = { workspace = true } +bincode = { workspace = true } +bytemuck = { workspace = true } +clap = { workspace = true } +flate2 = { workspace = true } +serde = { workspace = true } +serde_with = { workspace = true } +serde_json = { workspace = true } +tokio = { workspace = true } +tokio-util = { workspace = true } +env_logger = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +tracing-appender = { workspace = true } +lru_time_cache = { workspace = true } +prometheus = { workspace = true } +lazy_static = { workspace = true } +once_cell = { workspace = true } +thiserror = { workspace = true } +reqwest = { workspace = true } +reqwest_alloy = { workspace = true } +sha2 = { workspace = true } +proptest = { workspace = true } +rlp = { workspace = true } +url = { workspace = true } +cfg-if = { workspace = true } +cap = { workspace = true } +dotenv = { workspace = true } +chrono = { workspace = true, features = ["serde"] } + +# reth +reth-primitives = { workspace = true } +reth-evm = { workspace = true } +reth-evm-ethereum = { workspace = true } +reth-provider = { workspace = true } + +[dev-dependencies] +assert_cmd = { workspace = true } +rstest = { workspace = true } +ethers-core = { workspace = true } +rand = { workspace = true } + +[features] +default = [] +sp1 = ["raiko-core/sp1"] +risc0 = ["raiko-core/risc0"] +sgx = ["raiko-core/sgx"] +integration = [] diff --git a/reqactor/src/action.rs b/reqactor/src/action.rs new file mode 100644 index 000000000..760ee8d61 --- /dev/null +++ b/reqactor/src/action.rs @@ -0,0 +1,17 @@ +use crate::{RequestEntity, RequestKey}; +use raiko_reqpool::impl_display_using_json_pretty; +use serde::{Deserialize, Serialize}; + +/// The action message sent from **external** to the actor. 
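+///
+/// Illustrative usage (a sketch, not part of this patch; assumes `request_key`
+/// and `request_entity` are already in scope, and that `external_tx` is the
+/// action sender returned by `Actor::start`):
+///
+/// ```ignore
+/// // Pair the action with a oneshot responder and send both to the actor.
+/// let (resp_tx, resp_rx) = tokio::sync::oneshot::channel();
+/// external_tx
+///     .send((Action::Prove { request_key, request_entity }, resp_tx))
+///     .await?;
+/// // The actor replies with the request's pool status, or an error string.
+/// let status: Result<StatusWithContext, String> = resp_rx.await?;
+/// ```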
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum Action {
+    Prove {
+        request_key: RequestKey,
+        request_entity: RequestEntity,
+    },
+    Cancel {
+        request_key: RequestKey,
+    },
+}
+
+impl_display_using_json_pretty!(Action);
diff --git a/reqactor/src/actor.rs b/reqactor/src/actor.rs
new file mode 100644
index 000000000..c925e6701
--- /dev/null
+++ b/reqactor/src/actor.rs
@@ -0,0 +1,483 @@
+use std::time::Duration;
+
+use raiko_core::{
+    interfaces::{aggregate_proofs, ProofRequest},
+    provider::rpc::RpcBlockDataProvider,
+    Raiko,
+};
+use raiko_lib::{
+    consts::SupportedChainSpecs,
+    input::{AggregationGuestInput, AggregationGuestOutput},
+    prover::{IdStore, IdWrite, Proof},
+};
+use raiko_reqpool::{
+    AggregationRequestEntity, RequestEntity, RequestKey, SingleProofRequestEntity, Status,
+    StatusWithContext,
+};
+use reth_primitives::B256;
+use tokio::sync::{
+    mpsc::{self, Receiver, Sender},
+    oneshot,
+};
+
+use crate::{Action, Pool};
+
+#[derive(Clone)]
+pub struct Actor<P: Pool> {
+    pool: P,
+    chain_specs: SupportedChainSpecs,
+    internal_tx: Sender<RequestKey>,
+}
+
+// TODO: load pool and notify internal channel
+impl<P: Pool> Actor<P>
{ + /// Start the actor, return the actor and the sender. + /// + /// The returned channel sender is used to send actions to the actor, and the actor will + /// act on the actions and send responses back. + pub async fn start( + pool: P, + chain_specs: SupportedChainSpecs, + ) -> ( + Sender<(Action, oneshot::Sender>)>, + Sender<()>, + ) { + let channel_size = 1024; + let (external_tx, external_rx) = mpsc::channel::<( + Action, + oneshot::Sender>, + )>(channel_size); + let (internal_tx, internal_rx) = mpsc::channel::(channel_size); + let (pause_tx, pause_rx) = mpsc::channel::<()>(1); + + tokio::spawn(async move { + Actor { + pool, + chain_specs, + internal_tx, + } + .serve(external_rx, internal_rx, pause_rx) + .await; + }); + + (external_tx, pause_tx) + } + + async fn serve( + mut self, + mut external_rx: Receiver<(Action, oneshot::Sender>)>, + mut internal_rx: Receiver, + mut pause_rx: Receiver<()>, + ) { + loop { + tokio::select! { + Some((action, resp_tx)) = external_rx.recv() => { + let response = self.handle_external_action(action.clone()).await; + if let Err(err) = resp_tx.send(response.clone()) { + tracing::error!( + "Actor failed to send response {response:?} to action {action}: {err:?}" + ); + } + } + Some(request_key) = internal_rx.recv() => { + if let Err(err) = self.handle_internal_signal(request_key.clone()).await { + tracing::error!( + "Actor failed to handle internal signal {request_key}: {err:?}" + ); + } + } + Some(()) = pause_rx.recv() => { + tracing::info!("Actor received pause-signal, halting"); + if let Err(err) = self.halt().await { + tracing::error!("Actor failed to halt: {err:?}"); + } + } + else => { + // All channels are closed, exit the loop + tracing::info!("Actor exited"); + break; + } + } + } + } + + async fn handle_external_action( + &mut self, + action: Action, + ) -> Result { + match action { + Action::Prove { + request_key, + request_entity, + } => match self.pool.get_status(&request_key) { + Ok(None) => { + tracing::info!("Actor received prove-action {request_key}, and it is not in pool, registering"); + self.register(request_key, request_entity).await + } + Ok(Some(status)) => match status.status() { + Status::Registered | Status::WorkInProgress | Status::Success { .. } => { + tracing::info!("Actor received prove-action {request_key}, but it is already {status}, skipping"); + Ok(status) + } + Status::Cancelled { .. } => { + tracing::warn!("Actor received prove-action {request_key}, and it is cancelled, re-registering"); + self.register(request_key, request_entity).await + } + Status::Failed { .. } => { + tracing::warn!("Actor received prove-action {request_key}, and it is failed, re-registering"); + self.register(request_key, request_entity).await + } + }, + Err(err) => { + tracing::error!( + "Actor failed to get status of prove-action {request_key}: {err:?}" + ); + Err(err) + } + }, + Action::Cancel { request_key } => match self.pool.get_status(&request_key) { + Ok(None) => { + tracing::warn!("Actor received cancel-action {request_key}, but it is not in pool, skipping"); + Err(format!("request {request_key} is not in pool")) + } + Ok(Some(status)) => match status.status() { + Status::Registered | Status::WorkInProgress => { + tracing::info!("Actor received cancel-action {request_key}, and it is {status}, cancelling"); + self.cancel(request_key).await + } + + Status::Failed { .. } | Status::Cancelled { .. } | Status::Success { .. 
} => { + tracing::info!("Actor received cancel-action {request_key}, but it is already {status}, skipping"); + Ok(status) + } + }, + Err(err) => { + tracing::error!( + "Actor failed to get status of cancel-action {request_key}: {err:?}" + ); + Err(err) + } + }, + } + } + + // TODO: semaphore + async fn handle_internal_signal(&mut self, request_key: RequestKey) -> Result<(), String> { + match self.pool.get(&request_key) { + Ok(Some((request_entity, status))) => match status.status() { + Status::Registered => match request_entity { + RequestEntity::SingleProof(entity) => { + self.prove_single(request_key, entity).await + } + RequestEntity::Aggregation(entity) => { + self.prove_aggregation(request_key, entity).await + } + }, + Status::WorkInProgress => { + // Wait for proving completion + tracing::info!( + "Actor wait for proving completion: {request_key}, elapsed: {elapsed:?}", + elapsed = chrono::Utc::now() - status.timestamp(), + ); + + self.internal_signal_timeout(&request_key, Duration::from_secs(3)) + .await; + Ok(()) + } + Status::Success { .. } | Status::Cancelled { .. } | Status::Failed { .. } => Ok(()), + }, + Ok(None) => { + tracing::warn!( + "Actor received internal signal {request_key}, but it is not in pool, skipping" + ); + Ok(()) + } + Err(err) => { + tracing::error!( + "Actor failed to get status of internal signal {request_key}: {err:?}, retrying" + ); + + self.internal_signal_timeout(&request_key, Duration::from_secs(3)) + .await; + Err(err) + } + } + } + + // Resignal the request key to the internal channel after 3 seconds + async fn internal_signal_timeout(&mut self, request_key: &RequestKey, duration: Duration) { + // Re-signal the request key to the internal channel after 3 seconds + let mut timer = tokio::time::interval(duration); + let internal_tx = self.internal_tx.clone(); + let request_key = request_key.clone(); + tokio::spawn(async move { + timer.tick().await; + if let Err(err) = internal_tx.send(request_key.clone()).await { + tracing::error!( + "Actor failed to send internal signal {request_key}: {err:?}, actor will exit" + ); + } + }); + } + + // Register a new request to the pool and notify the actor. + async fn register( + &mut self, + request_key: RequestKey, + request_entity: RequestEntity, + ) -> Result { + let status = StatusWithContext::new_registered(); + if let Err(err) = self + .pool + .add(request_key.clone(), request_entity, status.clone()) + { + return Err(err); + } + + if let Err(err) = self.internal_tx.send(request_key.clone()).await { + tracing::error!( + "Actor failed to send internal signal {request_key}: {err:?}, actor will exit" + ); + return Err(format!( + "failed to send internal signal {request_key}: {err:?}, actor will exit" + )); + } + + Ok(status) + } + + async fn cancel(&mut self, request_key: RequestKey) -> Result { + let Some(status) = self.pool.get_status(&request_key)? 
else { + // the request is not in the pool, do nothing + tracing::warn!( + "Actor received cancel-action {request_key}, but it is not in pool, skipping" + ); + return Err(format!("request {request_key} is not in pool")); + }; + + if status.status() != &Status::WorkInProgress { + // the request is not in proving, do nothing + tracing::warn!( + "Actor received cancel-action {request_key}, but it is not in work-in-progress, skipping" + ); + return Err(format!("request {request_key} is not in work-in-progress")); + } + + tracing::info!("Actor received cancel-action {request_key}, status: {status}, cancelling"); + + match &request_key { + RequestKey::SingleProof(key) => { + // Cancel Single Proof + // + // 1. Cancel the proving work by the cancel token // TODO: cancel token + // 2. Remove the proof id from the pool + raiko_core::interfaces::cancel_proof( + key.proof_type().clone(), + ( + key.chain_id().clone(), + key.block_number().clone(), + key.block_hash().clone(), + *key.proof_type() as u8, + ), + Box::new(&mut self.pool), + ) + .await + .or_else(|e| { + if e.to_string().contains("No data for query") { + tracing::warn!("Actor received cancel-action {request_key}, but it is already cancelled or not yet started, skipping"); + Ok(()) + } else { + tracing::error!( + "Actor received cancel-action {request_key}, but failed to cancel proof: {e:?}" + ); + Err(format!("failed to cancel proof: {e:?}")) + } + })?; + + // 3. Mark the request as cancelled in the pool + let status = StatusWithContext::new_cancelled(); + self.pool.update_status(request_key, status.clone())?; + + Ok(status) + } + RequestKey::Aggregation(..) => { + // Cancel Aggregation Proof + // + // 1. Cancel the proving work by the cancel token // TODO + // 2. Remove the proof id from the pool // TODO + + // 3. Mark the request as cancelled in the pool + let status = StatusWithContext::new_cancelled(); + self.pool.update_status(request_key, status.clone())?; + + Ok(status) + } + } + } + + async fn prove_single( + &mut self, + request_key: RequestKey, + request_entity: SingleProofRequestEntity, + ) -> Result<(), String> { + // 1. Update the request status in pool to WorkInProgress + self.pool + .update_status(request_key.clone(), Status::WorkInProgress.into())?; + + let mut actor = self.clone(); + tokio::spawn(async move { + // 2. Start the proving work + let proven_status = do_prove_single( + &mut actor.pool, + &actor.chain_specs, + request_key.clone(), + request_entity, + ) + .await + .map(|proof| Status::Success { proof }) + .unwrap_or_else(|error| Status::Failed { error }); + + // 3. Update the request status in pool to the resulted status + if let Err(err) = actor + .pool + .update_status(request_key.clone(), proven_status.clone().into()) + { + tracing::error!( + "Actor failed to update status of prove-action {request_key}: {err:?}, status: {proven_status}" + ); + return; + } + + // 4. Resignal the request key to the internal channel, to let the actor know the proving is done // TODO + let _ = actor.internal_tx.send(request_key.clone()).await; + }); + + Ok(()) + } + + async fn prove_aggregation( + &mut self, + request_key: RequestKey, + request_entity: AggregationRequestEntity, + ) -> Result<(), String> { + // 1. Update the request status in pool to WorkInProgress + self.pool + .update_status(request_key.clone(), Status::WorkInProgress.into())?; + + let mut actor = self.clone(); + tokio::spawn(async move { + // 2. 
Start the proving work + let proven_status = + do_prove_aggregation(&mut actor.pool, request_key.clone(), request_entity) + .await + .map(|proof| Status::Success { proof }) + .unwrap_or_else(|error| Status::Failed { error }); + + // 3. Update the request status in pool to the resulted status + if let Err(err) = actor + .pool + .update_status(request_key.clone(), proven_status.clone().into()) + { + tracing::error!( + "Actor failed to update status of prove-action {request_key}: {err:?}, status: {proven_status}" + ); + return; + } + + // 4. Resignal the request key to the internal channel, to let the actor know the proving is done // TODO + let _ = actor.internal_tx.send(request_key.clone()).await; + }); + + Ok(()) + } + + async fn halt(&mut self) -> Result<(), String> { + todo!("halt") + } +} + +// TODO: cache input, reference to raiko_host::cache +// TODO: memory tracking +// TODO: metrics +// TODO: measurement +pub async fn do_prove_single( + pool: &mut dyn IdWrite, + chain_specs: &SupportedChainSpecs, + request_key: RequestKey, + request_entity: SingleProofRequestEntity, +) -> Result { + tracing::info!("Generating proof for {request_key}"); + + let l1_chain_spec = chain_specs + .get_chain_spec(&request_entity.l1_network()) + .ok_or_else(|| { + format!( + "unsupported l1 network: {}, it should not happen, please issue a bug report", + request_entity.l1_network() + ) + })?; + let taiko_chain_spec = chain_specs + .get_chain_spec(&request_entity.network()) + .ok_or_else(|| { + format!( + "unsupported raiko network: {}, it should not happen, please issue a bug report", + request_entity.network() + ) + })?; + let proof_request = ProofRequest { + block_number: *request_entity.block_number(), + l1_inclusion_block_number: *request_entity.l1_inclusion_block_number(), + network: request_entity.network().clone(), + l1_network: request_entity.l1_network().clone(), + graffiti: request_entity.graffiti().clone(), + prover: request_entity.prover().clone(), + proof_type: request_entity.proof_type().clone(), + blob_proof_type: request_entity.blob_proof_type().clone(), + prover_args: request_entity.prover_args().clone(), + }; + let raiko = Raiko::new(l1_chain_spec, taiko_chain_spec.clone(), proof_request); + let provider = RpcBlockDataProvider::new( + &taiko_chain_spec.rpc.clone(), + request_entity.block_number() - 1, + ) + .map_err(|err| format!("failed to create rpc block data provider: {err:?}"))?; + + // 1. Generate the proof input + let input = raiko + .generate_input(provider) + .await + .map_err(|e| format!("failed to generate input: {e:?}"))?; + + // 2. Generate the proof output + let output = raiko + .get_output(&input) + .map_err(|e| format!("failed to get output: {e:?}"))?; + + // 3. 
Generate the proof + let proof = raiko + .prove(input, &output, Some(pool)) + .await + .map_err(|err| format!("failed to generate single proof: {err:?}"))?; + + Ok(proof) +} + +async fn do_prove_aggregation( + pool: &mut dyn IdWrite, + request_key: RequestKey, + request_entity: AggregationRequestEntity, +) -> Result { + let proof_type = request_key.proof_type().clone(); + let proofs = request_entity.proofs().clone(); + + let input = AggregationGuestInput { proofs }; + let output = AggregationGuestOutput { hash: B256::ZERO }; + let config = serde_json::to_value(request_entity.prover_args()) + .map_err(|err| format!("failed to serialize prover args: {err:?}"))?; + + let proof = aggregate_proofs(proof_type, input, &output, &config, Some(pool)) + .await + .map_err(|err| format!("failed to generate aggregation proof: {err:?}"))?; + + Ok(proof) +} diff --git a/reqactor/src/gateway.rs b/reqactor/src/gateway.rs new file mode 100644 index 000000000..c1d932ed2 --- /dev/null +++ b/reqactor/src/gateway.rs @@ -0,0 +1,93 @@ +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Mutex, +}; + +use raiko_core::interfaces::ProofRequestOpt; +use raiko_lib::consts::SupportedChainSpecs; +use raiko_reqpool::{Pool, RequestKey, StatusWithContext}; +use tokio::sync::{mpsc::Sender, oneshot}; + +use crate::Action; + +#[derive(Debug, Clone)] +pub struct Gateway { + default_request_config: ProofRequestOpt, + chain_specs: SupportedChainSpecs, + controller: Sender<(Action, oneshot::Sender>)>, + pause_signal: Sender<()>, + pool: Arc>, + is_paused: Arc, +} + +/// Gateway for the Actor. +impl Gateway
{ + pub fn new( + pool: P, + default_request_config: ProofRequestOpt, + chain_specs: SupportedChainSpecs, + controller: Sender<(Action, oneshot::Sender>)>, + pause_signal: Sender<()>, + ) -> Self { + Self { + default_request_config, + chain_specs, + controller, + pause_signal, + pool: Arc::new(Mutex::new(pool)), + is_paused: Arc::new(AtomicBool::new(false)), + } + } + + /// Return the default request config. + pub fn default_request_config(&self) -> &ProofRequestOpt { + &self.default_request_config + } + + /// Return the chain specs. + pub fn chain_specs(&self) -> &SupportedChainSpecs { + &self.chain_specs + } + + /// Check if the system is paused. + pub fn is_paused(&self) -> bool { + self.is_paused.load(Ordering::SeqCst) + } + + /// Get the status of the request from the pool. + pub fn pool_get_status( + &self, + request_key: &RequestKey, + ) -> Result, String> { + self.pool.lock().unwrap().get_status(request_key) + } + + /// Send an action to the controller and wait for the response. + pub async fn send(&self, action: Action) -> Result { + let (resp_tx, resp_rx) = oneshot::channel(); + + // Send the action to the controller + self.controller + .send((action, resp_tx)) + .await + .map_err(|e| format!("failed to send action: {e}"))?; + + // Wait for response of the action + resp_rx + .await + .map_err(|e| format!("failed to receive action response: {e}"))? + } + + /// Set the pause flag and notify the task manager to pause, then wait for the task manager to + /// finish the pause process. + /// + /// Note that this function is blocking until the task manager finishes the pause process. + pub async fn pause(&self) -> Result<(), String> { + self.is_paused.store(true, Ordering::SeqCst); + self.pause_signal + .send(()) + .await + .map_err(|e| format!("failed to send pause signal: {e}"))?; + Ok(()) + } +} diff --git a/reqactor/src/lib.rs b/reqactor/src/lib.rs new file mode 100644 index 000000000..48cbb3850 --- /dev/null +++ b/reqactor/src/lib.rs @@ -0,0 +1,12 @@ +mod action; +mod actor; +mod gateway; + +pub use raiko_reqpool::{ + AggregationRequestEntity, AggregationRequestKey, Pool, RequestEntity, RequestKey, + SingleProofRequestEntity, SingleProofRequestKey, +}; + +pub use action::Action; +pub use actor::Actor; +pub use gateway::Gateway; From 4f51bef2cc1a8d70afdb44efbd77271ac90f22f0 Mon Sep 17 00:00:00 2001 From: "keroroxx520@gmail.com" Date: Tue, 14 Jan 2025 20:02:11 +0800 Subject: [PATCH 2/9] chore(reqactor): remove Pool trait --- reqactor/src/actor.rs | 10 +++++----- reqactor/src/gateway.rs | 8 ++++---- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/reqactor/src/actor.rs b/reqactor/src/actor.rs index c925e6701..07191f651 100644 --- a/reqactor/src/actor.rs +++ b/reqactor/src/actor.rs @@ -8,7 +8,7 @@ use raiko_core::{ use raiko_lib::{ consts::SupportedChainSpecs, input::{AggregationGuestInput, AggregationGuestOutput}, - prover::{IdStore, IdWrite, Proof}, + prover::{IdWrite, Proof}, }; use raiko_reqpool::{ AggregationRequestEntity, RequestEntity, RequestKey, SingleProofRequestEntity, Status, @@ -23,20 +23,20 @@ use tokio::sync::{ use crate::{Action, Pool}; #[derive(Clone)] -pub struct Actor { - pool: P, +pub struct Actor { + pool: Pool, chain_specs: SupportedChainSpecs, internal_tx: Sender, } // TODO: load pool and notify internal channel -impl Actor
-{
+impl Actor {
     /// Start the actor, return the actor and the sender.
     ///
     /// The returned channel sender is used to send actions to the actor, and the actor will
     /// act on the actions and send responses back.
     pub async fn start(
-        pool: P,
+        pool: Pool,
         chain_specs: SupportedChainSpecs,
     ) -> (
         Sender<(Action, oneshot::Sender<Result<StatusWithContext, String>>)>,
         Sender<()>,
     ) {
diff --git a/reqactor/src/gateway.rs b/reqactor/src/gateway.rs
index c1d932ed2..48736d51d 100644
--- a/reqactor/src/gateway.rs
+++ b/reqactor/src/gateway.rs
@@ -11,19 +11,19 @@ use tokio::sync::{mpsc::Sender, oneshot};
 use crate::Action;
 
 #[derive(Debug, Clone)]
-pub struct Gateway<P: Pool> {
+pub struct Gateway {
     default_request_config: ProofRequestOpt,
     chain_specs: SupportedChainSpecs,
     controller: Sender<(Action, oneshot::Sender<Result<StatusWithContext, String>>)>,
     pause_signal: Sender<()>,
-    pool: Arc<Mutex<P>>,
+    pool: Arc<Mutex<Pool>>,
     is_paused: Arc<AtomicBool>,
 }
 
 /// Gateway for the Actor.
-impl<P: Pool> Gateway<P>
{ +impl Gateway { pub fn new( - pool: P, + pool: Pool, default_request_config: ProofRequestOpt, chain_specs: SupportedChainSpecs, controller: Sender<(Action, oneshot::Sender>)>, From 99d883104ab39038864e3131288349459313e62d Mon Sep 17 00:00:00 2001 From: "keroroxx520@gmail.com" Date: Wed, 15 Jan 2025 13:26:20 +0800 Subject: [PATCH 3/9] feat(reqactor): impl max_proving_concurrency --- reqactor/src/action.rs | 9 + reqactor/src/actor.rs | 520 +++++----------------------------------- reqactor/src/backend.rs | 501 ++++++++++++++++++++++++++++++++++++++ reqactor/src/gateway.rs | 93 ------- reqactor/src/lib.rs | 45 +++- 5 files changed, 616 insertions(+), 552 deletions(-) create mode 100644 reqactor/src/backend.rs delete mode 100644 reqactor/src/gateway.rs diff --git a/reqactor/src/action.rs b/reqactor/src/action.rs index 760ee8d61..e9184e766 100644 --- a/reqactor/src/action.rs +++ b/reqactor/src/action.rs @@ -14,4 +14,13 @@ pub enum Action { }, } +impl Action { + pub fn request_key(&self) -> &RequestKey { + match self { + Action::Prove { request_key, .. } => request_key, + Action::Cancel { request_key, .. } => request_key, + } + } +} + impl_display_using_json_pretty!(Action); diff --git a/reqactor/src/actor.rs b/reqactor/src/actor.rs index 07191f651..f276a2532 100644 --- a/reqactor/src/actor.rs +++ b/reqactor/src/actor.rs @@ -1,483 +1,95 @@ -use std::time::Duration; - -use raiko_core::{ - interfaces::{aggregate_proofs, ProofRequest}, - provider::rpc::RpcBlockDataProvider, - Raiko, -}; -use raiko_lib::{ - consts::SupportedChainSpecs, - input::{AggregationGuestInput, AggregationGuestOutput}, - prover::{IdWrite, Proof}, -}; -use raiko_reqpool::{ - AggregationRequestEntity, RequestEntity, RequestKey, SingleProofRequestEntity, Status, - StatusWithContext, -}; -use reth_primitives::B256; -use tokio::sync::{ - mpsc::{self, Receiver, Sender}, - oneshot, +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Mutex, }; -use crate::{Action, Pool}; +use raiko_core::interfaces::ProofRequestOpt; +use raiko_lib::consts::SupportedChainSpecs; +use raiko_reqpool::{Pool, RequestKey, StatusWithContext}; +use tokio::sync::{mpsc::Sender, oneshot}; -#[derive(Clone)] +use crate::Action; + +/// Actor is the main interface interacting with the backend and the pool. +#[derive(Debug, Clone)] pub struct Actor { - pool: Pool, + default_request_config: ProofRequestOpt, chain_specs: SupportedChainSpecs, - internal_tx: Sender, + action_tx: Sender<(Action, oneshot::Sender>)>, + pause_tx: Sender<()>, + is_paused: Arc, + + // TODO: Remove Mutex. currently, in order to pass `&mut Pool`, we need to use Arc>. + pool: Arc>, } -// TODO: load pool and notify internal channel impl Actor { - /// Start the actor, return the actor and the sender. - /// - /// The returned channel sender is used to send actions to the actor, and the actor will - /// act on the actions and send responses back. 
- pub async fn start( + pub fn new( pool: Pool, + default_request_config: ProofRequestOpt, chain_specs: SupportedChainSpecs, - ) -> ( - Sender<(Action, oneshot::Sender>)>, - Sender<()>, - ) { - let channel_size = 1024; - let (external_tx, external_rx) = mpsc::channel::<( - Action, - oneshot::Sender>, - )>(channel_size); - let (internal_tx, internal_rx) = mpsc::channel::(channel_size); - let (pause_tx, pause_rx) = mpsc::channel::<()>(1); - - tokio::spawn(async move { - Actor { - pool, - chain_specs, - internal_tx, - } - .serve(external_rx, internal_rx, pause_rx) - .await; - }); - - (external_tx, pause_tx) - } - - async fn serve( - mut self, - mut external_rx: Receiver<(Action, oneshot::Sender>)>, - mut internal_rx: Receiver, - mut pause_rx: Receiver<()>, - ) { - loop { - tokio::select! { - Some((action, resp_tx)) = external_rx.recv() => { - let response = self.handle_external_action(action.clone()).await; - if let Err(err) = resp_tx.send(response.clone()) { - tracing::error!( - "Actor failed to send response {response:?} to action {action}: {err:?}" - ); - } - } - Some(request_key) = internal_rx.recv() => { - if let Err(err) = self.handle_internal_signal(request_key.clone()).await { - tracing::error!( - "Actor failed to handle internal signal {request_key}: {err:?}" - ); - } - } - Some(()) = pause_rx.recv() => { - tracing::info!("Actor received pause-signal, halting"); - if let Err(err) = self.halt().await { - tracing::error!("Actor failed to halt: {err:?}"); - } - } - else => { - // All channels are closed, exit the loop - tracing::info!("Actor exited"); - break; - } - } + action_tx: Sender<(Action, oneshot::Sender>)>, + pause_tx: Sender<()>, + ) -> Self { + Self { + default_request_config, + chain_specs, + action_tx, + pause_tx, + is_paused: Arc::new(AtomicBool::new(false)), + pool: Arc::new(Mutex::new(pool)), } } - async fn handle_external_action( - &mut self, - action: Action, - ) -> Result { - match action { - Action::Prove { - request_key, - request_entity, - } => match self.pool.get_status(&request_key) { - Ok(None) => { - tracing::info!("Actor received prove-action {request_key}, and it is not in pool, registering"); - self.register(request_key, request_entity).await - } - Ok(Some(status)) => match status.status() { - Status::Registered | Status::WorkInProgress | Status::Success { .. } => { - tracing::info!("Actor received prove-action {request_key}, but it is already {status}, skipping"); - Ok(status) - } - Status::Cancelled { .. } => { - tracing::warn!("Actor received prove-action {request_key}, and it is cancelled, re-registering"); - self.register(request_key, request_entity).await - } - Status::Failed { .. } => { - tracing::warn!("Actor received prove-action {request_key}, and it is failed, re-registering"); - self.register(request_key, request_entity).await - } - }, - Err(err) => { - tracing::error!( - "Actor failed to get status of prove-action {request_key}: {err:?}" - ); - Err(err) - } - }, - Action::Cancel { request_key } => match self.pool.get_status(&request_key) { - Ok(None) => { - tracing::warn!("Actor received cancel-action {request_key}, but it is not in pool, skipping"); - Err(format!("request {request_key} is not in pool")) - } - Ok(Some(status)) => match status.status() { - Status::Registered | Status::WorkInProgress => { - tracing::info!("Actor received cancel-action {request_key}, and it is {status}, cancelling"); - self.cancel(request_key).await - } - - Status::Failed { .. } | Status::Cancelled { .. } | Status::Success { .. 
} => { - tracing::info!("Actor received cancel-action {request_key}, but it is already {status}, skipping"); - Ok(status) - } - }, - Err(err) => { - tracing::error!( - "Actor failed to get status of cancel-action {request_key}: {err:?}" - ); - Err(err) - } - }, - } + /// Return the default request config. + pub fn default_request_config(&self) -> &ProofRequestOpt { + &self.default_request_config } - // TODO: semaphore - async fn handle_internal_signal(&mut self, request_key: RequestKey) -> Result<(), String> { - match self.pool.get(&request_key) { - Ok(Some((request_entity, status))) => match status.status() { - Status::Registered => match request_entity { - RequestEntity::SingleProof(entity) => { - self.prove_single(request_key, entity).await - } - RequestEntity::Aggregation(entity) => { - self.prove_aggregation(request_key, entity).await - } - }, - Status::WorkInProgress => { - // Wait for proving completion - tracing::info!( - "Actor wait for proving completion: {request_key}, elapsed: {elapsed:?}", - elapsed = chrono::Utc::now() - status.timestamp(), - ); - - self.internal_signal_timeout(&request_key, Duration::from_secs(3)) - .await; - Ok(()) - } - Status::Success { .. } | Status::Cancelled { .. } | Status::Failed { .. } => Ok(()), - }, - Ok(None) => { - tracing::warn!( - "Actor received internal signal {request_key}, but it is not in pool, skipping" - ); - Ok(()) - } - Err(err) => { - tracing::error!( - "Actor failed to get status of internal signal {request_key}: {err:?}, retrying" - ); - - self.internal_signal_timeout(&request_key, Duration::from_secs(3)) - .await; - Err(err) - } - } + /// Return the chain specs. + pub fn chain_specs(&self) -> &SupportedChainSpecs { + &self.chain_specs } - // Resignal the request key to the internal channel after 3 seconds - async fn internal_signal_timeout(&mut self, request_key: &RequestKey, duration: Duration) { - // Re-signal the request key to the internal channel after 3 seconds - let mut timer = tokio::time::interval(duration); - let internal_tx = self.internal_tx.clone(); - let request_key = request_key.clone(); - tokio::spawn(async move { - timer.tick().await; - if let Err(err) = internal_tx.send(request_key.clone()).await { - tracing::error!( - "Actor failed to send internal signal {request_key}: {err:?}, actor will exit" - ); - } - }); + /// Check if the system is paused. + pub fn is_paused(&self) -> bool { + self.is_paused.load(Ordering::SeqCst) } - // Register a new request to the pool and notify the actor. - async fn register( - &mut self, - request_key: RequestKey, - request_entity: RequestEntity, - ) -> Result { - let status = StatusWithContext::new_registered(); - if let Err(err) = self - .pool - .add(request_key.clone(), request_entity, status.clone()) - { - return Err(err); - } - - if let Err(err) = self.internal_tx.send(request_key.clone()).await { - tracing::error!( - "Actor failed to send internal signal {request_key}: {err:?}, actor will exit" - ); - return Err(format!( - "failed to send internal signal {request_key}: {err:?}, actor will exit" - )); - } - - Ok(status) + /// Get the status of the request from the pool. + pub fn pool_get_status( + &self, + request_key: &RequestKey, + ) -> Result, String> { + self.pool.lock().unwrap().get_status(request_key) } - async fn cancel(&mut self, request_key: RequestKey) -> Result { - let Some(status) = self.pool.get_status(&request_key)? 
else { - // the request is not in the pool, do nothing - tracing::warn!( - "Actor received cancel-action {request_key}, but it is not in pool, skipping" - ); - return Err(format!("request {request_key} is not in pool")); - }; - - if status.status() != &Status::WorkInProgress { - // the request is not in proving, do nothing - tracing::warn!( - "Actor received cancel-action {request_key}, but it is not in work-in-progress, skipping" - ); - return Err(format!("request {request_key} is not in work-in-progress")); - } - - tracing::info!("Actor received cancel-action {request_key}, status: {status}, cancelling"); - - match &request_key { - RequestKey::SingleProof(key) => { - // Cancel Single Proof - // - // 1. Cancel the proving work by the cancel token // TODO: cancel token - // 2. Remove the proof id from the pool - raiko_core::interfaces::cancel_proof( - key.proof_type().clone(), - ( - key.chain_id().clone(), - key.block_number().clone(), - key.block_hash().clone(), - *key.proof_type() as u8, - ), - Box::new(&mut self.pool), - ) - .await - .or_else(|e| { - if e.to_string().contains("No data for query") { - tracing::warn!("Actor received cancel-action {request_key}, but it is already cancelled or not yet started, skipping"); - Ok(()) - } else { - tracing::error!( - "Actor received cancel-action {request_key}, but failed to cancel proof: {e:?}" - ); - Err(format!("failed to cancel proof: {e:?}")) - } - })?; - - // 3. Mark the request as cancelled in the pool - let status = StatusWithContext::new_cancelled(); - self.pool.update_status(request_key, status.clone())?; + /// Send an action to the backend and wait for the response. + pub async fn act(&self, action: Action) -> Result { + let (resp_tx, resp_rx) = oneshot::channel(); - Ok(status) - } - RequestKey::Aggregation(..) => { - // Cancel Aggregation Proof - // - // 1. Cancel the proving work by the cancel token // TODO - // 2. Remove the proof id from the pool // TODO - - // 3. Mark the request as cancelled in the pool - let status = StatusWithContext::new_cancelled(); - self.pool.update_status(request_key, status.clone())?; - - Ok(status) - } - } - } - - async fn prove_single( - &mut self, - request_key: RequestKey, - request_entity: SingleProofRequestEntity, - ) -> Result<(), String> { - // 1. Update the request status in pool to WorkInProgress - self.pool - .update_status(request_key.clone(), Status::WorkInProgress.into())?; - - let mut actor = self.clone(); - tokio::spawn(async move { - // 2. Start the proving work - let proven_status = do_prove_single( - &mut actor.pool, - &actor.chain_specs, - request_key.clone(), - request_entity, - ) + // Send the action to the backend + self.action_tx + .send((action, resp_tx)) .await - .map(|proof| Status::Success { proof }) - .unwrap_or_else(|error| Status::Failed { error }); - - // 3. Update the request status in pool to the resulted status - if let Err(err) = actor - .pool - .update_status(request_key.clone(), proven_status.clone().into()) - { - tracing::error!( - "Actor failed to update status of prove-action {request_key}: {err:?}, status: {proven_status}" - ); - return; - } + .map_err(|e| format!("failed to send action: {e}"))?; - // 4. Resignal the request key to the internal channel, to let the actor know the proving is done // TODO - let _ = actor.internal_tx.send(request_key.clone()).await; - }); - - Ok(()) + // Wait for response of the action + resp_rx + .await + .map_err(|e| format!("failed to receive action response: {e}"))? 
} - async fn prove_aggregation( - &mut self, - request_key: RequestKey, - request_entity: AggregationRequestEntity, - ) -> Result<(), String> { - // 1. Update the request status in pool to WorkInProgress - self.pool - .update_status(request_key.clone(), Status::WorkInProgress.into())?; - - let mut actor = self.clone(); - tokio::spawn(async move { - // 2. Start the proving work - let proven_status = - do_prove_aggregation(&mut actor.pool, request_key.clone(), request_entity) - .await - .map(|proof| Status::Success { proof }) - .unwrap_or_else(|error| Status::Failed { error }); - - // 3. Update the request status in pool to the resulted status - if let Err(err) = actor - .pool - .update_status(request_key.clone(), proven_status.clone().into()) - { - tracing::error!( - "Actor failed to update status of prove-action {request_key}: {err:?}, status: {proven_status}" - ); - return; - } - - // 4. Resignal the request key to the internal channel, to let the actor know the proving is done // TODO - let _ = actor.internal_tx.send(request_key.clone()).await; - }); - + /// Set the pause flag and notify the task manager to pause, then wait for the task manager to + /// finish the pause process. + /// + /// Note that this function is blocking until the task manager finishes the pause process. + pub async fn pause(&self) -> Result<(), String> { + self.is_paused.store(true, Ordering::SeqCst); + self.pause_tx + .send(()) + .await + .map_err(|e| format!("failed to send pause signal: {e}"))?; Ok(()) } - - async fn halt(&mut self) -> Result<(), String> { - todo!("halt") - } -} - -// TODO: cache input, reference to raiko_host::cache -// TODO: memory tracking -// TODO: metrics -// TODO: measurement -pub async fn do_prove_single( - pool: &mut dyn IdWrite, - chain_specs: &SupportedChainSpecs, - request_key: RequestKey, - request_entity: SingleProofRequestEntity, -) -> Result { - tracing::info!("Generating proof for {request_key}"); - - let l1_chain_spec = chain_specs - .get_chain_spec(&request_entity.l1_network()) - .ok_or_else(|| { - format!( - "unsupported l1 network: {}, it should not happen, please issue a bug report", - request_entity.l1_network() - ) - })?; - let taiko_chain_spec = chain_specs - .get_chain_spec(&request_entity.network()) - .ok_or_else(|| { - format!( - "unsupported raiko network: {}, it should not happen, please issue a bug report", - request_entity.network() - ) - })?; - let proof_request = ProofRequest { - block_number: *request_entity.block_number(), - l1_inclusion_block_number: *request_entity.l1_inclusion_block_number(), - network: request_entity.network().clone(), - l1_network: request_entity.l1_network().clone(), - graffiti: request_entity.graffiti().clone(), - prover: request_entity.prover().clone(), - proof_type: request_entity.proof_type().clone(), - blob_proof_type: request_entity.blob_proof_type().clone(), - prover_args: request_entity.prover_args().clone(), - }; - let raiko = Raiko::new(l1_chain_spec, taiko_chain_spec.clone(), proof_request); - let provider = RpcBlockDataProvider::new( - &taiko_chain_spec.rpc.clone(), - request_entity.block_number() - 1, - ) - .map_err(|err| format!("failed to create rpc block data provider: {err:?}"))?; - - // 1. Generate the proof input - let input = raiko - .generate_input(provider) - .await - .map_err(|e| format!("failed to generate input: {e:?}"))?; - - // 2. Generate the proof output - let output = raiko - .get_output(&input) - .map_err(|e| format!("failed to get output: {e:?}"))?; - - // 3. 
Generate the proof - let proof = raiko - .prove(input, &output, Some(pool)) - .await - .map_err(|err| format!("failed to generate single proof: {err:?}"))?; - - Ok(proof) -} - -async fn do_prove_aggregation( - pool: &mut dyn IdWrite, - request_key: RequestKey, - request_entity: AggregationRequestEntity, -) -> Result { - let proof_type = request_key.proof_type().clone(); - let proofs = request_entity.proofs().clone(); - - let input = AggregationGuestInput { proofs }; - let output = AggregationGuestOutput { hash: B256::ZERO }; - let config = serde_json::to_value(request_entity.prover_args()) - .map_err(|err| format!("failed to serialize prover args: {err:?}"))?; - - let proof = aggregate_proofs(proof_type, input, &output, &config, Some(pool)) - .await - .map_err(|err| format!("failed to generate aggregation proof: {err:?}"))?; - - Ok(proof) } diff --git a/reqactor/src/backend.rs b/reqactor/src/backend.rs new file mode 100644 index 000000000..c29d87e53 --- /dev/null +++ b/reqactor/src/backend.rs @@ -0,0 +1,501 @@ +use std::sync::Arc; +use std::time::Duration; + +use raiko_core::{ + interfaces::{aggregate_proofs, ProofRequest}, + provider::rpc::RpcBlockDataProvider, + Raiko, +}; +use raiko_lib::{ + consts::SupportedChainSpecs, + input::{AggregationGuestInput, AggregationGuestOutput}, + prover::{IdWrite, Proof}, +}; +use raiko_reqpool::{ + AggregationRequestEntity, RequestEntity, RequestKey, SingleProofRequestEntity, Status, + StatusWithContext, +}; +use reth_primitives::B256; +use tokio::sync::{ + mpsc::{self, Receiver, Sender}, + oneshot, Semaphore, +}; + +use crate::{Action, Pool}; + +/// Backend runs in the background, and handles the actions from the actor. +#[derive(Clone)] +pub(crate) struct Backend { + pool: Pool, + chain_specs: SupportedChainSpecs, + internal_tx: Sender, + proving_semaphore: Arc, +} + +// TODO: load pool and notify internal channel +impl Backend { + /// Run the backend in background. + /// + /// The returned channel sender is used to send actions to the actor, and the actor will + /// act on the actions and send responses back. + pub async fn serve_in_background( + pool: Pool, + chain_specs: SupportedChainSpecs, + pause_rx: Receiver<()>, + action_rx: Receiver<(Action, oneshot::Sender>)>, + max_proving_concurrency: usize, + ) { + let channel_size = 1024; + let (internal_tx, internal_rx) = mpsc::channel::(channel_size); + tokio::spawn(async move { + Backend { + pool, + chain_specs, + internal_tx, + proving_semaphore: Arc::new(Semaphore::new(max_proving_concurrency)), + } + .serve(action_rx, internal_rx, pause_rx) + .await; + }); + } + + // There are three incoming channels: + // 1. action_rx: actions from the external Actor + // 2. internal_rx: internal signals from the backend itself + // 3. pause_rx: pause signal from the external Actor + async fn serve( + mut self, + mut action_rx: Receiver<(Action, oneshot::Sender>)>, + mut internal_rx: Receiver, + mut pause_rx: Receiver<()>, + ) { + loop { + tokio::select! { + Some((action, resp_tx)) = action_rx.recv() => { + let request_key = action.request_key().clone(); + let response = self.handle_external_action(action.clone()).await; + + // Signal the request key to the internal channel, to move on to the next step, whatever the result is + // + // NOTE: Why signal whatever the result is? It's for fault tolerence, to ensure the request will be + // handled even when something unexpected happens. 
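+                    //
+                    // Net effect: the state machine receives the request key at least once
+                    // per external action, and duplicate signals are harmless because
+                    // `handle_internal_signal` treats requests in a terminal status as a
+                    // no-op.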
+ self.ensure_internal_signal(request_key).await; + + if let Err(err) = resp_tx.send(response.clone()) { + tracing::error!( + "Actor Backend failed to send response {response:?} to action {action}: {err:?}" + ); + } + } + Some(request_key) = internal_rx.recv() => { + self.handle_internal_signal(request_key.clone()).await; + } + Some(()) = pause_rx.recv() => { + tracing::info!("Actor Backend received pause-signal, halting"); + if let Err(err) = self.halt().await { + tracing::error!("Actor Backend failed to halt: {err:?}"); + } + } + else => { + // All channels are closed, exit the loop + tracing::info!("Actor Backend exited"); + break; + } + } + } + } + + async fn handle_external_action( + &mut self, + action: Action, + ) -> Result { + match action { + Action::Prove { + request_key, + request_entity, + } => match self.pool.get_status(&request_key) { + Ok(None) => { + tracing::info!("Actor Backend received prove-action {request_key}, and it is not in pool, registering"); + self.register(request_key.clone(), request_entity).await + } + Ok(Some(status)) => match status.status() { + Status::Registered | Status::WorkInProgress | Status::Success { .. } => { + tracing::info!("Actor Backend received prove-action {request_key}, but it is already {status}, skipping"); + Ok(status) + } + Status::Cancelled { .. } => { + tracing::warn!("Actor Backend received prove-action {request_key}, and it is cancelled, re-registering"); + self.register(request_key, request_entity).await + } + Status::Failed { .. } => { + tracing::warn!("Actor Backend received prove-action {request_key}, and it is failed, re-registering"); + self.register(request_key, request_entity).await + } + }, + Err(err) => { + tracing::error!( + "Actor Backend failed to get status of prove-action {request_key}: {err:?}" + ); + Err(err) + } + }, + Action::Cancel { request_key } => match self.pool.get_status(&request_key) { + Ok(None) => { + tracing::warn!("Actor Backend received cancel-action {request_key}, but it is not in pool, skipping"); + Err("request is not in pool".to_string()) + } + Ok(Some(status)) => match status.status() { + Status::Registered | Status::WorkInProgress => { + tracing::info!("Actor Backend received cancel-action {request_key}, and it is {status}, cancelling"); + self.cancel(request_key, status).await + } + + Status::Failed { .. } | Status::Cancelled { .. } | Status::Success { .. } => { + tracing::info!("Actor Backend received cancel-action {request_key}, but it is already {status}, skipping"); + Ok(status) + } + }, + Err(err) => { + tracing::error!( + "Actor Backend failed to get status of cancel-action {request_key}: {err:?}" + ); + Err(err) + } + }, + } + } + + // Check the request status and then move on to the next step accordingly. 
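+    //
+    // State transitions driven here (mirroring the match below):
+    //   Registered                -> start proving (single proof or aggregation)
+    //   WorkInProgress            -> re-check after ~3 seconds until terminal
+    //   Success/Cancelled/Failed  -> terminal, nothing left to do
+    //   not in pool / pool error  -> log it and, on error, retry later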
+ async fn handle_internal_signal(&mut self, request_key: RequestKey) { + match self.pool.get(&request_key) { + Ok(Some((request_entity, status))) => match status.status() { + Status::Registered => match request_entity { + RequestEntity::SingleProof(entity) => { + tracing::info!("Actor Backend received internal signal {request_key}, status: {status}, proving single proof"); + self.prove_single(request_key.clone(), entity).await; + self.ensure_internal_signal(request_key).await; + } + RequestEntity::Aggregation(entity) => { + tracing::info!("Actor Backend received internal signal {request_key}, status: {status}, proving aggregation proof"); + self.prove_aggregation(request_key.clone(), entity).await; + self.ensure_internal_signal(request_key).await; + } + }, + Status::WorkInProgress => { + // Wait for proving completion + tracing::info!( + "Actor Backend checks a work-in-progress request {request_key}, elapsed: {elapsed:?}", + elapsed = chrono::Utc::now() - status.timestamp(), + ); + self.ensure_internal_signal_after(request_key, Duration::from_secs(3)) + .await; + } + Status::Success { .. } | Status::Cancelled { .. } | Status::Failed { .. } => { + tracing::info!("Actor Backend received internal signal {request_key}, status: {status}, done"); + } + }, + Ok(None) => { + tracing::warn!( + "Actor Backend received internal signal {request_key}, but it is not in pool, skipping" + ); + } + Err(err) => { + // Fault tolerence: re-enqueue the internal signal after 3 seconds + tracing::warn!( + "Actor Backend failed to get status of internal signal {request_key}: {err:?}, performing fault tolerence and retrying later" + ); + self.ensure_internal_signal_after(request_key, Duration::from_secs(3)) + .await; + } + } + } + + // Ensure signal the request key to the internal channel. + // + // Note that this function will retry sending the signal until successed. + async fn ensure_internal_signal(&mut self, request_key: RequestKey) { + let mut ticker = tokio::time::interval(Duration::from_secs(3)); + let internal_tx = self.internal_tx.clone(); + tokio::spawn(async move { + loop { + if let Err(err) = internal_tx.send(request_key.clone()).await { + tracing::error!("Actor Backend failed to send internal signal {request_key}: {err:?}, retrying. It should not happen, please issue a bug report"); + } + ticker.tick().await; + } + }); + } + + async fn ensure_internal_signal_after(&mut self, request_key: RequestKey, after: Duration) { + let mut timer = tokio::time::interval(after); + timer.tick().await; + self.ensure_internal_signal(request_key).await + } + + // Register a new request to the pool and notify the actor. + async fn register( + &mut self, + request_key: RequestKey, + request_entity: RequestEntity, + ) -> Result { + // 1. Register to the pool + let status = StatusWithContext::new_registered(); + if let Err(err) = self + .pool + .add(request_key.clone(), request_entity, status.clone()) + { + return Err(err); + } + + Ok(status) + } + + async fn cancel( + &mut self, + request_key: RequestKey, + old_status: StatusWithContext, + ) -> Result { + debug_assert!( + old_status.status() == &Status::WorkInProgress + || old_status.status() == &Status::Registered + ); + + // Case: old_status is registered: mark the request as cancelled in the pool and return directly + if old_status.status() == &Status::Registered { + let status = StatusWithContext::new_cancelled(); + self.pool.update_status(request_key, status.clone())?; + return Ok(status); + } + + // Case: old_status is work-in-progress: + // 1. 
Cancel the proving work by the cancel token // TODO: cancel token + // 2. Remove the proof id from the pool + // 3. Mark the request as cancelled in the pool + match &request_key { + RequestKey::SingleProof(key) => { + raiko_core::interfaces::cancel_proof( + key.proof_type().clone(), + ( + key.chain_id().clone(), + key.block_number().clone(), + key.block_hash().clone(), + *key.proof_type() as u8, + ), + Box::new(&mut self.pool), + ) + .await + .or_else(|e| { + if e.to_string().contains("No data for query") { + tracing::warn!("Actor Backend received cancel-action {request_key}, but it is already cancelled or not yet started, skipping"); + Ok(()) + } else { + tracing::error!( + "Actor Backend received cancel-action {request_key}, but failed to cancel proof: {e:?}" + ); + Err(format!("failed to cancel proof: {e:?}")) + } + })?; + + // 3. Mark the request as cancelled in the pool + let status = StatusWithContext::new_cancelled(); + self.pool.update_status(request_key, status.clone())?; + Ok(status) + } + RequestKey::Aggregation(..) => { + let status = StatusWithContext::new_cancelled(); + self.pool.update_status(request_key, status.clone())?; + Ok(status) + } + } + } + + async fn prove_single( + &mut self, + request_key: RequestKey, + request_entity: SingleProofRequestEntity, + ) { + // 1. Update the request status in pool to WorkInProgress + if let Err(err) = self + .pool + .update_status(request_key.clone(), Status::WorkInProgress.into()) + { + tracing::error!( + "Actor Backend failed to update status of prove-action {request_key}: {err:?}, status: {status}", + status = Status::WorkInProgress, + ); + return; + } + + // 2. Start the proving work in a separate thread + let mut actor = self.clone(); + let proving_semaphore = self.proving_semaphore.clone(); + tokio::spawn(async move { + // Acquire a permit from the semaphore before starting the proving work + let _permit = proving_semaphore + .acquire() + .await + .expect("semaphore should not be closed"); + + // 2.1. Start the proving work + let proven_status = do_prove_single( + &mut actor.pool, + &actor.chain_specs, + request_key.clone(), + request_entity, + ) + .await + .map(|proof| Status::Success { proof }) + .unwrap_or_else(|error| Status::Failed { error }); + + // 2.2. Update the request status in pool to the resulted status + if let Err(err) = actor + .pool + .update_status(request_key.clone(), proven_status.clone().into()) + { + tracing::error!( + "Actor Backend failed to update status of prove-action {request_key}: {err:?}, status: {proven_status}" + ); + return; + } + // The permit is automatically dropped here, releasing the semaphore + }); + } + + async fn prove_aggregation( + &mut self, + request_key: RequestKey, + request_entity: AggregationRequestEntity, + ) { + // 1. Update the request status in pool to WorkInProgress + if let Err(err) = self + .pool + .update_status(request_key.clone(), Status::WorkInProgress.into()) + { + tracing::error!( + "Actor Backend failed to update status of prove-action {request_key}: {err:?}, status: {status}", + status = Status::WorkInProgress, + ); + return; + } + + // 2. Start the proving work in a separate thread + let mut actor = self.clone(); + let proving_semaphore = self.proving_semaphore.clone(); + tokio::spawn(async move { + // Acquire a permit from the semaphore before starting the proving work + let _permit = proving_semaphore + .acquire() + .await + .expect("semaphore should not be closed"); + + // 2.1. 
Start the proving work + let proven_status = + do_prove_aggregation(&mut actor.pool, request_key.clone(), request_entity) + .await + .map(|proof| Status::Success { proof }) + .unwrap_or_else(|error| Status::Failed { error }); + + // 2.2. Update the request status in pool to the resulted status + if let Err(err) = actor + .pool + .update_status(request_key.clone(), proven_status.clone().into()) + { + tracing::error!( + "Actor Backend failed to update status of prove-action {request_key}: {err:?}, status: {proven_status}" + ); + return; + } + // The permit is automatically dropped here, releasing the semaphore + }); + } + + async fn halt(&mut self) -> Result<(), String> { + todo!("halt") + } +} + +// TODO: cache input, reference to raiko_host::cache +// TODO: memory tracking +// TODO: metrics +// TODO: measurement +pub async fn do_prove_single( + pool: &mut dyn IdWrite, + chain_specs: &SupportedChainSpecs, + request_key: RequestKey, + request_entity: SingleProofRequestEntity, +) -> Result { + tracing::info!("Generating proof for {request_key}"); + + let l1_chain_spec = chain_specs + .get_chain_spec(&request_entity.l1_network()) + .ok_or_else(|| { + format!( + "unsupported l1 network: {}, it should not happen, please issue a bug report", + request_entity.l1_network() + ) + })?; + let taiko_chain_spec = chain_specs + .get_chain_spec(&request_entity.network()) + .ok_or_else(|| { + format!( + "unsupported raiko network: {}, it should not happen, please issue a bug report", + request_entity.network() + ) + })?; + let proof_request = ProofRequest { + block_number: *request_entity.block_number(), + l1_inclusion_block_number: *request_entity.l1_inclusion_block_number(), + network: request_entity.network().clone(), + l1_network: request_entity.l1_network().clone(), + graffiti: request_entity.graffiti().clone(), + prover: request_entity.prover().clone(), + proof_type: request_entity.proof_type().clone(), + blob_proof_type: request_entity.blob_proof_type().clone(), + prover_args: request_entity.prover_args().clone(), + }; + let raiko = Raiko::new(l1_chain_spec, taiko_chain_spec.clone(), proof_request); + let provider = RpcBlockDataProvider::new( + &taiko_chain_spec.rpc.clone(), + request_entity.block_number() - 1, + ) + .map_err(|err| format!("failed to create rpc block data provider: {err:?}"))?; + + // 1. Generate the proof input + let input = raiko + .generate_input(provider) + .await + .map_err(|e| format!("failed to generate input: {e:?}"))?; + + // 2. Generate the proof output + let output = raiko + .get_output(&input) + .map_err(|e| format!("failed to get output: {e:?}"))?; + + // 3. 
Generate the proof + let proof = raiko + .prove(input, &output, Some(pool)) + .await + .map_err(|err| format!("failed to generate single proof: {err:?}"))?; + + Ok(proof) +} + +async fn do_prove_aggregation( + pool: &mut dyn IdWrite, + request_key: RequestKey, + request_entity: AggregationRequestEntity, +) -> Result { + let proof_type = request_key.proof_type().clone(); + let proofs = request_entity.proofs().clone(); + + let input = AggregationGuestInput { proofs }; + let output = AggregationGuestOutput { hash: B256::ZERO }; + let config = serde_json::to_value(request_entity.prover_args()) + .map_err(|err| format!("failed to serialize prover args: {err:?}"))?; + + let proof = aggregate_proofs(proof_type, input, &output, &config, Some(pool)) + .await + .map_err(|err| format!("failed to generate aggregation proof: {err:?}"))?; + + Ok(proof) +} diff --git a/reqactor/src/gateway.rs b/reqactor/src/gateway.rs deleted file mode 100644 index 48736d51d..000000000 --- a/reqactor/src/gateway.rs +++ /dev/null @@ -1,93 +0,0 @@ -use std::sync::{ - atomic::{AtomicBool, Ordering}, - Arc, Mutex, -}; - -use raiko_core::interfaces::ProofRequestOpt; -use raiko_lib::consts::SupportedChainSpecs; -use raiko_reqpool::{Pool, RequestKey, StatusWithContext}; -use tokio::sync::{mpsc::Sender, oneshot}; - -use crate::Action; - -#[derive(Debug, Clone)] -pub struct Gateway { - default_request_config: ProofRequestOpt, - chain_specs: SupportedChainSpecs, - controller: Sender<(Action, oneshot::Sender>)>, - pause_signal: Sender<()>, - pool: Arc>, - is_paused: Arc, -} - -/// Gateway for the Actor. -impl Gateway { - pub fn new( - pool: Pool, - default_request_config: ProofRequestOpt, - chain_specs: SupportedChainSpecs, - controller: Sender<(Action, oneshot::Sender>)>, - pause_signal: Sender<()>, - ) -> Self { - Self { - default_request_config, - chain_specs, - controller, - pause_signal, - pool: Arc::new(Mutex::new(pool)), - is_paused: Arc::new(AtomicBool::new(false)), - } - } - - /// Return the default request config. - pub fn default_request_config(&self) -> &ProofRequestOpt { - &self.default_request_config - } - - /// Return the chain specs. - pub fn chain_specs(&self) -> &SupportedChainSpecs { - &self.chain_specs - } - - /// Check if the system is paused. - pub fn is_paused(&self) -> bool { - self.is_paused.load(Ordering::SeqCst) - } - - /// Get the status of the request from the pool. - pub fn pool_get_status( - &self, - request_key: &RequestKey, - ) -> Result, String> { - self.pool.lock().unwrap().get_status(request_key) - } - - /// Send an action to the controller and wait for the response. - pub async fn send(&self, action: Action) -> Result { - let (resp_tx, resp_rx) = oneshot::channel(); - - // Send the action to the controller - self.controller - .send((action, resp_tx)) - .await - .map_err(|e| format!("failed to send action: {e}"))?; - - // Wait for response of the action - resp_rx - .await - .map_err(|e| format!("failed to receive action response: {e}"))? - } - - /// Set the pause flag and notify the task manager to pause, then wait for the task manager to - /// finish the pause process. - /// - /// Note that this function is blocking until the task manager finishes the pause process. 
diff --git a/reqactor/src/gateway.rs b/reqactor/src/gateway.rs
deleted file mode 100644
index 48736d51d..000000000
--- a/reqactor/src/gateway.rs
+++ /dev/null
@@ -1,93 +0,0 @@
-use std::sync::{
-    atomic::{AtomicBool, Ordering},
-    Arc, Mutex,
-};
-
-use raiko_core::interfaces::ProofRequestOpt;
-use raiko_lib::consts::SupportedChainSpecs;
-use raiko_reqpool::{Pool, RequestKey, StatusWithContext};
-use tokio::sync::{mpsc::Sender, oneshot};
-
-use crate::Action;
-
-#[derive(Debug, Clone)]
-pub struct Gateway {
-    default_request_config: ProofRequestOpt,
-    chain_specs: SupportedChainSpecs,
-    controller: Sender<(Action, oneshot::Sender<Result<StatusWithContext, String>>)>,
-    pause_signal: Sender<()>,
-    pool: Arc<Mutex<Pool>>,
-    is_paused: Arc<AtomicBool>,
-}
-
-/// Gateway for the Actor.
-impl Gateway {
-    pub fn new(
-        pool: Pool,
-        default_request_config: ProofRequestOpt,
-        chain_specs: SupportedChainSpecs,
-        controller: Sender<(Action, oneshot::Sender<Result<StatusWithContext, String>>)>,
-        pause_signal: Sender<()>,
-    ) -> Self {
-        Self {
-            default_request_config,
-            chain_specs,
-            controller,
-            pause_signal,
-            pool: Arc::new(Mutex::new(pool)),
-            is_paused: Arc::new(AtomicBool::new(false)),
-        }
-    }
-
-    /// Return the default request config.
-    pub fn default_request_config(&self) -> &ProofRequestOpt {
-        &self.default_request_config
-    }
-
-    /// Return the chain specs.
-    pub fn chain_specs(&self) -> &SupportedChainSpecs {
-        &self.chain_specs
-    }
-
-    /// Check if the system is paused.
-    pub fn is_paused(&self) -> bool {
-        self.is_paused.load(Ordering::SeqCst)
-    }
-
-    /// Get the status of the request from the pool.
-    pub fn pool_get_status(
-        &self,
-        request_key: &RequestKey,
-    ) -> Result<Option<StatusWithContext>, String> {
-        self.pool.lock().unwrap().get_status(request_key)
-    }
-
-    /// Send an action to the controller and wait for the response.
-    pub async fn send(&self, action: Action) -> Result<StatusWithContext, String> {
-        let (resp_tx, resp_rx) = oneshot::channel();
-
-        // Send the action to the controller
-        self.controller
-            .send((action, resp_tx))
-            .await
-            .map_err(|e| format!("failed to send action: {e}"))?;
-
-        // Wait for response of the action
-        resp_rx
-            .await
-            .map_err(|e| format!("failed to receive action response: {e}"))?
-    }
-
-    /// Set the pause flag and notify the task manager to pause, then wait for the task manager to
-    /// finish the pause process.
-    ///
-    /// Note that this function is blocking until the task manager finishes the pause process.
-    pub async fn pause(&self) -> Result<(), String> {
-        self.is_paused.store(true, Ordering::SeqCst);
-        self.pause_signal
-            .send(())
-            .await
-            .map_err(|e| format!("failed to send pause signal: {e}"))?;
-        Ok(())
-    }
-}
diff --git a/reqactor/src/lib.rs b/reqactor/src/lib.rs
index 48cbb3850..6ae17370a 100644
--- a/reqactor/src/lib.rs
+++ b/reqactor/src/lib.rs
@@ -1,12 +1,47 @@
 mod action;
 mod actor;
-mod gateway;
+mod backend;
 
+use raiko_core::interfaces::ProofRequestOpt;
+use raiko_lib::consts::SupportedChainSpecs;
+use tokio::sync::{mpsc, oneshot};
+
+pub(crate) use backend::Backend;
+
+// re-export
+pub use action::Action;
+pub use actor::Actor;
 pub use raiko_reqpool::{
     AggregationRequestEntity, AggregationRequestKey, Pool, RequestEntity, RequestKey,
-    SingleProofRequestEntity, SingleProofRequestKey,
+    SingleProofRequestEntity, SingleProofRequestKey, StatusWithContext,
 };
 
-pub use action::Action;
-pub use actor::Actor;
-pub use gateway::Gateway;
+/// Run the actor backend in the background, and return the actor.
+pub async fn start_actor(
+    pool: Pool,
+    chain_specs: SupportedChainSpecs,
+    default_request_config: ProofRequestOpt,
+    max_proving_concurrency: usize,
+) -> Actor {
+    let channel_size = 1024;
+    let (action_tx, action_rx) =
+        mpsc::channel::<(Action, oneshot::Sender<Result<StatusWithContext, String>>)>(channel_size);
+    let (pause_tx, pause_rx) = mpsc::channel::<()>(1);
+
+    Backend::serve_in_background(
+        pool.clone(),
+        chain_specs.clone(),
+        pause_rx,
+        action_rx,
+        max_proving_concurrency,
+    )
+    .await;
+
+    Actor::new(
+        pool,
+        default_request_config,
+        chain_specs.clone(),
+        action_tx,
+        pause_tx,
+    )
+}
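
start_actor wires one mpsc queue that carries each Action together with a dedicated oneshot reply channel. A self-contained tokio sketch of that request/response wiring, with stand-in action and status types:

use tokio::sync::{mpsc, oneshot};

enum MockAction {
    Prove { block_number: u64 },
}

#[tokio::main]
async fn main() {
    // Each queued action carries its own oneshot sender, so the caller can
    // await a per-request response over a shared queue.
    let (action_tx, mut action_rx) =
        mpsc::channel::<(MockAction, oneshot::Sender<Result<String, String>>)>(1024);

    // Backend side: drain the action queue in the background.
    tokio::spawn(async move {
        while let Some((MockAction::Prove { block_number }, resp_tx)) = action_rx.recv().await {
            let _ = resp_tx.send(Ok(format!("registered block {block_number}")));
        }
    });

    // Actor side: send one action and wait for its reply.
    let (resp_tx, resp_rx) = oneshot::channel();
    action_tx
        .send((MockAction::Prove { block_number: 1 }, resp_tx))
        .await
        .expect("backend is alive");
    let reply = resp_rx.await.expect("backend replied");
    assert_eq!(reply.unwrap(), "registered block 1");
}
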
From 14f56a6fe22d3dc6b70723576339d1f490253374 Mon Sep 17 00:00:00 2001
From: "keroroxx520@gmail.com" <keroroxx520@gmail.com>
Date: Mon, 13 Jan 2025 00:06:08 +0800
Subject: [PATCH 4/9] feat(lib): adjust IdStore

---
 lib/src/input.rs     | 2 +-
 lib/src/prover.rs    | 6 ++++--
 taskdb/src/lib.rs    | 2 +-
 taskdb/src/mem_db.rs | 2 +-
 4 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/lib/src/input.rs b/lib/src/input.rs
index 67bc5b11b..c97b849eb 100644
--- a/lib/src/input.rs
+++ b/lib/src/input.rs
@@ -174,7 +174,7 @@ impl TryFrom> for TaikoGuestInput {
     }
 }
 
-#[derive(Clone, Debug, Serialize, Deserialize, Default)]
+#[derive(Clone, Debug, Serialize, Deserialize, Default, Eq, PartialEq, Ord, PartialOrd, Hash)]
 #[serde(rename_all = "snake_case")]
 pub enum BlobProofType {
     /// Guest runs through the entire computation from blob to Kzg commitment
diff --git a/lib/src/prover.rs b/lib/src/prover.rs
index 0b1bf3498..043c5b3f8 100644
--- a/lib/src/prover.rs
+++ b/lib/src/prover.rs
@@ -26,7 +26,9 @@
 pub type ProverResult<T, E = ProverError> = core::result::Result<T, E>;
 pub type ProverConfig = serde_json::Value;
 pub type ProofKey = (ChainId, u64, B256, u8);
 
-#[derive(Clone, Debug, Serialize, ToSchema, Deserialize, Default, PartialEq, Eq, Hash)]
+#[derive(
+    Clone, Debug, Serialize, ToSchema, Deserialize, Default, PartialEq, Eq, PartialOrd, Ord, Hash,
+)]
 /// The response body of a proof request.
 pub struct Proof {
     /// The proof either TEE or ZK.
@@ -50,7 +52,7 @@ pub trait IdWrite: Send {
 
 #[async_trait::async_trait]
 pub trait IdStore: IdWrite {
-    async fn read_id(&self, key: ProofKey) -> ProverResult<String>;
+    async fn read_id(&mut self, key: ProofKey) -> ProverResult<String>;
 }
 
 #[allow(async_fn_in_trait)]
diff --git a/taskdb/src/lib.rs b/taskdb/src/lib.rs
index 513843a13..246cb1793 100644
--- a/taskdb/src/lib.rs
+++ b/taskdb/src/lib.rs
@@ -321,7 +321,7 @@ impl IdWrite for TaskManagerWrapper {
 
 #[async_trait::async_trait]
 impl IdStore for TaskManagerWrapper {
-    async fn read_id(&self, key: ProofKey) -> ProverResult<String> {
+    async fn read_id(&mut self, key: ProofKey) -> ProverResult<String> {
         self.manager.read_id(key).await
     }
 }
diff --git a/taskdb/src/mem_db.rs b/taskdb/src/mem_db.rs
index 8ebab1b7e..aa4bd76b6 100644
--- a/taskdb/src/mem_db.rs
+++ b/taskdb/src/mem_db.rs
@@ -291,7 +291,7 @@ impl IdWrite for InMemoryTaskManager {
 
 #[async_trait::async_trait]
 impl IdStore for InMemoryTaskManager {
-    async fn read_id(&self, key: ProofKey) -> ProverResult<String> {
+    async fn read_id(&mut self, key: ProofKey) -> ProverResult<String> {
         let mut db = self.db.lock().await;
         db.read_id(key)
             .map_err(|e| ProverError::StoreError(e.to_string()))
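
The patch changes read_id from &self to &mut self so IdStore lines up with IdWrite's mutable receiver, letting one &mut reference serve both traits. A minimal stand-alone sketch of the adjusted shape, assuming the async-trait and tokio crates (the real traits live in raiko_lib::prover and use ProverResult):

use std::collections::HashMap;

// Stand-in key type; the real ProofKey is (ChainId, u64, B256, u8).
type MockProofKey = (u64, u64);

#[async_trait::async_trait]
trait MockIdStore {
    // &mut self, mirroring the signature change in this patch.
    async fn read_id(&mut self, key: MockProofKey) -> Result<String, String>;
}

struct MemStore(HashMap<MockProofKey, String>);

#[async_trait::async_trait]
impl MockIdStore for MemStore {
    async fn read_id(&mut self, key: MockProofKey) -> Result<String, String> {
        self.0
            .get(&key)
            .cloned()
            .ok_or_else(|| format!("no id stored for {key:?}"))
    }
}

#[tokio::main]
async fn main() {
    let mut store = MemStore(HashMap::from([((1, 2), "proof-id-42".to_string())]));
    assert_eq!(store.read_id((1, 2)).await.unwrap(), "proof-id-42");
}
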
From 17883a21cc81ee513c19d80b3e7ba7a7aa964b5c Mon Sep 17 00:00:00 2001
From: "keroroxx520@gmail.com" <keroroxx520@gmail.com>
Date: Wed, 15 Jan 2025 16:35:17 +0800
Subject: [PATCH 5/9] chore(reqactor): correct typos

---
 reqactor/src/backend.rs | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/reqactor/src/backend.rs b/reqactor/src/backend.rs
index c29d87e53..a7ce99396 100644
--- a/reqactor/src/backend.rs
+++ b/reqactor/src/backend.rs
@@ -77,7 +77,7 @@ impl Backend {
 
         // Signal the request key to the internal channel, to move on to the next step, whatever the result is
         //
-        // NOTE: Why signal whatever the result is? It's for fault tolerence, to ensure the request will be
+        // NOTE: Why signal whatever the result is? It's for fault tolerance, to ensure the request will be
         // handled even when something unexpected happens.
         self.ensure_internal_signal(request_key).await;
 
@@ -200,9 +200,9 @@ impl Backend {
                 );
             }
             Err(err) => {
-                // Fault tolerence: re-enqueue the internal signal after 3 seconds
+                // Fault tolerance: re-enqueue the internal signal after 3 seconds
                 tracing::warn!(
-                    "Actor Backend failed to get status of internal signal {request_key}: {err:?}, performing fault tolerence and retrying later"
+                    "Actor Backend failed to get status of internal signal {request_key}: {err:?}, performing fault tolerance and retrying later"
                 );
                 self.ensure_internal_signal_after(request_key, Duration::from_secs(3))
                     .await;
@@ -212,7 +212,7 @@ impl Backend {
 
     // Ensure signal the request key to the internal channel.
     //
-    // Note that this function will retry sending the signal until successed.
+    // Note that this function will retry sending the signal until it succeeds.
     async fn ensure_internal_signal(&mut self, request_key: RequestKey) {
         let mut ticker = tokio::time::interval(Duration::from_secs(3));
         let internal_tx = self.internal_tx.clone();
From 5f5501e5f3d8e118c87d4775bcd7bbe2244d0a13 Mon Sep 17 00:00:00 2001
From: "keroroxx520@gmail.com" <keroroxx520@gmail.com>
Date: Wed, 15 Jan 2025 13:39:32 +0800
Subject: [PATCH 6/9] test(reqactor): add cases

---
 reqactor/src/actor.rs | 110 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 110 insertions(+)

diff --git a/reqactor/src/actor.rs b/reqactor/src/actor.rs
index f276a2532..307f44d67 100644
--- a/reqactor/src/actor.rs
+++ b/reqactor/src/actor.rs
@@ -93,3 +93,113 @@ impl Actor {
         Ok(())
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use alloy_primitives::Address;
+    use raiko_lib::{
+        consts::SupportedChainSpecs,
+        input::BlobProofType,
+        primitives::{ChainId, B256},
+        proof_type::ProofType,
+    };
+    use raiko_reqpool::{
+        Pool, RedisPoolConfig, RequestEntity, RequestKey, SingleProofRequestEntity,
+        SingleProofRequestKey, StatusWithContext,
+    };
+    use std::collections::HashMap;
+    use tokio::sync::mpsc;
+
+    #[tokio::test]
+    async fn test_pause_sets_is_paused_flag() {
+        let (action_tx, _) = mpsc::channel(1);
+        let (pause_tx, _pause_rx) = mpsc::channel(1);
+
+        let config = RedisPoolConfig {
+            redis_url: "redis://localhost:6379/0".to_string(),
+            redis_ttl: 3600,
+        };
+
+        let actor = Actor::new(
+            Pool::open(config).expect("Failed to create pool"),
+            ProofRequestOpt::default(),
+            SupportedChainSpecs::default(),
+            action_tx,
+            pause_tx,
+        );
+
+        assert!(!actor.is_paused(), "Actor should not be paused initially");
+
+        actor.pause().await.expect("Pause should succeed");
+        assert!(
+            actor.is_paused(),
+            "Actor should be paused after calling pause()"
+        );
+    }
+
+    #[tokio::test]
+    async fn test_act_sends_action_and_returns_response() {
+        let (action_tx, mut action_rx) = mpsc::channel(1);
+        let (pause_tx, _) = mpsc::channel(1);
+
+        let config = RedisPoolConfig {
+            redis_url: "redis://localhost:6379/0".to_string(),
+            redis_ttl: 3600,
+        };
+
+        let actor = Actor::new(
+            Pool::open(config).expect("Failed to create pool"),
+            ProofRequestOpt::default(),
+            SupportedChainSpecs::default(),
+            action_tx,
+            pause_tx,
+        );
+
+        // Create a test action
+        let request_key = RequestKey::SingleProof(SingleProofRequestKey::new(
+            ChainId::default(),
+            1,
+            B256::default(),
+            ProofType::default(),
+            "test_prover".to_string(),
+        ));
+        let request_entity = RequestEntity::SingleProof(SingleProofRequestEntity::new(
+            1,
+            1,
+            "test_network".to_string(),
+            "test_l1_network".to_string(),
+            B256::default(),
+            Address::default(),
+            ProofType::default(),
+            BlobProofType::default(),
+            HashMap::new(),
+        ));
+        let test_action = Action::Prove {
+            request_key: request_key.clone(),
+            request_entity,
+        };
+
+        // Spawn a task to handle the action and send back a response
+        let status = StatusWithContext::new_registered();
+        let status_clone = status.clone();
+        let handle = tokio::spawn(async move {
+            let (action, resp_tx) = action_rx.recv().await.expect("Should receive action");
+            // Verify we received the expected action
+            assert_eq!(action.request_key(), &request_key);
+            // Send back a mock response with Registered status
+            resp_tx
+                .send(Ok(status_clone))
+                .expect("Should send response");
+        });
+
+        // Send the action and wait for response
+        let result = actor.act(test_action).await;
+
+        // Make sure we got back an Ok response
+        assert_eq!(result, Ok(status), "Should receive successful response");
+
+        // Wait for the handler to complete
+        handle.await.expect("Handler should complete");
+    }
+}
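
ensure_internal_signal (patch 5 above) keeps a 3-second ticker and resends until the internal channel accepts the key. A hedged sketch of that retry shape in plain tokio; the real method's failure handling may differ:

use std::time::Duration;
use tokio::sync::mpsc;

// Retry loop: tick, try to send, repeat until the channel accepts the message.
async fn ensure_signal(tx: mpsc::Sender<u64>, request_id: u64) {
    let mut ticker = tokio::time::interval(Duration::from_secs(3));
    loop {
        // Note: an interval's first tick completes immediately.
        ticker.tick().await;
        if tx.send(request_id).await.is_ok() {
            break;
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(1);
    ensure_signal(tx, 7).await;
    assert_eq!(rx.recv().await, Some(7));
}
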
From 7859c64eb47d0a498f9ec1a3a3a0712327010c39 Mon Sep 17 00:00:00 2001
From: "keroroxx520@gmail.com" <keroroxx520@gmail.com>
Date: Wed, 15 Jan 2025 16:54:06 +0800
Subject: [PATCH 7/9] chore(reqactor): update Cargo.toml

---
 reqactor/Cargo.toml | 58 +--------------------------------------------
 1 file changed, 1 insertion(+), 57 deletions(-)

diff --git a/reqactor/Cargo.toml b/reqactor/Cargo.toml
index 1e61e7e36..793cccdba 100644
--- a/reqactor/Cargo.toml
+++ b/reqactor/Cargo.toml
@@ -4,79 +4,23 @@ version = "0.1.0"
 edition = "2021"
 
 [dependencies]
-
-# provers
-sp1-driver = { path = "../provers/sp1/driver", optional = true }
-risc0-driver = { path = "../provers/risc0/driver", optional = true }
-sgx-prover = { path = "../provers/sgx/prover", optional = true }
-
-# raiko
 raiko-lib = { workspace = true }
 raiko-core = { workspace = true }
 raiko-reqpool = { workspace = true }
 
-# alloy
-alloy-rlp = { workspace = true }
-alloy-rlp-derive = { workspace = true }
-alloy-sol-types = { workspace = true }
-alloy-primitives = { workspace = true }
-alloy-rpc-types = { workspace = true }
-alloy-provider = { workspace = true }
-alloy-transport-http = { workspace = true }
-alloy-consensus = { workspace = true }
-alloy-network = { workspace = true }
-alloy-rpc-client = { workspace = true }
-
-# crypto
-kzg = { workspace = true }
-kzg_traits = { workspace = true }
-
-# misc
-anyhow = { workspace = true }
-bincode = { workspace = true }
-bytemuck = { workspace = true }
-clap = { workspace = true }
-flate2 = { workspace = true }
 serde = { workspace = true }
-serde_with = { workspace = true }
 serde_json = { workspace = true }
 tokio = { workspace = true }
-tokio-util = { workspace = true }
-env_logger = { workspace = true }
 tracing = { workspace = true }
-tracing-subscriber = { workspace = true }
-tracing-appender = { workspace = true }
-lru_time_cache = { workspace = true }
-prometheus = { workspace = true }
-lazy_static = { workspace = true }
-once_cell = { workspace = true }
-thiserror = { workspace = true }
-reqwest = { workspace = true }
-reqwest_alloy = { workspace = true }
-sha2 = { workspace = true }
-proptest = { workspace = true }
-rlp = { workspace = true }
-url = { workspace = true }
-cfg-if = { workspace = true }
-cap = { workspace = true }
-dotenv = { workspace = true }
 chrono = { workspace = true, features = ["serde"] }
 
-# reth
 reth-primitives = { workspace = true }
-reth-evm = { workspace = true }
-reth-evm-ethereum = { workspace = true }
-reth-provider = { workspace = true }
+alloy-primitives = { workspace = true }
 
 [dev-dependencies]
-assert_cmd = { workspace = true }
-rstest = { workspace = true }
-ethers-core = { workspace = true }
-rand = { workspace = true }
 
 [features]
 default = []
 sp1 = ["raiko-core/sp1"]
 risc0 = ["raiko-core/risc0"]
 sgx = ["raiko-core/sgx"]
-integration = []
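
The trimmed manifest keeps only pass-through features: sp1, risc0, and sgx each just enable the matching raiko-core feature. As an illustration only (this gating code is not part of the series), a crate consuming such a feature typically branches at compile time:

// Selected backend reported at runtime, chosen by compile-time features.
#[cfg(feature = "sp1")]
fn enabled_backend() -> &'static str {
    "sp1"
}

#[cfg(not(feature = "sp1"))]
fn enabled_backend() -> &'static str {
    "native"
}

fn main() {
    println!("proving backend: {}", enabled_backend());
}
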
From 0ab77224aaf3ddd86fd290c496b15b301e806a39 Mon Sep 17 00:00:00 2001
From: "keroroxx520@gmail.com" <keroroxx520@gmail.com>
Date: Tue, 21 Jan 2025 20:02:45 +0800
Subject: [PATCH 8/9] feat(reqactor): support list

---
 reqactor/src/actor.rs   | 13 ++++++++++---
 reqactor/src/backend.rs | 13 ++++++++++++-
 2 files changed, 22 insertions(+), 4 deletions(-)

diff --git a/reqactor/src/actor.rs b/reqactor/src/actor.rs
index 307f44d67..0efbab2ea 100644
--- a/reqactor/src/actor.rs
+++ b/reqactor/src/actor.rs
@@ -1,6 +1,9 @@
-use std::sync::{
-    atomic::{AtomicBool, Ordering},
-    Arc, Mutex,
+use std::{
+    collections::HashMap,
+    sync::{
+        atomic::{AtomicBool, Ordering},
+        Arc, Mutex,
+    },
 };
 
 use raiko_core::interfaces::ProofRequestOpt;
@@ -64,6 +67,10 @@ impl Actor {
         self.pool.lock().unwrap().get_status(request_key)
     }
 
+    pub fn pool_list_status(&self) -> Result<HashMap<RequestKey, StatusWithContext>, String> {
+        self.pool.lock().unwrap().list()
+    }
+
     /// Send an action to the backend and wait for the response.
     pub async fn act(&self, action: Action) -> Result<StatusWithContext, String> {
         let (resp_tx, resp_rx) = oneshot::channel();
diff --git a/reqactor/src/backend.rs b/reqactor/src/backend.rs
index a7ce99396..07ce3d3dd 100644
--- a/reqactor/src/backend.rs
+++ b/reqactor/src/backend.rs
@@ -183,7 +183,7 @@ impl Backend {
             },
             Status::WorkInProgress => {
                 // Wait for proving completion
-                tracing::info!(
+                tracing::debug!(
                     "Actor Backend checks a work-in-progress request {request_key}, elapsed: {elapsed:?}",
                     elapsed = chrono::Utc::now() - status.timestamp(),
                 );
@@ -228,6 +228,7 @@ impl Backend {
 
     async fn ensure_internal_signal_after(&mut self, request_key: RequestKey, after: Duration) {
         let mut timer = tokio::time::interval(after);
+        timer.tick().await; // first tick is immediate
         timer.tick().await;
         self.ensure_internal_signal(request_key).await
     }
@@ -329,12 +330,14 @@ impl Backend {
         // 2. Start the proving work in a separate thread
         let mut actor = self.clone();
         let proving_semaphore = self.proving_semaphore.clone();
+        let (semaphore_acquired_tx, semaphore_acquired_rx) = oneshot::channel();
         tokio::spawn(async move {
             // Acquire a permit from the semaphore before starting the proving work
             let _permit = proving_semaphore
                 .acquire()
                 .await
                 .expect("semaphore should not be closed");
+            semaphore_acquired_tx.send(()).unwrap();
 
             // 2.1. Start the proving work
             let proven_status = do_prove_single(
@@ -359,6 +362,9 @@ impl Backend {
             }
             // The permit is automatically dropped here, releasing the semaphore
         });
+
+        // Wait for the semaphore to be acquired
+        semaphore_acquired_rx.await.unwrap();
     }
 
     async fn prove_aggregation(
@@ -381,12 +387,14 @@ impl Backend {
         // 2. Start the proving work in a separate thread
         let mut actor = self.clone();
         let proving_semaphore = self.proving_semaphore.clone();
+        let (semaphore_acquired_tx, semaphore_acquired_rx) = oneshot::channel();
         tokio::spawn(async move {
             // Acquire a permit from the semaphore before starting the proving work
             let _permit = proving_semaphore
                 .acquire()
                 .await
                 .expect("semaphore should not be closed");
+            semaphore_acquired_tx.send(()).unwrap();
 
             // 2.1. Start the proving work
             let proven_status =
@@ -407,6 +415,9 @@ impl Backend {
             }
             // The permit is automatically dropped here, releasing the semaphore
         });
+
+        // Wait for the semaphore to be acquired
+        semaphore_acquired_rx.await.unwrap();
     }
 
     async fn halt(&mut self) -> Result<(), String> {
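
The semaphore/oneshot handshake added by this patch makes prove_single and prove_aggregation block the message loop until the spawned task actually holds a concurrency permit. A self-contained tokio sketch of the pattern (names illustrative):

use std::sync::Arc;
use tokio::sync::{oneshot, Semaphore};

#[tokio::main]
async fn main() {
    // e.g. max_proving_concurrency = 2
    let proving_semaphore = Arc::new(Semaphore::new(2));

    let semaphore = proving_semaphore.clone();
    let (acquired_tx, acquired_rx) = oneshot::channel();
    tokio::spawn(async move {
        // Hold a permit for the whole proving task.
        let _permit = semaphore
            .acquire()
            .await
            .expect("semaphore should not be closed");
        acquired_tx.send(()).unwrap();
        // ... long-running proving work runs here while the permit is held ...
        // The permit is dropped at the end of the task, releasing the slot.
    });

    // Backpressure: don't process the next message until the spawned task
    // actually holds a concurrency permit.
    acquired_rx.await.unwrap();
}
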
From 619facd2c88fe93a0eae5ae15eb4c955f19dc331 Mon Sep 17 00:00:00 2001
From: "keroroxx520@gmail.com" <keroroxx520@gmail.com>
Date: Wed, 22 Jan 2025 10:44:45 +0800
Subject: [PATCH 9/9] chore(reqactor): logs for cancel non-registered and
 non-wip requests

---
 reqactor/src/backend.rs | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/reqactor/src/backend.rs b/reqactor/src/backend.rs
index 07ce3d3dd..dfd91ab54 100644
--- a/reqactor/src/backend.rs
+++ b/reqactor/src/backend.rs
@@ -256,10 +256,12 @@ impl Backend {
         request_key: RequestKey,
         old_status: StatusWithContext,
     ) -> Result<StatusWithContext, String> {
-        debug_assert!(
-            old_status.status() == &Status::WorkInProgress
-                || old_status.status() == &Status::Registered
-        );
+        if old_status.status() != &Status::Registered
+            && old_status.status() != &Status::WorkInProgress
+        {
+            tracing::warn!("Actor Backend received cancel-action {request_key}, but it is not registered or work-in-progress, skipping");
+            return Ok(old_status);
+        }
 
         // Case: old_status is registered: mark the request as cancelled in the pool and return directly
         if old_status.status() == &Status::Registered {