Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement callbacks from vertices when serving is used as source #2311

Merged
merged 35 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
b5e3e85
Tested callbacks with monovertex
BulkBeing Jan 7, 2025
94ad530
Callbacks are working with pipeline
BulkBeing Jan 9, 2025
b8dbb6d
Retries when callback fails
BulkBeing Jan 13, 2025
d86e41d
Unit test for callbacks
BulkBeing Jan 13, 2025
b65f928
Formatting
BulkBeing Jan 13, 2025
d68abf9
Unit tests callback failures
BulkBeing Jan 13, 2025
011a649
Callback after resolving PAFs
BulkBeing Jan 15, 2025
4e04250
Merge changes from main
BulkBeing Jan 15, 2025
dd948d1
Merge branch 'main' into serving-callbacks
BulkBeing Jan 15, 2025
abf5b89
Avoid Bytes to String conversion in tracker
BulkBeing Jan 16, 2025
a1b52dc
Move callback handler to tracker
BulkBeing Jan 16, 2025
30e8a11
Use let-else pattern
BulkBeing Jan 16, 2025
d6b40e7
Make callback from tracker
BulkBeing Jan 16, 2025
45dfcd3
Single update method for Tracker handle
BulkBeing Jan 17, 2025
e6467a2
Refactoring
BulkBeing Jan 17, 2025
5e7fc1e
Debugging test failure
BulkBeing Jan 17, 2025
22a2a8a
Debugging test failure - increase timeout
BulkBeing Jan 17, 2025
53fb296
Debugging test failure - print kubectl logs
BulkBeing Jan 20, 2025
d542634
Merge branch main into current branch
BulkBeing Jan 20, 2025
ee1374f
Debugging test failure - mark test failure step in Makefile as success
BulkBeing Jan 20, 2025
141cd00
Debugging test failure - List pods from numaflow-system namespace
BulkBeing Jan 20, 2025
0400edd
Debugging test failure - print daemon pod logs
BulkBeing Jan 20, 2025
75382e8
Debugging test failure - Disable successful tests
BulkBeing Jan 20, 2025
9902d8b
Debugging test failure - tail -f logs
BulkBeing Jan 20, 2025
fe373be
Debugging test failure - print previous termination state
BulkBeing Jan 20, 2025
99b1cf5
Debugging test failure - Avoid pod deletion on test failure
BulkBeing Jan 20, 2025
772c4ea
Debugging test failure - tail logs with Go code
BulkBeing Jan 20, 2025
ac571e6
Debugging test failure - Increase liveness periods
BulkBeing Jan 20, 2025
9a1b7ca
Debugging test failure - Sleep if error occurs
BulkBeing Jan 20, 2025
ea5d6ab
Debugging test failure - Run docker image directly
BulkBeing Jan 20, 2025
00b9ae9
Revert changes made for debugging
BulkBeing Jan 21, 2025
bd99be6
Unit test for callback retry
BulkBeing Jan 21, 2025
8ba4992
Unit test for tracker with callback handler
BulkBeing Jan 21, 2025
5524bba
chore: code review
vigith Jan 21, 2025
8ccb4f1
chore: cargo fmt
vigith Jan 21, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rust/numaflow-core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ pub(crate) struct Settings {
impl Settings {
/// load based on the CRD type, either a pipeline or a monovertex.
/// Settings are populated through reading the env vars set via the controller. The main
/// CRD is the base64 spec of the CR.
/// CRD is the base64 spec of the CR.
fn load() -> Result<Self> {
if let Ok(obj) = env::var(ENV_MONO_VERTEX_OBJ) {
let cfg = MonovertexConfig::load(obj)?;
Expand All @@ -112,7 +112,7 @@ impl Settings {
custom_resource_type: CustomResourceType::Pipeline(cfg),
});
}
Err(Error::Config("No configuration found".to_string()))
Err(Error::Config("No configuration found - env variable {ENV_MONO_VERTEX_OBJ} or {ENV_VERTEX_OBJ} is not set".to_string()))
}
}

Expand Down
24 changes: 24 additions & 0 deletions rust/numaflow-core/src/config/monovertex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,16 @@ use crate::config::monovertex::sink::SinkType;
use crate::error::Error;
use crate::Result;

use super::pipeline::ServingCallbackConfig;

