Skip to content

Commit

Permalink
[TestLoop] Make MessageWithCallback accept future as a result. (near#…
Browse files Browse the repository at this point in the history
…12212)

Previously, `MessageWithCallback<T, R>` contains `T` as well as a
callback that is a function that accepts an `Result<R, AsyncSendError>`.
And then, `AsyncSender<T, R>` is simply `Sender<MessageWithCallback<T,
R>>`.

The problem was that in the actix implementation of
`Sender<MessageWithCallback<T, R>>`, because on the actix side when we
call `.send(msg)` it gives us a future of `R`, we cannot call the
callback right away. The solution was to `actix::spawn` a future that
awaits this future of `R` and then we call the callback.

This was a bad design, because it is actually the *sender* (code that
calls `.send_async(...)`) who ends up calling `actix::spawn`, and that
panics if the sender was not on an actix thread.

This PR changes that so that `MessageWithCallback<T, R>` contains a
callback that accepts a future of the result. That way, it becomes the
responsibility of whoever awaits on the resulting future from
`.send_async(...)` to drive the result future, and there won't be any
problems since we don't spawn anything anymore.

Also correct a use case where code in the test was sending a custom
MessageWithCallback. This is not supported; changed it to spawning a
future on the testloop instead.
  • Loading branch information
robin-near authored Oct 14, 2024
1 parent 8cfa7a7 commit e0d9637
Show file tree
Hide file tree
Showing 10 changed files with 71 additions and 37 deletions.
3 changes: 2 additions & 1 deletion chain/client/src/test_utils/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -982,7 +982,8 @@ pub fn setup_no_network_with_validity_period(
let future = client.send_async(
ChunkEndorsementMessage(endorsement.clone()).with_span_context(),
);
drop(future);
// Don't ignore the future or else the message may not actually be handled.
actix::spawn(future);
}
}
_ => {}
Expand Down
18 changes: 11 additions & 7 deletions chain/network/src/peer_manager/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::types::{
ReasonForBan,
};
use crate::PeerManagerActor;
use futures::FutureExt;
use near_async::messaging::IntoMultiSender;
use near_async::messaging::Sender;
use near_async::time;
Expand Down Expand Up @@ -601,18 +602,21 @@ pub(crate) async fn start(
let result = Some(StateResponse(Box::new(StateResponseInfo::V2(
StateResponseInfoV2 { shard_id, sync_hash, state_response },
))));
(msg.callback)(Ok(result));
(msg.callback)(std::future::ready(Ok(result)).boxed());
send.send(Event::Client(
ClientSenderForNetworkInput::_state_request_part(msg.message),
));
}
ClientSenderForNetworkMessage::_announce_account(msg) => {
(msg.callback)(Ok(Ok(msg
.message
.0
.iter()
.map(|(account, _)| account.clone())
.collect())));
(msg.callback)(
std::future::ready(Ok(Ok(msg
.message
.0
.iter()
.map(|(account, _)| account.clone())
.collect())))
.boxed(),
);
send.send(Event::Client(
ClientSenderForNetworkInput::_announce_account(msg.message),
));
Expand Down
9 changes: 6 additions & 3 deletions chain/network/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,12 @@ impl CanSend<MessageWithCallback<PeerManagerMessageRequest, PeerManagerMessageRe
) {
self.requests.write().unwrap().push_back(message.message);
self.notify.notify_one();
(message.callback)(Ok(PeerManagerMessageResponse::NetworkResponses(
NetworkResponses::NoResponse,
)));
(message.callback)(
std::future::ready(Ok(PeerManagerMessageResponse::NetworkResponses(
NetworkResponses::NoResponse,
)))
.boxed(),
);
}
}

Expand Down
13 changes: 8 additions & 5 deletions core/async/src/actix.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::messaging::{AsyncSendError, CanSend, MessageWithCallback};
use futures::FutureExt;
use near_o11y::{WithSpanContext, WithSpanContextExt};

/// An actix Addr implements CanSend for any message type that the actor handles.
Expand Down Expand Up @@ -39,13 +40,15 @@ where
fn send(&self, message: MessageWithCallback<M, M::Result>) {
let MessageWithCallback { message, callback: responder } = message;
let future = self.send(message);
actix::spawn(async move {

let transformed_future = async move {
match future.await {
Ok(result) => responder(Ok(result)),
Err(actix::MailboxError::Closed) => responder(Err(AsyncSendError::Closed)),
Err(actix::MailboxError::Timeout) => responder(Err(AsyncSendError::Timeout)),
Ok(result) => Ok(result),
Err(actix::MailboxError::Closed) => Err(AsyncSendError::Closed),
Err(actix::MailboxError::Timeout) => Err(AsyncSendError::Timeout),
}
});
};
responder(transformed_future.boxed());
}
}

