Skip to content

Commit

Permalink
Instrumented infra
Browse files Browse the repository at this point in the history
  • Loading branch information
akoshelev committed Oct 24, 2023
1 parent 3ef3e7e commit 0072cea
Show file tree
Hide file tree
Showing 15 changed files with 573 additions and 198 deletions.
2 changes: 2 additions & 0 deletions .clippy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@ disallowed-methods = [
{ path = "futures::future::join_all", reason = "We don't have a replacement for this method yet. Consider extending `SeqJoin` trait." },
{ path = "futures::future::try_join_all", reason = "Use Context.try_join instead." },
]

allow-private-module-inception = true
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }
typenum = "1.16"
# hpke is pinned to it
x25519-dalek = "2.0.0-rc.3"
delegate = "0.10.0"

[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = "0.5.0"
Expand Down
34 changes: 0 additions & 34 deletions src/helpers/buffers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,3 @@ mod unordered_receiver;
pub use ordering_mpsc::{ordering_mpsc, OrderingMpscReceiver, OrderingMpscSender};
pub use ordering_sender::{OrderedStream, OrderingSender};
pub use unordered_receiver::UnorderedReceiver;

#[cfg(debug_assertions)]
#[allow(unused)] // todo(alex): make test world print the state again
mod waiting {
use std::collections::HashMap;

use crate::helpers::ChannelId;

pub(in crate::helpers) struct WaitingTasks<'a> {
tasks: HashMap<&'a ChannelId, Vec<u32>>,
}

impl<'a> WaitingTasks<'a> {
pub fn new(tasks: HashMap<&'a ChannelId, Vec<u32>>) -> Self {
Self { tasks }
}

pub fn is_empty(&self) -> bool {
self.tasks.is_empty()
}
}

impl std::fmt::Debug for WaitingTasks<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "[")?;
for (channel, records) in &self.tasks {
write!(f, "\n {channel:?}: {records:?}")?;
}
write!(f, "\n]")?;

Ok(())
}
}
}
18 changes: 18 additions & 0 deletions src/helpers/buffers/ordering_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ impl WaitingShard {
self.wakers.pop_front().unwrap().w.wake();
}
}

pub fn waiting(&self) -> impl Iterator<Item = usize> + '_ {
self.wakers.iter().map(|waker| waker.i)
}
}

/// A collection of wakers that are indexed by the send index (`i`).
Expand Down Expand Up @@ -224,6 +228,12 @@ impl Waiting {
fn wake(&self, i: usize) {
self.shard(i).wake(i);
}

fn waiting(&self, indices: &mut Vec<usize>) {
self.shards
.iter()
.for_each(|shard| indices.extend(shard.lock().unwrap().waiting()));
}
}

/// An `OrderingSender` accepts messages for sending in any order, but
Expand Down Expand Up @@ -375,6 +385,14 @@ impl OrderingSender {
) -> OrderedStream<crate::sync::Arc<Self>> {
OrderedStream { sender: self }
}

pub fn waiting(&self) -> Vec<usize> {
let mut buf = Vec::new();
self.waiting.waiting(&mut buf);
buf.sort_unstable();

buf
}
}

/// A future for writing item `i` into an `OrderingSender`.
Expand Down
30 changes: 27 additions & 3 deletions src/helpers/buffers/unordered_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ where
/// Note: in protocols we try to send before receiving, so we can rely on
/// that easing load on this mechanism. There might also need to be some
/// end-to-end back pressure for tasks that do not involve sending at all.
overflow_wakers: Vec<Waker>,
overflow_wakers: Vec<(Waker, usize)>,
_marker: PhantomData<C>,
}

Expand Down Expand Up @@ -172,7 +172,7 @@ where
);
// We don't save a waker at `self.next`, so `>` and not `>=`.
if i > self.next + self.wakers.len() {
self.overflow_wakers.push(waker);
self.overflow_wakers.push((waker, i));
} else {
let index = i % self.wakers.len();
if let Some(old) = self.wakers[index].as_ref() {
Expand All @@ -195,7 +195,8 @@ where
}
if self.next % (self.wakers.len() / 2) == 0 {
// Wake all the overflowed wakers. See comments on `overflow_wakers`.
for w in take(&mut self.overflow_wakers) {
// todo: we may want to wake specific wakers now
for (w, _) in take(&mut self.overflow_wakers) {
w.wake();
}
}
Expand Down Expand Up @@ -228,6 +229,22 @@ where
}
}
}

fn waiting(&self) -> impl Iterator<Item = usize> + '_ {
let start = self.next % self.wakers.len();
self.wakers
.iter()
.enumerate()
.filter_map(|(i, waker)| waker.as_ref().map(|_| i))
.map(move |i| {
if i < start {
self.next + (self.wakers.len() - start + i)
} else {
self.next + (i - start)
}
})
.chain(self.overflow_wakers.iter().map(|v| v.1))
}
}

/// Take an ordered stream of bytes and make messages from that stream
Expand Down Expand Up @@ -284,6 +301,13 @@ where
_marker: PhantomData,
}
}

pub fn waiting(&self) -> Vec<usize> {
let mut r = self.inner.lock().unwrap().waiting().collect::<Vec<_>>();
r.sort_unstable();

r
}
}

impl<S, C> Clone for UnorderedReceiver<S, C>
Expand Down
233 changes: 233 additions & 0 deletions src/helpers/gateway/gateway.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
use std::{
fmt::{Debug, Formatter},
num::NonZeroUsize,
time::Duration,
};

use delegate::delegate;
#[cfg(all(feature = "shuttle", test))]
use shuttle::future as tokio;

