diff --git a/lib/src/input.rs b/lib/src/input.rs
index 67bc5b11..c97b849e 100644
--- a/lib/src/input.rs
+++ b/lib/src/input.rs
@@ -174,7 +174,7 @@ impl TryFrom> for TaikoGuestInput {
     }
 }
 
-#[derive(Clone, Debug, Serialize, Deserialize, Default)]
+#[derive(Clone, Debug, Serialize, Deserialize, Default, Eq, PartialEq, Ord, PartialOrd, Hash)]
 #[serde(rename_all = "snake_case")]
 pub enum BlobProofType {
     /// Guest runs through the entire computation from blob to Kzg commitment
diff --git a/lib/src/prover.rs b/lib/src/prover.rs
index 0b1bf349..043c5b3f 100644
--- a/lib/src/prover.rs
+++ b/lib/src/prover.rs
@@ -26,7 +26,9 @@ pub type ProverResult<T, E = ProverError> = core::result::Result<T, E>;
 pub type ProverConfig = serde_json::Value;
 pub type ProofKey = (ChainId, u64, B256, u8);
 
-#[derive(Clone, Debug, Serialize, ToSchema, Deserialize, Default, PartialEq, Eq, Hash)]
+#[derive(
+    Clone, Debug, Serialize, ToSchema, Deserialize, Default, PartialEq, Eq, PartialOrd, Ord, Hash,
+)]
 /// The response body of a proof request.
 pub struct Proof {
     /// The proof either TEE or ZK.
@@ -50,7 +52,7 @@ pub trait IdWrite: Send {
 
 #[async_trait::async_trait]
 pub trait IdStore: IdWrite {
-    async fn read_id(&self, key: ProofKey) -> ProverResult<String>;
+    async fn read_id(&mut self, key: ProofKey) -> ProverResult<String>;
 }
 
 #[allow(async_fn_in_trait)]
diff --git a/reqactor/Cargo.toml b/reqactor/Cargo.toml
new file mode 100644
index 00000000..793cccdb
--- /dev/null
+++ b/reqactor/Cargo.toml
@@ -0,0 +1,26 @@
+[package]
+name = "raiko-reqactor"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+raiko-lib = { workspace = true }
+raiko-core = { workspace = true }
+raiko-reqpool = { workspace = true }
+
+serde = { workspace = true }
+serde_json = { workspace = true }
+tokio = { workspace = true }
+tracing = { workspace = true }
+chrono = { workspace = true, features = ["serde"] }
+
+reth-primitives = { workspace = true }
+alloy-primitives = { workspace = true }
+
+[dev-dependencies]
+
+[features]
+default = []
+sp1 = ["raiko-core/sp1"]
+risc0 = ["raiko-core/risc0"]
+sgx = ["raiko-core/sgx"]
diff --git a/reqactor/src/action.rs b/reqactor/src/action.rs
new file mode 100644
index 00000000..e9184e76
--- /dev/null
+++ b/reqactor/src/action.rs
@@ -0,0 +1,26 @@
+use crate::{RequestEntity, RequestKey};
+use raiko_reqpool::impl_display_using_json_pretty;
+use serde::{Deserialize, Serialize};
+
+/// The action message sent from **external** callers to the actor.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum Action {
+    Prove {
+        request_key: RequestKey,
+        request_entity: RequestEntity,
+    },
+    Cancel {
+        request_key: RequestKey,
+    },
+}
+
+impl Action {
+    pub fn request_key(&self) -> &RequestKey {
+        match self {
+            Action::Prove { request_key, .. } => request_key,
+            Action::Cancel { request_key, .. } => request_key,
+        }
+    }
+}
+
+impl_display_using_json_pretty!(Action);
diff --git a/reqactor/src/actor.rs b/reqactor/src/actor.rs
new file mode 100644
index 00000000..0efbab2e
--- /dev/null
+++ b/reqactor/src/actor.rs
@@ -0,0 +1,212 @@
+use std::{
+    collections::HashMap,
+    sync::{
+        atomic::{AtomicBool, Ordering},
+        Arc, Mutex,
+    },
+};
+
+use raiko_core::interfaces::ProofRequestOpt;
+use raiko_lib::consts::SupportedChainSpecs;
+use raiko_reqpool::{Pool, RequestKey, StatusWithContext};
+use tokio::sync::{mpsc::Sender, oneshot};
+
+use crate::Action;
+
+/// Actor is the main interface for interacting with the backend and the pool.
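+///
+/// A minimal usage sketch (doctest-ignored; assumes the `start_actor` helper from
+/// this crate and an already-opened `Pool`):
+///
+/// ```ignore
+/// let actor = raiko_reqactor::start_actor(pool, chain_specs, request_config, 4).await;
+/// let status = actor.act(action).await?;
+/// ```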
+#[derive(Debug, Clone)]
+pub struct Actor {
+    default_request_config: ProofRequestOpt,
+    chain_specs: SupportedChainSpecs,
+    action_tx: Sender<(Action, oneshot::Sender<Result<StatusWithContext, String>>)>,
+    pause_tx: Sender<()>,
+    is_paused: Arc<AtomicBool>,
+
+    // TODO: Remove the Mutex; currently we need `Arc<Mutex<Pool>>` in order to pass `&mut Pool`.
+    pool: Arc<Mutex<Pool>>,
+}
+
+impl Actor {
+    pub fn new(
+        pool: Pool,
+        default_request_config: ProofRequestOpt,
+        chain_specs: SupportedChainSpecs,
+        action_tx: Sender<(Action, oneshot::Sender<Result<StatusWithContext, String>>)>,
+        pause_tx: Sender<()>,
+    ) -> Self {
+        Self {
+            default_request_config,
+            chain_specs,
+            action_tx,
+            pause_tx,
+            is_paused: Arc::new(AtomicBool::new(false)),
+            pool: Arc::new(Mutex::new(pool)),
+        }
+    }
+
+    /// Return the default request config.
+    pub fn default_request_config(&self) -> &ProofRequestOpt {
+        &self.default_request_config
+    }
+
+    /// Return the chain specs.
+    pub fn chain_specs(&self) -> &SupportedChainSpecs {
+        &self.chain_specs
+    }
+
+    /// Check if the system is paused.
+    pub fn is_paused(&self) -> bool {
+        self.is_paused.load(Ordering::SeqCst)
+    }
+
+    /// Get the status of the request from the pool.
+    pub fn pool_get_status(
+        &self,
+        request_key: &RequestKey,
+    ) -> Result<Option<StatusWithContext>, String> {
+        self.pool.lock().unwrap().get_status(request_key)
+    }
+
+    pub fn pool_list_status(&self) -> Result<HashMap<RequestKey, StatusWithContext>, String> {
+        self.pool.lock().unwrap().list()
+    }
+
+    /// Send an action to the backend and wait for the response.
+    pub async fn act(&self, action: Action) -> Result<StatusWithContext, String> {
+        let (resp_tx, resp_rx) = oneshot::channel();
+
+        // Send the action to the backend
+        self.action_tx
+            .send((action, resp_tx))
+            .await
+            .map_err(|e| format!("failed to send action: {e}"))?;
+
+        // Wait for the response of the action
+        resp_rx
+            .await
+            .map_err(|e| format!("failed to receive action response: {e}"))?
+    }
+
+    /// Set the pause flag and notify the backend to pause.
+    ///
+    /// Note that this function only waits until the pause signal has been accepted
+    /// by the backend's channel; the backend then halts asynchronously.
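+    ///
+    /// For example (illustrative, mirroring the unit test below):
+    ///
+    /// ```ignore
+    /// actor.pause().await?;
+    /// assert!(actor.is_paused());
+    /// ```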
+    pub async fn pause(&self) -> Result<(), String> {
+        self.is_paused.store(true, Ordering::SeqCst);
+        self.pause_tx
+            .send(())
+            .await
+            .map_err(|e| format!("failed to send pause signal: {e}"))?;
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use alloy_primitives::Address;
+    use raiko_lib::{
+        consts::SupportedChainSpecs,
+        input::BlobProofType,
+        primitives::{ChainId, B256},
+        proof_type::ProofType,
+    };
+    use raiko_reqpool::{
+        Pool, RedisPoolConfig, RequestEntity, RequestKey, SingleProofRequestEntity,
+        SingleProofRequestKey, StatusWithContext,
+    };
+    use std::collections::HashMap;
+    use tokio::sync::mpsc;
+
+    #[tokio::test]
+    async fn test_pause_sets_is_paused_flag() {
+        let (action_tx, _) = mpsc::channel(1);
+        let (pause_tx, _pause_rx) = mpsc::channel(1);
+
+        let config = RedisPoolConfig {
+            redis_url: "redis://localhost:6379/0".to_string(),
+            redis_ttl: 3600,
+        };
+
+        let actor = Actor::new(
+            Pool::open(config).expect("Failed to create pool"),
+            ProofRequestOpt::default(),
+            SupportedChainSpecs::default(),
+            action_tx,
+            pause_tx,
+        );
+
+        assert!(!actor.is_paused(), "Actor should not be paused initially");
+
+        actor.pause().await.expect("Pause should succeed");
+        assert!(
+            actor.is_paused(),
+            "Actor should be paused after calling pause()"
+        );
+    }
+
+    #[tokio::test]
+    async fn test_act_sends_action_and_returns_response() {
+        let (action_tx, mut action_rx) = mpsc::channel(1);
+        let (pause_tx, _) = mpsc::channel(1);
+
+        let config = RedisPoolConfig {
+            redis_url: "redis://localhost:6379/0".to_string(),
+            redis_ttl: 3600,
+        };
+
+        let actor = Actor::new(
+            Pool::open(config).expect("Failed to create pool"),
+            ProofRequestOpt::default(),
+            SupportedChainSpecs::default(),
+            action_tx,
+            pause_tx,
+        );
+
+        // Create a test action
+        let request_key = RequestKey::SingleProof(SingleProofRequestKey::new(
+            ChainId::default(),
+            1,
+            B256::default(),
+            ProofType::default(),
+            "test_prover".to_string(),
+        ));
+        let request_entity = RequestEntity::SingleProof(SingleProofRequestEntity::new(
+            1,
+            1,
+            "test_network".to_string(),
+            "test_l1_network".to_string(),
+            B256::default(),
+            Address::default(),
+            ProofType::default(),
+            BlobProofType::default(),
+            HashMap::new(),
+        ));
+        let test_action = Action::Prove {
+            request_key: request_key.clone(),
+            request_entity,
+        };
+
+        // Spawn a task to handle the action and send back a response
+        let status = StatusWithContext::new_registered();
+        let status_clone = status.clone();
+        let handle = tokio::spawn(async move {
+            let (action, resp_tx) = action_rx.recv().await.expect("Should receive action");
+            // Verify we received the expected action
+            assert_eq!(action.request_key(), &request_key);
+            // Send back a mock response with Registered status
+            resp_tx
+                .send(Ok(status_clone))
+                .expect("Should send response");
+        });
+
+        // Send the action and wait for response
+        let result = actor.act(test_action).await;
+
+        // Make sure we got back an Ok response
+        assert_eq!(result, Ok(status), "Should receive successful response");
+
+        // Wait for the handler to complete
+        handle.await.expect("Handler should complete");
+    }
+}
diff --git a/reqactor/src/backend.rs b/reqactor/src/backend.rs
new file mode 100644
index 00000000..dfd91ab5
--- /dev/null
+++ b/reqactor/src/backend.rs
@@ -0,0 +1,514 @@
+use std::sync::Arc;
+use std::time::Duration;
+
+use raiko_core::{
+    interfaces::{aggregate_proofs, ProofRequest},
+    provider::rpc::RpcBlockDataProvider,
+    Raiko,
+};
+use raiko_lib::{
+    consts::SupportedChainSpecs,
+    input::{AggregationGuestInput, AggregationGuestOutput},
+    prover::{IdWrite, Proof},
+};
+use raiko_reqpool::{
+    AggregationRequestEntity, RequestEntity, RequestKey, SingleProofRequestEntity, Status,
+    StatusWithContext,
+};
+use reth_primitives::B256;
+use tokio::sync::{
+    mpsc::{self, Receiver, Sender},
+    oneshot, Semaphore,
+};
+
+use crate::{Action, Pool};
+
+/// Backend runs in the background and handles the actions from the actor.
+#[derive(Clone)]
+pub(crate) struct Backend {
+    pool: Pool,
+    chain_specs: SupportedChainSpecs,
+    internal_tx: Sender<RequestKey>,
+    proving_semaphore: Arc<Semaphore>,
+}
+
+// TODO: load pool and notify internal channel
+impl Backend {
+    /// Run the backend in the background.
+    ///
+    /// Actions received on `action_rx` are handled by the backend, which acts on each
+    /// action and sends the result back through its paired oneshot sender.
+    pub async fn serve_in_background(
+        pool: Pool,
+        chain_specs: SupportedChainSpecs,
+        pause_rx: Receiver<()>,
+        action_rx: Receiver<(Action, oneshot::Sender<Result<StatusWithContext, String>>)>,
+        max_proving_concurrency: usize,
+    ) {
+        let channel_size = 1024;
+        let (internal_tx, internal_rx) = mpsc::channel::<RequestKey>(channel_size);
+        tokio::spawn(async move {
+            Backend {
+                pool,
+                chain_specs,
+                internal_tx,
+                proving_semaphore: Arc::new(Semaphore::new(max_proving_concurrency)),
+            }
+            .serve(action_rx, internal_rx, pause_rx)
+            .await;
+        });
+    }
+
+    // There are three incoming channels:
+    // 1. action_rx: actions from the external Actor
+    // 2. internal_rx: internal signals from the backend itself
+    // 3. pause_rx: pause signal from the external Actor
+    async fn serve(
+        mut self,
+        mut action_rx: Receiver<(Action, oneshot::Sender<Result<StatusWithContext, String>>)>,
+        mut internal_rx: Receiver<RequestKey>,
+        mut pause_rx: Receiver<()>,
+    ) {
+        loop {
+            tokio::select! {
+                Some((action, resp_tx)) = action_rx.recv() => {
+                    let request_key = action.request_key().clone();
+                    let response = self.handle_external_action(action.clone()).await;
+
+                    // Signal the request key to the internal channel, to move on to the next step,
+                    // whatever the result is.
+                    //
+                    // NOTE: Why signal regardless of the result? It's for fault tolerance, to ensure
+                    // the request will be handled even when something unexpected happens.
+                    self.ensure_internal_signal(request_key).await;
+
+                    if let Err(err) = resp_tx.send(response.clone()) {
+                        tracing::error!(
+                            "Actor Backend failed to send response {response:?} to action {action}: {err:?}"
+                        );
+                    }
+                }
+                Some(request_key) = internal_rx.recv() => {
+                    self.handle_internal_signal(request_key.clone()).await;
+                }
+                Some(()) = pause_rx.recv() => {
+                    tracing::info!("Actor Backend received pause-signal, halting");
+                    if let Err(err) = self.halt().await {
+                        tracing::error!("Actor Backend failed to halt: {err:?}");
+                    }
+                }
+                else => {
+                    // All channels are closed, exit the loop
+                    tracing::info!("Actor Backend exited");
+                    break;
+                }
+            }
+        }
+    }
+
+    async fn handle_external_action(
+        &mut self,
+        action: Action,
+    ) -> Result<StatusWithContext, String> {
+        match action {
+            Action::Prove {
+                request_key,
+                request_entity,
+            } => match self.pool.get_status(&request_key) {
+                Ok(None) => {
+                    tracing::info!("Actor Backend received prove-action {request_key}, and it is not in pool, registering");
+                    self.register(request_key.clone(), request_entity).await
+                }
+                Ok(Some(status)) => match status.status() {
+                    Status::Registered | Status::WorkInProgress | Status::Success { .. } => {
+                        tracing::info!("Actor Backend received prove-action {request_key}, but it is already {status}, skipping");
+                        Ok(status)
+                    }
+                    Status::Cancelled { .. } => {
+                        tracing::warn!("Actor Backend received prove-action {request_key}, and it is cancelled, re-registering");
+                        self.register(request_key, request_entity).await
+                    }
+                    Status::Failed { .. } => {
+                        tracing::warn!("Actor Backend received prove-action {request_key}, and it is failed, re-registering");
+                        self.register(request_key, request_entity).await
+                    }
+                },
+                Err(err) => {
+                    tracing::error!(
+                        "Actor Backend failed to get status of prove-action {request_key}: {err:?}"
+                    );
+                    Err(err)
+                }
+            },
+            Action::Cancel { request_key } => match self.pool.get_status(&request_key) {
+                Ok(None) => {
+                    tracing::warn!("Actor Backend received cancel-action {request_key}, but it is not in pool, skipping");
+                    Err("request is not in pool".to_string())
+                }
+                Ok(Some(status)) => match status.status() {
+                    Status::Registered | Status::WorkInProgress => {
+                        tracing::info!("Actor Backend received cancel-action {request_key}, and it is {status}, cancelling");
+                        self.cancel(request_key, status).await
+                    }
+
+                    Status::Failed { .. } | Status::Cancelled { .. } | Status::Success { .. } => {
+                        tracing::info!("Actor Backend received cancel-action {request_key}, but it is already {status}, skipping");
+                        Ok(status)
+                    }
+                },
+                Err(err) => {
+                    tracing::error!(
+                        "Actor Backend failed to get status of cancel-action {request_key}: {err:?}"
+                    );
+                    Err(err)
+                }
+            },
+        }
+    }
+
+    // Check the request status and then move on to the next step accordingly.
+    async fn handle_internal_signal(&mut self, request_key: RequestKey) {
+        match self.pool.get(&request_key) {
+            Ok(Some((request_entity, status))) => match status.status() {
+                Status::Registered => match request_entity {
+                    RequestEntity::SingleProof(entity) => {
+                        tracing::info!("Actor Backend received internal signal {request_key}, status: {status}, proving single proof");
+                        self.prove_single(request_key.clone(), entity).await;
+                        self.ensure_internal_signal(request_key).await;
+                    }
+                    RequestEntity::Aggregation(entity) => {
+                        tracing::info!("Actor Backend received internal signal {request_key}, status: {status}, proving aggregation proof");
+                        self.prove_aggregation(request_key.clone(), entity).await;
+                        self.ensure_internal_signal(request_key).await;
+                    }
+                },
+                Status::WorkInProgress => {
+                    // Wait for proving completion
+                    tracing::debug!(
+                        "Actor Backend checks a work-in-progress request {request_key}, elapsed: {elapsed:?}",
+                        elapsed = chrono::Utc::now() - status.timestamp(),
+                    );
+                    self.ensure_internal_signal_after(request_key, Duration::from_secs(3))
+                        .await;
+                }
+                Status::Success { .. } | Status::Cancelled { .. } | Status::Failed { .. } => {
+                    tracing::info!("Actor Backend received internal signal {request_key}, status: {status}, done");
+                }
+            },
+            Ok(None) => {
+                tracing::warn!(
+                    "Actor Backend received internal signal {request_key}, but it is not in pool, skipping"
+                );
+            }
+            Err(err) => {
+                // Fault tolerance: re-enqueue the internal signal after 3 seconds
+                tracing::warn!(
+                    "Actor Backend failed to get status of internal signal {request_key}: {err:?}, performing fault tolerance and retrying later"
+                );
+                self.ensure_internal_signal_after(request_key, Duration::from_secs(3))
+                    .await;
+            }
+        }
+    }
+
+    // Ensure the request key is signaled to the internal channel.
+    //
+    // Note that this function will retry sending the signal until it succeeds.
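+    //
+    // The send is retried from a spawned task, paced by a 3-second interval, so the
+    // backend's select loop is never blocked by a full internal channel.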
+    async fn ensure_internal_signal(&mut self, request_key: RequestKey) {
+        let mut ticker = tokio::time::interval(Duration::from_secs(3));
+        let internal_tx = self.internal_tx.clone();
+        tokio::spawn(async move {
+            loop {
+                if let Err(err) = internal_tx.send(request_key.clone()).await {
+                    tracing::error!("Actor Backend failed to send internal signal {request_key}: {err:?}, retrying. It should not happen, please issue a bug report");
+                } else {
+                    // Sent successfully, stop retrying
+                    break;
+                }
+                ticker.tick().await;
+            }
+        });
+    }
+
+    async fn ensure_internal_signal_after(&mut self, request_key: RequestKey, after: Duration) {
+        let mut timer = tokio::time::interval(after);
+        timer.tick().await; // first tick is immediate
+        timer.tick().await;
+        self.ensure_internal_signal(request_key).await
+    }
+
+    // Register a new request in the pool.
+    async fn register(
+        &mut self,
+        request_key: RequestKey,
+        request_entity: RequestEntity,
+    ) -> Result<StatusWithContext, String> {
+        // 1. Register to the pool
+        let status = StatusWithContext::new_registered();
+        if let Err(err) = self
+            .pool
+            .add(request_key.clone(), request_entity, status.clone())
+        {
+            return Err(err);
+        }
+
+        Ok(status)
+    }
+
+    async fn cancel(
+        &mut self,
+        request_key: RequestKey,
+        old_status: StatusWithContext,
+    ) -> Result<StatusWithContext, String> {
+        if old_status.status() != &Status::Registered
+            && old_status.status() != &Status::WorkInProgress
+        {
+            tracing::warn!("Actor Backend received cancel-action {request_key}, but it is not registered or work-in-progress, skipping");
+            return Ok(old_status);
+        }
+
+        // Case: old_status is registered: mark the request as cancelled in the pool and return directly
+        if old_status.status() == &Status::Registered {
+            let status = StatusWithContext::new_cancelled();
+            self.pool.update_status(request_key, status.clone())?;
+            return Ok(status);
+        }
+
+        // Case: old_status is work-in-progress:
+        // 1. Cancel the proving work by the cancel token // TODO: cancel token
+        // 2. Remove the proof id from the pool
+        // 3. Mark the request as cancelled in the pool
+        match &request_key {
+            RequestKey::SingleProof(key) => {
+                raiko_core::interfaces::cancel_proof(
+                    key.proof_type().clone(),
+                    (
+                        key.chain_id().clone(),
+                        key.block_number().clone(),
+                        key.block_hash().clone(),
+                        *key.proof_type() as u8,
+                    ),
+                    Box::new(&mut self.pool),
+                )
+                .await
+                .or_else(|e| {
+                    if e.to_string().contains("No data for query") {
+                        tracing::warn!("Actor Backend received cancel-action {request_key}, but it is already cancelled or not yet started, skipping");
+                        Ok(())
+                    } else {
+                        tracing::error!(
+                            "Actor Backend received cancel-action {request_key}, but failed to cancel proof: {e:?}"
+                        );
+                        Err(format!("failed to cancel proof: {e:?}"))
+                    }
+                })?;
+
+                // 3. Mark the request as cancelled in the pool
+                let status = StatusWithContext::new_cancelled();
+                self.pool.update_status(request_key, status.clone())?;
+                Ok(status)
+            }
+            RequestKey::Aggregation(..) => {
+                let status = StatusWithContext::new_cancelled();
+                self.pool.update_status(request_key, status.clone())?;
+                Ok(status)
+            }
+        }
+    }
+
+    async fn prove_single(
+        &mut self,
+        request_key: RequestKey,
+        request_entity: SingleProofRequestEntity,
+    ) {
+        // 1. Update the request status in the pool to WorkInProgress
+        if let Err(err) = self
+            .pool
+            .update_status(request_key.clone(), Status::WorkInProgress.into())
+        {
+            tracing::error!(
+                "Actor Backend failed to update status of prove-action {request_key}: {err:?}, status: {status}",
+                status = Status::WorkInProgress,
+            );
+            return;
+        }
+
+        // 2. Start the proving work in a separate task
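+        //
+        // The spawned task first acquires a permit from `proving_semaphore` (capacity:
+        // `max_proving_concurrency`) and signals back through a oneshot channel once it
+        // holds the permit, so this function returns only after the proving work has
+        // actually been admitted.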
+        let mut actor = self.clone();
+        let proving_semaphore = self.proving_semaphore.clone();
+        let (semaphore_acquired_tx, semaphore_acquired_rx) = oneshot::channel();
+        tokio::spawn(async move {
+            // Acquire a permit from the semaphore before starting the proving work
+            let _permit = proving_semaphore
+                .acquire()
+                .await
+                .expect("semaphore should not be closed");
+            semaphore_acquired_tx.send(()).unwrap();
+
+            // 2.1. Start the proving work
+            let proven_status = do_prove_single(
+                &mut actor.pool,
+                &actor.chain_specs,
+                request_key.clone(),
+                request_entity,
+            )
+            .await
+            .map(|proof| Status::Success { proof })
+            .unwrap_or_else(|error| Status::Failed { error });
+
+            // 2.2. Update the request status in the pool to the resulting status
+            if let Err(err) = actor
+                .pool
+                .update_status(request_key.clone(), proven_status.clone().into())
+            {
+                tracing::error!(
+                    "Actor Backend failed to update status of prove-action {request_key}: {err:?}, status: {proven_status}"
+                );
+                return;
+            }
+            // The permit is automatically dropped here, releasing the semaphore
+        });
+
+        // Wait for the semaphore to be acquired
+        semaphore_acquired_rx.await.unwrap();
+    }
+
+    async fn prove_aggregation(
+        &mut self,
+        request_key: RequestKey,
+        request_entity: AggregationRequestEntity,
+    ) {
+        // 1. Update the request status in the pool to WorkInProgress
+        if let Err(err) = self
+            .pool
+            .update_status(request_key.clone(), Status::WorkInProgress.into())
+        {
+            tracing::error!(
+                "Actor Backend failed to update status of prove-action {request_key}: {err:?}, status: {status}",
+                status = Status::WorkInProgress,
+            );
+            return;
+        }
+
+        // 2. Start the proving work in a separate task
+        let mut actor = self.clone();
+        let proving_semaphore = self.proving_semaphore.clone();
+        let (semaphore_acquired_tx, semaphore_acquired_rx) = oneshot::channel();
+        tokio::spawn(async move {
+            // Acquire a permit from the semaphore before starting the proving work
+            let _permit = proving_semaphore
+                .acquire()
+                .await
+                .expect("semaphore should not be closed");
+            semaphore_acquired_tx.send(()).unwrap();
+
+            // 2.1. Start the proving work
+            let proven_status =
+                do_prove_aggregation(&mut actor.pool, request_key.clone(), request_entity)
+                    .await
+                    .map(|proof| Status::Success { proof })
+                    .unwrap_or_else(|error| Status::Failed { error });
+
+            // 2.2. Update the request status in the pool to the resulting status
+            if let Err(err) = actor
+                .pool
+                .update_status(request_key.clone(), proven_status.clone().into())
+            {
+                tracing::error!(
+                    "Actor Backend failed to update status of prove-action {request_key}: {err:?}, status: {proven_status}"
+                );
+                return;
+            }
+            // The permit is automatically dropped here, releasing the semaphore
+        });
+
+        // Wait for the semaphore to be acquired
+        semaphore_acquired_rx.await.unwrap();
+    }
+
+    async fn halt(&mut self) -> Result<(), String> {
+        todo!("halt")
+    }
+}
+
+// TODO: cache input, reference to raiko_host::cache
+// TODO: memory tracking
+// TODO: metrics
+// TODO: measurement
+pub async fn do_prove_single(
+    pool: &mut dyn IdWrite,
+    chain_specs: &SupportedChainSpecs,
+    request_key: RequestKey,
+    request_entity: SingleProofRequestEntity,
+) -> Result<Proof, String> {
+    tracing::info!("Generating proof for {request_key}");
+
+    let l1_chain_spec = chain_specs
+        .get_chain_spec(&request_entity.l1_network())
+        .ok_or_else(|| {
+            format!(
+                "unsupported l1 network: {}, it should not happen, please issue a bug report",
+                request_entity.l1_network()
+            )
+        })?;
+    let taiko_chain_spec = chain_specs
+        .get_chain_spec(&request_entity.network())
+        .ok_or_else(|| {
+            format!(
+                "unsupported raiko network: {}, it should not happen, please issue a bug report",
+                request_entity.network()
+            )
+        })?;
+    let proof_request = ProofRequest {
+        block_number: *request_entity.block_number(),
+        l1_inclusion_block_number: *request_entity.l1_inclusion_block_number(),
+        network: request_entity.network().clone(),
+        l1_network: request_entity.l1_network().clone(),
+        graffiti: request_entity.graffiti().clone(),
+        prover: request_entity.prover().clone(),
+        proof_type: request_entity.proof_type().clone(),
+        blob_proof_type: request_entity.blob_proof_type().clone(),
+        prover_args: request_entity.prover_args().clone(),
+    };
+    let raiko = Raiko::new(l1_chain_spec, taiko_chain_spec.clone(), proof_request);
+    let provider = RpcBlockDataProvider::new(
+        &taiko_chain_spec.rpc.clone(),
+        request_entity.block_number() - 1,
+    )
+    .map_err(|err| format!("failed to create rpc block data provider: {err:?}"))?;
+
+    // 1. Generate the proof input
+    let input = raiko
+        .generate_input(provider)
+        .await
+        .map_err(|e| format!("failed to generate input: {e:?}"))?;
+
+    // 2. Generate the proof output
+    let output = raiko
+        .get_output(&input)
+        .map_err(|e| format!("failed to get output: {e:?}"))?;
+
+    // 3. Generate the proof
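+    // `Some(pool)` hands the pool to the prover as an `IdWrite`, so the remote proof
+    // id can be persisted and later looked up by a cancel-action (see `cancel` above).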
+    let proof = raiko
+        .prove(input, &output, Some(pool))
+        .await
+        .map_err(|err| format!("failed to generate single proof: {err:?}"))?;
+
+    Ok(proof)
+}
+
+async fn do_prove_aggregation(
+    pool: &mut dyn IdWrite,
+    request_key: RequestKey,
+    request_entity: AggregationRequestEntity,
+) -> Result<Proof, String> {
+    let proof_type = request_key.proof_type().clone();
+    let proofs = request_entity.proofs().clone();
+
+    let input = AggregationGuestInput { proofs };
+    let output = AggregationGuestOutput { hash: B256::ZERO };
+    let config = serde_json::to_value(request_entity.prover_args())
+        .map_err(|err| format!("failed to serialize prover args: {err:?}"))?;
+
+    let proof = aggregate_proofs(proof_type, input, &output, &config, Some(pool))
+        .await
+        .map_err(|err| format!("failed to generate aggregation proof: {err:?}"))?;
+
+    Ok(proof)
+}
diff --git a/reqactor/src/lib.rs b/reqactor/src/lib.rs
new file mode 100644
index 00000000..6ae17370
--- /dev/null
+++ b/reqactor/src/lib.rs
@@ -0,0 +1,47 @@
+mod action;
+mod actor;
+mod backend;
+
+use raiko_core::interfaces::ProofRequestOpt;
+use raiko_lib::consts::SupportedChainSpecs;
+use tokio::sync::{mpsc, oneshot};
+
+pub(crate) use backend::Backend;
+
+// re-export
+pub use action::Action;
+pub use actor::Actor;
+pub use raiko_reqpool::{
+    AggregationRequestEntity, AggregationRequestKey, Pool, RequestEntity, RequestKey,
+    SingleProofRequestEntity, SingleProofRequestKey, StatusWithContext,
+};
+
+/// Run the actor backend in the background and return the actor.
+pub async fn start_actor(
+    pool: Pool,
+    chain_specs: SupportedChainSpecs,
+    default_request_config: ProofRequestOpt,
+    max_proving_concurrency: usize,
+) -> Actor {
+    let channel_size = 1024;
+    let (action_tx, action_rx) =
+        mpsc::channel::<(Action, oneshot::Sender<Result<StatusWithContext, String>>)>(channel_size);
+    let (pause_tx, pause_rx) = mpsc::channel::<()>(1);
+
+    Backend::serve_in_background(
+        pool.clone(),
+        chain_specs.clone(),
+        pause_rx,
+        action_rx,
+        max_proving_concurrency,
+    )
+    .await;
+
+    Actor::new(
+        pool,
+        default_request_config,
+        chain_specs.clone(),
+        action_tx,
+        pause_tx,
+    )
+}
diff --git a/taskdb/src/lib.rs b/taskdb/src/lib.rs
index 513843a1..246cb179 100644
--- a/taskdb/src/lib.rs
+++ b/taskdb/src/lib.rs
@@ -321,7 +321,7 @@ impl IdWrite for TaskManagerWrapper {
 
 #[async_trait::async_trait]
 impl IdStore for TaskManagerWrapper {
-    async fn read_id(&self, key: ProofKey) -> ProverResult<String> {
+    async fn read_id(&mut self, key: ProofKey) -> ProverResult<String> {
         self.manager.read_id(key).await
     }
 }
diff --git a/taskdb/src/mem_db.rs b/taskdb/src/mem_db.rs
index 8ebab1b7..aa4bd76b 100644
--- a/taskdb/src/mem_db.rs
+++ b/taskdb/src/mem_db.rs
@@ -291,7 +291,7 @@ impl IdWrite for InMemoryTaskManager {
 
 #[async_trait::async_trait]
 impl IdStore for InMemoryTaskManager {
-    async fn read_id(&self, key: ProofKey) -> ProverResult<String> {
+    async fn read_id(&mut self, key: ProofKey) -> ProverResult<String> {
         let mut db = self.db.lock().await;
         db.read_id(key)
             .map_err(|e| ProverError::StoreError(e.to_string()))
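
Example wiring from a host process (a sketch, not part of this diff; the Redis URL
and the concurrency limit of 4 are illustrative):

    use raiko_core::interfaces::ProofRequestOpt;
    use raiko_lib::consts::SupportedChainSpecs;
    use raiko_reqactor::{start_actor, Pool};
    use raiko_reqpool::RedisPoolConfig;

    #[tokio::main]
    async fn main() {
        // Open the Redis-backed request pool, as in the unit tests above.
        let pool = Pool::open(RedisPoolConfig {
            redis_url: "redis://localhost:6379/0".to_string(),
            redis_ttl: 3600,
        })
        .expect("failed to open pool");

        // Spawn the backend and obtain the Actor handle.
        let actor = start_actor(
            pool,
            SupportedChainSpecs::default(),
            ProofRequestOpt::default(),
            4, // max_proving_concurrency
        )
        .await;

        // Prove/cancel requests are then submitted via, e.g.:
        //   actor.act(Action::Prove { request_key, request_entity }).await
        let _ = actor;
    }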