Skip to content

Commit

Permalink
Callbacks are working with pipeline
Browse files Browse the repository at this point in the history
Signed-off-by: Sreekanth <[email protected]>
  • Loading branch information
BulkBeing committed Jan 9, 2025
1 parent b5e3e85 commit 94ad530
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 24 deletions.
4 changes: 2 additions & 2 deletions rust/numaflow-core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ pub(crate) struct Settings {
impl Settings {
/// load based on the CRD type, either a pipeline or a monovertex.
/// Settings are populated through reading the env vars set via the controller. The main
/// CRD is the base64 spec of the CR.
/// CRD is the base64 spec of the CR.
fn load() -> Result<Self> {
if let Ok(obj) = env::var(ENV_MONO_VERTEX_OBJ) {
let cfg = MonovertexConfig::load(obj)?;
Expand All @@ -112,7 +112,7 @@ impl Settings {
custom_resource_type: CustomResourceType::Pipeline(cfg),
});
}
Err(Error::Config("No configuration found".to_string()))
Err(Error::Config("No configuration found - env variable {ENV_MONO_VERTEX_OBJ} or {ENV_VERTEX_OBJ} is not set".to_string()))
}
}

Expand Down
15 changes: 11 additions & 4 deletions rust/numaflow-core/src/config/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const DEFAULT_LOOKBACK_WINDOW_IN_SECS: u16 = 120;
const ENV_NUMAFLOW_SERVING_JETSTREAM_URL: &str = "NUMAFLOW_ISBSVC_JETSTREAM_URL";
const ENV_NUMAFLOW_SERVING_JETSTREAM_USER: &str = "NUMAFLOW_ISBSVC_JETSTREAM_USER";
const ENV_NUMAFLOW_SERVING_JETSTREAM_PASSWORD: &str = "NUMAFLOW_ISBSVC_JETSTREAM_PASSWORD";
const ENV_PAF_BATCH_SIZE: &str = "PAF_BATCH_SIZE";
const ENV_CALLBACK_ENABLED: &str = "NUMAFLOW_CALLBACK_ENABLED";
const ENV_CALLBACK_CONCURRENCY: &str = "NUMAFLOW_CALLBACK_CONCURRENCY";
const DEFAULT_CALLBACK_CONCURRENCY: usize = 100;
Expand Down Expand Up @@ -296,9 +297,15 @@ impl PipelineConfig {
.map(|(key, val)| (key.into(), val.into()))
.filter(|(key, _val)| {
// FIXME(cr): this filter is non-exhaustive, should we invert?
key == ENV_NUMAFLOW_SERVING_JETSTREAM_URL
|| key == ENV_NUMAFLOW_SERVING_JETSTREAM_USER
|| key == ENV_NUMAFLOW_SERVING_JETSTREAM_PASSWORD
[
ENV_NUMAFLOW_SERVING_JETSTREAM_URL,
ENV_NUMAFLOW_SERVING_JETSTREAM_USER,
ENV_NUMAFLOW_SERVING_JETSTREAM_PASSWORD,
ENV_PAF_BATCH_SIZE,
ENV_CALLBACK_ENABLED,
ENV_CALLBACK_CONCURRENCY,
]
.contains(&key.as_str())
})
.collect();

Expand Down Expand Up @@ -400,7 +407,7 @@ impl PipelineConfig {

Ok(PipelineConfig {
batch_size: batch_size as usize,
paf_concurrency: env::var("PAF_BATCH_SIZE")
paf_concurrency: get_var(ENV_PAF_BATCH_SIZE)
.unwrap_or((DEFAULT_BATCH_SIZE * 2).to_string())
.parse()
.unwrap(),
Expand Down
3 changes: 3 additions & 0 deletions rust/numaflow-core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,9 @@ pub(crate) async fn start_metrics_https_server(
addr: SocketAddr,
metrics_state: UserDefinedContainerState,
) -> crate::Result<()> {
// Setup the CryptoProvider (controls core cryptography used by rustls) for the process
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();

// Generate a self-signed certificate
let CertifiedKey { cert, key_pair } = generate_simple_self_signed(vec!["localhost".into()])
.map_err(|e| Error::Metrics(format!("Generating self-signed certificate: {}", e)))?;
Expand Down
6 changes: 6 additions & 0 deletions rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ impl JetstreamReader {
index: 0,
};

let metadata = crate::message::Metadata{
// Copy previous vertex name from message id
previous_vertex: String::from_utf8_lossy(&message.id.vertex_name).into(),
};
message.metadata = Some(metadata);

message.offset = Some(offset.clone());
message.id = message_id.clone();

Expand Down
2 changes: 1 addition & 1 deletion rust/numaflow-core/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ impl SinkWriter {
for message in batch {
let metadata = message.metadata.ok_or_else(|| {
Error::Source(format!(
"Message does not contain previous vertex name in the metadata"
"Writing to Sink: message does not contain previous vertex name in the metadata"
))
})?;
callback_handler
Expand Down
10 changes: 8 additions & 2 deletions rust/numaflow/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,13 @@ async fn run() -> Result<(), Box<dyn Error>> {
} else if args.contains(&"--rust".to_string()) {
numaflow_core::run()
.await
.map_err(|e| format!("Error running rust binary: {e:?}"))?
.map_err(|e| format!("Error running rust binary: {e:?}"))?;
} else {
return Err(format!(
"Invalid argument. Use --servesink, or --rust. Current args = {:?}",
args
)
.into());
}
Err("Invalid argument. Use --servesink, or --rust".into())
Ok(())
}
11 changes: 10 additions & 1 deletion rust/serving/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ where
.layer(
TraceLayer::new_for_http()
.make_span_with(move |req: &Request<Body>| {
let req_path = req.uri().path();
if ["/metrics", "/readyz", "/livez", "/sidecar-livez"].contains(&req_path) {
// We don't need request ID for these endpoints
return info_span!("request", method=?req.method(), path=req_path);
}
let tid = req
.headers()
.get(&tid_header)
Expand All @@ -94,10 +99,14 @@ where
.get::<MatchedPath>()
.map(MatchedPath::as_str);

info_span!("request", tid, method=?req.method(), path=req.uri().path(), matched_path)
info_span!("request", tid, method=?req.method(), path=req_path, matched_path)
})
.on_response(
|response: &Response<Body>, latency: Duration, _span: &Span| {
if response.status().is_server_error() {
// 5xx responses will be captured in on_failure at and logged at 'error' level
return;
}
tracing::info!(status=?response.status(), ?latency)
},
)
Expand Down
29 changes: 15 additions & 14 deletions rust/serving/src/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,16 @@ const CALLBACK_URL_HEADER_KEY: &str = "X-Numaflow-Callback-Url";
/// The data to be sent in the POST request
#[derive(serde::Serialize)]
struct CallbackPayload {
/// Name of the vertex
vertex: String,
/// Name of the pipeline
pipeline: String,
/// Unique identifier of the message
id: String,
/// Name of the vertex
vertex: String,
/// Time when the callback was made
cb_time: u64,
/// List of tags associated with the message
tags: Vec<String>,
/// Name of the vertex from which the message was sent
from_vertex: String,
/// List of tags associated with the message
tags: Option<Vec<String>>,
}

#[derive(Clone)]
Expand Down Expand Up @@ -77,23 +75,25 @@ impl CallbackHandler {
))
})?
.to_owned();
let uuid = message_headers.get(ID_HEADER).ok_or_else(|| {
Error::Source(format!("{ID_HEADER} is not found in message headers",))
})?;
let uuid = message_headers
.get(ID_HEADER)
.ok_or_else(|| Error::Source(format!("{ID_HEADER} is not found in message headers",)))?
.to_owned();
let cb_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("System time is older than Unix epoch time")
.as_millis() as u64;

let mut msg_tags = vec![];
let mut msg_tags = None;
if let Some(tags) = message_tags {
msg_tags = tags.iter().cloned().collect();
if !tags.is_empty() {
msg_tags = Some(tags.iter().cloned().collect());
}
};

let callback_payload = CallbackPayload {
vertex: self.vertex_name.clone(),
pipeline: self.pipeline_name.clone(),
id: uuid.to_owned(),
id: uuid.clone(),
cb_time,
tags: msg_tags,
from_vertex: previous_vertex,
Expand All @@ -105,7 +105,8 @@ impl CallbackHandler {
let _permit = permit;
let resp = client
.post(callback_url)
.json(&callback_payload)
.header(ID_HEADER, uuid.clone())
.json(&[callback_payload])
.send()
.await
.map_err(|e| Error::Source(format!("Sending callback request: {e:?}")))
Expand Down

0 comments on commit 94ad530

Please sign in to comment.