Skip to content

Commit

Permalink
added AtomicWeak and store() method
Browse files Browse the repository at this point in the history
  • Loading branch information
wyang5 committed Mar 4, 2024
1 parent 5c97459 commit 08b7930
Show file tree
Hide file tree
Showing 6 changed files with 268 additions and 70 deletions.
8 changes: 1 addition & 7 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,4 @@
on:
pull_request:
branches:
- main
push:
branches:
- main
on: [push, pull_request]

env:
RUST_BACKTRACE: 1
Expand Down
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "aarc"
version = "0.0.1"
version = "0.1.0"
edition = "2021"
description = "Atomically updatable variants of Arc and Weak for lock-free concurrency."
homepage = "https://github.com/aarc-rs/aarc"
Expand All @@ -11,4 +11,4 @@ categories = ["concurrency", "memory-management", "data-structures", "algorithms
exclude = [".github/", ".gitignore"]

[dev-dependencies]
rand = "0.8"
rand = "0.8"
199 changes: 185 additions & 14 deletions src/atomics.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use crate::statics::{acquire, release, retire};
use std::mem::ManuallyDrop;
use std::ops::Deref;
use std::ptr::null;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Weak};

pub struct AtomicArc<T> {
ptr: AtomicPtr<T>,
Expand All @@ -16,7 +18,7 @@ impl<T> AtomicArc<T> {
/// # Examples
/// ```
/// use std::sync::atomic::Ordering::SeqCst;
/// use aarc::atomics::AtomicArc;
/// use aarc::AtomicArc;
///
/// let atomic1: AtomicArc<Option<i32>> = AtomicArc::new(None);
/// assert_eq!(atomic1.load(SeqCst), None);
Expand All @@ -32,17 +34,17 @@ impl<T> AtomicArc<T> {
}
}

/// Loads the [`Arc`] with its reference count incremented appropriately if not [`None`].
///
/// # Example
/// Loads the [`Arc`] and increments its strong count appropriately if not [`None`].
///
/// # Examples
/// ```
/// use std::sync::Arc;
/// use std::sync::atomic::Ordering::SeqCst;
/// use aarc::atomics::AtomicArc;
/// use aarc::AtomicArc;
///
/// let atomic1 = AtomicArc::new(Some(42));
/// let loaded = atomic1.load(SeqCst).unwrap();
/// let atomic = AtomicArc::new(None);
/// atomic.store(Some(&Arc::new(42)), SeqCst);
/// let loaded = atomic.load(SeqCst).unwrap();
///
/// assert_eq!(Arc::strong_count(&loaded), 2);
/// assert_eq!(*loaded, 42);
Expand All @@ -56,6 +58,18 @@ impl<T> AtomicArc<T> {
}
}

/// Stores `val` into `self`. See `AtomicArc::load` for examples.
pub fn store(&self, val: Option<&Arc<T>>, order: Ordering) {
let ptr: *const T = val.map_or(null(), Arc::as_ptr);
unsafe {
Arc::increment_strong_count(ptr);
}
let before = self.ptr.swap(ptr.cast_mut(), order);
if !before.is_null() {
retire::<_, true>(before);
}
}

