Add per data-source healthcheck with retry (#101)
Automatically retry connecting to failed data sources between two general
status checks

- Ensure that all sources are updated even after a single-source update
- Make ProxyService keep track of the status of all its sources
- Split part of the status-check helpers into their own files

Also fix a few small clippy lints

Fixes: FP-2072
gagbo authored Nov 21, 2022
1 parent 2bae58e commit 4bc9e39
Showing 4 changed files with 260 additions and 15 deletions.
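Before the diffs, a minimal, self-contained sketch of the pattern this commit introduces: a periodic general status check plus a channel of delayed per-source retries, both feeding the same select! loop. All names, durations, and the simulated failure below are illustrative stand-ins, not the proxy's actual types (the real loop also has a shutdown branch, omitted here):

use std::time::Duration;
use tokio::select;
use tokio::sync::mpsc::unbounded_channel;
use tokio::time::{interval, sleep};

// Simplified stand-in for DataSourceCheckTask: a source name plus the delay
// to wait before the next retry.
#[derive(Debug, Clone)]
struct CheckTask {
    name: String,
    delay: Duration,
}

#[tokio::main]
async fn main() {
    let (retry_tx, mut retry_rx) = unbounded_channel::<CheckTask>();
    let mut ticker = interval(Duration::from_secs(300));

    loop {
        select! {
            // General status check of every data source (the first tick fires immediately).
            _ = ticker.tick() => {
                // Pretend the check for "prometheus" failed: queue a retry after 10s.
                let tx = retry_tx.clone();
                tokio::spawn(async move {
                    sleep(Duration::from_secs(10)).await;
                    let _ = tx.send(CheckTask {
                        name: "prometheus".into(),
                        delay: Duration::from_secs(15),
                    });
                });
            }
            // Individual retries land here, between two general checks; on another
            // failure the real code re-queues the task with a longer delay.
            Some(task) = retry_rx.recv() => {
                println!("retrying {} (next delay would be {:?})", task.name, task.delay);
            }
        }
    }
}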
147 changes: 133 additions & 14 deletions src/service.rs
@@ -15,12 +15,19 @@ use serde_json::{Map, Value};
use std::collections::HashMap;
use std::{convert::Infallible, net::SocketAddr, path::Path, sync::Arc, time::Duration};
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio::sync::Mutex;
use tokio::sync::{broadcast::Sender, watch};
use tokio::{fs, time::interval};
use tokio_tungstenite_reconnect::{Message, ReconnectingWebSocket};
use tracing::{debug, error, info, info_span, instrument, trace, Instrument, Span};
use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument, Span};
use url::Url;

mod status_check;
#[cfg(test)]
mod tests;

use status_check::DataSourceCheckTask;

#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct ProxyDataSource {
@@ -57,6 +64,7 @@ pub(crate) struct Inner {
endpoint: Url,
token: String,
pub(crate) data_sources: HashMap<Name, ProxyDataSource>,
data_sources_state: Mutex<HashMap<Name, UpsertProxyDataSource>>,
wasm_modules: WasmModules,
max_retries: u32,
listen_address: Option<SocketAddr>,
@@ -121,6 +129,7 @@ impl ProxyService {
endpoint,
token: token.token,
data_sources,
data_sources_state: Default::default(),
wasm_modules,
max_retries,
listen_address,
@@ -129,6 +138,35 @@
}
}

/// Return a ProxyMessage payload describing the current
/// state of all data sources.
#[instrument(skip_all)]
pub async fn to_data_sources_proxy_message(&self) -> SetDataSourcesMessage {
SetDataSourcesMessage {
data_sources: self
.inner
.data_sources_state
.lock()
.await
.values()
.cloned()
.collect(),
}
}

/// Access the current state of a single data source, looked up by name
#[instrument(err, skip(self))]
pub async fn data_source_state(&self, name: &Name) -> Result<UpsertProxyDataSource> {
self.inner
.data_sources_state
.lock()
.await
.get(name)
.cloned()
.ok_or_else(|| anyhow!("{name} is an unknown data source for this proxy"))
}

#[instrument(err, skip_all)]
pub async fn connect(&self, shutdown: Sender<()>) -> Result<()> {
info!("connecting to fiberplane: {}", self.inner.endpoint);
let (ws, mut conn_id_receiver) = self.connect_websocket().await?;
@@ -147,7 +185,7 @@
async move {
while conn_id_receiver.changed().await.is_ok() {
if let Some(conn_id) = conn_id_receiver.borrow().clone() {
Span::current().record("conn_id", &conn_id.as_str());
Span::current().record("conn_id", conn_id.as_str());
} else {
Span::current().record("conn_id", &tracing::field::Empty);
}
@@ -176,17 +214,45 @@
let service = self.clone();
let data_sources_sender = outgoing_sender.clone();
let mut shutdown_clone = shutdown.subscribe();
let (data_source_check_task_sender, mut data_source_check_task_receiver) =
unbounded_channel::<DataSourceCheckTask>();
let data_source_check_task_tx_too = data_source_check_task_sender.clone();
tokio::spawn(async move {
let mut status_check_interval = interval(service.inner.status_check_interval);
loop {
select! {
// Note that the first tick returns immediately
_ = status_check_interval.tick().fuse() => {
let data_sources = service.get_data_sources().await;
service.update_all_data_sources(data_source_check_task_sender.clone()).await;
// `update_all_data_sources` will both try to connect and automatically queue individual
// retries on the `data_source_check_task_sender` channel with the correct delay if necessary
let data_sources = service.to_data_sources_proxy_message().await;
debug!("sending data sources to relay: {:?}", data_sources);
let message = ProxyMessage::SetDataSources(data_sources);
data_sources_sender.send(message).ok();
}
// A status check for a data source failed, and
// the queued retry will arrive here.
task = data_source_check_task_receiver.recv().fuse() => {
if let Some(task) = task {
let source_name = task.name().clone();
// `update_data_source` will both try to connect and automatically queue a retry
// on the same channel that feeds this branch (`data_source_check_task_receiver`)
// with the correct delay if necessary
service.update_data_source(task, data_source_check_task_tx_too.clone()).await;

// Log the result of the new update attempt
match service.data_source_state(&source_name).await {
Ok(attempt) => debug!("retried connecting to {}: new status is: {:?}", source_name, attempt),
Err(err) => warn!("retried connecting to {}: {err}", source_name),
}

let data_sources = service.to_data_sources_proxy_message().await;
debug!("sending data sources to relay: {:?}", data_sources);
let message = ProxyMessage::SetDataSources(data_sources);
data_sources_sender.send(message).ok();
}
}
_ = shutdown_clone.recv().fuse() => {
// Let the relay know that all of these data sources are going offline
let data_sources = service
@@ -201,7 +267,7 @@
status: DataSourceStatus::Error(Error::ProxyDisconnected),
})
.collect();
let message = ProxyMessage::SetDataSources(SetDataSourcesMessage{ data_sources});
let message = ProxyMessage::SetDataSources(SetDataSourcesMessage{ data_sources });
data_sources_sender.send(message).ok();

break;
@@ -396,9 +462,23 @@
}
}

async fn get_data_sources(&self) -> SetDataSourcesMessage {
let data_sources = join_all(self.inner.data_sources.iter().map(
|(name, data_source)| async move {
/// Try to connect to a data source according to `task`
///
/// On success, record the new state of the source.
/// On failure, record the new state _and_ queue the next retry task on the
/// `individual_check_task_queue_tx` sender if the retry policy allows for a new
/// retry.
async fn update_data_source(
&self,
task: DataSourceCheckTask,
individual_check_task_queue_tx: UnboundedSender<DataSourceCheckTask>,
) {
let update = self
.inner
.data_sources
.iter()
.find(|(name, _)| *name == task.name())
.map(|(name, data_source)| async move {
let response = if V1_PROVIDERS.contains(&data_source.provider_type.as_str()) {
self.check_provider_status_v1(name.clone()).await
} else {
@@ -407,22 +487,61 @@

let status = match response {
Ok(_) => DataSourceStatus::Connected,
Err(err) => DataSourceStatus::Error(err),
Err(ref err) => DataSourceStatus::Error(err.clone()),
};

if let Some((delay, task)) = task.next() {
if response.is_err() {
warn!(
"error connecting to data source: {name}, retrying in {}s",
delay.as_secs()
);
tokio::spawn(async move {
tokio::time::sleep(delay).await;
individual_check_task_queue_tx.send(task)
});
}
}

UpsertProxyDataSource {
name: name.clone(),
description: data_source.description.clone(),
provider_type: data_source.provider_type.clone(),
protocol_version: get_protocol_version(&data_source.provider_type),
status,
}
},
))
.await
.into_iter()
.collect();
SetDataSourcesMessage { data_sources }
})
.unwrap()
.await;

self.inner
.data_sources_state
.lock()
.await
.insert(update.name.clone(), update);
}

async fn update_all_data_sources(
&self,
to_check_task_queue: UnboundedSender<DataSourceCheckTask>,
) {
join_all(
self.inner
.data_sources
.iter()
.zip(std::iter::repeat(to_check_task_queue))
.map(|((name, _), to_check_task_queue)| async move {
let task = DataSourceCheckTask::new(
name.clone(),
self.inner.status_check_interval,
Duration::from_secs(10),
1.5,
);
self.update_data_source(task, to_check_task_queue.clone())
.await
}),
)
.await;
}

#[instrument(err, skip(self))]
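A side note on the fan-out in update_all_data_sources above: each per-source async block gets its own clone of the retry sender via zip(std::iter::repeat(...)), and join_all awaits every check concurrently. A minimal sketch of that idiom, with illustrative names and a dummy "every check fails" outcome:

use futures::future::join_all;
use tokio::sync::mpsc::unbounded_channel;

#[tokio::main]
async fn main() {
    let sources = vec!["prometheus", "elasticsearch", "loki"];
    let (retry_tx, mut retry_rx) = unbounded_channel::<&'static str>();

    // One future per source, each holding its own clone of the sender.
    join_all(
        sources
            .into_iter()
            .zip(std::iter::repeat(retry_tx))
            .map(|(name, tx)| async move {
                // A real status check would run here; pretend it failed
                // and queue a retry for this source.
                tx.send(name).ok();
            }),
    )
    .await;

    // All three retries are now buffered in the channel.
    while let Ok(name) = retry_rx.try_recv() {
        println!("queued retry for {name}");
    }
}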
88 changes: 88 additions & 0 deletions src/service/status_check.rs
@@ -0,0 +1,88 @@
use std::time::Duration;

use fiberplane::protocols::names::Name;

/// A token representing both:
/// - a task to check the status of the data source having a given name, and
/// - the retry strategy to use in case the status check fails.
#[derive(Debug, Clone)]
pub(crate) struct DataSourceCheckTask {
name: Name,
retries_left: isize,
delay_till_next: Duration,
backoff_factor: f32,
}

impl DataSourceCheckTask {
/// Constructor
///
/// `backoff_factor` MUST be greater than 1
/// `initial_delay` MUST be greater than 0s
///
/// The constructor guarantees that, assuming a "try" takes a negligible
/// amount of time, no retry will be attempted past `total_checks_duration`.
///
/// If you are not sure that a "try" is going to be instant,
/// you should add a safety buffer by decreasing the `total_checks_duration` argument.
pub(crate) fn new(
name: Name,
total_checks_duration: Duration,
initial_delay: Duration,
backoff_factor: f32,
) -> DataSourceCheckTask {
let max_retries: isize = if initial_delay > total_checks_duration {
0
} else {
// Formula comes from the sum of the terms of a geometric series
(((1.0
+ (backoff_factor - 1.0) * total_checks_duration.as_secs_f32()
/ initial_delay.as_secs_f32())
.ln()
/ backoff_factor.ln())
.floor()
- 1.0) as isize
};

Self {
name,
retries_left: max_retries,
delay_till_next: initial_delay,
backoff_factor,
}
}

/// Return the next check task to accomplish after this one, with
/// the delay to wait before sending it to the channel.
pub(crate) fn next(self) -> Option<(Duration, Self)> {
let Self {
name,
retries_left,
delay_till_next,
backoff_factor,
} = self;
if retries_left <= 0 {
return None;
}
Some((
delay_till_next,
Self {
name,
retries_left: retries_left - 1,
delay_till_next: Duration::from_secs_f32(
delay_till_next.as_secs_f32() * backoff_factor,
),
backoff_factor,
},
))
}

pub(crate) fn name(&self) -> &Name {
&self.name
}

#[cfg(test)]
pub(crate) fn retries_left(&self) -> isize {
self.retries_left
}
}
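For reference, the max_retries formula in the constructor above follows from bounding the total of the exponentially growing delays (a back-of-the-envelope derivation, not part of the commit). With initial delay d, backoff factor b > 1 and a total budget T, the n retry delays are d, d·b, …, d·b^(n-1), and requiring that they fit in the budget gives

\[
\sum_{k=0}^{n-1} d\,b^{k} \;=\; d\,\frac{b^{n} - 1}{b - 1} \;\le\; T
\quad\Longrightarrow\quad
n \;\le\; \frac{\ln\bigl(1 + (b - 1)\,T/d\bigr)}{\ln b},
\]

which is exactly the expression inside floor() in the constructor; the extra "- 1.0" drops one more retry, which reads as a small safety margin on top of this bound.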
38 changes: 38 additions & 0 deletions src/service/tests.rs
@@ -0,0 +1,38 @@
use crate::service::status_check::DataSourceCheckTask;
use fiberplane::protocols::names::Name;
use std::time::Duration;

#[test]
fn exponential_backoff_cap() {
fn test_rec(task: DataSourceCheckTask, old_delay: Duration, remaining_budget: Duration) {
if let Some((delay, new_task)) = task.next() {
assert!(
delay > old_delay,
"the new delay is longer than the old one."
);
let new_remaining_budget = remaining_budget.checked_sub(delay);
assert!(
new_remaining_budget.is_some(),
"The delay ({:?}) is bigger than the remaining budget {:?}",
delay,
remaining_budget
);
test_rec(new_task, delay, new_remaining_budget.unwrap());
}
}

fn test_case(total_duration: Duration, initial_delay: Duration, backoff_factor: f32) {
let task = DataSourceCheckTask::new(
Name::from_static("be-the-change"),
total_duration,
initial_delay,
backoff_factor,
);
assert!(task.retries_left() >= 0, "At least 1 try will be attempted");
test_rec(task, Duration::from_secs(0), total_duration);
}

test_case(Duration::from_secs(300), Duration::from_secs(10), 1.5);
test_case(Duration::from_secs(300), Duration::from_secs(1000), 1.5);
test_case(Duration::from_secs(300), Duration::from_secs(300), 1.5);
}
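To make the numbers concrete, here is a hypothetical extra test (not part of this commit, and assuming the same imports as the module above) that walks the schedule of the first case: with a 300s budget, a 10s initial delay and a 1.5 backoff factor, the constructor allows five retries with delays of roughly 10s, 15s, 22.5s, 33.75s and 50.6s, about 132s in total.

#[test]
fn backoff_schedule_walkthrough() {
    let mut task = DataSourceCheckTask::new(
        Name::from_static("be-the-change"),
        Duration::from_secs(300),
        Duration::from_secs(10),
        1.5,
    );
    let mut total = Duration::from_secs(0);
    let mut delays = Vec::new();
    // Drain the schedule: each `next()` yields the delay to wait plus the follow-up task.
    while let Some((delay, next)) = task.next() {
        total += delay;
        delays.push(delay);
        task = next;
    }
    // Expected (with f32 arithmetic): 10s, 15s, 22.5s, 33.75s, 50.625s.
    assert_eq!(delays.len(), 5);
    assert!(total <= Duration::from_secs(300));
}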
2 changes: 1 addition & 1 deletion src/tests.rs
@@ -883,7 +883,7 @@ async fn service_shutdown() {
// Read any message sent from the service, until it gets closed, which
// will indicate that the service has shutdown.
loop {
if let None = ws.next().await {
if (ws.next().await).is_none() {
break;
};
}
