Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement callbacks from vertices when serving is used as source #2311

Draft
wants to merge 30 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
b5e3e85
Tested callbacks with monovertex
BulkBeing Jan 7, 2025
94ad530
Callbacks are working with pipeline
BulkBeing Jan 9, 2025
b8dbb6d
Retries when callback fails
BulkBeing Jan 13, 2025
d86e41d
Unit test for callbacks
BulkBeing Jan 13, 2025
b65f928
Formatting
BulkBeing Jan 13, 2025
d68abf9
Unit tests callback failures
BulkBeing Jan 13, 2025
011a649
Callback after resolving PAFs
BulkBeing Jan 15, 2025
4e04250
Merge changes from main
BulkBeing Jan 15, 2025
dd948d1
Merge branch 'main' into serving-callbacks
BulkBeing Jan 15, 2025
abf5b89
Avoid Bytes to String conversion in tracker
BulkBeing Jan 16, 2025
a1b52dc
Move callback handler to tracker
BulkBeing Jan 16, 2025
30e8a11
Use let-else pattern
BulkBeing Jan 16, 2025
d6b40e7
Make callback from tracker
BulkBeing Jan 16, 2025
45dfcd3
Single update method for Tracker handle
BulkBeing Jan 17, 2025
e6467a2
Refactoring
BulkBeing Jan 17, 2025
5e7fc1e
Debugging test failure
BulkBeing Jan 17, 2025
22a2a8a
Debugging test failure - increase timeout
BulkBeing Jan 17, 2025
53fb296
Debugging test failure - print kubectl logs
BulkBeing Jan 20, 2025
d542634
Merge branch main into current branch
BulkBeing Jan 20, 2025
ee1374f
Debugging test failure - mark test failure step in Makefile as success
BulkBeing Jan 20, 2025
141cd00
Debugging test failure - List pods from numaflow-system namespace
BulkBeing Jan 20, 2025
0400edd
Debugging test failure - print daemon pod logs
BulkBeing Jan 20, 2025
75382e8
Debugging test failure - Disable successful tests
BulkBeing Jan 20, 2025
9902d8b
Debugging test failure - tail -f logs
BulkBeing Jan 20, 2025
fe373be
Debugging test failure - print previous termination state
BulkBeing Jan 20, 2025
99b1cf5
Debugging test failure - Avoid pod deletion on test failure
BulkBeing Jan 20, 2025
772c4ea
Debugging test failure - tail logs with Go code
BulkBeing Jan 20, 2025
ac571e6
Debugging test failure - Increase liveness periods
BulkBeing Jan 20, 2025
9a1b7ca
Debugging test failure - Sleep if error occurs
BulkBeing Jan 20, 2025
ea5d6ab
Debugging test failure - Run docker image directly
BulkBeing Jan 20, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions rust/numaflow-core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ use crate::Result;
const ENV_MONO_VERTEX_OBJ: &str = "NUMAFLOW_MONO_VERTEX_OBJECT";
const ENV_VERTEX_OBJ: &str = "NUMAFLOW_VERTEX_OBJECT";

const ENV_CALLBACK_ENABLED: &str = "NUMAFLOW_CALLBACK_ENABLED";
const ENV_CALLBACK_CONCURRENCY: &str = "NUMAFLOW_CALLBACK_CONCURRENCY";
const DEFAULT_CALLBACK_CONCURRENCY: usize = 100;

/// Building blocks (Source, Sink, Transformer, FallBack, Metrics, etc.) to build a Pipeline or a
/// MonoVertex.
pub(crate) mod components;
Expand Down Expand Up @@ -97,7 +101,7 @@ pub(crate) struct Settings {
impl Settings {
/// load based on the CRD type, either a pipeline or a monovertex.
/// Settings are populated through reading the env vars set via the controller. The main
/// CRD is the base64 spec of the CR.
/// CRD is the base64 spec of the CR.
fn load() -> Result<Self> {
if let Ok(obj) = env::var(ENV_MONO_VERTEX_OBJ) {
let cfg = MonovertexConfig::load(obj)?;
Expand All @@ -112,7 +116,7 @@ impl Settings {
custom_resource_type: CustomResourceType::Pipeline(cfg),
});
}
Err(Error::Config("No configuration found".to_string()))
Err(Error::Config("No configuration found - environment variable {ENV_MONO_VERTEX_OBJ} or {ENV_VERTEX_OBJ} is not set".to_string()))
}
}

