From 8a2490659896a17ce16e08d415c07512e9470c38 Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Sat, 21 Mar 2020 02:57:22 +0300 Subject: [PATCH] Convert test_max_size to using mock sources and sinks, address races Signed-off-by: MOZGIII --- tests/buffering.rs | 114 +++++++++++++++++++++------------------------ 1 file changed, 52 insertions(+), 62 deletions(-) diff --git a/tests/buffering.rs b/tests/buffering.rs index bb30cccefd96ec..6aaf308f09b725 100644 --- a/tests/buffering.rs +++ b/tests/buffering.rs @@ -4,7 +4,7 @@ use futures01::{Future, Sink, Stream}; use prost::Message; use tempfile::tempdir; use tracing::trace; -use vector::event::{self, Event}; +use vector::event; use vector::test_util::{ self, block_on, next_addr, random_lines, receive, runtime, send_lines, shutdown_on_idle, wait_for_tcp, @@ -118,92 +118,82 @@ fn test_buffering() { #[test] fn test_max_size() { - vector::test_util::trace_init(); + test_util::trace_init(); let data_dir = tempdir().unwrap(); let data_dir = data_dir.path().to_path_buf(); + trace!(message = "Test data dir", ?data_dir); - let num_lines: usize = 1000; - let line_size = 1000; - let input_lines = random_lines(line_size).take(num_lines).collect::>(); + let num_events: usize = 1000; + let line_length = 1000; + let (input_events, input_events_stream) = + test_util::random_events_with_stream(line_length, num_events); - let max_size = input_lines + let max_size = input_events .clone() .into_iter() - .take(num_lines / 2) - .map(|line| { - let mut e = Event::from(line); - e.as_mut_log().insert("host", "127.0.0.1"); - event::proto::EventWrapper::from(e) - }) + .take(num_events / 2) + .map(event::proto::EventWrapper::from) .map(|ew| ew.encoded_len()) .sum(); - let in_addr = next_addr(); - let out_addr = next_addr(); - - // Run vector while sink server is not running, and then shut it down abruptly - let mut config = config::Config::empty(); - config.add_source( - "in", - sources::socket::SocketConfig::make_tcp_config(in_addr), - ); - config.add_sink( - "out", - &["in"], - sinks::socket::SocketSinkConfig::make_basic_tcp_config(out_addr.to_string()), - ); - config.sinks["out"].buffer = BufferConfig::Disk { - max_size, - when_full: Default::default(), + // Run vector with a dead sink, and then shut it down without sink ever + // accepting any data. + let (in_tx, source_config) = support::source(); + let sink_config = support::sink_dead(); + let config = { + let mut config = config::Config::empty(); + config.add_source("in", source_config); + config.add_sink("out", &["in"], sink_config); + config.sinks["out"].buffer = BufferConfig::Disk { + max_size, + when_full: Default::default(), + }; + config.global.data_dir = Some(data_dir.clone()); + config }; - config.global.data_dir = Some(data_dir.clone()); - let mut rt = runtime::Runtime::new().unwrap(); + let mut rt = runtime(); let (topology, _crash) = topology::start(config, &mut rt, false).unwrap(); - wait_for_tcp(in_addr); - - let send = send_lines(in_addr, input_lines.clone().into_iter()); - rt.block_on(send).unwrap(); - std::thread::sleep(std::time::Duration::from_millis(100)); + let send = in_tx + .sink_map_err(|err| panic!(err)) + .send_all(input_events_stream); + let _ = rt.block_on(send).unwrap(); - rt.shutdown_now().wait().unwrap(); - drop(topology); + rt.block_on(topology.stop()).unwrap(); + shutdown_on_idle(rt); - // Start sink server, then run vector again. It should send the lines from the first run that fit in the limited space - let mut config = config::Config::empty(); - config.add_source( - "in", - sources::socket::SocketConfig::make_tcp_config(in_addr), - ); - config.add_sink( - "out", - &["in"], - sinks::socket::SocketSinkConfig::make_basic_tcp_config(out_addr.to_string()), - ); - config.sinks["out"].buffer = BufferConfig::Disk { - max_size, - when_full: Default::default(), + // Then run vector again with a sink that accepts events now. It should + // send all of the events from the first run that fit in the limited buffer + // space. + let (_in_tx, source_config) = support::source(); + let (mut out_rx, sink_config) = support::sink(num_events + 100); + let config = { + let mut config = config::Config::empty(); + config.add_source("in", source_config); + config.add_sink("out", &["in"], sink_config); + config.sinks["out"].buffer = BufferConfig::Disk { + max_size, + when_full: Default::default(), + }; + config.global.data_dir = Some(data_dir.clone()); + config }; - config.global.data_dir = Some(data_dir); - let mut rt = runtime::Runtime::new().unwrap(); - - let output_lines = receive(&out_addr); + let mut rt = runtime(); let (topology, _crash) = topology::start(config, &mut rt, false).unwrap(); - wait_for_tcp(in_addr); - rt.block_on(topology.stop()).unwrap(); - shutdown_on_idle(rt); - let output_lines = output_lines.wait(); - assert_eq!(num_lines / 2, output_lines.len()); - assert_eq!(&input_lines[..num_lines / 2], &output_lines[..]); + out_rx.close(); + let output_events = out_rx.collect().wait().unwrap(); + + assert_eq!(num_events / 2, output_events.len()); + assert_eq!(&input_events[..num_events / 2], &output_events[..]); } #[test]