Skip to content

Commit

Permalink
Consolidate everything in stall_detection module
Browse files Browse the repository at this point in the history
  • Loading branch information
akoshelev committed Nov 2, 2023
1 parent 3f6ac83 commit 4178511
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 129 deletions.
8 changes: 2 additions & 6 deletions src/helpers/gateway/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
#[cfg(feature = "stall-detection")]
mod observable;
#[cfg(feature = "stall-detection")]
pub mod observer;
mod receive;
mod send;
#[cfg(feature = "stall-detection")]
mod stall_detection;
pub(super) mod stall_detection;
mod transport;

use std::num::NonZeroUsize;
Expand All @@ -15,7 +11,7 @@ pub(super) use send::SendingEnd;
#[cfg(all(test, feature = "shuttle"))]
use shuttle::future as tokio;
#[cfg(feature = "stall-detection")]
pub(super) use {observable::Observed, stall_detection::InstrumentedGateway};
pub(super) use {stall_detection::InstrumentedGateway};

use crate::{
helpers::{
Expand Down
83 changes: 0 additions & 83 deletions src/helpers/gateway/observable.rs

This file was deleted.

31 changes: 0 additions & 31 deletions src/helpers/gateway/observer.rs

This file was deleted.

112 changes: 106 additions & 6 deletions src/helpers/gateway/stall_detection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ use std::{
fmt::{Debug, Formatter},
ops::RangeInclusive,
};
use std::fmt::Display;
use std::ops::Sub;
use std::sync::atomic::Ordering;

use delegate::delegate;

use super::{
observable::{ObserveState, Observed},
Gateway, GatewayConfig, State,
};
use crate::{
Expand All @@ -16,8 +18,107 @@ use crate::{
},
protocol::QueryId,
sync::{atomic::AtomicUsize, Arc, Weak},
task::JoinHandle,
};


pub struct Observed<T> {
version: Weak<AtomicUsize>,
inner: T,
}

impl<T> Observed<T> {
pub fn get_version(&self) -> &Weak<AtomicUsize> {
&self.version
}

pub fn wrap(version: &Weak<AtomicUsize>, inner: T) -> Self {
Self {
version: Weak::clone(version),
inner,
}
}

pub fn inner(&self) -> &T {
&self.inner
}

pub fn inc_sn(&self) {
self.version
.upgrade()
.map(|v| v.fetch_add(1, Ordering::Relaxed));
}

pub fn get_sn(&self) -> Option<usize> {
self.version.upgrade().map(|v| v.load(Ordering::Relaxed))
}

pub fn map<F: Fn(&Self) -> R, R>(&self, f: F) -> Observed<R> {
Observed {
version: Weak::clone(&self.version),
inner: f(self),
}
}
}

impl<T: ObserveState> Observed<T> {
pub fn get_state(&self) -> Option<T::State> {
self.inner().get_state()
}
}

pub trait ObserveState {
type State: Debug;
fn get_state(&self) -> Option<Self::State>;
}

impl<U> ObserveState for Vec<RangeInclusive<U>>
where
U: Copy + Display + Eq + PartialOrd + Ord + Sub<Output = U> + From<u8>,
{
type State = Vec<String>;
fn get_state(&self) -> Option<Self::State> {
let r = self
.iter()
.map(
|range| match (*range.end() - *range.start()).cmp(&U::from(1)) {
std::cmp::Ordering::Less => format!("{}", range.start()),
std::cmp::Ordering::Equal => format!("[{}, {}]", range.start(), range.end()),
std::cmp::Ordering::Greater => format!("[{},...,{}]", range.start(), range.end()),
},
)
.collect::<Vec<_>>();

(!r.is_empty()).then_some(r)
}
}

#[cfg(not(feature = "shuttle"))]
pub fn spawn<T: ObserveState + Send + Sync + 'static>(
within: tracing::Span,
check_interval: std::time::Duration,
observed: Observed<T>,
) -> JoinHandle<()> {
use tracing::Instrument;
tokio::spawn(async move {
let mut last_observed = 0;
loop {
::tokio::time::sleep(check_interval).await;
let now = observed.get_sn();
if let Some(now) = now {
if now == last_observed {
if let Some(state) = observed.get_state() {
tracing::warn!(sn = now, state = ?state, "Helper is stalled after {check_interval:?}");
}
}
last_observed = now;
} else {
break;
}
}
}.instrument(within))
}

pub struct InstrumentedGateway {
gateway: Gateway,
_version: Arc<AtomicUsize>,
Expand Down Expand Up @@ -51,7 +152,7 @@ impl Observed<InstrumentedGateway> {
);

// spawn observer
crate::helpers::observer::spawn(
spawn(
tracing::info_span!("observer", role=?r.role()),
config.progress_check_interval,
r.to_observed(),
Expand Down Expand Up @@ -145,12 +246,13 @@ mod receive {
helpers::{
error::Error,
gateway::{
observable::ObserveState, receive::GatewayReceivers, Observed, ReceivingEnd,
receive::GatewayReceivers, ReceivingEnd,
},
ChannelId, Message,
},
protocol::RecordId,
};
use super::*;

impl<M: Message> Observed<ReceivingEnd<M>> {
delegate::delegate! {
Expand Down Expand Up @@ -201,14 +303,12 @@ mod send {
};

use crate::helpers::{gateway::{
observable::ObserveState,
send::{GatewaySender, GatewaySenders},
}, ChannelId, Message, TotalRecords};
use crate::helpers::gateway::Observed;
use crate::protocol::RecordId;
use crate::helpers::error::Error;
use super::*;

#[cfg(feature = "stall-detection")]
impl<M: Message> Observed<crate::helpers::gateway::send::SendingEnd<M>> {
delegate::delegate! {
to { self.inc_sn(); self.inner() } {
Expand Down
4 changes: 1 addition & 3 deletions src/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@ use std::ops::{Index, IndexMut};
#[cfg(test)]
pub use buffers::OrderingSender;
pub use error::{Error, Result};
#[cfg(feature = "stall-detection")]
pub use gateway::observer;

#[cfg(feature = "stall-detection")]
mod gateway_stuff {
use crate::helpers::{
gateway,
gateway::{InstrumentedGateway, Observed},
gateway::{InstrumentedGateway, stall_detection::Observed},
};

pub type Gateway = Observed<InstrumentedGateway>;
Expand Down

0 comments on commit 4178511

Please sign in to comment.