diff --git a/rust/numaflow-core/src/config.rs b/rust/numaflow-core/src/config.rs index 167c2f1cd..6355f0d14 100644 --- a/rust/numaflow-core/src/config.rs +++ b/rust/numaflow-core/src/config.rs @@ -97,7 +97,7 @@ pub(crate) struct Settings { impl Settings { /// load based on the CRD type, either a pipeline or a monovertex. /// Settings are populated through reading the env vars set via the controller. The main - /// CRD is the base64 spec of the CR. + /// CRD is the base64 spec of the CR. fn load() -> Result { if let Ok(obj) = env::var(ENV_MONO_VERTEX_OBJ) { let cfg = MonovertexConfig::load(obj)?; @@ -112,7 +112,7 @@ impl Settings { custom_resource_type: CustomResourceType::Pipeline(cfg), }); } - Err(Error::Config("No configuration found".to_string())) + Err(Error::Config(format!("No configuration found - env variable {ENV_MONO_VERTEX_OBJ} or {ENV_VERTEX_OBJ} is not set"))) } } diff --git a/rust/numaflow-core/src/config/pipeline.rs b/rust/numaflow-core/src/config/pipeline.rs index 255da3286..0f40017aa 100644 --- a/rust/numaflow-core/src/config/pipeline.rs +++ b/rust/numaflow-core/src/config/pipeline.rs @@ -25,6 +25,7 @@ const DEFAULT_LOOKBACK_WINDOW_IN_SECS: u16 = 120; const ENV_NUMAFLOW_SERVING_JETSTREAM_URL: &str = "NUMAFLOW_ISBSVC_JETSTREAM_URL"; const ENV_NUMAFLOW_SERVING_JETSTREAM_USER: &str = "NUMAFLOW_ISBSVC_JETSTREAM_USER"; const ENV_NUMAFLOW_SERVING_JETSTREAM_PASSWORD: &str = "NUMAFLOW_ISBSVC_JETSTREAM_PASSWORD"; +const ENV_PAF_BATCH_SIZE: &str = "PAF_BATCH_SIZE"; const ENV_CALLBACK_ENABLED: &str = "NUMAFLOW_CALLBACK_ENABLED"; const ENV_CALLBACK_CONCURRENCY: &str = "NUMAFLOW_CALLBACK_CONCURRENCY"; const DEFAULT_CALLBACK_CONCURRENCY: usize = 100; @@ -296,9 +297,15 @@ impl PipelineConfig { .map(|(key, val)| (key.into(), val.into())) .filter(|(key, _val)| { // FIXME(cr): this filter is non-exhaustive, should we invert? 
- key == ENV_NUMAFLOW_SERVING_JETSTREAM_URL - || key == ENV_NUMAFLOW_SERVING_JETSTREAM_USER - || key == ENV_NUMAFLOW_SERVING_JETSTREAM_PASSWORD + [ + ENV_NUMAFLOW_SERVING_JETSTREAM_URL, + ENV_NUMAFLOW_SERVING_JETSTREAM_USER, + ENV_NUMAFLOW_SERVING_JETSTREAM_PASSWORD, + ENV_PAF_BATCH_SIZE, + ENV_CALLBACK_ENABLED, + ENV_CALLBACK_CONCURRENCY, + ] + .contains(&key.as_str()) }) .collect(); @@ -400,7 +407,7 @@ impl PipelineConfig { Ok(PipelineConfig { batch_size: batch_size as usize, - paf_concurrency: env::var("PAF_BATCH_SIZE") + paf_concurrency: get_var(ENV_PAF_BATCH_SIZE) .unwrap_or((DEFAULT_BATCH_SIZE * 2).to_string()) .parse() .unwrap(), diff --git a/rust/numaflow-core/src/metrics.rs b/rust/numaflow-core/src/metrics.rs index 2a672ec31..0ec903aa0 100644 --- a/rust/numaflow-core/src/metrics.rs +++ b/rust/numaflow-core/src/metrics.rs @@ -600,6 +600,9 @@ pub(crate) async fn start_metrics_https_server( addr: SocketAddr, metrics_state: UserDefinedContainerState, ) -> crate::Result<()> { + // Setup the CryptoProvider (controls core cryptography used by rustls) for the process + let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + // Generate a self-signed certificate let CertifiedKey { cert, key_pair } = generate_simple_self_signed(vec!["localhost".into()]) .map_err(|e| Error::Metrics(format!("Generating self-signed certificate: {}", e)))?; diff --git a/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs b/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs index bf4faa2da..d09ffb32c 100644 --- a/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs +++ b/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs @@ -155,6 +155,12 @@ impl JetstreamReader { index: 0, }; + let metadata = crate::message::Metadata{ + // Copy previous vertex name from message id + previous_vertex: String::from_utf8_lossy(&message.id.vertex_name).into(), + }; + message.metadata = Some(metadata); + message.offset = Some(offset.clone()); message.id = 
message_id.clone(); diff --git a/rust/numaflow-core/src/sink.rs b/rust/numaflow-core/src/sink.rs index 040bcf700..9d64b1844 100644 --- a/rust/numaflow-core/src/sink.rs +++ b/rust/numaflow-core/src/sink.rs @@ -315,7 +315,7 @@ impl SinkWriter { for message in batch { let metadata = message.metadata.ok_or_else(|| { Error::Source(format!( - "Message does not contain previous vertex name in the metadata" + "Writing to Sink: message does not contain previous vertex name in the metadata" )) })?; callback_handler diff --git a/rust/numaflow/src/main.rs b/rust/numaflow/src/main.rs index 9a5ab6fe8..7af5a0b97 100644 --- a/rust/numaflow/src/main.rs +++ b/rust/numaflow/src/main.rs @@ -36,7 +36,13 @@ async fn run() -> Result<(), Box> { } else if args.contains(&"--rust".to_string()) { numaflow_core::run() .await - .map_err(|e| format!("Error running rust binary: {e:?}"))? + .map_err(|e| format!("Error running rust binary: {e:?}"))?; + } else { + return Err(format!( + "Invalid argument. Use --servesink, or --rust. Current args = {:?}", + args + ) + .into()); } - Err("Invalid argument. 
Use --servesink, or --rust".into()) + Ok(()) } diff --git a/rust/serving/src/app.rs b/rust/serving/src/app.rs index 0f33311e8..b73ac4743 100644 --- a/rust/serving/src/app.rs +++ b/rust/serving/src/app.rs @@ -82,6 +82,11 @@ where .layer( TraceLayer::new_for_http() .make_span_with(move |req: &Request| { + let req_path = req.uri().path(); + if ["/metrics", "/readyz", "/livez", "/sidecar-livez"].contains(&req_path) { + // We don't need request ID for these endpoints + return info_span!("request", method=?req.method(), path=req_path); + } let tid = req .headers() .get(&tid_header) @@ -94,10 +99,14 @@ where .get::() .map(MatchedPath::as_str); - info_span!("request", tid, method=?req.method(), path=req.uri().path(), matched_path) + info_span!("request", tid, method=?req.method(), path=req_path, matched_path) }) .on_response( |response: &Response, latency: Duration, _span: &Span| { + if response.status().is_server_error() { + // 5xx responses will be captured in on_failure and logged at 'error' level + return; + } tracing::info!(status=?response.status(), ?latency) }, ) diff --git a/rust/serving/src/callback.rs b/rust/serving/src/callback.rs index ea9af4036..191d753bb 100644 --- a/rust/serving/src/callback.rs +++ b/rust/serving/src/callback.rs @@ -15,18 +15,16 @@ const CALLBACK_URL_HEADER_KEY: &str = "X-Numaflow-Callback-Url"; /// The data to be sent in the POST request #[derive(serde::Serialize)] struct CallbackPayload { - /// Name of the vertex - vertex: String, - /// Name of the pipeline - pipeline: String, /// Unique identifier of the message id: String, + /// Name of the vertex + vertex: String, /// Time when the callback was made cb_time: u64, - /// List of tags associated with the message - tags: Vec, /// Name of the vertex from which the message was sent from_vertex: String, + /// List of tags associated with the message + tags: Option>, } #[derive(Clone)] @@ -77,23 +75,25 @@ impl CallbackHandler { )) })? 
.to_owned(); - let uuid = message_headers.get(ID_HEADER).ok_or_else(|| { - Error::Source(format!("{ID_HEADER} is not found in message headers",)) - })?; + let uuid = message_headers + .get(ID_HEADER) + .ok_or_else(|| Error::Source(format!("{ID_HEADER} is not found in message headers",)))? + .to_owned(); let cb_time = SystemTime::now() .duration_since(UNIX_EPOCH) .expect("System time is older than Unix epoch time") .as_millis() as u64; - let mut msg_tags = vec![]; + let mut msg_tags = None; if let Some(tags) = message_tags { - msg_tags = tags.iter().cloned().collect(); + if !tags.is_empty() { + msg_tags = Some(tags.iter().cloned().collect()); + } }; let callback_payload = CallbackPayload { vertex: self.vertex_name.clone(), - pipeline: self.pipeline_name.clone(), - id: uuid.to_owned(), + id: uuid.clone(), cb_time, tags: msg_tags, from_vertex: previous_vertex, @@ -105,7 +105,8 @@ impl CallbackHandler { let _permit = permit; let resp = client .post(callback_url) - .json(&callback_payload) + .header(ID_HEADER, uuid.clone()) + .json(&[callback_payload]) .send() .await .map_err(|e| Error::Source(format!("Sending callback request: {e:?}")))