Skip to content

Commit

Permalink
Unit tests callback failures
Browse files Browse the repository at this point in the history
Signed-off-by: Sreekanth <[email protected]>
  • Loading branch information
BulkBeing committed Jan 13, 2025
1 parent b65f928 commit d68abf9
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 4 deletions.
5 changes: 3 additions & 2 deletions rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,10 @@ impl JetstreamWriter {

if let Some(ref callback_handler) = this.callback_handler {
let metadata = message.metadata.ok_or_else(|| {
Error::Source(format!(
Error::Source(
"Message does not contain previous vertex name in the metadata"
))
.to_owned(),
)
})?;
if let Err(e) = callback_handler
.callback(&message.headers, &message.tags, metadata.previous_vertex)
Expand Down
27 changes: 25 additions & 2 deletions rust/serving/src/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ mod tests {
use crate::callback::{CallbackHandler, DEFAULT_CALLBACK_URL_HEADER_KEY, DEFAULT_ID_HEADER};
use crate::config::generate_certs;
use crate::pipeline::PipelineDCG;
use crate::{AppState, Settings};
use crate::{AppState, Error, Settings};
use axum_server::tls_rustls::RustlsConfig;
use std::collections::HashMap;
use std::sync::Arc;
Expand Down Expand Up @@ -252,10 +252,11 @@ mod tests {
.map(|(k, v)| (k.into(), v.into()))
.collect();

let tags = Arc::from(vec!["tag1".to_owned()]);
// On the server, this fails with SubGraphInvalidInput("Invalid callback: 1234, vertex: in")
// We get 200 OK response from the server, since we already registered this request ID in the store.
callback_handler
.callback(&message_headers, &None, "in".into())
.callback(&message_headers, &Some(tags), "in".into())
.await?;
let mut data = None;
for _ in 0..10 {
Expand All @@ -272,4 +273,26 @@ mod tests {
server_handle.abort();
Ok(())
}

#[tokio::test]
async fn test_callback_missing_headers() -> Result<()> {
let callback_handler = CallbackHandler::new("test".into(), 10);
let message_headers: HashMap<String, String> = HashMap::new();
let result = callback_handler
.callback(&message_headers, &None, "in".into())
.await;
assert!(result.is_err());

let mut message_headers: HashMap<String, String> = HashMap::new();
message_headers.insert(
DEFAULT_CALLBACK_URL_HEADER_KEY.into(),
"https://localhost:3003/v1/process/callback".into(),
);
let result = callback_handler
.callback(&message_headers, &None, "in".into())
.await;
assert!(result.is_err());

Ok(())
}
}

0 comments on commit d68abf9

Please sign in to comment.