Expand Down
22 changes: 22 additions & 0 deletions rust/numaflow-core/src/config/monovertex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ use crate::config::monovertex::sink::SinkType;
use crate::error::Error;
use crate::Result;

use super::pipeline::ServingCallbackConfig;

use super::{DEFAULT_CALLBACK_CONCURRENCY, ENV_CALLBACK_CONCURRENCY, ENV_CALLBACK_ENABLED};

const DEFAULT_BATCH_SIZE: u64 = 500;
const DEFAULT_TIMEOUT_IN_MS: u32 = 1000;
const DEFAULT_LOOKBACK_WINDOW_IN_SECS: u16 = 120;
Expand All @@ -33,6 +37,7 @@ pub(crate) struct MonovertexConfig {
pub(crate) transformer_config: Option<TransformerConfig>,
pub(crate) fb_sink_config: Option<SinkConfig>,
pub(crate) metrics_config: MetricsConfig,
pub(crate) callback_config: Option<ServingCallbackConfig>,
}

impl Default for MonovertexConfig {
Expand All @@ -53,6 +58,7 @@ impl Default for MonovertexConfig {
transformer_config: None,
fb_sink_config: None,
metrics_config: MetricsConfig::default(),
callback_config: None,
}
}
}
Expand Down Expand Up @@ -143,6 +149,21 @@ impl MonovertexConfig {
.and_then(|scale| scale.lookback_seconds.map(|x| x as u16))
.unwrap_or(DEFAULT_LOOKBACK_WINDOW_IN_SECS);

let mut callback_config = None;
if let Ok(_) = env::var(ENV_CALLBACK_ENABLED) {
let callback_concurrency: usize = env::var(ENV_CALLBACK_CONCURRENCY)
.unwrap_or_else(|_| format!("{DEFAULT_CALLBACK_CONCURRENCY}"))
.parse()
.map_err(|e| {
Error::Config(format!(
"Parsing value of {ENV_CALLBACK_CONCURRENCY}: {e:?}"
))
})?;
callback_config = Some(ServingCallbackConfig {
callback_concurrency,
});
}

Ok(MonovertexConfig {
name: mono_vertex_name,
replica: *get_vertex_replica(),
Expand All @@ -153,6 +174,7 @@ impl MonovertexConfig {
sink_config,
transformer_config,
fb_sink_config,
callback_config,
})
}
}
Expand Down
50 changes: 42 additions & 8 deletions rust/numaflow-core/src/config/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@ use crate::config::pipeline::map::MapVtxConfig;
use crate::error::Error;
use crate::Result;

use super::{DEFAULT_CALLBACK_CONCURRENCY, ENV_CALLBACK_CONCURRENCY, ENV_CALLBACK_ENABLED};

const DEFAULT_BATCH_SIZE: u64 = 500;
const DEFAULT_TIMEOUT_IN_MS: u32 = 1000;
const DEFAULT_LOOKBACK_WINDOW_IN_SECS: u16 = 120;
const ENV_NUMAFLOW_SERVING_JETSTREAM_URL: &str = "NUMAFLOW_ISBSVC_JETSTREAM_URL";
const ENV_NUMAFLOW_SERVING_JETSTREAM_USER: &str = "NUMAFLOW_ISBSVC_JETSTREAM_USER";
const ENV_NUMAFLOW_SERVING_JETSTREAM_PASSWORD: &str = "NUMAFLOW_ISBSVC_JETSTREAM_PASSWORD";
const ENV_PAF_BATCH_SIZE: &str = "PAF_BATCH_SIZE";
const DEFAULT_GRPC_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; // 64 MB
const DEFAULT_MAP_SOCKET: &str = "/var/run/numaflow/map.sock";
pub(crate) const DEFAULT_BATCH_MAP_SOCKET: &str = "/var/run/numaflow/batchmap.sock";
Expand All @@ -47,6 +50,12 @@ pub(crate) struct PipelineConfig {
pub(crate) to_vertex_config: Vec<ToVertexConfig>,
pub(crate) vertex_config: VertexType,
pub(crate) metrics_config: MetricsConfig,
pub(crate) callback_config: Option<ServingCallbackConfig>,
}

