diff --git a/src/kubernetes/debounce.rs b/src/kubernetes/debounce.rs new file mode 100644 index 00000000000000..f9d8d7b52e7dc7 --- /dev/null +++ b/src/kubernetes/debounce.rs @@ -0,0 +1,245 @@ +//! Arbitrary signal debouncing logic. +//! +//! Call [`Debounce::signal`] multiple times within the debounce time window, +//! and the [`Debounce::debounced`] will be resolved only once. + +use std::time::Duration; +use tokio::time::{delay_until, Instant}; + +/// Provides an arbitrary signal debouncing. +pub struct Debounce { + sequence_start: Option, + time: Duration, +} + +impl Debounce { + /// Create a new [`Debounce`]. + pub fn new(time: Duration) -> Self { + Self { + sequence_start: None, + time, + } + } + + /// Trigger a signal to debounce. + pub fn signal(&mut self) { + if self.sequence_start.is_some() { + return; + } + + self.sequence_start = Some(Instant::now() + self.time); + } + + /// Debounced signal. + /// + /// This function resolves after a debounce timeout since the first signal + /// in sequence expires. + /// If there hasn't been a signal, or the debounce timeout isn't yet + /// exausted - the future will be in a pending state. + pub async fn debounced(&mut self) { + let sequence_start = match self.sequence_start { + Some(val) => val, + None => futures::future::pending().await, + }; + + delay_until(sequence_start).await; + self.sequence_start = None; + } + + /// This function exposes the state of the debounce logic. + /// If this returns `false`, you shouldn't `poll` on [`debounced`], as it's + /// pending indefinitely. + pub fn is_debouncing(&self) -> bool { + self.sequence_start.is_some() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::{pin_mut, poll}; + + const TEST_DELAY_FRACTION: Duration = Duration::from_secs(60 * 60); // one hour + const TEST_DELAY: Duration = Duration::from_secs(24 * 60 * 60); // one day + + #[tokio::test] + async fn one_signal() { + tokio::time::pause(); + + let mut debounce = Debounce::new(TEST_DELAY); + assert!(debounce.sequence_start.is_none()); + + // Issue a signal. + debounce.signal(); + assert!(debounce.sequence_start.is_some()); + + { + // Request debounced signal. + let fut = debounce.debounced(); + pin_mut!(fut); + + // Shouldn't be available immediately. + assert!(poll!(&mut fut).is_pending()); + + // Simulate that we waited for some time, but no long enouh for the + // debounce to happen. + tokio::time::advance(TEST_DELAY_FRACTION).await; + + // Still shouldn't be available. + assert!(poll!(&mut fut).is_pending()); + + // Then wait long enough for debounce timeout to pass. + tokio::time::advance(TEST_DELAY * 2).await; + + // Should finally be available. + assert!(poll!(&mut fut).is_ready()); + } + + assert!(debounce.sequence_start.is_none()); + + tokio::time::resume(); + } + + #[tokio::test] + async fn late_request() { + tokio::time::pause(); + + let mut debounce = Debounce::new(TEST_DELAY); + assert!(debounce.sequence_start.is_none()); + + // Issue a signal. + debounce.signal(); + assert!(debounce.sequence_start.is_some()); + + // Simulate that we waited long enough. + tokio::time::advance(TEST_DELAY * 2).await; + assert!(debounce.sequence_start.is_some()); + + { + // Request a debounced signal. + let fut = debounce.debounced(); + pin_mut!(fut); + + // Should be available immediately. + assert!(poll!(&mut fut).is_ready()); + } + + assert!(debounce.sequence_start.is_none()); + + tokio::time::resume(); + } + + #[tokio::test] + async fn multiple_signals() { + tokio::time::pause(); + + let mut debounce = Debounce::new(TEST_DELAY); + assert!(debounce.sequence_start.is_none()); + + debounce.signal(); + + let first_signal_timestamp = debounce.sequence_start; + assert!(first_signal_timestamp.is_some()); + + debounce.signal(); + assert_eq!(debounce.sequence_start, first_signal_timestamp); + + tokio::time::advance(TEST_DELAY_FRACTION).await; + + debounce.signal(); + assert_eq!(debounce.sequence_start, first_signal_timestamp); + + { + let fut = debounce.debounced(); + pin_mut!(fut); + + assert!(poll!(&mut fut).is_pending()); + + tokio::time::advance(TEST_DELAY_FRACTION).await; + + assert!(poll!(&mut fut).is_pending()); + + tokio::time::advance(TEST_DELAY * 2).await; + + assert!(poll!(&mut fut).is_ready()); + } + + assert!(debounce.sequence_start.is_none()); + + tokio::time::resume(); + } + + #[tokio::test] + async fn sequence() { + tokio::time::pause(); + + let mut debounce = Debounce::new(TEST_DELAY); + assert!(debounce.sequence_start.is_none()); + + debounce.signal(); + + let first_signal_timestamp = debounce.sequence_start; + assert!(first_signal_timestamp.is_some()); + + debounce.signal(); + assert_eq!(debounce.sequence_start, first_signal_timestamp); + + tokio::time::advance(TEST_DELAY_FRACTION).await; + + debounce.signal(); + assert_eq!(debounce.sequence_start, first_signal_timestamp); + + { + let fut = debounce.debounced(); + pin_mut!(fut); + + assert!(poll!(&mut fut).is_pending()); + + tokio::time::advance(TEST_DELAY * 2).await; + + assert!(poll!(&mut fut).is_ready()); + } + + assert!(debounce.sequence_start.is_none()); + + debounce.signal(); + + let second_signal_timestamp = debounce.sequence_start; + assert!(second_signal_timestamp.is_some()); + assert_ne!(second_signal_timestamp, first_signal_timestamp); + + { + let fut = debounce.debounced(); + pin_mut!(fut); + + assert!(poll!(&mut fut).is_pending()); + + tokio::time::advance(TEST_DELAY * 2).await; + + assert!(poll!(&mut fut).is_ready()); + } + + assert!(debounce.sequence_start.is_none()); + + tokio::time::resume(); + } + + #[tokio::test] + async fn is_debouncing() { + tokio::time::pause(); + + let mut debounce = Debounce::new(TEST_DELAY); + assert_eq!(debounce.is_debouncing(), false); + + debounce.signal(); + assert_eq!(debounce.is_debouncing(), true); + + tokio::time::advance(TEST_DELAY * 2).await; + assert_eq!(debounce.is_debouncing(), true); + + debounce.debounced().await; + assert_eq!(debounce.is_debouncing(), false); + + tokio::time::resume(); + } +} diff --git a/src/kubernetes/mod.rs b/src/kubernetes/mod.rs index ac84b15e05e76a..f9df1a26cf21cb 100644 --- a/src/kubernetes/mod.rs +++ b/src/kubernetes/mod.rs @@ -22,6 +22,7 @@ pub mod api_watcher; pub mod client; +pub mod debounce; pub mod hash_value; pub mod instrumenting_watcher; pub mod mock_watcher; @@ -34,6 +35,7 @@ pub mod watch_request_builder; pub mod watcher; // Reexports for more elegant public API. +pub use debounce::Debounce; pub use hash_value::HashValue; pub use multi_response_decoder::MultiResponseDecoder; pub use reflector::Reflector; diff --git a/src/kubernetes/reflector.rs b/src/kubernetes/reflector.rs index 89e92f6fe5e9c1..9ff25c26d57493 100644 --- a/src/kubernetes/reflector.rs +++ b/src/kubernetes/reflector.rs @@ -896,7 +896,7 @@ mod tests { // Prepare state. let (state_reader, state_writer) = evmap10::new(); - let state_writer = state::evmap::Writer::new(state_writer); + let state_writer = state::evmap::Writer::new(state_writer, None); // test without debounce to avouid complexity let state_writer = state::instrumenting::Writer::new(state_writer); let resulting_state_reader = state_reader.clone(); diff --git a/src/kubernetes/state/evmap.rs b/src/kubernetes/state/evmap.rs index 5480e9ba75b1f9..4598c692b4303c 100644 --- a/src/kubernetes/state/evmap.rs +++ b/src/kubernetes/state/evmap.rs @@ -1,10 +1,11 @@ //! A state implementation backed by [`evmap10`]. -use crate::kubernetes::hash_value::HashValue; +use crate::kubernetes::{debounce::Debounce, hash_value::HashValue}; use async_trait::async_trait; use evmap10::WriteHandle; use futures::future::BoxFuture; use k8s_openapi::{apimachinery::pkg::apis::meta::v1::ObjectMeta, Metadata}; +use std::time::Duration; /// A [`WriteHandle`] wrapper that implements [`super::Write`]. /// For use as a state writer implementation for @@ -14,6 +15,7 @@ where T: Metadata + Send, { inner: WriteHandle>, + debounced_flush: Option, } impl Writer @@ -22,10 +24,36 @@ where { /// Take a [`WriteHandle`], initialize it and return it wrapped with /// [`Self`]. - pub fn new(mut inner: WriteHandle>) -> Self { + pub fn new( + mut inner: WriteHandle>, + flush_debounce_timeout: Option, + ) -> Self { + // Prepare inner. inner.purge(); inner.refresh(); - Self { inner } + + // Prepare flush debounce. + let debounced_flush = flush_debounce_timeout.map(Debounce::new); + + Self { + inner, + debounced_flush, + } + } + + /// Debounced `flush`. + /// When a number of flush events arrive un a row, we buffer them such that + /// only the last one in the chain is propagated. + /// This is intended to improve the state behaivor at resync - by delaying + /// the `flush` proparagion, we maximize the time `evmap` remains populated, + /// ideally allowing a single transition from non-populated to populated + /// state. + fn debounced_flush(&mut self) { + if let Some(ref mut debounced_flush) = self.debounced_flush { + debounced_flush.signal(); + } else { + self.inner.flush(); + } } } @@ -36,35 +64,31 @@ where { type Item = T; - // TODO: debounce `flush` so that when a bunch of events arrive in a row - // within a certain small time window we commit all of them at once. - // This will improve the state behaivor at resync. - async fn add(&mut self, item: Self::Item) { if let Some((key, value)) = kv(item) { self.inner.insert(key, value); - self.inner.flush(); + self.debounced_flush(); } } async fn update(&mut self, item: Self::Item) { if let Some((key, value)) = kv(item) { self.inner.update(key, value); - self.inner.flush(); + self.debounced_flush(); } } async fn delete(&mut self, item: Self::Item) { if let Some((key, _value)) = kv(item) { self.inner.empty(key); - self.inner.flush(); + self.debounced_flush(); } } async fn resync(&mut self) { // By omiting the flush here, we cache the results from the // previous run until flush is issued when the new events - // begin arriving, reducing the time durig which the state + // begin arriving, reducing the time during which the state // has no data. self.inner.purge(); } @@ -76,11 +100,18 @@ where T: Metadata + Send, { fn maintenance_request(&mut self) -> Option> { + if let Some(ref mut debounced_flush) = self.debounced_flush { + if debounced_flush.is_debouncing() { + return Some(Box::pin(debounced_flush.debounced())); + } + } None } async fn perform_maintenance(&mut self) { - // noop + if self.debounced_flush.is_some() { + self.inner.flush(); + } } } diff --git a/src/sources/kubernetes_logs/mod.rs b/src/sources/kubernetes_logs/mod.rs index b0fd02c458f5d1..c7cac0f0a2b44c 100644 --- a/src/sources/kubernetes_logs/mod.rs +++ b/src/sources/kubernetes_logs/mod.rs @@ -160,7 +160,8 @@ impl Source { let watcher = k8s::api_watcher::ApiWatcher::new(client, Pod::watch_pod_for_all_namespaces); let watcher = k8s::instrumenting_watcher::InstrumentingWatcher::new(watcher); let (state_reader, state_writer) = evmap::new(); - let state_writer = k8s::state::evmap::Writer::new(state_writer); + let state_writer = + k8s::state::evmap::Writer::new(state_writer, Some(Duration::from_millis(10))); let state_writer = k8s::state::instrumenting::Writer::new(state_writer); let state_writer = k8s::state::delayed_delete::Writer::new(state_writer, Duration::from_secs(60));