Callback after resolving PAFs
Signed-off-by: Sreekanth <[email protected]>
BulkBeing committed Jan 15, 2025
1 parent d68abf9 commit 011a649
Showing 3 changed files with 41 additions and 43 deletions.
2 changes: 0 additions & 2 deletions rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs
@@ -384,8 +384,6 @@ mod tests {
#[cfg(feature = "nats-tests")]
#[tokio::test]
async fn test_jetstream_ack() {
use async_nats::jetstream::stream::No;

let js_url = "localhost:4222";
// Create JetStream context
let client = async_nats::connect(js_url).await.unwrap();
80 changes: 40 additions & 40 deletions rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs
@@ -201,6 +201,7 @@ impl JetstreamWriter {
continue;
}

// List of PAFs (one message can be written to multiple streams)
let mut pafs = vec![];
for vertex in &*this.config {
// check whether we need to write to this downstream vertex
@@ -244,27 +245,8 @@ impl JetstreamWriter {
continue;
}

this.resolve_pafs(ResolveAndPublishResult {
pafs,
payload: message.value.clone().into(),
offset: message.id.offset,
})
.await?;

if let Some(ref callback_handler) = this.callback_handler {
let metadata = message.metadata.ok_or_else(|| {
Error::Source(
"Message does not contain previous vertex name in the metadata"
.to_owned(),
)
})?;
if let Err(e) = callback_handler
.callback(&message.headers, &message.tags, metadata.previous_vertex)
.await
{
tracing::error!(?e, "Failed to send callback for message");
}
};
this.resolve_pafs(pafs, message, this.callback_handler.clone())
.await?;

processed_msgs_count += 1;
if last_logged_at.elapsed().as_secs() >= 1 {
@@ -357,7 +339,12 @@ impl JetstreamWriter {
/// asynchronously, if it fails it will do a blocking write to resolve the PAFs.
/// At any point in time, we will only have X PAF resolvers running, this will help us create a
/// natural backpressure.
pub(super) async fn resolve_pafs(&self, result: ResolveAndPublishResult) -> Result<()> {
pub(super) async fn resolve_pafs(
&self,
pafs: Vec<((String, u16), PublishAckFuture)>,
message: Message,
callback_handler: Option<CallbackHandler>,
) -> Result<()> {
let start_time = Instant::now();
let permit = Arc::clone(&self.sem)
.acquire_owned()
@@ -371,7 +358,7 @@

tokio::spawn(async move {
let _permit = permit;
for (stream, paf) in result.pafs {
for (stream, paf) in pafs {
match paf.await {
Ok(ack) => {
if ack.duplicate {
@@ -385,9 +372,12 @@
Offset::Int(IntOffset::new(ack.sequence, stream.1)),
));
tracker_handle
.delete(result.offset.clone())
.delete(message.id.offset.clone())
.await
.expect("Failed to delete offset from tracker");
if let Err(e) = do_callback(callback_handler.as_ref(), &message).await {
tracing::error!(?e, "Failed to send callback request");
}
}
Err(e) => {
error!(
Expand All @@ -397,7 +387,7 @@ impl JetstreamWriter {
);
match JetstreamWriter::blocking_write(
stream.clone(),
result.payload.clone(),
message.value.clone(),
js_ctx.clone(),
cancel_token.clone(),
)
@@ -414,12 +404,17 @@
stream.clone(),
Offset::Int(IntOffset::new(ack.sequence, stream.1)),
));
if let Err(e) =
do_callback(callback_handler.as_ref(), &message).await
{
tracing::error!(?e, "Failed to send callback request");
}
}
Err(e) => {
error!(?e, "Blocking write failed for stream {}", stream.0);
// Since we failed to write to the stream, we need to send a NAK to the reader
tracker_handle
.discard(result.offset.clone())
.discard(message.id.offset.clone())
.await
.expect("Failed to discard offset from the tracker");
return;
@@ -442,17 +437,14 @@
/// an error it means it is fatal non-retryable error.
async fn blocking_write(
stream: Stream,
payload: Vec<u8>,
payload: Bytes,
js_ctx: Context,
cln_token: CancellationToken,
) -> Result<PublishAck> {
let start_time = Instant::now();
info!("Blocking write for stream {}", stream.0);
loop {
match js_ctx
.publish(stream.0.clone(), Bytes::from(payload.clone()))
.await
{
match js_ctx.publish(stream.0.clone(), payload.clone()).await {
Ok(paf) => match paf.await {
Ok(ack) => {
if ack.duplicate {
@@ -484,15 +476,23 @@
}
}

/// ResolveAndPublishResult resolves the result of the write PAF operation.
/// It contains the list of pafs(one message can be written to multiple streams)
/// and the payload that was written. Once the PAFs for all the streams have been
/// resolved, the information is published to callee_tx.
#[derive(Debug)]
pub(crate) struct ResolveAndPublishResult {
pub(crate) pafs: Vec<(Stream, PublishAckFuture)>,
pub(crate) payload: Vec<u8>,
pub(crate) offset: Bytes,
async fn do_callback(callback_handler: Option<&CallbackHandler>, message: &Message) -> Result<()> {
let Some(callback_handler) = callback_handler else {
return Ok(());
};

let metadata = message.metadata.as_ref().ok_or_else(|| {
Error::Source("Message does not contain previous vertex name in the metadata".to_owned())
})?;

callback_handler
.callback(
&message.headers,
&message.tags,
metadata.previous_vertex.clone(),
)
.await
.map_err(|e| Error::Source(format!("Failed to send callback for message: {e:?}")))
}

#[cfg(test)]
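To make the concurrency model described in the resolve_pafs doc comment above concrete: each batch of PAFs is awaited inside a spawned task, and a semaphore caps how many resolver tasks can be in flight at once, so the write loop stalls when the cap is reached and only falls back to a blocking write when a PAF fails. Below is a minimal, self-contained sketch of just the semaphore-gated spawning pattern; it assumes tokio's "full" feature set, and MAX_RESOLVERS and resolve_one are illustrative stand-ins rather than names from this codebase.

use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Semaphore;

// Illustrative cap on concurrent resolver tasks (the "X PAF resolvers").
const MAX_RESOLVERS: usize = 8;

// Stand-in for awaiting a PublishAckFuture and sending the callback.
async fn resolve_one(id: u64) {
    tokio::time::sleep(Duration::from_millis(10)).await;
    println!("resolved message {id}");
}

#[tokio::main]
async fn main() {
    let sem = Arc::new(Semaphore::new(MAX_RESOLVERS));
    for id in 0..100u64 {
        // Acquiring the owned permit before spawning blocks this loop once
        // MAX_RESOLVERS tasks are in flight, which is the natural
        // backpressure the writer relies on.
        let permit = Arc::clone(&sem)
            .acquire_owned()
            .await
            .expect("semaphore should not be closed");
        tokio::spawn(async move {
            let _permit = permit; // released when the task completes
            resolve_one(id).await;
        });
    }
    // Wait for all in-flight resolvers to finish before exiting.
    let _ = Arc::clone(&sem)
        .acquire_many_owned(MAX_RESOLVERS as u32)
        .await
        .expect("semaphore should not be closed");
}

Taking the permit on the writer's own task, rather than inside the spawned resolver, is what turns the cap into backpressure: the loop cannot enqueue more work until a resolver frees a slot.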
2 changes: 1 addition & 1 deletion rust/serving/src/callback.rs
@@ -178,7 +178,7 @@ mod tests {
use crate::callback::{CallbackHandler, DEFAULT_CALLBACK_URL_HEADER_KEY, DEFAULT_ID_HEADER};
use crate::config::generate_certs;
use crate::pipeline::PipelineDCG;
use crate::{AppState, Error, Settings};
use crate::{AppState, Settings};
use axum_server::tls_rustls::RustlsConfig;
use std::collections::HashMap;
use std::sync::Arc;
