Skip to content

Commit

Permalink
Make callback from tracker
Browse files Browse the repository at this point in the history
Signed-off-by: Sreekanth <[email protected]>
  • Loading branch information
BulkBeing committed Jan 16, 2025
1 parent 30e8a11 commit d6b40e7
Showing 1 changed file with 44 additions and 37 deletions.
81 changes: 44 additions & 37 deletions rust/numaflow-core/src/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::Result;
#[derive(Debug)]
struct TrackerEntry {
ack_send: oneshot::Sender<ReadAck>,
count: u32,
count: usize,
eof: bool,
callback_info: Option<CallbackInfo>,
}
Expand All @@ -42,6 +42,7 @@ enum ActorMessage {
UpdateMany {
offset: Bytes,
responses: Vec<Option<Vec<String>>>,
count: usize,
eof: bool,
},
UpdateEOF {
Expand Down Expand Up @@ -170,15 +171,16 @@ impl Tracker {
ActorMessage::UpdateMany {
offset,
responses,
count,
eof,
} => {
self.handle_update_many(offset, responses, eof);
self.handle_update_many(offset, responses, count, eof);
}
ActorMessage::UpdateEOF { offset } => {
self.handle_update_eof(offset);
self.handle_update_eof(offset).await;
}
ActorMessage::Delete { offset } => {
self.handle_delete(offset);
self.handle_delete(offset).await;
}
ActorMessage::Discard { offset } => {
self.handle_discard(offset);
Expand Down Expand Up @@ -223,20 +225,9 @@ impl Tracker {
.callback_info
.as_mut()
.map(|cb| cb.responses.push(responses));

// if the count is zero, we can send an ack immediately
// this is case where map stream will send eof true after
// receiving all the messages.
if entry.count == 0 {
let entry = self.entries.remove(&offset).unwrap();
entry
.ack_send
.send(ReadAck::Ack)
.expect("Failed to send ack");
}
}

fn handle_update_eof(&mut self, offset: Bytes) {
async fn handle_update_eof(&mut self, offset: Bytes) {
let Some(entry) = self.entries.get_mut(&offset) else {
return;
};
Expand All @@ -246,56 +237,40 @@ impl Tracker {
// receiving all the messages.
if entry.count == 0 {
let entry = self.entries.remove(&offset).unwrap();
entry
.ack_send
.send(ReadAck::Ack)
.expect("Failed to send ack");
self.ack_message(entry).await;
}
}

fn handle_update_many(
&mut self,
offset: Bytes,
responses: Vec<Option<Vec<String>>>,
count: usize,
eof: bool,
) {
let Some(entry) = self.entries.get_mut(&offset) else {
return;
};

entry.count += responses.len() as u32;
entry.count += count;
entry.eof = eof;
entry
.callback_info
.as_mut()
.map(|cb| cb.responses.extend(responses));

// if the count is zero, we can send an ack immediately
// this is case where map stream will send eof true after
// receiving all the messages.
if entry.count == 0 {
let entry = self.entries.remove(&offset).unwrap();
entry
.ack_send
.send(ReadAck::Ack)
.expect("Failed to send ack");
}
}

/// Removes an entry from the tracker and sends an acknowledgment if the count is zero
/// and the entry is marked as EOF.
fn handle_delete(&mut self, offset: Bytes) {
async fn handle_delete(&mut self, offset: Bytes) {
let Some(mut entry) = self.entries.remove(&offset) else {
return;
};
if entry.count > 0 {
entry.count -= 1;
}
if entry.count == 0 && entry.eof {
entry
.ack_send
.send(ReadAck::Ack)
.expect("Failed to send ack");
self.ack_message(entry).await;
} else {
self.entries.insert(offset, entry);
}
Expand All @@ -321,6 +296,37 @@ impl Tracker {
.expect("Failed to send nak");
}
}

async fn ack_message(&self, entry: TrackerEntry) {
let TrackerEntry {
ack_send,
callback_info,
..
} = entry;

ack_send.send(ReadAck::Ack).expect("Failed to send ack");

let Some(ref callback_handler) = self.callback_handler else {
return;
};
let Some(callback_info) = callback_info else {
tracing::error!("Callback is enabled, but Tracker doesn't contain callback info");
return;
};

let id = callback_info.id.clone();
let result = callback_handler
.callback(
callback_info.id,
callback_info.callback_url,
callback_info.from_vertex,
callback_info.responses,
)
.await;
if let Err(e) = result {
tracing::error!(?e, id, "Failed to send callback");
}
}
}

/// TrackerHandle provides an interface to interact with the Tracker.
Expand Down Expand Up @@ -419,6 +425,7 @@ impl TrackerHandle {
let message = ActorMessage::UpdateMany {
offset,
responses,
count: messages.len(),
eof,
};
self.sender
Expand Down

0 comments on commit d6b40e7

Please sign in to comment.