diff --git a/Cargo.lock b/Cargo.lock index f7fa32ddc7739..32415508772f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -279,6 +279,12 @@ dependencies = [ "serde", ] +[[package]] +name = "bytes" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "130aac562c0dd69c56b3b1cc8ffd2e17be31d0b6c25b61c96b76231aa23e39e1" + [[package]] name = "bytesize" version = "1.0.0" @@ -367,7 +373,7 @@ dependencies = [ name = "codec" version = "0.1.0" dependencies = [ - "bytes", + "bytes 0.4.12", "serde_json", "tokio-codec", "tracing", @@ -870,7 +876,7 @@ version = "7.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5589ce096bbfc5465cc2f3f6de6605803d6b4ef741ad6d38172dc30bfeea5f95" dependencies = [ - "bytes", + "bytes 0.4.12", "smallvec 1.2.0", ] @@ -924,7 +930,7 @@ checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" name = "file-source" version = "0.1.0" dependencies = [ - "bytes", + "bytes 0.4.12", "crc", "flate2", "futures 0.3.4", @@ -1254,7 +1260,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a5b34c246847f938a410a03c5458c7fee2274436675e76d8b903c08efc29c462" dependencies = [ "byteorder", - "bytes", + "bytes 0.4.12", "fnv", "futures 0.1.29", "http", @@ -1307,7 +1313,7 @@ checksum = "882ca7d8722f33ce2c2db44f95425d6267ed59ca96ce02acbe58320054ceb642" dependencies = [ "base64 0.10.1", "bitflags", - "bytes", + "bytes 0.4.12", "headers-core", "http", "mime", @@ -1321,7 +1327,7 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "967131279aaa9f7c20c7205b45a391638a83ab118e6509b2d0ccbe08de044237" dependencies = [ - "bytes", + "bytes 0.4.12", "http", ] @@ -1392,7 +1398,7 @@ version = "0.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6ccf5ede3a895d8856620237b2f02972c1bbc78d2965ad7fe8838d4a0ed41f0" dependencies = [ - "bytes", + "bytes 0.4.12", "fnv", "itoa", ] @@ -1403,7 +1409,7 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6741c859c1b2463a423a1dbce98d418e6c3c3fc720fb0d45528657320920292d" dependencies = [ - "bytes", + "bytes 0.4.12", "futures 0.1.29", "http", "tokio-buf", @@ -1430,7 +1436,7 @@ version = "0.12.35" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9dbe6ed1438e1f8ad955a4701e9a944938e9519f6888d12d8558b645e247d5f6" dependencies = [ - "bytes", + "bytes 0.4.12", "futures 0.1.29", "futures-cpupool", "h2", @@ -1443,7 +1449,7 @@ dependencies = [ "net2", "rustc_version", "time", - "tokio", + "tokio 0.1.22", "tokio-buf", "tokio-executor", "tokio-io", @@ -1461,7 +1467,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f52657b5cdb2a8067efd29a02e011b7cf656b473ec8a5c34e86645e85d763006" dependencies = [ "antidote", - "bytes", + "bytes 0.4.12", "futures 0.1.29", "hyper", "lazy_static", @@ -1478,7 +1484,7 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a800d6aa50af4b5850b2b0f659625ce9504df908e9733b635720483be26174f" dependencies = [ - "bytes", + "bytes 0.4.12", "futures 0.1.29", "hyper", "native-tls", @@ -1494,7 +1500,7 @@ dependencies = [ "futures 0.1.29", "hex", "hyper", - "tokio", + "tokio 0.1.22", "tokio-io", "tokio-uds", ] @@ -1655,7 +1661,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "30e2ac1496d5920d157d0eb5ab453b0af1e6622d5ffa8e7f9c60d435d13342da" dependencies = [ "base64 0.10.1", - "bytes", + "bytes 0.4.12", "chrono", "http", "serde", @@ -2404,6 +2410,12 @@ dependencies = [ "syn 1.0.14", ] +[[package]] +name = "pin-project-lite" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "237844750cfbb86f67afe27eee600dfbbcb6188d734139b534cbfbf4f96792ae" + [[package]] name = "pin-utils" version = "0.1.0-alpha.4" @@ -2549,7 +2561,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96d14b1c185652833d24aaad41c5832b0be5616a590227c1fbff57c616754b23" dependencies = [ "byteorder", - "bytes", + "bytes 0.4.12", "prost-derive", ] @@ -2559,7 +2571,7 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb788126ea840817128183f8f603dce02cb7aea25c2a0b764359d8e20010702e" dependencies = [ - "bytes", + "bytes 0.4.12", "heck", "itertools", "log", @@ -2590,7 +2602,7 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1de482a366941c8d56d19b650fac09ca08508f2a696119ee7513ad590c8bac6f" dependencies = [ - "bytes", + "bytes 0.4.12", "prost", ] @@ -2976,7 +2988,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f88643aea3c1343c804950d7bf983bd2067f5ab59db6d613a08e05572f2714ab" dependencies = [ "base64 0.10.1", - "bytes", + "bytes 0.4.12", "cookie", "cookie_store", "encoding_rs", @@ -2993,7 +3005,7 @@ dependencies = [ "serde_json", "serde_urlencoded 0.5.5", "time", - "tokio", + "tokio 0.1.22", "tokio-executor", "tokio-io", "tokio-threadpool", @@ -3031,7 +3043,7 @@ version = "0.41.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f470c278733d18760a5966a89227f4d9a06bb1d8260d676b011faa4d2aa82342" dependencies = [ - "bytes", + "bytes 0.4.12", "futures 0.1.29", "rusoto_core", "serde_urlencoded 0.5.5", @@ -3045,7 +3057,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "351e97aedcc659bd03168ff7fd3dbb270b6ee812c0c51c7953d2ef6f0a119aa9" dependencies = [ "base64 0.10.1", - "bytes", + "bytes 0.4.12", "futures 0.1.29", "hex", "hmac", @@ -3063,7 +3075,7 @@ dependencies = [ "serde_json", "sha2", "time", - "tokio", + "tokio 0.1.22", "tokio-timer", "xml-rs", ] @@ -3093,7 +3105,7 @@ version = "0.41.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ef495217f457f6ad9a1d975fcbedc346b39708a6725118d58f704064e0252c81" dependencies = [ - "bytes", + "bytes 0.4.12", "futures 0.1.29", "rusoto_core", "serde", @@ -3107,7 +3119,7 @@ version = "0.41.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f88609598b24779b268f47c0d19780cf4865b0cbccd9f677bae75f7f7c5d4dcb" dependencies = [ - "bytes", + "bytes 0.4.12", "futures 0.1.29", "rusoto_core", "serde", @@ -3121,7 +3133,7 @@ version = "0.41.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "691f665f2ae401d4fddaac8d6a33c3e7c388e93494242ea937ebdddf9961f7f6" dependencies = [ - "bytes", + "bytes 0.4.12", "futures 0.1.29", "rusoto_core", "serde", @@ -3135,7 +3147,7 @@ version = "0.41.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c840fca030950caf0c9bea89eb35311aa5b01c0341fb0aaf28d347b5c416d7" dependencies = [ - "bytes", + "bytes 0.4.12", "futures 0.1.29", "rusoto_core", "xml-rs", @@ -3147,7 +3159,7 @@ version = "0.41.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e8cdff317625611de545e91b80a883a7d6fb1cf2d772f64abaf83c0925fa0ff" dependencies = [ - "bytes", + "bytes 0.4.12", "chrono", "futures 0.1.29", "rusoto_core", @@ -3438,7 +3450,7 @@ checksum = "b78521d24224ac77e489f92efa0f02884e7fc2973f97b2e72c9bb3e4771e6f6b" dependencies = [ "base64 0.11.0", "byteorder", - "bytes", + "bytes 0.4.12", "flate2", "futures 0.1.29", "http", @@ -3451,7 +3463,7 @@ dependencies = [ "serde", "serde_json", "tar", - "tokio", + "tokio 0.1.22", "tokio-codec", "tokio-io", "url 2.1.1", @@ -3593,7 +3605,7 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d24114bfcceb867ca7f71a0d3fe45d45619ec47a6fbfa98cb14e14250bfa5d6d" dependencies = [ - "bytes", + "bytes 0.4.12", ] [[package]] @@ -3840,7 +3852,7 @@ version = "0.1.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a09c0b5bb588872ab2f09afa13ee6e9dac11e10a0ec9e8e3ba39a5a5d530af6" dependencies = [ - "bytes", + "bytes 0.4.12", "futures 0.1.29", "mio", "num_cpus", @@ -3859,13 +3871,29 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tokio" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fdd17989496f49cdc57978c96f0c9fe5e4a58a8bddc6813c449a4624f6a030b" +dependencies = [ + "bytes 0.5.4", + "fnv", + "futures-core", + "lazy_static", + "mio", + "num_cpus", + "pin-project-lite", + "slab", +] + [[package]] name = "tokio-buf" version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fb220f46c53859a4b7ec083e41dec9778ff0b1851c0942b211edb89e0ccdc46" dependencies = [ - "bytes", + "bytes 0.4.12", "either", "futures 0.1.29", ] @@ -3876,11 +3904,28 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c501eceaf96f0e1793cf26beb63da3d11c738c4a943fdf3746d81d64684c39f" dependencies = [ - "bytes", + "bytes 0.4.12", "futures 0.1.29", "tokio-io", ] +[[package]] +name = "tokio-compat" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55ae6e8f0b3bf2c9908bd050ec35598ca71990185e5618fa7a2bbd98771c3e53" +dependencies = [ + "futures 0.1.29", + "futures-core", + "futures-util", + "pin-project-lite", + "tokio 0.2.11", + "tokio-current-thread", + "tokio-executor", + "tokio-reactor", + "tokio-timer", +] + [[package]] name = "tokio-current-thread" version = "0.1.6" @@ -3918,7 +3963,7 @@ version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5090db468dad16e1a7a54c8c67280c5e4b544f3d3e018f0b913b400261f85926" dependencies = [ - "bytes", + "bytes 0.4.12", "futures 0.1.29", "log", ] @@ -4016,7 +4061,7 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d14b10654be682ac43efee27401d792507e30fd8d26389e1da3b185de2e4119" dependencies = [ - "bytes", + "bytes 0.4.12", "futures 0.1.29", "iovec", "mio", @@ -4059,7 +4104,7 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f02298505547f73e60f568359ef0d016d5acd6e830ab9bc7c4a5b3403440121b" dependencies = [ - "bytes", + "bytes 0.4.12", "futures 0.1.29", "log", "mio", @@ -4074,7 +4119,7 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "037ffc3ba0e12a0ab4aca92e5234e0dedeb48fddf6ccd260f1f150a36a9f2445" dependencies = [ - "bytes", + "bytes 0.4.12", "futures 0.1.29", "iovec", "libc", @@ -4330,7 +4375,7 @@ dependencies = [ "hotmic", "hyper", "serde_json", - "tokio", + "tokio 0.1.22", "tracing", "tracing-core", "tracing-futures 0.2.1", @@ -4396,7 +4441,7 @@ dependencies = [ "log", "radix_trie", "rand 0.7.3", - "tokio", + "tokio 0.1.22", "tokio-tcp", "tokio-udp", "trust-dns-proto", @@ -4444,7 +4489,7 @@ dependencies = [ "resolv-conf", "serde", "smallvec 0.6.13", - "tokio", + "tokio 0.1.22", "tokio-executor", "tokio-tcp", "tokio-udp", @@ -4458,7 +4503,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e526ea9c9203633a7818e9d459ff29a3fca050281f6de24db07d873f9d95792e" dependencies = [ "backtrace", - "bytes", + "bytes 0.4.12", "chrono", "clap", "enum-as-inner", @@ -4471,7 +4516,7 @@ dependencies = [ "rusqlite", "serde", "time", - "tokio", + "tokio 0.1.22", "tokio-executor", "tokio-io", "tokio-reactor", @@ -4666,7 +4711,7 @@ dependencies = [ "base64 0.10.1", "bloom", "built", - "bytes", + "bytes 0.4.12", "bytesize", "chrono", "codec", @@ -4749,12 +4794,13 @@ dependencies = [ "syslog", "syslog_loose", "tempfile", - "tokio", + "tokio 0.1.22", + "tokio 0.2.11", "tokio-codec", + "tokio-compat", "tokio-openssl", "tokio-retry", "tokio-signal", - "tokio-threadpool", "tokio-uds", "tokio01-test", "toml 0.4.10", @@ -4841,7 +4887,7 @@ name = "warp" version = "0.1.21" source = "git+https://github.com/timberio/warp?branch=0.1.x#017965c627047bf0e53062940e7def127cb21649" dependencies = [ - "bytes", + "bytes 0.4.12", "futures 0.1.29", "headers", "http", @@ -4853,7 +4899,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded 0.6.1", - "tokio", + "tokio 0.1.22", "tokio-io", "tokio-threadpool", "urlencoding", diff --git a/Cargo.toml b/Cargo.toml index 4fe534ecf58d6..f39313c8fee19 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,11 +45,12 @@ tracing-limit = { path = "lib/tracing-limit" } futures01 = { package = "futures", version = "0.1.25" } futures = { version = "0.3", default-features = false, features = ["compat"] } tokio = { version = "0.1.22", features = ["io", "uds", "tcp", "rt-full", "experimental-tracing"], default-features = false } +tokio02 = { package = "tokio", version = "0.2", features = ["blocking"] } tokio-codec = "0.1.0" tokio-openssl = "0.3.0" tokio-retry = "0.2.0" tokio-signal = "0.2.7" -tokio-threadpool = "0.1.16" +tokio-compat = { version = "0.1", features = ["rt-full"] } # Tracing tracing = "0.1.9" diff --git a/benches/bench.rs b/benches/bench.rs index 2cd97ac5ddf77..f661d8798b024 100644 --- a/benches/bench.rs +++ b/benches/bench.rs @@ -36,7 +36,7 @@ criterion_main!( buffering::buffers, http::http, batch::batch, - files::files, + /* files::files, */ lua::lua, event::event ); diff --git a/benches/files.rs b/benches/files.rs index e0ce95ba8d18b..67737f5500834 100644 --- a/benches/files.rs +++ b/benches/files.rs @@ -1,3 +1,5 @@ +#![cfg(feature = "disabled")] + use bytes::Bytes; use criterion::{criterion_group, Benchmark, Criterion, Throughput}; use futures01::{sink::Sink, stream::Stream, Future}; diff --git a/src/buffers/disk.rs b/src/buffers/disk.rs index 02f45e6306e51..dcb3c1e0b747d 100644 --- a/src/buffers/disk.rs +++ b/src/buffers/disk.rs @@ -189,14 +189,13 @@ impl Stream for Reader { // This will usually complete instantly, but in the case of a large queue (or a fresh launch of // the app), this will have to go to disk. - let next = tokio_threadpool::blocking(|| { + let next = tokio02::task::block_in_place(|| { self.db .get(ReadOptions::new(), Key(self.read_offset)) .unwrap() - }) - .unwrap(); + }); - if let Async::Ready(Some(value)) = next { + if let Some(value) = next { self.unacked_sizes.push_back(value.len()); self.read_offset += 1; diff --git a/src/runtime.rs b/src/runtime.rs index 9d7cc73c646d2..71fff1f46aaf2 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -1,16 +1,16 @@ use futures01::future::{ExecuteError, Executor, Future}; use std::io; use std::pin::Pin; -use tokio::runtime::Builder; +use tokio_compat::runtime::{Builder, Runtime as TokioRuntime, TaskExecutor as TokioTaskExecutor}; pub struct Runtime { - rt: tokio::runtime::Runtime, + rt: TokioRuntime, } impl Runtime { pub fn new() -> io::Result { Ok(Runtime { - rt: tokio::runtime::Runtime::new()?, + rt: TokioRuntime::new()?, }) } @@ -70,7 +70,7 @@ impl Runtime { #[derive(Clone, Debug)] pub struct TaskExecutor { - inner: tokio::runtime::TaskExecutor, + inner: TokioTaskExecutor, } impl TaskExecutor { diff --git a/src/sinks/file/mod.rs b/src/sinks/file/mod.rs index ced1364ae298f..d1cf74a6a2ae0 100644 --- a/src/sinks/file/mod.rs +++ b/src/sinks/file/mod.rs @@ -1,3 +1,5 @@ +#![cfg(feature = "disabled")] + mod file; use self::file::File; diff --git a/src/sinks/kafka.rs b/src/sinks/kafka.rs index 8af01749212cd..e58716989ec63 100644 --- a/src/sinks/kafka.rs +++ b/src/sinks/kafka.rs @@ -11,9 +11,7 @@ use crate::{ }; use futures::compat::Compat; use futures01::{ - future::{self, poll_fn, IntoFuture}, - stream::FuturesUnordered, - Async, AsyncSink, Future, Poll, Sink, StartSend, Stream, + future, stream::FuturesUnordered, Async, AsyncSink, Future, Poll, Sink, StartSend, Stream, }; use rdkafka::{ consumer::{BaseConsumer, Consumer}, @@ -214,18 +212,14 @@ impl Sink for KafkaSink { fn healthcheck(config: KafkaSinkConfig) -> super::Healthcheck { let consumer: BaseConsumer = config.to_rdkafka().unwrap().create().unwrap(); - let check = poll_fn(move || { - tokio_threadpool::blocking(|| { - consumer - .fetch_metadata(Some(&config.topic), Duration::from_secs(3)) - .map(|_| ()) - .map_err(|err| err.into()) - }) - }) - .map_err(|err| err.into()) - .and_then(|result| result.into_future()); + let check = tokio02::task::block_in_place(|| { + consumer + .fetch_metadata(Some(&config.topic), Duration::from_secs(3)) + .map(|_| ()) + .map_err(|err| err.into()) + }); - Box::new(check) + Box::new(future::result(check)) } fn encode_event( diff --git a/src/sinks/statsd.rs b/src/sinks/statsd.rs index 945fc82cb981a..0da310fb1cfdc 100644 --- a/src/sinks/statsd.rs +++ b/src/sinks/statsd.rs @@ -318,6 +318,8 @@ mod test { #[test] fn test_send_to_statsd() { + crate::test_util::trace_init(); + let config = StatsdSinkConfig { namespace: "vector".into(), address: default_address(),