From d68abf96d682f459bc5fdecb267fcb3b448c3072 Mon Sep 17 00:00:00 2001 From: Sreekanth Date: Mon, 13 Jan 2025 19:19:46 +0530 Subject: [PATCH] Unit tests callback failures Signed-off-by: Sreekanth --- .../src/pipeline/isb/jetstream/writer.rs | 5 ++-- rust/serving/src/callback.rs | 27 +++++++++++++++++-- 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs b/rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs index 2a7818df7..2605c625f 100644 --- a/rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs +++ b/rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs @@ -253,9 +253,10 @@ impl JetstreamWriter { if let Some(ref callback_handler) = this.callback_handler { let metadata = message.metadata.ok_or_else(|| { - Error::Source(format!( + Error::Source( "Message does not contain previous vertex name in the metadata" - )) + .to_owned(), + ) })?; if let Err(e) = callback_handler .callback(&message.headers, &message.tags, metadata.previous_vertex) diff --git a/rust/serving/src/callback.rs b/rust/serving/src/callback.rs index b46641cb1..52c0a5754 100644 --- a/rust/serving/src/callback.rs +++ b/rust/serving/src/callback.rs @@ -178,7 +178,7 @@ mod tests { use crate::callback::{CallbackHandler, DEFAULT_CALLBACK_URL_HEADER_KEY, DEFAULT_ID_HEADER}; use crate::config::generate_certs; use crate::pipeline::PipelineDCG; - use crate::{AppState, Settings}; + use crate::{AppState, Error, Settings}; use axum_server::tls_rustls::RustlsConfig; use std::collections::HashMap; use std::sync::Arc; @@ -252,10 +252,11 @@ mod tests { .map(|(k, v)| (k.into(), v.into())) .collect(); + let tags = Arc::from(vec!["tag1".to_owned()]); // On the server, this fails with SubGraphInvalidInput("Invalid callback: 1234, vertex: in") // We get 200 OK response from the server, since we already registered this request ID in the store. callback_handler - .callback(&message_headers, &None, "in".into()) + .callback(&message_headers, &Some(tags), "in".into()) .await?; let mut data = None; for _ in 0..10 { @@ -272,4 +273,26 @@ mod tests { server_handle.abort(); Ok(()) } + + #[tokio::test] + async fn test_callback_missing_headers() -> Result<()> { + let callback_handler = CallbackHandler::new("test".into(), 10); + let message_headers: HashMap = HashMap::new(); + let result = callback_handler + .callback(&message_headers, &None, "in".into()) + .await; + assert!(result.is_err()); + + let mut message_headers: HashMap = HashMap::new(); + message_headers.insert( + DEFAULT_CALLBACK_URL_HEADER_KEY.into(), + "https://localhost:3003/v1/process/callback".into(), + ); + let result = callback_handler + .callback(&message_headers, &None, "in".into()) + .await; + assert!(result.is_err()); + + Ok(()) + } }