Skip to content

Commit

Permalink
feat: add server-info support and versioning to MonoVertex (#1918)
Browse files Browse the repository at this point in the history
Signed-off-by: Sidhant Kohli <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
  • Loading branch information
kohlisid and vigith authored Aug 10, 2024
1 parent 39e03b8 commit 8b7a9a1
Show file tree
Hide file tree
Showing 5 changed files with 748 additions and 17 deletions.
26 changes: 26 additions & 0 deletions serving/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions serving/source-sink/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ serde_json = "1.0.122"
numaflow-models = { path = "../numaflow-models"}
rcgen = "0.13.1"
rustls = { version = "0.23.12", features = ["aws_lc_rs"] }
serde = { version = "1.0.204", features = ["derive"] }
semver = "1.0"
pep440_rs = "0.6.6"

[dev-dependencies]
tower = "0.4.13"
Expand Down
3 changes: 3 additions & 0 deletions serving/source-sink/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ pub enum Error {

#[error("Config Error - {0}")]
ConfigError(String),

#[error("ServerInfoError Error - {0}")]
ServerInfoError(String),
}

impl From<tonic::Status> for Error {
Expand Down
38 changes: 21 additions & 17 deletions serving/source-sink/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::fs;
use std::time::Duration;

use tokio::signal;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use tracing::{error, info};
use tracing::{error, info, warn};

pub(crate) use crate::error::Error;
use crate::forwarder::Forwarder;
Expand Down Expand Up @@ -36,6 +35,7 @@ pub mod forwarder;
pub mod config;

pub mod message;
mod server_info;
pub(crate) mod shared;

/// forwards a chunk of data from the source to the sink via an optional transformer.
Expand All @@ -46,18 +46,34 @@ pub async fn run_forwarder(
transformer_config: Option<TransformerConfig>,
custom_shutdown_rx: Option<oneshot::Receiver<()>>,
) -> Result<()> {
wait_for_server_info(&source_config.server_info_file).await?;
server_info::check_for_server_compatibility(&source_config.server_info_file)
.await
.map_err(|e| {
warn!("Error waiting for source server info file: {:?}", e);
Error::ForwarderError("Error waiting for server info file".to_string())
})?;
let mut source_client = SourceClient::connect(source_config).await?;

// start the lag reader to publish lag metrics
let mut lag_reader = metrics::LagReader::new(source_client.clone(), None, None);
lag_reader.start().await;

wait_for_server_info(&sink_config.server_info_file).await?;
server_info::check_for_server_compatibility(&sink_config.server_info_file)
.await
.map_err(|e| {
warn!("Error waiting for sink server info file: {:?}", e);
Error::ForwarderError("Error waiting for server info file".to_string())
})?;

let mut sink_client = SinkClient::connect(sink_config).await?;

let mut transformer_client = if let Some(config) = transformer_config {
wait_for_server_info(&config.server_info_file).await?;
server_info::check_for_server_compatibility(&config.server_info_file)
.await
.map_err(|e| {
warn!("Error waiting for transformer server info file: {:?}", e);
Error::ForwarderError("Error waiting for server info file".to_string())
})?;
Some(TransformerClient::connect(config).await?)
} else {
None
Expand Down Expand Up @@ -109,18 +125,6 @@ pub async fn run_forwarder(
Ok(())
}

async fn wait_for_server_info(file_path: &str) -> Result<()> {
loop {
if let Ok(metadata) = fs::metadata(file_path) {
if metadata.len() > 0 {
return Ok(());
}
}
info!("Server info file {} is not ready, waiting...", file_path);
sleep(Duration::from_secs(1)).await;
}
}

async fn wait_until_ready(
source_client: &mut SourceClient,
sink_client: &mut SinkClient,
Expand Down
Loading

0 comments on commit 8b7a9a1

Please sign in to comment.