const DEFAULT_BATCH_SIZE: u64 = 500;
const DEFAULT_TIMEOUT_IN_MS: u32 = 1000;
const DEFAULT_LOOKBACK_WINDOW_IN_SECS: u16 = 120;

const ENV_CALLBACK_ENABLED: &str = "NUMAFLOW_CALLBACK_ENABLED"; //FIXME: duplicates
const ENV_CALLBACK_CONCURRENCY: &str = "NUMAFLOW_CALLBACK_CONCURRENCY";
const DEFAULT_CALLBACK_CONCURRENCY: usize = 100;

#[derive(Debug, Clone, PartialEq)]
pub(crate) struct MonovertexConfig {
pub(crate) name: String,
Expand All @@ -33,6 +39,7 @@ pub(crate) struct MonovertexConfig {
pub(crate) transformer_config: Option<TransformerConfig>,
pub(crate) fb_sink_config: Option<SinkConfig>,
pub(crate) metrics_config: MetricsConfig,
pub(crate) callback_config: Option<ServingCallbackConfig>,
}

impl Default for MonovertexConfig {
Expand All @@ -53,6 +60,7 @@ impl Default for MonovertexConfig {
transformer_config: None,
fb_sink_config: None,
metrics_config: MetricsConfig::default(),
callback_config: None,
}
}
}
Expand Down Expand Up @@ -143,6 +151,21 @@ impl MonovertexConfig {
.and_then(|scale| scale.lookback_seconds.map(|x| x as u16))
.unwrap_or(DEFAULT_LOOKBACK_WINDOW_IN_SECS);

let mut callback_config = None;
if let Ok(_) = env::var(ENV_CALLBACK_ENABLED) {
let callback_concurrency: usize = env::var(ENV_CALLBACK_CONCURRENCY)
.unwrap_or_else(|_| format!("{DEFAULT_CALLBACK_CONCURRENCY}"))
.parse()
.map_err(|e| {
Error::Config(format!(
"Parsing value of {ENV_CALLBACK_CONCURRENCY}: {e:?}"
))
})?;
callback_config = Some(ServingCallbackConfig {
callback_concurrency,
});
}

Ok(MonovertexConfig {
name: mono_vertex_name,
replica: *get_vertex_replica(),
Expand All @@ -153,6 +176,7 @@ impl MonovertexConfig {
sink_config,
transformer_config,
fb_sink_config,
callback_config,
})
}
}
Expand Down
49 changes: 42 additions & 7 deletions rust/numaflow-core/src/config/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ const DEFAULT_LOOKBACK_WINDOW_IN_SECS: u16 = 120;
const ENV_NUMAFLOW_SERVING_JETSTREAM_URL: &str = "NUMAFLOW_ISBSVC_JETSTREAM_URL";
const ENV_NUMAFLOW_SERVING_JETSTREAM_USER: &str = "NUMAFLOW_ISBSVC_JETSTREAM_USER";
const ENV_NUMAFLOW_SERVING_JETSTREAM_PASSWORD: &str = "NUMAFLOW_ISBSVC_JETSTREAM_PASSWORD";
const ENV_PAF_BATCH_SIZE: &str = "PAF_BATCH_SIZE";
const ENV_CALLBACK_ENABLED: &str = "NUMAFLOW_CALLBACK_ENABLED";
const ENV_CALLBACK_CONCURRENCY: &str = "NUMAFLOW_CALLBACK_CONCURRENCY";
const DEFAULT_CALLBACK_CONCURRENCY: usize = 100;
const DEFAULT_GRPC_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; // 64 MB
const DEFAULT_MAP_SOCKET: &str = "/var/run/numaflow/map.sock";
pub(crate) const DEFAULT_BATCH_MAP_SOCKET: &str = "/var/run/numaflow/batchmap.sock";
Expand All @@ -47,6 +51,12 @@ pub(crate) struct PipelineConfig {
pub(crate) to_vertex_config: Vec<ToVertexConfig>,
pub(crate) vertex_config: VertexType,
pub(crate) metrics_config: MetricsConfig,
pub(crate) callback_config: Option<ServingCallbackConfig>,
}

#[derive(Debug, Clone, PartialEq)]
pub(crate) struct ServingCallbackConfig {
pub(crate) callback_concurrency: usize,
}

