Skip to content

Commit

Permalink
Convert test_max_size to using mock sources and sinks, address races
Browse files Browse the repository at this point in the history
Signed-off-by: MOZGIII <[email protected]>
  • Loading branch information
MOZGIII committed Mar 20, 2020
1 parent c46a3a2 commit 8a24906
Showing 1 changed file with 52 additions and 62 deletions.
114 changes: 52 additions & 62 deletions tests/buffering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use futures01::{Future, Sink, Stream};
use prost::Message;
use tempfile::tempdir;
use tracing::trace;
use vector::event::{self, Event};
use vector::event;
use vector::test_util::{
self, block_on, next_addr, random_lines, receive, runtime, send_lines, shutdown_on_idle,
wait_for_tcp,
Expand Down Expand Up @@ -118,92 +118,82 @@ fn test_buffering() {

#[test]
fn test_max_size() {
vector::test_util::trace_init();
test_util::trace_init();

let data_dir = tempdir().unwrap();
let data_dir = data_dir.path().to_path_buf();
trace!(message = "Test data dir", ?data_dir);

let num_lines: usize = 1000;
let line_size = 1000;
let input_lines = random_lines(line_size).take(num_lines).collect::<Vec<_>>();
let num_events: usize = 1000;
let line_length = 1000;
let (input_events, input_events_stream) =
test_util::random_events_with_stream(line_length, num_events);

let max_size = input_lines
let max_size = input_events
.clone()
.into_iter()
.take(num_lines / 2)
.map(|line| {
let mut e = Event::from(line);
e.as_mut_log().insert("host", "127.0.0.1");
event::proto::EventWrapper::from(e)
})
.take(num_events / 2)
.map(event::proto::EventWrapper::from)
.map(|ew| ew.encoded_len())
.sum();

let in_addr = next_addr();
let out_addr = next_addr();

// Run vector while sink server is not running, and then shut it down abruptly
let mut config = config::Config::empty();
config.add_source(
"in",
sources::socket::SocketConfig::make_tcp_config(in_addr),
);
config.add_sink(
"out",
&["in"],
sinks::socket::SocketSinkConfig::make_basic_tcp_config(out_addr.to_string()),
);
config.sinks["out"].buffer = BufferConfig::Disk {
max_size,
when_full: Default::default(),
// Run vector with a dead sink, and then shut it down without sink ever
// accepting any data.
let (in_tx, source_config) = support::source();
let sink_config = support::sink_dead();
let config = {
let mut config = config::Config::empty();
config.add_source("in", source_config);
config.add_sink("out", &["in"], sink_config);
config.sinks["out"].buffer = BufferConfig::Disk {
max_size,
when_full: Default::default(),
};
config.global.data_dir = Some(data_dir.clone());
config
};
config.global.data_dir = Some(data_dir.clone());

let mut rt = runtime::Runtime::new().unwrap();
let mut rt = runtime();

let (topology, _crash) = topology::start(config, &mut rt, false).unwrap();
wait_for_tcp(in_addr);

let send = send_lines(in_addr, input_lines.clone().into_iter());
rt.block_on(send).unwrap();

std::thread::sleep(std::time::Duration::from_millis(100));
let send = in_tx
.sink_map_err(|err| panic!(err))
.send_all(input_events_stream);
let _ = rt.block_on(send).unwrap();

rt.shutdown_now().wait().unwrap();
drop(topology);
rt.block_on(topology.stop()).unwrap();
shutdown_on_idle(rt);

// Start sink server, then run vector again. It should send the lines from the first run that fit in the limited space
let mut config = config::Config::empty();
config.add_source(
"in",
sources::socket::SocketConfig::make_tcp_config(in_addr),
);
config.add_sink(
"out",
&["in"],
sinks::socket::SocketSinkConfig::make_basic_tcp_config(out_addr.to_string()),
);
config.sinks["out"].buffer = BufferConfig::Disk {
max_size,
when_full: Default::default(),
// Then run vector again with a sink that accepts events now. It should
// send all of the events from the first run that fit in the limited buffer
// space.
let (_in_tx, source_config) = support::source();
let (mut out_rx, sink_config) = support::sink(num_events + 100);
let config = {
let mut config = config::Config::empty();
config.add_source("in", source_config);
config.add_sink("out", &["in"], sink_config);
config.sinks["out"].buffer = BufferConfig::Disk {
max_size,
when_full: Default::default(),
};
config.global.data_dir = Some(data_dir.clone());
config
};
config.global.data_dir = Some(data_dir);

let mut rt = runtime::Runtime::new().unwrap();

let output_lines = receive(&out_addr);
let mut rt = runtime();

let (topology, _crash) = topology::start(config, &mut rt, false).unwrap();

wait_for_tcp(in_addr);

rt.block_on(topology.stop()).unwrap();

shutdown_on_idle(rt);

let output_lines = output_lines.wait();
assert_eq!(num_lines / 2, output_lines.len());
assert_eq!(&input_lines[..num_lines / 2], &output_lines[..]);
out_rx.close();
let output_events = out_rx.collect().wait().unwrap();

assert_eq!(num_events / 2, output_events.len());
assert_eq!(&input_events[..num_events / 2], &output_events[..]);
}

#[test]
Expand Down

0 comments on commit 8a24906

Please sign in to comment.