Skip to content

Commit

Permalink
Add flush debouncing to the evmap state
Browse files Browse the repository at this point in the history
Signed-off-by: MOZGIII <[email protected]>
  • Loading branch information
MOZGIII committed Jun 10, 2020
1 parent 137ddfb commit ccf43ae
Show file tree
Hide file tree
Showing 5 changed files with 293 additions and 14 deletions.
245 changes: 245 additions & 0 deletions src/kubernetes/debounce.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
//! Arbitrary signal debouncing logic.
//!
//! Call [`Debounce::signal`] multiple times within the debounce time window,
//! and the [`Debounce::debounced`] will be resolved only once.
use std::time::Duration;
use tokio::time::{delay_until, Instant};

/// Provides an arbitrary signal debouncing.
pub struct Debounce {
sequence_start: Option<Instant>,
time: Duration,
}

impl Debounce {
/// Create a new [`Debounce`].
pub fn new(time: Duration) -> Self {
Self {
sequence_start: None,
time,
}
}

/// Trigger a signal to debounce.
pub fn signal(&mut self) {
if self.sequence_start.is_some() {
return;
}

self.sequence_start = Some(Instant::now() + self.time);
}

/// Debounced signal.
///
/// This function resolves after a debounce timeout since the first signal
/// in sequence expires.
/// If there hasn't been a signal, or the debounce timeout isn't yet
/// exausted - the future will be in a pending state.
pub async fn debounced(&mut self) {
let sequence_start = match self.sequence_start {
Some(val) => val,
None => futures::future::pending().await,
};

delay_until(sequence_start).await;
self.sequence_start = None;
}

/// This function exposes the state of the debounce logic.
/// If this returns `false`, you shouldn't `poll` on [`debounced`], as it's
/// pending indefinitely.
pub fn is_debouncing(&self) -> bool {
self.sequence_start.is_some()
}
}

#[cfg(test)]
mod tests {
use super::*;
use futures::{pin_mut, poll};

const TEST_DELAY_FRACTION: Duration = Duration::from_secs(60 * 60); // one hour
const TEST_DELAY: Duration = Duration::from_secs(24 * 60 * 60); // one day

#[tokio::test]
async fn one_signal() {
tokio::time::pause();

let mut debounce = Debounce::new(TEST_DELAY);
assert!(debounce.sequence_start.is_none());

// Issue a signal.
debounce.signal();
assert!(debounce.sequence_start.is_some());

{
// Request debounced signal.
let fut = debounce.debounced();
pin_mut!(fut);

// Shouldn't be available immediately.
assert!(poll!(&mut fut).is_pending());

// Simulate that we waited for some time, but no long enouh for the
// debounce to happen.
tokio::time::advance(TEST_DELAY_FRACTION).await;

// Still shouldn't be available.
assert!(poll!(&mut fut).is_pending());

// Then wait long enough for debounce timeout to pass.
tokio::time::advance(TEST_DELAY * 2).await;

// Should finally be available.
assert!(poll!(&mut fut).is_ready());
}

assert!(debounce.sequence_start.is_none());

tokio::time::resume();
}

#[tokio::test]
async fn late_request() {
tokio::time::pause();

let mut debounce = Debounce::new(TEST_DELAY);
assert!(debounce.sequence_start.is_none());

// Issue a signal.
debounce.signal();
assert!(debounce.sequence_start.is_some());

// Simulate that we waited long enough.
tokio::time::advance(TEST_DELAY * 2).await;
assert!(debounce.sequence_start.is_some());

{
// Request a debounced signal.
let fut = debounce.debounced();
pin_mut!(fut);

// Should be available immediately.
assert!(poll!(&mut fut).is_ready());
}

assert!(debounce.sequence_start.is_none());

tokio::time::resume();
}

#[tokio::test]
async fn multiple_signals() {
tokio::time::pause();

let mut debounce = Debounce::new(TEST_DELAY);
assert!(debounce.sequence_start.is_none());

debounce.signal();

let first_signal_timestamp = debounce.sequence_start;
assert!(first_signal_timestamp.is_some());

debounce.signal();
assert_eq!(debounce.sequence_start, first_signal_timestamp);

tokio::time::advance(TEST_DELAY_FRACTION).await;

debounce.signal();
assert_eq!(debounce.sequence_start, first_signal_timestamp);

{
let fut = debounce.debounced();
pin_mut!(fut);

assert!(poll!(&mut fut).is_pending());

tokio::time::advance(TEST_DELAY_FRACTION).await;

assert!(poll!(&mut fut).is_pending());

tokio::time::advance(TEST_DELAY * 2).await;

assert!(poll!(&mut fut).is_ready());
}

assert!(debounce.sequence_start.is_none());

tokio::time::resume();
}

#[tokio::test]
async fn sequence() {
tokio::time::pause();

let mut debounce = Debounce::new(TEST_DELAY);
assert!(debounce.sequence_start.is_none());

debounce.signal();

let first_signal_timestamp = debounce.sequence_start;
assert!(first_signal_timestamp.is_some());

debounce.signal();
assert_eq!(debounce.sequence_start, first_signal_timestamp);

tokio::time::advance(TEST_DELAY_FRACTION).await;

debounce.signal();
assert_eq!(debounce.sequence_start, first_signal_timestamp);

{
let fut = debounce.debounced();
pin_mut!(fut);

assert!(poll!(&mut fut).is_pending());

tokio::time::advance(TEST_DELAY * 2).await;

assert!(poll!(&mut fut).is_ready());
}

assert!(debounce.sequence_start.is_none());

debounce.signal();

let second_signal_timestamp = debounce.sequence_start;
assert!(second_signal_timestamp.is_some());
assert_ne!(second_signal_timestamp, first_signal_timestamp);

{
let fut = debounce.debounced();
pin_mut!(fut);

assert!(poll!(&mut fut).is_pending());

tokio::time::advance(TEST_DELAY * 2).await;

assert!(poll!(&mut fut).is_ready());
}

assert!(debounce.sequence_start.is_none());

tokio::time::resume();
}

#[tokio::test]
async fn is_debouncing() {
tokio::time::pause();

let mut debounce = Debounce::new(TEST_DELAY);
assert_eq!(debounce.is_debouncing(), false);

debounce.signal();
assert_eq!(debounce.is_debouncing(), true);

tokio::time::advance(TEST_DELAY * 2).await;
assert_eq!(debounce.is_debouncing(), true);

debounce.debounced().await;
assert_eq!(debounce.is_debouncing(), false);

tokio::time::resume();
}
}
2 changes: 2 additions & 0 deletions src/kubernetes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