/// Stores `new` into `self` if `self` is equal to `current`.
///
/// The comparison is shallow and is determined by pointer equality; see [`Arc::ptr_eq`].
Expand All @@ -64,11 +78,11 @@ impl<T> AtomicArc<T> {
/// (instead of a copy of `current`). This eliminates the overhead of providing the caller with
/// a redundant [`Arc`].
///
/// # Example:
/// # Examples:
/// ```
/// use std::sync::Arc;
/// use std::sync::atomic::Ordering::SeqCst;
/// use aarc::atomics::AtomicArc;
/// use aarc::AtomicArc;
///
/// let atomic1: AtomicArc<i32> = AtomicArc::new(None);
/// let arc1 = Arc::new(42);
Expand All @@ -92,7 +106,7 @@ impl<T> AtomicArc<T> {
{
Ok(before) => unsafe {
if !before.is_null() {
retire(before);
retire::<_, true>(before);
}
if !n.is_null() {
Arc::increment_strong_count(n);
Expand Down Expand Up @@ -134,17 +148,19 @@ impl<T> Clone for AtomicArc<T> {
}

impl<T> Default for AtomicArc<T> {
/// Equivalent to `AtomicArc::new(None)`.
/// Creates an empty `AtomicArc`. Equivalent to `AtomicArc::new(None)`.
fn default() -> Self {
Self::new(None)
Self {
ptr: AtomicPtr::default(),
}
}
}

impl<T> Drop for AtomicArc<T> {
fn drop(&mut self) {
let ptr = self.ptr.load(SeqCst);
if !ptr.is_null() {
retire(ptr);
retire::<_, true>(ptr);
}
}
}
Expand All @@ -164,3 +180,158 @@ impl<T> From<Option<Arc<T>>> for AtomicArc<T> {
}
}
}

pub struct AtomicWeak<T> {
ptr: AtomicPtr<T>,
}

impl<T> AtomicWeak<T> {
/// Loads the [`Weak`] and increments its weak count appropriately if not [`None`].
///
/// # Examples
/// ```
/// use std::sync::{Arc, Weak};
/// use std::sync::atomic::Ordering::SeqCst;
/// use aarc::{AtomicArc, AtomicWeak};
///
/// let arc = Arc::new(42);
/// let weak = Arc::downgrade(&arc);
///
/// let atomic_weak = AtomicWeak::default();
/// atomic_weak.store(Some(&weak), SeqCst);
/// let loaded = atomic_weak.load(SeqCst).unwrap();
///
/// assert_eq!(*loaded.upgrade().unwrap(), 42);
/// assert_eq!(Weak::weak_count(&loaded), 3);
/// ```
pub fn load(&self, order: Ordering) -> Option<Weak<T>> {
acquire();
let ptr = self.ptr.load(order);
let result = if !ptr.is_null() {
unsafe { Some(clone_weak_from_raw(ptr)) }
} else {
None
};
release();
result
}

/// Stores `val` into `self`. See `AtomicWeak::load` for examples.
pub fn store(&self, val: Option<&Weak<T>>, order: Ordering) {
let ptr: *const T = val.map_or(null(), |w| {
increment_weak_count(w);
Weak::as_ptr(w)
});
let before = self.ptr.swap(ptr.cast_mut(), order);
if !before.is_null() {
retire::<_, false>(before);
}
}

/// Stores `new` into `self` if `self` is equal to `current`.
///
/// The comparison is shallow and is determined by pointer equality; see [`Weak::ptr_eq`].
///
/// If the comparison succeeds, the return value will be an [`Ok`] containing the unit type
/// (instead of a copy of `current`). This eliminates the overhead of providing the caller with
/// a redundant [`Weak`].
///
/// # Examples:
/// ```
/// use std::sync::Arc;
/// use std::sync::atomic::Ordering::SeqCst;
/// use aarc::AtomicWeak;
///
/// let arc = Arc::new(42);
/// let weak = Arc::downgrade(&arc);
///
/// let atomic_weak = AtomicWeak::default();
/// assert!(atomic_weak.compare_exchange(None, Some(&weak), SeqCst, SeqCst).is_ok());
/// let loaded = atomic_weak.load(SeqCst).unwrap();
/// assert_eq!(*loaded.upgrade().unwrap(), 42);
/// ```
pub fn compare_exchange(
&self,
current: Option<&Weak<T>>,
new: Option<&Weak<T>>,
success: Ordering,
failure: Ordering,
) -> Result<(), Option<Weak<T>>> {
let c: *const T = current.map_or(null(), Weak::as_ptr);
let n: *const T = new.map_or(null(), Weak::as_ptr);
acquire();
let result = match self
.ptr
.compare_exchange(c.cast_mut(), n.cast_mut(), success, failure)
{
Ok(before) => {
if !before.is_null() {
retire::<_, false>(before);
}
if let Some(weak_new) = new {
increment_weak_count(weak_new);
}
Ok(())
}
Err(before) => unsafe {
if !before.is_null() {
Err(Some(clone_weak_from_raw(before)))
} else {
Err(None)
}
},
};
release();
result
}
}

impl<T> Clone for AtomicWeak<T> {
fn clone(&self) -> Self {
self.load(SeqCst).map_or(Self::default(), |weak| Self {
ptr: AtomicPtr::new(Weak::into_raw(weak).cast_mut()),
})
}
}

impl<T> Default for AtomicWeak<T> {
/// Creates an empty `AtomicWeak`.
fn default() -> Self {
Self {
ptr: AtomicPtr::default(),
}
}
}

impl<T> Drop for AtomicWeak<T> {
fn drop(&mut self) {
let ptr = self.ptr.load(SeqCst);
if !ptr.is_null() {
retire::<_, false>(ptr);
}
}
}

impl<T> From<Weak<T>> for AtomicWeak<T> {
fn from(value: Weak<T>) -> Self {
Self {
ptr: AtomicPtr::new(Weak::into_raw(value).cast_mut()),
}
}
}

impl<T> From<Option<Weak<T>>> for AtomicWeak<T> {
fn from(value: Option<Weak<T>>) -> Self {
Self {
ptr: AtomicPtr::new(value.map_or(null(), Weak::into_raw).cast_mut()),
}
}
}

fn increment_weak_count<T>(w: &Weak<T>) {
_ = ManuallyDrop::new(w.clone());
}

unsafe fn clone_weak_from_raw<T>(ptr: *const T) -> Weak<T> {
ManuallyDrop::new(Weak::from_raw(ptr)).deref().clone()
}
5 changes: 4 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
pub mod atomics;
pub use atomics::AtomicArc;
pub use atomics::AtomicWeak;

pub(crate) mod atomics;
pub(crate) mod hyaline;
pub(crate) mod statics;

Expand Down
20 changes: 14 additions & 6 deletions src/statics.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::hyaline::{Context, DeferredFn, Slot};
use std::cell::RefCell;
use std::ptr::null_mut;
use std::sync::{Arc, OnceLock};
use std::sync::{Arc, OnceLock, Weak};
use std::thread::AccessError;

const SLOTS_PER_NODE: usize = 64;
Expand Down Expand Up @@ -38,21 +38,29 @@ pub(crate) fn release() {
}
}

pub(crate) fn retire<T>(raw_arc_ptr: *const T) {
pub(crate) fn retire<T, const IS_STRONG: bool>(ptr: *const T) {
if let Ok(slot) = get_slot() {
get_context().retire(
slot,
DeferredFn {
ptr: raw_arc_ptr as *mut u8,
f: Box::new(|ptr| unsafe {
Arc::decrement_strong_count(ptr as *const T);
ptr: ptr as *mut u8,
f: Box::new(|p| unsafe {
if IS_STRONG {
drop(Arc::from_raw(p as *const T));
} else {
drop(Weak::from_raw(p as *const T));
};
}),
},
);
} else {
// The TLS key is being destroyed. This path is only safe during cleanup.
unsafe {
Arc::decrement_strong_count(raw_arc_ptr);
if IS_STRONG {
drop(Arc::from_raw(ptr));
} else {
drop(Weak::from_raw(ptr));
};
}
}
}
Expand Down
Loading

0 comments on commit 08b7930

Please sign in to comment.