Skip to content

Commit

Permalink
Convert test_buffering to using mock sources and sinks, address races
Browse files Browse the repository at this point in the history
Signed-off-by: MOZGIII <[email protected]>
  • Loading branch information
MOZGIII committed Mar 20, 2020
1 parent de19be3 commit c46a3a2
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 66 deletions.
9 changes: 9 additions & 0 deletions src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,15 @@ pub fn wait_for_tcp(addr: SocketAddr) {
wait_for(|| std::net::TcpStream::connect(addr).is_ok())
}

pub fn wait_for_atomic_usize<T, F>(val: T, unblock: F)
where
T: AsRef<AtomicUsize>,
F: Fn(usize) -> bool,
{
let val = val.as_ref();
wait_for(|| unblock(val.load(Ordering::SeqCst)))
}

pub fn shutdown_on_idle(runtime: Runtime) {
block_on(
runtime
Expand Down
137 changes: 71 additions & 66 deletions tests/buffering.rs
Original file line number Diff line number Diff line change
@@ -1,114 +1,119 @@
#![cfg(feature = "leveldb")]

use futures01::Future;
use futures01::{Future, Sink, Stream};
use prost::Message;
use tempfile::tempdir;
use tracing::trace;
use vector::event::{self, Event};
use vector::test_util::{
block_on, next_addr, random_lines, receive, runtime, send_lines, shutdown_on_idle, wait_for_tcp,
self, block_on, next_addr, random_lines, receive, runtime, send_lines, shutdown_on_idle,
wait_for_tcp,
};
use vector::topology::{self, config};
use vector::{buffers::BufferConfig, runtime, sinks, sources};

mod support;

#[test]
fn test_buffering() {
vector::test_util::trace_init();
test_util::trace_init();

let data_dir = tempdir().unwrap();
let data_dir = data_dir.path().to_path_buf();
trace!(message = "Test data dir", ?data_dir);

let num_lines: usize = 10;
let num_events: usize = 10;
let line_length = 100;
let max_size = 10_000;
let expected_events_count = num_events * 2;

assert!(
line_length * num_lines * 2 <= max_size,
line_length * expected_events_count <= max_size,
"Test parameters are invalid, this test implies that all lines will fit
into the buffer, but the buffer is not big enough"
);

let in_addr = next_addr();
let out_addr = next_addr();

// Run vector while sink server is not running, and then shut it down
// without server ever coming online.
let mut config = config::Config::empty();
config.add_source(
"in",
sources::socket::SocketConfig::make_tcp_config(in_addr),
);
config.add_sink(
"out",
&["in"],
sinks::socket::SocketSinkConfig::make_basic_tcp_config(out_addr.to_string()),
);
config.sinks["out"].buffer = BufferConfig::Disk {
max_size,
when_full: Default::default(),
// Run vector with a dead sink, and then shut it down without sink ever
// accepting any data.
let (in_tx, source_config, source_event_counter) = support::source_with_event_counter();
let sink_config = support::sink_dead();
let config = {
let mut config = config::Config::empty();
config.add_source("in", source_config);
config.add_sink("out", &["in"], sink_config);
config.sinks["out"].buffer = BufferConfig::Disk {
max_size,
when_full: Default::default(),
};
config.global.data_dir = Some(data_dir.clone());
config
};
config.global.data_dir = Some(data_dir.clone());

let mut rt = runtime();

let (topology, _crash) = topology::start(config, &mut rt, false).unwrap();
wait_for_tcp(in_addr);

let input_lines = random_lines(line_length)
.take(num_lines)
.collect::<Vec<_>>();
let send = send_lines(in_addr, input_lines.clone().into_iter());
rt.block_on(send).unwrap();

std::thread::sleep(std::time::Duration::from_millis(100));
let (input_events, input_events_stream) =
test_util::random_events_with_stream(line_length, num_events);
let send = in_tx
.sink_map_err(|err| panic!(err))
.send_all(input_events_stream);
let _ = rt.block_on(send).unwrap();

rt.shutdown_now().wait().unwrap();
drop(topology);
// A race caused by `rt.block_on(send).unwrap()` is handled here. For some
// reason, at times less events than were sent actually arrive to the
// `source`.
// We mitigate that by waiting on the event counter provided by our source
// mock.
test_util::wait_for_atomic_usize(source_event_counter, |x| x == num_events);

let in_addr = next_addr();
let out_addr = next_addr();
rt.block_on(topology.stop()).unwrap();
shutdown_on_idle(rt);

// Start sink server, then run vector again. It should send all of the lines
// from the first run.
let mut config = config::Config::empty();
config.add_source(
"in",
sources::socket::SocketConfig::make_tcp_config(in_addr),
);
config.add_sink(
"out",
&["in"],
sinks::socket::SocketSinkConfig::make_basic_tcp_config(out_addr.to_string()),
);
config.sinks["out"].buffer = BufferConfig::Disk {
max_size,
when_full: Default::default(),
// Then run vector again with a sink that accepts events now. It should
// send all of the events from the first run.
let (in_tx, source_config, source_event_counter) = support::source_with_event_counter();
let (mut out_rx, sink_config) = support::sink(expected_events_count + 100);
let config = {
let mut config = config::Config::empty();
config.add_source("in", source_config);
config.add_sink("out", &["in"], sink_config);
config.sinks["out"].buffer = BufferConfig::Disk {
max_size,
when_full: Default::default(),
};
config.global.data_dir = Some(data_dir.clone());
config
};
config.global.data_dir = Some(data_dir);

let mut rt = runtime();

let output_lines = receive(&out_addr);

let (topology, _crash) = topology::start(config, &mut rt, false).unwrap();

wait_for_tcp(in_addr);
let (input_events2, input_events_stream) =
test_util::random_events_with_stream(line_length, num_events);

let input_lines2 = random_lines(line_length)
.take(num_lines)
.collect::<Vec<_>>();
let send = send_lines(in_addr, input_lines2.clone().into_iter());
rt.block_on(send).unwrap();
let send = in_tx
.sink_map_err(|err| panic!(err))
.send_all(input_events_stream);
let _ = rt.block_on(send).unwrap();

std::thread::sleep(std::time::Duration::from_millis(100));
// A race caused by `rt.block_on(send).unwrap()` is handled here. For some
// reason, at times less events than were sent actually arrive to the
// `source`.
// We mitigate that by waiting on the event counter provided by our source
// mock.
test_util::wait_for_atomic_usize(source_event_counter, |x| x == num_events);

rt.block_on(topology.stop()).unwrap();

shutdown_on_idle(rt);

let output_lines = output_lines.wait();
assert_eq!(num_lines * 2, output_lines.len());
assert_eq!(input_lines, &output_lines[..num_lines]);
assert_eq!(input_lines2, &output_lines[num_lines..]);
out_rx.close();
let output_events = out_rx.collect().wait().unwrap();

assert_eq!(expected_events_count, output_events.len());
assert_eq!(input_events, &output_events[..num_events]);
assert_eq!(input_events2, &output_events[num_events..]);
}

#[test]
Expand Down
7 changes: 7 additions & 0 deletions tests/support/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ pub fn source() -> (Sender<Event>, MockSourceConfig) {
(tx, source)
}

pub fn source_with_event_counter() -> (Sender<Event>, MockSourceConfig, Arc<AtomicUsize>) {
let event_counter = Arc::new(AtomicUsize::new(0));
let (tx, rx) = futures01::sync::mpsc::channel(0);
let source = MockSourceConfig::new_with_event_counter(rx, event_counter.clone());
(tx, source, event_counter)
}

pub fn transform(suffix: &str, increase: f64) -> MockTransformConfig {
MockTransformConfig::new(suffix.to_owned(), increase)
}
Expand Down

0 comments on commit c46a3a2

Please sign in to comment.