Skip to content

Commit

Permalink
chore: convert onFailureStrategy to enum in rust (#2020)
Browse files Browse the repository at this point in the history
Signed-off-by: Sidhant Kohli <[email protected]>
  • Loading branch information
kohlisid authored Sep 3, 2024
1 parent 14cdff5 commit 8d8b9e2
Show file tree
Hide file tree
Showing 2 changed files with 218 additions and 21 deletions.
222 changes: 213 additions & 9 deletions rust/monovertex/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,47 @@ const DEFAULT_BATCH_SIZE: u64 = 500;
const DEFAULT_TIMEOUT_IN_MS: u32 = 1000;
const DEFAULT_MAX_SINK_RETRY_ATTEMPTS: u16 = u16::MAX;
const DEFAULT_SINK_RETRY_INTERVAL_IN_MS: u32 = 1;
const DEFAULT_SINK_RETRY_ON_FAIL_STRATEGY: &str = "retry";
const DEFAULT_SINK_RETRY_ON_FAIL_STRATEGY: OnFailureStrategy = OnFailureStrategy::Retry;

#[derive(Debug, PartialEq, Clone)]
pub enum OnFailureStrategy {
Retry,
Fallback,
Drop,
}

impl OnFailureStrategy {
/// Converts a string slice to an `OnFailureStrategy` enum variant.
/// Case insensitivity is considered to enhance usability.
///
/// # Arguments
/// * `s` - A string slice representing the retry strategy.
///
/// # Returns
/// An option containing the corresponding enum variant if successful,
/// or DefaultStrategy if the input does not match known variants.
fn from_str(s: &str) -> Option<Self> {
match s.to_lowercase().as_str() {
"retry" => Some(OnFailureStrategy::Retry),
"fallback" => Some(OnFailureStrategy::Fallback),
"drop" => Some(OnFailureStrategy::Drop),
_ => Some(DEFAULT_SINK_RETRY_ON_FAIL_STRATEGY),
}
}

/// Converts the `OnFailureStrategy` enum variant to a String.
/// This facilitates situations where the enum needs to be displayed or logged as a string.
///
/// # Returns
/// A string representing the `OnFailureStrategy` enum variant.
fn to_string(&self) -> String {
match *self {
OnFailureStrategy::Retry => "retry".to_string(),
OnFailureStrategy::Fallback => "fallback".to_string(),
OnFailureStrategy::Drop => "drop".to_string(),
}
}
}

pub fn config() -> &'static Settings {
static CONF: OnceLock<Settings> = OnceLock::new();
Expand All @@ -44,7 +84,7 @@ pub struct Settings {
pub lag_refresh_interval_in_secs: u16,
pub sink_max_retry_attempts: u16,
pub sink_retry_interval_in_ms: u32,
pub sink_retry_on_fail_strategy: String,
pub sink_retry_on_fail_strategy: OnFailureStrategy,
pub sink_default_retry_strategy: RetryStrategy,
}

Expand Down Expand Up @@ -73,7 +113,7 @@ impl Default for Settings {
lag_refresh_interval_in_secs: DEFAULT_LAG_REFRESH_INTERVAL_IN_SECS,
sink_max_retry_attempts: DEFAULT_MAX_SINK_RETRY_ATTEMPTS,
sink_retry_interval_in_ms: DEFAULT_SINK_RETRY_INTERVAL_IN_MS,
sink_retry_on_fail_strategy: DEFAULT_SINK_RETRY_ON_FAIL_STRATEGY.to_string(),
sink_retry_on_fail_strategy: DEFAULT_SINK_RETRY_ON_FAIL_STRATEGY,
sink_default_retry_strategy: default_retry_strategy,
}
}
Expand Down Expand Up @@ -161,15 +201,16 @@ impl Settings {
}
}

// Set the retry strategy using a direct reference whenever possible
// Set the retry strategy from the spec or use the default
settings.sink_retry_on_fail_strategy = retry_strategy
.on_failure
.clone()
.unwrap_or_else(|| DEFAULT_SINK_RETRY_ON_FAIL_STRATEGY.to_string());
.and_then(|s| OnFailureStrategy::from_str(&s))
.unwrap_or(DEFAULT_SINK_RETRY_ON_FAIL_STRATEGY);

