Skip to content

Commit

Permalink
Retries when callback fails
Browse files Browse the repository at this point in the history
Signed-off-by: Sreekanth <[email protected]>
  • Loading branch information
BulkBeing committed Jan 13, 2025
1 parent 94ad530 commit b8dbb6d
Show file tree
Hide file tree
Showing 12 changed files with 97 additions and 39 deletions.
6 changes: 5 additions & 1 deletion rust/numaflow-core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ use crate::Result;
const ENV_MONO_VERTEX_OBJ: &str = "NUMAFLOW_MONO_VERTEX_OBJECT";
const ENV_VERTEX_OBJ: &str = "NUMAFLOW_VERTEX_OBJECT";

const ENV_CALLBACK_ENABLED: &str = "NUMAFLOW_CALLBACK_ENABLED";
const ENV_CALLBACK_CONCURRENCY: &str = "NUMAFLOW_CALLBACK_CONCURRENCY";
const DEFAULT_CALLBACK_CONCURRENCY: usize = 100;

/// Building blocks (Source, Sink, Transformer, FallBack, Metrics, etc.) to build a Pipeline or a
/// MonoVertex.
pub(crate) mod components;
Expand Down Expand Up @@ -112,7 +116,7 @@ impl Settings {
custom_resource_type: CustomResourceType::Pipeline(cfg),
});
}
Err(Error::Config("No configuration found - env variable {ENV_MONO_VERTEX_OBJ} or {ENV_VERTEX_OBJ} is not set".to_string()))
Err(Error::Config("No configuration found - environment variable {ENV_MONO_VERTEX_OBJ} or {ENV_VERTEX_OBJ} is not set".to_string()))
}
}

Expand Down
6 changes: 2 additions & 4 deletions rust/numaflow-core/src/config/monovertex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,12 @@ use crate::Result;

use super::pipeline::ServingCallbackConfig;

use super::{DEFAULT_CALLBACK_CONCURRENCY, ENV_CALLBACK_CONCURRENCY, ENV_CALLBACK_ENABLED};

const DEFAULT_BATCH_SIZE: u64 = 500;
const DEFAULT_TIMEOUT_IN_MS: u32 = 1000;
const DEFAULT_LOOKBACK_WINDOW_IN_SECS: u16 = 120;

const ENV_CALLBACK_ENABLED: &str = "NUMAFLOW_CALLBACK_ENABLED"; //FIXME: duplicates
const ENV_CALLBACK_CONCURRENCY: &str = "NUMAFLOW_CALLBACK_CONCURRENCY";
const DEFAULT_CALLBACK_CONCURRENCY: usize = 100;

