Skip to content

Commit

Permalink
Merge pull request #242 from async-rs/barrier-unstable
Browse files Browse the repository at this point in the history
mark sync::Barrier as unstable
  • Loading branch information
yoshuawuyts authored Sep 26, 2019
2 parents 0f0b354 + 0b39306 commit fdc8fe6
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 55 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ features = ["docs"]
rustdoc-args = ["--cfg", "feature=\"docs\""]

[features]
docs = []
unstable = []
docs = ["broadcaster"]
unstable = ["broadcaster"]

[dependencies]
async-macros = "1.0.0"
Expand All @@ -42,7 +42,7 @@ num_cpus = "1.10.1"
pin-utils = "0.1.0-alpha.4"
slab = "0.4.2"
kv-log-macro = "1.0.4"
broadcaster = "0.2.4"
broadcaster = { version = "0.2.4", optional = true }

[dev-dependencies]
femme = "1.2.0"
Expand Down
57 changes: 57 additions & 0 deletions src/sync/barrier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::sync::Mutex;
/// # });
/// # }
/// ```
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[derive(Debug)]
pub struct Barrier {
state: Mutex<BarrierState>,
Expand Down Expand Up @@ -60,6 +61,7 @@ struct BarrierState {
/// let barrier = Barrier::new(1);
/// let barrier_wait_result = barrier.wait();
/// ```
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[derive(Debug, Clone)]
pub struct BarrierWaitResult(bool);

Expand Down Expand Up @@ -172,3 +174,58 @@ impl BarrierWaitResult {
self.0
}
}

#[cfg(test)]
mod test {
use futures_channel::mpsc::unbounded;
use futures_util::sink::SinkExt;
use futures_util::stream::StreamExt;

use crate::sync::{Arc, Barrier};
use crate::task;

#[test]
fn test_barrier() {
// NOTE(dignifiedquire): Based on the test in std, I was seeing some
// race conditions, so running it in a loop to make sure things are
// solid.

for _ in 0..1_000 {
task::block_on(async move {
const N: usize = 10;

let barrier = Arc::new(Barrier::new(N));
let (tx, mut rx) = unbounded();

for _ in 0..N - 1 {
let c = barrier.clone();
let mut tx = tx.clone();
task::spawn(async move {
let res = c.wait().await;

tx.send(res.is_leader()).await.unwrap();
});
}

// At this point, all spawned threads should be blocked,
// so we shouldn't get anything from the port
let res = rx.try_next();
assert!(match res {
Err(_err) => true,
_ => false,
});

let mut leader_found = barrier.wait().await.is_leader();

// Now, the barrier is cleared and we should get data.
for _ in 0..N - 1 {
if rx.next().await.unwrap() {
assert!(!leader_found);
leader_found = true;
}
}
assert!(leader_found);
});
}
}
}
4 changes: 4 additions & 0 deletions src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,14 @@
#[doc(inline)]
pub use std::sync::{Arc, Weak};

#[cfg(any(feature = "unstable", feature = "docs"))]
pub use barrier::{Barrier, BarrierWaitResult};

pub use mutex::{Mutex, MutexGuard};
pub use rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard};

#[cfg(any(feature = "unstable", feature = "docs"))]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
mod barrier;
mod mutex;
mod rwlock;
52 changes: 0 additions & 52 deletions tests/barrier.rs

This file was deleted.

0 comments on commit fdc8fe6

Please sign in to comment.