Skip to content

Commit

Permalink
feat(reqactor): support list
Browse files Browse the repository at this point in the history
  • Loading branch information
keroro520 committed Jan 21, 2025
1 parent 7859c64 commit 0ab7722
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 4 deletions.
13 changes: 10 additions & 3 deletions reqactor/src/actor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
use std::{
collections::HashMap,
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
};

use raiko_core::interfaces::ProofRequestOpt;
Expand Down Expand Up @@ -64,6 +67,10 @@ impl Actor {
self.pool.lock().unwrap().get_status(request_key)
}

pub fn pool_list_status(&self) -> Result<HashMap<RequestKey, StatusWithContext>, String> {
self.pool.lock().unwrap().list()
}

/// Send an action to the backend and wait for the response.
pub async fn act(&self, action: Action) -> Result<StatusWithContext, String> {
let (resp_tx, resp_rx) = oneshot::channel();
Expand Down
13 changes: 12 additions & 1 deletion reqactor/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ impl Backend {
},
Status::WorkInProgress => {
// Wait for proving completion
tracing::info!(
tracing::debug!(
"Actor Backend checks a work-in-progress request {request_key}, elapsed: {elapsed:?}",
elapsed = chrono::Utc::now() - status.timestamp(),
);
Expand Down Expand Up @@ -228,6 +228,7 @@ impl Backend {

async fn ensure_internal_signal_after(&mut self, request_key: RequestKey, after: Duration) {
let mut timer = tokio::time::interval(after);
timer.tick().await; // first tick is immediate
timer.tick().await;
self.ensure_internal_signal(request_key).await
}
Expand Down Expand Up @@ -329,12 +330,14 @@ impl Backend {
// 2. Start the proving work in a separate thread
let mut actor = self.clone();
let proving_semaphore = self.proving_semaphore.clone();
let (semaphore_acquired_tx, semaphore_acquired_rx) = oneshot::channel();
tokio::spawn(async move {
// Acquire a permit from the semaphore before starting the proving work
let _permit = proving_semaphore
.acquire()
.await
.expect("semaphore should not be closed");
semaphore_acquired_tx.send(()).unwrap();

// 2.1. Start the proving work
let proven_status = do_prove_single(
Expand All @@ -359,6 +362,9 @@ impl Backend {
}
// The permit is automatically dropped here, releasing the semaphore
});

// Wait for the semaphore to be acquired
semaphore_acquired_rx.await.unwrap();
}

async fn prove_aggregation(
Expand All @@ -381,12 +387,14 @@ impl Backend {
// 2. Start the proving work in a separate thread
let mut actor = self.clone();
let proving_semaphore = self.proving_semaphore.clone();
let (semaphore_acquired_tx, semaphore_acquired_rx) = oneshot::channel();
tokio::spawn(async move {
// Acquire a permit from the semaphore before starting the proving work
let _permit = proving_semaphore
.acquire()
.await
.expect("semaphore should not be closed");
semaphore_acquired_tx.send(()).unwrap();

// 2.1. Start the proving work
let proven_status =
Expand All @@ -407,6 +415,9 @@ impl Backend {
}
// The permit is automatically dropped here, releasing the semaphore
});

// Wait for the semaphore to be acquired
semaphore_acquired_rx.await.unwrap();
}

async fn halt(&mut self) -> Result<(), String> {
Expand Down

0 comments on commit 0ab7722

Please sign in to comment.