Skip to content

Commit

Permalink
improve health checks
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Aug 10, 2024
1 parent efaac62 commit dedc7c3
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 465 deletions.
1 change: 1 addition & 0 deletions serving/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions serving/source-sink/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ uuid = { version = "1.10.0", features = ["v4"] }
once_cell = "1.19.0"
serde_json = "1.0.122"
numaflow-models = { path = "../numaflow-models"}
trait-variant = "0.1.2"
rcgen = "0.13.1"
rustls = { version = "0.23.12", features = ["aws_lc_rs"] }
serde = { version = "1.0.204", features = ["derive"] }
Expand Down
46 changes: 36 additions & 10 deletions serving/source-sink/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use std::net::SocketAddr;
use std::time::Duration;

use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};

use crate::config::config;
pub(crate) use crate::error::Error;
use crate::forwarder::Forwarder;
use crate::metrics::start_metrics_https_server;
use crate::sink::{SinkClient, SinkConfig};
use crate::source::{SourceClient, SourceConfig};
use crate::transformer::{TransformerClient, TransformerConfig};
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};

pub(crate) use self::error::Result;

Expand Down Expand Up @@ -38,7 +40,7 @@ pub(crate) mod shared;

/// forwards a chunk of data from the source to the sink via an optional transformer.
/// It takes an optional custom_shutdown_rx for shutting down the forwarder, useful for testing.
pub async fn run_forwarder(
pub async fn init(
source_config: SourceConfig,
sink_config: SinkConfig,
transformer_config: Option<TransformerConfig>,
Expand Down Expand Up @@ -81,6 +83,31 @@ pub async fn run_forwarder(
)
.await?;

// Start the metrics server, which server the prometheus metrics.
// TODO: make the port configurable.
let metrics_addr: SocketAddr = format!("0.0.0.0:{}", &config().metrics_server_listen_port)
.parse()
.expect("Invalid address");

// Start the metrics server in a separate background async spawn,
// This should be running throughout the lifetime of the application, hence the handle is not
// joined.
let metrics_source_client = source_client.clone();
let metrics_sink_client = sink_client.clone();
let metrics_transformer_client = transformer_client.clone();
tokio::spawn(async move {
if let Err(e) = start_metrics_https_server(
metrics_addr,
metrics_source_client,
metrics_sink_client,
metrics_transformer_client,
)
.await
{
error!("Metrics server error: {:?}", e);
}
});

// start the lag reader to publish lag metrics
let mut lag_reader = metrics::LagReader::new(source_client.clone(), None, None);
lag_reader.start().await;
Expand All @@ -101,18 +128,18 @@ async fn wait_until_ready(
transformer_client: &mut Option<TransformerClient>,
) -> Result<()> {
loop {
let source_ready = source_client.is_ready().await.is_ok();
let source_ready = source_client.is_ready().await;
if !source_ready {
info!("UDSource is not ready, waiting...");
}

let sink_ready = sink_client.is_ready().await.is_ok();
let sink_ready = sink_client.is_ready().await;
if !sink_ready {
info!("UDSink is not ready, waiting...");
}

let transformer_ready = if let Some(client) = transformer_client {
let ready = client.is_ready().await.is_ok();
let ready = client.is_ready().await;
if !ready {
info!("UDTransformer is not ready, waiting...");
}
Expand Down Expand Up @@ -225,8 +252,7 @@ mod tests {

let forwarder_cln_token = cln_token.clone();
let forwarder_handle = tokio::spawn(async move {
let result =
super::run_forwarder(source_config, sink_config, None, forwarder_cln_token).await;
let result = super::init(source_config, sink_config, None, forwarder_cln_token).await;
assert!(result.is_ok());
});

Expand Down
24 changes: 3 additions & 21 deletions serving/source-sink/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::net::SocketAddr;

use tokio::signal;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
Expand All @@ -8,8 +6,7 @@ use tracing::{error, info};
use tracing_subscriber::EnvFilter;

use sourcer_sinker::config::config;
use sourcer_sinker::metrics::start_metrics_https_server;
use sourcer_sinker::run_forwarder;
use sourcer_sinker::init;
use sourcer_sinker::sink::SinkConfig;
use sourcer_sinker::source::SourceConfig;
use sourcer_sinker::transformer::TransformerConfig;
Expand All @@ -26,19 +23,6 @@ async fn main() {
.with_target(false)
.init();

// Start the metrics server, which server the prometheus metrics.
// TODO: make the port configurable.
let metrics_addr: SocketAddr = "0.0.0.0:2469".parse().expect("Invalid address");

// Start the metrics server in a separate background async spawn,
// This should be running throughout the lifetime of the application, hence the handle is not
// joined.
let metrics_server_handle = tokio::spawn(async move {
if let Err(e) = start_metrics_https_server(metrics_addr).await {
error!("Metrics server error: {:?}", e);
}
});

// Initialize the source, sink and transformer configurations
// We are using the default configurations for now.
let source_config = SourceConfig {
Expand All @@ -50,6 +34,7 @@ async fn main() {
max_message_size: config().grpc_max_message_size,
..Default::default()
};

let transformer_config = if config().is_transformer_enabled {
Some(TransformerConfig {
max_message_size: config().grpc_max_message_size,
Expand All @@ -68,17 +53,14 @@ async fn main() {
});

// Run the forwarder
if let Err(e) = run_forwarder(source_config, sink_config, transformer_config, cln_token).await {
if let Err(e) = init(source_config, sink_config, transformer_config, cln_token).await {
error!("Application error: {:?}", e);
}

if !shutdown_handle.is_finished() {
shutdown_handle.abort();
}

if !metrics_server_handle.is_finished() {
metrics_server_handle.abort();
}
info!("Gracefully Exiting...");
}

Expand Down
89 changes: 56 additions & 33 deletions serving/source-sink/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use axum::extract::State;
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::{routing::get, Router};
Expand All @@ -17,7 +18,9 @@ use tokio::time;
use tracing::{debug, error, info};

use crate::error::Error;
use crate::sink::SinkClient;
use crate::source::SourceClient;
use crate::transformer::TransformerClient;

// Define the labels for the metrics
pub const MONO_VERTEX_NAME: &str = "vertex";
Expand All @@ -31,16 +34,36 @@ pub const FORWARDER_READ_BYTES_TOTAL: &str = "forwarder_read_bytes_total";
pub const FORWARDER_ACK_TOTAL: &str = "forwarder_ack_total";
pub const FORWARDER_WRITE_TOTAL: &str = "forwarder_write_total";

#[derive(Clone)]
struct MetricsState {
pub source_client: SourceClient,
pub sink_client: SinkClient,
pub transformer_client: Option<TransformerClient>,
}

/// Collect and emit prometheus metrics.
/// Metrics router and server
pub async fn start_metrics_http_server<A>(addr: A) -> crate::Result<()>
#[allow(dead_code)]
pub(crate) async fn start_metrics_http_server<A>(
addr: A,
source_client: SourceClient,
sink_client: SinkClient,
transformer_client: Option<TransformerClient>,
) -> crate::Result<()>
where
A: ToSocketAddrs + std::fmt::Debug,
{
// setup_metrics_recorder should only be invoked once
let recorder_handle = setup_metrics_recorder()?;

let metrics_app = metrics_router(recorder_handle);
let metrics_app = metrics_router(
recorder_handle,
MetricsState {
source_client,
sink_client,
transformer_client,
},
);

let listener = TcpListener::bind(&addr)
.await
Expand All @@ -54,9 +77,12 @@ where
Ok(())
}

pub async fn start_metrics_https_server(addr: SocketAddr) -> crate::Result<()>
where
{
pub(crate) async fn start_metrics_https_server(
addr: SocketAddr,
source_client: SourceClient,
sink_client: SinkClient,
transformer_client: Option<TransformerClient>,
) -> crate::Result<()> {
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();

// Generate a self-signed certificate
Expand All @@ -70,7 +96,14 @@ where
// setup_metrics_recorder should only be invoked once
let recorder_handle = setup_metrics_recorder()?;

let metrics_app = metrics_router(recorder_handle);
let metrics_app = metrics_router(
recorder_handle,
MetricsState {
source_client,
sink_client,
transformer_client,
},
);

axum_server::bind_rustls(addr, tls_config)
.serve(metrics_app.into_make_service())
Expand All @@ -81,12 +114,14 @@ where
}

/// router for metrics and k8s health endpoints
fn metrics_router(recorder_handle: PrometheusHandle) -> Router {
fn metrics_router(recorder_handle: PrometheusHandle, metrics_state: MetricsState) -> Router {
let metrics_app = Router::new()
.route("/metrics", get(move || ready(recorder_handle.render())))
.route("/livez", get(livez))
.route("/readyz", get(readyz))
.route("/sidecar-livez", get(sidecar_livez));
.route("/sidecar-livez", get(sidecar_livez))
.with_state(metrics_state);

metrics_app
}

Expand All @@ -98,7 +133,18 @@ async fn readyz() -> impl IntoResponse {
StatusCode::NO_CONTENT
}

async fn sidecar_livez() -> impl IntoResponse {
async fn sidecar_livez(State(mut state): State<MetricsState>) -> impl IntoResponse {
if !state.source_client.is_ready().await {
return StatusCode::SERVICE_UNAVAILABLE;
}
if !state.sink_client.is_ready().await {
return StatusCode::SERVICE_UNAVAILABLE;
}
if let Some(mut transformer_client) = state.transformer_client {
if !transformer_client.is_ready().await {
return StatusCode::SERVICE_UNAVAILABLE;
}
}
StatusCode::NO_CONTENT
}

Expand Down Expand Up @@ -288,27 +334,4 @@ async fn calculate_pending(
result
}

#[cfg(test)]
mod tests {
use std::net::SocketAddr;
use std::time::Duration;

use tokio::time::sleep;

use super::*;

#[tokio::test]
async fn test_start_metrics_server() {
let addr = SocketAddr::from(([127, 0, 0, 1], 0));
let server = tokio::spawn(async move {
let result = start_metrics_http_server(addr).await;
assert!(result.is_ok())
});

// Give the server a little bit of time to start
sleep(Duration::from_millis(100)).await;

// Stop the server
server.abort();
}
}
// TODO add tests
Loading

0 comments on commit dedc7c3

Please sign in to comment.