Skip to content

Commit

Permalink
fix potential race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
conradludgate committed Aug 7, 2024
1 parent 5f192fa commit 17d7ed0
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 25 deletions.
41 changes: 19 additions & 22 deletions src/arc_slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub(crate) struct ArcSliceInnerMeta {
pub(crate) struct ArcSlotInner {
index: usize,
wake_lock: AtomicBool,
woken: AtomicBool,
// woken: AtomicBool,
next: AtomicUsize,
}

Expand Down Expand Up @@ -135,24 +135,14 @@ impl ArcSliceInner {
/// https://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
///
/// Safety: index must be within capacity
pub(crate) unsafe fn push(&self, index: usize) -> bool {
pub(crate) unsafe fn push(&self, index: usize) {
let node = self.slice.get_unchecked(index);

node.woken.store(true, Ordering::Relaxed);
// if node.wake_lock.swap(true, Ordering::SeqCst) {
// // already woken
// return false;
// }

node.next.store(usize::MAX, Ordering::Relaxed);

let prev = self.meta.list_head.swap(index, Ordering::AcqRel);
self.slice
.get_unchecked(prev)
.next
.store(index, Ordering::Release);

true
let prev_node = self.slice.get_unchecked(prev);
prev_node.next.store(index, Ordering::Release);
}

/// The pop function from the 1024cores intrusive MPSC queue algorithm
Expand All @@ -174,11 +164,11 @@ impl ArcSliceInner {
next = self.slice.get_unchecked(next).next.load(Ordering::Acquire);
}

if next <= self.meta.len {
if next != usize::MAX {
*self.meta.list_tail.get() = next;
debug_assert!(tail != self.meta.len);

// self.slice[tail].wake_lock.store(false, Ordering::SeqCst);
self.slice[tail].wake_lock.store(false, Ordering::SeqCst);
return ReadySlot::Ready(tail);
}

Expand All @@ -190,10 +180,10 @@ impl ArcSliceInner {

next = self.slice.get_unchecked(tail).next.load(Ordering::Acquire);

if next <= self.meta.len {
if next != usize::MAX {
*self.meta.list_tail.get() = next;

// self.slice[tail].wake_lock.store(false, Ordering::SeqCst);
self.slice[tail].wake_lock.store(false, Ordering::SeqCst);
return ReadySlot::Ready(tail);
}

Expand All @@ -212,6 +202,7 @@ mod slot {
alloc::Layout,
mem::align_of,
ptr,
sync::atomic::Ordering,
task::{RawWaker, RawWakerVTable, Waker},
};

Expand Down Expand Up @@ -299,9 +290,15 @@ mod slot {
// then call the stored waker to trigger a poll
unsafe fn wake_by_ref(waker: *const ()) {
let slot = waker.cast::<ArcSlotInner>();
let index = *core::ptr::addr_of!((*slot).index);
let inner = inner_ref(slot);
if inner.push(index) {

let node = &*slot;
// node.woken.store(true, Ordering::Relaxed);
let prev = node.wake_lock.swap(true, Ordering::SeqCst);

if !prev {
let index = node.index;
let inner = inner_ref(slot);
inner.push(index);
inner.meta.waker.notify();
}
}
Expand Down Expand Up @@ -452,7 +449,7 @@ impl ArcSlice {
ArcSlotInner {
index: i,
wake_lock: AtomicBool::new(false),
woken: AtomicBool::new(false),
// woken: AtomicBool::new(false),
next: AtomicUsize::new(usize::MAX),
},
);
Expand Down
4 changes: 1 addition & 3 deletions src/futures_unordered_bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ impl<F> FuturesUnorderedBounded<F> {
}

match unsafe { self.shared.pop() } {
crate::arc_slice::ReadySlot::None => break,
crate::arc_slice::ReadySlot::None => return Poll::Pending,
crate::arc_slice::ReadySlot::Inconsistent => {
cx.waker().wake_by_ref();
return Poll::Pending;
Expand All @@ -224,8 +224,6 @@ impl<F> FuturesUnorderedBounded<F> {
}
}
}

Poll::Pending
}
}

Expand Down

0 comments on commit 17d7ed0

Please sign in to comment.