From e20cc4098df3e539b00964eee9111d8dc783ad89 Mon Sep 17 00:00:00 2001 From: Sergei Fomin Date: Sat, 6 Jan 2024 09:01:14 +0300 Subject: [PATCH 1/9] benches: add sync_broadcast benchmark Refs: #5465 --- benches/Cargo.toml | 5 +++ benches/sync_broadcast.rs | 82 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 87 insertions(+) create mode 100644 benches/sync_broadcast.rs diff --git a/benches/Cargo.toml b/benches/Cargo.toml index 1eea2e04489..c581055cf65 100644 --- a/benches/Cargo.toml +++ b/benches/Cargo.toml @@ -26,6 +26,11 @@ name = "spawn" path = "spawn.rs" harness = false +[[bench]] +name = "sync_broadcast" +path = "sync_broadcast.rs" +harness = false + [[bench]] name = "sync_mpsc" path = "sync_mpsc.rs" diff --git a/benches/sync_broadcast.rs b/benches/sync_broadcast.rs new file mode 100644 index 00000000000..38a2141387b --- /dev/null +++ b/benches/sync_broadcast.rs @@ -0,0 +1,82 @@ +use rand::{Rng, RngCore, SeedableRng}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use tokio::sync::{broadcast, Notify}; + +use criterion::measurement::WallTime; +use criterion::{black_box, criterion_group, criterion_main, BenchmarkGroup, Criterion}; + +fn rt() -> tokio::runtime::Runtime { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(6) + .build() + .unwrap() +} + +fn do_work(rng: &mut impl RngCore) -> u32 { + use std::fmt::Write; + let mut message = String::new(); + for i in 1..=10 { + let _ = write!(&mut message, " {i}={}", rng.gen::()); + } + message + .as_bytes() + .iter() + .map(|&c| c as u32) + .fold(0, u32::wrapping_add) +} + +fn contention_impl(g: &mut BenchmarkGroup) { + let rt = rt(); + + let (tx, _rx) = broadcast::channel::(1000); + let wg = Arc::new((AtomicUsize::new(0), Notify::new())); + + for n in 0..N_TASKS { + let wg = wg.clone(); + let mut rx = tx.subscribe(); + let mut rng = rand::rngs::StdRng::seed_from_u64(n as u64); + rt.spawn(async move { + while let Ok(_) = rx.recv().await { + let r = do_work(&mut rng); + let _ = black_box(r); + if wg.0.fetch_sub(1, Ordering::Relaxed) == 1 { + wg.1.notify_one(); + } + } + }); + } + + const N_ITERS: usize = 100; + + g.bench_function(N_TASKS.to_string(), |b| { + b.iter(|| { + rt.block_on({ + let wg = wg.clone(); + let tx = tx.clone(); + async move { + for i in 0..N_ITERS { + assert_eq!(wg.0.fetch_add(N_TASKS, Ordering::Relaxed), 0); + tx.send(i).unwrap(); + while wg.0.load(Ordering::Relaxed) > 0 { + wg.1.notified().await; + } + } + } + }) + }) + }); +} + +fn bench_contention(c: &mut Criterion) { + let mut group = c.benchmark_group("contention"); + contention_impl::<10>(&mut group); + contention_impl::<100>(&mut group); + contention_impl::<500>(&mut group); + contention_impl::<1000>(&mut group); + group.finish(); +} + +criterion_group!(contention, bench_contention); + +criterion_main!(contention); From 55b4d159210853d0917bf74a3efe430bcf632984 Mon Sep 17 00:00:00 2001 From: Sergei Fomin Date: Thu, 18 Jan 2024 08:33:39 +0300 Subject: [PATCH 2/9] Make waiter.queued atomic --- tokio/src/sync/broadcast.rs | 51 +++++++++++++++++++++---------------- 1 file changed, 29 insertions(+), 22 deletions(-) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 568a50bd59b..5e4d8612149 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -127,7 +127,8 @@ use std::future::Future; use std::marker::PhantomPinned; use std::pin::Pin; use std::ptr::NonNull; -use std::sync::atomic::Ordering::SeqCst; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering::{Acquire, Relaxed, Release, SeqCst}; use std::task::{Context, Poll, Waker}; use std::usize; @@ -354,7 +355,7 @@ struct Slot { /// An entry in the wait queue. struct Waiter { /// True if queued. - queued: bool, + queued: AtomicBool, /// Task waiting on the broadcast channel. waker: Option, @@ -369,7 +370,7 @@ struct Waiter { impl Waiter { fn new() -> Self { Self { - queued: false, + queued: AtomicBool::new(false), waker: None, pointers: linked_list::Pointers::new(), _p: PhantomPinned, @@ -901,8 +902,7 @@ impl Shared { // Safety: `tail` lock is still held. let waiter = unsafe { waiter.as_mut() }; - assert!(waiter.queued); - waiter.queued = false; + assert!(waiter.queued.swap(false, Release)); if let Some(waker) = waiter.waker.take() { wakers.push(waker); @@ -1104,8 +1104,7 @@ impl Receiver { } } - if !(*ptr).queued { - (*ptr).queued = true; + if !(*ptr).queued.swap(true, Relaxed) { tail.waiters.push_front(NonNull::new_unchecked(&mut *ptr)); } }); @@ -1357,7 +1356,7 @@ impl<'a, T> Recv<'a, T> { Recv { receiver, waiter: UnsafeCell::new(Waiter { - queued: false, + queued: AtomicBool::new(false), waker: None, pointers: linked_list::Pointers::new(), _p: PhantomPinned, @@ -1402,22 +1401,30 @@ where impl<'a, T> Drop for Recv<'a, T> { fn drop(&mut self) { - // Acquire the tail lock. This is required for safety before accessing - // the waiter node. - let mut tail = self.receiver.shared.tail.lock(); - - // safety: tail lock is held - let queued = self.waiter.with(|ptr| unsafe { (*ptr).queued }); + let queued = self + .waiter + .with(|ptr| unsafe { (*ptr).queued.load(Acquire) }); if queued { - // Remove the node - // - // safety: tail lock is held and the wait node is verified to be in - // the list. - unsafe { - self.waiter.with_mut(|ptr| { - tail.waiters.remove((&mut *ptr).into()); - }); + // Acquire the tail lock. This is required for safety before accessing + // the waiter node. + let mut tail = self.receiver.shared.tail.lock(); + + // safety: tail lock is held + let queued = self + .waiter + .with(|ptr| unsafe { (*ptr).queued.load(Relaxed) }); + + if queued { + // Remove the node + // + // safety: tail lock is held and the wait node is verified to be in + // the list. + unsafe { + self.waiter.with_mut(|ptr| { + tail.waiters.remove((&mut *ptr).into()); + }); + } } } } From e9a94f8a1675ed7d3694b5f857434714b888504b Mon Sep 17 00:00:00 2001 From: Sergei Fomin Date: Fri, 19 Jan 2024 17:07:48 +0300 Subject: [PATCH 3/9] Add comments --- tokio/src/sync/broadcast.rs | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 5e4d8612149..c87baaa14c5 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -902,11 +902,14 @@ impl Shared { // Safety: `tail` lock is still held. let waiter = unsafe { waiter.as_mut() }; - assert!(waiter.queued.swap(false, Release)); - if let Some(waker) = waiter.waker.take() { wakers.push(waker); } + + // `Release` is needed to synchronize with `Recv::drop`. + // It is critical to set this variable **after** waker + // is extracted, otherwise we may data race with `Recv::drop`. + assert!(waiter.queued.swap(false, Release)); } None => { break 'outer; @@ -1104,6 +1107,11 @@ impl Receiver { } } + // If the waiter is not already queued, enqueue it. + // Relaxed memory order suffices because we don't need + // to synchronize with `Recv::drop` here (calling + // `Receiver::recv_ref` with a waiter implies ownership + // of the corresponding `Recv`). if !(*ptr).queued.swap(true, Relaxed) { tail.waiters.push_front(NonNull::new_unchecked(&mut *ptr)); } @@ -1401,16 +1409,23 @@ where impl<'a, T> Drop for Recv<'a, T> { fn drop(&mut self) { + // Safety: `waiter.queued` is atomic. + // Acquire ordering is required to synchronize with + // `Shared::notify_rx` before we drop the object. let queued = self .waiter .with(|ptr| unsafe { (*ptr).queued.load(Acquire) }); + // If the waiter is queued, we need to unlink it from the waiters list. + // If not, no further synchronization is required, since the waiter + // is not in the list and, as such, is not shared with any other threads. if queued { // Acquire the tail lock. This is required for safety before accessing // the waiter node. let mut tail = self.receiver.shared.tail.lock(); - // safety: tail lock is held + // Safety: tail lock is held. + // Relaxed order suffices because we hold the tail lock. let queued = self .waiter .with(|ptr| unsafe { (*ptr).queued.load(Relaxed) }); From 000488ffc814e628f5d09397e47f1fde60797809 Mon Sep 17 00:00:00 2001 From: Sergei Fomin Date: Fri, 19 Jan 2024 18:03:58 +0300 Subject: [PATCH 4/9] Fix aliasing issue & use non-atomic write --- tokio/src/sync/broadcast.rs | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index c87baaa14c5..5e5ac2996b2 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -898,18 +898,20 @@ impl Shared { 'outer: loop { while wakers.can_push() { match list.pop_back_locked(&mut tail) { - Some(mut waiter) => { - // Safety: `tail` lock is still held. - let waiter = unsafe { waiter.as_mut() }; - - if let Some(waker) = waiter.waker.take() { - wakers.push(waker); + Some(waiter) => { + unsafe { + // Safety: accessing `waker` is safe because + // the tail lock is held. + if let Some(waker) = (*waiter.as_ptr()).waker.take() { + wakers.push(waker); + } + + // Safety: `queued` is atomic. + // `Release` is needed to synchronize with `Recv::drop`. + // It is critical to set this variable **after** waker + // is extracted, otherwise we may data race with `Recv::drop`. + assert!((*waiter.as_ptr()).queued.swap(false, Release)); } - - // `Release` is needed to synchronize with `Recv::drop`. - // It is critical to set this variable **after** waker - // is extracted, otherwise we may data race with `Recv::drop`. - assert!(waiter.queued.swap(false, Release)); } None => { break 'outer; @@ -1108,11 +1110,8 @@ impl Receiver { } // If the waiter is not already queued, enqueue it. - // Relaxed memory order suffices because we don't need - // to synchronize with `Recv::drop` here (calling - // `Receiver::recv_ref` with a waiter implies ownership - // of the corresponding `Recv`). - if !(*ptr).queued.swap(true, Relaxed) { + if !*(*ptr).queued.get_mut() { + *(*ptr).queued.get_mut() = true; tail.waiters.push_front(NonNull::new_unchecked(&mut *ptr)); } }); From 8467890b2e1c2cb777563d0dad7e070429ea11ef Mon Sep 17 00:00:00 2001 From: Sergei Fomin Date: Fri, 19 Jan 2024 18:16:10 +0300 Subject: [PATCH 5/9] Use direct access instead of Relaxed ordering --- tokio/src/sync/broadcast.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 5e5ac2996b2..f9ffc606359 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -128,7 +128,7 @@ use std::marker::PhantomPinned; use std::pin::Pin; use std::ptr::NonNull; use std::sync::atomic::AtomicBool; -use std::sync::atomic::Ordering::{Acquire, Relaxed, Release, SeqCst}; +use std::sync::atomic::Ordering::{Acquire, Release, SeqCst}; use std::task::{Context, Poll, Waker}; use std::usize; @@ -1424,10 +1424,9 @@ impl<'a, T> Drop for Recv<'a, T> { let mut tail = self.receiver.shared.tail.lock(); // Safety: tail lock is held. - // Relaxed order suffices because we hold the tail lock. let queued = self .waiter - .with(|ptr| unsafe { (*ptr).queued.load(Relaxed) }); + .with_mut(|ptr| unsafe { *(*ptr).queued.get_mut() }); if queued { // Remove the node From a7fc0bbb9ca71512846b28f85a9f0e4e9f3e6e4f Mon Sep 17 00:00:00 2001 From: Sergei Fomin Date: Sat, 27 Jan 2024 12:40:30 +0300 Subject: [PATCH 6/9] Replace swap with load+store --- tokio/src/sync/broadcast.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index f9ffc606359..aeb71c08149 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -117,7 +117,7 @@ //! ``` use crate::loom::cell::UnsafeCell; -use crate::loom::sync::atomic::AtomicUsize; +use crate::loom::sync::atomic::{AtomicBool, AtomicUsize}; use crate::loom::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard}; use crate::util::linked_list::{self, GuardedLinkedList, LinkedList}; use crate::util::WakeList; @@ -127,8 +127,7 @@ use std::future::Future; use std::marker::PhantomPinned; use std::pin::Pin; use std::ptr::NonNull; -use std::sync::atomic::AtomicBool; -use std::sync::atomic::Ordering::{Acquire, Release, SeqCst}; +use std::sync::atomic::Ordering::{Acquire, Relaxed, Release, SeqCst}; use std::task::{Context, Poll, Waker}; use std::usize; @@ -907,10 +906,13 @@ impl Shared { } // Safety: `queued` is atomic. + let queued = &(*waiter.as_ptr()).queued; + // `Relaxed` suffices because the tail lock is held. + assert!(queued.load(Relaxed)); // `Release` is needed to synchronize with `Recv::drop`. // It is critical to set this variable **after** waker // is extracted, otherwise we may data race with `Recv::drop`. - assert!((*waiter.as_ptr()).queued.swap(false, Release)); + queued.store(false, Release); } } None => { From be6b4bc12ee25f9b8c85acf35784fde727782765 Mon Sep 17 00:00:00 2001 From: Sergei Fomin Date: Sat, 27 Jan 2024 12:59:59 +0300 Subject: [PATCH 7/9] Use load/store instead of get_mut --- tokio/src/sync/broadcast.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index aeb71c08149..6bfb844734f 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -1112,8 +1112,12 @@ impl Receiver { } // If the waiter is not already queued, enqueue it. - if !*(*ptr).queued.get_mut() { - *(*ptr).queued.get_mut() = true; + // `Relaxed` order suffices: we have synchronized with + // all writers through the tail lock that we hold. + if (*ptr).queued.load(Relaxed) { + // `Relaxed` order suffices: all the readers will + // synchronize with this write through the tail lock. + (*ptr).queued.store(true, Relaxed); tail.waiters.push_front(NonNull::new_unchecked(&mut *ptr)); } }); From f93c70488357ad4afecab680f5c0eaef854ba1be Mon Sep 17 00:00:00 2001 From: Sergei Fomin Date: Sat, 27 Jan 2024 13:03:01 +0300 Subject: [PATCH 8/9] Use load instead of get_mut --- tokio/src/sync/broadcast.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 6bfb844734f..a6179a46bc7 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -1430,9 +1430,10 @@ impl<'a, T> Drop for Recv<'a, T> { let mut tail = self.receiver.shared.tail.lock(); // Safety: tail lock is held. + // `Relaxed` order suffices because we hold the tail lock. let queued = self .waiter - .with_mut(|ptr| unsafe { *(*ptr).queued.get_mut() }); + .with_mut(|ptr| unsafe { (*ptr).queued.load(Relaxed) }); if queued { // Remove the node From c86619e39af98cc5d8b2ed51dc2e0f153e877162 Mon Sep 17 00:00:00 2001 From: Sergei Fomin Date: Sat, 27 Jan 2024 13:09:03 +0300 Subject: [PATCH 9/9] Fix a typo --- tokio/src/sync/broadcast.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index a6179a46bc7..499e5296da4 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -1114,7 +1114,7 @@ impl Receiver { // If the waiter is not already queued, enqueue it. // `Relaxed` order suffices: we have synchronized with // all writers through the tail lock that we hold. - if (*ptr).queued.load(Relaxed) { + if !(*ptr).queued.load(Relaxed) { // `Relaxed` order suffices: all the readers will // synchronize with this write through the tail lock. (*ptr).queued.store(true, Relaxed);