diff --git a/chain/chunks/src/test/basic.rs b/chain/chunks/src/test/basic.rs index bcebb48534e..8bf8d23e164 100644 --- a/chain/chunks/src/test/basic.rs +++ b/chain/chunks/src/test/basic.rs @@ -1,7 +1,16 @@ -use std::collections::HashSet; - +use crate::{ + adapter::ShardsManagerRequestFromClient, + client::ShardsManagerResponse, + test_loop::{ + forward_client_request_to_shards_manager, forward_network_request_to_shards_manager, + MockChainForShardsManager, MockChainForShardsManagerConfig, + }, + test_utils::default_tip, + ShardsManager, CHUNK_REQUEST_RETRY, +}; use derive_enum_from_into::{EnumFrom, EnumTryInto}; use near_async::messaging::noop; +use near_async::test_loop::futures::TestLoopDelayedActionEvent; use near_async::time; use near_async::{ messaging::{CanSend, IntoSender}, @@ -18,20 +27,9 @@ use near_network::{ }; use near_primitives::types::{AccountId, BlockHeight}; use near_store::test_utils::create_test_store; +use std::collections::HashSet; use tracing::log::info; -use crate::{ - adapter::ShardsManagerRequestFromClient, - client::ShardsManagerResponse, - test_loop::{ - forward_client_request_to_shards_manager, forward_network_request_to_shards_manager, - periodically_resend_chunk_requests, MockChainForShardsManager, - MockChainForShardsManagerConfig, ShardsManagerResendChunkRequests, - }, - test_utils::default_tip, - ShardsManager, CHUNK_REQUEST_RETRY, -}; - #[derive(derive_more::AsMut)] struct TestData { shards_manager: ShardsManager, @@ -60,8 +58,8 @@ enum TestEvent { NetworkToShardsManager(ShardsManagerRequestFromNetwork), ShardsManagerToClient(ShardsManagerResponse), ShardsManagerToNetwork(PeerManagerMessageRequest), - ShardsManagerResendRequests(ShardsManagerResendChunkRequests), Adhoc(AdhocEvent), + ShardsManagerDelayedActions(TestLoopDelayedActionEvent), } type ShardsManagerTestLoopBuilder = near_async::test_loop::TestLoopBuilder; @@ -182,8 +180,13 @@ fn test_chunk_forward() { test.register_handler(capture_events::().widen()); test.register_handler(forward_client_request_to_shards_manager().widen()); test.register_handler(forward_network_request_to_shards_manager().widen()); - test.register_handler(periodically_resend_chunk_requests(CHUNK_REQUEST_RETRY).widen()); test.register_handler(handle_adhoc_events::().widen()); + test.register_delayed_action_handler::(); + + test.data.shards_manager.periodically_resend_chunk_requests( + &mut test.sender().into_delayed_action_runner::(test.shutting_down()), + CHUNK_REQUEST_RETRY, + ); // We'll produce a single chunk whose next chunk producer is a chunk-only // producer, so that we can test that the chunk is forwarded to the next @@ -260,4 +263,5 @@ fn test_chunk_forward() { } } assert!(seen_part_request); + test.shutdown_and_drain_remaining_events(time::Duration::seconds(1)); } diff --git a/chain/chunks/src/test/multi.rs b/chain/chunks/src/test/multi.rs index 24e8bf84dd3..47a74bfa2d5 100644 --- a/chain/chunks/src/test/multi.rs +++ b/chain/chunks/src/test/multi.rs @@ -1,4 +1,16 @@ +use crate::{ + adapter::ShardsManagerRequestFromClient, + client::ShardsManagerResponse, + test_loop::{ + forward_client_request_to_shards_manager, forward_network_request_to_shards_manager, + route_shards_manager_network_messages, MockChainForShardsManager, + MockChainForShardsManagerConfig, + }, + test_utils::default_tip, + ShardsManager, CHUNK_REQUEST_RETRY, +}; use derive_enum_from_into::{EnumFrom, EnumTryInto}; +use near_async::test_loop::futures::TestLoopDelayedActionEvent; use near_async::{ messaging::IntoSender, test_loop::{ @@ -20,19 +32,6 @@ use near_primitives::{ }; use near_store::test_utils::create_test_store; -use crate::{ - adapter::ShardsManagerRequestFromClient, - client::ShardsManagerResponse, - test_loop::{ - forward_client_request_to_shards_manager, forward_network_request_to_shards_manager, - periodically_resend_chunk_requests, route_shards_manager_network_messages, - MockChainForShardsManager, MockChainForShardsManagerConfig, - ShardsManagerResendChunkRequests, - }, - test_utils::default_tip, - ShardsManager, CHUNK_REQUEST_RETRY, -}; - #[derive(derive_more::AsMut, derive_more::AsRef)] struct TestData { shards_manager: ShardsManager, @@ -50,11 +49,11 @@ impl AsMut for TestData { #[derive(EnumTryInto, Debug, EnumFrom)] enum TestEvent { Adhoc(AdhocEvent), + ShardsManagerDelayedActions(TestLoopDelayedActionEvent), ClientToShardsManager(ShardsManagerRequestFromClient), NetworkToShardsManager(ShardsManagerRequestFromNetwork), ShardsManagerToClient(ShardsManagerResponse), OutboundNetwork(PeerManagerMessageRequest), - ShardsManagerResendChunkRequests(ShardsManagerResendChunkRequests), } type ShardsManagerTestLoop = near_async::test_loop::TestLoop, (usize, TestEvent)>; @@ -106,13 +105,24 @@ fn basic_setup(config: BasicSetupConfig) -> ShardsManagerTestLoop { let mut test = builder.build(data); for idx in 0..test.data.len() { test.register_handler(handle_adhoc_events::().widen().for_index(idx)); + test.register_delayed_action_handler_for_index::(idx); test.register_handler(forward_client_request_to_shards_manager().widen().for_index(idx)); test.register_handler(forward_network_request_to_shards_manager().widen().for_index(idx)); test.register_handler(capture_events::().widen().for_index(idx)); - test.register_handler(route_shards_manager_network_messages(NETWORK_DELAY)); - test.register_handler( - periodically_resend_chunk_requests(CHUNK_REQUEST_RETRY).widen().for_index(idx), - ); + test.register_handler(route_shards_manager_network_messages( + test.sender(), + test.clock(), + NETWORK_DELAY, + )); + + let sender = test.sender().for_index(idx); + let shutting_down = test.shutting_down(); + test.sender().for_index(idx).send_adhoc_event("start_shards_manager", |data| { + data.shards_manager.periodically_resend_chunk_requests( + &mut sender.into_delayed_action_runner(shutting_down), + CHUNK_REQUEST_RETRY, + ); + }) } test } @@ -175,6 +185,8 @@ fn test_distribute_chunk_basic() { _ => panic!("Unexpected event"), } } + + test.shutdown_and_drain_remaining_events(time::Duration::seconds(1)); } /// Tests that when we have some block producers (validators) in the network, @@ -237,6 +249,7 @@ fn test_distribute_chunk_track_all_shards() { _ => panic!("Unexpected event"), } } + test.shutdown_and_drain_remaining_events(time::Duration::seconds(1)); } /// Tests that when the network has some block producers and also some chunk- @@ -348,4 +361,5 @@ fn test_distribute_chunk_with_chunk_only_producers() { }); } test.run_instant(); + test.shutdown_and_drain_remaining_events(time::Duration::seconds(1)); } diff --git a/chain/chunks/src/test_loop.rs b/chain/chunks/src/test_loop.rs index 1826d30983b..894e5c32fb6 100644 --- a/chain/chunks/src/test_loop.rs +++ b/chain/chunks/src/test_loop.rs @@ -1,9 +1,15 @@ -use std::{collections::HashMap, sync::Arc}; - +use crate::{ + adapter::ShardsManagerRequestFromClient, + logic::{cares_about_shard_this_or_next_epoch, make_outgoing_receipts_proofs}, + test_utils::{default_tip, tip}, + ShardsManager, +}; +use near_async::test_loop::delay_sender::DelaySender; use near_async::time; +use near_async::time::Clock; use near_async::{ messaging::Sender, - test_loop::event_handler::{interval, LoopEventHandler, LoopHandlerContext, TryIntoOrSelf}, + test_loop::event_handler::{LoopEventHandler, TryIntoOrSelf}, }; use near_chain::{types::Tip, Chain}; use near_epoch_manager::{ @@ -28,13 +34,7 @@ use near_primitives::{ version::PROTOCOL_VERSION, }; use near_store::Store; - -use crate::{ - adapter::ShardsManagerRequestFromClient, - logic::{cares_about_shard_this_or_next_epoch, make_outgoing_receipts_proofs}, - test_utils::{default_tip, tip}, - ShardsManager, -}; +use std::{collections::HashMap, sync::Arc}; pub fn forward_client_request_to_shards_manager( ) -> LoopEventHandler { @@ -61,25 +61,24 @@ pub fn route_shards_manager_network_messages< + From + From, >( + sender: DelaySender<(usize, Event)>, + clock: Clock, network_delay: time::Duration, ) -> LoopEventHandler { let mut route_back_lookup: HashMap = HashMap::new(); let mut next_hash: u64 = 0; - LoopEventHandler::new( - move |event: (usize, Event), - data: &mut Data, - context: &LoopHandlerContext<(usize, Event)>| { - let (idx, event) = event; - let message = event.try_into_or_self().map_err(|e| (idx, e.into()))?; - match message { - PeerManagerMessageRequest::NetworkRequests(request) => { - match request { - NetworkRequests::PartialEncodedChunkRequest { target, request, .. } => { - let target_idx = data.index_for_account(&target.account_id.unwrap()); - let route_back = CryptoHash::hash_borsh(next_hash); - route_back_lookup.insert(route_back, idx); - next_hash += 1; - context.sender.send_with_delay( + LoopEventHandler::new(move |event: (usize, Event), data: &mut Data| { + let (idx, event) = event; + let message = event.try_into_or_self().map_err(|e| (idx, e.into()))?; + match message { + PeerManagerMessageRequest::NetworkRequests(request) => { + match request { + NetworkRequests::PartialEncodedChunkRequest { target, request, .. } => { + let target_idx = data.index_for_account(&target.account_id.unwrap()); + let route_back = CryptoHash::hash_borsh(next_hash); + route_back_lookup.insert(route_back, idx); + next_hash += 1; + sender.send_with_delay( (target_idx, ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkRequest { partial_encoded_chunk_request: request, @@ -87,73 +86,66 @@ pub fn route_shards_manager_network_messages< }.into()), network_delay, ); - Ok(()) - } - NetworkRequests::PartialEncodedChunkResponse { route_back, response } => { - let target_idx = - *route_back_lookup.get(&route_back).expect("Route back not found"); - context.sender.send_with_delay( + Ok(()) + } + NetworkRequests::PartialEncodedChunkResponse { route_back, response } => { + let target_idx = + *route_back_lookup.get(&route_back).expect("Route back not found"); + sender.send_with_delay( (target_idx, ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkResponse { partial_encoded_chunk_response: response, - received_time: context.clock.now().into(), // TODO: use clock + received_time: clock.now().into(), // TODO: use clock }.into()), network_delay, ); - Ok(()) - } - NetworkRequests::PartialEncodedChunkMessage { - account_id, - partial_encoded_chunk, - } => { - let target_idx = data.index_for_account(&account_id); - context.sender.send_with_delay( - ( - target_idx, - ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunk( - partial_encoded_chunk.into(), - ) - .into(), - ), - network_delay, - ); - Ok(()) - } - NetworkRequests::PartialEncodedChunkForward { account_id, forward } => { - let target_idx = data.index_for_account(&account_id); - context.sender.send_with_delay( - (target_idx, - ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkForward( - forward, - ).into()), + Ok(()) + } + NetworkRequests::PartialEncodedChunkMessage { + account_id, + partial_encoded_chunk, + } => { + let target_idx = data.index_for_account(&account_id); + sender.send_with_delay( + ( + target_idx, + ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunk( + partial_encoded_chunk.into(), + ) + .into(), + ), + network_delay, + ); + Ok(()) + } + NetworkRequests::PartialEncodedChunkForward { account_id, forward } => { + let target_idx = data.index_for_account(&account_id); + sender.send_with_delay( + ( + target_idx, + ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkForward( + forward, + ) + .into(), + ), network_delay, ); - Ok(()) - } - other_message => Err(( - idx, - PeerManagerMessageRequest::NetworkRequests(other_message).into(), - )), + Ok(()) + } + other_message => { + Err((idx, PeerManagerMessageRequest::NetworkRequests(other_message).into())) } } - message => Err((idx, message.into())), } - }, - ) + message => Err((idx, message.into())), + } + }) } +// NOTE: this is no longer needed for TestLoop, but some other non-TestLoop tests depend on it. #[derive(Clone, Debug, PartialEq, Eq)] pub struct ShardsManagerResendChunkRequests; -/// Periodically call resend_chunk_requests. -pub fn periodically_resend_chunk_requests( - every: time::Duration, -) -> LoopEventHandler { - interval(every, ShardsManagerResendChunkRequests, |data: &mut ShardsManager| { - data.resend_chunk_requests() - }) -} - /// A simple implementation of the chain side that interacts with /// ShardsManager. pub struct MockChainForShardsManager { diff --git a/chain/client/src/test_utils/client_actions_test_utils.rs b/chain/client/src/test_utils/client_actions_test_utils.rs index 0b584c40648..0dc766b0d56 100644 --- a/chain/client/src/test_utils/client_actions_test_utils.rs +++ b/chain/client/src/test_utils/client_actions_test_utils.rs @@ -6,7 +6,7 @@ use near_network::client::ClientSenderForNetworkMessage; pub fn forward_client_messages_from_network_to_client_actions( ) -> LoopEventHandler { - LoopEventHandler::new(|msg, client_actions: &mut ClientActions, _| { + LoopEventHandler::new(|msg, client_actions: &mut ClientActions| { match msg { ClientSenderForNetworkMessage::_state_response(msg) => { (msg.callback)(Ok(client_actions.handle(msg.message))); diff --git a/core/async/src/examples/actix_component_test.rs b/core/async/src/examples/actix_component_test.rs index a3917305399..e3950776992 100644 --- a/core/async/src/examples/actix_component_test.rs +++ b/core/async/src/examples/actix_component_test.rs @@ -4,9 +4,7 @@ use super::actix_component::{ use crate::futures::FutureSpawnerExt; use crate::messaging::IntoSender; use crate::test_loop::event_handler::{capture_events, LoopEventHandler}; -use crate::test_loop::futures::{ - drive_delayed_action_runners, drive_futures, TestLoopDelayedActionEvent, TestLoopTask, -}; +use crate::test_loop::futures::{drive_futures, TestLoopDelayedActionEvent, TestLoopTask}; use crate::test_loop::TestLoopBuilder; use derive_enum_from_into::{EnumFrom, EnumTryInto}; use std::sync::Arc; @@ -57,7 +55,7 @@ fn test_actix_component() { // test itself is synchronous. test.register_handler(drive_futures().widen()); // This is to allow the ExampleComponent to run delayed actions (timers). - test.register_handler(drive_delayed_action_runners::().widen()); + test.register_delayed_action_handler::(); // This is to capture the periodic requests sent by the ExampleComponent // so we can assert against it. test.register_handler(capture_events::().widen()); @@ -66,7 +64,7 @@ fn test_actix_component() { test.register_handler(example_handler().widen()); // We need to redo whatever the ExampleActor does in its `started` method. - test.data.example.start(&mut test.sender().into_delayed_action_runner()); + test.data.example.start(&mut test.sender().into_delayed_action_runner(test.shutting_down())); // Send some requests; this can be done in the asynchronous context. test.future_spawner().spawn("wait for 5", { let res = test.data.outer.call_example_component_for_response(5); @@ -87,4 +85,6 @@ fn test_actix_component() { test.data.periodic_requests_captured, vec![PeriodicRequest { id: 0 }, PeriodicRequest { id: 1 }, PeriodicRequest { id: 2 },] ); + + test.shutdown_and_drain_remaining_events(Duration::seconds(1)); } diff --git a/core/async/src/examples/mod.rs b/core/async/src/examples/mod.rs index 710c4525f93..5c56dba55d2 100644 --- a/core/async/src/examples/mod.rs +++ b/core/async/src/examples/mod.rs @@ -5,5 +5,3 @@ mod async_component_test; mod multi_instance_test; mod sum_numbers; mod sum_numbers_test; -mod timed_component; -mod timed_component_test; diff --git a/core/async/src/examples/multi_instance_test.rs b/core/async/src/examples/multi_instance_test.rs index c345bd12d01..27ff2d0521b 100644 --- a/core/async/src/examples/multi_instance_test.rs +++ b/core/async/src/examples/multi_instance_test.rs @@ -1,6 +1,7 @@ use crate::time; use derive_enum_from_into::{EnumFrom, EnumTryInto}; +use crate::test_loop::delay_sender::DelaySender; use crate::{ examples::sum_numbers_test::forward_sum_request, messaging::{CanSend, IntoSender}, @@ -27,13 +28,14 @@ enum TestEvent { /// Let's pretend that when we send a remote request, the number gets sent to /// every other instance in the setup as a local request. -fn forward_remote_request_to_other_instances() -> LoopEventHandler, (usize, TestEvent)> -{ - LoopEventHandler::new(|event: (usize, TestEvent), data: &mut Vec, context| { +fn forward_remote_request_to_other_instances( + sender: DelaySender<(usize, TestEvent)>, +) -> LoopEventHandler, (usize, TestEvent)> { + LoopEventHandler::new(move |event: (usize, TestEvent), data: &mut Vec| { if let TestEvent::RemoteRequest(number) = event.1 { for i in 0..data.len() { if i != event.0 { - context.sender.send((i, TestEvent::LocalRequest(SumRequest::Number(number)))) + sender.send((i, TestEvent::LocalRequest(SumRequest::Number(number)))) } } Ok(()) @@ -58,7 +60,7 @@ fn test_multi_instance() { } let sender = builder.sender(); let mut test = builder.build(data); - test.register_handler(forward_remote_request_to_other_instances()); + test.register_handler(forward_remote_request_to_other_instances(test.sender())); for i in 0..5 { // Single-instance handlers can be reused for multi-instance tests. test.register_handler(forward_sum_request().widen().for_index(i)); diff --git a/core/async/src/examples/timed_component.rs b/core/async/src/examples/timed_component.rs deleted file mode 100644 index 373a486af3b..00000000000 --- a/core/async/src/examples/timed_component.rs +++ /dev/null @@ -1,28 +0,0 @@ -use crate::messaging::Sender; - -pub(crate) struct TimedComponent { - buffered_messages: Vec, - message_sender: Sender>, -} - -/// Mimics a component that has a specific function that is supposed to be -/// triggered by a timer. -impl TimedComponent { - pub fn new(message_sender: Sender>) -> Self { - Self { buffered_messages: vec![], message_sender } - } - - pub fn send_message(&mut self, msg: String) { - self.buffered_messages.push(msg); - } - - /// This is supposed to be triggered by a timer so it flushes the - /// messages every tick. - pub fn flush(&mut self) { - if self.buffered_messages.is_empty() { - return; - } - self.message_sender.send(self.buffered_messages.clone()); - self.buffered_messages.clear(); - } -} diff --git a/core/async/src/examples/timed_component_test.rs b/core/async/src/examples/timed_component_test.rs deleted file mode 100644 index 4d6c677a718..00000000000 --- a/core/async/src/examples/timed_component_test.rs +++ /dev/null @@ -1,63 +0,0 @@ -use crate::time; -use derive_enum_from_into::{EnumFrom, EnumTryInto}; - -use crate::{ - messaging::IntoSender, - test_loop::event_handler::{capture_events, interval, LoopEventHandler}, -}; - -use super::timed_component::TimedComponent; - -#[derive(Debug, Clone, PartialEq)] -struct Flush; - -#[derive(Debug, EnumTryInto, EnumFrom)] -enum TestEvent { - SendMessage(String), - Flush(Flush), - MessageSent(Vec), -} - -#[derive(derive_more::AsMut, derive_more::AsRef)] -struct TestData { - component: TimedComponent, - messages_sent: Vec>, -} - -fn forward_send_message() -> LoopEventHandler { - LoopEventHandler::new_simple(|event, data: &mut TimedComponent| { - data.send_message(event); - }) -} - -#[test] -fn test_timed_component() { - let builder = crate::test_loop::TestLoopBuilder::::new(); - let data = TestData { - component: TimedComponent::new(builder.sender().into_sender()), - messages_sent: vec![], - }; - let sender = builder.sender(); - let mut test = builder.build(data); - test.register_handler(forward_send_message().widen()); - test.register_handler( - interval(time::Duration::milliseconds(100), Flush, |data: &mut TimedComponent| { - data.flush() - }) - .widen(), - ); - test.register_handler(capture_events::>().widen()); - - sender.send_with_delay("Hello".to_string().into(), time::Duration::milliseconds(10)); - sender.send_with_delay("World".to_string().into(), time::Duration::milliseconds(20)); - // The timer fires at 100ms here and flushes "Hello" and "World". - sender.send_with_delay("!".to_string().into(), time::Duration::milliseconds(110)); - // The timer fires again at 200ms here and flushes "!"". - // Further timer events do not send messages. - - test.run_for(time::Duration::seconds(1)); - assert_eq!( - test.data.messages_sent, - vec![vec!["Hello".to_string(), "World".to_string()], vec!["!".to_string()]] - ); -} diff --git a/core/async/src/test_loop.rs b/core/async/src/test_loop.rs index 551bf7a4373..cf25fce6e79 100644 --- a/core/async/src/test_loop.rs +++ b/core/async/src/test_loop.rs @@ -63,18 +63,17 @@ pub mod adhoc; pub mod delay_sender; pub mod event_handler; pub mod futures; -pub mod multi_instance; use self::{ delay_sender::DelaySender, event_handler::LoopEventHandler, futures::{TestLoopFutureSpawner, TestLoopTask}, }; -use crate::test_loop::event_handler::LoopHandlerContext; use crate::time; -use crate::time::Duration; +use crate::time::{Clock, Duration}; use near_o11y::{testonly::init_test_logger, tracing::info}; use serde::Serialize; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Mutex; use std::{collections::BinaryHeap, fmt::Debug, sync::Arc}; @@ -109,9 +108,9 @@ pub struct TestLoop { current_time: Duration, /// Fake clock that always returns the virtual time. clock: time::FakeClock, - - /// Handlers are initialized only once, upon the first call to run(). - handlers_initialized: bool, + /// Shutdown flag. When this flag is true, delayed action runners will no + /// longer post any new events to the event loop. + shutting_down: Arc, /// All the event handlers that are registered. We invoke them one by one /// for each event, until one of them handles the event (or panic if no one /// handles it). @@ -121,7 +120,7 @@ pub struct TestLoop { /// An event waiting to be executed, ordered by the due time and then by ID. struct EventInHeap { event: Event, - due: time::Duration, + due: Duration, id: usize, } @@ -190,6 +189,7 @@ pub struct TestLoopBuilder { clock: time::FakeClock, pending_events: Arc>>, pending_events_sender: DelaySender, + shutting_down: Arc, } impl TestLoopBuilder { @@ -207,6 +207,7 @@ impl TestLoopBuilder { pending_events_sender: DelaySender::new(move |event, delay| { pending_events.lock().unwrap().add(event, delay); }), + shutting_down: Arc::new(AtomicBool::new(false)), } } @@ -220,8 +221,20 @@ impl TestLoopBuilder { self.clock.clock() } + /// Returns a flag indicating whether the TestLoop system is being shut down; + /// this is similar to whether the Actix system is shutting down. + pub fn shutting_down(&self) -> Arc { + self.shutting_down.clone() + } + pub fn build(self, data: Data) -> TestLoop { - TestLoop::new(self.pending_events, self.pending_events_sender, self.clock, data) + TestLoop::new( + self.pending_events, + self.pending_events_sender, + self.clock, + self.shutting_down, + data, + ) } } @@ -253,6 +266,7 @@ impl TestLoop { pending_events: Arc>>, sender: DelaySender, clock: time::FakeClock, + shutting_down: Arc, data: Data, ) -> Self { Self { @@ -263,7 +277,7 @@ impl TestLoop { next_event_index: 0, current_time: time::Duration::ZERO, clock, - handlers_initialized: false, + shutting_down, handlers: Vec::new(), } } @@ -272,28 +286,22 @@ impl TestLoop { self.sender.clone() } + pub fn clock(&self) -> Clock { + self.clock.clock() + } + + pub fn shutting_down(&self) -> Arc { + self.shutting_down.clone() + } + /// Registers a new event handler to the test loop. pub fn register_handler(&mut self, handler: LoopEventHandler) { - assert!(!self.handlers_initialized, "Cannot register more handlers after run() is called"); self.handlers.push(handler); } - fn maybe_initialize_handlers(&mut self) { - if self.handlers_initialized { - return; - } - for handler in &mut self.handlers { - handler.init(LoopHandlerContext { - sender: self.sender.clone(), - clock: self.clock.clock(), - }); - } - } - /// Helper to push events we have just received into the heap. fn queue_received_events(&mut self) { for event in self.pending_events.lock().unwrap().events.drain(..) { - info!("Queuing new event at index {}: {:?}", self.next_event_index, event.event); self.events.push(EventInHeap { due: self.current_time + event.delay, event: event.event, @@ -303,29 +311,63 @@ impl TestLoop { } } - /// Runs the test loop for the given duration. This function may be called - /// multiple times, but further test handlers may not be registered after - /// the first call. - pub fn run_for(&mut self, duration: time::Duration) { - self.maybe_initialize_handlers(); - // Push events we have received outside the test or during handler init into the heap. - self.queue_received_events(); - let deadline = self.current_time + duration; + /// Performs the logic to find the next event, advance to its time, and dequeue it. + /// Takes a decider to determine whether to advance time, handle the next event, and/or to stop. + fn advance_till_next_event( + &mut self, + decider: &impl Fn(Option, &mut Data) -> AdvanceDecision, + ) -> Option> { loop { - // Don't execute any more events after the deadline. - match self.events.peek() { - Some(event) => { - if event.due > deadline { - break; - } + // New events may have been sent to the TestLoop from outside, and the previous + // iteration of the loop may have made new futures ready, so queue up any received + // events. + self.queue_received_events(); + + // Now there are two ways an event may be/become available. One is that the event is + // queued into the event loop at a specific time; the other is that some future is + // waiting on our fake clock to advance beyond a specific time. Pick the earliest. + let next_timestamp = { + let next_event_timestamp = self.events.peek().map(|event| event.due); + let next_future_waiter_timestamp = self + .clock + .first_waiter() + .map(|time| time - (self.clock.now() - self.current_time)); + next_event_timestamp + .map(|t1| next_future_waiter_timestamp.map(|t2| t2.min(t1)).unwrap_or(t1)) + .or(next_future_waiter_timestamp) + }; + // If the next event is immediately available (i.e. its time is same as current time), + // just return that event; there's no decision to make (as we only give deciders a + // chance to stop processing if we would advance the clock) and no need to advance time. + if next_timestamp == Some(self.current_time) { + let event = self.events.pop().expect("Programming error in TestLoop"); + assert_eq!(event.due, self.current_time); + return Some(event); + } + // If we reach this point, it means we need to advance the clock. Let the decider choose + // if we should do that, or if we should stop. + let decision = decider(next_timestamp, &mut self.data); + match decision { + AdvanceDecision::AdvanceToNextEvent => { + let next_timestamp = next_timestamp.unwrap(); + self.clock.advance(next_timestamp - self.current_time); + self.current_time = next_timestamp; + // Run the loop again, because if the reason why we advance the clock to this + // time is due to a possible future waiting on the clock, we may or may not get + // another future queued into the TestLoop, so we just check the whole thing + // again. + continue; + } + AdvanceDecision::AdvanceToAndStop(target) => { + self.clock.advance(target - self.current_time); + self.current_time = target; + return None; + } + AdvanceDecision::Stop => { + return None; } - None => break, } - // Process the event. - let event = self.events.pop().unwrap(); - self.process_event(event); } - self.current_time = deadline; } /// Processes the given event, by logging a line first and then finding a handler to run it. @@ -338,8 +380,7 @@ impl TestLoop { }) .unwrap(); info!(target: "test_loop", "TEST_LOOP_EVENT_START {}", start_json); - self.clock.advance(event.due - self.current_time); - self.current_time = event.due; + assert_eq!(self.current_time, event.due); for handler in &mut self.handlers { if let Err(e) = handler.handle(event.event, &mut self.data) { @@ -359,33 +400,42 @@ impl TestLoop { panic!("Unhandled event: {:?}", event.event); } + /// Runs the test loop for the given duration. This function may be called + /// multiple times, but further test handlers may not be registered after + /// the first call. + pub fn run_for(&mut self, duration: Duration) { + let deadline = self.current_time + duration; + while let Some(event) = self.advance_till_next_event(&|next_time, _| { + if let Some(next_time) = next_time { + if next_time <= deadline { + return AdvanceDecision::AdvanceToNextEvent; + } + } + AdvanceDecision::AdvanceToAndStop(deadline) + }) { + self.process_event(event); + } + } + /// Run until the given condition is true, asserting that it happens before the maximum duration /// is reached. /// /// To maximize logical consistency, the condition is only checked before the clock would /// advance. If it returns true, execution stops before advancing the clock. pub fn run_until(&mut self, condition: impl Fn(&mut Data) -> bool, maximum_duration: Duration) { - self.maybe_initialize_handlers(); - // Push events we have received outside the test or during handler init into the heap. - self.queue_received_events(); let deadline = self.current_time + maximum_duration; - loop { - // Don't execute any more events after the deadline. - match self.events.peek() { - Some(event) => { - if event.due > deadline { - panic!("run_until did not fulfill the condition within the given deadline"); - } - if event.due > self.current_time { - if condition(&mut self.data) { - return; - } - } + let decider = |next_time, data: &mut Data| { + if condition(data) { + return AdvanceDecision::Stop; + } + if let Some(next_time) = next_time { + if next_time <= deadline { + return AdvanceDecision::AdvanceToNextEvent; } - None => break, } - // Process the event. - let event = self.events.pop().unwrap(); + panic!("run_until did not fulfill the condition within the given deadline"); + }; + while let Some(event) = self.advance_till_next_event(&decider) { self.process_event(event); } } @@ -393,41 +443,11 @@ impl TestLoop { /// Used to finish off remaining events that are still in the loop. This can be necessary if the /// destructor of some components wait for certain condition to become true. Otherwise, the /// destructors may end up waiting forever. This also helps avoid a panic when destructing - /// TestLoop itself, as it asserts that all important events have been handled. - /// - /// Note that events that are droppable are dropped and not handled. It would not be consistent - /// to continue using the TestLoop, and therefore it is consumed by this function. - pub fn finish_remaining_events(mut self, maximum_duration: Duration) { - self.maybe_initialize_handlers(); - // Push events we have received outside the test or during handler init into the heap. - self.queue_received_events(); - let max_time = self.current_time + maximum_duration; - 'outer: loop { - // Don't execute any more events after the deadline. - match self.events.peek() { - Some(event) => { - if event.due > max_time { - panic!( - "finish_remaining_events could not finish all events; \ - event still remaining: {:?}", - event.event - ); - } - } - None => break, - } - // Only execute the event if we can't drop it. - let mut event = self.events.pop().unwrap(); - for handler in &self.handlers { - if let Err(e) = handler.try_drop(event.event) { - event.event = e; - } else { - continue 'outer; - } - } - // Process the event. - self.process_event(event); - } + /// TestLoop itself, as it asserts that all events have been handled. + pub fn shutdown_and_drain_remaining_events(mut self, maximum_duration: Duration) { + self.shutting_down.store(true, Ordering::Relaxed); + self.run_for(maximum_duration); + // Implicitly dropped here, which asserts that no more events are remaining. } pub fn run_instant(&mut self) { @@ -445,20 +465,81 @@ impl TestLoop { impl Drop for TestLoop { fn drop(&mut self) { self.queue_received_events(); - 'outer: for event in self.events.drain() { - let mut to_handle = event.event; - for handler in &mut self.handlers { - if let Err(e) = handler.try_drop(to_handle) { - to_handle = e; - } else { - continue 'outer; - } - } + if let Some(event) = self.events.pop() { panic!( - "Important event scheduled at {} is not handled at the end of the test: {:?}. - Consider calling `test.run()` again, or with a longer duration.", - event.due, to_handle + "Event scheduled at {} is not handled at the end of the test: {:?}. + Consider calling `test.shutdown_and_drain_remaining_events(...)`.", + event.due, event.event ); } } } + +enum AdvanceDecision { + AdvanceToNextEvent, + AdvanceToAndStop(Duration), + Stop, +} + +#[cfg(test)] +mod tests { + use crate::futures::FutureSpawnerExt; + use crate::test_loop::futures::{drive_futures, TestLoopTask}; + use crate::test_loop::TestLoopBuilder; + use derive_enum_from_into::{EnumFrom, EnumTryInto}; + use derive_more::AsMut; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::Arc; + use time::Duration; + + #[derive(Debug, EnumFrom, EnumTryInto)] + enum TestEvent { + Task(Arc), + } + + #[derive(AsMut)] + struct TestData { + dummy: (), + } + + // Tests that the TestLoop correctly handles futures that sleep on the fake clock. + #[test] + fn test_futures() { + let builder = TestLoopBuilder::::new(); + let clock = builder.clock(); + let mut test = builder.build::(TestData { dummy: () }); + test.register_handler(drive_futures().widen()); + let start_time = clock.now(); + + let finished = Arc::new(AtomicUsize::new(0)); + + let clock1 = clock.clone(); + let finished1 = finished.clone(); + test.sender().into_future_spawner().spawn("test1", async move { + assert_eq!(clock1.now(), start_time); + clock1.sleep(Duration::seconds(10)).await; + assert_eq!(clock1.now(), start_time + Duration::seconds(10)); + clock1.sleep(Duration::seconds(5)).await; + assert_eq!(clock1.now(), start_time + Duration::seconds(15)); + finished1.fetch_add(1, Ordering::Relaxed); + }); + + test.run_for(Duration::seconds(2)); + + let clock2 = clock; + let finished2 = finished.clone(); + test.sender().into_future_spawner().spawn("test2", async move { + assert_eq!(clock2.now(), start_time + Duration::seconds(2)); + clock2.sleep(Duration::seconds(3)).await; + assert_eq!(clock2.now(), start_time + Duration::seconds(5)); + clock2.sleep(Duration::seconds(20)).await; + assert_eq!(clock2.now(), start_time + Duration::seconds(25)); + finished2.fetch_add(1, Ordering::Relaxed); + }); + // During these 30 virtual seconds, the TestLoop should've automatically advanced the clock + // to wake each future as they become ready to run again. The code inside the futures + // assert that the fake clock does indeed have the expected times. + test.run_for(Duration::seconds(30)); + assert_eq!(finished.load(Ordering::Relaxed), 2); + } +} diff --git a/core/async/src/test_loop/adhoc.rs b/core/async/src/test_loop/adhoc.rs index dee459cffa0..29a3847d645 100644 --- a/core/async/src/test_loop/adhoc.rs +++ b/core/async/src/test_loop/adhoc.rs @@ -52,8 +52,7 @@ impl> + 'static> AdhocEventSender() -> LoopEventHandler> { - LoopEventHandler::new(|event: AdhocEvent, data, _ctx| { + LoopEventHandler::new_simple(|event: AdhocEvent, data| { (event.handler)(data); - Ok(()) }) } diff --git a/core/async/src/test_loop/delay_sender.rs b/core/async/src/test_loop/delay_sender.rs index e5baa3f5009..a3d0dd918b4 100644 --- a/core/async/src/test_loop/delay_sender.rs +++ b/core/async/src/test_loop/delay_sender.rs @@ -7,6 +7,7 @@ use crate::test_loop::futures::{ }; use crate::time; use crate::time::Duration; +use std::sync::atomic::AtomicBool; use std::sync::Arc; use super::futures::{TestLoopFutureSpawner, TestLoopTask}; @@ -70,11 +71,14 @@ impl DelaySender { self.into_sender().break_apart().into_multi_sender() } - pub fn into_delayed_action_runner(self) -> TestLoopDelayedActionRunner + pub fn into_delayed_action_runner( + self, + shutting_down: Arc, + ) -> TestLoopDelayedActionRunner where Event: From> + 'static, { - TestLoopDelayedActionRunner { sender: self.narrow() } + TestLoopDelayedActionRunner { sender: self.narrow(), shutting_down } } /// Returns a FutureSpawner that can be used to spawn futures into the loop. diff --git a/core/async/src/test_loop/event_handler.rs b/core/async/src/test_loop/event_handler.rs index c8f5897ccfe..e9479d02744 100644 --- a/core/async/src/test_loop/event_handler.rs +++ b/core/async/src/test_loop/event_handler.rs @@ -1,81 +1,27 @@ -use super::{delay_sender::DelaySender, multi_instance::IndexedLoopEventHandler}; -use crate::time; - -/// Context given to the loop handler on each call. -pub struct LoopHandlerContext { - /// The sender that can be used to send more messages to the loop. - pub sender: DelaySender, - /// The clock whose .now() returns the current virtual time maintained by - /// the test loop. - pub clock: time::Clock, -} - /// An event handler registered on a test loop. Each event handler usually /// handles only some events, so we will usually have multiple event handlers /// registered to cover all event types. -pub struct LoopEventHandler { - inner: Box>, -} +pub struct LoopEventHandler( + Box Result<(), Event>>, +); impl LoopEventHandler { /// Creates a handler from the handling logic function. The function is /// called on each event. It should return Ok(()) if the event was handled, /// or Err(event) if the event was not handled (which will cause it to be /// passed to the next handler). - pub fn new( - handler: impl FnMut(Event, &mut Data, &LoopHandlerContext) -> Result<(), Event> + 'static, - ) -> Self { - Self { - inner: Box::new(LoopEventHandlerImplByFunction { - initial_event: None, - handler: Box::new(handler), - ok_to_drop: Box::new(|_| false), - context: None, - }), - } + pub fn new(handler: impl FnMut(Event, &mut Data) -> Result<(), Event> + 'static) -> Self { + Self(Box::new(handler)) } - /// Like new(), but the handler function is only given an event and data, - /// without the context, and also without the ability to reject the event. + /// Like new(), but the handler is not given the ability to reject the event. pub fn new_simple(mut handler: impl FnMut(Event, &mut Data) + 'static) -> Self { - Self::new(move |event, data, _| { + Self::new(move |event, data| { handler(event, data); Ok(()) }) } - pub fn new_with_drop( - handler: impl FnMut(Event, &mut Data, &LoopHandlerContext) -> Result<(), Event> + 'static, - ok_to_drop: impl Fn(&Event) -> bool + 'static, - ) -> Self { - Self { - inner: Box::new(LoopEventHandlerImplByFunction { - initial_event: None, - handler: Box::new(handler), - ok_to_drop: Box::new(ok_to_drop), - context: None, - }), - } - } - - /// Like new(), but additionally sends an initial event with an initial - /// delay. See periodic_interval() for why this is useful. - pub fn new_with_initial_event( - initial_event: Event, - initial_delay: time::Duration, - handler: impl FnMut(Event, &mut Data, &LoopHandlerContext) -> Result<(), Event> + 'static, - ok_to_drop: impl Fn(&Event) -> bool + 'static, - ) -> Self { - Self { - inner: Box::new(LoopEventHandlerImplByFunction { - initial_event: Some((initial_event, initial_delay)), - handler: Box::new(handler), - ok_to_drop: Box::new(ok_to_drop), - context: None, - }), - } - } - /// Adapts this handler to a handler whose data is a superset of our data /// and whose event is a superset of our event. /// For data, A is a superset of B if A implements AsRef and AsMut. @@ -85,78 +31,31 @@ impl LoopEventHandler { OuterData: AsMut, OuterEvent: TryIntoOrSelf + From + 'static, >( - self, + mut self, ) -> LoopEventHandler { - LoopEventHandler { inner: Box::new(WideningEventHandler(self)) } + LoopEventHandler(Box::new(move |event, data| { + let mut inner_data = data.as_mut(); + let inner_event = event.try_into_or_self()?; + self.0(inner_event, &mut inner_data)?; + Ok(()) + })) } /// Adapts this handler to a handler whose data is a vector of our data, /// and whose event is a is the tuple (index, our event), for a specific /// index. - pub fn for_index(self, index: usize) -> LoopEventHandler, (usize, Event)> { - LoopEventHandler { inner: Box::new(IndexedLoopEventHandler { inner: self, index }) } - } - - pub(crate) fn init(&mut self, context: LoopHandlerContext) { - self.inner.init(context) + pub fn for_index(mut self, index: usize) -> LoopEventHandler, (usize, Event)> { + LoopEventHandler(Box::new(move |event, data| { + if event.0 == index { + self.0(event.1, &mut data[index]).map_err(|event| (index, event)) + } else { + Err(event) + } + })) } pub(crate) fn handle(&mut self, event: Event, data: &mut Data) -> Result<(), Event> { - self.inner.handle(event, data) - } - - pub(crate) fn try_drop(&self, event: Event) -> Result<(), Event> { - self.inner.try_drop(event) - } -} - -/// Internal implementation of LoopEventHandler. -pub(crate) trait LoopEventHandlerImpl { - /// init is called when the test loop runs for the first time. - fn init(&mut self, context: LoopHandlerContext); - /// handle is called when we have a pending event from the test loop. - fn handle(&mut self, event: Event, data: &mut Data) -> Result<(), Event>; - /// try_drop is called when the TestLoop is dropped, but an event - /// remains in the event queue. If this handler knows that it's OK to - /// drop the event, it should return Ok(()); otherwise it should return - /// the original event as an Err. - /// - /// This is basically used for periodic timers, as it's OK to drop timers, - /// but not OK to drop an event that forgot to be handled. - fn try_drop(&self, event: Event) -> Result<(), Event>; -} - -/// Implementation of LoopEventHandlerImpl by a closure. We cache the context -/// upon receiving the init() call, so that we can pass a reference to the -/// closure every time we receive the handle() call. -struct LoopEventHandlerImplByFunction { - initial_event: Option<(Event, time::Duration)>, - handler: Box) -> Result<(), Event>>, - ok_to_drop: Box bool>, - context: Option>, -} - -impl LoopEventHandlerImpl - for LoopEventHandlerImplByFunction -{ - fn init(&mut self, context: LoopHandlerContext) { - if let Some((event, delay)) = self.initial_event.take() { - context.sender.send_with_delay(event, delay); - } - self.context = Some(context); - } - - fn handle(&mut self, event: Event, data: &mut Data) -> Result<(), Event> { - let context = self.context.as_ref().unwrap(); - (self.handler)(event, data, context) - } - - fn try_drop(&self, event: Event) -> Result<(), Event> { - if (self.ok_to_drop)(&event) { - Ok(()) - } else { - Err(event) - } + self.0(event, data) } } @@ -172,34 +71,6 @@ impl> TryIntoOrSelf for T { } } -/// Implements .widen() for an event handler. -struct WideningEventHandler(LoopEventHandler); - -impl< - Data, - Event, - OuterData: AsMut, - OuterEvent: TryIntoOrSelf + From + 'static, - > LoopEventHandlerImpl for WideningEventHandler -{ - fn init(&mut self, context: LoopHandlerContext) { - self.0.init(LoopHandlerContext { sender: context.sender.narrow(), clock: context.clock }) - } - - fn handle(&mut self, event: OuterEvent, data: &mut OuterData) -> Result<(), OuterEvent> { - let mut inner_data = data.as_mut(); - let inner_event = event.try_into_or_self()?; - self.0.handle(inner_event, &mut inner_data)?; - Ok(()) - } - - fn try_drop(&self, event: OuterEvent) -> Result<(), OuterEvent> { - let inner_event = event.try_into_or_self()?; - self.0.try_drop(inner_event)?; - Ok(()) - } -} - /// An event handler that puts the event into a vector in the Data, as long as /// the Data contains a Vec. (Use widen() right after). /// @@ -213,28 +84,3 @@ pub fn capture_events() -> LoopEventHandler, Event> { pub fn ignore_events() -> LoopEventHandler<(), Event> { LoopEventHandler::new_simple(|_, _| {}) } - -/// Periodically sends to the event loop the given event by the given interval. -/// Each time this event is handled, the given function is called. -/// The first invocation is triggered after the interval, not immediately. -pub fn interval( - interval: time::Duration, - event: Event, - func: impl Fn(&mut Data) + 'static, -) -> LoopEventHandler { - let event_cloned = event.clone(); - LoopEventHandler::new_with_initial_event( - event.clone(), - interval, - move |actual_event, data, context| { - if actual_event == event { - func(data); - context.sender.send_with_delay(actual_event, interval); - Ok(()) - } else { - Err(actual_event) - } - }, - move |actual_event| actual_event == &event_cloned, - ) -} diff --git a/core/async/src/test_loop/futures.rs b/core/async/src/test_loop/futures.rs index 9282ff7b795..ccf49414943 100644 --- a/core/async/src/test_loop/futures.rs +++ b/core/async/src/test_loop/futures.rs @@ -1,10 +1,12 @@ -use super::{delay_sender::DelaySender, event_handler::LoopEventHandler}; +use super::{delay_sender::DelaySender, event_handler::LoopEventHandler, TestLoop}; use crate::futures::{AsyncComputationSpawner, DelayedActionRunner}; +use crate::test_loop::event_handler::TryIntoOrSelf; use crate::time::Duration; use crate::{futures::FutureSpawner, messaging::CanSend}; use futures::future::BoxFuture; use futures::task::{waker_ref, ArcWake}; use std::fmt::Debug; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use std::task::Context; @@ -113,27 +115,24 @@ impl Debug for TestLoopDelayedActionEvent { /// An event handler that handles only `TestLoopDelayedActionEvent`s, by /// running the action encapsulated in the event. -pub fn drive_delayed_action_runners() -> LoopEventHandler> { - LoopEventHandler::new_with_drop( - |event, data, ctx| { - let mut runner = TestLoopDelayedActionRunner { sender: ctx.sender.clone() }; - (event.action)(data, &mut runner); - Ok(()) - }, - |_| { - // Delayed actions are usually used for timers, so let's just say - // it's OK to drop them at the end of the test. It would be hard - // to distinguish what sort of delayed action was being scheduled - // anyways. - true - }, - ) +pub fn drive_delayed_action_runners( + sender: DelaySender>, + shutting_down: Arc, +) -> LoopEventHandler> { + LoopEventHandler::new_simple(move |event: TestLoopDelayedActionEvent, data: &mut T| { + let mut runner = TestLoopDelayedActionRunner { + sender: sender.clone(), + shutting_down: shutting_down.clone(), + }; + (event.action)(data, &mut runner); + }) } /// `DelayedActionRunner` that schedules the action to be run later by the /// TestLoop event loop. pub struct TestLoopDelayedActionRunner { pub(crate) sender: DelaySender>, + pub(crate) shutting_down: Arc, } impl DelayedActionRunner for TestLoopDelayedActionRunner { @@ -143,6 +142,9 @@ impl DelayedActionRunner for TestLoopDelayedActionRunner { dur: Duration, action: Box) + Send + 'static>, ) { + if self.shutting_down.load(Ordering::Relaxed) { + return; + } self.sender.send_with_delay( TestLoopDelayedActionEvent { name: name.to_string(), action }, dur.try_into().unwrap(), @@ -150,6 +152,43 @@ impl DelayedActionRunner for TestLoopDelayedActionRunner { } } +impl TestLoop { + /// Shorthand for registering this frequently used handler. + pub fn register_delayed_action_handler(&mut self) + where + T: 'static, + Data: AsMut, + Event: TryIntoOrSelf> + + From> + + 'static, + { + self.register_handler( + drive_delayed_action_runners::(self.sender().narrow(), self.shutting_down()).widen(), + ); + } +} + +impl TestLoop, (usize, Event)> { + /// Shorthand for registering this frequently used handler for a multi-instance test. + pub fn register_delayed_action_handler_for_index(&mut self, idx: usize) + where + T: 'static, + Data: AsMut, + Event: TryIntoOrSelf> + + From> + + 'static, + { + self.register_handler( + drive_delayed_action_runners::( + self.sender().for_index(idx).narrow(), + self.shutting_down(), + ) + .widen() + .for_index(idx), + ); + } +} + /// An event that represents async computation. See async_computation_spawner() in DelaySender. pub struct TestLoopAsyncComputationEvent { name: String, diff --git a/core/async/src/test_loop/multi_instance.rs b/core/async/src/test_loop/multi_instance.rs deleted file mode 100644 index 2a84f2b3b2c..00000000000 --- a/core/async/src/test_loop/multi_instance.rs +++ /dev/null @@ -1,42 +0,0 @@ -use super::event_handler::{LoopEventHandler, LoopEventHandlerImpl, LoopHandlerContext}; - -/// Event handler that handles a specific single instance in a multi-instance -/// setup. -/// -/// To convert a single-instance handler to a multi-instance handler -/// (for one instance), use handler.for_index(index). -pub(crate) struct IndexedLoopEventHandler { - pub(crate) inner: LoopEventHandler, - pub(crate) index: usize, -} - -impl LoopEventHandlerImpl, (usize, Event)> - for IndexedLoopEventHandler -{ - fn init(&mut self, context: LoopHandlerContext<(usize, Event)>) { - self.inner.init(LoopHandlerContext { - sender: context.sender.for_index(self.index), - clock: context.clock, - }) - } - - fn handle( - &mut self, - event: (usize, Event), - data: &mut Vec, - ) -> Result<(), (usize, Event)> { - if event.0 == self.index { - self.inner.handle(event.1, &mut data[self.index]).map_err(|event| (self.index, event)) - } else { - Err(event) - } - } - - fn try_drop(&self, event: (usize, Event)) -> Result<(), (usize, Event)> { - if event.0 == self.index { - self.inner.try_drop(event.1).map_err(|event| (self.index, event)) - } else { - Err(event) - } - } -} diff --git a/core/async/src/time.rs b/core/async/src/time.rs index 4c4762a3d8f..2c61bd3333e 100644 --- a/core/async/src/time.rs +++ b/core/async/src/time.rs @@ -21,9 +21,10 @@ //! of different machines are not perfectly synchronized, and in extreme //! cases can be totally skewed. use once_cell::sync::Lazy; +use std::cmp::Ordering; +use std::collections::BinaryHeap; use std::sync::{Arc, Mutex}; pub use time::error; -use tokio::sync::watch; // TODO: consider wrapping these types to prevent interactions // with other time libraries, especially to prevent the direct access @@ -124,23 +125,46 @@ impl Clock { } struct FakeClockInner { - /// `mono` keeps the current time of the monotonic clock. - /// It is wrapped in watch::Sender, so that the value can - /// be observed from the clock::sleep() futures. - mono: watch::Sender, utc: Utc, - /// We need to keep it so that mono.send() always succeeds. - _mono_recv: watch::Receiver, + instant: Instant, + waiters: BinaryHeap, +} + +/// Whenever a user of a FakeClock calls `sleep` for `sleep_until`, we create a +/// `ClockWaiterInHeap` so that the returned future can be completed when the +/// clock advances past the desired deadline. +struct ClockWaiterInHeap { + deadline: Instant, + waker: tokio::sync::oneshot::Sender<()>, +} + +impl PartialEq for ClockWaiterInHeap { + fn eq(&self, other: &Self) -> bool { + self.deadline == other.deadline + } +} + +impl PartialOrd for ClockWaiterInHeap { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Eq for ClockWaiterInHeap {} + +impl Ord for ClockWaiterInHeap { + fn cmp(&self, other: &Self) -> Ordering { + other.deadline.cmp(&self.deadline) + } } impl FakeClockInner { pub fn new(utc: Utc) -> Self { - let (mono, _mono_recv) = watch::channel(*FAKE_CLOCK_MONO_START); - Self { utc, mono, _mono_recv } + Self { utc, instant: *FAKE_CLOCK_MONO_START, waiters: BinaryHeap::new() } } pub fn now(&mut self) -> Instant { - *self.mono.borrow() + self.instant } pub fn now_utc(&mut self) -> Utc { self.utc @@ -150,17 +174,19 @@ impl FakeClockInner { if d == Duration::ZERO { return; } - let now = *self.mono.borrow(); - self.mono.send(now + d).unwrap(); + self.instant += d; self.utc += d; + while let Some(earliest_waiter) = self.waiters.peek() { + if earliest_waiter.deadline <= self.instant { + self.waiters.pop().unwrap().waker.send(()).ok(); + } else { + break; + } + } } pub fn advance_until(&mut self, t: Instant) { - let now = *self.mono.borrow(); - if t <= now { - return; - } - self.mono.send(t).unwrap(); - self.utc += t - now; + let by = t - self.now(); + self.advance(by); } } @@ -198,19 +224,40 @@ impl FakeClock { /// Cancel-safe. pub async fn sleep(&self, d: Duration) { - let mut watch = self.0.lock().unwrap().mono.subscribe(); - let t = *watch.borrow() + d; - while *watch.borrow() < t { - watch.changed().await.unwrap(); + if d <= Duration::ZERO { + return; } + let receiver = { + let mut inner = self.0.lock().unwrap(); + let (sender, receiver) = tokio::sync::oneshot::channel(); + let waiter = ClockWaiterInHeap { waker: sender, deadline: inner.now() + d }; + inner.waiters.push(waiter); + receiver + }; + receiver.await.unwrap(); } /// Cancel-safe. pub async fn sleep_until(&self, t: Instant) { - let mut watch = self.0.lock().unwrap().mono.subscribe(); - while *watch.borrow() < t { - watch.changed().await.unwrap(); - } + let receiver = { + let mut inner = self.0.lock().unwrap(); + if inner.now() >= t { + return; + } + let (sender, receiver) = tokio::sync::oneshot::channel(); + let waiter = ClockWaiterInHeap { waker: sender, deadline: t }; + inner.waiters.push(waiter); + receiver + }; + receiver.await.unwrap(); + } + + /// Returns the earliest waiter, or None if no one is waiting on the clock. + /// The returned instant is guaranteed to be <= any waiter that is currently + /// waiting on the clock to advance. + pub fn first_waiter(&self) -> Option { + let inner = self.0.lock().unwrap(); + inner.waiters.peek().map(|waiter| waiter.deadline) } } diff --git a/integration-tests/src/tests/client/features/multinode_test_loop_example.rs b/integration-tests/src/tests/client/features/multinode_test_loop_example.rs index 8ad9b1619bc..a5fc5a36ee6 100644 --- a/integration-tests/src/tests/client/features/multinode_test_loop_example.rs +++ b/integration-tests/src/tests/client/features/multinode_test_loop_example.rs @@ -1,12 +1,11 @@ use derive_enum_from_into::{EnumFrom, EnumTryInto}; use near_async::messaging::{noop, IntoMultiSender, IntoSender, MessageWithCallback, SendAsync}; use near_async::test_loop::adhoc::{handle_adhoc_events, AdhocEvent, AdhocEventSender}; -use near_async::test_loop::event_handler::{ - ignore_events, LoopEventHandler, LoopHandlerContext, TryIntoOrSelf, -}; +use near_async::test_loop::delay_sender::DelaySender; +use near_async::test_loop::event_handler::{ignore_events, LoopEventHandler, TryIntoOrSelf}; use near_async::test_loop::futures::{ - drive_async_computations, drive_delayed_action_runners, drive_futures, - TestLoopAsyncComputationEvent, TestLoopDelayedActionEvent, TestLoopTask, + drive_async_computations, drive_futures, TestLoopAsyncComputationEvent, + TestLoopDelayedActionEvent, TestLoopTask, }; use near_async::test_loop::TestLoopBuilder; use near_async::time::Duration; @@ -319,12 +318,8 @@ fn test_client_with_multi_test_loop() { test.register_handler(drive_async_computations().widen().for_index(idx)); // Delayed actions. - test.register_handler( - drive_delayed_action_runners::().widen().for_index(idx), - ); - test.register_handler( - drive_delayed_action_runners::().widen().for_index(idx), - ); + test.register_delayed_action_handler_for_index::(idx); + test.register_delayed_action_handler_for_index::(idx); // Messages to the client. test.register_handler( @@ -358,8 +353,12 @@ fn test_client_with_multi_test_loop() { } // Handles network routing. Outgoing messages are handled by emitting incoming messages to the // appropriate component of the appropriate node index. - test.register_handler(route_network_messages_to_client(NETWORK_DELAY)); - test.register_handler(route_shards_manager_network_messages(NETWORK_DELAY)); + test.register_handler(route_network_messages_to_client(test.sender(), NETWORK_DELAY)); + test.register_handler(route_shards_manager_network_messages( + test.sender(), + test.clock(), + NETWORK_DELAY, + )); // Bootstrap the test by starting the components. // We use adhoc events for these, just so that the visualizer can see these as events rather @@ -367,14 +366,16 @@ fn test_client_with_multi_test_loop() { // the send_adhoc_event part and the test would still work. for idx in 0..NUM_CLIENTS { let sender = test.sender().for_index(idx); + let shutting_down = test.shutting_down(); test.sender().for_index(idx).send_adhoc_event("start_client", move |data| { - data.client.start(&mut sender.into_delayed_action_runner()); + data.client.start(&mut sender.into_delayed_action_runner(shutting_down)); }); let sender = test.sender().for_index(idx); + let shutting_down = test.shutting_down(); test.sender().for_index(idx).send_adhoc_event("start_shards_manager", move |data| { data.shards_manager.periodically_resend_chunk_requests( - &mut sender.into_delayed_action_runner(), + &mut sender.into_delayed_action_runner(shutting_down), Duration::milliseconds(100), ); }) @@ -443,7 +444,7 @@ fn test_client_with_multi_test_loop() { // Give the test a chance to finish off remaining important events in the event loop, which can // be important for properly shutting down the nodes. - test.finish_remaining_events(Duration::seconds(1)); + test.shutdown_and_drain_remaining_events(Duration::seconds(1)); } /// Handles outgoing network messages, and turns them into incoming client messages. @@ -453,112 +454,107 @@ pub fn route_network_messages_to_client< + From + From, >( + sender: DelaySender<(usize, Event)>, network_delay: Duration, ) -> LoopEventHandler { // let mut route_back_lookup: HashMap = HashMap::new(); // let mut next_hash: u64 = 0; - LoopEventHandler::new( - move |event: (usize, Event), - data: &mut Data, - context: &LoopHandlerContext<(usize, Event)>| { - let (idx, event) = event; - let message = event.try_into_or_self().map_err(|event| (idx, event.into()))?; - let PeerManagerMessageRequest::NetworkRequests(request) = message else { - return Err((idx, message.into())); - }; - - let client_senders = (0..data.num_accounts()) + LoopEventHandler::new(move |event: (usize, Event), data: &mut Data| { + let (idx, event) = event; + let message = event.try_into_or_self().map_err(|event| (idx, event.into()))?; + let PeerManagerMessageRequest::NetworkRequests(request) = message else { + return Err((idx, message.into())); + }; + + let client_senders = (0..data.num_accounts()) .map(|idx| { - context - .sender + sender .with_additional_delay(network_delay) .for_index(idx) .into_wrapped_multi_sender::() }) .collect::>(); - match request { - NetworkRequests::Block { block } => { - for other_idx in 0..data.num_accounts() { - if other_idx != idx { - drop(client_senders[other_idx].send_async(BlockResponse { - block: block.clone(), - peer_id: PeerId::random(), - was_requested: false, - })); - } - } - } - NetworkRequests::Approval { approval_message } => { - let other_idx = data.index_for_account(&approval_message.target); + match request { + NetworkRequests::Block { block } => { + for other_idx in 0..data.num_accounts() { if other_idx != idx { - drop(client_senders[other_idx].send_async(BlockApproval( - approval_message.approval, - PeerId::random(), - ))); - } else { - tracing::warn!("Dropping message to self"); + drop(client_senders[other_idx].send_async(BlockResponse { + block: block.clone(), + peer_id: PeerId::random(), + was_requested: false, + })); } } - NetworkRequests::ForwardTx(account, transaction) => { - let other_idx = data.index_for_account(&account); - if other_idx != idx { - drop(client_senders[other_idx].send_async(ProcessTxRequest { - transaction, - is_forwarded: true, - check_only: false, - })) - } else { - tracing::warn!("Dropping message to self"); - } + } + NetworkRequests::Approval { approval_message } => { + let other_idx = data.index_for_account(&approval_message.target); + if other_idx != idx { + drop( + client_senders[other_idx] + .send_async(BlockApproval(approval_message.approval, PeerId::random())), + ); + } else { + tracing::warn!("Dropping message to self"); } - NetworkRequests::ChunkEndorsement(target, endorsement) => { - let other_idx = data.index_for_account(&target); - if other_idx != idx { + } + NetworkRequests::ForwardTx(account, transaction) => { + let other_idx = data.index_for_account(&account); + if other_idx != idx { + drop(client_senders[other_idx].send_async(ProcessTxRequest { + transaction, + is_forwarded: true, + check_only: false, + })) + } else { + tracing::warn!("Dropping message to self"); + } + } + NetworkRequests::ChunkEndorsement(target, endorsement) => { + let other_idx = data.index_for_account(&target); + if other_idx != idx { + drop( + client_senders[other_idx].send_async(ChunkEndorsementMessage(endorsement)), + ); + } else { + tracing::warn!("Dropping message to self"); + } + } + NetworkRequests::ChunkStateWitness(targets, witness) => { + let other_idxes = targets + .iter() + .map(|account| data.index_for_account(account)) + .collect::>(); + for other_idx in &other_idxes { + if *other_idx != idx { drop( - client_senders[other_idx] - .send_async(ChunkEndorsementMessage(endorsement)), + client_senders[*other_idx] + .send_async(ChunkStateWitnessMessage(witness.clone())), ); } else { - tracing::warn!("Dropping message to self"); - } - } - NetworkRequests::ChunkStateWitness(targets, witness) => { - let other_idxes = targets - .iter() - .map(|account| data.index_for_account(account)) - .collect::>(); - for other_idx in &other_idxes { - if *other_idx != idx { - drop( - client_senders[*other_idx] - .send_async(ChunkStateWitnessMessage(witness.clone())), - ); - } else { - tracing::warn!( + tracing::warn!( "ChunkStateWitness asked to send to nodes {:?}, but {} is ourselves, so skipping that", other_idxes, idx); - } } } - NetworkRequests::ChunkStateWitnessAck(target, witness_ack) => { - let other_idx = data.index_for_account(&target); - if other_idx != idx { - drop( - client_senders[other_idx] - .send_async(ChunkStateWitnessAckMessage(witness_ack)), - ); - } else { - tracing::warn!("Dropping state-witness-ack message to self"); - } + } + NetworkRequests::ChunkStateWitnessAck(target, witness_ack) => { + let other_idx = data.index_for_account(&target); + if other_idx != idx { + drop( + client_senders[other_idx] + .send_async(ChunkStateWitnessAckMessage(witness_ack)), + ); + } else { + tracing::warn!("Dropping state-witness-ack message to self"); } - // TODO: Support more network message types as we expand the test. - _ => return Err((idx, PeerManagerMessageRequest::NetworkRequests(request).into())), } + // TODO: Support more network message types as we expand the test. + _ => return Err((idx, PeerManagerMessageRequest::NetworkRequests(request).into())), + } - Ok(()) - }, - ) + Ok(()) + }) } // TODO: This would be a good starting point for turning this into a test util. diff --git a/integration-tests/src/tests/client/features/simple_test_loop_example.rs b/integration-tests/src/tests/client/features/simple_test_loop_example.rs index 0aa5bd13a4a..7c9ad296d63 100644 --- a/integration-tests/src/tests/client/features/simple_test_loop_example.rs +++ b/integration-tests/src/tests/client/features/simple_test_loop_example.rs @@ -1,9 +1,9 @@ use derive_enum_from_into::{EnumFrom, EnumTryInto}; -use near_async::futures::DelayedActionRunnerExt; use near_async::messaging::{noop, IntoMultiSender, IntoSender}; +use near_async::test_loop::adhoc::{handle_adhoc_events, AdhocEvent, AdhocEventSender}; use near_async::test_loop::futures::{ - drive_async_computations, drive_delayed_action_runners, drive_futures, - TestLoopAsyncComputationEvent, TestLoopDelayedActionEvent, TestLoopTask, + drive_async_computations, drive_futures, TestLoopAsyncComputationEvent, + TestLoopDelayedActionEvent, TestLoopTask, }; use near_async::test_loop::TestLoopBuilder; use near_async::time::Duration; @@ -54,10 +54,17 @@ struct TestData { pub shards_manager: ShardsManager, } +impl AsMut for TestData { + fn as_mut(&mut self) -> &mut Self { + self + } +} + #[derive(EnumTryInto, Debug, EnumFrom)] #[allow(clippy::large_enum_variant)] enum TestEvent { Task(Arc), + Adhoc(AdhocEvent), AsyncComputation(TestLoopAsyncComputationEvent), ClientDelayedActions(TestLoopDelayedActionEvent), ClientEventFromNetwork(ClientSenderForNetworkMessage), @@ -230,18 +237,17 @@ fn test_client_with_simple_test_loop() { .widen(), ); test.register_handler(drive_futures().widen()); + test.register_handler(handle_adhoc_events::().widen()); test.register_handler(drive_async_computations().widen()); - test.register_handler(drive_delayed_action_runners::().widen()); + test.register_delayed_action_handler::(); test.register_handler(forward_client_request_to_shards_manager().widen()); // TODO: handle additional events. - test.sender().into_delayed_action_runner::().run_later( - "start_client", - Duration::ZERO, - |client, runner| { - client.start(runner); - }, - ); + let mut delayed_runner = + test.sender().into_delayed_action_runner::(test.shutting_down()); + test.sender().send_adhoc_event("start_client", move |data| { + data.client.start(&mut delayed_runner); + }); test.run_for(Duration::seconds(10)); - test.finish_remaining_events(Duration::seconds(1)); + test.shutdown_and_drain_remaining_events(Duration::seconds(1)); }