Skip to content

Commit

Permalink
fix: save trait should accept Self as mutable (#1795)
Browse files Browse the repository at this point in the history
Signed-off-by: Vigith Maurice <[email protected]>
  • Loading branch information
vigith authored Jul 6, 2024
1 parent d8423e8 commit 8cff6d1
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 67 deletions.
4 changes: 3 additions & 1 deletion serving/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
workspace = { members = [ "backoff","extras/upstreams", "servesink"] }
workspace = { members = ["backoff", "extras/upstreams", "servesink"] }
[package]
name = "serve"
version = "0.1.0"
Expand Down Expand Up @@ -29,3 +29,5 @@ redis = { version = "0.25.3", features = ["tokio-comp", "aio", "connection-manag
config = "0.14.0"
trait-variant = "0.1.2"
chrono = { version = "0.4", features = ["serde"] }
# intern
backoff = { path = "backoff" }
2 changes: 1 addition & 1 deletion serving/src/app/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub fn callback_handler<T: Send + Sync + Clone + Store + 'static>(
}

async fn callback_save<T: Send + Sync + Clone + Store>(
State(proxy_state): State<CallbackState<T>>,
State(mut proxy_state): State<CallbackState<T>>,
headers: HeaderMap,
body: Bytes,
) -> Result<(), ApiError> {
Expand Down
2 changes: 1 addition & 1 deletion serving/src/app/callback/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ where
}

pub(crate) async fn save_response(
&self,
&mut self,
id: String,
body: axum::body::Bytes,
) -> crate::Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion serving/src/app/callback/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub(crate) enum PayloadToSave {
#[trait_variant::make(Store: Send)]
#[allow(dead_code)]
pub(crate) trait LocalStore {
async fn save(&self, messages: Vec<PayloadToSave>) -> crate::Result<()>;
async fn save(&mut self, messages: Vec<PayloadToSave>) -> crate::Result<()>;
/// retrieve the callback payloads
async fn retrieve_callbacks(
&mut self,
Expand Down
6 changes: 3 additions & 3 deletions serving/src/app/callback/store/memstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl InMemoryStore {
impl super::Store for InMemoryStore {
/// Saves a vector of `PayloadToSave` into the `HashMap`.
/// Each `PayloadToSave` is serialized into bytes and stored in the `HashMap` under its key.
async fn save(&self, messages: Vec<PayloadToSave>) -> crate::Result<()> {
async fn save(&mut self, messages: Vec<PayloadToSave>) -> crate::Result<()> {
let mut data = self.data.lock().unwrap();
for msg in messages {
match msg {
Expand Down Expand Up @@ -176,7 +176,7 @@ mod tests {

#[tokio::test]
async fn test_save_invalid_callback() {
let store = InMemoryStore::new();
let mut store = InMemoryStore::new();
let value = Arc::new(CallbackRequest {
id: "test_id".to_string(),
vertex: "in".to_string(),
Expand All @@ -199,7 +199,7 @@ mod tests {

#[tokio::test]
async fn test_save_invalid_datum() {
let store = InMemoryStore::new();
let mut store = InMemoryStore::new();

// Try to save a datum with an invalid key
let result = store
Expand Down
121 changes: 62 additions & 59 deletions serving/src/app/callback/store/redisstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,62 +14,6 @@ use super::PayloadToSave;
const LPUSH: &str = "LPUSH";
const LRANGE: &str = "LRANGE";

async fn handle_write_requests(conn: ConnectionManager, msg: PayloadToSave) -> crate::Result<()> {
match msg {
PayloadToSave::Callback { key, value } => {
// Convert the CallbackRequest to a byte array
let value = serde_json::to_vec(&*value)
.map_err(|e| Error::StoreWrite(format!("Serializing payload - {}", e)))?;

write_to_redis(conn, &key, &value).await
}

// Write the byte array to Redis
PayloadToSave::DatumFromPipeline { key, value } => {
// we have to differentiate between the saved responses and the callback requests
// saved responses are stored in "id_SAVED", callback requests are stored in "id"
let key = format!("{}_{}", key, SAVED);
let value: Vec<u8> = value.into();

write_to_redis(conn, &key, &value).await
}
}
}

// write to Redis with retries
async fn write_to_redis(
mut conn: ConnectionManager,
key: &str,
value: &Vec<u8>,
) -> crate::Result<()> {
let mut retries = 0;
loop {
// Write the byte array to Redis
match conn
.send_packed_command(redis::cmd(LPUSH).arg(key).arg(value))
.await
.map(|_| ())
{
Ok(_) => return Ok(()),
Err(err) => {
// return if we are out of retries of if the error is unrecoverable
if retries < config().redis.retries || err.is_unrecoverable_error() {
return Err(Error::StoreWrite(
format!("Saving to redis: {}", err).to_string(),
));
} else {
retries -= 1;
sleep(Duration::from_millis(
config().redis.retries_duration_millis.into(),
))
.await;
continue;
}
}
}
}
}

// Handle to the Redis actor.
#[derive(Clone)]
pub(crate) struct RedisConnection {
Expand All @@ -91,23 +35,82 @@ impl RedisConnection {
max_tasks,
})
}

async fn handle_write_requests(
mut conn_manager: &mut ConnectionManager,
msg: PayloadToSave,
) -> crate::Result<()> {
match msg {
PayloadToSave::Callback { key, value } => {
// Convert the CallbackRequest to a byte array
let value = serde_json::to_vec(&*value)
.map_err(|e| Error::StoreWrite(format!("Serializing payload - {}", e)))?;

Self::write_to_redis(&mut conn_manager, &key, &value).await
}

// Write the byte array to Redis
PayloadToSave::DatumFromPipeline { key, value } => {
// we have to differentiate between the saved responses and the callback requests
// saved responses are stored in "id_SAVED", callback requests are stored in "id"
let key = format!("{}_{}", key, SAVED);
let value: Vec<u8> = value.into();

Self::write_to_redis(&mut conn_manager, &key, &value).await
}
}
}

// write to Redis with retries
async fn write_to_redis(
conn_manager: &mut ConnectionManager,
key: &str,
value: &Vec<u8>,
) -> crate::Result<()> {
let mut retries = 0;
loop {
// Write the byte array to Redis
match conn_manager
.send_packed_command(redis::cmd(LPUSH).arg(key).arg(value))
.await
.map(|_| ())
{
Ok(_) => return Ok(()),
Err(err) => {
// return if we are out of retries of if the error is unrecoverable
if retries < config().redis.retries || err.is_unrecoverable_error() {
return Err(Error::StoreWrite(
format!("Saving to redis: {}", err).to_string(),
));
} else {
retries -= 1;
sleep(Duration::from_millis(
config().redis.retries_duration_millis.into(),
))
.await;
continue;
}
}
}
}
}
}

// It is possible to move the methods defined here to be methods on the Redis actor and communicate through channels.
// With that, all public APIs defined on RedisConnection can be on &self (immutable).
impl super::Store for RedisConnection {
// Attempt to save all payloads. Returns error if we fail to save at least one message.
async fn save(&self, messages: Vec<PayloadToSave>) -> crate::Result<()> {
async fn save(&mut self, messages: Vec<PayloadToSave>) -> crate::Result<()> {
let mut tasks = vec![];
// This is put in place not to overload Redis and also way some kind of
// flow control.
let sem = Arc::new(Semaphore::new(self.max_tasks));
for msg in messages {
let permit = Arc::clone(&sem).acquire_owned().await;
let conn = self.conn_manager.clone();
let mut _conn_mgr = self.conn_manager.clone();
let task = tokio::spawn(async move {
let _permit = permit;
handle_write_requests(conn, msg).await
Self::handle_write_requests(&mut _conn_mgr, msg).await
});
tasks.push(task);
}
Expand Down
2 changes: 1 addition & 1 deletion serving/src/app/jetstream_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ mod tests {
struct MockStore;

impl Store for MockStore {
async fn save(&self, _messages: Vec<PayloadToSave>) -> crate::Result<()> {
async fn save(&mut self, _messages: Vec<PayloadToSave>) -> crate::Result<()> {
Ok(())
}
async fn retrieve_callbacks(
Expand Down

0 comments on commit 8cff6d1

Please sign in to comment.