Expand Down
5 changes: 3 additions & 2 deletions core/async/src/actix_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ where
Self::Context: DelayedActionRunner<T>,
T: messaging::HandlerWithContext<M>,
M: actix::Message,
<M as actix::Message>::Result: actix::dev::MessageResponse<ActixWrapper<T>, WithSpanContext<M>>,
<M as actix::Message>::Result:
actix::dev::MessageResponse<ActixWrapper<T>, WithSpanContext<M>> + Send,
{
type Result = M::Result;
fn handle(&mut self, msg: WithSpanContext<M>, ctx: &mut Self::Context) -> Self::Result {
Expand Down Expand Up @@ -83,7 +84,7 @@ where
T: messaging::Handler<M>,
M: actix::Message,
<M as actix::Message>::Result:
actix::dev::MessageResponse<SyncActixWrapper<T>, WithSpanContext<M>>,
actix::dev::MessageResponse<SyncActixWrapper<T>, WithSpanContext<M>> + Send,
{
type Result = M::Result;
fn handle(&mut self, msg: WithSpanContext<M>, _ctx: &mut Self::Context) -> Self::Result {
Expand Down
12 changes: 8 additions & 4 deletions core/async/src/functional.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::messaging::{CanSend, MessageWithCallback};
use futures::FutureExt;
use std::marker::PhantomData;

/// Allows a Sender to be created from a raw function.
Expand All @@ -20,22 +21,25 @@ impl<M: 'static, F: Fn(M) + Send + Sync + 'static> CanSend<M> for SendFunction<M
}

/// Allows an AsyncSender to be created from a raw (synchronous) function.
pub struct SendAsyncFunction<M: 'static, R: 'static, F: Fn(M) -> R + Send + Sync + 'static> {
pub struct SendAsyncFunction<M: 'static, R: Send + 'static, F: Fn(M) -> R + Send + Sync + 'static> {
f: F,
_phantom: PhantomData<fn(M, R)>,
}

impl<M: 'static, R: 'static, F: Fn(M) -> R + Send + Sync + 'static> SendAsyncFunction<M, R, F> {
impl<M: 'static, R: Send + 'static, F: Fn(M) -> R + Send + Sync + 'static>
SendAsyncFunction<M, R, F>
{
pub fn new(f: F) -> Self {
Self { f, _phantom: PhantomData }
}
}

impl<M: 'static, R: 'static, F: Fn(M) -> R + Send + Sync + 'static>
impl<M: 'static, R: Send + 'static, F: Fn(M) -> R + Send + Sync + 'static>
CanSend<MessageWithCallback<M, R>> for SendAsyncFunction<M, R, F>
{
fn send(&self, message: MessageWithCallback<M, R>) {
let MessageWithCallback { message, callback: responder } = message;
responder(Ok((self.f)(message)));
let result = Ok((self.f)(message));
responder(async move { result }.boxed());
}
}
28 changes: 21 additions & 7 deletions core/async/src/messaging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ pub trait Actor {
/// messages it would like to handle, while the CanSend trait implements the logic to send the
/// message to the actor. Handle and CanSend are typically not both implemented by the same struct.
/// Note that the actor is any struct that implements the Handler trait, not just actix actors.
pub trait Handler<M: actix::Message> {
pub trait Handler<M: actix::Message>
where
M::Result: Send,
{
fn handle(&mut self, msg: M) -> M::Result;
}

Expand All @@ -40,14 +43,18 @@ pub trait Handler<M: actix::Message> {
/// defined as actix::Context<Self> implements DelayedActionRunner<T>.
/// Note that the implementer for hander of a message only needs to implement either of Handler or
/// HandlerWithContext, not both.
pub trait HandlerWithContext<M: actix::Message> {
pub trait HandlerWithContext<M: actix::Message>
where
M::Result: Send,
{
fn handle(&mut self, msg: M, ctx: &mut dyn DelayedActionRunner<Self>) -> M::Result;
}

impl<A, M> HandlerWithContext<M> for A
where
M: actix::Message,
A: Actor + Handler<M>,
M::Result: Send,
{
fn handle(&mut self, msg: M, ctx: &mut dyn DelayedActionRunner<Self>) -> M::Result {
self.wrap_handler(msg, ctx, |this, msg, _| Handler::handle(this, msg))
Expand Down Expand Up @@ -134,8 +141,14 @@ impl<M, R: Send + 'static, A: CanSend<MessageWithCallback<M, R>> + ?Sized> SendA
// possible that someone implementing the Sender would just drop the
// message without calling the responder, in which case we return a
// Dropped error.
let (sender, receiver) = oneshot::channel::<Result<R, AsyncSendError>>();
let future = async move { receiver.await.unwrap_or_else(|_| Err(AsyncSendError::Dropped)) };
let (sender, receiver) =
oneshot::channel::<BoxFuture<'static, Result<R, AsyncSendError>>>();
let future = async move {
match receiver.await {
Ok(result_future) => result_future.await,
Err(_) => Err(AsyncSendError::Dropped),
}
};
let responder = Box::new(move |r| {
// It's ok for the send to return an error, because that means the receiver is dropped
// therefore the sender does not care about the result.
Expand Down Expand Up @@ -182,10 +195,10 @@ impl Display for AsyncSendError {

/// Used to implement an async sender. An async sender is just a Sender whose
/// message is `MessageWithCallback<M, R>`, which is a message plus a
/// callback function (which resolves the future that send_async returns).
/// callback to send the response future back.
pub struct MessageWithCallback<T, R> {
pub message: T,
pub callback: Box<dyn FnOnce(Result<R, AsyncSendError>) + Send>,
pub callback: Box<dyn FnOnce(BoxFuture<'static, Result<R, AsyncSendError>>) + Send>,
}

impl<T: Debug, R> Debug for MessageWithCallback<T, R> {
Expand Down Expand Up @@ -264,6 +277,7 @@ impl<A, B: MultiSenderFrom<A>> IntoMultiSender<B> for A {
#[cfg(test)]
mod tests {
use crate::messaging::{AsyncSendError, MessageWithCallback, Sender};
use futures::FutureExt;
use tokio_util::sync::CancellationToken;

#[tokio::test]
Expand Down Expand Up @@ -295,7 +309,7 @@ mod tests {
let callback_done = callback_done.clone();
tokio::spawn(async move {
result_blocker.cancelled().await;
callback(Ok(message));
callback(async move { Ok(message) }.boxed());
callback_done.cancel();
});
})
Expand Down
6 changes: 4 additions & 2 deletions core/async/src/test_loop/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::time::Duration;

use super::data::{TestLoopData, TestLoopDataHandle};
use super::PendingEventsSender;
use futures::FutureExt;

/// TestLoopSender implements the CanSend methods for an actor that can Handle them. This is
/// similar to our pattern of having an ActixWarpper around an actor to send messages to it.
Expand Down Expand Up @@ -80,6 +81,7 @@ impl<M, A> CanSend<M> for TestLoopSender<A>
where
M: actix::Message + Debug + Send + 'static,
A: Actor + HandlerWithContext<M> + 'static,
M::Result: Send,
{
fn send(&self, msg: M) {
let mut this = self.clone();
Expand All @@ -100,7 +102,7 @@ impl<M, R, A> CanSend<MessageWithCallback<M, R>> for TestLoopSender<A>
where
M: actix::Message<Result = R> + Debug + Send + 'static,
A: Actor + HandlerWithContext<M> + 'static,
R: 'static,
R: 'static + Send,
{
fn send(&self, msg: MessageWithCallback<M, R>) {
let mut this = self.clone();
Expand All @@ -109,7 +111,7 @@ where
let MessageWithCallback { message: msg, callback } = msg;
let actor = data.get_mut(&this.actor_handle);
let result = actor.handle(msg, &mut this);
callback(Ok(result));
callback(async move { Ok(result) }.boxed());
};
self.pending_events_sender.send_with_delay(
description,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ impl<'a> ViewClientTester<'a> {
/// Sends a message to the `[ViewClientActorInner]` for the client at position `idx`.
fn send<M: actix::Message>(&mut self, request: M, idx: usize) -> M::Result
where
M::Result: Send,
ViewClientActorInner: Handler<M>,
{
let view_client = self.test_loop.data.get_mut(&self.handles[idx]);
Expand Down
13 changes: 7 additions & 6 deletions integration-tests/src/test_loop/utils/transactions.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::test_loop::env::TestData;
use assert_matches::assert_matches;
use itertools::Itertools;
use near_async::messaging::{CanSend, MessageWithCallback, SendAsync};
use near_async::messaging::SendAsync;
use near_async::test_loop::TestLoopV2;
use near_async::time::Duration;
use near_client::test_utils::test_loop::ClientQueries;
Expand All @@ -17,6 +17,7 @@ use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use super::{ONE_NEAR, TGAS};
use near_async::futures::FutureSpawnerExt;

/// Execute money transfers within given `TestLoop` between given accounts.
/// Runs chain long enough for the transfers to be optimistically executed.
Expand Down Expand Up @@ -240,11 +241,11 @@ pub fn execute_tx(
let process_result = Arc::new(Mutex::new(None));
let process_result_clone = process_result.clone();

node_datas[rpc_node_id].client_sender.send(MessageWithCallback {
message: ProcessTxRequest { transaction: tx, is_forwarded: false, check_only: false },
callback: Box::new(move |process_res| {
*process_result_clone.lock().unwrap() = Some(process_res);
}),
let initial_process_tx_future = node_datas[rpc_node_id]
.client_sender
.send_async(ProcessTxRequest { transaction: tx, is_forwarded: false, check_only: false });
test_loop.future_spawner().spawn("initial process tx", async move {
*process_result_clone.lock().unwrap() = Some(initial_process_tx_future.await);
});

test_loop.run_until(
Expand Down

0 comments on commit e0d9637

Please sign in to comment.