Skip to content

Commit

Permalink
Adding some comments
Browse files Browse the repository at this point in the history
  • Loading branch information
akoshelev committed Nov 2, 2023
1 parent 4178511 commit 195c551
Showing 1 changed file with 63 additions and 57 deletions.
120 changes: 63 additions & 57 deletions src/helpers/gateway/stall_detection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,43 +22,54 @@ use crate::{
};


/// Trait for structs that can report their current state.
pub trait ObserveState {
type State: Debug;
fn get_state(&self) -> Option<Self::State>;
}

/// This object does not own the sequence number, it must be stored outside and dropped when
/// observing entity goes out of scope. If that happens, any attempt to increment it through this
/// instance will result in a panic.
///
/// Observing and incrementing sequence numbers do not introduce happens-before relationship.
pub struct Observed<T> {
version: Weak<AtomicUsize>,
/// Each time a state change occurs inside the observable object `T`, its sequence number is
/// incremented by 1. It is up to the caller to decide what is a state change.
///
/// The sequence number is stored as a weak reference, so it can be dropped when the observed
/// object is dropped.
///
/// External observers watching this object will declare it stalled if it's sequence number
/// hasn't been incremented for long enough time. If `T` implements `ObserveState`, then the
/// state of `T` is also reported.
sn: Weak<AtomicUsize>,
inner: T,
}

impl<T> Observed<T> {
pub fn get_version(&self) -> &Weak<AtomicUsize> {
&self.version
}

pub fn wrap(version: &Weak<AtomicUsize>, inner: T) -> Self {
impl <T> Observed<T> {
fn wrap(sn: Weak<AtomicUsize>, inner: T) -> Self {
Self {
version: Weak::clone(version),
sn,
inner,
}
}

pub fn inner(&self) -> &T {
&self.inner
fn current_sn(&self) -> &Weak<AtomicUsize> {
&self.sn
}

pub fn inc_sn(&self) {
self.version
.upgrade()
.map(|v| v.fetch_add(1, Ordering::Relaxed));
/// ## Panics
/// This will panic if the sequence number is dropped.
fn inc_sn(&self) {
let sn = self.sn.upgrade().unwrap();
sn.fetch_add(1, Ordering::Relaxed);
}

pub fn get_sn(&self) -> Option<usize> {
self.version.upgrade().map(|v| v.load(Ordering::Relaxed))
fn inner(&self) -> &T {
&self.inner
}

pub fn map<F: Fn(&Self) -> R, R>(&self, f: F) -> Observed<R> {
Observed {
version: Weak::clone(&self.version),
inner: f(self),
}
}
}

impl<T: ObserveState> Observed<T> {
Expand All @@ -67,33 +78,6 @@ impl<T: ObserveState> Observed<T> {
}
}

pub trait ObserveState {
type State: Debug;
fn get_state(&self) -> Option<Self::State>;
}

impl<U> ObserveState for Vec<RangeInclusive<U>>
where
U: Copy + Display + Eq + PartialOrd + Ord + Sub<Output = U> + From<u8>,
{
type State = Vec<String>;
fn get_state(&self) -> Option<Self::State> {
let r = self
.iter()
.map(
|range| match (*range.end() - *range.start()).cmp(&U::from(1)) {
std::cmp::Ordering::Less => format!("{}", range.start()),
std::cmp::Ordering::Equal => format!("[{}, {}]", range.start(), range.end()),
std::cmp::Ordering::Greater => format!("[{},...,{}]", range.start(), range.end()),
},
)
.collect::<Vec<_>>();

(!r.is_empty()).then_some(r)
}
}

#[cfg(not(feature = "shuttle"))]
pub fn spawn<T: ObserveState + Send + Sync + 'static>(
within: tracing::Span,
check_interval: std::time::Duration,
Expand All @@ -104,7 +88,7 @@ pub fn spawn<T: ObserveState + Send + Sync + 'static>(
let mut last_observed = 0;
loop {
::tokio::time::sleep(check_interval).await;
let now = observed.get_sn();
let now = observed.current_sn().upgrade().map(|v| v.load(Ordering::Relaxed));
if let Some(now) = now {
if now == last_observed {
if let Some(state) = observed.get_state() {
Expand All @@ -121,7 +105,9 @@ pub fn spawn<T: ObserveState + Send + Sync + 'static>(

pub struct InstrumentedGateway {
gateway: Gateway,
_version: Arc<AtomicUsize>,
// Gateway owns the sequence number associated with it. When it goes out of scope, sn is destroyed
// and external observers can see that they no longer need to watch it.
_sn: Arc<AtomicUsize>,
}

impl Observed<InstrumentedGateway> {
Expand All @@ -142,12 +128,11 @@ impl Observed<InstrumentedGateway> {
transport: TransportImpl,
) -> Self {
let version = Arc::new(AtomicUsize::default());
// todo: this sucks, we shouldn't do an extra clone
let r = Self::wrap(
&Arc::downgrade(&version),
Arc::downgrade(&version),
InstrumentedGateway {
gateway: Gateway::new(query_id, config, roles, transport),
_version: version,
_sn: version,
},
);

Expand All @@ -168,23 +153,23 @@ impl Observed<InstrumentedGateway> {
total_records: TotalRecords,
) -> SendingEnd<M> {
Observed::wrap(
self.get_version(),
Weak::clone(self.current_sn()),
self.inner().gateway.get_sender(channel_id, total_records),
)
}

#[must_use]
pub fn get_receiver<M: Message>(&self, channel_id: &ChannelId) -> ReceivingEnd<M> {
Observed::wrap(
self.get_version(),
Weak::clone(self.current_sn()),
self.inner().gateway.get_receiver(channel_id),
)
}

pub fn to_observed(&self) -> Observed<Weak<State>> {
// todo: inner.inner
Observed::wrap(
self.get_version(),
Weak::clone(self.current_sn()),
Arc::downgrade(&self.inner().gateway.inner),
)
}
Expand Down Expand Up @@ -360,3 +345,24 @@ mod send {
}
}
}

impl<U> ObserveState for Vec<RangeInclusive<U>>
where
U: Copy + Display + Eq + PartialOrd + Ord + Sub<Output = U> + From<u8>,
{
type State = Vec<String>;
fn get_state(&self) -> Option<Self::State> {
let r = self
.iter()
.map(
|range| match (*range.end() - *range.start()).cmp(&U::from(1)) {
std::cmp::Ordering::Less => format!("{}", range.start()),
std::cmp::Ordering::Equal => format!("[{}, {}]", range.start(), range.end()),
std::cmp::Ordering::Greater => format!("[{},...,{}]", range.start(), range.end()),
},
)
.collect::<Vec<_>>();

(!r.is_empty()).then_some(r)
}
}

0 comments on commit 195c551

Please sign in to comment.