From 8cff6d1ff13bf4b3343084b737065fc03596a2f0 Mon Sep 17 00:00:00 2001 From: Vigith Maurice Date: Fri, 5 Jul 2024 19:33:51 -0700 Subject: [PATCH] fix: save trait should accept Self as mutable (#1795) Signed-off-by: Vigith Maurice --- serving/Cargo.toml | 4 +- serving/src/app/callback.rs | 2 +- serving/src/app/callback/state.rs | 2 +- serving/src/app/callback/store.rs | 2 +- serving/src/app/callback/store/memstore.rs | 6 +- serving/src/app/callback/store/redisstore.rs | 121 ++++++++++--------- serving/src/app/jetstream_proxy.rs | 2 +- 7 files changed, 72 insertions(+), 67 deletions(-) diff --git a/serving/Cargo.toml b/serving/Cargo.toml index b66371f60a..8125b88806 100644 --- a/serving/Cargo.toml +++ b/serving/Cargo.toml @@ -1,4 +1,4 @@ -workspace = { members = [ "backoff","extras/upstreams", "servesink"] } +workspace = { members = ["backoff", "extras/upstreams", "servesink"] } [package] name = "serve" version = "0.1.0" @@ -29,3 +29,5 @@ redis = { version = "0.25.3", features = ["tokio-comp", "aio", "connection-manag config = "0.14.0" trait-variant = "0.1.2" chrono = { version = "0.4", features = ["serde"] } +# intern +backoff = { path = "backoff" } diff --git a/serving/src/app/callback.rs b/serving/src/app/callback.rs index 57b26a69fc..3ac7264698 100644 --- a/serving/src/app/callback.rs +++ b/serving/src/app/callback.rs @@ -33,7 +33,7 @@ pub fn callback_handler( } async fn callback_save( - State(proxy_state): State>, + State(mut proxy_state): State>, headers: HeaderMap, body: Bytes, ) -> Result<(), ApiError> { diff --git a/serving/src/app/callback/state.rs b/serving/src/app/callback/state.rs index b6318484a4..9fdd300542 100644 --- a/serving/src/app/callback/state.rs +++ b/serving/src/app/callback/state.rs @@ -65,7 +65,7 @@ where } pub(crate) async fn save_response( - &self, + &mut self, id: String, body: axum::body::Bytes, ) -> crate::Result<()> { diff --git a/serving/src/app/callback/store.rs b/serving/src/app/callback/store.rs index 537ab03f37..6a7c71985a 100644 --- a/serving/src/app/callback/store.rs +++ b/serving/src/app/callback/store.rs @@ -24,7 +24,7 @@ pub(crate) enum PayloadToSave { #[trait_variant::make(Store: Send)] #[allow(dead_code)] pub(crate) trait LocalStore { - async fn save(&self, messages: Vec) -> crate::Result<()>; + async fn save(&mut self, messages: Vec) -> crate::Result<()>; /// retrieve the callback payloads async fn retrieve_callbacks( &mut self, diff --git a/serving/src/app/callback/store/memstore.rs b/serving/src/app/callback/store/memstore.rs index 8be34786f2..afbf8e89e1 100644 --- a/serving/src/app/callback/store/memstore.rs +++ b/serving/src/app/callback/store/memstore.rs @@ -29,7 +29,7 @@ impl InMemoryStore { impl super::Store for InMemoryStore { /// Saves a vector of `PayloadToSave` into the `HashMap`. /// Each `PayloadToSave` is serialized into bytes and stored in the `HashMap` under its key. - async fn save(&self, messages: Vec) -> crate::Result<()> { + async fn save(&mut self, messages: Vec) -> crate::Result<()> { let mut data = self.data.lock().unwrap(); for msg in messages { match msg { @@ -176,7 +176,7 @@ mod tests { #[tokio::test] async fn test_save_invalid_callback() { - let store = InMemoryStore::new(); + let mut store = InMemoryStore::new(); let value = Arc::new(CallbackRequest { id: "test_id".to_string(), vertex: "in".to_string(), @@ -199,7 +199,7 @@ mod tests { #[tokio::test] async fn test_save_invalid_datum() { - let store = InMemoryStore::new(); + let mut store = InMemoryStore::new(); // Try to save a datum with an invalid key let result = store diff --git a/serving/src/app/callback/store/redisstore.rs b/serving/src/app/callback/store/redisstore.rs index 2a091a246f..f21154a68f 100644 --- a/serving/src/app/callback/store/redisstore.rs +++ b/serving/src/app/callback/store/redisstore.rs @@ -14,62 +14,6 @@ use super::PayloadToSave; const LPUSH: &str = "LPUSH"; const LRANGE: &str = "LRANGE"; -async fn handle_write_requests(conn: ConnectionManager, msg: PayloadToSave) -> crate::Result<()> { - match msg { - PayloadToSave::Callback { key, value } => { - // Convert the CallbackRequest to a byte array - let value = serde_json::to_vec(&*value) - .map_err(|e| Error::StoreWrite(format!("Serializing payload - {}", e)))?; - - write_to_redis(conn, &key, &value).await - } - - // Write the byte array to Redis - PayloadToSave::DatumFromPipeline { key, value } => { - // we have to differentiate between the saved responses and the callback requests - // saved responses are stored in "id_SAVED", callback requests are stored in "id" - let key = format!("{}_{}", key, SAVED); - let value: Vec = value.into(); - - write_to_redis(conn, &key, &value).await - } - } -} - -// write to Redis with retries -async fn write_to_redis( - mut conn: ConnectionManager, - key: &str, - value: &Vec, -) -> crate::Result<()> { - let mut retries = 0; - loop { - // Write the byte array to Redis - match conn - .send_packed_command(redis::cmd(LPUSH).arg(key).arg(value)) - .await - .map(|_| ()) - { - Ok(_) => return Ok(()), - Err(err) => { - // return if we are out of retries of if the error is unrecoverable - if retries < config().redis.retries || err.is_unrecoverable_error() { - return Err(Error::StoreWrite( - format!("Saving to redis: {}", err).to_string(), - )); - } else { - retries -= 1; - sleep(Duration::from_millis( - config().redis.retries_duration_millis.into(), - )) - .await; - continue; - } - } - } - } -} - // Handle to the Redis actor. #[derive(Clone)] pub(crate) struct RedisConnection { @@ -91,23 +35,82 @@ impl RedisConnection { max_tasks, }) } + + async fn handle_write_requests( + mut conn_manager: &mut ConnectionManager, + msg: PayloadToSave, + ) -> crate::Result<()> { + match msg { + PayloadToSave::Callback { key, value } => { + // Convert the CallbackRequest to a byte array + let value = serde_json::to_vec(&*value) + .map_err(|e| Error::StoreWrite(format!("Serializing payload - {}", e)))?; + + Self::write_to_redis(&mut conn_manager, &key, &value).await + } + + // Write the byte array to Redis + PayloadToSave::DatumFromPipeline { key, value } => { + // we have to differentiate between the saved responses and the callback requests + // saved responses are stored in "id_SAVED", callback requests are stored in "id" + let key = format!("{}_{}", key, SAVED); + let value: Vec = value.into(); + + Self::write_to_redis(&mut conn_manager, &key, &value).await + } + } + } + + // write to Redis with retries + async fn write_to_redis( + conn_manager: &mut ConnectionManager, + key: &str, + value: &Vec, + ) -> crate::Result<()> { + let mut retries = 0; + loop { + // Write the byte array to Redis + match conn_manager + .send_packed_command(redis::cmd(LPUSH).arg(key).arg(value)) + .await + .map(|_| ()) + { + Ok(_) => return Ok(()), + Err(err) => { + // return if we are out of retries of if the error is unrecoverable + if retries < config().redis.retries || err.is_unrecoverable_error() { + return Err(Error::StoreWrite( + format!("Saving to redis: {}", err).to_string(), + )); + } else { + retries -= 1; + sleep(Duration::from_millis( + config().redis.retries_duration_millis.into(), + )) + .await; + continue; + } + } + } + } + } } // It is possible to move the methods defined here to be methods on the Redis actor and communicate through channels. // With that, all public APIs defined on RedisConnection can be on &self (immutable). impl super::Store for RedisConnection { // Attempt to save all payloads. Returns error if we fail to save at least one message. - async fn save(&self, messages: Vec) -> crate::Result<()> { + async fn save(&mut self, messages: Vec) -> crate::Result<()> { let mut tasks = vec![]; // This is put in place not to overload Redis and also way some kind of // flow control. let sem = Arc::new(Semaphore::new(self.max_tasks)); for msg in messages { let permit = Arc::clone(&sem).acquire_owned().await; - let conn = self.conn_manager.clone(); + let mut _conn_mgr = self.conn_manager.clone(); let task = tokio::spawn(async move { let _permit = permit; - handle_write_requests(conn, msg).await + Self::handle_write_requests(&mut _conn_mgr, msg).await }); tasks.push(task); } diff --git a/serving/src/app/jetstream_proxy.rs b/serving/src/app/jetstream_proxy.rs index f7b5030cb9..61e831ba2b 100644 --- a/serving/src/app/jetstream_proxy.rs +++ b/serving/src/app/jetstream_proxy.rs @@ -286,7 +286,7 @@ mod tests { struct MockStore; impl Store for MockStore { - async fn save(&self, _messages: Vec) -> crate::Result<()> { + async fn save(&mut self, _messages: Vec) -> crate::Result<()> { Ok(()) } async fn retrieve_callbacks(