From b9c6977d3588c02ca166d3ac1a020d5625f8e46f Mon Sep 17 00:00:00 2001 From: Sreekanth Date: Thu, 2 Jan 2025 12:38:33 +0530 Subject: [PATCH] Initial implementation of the callback handler Signed-off-by: Sreekanth --- rust/Cargo.lock | 155 +++++++++++++++++- rust/Cargo.toml | 4 +- rust/numaflow-core/Cargo.toml | 1 + rust/numaflow-core/src/callback.rs | 105 ++++++++++++ rust/numaflow-core/src/error.rs | 3 + rust/numaflow-core/src/lib.rs | 2 + rust/numaflow-core/src/mapper/map.rs | 9 + .../src/mapper/map/user_defined.rs | 6 + rust/numaflow-core/src/message.rs | 15 ++ rust/numaflow-core/src/pipeline.rs | 2 + .../src/pipeline/isb/jetstream/reader.rs | 2 + .../src/pipeline/isb/jetstream/writer.rs | 8 + rust/numaflow-core/src/sink.rs | 4 + rust/numaflow-core/src/sink/blackhole.rs | 2 + rust/numaflow-core/src/sink/log.rs | 2 + rust/numaflow-core/src/sink/user_defined.rs | 2 + rust/numaflow-core/src/source/generator.rs | 1 + rust/numaflow-core/src/source/pulsar.rs | 1 + rust/numaflow-core/src/source/serving.rs | 1 + rust/numaflow-core/src/transformer.rs | 3 + .../src/transformer/user_defined.rs | 2 + 21 files changed, 319 insertions(+), 11 deletions(-) create mode 100644 rust/numaflow-core/src/callback.rs diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 7d70eea7b..5888629d2 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -188,7 +188,7 @@ dependencies = [ "serde_urlencoded", "sync_wrapper 1.0.2", "tokio", - "tower 0.5.1", + "tower 0.5.2", "tower-layer", "tower-service", "tracing", @@ -672,6 +672,21 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -1092,6 +1107,22 @@ dependencies = [ "tower-service", ] +[[package]] +name = "hyper-tls" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" +dependencies = [ + "bytes", + "http-body-util", + "hyper 1.5.1", + "hyper-util", + "native-tls", + "tokio", + "tokio-native-tls", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.10" @@ -1575,6 +1606,23 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" +[[package]] +name = "native-tls" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8614eb2c83d59d1c8cc974dd3f920198647674a0a035e1af1fa58707e317466" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework 2.11.1", + "security-framework-sys", + "tempfile", +] + [[package]] name = "nkeys" version = "0.4.4" @@ -1738,6 +1786,7 @@ dependencies = [ "pulsar", "rand", "rcgen", + "reqwest 0.12.12", "rustls 0.23.19", "semver", "serde", @@ -1809,12 +1858,50 @@ version = "1.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" +[[package]] +name = "openssl" +version = "0.10.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6174bc48f102d208783c2c84bf931bb75927a617866870de8a4ea85597f871f5" +dependencies = [ + "bitflags 2.6.0", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.90", +] + [[package]] name = "openssl-probe" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "openssl-sys" +version = "0.9.104" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45abf306cbf99debc8195b66b7346498d7b10c210de50418b5ccd7ceba08c741" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "ordered-float" version = "2.10.1" @@ -1992,6 +2079,12 @@ dependencies = [ "spki", ] +[[package]] +name = "pkg-config" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "953ec861398dccce10c670dfeaf3ec4911ca479e9c02154b3a215178c5f566f2" + [[package]] name = "portable-atomic" version = "1.10.0" @@ -2415,7 +2508,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "sync_wrapper 0.1.2", - "system-configuration", + "system-configuration 0.5.1", "tokio", "tokio-rustls 0.24.1", "tower-service", @@ -2429,24 +2522,28 @@ dependencies = [ [[package]] name = "reqwest" -version = "0.12.9" +version = "0.12.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a77c62af46e79de0a562e1a9849205ffcb7fc1238876e9bd743357570e04046f" +checksum = "43e734407157c3c2034e0258f5e4473ddb361b1e85f95a66690d67264d7cd1da" dependencies = [ "base64 0.22.1", "bytes", + "encoding_rs", "futures-core", "futures-util", + "h2 0.4.7", "http 1.1.0", "http-body 1.0.1", "http-body-util", "hyper 1.5.1", "hyper-rustls 0.27.3", + "hyper-tls", "hyper-util", "ipnet", "js-sys", "log", "mime", + "native-tls", "once_cell", "percent-encoding", "pin-project-lite", @@ -2458,8 +2555,11 @@ dependencies = [ "serde_json", "serde_urlencoded", "sync_wrapper 1.0.2", + "system-configuration 0.6.1", "tokio", + "tokio-native-tls", "tokio-rustls 0.26.0", + "tower 0.5.2", "tower-service", "url", "wasm-bindgen", @@ -2817,7 +2917,7 @@ name = "servesink" version = "0.1.0" dependencies = [ "numaflow 0.1.1", - "reqwest 0.12.9", + "reqwest 0.12.12", "tokio", "tonic", "tracing", @@ -3035,7 +3135,18 @@ checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" dependencies = [ "bitflags 1.3.2", "core-foundation 0.9.4", - "system-configuration-sys", + "system-configuration-sys 0.5.0", +] + +[[package]] +name = "system-configuration" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" +dependencies = [ + "bitflags 2.6.0", + "core-foundation 0.9.4", + "system-configuration-sys 0.6.0", ] [[package]] @@ -3048,6 +3159,16 @@ dependencies = [ "libc", ] +[[package]] +name = "system-configuration-sys" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "tempfile" version = "3.14.0" @@ -3195,6 +3316,16 @@ dependencies = [ "syn 2.0.90", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-retry" version = "0.3.0" @@ -3338,14 +3469,14 @@ dependencies = [ [[package]] name = "tower" -version = "0.5.1" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2873938d487c3cfb9aed7546dc9f2711d867c9f90c46b889989a2cb84eba6b4f" +checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" dependencies = [ "futures-core", "futures-util", "pin-project-lite", - "sync_wrapper 0.1.2", + "sync_wrapper 1.0.2", "tokio", "tower-layer", "tower-service", @@ -3562,6 +3693,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.5" diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 91e0cc736..1adcd9e0b 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -40,8 +40,8 @@ verbose_file_reads = "warn" # This profile optimizes for runtime performance and small binary size at the expense of longer build times. # Compared to default release profile, this profile reduced binary size from 29MB to 21MB # and increased build time (with only one line change in code) from 12 seconds to 133 seconds (tested on Mac M2 Max). -# [profile.release] -# lto = "fat" +[profile.release] +lto = "fat" # This profile optimizes for short build times at the expense of larger binary size and slower runtime performance. # If you have to rebuild image often, in Dockerfile you may replace `--release` passed to cargo command with `--profile quick-release` diff --git a/rust/numaflow-core/Cargo.toml b/rust/numaflow-core/Cargo.toml index c7a33fe27..be2577d2c 100644 --- a/rust/numaflow-core/Cargo.toml +++ b/rust/numaflow-core/Cargo.toml @@ -46,6 +46,7 @@ futures = "0.3.30" pin-project = "1.1.5" rand = "0.8.5" async-nats = "0.38.0" +reqwest = { version = "0.12.12", features = ["json"] } [dev-dependencies] tempfile = "3.11.0" diff --git a/rust/numaflow-core/src/callback.rs b/rust/numaflow-core/src/callback.rs new file mode 100644 index 000000000..f9bcc3a66 --- /dev/null +++ b/rust/numaflow-core/src/callback.rs @@ -0,0 +1,105 @@ +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use reqwest::Client; + +use crate::{ + config::{get_pipeline_name, get_vertex_name}, + message::Message, + Error, +}; + +/// The data to be sent in the POST request +#[derive(serde::Serialize)] +struct CallbackPayload { + /// Name of the vertex + vertex: String, + /// Name of the pipeline + pipeline: String, + /// Unique identifier of the message + id: String, + /// Time when the callback was made + cb_time: u64, + /// List of tags associated with the message + tags: Vec, + /// Name of the vertex from which the message was sent + from_vertex: String, +} + +struct CallbackHandler { + client: Client, + callback_url: String, + callback_header_key: String, + key_meta_id: String, +} + +impl CallbackHandler { + pub(crate) fn new( + callback_header_key: String, + key_meta_id: String, + callback_url: String, + ) -> crate::Result { + let client = Client::builder() + .timeout(Duration::from_secs(1)) + .build() + .map_err(|e| Error::Callback(format!("Creating HTTP client: {e:?}")))?; + Ok(Self { + client, + callback_url, + callback_header_key, + key_meta_id, + }) + } + + pub(crate) async fn callback(&self, message: Message) -> crate::Result<()> { + let callback_url = message + .headers + .get(&self.callback_header_key) + .unwrap_or(&self.callback_url); + let uuid = message.headers.get(&self.key_meta_id).ok_or_else(|| { + Error::Callback(format!( + "{} is not found in message headers", + &self.key_meta_id + )) + })?; + let cb_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("System time is older than Unix epoch time") + .as_millis() as u64; + + let mut msg_tags = vec![]; + if let Some(tags) = message.tags { + msg_tags = tags.iter().cloned().collect(); + }; + + let Some(metadata) = message.metadata else { + return Err(Error::Callback("Message metadata is empty".into())); + }; + + let callback_payload = CallbackPayload { + vertex: get_vertex_name().to_owned(), + pipeline: get_pipeline_name().to_owned(), + id: uuid.to_owned(), + cb_time, + tags: msg_tags, + from_vertex: metadata.previous_vertex, + }; + + let resp = self + .client + .post(callback_url) + .json(&callback_payload) + .send() + .await + .map_err(|e| Error::Callback(format!("Sending callback request: {e:?}")))?; + + if !resp.status().is_success() { + return Err(Error::Callback(format!( + "Received non-OK status for callback request. Status={}, Body: {}", + resp.status(), + resp.text().await.unwrap_or_default() + ))); + } + + Ok(()) + } +} diff --git a/rust/numaflow-core/src/error.rs b/rust/numaflow-core/src/error.rs index 0e499d068..af30f0c64 100644 --- a/rust/numaflow-core/src/error.rs +++ b/rust/numaflow-core/src/error.rs @@ -55,6 +55,9 @@ pub enum Error { #[error("Task Error - {0}")] Tracker(String), + + #[error("Callback handler - {0}")] + Callback(String), } impl From for Error { diff --git a/rust/numaflow-core/src/lib.rs b/rust/numaflow-core/src/lib.rs index 79ce4348b..66e273e86 100644 --- a/rust/numaflow-core/src/lib.rs +++ b/rust/numaflow-core/src/lib.rs @@ -54,6 +54,8 @@ mod tracker; /// Map is a feature that allows users to execute custom code to transform their data. mod mapper; +mod callback; + pub async fn run() -> Result<()> { let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); let cln_token = CancellationToken::new(); diff --git a/rust/numaflow-core/src/mapper/map.rs b/rust/numaflow-core/src/mapper/map.rs index 8c279376a..c2241bf7f 100644 --- a/rust/numaflow-core/src/mapper/map.rs +++ b/rust/numaflow-core/src/mapper/map.rs @@ -554,6 +554,7 @@ mod tests { index: 0, }, headers: Default::default(), + metadata: None, }; let (output_tx, mut output_rx) = mpsc::channel(10); @@ -646,6 +647,7 @@ mod tests { index: i, }, headers: Default::default(), + metadata: None, }; input_tx.send(message).await.unwrap(); } @@ -735,6 +737,7 @@ mod tests { index: 0, }, headers: Default::default(), + metadata: None, }; input_tx.send(message).await.unwrap(); @@ -829,6 +832,7 @@ mod tests { index: 0, }, headers: Default::default(), + metadata: None, }, Message { keys: Arc::from(vec!["second".into()]), @@ -842,6 +846,7 @@ mod tests { index: 1, }, headers: Default::default(), + metadata: None, }, ]; @@ -939,6 +944,7 @@ mod tests { index: 0, }, headers: Default::default(), + metadata: None, }, Message { keys: Arc::from(vec!["second".into()]), @@ -952,6 +958,7 @@ mod tests { index: 1, }, headers: Default::default(), + metadata: None, }, ]; @@ -1049,6 +1056,7 @@ mod tests { index: 0, }, headers: Default::default(), + metadata: None, }; let (input_tx, input_rx) = mpsc::channel(10); @@ -1145,6 +1153,7 @@ mod tests { index: 0, }, headers: Default::default(), + metadata: None, }; let (input_tx, input_rx) = mpsc::channel(10); diff --git a/rust/numaflow-core/src/mapper/map/user_defined.rs b/rust/numaflow-core/src/mapper/map/user_defined.rs index 6bc816c40..6f7cf9be2 100644 --- a/rust/numaflow-core/src/mapper/map/user_defined.rs +++ b/rust/numaflow-core/src/mapper/map/user_defined.rs @@ -243,6 +243,7 @@ async fn process_response(sender_map: &ResponseSenderMap, resp: MapResponse) { offset: Some(msg_info.offset.clone()), event_time: msg_info.event_time, headers: msg_info.headers.clone(), + metadata: None, }; response_messages.push(message); } @@ -369,6 +370,7 @@ impl UserDefinedStreamMap { offset: None, event_time: message_info.event_time, headers: message_info.headers.clone(), + metadata: None, }; response_sender .send(Ok(message)) @@ -478,6 +480,7 @@ mod tests { index: 0, }, headers: Default::default(), + metadata: None, }; let (tx, rx) = tokio::sync::oneshot::channel(); @@ -568,6 +571,7 @@ mod tests { index: 0, }, headers: Default::default(), + metadata: None, }, crate::message::Message { keys: Arc::from(vec!["second".into()]), @@ -584,6 +588,7 @@ mod tests { index: 1, }, headers: Default::default(), + metadata: None, }, ]; @@ -683,6 +688,7 @@ mod tests { index: 0, }, headers: Default::default(), + metadata: None, }; let (tx, mut rx) = tokio::sync::mpsc::channel(3); diff --git a/rust/numaflow-core/src/message.rs b/rust/numaflow-core/src/message.rs index a33b4a704..f208aa775 100644 --- a/rust/numaflow-core/src/message.rs +++ b/rust/numaflow-core/src/message.rs @@ -44,6 +44,13 @@ pub(crate) struct Message { pub(crate) id: MessageID, /// headers of the message pub(crate) headers: HashMap, + pub(crate) metadata: Option, +} + +#[derive(Debug, Clone)] +pub(crate) struct Metadata { + // name of the previous vertex. + pub(crate) previous_vertex: String, } /// Offset of the message which will be used to acknowledge the message. @@ -96,6 +103,7 @@ impl TryFrom for Message { event_time, id, headers, + metadata: None, }) } } @@ -268,6 +276,8 @@ impl TryFrom for Message { .ok_or(Error::Proto("Missing message_info".to_string()))?; let id = header.id.ok_or(Error::Proto("Missing id".to_string()))?; + let previous_vertex = id.vertex_name.clone(); + Ok(Message { keys: Arc::from(header.keys.into_boxed_slice()), tags: None, @@ -276,6 +286,7 @@ impl TryFrom for Message { event_time: utc_from_timestamp(message_info.event_time), id: id.into(), headers: header.headers, + metadata: Some(Metadata { previous_vertex }), }) } } @@ -344,6 +355,7 @@ impl TryFrom for Message { index: 0, }, headers: result.headers, + metadata: None, }) } } @@ -466,6 +478,7 @@ mod tests { index: 0, }, headers: HashMap::new(), + metadata: None, }; let result: Result = message.clone().try_into(); @@ -547,6 +560,7 @@ mod tests { index: 0, }, headers: HashMap::new(), + metadata: None, }; let request: SourceTransformRequest = message.into(); @@ -597,6 +611,7 @@ mod tests { index: 0, }, headers: HashMap::new(), + metadata: None, }; let request: SinkRequest = message.into(); diff --git a/rust/numaflow-core/src/pipeline.rs b/rust/numaflow-core/src/pipeline.rs index d2cb77091..18a1f2e71 100644 --- a/rust/numaflow-core/src/pipeline.rs +++ b/rust/numaflow-core/src/pipeline.rs @@ -542,6 +542,7 @@ mod tests { index: 0, }, headers: HashMap::new(), + metadata: None, }; let message: bytes::BytesMut = message.try_into().unwrap(); @@ -738,6 +739,7 @@ mod tests { index: 0, }, headers: HashMap::new(), + metadata: None, }; let message: bytes::BytesMut = message.try_into().unwrap(); diff --git a/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs b/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs index 79b8572ef..f9b0d1dea 100644 --- a/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs +++ b/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs @@ -346,6 +346,7 @@ mod tests { index: i, }, headers: HashMap::new(), + metadata: None, }; let message_bytes: BytesMut = message.try_into().unwrap(); context @@ -445,6 +446,7 @@ mod tests { index: i, }, headers: HashMap::new(), + metadata: None, }; offsets.push(message.id.offset.clone()); let message_bytes: BytesMut = message.try_into().unwrap(); diff --git a/rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs b/rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs index e71335a57..62a8fbd83 100644 --- a/rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs +++ b/rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs @@ -552,6 +552,7 @@ mod tests { index: 0, }, headers: HashMap::new(), + metadata: None, }; let paf = writer @@ -611,6 +612,7 @@ mod tests { index: 0, }, headers: HashMap::new(), + metadata: None, }; let message_bytes: BytesMut = message.try_into().unwrap(); @@ -695,6 +697,7 @@ mod tests { index: i, }, headers: HashMap::new(), + metadata: None, }; let paf = writer .write( @@ -720,6 +723,7 @@ mod tests { index: 11, }, headers: HashMap::new(), + metadata: None, }; let paf = writer .write( @@ -987,6 +991,7 @@ mod tests { index: i, }, headers: HashMap::new(), + metadata: None, }; let (ack_tx, ack_rx) = tokio::sync::oneshot::channel(); tracker_handle @@ -1075,6 +1080,7 @@ mod tests { index: i, }, headers: HashMap::new(), + metadata: None, }; let (ack_tx, ack_rx) = tokio::sync::oneshot::channel(); tracker_handle @@ -1102,6 +1108,7 @@ mod tests { index: 101, }, headers: HashMap::new(), + metadata: None, }; let (ack_tx, ack_rx) = tokio::sync::oneshot::channel(); tracker_handle @@ -1215,6 +1222,7 @@ mod tests { index: i, }, headers: HashMap::new(), + metadata: None, }; let (ack_tx, ack_rx) = tokio::sync::oneshot::channel(); tracker_handle diff --git a/rust/numaflow-core/src/sink.rs b/rust/numaflow-core/src/sink.rs index 0b30f4c30..f18d0459a 100644 --- a/rust/numaflow-core/src/sink.rs +++ b/rust/numaflow-core/src/sink.rs @@ -709,6 +709,7 @@ mod tests { index: i, }, headers: HashMap::new(), + metadata: None, }) .collect(); @@ -744,6 +745,7 @@ mod tests { index: i, }, headers: HashMap::new(), + metadata: None, }) .collect(); @@ -822,6 +824,7 @@ mod tests { index: i, }, headers: HashMap::new(), + metadata: None, }) .collect(); @@ -909,6 +912,7 @@ mod tests { index: i, }, headers: HashMap::new(), + metadata: None, }) .collect(); diff --git a/rust/numaflow-core/src/sink/blackhole.rs b/rust/numaflow-core/src/sink/blackhole.rs index eb2f33136..117bfb3e2 100644 --- a/rust/numaflow-core/src/sink/blackhole.rs +++ b/rust/numaflow-core/src/sink/blackhole.rs @@ -44,6 +44,7 @@ mod tests { offset: "1".to_string().into(), index: 0, }, + metadata: None, }, Message { keys: Arc::from(vec![]), @@ -57,6 +58,7 @@ mod tests { offset: "2".to_string().into(), index: 1, }, + metadata: None, }, ]; diff --git a/rust/numaflow-core/src/sink/log.rs b/rust/numaflow-core/src/sink/log.rs index 9ae426f1f..5815f66a6 100644 --- a/rust/numaflow-core/src/sink/log.rs +++ b/rust/numaflow-core/src/sink/log.rs @@ -60,6 +60,7 @@ mod tests { offset: "1".to_string().into(), index: 0, }, + metadata: None, }, Message { keys: Arc::from(vec![]), @@ -73,6 +74,7 @@ mod tests { offset: "2".to_string().into(), index: 1, }, + metadata: None, }, ]; diff --git a/rust/numaflow-core/src/sink/user_defined.rs b/rust/numaflow-core/src/sink/user_defined.rs index efb9c1178..da0ebcca3 100644 --- a/rust/numaflow-core/src/sink/user_defined.rs +++ b/rust/numaflow-core/src/sink/user_defined.rs @@ -193,6 +193,7 @@ mod tests { offset: "1".to_string().into(), index: 0, }, + metadata: None, }, Message { keys: Arc::from(vec![]), @@ -206,6 +207,7 @@ mod tests { offset: "2".to_string().into(), index: 1, }, + metadata: None, }, ]; diff --git a/rust/numaflow-core/src/source/generator.rs b/rust/numaflow-core/src/source/generator.rs index 855030f73..b1ab1694d 100644 --- a/rust/numaflow-core/src/source/generator.rs +++ b/rust/numaflow-core/src/source/generator.rs @@ -178,6 +178,7 @@ mod stream_generator { index: Default::default(), }, headers: Default::default(), + metadata: None, } } diff --git a/rust/numaflow-core/src/source/pulsar.rs b/rust/numaflow-core/src/source/pulsar.rs index 6a2c7162b..5e02db3da 100644 --- a/rust/numaflow-core/src/source/pulsar.rs +++ b/rust/numaflow-core/src/source/pulsar.rs @@ -26,6 +26,7 @@ impl TryFrom for Message { index: 0, }, headers: message.headers, + metadata: None, }) } } diff --git a/rust/numaflow-core/src/source/serving.rs b/rust/numaflow-core/src/source/serving.rs index 02074904e..5b19a0d11 100644 --- a/rust/numaflow-core/src/source/serving.rs +++ b/rust/numaflow-core/src/source/serving.rs @@ -26,6 +26,7 @@ impl TryFrom for Message { index: 0, }, headers: message.headers, + metadata: None, }) } } diff --git a/rust/numaflow-core/src/transformer.rs b/rust/numaflow-core/src/transformer.rs index 6f9298b7c..a44e838c0 100644 --- a/rust/numaflow-core/src/transformer.rs +++ b/rust/numaflow-core/src/transformer.rs @@ -295,6 +295,7 @@ mod tests { index: 0, }, headers: Default::default(), + metadata: None, }; let (output_tx, mut output_rx) = mpsc::channel(10); @@ -374,6 +375,7 @@ mod tests { index: i, }, headers: Default::default(), + metadata: None, }; input_tx.send(message).await.unwrap(); } @@ -458,6 +460,7 @@ mod tests { index: 0, }, headers: Default::default(), + metadata: None, }; input_tx.send(message).await.unwrap(); diff --git a/rust/numaflow-core/src/transformer/user_defined.rs b/rust/numaflow-core/src/transformer/user_defined.rs index 398d5a4bc..388bb1e18 100644 --- a/rust/numaflow-core/src/transformer/user_defined.rs +++ b/rust/numaflow-core/src/transformer/user_defined.rs @@ -123,6 +123,7 @@ impl UserDefinedTransformer { offset: Some(msg_info.offset.clone()), event_time: utc_from_timestamp(result.event_time), headers: msg_info.headers.clone(), + metadata: None, }; response_messages.push(message); } @@ -232,6 +233,7 @@ mod tests { index: 0, }, headers: Default::default(), + metadata: None, }; let (tx, rx) = tokio::sync::oneshot::channel();