diff --git a/rust/numaflow-core/src/tracker.rs b/rust/numaflow-core/src/tracker.rs index 2c23cef99..6f5a08937 100644 --- a/rust/numaflow-core/src/tracker.rs +++ b/rust/numaflow-core/src/tracker.rs @@ -23,7 +23,7 @@ use crate::Result; #[derive(Debug)] struct TrackerEntry { ack_send: oneshot::Sender, - count: u32, + count: usize, eof: bool, callback_info: Option, } @@ -42,6 +42,7 @@ enum ActorMessage { UpdateMany { offset: Bytes, responses: Vec>>, + count: usize, eof: bool, }, UpdateEOF { @@ -170,15 +171,16 @@ impl Tracker { ActorMessage::UpdateMany { offset, responses, + count, eof, } => { - self.handle_update_many(offset, responses, eof); + self.handle_update_many(offset, responses, count, eof); } ActorMessage::UpdateEOF { offset } => { - self.handle_update_eof(offset); + self.handle_update_eof(offset).await; } ActorMessage::Delete { offset } => { - self.handle_delete(offset); + self.handle_delete(offset).await; } ActorMessage::Discard { offset } => { self.handle_discard(offset); @@ -223,20 +225,9 @@ impl Tracker { .callback_info .as_mut() .map(|cb| cb.responses.push(responses)); - - // if the count is zero, we can send an ack immediately - // this is case where map stream will send eof true after - // receiving all the messages. - if entry.count == 0 { - let entry = self.entries.remove(&offset).unwrap(); - entry - .ack_send - .send(ReadAck::Ack) - .expect("Failed to send ack"); - } } - fn handle_update_eof(&mut self, offset: Bytes) { + async fn handle_update_eof(&mut self, offset: Bytes) { let Some(entry) = self.entries.get_mut(&offset) else { return; }; @@ -246,10 +237,7 @@ impl Tracker { // receiving all the messages. if entry.count == 0 { let entry = self.entries.remove(&offset).unwrap(); - entry - .ack_send - .send(ReadAck::Ack) - .expect("Failed to send ack"); + self.ack_message(entry).await; } } @@ -257,34 +245,24 @@ impl Tracker { &mut self, offset: Bytes, responses: Vec>>, + count: usize, eof: bool, ) { let Some(entry) = self.entries.get_mut(&offset) else { return; }; - entry.count += responses.len() as u32; + entry.count += count; entry.eof = eof; entry .callback_info .as_mut() .map(|cb| cb.responses.extend(responses)); - - // if the count is zero, we can send an ack immediately - // this is case where map stream will send eof true after - // receiving all the messages. - if entry.count == 0 { - let entry = self.entries.remove(&offset).unwrap(); - entry - .ack_send - .send(ReadAck::Ack) - .expect("Failed to send ack"); - } } /// Removes an entry from the tracker and sends an acknowledgment if the count is zero /// and the entry is marked as EOF. - fn handle_delete(&mut self, offset: Bytes) { + async fn handle_delete(&mut self, offset: Bytes) { let Some(mut entry) = self.entries.remove(&offset) else { return; }; @@ -292,10 +270,7 @@ impl Tracker { entry.count -= 1; } if entry.count == 0 && entry.eof { - entry - .ack_send - .send(ReadAck::Ack) - .expect("Failed to send ack"); + self.ack_message(entry).await; } else { self.entries.insert(offset, entry); } @@ -321,6 +296,37 @@ impl Tracker { .expect("Failed to send nak"); } } + + async fn ack_message(&self, entry: TrackerEntry) { + let TrackerEntry { + ack_send, + callback_info, + .. + } = entry; + + ack_send.send(ReadAck::Ack).expect("Failed to send ack"); + + let Some(ref callback_handler) = self.callback_handler else { + return; + }; + let Some(callback_info) = callback_info else { + tracing::error!("Callback is enabled, but Tracker doesn't contain callback info"); + return; + }; + + let id = callback_info.id.clone(); + let result = callback_handler + .callback( + callback_info.id, + callback_info.callback_url, + callback_info.from_vertex, + callback_info.responses, + ) + .await; + if let Err(e) = result { + tracing::error!(?e, id, "Failed to send callback"); + } + } } /// TrackerHandle provides an interface to interact with the Tracker. @@ -419,6 +425,7 @@ impl TrackerHandle { let message = ActorMessage::UpdateMany { offset, responses, + count: messages.len(), eof, }; self.sender