Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Racy buffering tests correction #2106

Merged
merged 21 commits into from
Mar 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
84a7987
Correct socket shutdown logic at test_util::send_lines
MOZGIII Mar 20, 2020
8cfd24f
Add trace_init to buffering tests
MOZGIII Mar 20, 2020
93629eb
Improve the clarity at the test_buffering test
MOZGIII Mar 20, 2020
2784ba5
Allow using tests/support/mod.rs in multiple tests
MOZGIII Mar 20, 2020
45f3696
Improve error event at SinkConfig::build for MockSinkConfig
MOZGIII Mar 20, 2020
48b0ac2
Add support for mocking a dead sink
MOZGIII Mar 20, 2020
1865fbb
Require explicit sink channel size specification
MOZGIII Mar 20, 2020
34ba76c
Convert test_buffering to using mock sources and sinks, address races
MOZGIII Mar 20, 2020
72591c7
Convert test_max_size to using mock sources and sinks, address races
MOZGIII Mar 20, 2020
8f83809
Add test_util::receive_events and switched buffering tests to it
MOZGIII Mar 21, 2020
da971bc
Add the ability to alter mock source data type
MOZGIII Mar 21, 2020
ade6821
Switched sinks at test_max_size_resume to mocks, changed sleep logic
MOZGIII Mar 21, 2020
a5f2763
Unignored and updated test_reclaim_disk_space to match the test_buffe…
MOZGIII Mar 21, 2020
c4d749c
Extract a compute_disk_size fn
MOZGIII Mar 21, 2020
38eb793
Clean up unused dependencies
MOZGIII Mar 21, 2020
ccd065a
Resolve name collision
MOZGIII Mar 21, 2020
85e0ae4
Document the multi-thread runtime requirement
MOZGIII Mar 21, 2020
989708b
Add myself to .github/CODEOWNERS
MOZGIII Mar 21, 2020
d91629a
Add a helper fn for graceful termination
MOZGIII Mar 21, 2020
90bbc1e
Switch to abrupt termination to simulate crashes
MOZGIII Mar 21, 2020
c2e1a55
Fix the tests broken by generalization of MockSinkConfig
MOZGIII Mar 21, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,6 @@

/src/event/merge.rs @MOZGIII

/tests/buffering.rs @MOZGIII

/website/ @binarylogic
57 changes: 50 additions & 7 deletions src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::fs::File;
use std::io::Read;
use std::iter;
use std::mem;
use std::net::SocketAddr;
use std::net::{Shutdown, SocketAddr};
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
Expand Down Expand Up @@ -66,7 +66,12 @@ pub fn send_lines(
.forward(out)
.and_then(|(_source, sink)| {
let socket = sink.into_inner().into_inner();
tokio::io::shutdown(socket).map_err(|e| panic!("{:}", e))
// In tokio 0.1 `AsyncWrite::shutdown` for `TcpStream` is a noop.
// See https://docs.rs/tokio-tcp/0.1.4/src/tokio_tcp/stream.rs.html#917
// Use `TcpStream::shutdown` instead - it actually does something.
socket
.shutdown(Shutdown::Both)
.map_err(|e| panic!("{:}", e))
})
.map(|_| ())
})
Expand Down Expand Up @@ -236,6 +241,15 @@ pub fn wait_for_tcp(addr: SocketAddr) {
wait_for(|| std::net::TcpStream::connect(addr).is_ok())
}

pub fn wait_for_atomic_usize<T, F>(val: T, unblock: F)
where
T: AsRef<AtomicUsize>,
F: Fn(usize) -> bool,
{
let val = val.as_ref();
wait_for(|| unblock(val.load(Ordering::SeqCst)))
}

pub fn shutdown_on_idle(runtime: Runtime) {
block_on(
runtime
Expand Down Expand Up @@ -345,25 +359,25 @@ where
}
}

pub struct Receiver {
handle: futures01::sync::oneshot::SpawnHandle<Vec<String>, ()>,
pub struct Receiver<T> {
handle: futures01::sync::oneshot::SpawnHandle<Vec<T>, ()>,
count: Arc<AtomicUsize>,
trigger: Trigger,
_runtime: Runtime,
}

impl Receiver {
impl<T> Receiver<T> {
pub fn count(&self) -> usize {
self.count.load(Ordering::Relaxed)
}

pub fn wait(self) -> Vec<String> {
pub fn wait(self) -> Vec<T> {
self.trigger.cancel();
self.handle.wait().unwrap()
}
}

pub fn receive(addr: &SocketAddr) -> Receiver {
pub fn receive(addr: &SocketAddr) -> Receiver<String> {
let runtime = runtime();

let listener = TcpListener::bind(addr).unwrap();
Expand Down Expand Up @@ -393,6 +407,35 @@ pub fn receive(addr: &SocketAddr) -> Receiver {
}
}

pub fn receive_events<S>(stream: S) -> Receiver<Event>
where
S: Stream<Item = Event> + Send + 'static,
<S as Stream>::Error: std::fmt::Debug,
{
let runtime = runtime();

let count = Arc::new(AtomicUsize::new(0));
let count_clone = Arc::clone(&count);

let (trigger, tripwire) = Tripwire::new();

let events = stream
.take_until(tripwire)
.inspect(move |_| {
count_clone.fetch_add(1, Ordering::Relaxed);
})
.map_err(|e| panic!("{:?}", e))
.collect();

let handle = futures01::sync::oneshot::spawn(events, &runtime.executor());
Receiver {
handle,
count,
trigger,
_runtime: runtime,
}
}

pub struct CountReceiver {
handle: futures01::sync::oneshot::SpawnHandle<usize, ()>,
trigger: Trigger,
Expand Down
Loading