From fd4a0aaca263c82a92340ac00f28bd77feba9f27 Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Wed, 15 Jan 2025 09:55:41 +0530 Subject: [PATCH] feat: expose pending messages metric in async pipeline (#2330) Signed-off-by: Yashash H L Signed-off-by: Vigith Maurice Co-authored-by: Vigith Maurice --- rust/numaflow-core/src/mapper/map.rs | 27 +- .../src/mapper/map/user_defined.rs | 2 +- rust/numaflow-core/src/message.rs | 5 +- rust/numaflow-core/src/metrics.rs | 309 ++++++++++++++---- rust/numaflow-core/src/monovertex.rs | 8 +- rust/numaflow-core/src/pipeline.rs | 37 ++- .../src/pipeline/isb/jetstream/reader.rs | 14 + rust/numaflow-core/src/shared/metrics.rs | 8 +- rust/numaflow-core/src/sink.rs | 7 +- rust/numaflow-core/src/source.rs | 3 +- rust/numaflow-core/src/source/serving.rs | 11 +- rust/numaflow-core/src/source/user_defined.rs | 12 +- .../src/transformer/user_defined.rs | 12 +- rust/serving/src/app.rs | 6 +- rust/serving/src/config.rs | 3 +- rust/serving/src/lib.rs | 5 +- rust/serving/src/source.rs | 3 +- 17 files changed, 348 insertions(+), 124 deletions(-) diff --git a/rust/numaflow-core/src/mapper/map.rs b/rust/numaflow-core/src/mapper/map.rs index 8c279376a..36918d62f 100644 --- a/rust/numaflow-core/src/mapper/map.rs +++ b/rust/numaflow-core/src/mapper/map.rs @@ -1,3 +1,13 @@ +use std::sync::Arc; +use std::time::Duration; + +use numaflow_pb::clients::map::map_client::MapClient; +use tokio::sync::{mpsc, oneshot, OwnedSemaphorePermit, Semaphore}; +use tokio::task::JoinHandle; +use tokio_stream::wrappers::ReceiverStream; +use tokio_stream::StreamExt; +use tonic::transport::Channel; + use crate::config::pipeline::map::MapMode; use crate::error; use crate::error::Error; @@ -6,14 +16,6 @@ use crate::mapper::map::user_defined::{ }; use crate::message::Message; use crate::tracker::TrackerHandle; -use numaflow_pb::clients::map::map_client::MapClient; -use std::sync::Arc; -use std::time::Duration; -use tokio::sync::{mpsc, oneshot, OwnedSemaphorePermit, Semaphore}; -use tokio::task::JoinHandle; -use tokio_stream::wrappers::ReceiverStream; -use tokio_stream::StreamExt; -use tonic::transport::Channel; pub(super) mod user_defined; /// UnaryActorMessage is a message that is sent to the UnaryMapperActor. @@ -481,12 +483,8 @@ impl MapHandle { #[cfg(test)] mod tests { - use super::*; - use crate::Result; use std::time::Duration; - use crate::message::{MessageID, Offset, StringOffset}; - use crate::shared::grpc::create_rpc_channel; use numaflow::mapstream; use numaflow::{batchmap, map}; use numaflow_pb::clients::map::map_client::MapClient; @@ -494,6 +492,11 @@ mod tests { use tokio::sync::mpsc::Sender; use tokio::sync::oneshot; + use super::*; + use crate::message::{MessageID, Offset, StringOffset}; + use crate::shared::grpc::create_rpc_channel; + use crate::Result; + struct SimpleMapper; #[tonic::async_trait] diff --git a/rust/numaflow-core/src/mapper/map/user_defined.rs b/rust/numaflow-core/src/mapper/map/user_defined.rs index 0799eb654..d7abfff10 100644 --- a/rust/numaflow-core/src/mapper/map/user_defined.rs +++ b/rust/numaflow-core/src/mapper/map/user_defined.rs @@ -430,12 +430,12 @@ impl UserDefinedStreamMap { #[cfg(test)] mod tests { - use numaflow::mapstream; use std::error::Error; use std::sync::Arc; use std::time::Duration; use numaflow::batchmap::Server; + use numaflow::mapstream; use numaflow::{batchmap, map}; use numaflow_pb::clients::map::map_client::MapClient; use tempfile::TempDir; diff --git a/rust/numaflow-core/src/message.rs b/rust/numaflow-core/src/message.rs index fe20613da..86259bf8f 100644 --- a/rust/numaflow-core/src/message.rs +++ b/rust/numaflow-core/src/message.rs @@ -218,14 +218,15 @@ impl TryFrom for Message { #[cfg(test)] mod tests { - use crate::error::Result; + use std::collections::HashMap; + use chrono::TimeZone; use numaflow_pb::objects::isb::{ Body, Header, Message as ProtoMessage, MessageId, MessageInfo, }; - use std::collections::HashMap; use super::*; + use crate::error::Result; #[test] fn test_offset_display() { diff --git a/rust/numaflow-core/src/metrics.rs b/rust/numaflow-core/src/metrics.rs index 2a672ec31..d63b82de7 100644 --- a/rust/numaflow-core/src/metrics.rs +++ b/rust/numaflow-core/src/metrics.rs @@ -1,4 +1,4 @@ -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use std::iter; use std::net::SocketAddr; use std::sync::{Arc, OnceLock}; @@ -29,6 +29,7 @@ use tonic::Request; use tracing::{debug, error, info}; use crate::config::{get_pipeline_name, get_vertex_name, get_vertex_replica}; +use crate::pipeline::isb::jetstream::reader::JetstreamReader; use crate::source::Source; use crate::Error; @@ -73,8 +74,10 @@ const SINK_WRITE_TOTAL: &str = "write"; const DROPPED_TOTAL: &str = "dropped"; const FALLBACK_SINK_WRITE_TOTAL: &str = "write"; -// pending as gauge +// pending as gauge for mvtx (these metric names are hardcoded in the auto-scaler) const PENDING: &str = "pending"; +// pending as gauge for pipeline +const VERTEX_PENDING: &str = "pending_messages"; // processing times as timers const E2E_TIME: &str = "processing_time"; @@ -204,6 +207,7 @@ pub(crate) struct MonoVtxMetrics { pub(crate) struct PipelineMetrics { pub(crate) forwarder: PipelineForwarderMetrics, pub(crate) isb: PipelineISBMetrics, + pub(crate) pending: Family, Gauge>, } /// Family of metrics for the sink @@ -232,7 +236,6 @@ pub(crate) struct PipelineForwarderMetrics { pub(crate) write_time: Family, Histogram>, pub(crate) read_bytes_total: Family, Counter>, pub(crate) processed_time: Family, Histogram>, - pub(crate) pending: Family, Gauge>, pub(crate) dropped_total: Family, Counter>, } @@ -398,7 +401,6 @@ impl PipelineMetrics { ack_time: Family::, Histogram>::new_with_constructor(|| { Histogram::new(exponential_buckets_range(100.0, 60000000.0 * 15.0, 10)) }), - pending: Family::, Gauge>::default(), write_total: Family::, Counter>::default(), write_time: Family::, Histogram>::new_with_constructor( || Histogram::new(exponential_buckets_range(100.0, 60000000.0 * 15.0, 10)), @@ -411,6 +413,7 @@ impl PipelineMetrics { Histogram::new(exponential_buckets_range(100.0, 60000000.0 * 15.0, 10)) }), }, + pending: Family::, Gauge>::default(), }; let mut registry = global_registry().registry.lock(); @@ -446,11 +449,6 @@ impl PipelineMetrics { "Time taken to ack data", metrics.forwarder.ack_time.clone(), ); - forwarder_registry.register( - PENDING, - "Number of pending messages", - metrics.forwarder.pending.clone(), - ); forwarder_registry.register( SINK_WRITE_TOTAL, "Total number of Data Messages Written", @@ -466,6 +464,13 @@ impl PipelineMetrics { "Time taken to write data", metrics.forwarder.write_time.clone(), ); + + let vertex_registry = registry.sub_registry_with_prefix("vertex"); + vertex_registry.register( + VERTEX_PENDING, + "Total number of pending messages", + metrics.pending.clone(), + ); metrics } } @@ -710,14 +715,21 @@ struct TimestampedPending { timestamp: std::time::Instant, } +#[derive(Clone)] +pub(crate) enum LagReader { + Source(Source), + // TODO: Arc<[T]> + ISB(Vec), // multiple partitions +} + /// PendingReader is responsible for periodically checking the lag of the reader /// and exposing the metrics. It maintains a list of pending stats and ensures that /// only the most recent entries are kept. pub(crate) struct PendingReader { - lag_reader: Source, + lag_reader: LagReader, lag_checking_interval: Duration, refresh_interval: Duration, - pending_stats: Arc>>, + pending_stats: Arc>>>, lookback_seconds: u16, } @@ -728,14 +740,14 @@ pub(crate) struct PendingReaderTasks { /// PendingReaderBuilder is used to build a [LagReader] instance. pub(crate) struct PendingReaderBuilder { - lag_reader: Source, + lag_reader: LagReader, lag_checking_interval: Option, refresh_interval: Option, lookback_seconds: Option, } impl PendingReaderBuilder { - pub(crate) fn new(lag_reader: Source) -> Self { + pub(crate) fn new(lag_reader: LagReader) -> Self { Self { lag_reader, lag_checking_interval: None, @@ -760,6 +772,22 @@ impl PendingReaderBuilder { } pub(crate) fn build(self) -> PendingReader { + let mut pending_map = HashMap::new(); + match &self.lag_reader { + LagReader::Source(_) => { + pending_map.insert("source".to_string(), Vec::with_capacity(MAX_PENDING_STATS)); + } + LagReader::ISB(readers) => { + // need a lag reader per partition + for reader in readers { + pending_map.insert( + reader.name().to_string(), + Vec::with_capacity(MAX_PENDING_STATS), + ); + } + } + } + PendingReader { lag_reader: self.lag_reader, lag_checking_interval: self @@ -769,7 +797,7 @@ impl PendingReaderBuilder { .refresh_interval .unwrap_or_else(|| Duration::from_secs(5)), lookback_seconds: self.lookback_seconds.unwrap_or(120), - pending_stats: Arc::new(Mutex::new(Vec::with_capacity(MAX_PENDING_STATS))), + pending_stats: Arc::new(Mutex::new(pending_map)), } } } @@ -783,14 +811,14 @@ impl PendingReader { /// /// Dropping the PendingReaderTasks will abort the background tasks. pub async fn start(&self, is_mono_vertex: bool) -> PendingReaderTasks { - let pending_reader = self.lag_reader.clone(); let lag_checking_interval = self.lag_checking_interval; let refresh_interval = self.refresh_interval; let pending_stats = Arc::clone(&self.pending_stats); let lookback_seconds = self.lookback_seconds; + let lag_reader = self.lag_reader.clone(); let buildup_handle = tokio::spawn(async move { - build_pending_info(pending_reader, lag_checking_interval, pending_stats).await; + build_pending_info(lag_reader, lag_checking_interval, pending_stats).await; }); let pending_stats = Arc::clone(&self.pending_stats); @@ -821,45 +849,89 @@ impl Drop for PendingReaderTasks { /// Periodically checks the pending messages from the source client and build the pending stats. async fn build_pending_info( - source: Source, + mut lag_reader: LagReader, lag_checking_interval: Duration, - pending_stats: Arc>>, + pending_stats: Arc>>>, ) { let mut ticker = time::interval(lag_checking_interval); + loop { ticker.tick().await; - match fetch_pending(&source).await { - Ok(pending) => { - if pending != -1 { - let mut stats = pending_stats.lock().await; - stats.push(TimestampedPending { - pending, - timestamp: std::time::Instant::now(), - }); - let n = stats.len(); - // Ensure only the most recent MAX_PENDING_STATS entries are kept - if n >= MAX_PENDING_STATS { - stats.drain(0..(n - MAX_PENDING_STATS)); + + match &mut lag_reader { + LagReader::Source(source) => { + match fetch_source_pending(&source).await { + Ok(pending) => { + if pending != -1 { + let mut stats = pending_stats.lock().await; + stats.get_mut("source").unwrap().push(TimestampedPending { + pending, + timestamp: std::time::Instant::now(), + }); + let n = stats.len(); + // Ensure only the most recent MAX_PENDING_STATS entries are kept + if n >= MAX_PENDING_STATS { + stats + .get_mut("source") + .unwrap() + .drain(0..(n - MAX_PENDING_STATS)); + } + } + } + Err(err) => { + error!("Failed to get pending messages: {:?}", err); } } } - Err(err) => { - error!("Failed to get pending messages: {:?}", err); + + LagReader::ISB(readers) => { + for mut reader in readers { + match fetch_isb_pending(&mut reader).await { + Ok(pending) => { + if pending != -1 { + let mut stats = pending_stats.lock().await; + stats + .get_mut(reader.name()) + .unwrap() + .push(TimestampedPending { + pending, + timestamp: std::time::Instant::now(), + }); + let n = stats.len(); + // Ensure only the most recent MAX_PENDING_STATS entries are kept + if n >= MAX_PENDING_STATS { + stats + .get_mut(reader.name()) + .unwrap() + .drain(0..(n - MAX_PENDING_STATS)); + } + } + } + Err(err) => { + error!("Failed to get pending messages: {:?}", err); + } + } + } } } } } -async fn fetch_pending(lag_reader: &Source) -> crate::error::Result { +async fn fetch_source_pending(lag_reader: &Source) -> crate::error::Result { let response: i64 = lag_reader.pending().await?.map_or(-1, |p| p as i64); // default to -1(unavailable) Ok(response) } +async fn fetch_isb_pending(reader: &mut JetstreamReader) -> crate::error::Result { + let response: i64 = reader.pending().await?.map_or(-1, |p| p as i64); // default to -1(unavailable) + Ok(response) +} + // Periodically exposes the pending metrics by calculating the average pending messages over different intervals. async fn expose_pending_metrics( is_mono_vertex: bool, refresh_interval: Duration, - pending_stats: Arc>>, + pending_map: Arc>>>, lookback_seconds: u16, ) { let mut ticker = time::interval(refresh_interval); @@ -877,46 +949,46 @@ async fn expose_pending_metrics( loop { ticker.tick().await; - for (label, seconds) in lookback_seconds_map { - let pending = calculate_pending(seconds as i64, &pending_stats).await; - if pending != -1 { - let mut metric_labels = mvtx_forward_metric_labels().clone(); - metric_labels.push((PENDING_PERIOD_LABEL.to_string(), label.to_string())); - pending_info.insert(label, pending); - if is_mono_vertex { - monovertex_metrics() - .pending - .get_or_create(&metric_labels) - .set(pending); - } else { - pipeline_metrics() - .forwarder - .pending - .get_or_create(&metric_labels) - .set(pending); + for (name, pending_stats) in pending_map.lock().await.iter() { + for (label, seconds) in lookback_seconds_map { + let pending = calculate_pending(seconds as i64, pending_stats).await; + if pending != -1 { + pending_info.insert(label, pending); + if is_mono_vertex { + let mut metric_labels = mvtx_forward_metric_labels().clone(); + metric_labels.push((PENDING_PERIOD_LABEL.to_string(), label.to_string())); + monovertex_metrics() + .pending + .get_or_create(&metric_labels) + .set(pending); + } else { + let mut metric_labels = + pipeline_forward_metric_labels(name, Some(name)).clone(); + metric_labels.push((PENDING_PERIOD_LABEL.to_string(), label.to_string())); + pipeline_metrics() + .pending + .get_or_create(&metric_labels) + .set(pending); + } } } - } - // skip for those the pending is not implemented - if !pending_info.is_empty() { - info!("Pending messages {:?}", pending_info); - pending_info.clear(); + // skip for those the pending is not implemented + if !pending_info.is_empty() { + info!("Pending messages {:?}", pending_info); + pending_info.clear(); + } } } } /// Calculate the average pending messages over the last `seconds` seconds. -async fn calculate_pending( - seconds: i64, - pending_stats: &Arc>>, -) -> i64 { +async fn calculate_pending(seconds: i64, pending_stats: &Vec) -> i64 { let mut result = -1; let mut total = 0; let mut num = 0; let now = std::time::Instant::now(); - let stats = pending_stats.lock().await; - for item in stats.iter().rev() { + for item in pending_stats.iter().rev() { if now.duration_since(item.timestamp).as_secs() < seconds as u64 { total += item.pending; num += 1; @@ -1098,8 +1170,11 @@ mod tests { } #[tokio::test] - async fn test_expose_pending_metrics() { - let pending_stats = Arc::new(Mutex::new(Vec::with_capacity(MAX_PENDING_STATS))); + async fn test_expose_pending_metrics_for_source() { + let mut pending_map = HashMap::new(); + pending_map.insert("source".to_string(), Vec::with_capacity(MAX_PENDING_STATS)); + + let pending_stats = Arc::new(Mutex::new(pending_map)); let refresh_interval = Duration::from_secs(1); let lookback_seconds = 120; @@ -1107,19 +1182,20 @@ mod tests { // The array will be sorted by the timestamp with the most recent last. { let mut pending_stats = pending_stats.lock().await; - pending_stats.push(TimestampedPending { + let pending_vec = pending_stats.get_mut("source").unwrap(); + pending_vec.push(TimestampedPending { pending: 15, timestamp: Instant::now() - Duration::from_secs(150), }); - pending_stats.push(TimestampedPending { + pending_vec.push(TimestampedPending { pending: 30, timestamp: Instant::now() - Duration::from_secs(70), }); - pending_stats.push(TimestampedPending { + pending_vec.push(TimestampedPending { pending: 20, timestamp: Instant::now() - Duration::from_secs(30), }); - pending_stats.push(TimestampedPending { + pending_vec.push(TimestampedPending { pending: 10, timestamp: Instant::now(), }); @@ -1155,6 +1231,103 @@ mod tests { } assert_eq!(stored_values, [15, 20, 18, 18]); } + + #[tokio::test] + async fn test_expose_pending_metrics_for_isb() { + let mut pending_map = HashMap::new(); + pending_map.insert("stream1".to_string(), Vec::with_capacity(MAX_PENDING_STATS)); + pending_map.insert("stream2".to_string(), Vec::with_capacity(MAX_PENDING_STATS)); + + let pending_stats = Arc::new(Mutex::new(pending_map)); + let refresh_interval = Duration::from_secs(1); + let lookback_seconds = 120; + + // Populate pending_stats with some values. + // The array will be sorted by the timestamp with the most recent last. + { + let mut pending_stats = pending_stats.lock().await; + let pending_vec = pending_stats.get_mut("stream1").unwrap(); + pending_vec.push(TimestampedPending { + pending: 15, + timestamp: Instant::now() - Duration::from_secs(150), + }); + pending_vec.push(TimestampedPending { + pending: 30, + timestamp: Instant::now() - Duration::from_secs(70), + }); + pending_vec.push(TimestampedPending { + pending: 20, + timestamp: Instant::now() - Duration::from_secs(30), + }); + pending_vec.push(TimestampedPending { + pending: 10, + timestamp: Instant::now(), + }); + + let pending_vec = pending_stats.get_mut("stream2").unwrap(); + pending_vec.push(TimestampedPending { + pending: 15, + timestamp: Instant::now() - Duration::from_secs(150), + }); + pending_vec.push(TimestampedPending { + pending: 30, + timestamp: Instant::now() - Duration::from_secs(70), + }); + pending_vec.push(TimestampedPending { + pending: 20, + timestamp: Instant::now() - Duration::from_secs(30), + }); + pending_vec.push(TimestampedPending { + pending: 10, + timestamp: Instant::now(), + }); + } + + tokio::spawn({ + let pending_stats = Arc::clone(&pending_stats); + async move { + expose_pending_metrics(false, refresh_interval, pending_stats, lookback_seconds) + .await; + } + }); + + // We use tokio::time::interval() as the ticker in the expose_pending_metrics() function. + // The first tick happens immediately, so we don't need to wait for the refresh_interval for the first iteration to complete. + tokio::time::sleep(Duration::from_millis(50)).await; + + let lookback_seconds_map: [(&str, u16); 4] = + [("1m", 60), ("default", 120), ("5m", 300), ("15m", 900)]; + + // Get the stored values for all time intervals + // We will store the values corresponding to the labels (from lookback_seconds_map) "1m", "default", "5m", "15" in the same order in this array + let mut stored_values_stream_one: [i64; 4] = [0; 4]; + let mut stored_values_stream_two: [i64; 4] = [0; 4]; + + { + for (i, (label, _)) in lookback_seconds_map.iter().enumerate() { + let mut metric_labels = + pipeline_forward_metric_labels("stream1", Some("stream1")).clone(); + metric_labels.push((PENDING_PERIOD_LABEL.to_string(), label.to_string())); + let guage = pipeline_metrics() + .pending + .get_or_create(&metric_labels) + .get(); + stored_values_stream_one[i] = guage; + + let mut metric_labels = + pipeline_forward_metric_labels("stream2", Some("stream2")).clone(); + metric_labels.push((PENDING_PERIOD_LABEL.to_string(), label.to_string())); + let guage = pipeline_metrics() + .pending + .get_or_create(&metric_labels) + .get(); + stored_values_stream_two[i] = guage; + } + } + assert_eq!(stored_values_stream_one, [15, 20, 18, 18]); + assert_eq!(stored_values_stream_two, [15, 20, 18, 18]); + } + #[test] fn test_exponential_buckets_range_basic() { let min = 1.0; diff --git a/rust/numaflow-core/src/monovertex.rs b/rust/numaflow-core/src/monovertex.rs index 1518a3c9f..4d4f109d4 100644 --- a/rust/numaflow-core/src/monovertex.rs +++ b/rust/numaflow-core/src/monovertex.rs @@ -5,6 +5,7 @@ use tracing::info; use crate::config::is_mono_vertex; use crate::config::monovertex::MonovertexConfig; use crate::error::{self}; +use crate::metrics::LagReader; use crate::shared::create_components; use crate::sink::SinkWriter; use crate::source::Source; @@ -81,8 +82,11 @@ async fn start( cln_token: CancellationToken, ) -> error::Result<()> { // start the pending reader to publish pending metrics - let pending_reader = - shared::metrics::create_pending_reader(&mvtx_config.metrics_config, source.clone()).await; + let pending_reader = shared::metrics::create_pending_reader( + &mvtx_config.metrics_config, + LagReader::Source(source.clone()), + ) + .await; let _pending_reader_handle = pending_reader.start(is_mono_vertex()).await; let mut forwarder_builder = ForwarderBuilder::new(source, sink, cln_token); diff --git a/rust/numaflow-core/src/pipeline.rs b/rust/numaflow-core/src/pipeline.rs index d2cb77091..54dcd36ce 100644 --- a/rust/numaflow-core/src/pipeline.rs +++ b/rust/numaflow-core/src/pipeline.rs @@ -6,10 +6,10 @@ use futures::future::try_join_all; use tokio_util::sync::CancellationToken; use tracing::info; -use crate::config::pipeline; use crate::config::pipeline::map::MapVtxConfig; use crate::config::pipeline::{PipelineConfig, SinkVtxConfig, SourceVtxConfig}; -use crate::metrics::{PipelineContainerState, UserDefinedContainerState}; +use crate::config::{is_mono_vertex, pipeline}; +use crate::metrics::{LagReader, PipelineContainerState, UserDefinedContainerState}; use crate::pipeline::forwarder::source_forwarder; use crate::pipeline::isb::jetstream::reader::JetstreamReader; use crate::pipeline::isb::jetstream::writer::JetstreamWriter; @@ -18,10 +18,10 @@ use crate::shared::create_components; use crate::shared::create_components::create_sink_writer; use crate::shared::metrics::start_metrics_server; use crate::tracker::TrackerHandle; -use crate::{error, Result}; +use crate::{error, shared, Result}; mod forwarder; -mod isb; +pub(crate) mod isb; /// Starts the appropriate forwarder based on the pipeline configuration. pub(crate) async fn start_forwarder( @@ -77,6 +77,13 @@ async fn start_source_forwarder( ) .await?; + let pending_reader = shared::metrics::create_pending_reader( + &config.metrics_config, + LagReader::Source(source.clone()), + ) + .await; + let _pending_reader_handle = pending_reader.start(is_mono_vertex()).await; + start_metrics_server( config.metrics_config.clone(), UserDefinedContainerState::Pipeline(PipelineContainerState::Source(( @@ -117,6 +124,8 @@ async fn start_map_forwarder( // Create buffer writers and buffer readers let mut forwarder_components = vec![]; let mut mapper_grpc_client = None; + let mut isb_lag_readers = vec![]; + for stream in reader_config.streams.clone() { let tracker_handle = TrackerHandle::new(); @@ -129,6 +138,8 @@ async fn start_map_forwarder( ) .await?; + isb_lag_readers.push(buffer_reader.clone()); + let (mapper, mapper_rpc_client) = create_components::create_mapper( config.batch_size, config.read_timeout, @@ -152,6 +163,13 @@ async fn start_map_forwarder( forwarder_components.push((buffer_reader, buffer_writer, mapper)); } + let pending_reader = shared::metrics::create_pending_reader( + &config.metrics_config, + LagReader::ISB(isb_lag_readers), + ) + .await; + let _pending_reader_handle = pending_reader.start(is_mono_vertex()).await; + start_metrics_server( config.metrics_config.clone(), UserDefinedContainerState::Pipeline(PipelineContainerState::Map(mapper_grpc_client)), @@ -228,6 +246,13 @@ async fn start_sink_forwarder( sink_writers.push((sink_writer, sink_grpc_client, fb_sink_grpc_client)); } + let pending_reader = shared::metrics::create_pending_reader( + &config.metrics_config, + LagReader::ISB(buffer_readers.clone()), + ) + .await; + let _pending_reader_handle = pending_reader.start(is_mono_vertex()).await; + // Start the metrics server with one of the clients if let Some((_, sink, fb_sink)) = sink_writers.first() { start_metrics_server( @@ -324,7 +349,6 @@ async fn create_js_context(config: pipeline::isb::jetstream::ClientConfig) -> Re #[cfg(test)] mod tests { - use crate::pipeline::pipeline::map::MapMode; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; @@ -345,6 +369,7 @@ mod tests { use crate::config::pipeline::PipelineConfig; use crate::pipeline::pipeline::isb; use crate::pipeline::pipeline::isb::{BufferReaderConfig, BufferWriterConfig}; + use crate::pipeline::pipeline::map::MapMode; use crate::pipeline::pipeline::VertexType; use crate::pipeline::pipeline::{FromVertexConfig, ToVertexConfig}; use crate::pipeline::pipeline::{SinkVtxConfig, SourceVtxConfig}; @@ -420,7 +445,7 @@ mod tests { streams: streams .iter() .enumerate() - .map(|(i, stream_name)| (stream_name.to_string(), i as u16)) + .map(|(i, stream_name)| ((*stream_name).to_string(), i as u16)) .collect(), partitions: 5, max_length: 30000, diff --git a/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs b/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs index 79b8572ef..152004aed 100644 --- a/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs +++ b/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs @@ -252,6 +252,20 @@ impl JetstreamReader { } } } + + pub(crate) async fn pending(&mut self) -> Result> { + let x = self.consumer.info().await.map_err(|e| { + Error::ISB(format!( + "Failed to get consumer info for stream {}: {}", + self.stream_name, e + )) + })?; + Ok(Some(x.num_pending as usize + x.num_ack_pending)) + } + + pub(crate) fn name(&self) -> &'static str { + self.stream_name + } } impl fmt::Display for JetstreamReader { diff --git a/rust/numaflow-core/src/shared/metrics.rs b/rust/numaflow-core/src/shared/metrics.rs index dfc22401a..70e7523b1 100644 --- a/rust/numaflow-core/src/shared/metrics.rs +++ b/rust/numaflow-core/src/shared/metrics.rs @@ -6,9 +6,9 @@ use tracing::error; use crate::config::components::metrics::MetricsConfig; use crate::metrics::{ - start_metrics_https_server, PendingReader, PendingReaderBuilder, UserDefinedContainerState, + start_metrics_https_server, LagReader, PendingReader, PendingReaderBuilder, + UserDefinedContainerState, }; -use crate::source::Source; /// Starts the metrics server pub(crate) async fn start_metrics_server( @@ -31,9 +31,9 @@ pub(crate) async fn start_metrics_server( /// Creates a pending reader pub(crate) async fn create_pending_reader( metrics_config: &MetricsConfig, - lag_reader_grpc_client: Source, + lag_reader: LagReader, ) -> PendingReader { - PendingReaderBuilder::new(lag_reader_grpc_client) + PendingReaderBuilder::new(lag_reader) .lag_checking_interval(Duration::from_secs( metrics_config.lag_check_interval_in_secs.into(), )) diff --git a/rust/numaflow-core/src/sink.rs b/rust/numaflow-core/src/sink.rs index c60c57bd3..be24f2f32 100644 --- a/rust/numaflow-core/src/sink.rs +++ b/rust/numaflow-core/src/sink.rs @@ -713,15 +713,16 @@ impl Drop for SinkWriter { mod tests { use std::sync::Arc; - use super::*; - use crate::message::{Message, MessageID, Offset, ReadAck, StringOffset}; - use crate::shared::grpc::create_rpc_channel; use chrono::{TimeZone, Utc}; use numaflow::sink; use numaflow_pb::clients::sink::{SinkRequest, SinkResponse}; use tokio::time::Duration; use tokio_util::sync::CancellationToken; + use super::*; + use crate::message::{Message, MessageID, Offset, ReadAck, StringOffset}; + use crate::shared::grpc::create_rpc_channel; + struct SimpleSink; #[tonic::async_trait] impl sink::Sinker for SimpleSink { diff --git a/rust/numaflow-core/src/source.rs b/rust/numaflow-core/src/source.rs index a30fc9777..d48c37ab6 100644 --- a/rust/numaflow-core/src/source.rs +++ b/rust/numaflow-core/src/source.rs @@ -1,5 +1,6 @@ -use numaflow_pulsar::source::PulsarSource; use std::sync::Arc; + +use numaflow_pulsar::source::PulsarSource; use tokio::sync::OwnedSemaphorePermit; use tokio::sync::Semaphore; use tokio::sync::{mpsc, oneshot}; diff --git a/rust/numaflow-core/src/source/serving.rs b/rust/numaflow-core/src/source/serving.rs index 431cfbba3..5eeea4b05 100644 --- a/rust/numaflow-core/src/source/serving.rs +++ b/rust/numaflow-core/src/source/serving.rs @@ -2,13 +2,12 @@ use std::sync::Arc; pub(crate) use serving::ServingSource; +use super::{get_vertex_name, Message, Offset}; use crate::config::get_vertex_replica; use crate::message::{MessageID, StringOffset}; use crate::Error; use crate::Result; -use super::{get_vertex_name, Message, Offset}; - impl TryFrom for Message { type Error = Error; @@ -83,16 +82,16 @@ impl super::LagReader for ServingSource { #[cfg(test)] mod tests { - use crate::{ - message::{Message, MessageID, Offset, StringOffset}, - source::{SourceAcker, SourceReader}, - }; use std::{collections::HashMap, sync::Arc, time::Duration}; use bytes::Bytes; use serving::{ServingSource, Settings}; use super::get_vertex_replica; + use crate::{ + message::{Message, MessageID, Offset, StringOffset}, + source::{SourceAcker, SourceReader}, + }; type Result = std::result::Result>; diff --git a/rust/numaflow-core/src/source/user_defined.rs b/rust/numaflow-core/src/source/user_defined.rs index 5f274119b..66ac456dd 100644 --- a/rust/numaflow-core/src/source/user_defined.rs +++ b/rust/numaflow-core/src/source/user_defined.rs @@ -1,3 +1,6 @@ +use std::sync::Arc; +use std::time::Duration; + use base64::prelude::BASE64_STANDARD; use base64::Engine; use numaflow_pb::clients::source; @@ -5,8 +8,6 @@ use numaflow_pb::clients::source::source_client::SourceClient; use numaflow_pb::clients::source::{ read_request, read_response, AckRequest, AckResponse, ReadRequest, ReadResponse, }; -use std::sync::Arc; -use std::time::Duration; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tonic::transport::Channel; @@ -283,15 +284,16 @@ impl LagReader for UserDefinedSourceLagReader { mod tests { use std::collections::{HashMap, HashSet}; - use super::*; - use crate::message::IntOffset; - use crate::shared::grpc::{create_rpc_channel, prost_timestamp_from_utc}; use chrono::{TimeZone, Utc}; use numaflow::source; use numaflow::source::{Message, Offset, SourceReadRequest}; use numaflow_pb::clients::source::source_client::SourceClient; use tokio::sync::mpsc::Sender; + use super::*; + use crate::message::IntOffset; + use crate::shared::grpc::{create_rpc_channel, prost_timestamp_from_utc}; + struct SimpleSource { num: usize, yet_to_ack: std::sync::RwLock>, diff --git a/rust/numaflow-core/src/transformer/user_defined.rs b/rust/numaflow-core/src/transformer/user_defined.rs index 78518e4c0..f519c0b80 100644 --- a/rust/numaflow-core/src/transformer/user_defined.rs +++ b/rust/numaflow-core/src/transformer/user_defined.rs @@ -184,16 +184,18 @@ impl UserDefinedTransformer { #[cfg(test)] mod tests { - use super::*; - use crate::message::StringOffset; - use crate::shared::grpc::create_rpc_channel; - use chrono::{TimeZone, Utc}; - use numaflow::sourcetransform; use std::error::Error; use std::result::Result; use std::time::Duration; + + use chrono::{TimeZone, Utc}; + use numaflow::sourcetransform; use tempfile::TempDir; + use super::*; + use crate::message::StringOffset; + use crate::shared::grpc::create_rpc_channel; + struct NowCat; #[tonic::async_trait] diff --git a/rust/serving/src/app.rs b/rust/serving/src/app.rs index 82ef1ef62..330752ad6 100644 --- a/rust/serving/src/app.rs +++ b/rust/serving/src/app.rs @@ -253,14 +253,14 @@ mod tests { use std::sync::Arc; use axum::http::StatusCode; + use callback::state::State as CallbackState; + use tokio::sync::mpsc; use tower::ServiceExt; + use tracker::MessageGraph; use super::*; use crate::app::callback::store::memstore::InMemoryStore; use crate::Settings; - use callback::state::State as CallbackState; - use tokio::sync::mpsc; - use tracker::MessageGraph; const PIPELINE_SPEC_ENCODED: &str = "eyJ2ZXJ0aWNlcyI6W3sibmFtZSI6ImluIiwic291cmNlIjp7InNlcnZpbmciOnsiYXV0aCI6bnVsbCwic2VydmljZSI6dHJ1ZSwibXNnSURIZWFkZXJLZXkiOiJYLU51bWFmbG93LUlkIiwic3RvcmUiOnsidXJsIjoicmVkaXM6Ly9yZWRpczo2Mzc5In19fSwiY29udGFpbmVyVGVtcGxhdGUiOnsicmVzb3VyY2VzIjp7fSwiaW1hZ2VQdWxsUG9saWN5IjoiTmV2ZXIiLCJlbnYiOlt7Im5hbWUiOiJSVVNUX0xPRyIsInZhbHVlIjoiZGVidWcifV19LCJzY2FsZSI6eyJtaW4iOjF9LCJ1cGRhdGVTdHJhdGVneSI6eyJ0eXBlIjoiUm9sbGluZ1VwZGF0ZSIsInJvbGxpbmdVcGRhdGUiOnsibWF4VW5hdmFpbGFibGUiOiIyNSUifX19LHsibmFtZSI6InBsYW5uZXIiLCJ1ZGYiOnsiY29udGFpbmVyIjp7ImltYWdlIjoiYXNjaWk6MC4xIiwiYXJncyI6WyJwbGFubmVyIl0sInJlc291cmNlcyI6e30sImltYWdlUHVsbFBvbGljeSI6Ik5ldmVyIn0sImJ1aWx0aW4iOm51bGwsImdyb3VwQnkiOm51bGx9LCJjb250YWluZXJUZW1wbGF0ZSI6eyJyZXNvdXJjZXMiOnt9LCJpbWFnZVB1bGxQb2xpY3kiOiJOZXZlciJ9LCJzY2FsZSI6eyJtaW4iOjF9LCJ1cGRhdGVTdHJhdGVneSI6eyJ0eXBlIjoiUm9sbGluZ1VwZGF0ZSIsInJvbGxpbmdVcGRhdGUiOnsibWF4VW5hdmFpbGFibGUiOiIyNSUifX19LHsibmFtZSI6InRpZ2VyIiwidWRmIjp7ImNvbnRhaW5lciI6eyJpbWFnZSI6ImFzY2lpOjAuMSIsImFyZ3MiOlsidGlnZXIiXSwicmVzb3VyY2VzIjp7fSwiaW1hZ2VQdWxsUG9saWN5IjoiTmV2ZXIifSwiYnVpbHRpbiI6bnVsbCwiZ3JvdXBCeSI6bnVsbH0sImNvbnRhaW5lclRlbXBsYXRlIjp7InJlc291cmNlcyI6e30sImltYWdlUHVsbFBvbGljeSI6Ik5ldmVyIn0sInNjYWxlIjp7Im1pbiI6MX0sInVwZGF0ZVN0cmF0ZWd5Ijp7InR5cGUiOiJSb2xsaW5nVXBkYXRlIiwicm9sbGluZ1VwZGF0ZSI6eyJtYXhVbmF2YWlsYWJsZSI6IjI1JSJ9fX0seyJuYW1lIjoiZG9nIiwidWRmIjp7ImNvbnRhaW5lciI6eyJpbWFnZSI6ImFzY2lpOjAuMSIsImFyZ3MiOlsiZG9nIl0sInJlc291cmNlcyI6e30sImltYWdlUHVsbFBvbGljeSI6Ik5ldmVyIn0sImJ1aWx0aW4iOm51bGwsImdyb3VwQnkiOm51bGx9LCJjb250YWluZXJUZW1wbGF0ZSI6eyJyZXNvdXJjZXMiOnt9LCJpbWFnZVB1bGxQb2xpY3kiOiJOZXZlciJ9LCJzY2FsZSI6eyJtaW4iOjF9LCJ1cGRhdGVTdHJhdGVneSI6eyJ0eXBlIjoiUm9sbGluZ1VwZGF0ZSIsInJvbGxpbmdVcGRhdGUiOnsibWF4VW5hdmFpbGFibGUiOiIyNSUifX19LHsibmFtZSI6ImVsZXBoYW50IiwidWRmIjp7ImNvbnRhaW5lciI6eyJpbWFnZSI6ImFzY2lpOjAuMSIsImFyZ3MiOlsiZWxlcGhhbnQiXSwicmVzb3VyY2VzIjp7fSwiaW1hZ2VQdWxsUG9saWN5IjoiTmV2ZXIifSwiYnVpbHRpbiI6bnVsbCwiZ3JvdXBCeSI6bnVsbH0sImNvbnRhaW5lclRlbXBsYXRlIjp7InJlc291cmNlcyI6e30sImltYWdlUHVsbFBvbGljeSI6Ik5ldmVyIn0sInNjYWxlIjp7Im1pbiI6MX0sInVwZGF0ZVN0cmF0ZWd5Ijp7InR5cGUiOiJSb2xsaW5nVXBkYXRlIiwicm9sbGluZ1VwZGF0ZSI6eyJtYXhVbmF2YWlsYWJsZSI6IjI1JSJ9fX0seyJuYW1lIjoiYXNjaWlhcnQiLCJ1ZGYiOnsiY29udGFpbmVyIjp7ImltYWdlIjoiYXNjaWk6MC4xIiwiYXJncyI6WyJhc2NpaWFydCJdLCJyZXNvdXJjZXMiOnt9LCJpbWFnZVB1bGxQb2xpY3kiOiJOZXZlciJ9LCJidWlsdGluIjpudWxsLCJncm91cEJ5IjpudWxsfSwiY29udGFpbmVyVGVtcGxhdGUiOnsicmVzb3VyY2VzIjp7fSwiaW1hZ2VQdWxsUG9saWN5IjoiTmV2ZXIifSwic2NhbGUiOnsibWluIjoxfSwidXBkYXRlU3RyYXRlZ3kiOnsidHlwZSI6IlJvbGxpbmdVcGRhdGUiLCJyb2xsaW5nVXBkYXRlIjp7Im1heFVuYXZhaWxhYmxlIjoiMjUlIn19fSx7Im5hbWUiOiJzZXJ2ZS1zaW5rIiwic2luayI6eyJ1ZHNpbmsiOnsiY29udGFpbmVyIjp7ImltYWdlIjoic2VydmVzaW5rOjAuMSIsImVudiI6W3sibmFtZSI6Ik5VTUFGTE9XX0NBTExCQUNLX1VSTF9LRVkiLCJ2YWx1ZSI6IlgtTnVtYWZsb3ctQ2FsbGJhY2stVXJsIn0seyJuYW1lIjoiTlVNQUZMT1dfTVNHX0lEX0hFQURFUl9LRVkiLCJ2YWx1ZSI6IlgtTnVtYWZsb3ctSWQifV0sInJlc291cmNlcyI6e30sImltYWdlUHVsbFBvbGljeSI6Ik5ldmVyIn19LCJyZXRyeVN0cmF0ZWd5Ijp7fX0sImNvbnRhaW5lclRlbXBsYXRlIjp7InJlc291cmNlcyI6e30sImltYWdlUHVsbFBvbGljeSI6Ik5ldmVyIn0sInNjYWxlIjp7Im1pbiI6MX0sInVwZGF0ZVN0cmF0ZWd5Ijp7InR5cGUiOiJSb2xsaW5nVXBkYXRlIiwicm9sbGluZ1VwZGF0ZSI6eyJtYXhVbmF2YWlsYWJsZSI6IjI1JSJ9fX0seyJuYW1lIjoiZXJyb3Itc2luayIsInNpbmsiOnsidWRzaW5rIjp7ImNvbnRhaW5lciI6eyJpbWFnZSI6InNlcnZlc2luazowLjEiLCJlbnYiOlt7Im5hbWUiOiJOVU1BRkxPV19DQUxMQkFDS19VUkxfS0VZIiwidmFsdWUiOiJYLU51bWFmbG93LUNhbGxiYWNrLVVybCJ9LHsibmFtZSI6Ik5VTUFGTE9XX01TR19JRF9IRUFERVJfS0VZIiwidmFsdWUiOiJYLU51bWFmbG93LUlkIn1dLCJyZXNvdXJjZXMiOnt9LCJpbWFnZVB1bGxQb2xpY3kiOiJOZXZlciJ9fSwicmV0cnlTdHJhdGVneSI6e319LCJjb250YWluZXJUZW1wbGF0ZSI6eyJyZXNvdXJjZXMiOnt9LCJpbWFnZVB1bGxQb2xpY3kiOiJOZXZlciJ9LCJzY2FsZSI6eyJtaW4iOjF9LCJ1cGRhdGVTdHJhdGVneSI6eyJ0eXBlIjoiUm9sbGluZ1VwZGF0ZSIsInJvbGxpbmdVcGRhdGUiOnsibWF4VW5hdmFpbGFibGUiOiIyNSUifX19XSwiZWRnZXMiOlt7ImZyb20iOiJpbiIsInRvIjoicGxhbm5lciIsImNvbmRpdGlvbnMiOm51bGx9LHsiZnJvbSI6InBsYW5uZXIiLCJ0byI6ImFzY2lpYXJ0IiwiY29uZGl0aW9ucyI6eyJ0YWdzIjp7Im9wZXJhdG9yIjoib3IiLCJ2YWx1ZXMiOlsiYXNjaWlhcnQiXX19fSx7ImZyb20iOiJwbGFubmVyIiwidG8iOiJ0aWdlciIsImNvbmRpdGlvbnMiOnsidGFncyI6eyJvcGVyYXRvciI6Im9yIiwidmFsdWVzIjpbInRpZ2VyIl19fX0seyJmcm9tIjoicGxhbm5lciIsInRvIjoiZG9nIiwiY29uZGl0aW9ucyI6eyJ0YWdzIjp7Im9wZXJhdG9yIjoib3IiLCJ2YWx1ZXMiOlsiZG9nIl19fX0seyJmcm9tIjoicGxhbm5lciIsInRvIjoiZWxlcGhhbnQiLCJjb25kaXRpb25zIjp7InRhZ3MiOnsib3BlcmF0b3IiOiJvciIsInZhbHVlcyI6WyJlbGVwaGFudCJdfX19LHsiZnJvbSI6InRpZ2VyIiwidG8iOiJzZXJ2ZS1zaW5rIiwiY29uZGl0aW9ucyI6bnVsbH0seyJmcm9tIjoiZG9nIiwidG8iOiJzZXJ2ZS1zaW5rIiwiY29uZGl0aW9ucyI6bnVsbH0seyJmcm9tIjoiZWxlcGhhbnQiLCJ0byI6InNlcnZlLXNpbmsiLCJjb25kaXRpb25zIjpudWxsfSx7ImZyb20iOiJhc2NpaWFydCIsInRvIjoic2VydmUtc2luayIsImNvbmRpdGlvbnMiOm51bGx9LHsiZnJvbSI6InBsYW5uZXIiLCJ0byI6ImVycm9yLXNpbmsiLCJjb25kaXRpb25zIjp7InRhZ3MiOnsib3BlcmF0b3IiOiJvciIsInZhbHVlcyI6WyJlcnJvciJdfX19XSwibGlmZWN5Y2xlIjp7fSwid2F0ZXJtYXJrIjp7fX0="; diff --git a/rust/serving/src/config.rs b/rust/serving/src/config.rs index 16c2ee125..c485ad722 100644 --- a/rust/serving/src/config.rs +++ b/rust/serving/src/config.rs @@ -168,9 +168,8 @@ impl TryFrom> for Settings { #[cfg(test)] mod tests { - use crate::pipeline::{Edge, Vertex}; - use super::*; + use crate::pipeline::{Edge, Vertex}; #[test] fn test_default_config() { diff --git a/rust/serving/src/lib.rs b/rust/serving/src/lib.rs index bdc3aeab9..7fe953616 100644 --- a/rust/serving/src/lib.rs +++ b/rust/serving/src/lib.rs @@ -1,13 +1,13 @@ use std::net::SocketAddr; use std::sync::Arc; -use crate::app::callback::state::State as CallbackState; use app::callback::store::Store; use axum_server::tls_rustls::RustlsConfig; use tokio::sync::mpsc; use tracing::info; pub use self::error::{Error, Result}; +use crate::app::callback::state::State as CallbackState; use crate::app::start_main_server; use crate::config::generate_certs; use crate::metrics::start_https_metrics_server; @@ -23,9 +23,10 @@ mod metrics; mod pipeline; pub mod source; -use crate::source::MessageWrapper; pub use source::{Message, ServingSource}; +use crate::source::MessageWrapper; + #[derive(Clone)] pub(crate) struct AppState { pub(crate) message: mpsc::Sender, diff --git a/rust/serving/src/source.rs b/rust/serving/src/source.rs index d03817967..0efce5199 100644 --- a/rust/serving/src/source.rs +++ b/rust/serving/src/source.rs @@ -227,9 +227,8 @@ impl ServingSource { mod tests { use std::{sync::Arc, time::Duration}; - use crate::Settings; - use super::ServingSource; + use crate::Settings; type Result = std::result::Result>; #[tokio::test]