Skip to content

Commit

Permalink
Upgrade tokio-threadpool with tokio02::task::block_in_place
Browse files Browse the repository at this point in the history
Signed-off-by: MOZGIII <[email protected]>
  • Loading branch information
MOZGIII committed Feb 26, 2020
1 parent dd5edf0 commit 0443414
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 21 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,9 @@ tracing-limit = { path = "lib/tracing-limit" }
futures01 = { package = "futures", version = "0.1.25" }
futures = { version = "0.3", default-features = false, features = ["compat"] }
tokio = { version = "0.1.22", features = ["io", "uds", "tcp", "rt-full", "experimental-tracing"], default-features = false }
tokio02 = { package = "tokio", version = "0.2" }
tokio02 = { package = "tokio", version = "0.2", features = ["blocking"] }
tokio-retry = "0.2.0"
tokio-signal = "0.2.7"
tokio-threadpool = "0.1.16"
tokio-tls = "0.2.1"
tokio-compat = { version = "0.1", features = ["rt-full"] }

Expand Down
7 changes: 3 additions & 4 deletions src/buffers/disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,14 +189,13 @@ impl Stream for Reader {

// This will usually complete instantly, but in the case of a large queue (or a fresh launch of
// the app), this will have to go to disk.
let next = tokio_threadpool::blocking(|| {
let next = tokio02::task::block_in_place(|| {
self.db
.get(ReadOptions::new(), Key(self.read_offset))
.unwrap()
})
.unwrap();
});

if let Async::Ready(Some(value)) = next {
if let Some(value) = next {
self.unacked_sizes.push_back(value.len());
self.read_offset += 1;

Expand Down
22 changes: 8 additions & 14 deletions src/sinks/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ use crate::{
topology::config::{DataType, SinkConfig, SinkContext, SinkDescription},
};
use futures01::{
future::{self, poll_fn, IntoFuture},
stream::FuturesUnordered,
Async, AsyncSink, Future, Poll, Sink, StartSend, Stream,
future, stream::FuturesUnordered, Async, AsyncSink, Future, Poll, Sink, StartSend, Stream,
};
use rdkafka::{
consumer::{BaseConsumer, Consumer},
Expand Down Expand Up @@ -201,18 +199,14 @@ impl Sink for KafkaSink {
fn healthcheck(config: KafkaSinkConfig) -> super::Healthcheck {
let consumer: BaseConsumer = config.to_rdkafka().unwrap().create().unwrap();

let check = poll_fn(move || {
tokio_threadpool::blocking(|| {
consumer
.fetch_metadata(Some(&config.topic), Duration::from_secs(3))
.map(|_| ())
.map_err(|err| err.into())
})
})
.map_err(|err| err.into())
.and_then(|result| result.into_future());
let check = tokio02::task::block_in_place(|| {
consumer
.fetch_metadata(Some(&config.topic), Duration::from_secs(3))
.map(|_| ())
.map_err(|err| err.into())
});

Box::new(check)
Box::new(future::result(check))
}

fn encode_event(
Expand Down

0 comments on commit 0443414

Please sign in to comment.