pub mod api_watcher;
pub mod client;
pub mod debounce;
pub mod hash_value;
pub mod instrumenting_watcher;
pub mod mock_watcher;
Expand All @@ -34,6 +35,7 @@ pub mod watch_request_builder;
pub mod watcher;

// Reexports for more elegant public API.
pub use debounce::Debounce;
pub use hash_value::HashValue;
pub use multi_response_decoder::MultiResponseDecoder;
pub use reflector::Reflector;
Expand Down
2 changes: 1 addition & 1 deletion src/kubernetes/reflector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -896,7 +896,7 @@ mod tests {

// Prepare state.
let (state_reader, state_writer) = evmap10::new();
let state_writer = state::evmap::Writer::new(state_writer);
let state_writer = state::evmap::Writer::new(state_writer, None); // test without debounce to avouid complexity
let state_writer = state::instrumenting::Writer::new(state_writer);
let resulting_state_reader = state_reader.clone();

Expand Down
55 changes: 43 additions & 12 deletions src/kubernetes/state/evmap.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
//! A state implementation backed by [`evmap10`].
use crate::kubernetes::hash_value::HashValue;
use crate::kubernetes::{debounce::Debounce, hash_value::HashValue};
use async_trait::async_trait;
use evmap10::WriteHandle;
use futures::future::BoxFuture;
use k8s_openapi::{apimachinery::pkg::apis::meta::v1::ObjectMeta, Metadata};
use std::time::Duration;

/// A [`WriteHandle`] wrapper that implements [`super::Write`].
/// For use as a state writer implementation for
Expand All @@ -14,6 +15,7 @@ where
T: Metadata<Ty = ObjectMeta> + Send,
{
inner: WriteHandle<String, Value<T>>,
debounced_flush: Option<Debounce>,
}

impl<T> Writer<T>
Expand All @@ -22,10 +24,36 @@ where
{
/// Take a [`WriteHandle`], initialize it and return it wrapped with
/// [`Self`].
pub fn new(mut inner: WriteHandle<String, Value<T>>) -> Self {
pub fn new(
mut inner: WriteHandle<String, Value<T>>,
flush_debounce_timeout: Option<Duration>,
) -> Self {
// Prepare inner.
inner.purge();
inner.refresh();
Self { inner }

// Prepare flush debounce.
let debounced_flush = flush_debounce_timeout.map(Debounce::new);

Self {
inner,
debounced_flush,
}
}

/// Debounced `flush`.
/// When a number of flush events arrive un a row, we buffer them such that
/// only the last one in the chain is propagated.
/// This is intended to improve the state behaivor at resync - by delaying
/// the `flush` proparagion, we maximize the time `evmap` remains populated,
/// ideally allowing a single transition from non-populated to populated
/// state.
fn debounced_flush(&mut self) {
if let Some(ref mut debounced_flush) = self.debounced_flush {
debounced_flush.signal();
} else {
self.inner.flush();
}
}
}

Expand All @@ -36,35 +64,31 @@ where
{
type Item = T;

// TODO: debounce `flush` so that when a bunch of events arrive in a row
// within a certain small time window we commit all of them at once.
// This will improve the state behaivor at resync.

async fn add(&mut self, item: Self::Item) {
if let Some((key, value)) = kv(item) {
self.inner.insert(key, value);
self.inner.flush();
self.debounced_flush();
}
}

async fn update(&mut self, item: Self::Item) {
if let Some((key, value)) = kv(item) {
self.inner.update(key, value);
self.inner.flush();
self.debounced_flush();
}
}

async fn delete(&mut self, item: Self::Item) {
if let Some((key, _value)) = kv(item) {
self.inner.empty(key);
self.inner.flush();
self.debounced_flush();
}
}

async fn resync(&mut self) {
// By omiting the flush here, we cache the results from the
// previous run until flush is issued when the new events
// begin arriving, reducing the time durig which the state
// begin arriving, reducing the time during which the state
// has no data.
self.inner.purge();
}
Expand All @@ -76,11 +100,18 @@ where
T: Metadata<Ty = ObjectMeta> + Send,
{
fn maintenance_request(&mut self) -> Option<BoxFuture<'_, ()>> {
if let Some(ref mut debounced_flush) = self.debounced_flush {
if debounced_flush.is_debouncing() {
return Some(Box::pin(debounced_flush.debounced()));
}
}
None
}

async fn perform_maintenance(&mut self) {
// noop
if self.debounced_flush.is_some() {
self.inner.flush();
}
}
}

Expand Down
Loading

0 comments on commit ccf43ae

Please sign in to comment.