From 3bcd331713cc387e276083ed03b915745a4b24c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Cig=C3=A1nek?= Date: Thu, 15 Aug 2024 14:10:11 +0200 Subject: [PATCH] Spawn expiration tracker task only once per PendingRequests instance This seems to fix the block_nonce_tamper test failure but it's not exactly clear why. My educated guess is that maybe `DelayQueue` doesn't like to be polled from multiple tasks at the same time. --- lib/src/network/pending.rs | 52 ++++++++++++++++++++++++++------------ 1 file changed, 36 insertions(+), 16 deletions(-) diff --git a/lib/src/network/pending.rs b/lib/src/network/pending.rs index ca2b5dbb3..8820dc2e4 100644 --- a/lib/src/network/pending.rs +++ b/lib/src/network/pending.rs @@ -14,6 +14,7 @@ use deadlock::BlockingMutex; use scoped_task::ScopedJoinHandle; use std::{future, sync::Arc, task::ready}; use std::{task::Poll, time::Instant}; +use tokio::sync::Notify; pub(crate) enum PendingRequest { RootNode(PublicKey, PendingDebugRequest), @@ -94,19 +95,28 @@ pub(crate) enum Key { pub(super) struct PendingRequests { monitor: Arc, map: Arc>>, + // Notify when item is inserted into previously empty map. This restarts the expiration tracker + // task. + add_notify: Arc, // This is to ensure the `run_expiration_tracker` task is destroyed with PendingRequests (as // opposed to the task being destroyed "sometime after"). This is important because the task // holds an Arc to the RepositoryMonitor which must be destroyed prior to reimporting its // corresponding repository if the user decides to do so. - tracker_task: BlockingMutex>>, + _expiration_tracker_task: ScopedJoinHandle<()>, } impl PendingRequests { pub fn new(monitor: Arc) -> Self { + let map = Arc::new(BlockingMutex::new(DelayMap::default())); + let add_notify = Arc::new(Notify::new()); + Self { - monitor, - map: Arc::new(BlockingMutex::new(DelayMap::default())), - tracker_task: BlockingMutex::new(None), + monitor: monitor.clone(), + map: map.clone(), + add_notify: add_notify.clone(), + _expiration_tracker_task: scoped_task::spawn(run_expiration_tracker( + monitor, map, add_notify, + )), } } @@ -144,13 +154,8 @@ impl PendingRequests { REQUEST_TIMEOUT, ); - // The expiration tracker task is started each time an item is inserted into previously - // empty map and stopped when the map becomes empty again. if map.len() == 1 { - *self.tracker_task.lock().unwrap() = Some(scoped_task::spawn(run_expiration_tracker( - self.monitor.clone(), - self.map.clone(), - ))); + self.add_notify.notify_waiters(); } request_added(&self.monitor, &key); @@ -192,7 +197,9 @@ fn request_added(monitor: &RepositoryMonitor, key: &Key) { monitor.index_requests_sent.increment(1); monitor.index_requests_inflight.increment(1.0); } - Key::Block(_) => { + Key::Block(block_id) => { + tracing::warn!(?block_id, "block request added"); + monitor.block_requests_sent.increment(1); monitor.block_requests_inflight.increment(1.0); } @@ -203,21 +210,34 @@ fn request_added(monitor: &RepositoryMonitor, key: &Key) { fn request_removed(monitor: &RepositoryMonitor, key: &Key) { match key { Key::RootNode(_) | Key::ChildNodes { .. } => monitor.index_requests_inflight.decrement(1.0), - Key::Block(_) => monitor.block_requests_inflight.decrement(1.0), + Key::Block(block_id) => { + tracing::warn!(?block_id, "block request removed"); + + monitor.block_requests_inflight.decrement(1.0); + } Key::BlockOffer(_) => (), } } async fn run_expiration_tracker( monitor: Arc, - request_map: Arc>>, + map: Arc>>, + add_notify: Arc, ) { // NOTE: The `expired` fn does not always complete when the last item is removed from the // DelayMap. There is an issue in the DelayQueue used by DelayMap, reported here: // https://github.com/tokio-rs/tokio/issues/6751 - while let Some((key, _)) = expired(&request_map).await { - monitor.request_timeouts.increment(1); - request_removed(&monitor, &key); + + loop { + let notified = add_notify.notified(); + + while let Some((key, _)) = expired(&map).await { + monitor.request_timeouts.increment(1); + request_removed(&monitor, &key); + } + + // Last item removed from the map. Wait until new item added. + notified.await; } }