Skip to content

Commit

Permalink
Spawn expiration tracker task only once per PendingRequests instance
Browse files Browse the repository at this point in the history
This seems to fix the block_nonce_tamper test failure but it's not exactly clear why. My educated guess is that maybe `DelayQueue` doesn't like to be polled from multiple tasks at the same time.
  • Loading branch information
madadam committed Aug 15, 2024
1 parent bc6facf commit 3bcd331
Showing 1 changed file with 36 additions and 16 deletions.
52 changes: 36 additions & 16 deletions lib/src/network/pending.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use deadlock::BlockingMutex;
use scoped_task::ScopedJoinHandle;
use std::{future, sync::Arc, task::ready};
use std::{task::Poll, time::Instant};
use tokio::sync::Notify;

pub(crate) enum PendingRequest {
RootNode(PublicKey, PendingDebugRequest),
Expand Down Expand Up @@ -94,19 +95,28 @@ pub(crate) enum Key {
pub(super) struct PendingRequests {
monitor: Arc<RepositoryMonitor>,
map: Arc<BlockingMutex<DelayMap<Key, RequestData>>>,
// Notify when item is inserted into previously empty map. This restarts the expiration tracker
// task.
add_notify: Arc<Notify>,
// This is to ensure the `run_expiration_tracker` task is destroyed with PendingRequests (as
// opposed to the task being destroyed "sometime after"). This is important because the task
// holds an Arc to the RepositoryMonitor which must be destroyed prior to reimporting its
// corresponding repository if the user decides to do so.
tracker_task: BlockingMutex<Option<ScopedJoinHandle<()>>>,
_expiration_tracker_task: ScopedJoinHandle<()>,
}

impl PendingRequests {
pub fn new(monitor: Arc<RepositoryMonitor>) -> Self {
let map = Arc::new(BlockingMutex::new(DelayMap::default()));
let add_notify = Arc::new(Notify::new());

Self {
monitor,
map: Arc::new(BlockingMutex::new(DelayMap::default())),
tracker_task: BlockingMutex::new(None),
monitor: monitor.clone(),
map: map.clone(),
add_notify: add_notify.clone(),
_expiration_tracker_task: scoped_task::spawn(run_expiration_tracker(
monitor, map, add_notify,
)),
}
}

Expand Down Expand Up @@ -144,13 +154,8 @@ impl PendingRequests {
REQUEST_TIMEOUT,
);

// The expiration tracker task is started each time an item is inserted into previously
// empty map and stopped when the map becomes empty again.
if map.len() == 1 {
*self.tracker_task.lock().unwrap() = Some(scoped_task::spawn(run_expiration_tracker(
self.monitor.clone(),
self.map.clone(),
)));
self.add_notify.notify_waiters();
}

request_added(&self.monitor, &key);
Expand Down Expand Up @@ -192,7 +197,9 @@ fn request_added(monitor: &RepositoryMonitor, key: &Key) {
monitor.index_requests_sent.increment(1);
monitor.index_requests_inflight.increment(1.0);
}
Key::Block(_) => {
Key::Block(block_id) => {
tracing::warn!(?block_id, "block request added");

monitor.block_requests_sent.increment(1);
monitor.block_requests_inflight.increment(1.0);
}
Expand All @@ -203,21 +210,34 @@ fn request_added(monitor: &RepositoryMonitor, key: &Key) {
fn request_removed(monitor: &RepositoryMonitor, key: &Key) {
match key {
Key::RootNode(_) | Key::ChildNodes { .. } => monitor.index_requests_inflight.decrement(1.0),
Key::Block(_) => monitor.block_requests_inflight.decrement(1.0),
Key::Block(block_id) => {
tracing::warn!(?block_id, "block request removed");

monitor.block_requests_inflight.decrement(1.0);
}
Key::BlockOffer(_) => (),
}
}

async fn run_expiration_tracker(
monitor: Arc<RepositoryMonitor>,
request_map: Arc<BlockingMutex<DelayMap<Key, RequestData>>>,
map: Arc<BlockingMutex<DelayMap<Key, RequestData>>>,
add_notify: Arc<Notify>,
) {
// NOTE: The `expired` fn does not always complete when the last item is removed from the
// DelayMap. There is an issue in the DelayQueue used by DelayMap, reported here:
// https://github.com/tokio-rs/tokio/issues/6751
while let Some((key, _)) = expired(&request_map).await {
monitor.request_timeouts.increment(1);
request_removed(&monitor, &key);

loop {
let notified = add_notify.notified();

while let Some((key, _)) = expired(&map).await {
monitor.request_timeouts.increment(1);
request_removed(&monitor, &key);
}

// Last item removed from the map. Wait until new item added.
notified.await;
}
}

Expand Down

0 comments on commit 3bcd331

Please sign in to comment.