impl Default for PipelineConfig {
Expand All @@ -66,6 +76,7 @@ impl Default for PipelineConfig {
transformer_config: None,
}),
metrics_config: Default::default(),
callback_config: None,
}
}
}
Expand Down Expand Up @@ -286,9 +297,15 @@ impl PipelineConfig {
.map(|(key, val)| (key.into(), val.into()))
.filter(|(key, _val)| {
// FIXME(cr): this filter is non-exhaustive, should we invert?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove

key == ENV_NUMAFLOW_SERVING_JETSTREAM_URL
|| key == ENV_NUMAFLOW_SERVING_JETSTREAM_USER
|| key == ENV_NUMAFLOW_SERVING_JETSTREAM_PASSWORD
[
ENV_NUMAFLOW_SERVING_JETSTREAM_URL,
ENV_NUMAFLOW_SERVING_JETSTREAM_USER,
ENV_NUMAFLOW_SERVING_JETSTREAM_PASSWORD,
ENV_PAF_BATCH_SIZE,
ENV_CALLBACK_ENABLED,
ENV_CALLBACK_CONCURRENCY,
]
.contains(&key.as_str())
})
.collect();

Expand Down Expand Up @@ -373,9 +390,24 @@ impl PipelineConfig {
.and_then(|scale| scale.lookback_seconds.map(|x| x as u16))
.unwrap_or(DEFAULT_LOOKBACK_WINDOW_IN_SECS);

let mut callback_config = None;
if let Ok(_) = get_var(ENV_CALLBACK_ENABLED) {
let callback_concurrency: usize = get_var(ENV_CALLBACK_CONCURRENCY)
.unwrap_or_else(|_| format!("{DEFAULT_CALLBACK_CONCURRENCY}"))
.parse()
.map_err(|e| {
Error::Config(format!(
"Parsing value of {ENV_CALLBACK_CONCURRENCY}: {e:?}"
))
})?;
callback_config = Some(ServingCallbackConfig {
callback_concurrency,
});
}

Ok(PipelineConfig {
batch_size: batch_size as usize,
paf_concurrency: env::var("PAF_BATCH_SIZE")
paf_concurrency: get_var(ENV_PAF_BATCH_SIZE)
.unwrap_or((DEFAULT_BATCH_SIZE * 2).to_string())
.parse()
.unwrap(),
Expand All @@ -388,6 +420,7 @@ impl PipelineConfig {
to_vertex_config,
vertex_config: vertex,
metrics_config: MetricsConfig::with_lookback_window_in_secs(look_back_window),
callback_config,
})
}
}
Expand Down Expand Up @@ -419,6 +452,7 @@ mod tests {
transformer_config: None,
}),
metrics_config: Default::default(),
callback_config: None,
};

let config = PipelineConfig::default();
Expand Down Expand Up @@ -485,6 +519,7 @@ mod tests {
lag_refresh_interval_in_secs: 3,
lookback_window_in_secs: 120,
},
..Default::default()
};
assert_eq!(pipeline_config, expected);
}
Expand Down Expand Up @@ -536,7 +571,7 @@ mod tests {
},
transformer_config: None,
}),
metrics_config: Default::default(),
..Default::default()
};

assert_eq!(pipeline_config, expected);
Expand Down Expand Up @@ -588,7 +623,7 @@ mod tests {
},
transformer_config: None,
}),
metrics_config: Default::default(),
..Default::default()
};

assert_eq!(pipeline_config, expected);
Expand Down Expand Up @@ -704,7 +739,7 @@ mod tests {
}),
map_mode: MapMode::Unary,
}),
metrics_config: MetricsConfig::default(),
..Default::default()
};

assert_eq!(pipeline_config, expected);
Expand Down
9 changes: 9 additions & 0 deletions rust/numaflow-core/src/mapper/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,7 @@ mod tests {
index: 0,
},
headers: Default::default(),
metadata: None,
};

let (output_tx, mut output_rx) = mpsc::channel(10);
Expand Down Expand Up @@ -646,6 +647,7 @@ mod tests {
index: i,
},
headers: Default::default(),
metadata: None,
};
input_tx.send(message).await.unwrap();
}
Expand Down Expand Up @@ -735,6 +737,7 @@ mod tests {
index: 0,
},
headers: Default::default(),
metadata: None,
};

