Skip to content

Commit

Permalink
update pending reader creation
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Jan 15, 2025
1 parent facd0e6 commit b71ff0c
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 45 deletions.
14 changes: 4 additions & 10 deletions rust/numaflow-core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ struct TimestampedPending {
pub(crate) enum LagReader {
Source(Source),
// TODO: Arc<[T]>
ISB(Vec<JetstreamReader>) // multiple partitions
ISB(Vec<JetstreamReader>), // multiple partitions
}

/// PendingReader is responsible for periodically checking the lag of the reader
Expand Down Expand Up @@ -756,7 +756,6 @@ impl PendingReaderBuilder {
}
}


pub(crate) fn lag_checking_interval(mut self, interval: Duration) -> Self {
self.lag_checking_interval = Some(interval);
self
Expand All @@ -775,7 +774,7 @@ impl PendingReaderBuilder {
pub(crate) fn build(self) -> PendingReader {
let mut pending_map = HashMap::new();
match &self.lag_reader {
LagReader::Source(source) => {
LagReader::Source(_) => {
pending_map.insert("source".to_string(), Vec::with_capacity(MAX_PENDING_STATS));
}
LagReader::ISB(readers) => {
Expand Down Expand Up @@ -819,12 +818,7 @@ impl PendingReader {

let lag_reader = self.lag_reader.clone();
let buildup_handle = tokio::spawn(async move {
build_pending_info(
lag_reader,
lag_checking_interval,
pending_stats,
)
.await;
build_pending_info(lag_reader, lag_checking_interval, pending_stats).await;
});

let pending_stats = Arc::clone(&self.pending_stats);
Expand Down Expand Up @@ -860,7 +854,7 @@ async fn build_pending_info(
pending_stats: Arc<Mutex<HashMap<String, Vec<TimestampedPending>>>>,
) {
let mut ticker = time::interval(lag_checking_interval);

loop {
ticker.tick().await;

Expand Down
9 changes: 6 additions & 3 deletions rust/numaflow-core/src/monovertex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use tracing::info;
use crate::config::is_mono_vertex;
use crate::config::monovertex::MonovertexConfig;
use crate::error::{self};
use crate::metrics::LagReader;
use crate::shared::create_components;
use crate::sink::SinkWriter;
use crate::source::Source;
Expand Down Expand Up @@ -81,9 +82,11 @@ async fn start(
cln_token: CancellationToken,
) -> error::Result<()> {
// start the pending reader to publish pending metrics
let pending_reader =
shared::metrics::create_source_pending_reader(&mvtx_config.metrics_config, source.clone())
.await;
let pending_reader = shared::metrics::create_pending_reader(
&mvtx_config.metrics_config,
LagReader::Source(source.clone()),
)
.await;
let _pending_reader_handle = pending_reader.start(is_mono_vertex()).await;

let mut forwarder_builder = ForwarderBuilder::new(source, sink, cln_token);
Expand Down
26 changes: 17 additions & 9 deletions rust/numaflow-core/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tracing::info;
use crate::config::pipeline::map::MapVtxConfig;
use crate::config::pipeline::{PipelineConfig, SinkVtxConfig, SourceVtxConfig};
use crate::config::{is_mono_vertex, pipeline};
use crate::metrics::{PipelineContainerState, UserDefinedContainerState};
use crate::metrics::{LagReader, PipelineContainerState, UserDefinedContainerState};
use crate::pipeline::forwarder::source_forwarder;
use crate::pipeline::isb::jetstream::reader::JetstreamReader;
use crate::pipeline::isb::jetstream::writer::JetstreamWriter;
Expand Down Expand Up @@ -77,8 +77,11 @@ async fn start_source_forwarder(
)
.await?;

let pending_reader =
shared::metrics::create_source_pending_reader(&config.metrics_config, source.clone()).await;
let pending_reader = shared::metrics::create_pending_reader(
&config.metrics_config,
LagReader::Source(source.clone()),
)
.await;
let _pending_reader_handle = pending_reader.start(is_mono_vertex()).await;

start_metrics_server(
Expand Down Expand Up @@ -160,8 +163,11 @@ async fn start_map_forwarder(
forwarder_components.push((buffer_reader, buffer_writer, mapper));
}

let pending_reader =
shared::metrics::create_isb_pending_reader(&config.metrics_config, isb_lag_readers).await;
let pending_reader = shared::metrics::create_pending_reader(
&config.metrics_config,
LagReader::ISB(isb_lag_readers),
)
.await;
let _pending_reader_handle = pending_reader.start(is_mono_vertex()).await;

start_metrics_server(
Expand Down Expand Up @@ -240,9 +246,11 @@ async fn start_sink_forwarder(
sink_writers.push((sink_writer, sink_grpc_client, fb_sink_grpc_client));
}

let pending_reader =
shared::metrics::create_isb_pending_reader(&config.metrics_config, buffer_readers.clone())
.await;
let pending_reader = shared::metrics::create_pending_reader(
&config.metrics_config,
LagReader::ISB(buffer_readers.clone()),
)
.await;
let _pending_reader_handle = pending_reader.start(is_mono_vertex()).await;

// Start the metrics server with one of the clients
Expand Down Expand Up @@ -437,7 +445,7 @@ mod tests {
streams: streams
.iter()
.enumerate()
.map(|(i, stream_name)| (stream_name.to_string(), i as u16))
.map(|(i, stream_name)| ((*stream_name).to_string(), i as u16))
.collect(),
partitions: 5,
max_length: 30000,
Expand Down
28 changes: 5 additions & 23 deletions rust/numaflow-core/src/shared/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ use tracing::error;

use crate::config::components::metrics::MetricsConfig;
use crate::metrics::{
start_metrics_https_server, PendingReader, PendingReaderBuilder, UserDefinedContainerState,
start_metrics_https_server, LagReader, PendingReader, PendingReaderBuilder,
UserDefinedContainerState,
};
use crate::pipeline::isb::jetstream::reader::JetstreamReader;
use crate::source::Source;

/// Starts the metrics server
pub(crate) async fn start_metrics_server(
Expand All @@ -30,34 +29,17 @@ pub(crate) async fn start_metrics_server(
}

/// Creates a pending reader
pub(crate) async fn create_source_pending_reader(
pub(crate) async fn create_pending_reader(
metrics_config: &MetricsConfig,
lag_reader: Source,
lag_reader: LagReader,
) -> PendingReader {
PendingReaderBuilder::new()
PendingReaderBuilder::new(lag_reader)
.lag_checking_interval(Duration::from_secs(
metrics_config.lag_check_interval_in_secs.into(),
))
.refresh_interval(Duration::from_secs(
metrics_config.lag_refresh_interval_in_secs.into(),
))
.lookback_seconds(metrics_config.lookback_window_in_secs)
.source_lag_reader(lag_reader)
.build()
}

pub(crate) async fn create_isb_pending_reader(
metrics_config: &MetricsConfig,
lag_readers: Vec<JetstreamReader>,
) -> PendingReader {
PendingReaderBuilder::new()
.lag_checking_interval(Duration::from_secs(
metrics_config.lag_check_interval_in_secs.into(),
))
.refresh_interval(Duration::from_secs(
metrics_config.lag_refresh_interval_in_secs.into(),
))
.lookback_seconds(metrics_config.lookback_window_in_secs)
.isb_lag_readers(lag_readers)
.build()
}

0 comments on commit b71ff0c

Please sign in to comment.