From e0d963735d40c1081b353b3cc9681a79d2ed5f21 Mon Sep 17 00:00:00 2001 From: robin-near <111538878+robin-near@users.noreply.github.com> Date: Mon, 14 Oct 2024 16:25:16 -0700 Subject: [PATCH] [TestLoop] Make MessageWithCallback accept future as a result. (#12212) Previously, `MessageWithCallback` contains `T` as well as a callback that is a function that accepts an `Result`. And then, `AsyncSender` is simply `Sender>`. The problem was that in the actix implementation of `Sender>`, because on the actix side when we call `.send(msg)` it gives us a future of `R`, we cannot call the callback right away. The solution was to `actix::spawn` a future that awaits this future of `R` and then we call the callback. This was a bad design, because it is actually the *sender* (code that calls `.send_async(...)`) who ends up calling `actix::spawn`, and that panics if the sender was not on an actix thread. This PR changes that so that `MessageWithCallback` contains a callback that accepts a future of the result. That way, it becomes the responsibility of whoever awaits on the resulting future from `.send_async(...)` to drive the result future, and there won't be any problems since we don't spawn anything anymore. Also correct a use case where code in the test was sending a custom MessageWithCallback. This is not supported; changed it to spawning a future on the testloop instead. --- chain/client/src/test_utils/setup.rs | 3 +- chain/network/src/peer_manager/testonly.rs | 18 +++++++----- chain/network/src/test_utils.rs | 9 ++++-- core/async/src/actix.rs | 13 +++++---- core/async/src/actix_wrapper.rs | 5 ++-- core/async/src/functional.rs | 12 +++++--- core/async/src/messaging.rs | 28 ++++++++++++++----- core/async/src/test_loop/sender.rs | 6 ++-- .../tests/view_requests_to_archival_node.rs | 1 + .../src/test_loop/utils/transactions.rs | 13 +++++---- 10 files changed, 71 insertions(+), 37 deletions(-) diff --git a/chain/client/src/test_utils/setup.rs b/chain/client/src/test_utils/setup.rs index c8490c9070d..2301591f7b9 100644 --- a/chain/client/src/test_utils/setup.rs +++ b/chain/client/src/test_utils/setup.rs @@ -982,7 +982,8 @@ pub fn setup_no_network_with_validity_period( let future = client.send_async( ChunkEndorsementMessage(endorsement.clone()).with_span_context(), ); - drop(future); + // Don't ignore the future or else the message may not actually be handled. + actix::spawn(future); } } _ => {} diff --git a/chain/network/src/peer_manager/testonly.rs b/chain/network/src/peer_manager/testonly.rs index 5ef9abce822..f7f253b74d7 100644 --- a/chain/network/src/peer_manager/testonly.rs +++ b/chain/network/src/peer_manager/testonly.rs @@ -29,6 +29,7 @@ use crate::types::{ ReasonForBan, }; use crate::PeerManagerActor; +use futures::FutureExt; use near_async::messaging::IntoMultiSender; use near_async::messaging::Sender; use near_async::time; @@ -601,18 +602,21 @@ pub(crate) async fn start( let result = Some(StateResponse(Box::new(StateResponseInfo::V2( StateResponseInfoV2 { shard_id, sync_hash, state_response }, )))); - (msg.callback)(Ok(result)); + (msg.callback)(std::future::ready(Ok(result)).boxed()); send.send(Event::Client( ClientSenderForNetworkInput::_state_request_part(msg.message), )); } ClientSenderForNetworkMessage::_announce_account(msg) => { - (msg.callback)(Ok(Ok(msg - .message - .0 - .iter() - .map(|(account, _)| account.clone()) - .collect()))); + (msg.callback)( + std::future::ready(Ok(Ok(msg + .message + .0 + .iter() + .map(|(account, _)| account.clone()) + .collect()))) + .boxed(), + ); send.send(Event::Client( ClientSenderForNetworkInput::_announce_account(msg.message), )); diff --git a/chain/network/src/test_utils.rs b/chain/network/src/test_utils.rs index 2bbb64d433c..41980e7d964 100644 --- a/chain/network/src/test_utils.rs +++ b/chain/network/src/test_utils.rs @@ -242,9 +242,12 @@ impl CanSend) { let MessageWithCallback { message, callback: responder } = message; let future = self.send(message); - actix::spawn(async move { + + let transformed_future = async move { match future.await { - Ok(result) => responder(Ok(result)), - Err(actix::MailboxError::Closed) => responder(Err(AsyncSendError::Closed)), - Err(actix::MailboxError::Timeout) => responder(Err(AsyncSendError::Timeout)), + Ok(result) => Ok(result), + Err(actix::MailboxError::Closed) => Err(AsyncSendError::Closed), + Err(actix::MailboxError::Timeout) => Err(AsyncSendError::Timeout), } - }); + }; + responder(transformed_future.boxed()); } } diff --git a/core/async/src/actix_wrapper.rs b/core/async/src/actix_wrapper.rs index a01a82189fa..3c7d274bc13 100644 --- a/core/async/src/actix_wrapper.rs +++ b/core/async/src/actix_wrapper.rs @@ -51,7 +51,8 @@ where Self::Context: DelayedActionRunner, T: messaging::HandlerWithContext, M: actix::Message, - ::Result: actix::dev::MessageResponse, WithSpanContext>, + ::Result: + actix::dev::MessageResponse, WithSpanContext> + Send, { type Result = M::Result; fn handle(&mut self, msg: WithSpanContext, ctx: &mut Self::Context) -> Self::Result { @@ -83,7 +84,7 @@ where T: messaging::Handler, M: actix::Message, ::Result: - actix::dev::MessageResponse, WithSpanContext>, + actix::dev::MessageResponse, WithSpanContext> + Send, { type Result = M::Result; fn handle(&mut self, msg: WithSpanContext, _ctx: &mut Self::Context) -> Self::Result { diff --git a/core/async/src/functional.rs b/core/async/src/functional.rs index 7c1acaa3bb6..4ef3ed866ee 100644 --- a/core/async/src/functional.rs +++ b/core/async/src/functional.rs @@ -1,4 +1,5 @@ use crate::messaging::{CanSend, MessageWithCallback}; +use futures::FutureExt; use std::marker::PhantomData; /// Allows a Sender to be created from a raw function. @@ -20,22 +21,25 @@ impl CanSend for SendFunction R + Send + Sync + 'static> { +pub struct SendAsyncFunction R + Send + Sync + 'static> { f: F, _phantom: PhantomData, } -impl R + Send + Sync + 'static> SendAsyncFunction { +impl R + Send + Sync + 'static> + SendAsyncFunction +{ pub fn new(f: F) -> Self { Self { f, _phantom: PhantomData } } } -impl R + Send + Sync + 'static> +impl R + Send + Sync + 'static> CanSend> for SendAsyncFunction { fn send(&self, message: MessageWithCallback) { let MessageWithCallback { message, callback: responder } = message; - responder(Ok((self.f)(message))); + let result = Ok((self.f)(message)); + responder(async move { result }.boxed()); } } diff --git a/core/async/src/messaging.rs b/core/async/src/messaging.rs index 430fa90f960..9ff31413f34 100644 --- a/core/async/src/messaging.rs +++ b/core/async/src/messaging.rs @@ -30,7 +30,10 @@ pub trait Actor { /// messages it would like to handle, while the CanSend trait implements the logic to send the /// message to the actor. Handle and CanSend are typically not both implemented by the same struct. /// Note that the actor is any struct that implements the Handler trait, not just actix actors. -pub trait Handler { +pub trait Handler +where + M::Result: Send, +{ fn handle(&mut self, msg: M) -> M::Result; } @@ -40,7 +43,10 @@ pub trait Handler { /// defined as actix::Context implements DelayedActionRunner. /// Note that the implementer for hander of a message only needs to implement either of Handler or /// HandlerWithContext, not both. -pub trait HandlerWithContext { +pub trait HandlerWithContext +where + M::Result: Send, +{ fn handle(&mut self, msg: M, ctx: &mut dyn DelayedActionRunner) -> M::Result; } @@ -48,6 +54,7 @@ impl HandlerWithContext for A where M: actix::Message, A: Actor + Handler, + M::Result: Send, { fn handle(&mut self, msg: M, ctx: &mut dyn DelayedActionRunner) -> M::Result { self.wrap_handler(msg, ctx, |this, msg, _| Handler::handle(this, msg)) @@ -134,8 +141,14 @@ impl> + ?Sized> SendA // possible that someone implementing the Sender would just drop the // message without calling the responder, in which case we return a // Dropped error. - let (sender, receiver) = oneshot::channel::>(); - let future = async move { receiver.await.unwrap_or_else(|_| Err(AsyncSendError::Dropped)) }; + let (sender, receiver) = + oneshot::channel::>>(); + let future = async move { + match receiver.await { + Ok(result_future) => result_future.await, + Err(_) => Err(AsyncSendError::Dropped), + } + }; let responder = Box::new(move |r| { // It's ok for the send to return an error, because that means the receiver is dropped // therefore the sender does not care about the result. @@ -182,10 +195,10 @@ impl Display for AsyncSendError { /// Used to implement an async sender. An async sender is just a Sender whose /// message is `MessageWithCallback`, which is a message plus a -/// callback function (which resolves the future that send_async returns). +/// callback to send the response future back. pub struct MessageWithCallback { pub message: T, - pub callback: Box) + Send>, + pub callback: Box>) + Send>, } impl Debug for MessageWithCallback { @@ -264,6 +277,7 @@ impl> IntoMultiSender for A { #[cfg(test)] mod tests { use crate::messaging::{AsyncSendError, MessageWithCallback, Sender}; + use futures::FutureExt; use tokio_util::sync::CancellationToken; #[tokio::test] @@ -295,7 +309,7 @@ mod tests { let callback_done = callback_done.clone(); tokio::spawn(async move { result_blocker.cancelled().await; - callback(Ok(message)); + callback(async move { Ok(message) }.boxed()); callback_done.cancel(); }); }) diff --git a/core/async/src/test_loop/sender.rs b/core/async/src/test_loop/sender.rs index 502a172e89a..c20d504427b 100644 --- a/core/async/src/test_loop/sender.rs +++ b/core/async/src/test_loop/sender.rs @@ -9,6 +9,7 @@ use crate::time::Duration; use super::data::{TestLoopData, TestLoopDataHandle}; use super::PendingEventsSender; +use futures::FutureExt; /// TestLoopSender implements the CanSend methods for an actor that can Handle them. This is /// similar to our pattern of having an ActixWarpper around an actor to send messages to it. @@ -80,6 +81,7 @@ impl CanSend for TestLoopSender where M: actix::Message + Debug + Send + 'static, A: Actor + HandlerWithContext + 'static, + M::Result: Send, { fn send(&self, msg: M) { let mut this = self.clone(); @@ -100,7 +102,7 @@ impl CanSend> for TestLoopSender where M: actix::Message + Debug + Send + 'static, A: Actor + HandlerWithContext + 'static, - R: 'static, + R: 'static + Send, { fn send(&self, msg: MessageWithCallback) { let mut this = self.clone(); @@ -109,7 +111,7 @@ where let MessageWithCallback { message: msg, callback } = msg; let actor = data.get_mut(&this.actor_handle); let result = actor.handle(msg, &mut this); - callback(Ok(result)); + callback(async move { Ok(result) }.boxed()); }; self.pending_events_sender.send_with_delay( description, diff --git a/integration-tests/src/test_loop/tests/view_requests_to_archival_node.rs b/integration-tests/src/test_loop/tests/view_requests_to_archival_node.rs index f2f4a3dc7d1..f72617065a9 100644 --- a/integration-tests/src/test_loop/tests/view_requests_to_archival_node.rs +++ b/integration-tests/src/test_loop/tests/view_requests_to_archival_node.rs @@ -130,6 +130,7 @@ impl<'a> ViewClientTester<'a> { /// Sends a message to the `[ViewClientActorInner]` for the client at position `idx`. fn send(&mut self, request: M, idx: usize) -> M::Result where + M::Result: Send, ViewClientActorInner: Handler, { let view_client = self.test_loop.data.get_mut(&self.handles[idx]); diff --git a/integration-tests/src/test_loop/utils/transactions.rs b/integration-tests/src/test_loop/utils/transactions.rs index 1f95988f16f..fa5669f92cd 100644 --- a/integration-tests/src/test_loop/utils/transactions.rs +++ b/integration-tests/src/test_loop/utils/transactions.rs @@ -1,7 +1,7 @@ use crate::test_loop::env::TestData; use assert_matches::assert_matches; use itertools::Itertools; -use near_async::messaging::{CanSend, MessageWithCallback, SendAsync}; +use near_async::messaging::SendAsync; use near_async::test_loop::TestLoopV2; use near_async::time::Duration; use near_client::test_utils::test_loop::ClientQueries; @@ -17,6 +17,7 @@ use std::collections::HashMap; use std::sync::{Arc, Mutex}; use super::{ONE_NEAR, TGAS}; +use near_async::futures::FutureSpawnerExt; /// Execute money transfers within given `TestLoop` between given accounts. /// Runs chain long enough for the transfers to be optimistically executed. @@ -240,11 +241,11 @@ pub fn execute_tx( let process_result = Arc::new(Mutex::new(None)); let process_result_clone = process_result.clone(); - node_datas[rpc_node_id].client_sender.send(MessageWithCallback { - message: ProcessTxRequest { transaction: tx, is_forwarded: false, check_only: false }, - callback: Box::new(move |process_res| { - *process_result_clone.lock().unwrap() = Some(process_res); - }), + let initial_process_tx_future = node_datas[rpc_node_id] + .client_sender + .send_async(ProcessTxRequest { transaction: tx, is_forwarded: false, check_only: false }); + test_loop.future_spawner().spawn("initial process tx", async move { + *process_result_clone.lock().unwrap() = Some(initial_process_tx_future.await); }); test_loop.run_until(