From 195c551eb427b59c295d6e572147d8919ce2cd25 Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Thu, 2 Nov 2023 13:37:46 -0700 Subject: [PATCH] Adding some comments --- src/helpers/gateway/stall_detection.rs | 120 +++++++++++++------------ 1 file changed, 63 insertions(+), 57 deletions(-) diff --git a/src/helpers/gateway/stall_detection.rs b/src/helpers/gateway/stall_detection.rs index 3616d2791..dd4af2309 100644 --- a/src/helpers/gateway/stall_detection.rs +++ b/src/helpers/gateway/stall_detection.rs @@ -22,43 +22,54 @@ use crate::{ }; +/// Trait for structs that can report their current state. +pub trait ObserveState { + type State: Debug; + fn get_state(&self) -> Option; +} + +/// This object does not own the sequence number, it must be stored outside and dropped when +/// observing entity goes out of scope. If that happens, any attempt to increment it through this +/// instance will result in a panic. +/// +/// Observing and incrementing sequence numbers do not introduce happens-before relationship. pub struct Observed { - version: Weak, + /// Each time a state change occurs inside the observable object `T`, its sequence number is + /// incremented by 1. It is up to the caller to decide what is a state change. + /// + /// The sequence number is stored as a weak reference, so it can be dropped when the observed + /// object is dropped. + /// + /// External observers watching this object will declare it stalled if it's sequence number + /// hasn't been incremented for long enough time. If `T` implements `ObserveState`, then the + /// state of `T` is also reported. + sn: Weak, inner: T, } -impl Observed { - pub fn get_version(&self) -> &Weak { - &self.version - } - - pub fn wrap(version: &Weak, inner: T) -> Self { +impl Observed { + fn wrap(sn: Weak, inner: T) -> Self { Self { - version: Weak::clone(version), + sn, inner, } } - pub fn inner(&self) -> &T { - &self.inner + fn current_sn(&self) -> &Weak { + &self.sn } - pub fn inc_sn(&self) { - self.version - .upgrade() - .map(|v| v.fetch_add(1, Ordering::Relaxed)); + /// ## Panics + /// This will panic if the sequence number is dropped. + fn inc_sn(&self) { + let sn = self.sn.upgrade().unwrap(); + sn.fetch_add(1, Ordering::Relaxed); } - pub fn get_sn(&self) -> Option { - self.version.upgrade().map(|v| v.load(Ordering::Relaxed)) + fn inner(&self) -> &T { + &self.inner } - pub fn map R, R>(&self, f: F) -> Observed { - Observed { - version: Weak::clone(&self.version), - inner: f(self), - } - } } impl Observed { @@ -67,33 +78,6 @@ impl Observed { } } -pub trait ObserveState { - type State: Debug; - fn get_state(&self) -> Option; -} - -impl ObserveState for Vec> - where - U: Copy + Display + Eq + PartialOrd + Ord + Sub + From, -{ - type State = Vec; - fn get_state(&self) -> Option { - let r = self - .iter() - .map( - |range| match (*range.end() - *range.start()).cmp(&U::from(1)) { - std::cmp::Ordering::Less => format!("{}", range.start()), - std::cmp::Ordering::Equal => format!("[{}, {}]", range.start(), range.end()), - std::cmp::Ordering::Greater => format!("[{},...,{}]", range.start(), range.end()), - }, - ) - .collect::>(); - - (!r.is_empty()).then_some(r) - } -} - -#[cfg(not(feature = "shuttle"))] pub fn spawn( within: tracing::Span, check_interval: std::time::Duration, @@ -104,7 +88,7 @@ pub fn spawn( let mut last_observed = 0; loop { ::tokio::time::sleep(check_interval).await; - let now = observed.get_sn(); + let now = observed.current_sn().upgrade().map(|v| v.load(Ordering::Relaxed)); if let Some(now) = now { if now == last_observed { if let Some(state) = observed.get_state() { @@ -121,7 +105,9 @@ pub fn spawn( pub struct InstrumentedGateway { gateway: Gateway, - _version: Arc, + // Gateway owns the sequence number associated with it. When it goes out of scope, sn is destroyed + // and external observers can see that they no longer need to watch it. + _sn: Arc, } impl Observed { @@ -142,12 +128,11 @@ impl Observed { transport: TransportImpl, ) -> Self { let version = Arc::new(AtomicUsize::default()); - // todo: this sucks, we shouldn't do an extra clone let r = Self::wrap( - &Arc::downgrade(&version), + Arc::downgrade(&version), InstrumentedGateway { gateway: Gateway::new(query_id, config, roles, transport), - _version: version, + _sn: version, }, ); @@ -168,7 +153,7 @@ impl Observed { total_records: TotalRecords, ) -> SendingEnd { Observed::wrap( - self.get_version(), + Weak::clone(self.current_sn()), self.inner().gateway.get_sender(channel_id, total_records), ) } @@ -176,7 +161,7 @@ impl Observed { #[must_use] pub fn get_receiver(&self, channel_id: &ChannelId) -> ReceivingEnd { Observed::wrap( - self.get_version(), + Weak::clone(self.current_sn()), self.inner().gateway.get_receiver(channel_id), ) } @@ -184,7 +169,7 @@ impl Observed { pub fn to_observed(&self) -> Observed> { // todo: inner.inner Observed::wrap( - self.get_version(), + Weak::clone(self.current_sn()), Arc::downgrade(&self.inner().gateway.inner), ) } @@ -360,3 +345,24 @@ mod send { } } } + +impl ObserveState for Vec> + where + U: Copy + Display + Eq + PartialOrd + Ord + Sub + From, +{ + type State = Vec; + fn get_state(&self) -> Option { + let r = self + .iter() + .map( + |range| match (*range.end() - *range.start()).cmp(&U::from(1)) { + std::cmp::Ordering::Less => format!("{}", range.start()), + std::cmp::Ordering::Equal => format!("[{}, {}]", range.start(), range.end()), + std::cmp::Ordering::Greater => format!("[{},...,{}]", range.start(), range.end()), + }, + ) + .collect::>(); + + (!r.is_empty()).then_some(r) + } +}