From d92c69d6cd8cd0c2b1a52104270ae58d34fa5cf6 Mon Sep 17 00:00:00 2001 From: robin-near <111538878+robin-near@users.noreply.github.com> Date: Fri, 29 Mar 2024 14:06:02 -0700 Subject: [PATCH] [TestLoop] Vastly simplify LoopEventHandler (#10888) This PR is based on #10884 . Previously, a `LoopEventHandler` provides the ability to (1) initialize, which emits some events before the test loop is run at all; (2) handle an event; (3) decide whether an event can be dropped, which is used to ignore periodic timers at the end of a test; (4) send another event of the same type back into the test loop. This PR removes (1), (3), (4), leaving `LoopEventHandler` with only the ability to handle events; in other words, it is simply a function now. Here are the use cases that needed to be tackled: * (1), (3), (4) were used to support periodic timers. Periodic timers were implemented by first emitting an initial event during (1), and then later when handling that event, it used (4) to send another event to trigger the next tick, and so on. When the test is dropped, (3) returns true for any periodic timer event remaining in the event loop, so that the test doesn't think there are unhandled events. * Now, periodic timers are supported via two mechanisms: (a) by using the production implementation based on `DelayedActionRunner`, which periodic timers have already been refactored to use; see `ShardsManager::periodically_resend_chunk_requests`; for (3), the `DelayedActionRunner` that the TestLoop provides (`TestLoopDelayedActionRunner`) will no longer send new events when the test loop itself is shutting down; for production, the `DelayedActionRunner` is implemented using the Actix context, which does nothing if the actix system is shutting down, so these two behaviors are consistent. (b) by scheduling a future that uses Clock::sleep. See #10884 . * (4) was used mostly for convenience, so that when creating a handler, we do not need to pass in a `DelaySender` that can be used to send another event back to the event loop. For example, `route_shards_manager_network_messages` handled events of the type `(usize, Event)`, and since (4) provided a way to send another `(usize, Event)` back to the event loop, the handler just worked. However, this came at a cost of extra complexity in implementing the test loop because the event handler needed some way to have a `DelaySender<(usize, Event)>` (in this example) to send the event, so (1) is needed to support that (during initialization, we store the sender in a local field of the event handler). * Now, (4) is removed and replaced with the need to explicitly pass in any `DelaySender`s needed (and also Clock, or the shutting down flag). This is not really a big deal, because these are available easily via `test.sender()` (maybe with a `.for_index(idx)` or `.narrow()` trail), `test.clock()`, `test.shutting_down()`. * Because of this explicitness, I've added helper functions (`register_delayed_action_handler` and its `_for_index` variant) to register handlers for the delayed actions case, because otherwise the code looks too long and cryptic. --- chain/chunks/src/test/basic.rs | 36 ++- chain/chunks/src/test/multi.rs | 50 +-- chain/chunks/src/test_loop.rs | 144 ++++----- .../test_utils/client_actions_test_utils.rs | 2 +- .../src/examples/actix_component_test.rs | 10 +- core/async/src/examples/mod.rs | 2 - .../async/src/examples/multi_instance_test.rs | 12 +- core/async/src/examples/timed_component.rs | 28 -- .../src/examples/timed_component_test.rs | 63 ---- core/async/src/test_loop.rs | 299 +++++++++++------- core/async/src/test_loop/adhoc.rs | 3 +- core/async/src/test_loop/delay_sender.rs | 8 +- core/async/src/test_loop/event_handler.rs | 200 ++---------- core/async/src/test_loop/futures.rs | 71 ++++- core/async/src/test_loop/multi_instance.rs | 42 --- core/async/src/time.rs | 99 ++++-- .../features/multinode_test_loop_example.rs | 190 ++++++----- .../features/simple_test_loop_example.rs | 30 +- 18 files changed, 592 insertions(+), 697 deletions(-) delete mode 100644 core/async/src/examples/timed_component.rs delete mode 100644 core/async/src/examples/timed_component_test.rs delete mode 100644 core/async/src/test_loop/multi_instance.rs diff --git a/chain/chunks/src/test/basic.rs b/chain/chunks/src/test/basic.rs index bcebb48534e..8bf8d23e164 100644 --- a/chain/chunks/src/test/basic.rs +++ b/chain/chunks/src/test/basic.rs @@ -1,7 +1,16 @@ -use std::collections::HashSet; - +use crate::{ + adapter::ShardsManagerRequestFromClient, + client::ShardsManagerResponse, + test_loop::{ + forward_client_request_to_shards_manager, forward_network_request_to_shards_manager, + MockChainForShardsManager, MockChainForShardsManagerConfig, + }, + test_utils::default_tip, + ShardsManager, CHUNK_REQUEST_RETRY, +}; use derive_enum_from_into::{EnumFrom, EnumTryInto}; use near_async::messaging::noop; +use near_async::test_loop::futures::TestLoopDelayedActionEvent; use near_async::time; use near_async::{ messaging::{CanSend, IntoSender}, @@ -18,20 +27,9 @@ use near_network::{ }; use near_primitives::types::{AccountId, BlockHeight}; use near_store::test_utils::create_test_store; +use std::collections::HashSet; use tracing::log::info; -use crate::{ - adapter::ShardsManagerRequestFromClient, - client::ShardsManagerResponse, - test_loop::{ - forward_client_request_to_shards_manager, forward_network_request_to_shards_manager, - periodically_resend_chunk_requests, MockChainForShardsManager, - MockChainForShardsManagerConfig, ShardsManagerResendChunkRequests, - }, - test_utils::default_tip, - ShardsManager, CHUNK_REQUEST_RETRY, -}; - #[derive(derive_more::AsMut)] struct TestData { shards_manager: ShardsManager, @@ -60,8 +58,8 @@ enum TestEvent { NetworkToShardsManager(ShardsManagerRequestFromNetwork), ShardsManagerToClient(ShardsManagerResponse), ShardsManagerToNetwork(PeerManagerMessageRequest), - ShardsManagerResendRequests(ShardsManagerResendChunkRequests), Adhoc(AdhocEvent), + ShardsManagerDelayedActions(TestLoopDelayedActionEvent), } type ShardsManagerTestLoopBuilder = near_async::test_loop::TestLoopBuilder; @@ -182,8 +180,13 @@ fn test_chunk_forward() { test.register_handler(capture_events::().widen()); test.register_handler(forward_client_request_to_shards_manager().widen()); test.register_handler(forward_network_request_to_shards_manager().widen()); - test.register_handler(periodically_resend_chunk_requests(CHUNK_REQUEST_RETRY).widen()); test.register_handler(handle_adhoc_events::().widen()); + test.register_delayed_action_handler::(); + + test.data.shards_manager.periodically_resend_chunk_requests( + &mut test.sender().into_delayed_action_runner::(test.shutting_down()), + CHUNK_REQUEST_RETRY, + ); // We'll produce a single chunk whose next chunk producer is a chunk-only // producer, so that we can test that the chunk is forwarded to the next @@ -260,4 +263,5 @@ fn test_chunk_forward() { } } assert!(seen_part_request); + test.shutdown_and_drain_remaining_events(time::Duration::seconds(1)); } diff --git a/chain/chunks/src/test/multi.rs b/chain/chunks/src/test/multi.rs index 24e8bf84dd3..47a74bfa2d5 100644 --- a/chain/chunks/src/test/multi.rs +++ b/chain/chunks/src/test/multi.rs @@ -1,4 +1,16 @@ +use crate::{ + adapter::ShardsManagerRequestFromClient, + client::ShardsManagerResponse, + test_loop::{ + forward_client_request_to_shards_manager, forward_network_request_to_shards_manager, + route_shards_manager_network_messages, MockChainForShardsManager, + MockChainForShardsManagerConfig, + }, + test_utils::default_tip, + ShardsManager, CHUNK_REQUEST_RETRY, +}; use derive_enum_from_into::{EnumFrom, EnumTryInto}; +use near_async::test_loop::futures::TestLoopDelayedActionEvent; use near_async::{ messaging::IntoSender, test_loop::{ @@ -20,19 +32,6 @@ use near_primitives::{ }; use near_store::test_utils::create_test_store; -use crate::{ - adapter::ShardsManagerRequestFromClient, - client::ShardsManagerResponse, - test_loop::{ - forward_client_request_to_shards_manager, forward_network_request_to_shards_manager, - periodically_resend_chunk_requests, route_shards_manager_network_messages, - MockChainForShardsManager, MockChainForShardsManagerConfig, - ShardsManagerResendChunkRequests, - }, - test_utils::default_tip, - ShardsManager, CHUNK_REQUEST_RETRY, -}; - #[derive(derive_more::AsMut, derive_more::AsRef)] struct TestData { shards_manager: ShardsManager, @@ -50,11 +49,11 @@ impl AsMut for TestData { #[derive(EnumTryInto, Debug, EnumFrom)] enum TestEvent { Adhoc(AdhocEvent), + ShardsManagerDelayedActions(TestLoopDelayedActionEvent), ClientToShardsManager(ShardsManagerRequestFromClient), NetworkToShardsManager(ShardsManagerRequestFromNetwork), ShardsManagerToClient(ShardsManagerResponse), OutboundNetwork(PeerManagerMessageRequest), - ShardsManagerResendChunkRequests(ShardsManagerResendChunkRequests), } type ShardsManagerTestLoop = near_async::test_loop::TestLoop, (usize, TestEvent)>; @@ -106,13 +105,24 @@ fn basic_setup(config: BasicSetupConfig) -> ShardsManagerTestLoop { let mut test = builder.build(data); for idx in 0..test.data.len() { test.register_handler(handle_adhoc_events::().widen().for_index(idx)); + test.register_delayed_action_handler_for_index::(idx); test.register_handler(forward_client_request_to_shards_manager().widen().for_index(idx)); test.register_handler(forward_network_request_to_shards_manager().widen().for_index(idx)); test.register_handler(capture_events::().widen().for_index(idx)); - test.register_handler(route_shards_manager_network_messages(NETWORK_DELAY)); - test.register_handler( - periodically_resend_chunk_requests(CHUNK_REQUEST_RETRY).widen().for_index(idx), - ); + test.register_handler(route_shards_manager_network_messages( + test.sender(), + test.clock(), + NETWORK_DELAY, + )); + + let sender = test.sender().for_index(idx); + let shutting_down = test.shutting_down(); + test.sender().for_index(idx).send_adhoc_event("start_shards_manager", |data| { + data.shards_manager.periodically_resend_chunk_requests( + &mut sender.into_delayed_action_runner(shutting_down), + CHUNK_REQUEST_RETRY, + ); + }) } test } @@ -175,6 +185,8 @@ fn test_distribute_chunk_basic() { _ => panic!("Unexpected event"), } } + + test.shutdown_and_drain_remaining_events(time::Duration::seconds(1)); } /// Tests that when we have some block producers (validators) in the network, @@ -237,6 +249,7 @@ fn test_distribute_chunk_track_all_shards() { _ => panic!("Unexpected event"), } } + test.shutdown_and_drain_remaining_events(time::Duration::seconds(1)); } /// Tests that when the network has some block producers and also some chunk- @@ -348,4 +361,5 @@ fn test_distribute_chunk_with_chunk_only_producers() { }); } test.run_instant(); + test.shutdown_and_drain_remaining_events(time::Duration::seconds(1)); } diff --git a/chain/chunks/src/test_loop.rs b/chain/chunks/src/test_loop.rs index 1826d30983b..894e5c32fb6 100644 --- a/chain/chunks/src/test_loop.rs +++ b/chain/chunks/src/test_loop.rs @@ -1,9 +1,15 @@ -use std::{collections::HashMap, sync::Arc}; - +use crate::{ + adapter::ShardsManagerRequestFromClient, + logic::{cares_about_shard_this_or_next_epoch, make_outgoing_receipts_proofs}, + test_utils::{default_tip, tip}, + ShardsManager, +}; +use near_async::test_loop::delay_sender::DelaySender; use near_async::time; +use near_async::time::Clock; use near_async::{ messaging::Sender, - test_loop::event_handler::{interval, LoopEventHandler, LoopHandlerContext, TryIntoOrSelf}, + test_loop::event_handler::{LoopEventHandler, TryIntoOrSelf}, }; use near_chain::{types::Tip, Chain}; use near_epoch_manager::{ @@ -28,13 +34,7 @@ use near_primitives::{ version::PROTOCOL_VERSION, }; use near_store::Store; - -use crate::{ - adapter::ShardsManagerRequestFromClient, - logic::{cares_about_shard_this_or_next_epoch, make_outgoing_receipts_proofs}, - test_utils::{default_tip, tip}, - ShardsManager, -}; +use std::{collections::HashMap, sync::Arc}; pub fn forward_client_request_to_shards_manager( ) -> LoopEventHandler { @@ -61,25 +61,24 @@ pub fn route_shards_manager_network_messages< + From + From, >( + sender: DelaySender<(usize, Event)>, + clock: Clock, network_delay: time::Duration, ) -> LoopEventHandler { let mut route_back_lookup: HashMap = HashMap::new(); let mut next_hash: u64 = 0; - LoopEventHandler::new( - move |event: (usize, Event), - data: &mut Data, - context: &LoopHandlerContext<(usize, Event)>| { - let (idx, event) = event; - let message = event.try_into_or_self().map_err(|e| (idx, e.into()))?; - match message { - PeerManagerMessageRequest::NetworkRequests(request) => { - match request { - NetworkRequests::PartialEncodedChunkRequest { target, request, .. } => { - let target_idx = data.index_for_account(&target.account_id.unwrap()); - let route_back = CryptoHash::hash_borsh(next_hash); - route_back_lookup.insert(route_back, idx); - next_hash += 1; - context.sender.send_with_delay( + LoopEventHandler::new(move |event: (usize, Event), data: &mut Data| { + let (idx, event) = event; + let message = event.try_into_or_self().map_err(|e| (idx, e.into()))?; + match message { + PeerManagerMessageRequest::NetworkRequests(request) => { + match request { + NetworkRequests::PartialEncodedChunkRequest { target, request, .. } => { + let target_idx = data.index_for_account(&target.account_id.unwrap()); + let route_back = CryptoHash::hash_borsh(next_hash); + route_back_lookup.insert(route_back, idx); + next_hash += 1; + sender.send_with_delay( (target_idx, ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkRequest { partial_encoded_chunk_request: request, @@ -87,73 +86,66 @@ pub fn route_shards_manager_network_messages< }.into()), network_delay, ); - Ok(()) - } - NetworkRequests::PartialEncodedChunkResponse { route_back, response } => { - let target_idx = - *route_back_lookup.get(&route_back).expect("Route back not found"); - context.sender.send_with_delay( + Ok(()) + } + NetworkRequests::PartialEncodedChunkResponse { route_back, response } => { + let target_idx = + *route_back_lookup.get(&route_back).expect("Route back not found"); + sender.send_with_delay( (target_idx, ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkResponse { partial_encoded_chunk_response: response, - received_time: context.clock.now().into(), // TODO: use clock + received_time: clock.now().into(), // TODO: use clock }.into()), network_delay, ); - Ok(()) - } - NetworkRequests::PartialEncodedChunkMessage { - account_id, - partial_encoded_chunk, - } => { - let target_idx = data.index_for_account(&account_id); - context.sender.send_with_delay( - ( - target_idx, - ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunk( - partial_encoded_chunk.into(), - ) - .into(), - ), - network_delay, - ); - Ok(()) - } - NetworkRequests::PartialEncodedChunkForward { account_id, forward } => { - let target_idx = data.index_for_account(&account_id); - context.sender.send_with_delay( - (target_idx, - ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkForward( - forward, - ).into()), + Ok(()) + } + NetworkRequests::PartialEncodedChunkMessage { + account_id, + partial_encoded_chunk, + } => { + let target_idx = data.index_for_account(&account_id); + sender.send_with_delay( + ( + target_idx, + ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunk( + partial_encoded_chunk.into(), + ) + .into(), + ), + network_delay, + ); + Ok(()) + } + NetworkRequests::PartialEncodedChunkForward { account_id, forward } => { + let target_idx = data.index_for_account(&account_id); + sender.send_with_delay( + ( + target_idx, + ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkForward( + forward, + ) + .into(), + ), network_delay, ); - Ok(()) - } - other_message => Err(( - idx, - PeerManagerMessageRequest::NetworkRequests(other_message).into(), - )), + Ok(()) + } + other_message => { + Err((idx, PeerManagerMessageRequest::NetworkRequests(other_message).into())) } } - message => Err((idx, message.into())), } - }, - ) + message => Err((idx, message.into())), + } + }) } +// NOTE: this is no longer needed for TestLoop, but some other non-TestLoop tests depend on it. #[derive(Clone, Debug, PartialEq, Eq)] pub struct ShardsManagerResendChunkRequests; -/// Periodically call resend_chunk_requests. -pub fn periodically_resend_chunk_requests( - every: time::Duration, -) -> LoopEventHandler { - interval(every, ShardsManagerResendChunkRequests, |data: &mut ShardsManager| { - data.resend_chunk_requests() - }) -} - /// A simple implementation of the chain side that interacts with /// ShardsManager. pub struct MockChainForShardsManager { diff --git a/chain/client/src/test_utils/client_actions_test_utils.rs b/chain/client/src/test_utils/client_actions_test_utils.rs index 0b584c40648..0dc766b0d56 100644 --- a/chain/client/src/test_utils/client_actions_test_utils.rs +++ b/chain/client/src/test_utils/client_actions_test_utils.rs @@ -6,7 +6,7 @@ use near_network::client::ClientSenderForNetworkMessage; pub fn forward_client_messages_from_network_to_client_actions( ) -> LoopEventHandler { - LoopEventHandler::new(|msg, client_actions: &mut ClientActions, _| { + LoopEventHandler::new(|msg, client_actions: &mut ClientActions| { match msg { ClientSenderForNetworkMessage::_state_response(msg) => { (msg.callback)(Ok(client_actions.handle(msg.message))); diff --git a/core/async/src/examples/actix_component_test.rs b/core/async/src/examples/actix_component_test.rs index a3917305399..e3950776992 100644 --- a/core/async/src/examples/actix_component_test.rs +++ b/core/async/src/examples/actix_component_test.rs @@ -4,9 +4,7 @@ use super::actix_component::{ use crate::futures::FutureSpawnerExt; use crate::messaging::IntoSender; use crate::test_loop::event_handler::{capture_events, LoopEventHandler}; -use crate::test_loop::futures::{ - drive_delayed_action_runners, drive_futures, TestLoopDelayedActionEvent, TestLoopTask, -}; +use crate::test_loop::futures::{drive_futures, TestLoopDelayedActionEvent, TestLoopTask}; use crate::test_loop::TestLoopBuilder; use derive_enum_from_into::{EnumFrom, EnumTryInto}; use std::sync::Arc; @@ -57,7 +55,7 @@ fn test_actix_component() { // test itself is synchronous. test.register_handler(drive_futures().widen()); // This is to allow the ExampleComponent to run delayed actions (timers). - test.register_handler(drive_delayed_action_runners::().widen()); + test.register_delayed_action_handler::(); // This is to capture the periodic requests sent by the ExampleComponent // so we can assert against it. test.register_handler(capture_events::().widen()); @@ -66,7 +64,7 @@ fn test_actix_component() { test.register_handler(example_handler().widen()); // We need to redo whatever the ExampleActor does in its `started` method. - test.data.example.start(&mut test.sender().into_delayed_action_runner()); + test.data.example.start(&mut test.sender().into_delayed_action_runner(test.shutting_down())); // Send some requests; this can be done in the asynchronous context. test.future_spawner().spawn("wait for 5", { let res = test.data.outer.call_example_component_for_response(5); @@ -87,4 +85,6 @@ fn test_actix_component() { test.data.periodic_requests_captured, vec![PeriodicRequest { id: 0 }, PeriodicRequest { id: 1 }, PeriodicRequest { id: 2 },] ); + + test.shutdown_and_drain_remaining_events(Duration::seconds(1)); } diff --git a/core/async/src/examples/mod.rs b/core/async/src/examples/mod.rs index 710c4525f93..5c56dba55d2 100644 --- a/core/async/src/examples/mod.rs +++ b/core/async/src/examples/mod.rs @@ -5,5 +5,3 @@ mod async_component_test; mod multi_instance_test; mod sum_numbers; mod sum_numbers_test; -mod timed_component; -mod timed_component_test; diff --git a/core/async/src/examples/multi_instance_test.rs b/core/async/src/examples/multi_instance_test.rs index c345bd12d01..27ff2d0521b 100644 --- a/core/async/src/examples/multi_instance_test.rs +++ b/core/async/src/examples/multi_instance_test.rs @@ -1,6 +1,7 @@ use crate::time; use derive_enum_from_into::{EnumFrom, EnumTryInto}; +use crate::test_loop::delay_sender::DelaySender; use crate::{ examples::sum_numbers_test::forward_sum_request, messaging::{CanSend, IntoSender}, @@ -27,13 +28,14 @@ enum TestEvent { /// Let's pretend that when we send a remote request, the number gets sent to /// every other instance in the setup as a local request. -fn forward_remote_request_to_other_instances() -> LoopEventHandler, (usize, TestEvent)> -{ - LoopEventHandler::new(|event: (usize, TestEvent), data: &mut Vec, context| { +fn forward_remote_request_to_other_instances( + sender: DelaySender<(usize, TestEvent)>, +) -> LoopEventHandler, (usize, TestEvent)> { + LoopEventHandler::new(move |event: (usize, TestEvent), data: &mut Vec| { if let TestEvent::RemoteRequest(number) = event.1 { for i in 0..data.len() { if i != event.0 { - context.sender.send((i, TestEvent::LocalRequest(SumRequest::Number(number)))) + sender.send((i, TestEvent::LocalRequest(SumRequest::Number(number)))) } } Ok(()) @@ -58,7 +60,7 @@ fn test_multi_instance() { } let sender = builder.sender(); let mut test = builder.build(data); - test.register_handler(forward_remote_request_to_other_instances()); + test.register_handler(forward_remote_request_to_other_instances(test.sender())); for i in 0..5 { // Single-instance handlers can be reused for multi-instance tests. test.register_handler(forward_sum_request().widen().for_index(i)); diff --git a/core/async/src/examples/timed_component.rs b/core/async/src/examples/timed_component.rs deleted file mode 100644 index 373a486af3b..00000000000 --- a/core/async/src/examples/timed_component.rs +++ /dev/null @@ -1,28 +0,0 @@ -use crate::messaging::Sender; - -pub(crate) struct TimedComponent { - buffered_messages: Vec, - message_sender: Sender>, -} - -/// Mimics a component that has a specific function that is supposed to be -/// triggered by a timer. -impl TimedComponent { - pub fn new(message_sender: Sender>) -> Self { - Self { buffered_messages: vec![], message_sender } - } - - pub fn send_message(&mut self, msg: String) { - self.buffered_messages.push(msg); - } - - /// This is supposed to be triggered by a timer so it flushes the - /// messages every tick. - pub fn flush(&mut self) { - if self.buffered_messages.is_empty() { - return; - } - self.message_sender.send(self.buffered_messages.clone()); - self.buffered_messages.clear(); - } -} diff --git a/core/async/src/examples/timed_component_test.rs b/core/async/src/examples/timed_component_test.rs deleted file mode 100644 index 4d6c677a718..00000000000 --- a/core/async/src/examples/timed_component_test.rs +++ /dev/null @@ -1,63 +0,0 @@ -use crate::time; -use derive_enum_from_into::{EnumFrom, EnumTryInto}; - -use crate::{ - messaging::IntoSender, - test_loop::event_handler::{capture_events, interval, LoopEventHandler}, -}; - -use super::timed_component::TimedComponent; - -#[derive(Debug, Clone, PartialEq)] -struct Flush; - -#[derive(Debug, EnumTryInto, EnumFrom)] -enum TestEvent { - SendMessage(String), - Flush(Flush), - MessageSent(Vec), -} - -#[derive(derive_more::AsMut, derive_more::AsRef)] -struct TestData { - component: TimedComponent, - messages_sent: Vec>, -} - -fn forward_send_message() -> LoopEventHandler { - LoopEventHandler::new_simple(|event, data: &mut TimedComponent| { - data.send_message(event); - }) -} - -#[test] -fn test_timed_component() { - let builder = crate::test_loop::TestLoopBuilder::::new(); - let data = TestData { - component: TimedComponent::new(builder.sender().into_sender()), - messages_sent: vec![], - }; - let sender = builder.sender(); - let mut test = builder.build(data); - test.register_handler(forward_send_message().widen()); - test.register_handler( - interval(time::Duration::milliseconds(100), Flush, |data: &mut TimedComponent| { - data.flush() - }) - .widen(), - ); - test.register_handler(capture_events::>().widen()); - - sender.send_with_delay("Hello".to_string().into(), time::Duration::milliseconds(10)); - sender.send_with_delay("World".to_string().into(), time::Duration::milliseconds(20)); - // The timer fires at 100ms here and flushes "Hello" and "World". - sender.send_with_delay("!".to_string().into(), time::Duration::milliseconds(110)); - // The timer fires again at 200ms here and flushes "!"". - // Further timer events do not send messages. - - test.run_for(time::Duration::seconds(1)); - assert_eq!( - test.data.messages_sent, - vec![vec!["Hello".to_string(), "World".to_string()], vec!["!".to_string()]] - ); -} diff --git a/core/async/src/test_loop.rs b/core/async/src/test_loop.rs index 551bf7a4373..cf25fce6e79 100644 --- a/core/async/src/test_loop.rs +++ b/core/async/src/test_loop.rs @@ -63,18 +63,17 @@ pub mod adhoc; pub mod delay_sender; pub mod event_handler; pub mod futures; -pub mod multi_instance; use self::{ delay_sender::DelaySender, event_handler::LoopEventHandler, futures::{TestLoopFutureSpawner, TestLoopTask}, }; -use crate::test_loop::event_handler::LoopHandlerContext; use crate::time; -use crate::time::Duration; +use crate::time::{Clock, Duration}; use near_o11y::{testonly::init_test_logger, tracing::info}; use serde::Serialize; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Mutex; use std::{collections::BinaryHeap, fmt::Debug, sync::Arc}; @@ -109,9 +108,9 @@ pub struct TestLoop { current_time: Duration, /// Fake clock that always returns the virtual time. clock: time::FakeClock, - - /// Handlers are initialized only once, upon the first call to run(). - handlers_initialized: bool, + /// Shutdown flag. When this flag is true, delayed action runners will no + /// longer post any new events to the event loop. + shutting_down: Arc, /// All the event handlers that are registered. We invoke them one by one /// for each event, until one of them handles the event (or panic if no one /// handles it). @@ -121,7 +120,7 @@ pub struct TestLoop { /// An event waiting to be executed, ordered by the due time and then by ID. struct EventInHeap { event: Event, - due: time::Duration, + due: Duration, id: usize, } @@ -190,6 +189,7 @@ pub struct TestLoopBuilder { clock: time::FakeClock, pending_events: Arc>>, pending_events_sender: DelaySender, + shutting_down: Arc, } impl TestLoopBuilder { @@ -207,6 +207,7 @@ impl TestLoopBuilder { pending_events_sender: DelaySender::new(move |event, delay| { pending_events.lock().unwrap().add(event, delay); }), + shutting_down: Arc::new(AtomicBool::new(false)), } } @@ -220,8 +221,20 @@ impl TestLoopBuilder { self.clock.clock() } + /// Returns a flag indicating whether the TestLoop system is being shut down; + /// this is similar to whether the Actix system is shutting down. + pub fn shutting_down(&self) -> Arc { + self.shutting_down.clone() + } + pub fn build(self, data: Data) -> TestLoop { - TestLoop::new(self.pending_events, self.pending_events_sender, self.clock, data) + TestLoop::new( + self.pending_events, + self.pending_events_sender, + self.clock, + self.shutting_down, + data, + ) } } @@ -253,6 +266,7 @@ impl TestLoop { pending_events: Arc>>, sender: DelaySender, clock: time::FakeClock, + shutting_down: Arc, data: Data, ) -> Self { Self { @@ -263,7 +277,7 @@ impl TestLoop { next_event_index: 0, current_time: time::Duration::ZERO, clock, - handlers_initialized: false, + shutting_down, handlers: Vec::new(), } } @@ -272,28 +286,22 @@ impl TestLoop { self.sender.clone() } + pub fn clock(&self) -> Clock { + self.clock.clock() + } + + pub fn shutting_down(&self) -> Arc { + self.shutting_down.clone() + } + /// Registers a new event handler to the test loop. pub fn register_handler(&mut self, handler: LoopEventHandler) { - assert!(!self.handlers_initialized, "Cannot register more handlers after run() is called"); self.handlers.push(handler); } - fn maybe_initialize_handlers(&mut self) { - if self.handlers_initialized { - return; - } - for handler in &mut self.handlers { - handler.init(LoopHandlerContext { - sender: self.sender.clone(), - clock: self.clock.clock(), - }); - } - } - /// Helper to push events we have just received into the heap. fn queue_received_events(&mut self) { for event in self.pending_events.lock().unwrap().events.drain(..) { - info!("Queuing new event at index {}: {:?}", self.next_event_index, event.event); self.events.push(EventInHeap { due: self.current_time + event.delay, event: event.event, @@ -303,29 +311,63 @@ impl TestLoop { } } - /// Runs the test loop for the given duration. This function may be called - /// multiple times, but further test handlers may not be registered after - /// the first call. - pub fn run_for(&mut self, duration: time::Duration) { - self.maybe_initialize_handlers(); - // Push events we have received outside the test or during handler init into the heap. - self.queue_received_events(); - let deadline = self.current_time + duration; + /// Performs the logic to find the next event, advance to its time, and dequeue it. + /// Takes a decider to determine whether to advance time, handle the next event, and/or to stop. + fn advance_till_next_event( + &mut self, + decider: &impl Fn(Option, &mut Data) -> AdvanceDecision, + ) -> Option> { loop { - // Don't execute any more events after the deadline. - match self.events.peek() { - Some(event) => { - if event.due > deadline { - break; - } + // New events may have been sent to the TestLoop from outside, and the previous + // iteration of the loop may have made new futures ready, so queue up any received + // events. + self.queue_received_events(); + + // Now there are two ways an event may be/become available. One is that the event is + // queued into the event loop at a specific time; the other is that some future is + // waiting on our fake clock to advance beyond a specific time. Pick the earliest. + let next_timestamp = { + let next_event_timestamp = self.events.peek().map(|event| event.due); + let next_future_waiter_timestamp = self + .clock + .first_waiter() + .map(|time| time - (self.clock.now() - self.current_time)); + next_event_timestamp + .map(|t1| next_future_waiter_timestamp.map(|t2| t2.min(t1)).unwrap_or(t1)) + .or(next_future_waiter_timestamp) + }; + // If the next event is immediately available (i.e. its time is same as current time), + // just return that event; there's no decision to make (as we only give deciders a + // chance to stop processing if we would advance the clock) and no need to advance time. + if next_timestamp == Some(self.current_time) { + let event = self.events.pop().expect("Programming error in TestLoop"); + assert_eq!(event.due, self.current_time); + return Some(event); + } + // If we reach this point, it means we need to advance the clock. Let the decider choose + // if we should do that, or if we should stop. + let decision = decider(next_timestamp, &mut self.data); + match decision { + AdvanceDecision::AdvanceToNextEvent => { + let next_timestamp = next_timestamp.unwrap(); + self.clock.advance(next_timestamp - self.current_time); + self.current_time = next_timestamp; + // Run the loop again, because if the reason why we advance the clock to this + // time is due to a possible future waiting on the clock, we may or may not get + // another future queued into the TestLoop, so we just check the whole thing + // again. + continue; + } + AdvanceDecision::AdvanceToAndStop(target) => { + self.clock.advance(target - self.current_time); + self.current_time = target; + return None; + } + AdvanceDecision::Stop => { + return None; } - None => break, } - // Process the event. - let event = self.events.pop().unwrap(); - self.process_event(event); } - self.current_time = deadline; } /// Processes the given event, by logging a line first and then finding a handler to run it. @@ -338,8 +380,7 @@ impl TestLoop { }) .unwrap(); info!(target: "test_loop", "TEST_LOOP_EVENT_START {}", start_json); - self.clock.advance(event.due - self.current_time); - self.current_time = event.due; + assert_eq!(self.current_time, event.due); for handler in &mut self.handlers { if let Err(e) = handler.handle(event.event, &mut self.data) { @@ -359,33 +400,42 @@ impl TestLoop { panic!("Unhandled event: {:?}", event.event); } + /// Runs the test loop for the given duration. This function may be called + /// multiple times, but further test handlers may not be registered after + /// the first call. + pub fn run_for(&mut self, duration: Duration) { + let deadline = self.current_time + duration; + while let Some(event) = self.advance_till_next_event(&|next_time, _| { + if let Some(next_time) = next_time { + if next_time <= deadline { + return AdvanceDecision::AdvanceToNextEvent; + } + } + AdvanceDecision::AdvanceToAndStop(deadline) + }) { + self.process_event(event); + } + } + /// Run until the given condition is true, asserting that it happens before the maximum duration /// is reached. /// /// To maximize logical consistency, the condition is only checked before the clock would /// advance. If it returns true, execution stops before advancing the clock. pub fn run_until(&mut self, condition: impl Fn(&mut Data) -> bool, maximum_duration: Duration) { - self.maybe_initialize_handlers(); - // Push events we have received outside the test or during handler init into the heap. - self.queue_received_events(); let deadline = self.current_time + maximum_duration; - loop { - // Don't execute any more events after the deadline. - match self.events.peek() { - Some(event) => { - if event.due > deadline { - panic!("run_until did not fulfill the condition within the given deadline"); - } - if event.due > self.current_time { - if condition(&mut self.data) { - return; - } - } + let decider = |next_time, data: &mut Data| { + if condition(data) { + return AdvanceDecision::Stop; + } + if let Some(next_time) = next_time { + if next_time <= deadline { + return AdvanceDecision::AdvanceToNextEvent; } - None => break, } - // Process the event. - let event = self.events.pop().unwrap(); + panic!("run_until did not fulfill the condition within the given deadline"); + }; + while let Some(event) = self.advance_till_next_event(&decider) { self.process_event(event); } } @@ -393,41 +443,11 @@ impl TestLoop { /// Used to finish off remaining events that are still in the loop. This can be necessary if the /// destructor of some components wait for certain condition to become true. Otherwise, the /// destructors may end up waiting forever. This also helps avoid a panic when destructing - /// TestLoop itself, as it asserts that all important events have been handled. - /// - /// Note that events that are droppable are dropped and not handled. It would not be consistent - /// to continue using the TestLoop, and therefore it is consumed by this function. - pub fn finish_remaining_events(mut self, maximum_duration: Duration) { - self.maybe_initialize_handlers(); - // Push events we have received outside the test or during handler init into the heap. - self.queue_received_events(); - let max_time = self.current_time + maximum_duration; - 'outer: loop { - // Don't execute any more events after the deadline. - match self.events.peek() { - Some(event) => { - if event.due > max_time { - panic!( - "finish_remaining_events could not finish all events; \ - event still remaining: {:?}", - event.event - ); - } - } - None => break, - } - // Only execute the event if we can't drop it. - let mut event = self.events.pop().unwrap(); - for handler in &self.handlers { - if let Err(e) = handler.try_drop(event.event) { - event.event = e; - } else { - continue 'outer; - } - } - // Process the event. - self.process_event(event); - } + /// TestLoop itself, as it asserts that all events have been handled. + pub fn shutdown_and_drain_remaining_events(mut self, maximum_duration: Duration) { + self.shutting_down.store(true, Ordering::Relaxed); + self.run_for(maximum_duration); + // Implicitly dropped here, which asserts that no more events are remaining. } pub fn run_instant(&mut self) { @@ -445,20 +465,81 @@ impl TestLoop { impl Drop for TestLoop { fn drop(&mut self) { self.queue_received_events(); - 'outer: for event in self.events.drain() { - let mut to_handle = event.event; - for handler in &mut self.handlers { - if let Err(e) = handler.try_drop(to_handle) { - to_handle = e; - } else { - continue 'outer; - } - } + if let Some(event) = self.events.pop() { panic!( - "Important event scheduled at {} is not handled at the end of the test: {:?}. - Consider calling `test.run()` again, or with a longer duration.", - event.due, to_handle + "Event scheduled at {} is not handled at the end of the test: {:?}. + Consider calling `test.shutdown_and_drain_remaining_events(...)`.", + event.due, event.event ); } } } + +enum AdvanceDecision { + AdvanceToNextEvent, + AdvanceToAndStop(Duration), + Stop, +} + +#[cfg(test)] +mod tests { + use crate::futures::FutureSpawnerExt; + use crate::test_loop::futures::{drive_futures, TestLoopTask}; + use crate::test_loop::TestLoopBuilder; + use derive_enum_from_into::{EnumFrom, EnumTryInto}; + use derive_more::AsMut; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::Arc; + use time::Duration; + + #[derive(Debug, EnumFrom, EnumTryInto)] + enum TestEvent { + Task(Arc), + } + + #[derive(AsMut)] + struct TestData { + dummy: (), + } + + // Tests that the TestLoop correctly handles futures that sleep on the fake clock. + #[test] + fn test_futures() { + let builder = TestLoopBuilder::::new(); + let clock = builder.clock(); + let mut test = builder.build::(TestData { dummy: () }); + test.register_handler(drive_futures().widen()); + let start_time = clock.now(); + + let finished = Arc::new(AtomicUsize::new(0)); + + let clock1 = clock.clone(); + let finished1 = finished.clone(); + test.sender().into_future_spawner().spawn("test1", async move { + assert_eq!(clock1.now(), start_time); + clock1.sleep(Duration::seconds(10)).await; + assert_eq!(clock1.now(), start_time + Duration::seconds(10)); + clock1.sleep(Duration::seconds(5)).await; + assert_eq!(clock1.now(), start_time + Duration::seconds(15)); + finished1.fetch_add(1, Ordering::Relaxed); + }); + + test.run_for(Duration::seconds(2)); + + let clock2 = clock; + let finished2 = finished.clone(); + test.sender().into_future_spawner().spawn("test2", async move { + assert_eq!(clock2.now(), start_time + Duration::seconds(2)); + clock2.sleep(Duration::seconds(3)).await; + assert_eq!(clock2.now(), start_time + Duration::seconds(5)); + clock2.sleep(Duration::seconds(20)).await; + assert_eq!(clock2.now(), start_time + Duration::seconds(25)); + finished2.fetch_add(1, Ordering::Relaxed); + }); + // During these 30 virtual seconds, the TestLoop should've automatically advanced the clock + // to wake each future as they become ready to run again. The code inside the futures + // assert that the fake clock does indeed have the expected times. + test.run_for(Duration::seconds(30)); + assert_eq!(finished.load(Ordering::Relaxed), 2); + } +} diff --git a/core/async/src/test_loop/adhoc.rs b/core/async/src/test_loop/adhoc.rs index dee459cffa0..29a3847d645 100644 --- a/core/async/src/test_loop/adhoc.rs +++ b/core/async/src/test_loop/adhoc.rs @@ -52,8 +52,7 @@ impl> + 'static> AdhocEventSender() -> LoopEventHandler> { - LoopEventHandler::new(|event: AdhocEvent, data, _ctx| { + LoopEventHandler::new_simple(|event: AdhocEvent, data| { (event.handler)(data); - Ok(()) }) } diff --git a/core/async/src/test_loop/delay_sender.rs b/core/async/src/test_loop/delay_sender.rs index e5baa3f5009..a3d0dd918b4 100644 --- a/core/async/src/test_loop/delay_sender.rs +++ b/core/async/src/test_loop/delay_sender.rs @@ -7,6 +7,7 @@ use crate::test_loop::futures::{ }; use crate::time; use crate::time::Duration; +use std::sync::atomic::AtomicBool; use std::sync::Arc; use super::futures::{TestLoopFutureSpawner, TestLoopTask}; @@ -70,11 +71,14 @@ impl DelaySender { self.into_sender().break_apart().into_multi_sender() } - pub fn into_delayed_action_runner(self) -> TestLoopDelayedActionRunner + pub fn into_delayed_action_runner( + self, + shutting_down: Arc, + ) -> TestLoopDelayedActionRunner where Event: From> + 'static, { - TestLoopDelayedActionRunner { sender: self.narrow() } + TestLoopDelayedActionRunner { sender: self.narrow(), shutting_down } } /// Returns a FutureSpawner that can be used to spawn futures into the loop. diff --git a/core/async/src/test_loop/event_handler.rs b/core/async/src/test_loop/event_handler.rs index c8f5897ccfe..e9479d02744 100644 --- a/core/async/src/test_loop/event_handler.rs +++ b/core/async/src/test_loop/event_handler.rs @@ -1,81 +1,27 @@ -use super::{delay_sender::DelaySender, multi_instance::IndexedLoopEventHandler}; -use crate::time; - -/// Context given to the loop handler on each call. -pub struct LoopHandlerContext { - /// The sender that can be used to send more messages to the loop. - pub sender: DelaySender, - /// The clock whose .now() returns the current virtual time maintained by - /// the test loop. - pub clock: time::Clock, -} - /// An event handler registered on a test loop. Each event handler usually /// handles only some events, so we will usually have multiple event handlers /// registered to cover all event types. -pub struct LoopEventHandler { - inner: Box>, -} +pub struct LoopEventHandler( + Box Result<(), Event>>, +); impl LoopEventHandler { /// Creates a handler from the handling logic function. The function is /// called on each event. It should return Ok(()) if the event was handled, /// or Err(event) if the event was not handled (which will cause it to be /// passed to the next handler). - pub fn new( - handler: impl FnMut(Event, &mut Data, &LoopHandlerContext) -> Result<(), Event> + 'static, - ) -> Self { - Self { - inner: Box::new(LoopEventHandlerImplByFunction { - initial_event: None, - handler: Box::new(handler), - ok_to_drop: Box::new(|_| false), - context: None, - }), - } + pub fn new(handler: impl FnMut(Event, &mut Data) -> Result<(), Event> + 'static) -> Self { + Self(Box::new(handler)) } - /// Like new(), but the handler function is only given an event and data, - /// without the context, and also without the ability to reject the event. + /// Like new(), but the handler is not given the ability to reject the event. pub fn new_simple(mut handler: impl FnMut(Event, &mut Data) + 'static) -> Self { - Self::new(move |event, data, _| { + Self::new(move |event, data| { handler(event, data); Ok(()) }) } - pub fn new_with_drop( - handler: impl FnMut(Event, &mut Data, &LoopHandlerContext) -> Result<(), Event> + 'static, - ok_to_drop: impl Fn(&Event) -> bool + 'static, - ) -> Self { - Self { - inner: Box::new(LoopEventHandlerImplByFunction { - initial_event: None, - handler: Box::new(handler), - ok_to_drop: Box::new(ok_to_drop), - context: None, - }), - } - } - - /// Like new(), but additionally sends an initial event with an initial - /// delay. See periodic_interval() for why this is useful. - pub fn new_with_initial_event( - initial_event: Event, - initial_delay: time::Duration, - handler: impl FnMut(Event, &mut Data, &LoopHandlerContext) -> Result<(), Event> + 'static, - ok_to_drop: impl Fn(&Event) -> bool + 'static, - ) -> Self { - Self { - inner: Box::new(LoopEventHandlerImplByFunction { - initial_event: Some((initial_event, initial_delay)), - handler: Box::new(handler), - ok_to_drop: Box::new(ok_to_drop), - context: None, - }), - } - } - /// Adapts this handler to a handler whose data is a superset of our data /// and whose event is a superset of our event. /// For data, A is a superset of B if A implements AsRef and AsMut. @@ -85,78 +31,31 @@ impl LoopEventHandler { OuterData: AsMut, OuterEvent: TryIntoOrSelf + From + 'static, >( - self, + mut self, ) -> LoopEventHandler { - LoopEventHandler { inner: Box::new(WideningEventHandler(self)) } + LoopEventHandler(Box::new(move |event, data| { + let mut inner_data = data.as_mut(); + let inner_event = event.try_into_or_self()?; + self.0(inner_event, &mut inner_data)?; + Ok(()) + })) } /// Adapts this handler to a handler whose data is a vector of our data, /// and whose event is a is the tuple (index, our event), for a specific /// index. - pub fn for_index(self, index: usize) -> LoopEventHandler, (usize, Event)> { - LoopEventHandler { inner: Box::new(IndexedLoopEventHandler { inner: self, index }) } - } - - pub(crate) fn init(&mut self, context: LoopHandlerContext) { - self.inner.init(context) + pub fn for_index(mut self, index: usize) -> LoopEventHandler, (usize, Event)> { + LoopEventHandler(Box::new(move |event, data| { + if event.0 == index { + self.0(event.1, &mut data[index]).map_err(|event| (index, event)) + } else { + Err(event) + } + })) } pub(crate) fn handle(&mut self, event: Event, data: &mut Data) -> Result<(), Event> { - self.inner.handle(event, data) - } - - pub(crate) fn try_drop(&self, event: Event) -> Result<(), Event> { - self.inner.try_drop(event) - } -} - -/// Internal implementation of LoopEventHandler. -pub(crate) trait LoopEventHandlerImpl { - /// init is called when the test loop runs for the first time. - fn init(&mut self, context: LoopHandlerContext); - /// handle is called when we have a pending event from the test loop. - fn handle(&mut self, event: Event, data: &mut Data) -> Result<(), Event>; - /// try_drop is called when the TestLoop is dropped, but an event - /// remains in the event queue. If this handler knows that it's OK to - /// drop the event, it should return Ok(()); otherwise it should return - /// the original event as an Err. - /// - /// This is basically used for periodic timers, as it's OK to drop timers, - /// but not OK to drop an event that forgot to be handled. - fn try_drop(&self, event: Event) -> Result<(), Event>; -} - -/// Implementation of LoopEventHandlerImpl by a closure. We cache the context -/// upon receiving the init() call, so that we can pass a reference to the -/// closure every time we receive the handle() call. -struct LoopEventHandlerImplByFunction { - initial_event: Option<(Event, time::Duration)>, - handler: Box) -> Result<(), Event>>, - ok_to_drop: Box bool>, - context: Option>, -} - -impl LoopEventHandlerImpl - for LoopEventHandlerImplByFunction -{ - fn init(&mut self, context: LoopHandlerContext) { - if let Some((event, delay)) = self.initial_event.take() { - context.sender.send_with_delay(event, delay); - } - self.context = Some(context); - } - - fn handle(&mut self, event: Event, data: &mut Data) -> Result<(), Event> { - let context = self.context.as_ref().unwrap(); - (self.handler)(event, data, context) - } - - fn try_drop(&self, event: Event) -> Result<(), Event> { - if (self.ok_to_drop)(&event) { - Ok(()) - } else { - Err(event) - } + self.0(event, data) } } @@ -172,34 +71,6 @@ impl> TryIntoOrSelf for T { } } -/// Implements .widen() for an event handler. -struct WideningEventHandler(LoopEventHandler); - -impl< - Data, - Event, - OuterData: AsMut, - OuterEvent: TryIntoOrSelf + From + 'static, - > LoopEventHandlerImpl for WideningEventHandler -{ - fn init(&mut self, context: LoopHandlerContext) { - self.0.init(LoopHandlerContext { sender: context.sender.narrow(), clock: context.clock }) - } - - fn handle(&mut self, event: OuterEvent, data: &mut OuterData) -> Result<(), OuterEvent> { - let mut inner_data = data.as_mut(); - let inner_event = event.try_into_or_self()?; - self.0.handle(inner_event, &mut inner_data)?; - Ok(()) - } - - fn try_drop(&self, event: OuterEvent) -> Result<(), OuterEvent> { - let inner_event = event.try_into_or_self()?; - self.0.try_drop(inner_event)?; - Ok(()) - } -} - /// An event handler that puts the event into a vector in the Data, as long as /// the Data contains a Vec. (Use widen() right after). /// @@ -213,28 +84,3 @@ pub fn capture_events() -> LoopEventHandler, Event> { pub fn ignore_events() -> LoopEventHandler<(), Event> { LoopEventHandler::new_simple(|_, _| {}) } - -/// Periodically sends to the event loop the given event by the given interval. -/// Each time this event is handled, the given function is called. -/// The first invocation is triggered after the interval, not immediately. -pub fn interval( - interval: time::Duration, - event: Event, - func: impl Fn(&mut Data) + 'static, -) -> LoopEventHandler { - let event_cloned = event.clone(); - LoopEventHandler::new_with_initial_event( - event.clone(), - interval, - move |actual_event, data, context| { - if actual_event == event { - func(data); - context.sender.send_with_delay(actual_event, interval); - Ok(()) - } else { - Err(actual_event) - } - }, - move |actual_event| actual_event == &event_cloned, - ) -} diff --git a/core/async/src/test_loop/futures.rs b/core/async/src/test_loop/futures.rs index 9282ff7b795..ccf49414943 100644 --- a/core/async/src/test_loop/futures.rs +++ b/core/async/src/test_loop/futures.rs @@ -1,10 +1,12 @@ -use super::{delay_sender::DelaySender, event_handler::LoopEventHandler}; +use super::{delay_sender::DelaySender, event_handler::LoopEventHandler, TestLoop}; use crate::futures::{AsyncComputationSpawner, DelayedActionRunner}; +use crate::test_loop::event_handler::TryIntoOrSelf; use crate::time::Duration; use crate::{futures::FutureSpawner, messaging::CanSend}; use futures::future::BoxFuture; use futures::task::{waker_ref, ArcWake}; use std::fmt::Debug; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use std::task::Context; @@ -113,27 +115,24 @@ impl Debug for TestLoopDelayedActionEvent { /// An event handler that handles only `TestLoopDelayedActionEvent`s, by /// running the action encapsulated in the event. -pub fn drive_delayed_action_runners() -> LoopEventHandler> { - LoopEventHandler::new_with_drop( - |event, data, ctx| { - let mut runner = TestLoopDelayedActionRunner { sender: ctx.sender.clone() }; - (event.action)(data, &mut runner); - Ok(()) - }, - |_| { - // Delayed actions are usually used for timers, so let's just say - // it's OK to drop them at the end of the test. It would be hard - // to distinguish what sort of delayed action was being scheduled - // anyways. - true - }, - ) +pub fn drive_delayed_action_runners( + sender: DelaySender>, + shutting_down: Arc, +) -> LoopEventHandler> { + LoopEventHandler::new_simple(move |event: TestLoopDelayedActionEvent, data: &mut T| { + let mut runner = TestLoopDelayedActionRunner { + sender: sender.clone(), + shutting_down: shutting_down.clone(), + }; + (event.action)(data, &mut runner); + }) } /// `DelayedActionRunner` that schedules the action to be run later by the /// TestLoop event loop. pub struct TestLoopDelayedActionRunner { pub(crate) sender: DelaySender>, + pub(crate) shutting_down: Arc, } impl DelayedActionRunner for TestLoopDelayedActionRunner { @@ -143,6 +142,9 @@ impl DelayedActionRunner for TestLoopDelayedActionRunner { dur: Duration, action: Box) + Send + 'static>, ) { + if self.shutting_down.load(Ordering::Relaxed) { + return; + } self.sender.send_with_delay( TestLoopDelayedActionEvent { name: name.to_string(), action }, dur.try_into().unwrap(), @@ -150,6 +152,43 @@ impl DelayedActionRunner for TestLoopDelayedActionRunner { } } +impl TestLoop { + /// Shorthand for registering this frequently used handler. + pub fn register_delayed_action_handler(&mut self) + where + T: 'static, + Data: AsMut, + Event: TryIntoOrSelf> + + From> + + 'static, + { + self.register_handler( + drive_delayed_action_runners::(self.sender().narrow(), self.shutting_down()).widen(), + ); + } +} + +impl TestLoop, (usize, Event)> { + /// Shorthand for registering this frequently used handler for a multi-instance test. + pub fn register_delayed_action_handler_for_index(&mut self, idx: usize) + where + T: 'static, + Data: AsMut, + Event: TryIntoOrSelf> + + From> + + 'static, + { + self.register_handler( + drive_delayed_action_runners::( + self.sender().for_index(idx).narrow(), + self.shutting_down(), + ) + .widen() + .for_index(idx), + ); + } +} + /// An event that represents async computation. See async_computation_spawner() in DelaySender. pub struct TestLoopAsyncComputationEvent { name: String, diff --git a/core/async/src/test_loop/multi_instance.rs b/core/async/src/test_loop/multi_instance.rs deleted file mode 100644 index 2a84f2b3b2c..00000000000 --- a/core/async/src/test_loop/multi_instance.rs +++ /dev/null @@ -1,42 +0,0 @@ -use super::event_handler::{LoopEventHandler, LoopEventHandlerImpl, LoopHandlerContext}; - -/// Event handler that handles a specific single instance in a multi-instance -/// setup. -/// -/// To convert a single-instance handler to a multi-instance handler -/// (for one instance), use handler.for_index(index). -pub(crate) struct IndexedLoopEventHandler { - pub(crate) inner: LoopEventHandler, - pub(crate) index: usize, -} - -impl LoopEventHandlerImpl, (usize, Event)> - for IndexedLoopEventHandler -{ - fn init(&mut self, context: LoopHandlerContext<(usize, Event)>) { - self.inner.init(LoopHandlerContext { - sender: context.sender.for_index(self.index), - clock: context.clock, - }) - } - - fn handle( - &mut self, - event: (usize, Event), - data: &mut Vec, - ) -> Result<(), (usize, Event)> { - if event.0 == self.index { - self.inner.handle(event.1, &mut data[self.index]).map_err(|event| (self.index, event)) - } else { - Err(event) - } - } - - fn try_drop(&self, event: (usize, Event)) -> Result<(), (usize, Event)> { - if event.0 == self.index { - self.inner.try_drop(event.1).map_err(|event| (self.index, event)) - } else { - Err(event) - } - } -} diff --git a/core/async/src/time.rs b/core/async/src/time.rs index 4c4762a3d8f..2c61bd3333e 100644 --- a/core/async/src/time.rs +++ b/core/async/src/time.rs @@ -21,9 +21,10 @@ //! of different machines are not perfectly synchronized, and in extreme //! cases can be totally skewed. use once_cell::sync::Lazy; +use std::cmp::Ordering; +use std::collections::BinaryHeap; use std::sync::{Arc, Mutex}; pub use time::error; -use tokio::sync::watch; // TODO: consider wrapping these types to prevent interactions // with other time libraries, especially to prevent the direct access @@ -124,23 +125,46 @@ impl Clock { } struct FakeClockInner { - /// `mono` keeps the current time of the monotonic clock. - /// It is wrapped in watch::Sender, so that the value can - /// be observed from the clock::sleep() futures. - mono: watch::Sender, utc: Utc, - /// We need to keep it so that mono.send() always succeeds. - _mono_recv: watch::Receiver, + instant: Instant, + waiters: BinaryHeap, +} + +/// Whenever a user of a FakeClock calls `sleep` for `sleep_until`, we create a +/// `ClockWaiterInHeap` so that the returned future can be completed when the +/// clock advances past the desired deadline. +struct ClockWaiterInHeap { + deadline: Instant, + waker: tokio::sync::oneshot::Sender<()>, +} + +impl PartialEq for ClockWaiterInHeap { + fn eq(&self, other: &Self) -> bool { + self.deadline == other.deadline + } +} + +impl PartialOrd for ClockWaiterInHeap { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Eq for ClockWaiterInHeap {} + +impl Ord for ClockWaiterInHeap { + fn cmp(&self, other: &Self) -> Ordering { + other.deadline.cmp(&self.deadline) + } } impl FakeClockInner { pub fn new(utc: Utc) -> Self { - let (mono, _mono_recv) = watch::channel(*FAKE_CLOCK_MONO_START); - Self { utc, mono, _mono_recv } + Self { utc, instant: *FAKE_CLOCK_MONO_START, waiters: BinaryHeap::new() } } pub fn now(&mut self) -> Instant { - *self.mono.borrow() + self.instant } pub fn now_utc(&mut self) -> Utc { self.utc @@ -150,17 +174,19 @@ impl FakeClockInner { if d == Duration::ZERO { return; } - let now = *self.mono.borrow(); - self.mono.send(now + d).unwrap(); + self.instant += d; self.utc += d; + while let Some(earliest_waiter) = self.waiters.peek() { + if earliest_waiter.deadline <= self.instant { + self.waiters.pop().unwrap().waker.send(()).ok(); + } else { + break; + } + } } pub fn advance_until(&mut self, t: Instant) { - let now = *self.mono.borrow(); - if t <= now { - return; - } - self.mono.send(t).unwrap(); - self.utc += t - now; + let by = t - self.now(); + self.advance(by); } } @@ -198,19 +224,40 @@ impl FakeClock { /// Cancel-safe. pub async fn sleep(&self, d: Duration) { - let mut watch = self.0.lock().unwrap().mono.subscribe(); - let t = *watch.borrow() + d; - while *watch.borrow() < t { - watch.changed().await.unwrap(); + if d <= Duration::ZERO { + return; } + let receiver = { + let mut inner = self.0.lock().unwrap(); + let (sender, receiver) = tokio::sync::oneshot::channel(); + let waiter = ClockWaiterInHeap { waker: sender, deadline: inner.now() + d }; + inner.waiters.push(waiter); + receiver + }; + receiver.await.unwrap(); } /// Cancel-safe. pub async fn sleep_until(&self, t: Instant) { - let mut watch = self.0.lock().unwrap().mono.subscribe(); - while *watch.borrow() < t { - watch.changed().await.unwrap(); - } + let receiver = { + let mut inner = self.0.lock().unwrap(); + if inner.now() >= t { + return; + } + let (sender, receiver) = tokio::sync::oneshot::channel(); + let waiter = ClockWaiterInHeap { waker: sender, deadline: t }; + inner.waiters.push(waiter); + receiver + }; + receiver.await.unwrap(); + } + + /// Returns the earliest waiter, or None if no one is waiting on the clock. + /// The returned instant is guaranteed to be <= any waiter that is currently + /// waiting on the clock to advance. + pub fn first_waiter(&self) -> Option { + let inner = self.0.lock().unwrap(); + inner.waiters.peek().map(|waiter| waiter.deadline) } } diff --git a/integration-tests/src/tests/client/features/multinode_test_loop_example.rs b/integration-tests/src/tests/client/features/multinode_test_loop_example.rs index 8ad9b1619bc..a5fc5a36ee6 100644 --- a/integration-tests/src/tests/client/features/multinode_test_loop_example.rs +++ b/integration-tests/src/tests/client/features/multinode_test_loop_example.rs @@ -1,12 +1,11 @@ use derive_enum_from_into::{EnumFrom, EnumTryInto}; use near_async::messaging::{noop, IntoMultiSender, IntoSender, MessageWithCallback, SendAsync}; use near_async::test_loop::adhoc::{handle_adhoc_events, AdhocEvent, AdhocEventSender}; -use near_async::test_loop::event_handler::{ - ignore_events, LoopEventHandler, LoopHandlerContext, TryIntoOrSelf, -}; +use near_async::test_loop::delay_sender::DelaySender; +use near_async::test_loop::event_handler::{ignore_events, LoopEventHandler, TryIntoOrSelf}; use near_async::test_loop::futures::{ - drive_async_computations, drive_delayed_action_runners, drive_futures, - TestLoopAsyncComputationEvent, TestLoopDelayedActionEvent, TestLoopTask, + drive_async_computations, drive_futures, TestLoopAsyncComputationEvent, + TestLoopDelayedActionEvent, TestLoopTask, }; use near_async::test_loop::TestLoopBuilder; use near_async::time::Duration; @@ -319,12 +318,8 @@ fn test_client_with_multi_test_loop() { test.register_handler(drive_async_computations().widen().for_index(idx)); // Delayed actions. - test.register_handler( - drive_delayed_action_runners::().widen().for_index(idx), - ); - test.register_handler( - drive_delayed_action_runners::().widen().for_index(idx), - ); + test.register_delayed_action_handler_for_index::(idx); + test.register_delayed_action_handler_for_index::(idx); // Messages to the client. test.register_handler( @@ -358,8 +353,12 @@ fn test_client_with_multi_test_loop() { } // Handles network routing. Outgoing messages are handled by emitting incoming messages to the // appropriate component of the appropriate node index. - test.register_handler(route_network_messages_to_client(NETWORK_DELAY)); - test.register_handler(route_shards_manager_network_messages(NETWORK_DELAY)); + test.register_handler(route_network_messages_to_client(test.sender(), NETWORK_DELAY)); + test.register_handler(route_shards_manager_network_messages( + test.sender(), + test.clock(), + NETWORK_DELAY, + )); // Bootstrap the test by starting the components. // We use adhoc events for these, just so that the visualizer can see these as events rather @@ -367,14 +366,16 @@ fn test_client_with_multi_test_loop() { // the send_adhoc_event part and the test would still work. for idx in 0..NUM_CLIENTS { let sender = test.sender().for_index(idx); + let shutting_down = test.shutting_down(); test.sender().for_index(idx).send_adhoc_event("start_client", move |data| { - data.client.start(&mut sender.into_delayed_action_runner()); + data.client.start(&mut sender.into_delayed_action_runner(shutting_down)); }); let sender = test.sender().for_index(idx); + let shutting_down = test.shutting_down(); test.sender().for_index(idx).send_adhoc_event("start_shards_manager", move |data| { data.shards_manager.periodically_resend_chunk_requests( - &mut sender.into_delayed_action_runner(), + &mut sender.into_delayed_action_runner(shutting_down), Duration::milliseconds(100), ); }) @@ -443,7 +444,7 @@ fn test_client_with_multi_test_loop() { // Give the test a chance to finish off remaining important events in the event loop, which can // be important for properly shutting down the nodes. - test.finish_remaining_events(Duration::seconds(1)); + test.shutdown_and_drain_remaining_events(Duration::seconds(1)); } /// Handles outgoing network messages, and turns them into incoming client messages. @@ -453,112 +454,107 @@ pub fn route_network_messages_to_client< + From + From, >( + sender: DelaySender<(usize, Event)>, network_delay: Duration, ) -> LoopEventHandler { // let mut route_back_lookup: HashMap = HashMap::new(); // let mut next_hash: u64 = 0; - LoopEventHandler::new( - move |event: (usize, Event), - data: &mut Data, - context: &LoopHandlerContext<(usize, Event)>| { - let (idx, event) = event; - let message = event.try_into_or_self().map_err(|event| (idx, event.into()))?; - let PeerManagerMessageRequest::NetworkRequests(request) = message else { - return Err((idx, message.into())); - }; - - let client_senders = (0..data.num_accounts()) + LoopEventHandler::new(move |event: (usize, Event), data: &mut Data| { + let (idx, event) = event; + let message = event.try_into_or_self().map_err(|event| (idx, event.into()))?; + let PeerManagerMessageRequest::NetworkRequests(request) = message else { + return Err((idx, message.into())); + }; + + let client_senders = (0..data.num_accounts()) .map(|idx| { - context - .sender + sender .with_additional_delay(network_delay) .for_index(idx) .into_wrapped_multi_sender::() }) .collect::>(); - match request { - NetworkRequests::Block { block } => { - for other_idx in 0..data.num_accounts() { - if other_idx != idx { - drop(client_senders[other_idx].send_async(BlockResponse { - block: block.clone(), - peer_id: PeerId::random(), - was_requested: false, - })); - } - } - } - NetworkRequests::Approval { approval_message } => { - let other_idx = data.index_for_account(&approval_message.target); + match request { + NetworkRequests::Block { block } => { + for other_idx in 0..data.num_accounts() { if other_idx != idx { - drop(client_senders[other_idx].send_async(BlockApproval( - approval_message.approval, - PeerId::random(), - ))); - } else { - tracing::warn!("Dropping message to self"); + drop(client_senders[other_idx].send_async(BlockResponse { + block: block.clone(), + peer_id: PeerId::random(), + was_requested: false, + })); } } - NetworkRequests::ForwardTx(account, transaction) => { - let other_idx = data.index_for_account(&account); - if other_idx != idx { - drop(client_senders[other_idx].send_async(ProcessTxRequest { - transaction, - is_forwarded: true, - check_only: false, - })) - } else { - tracing::warn!("Dropping message to self"); - } + } + NetworkRequests::Approval { approval_message } => { + let other_idx = data.index_for_account(&approval_message.target); + if other_idx != idx { + drop( + client_senders[other_idx] + .send_async(BlockApproval(approval_message.approval, PeerId::random())), + ); + } else { + tracing::warn!("Dropping message to self"); } - NetworkRequests::ChunkEndorsement(target, endorsement) => { - let other_idx = data.index_for_account(&target); - if other_idx != idx { + } + NetworkRequests::ForwardTx(account, transaction) => { + let other_idx = data.index_for_account(&account); + if other_idx != idx { + drop(client_senders[other_idx].send_async(ProcessTxRequest { + transaction, + is_forwarded: true, + check_only: false, + })) + } else { + tracing::warn!("Dropping message to self"); + } + } + NetworkRequests::ChunkEndorsement(target, endorsement) => { + let other_idx = data.index_for_account(&target); + if other_idx != idx { + drop( + client_senders[other_idx].send_async(ChunkEndorsementMessage(endorsement)), + ); + } else { + tracing::warn!("Dropping message to self"); + } + } + NetworkRequests::ChunkStateWitness(targets, witness) => { + let other_idxes = targets + .iter() + .map(|account| data.index_for_account(account)) + .collect::>(); + for other_idx in &other_idxes { + if *other_idx != idx { drop( - client_senders[other_idx] - .send_async(ChunkEndorsementMessage(endorsement)), + client_senders[*other_idx] + .send_async(ChunkStateWitnessMessage(witness.clone())), ); } else { - tracing::warn!("Dropping message to self"); - } - } - NetworkRequests::ChunkStateWitness(targets, witness) => { - let other_idxes = targets - .iter() - .map(|account| data.index_for_account(account)) - .collect::>(); - for other_idx in &other_idxes { - if *other_idx != idx { - drop( - client_senders[*other_idx] - .send_async(ChunkStateWitnessMessage(witness.clone())), - ); - } else { - tracing::warn!( + tracing::warn!( "ChunkStateWitness asked to send to nodes {:?}, but {} is ourselves, so skipping that", other_idxes, idx); - } } } - NetworkRequests::ChunkStateWitnessAck(target, witness_ack) => { - let other_idx = data.index_for_account(&target); - if other_idx != idx { - drop( - client_senders[other_idx] - .send_async(ChunkStateWitnessAckMessage(witness_ack)), - ); - } else { - tracing::warn!("Dropping state-witness-ack message to self"); - } + } + NetworkRequests::ChunkStateWitnessAck(target, witness_ack) => { + let other_idx = data.index_for_account(&target); + if other_idx != idx { + drop( + client_senders[other_idx] + .send_async(ChunkStateWitnessAckMessage(witness_ack)), + ); + } else { + tracing::warn!("Dropping state-witness-ack message to self"); } - // TODO: Support more network message types as we expand the test. - _ => return Err((idx, PeerManagerMessageRequest::NetworkRequests(request).into())), } + // TODO: Support more network message types as we expand the test. + _ => return Err((idx, PeerManagerMessageRequest::NetworkRequests(request).into())), + } - Ok(()) - }, - ) + Ok(()) + }) } // TODO: This would be a good starting point for turning this into a test util. diff --git a/integration-tests/src/tests/client/features/simple_test_loop_example.rs b/integration-tests/src/tests/client/features/simple_test_loop_example.rs index 0aa5bd13a4a..7c9ad296d63 100644 --- a/integration-tests/src/tests/client/features/simple_test_loop_example.rs +++ b/integration-tests/src/tests/client/features/simple_test_loop_example.rs @@ -1,9 +1,9 @@ use derive_enum_from_into::{EnumFrom, EnumTryInto}; -use near_async::futures::DelayedActionRunnerExt; use near_async::messaging::{noop, IntoMultiSender, IntoSender}; +use near_async::test_loop::adhoc::{handle_adhoc_events, AdhocEvent, AdhocEventSender}; use near_async::test_loop::futures::{ - drive_async_computations, drive_delayed_action_runners, drive_futures, - TestLoopAsyncComputationEvent, TestLoopDelayedActionEvent, TestLoopTask, + drive_async_computations, drive_futures, TestLoopAsyncComputationEvent, + TestLoopDelayedActionEvent, TestLoopTask, }; use near_async::test_loop::TestLoopBuilder; use near_async::time::Duration; @@ -54,10 +54,17 @@ struct TestData { pub shards_manager: ShardsManager, } +impl AsMut for TestData { + fn as_mut(&mut self) -> &mut Self { + self + } +} + #[derive(EnumTryInto, Debug, EnumFrom)] #[allow(clippy::large_enum_variant)] enum TestEvent { Task(Arc), + Adhoc(AdhocEvent), AsyncComputation(TestLoopAsyncComputationEvent), ClientDelayedActions(TestLoopDelayedActionEvent), ClientEventFromNetwork(ClientSenderForNetworkMessage), @@ -230,18 +237,17 @@ fn test_client_with_simple_test_loop() { .widen(), ); test.register_handler(drive_futures().widen()); + test.register_handler(handle_adhoc_events::().widen()); test.register_handler(drive_async_computations().widen()); - test.register_handler(drive_delayed_action_runners::().widen()); + test.register_delayed_action_handler::(); test.register_handler(forward_client_request_to_shards_manager().widen()); // TODO: handle additional events. - test.sender().into_delayed_action_runner::().run_later( - "start_client", - Duration::ZERO, - |client, runner| { - client.start(runner); - }, - ); + let mut delayed_runner = + test.sender().into_delayed_action_runner::(test.shutting_down()); + test.sender().send_adhoc_event("start_client", move |data| { + data.client.start(&mut delayed_runner); + }); test.run_for(Duration::seconds(10)); - test.finish_remaining_events(Duration::seconds(1)); + test.shutdown_and_drain_remaining_events(Duration::seconds(1)); }