Skip to content

Commit

Permalink
Single update method for Tracker handle
Browse files Browse the repository at this point in the history
Signed-off-by: Sreekanth <[email protected]>
  • Loading branch information
BulkBeing committed Jan 17, 2025
1 parent d6b40e7 commit 45dfcd3
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 96 deletions.
2 changes: 1 addition & 1 deletion rust/numaflow-core/src/config/monovertex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ impl MonovertexConfig {
.unwrap_or(DEFAULT_LOOKBACK_WINDOW_IN_SECS);

let mut callback_config = None;
if let Ok(_) = env::var(ENV_CALLBACK_ENABLED) {
if env::var(ENV_CALLBACK_ENABLED).is_ok() {
let callback_concurrency: usize = env::var(ENV_CALLBACK_CONCURRENCY)
.unwrap_or_else(|_| format!("{DEFAULT_CALLBACK_CONCURRENCY}"))
.parse()
Expand Down
2 changes: 1 addition & 1 deletion rust/numaflow-core/src/config/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ impl PipelineConfig {
.unwrap_or(DEFAULT_LOOKBACK_WINDOW_IN_SECS);

let mut callback_config = None;
if let Ok(_) = get_var(ENV_CALLBACK_ENABLED) {
if get_var(ENV_CALLBACK_ENABLED).is_ok() {
let callback_concurrency: usize = get_var(ENV_CALLBACK_CONCURRENCY)
.unwrap_or_else(|_| format!("{DEFAULT_CALLBACK_CONCURRENCY}"))
.parse()
Expand Down
34 changes: 31 additions & 3 deletions rust/numaflow-core/src/mapper/map.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::Arc;
use std::time::Duration;

use bytes::Bytes;
use numaflow_pb::clients::map::map_client::MapClient;
use tokio::sync::{mpsc, oneshot, OwnedSemaphorePermit, Semaphore};
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -328,6 +329,7 @@ impl MapHandle {
tokio::spawn(async move {
let _permit = permit;

let offset = read_msg.id.offset.clone();
let (sender, receiver) = oneshot::channel();
let msg = UnaryActorMessage {
message: read_msg.clone(),
Expand All @@ -344,7 +346,16 @@ impl MapHandle {
match receiver.await {
Ok(Ok(mut mapped_messages)) => {
// update the tracker with the number of messages sent and send the mapped messages
if let Err(e) = tracker_handle.update_many(&mapped_messages, true).await {
for message in mapped_messages.iter() {
if let Err(e) = tracker_handle
.update(offset.clone(), message.tags.clone())
.await
{
error_tx.send(e).await.expect("failed to send error");
return;
}
}
if let Err(e) = tracker_handle.update_eof(offset).await {
error_tx.send(e).await.expect("failed to send error");
return;
}
Expand Down Expand Up @@ -391,7 +402,18 @@ impl MapHandle {
for receiver in receivers {
match receiver.await {
Ok(Ok(mut mapped_messages)) => {
tracker_handle.update_many(&mapped_messages, true).await?;
let mut offset: Option<Bytes> = None;
for message in mapped_messages.iter() {
if offset.is_none() {
offset = Some(message.id.offset.clone());
}
tracker_handle
.update(message.id.offset.clone(), message.tags.clone())
.await?;
}
if let Some(offset) = offset {
tracker_handle.update_eof(offset).await?;
}
for mapped_message in mapped_messages.drain(..) {
output_tx
.send(mapped_message)
Expand Down Expand Up @@ -445,7 +467,13 @@ impl MapHandle {
while let Some(result) = receiver.recv().await {
match result {
Ok(mapped_message) => {
if let Err(e) = tracker_handle.update(&mapped_message).await {
if let Err(e) = tracker_handle
.update(
mapped_message.id.offset.clone(),
mapped_message.tags.clone(),
)
.await
{
error_tx.send(e).await.expect("failed to send error");
return;
}
Expand Down
9 changes: 5 additions & 4 deletions rust/numaflow-core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,7 @@ struct TimestampedPending {
/// Where pending (lag) information is read from.
#[derive(Clone)]
pub(crate) enum LagReader {
/// Pending is fetched from a single source.
Source(Source),
/// Pending is fetched from each of the readers in turn.
// `ISB` is an acronym, so the all-caps variant name is kept and clippy's
// acronym-casing lint is silenced for this variant only.
#[allow(clippy::upper_case_acronyms)]
ISB(Vec<JetstreamReader>), // multiple partitions
}

Expand Down Expand Up @@ -862,7 +863,7 @@ async fn build_pending_info(

match &mut lag_reader {
LagReader::Source(source) => {
match fetch_source_pending(&source).await {
match fetch_source_pending(source).await {
Ok(pending) => {
if pending != -1 {
let mut stats = pending_stats.lock().await;
Expand All @@ -887,8 +888,8 @@ async fn build_pending_info(
}

LagReader::ISB(readers) => {
for mut reader in readers {
match fetch_isb_pending(&mut reader).await {
for reader in readers {
match fetch_isb_pending(reader).await {
Ok(pending) => {
if pending != -1 {
let mut stats = pending_stats.lock().await;
Expand Down Expand Up @@ -984,7 +985,7 @@ async fn expose_pending_metrics(
}

/// Calculate the average pending messages over the last `seconds` seconds.
async fn calculate_pending(seconds: i64, pending_stats: &Vec<TimestampedPending>) -> i64 {
async fn calculate_pending(seconds: i64, pending_stats: &[TimestampedPending]) -> i64 {
let mut result = -1;
let mut total = 0;
let mut num = 0;
Expand Down
113 changes: 31 additions & 82 deletions rust/numaflow-core/src/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
//! In the future Watermark will also be propagated based on this.
use std::collections::HashMap;
use std::sync::Arc;

use bytes::Bytes;
use serving::callback::CallbackHandler;
Expand Down Expand Up @@ -39,12 +40,6 @@ enum ActorMessage {
offset: Bytes,
responses: Option<Vec<String>>,
},
UpdateMany {
offset: Bytes,
responses: Vec<Option<Vec<String>>>,
count: usize,
eof: bool,
},
UpdateEOF {
offset: Bytes,
},
Expand Down Expand Up @@ -107,7 +102,6 @@ impl TryFrom<&Message> for CallbackInfo {
.previous_vertex
.clone();

// FIXME: empty message tags
let mut msg_tags = None;
if let Some(ref tags) = message.tags {
if !tags.is_empty() {
Expand Down Expand Up @@ -139,12 +133,12 @@ impl Tracker {
/// Creates a new Tracker instance with the given receiver for actor messages.
fn new(
receiver: mpsc::Receiver<ActorMessage>,
callback_handler: impl Into<Option<CallbackHandler>>,
callback_handler: Option<CallbackHandler>,
) -> Self {
Self {
entries: HashMap::new(),
receiver,
callback_handler: callback_handler.into(),
callback_handler,
}
}

Expand All @@ -168,14 +162,6 @@ impl Tracker {
ActorMessage::Update { offset, responses } => {
self.handle_update(offset, responses);
}
ActorMessage::UpdateMany {
offset,
responses,
count,
eof,
} => {
self.handle_update_many(offset, responses, count, eof);
}
ActorMessage::UpdateEOF { offset } => {
self.handle_update_eof(offset).await;
}
Expand Down Expand Up @@ -221,10 +207,9 @@ impl Tracker {
};

entry.count += 1;
entry
.callback_info
.as_mut()
.map(|cb| cb.responses.push(responses));
if let Some(cb) = entry.callback_info.as_mut() {
cb.responses.push(responses);
}
}

async fn handle_update_eof(&mut self, offset: Bytes) {
Expand All @@ -241,25 +226,6 @@ impl Tracker {
}
}

/// Applies a batch update to the tracked entry for `offset`.
///
/// Adds `count` to the entry's message count, overwrites its EOF flag with
/// `eof`, and, when the entry carries callback info, appends `responses` to
/// the responses already recorded there.
///
/// Does nothing if there is no entry for `offset`.
fn handle_update_many(
    &mut self,
    offset: Bytes,
    responses: Vec<Option<Vec<String>>>,
    count: usize,
    eof: bool,
) {
    let Some(entry) = self.entries.get_mut(&offset) else {
        return;
    };

    entry.count += count;
    entry.eof = eof;
    // `if let` rather than `Option::map`: this is executed purely for its
    // side effect, so there is no value to map to.
    if let Some(cb) = entry.callback_info.as_mut() {
        cb.responses.extend(responses);
    }
}

/// Removes an entry from the tracker and sends an acknowledgment if the count is zero
/// and the entry is marked as EOF.
async fn handle_delete(&mut self, offset: Bytes) {
Expand Down Expand Up @@ -373,13 +339,16 @@ impl TrackerHandle {
Ok(())
}

/// Updates an existing message in the Tracker with the given offset, count, and EOF status.
pub(crate) async fn update(&self, message: &Message) -> Result<()> {
let offset = message.id.offset.clone();
/// Informs the tracker that a new message has been generated. The tracker should contain
/// an entry for this message's offset.
pub(crate) async fn update(
&self,
offset: Bytes,
message_tags: Option<Arc<[String]>>,
) -> Result<()> {
let mut responses: Option<Vec<String>> = None;
if self.enable_callbacks {
// FIXME: empty message tags
if let Some(ref tags) = message.tags {
if let Some(tags) = message_tags {
if !tags.is_empty() {
responses = Some(tags.iter().cloned().collect());
}
Expand All @@ -393,7 +362,7 @@ impl TrackerHandle {
Ok(())
}

/// Updates an existing message in the Tracker with the given offset, count, and EOF status.
/// Updates the EOF status for an offset in the Tracker
pub(crate) async fn update_eof(&self, offset: Bytes) -> Result<()> {
let message = ActorMessage::UpdateEOF { offset };
self.sender
Expand All @@ -403,38 +372,6 @@ impl TrackerHandle {
Ok(())
}

/// Sends a single batched update to the Tracker for a group of messages.
///
/// The offset of the first message is used as the key for the whole batch.
/// For every message, its tags are recorded as one response; a message with
/// no tags or an empty tag list is recorded as `None`. The batch size is sent
/// as the count, together with the given `eof` flag.
///
/// Returns `Ok(())` without sending anything when `messages` is empty.
///
/// # Errors
///
/// Returns `Error::Tracker` if the message cannot be sent on the channel.
pub(crate) async fn update_many(&self, messages: &[Message], eof: bool) -> Result<()> {
    let Some(first) = messages.first() else {
        return Ok(());
    };
    let offset = first.id.offset.clone();

    // NOTE(review): responses are collected regardless of `enable_callbacks`;
    // confirm whether this should be gated on that flag.
    let responses: Vec<Option<Vec<String>>> = messages
        .iter()
        .map(|message| {
            message
                .tags
                .as_ref()
                .filter(|tags| !tags.is_empty())
                .map(|tags| tags.iter().cloned().collect())
        })
        .collect();

    let message = ActorMessage::UpdateMany {
        offset,
        responses,
        count: messages.len(),
        eof,
    };
    self.sender
        .send(message)
        .await
        .map_err(|e| Error::Tracker(format!("{:?}", e)))?;
    Ok(())
}

/// Deletes a message from the Tracker with the given offset.
pub(crate) async fn delete(&self, offset: Bytes) -> Result<()> {
let message = ActorMessage::Delete { offset };
Expand Down Expand Up @@ -515,7 +452,10 @@ mod tests {
handle.insert(&message, ack_send).await.unwrap();

// Update the message
handle.update(&message).await.unwrap();
handle
.update(offset.clone(), message.tags.clone())
.await
.unwrap();
handle.update_eof(offset).await.unwrap();

// Delete the message
Expand Down Expand Up @@ -554,7 +494,12 @@ mod tests {

let messages: Vec<Message> = std::iter::repeat(message).take(3).collect();
// Update the message with a count of 3
handle.update_many(&messages, true).await.unwrap();
for message in messages {
handle
.update(offset.clone(), message.tags.clone())
.await
.unwrap();
}

// Delete the message three times
handle.delete(offset.clone()).await.unwrap();
Expand Down Expand Up @@ -627,8 +572,12 @@ mod tests {
handle.insert(&message, ack_send).await.unwrap();

let messages: Vec<Message> = std::iter::repeat(message).take(3).collect();
// Update the message with a count of 3
handle.update_many(&messages, false).await.unwrap();
for message in messages {
handle
.update(offset.clone(), message.tags.clone())
.await
.unwrap();
}

// Discard the message
handle.discard(offset).await.unwrap();
Expand Down
15 changes: 11 additions & 4 deletions rust/numaflow-core/src/transformer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ impl Transformer {
let start_time = tokio::time::Instant::now();
let _permit = permit;

let offset = read_msg.id.offset.clone();
let (sender, receiver) = oneshot::channel();
let msg = ActorMessage::Transform {
message: read_msg.clone(),
Expand All @@ -137,10 +138,16 @@ impl Transformer {
// wait for one-shot
match receiver.await {
Ok(Ok(mut transformed_messages)) => {
if let Err(e) = tracker_handle
.update_many(&transformed_messages, true)
.await
{
for message in transformed_messages.iter() {
if let Err(e) = tracker_handle
.update(offset.clone(), message.tags.clone())
.await
{
let _ = error_tx.send(e).await;
return;
}
}
if let Err(e) = tracker_handle.update_eof(offset).await {
let _ = error_tx.send(e).await;
return;
}
Expand Down
2 changes: 1 addition & 1 deletion rust/serving/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ where
.on_response(
|response: &Response<Body>, latency: Duration, _span: &Span| {
if response.status().is_server_error() {
// 5xx responses will be captured in on_failure at and logged at 'error' level
// 5xx responses will be logged at 'error' level in `on_failure`
return;
}
tracing::info!(status=?response.status(), ?latency)
Expand Down

0 comments on commit 45dfcd3

Please sign in to comment.