#[derive(Debug, Clone, PartialEq)]
pub(crate) struct MonovertexConfig {
pub(crate) name: String,
Expand Down
7 changes: 3 additions & 4 deletions rust/numaflow-core/src/config/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,15 @@ use crate::config::pipeline::map::MapVtxConfig;
use crate::error::Error;
use crate::Result;

use super::{DEFAULT_CALLBACK_CONCURRENCY, ENV_CALLBACK_CONCURRENCY, ENV_CALLBACK_ENABLED};

const DEFAULT_BATCH_SIZE: u64 = 500;
const DEFAULT_TIMEOUT_IN_MS: u32 = 1000;
const DEFAULT_LOOKBACK_WINDOW_IN_SECS: u16 = 120;
const ENV_NUMAFLOW_SERVING_JETSTREAM_URL: &str = "NUMAFLOW_ISBSVC_JETSTREAM_URL";
const ENV_NUMAFLOW_SERVING_JETSTREAM_USER: &str = "NUMAFLOW_ISBSVC_JETSTREAM_USER";
const ENV_NUMAFLOW_SERVING_JETSTREAM_PASSWORD: &str = "NUMAFLOW_ISBSVC_JETSTREAM_PASSWORD";
const ENV_PAF_BATCH_SIZE: &str = "PAF_BATCH_SIZE";
const ENV_CALLBACK_ENABLED: &str = "NUMAFLOW_CALLBACK_ENABLED";
const ENV_CALLBACK_CONCURRENCY: &str = "NUMAFLOW_CALLBACK_CONCURRENCY";
const DEFAULT_CALLBACK_CONCURRENCY: usize = 100;
const DEFAULT_GRPC_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; // 64 MB
const DEFAULT_MAP_SOCKET: &str = "/var/run/numaflow/map.sock";
pub(crate) const DEFAULT_BATCH_MAP_SOCKET: &str = "/var/run/numaflow/batchmap.sock";
Expand Down Expand Up @@ -408,7 +407,7 @@ impl PipelineConfig {
Ok(PipelineConfig {
batch_size: batch_size as usize,
paf_concurrency: get_var(ENV_PAF_BATCH_SIZE)
.unwrap_or((DEFAULT_BATCH_SIZE * 2).to_string())
.unwrap_or_else(|_| (DEFAULT_BATCH_SIZE * 2).to_string())
.parse()
.unwrap(),
read_timeout: Duration::from_millis(timeout_in_ms as u64),
Expand Down
1 change: 1 addition & 0 deletions rust/numaflow-core/src/mapper/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ impl MapHandle {
Ok(())
});

tracing::info!("Returning output_rx stream");
Ok((ReceiverStream::new(output_rx), handle))
}

Expand Down
1 change: 0 additions & 1 deletion rust/numaflow-core/src/monovertex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ pub(crate) async fn start_forwarder(

let callback_handler = match config.callback_config {
Some(ref cb_cfg) => Some(CallbackHandler::new(
config.name.clone(),
config.name.clone(),
cb_cfg.callback_concurrency,
)),
Expand Down
3 changes: 0 additions & 3 deletions rust/numaflow-core/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ async fn start_source_forwarder(

let callback_handler = match config.callback_config {
Some(ref cb_cfg) => Some(CallbackHandler::new(
config.pipeline_name.clone(),
config.vertex_name.clone(),
cb_cfg.callback_concurrency,
)),
Expand Down Expand Up @@ -155,7 +154,6 @@ async fn start_map_forwarder(

let callback_handler = match config.callback_config {
Some(ref cb_cfg) => Some(CallbackHandler::new(
config.pipeline_name.clone(),
config.vertex_name.clone(),
cb_cfg.callback_concurrency,
)),
Expand Down Expand Up @@ -263,7 +261,6 @@ async fn start_sink_forwarder(

let callback_handler = match config.callback_config {
Some(ref cb_cfg) => Some(CallbackHandler::new(
config.pipeline_name.clone(),
config.vertex_name.clone(),
cb_cfg.callback_concurrency,
)),
Expand Down
8 changes: 6 additions & 2 deletions rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ impl JetstreamWriter {
let this = self.clone();

let handle: JoinHandle<Result<()>> = tokio::spawn(async move {
tracing::info!("Starting streaming Jetstream writer");
let mut messages_stream = messages_stream;
let mut hash = DefaultHasher::new();

Expand Down Expand Up @@ -256,10 +257,12 @@ impl JetstreamWriter {
"Message does not contain previous vertex name in the metadata"
))
})?;
callback_handler
if let Err(e) = callback_handler
.callback(&message.headers, &message.tags, metadata.previous_vertex)
.await
.unwrap(); // FIXME:
{
tracing::error!(?e, "Failed to send callback for message");
}
};

processed_msgs_count += 1;
Expand All @@ -273,6 +276,7 @@ impl JetstreamWriter {
last_logged_at = Instant::now();
}
}
tracing::info!("Streaming jetstream writer finished");
Ok(())
});
Ok(handle)
Expand Down
12 changes: 7 additions & 5 deletions rust/numaflow-core/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,10 +318,12 @@ impl SinkWriter {
"Writing to Sink: message does not contain previous vertex name in the metadata"
))
})?;
callback_handler
if let Err(e) = callback_handler
.callback(&message.headers, &message.tags, metadata.previous_vertex)
.await
.unwrap(); // FIXME:
{
tracing::error!(?e, "Failed to send callback for message");
}
}
};

Expand Down Expand Up @@ -839,7 +841,7 @@ mod tests {
drop(tx);

let handle = sink_writer
.streaming_write(ReceiverStream::new(rx), CancellationToken::new())
.streaming_write(ReceiverStream::new(rx), CancellationToken::new(), None)
.await
.unwrap();

Expand Down Expand Up @@ -918,7 +920,7 @@ mod tests {
drop(tx);
let cln_token = CancellationToken::new();
let handle = sink_writer
.streaming_write(ReceiverStream::new(rx), cln_token.clone())
.streaming_write(ReceiverStream::new(rx), cln_token.clone(), None)
.await
.unwrap();

Expand Down Expand Up @@ -1006,7 +1008,7 @@ mod tests {
drop(tx);
let cln_token = CancellationToken::new();
let handle = sink_writer
.streaming_write(ReceiverStream::new(rx), cln_token.clone())
.streaming_write(ReceiverStream::new(rx), cln_token.clone(), None)
.await
.unwrap();

Expand Down
2 changes: 2 additions & 0 deletions rust/numaflow-core/src/sink/blackhole.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ mod tests {
offset: "1".to_string().into(),
index: 0,
},
metadata: None,
},
Message {
keys: Arc::from(vec![]),
Expand All @@ -57,6 +58,7 @@ mod tests {
offset: "2".to_string().into(),
index: 1,
},
metadata: None,
},
];

Expand Down
2 changes: 2 additions & 0 deletions rust/numaflow-core/src/sink/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ mod tests {
offset: "1".to_string().into(),
index: 0,
},
metadata: None,
},
Message {
keys: Arc::from(vec![]),
Expand All @@ -70,6 +71,7 @@ mod tests {
offset: "2".to_string().into(),
index: 1,
},
metadata: None,
},
];