input_tx.send(message).await.unwrap();
Expand Down Expand Up @@ -829,6 +832,7 @@ mod tests {
index: 0,
},
headers: Default::default(),
metadata: None,
},
Message {
keys: Arc::from(vec!["second".into()]),
Expand All @@ -842,6 +846,7 @@ mod tests {
index: 1,
},
headers: Default::default(),
metadata: None,
},
];

Expand Down Expand Up @@ -939,6 +944,7 @@ mod tests {
index: 0,
},
headers: Default::default(),
metadata: None,
},
Message {
keys: Arc::from(vec!["second".into()]),
Expand All @@ -952,6 +958,7 @@ mod tests {
index: 1,
},
headers: Default::default(),
metadata: None,
},
];

Expand Down Expand Up @@ -1049,6 +1056,7 @@ mod tests {
index: 0,
},
headers: Default::default(),
metadata: None,
};

let (input_tx, input_rx) = mpsc::channel(10);
Expand Down Expand Up @@ -1145,6 +1153,7 @@ mod tests {
index: 0,
},
headers: Default::default(),
metadata: None,
};

let (input_tx, input_rx) = mpsc::channel(10);
Expand Down
6 changes: 6 additions & 0 deletions rust/numaflow-core/src/mapper/map/user_defined.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ async fn process_response(sender_map: &ResponseSenderMap, resp: MapResponse) {
offset: Some(msg_info.offset.clone()),
event_time: msg_info.event_time,
headers: msg_info.headers.clone(),
metadata: None,
};
response_messages.push(message);
}
Expand Down Expand Up @@ -387,6 +388,7 @@ impl UserDefinedStreamMap {
offset: None,
event_time: message_info.event_time,
headers: message_info.headers.clone(),
metadata: None,
};
response_sender
.send(Ok(message))
Expand Down Expand Up @@ -496,6 +498,7 @@ mod tests {
index: 0,
},
headers: Default::default(),
metadata: None,
};

let (tx, rx) = tokio::sync::oneshot::channel();
Expand Down Expand Up @@ -586,6 +589,7 @@ mod tests {
index: 0,
},
headers: Default::default(),
metadata: None,
},
crate::message::Message {
keys: Arc::from(vec!["second".into()]),
Expand All @@ -602,6 +606,7 @@ mod tests {
index: 1,
},
headers: Default::default(),
metadata: None,
},
];

Expand Down Expand Up @@ -701,6 +706,7 @@ mod tests {
index: 0,
},
headers: Default::default(),
metadata: None,
};

let (tx, mut rx) = tokio::sync::mpsc::channel(3);
Expand Down
9 changes: 9 additions & 0 deletions rust/numaflow-core/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ pub(crate) struct Message {
pub(crate) id: MessageID,
/// headers of the message
pub(crate) headers: HashMap<String, String>,
pub(crate) metadata: Option<Metadata>,
}

#[derive(Debug, Clone)]
pub(crate) struct Metadata {
// name of the previous vertex.
pub(crate) previous_vertex: String,
}

/// Offset of the message which will be used to acknowledge the message.
Expand Down Expand Up @@ -212,6 +219,7 @@ impl TryFrom<Bytes> for Message {
event_time: utc_from_timestamp(message_info.event_time),
id: id.into(),
headers: header.headers,
metadata: None,
})
}
}
Expand Down Expand Up @@ -263,6 +271,7 @@ mod tests {
index: 0,
},
headers: HashMap::new(),
metadata: None,
};

let result: Result<BytesMut> = message.clone().try_into();
Expand Down
3 changes: 3 additions & 0 deletions rust/numaflow-core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,9 @@ pub(crate) async fn start_metrics_https_server(
addr: SocketAddr,
metrics_state: UserDefinedContainerState,
) -> crate::Result<()> {
// Setup the CryptoProvider (controls core cryptography used by rustls) for the process
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need this?

let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();

// Generate a self-signed certificate
let CertifiedKey { cert, key_pair } = generate_simple_self_signed(vec!["localhost".into()])
.map_err(|e| Error::Metrics(format!("Generating self-signed certificate: {}", e)))?;
Expand Down
Loading
Loading