From 8aa0b16415ffedcd50aa0d35c354a0d96010a594 Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Tue, 14 Jan 2025 22:28:19 +0530 Subject: [PATCH] feat: expose pending messages in async pipeline Signed-off-by: Yashash H L --- rust/numaflow-core/src/metrics.rs | 320 ++++++++++++++---- rust/numaflow-core/src/monovertex.rs | 3 +- rust/numaflow-core/src/pipeline.rs | 23 +- .../src/pipeline/isb/jetstream/reader.rs | 14 + rust/numaflow-core/src/shared/metrics.rs | 24 +- 5 files changed, 307 insertions(+), 77 deletions(-) diff --git a/rust/numaflow-core/src/metrics.rs b/rust/numaflow-core/src/metrics.rs index 2a672ec31..a980698d9 100644 --- a/rust/numaflow-core/src/metrics.rs +++ b/rust/numaflow-core/src/metrics.rs @@ -1,4 +1,5 @@ -use std::collections::BTreeMap; +use crate::pipeline::isb::jetstream::reader::JetstreamReader; +use std::collections::{BTreeMap, HashMap}; use std::iter; use std::net::SocketAddr; use std::sync::{Arc, OnceLock}; @@ -73,8 +74,9 @@ const SINK_WRITE_TOTAL: &str = "write"; const DROPPED_TOTAL: &str = "dropped"; const FALLBACK_SINK_WRITE_TOTAL: &str = "write"; -// pending as gauge +// pending as gauge for mvtx const PENDING: &str = "pending"; +const VERTEX_PENDING: &str = "pending_messages"; // processing times as timers const E2E_TIME: &str = "processing_time"; @@ -204,6 +206,7 @@ pub(crate) struct MonoVtxMetrics { pub(crate) struct PipelineMetrics { pub(crate) forwarder: PipelineForwarderMetrics, pub(crate) isb: PipelineISBMetrics, + pub(crate) pending: Family, Gauge>, } /// Family of metrics for the sink @@ -232,7 +235,6 @@ pub(crate) struct PipelineForwarderMetrics { pub(crate) write_time: Family, Histogram>, pub(crate) read_bytes_total: Family, Counter>, pub(crate) processed_time: Family, Histogram>, - pub(crate) pending: Family, Gauge>, pub(crate) dropped_total: Family, Counter>, } @@ -398,7 +400,6 @@ impl PipelineMetrics { ack_time: Family::, Histogram>::new_with_constructor(|| { Histogram::new(exponential_buckets_range(100.0, 60000000.0 * 15.0, 
10)) }), - pending: Family::, Gauge>::default(), write_total: Family::, Counter>::default(), write_time: Family::, Histogram>::new_with_constructor( || Histogram::new(exponential_buckets_range(100.0, 60000000.0 * 15.0, 10)), @@ -411,6 +412,7 @@ impl PipelineMetrics { Histogram::new(exponential_buckets_range(100.0, 60000000.0 * 15.0, 10)) }), }, + pending: Family::, Gauge>::default(), }; let mut registry = global_registry().registry.lock(); @@ -446,11 +448,6 @@ impl PipelineMetrics { "Time taken to ack data", metrics.forwarder.ack_time.clone(), ); - forwarder_registry.register( - PENDING, - "Number of pending messages", - metrics.forwarder.pending.clone(), - ); forwarder_registry.register( SINK_WRITE_TOTAL, "Total number of Data Messages Written", @@ -466,6 +463,13 @@ impl PipelineMetrics { "Time taken to write data", metrics.forwarder.write_time.clone(), ); + + let vertex_registry = registry.sub_registry_with_prefix("vertex"); + vertex_registry.register( + VERTEX_PENDING, + "Total number of pending messages", + metrics.pending.clone(), + ); metrics } } @@ -714,10 +718,11 @@ struct TimestampedPending { /// and exposing the metrics. It maintains a list of pending stats and ensures that /// only the most recent entries are kept. pub(crate) struct PendingReader { - lag_reader: Source, + source_lag_reader: Option, + isb_lag_readers: Option>, lag_checking_interval: Duration, refresh_interval: Duration, - pending_stats: Arc>>, + pending_stats: Arc>>>, lookback_seconds: u16, } @@ -728,22 +733,34 @@ pub(crate) struct PendingReaderTasks { /// PendingReaderBuilder is used to build a [LagReader] instance. 
pub(crate) struct PendingReaderBuilder { - lag_reader: Source, + source_lag_reader: Option, + isb_lag_readers: Option>, lag_checking_interval: Option, refresh_interval: Option, lookback_seconds: Option, } impl PendingReaderBuilder { - pub(crate) fn new(lag_reader: Source) -> Self { + pub(crate) fn new() -> Self { Self { - lag_reader, + source_lag_reader: None, + isb_lag_readers: None, lag_checking_interval: None, refresh_interval: None, lookback_seconds: None, } } + pub(crate) fn source_lag_reader(mut self, source: Source) -> Self { + self.source_lag_reader = Some(source); + self + } + + pub(crate) fn isb_lag_readers(mut self, readers: Vec) -> Self { + self.isb_lag_readers = Some(readers); + self + } + pub(crate) fn lag_checking_interval(mut self, interval: Duration) -> Self { self.lag_checking_interval = Some(interval); self @@ -760,8 +777,21 @@ impl PendingReaderBuilder { } pub(crate) fn build(self) -> PendingReader { + let mut pending_map = HashMap::new(); + if let Some(_) = &self.source_lag_reader { + pending_map.insert("source".to_string(), Vec::with_capacity(MAX_PENDING_STATS)); + } else if let Some(readers) = &self.isb_lag_readers { + for reader in readers { + pending_map.insert( + reader.name().to_string(), + Vec::with_capacity(MAX_PENDING_STATS), + ); + } + } + PendingReader { - lag_reader: self.lag_reader, + source_lag_reader: self.source_lag_reader, + isb_lag_readers: self.isb_lag_readers, lag_checking_interval: self .lag_checking_interval .unwrap_or_else(|| Duration::from_secs(3)), @@ -769,7 +799,7 @@ impl PendingReaderBuilder { .refresh_interval .unwrap_or_else(|| Duration::from_secs(5)), lookback_seconds: self.lookback_seconds.unwrap_or(120), - pending_stats: Arc::new(Mutex::new(Vec::with_capacity(MAX_PENDING_STATS))), + pending_stats: Arc::new(Mutex::new(pending_map)), } } } @@ -783,14 +813,21 @@ impl PendingReader { /// /// Dropping the PendingReaderTasks will abort the background tasks. 
pub async fn start(&self, is_mono_vertex: bool) -> PendingReaderTasks { - let pending_reader = self.lag_reader.clone(); + let src_pending_reader = self.source_lag_reader.clone(); + let isb_pending_reader = self.isb_lag_readers.clone(); let lag_checking_interval = self.lag_checking_interval; let refresh_interval = self.refresh_interval; let pending_stats = Arc::clone(&self.pending_stats); let lookback_seconds = self.lookback_seconds; let buildup_handle = tokio::spawn(async move { - build_pending_info(pending_reader, lag_checking_interval, pending_stats).await; + build_pending_info( + src_pending_reader, + isb_pending_reader, + lag_checking_interval, + pending_stats, + ) + .await; }); let pending_stats = Arc::clone(&self.pending_stats); @@ -821,45 +858,87 @@ impl Drop for PendingReaderTasks { /// Periodically checks the pending messages from the source client and build the pending stats. async fn build_pending_info( - source: Source, + source: Option, + mut isb_readers: Option>, lag_checking_interval: Duration, - pending_stats: Arc>>, + pending_stats: Arc>>>, ) { let mut ticker = time::interval(lag_checking_interval); loop { ticker.tick().await; - match fetch_pending(&source).await { - Ok(pending) => { - if pending != -1 { - let mut stats = pending_stats.lock().await; - stats.push(TimestampedPending { - pending, - timestamp: std::time::Instant::now(), - }); - let n = stats.len(); - // Ensure only the most recent MAX_PENDING_STATS entries are kept - if n >= MAX_PENDING_STATS { - stats.drain(0..(n - MAX_PENDING_STATS)); + + if let Some(source) = source.as_ref() { + match fetch_source_pending(&source).await { + Ok(pending) => { + if pending != -1 { + let mut stats = pending_stats.lock().await; + stats.get_mut("source").unwrap().push(TimestampedPending { + pending, + timestamp: std::time::Instant::now(), + }); + let n = stats.get("source").unwrap().len(); + // Ensure only the most recent MAX_PENDING_STATS entries are kept + if n >= MAX_PENDING_STATS { + stats + .get_mut("source") + .unwrap() + 
.drain(0..(n - MAX_PENDING_STATS)); + } } } + Err(err) => { + error!("Failed to get pending messages: {:?}", err); + } } - Err(err) => { - error!("Failed to get pending messages: {:?}", err); + } + + if let Some(readers) = isb_readers.as_mut() { + for mut reader in readers { + match fetch_isb_pending(&mut reader).await { + Ok(pending) => { + if pending != -1 { + let mut stats = pending_stats.lock().await; + stats + .get_mut(reader.name()) + .unwrap() + .push(TimestampedPending { + pending, + timestamp: std::time::Instant::now(), + }); + let n = stats.get(reader.name()).unwrap().len(); + // Ensure only the most recent MAX_PENDING_STATS entries are kept + if n >= MAX_PENDING_STATS { + stats + .get_mut(reader.name()) + .unwrap() + .drain(0..(n - MAX_PENDING_STATS)); + } + } + } + Err(err) => { + error!("Failed to get pending messages: {:?}", err); + } + } } } } } -async fn fetch_pending(lag_reader: &Source) -> crate::error::Result { +async fn fetch_source_pending(lag_reader: &Source) -> crate::error::Result { let response: i64 = lag_reader.pending().await?.map_or(-1, |p| p as i64); // default to -1(unavailable) Ok(response) } +async fn fetch_isb_pending(reader: &mut JetstreamReader) -> crate::error::Result { + let response: i64 = reader.pending().await?.map_or(-1, |p| p as i64); // default to -1(unavailable) + Ok(response) +} + // Periodically exposes the pending metrics by calculating the average pending messages over different intervals. 
async fn expose_pending_metrics( is_mono_vertex: bool, refresh_interval: Duration, - pending_stats: Arc>>, + pending_map: Arc>>>, lookback_seconds: u16, ) { let mut ticker = time::interval(refresh_interval); @@ -877,46 +956,46 @@ async fn expose_pending_metrics( loop { ticker.tick().await; - for (label, seconds) in lookback_seconds_map { - let pending = calculate_pending(seconds as i64, &pending_stats).await; - if pending != -1 { - let mut metric_labels = mvtx_forward_metric_labels().clone(); - metric_labels.push((PENDING_PERIOD_LABEL.to_string(), label.to_string())); - pending_info.insert(label, pending); - if is_mono_vertex { - monovertex_metrics() - .pending - .get_or_create(&metric_labels) - .set(pending); - } else { - pipeline_metrics() - .forwarder - .pending - .get_or_create(&metric_labels) - .set(pending); + for (name, pending_stats) in pending_map.lock().await.iter() { + for (label, seconds) in lookback_seconds_map { + let pending = calculate_pending(seconds as i64, pending_stats).await; + if pending != -1 { + pending_info.insert(label, pending); + if is_mono_vertex { + let mut metric_labels = mvtx_forward_metric_labels().clone(); + metric_labels.push((PENDING_PERIOD_LABEL.to_string(), label.to_string())); + monovertex_metrics() + .pending + .get_or_create(&metric_labels) + .set(pending); + } else { + let mut metric_labels = + pipeline_forward_metric_labels(name, Some(name)).clone(); + metric_labels.push((PENDING_PERIOD_LABEL.to_string(), label.to_string())); + pipeline_metrics() + .pending + .get_or_create(&metric_labels) + .set(pending); + } } } - } - // skip for those the pending is not implemented - if !pending_info.is_empty() { - info!("Pending messages {:?}", pending_info); - pending_info.clear(); + // skip for those the pending is not implemented + if !pending_info.is_empty() { + info!("Pending messages {:?}", pending_info); + pending_info.clear(); + } } } } /// Calculate the average pending messages over the last `seconds` seconds. 
-async fn calculate_pending( - seconds: i64, - pending_stats: &Arc>>, -) -> i64 { +async fn calculate_pending(seconds: i64, pending_stats: &Vec) -> i64 { let mut result = -1; let mut total = 0; let mut num = 0; let now = std::time::Instant::now(); - let stats = pending_stats.lock().await; - for item in stats.iter().rev() { + for item in pending_stats.iter().rev() { if now.duration_since(item.timestamp).as_secs() < seconds as u64 { total += item.pending; num += 1; @@ -1098,8 +1177,11 @@ mod tests { } #[tokio::test] - async fn test_expose_pending_metrics() { - let pending_stats = Arc::new(Mutex::new(Vec::with_capacity(MAX_PENDING_STATS))); + async fn test_expose_pending_metrics_for_source() { + let mut pending_map = HashMap::new(); + pending_map.insert("source".to_string(), Vec::with_capacity(MAX_PENDING_STATS)); + + let pending_stats = Arc::new(Mutex::new(pending_map)); let refresh_interval = Duration::from_secs(1); let lookback_seconds = 120; @@ -1107,19 +1189,20 @@ mod tests { // The array will be sorted by the timestamp with the most recent last. 
{ let mut pending_stats = pending_stats.lock().await; - pending_stats.push(TimestampedPending { + let pending_vec = pending_stats.get_mut("source").unwrap(); + pending_vec.push(TimestampedPending { pending: 15, timestamp: Instant::now() - Duration::from_secs(150), }); - pending_stats.push(TimestampedPending { + pending_vec.push(TimestampedPending { pending: 30, timestamp: Instant::now() - Duration::from_secs(70), }); - pending_stats.push(TimestampedPending { + pending_vec.push(TimestampedPending { pending: 20, timestamp: Instant::now() - Duration::from_secs(30), }); - pending_stats.push(TimestampedPending { + pending_vec.push(TimestampedPending { pending: 10, timestamp: Instant::now(), }); @@ -1155,6 +1238,103 @@ mod tests { } assert_eq!(stored_values, [15, 20, 18, 18]); } + + #[tokio::test] + async fn test_expose_pending_metrics_for_isb() { + let mut pending_map = HashMap::new(); + pending_map.insert("stream1".to_string(), Vec::with_capacity(MAX_PENDING_STATS)); + pending_map.insert("stream2".to_string(), Vec::with_capacity(MAX_PENDING_STATS)); + + let pending_stats = Arc::new(Mutex::new(pending_map)); + let refresh_interval = Duration::from_secs(1); + let lookback_seconds = 120; + + // Populate pending_stats with some values. + // The array will be sorted by the timestamp with the most recent last. 
+ { + let mut pending_stats = pending_stats.lock().await; + let pending_vec = pending_stats.get_mut("stream1").unwrap(); + pending_vec.push(TimestampedPending { + pending: 15, + timestamp: Instant::now() - Duration::from_secs(150), + }); + pending_vec.push(TimestampedPending { + pending: 30, + timestamp: Instant::now() - Duration::from_secs(70), + }); + pending_vec.push(TimestampedPending { + pending: 20, + timestamp: Instant::now() - Duration::from_secs(30), + }); + pending_vec.push(TimestampedPending { + pending: 10, + timestamp: Instant::now(), + }); + + let pending_vec = pending_stats.get_mut("stream2").unwrap(); + pending_vec.push(TimestampedPending { + pending: 15, + timestamp: Instant::now() - Duration::from_secs(150), + }); + pending_vec.push(TimestampedPending { + pending: 30, + timestamp: Instant::now() - Duration::from_secs(70), + }); + pending_vec.push(TimestampedPending { + pending: 20, + timestamp: Instant::now() - Duration::from_secs(30), + }); + pending_vec.push(TimestampedPending { + pending: 10, + timestamp: Instant::now(), + }); + } + + tokio::spawn({ + let pending_stats = Arc::clone(&pending_stats); + async move { + expose_pending_metrics(false, refresh_interval, pending_stats, lookback_seconds) + .await; + } + }); + + // We use tokio::time::interval() as the ticker in the expose_pending_metrics() function. + // The first tick happens immediately, so we don't need to wait for the refresh_interval for the first iteration to complete. 
tokio::time::sleep(Duration::from_millis(50)).await; + + let lookback_seconds_map: [(&str, u16); 4] = + [("1m", 60), ("default", 120), ("5m", 300), ("15m", 900)]; + + // Get the stored values for all time intervals + // We will store the values corresponding to the labels (from lookback_seconds_map) "1m", "default", "5m", "15m" in the same order in this array + let mut stored_values_stream_one: [i64; 4] = [0; 4]; + let mut stored_values_stream_two: [i64; 4] = [0; 4]; + + { + for (i, (label, _)) in lookback_seconds_map.iter().enumerate() { + let mut metric_labels = + pipeline_forward_metric_labels("stream1", Some("stream1")).clone(); + metric_labels.push((PENDING_PERIOD_LABEL.to_string(), label.to_string())); + let gauge = pipeline_metrics() + .pending + .get_or_create(&metric_labels) + .get(); + stored_values_stream_one[i] = gauge; + + let mut metric_labels = + pipeline_forward_metric_labels("stream2", Some("stream2")).clone(); + metric_labels.push((PENDING_PERIOD_LABEL.to_string(), label.to_string())); + let gauge = pipeline_metrics() + .pending + .get_or_create(&metric_labels) + .get(); + stored_values_stream_two[i] = gauge; + } + } + assert_eq!(stored_values_stream_one, [15, 20, 18, 18]); + assert_eq!(stored_values_stream_two, [15, 20, 18, 18]); + } + #[test] fn test_exponential_buckets_range_basic() { let min = 1.0; diff --git a/rust/numaflow-core/src/monovertex.rs b/rust/numaflow-core/src/monovertex.rs index 1518a3c9f..2933be215 100644 --- a/rust/numaflow-core/src/monovertex.rs +++ b/rust/numaflow-core/src/monovertex.rs @@ -82,7 +82,8 @@ async fn start( ) -> error::Result<()> { // start the pending reader to publish pending metrics let pending_reader = - shared::metrics::create_pending_reader(&mvtx_config.metrics_config, source.clone()).await; + shared::metrics::create_source_pending_reader(&mvtx_config.metrics_config, source.clone()) + .await; let _pending_reader_handle = pending_reader.start(is_mono_vertex()).await; let mut forwarder_builder = 
ForwarderBuilder::new(source, sink, cln_token); diff --git a/rust/numaflow-core/src/pipeline.rs b/rust/numaflow-core/src/pipeline.rs index d2cb77091..8ec23658f 100644 --- a/rust/numaflow-core/src/pipeline.rs +++ b/rust/numaflow-core/src/pipeline.rs @@ -6,9 +6,9 @@ use futures::future::try_join_all; use tokio_util::sync::CancellationToken; use tracing::info; -use crate::config::pipeline; use crate::config::pipeline::map::MapVtxConfig; use crate::config::pipeline::{PipelineConfig, SinkVtxConfig, SourceVtxConfig}; +use crate::config::{is_mono_vertex, pipeline}; use crate::metrics::{PipelineContainerState, UserDefinedContainerState}; use crate::pipeline::forwarder::source_forwarder; use crate::pipeline::isb::jetstream::reader::JetstreamReader; @@ -18,10 +18,10 @@ use crate::shared::create_components; use crate::shared::create_components::create_sink_writer; use crate::shared::metrics::start_metrics_server; use crate::tracker::TrackerHandle; -use crate::{error, Result}; +use crate::{error, shared, Result}; mod forwarder; -mod isb; +pub(crate) mod isb; /// Starts the appropriate forwarder based on the pipeline configuration. 
pub(crate) async fn start_forwarder( @@ -77,6 +77,10 @@ async fn start_source_forwarder( ) .await?; + let pending_reader = + shared::metrics::create_source_pending_reader(&config.metrics_config, source.clone()).await; + let _pending_reader_handle = pending_reader.start(is_mono_vertex()).await; + start_metrics_server( config.metrics_config.clone(), UserDefinedContainerState::Pipeline(PipelineContainerState::Source(( @@ -117,6 +121,8 @@ async fn start_map_forwarder( // Create buffer writers and buffer readers let mut forwarder_components = vec![]; let mut mapper_grpc_client = None; + let mut isb_lag_readers = vec![]; + for stream in reader_config.streams.clone() { let tracker_handle = TrackerHandle::new(); @@ -129,6 +135,8 @@ async fn start_map_forwarder( ) .await?; + isb_lag_readers.push(buffer_reader.clone()); + let (mapper, mapper_rpc_client) = create_components::create_mapper( config.batch_size, config.read_timeout, @@ -152,6 +160,10 @@ async fn start_map_forwarder( forwarder_components.push((buffer_reader, buffer_writer, mapper)); } + let pending_reader = + shared::metrics::create_isb_pending_reader(&config.metrics_config, isb_lag_readers).await; + let _pending_reader_handle = pending_reader.start(is_mono_vertex()).await; + start_metrics_server( config.metrics_config.clone(), UserDefinedContainerState::Pipeline(PipelineContainerState::Map(mapper_grpc_client)), @@ -228,6 +240,11 @@ async fn start_sink_forwarder( sink_writers.push((sink_writer, sink_grpc_client, fb_sink_grpc_client)); } + let pending_reader = + shared::metrics::create_isb_pending_reader(&config.metrics_config, buffer_readers.clone()) + .await; + let _pending_reader_handle = pending_reader.start(is_mono_vertex()).await; + // Start the metrics server with one of the clients if let Some((_, sink, fb_sink)) = sink_writers.first() { start_metrics_server( diff --git a/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs b/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs index 
79b8572ef..152004aed 100644 --- a/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs +++ b/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs @@ -252,6 +252,20 @@ impl JetstreamReader { } } } + + pub(crate) async fn pending(&mut self) -> Result> { + let x = self.consumer.info().await.map_err(|e| { + Error::ISB(format!( + "Failed to get consumer info for stream {}: {}", + self.stream_name, e + )) + })?; + Ok(Some(x.num_pending as usize + x.num_ack_pending)) + } + + pub(crate) fn name(&self) -> &'static str { + self.stream_name + } } impl fmt::Display for JetstreamReader { diff --git a/rust/numaflow-core/src/shared/metrics.rs b/rust/numaflow-core/src/shared/metrics.rs index dfc22401a..480f2bb4c 100644 --- a/rust/numaflow-core/src/shared/metrics.rs +++ b/rust/numaflow-core/src/shared/metrics.rs @@ -8,6 +8,7 @@ use crate::config::components::metrics::MetricsConfig; use crate::metrics::{ start_metrics_https_server, PendingReader, PendingReaderBuilder, UserDefinedContainerState, }; +use crate::pipeline::isb::jetstream::reader::JetstreamReader; use crate::source::Source; /// Starts the metrics server @@ -29,11 +30,11 @@ pub(crate) async fn start_metrics_server( } /// Creates a pending reader -pub(crate) async fn create_pending_reader( +pub(crate) async fn create_source_pending_reader( metrics_config: &MetricsConfig, - lag_reader_grpc_client: Source, + lag_reader: Source, ) -> PendingReader { - PendingReaderBuilder::new(lag_reader_grpc_client) + PendingReaderBuilder::new() .lag_checking_interval(Duration::from_secs( metrics_config.lag_check_interval_in_secs.into(), )) @@ -41,5 +42,22 @@ pub(crate) async fn create_pending_reader( metrics_config.lag_refresh_interval_in_secs.into(), )) .lookback_seconds(metrics_config.lookback_window_in_secs) + .source_lag_reader(lag_reader) + .build() +} + +pub(crate) async fn create_isb_pending_reader( + metrics_config: &MetricsConfig, + lag_readers: Vec, +) -> PendingReader { + PendingReaderBuilder::new() + 
.lag_checking_interval(Duration::from_secs( + metrics_config.lag_check_interval_in_secs.into(), + )) + .refresh_interval(Duration::from_secs( + metrics_config.lag_refresh_interval_in_secs.into(), + )) + .lookback_seconds(metrics_config.lookback_window_in_secs) + .isb_lag_readers(lag_readers) .build() }