Expand Down
2 changes: 2 additions & 0 deletions rust/numaflow-core/src/sink/user_defined.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ mod tests {
offset: "1".to_string().into(),
index: 0,
},
metadata: None,
},
Message {
keys: Arc::from(vec![]),
Expand All @@ -225,6 +226,7 @@ mod tests {
offset: "2".to_string().into(),
index: 1,
},
metadata: None,
},
];

Expand Down
86 changes: 67 additions & 19 deletions rust/serving/src/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,12 @@ struct CallbackPayload {
#[derive(Clone)]
pub struct CallbackHandler {
client: Client,
pipeline_name: String,
vertex_name: String,
semaphore: Arc<Semaphore>,
}

impl CallbackHandler {
pub fn new(pipeline_name: String, vertex_name: String, concurrency_limit: usize) -> Self {
pub fn new(vertex_name: String, concurrency_limit: usize) -> Self {
// if env::var(ENV_CALLBACK_ENABLED).is_err() {
// return Ok(None);
// };
Expand All @@ -55,7 +54,6 @@ impl CallbackHandler {

Self {
client,
pipeline_name,
vertex_name,
semaphore,
}
Expand Down Expand Up @@ -103,22 +101,72 @@ impl CallbackHandler {
let client = self.client.clone();
tokio::spawn(async move {
let _permit = permit;
let resp = client
.post(callback_url)
.header(ID_HEADER, uuid.clone())
.json(&[callback_payload])
.send()
.await
.map_err(|e| Error::Source(format!("Sending callback request: {e:?}")))
.unwrap(); //FIXME:

if !resp.status().is_success() {
// return Err(Error::Source(format!(
// "Received non-OK status for callback request. Status={}, Body: {}",
// resp.status(),
// resp.text().await.unwrap_or_default()
// )));
tracing::error!("Received non-OK status for callback request"); //FIXME: what to do with errors
// Retry incase of failure in making request.
// When there is a failure, we retry after wait_secs. This value is doubled after each retry attempt.
// Then longest wait time will be 64 seconds.
let mut wait_secs = 1;
const TOTAL_ATTEMPTS: usize = 7;
for i in 1..=TOTAL_ATTEMPTS {
let resp = client
.post(&callback_url)
.header(ID_HEADER, uuid.clone())
.json(&[&callback_payload])
.send()
.await;
let resp = match resp {
Ok(resp) => resp,
Err(e) => {
if i < TOTAL_ATTEMPTS {
tracing::warn!(
?e,
"Sending callback request failed. Will retry after a delay"
);
tokio::time::sleep(Duration::from_secs(wait_secs)).await;
wait_secs *= 2;
} else {
tracing::error!(?e, "Sending callback request failed");
}
continue;
}
};

if resp.status().is_success() {
break;
}

if resp.status().is_client_error() {
// TODO: When the source serving pod restarts, the callbacks will fail with 4xx status
// since the request ID won't be available in it's in-memory tracker.
// No point in retrying such cases
// 4xx can also happen if payload is wrong (due to bugs in the code). We should differentiate
// between what can be retried and not.
let status_code = resp.status();
let response_body = resp.text().await;
tracing::error!(
?status_code,
?response_body,
"Received client error while making callback. Callback will not be retried"
);
break;
}

let status_code = resp.status();
let response_body = resp.text().await;
if i < TOTAL_ATTEMPTS {
tracing::warn!(
?status_code,
?response_body,
"Received non-OK status for callback request. Will retry after a delay"
);
tokio::time::sleep(Duration::from_secs(wait_secs)).await;
wait_secs *= 2;
} else {
tracing::error!(
?status_code,
?response_body,
"Received non-OK status for callback request"
);
}
}
});

Expand Down

0 comments on commit b8dbb6d

Please sign in to comment.