diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 761512fdb..9837be100 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1591,7 +1591,6 @@ dependencies = [ "log", "numaflow 0.1.1", "numaflow-models", - "once_cell", "parking_lot", "pep440_rs", "prometheus-client", diff --git a/rust/numaflow-core/src/config.rs b/rust/numaflow-core/src/config.rs index c3263e999..6310295f4 100644 --- a/rust/numaflow-core/src/config.rs +++ b/rust/numaflow-core/src/config.rs @@ -645,4 +645,4 @@ mod tests { let drop = OnFailureStrategy::Drop; assert_eq!(drop.to_string(), "drop"); } -} \ No newline at end of file +} diff --git a/rust/numaflow-core/src/lib.rs b/rust/numaflow-core/src/lib.rs index a941bd6cb..4e410c9f9 100644 --- a/rust/numaflow-core/src/lib.rs +++ b/rust/numaflow-core/src/lib.rs @@ -34,3 +34,6 @@ mod source; /// /// [Transformer]: https://numaflow.numaproj.io/user-guide/sources/transformer/overview/ mod transformer; + +/// Reads from a stream. +mod reader; diff --git a/rust/numaflow-core/src/message.rs b/rust/numaflow-core/src/message.rs index d230e994f..64f407976 100644 --- a/rust/numaflow-core/src/message.rs +++ b/rust/numaflow-core/src/message.rs @@ -7,9 +7,9 @@ use chrono::{DateTime, Utc}; use crate::error::Error; use crate::monovertex::sink_pb::sink_request::Request; use crate::monovertex::sink_pb::SinkRequest; -use crate::monovertex::{source_pb, sourcetransform_pb}; use crate::monovertex::source_pb::{read_response, AckRequest}; use crate::monovertex::sourcetransform_pb::SourceTransformRequest; +use crate::monovertex::{source_pb, sourcetransform_pb}; use crate::shared::utils::{prost_timestamp_from_utc, utc_from_timestamp}; /// A message that is sent from the source to the sink. diff --git a/rust/numaflow-core/src/monovertex.rs b/rust/numaflow-core/src/monovertex.rs index be87ad361..3aecbe066 100644 --- a/rust/numaflow-core/src/monovertex.rs +++ b/rust/numaflow-core/src/monovertex.rs @@ -3,7 +3,7 @@ use crate::error; use crate::shared::utils; use crate::shared::utils::create_rpc_channel; use crate::sink::user_defined::SinkWriter; -use crate::source::user_defined::Source; +use crate::source::user_defined::new_source; use crate::transformer::user_defined::SourceTransformer; use forwarder::ForwarderBuilder; use metrics::MetricsState; @@ -145,6 +145,13 @@ async fn start_forwarder(cln_token: CancellationToken, sdk_config: SDKConfig) -> ) .await?; + let (source_reader, lag_reader) = new_source( + source_grpc_client.clone(), + config().batch_size as usize, + config().timeout_in_ms as u16, + ) + .await?; + // Start the metrics server in a separate background async spawn, // This should be running throughout the lifetime of the application, hence the handle is not // joined. @@ -159,12 +166,12 @@ async fn start_forwarder(cln_token: CancellationToken, sdk_config: SDKConfig) -> // FIXME: what to do with the handle utils::start_metrics_server(metrics_state).await; - // start the lag reader to publish lag metrics - let mut lag_reader = utils::create_lag_reader(source_grpc_client.clone()).await; - lag_reader.start().await; + // start the pending reader to publish pending metrics + let mut pending_reader = utils::create_pending_reader(lag_reader).await; + pending_reader.start().await; // build the forwarder - let source_reader = Source::new(source_grpc_client.clone()).await?; + let sink_writer = SinkWriter::new(sink_grpc_client.clone()).await?; let mut forwarder_builder = ForwarderBuilder::new(source_reader, sink_writer, cln_token); diff --git a/rust/numaflow-core/src/monovertex/forwarder.rs b/rust/numaflow-core/src/monovertex/forwarder.rs index ab58cfad0..130305d01 100644 --- a/rust/numaflow-core/src/monovertex/forwarder.rs +++ b/rust/numaflow-core/src/monovertex/forwarder.rs @@ -6,21 +6,20 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, info}; use crate::config::{config, OnFailureStrategy}; -use crate::error; use crate::error::Error; use crate::message::{Message, Offset}; use crate::monovertex::metrics; use crate::monovertex::metrics::forward_metrics; use crate::monovertex::sink_pb::Status::{Failure, Fallback, Success}; use crate::sink::user_defined::SinkWriter; -use crate::source::user_defined::Source; use crate::transformer::user_defined::SourceTransformer; +use crate::{error, source}; /// Forwarder is responsible for reading messages from the source, applying transformation if /// transformer is present, writing the messages to the sink, and then acknowledging the messages /// back to the source. -pub(crate) struct Forwarder { - source: Source, +pub(crate) struct Forwarder { + source: T, sink_writer: SinkWriter, source_transformer: Option, fb_sink_writer: Option, @@ -29,21 +28,17 @@ pub(crate) struct Forwarder { } /// ForwarderBuilder is used to build a Forwarder instance with optional fields. -pub(crate) struct ForwarderBuilder { - source: Source, +pub(crate) struct ForwarderBuilder { + source: T, sink_writer: SinkWriter, cln_token: CancellationToken, source_transformer: Option, fb_sink_writer: Option, } -impl ForwarderBuilder { +impl ForwarderBuilder { /// Create a new builder with mandatory fields - pub(crate) fn new( - source: Source, - sink_writer: SinkWriter, - cln_token: CancellationToken, - ) -> Self { + pub(crate) fn new(source: T, sink_writer: SinkWriter, cln_token: CancellationToken) -> Self { Self { source, sink_writer, @@ -67,7 +62,7 @@ impl ForwarderBuilder { /// Build the Forwarder instance #[must_use] - pub(crate) fn build(self) -> Forwarder { + pub(crate) fn build(self) -> Forwarder { let common_labels = metrics::forward_metrics_labels().clone(); Forwarder { source: self.source, @@ -80,7 +75,10 @@ impl ForwarderBuilder { } } -impl Forwarder { +impl Forwarder +where + T: source::Source, +{ /// start starts the forward-a-chunk loop and exits only after a chunk has been forwarded and ack'ed. /// this means that, in the happy path scenario a block is always completely processed. /// this function will return on any error and will cause end up in a non-0 exit code. @@ -121,13 +119,9 @@ impl Forwarder { /// and then acknowledge the messages back to the source. async fn read_and_process_messages(&mut self) -> error::Result { let start_time = tokio::time::Instant::now(); - let messages = self - .source - .read(config().batch_size, config().timeout_in_ms) - .await - .map_err(|e| { - Error::ForwarderError(format!("Failed to read messages from source {:?}", e)) - })?; + let messages = self.source.read().await.map_err(|e| { + Error::ForwarderError(format!("Failed to read messages from source {:?}", e)) + })?; debug!( "Read batch size: {} and latency - {}ms", @@ -542,21 +536,21 @@ impl Forwarder { mod tests { use std::collections::HashSet; - use chrono::Utc; - use numaflow::source::{Message, Offset, SourceReadRequest}; - use numaflow::{sink, source, sourcetransform}; - use tokio::sync::mpsc; - use tokio::sync::mpsc::Sender; - use tokio_util::sync::CancellationToken; - + use crate::config::config; use crate::monovertex::forwarder::ForwarderBuilder; use crate::monovertex::sink_pb::sink_client::SinkClient; use crate::monovertex::source_pb::source_client::SourceClient; use crate::monovertex::sourcetransform_pb::source_transform_client::SourceTransformClient; use crate::shared::utils::create_rpc_channel; use crate::sink::user_defined::SinkWriter; - use crate::source::user_defined::Source; + use crate::source::user_defined::UserDefinedSource; use crate::transformer::user_defined::SourceTransformer; + use chrono::Utc; + use numaflow::source::{Message, Offset, SourceReadRequest}; + use numaflow::{sink, source, sourcetransform}; + use tokio::sync::mpsc; + use tokio::sync::mpsc::Sender; + use tokio_util::sync::CancellationToken; struct SimpleSource { yet_to_be_acked: std::sync::RwLock>, @@ -735,9 +729,11 @@ mod tests { let cln_token = CancellationToken::new(); - let source = Source::new(SourceClient::new( - create_rpc_channel(source_sock_file.clone()).await.unwrap(), - )) + let source = UserDefinedSource::new( + SourceClient::new(create_rpc_channel(source_sock_file.clone()).await.unwrap()), + config().batch_size as usize, + config().timeout_in_ms as u16, + ) .await .expect("failed to connect to source server"); @@ -857,9 +853,11 @@ mod tests { let cln_token = CancellationToken::new(); - let source = Source::new(SourceClient::new( - create_rpc_channel(source_sock_file.clone()).await.unwrap(), - )) + let source = UserDefinedSource::new( + SourceClient::new(create_rpc_channel(source_sock_file.clone()).await.unwrap()), + 500, + 100, + ) .await .expect("failed to connect to source server"); @@ -971,9 +969,11 @@ mod tests { let cln_token = CancellationToken::new(); - let source = Source::new(SourceClient::new( - create_rpc_channel(source_sock_file.clone()).await.unwrap(), - )) + let source = UserDefinedSource::new( + SourceClient::new(create_rpc_channel(source_sock_file.clone()).await.unwrap()), + 500, + 100, + ) .await .expect("failed to connect to source server"); diff --git a/rust/numaflow-core/src/monovertex/metrics.rs b/rust/numaflow-core/src/monovertex/metrics.rs index 496f14330..adbbdde3b 100644 --- a/rust/numaflow-core/src/monovertex/metrics.rs +++ b/rust/numaflow-core/src/monovertex/metrics.rs @@ -21,6 +21,7 @@ use crate::error::Error; use crate::monovertex::sink_pb::sink_client::SinkClient; use crate::monovertex::source_pb::source_client::SourceClient; use crate::monovertex::sourcetransform_pb::source_transform_client::SourceTransformClient; +use crate::reader; use prometheus_client::encoding::text::encode; use prometheus_client::metrics::counter::Counter; use prometheus_client::metrics::family::Family; @@ -362,11 +363,11 @@ struct TimestampedPending { timestamp: std::time::Instant, } -/// `LagReader` is responsible for periodically checking the lag of the source client +/// PendingReader is responsible for periodically checking the lag of the reader /// and exposing the metrics. It maintains a list of pending stats and ensures that /// only the most recent entries are kept. -pub(crate) struct LagReader { - source_client: SourceClient, +pub(crate) struct PendingReader { + lag_reader: T, lag_checking_interval: Duration, refresh_interval: Duration, buildup_handle: Option>, @@ -374,17 +375,17 @@ pub(crate) struct LagReader { pending_stats: Arc>>, } -/// LagReaderBuilder is used to build a `LagReader` instance. -pub(crate) struct LagReaderBuilder { - source_client: SourceClient, +/// PendingReaderBuilder is used to build a [LagReader] instance. +pub(crate) struct PendingReaderBuilder { + lag_reader: T, lag_checking_interval: Option, refresh_interval: Option, } -impl LagReaderBuilder { - pub(crate) fn new(source_client: SourceClient) -> Self { +impl PendingReaderBuilder { + pub(crate) fn new(lag_reader: T) -> Self { Self { - source_client, + lag_reader, lag_checking_interval: None, refresh_interval: None, } @@ -400,9 +401,9 @@ impl LagReaderBuilder { self } - pub(crate) fn build(self) -> LagReader { - LagReader { - source_client: self.source_client, + pub(crate) fn build(self) -> PendingReader { + PendingReader { + lag_reader: self.lag_reader, lag_checking_interval: self .lag_checking_interval .unwrap_or_else(|| Duration::from_secs(3)), @@ -416,20 +417,20 @@ impl LagReaderBuilder { } } -impl LagReader { +impl PendingReader { /// Starts the lag reader by spawning tasks to build up pending info and expose pending metrics. /// /// This method spawns two asynchronous tasks: /// - One to periodically check the lag and update the pending stats. /// - Another to periodically expose the pending metrics. pub async fn start(&mut self) { - let source_client = self.source_client.clone(); + let pending_reader = self.lag_reader.clone(); let lag_checking_interval = self.lag_checking_interval; let refresh_interval = self.refresh_interval; let pending_stats = self.pending_stats.clone(); self.buildup_handle = Some(tokio::spawn(async move { - build_pending_info(source_client, lag_checking_interval, pending_stats).await; + build_pending_info(pending_reader, lag_checking_interval, pending_stats).await; })); let pending_stats = self.pending_stats.clone(); @@ -439,8 +440,8 @@ impl LagReader { } } -/// When lag-reader is dropped, we need to clean up the pending exposer and the pending builder tasks. -impl Drop for LagReader { +/// When the PendingReader is dropped, we need to clean up the pending exposer and the pending builder tasks. +impl Drop for PendingReader { fn drop(&mut self) { if let Some(handle) = self.expose_handle.take() { handle.abort(); @@ -454,15 +455,15 @@ impl Drop for LagReader { } /// Periodically checks the pending messages from the source client and build the pending stats. -async fn build_pending_info( - mut source_client: SourceClient, +async fn build_pending_info( + mut lag_reader: T, lag_checking_interval: Duration, pending_stats: Arc>>, ) { let mut ticker = time::interval(lag_checking_interval); loop { ticker.tick().await; - match fetch_pending(&mut source_client).await { + match fetch_pending(&mut lag_reader).await { Ok(pending) => { if pending != -1 { let mut stats = pending_stats.lock().await; @@ -484,14 +485,8 @@ async fn build_pending_info( } } -async fn fetch_pending(source_client: &mut SourceClient) -> crate::error::Result { - let request = Request::new(()); - let response = source_client - .pending_fn(request) - .await? - .into_inner() - .result - .map_or(-1, |r| r.count); // default to -1(unavailable) +async fn fetch_pending(lag_reader: &mut T) -> crate::error::Result { + let response: i64 = lag_reader.pending().await?.map_or(-1, |p| p as i64); // default to -1(unavailable) Ok(response) } @@ -556,8 +551,6 @@ async fn calculate_pending( result } -// TODO add tests - #[cfg(test)] mod tests { use super::*; diff --git a/rust/numaflow-core/src/reader.rs b/rust/numaflow-core/src/reader.rs new file mode 100644 index 000000000..43dc7e8fc --- /dev/null +++ b/rust/numaflow-core/src/reader.rs @@ -0,0 +1,9 @@ +/// Lag reader reports the pending information at Reader (source, ISBs), this information is used by +/// the auto-scaler. +#[trait_variant::make(LagReader: Send)] +#[allow(dead_code)] +pub(crate) trait LocalLagReader { + /// Pending elements yet to be read from the stream. The stream could be the [crate::source], or ISBs + /// It may or may not include unacknowledged messages. + async fn pending(&mut self) -> crate::error::Result>; +} diff --git a/rust/numaflow-core/src/shared/server_info.rs b/rust/numaflow-core/src/shared/server_info.rs index 0df809363..f058b7a31 100644 --- a/rust/numaflow-core/src/shared/server_info.rs +++ b/rust/numaflow-core/src/shared/server_info.rs @@ -73,7 +73,12 @@ pub(crate) async fn check_for_server_compatibility( } else { // Get minimum supported SDK versions and check compatibility let min_supported_sdk_versions = version::get_minimum_supported_sdk_versions(); - check_sdk_compatibility(sdk_version, sdk_language, container_type, min_supported_sdk_versions)?; + check_sdk_compatibility( + sdk_version, + sdk_language, + container_type, + min_supported_sdk_versions, + )?; } Ok(()) @@ -110,19 +115,20 @@ fn check_numaflow_compatibility( fn check_sdk_compatibility( sdk_version: &str, sdk_language: &str, - container_type : &str, + container_type: &str, min_supported_sdk_versions: &SdkConstraints, ) -> error::Result<()> { // Check if the SDK language is present in the minimum supported SDK versions if !min_supported_sdk_versions.contains_key(sdk_language) { return Err(Error::ServerInfoError(format!( "SDK version constraint not found for language: {}, container type: {}", - sdk_language, - container_type + sdk_language, container_type ))); } let empty_map = HashMap::new(); - let lang_constraints = min_supported_sdk_versions.get(sdk_language).unwrap_or(&empty_map); + let lang_constraints = min_supported_sdk_versions + .get(sdk_language) + .unwrap_or(&empty_map); if let Some(sdk_required_version) = lang_constraints.get(container_type) { let sdk_constraint = format!(">={}", sdk_required_version); @@ -161,15 +167,13 @@ fn check_sdk_compatibility( // Language not found in the supported SDK versions warn!( "SDK version constraint not found for language: {}, container type: {}", - sdk_language, - container_type + sdk_language, container_type ); // Return error indicating the language return Err(Error::ServerInfoError(format!( "SDK version constraint not found for language: {}, container type: {}", - sdk_language, - container_type + sdk_language, container_type ))); } Ok(()) @@ -263,9 +267,7 @@ fn trim_after_dash(input: &str) -> &str { /// The file name is in the format of -server-info. fn get_container_type(server_info_file: &PathBuf) -> Option<&str> { let file_name = server_info_file.file_name()?; - let container_type = file_name - .to_str()? - .trim_end_matches("-server-info"); + let container_type = file_name.to_str()?.trim_end_matches("-server-info"); if container_type.is_empty() { None } else { @@ -562,8 +564,12 @@ mod tests { let sdk_language = "python"; let min_supported_sdk_versions = create_sdk_constraints_stable_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language, TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_ok()); } @@ -574,8 +580,12 @@ mod tests { let sdk_language = "python"; let min_supported_sdk_versions = create_sdk_constraints_stable_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language,TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_err()); assert!( @@ -589,8 +599,12 @@ mod tests { let sdk_language = "python"; let min_supported_sdk_versions = create_sdk_constraints_stable_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language, TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_ok()); } @@ -601,8 +615,12 @@ mod tests { let sdk_language = "python"; let min_supported_sdk_versions = create_sdk_constraints_stable_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language, TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_err()); assert!( @@ -616,8 +634,12 @@ mod tests { let sdk_language = "java"; let min_supported_sdk_versions = create_sdk_constraints_stable_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language, TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_ok()); } @@ -628,8 +650,12 @@ mod tests { let sdk_language = "java"; let min_supported_sdk_versions = create_sdk_constraints_stable_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language, TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_err()); assert!( @@ -643,8 +669,12 @@ mod tests { let sdk_language = "go"; let min_supported_sdk_versions = create_sdk_constraints_stable_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language, TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_ok()); } @@ -655,8 +685,12 @@ mod tests { let sdk_language = "go"; let min_supported_sdk_versions = create_sdk_constraints_stable_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language, TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_err()); assert!( @@ -670,8 +704,12 @@ mod tests { let sdk_language = "rust"; let min_supported_sdk_versions = create_sdk_constraints_stable_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language, TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_ok()); } @@ -682,8 +720,12 @@ mod tests { let sdk_language = "rust"; let min_supported_sdk_versions = create_sdk_constraints_stable_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language, TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_err()); assert!( @@ -697,8 +739,12 @@ mod tests { let sdk_language = "python"; let min_supported_sdk_versions = create_sdk_constraints_pre_release_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language, TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_ok()); } @@ -709,8 +755,12 @@ mod tests { let sdk_language = "python"; let min_supported_sdk_versions = create_sdk_constraints_pre_release_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language, TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_err()); assert!( @@ -724,8 +774,12 @@ mod tests { let sdk_language = "python"; let min_supported_sdk_versions = create_sdk_constraints_pre_release_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language, TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_ok()); } @@ -736,8 +790,12 @@ mod tests { let sdk_language = "python"; let min_supported_sdk_versions = create_sdk_constraints_pre_release_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language, TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_err()); assert!( @@ -751,8 +809,12 @@ mod tests { let sdk_language = "java"; let min_supported_sdk_versions = create_sdk_constraints_pre_release_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language, TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_ok()); } @@ -763,8 +825,12 @@ mod tests { let sdk_language = "java"; let min_supported_sdk_versions = create_sdk_constraints_pre_release_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language, TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_err()); assert!( @@ -778,8 +844,12 @@ mod tests { let sdk_language = "go"; let min_supported_sdk_versions = create_sdk_constraints_pre_release_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language, TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_ok()); } @@ -790,8 +860,12 @@ mod tests { let sdk_language = "go"; let min_supported_sdk_versions = create_sdk_constraints_pre_release_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language, TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_err()); assert!( @@ -805,8 +879,12 @@ mod tests { let sdk_language = "rust"; let min_supported_sdk_versions = create_sdk_constraints_pre_release_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language, TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_ok()); } @@ -817,8 +895,12 @@ mod tests { let sdk_language = "rust"; let min_supported_sdk_versions = create_sdk_constraints_pre_release_versions(); - let result = - check_sdk_compatibility(sdk_version, sdk_language, TEST_CONTAINER_TYPE, &min_supported_sdk_versions); + let result = check_sdk_compatibility( + sdk_version, + sdk_language, + TEST_CONTAINER_TYPE, + &min_supported_sdk_versions, + ); assert!(result.is_err()); assert!( diff --git a/rust/numaflow-core/src/shared/utils.rs b/rust/numaflow-core/src/shared/utils.rs index 99e9cef99..4068aaf9b 100644 --- a/rust/numaflow-core/src/shared/utils.rs +++ b/rust/numaflow-core/src/shared/utils.rs @@ -3,15 +3,15 @@ use std::path::PathBuf; use std::time::Duration; use crate::config::config; -use crate::error; use crate::error::Error; use crate::monovertex::metrics::{ - start_metrics_https_server, LagReader, LagReaderBuilder, MetricsState, + start_metrics_https_server, MetricsState, PendingReader, PendingReaderBuilder, }; use crate::monovertex::sink_pb::sink_client::SinkClient; use crate::monovertex::source_pb::source_client::SourceClient; use crate::monovertex::sourcetransform_pb::source_transform_client::SourceTransformClient; use crate::shared::server_info; +use crate::{error, reader}; use axum::http::Uri; use backoff::retry::Retry; @@ -81,8 +81,10 @@ pub(crate) async fn start_metrics_server(metrics_state: MetricsState) -> JoinHan }) } -pub(crate) async fn create_lag_reader(lag_reader_grpc_client: SourceClient) -> LagReader { - LagReaderBuilder::new(lag_reader_grpc_client) +pub(crate) async fn create_pending_reader( + lag_reader_grpc_client: T, +) -> PendingReader { + PendingReaderBuilder::new(lag_reader_grpc_client) .lag_checking_interval(Duration::from_secs( config().lag_check_interval_in_secs.into(), )) diff --git a/rust/numaflow-core/src/source.rs b/rust/numaflow-core/src/source.rs index 4ba2755ce..34108bec1 100644 --- a/rust/numaflow-core/src/source.rs +++ b/rust/numaflow-core/src/source.rs @@ -1,4 +1,22 @@ +use crate::message::{Message, Offset}; + /// [User-Defined Source] extends Numaflow to add custom sources supported outside the builtins. /// /// [User-Defined Source]: https://numaflow.numaproj.io/user-guide/sources/user-defined-sources/ pub(crate) mod user_defined; + +/// Set of items that has to be implemented to become a Source. +pub(crate) trait Source { + #[allow(dead_code)] + /// Name of the source. + fn name(&self) -> &'static str; + + async fn read(&mut self) -> crate::Result>; + + /// acknowledge an offset. The implementor might choose to do it in an asynchronous way. + async fn ack(&mut self, _: Vec) -> crate::Result<()>; + + #[allow(dead_code)] + /// number of partitions processed by this source. + fn partitions(&self) -> Vec; +} diff --git a/rust/numaflow-core/src/source/user_defined.rs b/rust/numaflow-core/src/source/user_defined.rs index b51324a68..53ab9d9dc 100644 --- a/rust/numaflow-core/src/source/user_defined.rs +++ b/rust/numaflow-core/src/source/user_defined.rs @@ -5,24 +5,44 @@ use crate::message::{Message, Offset}; use crate::monovertex::source_pb; use crate::monovertex::source_pb::source_client::SourceClient; use crate::monovertex::source_pb::{ - ack_response, read_request, AckRequest, AckResponse, ReadRequest, ReadResponse, + read_request, AckRequest, AckResponse, ReadRequest, ReadResponse, }; +use crate::reader::LagReader; +use crate::source::Source; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tonic::transport::Channel; use tonic::{Request, Streaming}; -/// SourceReader reads messages from a source. +/// User-Defined Source to operative on custom sources. #[derive(Debug)] -pub(crate) struct Source { +pub(crate) struct UserDefinedSource { read_tx: mpsc::Sender, resp_stream: Streaming, ack_tx: mpsc::Sender, ack_resp_stream: Streaming, + num_records: usize, + timeout_in_ms: u16, } -impl Source { - pub(crate) async fn new(mut client: SourceClient) -> error::Result { +/// Creates a new User-Defined Source and its corresponding Lag Reader. +pub(crate) async fn new_source( + client: SourceClient, + num_records: usize, + timeout_in_ms: u16, +) -> error::Result<(UserDefinedSource, UserDefinedSourceLagReader)> { + let ud_src = UserDefinedSource::new(client.clone(), num_records, timeout_in_ms).await?; + let lag_reader = UserDefinedSourceLagReader::new(client); + + Ok((ud_src, lag_reader)) +} + +impl UserDefinedSource { + pub(crate) async fn new( + mut client: SourceClient, + num_records: usize, + timeout_in_ms: u16, + ) -> error::Result { let (read_tx, resp_stream) = Self::create_reader(&mut client).await?; let (ack_tx, ack_resp_stream) = Self::create_acker(&mut client).await?; @@ -31,6 +51,8 @@ impl Source { resp_stream, ack_tx, ack_resp_stream, + num_records, + timeout_in_ms, }) } @@ -98,16 +120,18 @@ impl Source { Ok((ack_tx, ack_resp_stream)) } +} + +impl Source for UserDefinedSource { + fn name(&self) -> &'static str { + "user-defined-source" + } - pub(crate) async fn read( - &mut self, - num_records: u64, - timeout_in_ms: u32, - ) -> error::Result> { + async fn read(&mut self) -> error::Result> { let request = ReadRequest { request: Some(read_request::Request { - num_records, - timeout_in_ms, + num_records: self.num_records as u64, + timeout_in_ms: self.timeout_in_ms as u32, }), handshake: None, }; @@ -117,7 +141,7 @@ impl Source { .await .map_err(|e| SourceError(e.to_string()))?; - let mut messages = Vec::with_capacity(num_records as usize); + let mut messages = Vec::with_capacity(self.num_records); while let Some(response) = self.resp_stream.message().await? { if response.status.map_or(false, |status| status.eot) { @@ -133,7 +157,7 @@ impl Source { Ok(messages) } - pub(crate) async fn ack(&mut self, offsets: Vec) -> error::Result { + async fn ack(&mut self, offsets: Vec) -> error::Result<()> { let n = offsets.len(); // send n ack requests @@ -154,20 +178,46 @@ impl Source { .ok_or(SourceError("failed to receive ack response".to_string()))?; } - Ok(AckResponse { - result: Some(ack_response::Result { success: Some(()) }), - handshake: None, - }) + Ok(()) + } + + fn partitions(&self) -> Vec { + todo!() + } +} + +#[derive(Clone)] +pub(crate) struct UserDefinedSourceLagReader { + source_client: SourceClient, +} + +impl UserDefinedSourceLagReader { + fn new(source_client: SourceClient) -> Self { + Self { source_client } + } +} + +impl LagReader for UserDefinedSourceLagReader { + async fn pending(&mut self) -> error::Result> { + Ok(self + .source_client + .pending_fn(Request::new(())) + .await? + .into_inner() + .result + .map(|r| r.count as usize)) } } #[cfg(test)] mod tests { + use super::*; + use std::collections::HashSet; use crate::monovertex::source_pb::source_client::SourceClient; use crate::shared::utils::create_rpc_channel; - use crate::source::user_defined::Source; + use chrono::Utc; use numaflow::source; use numaflow::source::{Message, Offset, SourceReadRequest}; @@ -253,19 +303,21 @@ mod tests { let client = SourceClient::new(create_rpc_channel(sock_file).await.unwrap()); - let mut source = Source::new(client) + let (mut source, mut lag_reader) = new_source(client, 5, 1000) .await .map_err(|e| panic!("failed to create source reader: {:?}", e)) .unwrap(); - let messages = source.read(5, 1000).await.unwrap(); + let messages = source.read().await.unwrap(); assert_eq!(messages.len(), 5); let response = source .ack(messages.iter().map(|m| m.offset.clone()).collect()) - .await - .unwrap(); - assert!(response.result.unwrap().success.is_some()); + .await; + assert!(response.is_ok()); + + let pending = lag_reader.pending().await.unwrap(); + assert_eq!(pending, Some(0)); // we need to drop the client, because if there are any in-flight requests // server fails to shut down. https://github.com/numaproj/numaflow-rs/issues/85 diff --git a/rust/numaflow-core/src/transformer/user_defined.rs b/rust/numaflow-core/src/transformer/user_defined.rs index 71a9d24cd..b2564b0e7 100644 --- a/rust/numaflow-core/src/transformer/user_defined.rs +++ b/rust/numaflow-core/src/transformer/user_defined.rs @@ -1,17 +1,20 @@ use std::collections::HashMap; -use tonic::transport::Channel; -use tonic::{Request, Streaming}; +use crate::config::config; +use crate::error::{Error, Result}; +use crate::message::{Message, Offset}; +use crate::monovertex::sourcetransform_pb::{ + self, source_transform_client::SourceTransformClient, SourceTransformRequest, + SourceTransformResponse, +}; +use crate::shared::utils::utc_from_timestamp; use tokio::sync::mpsc; use tokio::task::JoinHandle; use tokio_stream::wrappers::ReceiverStream; use tokio_util::sync::CancellationToken; +use tonic::transport::Channel; +use tonic::{Request, Streaming}; use tracing::warn; -use crate::error::{Result, Error}; -use crate::message::{Message, Offset}; -use crate::monovertex::sourcetransform_pb::{self, SourceTransformRequest, SourceTransformResponse, source_transform_client::SourceTransformClient}; -use crate::shared::utils::utc_from_timestamp; -use crate::config::config; const DROP: &str = "U+005C__DROP__"; @@ -216,7 +219,7 @@ mod tests { let mut client = SourceTransformer::new(SourceTransformClient::new( create_rpc_channel(sock_file).await?, )) - .await?; + .await?; let message = crate::message::Message { keys: vec!["first".into()], @@ -234,7 +237,7 @@ mod tests { tokio::time::Duration::from_secs(2), client.transform_fn(vec![message]), ) - .await??; + .await??; assert_eq!(resp.len(), 1); // we need to drop the client, because if there are any in-flight requests @@ -291,7 +294,7 @@ mod tests { let mut client = SourceTransformer::new(SourceTransformClient::new( create_rpc_channel(sock_file).await?, )) - .await?; + .await?; let message = crate::message::Message { keys: vec!["second".into()], @@ -318,4 +321,4 @@ mod tests { handle.await.expect("failed to join server task"); Ok(()) } -} \ No newline at end of file +}