use crate::{
helpers::{
gateway::{
observable::{ObserveState, Observed},
receive,
receive::GatewayReceivers,
send,
send::GatewaySenders,
transport::RoleResolvingTransport,
},
ChannelId, Message, ReceivingEnd, Role, RoleAssignment, SendingEnd, TotalRecords,
TransportImpl,
},
protocol::QueryId,
sync::{atomic::AtomicUsize, Arc, Weak},
};

/// Gateway into IPA Network infrastructure. It allows helpers send and receive messages.
pub struct Gateway {
config: GatewayConfig,
transport: RoleResolvingTransport,
// todo: use different state when feature is off
inner: Arc<State>,
}

#[derive(Default)]
pub struct State {
senders: GatewaySenders,
receivers: GatewayReceivers,
}
impl State {
pub fn downgrade(self: &Arc<Self>) -> Weak<Self> {
Arc::downgrade(self)
}
}

#[derive(Clone, Copy, Debug)]
pub struct GatewayConfig {
/// The number of items that can be active at the one time.
/// This is used to determine the size of sending and receiving buffers.
active: NonZeroUsize,

/// Time to wait before checking gateway progress. If no progress has been made between
/// checks, the gateway is considered to be stalled and will create a report with outstanding
/// send/receive requests
pub progress_check_interval: Duration,
}

impl Default for GatewayConfig {
fn default() -> Self {
Self::new(1024)
}
}

impl GatewayConfig {
/// Generate a new configuration with the given active limit.
///
/// ## Panics
/// If `active` is 0.
#[must_use]
pub fn new(active: usize) -> Self {
// In-memory tests move data fast, so progress check intervals can be lower.
// Real world scenarios currently over-report stalls because of inefficiencies inside
// infrastructure and actual networking issues. This checks is only valuable to report
// bugs, so keeping it large enough to avoid false positives.
Self {
active: NonZeroUsize::new(active).unwrap(),
progress_check_interval: Duration::from_secs(if cfg!(test) { 5 } else { 60 }),
}
}

/// The configured amount of active work.
#[must_use]
pub fn active_work(&self) -> NonZeroUsize {
self.active
}
}

impl Observed<Gateway> {
delegate! {
to self.inner() {
#[inline]
pub fn role(&self) -> Role;

#[inline]
pub fn config(&self) -> &GatewayConfig;
}
}

pub fn new(
query_id: QueryId,
config: GatewayConfig,
roles: RoleAssignment,
transport: TransportImpl,
) -> Self {
let version = Arc::new(AtomicUsize::default());
// todo: this sucks, we shouldn't do an extra clone
Self::wrap(&version, Gateway::new(query_id, config, roles, transport))
}

#[must_use]
pub fn get_sender<M: Message>(
&self,
channel_id: &ChannelId,
total_records: TotalRecords,
) -> SendingEnd<M> {
Observed::wrap(
self.get_version(),
self.inner().get_sender(channel_id, total_records),
)
}

#[must_use]
pub fn get_receiver<M: Message>(&self, channel_id: &ChannelId) -> ReceivingEnd<M> {
Observed::wrap(self.get_version(), self.inner().get_receiver(channel_id))
}

pub fn to_observed(&self) -> Observed<Weak<State>> {
// todo: inner.inner
Observed::wrap(self.get_version(), self.inner().inner.downgrade())
}
}

pub struct GatewayWaitingTasks<S, R> {
senders_state: Option<S>,
receivers_state: Option<R>,
}

impl<S: Debug, R: Debug> Debug for GatewayWaitingTasks<S, R> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
if let Some(senders_state) = &self.senders_state {
write!(f, "\n{{{senders_state:?}\n}}")?;
}
if let Some(receivers_state) = &self.receivers_state {
write!(f, "\n{{{receivers_state:?}\n}}")?;
}

Ok(())
}
}

impl ObserveState for Weak<State> {
type State = GatewayWaitingTasks<send::WaitingTasks, receive::WaitingTasks>;

fn get_state(&self) -> Option<Self::State> {
self.upgrade().map(|state| Self::State {
senders_state: state.senders.get_state(),
receivers_state: state.receivers.get_state(),
})
}
}

impl Gateway {
#[must_use]
pub fn new(
query_id: QueryId,
config: GatewayConfig,
roles: RoleAssignment,
transport: TransportImpl,
) -> Self {
Self {
config,
transport: RoleResolvingTransport {
query_id,
roles,
inner: transport,
config,
},
inner: State::default().into(),
}
}

#[must_use]
pub fn role(&self) -> Role {
self.transport.role()
}

#[must_use]
pub fn config(&self) -> &GatewayConfig {
&self.config
}

///
/// ## Panics
/// If there is a failure connecting via HTTP
#[must_use]
pub fn get_sender<M: Message>(
&self,
channel_id: &ChannelId,
total_records: TotalRecords,
) -> send::SendingEnd<M> {
let (tx, maybe_stream) = self.inner.senders.get_or_create::<M>(
channel_id,
self.config.active_work(),
total_records,
);
if let Some(stream) = maybe_stream {
tokio::spawn({
let channel_id = channel_id.clone();
let transport = self.transport.clone();
async move {
// TODO(651): In the HTTP case we probably need more robust error handling here.
transport
.send(&channel_id, stream)
.await
.expect("{channel_id:?} receiving end should be accepted by transport");
}
});
}

send::SendingEnd::new(tx, self.role(), channel_id)
}

#[must_use]
pub fn get_receiver<M: Message>(&self, channel_id: &ChannelId) -> receive::ReceivingEnd<M> {
receive::ReceivingEnd::new(
channel_id.clone(),
self.inner
.receivers
.get_or_create(channel_id, || self.transport.receive(channel_id)),
)
}
}
Loading

0 comments on commit 0072cea

Please sign in to comment.