diff --git a/rust/numaflow-core/src/config/monovertex.rs b/rust/numaflow-core/src/config/monovertex.rs index 4fdbc95fe..4bd7ac565 100644 --- a/rust/numaflow-core/src/config/monovertex.rs +++ b/rust/numaflow-core/src/config/monovertex.rs @@ -150,7 +150,7 @@ impl MonovertexConfig { .unwrap_or(DEFAULT_LOOKBACK_WINDOW_IN_SECS); let mut callback_config = None; - if let Ok(_) = env::var(ENV_CALLBACK_ENABLED) { + if env::var(ENV_CALLBACK_ENABLED).is_ok() { let callback_concurrency: usize = env::var(ENV_CALLBACK_CONCURRENCY) .unwrap_or_else(|_| format!("{DEFAULT_CALLBACK_CONCURRENCY}")) .parse() diff --git a/rust/numaflow-core/src/config/pipeline.rs b/rust/numaflow-core/src/config/pipeline.rs index 549fff556..7e7fdf84f 100644 --- a/rust/numaflow-core/src/config/pipeline.rs +++ b/rust/numaflow-core/src/config/pipeline.rs @@ -390,7 +390,7 @@ impl PipelineConfig { .unwrap_or(DEFAULT_LOOKBACK_WINDOW_IN_SECS); let mut callback_config = None; - if let Ok(_) = get_var(ENV_CALLBACK_ENABLED) { + if get_var(ENV_CALLBACK_ENABLED).is_ok() { let callback_concurrency: usize = get_var(ENV_CALLBACK_CONCURRENCY) .unwrap_or_else(|_| format!("{DEFAULT_CALLBACK_CONCURRENCY}")) .parse() diff --git a/rust/numaflow-core/src/mapper/map.rs b/rust/numaflow-core/src/mapper/map.rs index c9682a48b..b1e85a8ba 100644 --- a/rust/numaflow-core/src/mapper/map.rs +++ b/rust/numaflow-core/src/mapper/map.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use std::time::Duration; +use bytes::Bytes; use numaflow_pb::clients::map::map_client::MapClient; use tokio::sync::{mpsc, oneshot, OwnedSemaphorePermit, Semaphore}; use tokio::task::JoinHandle; @@ -328,6 +329,7 @@ impl MapHandle { tokio::spawn(async move { let _permit = permit; + let offset = read_msg.id.offset.clone(); let (sender, receiver) = oneshot::channel(); let msg = UnaryActorMessage { message: read_msg.clone(), @@ -344,7 +346,16 @@ impl MapHandle { match receiver.await { Ok(Ok(mut mapped_messages)) => { // update the tracker with the number of messages sent and send the mapped messages - if let Err(e) = tracker_handle.update_many(&mapped_messages, true).await { + for message in mapped_messages.iter() { + if let Err(e) = tracker_handle + .update(offset.clone(), message.tags.clone()) + .await + { + error_tx.send(e).await.expect("failed to send error"); + return; + } + } + if let Err(e) = tracker_handle.update_eof(offset).await { error_tx.send(e).await.expect("failed to send error"); return; } @@ -391,7 +402,18 @@ impl MapHandle { for receiver in receivers { match receiver.await { Ok(Ok(mut mapped_messages)) => { - tracker_handle.update_many(&mapped_messages, true).await?; + let mut offset: Option = None; + for message in mapped_messages.iter() { + if offset.is_none() { + offset = Some(message.id.offset.clone()); + } + tracker_handle + .update(message.id.offset.clone(), message.tags.clone()) + .await?; + } + if let Some(offset) = offset { + tracker_handle.update_eof(offset).await?; + } for mapped_message in mapped_messages.drain(..) { output_tx .send(mapped_message) @@ -445,7 +467,13 @@ impl MapHandle { while let Some(result) = receiver.recv().await { match result { Ok(mapped_message) => { - if let Err(e) = tracker_handle.update(&mapped_message).await { + if let Err(e) = tracker_handle + .update( + mapped_message.id.offset.clone(), + mapped_message.tags.clone(), + ) + .await + { error_tx.send(e).await.expect("failed to send error"); return; } diff --git a/rust/numaflow-core/src/metrics.rs b/rust/numaflow-core/src/metrics.rs index ef285f57d..e0eb0cc1e 100644 --- a/rust/numaflow-core/src/metrics.rs +++ b/rust/numaflow-core/src/metrics.rs @@ -721,6 +721,7 @@ struct TimestampedPending { #[derive(Clone)] pub(crate) enum LagReader { Source(Source), + #[allow(clippy::upper_case_acronyms)] ISB(Vec), // multiple partitions } @@ -862,7 +863,7 @@ async fn build_pending_info( match &mut lag_reader { LagReader::Source(source) => { - match fetch_source_pending(&source).await { + match fetch_source_pending(source).await { Ok(pending) => { if pending != -1 { let mut stats = pending_stats.lock().await; @@ -887,8 +888,8 @@ async fn build_pending_info( } LagReader::ISB(readers) => { - for mut reader in readers { - match fetch_isb_pending(&mut reader).await { + for reader in readers { + match fetch_isb_pending(reader).await { Ok(pending) => { if pending != -1 { let mut stats = pending_stats.lock().await; @@ -984,7 +985,7 @@ async fn expose_pending_metrics( } /// Calculate the average pending messages over the last `seconds` seconds. -async fn calculate_pending(seconds: i64, pending_stats: &Vec) -> i64 { +async fn calculate_pending(seconds: i64, pending_stats: &[TimestampedPending]) -> i64 { let mut result = -1; let mut total = 0; let mut num = 0; diff --git a/rust/numaflow-core/src/tracker.rs b/rust/numaflow-core/src/tracker.rs index 6f5a08937..158882012 100644 --- a/rust/numaflow-core/src/tracker.rs +++ b/rust/numaflow-core/src/tracker.rs @@ -9,6 +9,7 @@ //! In the future Watermark will also be propagated based on this. use std::collections::HashMap; +use std::sync::Arc; use bytes::Bytes; use serving::callback::CallbackHandler; @@ -39,12 +40,6 @@ enum ActorMessage { offset: Bytes, responses: Option>, }, - UpdateMany { - offset: Bytes, - responses: Vec>>, - count: usize, - eof: bool, - }, UpdateEOF { offset: Bytes, }, @@ -107,7 +102,6 @@ impl TryFrom<&Message> for CallbackInfo { .previous_vertex .clone(); - // FIXME: empty message tags let mut msg_tags = None; if let Some(ref tags) = message.tags { if !tags.is_empty() { @@ -139,12 +133,12 @@ impl Tracker { /// Creates a new Tracker instance with the given receiver for actor messages. fn new( receiver: mpsc::Receiver, - callback_handler: impl Into>, + callback_handler: Option, ) -> Self { Self { entries: HashMap::new(), receiver, - callback_handler: callback_handler.into(), + callback_handler, } } @@ -168,14 +162,6 @@ impl Tracker { ActorMessage::Update { offset, responses } => { self.handle_update(offset, responses); } - ActorMessage::UpdateMany { - offset, - responses, - count, - eof, - } => { - self.handle_update_many(offset, responses, count, eof); - } ActorMessage::UpdateEOF { offset } => { self.handle_update_eof(offset).await; } @@ -221,10 +207,9 @@ impl Tracker { }; entry.count += 1; - entry - .callback_info - .as_mut() - .map(|cb| cb.responses.push(responses)); + if let Some(cb) = entry.callback_info.as_mut() { + cb.responses.push(responses); + } } async fn handle_update_eof(&mut self, offset: Bytes) { @@ -241,25 +226,6 @@ impl Tracker { } } - fn handle_update_many( - &mut self, - offset: Bytes, - responses: Vec>>, - count: usize, - eof: bool, - ) { - let Some(entry) = self.entries.get_mut(&offset) else { - return; - }; - - entry.count += count; - entry.eof = eof; - entry - .callback_info - .as_mut() - .map(|cb| cb.responses.extend(responses)); - } - /// Removes an entry from the tracker and sends an acknowledgment if the count is zero /// and the entry is marked as EOF. async fn handle_delete(&mut self, offset: Bytes) { @@ -373,13 +339,16 @@ impl TrackerHandle { Ok(()) } - /// Updates an existing message in the Tracker with the given offset, count, and EOF status. - pub(crate) async fn update(&self, message: &Message) -> Result<()> { - let offset = message.id.offset.clone(); + /// Informs the tracker that a new message has been generated. The tracker should contain + /// and entry for this message's offset. + pub(crate) async fn update( + &self, + offset: Bytes, + message_tags: Option>, + ) -> Result<()> { let mut responses: Option> = None; if self.enable_callbacks { - // FIXME: empty message tags - if let Some(ref tags) = message.tags { + if let Some(tags) = message_tags { if !tags.is_empty() { responses = Some(tags.iter().cloned().collect()); } @@ -393,7 +362,7 @@ impl TrackerHandle { Ok(()) } - /// Updates an existing message in the Tracker with the given offset, count, and EOF status. + /// Updates the EOF status for an offset in the Tracker pub(crate) async fn update_eof(&self, offset: Bytes) -> Result<()> { let message = ActorMessage::UpdateEOF { offset }; self.sender @@ -403,38 +372,6 @@ impl TrackerHandle { Ok(()) } - /// Updates an existing message in the Tracker with the given offset, count, and EOF status. - pub(crate) async fn update_many(&self, messages: &[Message], eof: bool) -> Result<()> { - if messages.is_empty() { - return Ok(()); - } - let offset = messages.first().unwrap().id.offset.clone(); - let mut responses: Vec>> = vec![]; - // if self.enable_callbacks { - // FIXME: empty message tags - for message in messages { - let mut response: Option> = None; - if let Some(ref tags) = message.tags { - if !tags.is_empty() { - response = Some(tags.iter().cloned().collect()); - } - }; - responses.push(response); - } - // } - let message = ActorMessage::UpdateMany { - offset, - responses, - count: messages.len(), - eof, - }; - self.sender - .send(message) - .await - .map_err(|e| Error::Tracker(format!("{:?}", e)))?; - Ok(()) - } - /// Deletes a message from the Tracker with the given offset. pub(crate) async fn delete(&self, offset: Bytes) -> Result<()> { let message = ActorMessage::Delete { offset }; @@ -515,7 +452,10 @@ mod tests { handle.insert(&message, ack_send).await.unwrap(); // Update the message - handle.update(&message).await.unwrap(); + handle + .update(offset.clone(), message.tags.clone()) + .await + .unwrap(); handle.update_eof(offset).await.unwrap(); // Delete the message @@ -554,7 +494,12 @@ mod tests { let messages: Vec = std::iter::repeat(message).take(3).collect(); // Update the message with a count of 3 - handle.update_many(&messages, true).await.unwrap(); + for message in messages { + handle + .update(offset.clone(), message.tags.clone()) + .await + .unwrap(); + } // Delete the message three times handle.delete(offset.clone()).await.unwrap(); @@ -627,8 +572,12 @@ mod tests { handle.insert(&message, ack_send).await.unwrap(); let messages: Vec = std::iter::repeat(message).take(3).collect(); - // Update the message with a count of 3 - handle.update_many(&messages, false).await.unwrap(); + for message in messages { + handle + .update(offset.clone(), message.tags.clone()) + .await + .unwrap(); + } // Discard the message handle.discard(offset).await.unwrap(); diff --git a/rust/numaflow-core/src/transformer.rs b/rust/numaflow-core/src/transformer.rs index 21bdc19c5..a474d6338 100644 --- a/rust/numaflow-core/src/transformer.rs +++ b/rust/numaflow-core/src/transformer.rs @@ -120,6 +120,7 @@ impl Transformer { let start_time = tokio::time::Instant::now(); let _permit = permit; + let offset = read_msg.id.offset.clone(); let (sender, receiver) = oneshot::channel(); let msg = ActorMessage::Transform { message: read_msg.clone(), @@ -137,10 +138,16 @@ impl Transformer { // wait for one-shot match receiver.await { Ok(Ok(mut transformed_messages)) => { - if let Err(e) = tracker_handle - .update_many(&transformed_messages, true) - .await - { + for message in transformed_messages.iter() { + if let Err(e) = tracker_handle + .update(offset.clone(), message.tags.clone()) + .await + { + let _ = error_tx.send(e).await; + return; + } + } + if let Err(e) = tracker_handle.update_eof(offset).await { let _ = error_tx.send(e).await; return; } diff --git a/rust/serving/src/app.rs b/rust/serving/src/app.rs index 7e6e7edb1..721376855 100644 --- a/rust/serving/src/app.rs +++ b/rust/serving/src/app.rs @@ -104,7 +104,7 @@ where .on_response( |response: &Response, latency: Duration, _span: &Span| { if response.status().is_server_error() { - // 5xx responses will be captured in on_failure at and logged at 'error' level + // 5xx responses will be logged at 'error' level in `on_failure` return; } tracing::info!(status=?response.status(), ?latency)