Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
Signed-off-by: Sidhant Kohli <[email protected]>
  • Loading branch information
kohlisid committed Aug 10, 2024
1 parent 1f0469d commit 4105fb4
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 36 deletions.
2 changes: 1 addition & 1 deletion serving/source-sink/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ pub async fn run_forwarder(
&mut sink_client,
&mut transformer_client,
)
.await?;
.await?;

// TODO: use builder pattern of options like TIMEOUT, BATCH_SIZE, etc?
let mut forwarder =
Expand Down
56 changes: 21 additions & 35 deletions serving/source-sink/src/server_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,6 @@ pub(crate) struct ServerInfo {
/// check_for_server_compatibility waits until the server info file is ready and check whether the
/// server is compatible with Numaflow.
pub async fn check_for_server_compatibility(file_path: &str) -> error::Result<()> {
// Infinite loop to keep checking until the file is ready
// TODO: see whether we can move this to the code into `read_server_info`
loop {
// Check if the file exists and has content
if let Ok(metadata) = fs::metadata(file_path) {
if metadata.len() > 0 {
// Break out of the loop if the file is ready (has content)
break;
}
}
// Log message indicating the file is not ready and sleep for 1 second before checking again
info!("Server info file {} is not ready, waiting...", file_path);
sleep(Duration::from_secs(1)).await;
}

// Read the server info file
let server_info = read_server_info(file_path).await?;

Expand Down Expand Up @@ -193,6 +178,20 @@ fn check_sdk_compatibility(

/// Reads the server info file and returns the parsed ServerInfo struct.
async fn read_server_info(file_path: &str) -> error::Result<ServerInfo> {
// Infinite loop to keep checking until the file is ready
loop {
// Check if the file exists and has content
if let Ok(metadata) = fs::metadata(file_path) {
if metadata.len() > 0 {
// Break out of the loop if the file is ready (has content)
break;
}
}
// Log message indicating the file is not ready and sleep for 1 second before checking again
info!("Server info file {} is not ready, waiting...", file_path);
sleep(Duration::from_secs(1)).await;
}

// Retry logic for reading the file
let mut retry = 0;
let contents;
Expand Down Expand Up @@ -238,23 +237,18 @@ async fn read_server_info(file_path: &str) -> error::Result<ServerInfo> {

#[cfg(test)]
mod tests {
use std::{collections::HashMap, fs::File};
use std::io::{Read, Write};
use serde_json::json;
use std::io::{Read, Write};
use std::{collections::HashMap, fs::File};
use tempfile::tempdir;

use super::*;

// Alias Protocol as &str
const UDS: &str = "uds";
// Constants for the tests
const MINIMUM_NUMAFLOW_VERSION: &str = "1.2.0-rc4";
const TCP: &str = "tcp";
const PYTHON: &str = "python";
const GOLANG: &str = "go";
const JAVA: &str = "java";

// Constants
const MAP_MODE_KEY: &str = "MAP_MODE";
const MINIMUM_NUMAFLOW_VERSION: &str = "1.2.0-rc4";

async fn write_server_info(
svr_info: &ServerInfo,
Expand Down Expand Up @@ -538,17 +532,17 @@ mod tests {
"version": "v0.7.0-rc2",
"metadata": null
})
.to_string();
.to_string();

let expected_server_info = ServerInfo {
let _expected_server_info = ServerInfo {
protocol: "uds".to_string(),
language: "go".to_string(),
minimum_numaflow_version: "1.2.0-rc4".to_string(),
version: "v0.7.0-rc2".to_string(),
metadata: Some(HashMap::new()), // Expecting an empty HashMap here
};

let parsed_server_info: ServerInfo =
let _parsed_server_info: ServerInfo =
serde_json::from_str(&json_data).expect("Failed to parse JSON");
}

Expand Down Expand Up @@ -687,14 +681,6 @@ mod version {
platform,
}
}

/// Output the version as a string.
pub fn to_string(&self) -> String {
format!(
"Version: {}, BuildDate: {}, GitCommit: {}, GitTag: {}, GitTreeState: {}, GoVersion: {}, Compiler: {}, Platform: {}",
self.version, self.build_date, self.git_commit, self.git_tag, self.git_tree_state, self.go_version, self.compiler, self.platform
)
}
}

/// Use once_cell::sync::Lazy for thread-safe, one-time initialization
Expand Down

0 comments on commit 4105fb4

Please sign in to comment.