From 8d8b9e20b37ee214b33a06fed9fc54bab3b9a94d Mon Sep 17 00:00:00 2001 From: Sidhant Kohli Date: Tue, 3 Sep 2024 12:07:40 -0700 Subject: [PATCH] chore: convert onFailureStrategy to enum in rust (#2020) Signed-off-by: Sidhant Kohli --- rust/monovertex/src/config.rs | 222 +++++++++++++++++++++++++++++-- rust/monovertex/src/forwarder.rs | 17 +-- 2 files changed, 218 insertions(+), 21 deletions(-) diff --git a/rust/monovertex/src/config.rs b/rust/monovertex/src/config.rs index d1450500be..81b115422f 100644 --- a/rust/monovertex/src/config.rs +++ b/rust/monovertex/src/config.rs @@ -19,7 +19,47 @@ const DEFAULT_BATCH_SIZE: u64 = 500; const DEFAULT_TIMEOUT_IN_MS: u32 = 1000; const DEFAULT_MAX_SINK_RETRY_ATTEMPTS: u16 = u16::MAX; const DEFAULT_SINK_RETRY_INTERVAL_IN_MS: u32 = 1; -const DEFAULT_SINK_RETRY_ON_FAIL_STRATEGY: &str = "retry"; +const DEFAULT_SINK_RETRY_ON_FAIL_STRATEGY: OnFailureStrategy = OnFailureStrategy::Retry; + +#[derive(Debug, PartialEq, Clone)] +pub enum OnFailureStrategy { + Retry, + Fallback, + Drop, +} + +impl OnFailureStrategy { + /// Converts a string slice to an `OnFailureStrategy` enum variant. + /// Case insensitivity is considered to enhance usability. + /// + /// # Arguments + /// * `s` - A string slice representing the retry strategy. + /// + /// # Returns + /// An option containing the corresponding enum variant if successful, + /// or DefaultStrategy if the input does not match known variants. + fn from_str(s: &str) -> Option { + match s.to_lowercase().as_str() { + "retry" => Some(OnFailureStrategy::Retry), + "fallback" => Some(OnFailureStrategy::Fallback), + "drop" => Some(OnFailureStrategy::Drop), + _ => Some(DEFAULT_SINK_RETRY_ON_FAIL_STRATEGY), + } + } + + /// Converts the `OnFailureStrategy` enum variant to a String. + /// This facilitates situations where the enum needs to be displayed or logged as a string. + /// + /// # Returns + /// A string representing the `OnFailureStrategy` enum variant. + fn to_string(&self) -> String { + match *self { + OnFailureStrategy::Retry => "retry".to_string(), + OnFailureStrategy::Fallback => "fallback".to_string(), + OnFailureStrategy::Drop => "drop".to_string(), + } + } +} pub fn config() -> &'static Settings { static CONF: OnceLock = OnceLock::new(); @@ -44,7 +84,7 @@ pub struct Settings { pub lag_refresh_interval_in_secs: u16, pub sink_max_retry_attempts: u16, pub sink_retry_interval_in_ms: u32, - pub sink_retry_on_fail_strategy: String, + pub sink_retry_on_fail_strategy: OnFailureStrategy, pub sink_default_retry_strategy: RetryStrategy, } @@ -73,7 +113,7 @@ impl Default for Settings { lag_refresh_interval_in_secs: DEFAULT_LAG_REFRESH_INTERVAL_IN_SECS, sink_max_retry_attempts: DEFAULT_MAX_SINK_RETRY_ATTEMPTS, sink_retry_interval_in_ms: DEFAULT_SINK_RETRY_INTERVAL_IN_MS, - sink_retry_on_fail_strategy: DEFAULT_SINK_RETRY_ON_FAIL_STRATEGY.to_string(), + sink_retry_on_fail_strategy: DEFAULT_SINK_RETRY_ON_FAIL_STRATEGY, sink_default_retry_strategy: default_retry_strategy, } } @@ -161,15 +201,16 @@ impl Settings { } } - // Set the retry strategy using a direct reference whenever possible + // Set the retry strategy from the spec or use the default settings.sink_retry_on_fail_strategy = retry_strategy .on_failure .clone() - .unwrap_or_else(|| DEFAULT_SINK_RETRY_ON_FAIL_STRATEGY.to_string()); + .and_then(|s| OnFailureStrategy::from_str(&s)) + .unwrap_or(DEFAULT_SINK_RETRY_ON_FAIL_STRATEGY); // check if the sink retry strategy is set to fallback and there is no fallback sink configured // then we should return an error - if settings.sink_retry_on_fail_strategy == "fallback" + if settings.sink_retry_on_fail_strategy == OnFailureStrategy::Fallback && !settings.is_fallback_enabled { return Err(Error::ConfigError( @@ -198,10 +239,12 @@ impl Settings { #[cfg(test)] mod tests { - use super::*; - use serde_json::json; use std::env; + use serde_json::json; + + use super::*; + #[test] fn test_settings_load_combined() { // Define all JSON test configurations in separate scopes to use them distinctively @@ -303,7 +346,118 @@ mod tests { // Execute and verify let settings = Settings::load().unwrap(); - assert_eq!(settings.sink_retry_on_fail_strategy, "retry"); + assert_eq!( + settings.sink_retry_on_fail_strategy, + DEFAULT_SINK_RETRY_ON_FAIL_STRATEGY + ); + assert_eq!(settings.sink_max_retry_attempts, 5); + assert_eq!(settings.sink_retry_interval_in_ms, 1000); + env::remove_var(ENV_MONO_VERTEX_OBJ); + } + + { + // Test Non default Retry Strategy Load + let json_data = json!({ + "metadata": { + "name": "simple-mono-vertex", + "namespace": "default", + "creationTimestamp": null + }, + "spec": { + "replicas": 0, + "source": { + "udsource": { + "container": { + "image": "xxxxxxx", + "resources": {} + } + } + }, + "sink": { + "udsink": { + "container": { + "image": "xxxxxx", + "resources": {} + } + }, + "retryStrategy": { + "backoff": { + "interval": "1s", + "steps": 5 + }, + "onFailure": "drop" + }, + }, + "limits": { + "readBatchSize": 500, + "readTimeout": "1s" + }, + } + }); + let json_str = json_data.to_string(); + let encoded_json = BASE64_STANDARD.encode(json_str); + env::set_var(ENV_MONO_VERTEX_OBJ, encoded_json); + + // Execute and verify + let settings = Settings::load().unwrap(); + assert_eq!( + settings.sink_retry_on_fail_strategy, + OnFailureStrategy::Drop + ); + assert_eq!(settings.sink_max_retry_attempts, 5); + assert_eq!(settings.sink_retry_interval_in_ms, 1000); + env::remove_var(ENV_MONO_VERTEX_OBJ); + } + + { + // Test Invalid on failure strategy to use default + let json_data = json!({ + "metadata": { + "name": "simple-mono-vertex", + "namespace": "default", + "creationTimestamp": null + }, + "spec": { + "replicas": 0, + "source": { + "udsource": { + "container": { + "image": "xxxxxxx", + "resources": {} + } + } + }, + "sink": { + "udsink": { + "container": { + "image": "xxxxxx", + "resources": {} + } + }, + "retryStrategy": { + "backoff": { + "interval": "1s", + "steps": 5 + }, + "onFailure": "xxxxx" + }, + }, + "limits": { + "readBatchSize": 500, + "readTimeout": "1s" + }, + } + }); + let json_str = json_data.to_string(); + let encoded_json = BASE64_STANDARD.encode(json_str); + env::set_var(ENV_MONO_VERTEX_OBJ, encoded_json); + + // Execute and verify + let settings = Settings::load().unwrap(); + assert_eq!( + settings.sink_retry_on_fail_strategy, + DEFAULT_SINK_RETRY_ON_FAIL_STRATEGY + ); assert_eq!(settings.sink_max_retry_attempts, 5); assert_eq!(settings.sink_retry_interval_in_ms, 1000); env::remove_var(ENV_MONO_VERTEX_OBJ); @@ -407,4 +561,54 @@ mod tests { // General cleanup env::remove_var(ENV_GRPC_MAX_MESSAGE_SIZE); } + + #[test] + fn test_on_failure_enum_from_str_valid_inputs() { + assert_eq!( + OnFailureStrategy::from_str("retry"), + Some(OnFailureStrategy::Retry) + ); + assert_eq!( + OnFailureStrategy::from_str("fallback"), + Some(OnFailureStrategy::Fallback) + ); + assert_eq!( + OnFailureStrategy::from_str("drop"), + Some(OnFailureStrategy::Drop) + ); + + // Testing case insensitivity + assert_eq!( + OnFailureStrategy::from_str("ReTry"), + Some(OnFailureStrategy::Retry) + ); + assert_eq!( + OnFailureStrategy::from_str("FALLBACK"), + Some(OnFailureStrategy::Fallback) + ); + assert_eq!( + OnFailureStrategy::from_str("Drop"), + Some(OnFailureStrategy::Drop) + ); + } + + #[test] + fn test_on_failure_enum_from_str_invalid_input() { + assert_eq!( + OnFailureStrategy::from_str("unknown"), + Some(DEFAULT_SINK_RETRY_ON_FAIL_STRATEGY) + ); // should return None for undefined inputs + } + + #[test] + fn test_on_failure_enum_to_string() { + let retry = OnFailureStrategy::Retry; + assert_eq!(retry.to_string(), "retry"); + + let fallback = OnFailureStrategy::Fallback; + assert_eq!(fallback.to_string(), "fallback"); + + let drop = OnFailureStrategy::Drop; + assert_eq!(drop.to_string(), "drop"); + } } diff --git a/rust/monovertex/src/forwarder.rs b/rust/monovertex/src/forwarder.rs index 0f55268576..1ba928123c 100644 --- a/rust/monovertex/src/forwarder.rs +++ b/rust/monovertex/src/forwarder.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use crate::config::config; +use crate::config::{config, OnFailureStrategy}; use crate::error::{Error, Result}; use crate::message::{Message, Offset}; use crate::metrics; @@ -296,9 +296,9 @@ impl Forwarder { } // check what is the failure strategy in the config let strategy = config().sink_retry_on_fail_strategy.clone(); - match strategy.as_str() { + match strategy { // if we need to retry, return true - "retry" => { + OnFailureStrategy::Retry => { warn!( "Using onFailure Retry, Retry attempts {} completed", attempts @@ -306,7 +306,7 @@ impl Forwarder { return Ok(true); } // if we need to drop the messages, log and return false - "drop" => { + OnFailureStrategy::Drop => { // log that we are dropping the messages as requested warn!( "Dropping messages after {} attempts. Errors: {:?}", @@ -319,7 +319,7 @@ impl Forwarder { .inc_by(messages_to_send.len() as u64); } // if we need to move the messages to the fallback, return false - "fallback" => { + OnFailureStrategy::Fallback => { // log that we are moving the messages to the fallback as requested warn!( "Moving messages to fallback after {} attempts. Errors: {:?}", @@ -328,13 +328,6 @@ impl Forwarder { // move the messages to the fallback messages fallback_msgs.append(messages_to_send); } - // if the strategy is invalid, return an error - _ => { - return Err(Error::SinkError(format!( - "Invalid sink retry on fail strategy: {}", - strategy - ))); - } } // if we are done with the messages, break the loop Ok(false)