#[derive(Debug, Clone, PartialEq)]
pub(crate) struct ServingCallbackConfig {
pub(crate) callback_concurrency: usize,
}

impl Default for PipelineConfig {
Expand All @@ -66,6 +75,7 @@ impl Default for PipelineConfig {
transformer_config: None,
}),
metrics_config: Default::default(),
callback_config: None,
}
}
}
Expand Down Expand Up @@ -286,9 +296,15 @@ impl PipelineConfig {
.map(|(key, val)| (key.into(), val.into()))
.filter(|(key, _val)| {
// FIXME(cr): this filter is non-exhaustive, should we invert?
key == ENV_NUMAFLOW_SERVING_JETSTREAM_URL
|| key == ENV_NUMAFLOW_SERVING_JETSTREAM_USER
|| key == ENV_NUMAFLOW_SERVING_JETSTREAM_PASSWORD
[
ENV_NUMAFLOW_SERVING_JETSTREAM_URL,
ENV_NUMAFLOW_SERVING_JETSTREAM_USER,
ENV_NUMAFLOW_SERVING_JETSTREAM_PASSWORD,
ENV_PAF_BATCH_SIZE,
ENV_CALLBACK_ENABLED,
ENV_CALLBACK_CONCURRENCY,
]
.contains(&key.as_str())
})
.collect();

Expand Down Expand Up @@ -373,10 +389,25 @@ impl PipelineConfig {
.and_then(|scale| scale.lookback_seconds.map(|x| x as u16))
.unwrap_or(DEFAULT_LOOKBACK_WINDOW_IN_SECS);

let mut callback_config = None;
if let Ok(_) = get_var(ENV_CALLBACK_ENABLED) {
let callback_concurrency: usize = get_var(ENV_CALLBACK_CONCURRENCY)
.unwrap_or_else(|_| format!("{DEFAULT_CALLBACK_CONCURRENCY}"))
.parse()
.map_err(|e| {
Error::Config(format!(
"Parsing value of {ENV_CALLBACK_CONCURRENCY}: {e:?}"
))
})?;
callback_config = Some(ServingCallbackConfig {
callback_concurrency,
});
}

Ok(PipelineConfig {
batch_size: batch_size as usize,
paf_concurrency: env::var("PAF_BATCH_SIZE")
.unwrap_or((DEFAULT_BATCH_SIZE * 2).to_string())
paf_concurrency: get_var(ENV_PAF_BATCH_SIZE)
.unwrap_or_else(|_| (DEFAULT_BATCH_SIZE * 2).to_string())
.parse()
.unwrap(),
read_timeout: Duration::from_millis(timeout_in_ms as u64),
Expand All @@ -388,6 +419,7 @@ impl PipelineConfig {
to_vertex_config,
vertex_config: vertex,
metrics_config: MetricsConfig::with_lookback_window_in_secs(look_back_window),
callback_config,
})
}
}
Expand Down Expand Up @@ -419,6 +451,7 @@ mod tests {
transformer_config: None,
}),
metrics_config: Default::default(),
callback_config: None,
};

let config = PipelineConfig::default();
Expand Down Expand Up @@ -485,6 +518,7 @@ mod tests {
lag_refresh_interval_in_secs: 3,
lookback_window_in_secs: 120,
},
..Default::default()
};
assert_eq!(pipeline_config, expected);
}
Expand Down Expand Up @@ -536,7 +570,7 @@ mod tests {
},
transformer_config: None,
}),
metrics_config: Default::default(),
..Default::default()
};

assert_eq!(pipeline_config, expected);
Expand Down Expand Up @@ -588,7 +622,7 @@ mod tests {
},
transformer_config: None,
}),
metrics_config: Default::default(),
..Default::default()
};

assert_eq!(pipeline_config, expected);
Expand Down Expand Up @@ -704,7 +738,7 @@ mod tests {
}),
map_mode: MapMode::Unary,
}),
metrics_config: MetricsConfig::default(),
..Default::default()
};

assert_eq!(pipeline_config, expected);
Expand Down
Loading
Loading