// check if the sink retry strategy is set to fallback and there is no fallback sink configured
// then we should return an error
if settings.sink_retry_on_fail_strategy == "fallback"
if settings.sink_retry_on_fail_strategy == OnFailureStrategy::Fallback
&& !settings.is_fallback_enabled
{
return Err(Error::ConfigError(
Expand Down Expand Up @@ -198,10 +239,12 @@ impl Settings {

#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
use std::env;

use serde_json::json;

use super::*;

#[test]
fn test_settings_load_combined() {
// Define all JSON test configurations in separate scopes to use them distinctively
Expand Down Expand Up @@ -303,7 +346,118 @@ mod tests {

// Execute and verify
let settings = Settings::load().unwrap();
assert_eq!(settings.sink_retry_on_fail_strategy, "retry");
assert_eq!(
settings.sink_retry_on_fail_strategy,
DEFAULT_SINK_RETRY_ON_FAIL_STRATEGY
);
assert_eq!(settings.sink_max_retry_attempts, 5);
assert_eq!(settings.sink_retry_interval_in_ms, 1000);
env::remove_var(ENV_MONO_VERTEX_OBJ);
}

{
// Test Non default Retry Strategy Load
let json_data = json!({
"metadata": {
"name": "simple-mono-vertex",
"namespace": "default",
"creationTimestamp": null
},
"spec": {
"replicas": 0,
"source": {
"udsource": {
"container": {
"image": "xxxxxxx",
"resources": {}
}
}
},
"sink": {
"udsink": {
"container": {
"image": "xxxxxx",
"resources": {}
}
},
"retryStrategy": {
"backoff": {
"interval": "1s",
"steps": 5
},
"onFailure": "drop"
},
},
"limits": {
"readBatchSize": 500,
"readTimeout": "1s"
},
}
});
let json_str = json_data.to_string();
let encoded_json = BASE64_STANDARD.encode(json_str);
env::set_var(ENV_MONO_VERTEX_OBJ, encoded_json);

// Execute and verify
let settings = Settings::load().unwrap();
assert_eq!(
settings.sink_retry_on_fail_strategy,
OnFailureStrategy::Drop
);
assert_eq!(settings.sink_max_retry_attempts, 5);
assert_eq!(settings.sink_retry_interval_in_ms, 1000);
env::remove_var(ENV_MONO_VERTEX_OBJ);
}

{
// Test Invalid on failure strategy to use default
let json_data = json!({
"metadata": {
"name": "simple-mono-vertex",
"namespace": "default",
"creationTimestamp": null
},
"spec": {
"replicas": 0,
"source": {
"udsource": {
"container": {
"image": "xxxxxxx",
"resources": {}
}
}
},
"sink": {
"udsink": {
"container": {
"image": "xxxxxx",
"resources": {}
}
},
"retryStrategy": {
"backoff": {
"interval": "1s",
"steps": 5
},
"onFailure": "xxxxx"
},
},
"limits": {
"readBatchSize": 500,
"readTimeout": "1s"
},
}
});
let json_str = json_data.to_string();
let encoded_json = BASE64_STANDARD.encode(json_str);
env::set_var(ENV_MONO_VERTEX_OBJ, encoded_json);

// Execute and verify
let settings = Settings::load().unwrap();
assert_eq!(
settings.sink_retry_on_fail_strategy,
DEFAULT_SINK_RETRY_ON_FAIL_STRATEGY
);
assert_eq!(settings.sink_max_retry_attempts, 5);
assert_eq!(settings.sink_retry_interval_in_ms, 1000);
env::remove_var(ENV_MONO_VERTEX_OBJ);
Expand Down Expand Up @@ -407,4 +561,54 @@ mod tests {
// General cleanup
env::remove_var(ENV_GRPC_MAX_MESSAGE_SIZE);
}

#[test]
fn test_on_failure_enum_from_str_valid_inputs() {
assert_eq!(
OnFailureStrategy::from_str("retry"),
Some(OnFailureStrategy::Retry)
);
assert_eq!(
OnFailureStrategy::from_str("fallback"),
Some(OnFailureStrategy::Fallback)
);
assert_eq!(
OnFailureStrategy::from_str("drop"),
Some(OnFailureStrategy::Drop)
);

// Testing case insensitivity
assert_eq!(
OnFailureStrategy::from_str("ReTry"),
Some(OnFailureStrategy::Retry)
);
assert_eq!(
OnFailureStrategy::from_str("FALLBACK"),
Some(OnFailureStrategy::Fallback)
);
assert_eq!(
OnFailureStrategy::from_str("Drop"),
Some(OnFailureStrategy::Drop)
);
}

#[test]
fn test_on_failure_enum_from_str_invalid_input() {
assert_eq!(
OnFailureStrategy::from_str("unknown"),
Some(DEFAULT_SINK_RETRY_ON_FAIL_STRATEGY)
); // should return None for undefined inputs
}

#[test]
fn test_on_failure_enum_to_string() {
let retry = OnFailureStrategy::Retry;
assert_eq!(retry.to_string(), "retry");

let fallback = OnFailureStrategy::Fallback;
assert_eq!(fallback.to_string(), "fallback");

let drop = OnFailureStrategy::Drop;
assert_eq!(drop.to_string(), "drop");
}
}
17 changes: 5 additions & 12 deletions rust/monovertex/src/forwarder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::HashMap;

use crate::config::config;
use crate::config::{config, OnFailureStrategy};
use crate::error::{Error, Result};
use crate::message::{Message, Offset};
use crate::metrics;
Expand Down Expand Up @@ -296,17 +296,17 @@ impl Forwarder {
}
// check what is the failure strategy in the config
let strategy = config().sink_retry_on_fail_strategy.clone();
match strategy.as_str() {
match strategy {
// if we need to retry, return true
"retry" => {
OnFailureStrategy::Retry => {
warn!(
"Using onFailure Retry, Retry attempts {} completed",
attempts
);
return Ok(true);
}
// if we need to drop the messages, log and return false
"drop" => {
OnFailureStrategy::Drop => {
// log that we are dropping the messages as requested
warn!(
"Dropping messages after {} attempts. Errors: {:?}",
Expand All @@ -319,7 +319,7 @@ impl Forwarder {
.inc_by(messages_to_send.len() as u64);
}
// if we need to move the messages to the fallback, return false
"fallback" => {
OnFailureStrategy::Fallback => {
// log that we are moving the messages to the fallback as requested
warn!(
"Moving messages to fallback after {} attempts. Errors: {:?}",
Expand All @@ -328,13 +328,6 @@ impl Forwarder {
// move the messages to the fallback messages
fallback_msgs.append(messages_to_send);
}
// if the strategy is invalid, return an error
_ => {
return Err(Error::SinkError(format!(
"Invalid sink retry on fail strategy: {}",
strategy
)));
}
}
// if we are done with the messages, break the loop
Ok(false)
Expand Down

0 comments on commit 8d8b9e2

Please sign in to comment.