diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index b7f59c2314..c100d3da04 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -80,7 +80,7 @@ jobs: steps: - name: Start Pulsar standalone container - run: docker run -d -p 6650:6650 -p 8080:8080 apachepulsar/pulsar:4.0.0 bin/pulsar standalone + run: docker run -d -p 6650:6650 -p 8080:8080 apachepulsar/pulsar:4.0.0 bin/pulsar standalone - name: Set up Go 1.x uses: actions/setup-go@v5 @@ -204,19 +204,19 @@ jobs: driver: [jetstream] case: [ - e2e, - diamond-e2e, - transformer-e2e, - kafka-e2e, - map-e2e, - reduce-one-e2e, - reduce-two-e2e, - udsource-e2e, - api-e2e, - sideinputs-e2e, - idle-source-e2e, + # e2e, + # diamond-e2e, + # transformer-e2e, + # kafka-e2e, + # map-e2e, + # reduce-one-e2e, + # reduce-two-e2e, + # udsource-e2e, + # api-e2e, + # sideinputs-e2e, + # idle-source-e2e, monovertex-e2e, - builtin-source-e2e, + # builtin-source-e2e, ] include: - driver: redis diff --git a/Makefile b/Makefile index aab918c933..fbb66aa8c4 100644 --- a/Makefile +++ b/Makefile @@ -135,7 +135,8 @@ endif $(MAKE) restart-control-plane-components cat test/manifests/e2e-api-pod.yaml | sed 's@quay.io/numaproj/@$(IMAGE_NAMESPACE)/@' | sed 's/:latest/:$(VERSION)/' | kubectl -n numaflow-system apply -f - go generate $(shell find ./test/$* -name '*.go') - go test -v -timeout 15m -count 1 --tags test -p 1 ./test/$* + go test -v -timeout 15m -count 1 --tags test -p 1 ./test/$* || true + $(MAKE) show-logs && false $(MAKE) cleanup-e2e image-restart: @@ -156,6 +157,15 @@ cleanup-e2e: kubectl -n numaflow-system delete secret -lnumaflow-e2e=true --ignore-not-found=true kubectl -n numaflow-system delete po -lnumaflow-e2e=true --ignore-not-found=true +show-logs: + kubectl -n numaflow-system get mvtx + kubectl -n numaflow-system get po -l 'app.kubernetes.io/name=transformer-mono-vertex' + kubectl -n numaflow-system describe po -l 'app.kubernetes.io/name=transformer-mono-vertex' + echo "Numa Logs--" + kubectl -n numaflow-system logs -l 'app.kubernetes.io/name=transformer-mono-vertex' -c numa + echo "Previous Logs--" + kubectl -n numaflow-system logs -p -l 'app.kubernetes.io/name=transformer-mono-vertex' -c numa + # To run just one of the e2e tests by name (i.e. 'make TestCreateSimplePipeline'): Test%: $(MAKE) cleanup-e2e @@ -188,6 +198,7 @@ endif ifdef IMAGE_IMPORT_CMD $(IMAGE_IMPORT_CMD) $(IMAGE_NAMESPACE)/$(BINARY_NAME):$(VERSION) endif + docker run --entrypoint /bin/numaflow-rs $(IMAGE_NAMESPACE)/$(BINARY_NAME):$(VERSION) --rust || true .PHONY: build-rust-in-docker build-rust-in-docker: diff --git a/rust/numaflow-core/src/config.rs b/rust/numaflow-core/src/config.rs index 167c2f1cd4..686136c789 100644 --- a/rust/numaflow-core/src/config.rs +++ b/rust/numaflow-core/src/config.rs @@ -10,6 +10,10 @@ use crate::Result; const ENV_MONO_VERTEX_OBJ: &str = "NUMAFLOW_MONO_VERTEX_OBJECT"; const ENV_VERTEX_OBJ: &str = "NUMAFLOW_VERTEX_OBJECT"; +const ENV_CALLBACK_ENABLED: &str = "NUMAFLOW_CALLBACK_ENABLED"; +const ENV_CALLBACK_CONCURRENCY: &str = "NUMAFLOW_CALLBACK_CONCURRENCY"; +const DEFAULT_CALLBACK_CONCURRENCY: usize = 100; + /// Building blocks (Source, Sink, Transformer, FallBack, Metrics, etc.) to build a Pipeline or a /// MonoVertex. pub(crate) mod components; @@ -97,7 +101,7 @@ pub(crate) struct Settings { impl Settings { /// load based on the CRD type, either a pipeline or a monovertex. /// Settings are populated through reading the env vars set via the controller. The main - /// CRD is the base64 spec of the CR. 
+ /// CRD is the base64 spec of the CR. fn load() -> Result { if let Ok(obj) = env::var(ENV_MONO_VERTEX_OBJ) { let cfg = MonovertexConfig::load(obj)?; @@ -112,7 +116,7 @@ impl Settings { custom_resource_type: CustomResourceType::Pipeline(cfg), }); } - Err(Error::Config("No configuration found".to_string())) + Err(Error::Config("No configuration found - environment variable {ENV_MONO_VERTEX_OBJ} or {ENV_VERTEX_OBJ} is not set".to_string())) } } diff --git a/rust/numaflow-core/src/config/monovertex.rs b/rust/numaflow-core/src/config/monovertex.rs index c6f18e3c8e..4bd7ac5657 100644 --- a/rust/numaflow-core/src/config/monovertex.rs +++ b/rust/numaflow-core/src/config/monovertex.rs @@ -18,6 +18,10 @@ use crate::config::monovertex::sink::SinkType; use crate::error::Error; use crate::Result; +use super::pipeline::ServingCallbackConfig; + +use super::{DEFAULT_CALLBACK_CONCURRENCY, ENV_CALLBACK_CONCURRENCY, ENV_CALLBACK_ENABLED}; + const DEFAULT_BATCH_SIZE: u64 = 500; const DEFAULT_TIMEOUT_IN_MS: u32 = 1000; const DEFAULT_LOOKBACK_WINDOW_IN_SECS: u16 = 120; @@ -33,6 +37,7 @@ pub(crate) struct MonovertexConfig { pub(crate) transformer_config: Option, pub(crate) fb_sink_config: Option, pub(crate) metrics_config: MetricsConfig, + pub(crate) callback_config: Option, } impl Default for MonovertexConfig { @@ -53,6 +58,7 @@ impl Default for MonovertexConfig { transformer_config: None, fb_sink_config: None, metrics_config: MetricsConfig::default(), + callback_config: None, } } } @@ -143,6 +149,21 @@ impl MonovertexConfig { .and_then(|scale| scale.lookback_seconds.map(|x| x as u16)) .unwrap_or(DEFAULT_LOOKBACK_WINDOW_IN_SECS); + let mut callback_config = None; + if env::var(ENV_CALLBACK_ENABLED).is_ok() { + let callback_concurrency: usize = env::var(ENV_CALLBACK_CONCURRENCY) + .unwrap_or_else(|_| format!("{DEFAULT_CALLBACK_CONCURRENCY}")) + .parse() + .map_err(|e| { + Error::Config(format!( + "Parsing value of {ENV_CALLBACK_CONCURRENCY}: {e:?}" + )) + })?; + callback_config = Some(ServingCallbackConfig { + callback_concurrency, + }); + } + Ok(MonovertexConfig { name: mono_vertex_name, replica: *get_vertex_replica(), @@ -153,6 +174,7 @@ impl MonovertexConfig { sink_config, transformer_config, fb_sink_config, + callback_config, }) } } diff --git a/rust/numaflow-core/src/config/pipeline.rs b/rust/numaflow-core/src/config/pipeline.rs index 1368b0b32d..7e7fdf84fc 100644 --- a/rust/numaflow-core/src/config/pipeline.rs +++ b/rust/numaflow-core/src/config/pipeline.rs @@ -19,12 +19,15 @@ use crate::config::pipeline::map::MapVtxConfig; use crate::error::Error; use crate::Result; +use super::{DEFAULT_CALLBACK_CONCURRENCY, ENV_CALLBACK_CONCURRENCY, ENV_CALLBACK_ENABLED}; + const DEFAULT_BATCH_SIZE: u64 = 500; const DEFAULT_TIMEOUT_IN_MS: u32 = 1000; const DEFAULT_LOOKBACK_WINDOW_IN_SECS: u16 = 120; const ENV_NUMAFLOW_SERVING_JETSTREAM_URL: &str = "NUMAFLOW_ISBSVC_JETSTREAM_URL"; const ENV_NUMAFLOW_SERVING_JETSTREAM_USER: &str = "NUMAFLOW_ISBSVC_JETSTREAM_USER"; const ENV_NUMAFLOW_SERVING_JETSTREAM_PASSWORD: &str = "NUMAFLOW_ISBSVC_JETSTREAM_PASSWORD"; +const ENV_PAF_BATCH_SIZE: &str = "PAF_BATCH_SIZE"; const DEFAULT_GRPC_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; // 64 MB const DEFAULT_MAP_SOCKET: &str = "/var/run/numaflow/map.sock"; pub(crate) const DEFAULT_BATCH_MAP_SOCKET: &str = "/var/run/numaflow/batchmap.sock"; @@ -47,6 +50,12 @@ pub(crate) struct PipelineConfig { pub(crate) to_vertex_config: Vec, pub(crate) vertex_config: VertexType, pub(crate) metrics_config: MetricsConfig, + pub(crate) callback_config: 
Option, +} + +#[derive(Debug, Clone, PartialEq)] +pub(crate) struct ServingCallbackConfig { + pub(crate) callback_concurrency: usize, } impl Default for PipelineConfig { @@ -66,6 +75,7 @@ impl Default for PipelineConfig { transformer_config: None, }), metrics_config: Default::default(), + callback_config: None, } } } @@ -286,9 +296,15 @@ impl PipelineConfig { .map(|(key, val)| (key.into(), val.into())) .filter(|(key, _val)| { // FIXME(cr): this filter is non-exhaustive, should we invert? - key == ENV_NUMAFLOW_SERVING_JETSTREAM_URL - || key == ENV_NUMAFLOW_SERVING_JETSTREAM_USER - || key == ENV_NUMAFLOW_SERVING_JETSTREAM_PASSWORD + [ + ENV_NUMAFLOW_SERVING_JETSTREAM_URL, + ENV_NUMAFLOW_SERVING_JETSTREAM_USER, + ENV_NUMAFLOW_SERVING_JETSTREAM_PASSWORD, + ENV_PAF_BATCH_SIZE, + ENV_CALLBACK_ENABLED, + ENV_CALLBACK_CONCURRENCY, + ] + .contains(&key.as_str()) }) .collect(); @@ -373,10 +389,25 @@ impl PipelineConfig { .and_then(|scale| scale.lookback_seconds.map(|x| x as u16)) .unwrap_or(DEFAULT_LOOKBACK_WINDOW_IN_SECS); + let mut callback_config = None; + if get_var(ENV_CALLBACK_ENABLED).is_ok() { + let callback_concurrency: usize = get_var(ENV_CALLBACK_CONCURRENCY) + .unwrap_or_else(|_| format!("{DEFAULT_CALLBACK_CONCURRENCY}")) + .parse() + .map_err(|e| { + Error::Config(format!( + "Parsing value of {ENV_CALLBACK_CONCURRENCY}: {e:?}" + )) + })?; + callback_config = Some(ServingCallbackConfig { + callback_concurrency, + }); + } + Ok(PipelineConfig { batch_size: batch_size as usize, - paf_concurrency: env::var("PAF_BATCH_SIZE") - .unwrap_or((DEFAULT_BATCH_SIZE * 2).to_string()) + paf_concurrency: get_var(ENV_PAF_BATCH_SIZE) + .unwrap_or_else(|_| (DEFAULT_BATCH_SIZE * 2).to_string()) .parse() .unwrap(), read_timeout: Duration::from_millis(timeout_in_ms as u64), @@ -388,6 +419,7 @@ impl PipelineConfig { to_vertex_config, vertex_config: vertex, metrics_config: MetricsConfig::with_lookback_window_in_secs(look_back_window), + callback_config, }) } } @@ -419,6 +451,7 @@ mod tests { transformer_config: None, }), metrics_config: Default::default(), + callback_config: None, }; let config = PipelineConfig::default(); @@ -485,6 +518,7 @@ mod tests { lag_refresh_interval_in_secs: 3, lookback_window_in_secs: 120, }, + ..Default::default() }; assert_eq!(pipeline_config, expected); } @@ -536,7 +570,7 @@ mod tests { }, transformer_config: None, }), - metrics_config: Default::default(), + ..Default::default() }; assert_eq!(pipeline_config, expected); @@ -588,7 +622,7 @@ mod tests { }, transformer_config: None, }), - metrics_config: Default::default(), + ..Default::default() }; assert_eq!(pipeline_config, expected); @@ -704,7 +738,7 @@ mod tests { }), map_mode: MapMode::Unary, }), - metrics_config: MetricsConfig::default(), + ..Default::default() }; assert_eq!(pipeline_config, expected); diff --git a/rust/numaflow-core/src/mapper/map.rs b/rust/numaflow-core/src/mapper/map.rs index 36918d62f6..b1e85a8ba3 100644 --- a/rust/numaflow-core/src/mapper/map.rs +++ b/rust/numaflow-core/src/mapper/map.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use std::time::Duration; +use bytes::Bytes; use numaflow_pb::clients::map::map_client::MapClient; use tokio::sync::{mpsc, oneshot, OwnedSemaphorePermit, Semaphore}; use tokio::task::JoinHandle; @@ -304,6 +305,7 @@ impl MapHandle { Ok(()) }); + tracing::info!("Returning output_rx stream"); Ok((ReceiverStream::new(output_rx), handle)) } @@ -327,6 +329,7 @@ impl MapHandle { tokio::spawn(async move { let _permit = permit; + let offset = read_msg.id.offset.clone(); let (sender, receiver) = 
oneshot::channel(); let msg = UnaryActorMessage { message: read_msg.clone(), @@ -343,14 +346,16 @@ impl MapHandle { match receiver.await { Ok(Ok(mut mapped_messages)) => { // update the tracker with the number of messages sent and send the mapped messages - if let Err(e) = tracker_handle - .update( - read_msg.id.offset.clone(), - mapped_messages.len() as u32, - true, - ) - .await - { + for message in mapped_messages.iter() { + if let Err(e) = tracker_handle + .update(offset.clone(), message.tags.clone()) + .await + { + error_tx.send(e).await.expect("failed to send error"); + return; + } + } + if let Err(e) = tracker_handle.update_eof(offset).await { error_tx.send(e).await.expect("failed to send error"); return; } @@ -397,10 +402,18 @@ impl MapHandle { for receiver in receivers { match receiver.await { Ok(Ok(mut mapped_messages)) => { - let offset = mapped_messages.first().unwrap().id.offset.clone(); - tracker_handle - .update(offset.clone(), mapped_messages.len() as u32, true) - .await?; + let mut offset: Option = None; + for message in mapped_messages.iter() { + if offset.is_none() { + offset = Some(message.id.offset.clone()); + } + tracker_handle + .update(message.id.offset.clone(), message.tags.clone()) + .await?; + } + if let Some(offset) = offset { + tracker_handle.update_eof(offset).await?; + } for mapped_message in mapped_messages.drain(..) { output_tx .send(mapped_message) @@ -454,8 +467,13 @@ impl MapHandle { while let Some(result) = receiver.recv().await { match result { Ok(mapped_message) => { - let offset = mapped_message.id.offset.clone(); - if let Err(e) = tracker_handle.update(offset.clone(), 1, false).await { + if let Err(e) = tracker_handle + .update( + mapped_message.id.offset.clone(), + mapped_message.tags.clone(), + ) + .await + { error_tx.send(e).await.expect("failed to send error"); return; } @@ -474,7 +492,7 @@ impl MapHandle { } } - if let Err(e) = tracker_handle.update(read_msg.id.offset, 0, true).await { + if let Err(e) = tracker_handle.update_eof(read_msg.id.offset).await { error_tx.send(e).await.expect("failed to send error"); } }); @@ -529,7 +547,7 @@ mod tests { // wait for the server to start tokio::time::sleep(Duration::from_millis(100)).await; - let tracker_handle = TrackerHandle::new(); + let tracker_handle = TrackerHandle::new(None); let client = MapClient::new(create_rpc_channel(sock_file).await?); let mapper = MapHandle::new( @@ -557,6 +575,7 @@ mod tests { index: 0, }, headers: Default::default(), + metadata: None, }; let (output_tx, mut output_rx) = mpsc::channel(10); @@ -621,7 +640,7 @@ mod tests { // wait for the server to start tokio::time::sleep(Duration::from_millis(100)).await; - let tracker_handle = TrackerHandle::new(); + let tracker_handle = TrackerHandle::new(None); let client = MapClient::new(create_rpc_channel(sock_file).await?); let mapper = MapHandle::new( MapMode::Unary, @@ -649,6 +668,7 @@ mod tests { index: i, }, headers: Default::default(), + metadata: None, }; input_tx.send(message).await.unwrap(); } @@ -711,7 +731,7 @@ mod tests { // wait for the server to start tokio::time::sleep(Duration::from_millis(100)).await; - let tracker_handle = TrackerHandle::new(); + let tracker_handle = TrackerHandle::new(None); let client = MapClient::new(create_rpc_channel(sock_file).await?); let mapper = MapHandle::new( MapMode::Unary, @@ -738,6 +758,7 @@ mod tests { index: 0, }, headers: Default::default(), + metadata: None, }; input_tx.send(message).await.unwrap(); @@ -806,7 +827,7 @@ mod tests { // wait for the server to start 
tokio::time::sleep(Duration::from_millis(100)).await; - let tracker_handle = TrackerHandle::new(); + let tracker_handle = TrackerHandle::new(None); let client = MapClient::new(create_rpc_channel(sock_file).await?); let mapper = MapHandle::new( @@ -832,6 +853,7 @@ mod tests { index: 0, }, headers: Default::default(), + metadata: None, }, Message { keys: Arc::from(vec!["second".into()]), @@ -845,6 +867,7 @@ mod tests { index: 1, }, headers: Default::default(), + metadata: None, }, ]; @@ -917,7 +940,7 @@ mod tests { // wait for the server to start tokio::time::sleep(Duration::from_millis(100)).await; - let tracker_handle = TrackerHandle::new(); + let tracker_handle = TrackerHandle::new(None); let client = MapClient::new(create_rpc_channel(sock_file).await?); let mapper = MapHandle::new( MapMode::Batch, @@ -942,6 +965,7 @@ mod tests { index: 0, }, headers: Default::default(), + metadata: None, }, Message { keys: Arc::from(vec!["second".into()]), @@ -955,6 +979,7 @@ mod tests { index: 1, }, headers: Default::default(), + metadata: None, }, ]; @@ -1027,7 +1052,7 @@ mod tests { // wait for the server to start tokio::time::sleep(Duration::from_millis(100)).await; - let tracker_handle = TrackerHandle::new(); + let tracker_handle = TrackerHandle::new(None); let client = MapClient::new(create_rpc_channel(sock_file).await?); let mapper = MapHandle::new( @@ -1052,6 +1077,7 @@ mod tests { index: 0, }, headers: Default::default(), + metadata: None, }; let (input_tx, input_rx) = mpsc::channel(10); @@ -1125,7 +1151,7 @@ mod tests { tokio::time::sleep(Duration::from_millis(100)).await; let client = MapClient::new(create_rpc_channel(sock_file).await?); - let tracker_handle = TrackerHandle::new(); + let tracker_handle = TrackerHandle::new(None); let mapper = MapHandle::new( MapMode::Stream, 500, @@ -1148,6 +1174,7 @@ mod tests { index: 0, }, headers: Default::default(), + metadata: None, }; let (input_tx, input_rx) = mpsc::channel(10); diff --git a/rust/numaflow-core/src/mapper/map/user_defined.rs b/rust/numaflow-core/src/mapper/map/user_defined.rs index d7abfff10d..c879843563 100644 --- a/rust/numaflow-core/src/mapper/map/user_defined.rs +++ b/rust/numaflow-core/src/mapper/map/user_defined.rs @@ -261,6 +261,7 @@ async fn process_response(sender_map: &ResponseSenderMap, resp: MapResponse) { offset: Some(msg_info.offset.clone()), event_time: msg_info.event_time, headers: msg_info.headers.clone(), + metadata: None, }; response_messages.push(message); } @@ -387,6 +388,7 @@ impl UserDefinedStreamMap { offset: None, event_time: message_info.event_time, headers: message_info.headers.clone(), + metadata: None, }; response_sender .send(Ok(message)) @@ -496,6 +498,7 @@ mod tests { index: 0, }, headers: Default::default(), + metadata: None, }; let (tx, rx) = tokio::sync::oneshot::channel(); @@ -586,6 +589,7 @@ mod tests { index: 0, }, headers: Default::default(), + metadata: None, }, crate::message::Message { keys: Arc::from(vec!["second".into()]), @@ -602,6 +606,7 @@ mod tests { index: 1, }, headers: Default::default(), + metadata: None, }, ]; @@ -701,6 +706,7 @@ mod tests { index: 0, }, headers: Default::default(), + metadata: None, }; let (tx, mut rx) = tokio::sync::mpsc::channel(3); diff --git a/rust/numaflow-core/src/message.rs b/rust/numaflow-core/src/message.rs index 86259bf8f5..7a53d17a4a 100644 --- a/rust/numaflow-core/src/message.rs +++ b/rust/numaflow-core/src/message.rs @@ -34,6 +34,13 @@ pub(crate) struct Message { pub(crate) id: MessageID, /// headers of the message pub(crate) headers: HashMap, + 
pub(crate) metadata: Option, +} + +#[derive(Debug, Clone)] +pub(crate) struct Metadata { + // name of the previous vertex. + pub(crate) previous_vertex: String, } /// Offset of the message which will be used to acknowledge the message. @@ -212,6 +219,7 @@ impl TryFrom for Message { event_time: utc_from_timestamp(message_info.event_time), id: id.into(), headers: header.headers, + metadata: None, }) } } @@ -264,6 +272,7 @@ mod tests { index: 0, }, headers: HashMap::new(), + metadata: None, }; let result: Result = message.clone().try_into(); diff --git a/rust/numaflow-core/src/metrics.rs b/rust/numaflow-core/src/metrics.rs index 4549ade9fe..e0eb0cc1e6 100644 --- a/rust/numaflow-core/src/metrics.rs +++ b/rust/numaflow-core/src/metrics.rs @@ -605,6 +605,9 @@ pub(crate) async fn start_metrics_https_server( addr: SocketAddr, metrics_state: UserDefinedContainerState, ) -> crate::Result<()> { + // Setup the CryptoProvider (controls core cryptography used by rustls) for the process + let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + // Generate a self-signed certificate let CertifiedKey { cert, key_pair } = generate_simple_self_signed(vec!["localhost".into()]) .map_err(|e| Error::Metrics(format!("Generating self-signed certificate: {}", e)))?; @@ -718,6 +721,7 @@ struct TimestampedPending { #[derive(Clone)] pub(crate) enum LagReader { Source(Source), + #[allow(clippy::upper_case_acronyms)] ISB(Vec), // multiple partitions } @@ -859,7 +863,7 @@ async fn build_pending_info( match &mut lag_reader { LagReader::Source(source) => { - match fetch_source_pending(&source).await { + match fetch_source_pending(source).await { Ok(pending) => { if pending != -1 { let mut stats = pending_stats.lock().await; @@ -884,8 +888,8 @@ async fn build_pending_info( } LagReader::ISB(readers) => { - for mut reader in readers { - match fetch_isb_pending(&mut reader).await { + for reader in readers { + match fetch_isb_pending(reader).await { Ok(pending) => { if pending != -1 { let mut stats = pending_stats.lock().await; @@ -981,7 +985,7 @@ async fn expose_pending_metrics( } /// Calculate the average pending messages over the last `seconds` seconds. 
-async fn calculate_pending(seconds: i64, pending_stats: &Vec) -> i64 { +async fn calculate_pending(seconds: i64, pending_stats: &[TimestampedPending]) -> i64 { let mut result = -1; let mut total = 0; let mut num = 0; diff --git a/rust/numaflow-core/src/monovertex.rs b/rust/numaflow-core/src/monovertex.rs index 089f624334..b829156a70 100644 --- a/rust/numaflow-core/src/monovertex.rs +++ b/rust/numaflow-core/src/monovertex.rs @@ -1,3 +1,4 @@ +use serving::callback::CallbackHandler; use tokio_util::sync::CancellationToken; use tracing::info; @@ -23,7 +24,11 @@ pub(crate) async fn start_forwarder( cln_token: CancellationToken, config: &MonovertexConfig, ) -> error::Result<()> { - let tracker_handle = TrackerHandle::new(); + let callback_handler = config + .callback_config + .as_ref() + .map(|cb_cfg| CallbackHandler::new(config.name.clone(), cb_cfg.callback_concurrency)); + let tracker_handle = TrackerHandle::new(callback_handler); let (transformer, transformer_grpc_client) = create_components::create_transformer( config.batch_size, config.transformer_config.clone(), diff --git a/rust/numaflow-core/src/monovertex/forwarder.rs b/rust/numaflow-core/src/monovertex/forwarder.rs index e49c2a8a4c..9aeecf2f8e 100644 --- a/rust/numaflow-core/src/monovertex/forwarder.rs +++ b/rust/numaflow-core/src/monovertex/forwarder.rs @@ -188,7 +188,7 @@ mod tests { #[tokio::test] async fn test_forwarder() { - let tracker_handle = TrackerHandle::new(); + let tracker_handle = TrackerHandle::new(None); // create the source which produces x number of messages let cln_token = CancellationToken::new(); @@ -245,6 +245,7 @@ mod tests { .await .map_err(|e| panic!("failed to create source reader: {:?}", e)) .unwrap(); + let tracker_handle = TrackerHandle::new(None); let source = Source::new( 5, SourceType::UserDefinedSource(src_read, src_ack, lag_reader), @@ -317,7 +318,7 @@ mod tests { #[tokio::test] async fn test_flatmap_operation() { - let tracker_handle = TrackerHandle::new(); + let tracker_handle = TrackerHandle::new(None); // create the source which produces x number of messages let cln_token = CancellationToken::new(); diff --git a/rust/numaflow-core/src/pipeline.rs b/rust/numaflow-core/src/pipeline.rs index f03479ea3c..652f1c8181 100644 --- a/rust/numaflow-core/src/pipeline.rs +++ b/rust/numaflow-core/src/pipeline.rs @@ -3,6 +3,7 @@ use std::time::Duration; use async_nats::jetstream::Context; use async_nats::{jetstream, ConnectOptions}; use futures::future::try_join_all; +use serving::callback::CallbackHandler; use tokio_util::sync::CancellationToken; use tracing::info; @@ -50,7 +51,10 @@ async fn start_source_forwarder( config: PipelineConfig, source_config: SourceVtxConfig, ) -> Result<()> { - let tracker_handle = TrackerHandle::new(); + let callback_handler = config.callback_config.as_ref().map(|cb_cfg| { + CallbackHandler::new(config.vertex_name.clone(), cb_cfg.callback_concurrency) + }); + let tracker_handle = TrackerHandle::new(callback_handler); let js_context = create_js_context(config.js_client_config.clone()).await?; let buffer_writer = create_buffer_writer( @@ -122,8 +126,12 @@ async fn start_map_forwarder( let mut mapper_grpc_client = None; let mut isb_lag_readers = vec![]; + let callback_handler = config.callback_config.as_ref().map(|cb_cfg| { + CallbackHandler::new(config.vertex_name.clone(), cb_cfg.callback_concurrency) + }); + for stream in reader_config.streams.clone() { - let tracker_handle = TrackerHandle::new(); + let tracker_handle = TrackerHandle::new(callback_handler.clone()); let 
buffer_reader = create_buffer_reader( stream, @@ -214,11 +222,15 @@ async fn start_sink_forwarder( .ok_or_else(|| error::Error::Config("No from vertex config found".to_string()))? .reader_config; + let callback_handler = config.callback_config.as_ref().map(|cb_cfg| { + CallbackHandler::new(config.vertex_name.clone(), cb_cfg.callback_concurrency) + }); + // Create sink writers and buffer readers for each stream let mut sink_writers = vec![]; let mut buffer_readers = vec![]; for stream in reader_config.streams.clone() { - let tracker_handle = TrackerHandle::new(); + let tracker_handle = TrackerHandle::new(callback_handler.clone()); let buffer_reader = create_buffer_reader( stream, @@ -471,6 +483,7 @@ mod tests { lag_refresh_interval_in_secs: 3, lookback_window_in_secs: 120, }, + callback_config: None, }; let cancellation_token = CancellationToken::new(); @@ -563,6 +576,7 @@ mod tests { index: 0, }, headers: HashMap::new(), + metadata: None, }; let message: bytes::BytesMut = message.try_into().unwrap(); @@ -628,6 +642,7 @@ mod tests { lag_refresh_interval_in_secs: 3, lookback_window_in_secs: 120, }, + callback_config: None, }; let cancellation_token = CancellationToken::new(); @@ -759,6 +774,7 @@ mod tests { index: 0, }, headers: HashMap::new(), + metadata: None, }; let message: bytes::BytesMut = message.try_into().unwrap(); @@ -871,6 +887,7 @@ mod tests { lag_refresh_interval_in_secs: 3, lookback_window_in_secs: 120, }, + callback_config: None, }; let cancellation_token = CancellationToken::new(); diff --git a/rust/numaflow-core/src/pipeline/forwarder/source_forwarder.rs b/rust/numaflow-core/src/pipeline/forwarder/source_forwarder.rs index d2c71759a0..a14ab7b4c8 100644 --- a/rust/numaflow-core/src/pipeline/forwarder/source_forwarder.rs +++ b/rust/numaflow-core/src/pipeline/forwarder/source_forwarder.rs @@ -163,7 +163,7 @@ mod tests { #[cfg(feature = "nats-tests")] #[tokio::test] async fn test_source_forwarder() { - let tracker_handle = TrackerHandle::new(); + let tracker_handle = TrackerHandle::new(None); // create the source which produces x number of messages let cln_token = CancellationToken::new(); diff --git a/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs b/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs index 152004aed7..97aa5f9864 100644 --- a/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs +++ b/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs @@ -155,12 +155,18 @@ impl JetstreamReader { index: 0, }; + let metadata = crate::message::Metadata{ + // Copy previous vertex name from message id + previous_vertex: String::from_utf8_lossy(&message.id.vertex_name).into(), + }; + message.metadata = Some(metadata); + message.offset = Some(offset.clone()); message.id = message_id.clone(); // Insert the message into the tracker and wait for the ack to be sent back. 
let (ack_tx, ack_rx) = oneshot::channel(); - tracker_handle.insert(message_id.offset.clone(), ack_tx).await?; + tracker_handle.insert(&message, ack_tx).await?; tokio::spawn(Self::start_work_in_progress( jetstream_message, @@ -335,7 +341,7 @@ mod tests { 0, context.clone(), buf_reader_config, - TrackerHandle::new(), + TrackerHandle::new(None), 500, ) .await @@ -360,6 +366,7 @@ mod tests { index: i, }, headers: HashMap::new(), + metadata: None, }; let message_bytes: BytesMut = message.try_into().unwrap(); context @@ -395,7 +402,7 @@ mod tests { // Create JetStream context let client = async_nats::connect(js_url).await.unwrap(); let context = jetstream::new(client); - let tracker_handle = TrackerHandle::new(); + let tracker_handle = TrackerHandle::new(None); let stream_name = "test_ack"; // Delete stream if it exists @@ -459,6 +466,7 @@ mod tests { index: i, }, headers: HashMap::new(), + metadata: None, }; offsets.push(message.id.offset.clone()); let message_bytes: BytesMut = message.try_into().unwrap(); diff --git a/rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs b/rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs index e71335a576..063af6391a 100644 --- a/rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs +++ b/rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs @@ -181,6 +181,7 @@ impl JetstreamWriter { let this = self.clone(); let handle: JoinHandle> = tokio::spawn(async move { + tracing::info!("Starting streaming Jetstream writer"); let mut messages_stream = messages_stream; let mut hash = DefaultHasher::new(); @@ -196,6 +197,7 @@ impl JetstreamWriter { continue; } + // List of PAFs(one message can be written to multiple streams) let mut pafs = vec![]; for vertex in &*this.config { // check whether we need to write to this downstream vertex @@ -239,12 +241,7 @@ impl JetstreamWriter { continue; } - this.resolve_pafs(ResolveAndPublishResult { - pafs, - payload: message.value.clone().into(), - offset: message.id.offset, - }) - .await?; + this.resolve_pafs(pafs, message).await?; processed_msgs_count += 1; if last_logged_at.elapsed().as_secs() >= 1 { @@ -257,6 +254,7 @@ impl JetstreamWriter { last_logged_at = Instant::now(); } } + tracing::info!("Streaming jetstream writer finished"); Ok(()) }); Ok(handle) @@ -336,7 +334,11 @@ impl JetstreamWriter { /// asynchronously, if it fails it will do a blocking write to resolve the PAFs. /// At any point in time, we will only have X PAF resolvers running, this will help us create a /// natural backpressure. 
- pub(super) async fn resolve_pafs(&self, result: ResolveAndPublishResult) -> Result<()> { + pub(super) async fn resolve_pafs( + &self, + pafs: Vec<((String, u16), PublishAckFuture)>, + message: Message, + ) -> Result<()> { let start_time = Instant::now(); let permit = Arc::clone(&self.sem) .acquire_owned() @@ -350,7 +352,7 @@ impl JetstreamWriter { tokio::spawn(async move { let _permit = permit; - for (stream, paf) in result.pafs { + for (stream, paf) in pafs { match paf.await { Ok(ack) => { if ack.duplicate { @@ -364,7 +366,7 @@ impl JetstreamWriter { Offset::Int(IntOffset::new(ack.sequence, stream.1)), )); tracker_handle - .delete(result.offset.clone()) + .delete(message.id.offset.clone()) .await .expect("Failed to delete offset from tracker"); } @@ -376,7 +378,7 @@ impl JetstreamWriter { ); match JetstreamWriter::blocking_write( stream.clone(), - result.payload.clone(), + message.value.clone(), js_ctx.clone(), cancel_token.clone(), ) @@ -398,7 +400,7 @@ impl JetstreamWriter { error!(?e, "Blocking write failed for stream {}", stream.0); // Since we failed to write to the stream, we need to send a NAK to the reader tracker_handle - .discard(result.offset.clone()) + .discard(message.id.offset.clone()) .await .expect("Failed to discard offset from the tracker"); return; @@ -421,17 +423,14 @@ impl JetstreamWriter { /// an error it means it is fatal non-retryable error. async fn blocking_write( stream: Stream, - payload: Vec, + payload: Bytes, js_ctx: Context, cln_token: CancellationToken, ) -> Result { let start_time = Instant::now(); info!("Blocking write for stream {}", stream.0); loop { - match js_ctx - .publish(stream.0.clone(), Bytes::from(payload.clone())) - .await - { + match js_ctx.publish(stream.0.clone(), payload.clone()).await { Ok(paf) => match paf.await { Ok(ack) => { if ack.duplicate { @@ -463,17 +462,6 @@ impl JetstreamWriter { } } -/// ResolveAndPublishResult resolves the result of the write PAF operation. -/// It contains the list of pafs(one message can be written to multiple streams) -/// and the payload that was written. Once the PAFs for all the streams have been -/// resolved, the information is published to callee_tx. 
-#[derive(Debug)] -pub(crate) struct ResolveAndPublishResult { - pub(crate) pafs: Vec<(Stream, PublishAckFuture)>, - pub(crate) payload: Vec, - pub(crate) offset: Bytes, -} - #[cfg(test)] mod tests { use std::collections::HashMap; @@ -494,7 +482,7 @@ mod tests { #[cfg(feature = "nats-tests")] #[tokio::test] async fn test_async_write() { - let tracker_handle = TrackerHandle::new(); + let tracker_handle = TrackerHandle::new(None); let cln_token = CancellationToken::new(); let js_url = "localhost:4222"; // Create JetStream context @@ -552,6 +540,7 @@ mod tests { index: 0, }, headers: HashMap::new(), + metadata: None, }; let paf = writer @@ -611,6 +600,7 @@ mod tests { index: 0, }, headers: HashMap::new(), + metadata: None, }; let message_bytes: BytesMut = message.try_into().unwrap(); @@ -632,7 +622,7 @@ mod tests { #[cfg(feature = "nats-tests")] #[tokio::test] async fn test_write_with_cancellation() { - let tracker_handle = TrackerHandle::new(); + let tracker_handle = TrackerHandle::new(None); let js_url = "localhost:4222"; // Create JetStream context let client = async_nats::connect(js_url).await.unwrap(); @@ -695,6 +685,7 @@ mod tests { index: i, }, headers: HashMap::new(), + metadata: None, }; let paf = writer .write( @@ -720,6 +711,7 @@ mod tests { index: 11, }, headers: HashMap::new(), + metadata: None, }; let paf = writer .write( @@ -830,7 +822,7 @@ mod tests { #[cfg(feature = "nats-tests")] #[tokio::test] async fn test_check_stream_status() { - let tracker_handle = TrackerHandle::new(); + let tracker_handle = TrackerHandle::new(None); let js_url = "localhost:4222"; // Create JetStream context let client = async_nats::connect(js_url).await.unwrap(); @@ -928,7 +920,7 @@ mod tests { // Create JetStream context let client = async_nats::connect(js_url).await.unwrap(); let context = jetstream::new(client); - let tracker_handle = TrackerHandle::new(); + let tracker_handle = TrackerHandle::new(None); let stream_name = "test_publish_messages"; // Delete stream if it exists @@ -987,12 +979,10 @@ mod tests { index: i, }, headers: HashMap::new(), + metadata: None, }; let (ack_tx, ack_rx) = tokio::sync::oneshot::channel(); - tracker_handle - .insert(message.id.offset.clone(), ack_tx) - .await - .unwrap(); + tracker_handle.insert(&message, ack_tx).await.unwrap(); ack_rxs.push(ack_rx); messages_tx.send(message).await.unwrap(); } @@ -1016,7 +1006,7 @@ mod tests { // Create JetStream context let client = async_nats::connect(js_url).await.unwrap(); let context = jetstream::new(client); - let tracker_handle = TrackerHandle::new(); + let tracker_handle = TrackerHandle::new(None); let stream_name = "test_publish_cancellation"; // Delete stream if it exists @@ -1075,12 +1065,10 @@ mod tests { index: i, }, headers: HashMap::new(), + metadata: None, }; let (ack_tx, ack_rx) = tokio::sync::oneshot::channel(); - tracker_handle - .insert(message.id.offset.clone(), ack_tx) - .await - .unwrap(); + tracker_handle.insert(&message, ack_tx).await.unwrap(); ack_rxs.push(ack_rx); tx.send(message).await.unwrap(); } @@ -1102,12 +1090,10 @@ mod tests { index: 101, }, headers: HashMap::new(), + metadata: None, }; let (ack_tx, ack_rx) = tokio::sync::oneshot::channel(); - tracker_handle - .insert("offset_101".to_string().into(), ack_tx) - .await - .unwrap(); + tracker_handle.insert(&message, ack_tx).await.unwrap(); ack_rxs.push(ack_rx); tx.send(message).await.unwrap(); drop(tx); @@ -1135,7 +1121,7 @@ mod tests { let js_url = "localhost:4222"; let client = async_nats::connect(js_url).await.unwrap(); let context = 
jetstream::new(client); - let tracker_handle = TrackerHandle::new(); + let tracker_handle = TrackerHandle::new(None); let cln_token = CancellationToken::new(); let vertex1_streams = vec!["vertex1-0", "vertex1-1"]; @@ -1215,12 +1201,10 @@ mod tests { index: i, }, headers: HashMap::new(), + metadata: None, }; let (ack_tx, ack_rx) = tokio::sync::oneshot::channel(); - tracker_handle - .insert(message.id.offset.clone(), ack_tx) - .await - .unwrap(); + tracker_handle.insert(&message, ack_tx).await.unwrap(); ack_rxs.push(ack_rx); messages_tx.send(message).await.unwrap(); } diff --git a/rust/numaflow-core/src/sink.rs b/rust/numaflow-core/src/sink.rs index be24f2f327..fdbfbe21c5 100644 --- a/rust/numaflow-core/src/sink.rs +++ b/rust/numaflow-core/src/sink.rs @@ -293,7 +293,7 @@ impl SinkWriter { let sink_start = time::Instant::now(); let total_valid_msgs = batch.len(); - match this.write(batch, cancellation_token.clone()).await { + match this.write(batch.clone(), cancellation_token.clone()).await { Ok(_) => { for offset in offsets { // Delete the message from the tracker @@ -751,7 +751,7 @@ mod tests { 10, Duration::from_secs(1), SinkClientType::Log, - TrackerHandle::new(), + TrackerHandle::new(None), ) .build() .await @@ -770,6 +770,7 @@ mod tests { index: i, }, headers: HashMap::new(), + metadata: None, }) .collect(); @@ -781,7 +782,7 @@ mod tests { #[tokio::test] async fn test_streaming_write() { - let tracker_handle = TrackerHandle::new(); + let tracker_handle = TrackerHandle::new(None); let sink_writer = SinkWriterBuilder::new( 10, Duration::from_millis(100), @@ -805,6 +806,7 @@ mod tests { index: i, }, headers: HashMap::new(), + metadata: None, }) .collect(); @@ -813,10 +815,7 @@ mod tests { for msg in messages { let (ack_tx, ack_rx) = oneshot::channel(); ack_rxs.push(ack_rx); - tracker_handle - .insert(msg.id.offset.clone(), ack_tx) - .await - .unwrap(); + tracker_handle.insert(&msg, ack_tx).await.unwrap(); let _ = tx.send(msg).await; } drop(tx); @@ -836,7 +835,7 @@ mod tests { #[tokio::test] async fn test_streaming_write_error() { - let tracker_handle = TrackerHandle::new(); + let tracker_handle = TrackerHandle::new(None); // start the server let (_shutdown_tx, shutdown_rx) = oneshot::channel(); let tmp_dir = tempfile::TempDir::new().unwrap(); @@ -883,6 +882,7 @@ mod tests { index: i, }, headers: HashMap::new(), + metadata: None, }) .collect(); @@ -891,10 +891,7 @@ mod tests { for msg in messages { let (ack_tx, ack_rx) = oneshot::channel(); ack_rxs.push(ack_rx); - tracker_handle - .insert(msg.id.offset.clone(), ack_tx) - .await - .unwrap(); + tracker_handle.insert(&msg, ack_tx).await.unwrap(); let _ = tx.send(msg).await; } drop(tx); @@ -921,7 +918,7 @@ mod tests { #[tokio::test] async fn test_fallback_write() { - let tracker_handle = TrackerHandle::new(); + let tracker_handle = TrackerHandle::new(None); // start the server let (_shutdown_tx, shutdown_rx) = oneshot::channel(); @@ -970,6 +967,7 @@ mod tests { index: i, }, headers: HashMap::new(), + metadata: None, }) .collect(); @@ -977,10 +975,7 @@ mod tests { let mut ack_rxs = vec![]; for msg in messages { let (ack_tx, ack_rx) = oneshot::channel(); - tracker_handle - .insert(msg.id.offset.clone(), ack_tx) - .await - .unwrap(); + tracker_handle.insert(&msg, ack_tx).await.unwrap(); ack_rxs.push(ack_rx); let _ = tx.send(msg).await; } @@ -1017,6 +1012,7 @@ mod tests { index: 0, }, headers: HashMap::new(), + metadata: None, }; let request: SinkRequest = message.into(); diff --git a/rust/numaflow-core/src/sink/blackhole.rs 
b/rust/numaflow-core/src/sink/blackhole.rs index 308a59e35b..328731b916 100644 --- a/rust/numaflow-core/src/sink/blackhole.rs +++ b/rust/numaflow-core/src/sink/blackhole.rs @@ -44,6 +44,7 @@ mod tests { offset: "1".to_string().into(), index: 0, }, + metadata: None, }, Message { keys: Arc::from(vec![]), @@ -57,6 +58,7 @@ mod tests { offset: "2".to_string().into(), index: 1, }, + metadata: None, }, ]; diff --git a/rust/numaflow-core/src/sink/log.rs b/rust/numaflow-core/src/sink/log.rs index 71bb373741..c70a1d2537 100644 --- a/rust/numaflow-core/src/sink/log.rs +++ b/rust/numaflow-core/src/sink/log.rs @@ -57,6 +57,7 @@ mod tests { offset: "1".to_string().into(), index: 0, }, + metadata: None, }, Message { keys: Arc::from(vec![]), @@ -70,6 +71,7 @@ mod tests { offset: "2".to_string().into(), index: 1, }, + metadata: None, }, ]; diff --git a/rust/numaflow-core/src/sink/user_defined.rs b/rust/numaflow-core/src/sink/user_defined.rs index 0bcb4c6853..76e0ba6ad6 100644 --- a/rust/numaflow-core/src/sink/user_defined.rs +++ b/rust/numaflow-core/src/sink/user_defined.rs @@ -212,6 +212,7 @@ mod tests { offset: "1".to_string().into(), index: 0, }, + metadata: None, }, Message { keys: Arc::from(vec![]), @@ -225,6 +226,7 @@ mod tests { offset: "2".to_string().into(), index: 1, }, + metadata: None, }, ]; diff --git a/rust/numaflow-core/src/source.rs b/rust/numaflow-core/src/source.rs index a0de52fa06..bdd2d03ee6 100644 --- a/rust/numaflow-core/src/source.rs +++ b/rust/numaflow-core/src/source.rs @@ -308,9 +308,7 @@ impl Source { let offset = message.offset.clone().expect("offset can never be none"); // insert the offset and the ack one shot in the tracker. - tracker_handle - .insert(message.id.offset.clone(), resp_ack_tx) - .await?; + tracker_handle.insert(&message, resp_ack_tx).await?; // store the ack one shot in the batch to invoke ack later. 
ack_batch.push((offset, resp_ack_rx)); @@ -570,7 +568,7 @@ mod tests { let source = Source::new( 5, SourceType::UserDefinedSource(src_read, src_ack, lag_reader), - TrackerHandle::new(), + TrackerHandle::new(None), true, None, ); diff --git a/rust/numaflow-core/src/source/generator.rs b/rust/numaflow-core/src/source/generator.rs index 855030f73b..b1ab1694d3 100644 --- a/rust/numaflow-core/src/source/generator.rs +++ b/rust/numaflow-core/src/source/generator.rs @@ -178,6 +178,7 @@ mod stream_generator { index: Default::default(), }, headers: Default::default(), + metadata: None, } } diff --git a/rust/numaflow-core/src/source/pulsar.rs b/rust/numaflow-core/src/source/pulsar.rs index 6a2c7162b6..5e02db3da6 100644 --- a/rust/numaflow-core/src/source/pulsar.rs +++ b/rust/numaflow-core/src/source/pulsar.rs @@ -26,6 +26,7 @@ impl TryFrom for Message { index: 0, }, headers: message.headers, + metadata: None, }) } } diff --git a/rust/numaflow-core/src/source/serving.rs b/rust/numaflow-core/src/source/serving.rs index 5eeea4b054..65f3a3e153 100644 --- a/rust/numaflow-core/src/source/serving.rs +++ b/rust/numaflow-core/src/source/serving.rs @@ -4,7 +4,7 @@ pub(crate) use serving::ServingSource; use super::{get_vertex_name, Message, Offset}; use crate::config::get_vertex_replica; -use crate::message::{MessageID, StringOffset}; +use crate::message::{MessageID, Metadata, StringOffset}; use crate::Error; use crate::Result; @@ -27,6 +27,9 @@ impl TryFrom for Message { index: 0, }, headers: message.headers, + metadata: Some(Metadata { + previous_vertex: get_vertex_name().to_string(), + }), }) } } diff --git a/rust/numaflow-core/src/source/user_defined.rs b/rust/numaflow-core/src/source/user_defined.rs index 66ac456ddf..d089e95acb 100644 --- a/rust/numaflow-core/src/source/user_defined.rs +++ b/rust/numaflow-core/src/source/user_defined.rs @@ -129,6 +129,7 @@ impl TryFrom for Message { index: 0, }, headers: result.headers, + metadata: None, }) } } diff --git a/rust/numaflow-core/src/tracker.rs b/rust/numaflow-core/src/tracker.rs index a8ccaca54a..919ab5d268 100644 --- a/rust/numaflow-core/src/tracker.rs +++ b/rust/numaflow-core/src/tracker.rs @@ -1,4 +1,4 @@ -//! Tracker is added because when do data forwarding in [MonoVertex](crate::monovertex::forwarder) or +//! Tracker is added because when we do data forwarding in [MonoVertex](crate::monovertex::forwarder) or //! in [Pipeline](crate::pipeline::forwarder), immaterial whether we are in source, UDF, or Sink, we //! have to track whether the message has completely moved to the next vertex (N+1)th before we can //! mark that message as done in the Nth vertex. We use Tracker to let Read know that it can mark the @@ -9,38 +9,45 @@ //! In the future Watermark will also be propagated based on this. use std::collections::HashMap; +use std::sync::Arc; use bytes::Bytes; +use serving::callback::CallbackHandler; +use serving::{DEFAULT_CALLBACK_URL_HEADER_KEY, DEFAULT_ID_HEADER}; use tokio::sync::{mpsc, oneshot}; use crate::error::Error; -use crate::message::ReadAck; +use crate::message::{Message, ReadAck}; use crate::Result; /// TrackerEntry represents the state of a tracked message. #[derive(Debug)] struct TrackerEntry { ack_send: oneshot::Sender, - count: u32, + count: usize, eof: bool, + callback_info: Option, } /// ActorMessage represents the messages that can be sent to the Tracker actor. 
enum ActorMessage { Insert { - offset: String, + offset: Bytes, ack_send: oneshot::Sender, + callback_info: Option, }, Update { - offset: String, - count: u32, - eof: bool, + offset: Bytes, + responses: Option>, + }, + UpdateEOF { + offset: Bytes, }, Delete { - offset: String, + offset: Bytes, }, Discard { - offset: String, + offset: Bytes, }, DiscardAll, // New variant for discarding all messages #[cfg(test)] @@ -52,8 +59,62 @@ enum ActorMessage { /// Tracker is responsible for managing the state of messages being processed. /// It keeps track of message offsets and their completeness, and sends acknowledgments. struct Tracker { - entries: HashMap, + entries: HashMap, receiver: mpsc::Receiver, + callback_handler: Option, +} + +#[derive(Debug)] +struct CallbackInfo { + id: String, + callback_url: String, + from_vertex: String, + responses: Vec>>, +} + +impl TryFrom<&Message> for CallbackInfo { + type Error = Error; + + fn try_from(message: &Message) -> std::result::Result { + let callback_url = message + .headers + .get(DEFAULT_CALLBACK_URL_HEADER_KEY) + .ok_or_else(|| { + Error::Source(format!( + "{DEFAULT_CALLBACK_URL_HEADER_KEY} header is not present in the message headers", + )) + })? + .to_owned(); + let uuid = message + .headers + .get(DEFAULT_ID_HEADER) + .ok_or_else(|| { + Error::Source(format!( + "{DEFAULT_ID_HEADER} is not found in message headers", + )) + })? + .to_owned(); + + let from_vertex = message + .metadata + .as_ref() + .ok_or_else(|| Error::Source("Metadata field is empty in the message".into()))? + .previous_vertex + .clone(); + + let mut msg_tags = None; + if let Some(ref tags) = message.tags { + if !tags.is_empty() { + msg_tags = Some(tags.iter().cloned().collect()); + } + }; + Ok(CallbackInfo { + id: uuid, + callback_url, + from_vertex, + responses: vec![msg_tags], + }) + } } impl Drop for Tracker { @@ -70,10 +131,14 @@ impl Drop for Tracker { impl Tracker { /// Creates a new Tracker instance with the given receiver for actor messages. - fn new(receiver: mpsc::Receiver) -> Self { + fn new( + receiver: mpsc::Receiver, + callback_handler: Option, + ) -> Self { Self { entries: HashMap::new(), receiver, + callback_handler, } } @@ -90,14 +155,18 @@ impl Tracker { ActorMessage::Insert { offset, ack_send: respond_to, + callback_info, } => { - self.handle_insert(offset, respond_to); + self.handle_insert(offset, callback_info, respond_to); + } + ActorMessage::Update { offset, responses } => { + self.handle_update(offset, responses); } - ActorMessage::Update { offset, count, eof } => { - self.handle_update(offset, count, eof); + ActorMessage::UpdateEOF { offset } => { + self.handle_update_eof(offset).await; } ActorMessage::Delete { offset } => { - self.handle_delete(offset); + self.handle_delete(offset).await; } ActorMessage::Discard { offset } => { self.handle_discard(offset); @@ -114,61 +183,74 @@ impl Tracker { } /// Inserts a new entry into the tracker with the given offset and ack sender. - fn handle_insert(&mut self, offset: String, respond_to: oneshot::Sender) { + fn handle_insert( + &mut self, + offset: Bytes, + callback_info: Option, + respond_to: oneshot::Sender, + ) { self.entries.insert( offset, TrackerEntry { ack_send: respond_to, count: 0, eof: true, + callback_info, }, ); } /// Updates an existing entry in the tracker with the number of expected messages and EOF status. 
- fn handle_update(&mut self, offset: String, count: u32, eof: bool) { - if let Some(entry) = self.entries.get_mut(&offset) { - entry.count += count; - entry.eof = eof; - // if the count is zero, we can send an ack immediately - // this is case where map stream will send eof true after - // receiving all the messages. - if entry.count == 0 { - let entry = self.entries.remove(&offset).unwrap(); - entry - .ack_send - .send(ReadAck::Ack) - .expect("Failed to send ack"); - } + fn handle_update(&mut self, offset: Bytes, responses: Option>) { + let Some(entry) = self.entries.get_mut(&offset) else { + return; + }; + + entry.count += 1; + if let Some(cb) = entry.callback_info.as_mut() { + cb.responses.push(responses); + } + } + + async fn handle_update_eof(&mut self, offset: Bytes) { + let Some(entry) = self.entries.get_mut(&offset) else { + return; + }; + entry.eof = true; + // if the count is zero, we can send an ack immediately + // this is case where map stream will send eof true after + // receiving all the messages. + if entry.count == 0 { + let entry = self.entries.remove(&offset).unwrap(); + self.ack_message(entry).await; } } /// Removes an entry from the tracker and sends an acknowledgment if the count is zero - /// or the entry is marked as EOF. - fn handle_delete(&mut self, offset: String) { - if let Some(mut entry) = self.entries.remove(&offset) { - if entry.count > 0 { - entry.count -= 1; - } - if entry.count == 0 && entry.eof { - entry - .ack_send - .send(ReadAck::Ack) - .expect("Failed to send ack"); - } else { - self.entries.insert(offset, entry); - } + /// and the entry is marked as EOF. + async fn handle_delete(&mut self, offset: Bytes) { + let Some(mut entry) = self.entries.remove(&offset) else { + return; + }; + if entry.count > 0 { + entry.count -= 1; + } + if entry.count == 0 && entry.eof { + self.ack_message(entry).await; + } else { + self.entries.insert(offset, entry); } } /// Discards an entry from the tracker and sends a nak. - fn handle_discard(&mut self, offset: String) { - if let Some(entry) = self.entries.remove(&offset) { - entry - .ack_send - .send(ReadAck::Nak) - .expect("Failed to send nak"); - } + fn handle_discard(&mut self, offset: Bytes) { + let Some(entry) = self.entries.remove(&offset) else { + return; + }; + entry + .ack_send + .send(ReadAck::Nak) + .expect("Failed to send nak"); } /// Discards all entries from the tracker and sends a nak for each. @@ -180,6 +262,37 @@ impl Tracker { .expect("Failed to send nak"); } } + + async fn ack_message(&self, entry: TrackerEntry) { + let TrackerEntry { + ack_send, + callback_info, + .. + } = entry; + + ack_send.send(ReadAck::Ack).expect("Failed to send ack"); + + let Some(ref callback_handler) = self.callback_handler else { + return; + }; + let Some(callback_info) = callback_info else { + tracing::error!("Callback is enabled, but Tracker doesn't contain callback info"); + return; + }; + + let id = callback_info.id.clone(); + let result = callback_handler + .callback( + callback_info.id, + callback_info.callback_url, + callback_info.from_vertex, + callback_info.responses, + ) + .await; + if let Err(e) = result { + tracing::error!(?e, id, "Failed to send callback"); + } + } } /// TrackerHandle provides an interface to interact with the Tracker. @@ -187,26 +300,37 @@ impl Tracker { #[derive(Clone)] pub(crate) struct TrackerHandle { sender: mpsc::Sender, + enable_callbacks: bool, } impl TrackerHandle { /// Creates a new TrackerHandle instance and spawns the Tracker. 
- pub(crate) fn new() -> Self { + pub(crate) fn new(callback_handler: Option) -> Self { + let enable_callbacks = callback_handler.is_some(); let (sender, receiver) = mpsc::channel(100); - let tracker = Tracker::new(receiver); + let tracker = Tracker::new(receiver, callback_handler); tokio::spawn(tracker.run()); - Self { sender } + Self { + sender, + enable_callbacks, + } } /// Inserts a new message into the Tracker with the given offset and acknowledgment sender. pub(crate) async fn insert( &self, - offset: Bytes, + message: &Message, ack_send: oneshot::Sender, ) -> Result<()> { + let offset = message.id.offset.clone(); + let mut callback_info = None; + if self.enable_callbacks { + callback_info = Some(message.try_into()?); + } let message = ActorMessage::Insert { - offset: String::from_utf8_lossy(&offset).to_string(), + offset, ack_send, + callback_info, }; self.sender .send(message) @@ -215,13 +339,34 @@ impl TrackerHandle { Ok(()) } - /// Updates an existing message in the Tracker with the given offset, count, and EOF status. - pub(crate) async fn update(&self, offset: Bytes, count: u32, eof: bool) -> Result<()> { - let message = ActorMessage::Update { - offset: String::from_utf8_lossy(&offset).to_string(), - count, - eof, + /// Informs the tracker that a new message has been generated. The tracker should contain + /// and entry for this message's offset. + pub(crate) async fn update( + &self, + offset: Bytes, + message_tags: Option>, + ) -> Result<()> { + let responses: Option> = match (self.enable_callbacks, message_tags) { + (true, Some(tags)) => { + if !tags.is_empty() { + Some(tags.iter().cloned().collect::>()) + } else { + None + } + } + _ => None, }; + let message = ActorMessage::Update { offset, responses }; + self.sender + .send(message) + .await + .map_err(|e| Error::Tracker(format!("{:?}", e)))?; + Ok(()) + } + + /// Updates the EOF status for an offset in the Tracker + pub(crate) async fn update_eof(&self, offset: Bytes) -> Result<()> { + let message = ActorMessage::UpdateEOF { offset }; self.sender .send(message) .await @@ -231,9 +376,7 @@ impl TrackerHandle { /// Deletes a message from the Tracker with the given offset. pub(crate) async fn delete(&self, offset: Bytes) -> Result<()> { - let message = ActorMessage::Delete { - offset: String::from_utf8_lossy(&offset).to_string(), - }; + let message = ActorMessage::Delete { offset }; self.sender .send(message) .await @@ -243,9 +386,7 @@ impl TrackerHandle { /// Discards a message from the Tracker with the given offset. 
pub(crate) async fn discard(&self, offset: Bytes) -> Result<()> { - let message = ActorMessage::Discard { - offset: String::from_utf8_lossy(&offset).to_string(), - }; + let message = ActorMessage::Discard { offset }; self.sender .send(message) .await @@ -279,27 +420,45 @@ impl TrackerHandle { #[cfg(test)] mod tests { + use std::sync::Arc; + use tokio::sync::oneshot; use tokio::time::{timeout, Duration}; + use crate::message::MessageID; + use super::*; #[tokio::test] async fn test_insert_update_delete() { - let handle = TrackerHandle::new(); + let handle = TrackerHandle::new(None); let (ack_send, ack_recv) = oneshot::channel(); + let offset = Bytes::from_static(b"offset1"); + let message = Message { + keys: Arc::from([]), + tags: None, + value: Bytes::from_static(b"test"), + offset: None, + event_time: Default::default(), + id: MessageID { + vertex_name: "in".into(), + offset: offset.clone(), + index: 1, + }, + headers: HashMap::new(), + metadata: None, + }; + // Insert a new message - handle - .insert("offset1".to_string().into(), ack_send) - .await - .unwrap(); + handle.insert(&message, ack_send).await.unwrap(); // Update the message handle - .update("offset1".to_string().into(), 1, true) + .update(offset.clone(), message.tags.clone()) .await .unwrap(); + handle.update_eof(offset).await.unwrap(); // Delete the message handle.delete("offset1".to_string().into()).await.unwrap(); @@ -313,25 +472,41 @@ mod tests { #[tokio::test] async fn test_update_with_multiple_deletes() { - let handle = TrackerHandle::new(); + let handle = TrackerHandle::new(None); let (ack_send, ack_recv) = oneshot::channel(); + let offset = Bytes::from_static(b"offset1"); + let message = Message { + keys: Arc::from([]), + tags: None, + value: Bytes::from_static(b"test"), + offset: None, + event_time: Default::default(), + id: MessageID { + vertex_name: "in".into(), + offset: offset.clone(), + index: 1, + }, + headers: HashMap::new(), + metadata: None, + }; + // Insert a new message - handle - .insert("offset1".to_string().into(), ack_send) - .await - .unwrap(); + handle.insert(&message, ack_send).await.unwrap(); + let messages: Vec = std::iter::repeat(message).take(3).collect(); // Update the message with a count of 3 - handle - .update("offset1".to_string().into(), 3, true) - .await - .unwrap(); + for message in messages { + handle + .update(offset.clone(), message.tags.clone()) + .await + .unwrap(); + } // Delete the message three times - handle.delete("offset1".to_string().into()).await.unwrap(); - handle.delete("offset1".to_string().into()).await.unwrap(); - handle.delete("offset1".to_string().into()).await.unwrap(); + handle.delete(offset.clone()).await.unwrap(); + handle.delete(offset.clone()).await.unwrap(); + handle.delete(offset).await.unwrap(); // Verify that the message was deleted and ack was received after the third delete let result = timeout(Duration::from_secs(1), ack_recv).await.unwrap(); @@ -342,17 +517,30 @@ mod tests { #[tokio::test] async fn test_discard() { - let handle = TrackerHandle::new(); + let handle = TrackerHandle::new(None); let (ack_send, ack_recv) = oneshot::channel(); + let offset = Bytes::from_static(b"offset1"); + let message = Message { + keys: Arc::from([]), + tags: None, + value: Bytes::from_static(b"test"), + offset: None, + event_time: Default::default(), + id: MessageID { + vertex_name: "in".into(), + offset: offset.clone(), + index: 1, + }, + headers: HashMap::new(), + metadata: None, + }; + // Insert a new message - handle - .insert("offset1".to_string().into(), ack_send) - 
.await - .unwrap(); + handle.insert(&message, ack_send).await.unwrap(); // Discard the message - handle.discard("offset1".to_string().into()).await.unwrap(); + handle.discard(offset).await.unwrap(); // Verify that the message was discarded and nak was received let result = timeout(Duration::from_secs(1), ack_recv).await.unwrap(); @@ -363,23 +551,38 @@ mod tests { #[tokio::test] async fn test_discard_after_update_with_higher_count() { - let handle = TrackerHandle::new(); + let handle = TrackerHandle::new(None); let (ack_send, ack_recv) = oneshot::channel(); - // Insert a new message - handle - .insert("offset1".to_string().into(), ack_send) - .await - .unwrap(); + let offset = Bytes::from_static(b"offset1"); + let message = Message { + keys: Arc::from([]), + tags: None, + value: Bytes::from_static(b"test"), + offset: None, + event_time: Default::default(), + id: MessageID { + vertex_name: "in".into(), + offset: offset.clone(), + index: 1, + }, + headers: HashMap::new(), + metadata: None, + }; - // Update the message with a count of 3 - handle - .update("offset1".to_string().into(), 3, false) - .await - .unwrap(); + // Insert a new message + handle.insert(&message, ack_send).await.unwrap(); + + let messages: Vec = std::iter::repeat(message).take(3).collect(); + for message in messages { + handle + .update(offset.clone(), message.tags.clone()) + .await + .unwrap(); + } // Discard the message - handle.discard("offset1".to_string().into()).await.unwrap(); + handle.discard(offset).await.unwrap(); // Verify that the message was discarded and nak was received let result = timeout(Duration::from_secs(1), ack_recv).await.unwrap(); diff --git a/rust/numaflow-core/src/transformer.rs b/rust/numaflow-core/src/transformer.rs index 23cd6ffbbf..05c567c64b 100644 --- a/rust/numaflow-core/src/transformer.rs +++ b/rust/numaflow-core/src/transformer.rs @@ -146,12 +146,13 @@ impl Transformer { Transformer::transform(transform_handle, read_msg.clone()).await?; // update the tracker with the number of responses for each message + for message in transformed_messages.iter() { + tracker_handle + .update(read_msg.id.offset.clone(), message.tags.clone()) + .await?; + } tracker_handle - .update( - read_msg.id.offset.clone(), - transformed_messages.len() as u32, - true, - ) + .update_eof(read_msg.id.offset.clone()) .await?; Ok::, Error>(transformed_messages) @@ -220,7 +221,7 @@ mod tests { // wait for the server to start tokio::time::sleep(Duration::from_millis(100)).await; - let tracker_handle = TrackerHandle::new(); + let tracker_handle = TrackerHandle::new(None); let client = SourceTransformClient::new(create_rpc_channel(sock_file).await?); let transformer = Transformer::new(500, 10, client, tracker_handle.clone()).await?; @@ -237,6 +238,7 @@ mod tests { index: 0, }, headers: Default::default(), + metadata: None, }; let transformed_messages = @@ -283,7 +285,7 @@ mod tests { // wait for the server to start tokio::time::sleep(Duration::from_millis(100)).await; - let tracker_handle = TrackerHandle::new(); + let tracker_handle = TrackerHandle::new(None); let client = SourceTransformClient::new(create_rpc_channel(sock_file).await?); let transformer = Transformer::new(500, 10, client, tracker_handle.clone()).await?; @@ -301,6 +303,7 @@ mod tests { index: i, }, headers: Default::default(), + metadata: None, }; messages.push(message); } @@ -358,7 +361,7 @@ mod tests { // wait for the server to start tokio::time::sleep(Duration::from_millis(100)).await; - let tracker_handle = TrackerHandle::new(); + let tracker_handle = 
TrackerHandle::new(None); let client = SourceTransformClient::new(create_rpc_channel(sock_file).await?); let transformer = Transformer::new(500, 10, client, tracker_handle.clone()).await?; @@ -374,6 +377,7 @@ mod tests { index: 0, }, headers: Default::default(), + metadata: None, }; let result = transformer.transform_batch(vec![message]).await; diff --git a/rust/numaflow-core/src/transformer/user_defined.rs b/rust/numaflow-core/src/transformer/user_defined.rs index f519c0b801..c56afecbd5 100644 --- a/rust/numaflow-core/src/transformer/user_defined.rs +++ b/rust/numaflow-core/src/transformer/user_defined.rs @@ -143,6 +143,7 @@ impl UserDefinedTransformer { offset: Some(msg_info.offset.clone()), event_time: utc_from_timestamp(result.event_time), headers: msg_info.headers.clone(), + metadata: None, }; response_messages.push(message); } @@ -253,6 +254,7 @@ mod tests { index: 0, }, headers: Default::default(), + metadata: None, }; let (tx, rx) = tokio::sync::oneshot::channel(); @@ -297,6 +299,7 @@ mod tests { index: 0, }, headers: HashMap::new(), + metadata: None, }; let request: SourceTransformRequest = message.into(); diff --git a/rust/numaflow/src/main.rs b/rust/numaflow/src/main.rs index e0836ce21c..56f40ca48c 100644 --- a/rust/numaflow/src/main.rs +++ b/rust/numaflow/src/main.rs @@ -1,5 +1,6 @@ use std::env; use std::error::Error; +use std::time::Duration; use tracing::error; use tracing_subscriber::layer::SubscriberExt; @@ -27,6 +28,8 @@ async fn main() -> Result<(), Box> { if let Err(e) = run().await { error!("{e:?}"); + tracing::warn!("Sleeping after error"); + tokio::time::sleep(Duration::from_secs(300)).await; return Err(e); } Ok(()) @@ -42,7 +45,13 @@ async fn run() -> Result<(), Box> { } else if args.contains(&"--rust".to_string()) { numaflow_core::run() .await - .map_err(|e| format!("Error running rust binary: {e:?}"))? + .map_err(|e| format!("Error running rust binary: {e:?}"))?; + } else { + return Err(format!( + "Invalid argument. Use --servesink, or --rust. Current args = {:?}", + args + ) + .into()); } - Err("Invalid argument. 
Use --servesink, or --rust".into()) + Ok(()) } diff --git a/rust/serving/Cargo.toml b/rust/serving/Cargo.toml index 857d69db77..673bf60460 100644 --- a/rust/serving/Cargo.toml +++ b/rust/serving/Cargo.toml @@ -38,6 +38,7 @@ rcgen = "0.13.1" parking_lot = "0.12.3" prometheus-client = "0.22.3" thiserror = "1.0.63" +reqwest = { workspace = true, features = ["rustls-tls", "json"] } [dev-dependencies] reqwest = { workspace = true, features = ["json"] } diff --git a/rust/serving/src/app.rs b/rust/serving/src/app.rs index 330752ad6c..721376855f 100644 --- a/rust/serving/src/app.rs +++ b/rust/serving/src/app.rs @@ -12,9 +12,10 @@ use hyper_util::client::legacy::connect::HttpConnector; use hyper_util::rt::TokioExecutor; use tokio::signal; use tower::ServiceBuilder; +use tower_http::classify::ServerErrorsFailureClass; use tower_http::timeout::TimeoutLayer; -use tower_http::trace::{DefaultOnResponse, TraceLayer}; -use tracing::{info, info_span, Level}; +use tower_http::trace::TraceLayer; +use tracing::{info, info_span, Span}; use uuid::Uuid; use self::{ @@ -81,6 +82,11 @@ where .layer( TraceLayer::new_for_http() .make_span_with(move |req: &Request| { + let req_path = req.uri().path(); + if ["/metrics", "/readyz", "/livez", "/sidecar-livez"].contains(&req_path) { + // We don't need request ID for these endpoints + return info_span!("request", method=?req.method(), path=req_path); + } let tid = req .headers() .get(&tid_header) @@ -93,9 +99,22 @@ where .get::() .map(MatchedPath::as_str); - info_span!("request", tid, method=?req.method(), matched_path) + info_span!("request", tid, method=?req.method(), path=req_path, matched_path) }) - .on_response(DefaultOnResponse::new().level(Level::INFO)), + .on_response( + |response: &Response, latency: Duration, _span: &Span| { + if response.status().is_server_error() { + // 5xx responses will be logged at 'error' level in `on_failure` + return; + } + tracing::info!(status=?response.status(), ?latency) + }, + ) + .on_failure( + |error: ServerErrorsFailureClass, latency: Duration, _span: &Span| { + tracing::error!(?error, ?latency, "Server error"); + }, + ), ) // capture metrics for all requests .layer(middleware::from_fn(capture_metrics)) diff --git a/rust/serving/src/app/callback.rs b/rust/serving/src/app/callback.rs index d5708a4e62..7d8de8815e 100644 --- a/rust/serving/src/app/callback.rs +++ b/rust/serving/src/app/callback.rs @@ -1,9 +1,9 @@ use axum::{body::Bytes, extract::State, http::HeaderMap, routing, Json, Router}; -use serde::{Deserialize, Serialize}; use tracing::error; use self::store::Store; use crate::app::response::ApiError; +use crate::callback::Callback; /// in-memory state store including connection tracking pub(crate) mod state; @@ -12,26 +12,6 @@ use state::State as CallbackState; /// store for storing the state pub(crate) mod store; -/// As message passes through each component (map, transformer, sink, etc.). it emits a beacon via callback -/// to inform that message has been processed by this component. -#[derive(Debug, Serialize, Deserialize)] -pub(crate) struct Callback { - pub(crate) id: String, - pub(crate) vertex: String, - pub(crate) cb_time: u64, - pub(crate) from_vertex: String, - /// Due to flat-map operation, we can have 0 or more responses. - pub(crate) responses: Vec, -} - -/// It contains details about the `To` vertex via tags (conditional forwarding). 
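The tracker rework above changes the contract from one count-based update to one update per transformed response plus an explicit end-of-stream marker. A condensed sketch of the intended call sequence follows; it assumes the crate-private TrackerHandle, Message and MessageID types from the hunks above (so it only makes sense inside this crate's tests), and the message literal mirrors the shape the tests use.

// Sketch only: drives the TrackerHandle API shown in the diff above.
use std::collections::HashMap;
use std::sync::Arc;
use bytes::Bytes;
use tokio::sync::oneshot;

async fn tracker_flow_sketch() -> crate::Result<()> {
    let tracker = TrackerHandle::new(None);
    let (ack_tx, ack_rx) = oneshot::channel();

    let offset = Bytes::from_static(b"offset1");
    let message = Message {
        keys: Arc::from([]),
        tags: None,
        value: Bytes::from_static(b"test"),
        offset: None,
        event_time: Default::default(),
        id: MessageID { vertex_name: "in".into(), offset: offset.clone(), index: 1 },
        headers: HashMap::new(),
        metadata: None,
    };

    // 1. Register the read message together with its ack channel.
    tracker.insert(&message, ack_tx).await?;

    // 2. One update per transformed response (a flat-map may emit 0..n of them),
    //    then mark end-of-fan-out once the transformer is done with this offset.
    tracker.update(offset.clone(), message.tags.clone()).await?;
    tracker.update_eof(offset.clone()).await?;

    // 3. One delete per response; the ack on ack_rx resolves only after the last one.
    tracker.delete(offset).await?;
    let _ack = ack_rx.await;
    Ok(())
}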
-#[derive(Debug, Serialize, Deserialize)] -pub(crate) struct Response { - /// If tags is None, the message is forwarded to all vertices, if len(Vec) == 0, it means that - /// the message has been dropped. - pub(crate) tags: Option>, -} - #[derive(Clone)] struct CallbackAppState { tid_header: String, @@ -106,6 +86,7 @@ mod tests { use crate::app::callback::state::State as CallbackState; use crate::app::callback::store::memstore::InMemoryStore; use crate::app::tracker::MessageGraph; + use crate::callback::Response; use crate::pipeline::PipelineDCG; const PIPELINE_SPEC_ENCODED: &str = "eyJ2ZXJ0aWNlcyI6W3sibmFtZSI6ImluIiwic291cmNlIjp7InNlcnZpbmciOnsiYXV0aCI6bnVsbCwic2VydmljZSI6dHJ1ZSwibXNnSURIZWFkZXJLZXkiOiJYLU51bWFmbG93LUlkIiwic3RvcmUiOnsidXJsIjoicmVkaXM6Ly9yZWRpczo2Mzc5In19fSwiY29udGFpbmVyVGVtcGxhdGUiOnsicmVzb3VyY2VzIjp7fSwiaW1hZ2VQdWxsUG9saWN5IjoiTmV2ZXIiLCJlbnYiOlt7Im5hbWUiOiJSVVNUX0xPRyIsInZhbHVlIjoiZGVidWcifV19LCJzY2FsZSI6eyJtaW4iOjF9LCJ1cGRhdGVTdHJhdGVneSI6eyJ0eXBlIjoiUm9sbGluZ1VwZGF0ZSIsInJvbGxpbmdVcGRhdGUiOnsibWF4VW5hdmFpbGFibGUiOiIyNSUifX19LHsibmFtZSI6InBsYW5uZXIiLCJ1ZGYiOnsiY29udGFpbmVyIjp7ImltYWdlIjoiYXNjaWk6MC4xIiwiYXJncyI6WyJwbGFubmVyIl0sInJlc291cmNlcyI6e30sImltYWdlUHVsbFBvbGljeSI6Ik5ldmVyIn0sImJ1aWx0aW4iOm51bGwsImdyb3VwQnkiOm51bGx9LCJjb250YWluZXJUZW1wbGF0ZSI6eyJyZXNvdXJjZXMiOnt9LCJpbWFnZVB1bGxQb2xpY3kiOiJOZXZlciJ9LCJzY2FsZSI6eyJtaW4iOjF9LCJ1cGRhdGVTdHJhdGVneSI6eyJ0eXBlIjoiUm9sbGluZ1VwZGF0ZSIsInJvbGxpbmdVcGRhdGUiOnsibWF4VW5hdmFpbGFibGUiOiIyNSUifX19LHsibmFtZSI6InRpZ2VyIiwidWRmIjp7ImNvbnRhaW5lciI6eyJpbWFnZSI6ImFzY2lpOjAuMSIsImFyZ3MiOlsidGlnZXIiXSwicmVzb3VyY2VzIjp7fSwiaW1hZ2VQdWxsUG9saWN5IjoiTmV2ZXIifSwiYnVpbHRpbiI6bnVsbCwiZ3JvdXBCeSI6bnVsbH0sImNvbnRhaW5lclRlbXBsYXRlIjp7InJlc291cmNlcyI6e30sImltYWdlUHVsbFBvbGljeSI6Ik5ldmVyIn0sInNjYWxlIjp7Im1pbiI6MX0sInVwZGF0ZVN0cmF0ZWd5Ijp7InR5cGUiOiJSb2xsaW5nVXBkYXRlIiwicm9sbGluZ1VwZGF0ZSI6eyJtYXhVbmF2YWlsYWJsZSI6IjI1JSJ9fX0seyJuYW1lIjoiZG9nIiwidWRmIjp7ImNvbnRhaW5lciI6eyJpbWFnZSI6ImFzY2lpOjAuMSIsImFyZ3MiOlsiZG9nIl0sInJlc291cmNlcyI6e30sImltYWdlUHVsbFBvbGljeSI6Ik5ldmVyIn0sImJ1aWx0aW4iOm51bGwsImdyb3VwQnkiOm51bGx9LCJjb250YWluZXJUZW1wbGF0ZSI6eyJyZXNvdXJjZXMiOnt9LCJpbWFnZVB1bGxQb2xpY3kiOiJOZXZlciJ9LCJzY2FsZSI6eyJtaW4iOjF9LCJ1cGRhdGVTdHJhdGVneSI6eyJ0eXBlIjoiUm9sbGluZ1VwZGF0ZSIsInJvbGxpbmdVcGRhdGUiOnsibWF4VW5hdmFpbGFibGUiOiIyNSUifX19LHsibmFtZSI6ImVsZXBoYW50IiwidWRmIjp7ImNvbnRhaW5lciI6eyJpbWFnZSI6ImFzY2lpOjAuMSIsImFyZ3MiOlsiZWxlcGhhbnQiXSwicmVzb3VyY2VzIjp7fSwiaW1hZ2VQdWxsUG9saWN5IjoiTmV2ZXIifSwiYnVpbHRpbiI6bnVsbCwiZ3JvdXBCeSI6bnVsbH0sImNvbnRhaW5lclRlbXBsYXRlIjp7InJlc291cmNlcyI6e30sImltYWdlUHVsbFBvbGljeSI6Ik5ldmVyIn0sInNjYWxlIjp7Im1pbiI6MX0sInVwZGF0ZVN0cmF0ZWd5Ijp7InR5cGUiOiJSb2xsaW5nVXBkYXRlIiwicm9sbGluZ1VwZGF0ZSI6eyJtYXhVbmF2YWlsYWJsZSI6IjI1JSJ9fX0seyJuYW1lIjoiYXNjaWlhcnQiLCJ1ZGYiOnsiY29udGFpbmVyIjp7ImltYWdlIjoiYXNjaWk6MC4xIiwiYXJncyI6WyJhc2NpaWFydCJdLCJyZXNvdXJjZXMiOnt9LCJpbWFnZVB1bGxQb2xpY3kiOiJOZXZlciJ9LCJidWlsdGluIjpudWxsLCJncm91cEJ5IjpudWxsfSwiY29udGFpbmVyVGVtcGxhdGUiOnsicmVzb3VyY2VzIjp7fSwiaW1hZ2VQdWxsUG9saWN5IjoiTmV2ZXIifSwic2NhbGUiOnsibWluIjoxfSwidXBkYXRlU3RyYXRlZ3kiOnsidHlwZSI6IlJvbGxpbmdVcGRhdGUiLCJyb2xsaW5nVXBkYXRlIjp7Im1heFVuYXZhaWxhYmxlIjoiMjUlIn19fSx7Im5hbWUiOiJzZXJ2ZS1zaW5rIiwic2luayI6eyJ1ZHNpbmsiOnsiY29udGFpbmVyIjp7ImltYWdlIjoic2VydmVzaW5rOjAuMSIsImVudiI6W3sibmFtZSI6Ik5VTUFGTE9XX0NBTExCQUNLX1VSTF9LRVkiLCJ2YWx1ZSI6IlgtTnVtYWZsb3ctQ2FsbGJhY2stVXJsIn0seyJuYW1lIjoiTlVNQUZMT1dfTVNHX0lEX0hFQURFUl9LRVkiLCJ2YWx1ZSI6IlgtTnVtYWZsb3ctSWQifV0sInJlc291cmNlcyI6e30sImltYWdlUHVsbFBvbGljeSI6Ik5ldmVyIn19LCJyZXRyeVN0cmF0ZWd5Ijp7fX0sImNvbnRhaW5lclRlbXBsYXRlIjp7I
nJlc291cmNlcyI6e30sImltYWdlUHVsbFBvbGljeSI6Ik5ldmVyIn0sInNjYWxlIjp7Im1pbiI6MX0sInVwZGF0ZVN0cmF0ZWd5Ijp7InR5cGUiOiJSb2xsaW5nVXBkYXRlIiwicm9sbGluZ1VwZGF0ZSI6eyJtYXhVbmF2YWlsYWJsZSI6IjI1JSJ9fX0seyJuYW1lIjoiZXJyb3Itc2luayIsInNpbmsiOnsidWRzaW5rIjp7ImNvbnRhaW5lciI6eyJpbWFnZSI6InNlcnZlc2luazowLjEiLCJlbnYiOlt7Im5hbWUiOiJOVU1BRkxPV19DQUxMQkFDS19VUkxfS0VZIiwidmFsdWUiOiJYLU51bWFmbG93LUNhbGxiYWNrLVVybCJ9LHsibmFtZSI6Ik5VTUFGTE9XX01TR19JRF9IRUFERVJfS0VZIiwidmFsdWUiOiJYLU51bWFmbG93LUlkIn1dLCJyZXNvdXJjZXMiOnt9LCJpbWFnZVB1bGxQb2xpY3kiOiJOZXZlciJ9fSwicmV0cnlTdHJhdGVneSI6e319LCJjb250YWluZXJUZW1wbGF0ZSI6eyJyZXNvdXJjZXMiOnt9LCJpbWFnZVB1bGxQb2xpY3kiOiJOZXZlciJ9LCJzY2FsZSI6eyJtaW4iOjF9LCJ1cGRhdGVTdHJhdGVneSI6eyJ0eXBlIjoiUm9sbGluZ1VwZGF0ZSIsInJvbGxpbmdVcGRhdGUiOnsibWF4VW5hdmFpbGFibGUiOiIyNSUifX19XSwiZWRnZXMiOlt7ImZyb20iOiJpbiIsInRvIjoicGxhbm5lciIsImNvbmRpdGlvbnMiOm51bGx9LHsiZnJvbSI6InBsYW5uZXIiLCJ0byI6ImFzY2lpYXJ0IiwiY29uZGl0aW9ucyI6eyJ0YWdzIjp7Im9wZXJhdG9yIjoib3IiLCJ2YWx1ZXMiOlsiYXNjaWlhcnQiXX19fSx7ImZyb20iOiJwbGFubmVyIiwidG8iOiJ0aWdlciIsImNvbmRpdGlvbnMiOnsidGFncyI6eyJvcGVyYXRvciI6Im9yIiwidmFsdWVzIjpbInRpZ2VyIl19fX0seyJmcm9tIjoicGxhbm5lciIsInRvIjoiZG9nIiwiY29uZGl0aW9ucyI6eyJ0YWdzIjp7Im9wZXJhdG9yIjoib3IiLCJ2YWx1ZXMiOlsiZG9nIl19fX0seyJmcm9tIjoicGxhbm5lciIsInRvIjoiZWxlcGhhbnQiLCJjb25kaXRpb25zIjp7InRhZ3MiOnsib3BlcmF0b3IiOiJvciIsInZhbHVlcyI6WyJlbGVwaGFudCJdfX19LHsiZnJvbSI6InRpZ2VyIiwidG8iOiJzZXJ2ZS1zaW5rIiwiY29uZGl0aW9ucyI6bnVsbH0seyJmcm9tIjoiZG9nIiwidG8iOiJzZXJ2ZS1zaW5rIiwiY29uZGl0aW9ucyI6bnVsbH0seyJmcm9tIjoiZWxlcGhhbnQiLCJ0byI6InNlcnZlLXNpbmsiLCJjb25kaXRpb25zIjpudWxsfSx7ImZyb20iOiJhc2NpaWFydCIsInRvIjoic2VydmUtc2luayIsImNvbmRpdGlvbnMiOm51bGx9LHsiZnJvbSI6InBsYW5uZXIiLCJ0byI6ImVycm9yLXNpbmsiLCJjb25kaXRpb25zIjp7InRhZ3MiOnsib3BlcmF0b3IiOiJvciIsInZhbHVlcyI6WyJlcnJvciJdfX19XSwibGlmZWN5Y2xlIjp7fSwid2F0ZXJtYXJrIjp7fX0="; diff --git a/rust/serving/src/app/callback/state.rs b/rust/serving/src/app/callback/state.rs index 419a40f60f..2354899aa9 100644 --- a/rust/serving/src/app/callback/state.rs +++ b/rust/serving/src/app/callback/state.rs @@ -228,7 +228,7 @@ where mod tests { use super::*; use crate::app::callback::store::memstore::InMemoryStore; - use crate::app::callback::Response; + use crate::callback::Response; use crate::pipeline::PipelineDCG; use axum::body::Bytes; diff --git a/rust/serving/src/app/callback/store/memstore.rs b/rust/serving/src/app/callback/store/memstore.rs index 4c5c36666d..0e7135b8e2 100644 --- a/rust/serving/src/app/callback/store/memstore.rs +++ b/rust/serving/src/app/callback/store/memstore.rs @@ -12,7 +12,7 @@ use crate::Error; pub(crate) struct InMemoryStore { /// The data field is a `HashMap` where the key is a `String` and the value is a `Vec>`. /// It is wrapped in an `Arc>` to allow shared ownership and thread safety. 
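The generic parameters of the `data` field render incompletely above; per the doc comment, the layout is a map from request id to the list of raw payloads, shared behind an Arc/Mutex so that clones of the store see the same state. A minimal, self-contained sketch of that shape (the names here are illustrative, not the crate's):

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Illustrative stand-in for the in-memory store described above: payloads are
/// grouped per request id, and clones of the store share the same map.
#[derive(Clone, Default)]
struct InMemoryStoreSketch {
    data: Arc<Mutex<HashMap<String, Vec<Vec<u8>>>>>,
}

impl InMemoryStoreSketch {
    fn save(&self, id: &str, payload: Vec<u8>) {
        let mut guard = self.data.lock().unwrap();
        guard.entry(id.to_string()).or_default().push(payload);
    }

    fn retrieve(&self, id: &str) -> Option<Vec<Vec<u8>>> {
        self.data.lock().unwrap().get(id).cloned()
    }
}

fn main() {
    let store = InMemoryStoreSketch::default();
    store.save("1234", b"payload".to_vec());
    assert_eq!(store.retrieve("1234").unwrap().len(), 1);
}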
- data: Arc>>>>, + pub(crate) data: Arc>>>>, } impl InMemoryStore { @@ -98,7 +98,7 @@ mod tests { use super::*; use crate::app::callback::store::{PayloadToSave, Store}; - use crate::app::callback::{Callback, Response}; + use crate::callback::{Callback, Response}; #[tokio::test] async fn test_save_and_retrieve_callbacks() { diff --git a/rust/serving/src/app/callback/store/redisstore.rs b/rust/serving/src/app/callback/store/redisstore.rs index d4fe501cdb..490b7be940 100644 --- a/rust/serving/src/app/callback/store/redisstore.rs +++ b/rust/serving/src/app/callback/store/redisstore.rs @@ -203,7 +203,7 @@ impl super::Store for RedisConnection { mod tests { use super::*; use crate::app::callback::store::LocalStore; - use crate::app::callback::Response; + use crate::callback::Response; use axum::body::Bytes; use redis::AsyncCommands; diff --git a/rust/serving/src/app/jetstream_proxy.rs b/rust/serving/src/app/jetstream_proxy.rs index 6d4d9bb737..ad4db52ca6 100644 --- a/rust/serving/src/app/jetstream_proxy.rs +++ b/rust/serving/src/app/jetstream_proxy.rs @@ -260,12 +260,12 @@ mod tests { use tower::ServiceExt; use super::*; - use crate::app::callback; use crate::app::callback::state::State as CallbackState; use crate::app::callback::store::memstore::InMemoryStore; use crate::app::callback::store::PayloadToSave; - use crate::app::callback::Callback; use crate::app::tracker::MessageGraph; + use crate::callback::{Callback, Response}; + use crate::config::DEFAULT_ID_HEADER; use crate::pipeline::PipelineDCG; use crate::{Error, Settings}; @@ -360,21 +360,21 @@ mod tests { vertex: "in".to_string(), cb_time: 12345, from_vertex: "in".to_string(), - responses: vec![callback::Response { tags: None }], + responses: vec![Response { tags: None }], }, Callback { id: id.to_string(), vertex: "cat".to_string(), cb_time: 12345, from_vertex: "in".to_string(), - responses: vec![callback::Response { tags: None }], + responses: vec![Response { tags: None }], }, Callback { id: id.to_string(), vertex: "out".to_string(), cb_time: 12345, from_vertex: "cat".to_string(), - responses: vec![callback::Response { tags: None }], + responses: vec![Response { tags: None }], }, ] } @@ -526,7 +526,7 @@ mod tests { .method("POST") .uri("/sync_serve") .header("Content-Type", "text/plain") - .header("ID", ID_VALUE) + .header(DEFAULT_ID_HEADER, ID_VALUE) .body(Body::from("Test Message")) .unwrap(); diff --git a/rust/serving/src/app/tracker.rs b/rust/serving/src/app/tracker.rs index 5f3b24db7b..e1b23d35d1 100644 --- a/rust/serving/src/app/tracker.rs +++ b/rust/serving/src/app/tracker.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use serde::{Deserialize, Serialize}; -use crate::app::callback::Callback; +use crate::callback::Callback; use crate::pipeline::{Edge, OperatorType, PipelineDCG}; use crate::Error; @@ -250,7 +250,7 @@ impl MessageGraph { #[cfg(test)] mod tests { use super::*; - use crate::app::callback::Response; + use crate::callback::Response; use crate::pipeline::{Conditions, Tag, Vertex}; #[test] diff --git a/rust/serving/src/callback.rs b/rust/serving/src/callback.rs new file mode 100644 index 0000000000..4c6db654c0 --- /dev/null +++ b/rust/serving/src/callback.rs @@ -0,0 +1,256 @@ +use std::{ + sync::Arc, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; + +use reqwest::Client; +use serde::{Deserialize, Serialize}; +use tokio::sync::Semaphore; + +use crate::config::DEFAULT_ID_HEADER; + +/// As message passes through each component (map, transformer, sink, etc.). 
it emits a beacon via callback +/// to inform that message has been processed by this component. +#[derive(Debug, Serialize, Deserialize)] +pub(crate) struct Callback { + pub(crate) id: String, + pub(crate) vertex: String, + pub(crate) cb_time: u64, + pub(crate) from_vertex: String, + /// Due to flat-map operation, we can have 0 or more responses. + pub(crate) responses: Vec, +} + +/// It contains details about the `To` vertex via tags (conditional forwarding). +#[derive(Debug, Serialize, Deserialize)] +pub(crate) struct Response { + /// If tags is None, the message is forwarded to all vertices, if len(Vec) == 0, it means that + /// the message has been dropped. + pub(crate) tags: Option>, +} + +#[derive(Clone)] +pub struct CallbackHandler { + client: Client, + vertex_name: String, + semaphore: Arc, +} + +impl CallbackHandler { + pub fn new(vertex_name: String, concurrency_limit: usize) -> Self { + let client = Client::builder() + .danger_accept_invalid_certs(true) + .timeout(Duration::from_secs(1)) + .build() + .expect("Creating callback client for Serving source"); + + let semaphore = Arc::new(Semaphore::new(concurrency_limit)); + + Self { + client, + vertex_name, + semaphore, + } + } + + pub async fn callback( + &self, + id: String, + callback_url: String, + previous_vertex: String, + responses: Vec>>, + ) -> crate::Result<()> { + let cb_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("System time is older than Unix epoch time") + .as_millis() as u64; + + let responses = responses + .into_iter() + .map(|tags| Response { tags }) + .collect(); + + let callback_payload = Callback { + vertex: self.vertex_name.clone(), + id: id.clone(), + cb_time, + responses, + from_vertex: previous_vertex, + }; + + let permit = Arc::clone(&self.semaphore).acquire_owned().await.unwrap(); + let client = self.client.clone(); + tokio::spawn(async move { + let _permit = permit; + // Retry in case of failure in making request. + // When there is a failure, we retry after wait_secs. This value is doubled after each retry attempt. + // Then longest wait time will be 64 seconds. + let mut wait_secs = 1; + const TOTAL_ATTEMPTS: usize = 7; + for i in 1..=TOTAL_ATTEMPTS { + let resp = client + .post(&callback_url) + .header(DEFAULT_ID_HEADER, id.clone()) + .json(&[&callback_payload]) + .send() + .await; + let resp = match resp { + Ok(resp) => resp, + Err(e) => { + if i < TOTAL_ATTEMPTS { + tracing::warn!( + ?e, + "Sending callback request failed. Will retry after a delay" + ); + tokio::time::sleep(Duration::from_secs(wait_secs)).await; + wait_secs *= 2; + } else { + tracing::error!(?e, "Sending callback request failed"); + } + continue; + } + }; + + if resp.status().is_success() { + break; + } + + if resp.status().is_client_error() { + // TODO: When the source serving pod restarts, the callbacks will fail with 4xx status + // since the request ID won't be available in it's in-memory tracker. + // No point in retrying such cases + // 4xx can also happen if payload is wrong (due to bugs in the code). We should differentiate + // between what can be retried and not. + let status_code = resp.status(); + let response_body = resp.text().await; + tracing::error!( + ?status_code, + ?response_body, + "Received client error while making callback. Callback will not be retried" + ); + break; + } + + let status_code = resp.status(); + let response_body = resp.text().await; + if i < TOTAL_ATTEMPTS { + tracing::warn!( + ?status_code, + ?response_body, + "Received non-OK status for callback request. 
Will retry after a delay" + ); + tokio::time::sleep(Duration::from_secs(wait_secs)).await; + wait_secs *= 2; + } else { + tracing::error!( + ?status_code, + ?response_body, + "Received non-OK status for callback request" + ); + } + } + }); + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use crate::app::callback::state::State as CallbackState; + use crate::app::callback::store::memstore::InMemoryStore; + use crate::app::start_main_server; + use crate::app::tracker::MessageGraph; + use crate::callback::CallbackHandler; + use crate::config::generate_certs; + use crate::pipeline::PipelineDCG; + use crate::{AppState, Settings}; + use axum_server::tls_rustls::RustlsConfig; + use std::sync::Arc; + use std::time::Duration; + use tokio::sync::mpsc; + + type Result = std::result::Result>; + + #[tokio::test] + async fn test_callback() -> Result<()> { + // Set up the CryptoProvider (controls core cryptography used by rustls) for the process + let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + + let (cert, key) = generate_certs()?; + + let tls_config = RustlsConfig::from_pem(cert.pem().into(), key.serialize_pem().into()) + .await + .map_err(|e| format!("Failed to create tls config {:?}", e))?; + + let settings = Settings { + app_listen_port: 3003, + ..Default::default() + }; + // We start the 'Serving' https server with an in-memory store + // When the server receives callback request, the in-memory store will be populated. + // This is verified at the end of the test. + let store = InMemoryStore::new(); + let message_graph = MessageGraph::from_pipeline(&PipelineDCG::default())?; + let (tx, _) = mpsc::channel(10); + + let mut app_state = AppState { + message: tx, + settings: Arc::new(settings), + callback_state: CallbackState::new(message_graph, store.clone()).await?, + }; + + // We use this value as the request id of the callback request + const ID_VALUE: &str = "1234"; + + // Register the request id in the store. This normally happens when the Serving source + // receives a request from the client. The callbacks for this request must only happen after this. + let _callback_notify_rx = app_state.callback_state.register(ID_VALUE.into()); + + let server_handle = tokio::spawn(start_main_server(app_state, tls_config)); + + let client = reqwest::Client::builder() + .timeout(Duration::from_secs(2)) + .danger_accept_invalid_certs(true) + .build()?; + + // Wait for the server to be ready + let mut server_ready = false; + for _ in 0..10 { + let resp = client.get("https://localhost:3003/livez").send().await?; + if resp.status().is_success() { + server_ready = true; + break; + } + tokio::time::sleep(Duration::from_millis(5)).await; + } + assert!(server_ready, "Server is not ready"); + + let callback_handler = CallbackHandler::new("test".into(), 10); + + // On the server, this fails with SubGraphInvalidInput("Invalid callback: 1234, vertex: in") + // We get 200 OK response from the server, since we already registered this request ID in the store. 
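The `callback()` call that follows below ultimately POSTs a one-element JSON array built from the Callback struct above, with the request id echoed in the X-Numaflow-Id header. A self-contained sketch of that request is shown here; the struct copies are reduced to what the serializer sees, the URL and values are illustrative, and reqwest with its json feature plus serde/serde_json are assumed as dependencies.

use serde::Serialize;

// Reduced copies of the payload structs from the diff; field names match the
// definitions above, the values below are made up.
#[derive(Serialize)]
struct Callback {
    id: String,
    vertex: String,
    cb_time: u64,
    from_vertex: String,
    responses: Vec<Response>,
}

#[derive(Serialize)]
struct Response {
    tags: Option<Vec<String>>,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let payload = Callback {
        id: "1234".into(),
        vertex: "planner".into(),
        cb_time: 1_700_000_000_000,
        from_vertex: "in".into(),
        responses: vec![Response { tags: Some(vec!["asciiart".into()]) }],
    };

    // The handler posts a one-element array (`&[&callback_payload]`) and echoes
    // the request id in the X-Numaflow-Id header.
    let client = reqwest::Client::new();
    let _resp = client
        .post("https://localhost:3003/v1/process/callback") // illustrative target
        .header("X-Numaflow-Id", payload.id.clone())
        .json(&[&payload])
        .send()
        .await?;
    Ok(())
}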
+ callback_handler + .callback( + ID_VALUE.into(), + "https://localhost:3003/v1/process/callback".into(), + "in".into(), + vec![], + ) + .await?; + let mut data = None; + for _ in 0..10 { + tokio::time::sleep(Duration::from_millis(2)).await; + data = { + let guard = store.data.lock().unwrap(); + guard.get(ID_VALUE).cloned() + }; + if data.is_some() { + break; + } + } + assert!(data.is_some(), "Callback data not found in store"); + server_handle.abort(); + Ok(()) + } +} diff --git a/rust/serving/src/config.rs b/rust/serving/src/config.rs index c485ad722c..d5e93ab6f2 100644 --- a/rust/serving/src/config.rs +++ b/rust/serving/src/config.rs @@ -18,6 +18,9 @@ const ENV_NUMAFLOW_SERVING_APP_PORT: &str = "NUMAFLOW_SERVING_APP_LISTEN_PORT"; const ENV_NUMAFLOW_SERVING_AUTH_TOKEN: &str = "NUMAFLOW_SERVING_AUTH_TOKEN"; const ENV_MIN_PIPELINE_SPEC: &str = "NUMAFLOW_SERVING_MIN_PIPELINE_SPEC"; +pub const DEFAULT_ID_HEADER: &str = "X-Numaflow-Id"; +pub const DEFAULT_CALLBACK_URL_HEADER_KEY: &str = "X-Numaflow-Callback-Url"; + pub fn generate_certs() -> std::result::Result<(Certificate, KeyPair), String> { let CertifiedKey { cert, key_pair } = generate_simple_self_signed(vec!["localhost".into()]) .map_err(|e| format!("Failed to generate cert {:?}", e))?; @@ -63,7 +66,7 @@ pub struct Settings { impl Default for Settings { fn default() -> Self { Self { - tid_header: "ID".to_owned(), + tid_header: DEFAULT_ID_HEADER.to_owned(), app_listen_port: 3000, metrics_server_listen_port: 3001, upstream_addr: "localhost:8888".to_owned(), @@ -175,7 +178,7 @@ mod tests { fn test_default_config() { let settings = Settings::default(); - assert_eq!(settings.tid_header, "ID"); + assert_eq!(settings.tid_header, "X-Numaflow-Id"); assert_eq!(settings.app_listen_port, 3000); assert_eq!(settings.metrics_server_listen_port, 3001); assert_eq!(settings.upstream_addr, "localhost:8888"); @@ -236,6 +239,7 @@ mod tests { conditions: None, }], }, + ..Default::default() }; assert_eq!(settings, expected_config); } diff --git a/rust/serving/src/lib.rs b/rust/serving/src/lib.rs index 7fe953616f..f49d0c9164 100644 --- a/rust/serving/src/lib.rs +++ b/rust/serving/src/lib.rs @@ -15,7 +15,7 @@ use crate::metrics::start_https_metrics_server; mod app; mod config; -pub use config::Settings; +pub use {config::Settings, config::DEFAULT_CALLBACK_URL_HEADER_KEY, config::DEFAULT_ID_HEADER}; mod consts; mod error; @@ -23,9 +23,10 @@ mod metrics; mod pipeline; pub mod source; +use source::MessageWrapper; pub use source::{Message, ServingSource}; -use crate::source::MessageWrapper; +pub mod callback; #[derive(Clone)] pub(crate) struct AppState { diff --git a/rust/serving/src/source.rs b/rust/serving/src/source.rs index 0efce51990..4c1d7ae35c 100644 --- a/rust/serving/src/source.rs +++ b/rust/serving/src/source.rs @@ -9,6 +9,7 @@ use tokio::time::Instant; use crate::app::callback::state::State as CallbackState; use crate::app::callback::store::redisstore::RedisConnection; use crate::app::tracker::MessageGraph; +use crate::config::{DEFAULT_CALLBACK_URL_HEADER_KEY, DEFAULT_ID_HEADER}; use crate::Settings; use crate::{Error, Result}; @@ -50,6 +51,7 @@ struct ServingSourceActor { /// has been successfully processed. 
tracker: HashMap>, vertex_replica_id: u16, + callback_url: String, } impl ServingSourceActor { @@ -72,12 +74,17 @@ impl ServingSourceActor { })?; let callback_state = CallbackState::new(msg_graph, redis_store).await?; + let callback_url = format!( + "https://{}:{}/v1/process/callback", + &settings.host_ip, &settings.app_listen_port + ); tokio::spawn(async move { let mut serving_actor = ServingSourceActor { messages: messages_rx, handler_rx, tracker: HashMap::new(), vertex_replica_id, + callback_url, }; serving_actor.run().await; }); @@ -140,10 +147,17 @@ impl ServingSourceActor { }; let MessageWrapper { confirm_save, - message, + mut message, } = message; self.tracker.insert(message.id.clone(), confirm_save); + message.headers.insert( + DEFAULT_CALLBACK_URL_HEADER_KEY.into(), + self.callback_url.clone(), + ); + message + .headers + .insert(DEFAULT_ID_HEADER.into(), message.id.clone()); messages.push(message); } Ok(messages) @@ -233,6 +247,9 @@ mod tests { type Result = std::result::Result>; #[tokio::test] async fn test_serving_source() -> Result<()> { + // Setup the CryptoProvider (controls core cryptography used by rustls) for the process + let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + let settings = Arc::new(Settings::default()); let serving_source = ServingSource::new(Arc::clone(&settings), 10, Duration::from_millis(1), 0).await?; diff --git a/test/api-e2e/api_test.go b/test/api-e2e/api_test.go index e1800ea678..b31db2aa04 100644 --- a/test/api-e2e/api_test.go +++ b/test/api-e2e/api_test.go @@ -311,13 +311,10 @@ func (s *APISuite) TestAPIsForMetricsAndWatermarkAndPodsForPipeline() { } func (s *APISuite) TestMetricsAPIsForMonoVertex() { - _, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() - w := s.Given().MonoVertex("@testdata/mono-vertex.yaml"). When(). CreateMonoVertexAndWait() - defer w.DeleteMonoVertexAndWait() + // defer w.DeleteMonoVertexAndWait() monoVertexName := "mono-vertex" diff --git a/test/monovertex-e2e/monovertex_test.go b/test/monovertex-e2e/monovertex_test.go index ba0c640527..105c6d74be 100644 --- a/test/monovertex-e2e/monovertex_test.go +++ b/test/monovertex-e2e/monovertex_test.go @@ -34,7 +34,7 @@ type MonoVertexSuite struct { func (s *MonoVertexSuite) TestMonoVertexWithTransformer() { w := s.Given().MonoVertex("@testdata/mono-vertex-with-transformer.yaml"). When().CreateMonoVertexAndWait() - defer w.DeleteMonoVertexAndWait() + // defer w.DeleteMonoVertexAndWait() w.Expect().MonoVertexPodsRunning().MvtxDaemonPodsRunning() diff --git a/test/monovertex-e2e/testdata/mono-vertex-with-transformer.yaml b/test/monovertex-e2e/testdata/mono-vertex-with-transformer.yaml index e491448505..c74d5583ad 100644 --- a/test/monovertex-e2e/testdata/mono-vertex-with-transformer.yaml +++ b/test/monovertex-e2e/testdata/mono-vertex-with-transformer.yaml @@ -3,6 +3,15 @@ kind: MonoVertex metadata: name: transformer-mono-vertex spec: + containerTemplate: + env: + - name: RUST_LOG + value: "debug" + - name: RUST_BACKTRACE + value: "1" + livenessProbe: + periodSeconds: 300 + initialDelaySeconds: 300 scale: min: 1 source: @@ -23,4 +32,4 @@ spec: env: - name: SINK_HASH_KEY # Use the name of the mono vertex as the key - value: "transformer-mono-vertex" \ No newline at end of file + value: "transformer-mono-vertex"
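The net effect of the serving-source change above is that each message handed to the pipeline carries its own request id and the callback target in its headers, so downstream vertices know where to report per-vertex callbacks. A stripped-down sketch of that step, using the header constants from the diff and an illustrative message type:

use std::collections::HashMap;

const DEFAULT_ID_HEADER: &str = "X-Numaflow-Id";
const DEFAULT_CALLBACK_URL_HEADER_KEY: &str = "X-Numaflow-Callback-Url";

/// Illustrative stand-in for the serving-source message; only the fields used here.
struct SketchMessage {
    id: String,
    headers: HashMap<String, String>,
}

fn main() {
    // Built once per actor from settings.host_ip and settings.app_listen_port,
    // as in the diff above (values here are made up).
    let callback_url = format!("https://{}:{}/v1/process/callback", "10.244.0.7", 3000);

    let mut message = SketchMessage {
        id: "1234".into(),
        headers: HashMap::new(),
    };

    // Every outgoing message gets the callback target and its own request id.
    message
        .headers
        .insert(DEFAULT_CALLBACK_URL_HEADER_KEY.into(), callback_url);
    message
        .headers
        .insert(DEFAULT_ID_HEADER.into(), message.id.clone());

    assert_eq!(message.headers[DEFAULT_ID_HEADER], "1234");
}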