From bd502c75e9849c3343ec7315d0e8a6df8a987bad Mon Sep 17 00:00:00 2001 From: Shoham Elias Date: Wed, 15 Jan 2025 14:04:43 +0000 Subject: [PATCH 01/15] Core: Add new CommandRequest - Pipeline Signed-off-by: Shoham Elias --- .../redis-rs/redis/src/cluster_async/mod.rs | 392 ++++++++++++++++-- glide-core/redis-rs/redis/src/pipeline.rs | 13 + glide-core/src/client/mod.rs | 27 +- glide-core/src/protobuf/command_request.proto | 7 +- glide-core/src/socket_listener.rs | 18 +- glide-core/tests/test_socket_listener.rs | 146 ++++++- 6 files changed, 570 insertions(+), 33 deletions(-) diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index 3d61efce29..dfa1ccf035 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -38,7 +38,7 @@ use crate::{ }, cmd, commands::cluster_scan::{cluster_scan, ClusterScanArgs, ScanStateRC}, - FromRedisValue, InfoDict, + types, FromRedisValue, InfoDict, Pipeline, }; use dashmap::DashMap; use std::{ @@ -93,6 +93,7 @@ use tokio::{sync::Notify, time::timeout}; use dispose::{Disposable, Dispose}; use futures::{future::BoxFuture, prelude::*, ready}; use pin_project_lite::pin_project; +use rand::seq::SliceRandom; use std::sync::RwLock as StdRwLock; use tokio::sync::{ mpsc, @@ -641,24 +642,31 @@ fn route_for_pipeline(pipeline: &crate::Pipeline) -> RedisResult> // Find first specific slot and send to it. There's no need to check If later commands // should be routed to a different slot, since the server will return an error indicating this. 
- pipeline.cmd_iter().map(route_for_command).try_fold( - None, - |chosen_route, next_cmd_route| match (chosen_route, next_cmd_route) { - (None, _) => Ok(next_cmd_route), - (_, None) => Ok(chosen_route), - (Some(chosen_route), Some(next_cmd_route)) => { - if chosen_route.slot() != next_cmd_route.slot() { - Err((ErrorKind::CrossSlot, "Received crossed slots in pipeline").into()) - } else if chosen_route.slot_addr() == SlotAddr::ReplicaOptional { - Ok(Some(next_cmd_route)) - } else { - Ok(Some(chosen_route)) + if pipeline.is_atomic() { + pipeline + .cmd_iter() + .map(route_for_command) + .try_fold(None, |chosen_route, next_cmd_route| { + match (chosen_route, next_cmd_route) { + (None, _) => Ok(next_cmd_route), + (_, None) => Ok(chosen_route), + (Some(chosen_route), Some(next_cmd_route)) => { + if chosen_route.slot() != next_cmd_route.slot() { + Err(( + ErrorKind::CrossSlot, + "Received crossed slots in transaction", + ) + .into()) + } else { + Ok(Some(chosen_route)) + } + } } - } - }, - ) + }) + } else { + Ok(None) + } } - fn boxed_sleep(duration: Duration) -> BoxFuture<'static, ()> { Box::pin(tokio::time::sleep(duration)) } @@ -2139,6 +2147,223 @@ where .map_err(|err| (OperationTarget::Node { address }, err)) } + fn add_command_to_pipeline_map( + pipelines_by_connection: &mut HashMap)>)>, + address: String, + conn: C, + cmd: Cmd, + index: usize, + index2: Option, + ) { + pipelines_by_connection + .entry(address.clone()) + .or_insert_with(|| (Pipeline::new(), conn.clone(), Vec::new())) + .0 + .add_command(cmd); + pipelines_by_connection + .entry(address) + .or_insert_with(|| (Pipeline::new(), conn, Vec::new())) + .2 + .push((index, index2)); + } + + async fn routes_pipeline_commands( + pipeline: &crate::Pipeline, + core: Core, + ) -> RedisResult<( + HashMap)>)>, + Vec<(usize, MultipleNodeRoutingInfo, Option)>, + )> { + let mut pipelines_by_connection: HashMap< + String, + (Pipeline, C, Vec<(usize, Option)>), + > = HashMap::new(); + let mut response_policies = 
Vec::new(); + + for (index, cmd) in pipeline.cmd_iter().enumerate() { + match cluster_routing::RoutingInfo::for_routable(cmd) { + Some(cluster_routing::RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)) + | None => { + if pipelines_by_connection.is_empty() { + let conn = crate::cluster_async::ClusterConnInner::get_connection( + SingleNodeRoutingInfo::Random.into(), + core.clone(), + Some(Arc::new(cmd.clone())), + ); + let (address, conn) = conn.await.map_err(|err| { + types::RedisError::from(( + types::ErrorKind::ConnectionNotFoundForRoute, + "Requested connection not found", + err.to_string(), + )) + })?; + Self::add_command_to_pipeline_map( + &mut pipelines_by_connection, + address, + conn, + cmd.clone(), + index, + None, + ); + } else { + let mut rng = rand::thread_rng(); + let keys: Vec<_> = pipelines_by_connection.keys().cloned().collect(); + let random_key = keys.choose(&mut rng).unwrap(); + pipelines_by_connection + .get_mut(random_key) + .unwrap() + .0 + .add_command(cmd.clone()); + pipelines_by_connection + .get_mut(random_key) + .unwrap() + .2 + .push((index, None)); + } + } + Some(cluster_routing::RoutingInfo::SingleNode( + SingleNodeRoutingInfo::SpecificNode(route), + )) => { + let route_single: SingleNodeRoutingInfo = Some(route).into(); + let conn = crate::cluster_async::ClusterConnInner::get_connection( + route_single.into(), + core.clone(), + Some(Arc::new(cmd.clone())), + ); + let (address, conn) = conn.await.map_err(|err| { + types::RedisError::from(( + types::ErrorKind::ConnectionNotFoundForRoute, + "Requested connection not found", + err.to_string(), + )) + })?; + Self::add_command_to_pipeline_map( + &mut pipelines_by_connection, + address, + conn, + cmd.clone(), + index, + None, + ); + } + Some(cluster_routing::RoutingInfo::SingleNode( + SingleNodeRoutingInfo::RandomPrimary, + )) => { + let route_single: SingleNodeRoutingInfo = + Some(Route::new_random_primary()).into(); + let conn = crate::cluster_async::ClusterConnInner::get_connection( 
+ route_single.into(), + core.clone(), + Some(Arc::new(cmd.clone())), + ); + let (address, conn) = conn.await.map_err(|err| { + types::RedisError::from(( + types::ErrorKind::ConnectionNotFoundForRoute, + "Requested connection not found", + err.to_string(), + )) + })?; + Self::add_command_to_pipeline_map( + &mut pipelines_by_connection, + address, + conn, + cmd.clone(), + index, + None, + ); + } + Some(cluster_routing::RoutingInfo::MultiNode(( + multi_node_routing, + response_policy, + ))) => { + response_policies.push((index, multi_node_routing.clone(), response_policy)); + match multi_node_routing { + MultipleNodeRoutingInfo::AllNodes => { + let all_nodes: Vec<_> = { + let lock = core.conn_lock.read().expect(MUTEX_READ_ERR); + lock.all_node_connections().collect() + }; + for (index2, (address, conn)) in all_nodes.into_iter().enumerate() { + Self::add_command_to_pipeline_map( + &mut pipelines_by_connection, + address, + conn.await, + cmd.clone(), + index, + Some(index2), + ); + } + } + MultipleNodeRoutingInfo::AllMasters => { + let all_primaries: Vec<_> = { + let lock = core.conn_lock.read().expect(MUTEX_READ_ERR); + lock.all_primary_connections().collect() + }; + for (index2, (address, conn)) in all_primaries.into_iter().enumerate() { + Self::add_command_to_pipeline_map( + &mut pipelines_by_connection, + address, + conn.await, + cmd.clone(), + index, + Some(index2), + ); + } + } + MultipleNodeRoutingInfo::MultiSlot((slots, _)) => { + for (index2, (route, indices)) in slots.iter().enumerate() { + let conn = { + let lock = core.conn_lock.read().expect(MUTEX_READ_ERR); + lock.connection_for_route(&route) + }; + if let Some((address, conn)) = conn { + let new_cmd = + crate::cluster_routing::command_for_multi_slot_indices( + cmd, + indices.iter(), + ); + Self::add_command_to_pipeline_map( + &mut pipelines_by_connection, + address, + conn.await, + new_cmd, + index, + Some(index2), + ); + } + } + } + } + } + Some(cluster_routing::RoutingInfo::SingleNode( + 
SingleNodeRoutingInfo::ByAddress { host, port }, + )) => { + let address = format!("{host}:{port}"); + let conn = crate::cluster_async::ClusterConnInner::get_connection( + InternalSingleNodeRouting::ByAddress(address.clone()).into(), + core.clone(), + Some(Arc::new(cmd.clone())), + ); + let (address, conn) = conn.await.map_err(|err| { + types::RedisError::from(( + types::ErrorKind::ConnectionNotFoundForRoute, + "Requested connection not found", + err.to_string(), + )) + })?; + Self::add_command_to_pipeline_map( + &mut pipelines_by_connection, + address, + conn, + cmd.clone(), + index, + None, + ); + } + } + } + Ok((pipelines_by_connection, response_policies)) + } async fn try_request(info: RequestInfo, core: Core) -> OperationResult { match info.cmd { CmdArg::Cmd { cmd, routing } => Self::try_cmd_request(cmd, routing, core).await, @@ -2148,14 +2373,124 @@ where count, route, } => { - Self::try_pipeline_request( - pipeline, - offset, - count, - Self::get_connection(route, core, None), - ) - .await + if pipeline.is_atomic() { + Self::try_pipeline_request( + pipeline, + offset, + count, + Self::get_connection(route, core, None), + ) + .await + } else { + let (pipelines_by_connection, response_policies) = + Self::routes_pipeline_commands(&pipeline, core.clone()) + .await + .map_err(|err| (OperationTarget::FanOut, err))?; + let mut values_and_addresses = vec![Vec::new(); pipeline.len()]; + + let mut first_error = None; + let mut final_responses: Vec = Vec::with_capacity(pipeline.len()); + let mut join_set = tokio::task::JoinSet::new(); // Manage spawned tasks + + for (address, (pipeline, conn, indices)) in pipelines_by_connection { + // Spawn the async task + join_set.spawn(async move { + let count = pipeline.len(); + let result = + Self::try_pipeline_request(Arc::new(pipeline), 0, count, async { + Ok((address.clone(), conn)) + }) + .await?; + match result { + Response::Multiple(values) => Ok((indices, values, address)), + _ => Err(( + OperationTarget::FanOut, + 
RedisError::from(( + ErrorKind::ResponseError, + "Unsupported response type", + )), + )), + } + }); + } + + // Wait for all spawned tasks to complete + while let Some(future_result) = join_set.join_next().await { + match future_result { + Err(e) => { + return Err(( + OperationTarget::FanOut, + std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) + .into(), + )); + } + Ok(Ok((indices, values, address))) => { + for ((index, index2), value) in indices.into_iter().zip(values) { + if let Some(index2) = index2 { + // Ensure the vector is big enough to hold `index2` + if values_and_addresses[index].len() <= index2 { + values_and_addresses[index] + .resize(index2 + 1, (Value::Nil, "".to_string())); + } + // Add the value to the specific index2 within index + values_and_addresses[index][index2] = + (value, address.clone()); + } else { + // Push the value into the default `index` + values_and_addresses[index].push((value, address.clone())); + } + } + } + Ok(Err(e)) => { + if first_error.is_none() { + first_error = Some(e); + } + } + } + } + + // Check for errors + if let Some(first_error) = first_error { + return Err(first_error); + } + + // Process response policies after all tasks are complete + for (index, routing_info, response_policy) in response_policies { + // Safely access `values_and_addresses` for the current index + let response_receivers: Vec<( + Option, + oneshot::Receiver>, + )> = values_and_addresses[index] + .iter() + .map(|(value, address)| { + let (sender, receiver) = oneshot::channel(); + let _ = sender.send(Ok(Response::Single(value.clone()))); + (Some(address.clone()), receiver) + }) + .collect(); + + let aggregated_response = Self::aggregate_results( + response_receivers, + &routing_info, + response_policy, + ) + .await + .map_err(|err| (OperationTarget::FanOut, err))?; + + // Update `values_and_addresses` for the current index + values_and_addresses[index] = vec![(aggregated_response, "".to_string())]; + } + + // Collect final responses + for 
mut ans in values_and_addresses.into_iter() { + assert_eq!(ans.len(), 1); + final_responses.push(ans.pop().unwrap().0); + } + + Ok(Response::Multiple(final_responses)) + } } + CmdArg::ClusterScan { cluster_scan_args, .. } => { @@ -2830,6 +3165,7 @@ mod pipeline_routing_tests { #[test] fn test_first_route_is_found() { let mut pipeline = crate::Pipeline::new(); + pipeline.atomic(); pipeline .add_command(cmd("FLUSHALL")) // route to all masters @@ -2845,7 +3181,7 @@ mod pipeline_routing_tests { #[test] fn test_return_none_if_no_route_is_found() { let mut pipeline = crate::Pipeline::new(); - + pipeline.atomic(); pipeline .add_command(cmd("FLUSHALL")) // route to all masters .add_command(cmd("EVAL")); // route randomly @@ -2856,7 +3192,7 @@ mod pipeline_routing_tests { #[test] fn test_prefer_primary_route_over_replica() { let mut pipeline = crate::Pipeline::new(); - + pipeline.atomic(); pipeline .get("foo") // route to replica of slot 12182 .add_command(cmd("FLUSHALL")) // route to all masters @@ -2873,7 +3209,7 @@ mod pipeline_routing_tests { #[test] fn test_raise_cross_slot_error_on_conflicting_slots() { let mut pipeline = crate::Pipeline::new(); - + pipeline.atomic(); pipeline .add_command(cmd("FLUSHALL")) // route to all masters .set("baz", "bar") // route to slot 4813 @@ -2888,7 +3224,7 @@ mod pipeline_routing_tests { #[test] fn unkeyed_commands_dont_affect_route() { let mut pipeline = crate::Pipeline::new(); - + pipeline.atomic(); pipeline .set("{foo}bar", "baz") // route to primary of slot 12182 .cmd("CONFIG").arg("GET").arg("timeout") // unkeyed command diff --git a/glide-core/redis-rs/redis/src/pipeline.rs b/glide-core/redis-rs/redis/src/pipeline.rs index babb57a1ff..746e1935c5 100644 --- a/glide-core/redis-rs/redis/src/pipeline.rs +++ b/glide-core/redis-rs/redis/src/pipeline.rs @@ -203,6 +203,19 @@ impl Pipeline { pub fn execute(&self, con: &mut dyn ConnectionLike) { self.query::<()>(con).unwrap(); } + + /// Returns whether the pipeline is in transaction mode 
(atomic). + /// + /// When in transaction mode, all commands in the pipeline are executed + /// as a single atomic operation. + pub fn is_atomic(&self) -> bool { + self.transaction_mode + } + + /// Returns the number of commands in the pipeline. + pub fn len(&self) -> usize { + self.commands.len() + } } fn encode_pipeline(cmds: &[Cmd], atomic: bool) -> Vec { diff --git a/glide-core/src/client/mod.rs b/glide-core/src/client/mod.rs index 3f05092a64..693c6da6c8 100644 --- a/glide-core/src/client/mod.rs +++ b/glide-core/src/client/mod.rs @@ -367,10 +367,10 @@ impl Client { .into()); } }; - Self::convert_transaction_values_to_expected_types(pipeline, values, command_count) + Self::convert_pipeline_values_to_expected_types(pipeline, values, command_count) } - fn convert_transaction_values_to_expected_types( + fn convert_pipeline_values_to_expected_types( pipeline: &redis::Pipeline, values: Vec, command_count: usize, @@ -415,6 +415,29 @@ impl Client { .boxed() } + pub fn send_pipeline<'a>( + &'a mut self, + pipeline: &'a redis::Pipeline, + ) -> redis::RedisFuture<'a, Value> { + let command_count = pipeline.cmd_iter().count(); + let _offset = command_count + 1; //TODO: check + + run_with_timeout(Some(self.request_timeout), async move { + let values = match self.internal_client { + ClientWrapper::Standalone(ref mut client) => { + client.send_pipeline(pipeline, 0, command_count).await + } + + ClientWrapper::Cluster { ref mut client } => { + client.req_packed_commands(pipeline, 0, command_count).await + } + }?; + + Self::convert_pipeline_values_to_expected_types(pipeline, values, command_count) + }) + .boxed() + } + pub async fn invoke_script<'a>( &'a mut self, hash: &'a str, diff --git a/glide-core/src/protobuf/command_request.proto b/glide-core/src/protobuf/command_request.proto index d7c693cfd6..8b7887bc2d 100644 --- a/glide-core/src/protobuf/command_request.proto +++ b/glide-core/src/protobuf/command_request.proto @@ -501,6 +501,10 @@ message Transaction { repeated 
Command commands = 1; } +message Pipeline { + repeated Command commands = 1; +} + message ClusterScan { string cursor = 1; optional bytes match_pattern = 2; @@ -524,6 +528,7 @@ message CommandRequest { ScriptInvocationPointers script_invocation_pointers = 5; ClusterScan cluster_scan = 6; UpdateConnectionPassword update_connection_password = 7; + Pipeline pipeline = 8; } - Routes route = 8; + Routes route = 9; } diff --git a/glide-core/src/socket_listener.rs b/glide-core/src/socket_listener.rs index 0b034e48c3..b63a0968fc 100644 --- a/glide-core/src/socket_listener.rs +++ b/glide-core/src/socket_listener.rs @@ -4,7 +4,8 @@ use super::rotating_buffer::RotatingBuffer; use crate::client::Client; use crate::cluster_scan_container::get_cluster_scan_cursor; use crate::command_request::{ - command, command_request, ClusterScan, Command, CommandRequest, Routes, SlotTypes, Transaction, + command, command_request, ClusterScan, Command, CommandRequest, Pipeline, Routes, SlotTypes, + Transaction, }; use crate::connection_request::ConnectionRequest; use crate::errors::{error_message, error_type, RequestErrorType}; @@ -388,6 +389,18 @@ async fn send_transaction( .map_err(|err| err.into()) } +async fn send_pipeline(request: Pipeline, client: &mut Client) -> ClientUsageResult { + let mut pipeline = redis::Pipeline::with_capacity(request.commands.capacity()); + for command in request.commands { + pipeline.add_command(get_redis_command(&command)?); + } + + client + .send_pipeline(&pipeline) + .await + .map_err(|err| err.into()) +} + fn get_slot_addr(slot_type: &protobuf::EnumOrUnknown) -> ClientUsageResult { slot_type .enum_value() @@ -491,6 +504,9 @@ fn handle_request(request: CommandRequest, mut client: Client, writer: Rc Err(e), } } + command_request::Command::Pipeline(pipeline) => { + send_pipeline(pipeline, &mut client).await + } command_request::Command::ScriptInvocation(script) => { match get_route(request.route.0, None) { Ok(routes) => { diff --git 
a/glide-core/tests/test_socket_listener.rs b/glide-core/tests/test_socket_listener.rs index 5921236e36..6a897e1cae 100644 --- a/glide-core/tests/test_socket_listener.rs +++ b/glide-core/tests/test_socket_listener.rs @@ -21,7 +21,7 @@ mod socket_listener { use crate::utilities::mocks::{Mock, ServerMock}; use super::*; - use command_request::{CommandRequest, RequestType}; + use command_request::{CommandRequest, Pipeline, RequestType}; use glide_core::command_request::command::{Args, ArgsArray}; use glide_core::command_request::{Command, Transaction}; use glide_core::response::{response, ConstantResponse, Response}; @@ -309,6 +309,28 @@ mod socket_listener { write_request(buffer, socket, request); } + fn write_pipeline_request( + buffer: &mut Vec, + socket: &mut UnixStream, + callback_index: u32, + commands_components: Vec, + ) { + let mut request = CommandRequest::new(); + request.callback_idx = callback_index; + let mut pipeline = Pipeline::new(); + pipeline.commands.reserve(commands_components.len()); + + for components in commands_components { + pipeline.commands.push(get_command(components)); + } + + request.command = Some(command_request::command_request::Command::Pipeline( + pipeline, + )); + + write_request(buffer, socket, request); + } + fn write_get( buffer: &mut Vec, socket: &mut UnixStream, @@ -1211,6 +1233,128 @@ mod socket_listener { ); } + #[rstest] + #[serial_test::serial] + #[timeout(SHORT_CLUSTER_TEST_TIMEOUT)] + fn test_send_pipeline_and_get_array_of_results( + #[values(RedisType::Cluster, RedisType::Standalone)] use_cluster: RedisType, + ) { + let test_basics = setup_test_basics(Tls::NoTls, TestServer::Shared, use_cluster); + let mut socket = test_basics + .socket + .try_clone() + .expect("Failed to clone socket"); + + const CALLBACK_INDEX: u32 = 0; + let key = generate_random_string(KEY_LENGTH); + let key2 = generate_random_string(KEY_LENGTH); + + let commands = vec![ + CommandComponents { + args: vec![key.clone().into(), 
"bar".to_string().into()], + args_pointer: true, + request_type: RequestType::Set.into(), + }, + CommandComponents { + args: vec!["GET".to_string().into(), key.clone().into()], + args_pointer: false, + request_type: RequestType::CustomCommand.into(), + }, + CommandComponents { + args: vec![key.clone().into(), key2.into()], + args_pointer: false, + request_type: RequestType::MGet.into(), + }, + CommandComponents { + args: vec!["FLUSHALL".to_string().into()], + args_pointer: false, + request_type: RequestType::CustomCommand.into(), + }, + CommandComponents { + args: vec![key.into()], + args_pointer: false, + request_type: RequestType::Get.into(), + }, + CommandComponents { + args: vec!["HELLO".into()], + args_pointer: false, + request_type: RequestType::Ping.into(), + }, + ]; + let mut buffer = Vec::with_capacity(200); + write_pipeline_request(&mut buffer, &mut socket, CALLBACK_INDEX, commands); + + assert_value_response( + &mut buffer, + Some(&mut socket), + CALLBACK_INDEX, + Value::Array(vec![ + Value::Okay, + Value::BulkString(vec![b'b', b'a', b'r']), + Value::Array(vec![Value::BulkString(vec![b'b', b'a', b'r']), Value::Nil]), + Value::Okay, + Value::Nil, + Value::BulkString(vec![b'H', b'E', b'L', b'L', b'O']), + ]), + ); + } + + #[rstest] + #[serial_test::serial] + #[timeout(SHORT_CLUSTER_TEST_TIMEOUT)] + fn test_send_pipeline_and_get_error( + #[values(RedisType::Cluster, RedisType::Standalone)] use_cluster: RedisType, + ) { + let mut test_basics = setup_test_basics(Tls::NoTls, TestServer::Shared, use_cluster); + let mut socket = test_basics + .socket + .try_clone() + .expect("Failed to clone socket"); + + const CALLBACK_INDEX: u32 = 0; + let key = generate_random_string(KEY_LENGTH); + let commands = vec![ + CommandComponents { + args: vec![key.clone().into(), "bar".to_string().into()], + args_pointer: true, + request_type: RequestType::Set.into(), + }, + CommandComponents { + args: vec!["GET".to_string().into(), key.clone().into()], + args_pointer: false, + 
request_type: RequestType::CustomCommand.into(), + }, + CommandComponents { + args: vec![key.clone().into(), "random_key".into()], + args_pointer: false, + request_type: RequestType::MGet.into(), + }, + CommandComponents { + args: vec![key.clone().into()], + args_pointer: false, + request_type: RequestType::LLen.into(), + }, + CommandComponents { + args: vec!["FLUSHALL".to_string().into()], + args_pointer: false, + request_type: RequestType::CustomCommand.into(), + }, + CommandComponents { + args: vec![key.into()], + args_pointer: false, + request_type: RequestType::Get.into(), + }, + ]; + let mut buffer = Vec::with_capacity(200); + write_pipeline_request(&mut buffer, &mut socket, CALLBACK_INDEX, commands); + + assert_error_response( + &mut buffer, + &mut test_basics.socket, + CALLBACK_INDEX, + ResponseType::RequestError, + ); + } #[rstest] #[serial_test::serial] #[timeout(SHORT_CLUSTER_TEST_TIMEOUT)] From 5f38f43b68cb530cc0db7919071046c052b9ed13 Mon Sep 17 00:00:00 2001 From: Shoham Elias Date: Wed, 15 Jan 2025 15:07:46 +0000 Subject: [PATCH 02/15] clippy Signed-off-by: Shoham Elias --- .../redis-rs/redis/src/cluster_async/mod.rs | 46 +++++++++++-------- glide-core/redis-rs/redis/src/pipeline.rs | 5 ++ 2 files changed, 32 insertions(+), 19 deletions(-) diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index dfa1ccf035..91f646cbe6 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -2146,14 +2146,14 @@ where .map(Response::Multiple) .map_err(|err| (OperationTarget::Node { address }, err)) } - + #[allow(clippy::type_complexity)] fn add_command_to_pipeline_map( pipelines_by_connection: &mut HashMap)>)>, address: String, conn: C, cmd: Cmd, index: usize, - index2: Option, + inner_index: Option, ) { pipelines_by_connection .entry(address.clone()) @@ -2164,7 +2164,7 @@ where .entry(address) .or_insert_with(|| (Pipeline::new(), conn, 
Vec::new())) .2 - .push((index, index2)); + .push((index, inner_index)); } async fn routes_pipeline_commands( @@ -2174,6 +2174,7 @@ where HashMap)>)>, Vec<(usize, MultipleNodeRoutingInfo, Option)>, )> { + #[allow(clippy::type_complexity)] let mut pipelines_by_connection: HashMap< String, (Pipeline, C, Vec<(usize, Option)>), @@ -2283,14 +2284,15 @@ where let lock = core.conn_lock.read().expect(MUTEX_READ_ERR); lock.all_node_connections().collect() }; - for (index2, (address, conn)) in all_nodes.into_iter().enumerate() { + for (inner_index, (address, conn)) in all_nodes.into_iter().enumerate() + { Self::add_command_to_pipeline_map( &mut pipelines_by_connection, address, conn.await, cmd.clone(), index, - Some(index2), + Some(inner_index), ); } } @@ -2299,22 +2301,24 @@ where let lock = core.conn_lock.read().expect(MUTEX_READ_ERR); lock.all_primary_connections().collect() }; - for (index2, (address, conn)) in all_primaries.into_iter().enumerate() { + for (inner_index, (address, conn)) in + all_primaries.into_iter().enumerate() + { Self::add_command_to_pipeline_map( &mut pipelines_by_connection, address, conn.await, cmd.clone(), index, - Some(index2), + Some(inner_index), ); } } MultipleNodeRoutingInfo::MultiSlot((slots, _)) => { - for (index2, (route, indices)) in slots.iter().enumerate() { + for (inner_index, (route, indices)) in slots.iter().enumerate() { let conn = { let lock = core.conn_lock.read().expect(MUTEX_READ_ERR); - lock.connection_for_route(&route) + lock.connection_for_route(route) }; if let Some((address, conn)) = conn { let new_cmd = @@ -2328,7 +2332,7 @@ where conn.await, new_cmd, index, - Some(index2), + Some(inner_index), ); } } @@ -2340,7 +2344,7 @@ where )) => { let address = format!("{host}:{port}"); let conn = crate::cluster_async::ClusterConnInner::get_connection( - InternalSingleNodeRouting::ByAddress(address.clone()).into(), + InternalSingleNodeRouting::ByAddress(address.clone()), core.clone(), Some(Arc::new(cmd.clone())), ); @@ -2425,15 
+2429,18 @@ where )); } Ok(Ok((indices, values, address))) => { - for ((index, index2), value) in indices.into_iter().zip(values) { - if let Some(index2) = index2 { - // Ensure the vector is big enough to hold `index2` - if values_and_addresses[index].len() <= index2 { - values_and_addresses[index] - .resize(index2 + 1, (Value::Nil, "".to_string())); + for ((index, inner_index), value) in indices.into_iter().zip(values) + { + if let Some(inner_index) = inner_index { + // Ensure the vector is big enough to hold `inner_index` + if values_and_addresses[index].len() <= inner_index { + values_and_addresses[index].resize( + inner_index + 1, + (Value::Nil, "".to_string()), + ); } - // Add the value to the specific index2 within index - values_and_addresses[index][index2] = + // Add the value to the specific inner_index within index + values_and_addresses[index][inner_index] = (value, address.clone()); } else { // Push the value into the default `index` @@ -2456,6 +2463,7 @@ where // Process response policies after all tasks are complete for (index, routing_info, response_policy) in response_policies { + #[allow(clippy::type_complexity)] // Safely access `values_and_addresses` for the current index let response_receivers: Vec<( Option, diff --git a/glide-core/redis-rs/redis/src/pipeline.rs b/glide-core/redis-rs/redis/src/pipeline.rs index 746e1935c5..813961156a 100644 --- a/glide-core/redis-rs/redis/src/pipeline.rs +++ b/glide-core/redis-rs/redis/src/pipeline.rs @@ -216,6 +216,11 @@ impl Pipeline { pub fn len(&self) -> usize { self.commands.len() } + + /// Returns `true` if the pipeline contains no commands. 
+ pub fn is_empty(&self) -> bool { + self.commands.is_empty() + } } fn encode_pipeline(cmds: &[Cmd], atomic: bool) -> Vec { From bb7199b1fd716b3e01ae86d21a8711b53cfb62e2 Mon Sep 17 00:00:00 2001 From: Shoham Elias Date: Thu, 16 Jan 2025 10:53:22 +0000 Subject: [PATCH 03/15] few changes Signed-off-by: Shoham Elias --- .../redis-rs/redis/src/cluster_async/mod.rs | 60 +++++++++---------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index 91f646cbe6..98f3ee4155 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -2207,6 +2207,7 @@ where None, ); } else { + // since the map is not empty, add the command to a random connection within the map. let mut rng = rand::thread_rng(); let keys: Vec<_> = pipelines_by_connection.keys().cloned().collect(); let random_key = keys.choose(&mut rng).unwrap(); @@ -2273,10 +2274,36 @@ where None, ); } + Some(cluster_routing::RoutingInfo::SingleNode( + SingleNodeRoutingInfo::ByAddress { host, port }, + )) => { + let address = format!("{host}:{port}"); + let conn = crate::cluster_async::ClusterConnInner::get_connection( + InternalSingleNodeRouting::ByAddress(address.clone()), + core.clone(), + Some(Arc::new(cmd.clone())), + ); + let (address, conn) = conn.await.map_err(|err| { + types::RedisError::from(( + types::ErrorKind::ConnectionNotFoundForRoute, + "Requested connection not found", + err.to_string(), + )) + })?; + Self::add_command_to_pipeline_map( + &mut pipelines_by_connection, + address, + conn, + cmd.clone(), + index, + None, + ); + } Some(cluster_routing::RoutingInfo::MultiNode(( multi_node_routing, response_policy, ))) => { + // save the routing info and response policy, so we will be able to aggregate the results later response_policies.push((index, multi_node_routing.clone(), response_policy)); match multi_node_routing { 
MultipleNodeRoutingInfo::AllNodes => { @@ -2339,31 +2366,6 @@ where } } } - Some(cluster_routing::RoutingInfo::SingleNode( - SingleNodeRoutingInfo::ByAddress { host, port }, - )) => { - let address = format!("{host}:{port}"); - let conn = crate::cluster_async::ClusterConnInner::get_connection( - InternalSingleNodeRouting::ByAddress(address.clone()), - core.clone(), - Some(Arc::new(cmd.clone())), - ); - let (address, conn) = conn.await.map_err(|err| { - types::RedisError::from(( - types::ErrorKind::ConnectionNotFoundForRoute, - "Requested connection not found", - err.to_string(), - )) - })?; - Self::add_command_to_pipeline_map( - &mut pipelines_by_connection, - address, - conn, - cmd.clone(), - index, - None, - ); - } } } Ok((pipelines_by_connection, response_policies)) @@ -2443,7 +2445,6 @@ where values_and_addresses[index][inner_index] = (value, address.clone()); } else { - // Push the value into the default `index` values_and_addresses[index].push((value, address.clone())); } } @@ -2490,15 +2491,14 @@ where } // Collect final responses - for mut ans in values_and_addresses.into_iter() { - assert_eq!(ans.len(), 1); - final_responses.push(ans.pop().unwrap().0); + for mut value in values_and_addresses.into_iter() { + assert_eq!(value.len(), 1); + final_responses.push(value.pop().unwrap().0); } Ok(Response::Multiple(final_responses)) } } - CmdArg::ClusterScan { cluster_scan_args, .. 
} => { From b2263135f236c10d8ca5e58fb0b3e703e84a1cb5 Mon Sep 17 00:00:00 2001 From: Shoham Elias Date: Mon, 20 Jan 2025 12:18:01 +0000 Subject: [PATCH 04/15] move all functions to seperate file Signed-off-by: Shoham Elias --- .../redis-rs/redis/src/cluster_async/mod.rs | 422 +++-------------- .../src/cluster_async/pipeline_routing.rs | 433 ++++++++++++++++++ 2 files changed, 499 insertions(+), 356 deletions(-) create mode 100644 glide-core/redis-rs/redis/src/cluster_async/pipeline_routing.rs diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index 98f3ee4155..3e13d0e4f6 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -23,6 +23,8 @@ //! ``` mod connections_container; +mod pipeline_routing; + mod connections_logic; /// Exposed only for testing. pub mod testing { @@ -38,9 +40,12 @@ use crate::{ }, cmd, commands::cluster_scan::{cluster_scan, ClusterScanArgs, ScanStateRC}, - types, FromRedisValue, InfoDict, Pipeline, + FromRedisValue, InfoDict, }; use dashmap::DashMap; +use pipeline_routing::{ + collect_pipeline_tasks, execute_pipeline_on_node, map_pipeline_to_nodes, route_for_pipeline, +}; use std::{ collections::{HashMap, HashSet}, fmt, io, mem, @@ -93,7 +98,6 @@ use tokio::{sync::Notify, time::timeout}; use dispose::{Disposable, Dispose}; use futures::{future::BoxFuture, prelude::*, ready}; use pin_project_lite::pin_project; -use rand::seq::SliceRandom; use std::sync::RwLock as StdRwLock; use tokio::sync::{ mpsc, @@ -622,51 +626,6 @@ enum Operation { UpdateConnectionPassword(Option), } -fn route_for_pipeline(pipeline: &crate::Pipeline) -> RedisResult> { - fn route_for_command(cmd: &Cmd) -> Option { - match cluster_routing::RoutingInfo::for_routable(cmd) { - Some(cluster_routing::RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)) => None, - Some(cluster_routing::RoutingInfo::SingleNode( - 
SingleNodeRoutingInfo::SpecificNode(route), - )) => Some(route), - Some(cluster_routing::RoutingInfo::SingleNode( - SingleNodeRoutingInfo::RandomPrimary, - )) => Some(Route::new_random_primary()), - Some(cluster_routing::RoutingInfo::MultiNode(_)) => None, - Some(cluster_routing::RoutingInfo::SingleNode(SingleNodeRoutingInfo::ByAddress { - .. - })) => None, - None => None, - } - } - - // Find first specific slot and send to it. There's no need to check If later commands - // should be routed to a different slot, since the server will return an error indicating this. - if pipeline.is_atomic() { - pipeline - .cmd_iter() - .map(route_for_command) - .try_fold(None, |chosen_route, next_cmd_route| { - match (chosen_route, next_cmd_route) { - (None, _) => Ok(next_cmd_route), - (_, None) => Ok(chosen_route), - (Some(chosen_route), Some(next_cmd_route)) => { - if chosen_route.slot() != next_cmd_route.slot() { - Err(( - ErrorKind::CrossSlot, - "Received crossed slots in transaction", - ) - .into()) - } else { - Ok(Some(chosen_route)) - } - } - } - }) - } else { - Ok(None) - } -} fn boxed_sleep(duration: Duration) -> BoxFuture<'static, ()> { Box::pin(tokio::time::sleep(duration)) } @@ -2133,7 +2092,7 @@ where .map_err(|err| (address.into(), err)) } - async fn try_pipeline_request( + pub async fn try_pipeline_request( pipeline: Arc, offset: usize, count: usize, @@ -2146,230 +2105,39 @@ where .map(Response::Multiple) .map_err(|err| (OperationTarget::Node { address }, err)) } - #[allow(clippy::type_complexity)] - fn add_command_to_pipeline_map( - pipelines_by_connection: &mut HashMap)>)>, - address: String, - conn: C, - cmd: Cmd, - index: usize, - inner_index: Option, - ) { - pipelines_by_connection - .entry(address.clone()) - .or_insert_with(|| (Pipeline::new(), conn.clone(), Vec::new())) - .0 - .add_command(cmd); - pipelines_by_connection - .entry(address) - .or_insert_with(|| (Pipeline::new(), conn, Vec::new())) - .2 - .push((index, inner_index)); - } - - async fn 
routes_pipeline_commands( - pipeline: &crate::Pipeline, - core: Core, - ) -> RedisResult<( - HashMap)>)>, - Vec<(usize, MultipleNodeRoutingInfo, Option)>, - )> { - #[allow(clippy::type_complexity)] - let mut pipelines_by_connection: HashMap< - String, - (Pipeline, C, Vec<(usize, Option)>), - > = HashMap::new(); - let mut response_policies = Vec::new(); - - for (index, cmd) in pipeline.cmd_iter().enumerate() { - match cluster_routing::RoutingInfo::for_routable(cmd) { - Some(cluster_routing::RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)) - | None => { - if pipelines_by_connection.is_empty() { - let conn = crate::cluster_async::ClusterConnInner::get_connection( - SingleNodeRoutingInfo::Random.into(), - core.clone(), - Some(Arc::new(cmd.clone())), - ); - let (address, conn) = conn.await.map_err(|err| { - types::RedisError::from(( - types::ErrorKind::ConnectionNotFoundForRoute, - "Requested connection not found", - err.to_string(), - )) - })?; - Self::add_command_to_pipeline_map( - &mut pipelines_by_connection, - address, - conn, - cmd.clone(), - index, - None, - ); - } else { - // since the map is not empty, add the command to a random connection within the map. 
- let mut rng = rand::thread_rng(); - let keys: Vec<_> = pipelines_by_connection.keys().cloned().collect(); - let random_key = keys.choose(&mut rng).unwrap(); - pipelines_by_connection - .get_mut(random_key) - .unwrap() - .0 - .add_command(cmd.clone()); - pipelines_by_connection - .get_mut(random_key) - .unwrap() - .2 - .push((index, None)); - } - } - Some(cluster_routing::RoutingInfo::SingleNode( - SingleNodeRoutingInfo::SpecificNode(route), - )) => { - let route_single: SingleNodeRoutingInfo = Some(route).into(); - let conn = crate::cluster_async::ClusterConnInner::get_connection( - route_single.into(), - core.clone(), - Some(Arc::new(cmd.clone())), - ); - let (address, conn) = conn.await.map_err(|err| { - types::RedisError::from(( - types::ErrorKind::ConnectionNotFoundForRoute, - "Requested connection not found", - err.to_string(), - )) - })?; - Self::add_command_to_pipeline_map( - &mut pipelines_by_connection, - address, - conn, - cmd.clone(), - index, - None, - ); - } - Some(cluster_routing::RoutingInfo::SingleNode( - SingleNodeRoutingInfo::RandomPrimary, - )) => { - let route_single: SingleNodeRoutingInfo = - Some(Route::new_random_primary()).into(); - let conn = crate::cluster_async::ClusterConnInner::get_connection( - route_single.into(), - core.clone(), - Some(Arc::new(cmd.clone())), - ); - let (address, conn) = conn.await.map_err(|err| { - types::RedisError::from(( - types::ErrorKind::ConnectionNotFoundForRoute, - "Requested connection not found", - err.to_string(), - )) - })?; - Self::add_command_to_pipeline_map( - &mut pipelines_by_connection, - address, - conn, - cmd.clone(), - index, - None, - ); - } - Some(cluster_routing::RoutingInfo::SingleNode( - SingleNodeRoutingInfo::ByAddress { host, port }, - )) => { - let address = format!("{host}:{port}"); - let conn = crate::cluster_async::ClusterConnInner::get_connection( - InternalSingleNodeRouting::ByAddress(address.clone()), - core.clone(), - Some(Arc::new(cmd.clone())), - ); - let (address, conn) = 
conn.await.map_err(|err| { - types::RedisError::from(( - types::ErrorKind::ConnectionNotFoundForRoute, - "Requested connection not found", - err.to_string(), - )) - })?; - Self::add_command_to_pipeline_map( - &mut pipelines_by_connection, - address, - conn, - cmd.clone(), - index, - None, - ); - } - Some(cluster_routing::RoutingInfo::MultiNode(( - multi_node_routing, - response_policy, - ))) => { - // save the routing info and response policy, so we will be able to aggregate the results later - response_policies.push((index, multi_node_routing.clone(), response_policy)); - match multi_node_routing { - MultipleNodeRoutingInfo::AllNodes => { - let all_nodes: Vec<_> = { - let lock = core.conn_lock.read().expect(MUTEX_READ_ERR); - lock.all_node_connections().collect() - }; - for (inner_index, (address, conn)) in all_nodes.into_iter().enumerate() - { - Self::add_command_to_pipeline_map( - &mut pipelines_by_connection, - address, - conn.await, - cmd.clone(), - index, - Some(inner_index), - ); - } - } - MultipleNodeRoutingInfo::AllMasters => { - let all_primaries: Vec<_> = { - let lock = core.conn_lock.read().expect(MUTEX_READ_ERR); - lock.all_primary_connections().collect() - }; - for (inner_index, (address, conn)) in - all_primaries.into_iter().enumerate() - { - Self::add_command_to_pipeline_map( - &mut pipelines_by_connection, - address, - conn.await, - cmd.clone(), - index, - Some(inner_index), - ); - } - } - MultipleNodeRoutingInfo::MultiSlot((slots, _)) => { - for (inner_index, (route, indices)) in slots.iter().enumerate() { - let conn = { - let lock = core.conn_lock.read().expect(MUTEX_READ_ERR); - lock.connection_for_route(route) - }; - if let Some((address, conn)) = conn { - let new_cmd = - crate::cluster_routing::command_for_multi_slot_indices( - cmd, - indices.iter(), - ); - Self::add_command_to_pipeline_map( - &mut pipelines_by_connection, - address, - conn.await, - new_cmd, - index, - Some(inner_index), - ); - } - } - } - } - } - } + + /// Aggregates 
responses for multi-node commands and updates the `values_and_addresses` vector. + /// + /// This function processes the provided `response_policies`, which contain information about how responses + /// from multiple nodes should be aggregated. For each policy: + /// - It collects responses and their source node addresses from the corresponding entry in `values_and_addresses`. + /// - Uses the routing information and optional response policy to aggregate the responses into a single result. + /// + /// The aggregated result replaces the existing entries in `values_and_addresses` for the given command index. + async fn aggregate_pipeline_multi_node_commands( + values_and_addresses: &mut [Vec<(Value, String)>], + response_policies: Vec<(usize, MultipleNodeRoutingInfo, Option)>, + ) -> Result<(), (OperationTarget, RedisError)> { + for (index, routing_info, response_policy) in response_policies { + let response_receivers = values_and_addresses[index] + .iter() + .map(|(value, address)| { + let (sender, receiver) = oneshot::channel(); + let _ = sender.send(Ok(Response::Single(value.clone()))); + (Some(address.clone()), receiver) + }) + .collect(); + + let aggregated_response = + Self::aggregate_results(response_receivers, &routing_info, response_policy) + .await + .map_err(|err| (OperationTarget::FanOut, err))?; + + values_and_addresses[index] = vec![(aggregated_response, "".to_string())]; } - Ok((pipelines_by_connection, response_policies)) + Ok(()) } + async fn try_request(info: RequestInfo, core: Core) -> OperationResult { match info.cmd { CmdArg::Cmd { cmd, routing } => Self::try_cmd_request(cmd, routing, core).await, @@ -2380,6 +2148,7 @@ where route, } => { if pipeline.is_atomic() { + // If the pipeline is atomic (i.e., a transaction), we can send it as is, with no need to split it into sub-pipelines. 
Self::try_pipeline_request( pipeline, offset, @@ -2388,74 +2157,36 @@ where ) .await } else { + // The pipeline is not atomic, we need to split it into sub-pipelines and send them separately. + + // Distribute pipeline commands across cluster nodes based on routing information. + // Returns: + // - pipelines_by_connection: Map of node addresses to their pipeline contexts + // - response_policies: List of response aggregation policies for multi-node operations let (pipelines_by_connection, response_policies) = - Self::routes_pipeline_commands(&pipeline, core.clone()) + map_pipeline_to_nodes(&pipeline, core.clone()) .await .map_err(|err| (OperationTarget::FanOut, err))?; + // Stores responses along with their source node addresses for each pipeline command. + // + // The outer `Vec` represents the pipeline commands, and each inner `Vec` contains (response, address) pairs. + // Since some commands can be executed across multiple nodes (e.g., multi-node commands), a single command + // might produce multiple responses, each from a different node. By storing the responses with their + // respective node addresses, we ensure that we have all the information needed to aggregate the results later. + // This structure is essential for handling scenarios where responses from multiple nodes must be combined. 
let mut values_and_addresses = vec![Vec::new(); pipeline.len()]; - let mut first_error = None; let mut final_responses: Vec = Vec::with_capacity(pipeline.len()); let mut join_set = tokio::task::JoinSet::new(); // Manage spawned tasks - for (address, (pipeline, conn, indices)) in pipelines_by_connection { - // Spawn the async task - join_set.spawn(async move { - let count = pipeline.len(); - let result = - Self::try_pipeline_request(Arc::new(pipeline), 0, count, async { - Ok((address.clone(), conn)) - }) - .await?; - match result { - Response::Multiple(values) => Ok((indices, values, address)), - _ => Err(( - OperationTarget::FanOut, - RedisError::from(( - ErrorKind::ResponseError, - "Unsupported response type", - )), - )), - } - }); + for (address, node_context) in pipelines_by_connection { + // Spawn a new task to execute the pipeline on the node + join_set.spawn(execute_pipeline_on_node(address, node_context)); } // Wait for all spawned tasks to complete - while let Some(future_result) = join_set.join_next().await { - match future_result { - Err(e) => { - return Err(( - OperationTarget::FanOut, - std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) - .into(), - )); - } - Ok(Ok((indices, values, address))) => { - for ((index, inner_index), value) in indices.into_iter().zip(values) - { - if let Some(inner_index) = inner_index { - // Ensure the vector is big enough to hold `inner_index` - if values_and_addresses[index].len() <= inner_index { - values_and_addresses[index].resize( - inner_index + 1, - (Value::Nil, "".to_string()), - ); - } - // Add the value to the specific inner_index within index - values_and_addresses[index][inner_index] = - (value, address.clone()); - } else { - values_and_addresses[index].push((value, address.clone())); - } - } - } - Ok(Err(e)) => { - if first_error.is_none() { - first_error = Some(e); - } - } - } - } + let first_error = + collect_pipeline_tasks(&mut join_set, &mut values_and_addresses).await?; // Check for errors if let 
Some(first_error) = first_error { @@ -2463,36 +2194,15 @@ where } // Process response policies after all tasks are complete - for (index, routing_info, response_policy) in response_policies { - #[allow(clippy::type_complexity)] - // Safely access `values_and_addresses` for the current index - let response_receivers: Vec<( - Option, - oneshot::Receiver>, - )> = values_and_addresses[index] - .iter() - .map(|(value, address)| { - let (sender, receiver) = oneshot::channel(); - let _ = sender.send(Ok(Response::Single(value.clone()))); - (Some(address.clone()), receiver) - }) - .collect(); - - let aggregated_response = Self::aggregate_results( - response_receivers, - &routing_info, - response_policy, - ) - .await - .map_err(|err| (OperationTarget::FanOut, err))?; - - // Update `values_and_addresses` for the current index - values_and_addresses[index] = vec![(aggregated_response, "".to_string())]; - } + Self::aggregate_pipeline_multi_node_commands( + &mut values_and_addresses, + response_policies, + ) + .await?; // Collect final responses for mut value in values_and_addresses.into_iter() { - assert_eq!(value.len(), 1); + // unwrap() is safe here because we know that the vector is not empty final_responses.push(value.pop().unwrap().0); } @@ -2523,7 +2233,7 @@ where } } - async fn get_connection( + pub async fn get_connection( routing: InternalSingleNodeRouting, core: Core, cmd: Option>, @@ -3164,7 +2874,7 @@ impl Connect for MultiplexedConnection { #[cfg(test)] mod pipeline_routing_tests { - use super::route_for_pipeline; + use super::pipeline_routing::route_for_pipeline; use crate::{ cluster_routing::{Route, SlotAddr}, cmd, diff --git a/glide-core/redis-rs/redis/src/cluster_async/pipeline_routing.rs b/glide-core/redis-rs/redis/src/cluster_async/pipeline_routing.rs new file mode 100644 index 0000000000..1b980c479d --- /dev/null +++ b/glide-core/redis-rs/redis/src/cluster_async/pipeline_routing.rs @@ -0,0 +1,433 @@ +use crate::aio::ConnectionLike; +use 
crate::cluster_async::ClusterConnInner; +use crate::cluster_async::Connect; +use crate::cluster_routing::{ + command_for_multi_slot_indices, MultipleNodeRoutingInfo, ResponsePolicy, SingleNodeRoutingInfo, +}; +use crate::{cluster_routing, RedisResult, Value}; +use crate::{cluster_routing::Route, Cmd, ErrorKind, RedisError}; +use std::collections::HashMap; +use std::sync::Arc; + +use crate::cluster_async::MUTEX_READ_ERR; +use crate::Pipeline; +use rand::prelude::IteratorRandom; + +use super::{Core, InternalSingleNodeRouting, OperationTarget, Response}; + +/// Represents a pipeline command execution context for a specific node +#[derive(Default)] +pub struct NodePipelineContext { + /// The pipeline of commands to be executed + pub pipeline: Pipeline, + /// The connection to the node + pub connection: C, + /// Vector of (command_index, inner_index) pairs tracking command order + /// command_index: Position in the original pipeline + /// inner_index: Optional sub-index for multi-node operations (e.g. MSET) + pub command_indices: Vec<(usize, Option)>, +} + +/// Maps node addresses to their pipeline execution contexts +pub type NodePipelineMap = HashMap>; + +impl NodePipelineContext { + fn new(connection: C) -> Self { + Self { + pipeline: Pipeline::new(), + connection, + command_indices: Vec::new(), + } + } + + // Adds a command to the pipeline and records its index + fn add_command(&mut self, cmd: Cmd, index: usize, inner_index: Option) { + self.pipeline.add_command(cmd); + self.command_indices.push((index, inner_index)); + } +} + +/// Adds a command to the pipeline map for a specific node address. 
+pub fn add_command_to_node_pipeline_map( + pipeline_map: &mut NodePipelineMap, + address: String, + connection: C, + cmd: Cmd, + index: usize, + inner_index: Option, +) { + pipeline_map + .entry(address) + .or_insert_with(|| NodePipelineContext::new(connection)) + .add_command(cmd, index, inner_index); +} + +/// Adds a command to a random existing node pipeline in the pipeline map +pub fn add_command_to_random_existing_node( + pipeline_map: &mut NodePipelineMap, + cmd: Cmd, + index: usize, +) -> RedisResult<()> { + let mut rng = rand::thread_rng(); + if let Some(node_context) = pipeline_map.values_mut().choose(&mut rng) { + node_context.add_command(cmd, index, None); + Ok(()) + } else { + Err(RedisError::from((ErrorKind::IoError, "No nodes available"))) + } +} + +/// Handles multi-slot commands within a pipeline. +/// +/// This function processes commands with routing information indicating multiple slots +/// (e.g., `MSET` or `MGET`), splits them into sub-commands based on their target slots, +/// and assigns these sub-commands to the appropriate pipelines for the corresponding nodes. +/// +/// ### Parameters: +/// - `pipelines_by_connection`: A mutable map of node pipelines where the commands will be added. +/// - `core`: The core structure that provides access to connection management. +/// - `cmd`: The original multi-slot command that needs to be split. +/// - `index`: The index of the original command within the pipeline. +/// - `slots`: A vector containing routing information. Each entry includes: +/// - `Route`: The specific route for the slot. +/// - `Vec`: Indices of the keys within the command that map to this slot. 
+pub async fn handle_pipeline_multi_slot_routing( + pipelines_by_connection: &mut NodePipelineMap, + core: Core, + cmd: &Cmd, + index: usize, + slots: Vec<(Route, Vec)>, +) where + C: Clone, +{ + // inner_index is used to keep track of the index of the sub-command inside cmd + for (inner_index, (route, indices)) in slots.iter().enumerate() { + let conn = { + let lock = core.conn_lock.read().expect(MUTEX_READ_ERR); + lock.connection_for_route(route) + }; + if let Some((address, conn)) = conn { + // create the sub-command for the slot + let new_cmd = command_for_multi_slot_indices(cmd, indices.iter()); + add_command_to_node_pipeline_map( + pipelines_by_connection, + address, + conn.await, + new_cmd, + index, + Some(inner_index), + ); + } + } +} + +/// Handles pipeline commands that require single-node routing. +/// +/// This function processes commands with `SingleNode` routing information and determines +/// the appropriate handling based on the routing type. +/// +/// ### Parameters: +/// - `pipeline_map`: A mutable reference to the `NodePipelineMap`, representing the pipelines grouped by nodes. +/// - `cmd`: The command to process and add to the appropriate node pipeline. +/// - `routing`: The single-node routing information, which determines how the command is routed. +/// - `core`: The core object responsible for managing connections and routing logic. +/// - `index`: The position of the command in the overall pipeline. 
+pub async fn handle_pipeline_single_node_routing( + pipeline_map: &mut NodePipelineMap, + cmd: Cmd, + routing: InternalSingleNodeRouting, + core: Core, + index: usize, +) -> Result<(), (OperationTarget, RedisError)> +where + C: Clone + ConnectionLike + Connect + Send + Sync + 'static, +{ + if matches!(routing, InternalSingleNodeRouting::Random) && !pipeline_map.is_empty() { + // The routing info is to a random node, and we already have sub-pipelines within our pipelines map, so just add it to a random sub-pipeline + add_command_to_random_existing_node(pipeline_map, cmd, index) + .map_err(|err| (OperationTarget::NotFound, err))?; + Ok(()) + } else { + let (address, conn) = + ClusterConnInner::get_connection(routing, core, Some(Arc::new(cmd.clone()))) + .await + .map_err(|err| (OperationTarget::NotFound, err))?; + add_command_to_node_pipeline_map(pipeline_map, address, conn, cmd, index, None); + Ok(()) + } +} + +/// Maps the commands in a pipeline to the appropriate nodes based on their routing information. +/// +/// This function processes each command in the given pipeline, determines its routing information, +/// and organizes it into a map of node pipelines. It handles both single-node and multi-node routing +/// strategies and ensures that the commands are distributed accordingly. +/// +/// It also collects response policies for multi-node routing and returns them along with the pipeline map. +/// This is to ensure we can aggregate responses from properly from the different nodes. +/// +/// # Arguments +/// +/// * `pipeline` - A reference to the pipeline containing the commands to route. +/// * `core` - The core object that provides access to connection locks and other resources. +/// +/// # Returns +/// +/// A `RedisResult` containing a tuple: +/// +/// - A `NodePipelineMap` where commands are grouped by their corresponding nodes (as pipelines). 
+/// - A `Vec<(usize, MultipleNodeRoutingInfo, Option)>` containing the routing information +/// and response policies for multi-node commands, along with the index of the command in the pipeline, for aggregating the responses later. +pub async fn map_pipeline_to_nodes( + pipeline: &crate::Pipeline, + core: Core, +) -> RedisResult<( + NodePipelineMap, + Vec<(usize, MultipleNodeRoutingInfo, Option)>, +)> +where + C: Clone + ConnectionLike + Connect + Send + Sync + 'static, +{ + let mut pipelines_by_connection = NodePipelineMap::new(); + let mut response_policies = Vec::new(); + + for (index, cmd) in pipeline.cmd_iter().enumerate() { + match cluster_routing::RoutingInfo::for_routable(cmd).unwrap_or( + cluster_routing::RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random), + ) { + cluster_routing::RoutingInfo::SingleNode(route) => { + handle_pipeline_single_node_routing( + &mut pipelines_by_connection, + cmd.clone(), + route.into(), + core.clone(), + index, + ) + .await + .map_err(|(_target, err)| err)?; + } + + cluster_routing::RoutingInfo::MultiNode((multi_node_routing, response_policy)) => { + //save the routing info and response policy, so we will be able to aggregate the results later + response_policies.push((index, multi_node_routing.clone(), response_policy)); + match multi_node_routing { + MultipleNodeRoutingInfo::AllNodes | MultipleNodeRoutingInfo::AllMasters => { + let connections: Vec<_> = { + let lock = core.conn_lock.read().expect(MUTEX_READ_ERR); + if matches!(multi_node_routing, MultipleNodeRoutingInfo::AllNodes) { + lock.all_node_connections().collect() + } else { + lock.all_primary_connections().collect() + } + }; + for (inner_index, (address, conn)) in connections.into_iter().enumerate() { + add_command_to_node_pipeline_map( + &mut pipelines_by_connection, + address, + conn.await, + cmd.clone(), + index, + Some(inner_index), + ); + } + } + MultipleNodeRoutingInfo::MultiSlot((slots, _)) => { + handle_pipeline_multi_slot_routing( + &mut 
pipelines_by_connection, + core.clone(), + cmd, + index, + slots, + ) + .await; + } + } + } + } + } + Ok((pipelines_by_connection, response_policies)) +} + +/// Executes a pipeline of commands on a specified node. +/// +/// This function sends a batch of commands (pipeline) to the specified node for execution. +/// +/// ### Parameters: +/// - `address`: The address of the target node where the pipeline commands should be executed. +/// - `node_context`: The `NodePipelineContext` containing the pipeline commands and the associated connection. +/// +/// ### Returns: +/// - `Ok((Vec<(usize, Option)>, Vec, String))`: +/// - A vector of command indices (`usize`) and their respective inner indices (`Option`) in the pipeline. +/// - A vector of `Value` objects representing the responses from the executed pipeline. +/// - The address of the node where the pipeline was executed. +/// - `Err((OperationTarget, RedisError))`: +/// - An error tuple containing the target operation and the corresponding error details if execution fails. +pub async fn execute_pipeline_on_node( + address: String, + node_context: NodePipelineContext, +) -> Result<(Vec<(usize, Option)>, Vec, String), (OperationTarget, RedisError)> +where + C: Clone + ConnectionLike + Connect + Send + Sync + 'static, +{ + let count = node_context.pipeline.len(); + let result = + ClusterConnInner::try_pipeline_request(Arc::new(node_context.pipeline), 0, count, async { + Ok((address.clone(), node_context.connection)) + }) + .await?; + + match result { + Response::Multiple(values) => Ok((node_context.command_indices, values, address)), + _ => Err(( + OperationTarget::FanOut, + RedisError::from((ErrorKind::ResponseError, "Unsupported response type")), + )), + } +} + +/// Adds the result of a pipeline command to the `values_and_addresses` collection. +/// +/// This function updates the `values_and_addresses` vector at the given `index` and optionally at the +/// `inner_index` if provided. 
If `inner_index` is `Some`, it ensures that the vector at that index is large enough +/// to hold the value and address at the specified position, resizing it if necessary. If `inner_index` is `None`, +/// the value and address are simply appended to the vector. +/// +/// # Parameters +/// - `values_and_addresses`: A mutable reference to a vector of vectors that stores the results of pipeline commands. +/// - `index`: The index in `values_and_addresses` where the result should be stored. +/// - `inner_index`: An optional index within the vector at `index`, used to store the result at a specific position. +/// - `value`: The result value to store. +/// - `address`: The address associated with the result. +pub fn add_pipeline_result( + values_and_addresses: &mut [Vec<(Value, String)>], + index: usize, + inner_index: Option, + value: Value, + address: String, +) { + match inner_index { + Some(inner_index) => { + // Ensure the vector at the given index is large enough to hold the value and address at the specified position + if values_and_addresses[index].len() <= inner_index { + values_and_addresses[index].resize(inner_index + 1, (Value::Nil, "".to_string())); + } + values_and_addresses[index][inner_index] = (value, address); + } + None => values_and_addresses[index].push((value, address)), + } +} + +/// Collects and processes the results of pipeline tasks from a `tokio::task::JoinSet`. +/// +/// This function iteratively retrieves completed tasks from the provided `join_set` and processes +/// their results. Successful results are added to the `values_and_addresses` vector using the +/// indices and values provided. If an error occurs in any task, it is recorded and returned as +/// the first encountered error. 
+/// +/// # Parameters +/// - `join_set`: A mutable reference to a `tokio::task::JoinSet` containing tasks that return: +/// - `Ok((Vec<(usize, Option)>, Vec, String))`: On success, a tuple of: +/// - A list of indices and optional inner indices corresponding to pipeline commands. +/// - A list of `Value` results from the executed pipeline. +/// - The `String` address where the task was executed. +/// - `Err((OperationTarget, RedisError))`: On failure, an error detailing the operation target and the Redis error. +/// - `values_and_addresses`: A mutable slice of vectors, where each vector corresponds to a pipeline +/// command's results. This is updated with the values and addresses from successful tasks. +/// +/// # Returns +/// - `Ok(Some((OperationTarget, RedisError)))`: If one or more tasks encountered an error, returns the first error. +/// - `Ok(None)`: If all tasks completed successfully. +/// - `Err((OperationTarget::FanOut, RedisError))`: If a task failed unexpectedly (e.g., due to a panic). +/// +/// +/// # Behavior +/// - Processes successful results by calling `add_pipeline_result` to update the +/// `values_and_addresses` vector with the indices, values, and addresses. +/// - Records the first error encountered and continues processing the remaining tasks. +/// - Returns `Ok(None)` if all tasks complete successfully. 
+#[allow(clippy::type_complexity)] +pub async fn collect_pipeline_tasks( + join_set: &mut tokio::task::JoinSet< + Result<(Vec<(usize, Option)>, Vec, String), (OperationTarget, RedisError)>, + >, + values_and_addresses: &mut [Vec<(Value, String)>], +) -> Result, (OperationTarget, RedisError)> { + let mut first_error = None; + + while let Some(future_result) = join_set.join_next().await { + match future_result { + Ok(Ok((indices, values, address))) => { + for ((index, inner_index), value) in indices.into_iter().zip(values) { + add_pipeline_result( + values_and_addresses, + index, + inner_index, + value, + address.clone(), + ); + } + } + Ok(Err(e)) => first_error = first_error.or(Some(e)), + Err(e) => { + return Err(( + OperationTarget::FanOut, + std::io::Error::new(std::io::ErrorKind::Other, e.to_string()).into(), + )) + } + } + } + Ok(first_error) +} + +// This function returns the rout for a given pipeline. +// The function goes over the commands in the pipeline, checks that all key-based commands are routed to the same slot, +// and returns the route for that specific node. +// If the pipelines contains no key-base commands, the function returns None. +// For non-anomic pipeline, the function will return None, regardless of the commands in it. +pub fn route_for_pipeline(pipeline: &crate::Pipeline) -> RedisResult> { + fn route_for_command(cmd: &Cmd) -> Option { + match cluster_routing::RoutingInfo::for_routable(cmd) { + Some(cluster_routing::RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)) => None, + Some(cluster_routing::RoutingInfo::SingleNode( + SingleNodeRoutingInfo::SpecificNode(route), + )) => Some(route), + Some(cluster_routing::RoutingInfo::SingleNode( + SingleNodeRoutingInfo::RandomPrimary, + )) => Some(Route::new_random_primary()), + Some(cluster_routing::RoutingInfo::MultiNode(_)) => None, + Some(cluster_routing::RoutingInfo::SingleNode(SingleNodeRoutingInfo::ByAddress { + .. 
+ })) => None, + None => None, + } + } + + if pipeline.is_atomic() { + // Find first specific slot and send to it. There's no need to check If later commands + // should be routed to a different slot, since the server will return an error indicating this. + pipeline + .cmd_iter() + .map(route_for_command) + .try_fold(None, |chosen_route, next_cmd_route| { + match (chosen_route, next_cmd_route) { + (None, _) => Ok(next_cmd_route), + (_, None) => Ok(chosen_route), + (Some(chosen_route), Some(next_cmd_route)) => { + if chosen_route.slot() != next_cmd_route.slot() { + Err(( + ErrorKind::CrossSlot, + "Received crossed slots in transaction", + ) + .into()) + } else { + Ok(Some(chosen_route)) + } + } + } + }) + } else { + // Pipeline is not atomic, so we can have commands with different slots. + Ok(None) + } +} From 021ce24bc44755edbf09a9a01376f3c9d777637b Mon Sep 17 00:00:00 2001 From: Shoham Elias Date: Mon, 27 Jan 2025 11:09:35 +0000 Subject: [PATCH 05/15] save doc Signed-off-by: Shoham Elias --- .../redis-rs/redis/src/cluster_async/mod.rs | 80 +++++++++++++++---- .../src/cluster_async/pipeline_routing.rs | 19 +++-- 2 files changed, 77 insertions(+), 22 deletions(-) diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index 3e13d0e4f6..0e4d90b105 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -45,6 +45,7 @@ use crate::{ use dashmap::DashMap; use pipeline_routing::{ collect_pipeline_tasks, execute_pipeline_on_node, map_pipeline_to_nodes, route_for_pipeline, + PipelineResponses, }; use std::{ collections::{HashMap, HashSet}, @@ -283,6 +284,11 @@ where route: SingleNodeRoutingInfo, ) -> RedisResult> { let (sender, receiver) = oneshot::channel(); + let connections: Vec<_> = { + let lock = core.conn_lock.read().expect(MUTEX_READ_ERR); + + lock.all_primary_connections().collect() + }; self.0 .send(Message { cmd: CmdArg::Pipeline { @@ 
-2106,20 +2112,60 @@ where .map_err(|err| (OperationTarget::Node { address }, err)) } - /// Aggregates responses for multi-node commands and updates the `values_and_addresses` vector. + /// Aggregates pipeline responses for multi-node commands and updates the `pipeline_responses` vector. + /// + /// Pipeline commands with multi-node routing info, will be splitted into multiple pipelines, therefor, after executing each pipeline and storing the results in `pipeline_responses`, + /// the multi-node commands will contain more than one response (one for each sub-pipeline that contained the command). This responses must be aggregated into a single response, based on the proper response policy. /// /// This function processes the provided `response_policies`, which contain information about how responses /// from multiple nodes should be aggregated. For each policy: - /// - It collects responses and their source node addresses from the corresponding entry in `values_and_addresses`. + /// - It collects the multiple responses and their source node addresses from the corresponding entry in `pipeline_responses`. /// - Uses the routing information and optional response policy to aggregate the responses into a single result. /// - /// The aggregated result replaces the existing entries in `values_and_addresses` for the given command index. + /// The aggregated result replaces the existing entries in `pipeline_responses` for the given command index, changing the multiple responses to the command into a single aggregated response. + /// + /// After the execution of this function, all entries in `pipeline_responses` will contain a single response for each command. + /// + /// # Arguments + /// * `pipeline_responses` - A mutable reference to a vector of vectors, where each inner vector contains tuples of responses and their corresponding node addresses. 
+ /// * `response_policies` - A vector of tuples, each containing: + /// - The index of the command in the pipeline that has a multi-node routing info. + /// - The routing information for the command. + /// - An optional response policy that dictates how the responses should be aggregated. + /// + /// # Returns + /// * `Result<(), (OperationTarget, RedisError)>` - Returns `Ok(())` if the aggregation is successful, or an error tuple containing the operation target and the Redis error if it fails. + /// + /// # Example + /// Suppose we have a pipeline with multiple commands that were split and executed on different nodes. + /// This function will aggregate the responses for commands that were split across multiple nodes. + /// + /// ```rust,no_run + /// // Example pipeline with commands that might be split across nodes + /// let mut pipeline_responses = vec![ + /// vec![(Value::Int(1), "node1".to_string()), (Value::Int(2), "node2".to_string())], + /// vec![(Value::Int(3), "node3".to_string())], + /// ]; + /// let response_policies = vec![ + /// (0, MultipleNodeRoutingInfo::AllNodes, Some(ResponsePolicy::Aggregate(AggregateOp::Sum))), + /// (1, MultipleNodeRoutingInfo::AllNodes, None), + /// ]; + /// + /// // Aggregating the responses + /// aggregate_pipeline_multi_node_commands(&mut pipeline_responses, response_policies).await.unwrap(); + /// + /// // After aggregation, pipeline_responses will be updated with aggregated results + /// assert_eq!(pipeline_responses[0], vec![(Value::Int(3), "".to_string())]); + /// assert_eq!(pipeline_responses[1], vec![(Value::Int(3), "".to_string())]); + /// ``` + /// + /// This function is essential for handling multi-node commands in a Redis cluster, ensuring that responses from different nodes are correctly aggregated and processed. 
async fn aggregate_pipeline_multi_node_commands( - values_and_addresses: &mut [Vec<(Value, String)>], + pipeline_responses: &mut PipelineResponses, response_policies: Vec<(usize, MultipleNodeRoutingInfo, Option)>, ) -> Result<(), (OperationTarget, RedisError)> { for (index, routing_info, response_policy) in response_policies { - let response_receivers = values_and_addresses[index] + let response_receivers = pipeline_responses[index] .iter() .map(|(value, address)| { let (sender, receiver) = oneshot::channel(); @@ -2133,7 +2179,7 @@ where .await .map_err(|err| (OperationTarget::FanOut, err))?; - values_and_addresses[index] = vec![(aggregated_response, "".to_string())]; + pipeline_responses[index] = vec![(aggregated_response, "".to_string())]; } Ok(()) } @@ -2167,14 +2213,14 @@ where map_pipeline_to_nodes(&pipeline, core.clone()) .await .map_err(|err| (OperationTarget::FanOut, err))?; - // Stores responses along with their source node addresses for each pipeline command. - // - // The outer `Vec` represents the pipeline commands, and each inner `Vec` contains (response, address) pairs. - // Since some commands can be executed across multiple nodes (e.g., multi-node commands), a single command - // might produce multiple responses, each from a different node. By storing the responses with their - // respective node addresses, we ensure that we have all the information needed to aggregate the results later. - // This structure is essential for handling scenarios where responses from multiple nodes must be combined. - let mut values_and_addresses = vec![Vec::new(); pipeline.len()]; + + // Initialize `PipelineResponses` to store responses for each pipeline command. + // This will be used to store the responses from the different sub-pipelines to the pipeline commands. + // A command can have one or more responses (e.g MultiNode commands). 
+ // Each entry in `PipelineResponses` corresponds to a command in the original pipeline and contains + // a vector of tuples where each tuple holds a response to the command and the address of the node that provided it. + let mut pipeline_responses: PipelineResponses = + vec![Vec::new(); pipeline.len()]; let mut final_responses: Vec = Vec::with_capacity(pipeline.len()); let mut join_set = tokio::task::JoinSet::new(); // Manage spawned tasks @@ -2186,7 +2232,7 @@ where // Wait for all spawned tasks to complete let first_error = - collect_pipeline_tasks(&mut join_set, &mut values_and_addresses).await?; + collect_pipeline_tasks(&mut join_set, &mut pipeline_responses).await?; // Check for errors if let Some(first_error) = first_error { @@ -2195,13 +2241,13 @@ where // Process response policies after all tasks are complete Self::aggregate_pipeline_multi_node_commands( - &mut values_and_addresses, + &mut pipeline_responses, response_policies, ) .await?; // Collect final responses - for mut value in values_and_addresses.into_iter() { + for mut value in pipeline_responses.into_iter() { // unwrap() is safe here because we know that the vector is not empty final_responses.push(value.pop().unwrap().0); } diff --git a/glide-core/redis-rs/redis/src/cluster_async/pipeline_routing.rs b/glide-core/redis-rs/redis/src/cluster_async/pipeline_routing.rs index 1b980c479d..2b1db3396e 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/pipeline_routing.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/pipeline_routing.rs @@ -47,6 +47,15 @@ impl NodePipelineContext { } } +// `NodeResponse` represents a response from a node along with its source node address. +// `PipelineResponses` represents the responses for each pipeline command. +// The outer `Vec` represents the pipeline commands, and each inner `Vec` contains (response, address) pairs. 
+// Since some commands can be executed across multiple nodes (e.g., multi-node commands), a single command +// might produce multiple responses, each from a different node. By storing the responses with their +// respective node addresses, we ensure that we have all the information needed to aggregate the results later. +type NodeResponse = (Value, String); +pub type PipelineResponses = Vec>; + /// Adds a command to the pipeline map for a specific node address. pub fn add_command_to_node_pipeline_map( pipeline_map: &mut NodePipelineMap, @@ -380,11 +389,11 @@ pub async fn collect_pipeline_tasks( Ok(first_error) } -// This function returns the rout for a given pipeline. -// The function goes over the commands in the pipeline, checks that all key-based commands are routed to the same slot, -// and returns the route for that specific node. -// If the pipelines contains no key-base commands, the function returns None. -// For non-anomic pipeline, the function will return None, regardless of the commands in it. +/// This function returns the route for a given pipeline. +/// The function goes over the commands in the pipeline, checks that all key-based commands are routed to the same slot, +/// and returns the route for that specific node. +/// If the pipeline contains no key-based commands, the function returns None. +/// For non-atomic pipelines, the function will return None, regardless of the commands in it. 
pub fn route_for_pipeline(pipeline: &crate::Pipeline) -> RedisResult> { fn route_for_command(cmd: &Cmd) -> Option { match cluster_routing::RoutingInfo::for_routable(cmd) { From 06e352ce03621315b0b31e21a14773652f54bb17 Mon Sep 17 00:00:00 2001 From: Shoham Elias Date: Mon, 27 Jan 2025 15:50:55 +0000 Subject: [PATCH 06/15] save initial work Signed-off-by: Shoham Elias --- .../redis-rs/redis/src/cluster_async/mod.rs | 101 +++++++++++++++--- .../src/cluster_async/pipeline_routing.rs | 4 +- glide-core/redis-rs/redis/src/pipeline.rs | 24 +++++ 3 files changed, 112 insertions(+), 17 deletions(-) diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index 0e4d90b105..d334d6b682 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -44,8 +44,8 @@ use crate::{ }; use dashmap::DashMap; use pipeline_routing::{ - collect_pipeline_tasks, execute_pipeline_on_node, map_pipeline_to_nodes, route_for_pipeline, - PipelineResponses, + add_pipeline_result, collect_pipeline_tasks, execute_pipeline_on_node, map_pipeline_to_nodes, + route_for_pipeline, PipelineResponses, }; use std::{ collections::{HashMap, HashSet}, @@ -284,11 +284,6 @@ where route: SingleNodeRoutingInfo, ) -> RedisResult> { let (sender, receiver) = oneshot::channel(); - let connections: Vec<_> = { - let lock = core.conn_lock.read().expect(MUTEX_READ_ERR); - - lock.all_primary_connections().collect() - }; self.0 .send(Message { cmd: CmdArg::Pipeline { @@ -2193,7 +2188,7 @@ where count, route, } => { - if pipeline.is_atomic() { + if pipeline.is_atomic() || pipeline.is_sub_pipeline() { // If the pipeline is atomic (i.e., a transaction), we can send it as is, with no need to split it into sub-pipelines. 
Self::try_pipeline_request( pipeline, @@ -2223,21 +2218,94 @@ where vec![Vec::new(); pipeline.len()]; let mut final_responses: Vec = Vec::with_capacity(pipeline.len()); - let mut join_set = tokio::task::JoinSet::new(); // Manage spawned tasks + //let mut join_set = tokio::task::JoinSet::new(); // Manage spawned tasks + + let mut receivers = Vec::new(); + let mut pending_requests = Vec::new(); + let mut addresses_and_indices = Vec::new(); for (address, node_context) in pipelines_by_connection { - // Spawn a new task to execute the pipeline on the node - join_set.spawn(execute_pipeline_on_node(address, node_context)); + let (sender, receiver) = oneshot::channel(); + let count = node_context.pipeline.len(); + receivers.push((Some(address.clone()), receiver)); + pending_requests.push(Some(PendingRequest { + retry: 0, + sender, + info: RequestInfo { + cmd: CmdArg::Pipeline { + pipeline: node_context.pipeline.into(), + offset: 0, + count, + route: InternalSingleNodeRouting::::ByAddress( + address.clone(), + ), + }, + }, + })); + addresses_and_indices.push((address, node_context.command_indices)); } + core.pending_requests + .lock() + .unwrap() + .extend(pending_requests.into_iter().flatten()); + + // Spawn a new task to execute the pipeline on the node + // join_set.spawn(execute_pipeline_on_node(address, node_context)); + + // Wait for all receivers to complete and collect the responses + let responses: Vec<_> = futures::future::join_all( + receivers.into_iter().map(|(_, receiver)| receiver), + ) + .await + .into_iter() + .collect(); + + // Process the responses and update the pipeline_responses + for (i, response) in responses.into_iter().enumerate() { + match response { + Ok(Ok(Response::Multiple(values))) => { + for ((index, inner_index), value) in + addresses_and_indices[i].1.iter().cloned().zip(values) + { + add_pipeline_result( + &mut pipeline_responses, + index, + inner_index, + value, + addresses_and_indices[i].0.clone(), + ); + } + } + Ok(Err(err)) => { + 
return Err(( + OperationTarget::Node { + address: addresses_and_indices[i].0.clone(), + }, + err, + )); + } + _ => { + return Err(( + OperationTarget::Node { + address: addresses_and_indices[i].0.clone(), + }, + RedisError::from(( + ErrorKind::ResponseError, + "Failed to receive response", + )), + )); + } + } + } // Wait for all spawned tasks to complete - let first_error = - collect_pipeline_tasks(&mut join_set, &mut pipeline_responses).await?; + //let first_error = + // collect_pipeline_tasks(&mut join_set, &mut pipeline_responses).await?; // Check for errors - if let Some(first_error) = first_error { - return Err(first_error); - } + //if let Some(first_error) = first_error { + // return Err(first_error); + //} // Process response policies after all tasks are complete Self::aggregate_pipeline_multi_node_commands( @@ -2255,6 +2323,7 @@ where Ok(Response::Multiple(final_responses)) } } + CmdArg::ClusterScan { cluster_scan_args, .. } => { diff --git a/glide-core/redis-rs/redis/src/cluster_async/pipeline_routing.rs b/glide-core/redis-rs/redis/src/cluster_async/pipeline_routing.rs index 2b1db3396e..4a2fa411db 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/pipeline_routing.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/pipeline_routing.rs @@ -33,8 +33,10 @@ pub type NodePipelineMap = HashMap>; impl NodePipelineContext { fn new(connection: C) -> Self { + let mut pipeline = Pipeline::new(); + pipeline.sub_pipeline(); Self { - pipeline: Pipeline::new(), + pipeline, connection, command_indices: Vec::new(), } diff --git a/glide-core/redis-rs/redis/src/pipeline.rs b/glide-core/redis-rs/redis/src/pipeline.rs index 813961156a..1806ff5605 100644 --- a/glide-core/redis-rs/redis/src/pipeline.rs +++ b/glide-core/redis-rs/redis/src/pipeline.rs @@ -12,6 +12,7 @@ pub struct Pipeline { commands: Vec, transaction_mode: bool, ignored_commands: HashSet, + is_sub_pipeline: bool, } /// A pipeline allows you to send multiple commands in one go to the @@ -48,6 +49,7 @@ impl 
Pipeline { commands: Vec::with_capacity(capacity), transaction_mode: false, ignored_commands: HashSet::new(), + is_sub_pipeline: false, } } @@ -70,6 +72,23 @@ impl Pipeline { self } + /// This enables sub-pipeline mode. In sub-pipeline mode, the whole pipeline is enclosed in + /// `MULTI`/`EXEC` and the return value is a nested array of results. This is useful when + /// you want to execute a pipeline inside another pipeline. + /// ```rust,no_run + /// # let client = redis::Client::open("redis:// + /// 127.0.0.1/").unwrap(); + /// # let mut con = client.get_connection(None).unwrap(); + /// let (k1, k2) : (i32, i32) = redis::pipe() + /// .atomic() + /// .cmd("SET").arg("key_1").arg(42).ignore() + /// + /// + pub fn sub_pipeline(&mut self) -> &mut Pipeline { + self.is_sub_pipeline = true; + self + } + /// Returns the encoded pipeline commands. pub fn get_packed_pipeline(&self) -> Vec { encode_pipeline(&self.commands, self.transaction_mode) @@ -221,6 +240,11 @@ impl Pipeline { pub fn is_empty(&self) -> bool { self.commands.is_empty() } + + /// + pub fn is_sub_pipeline(&self) -> bool { + self.is_sub_pipeline + } } fn encode_pipeline(cmds: &[Cmd], atomic: bool) -> Vec { From 0b4aeb381297ab85fd78b799db402b3d3f65af03 Mon Sep 17 00:00:00 2001 From: Shoham Elias Date: Tue, 28 Jan 2025 16:08:11 +0000 Subject: [PATCH 07/15] add sub-pipelines to the pending requests queue Signed-off-by: Shoham Elias --- .../redis-rs/redis/src/cluster_async/mod.rs | 260 ++++++------- .../src/cluster_async/pipeline_routing.rs | 342 +++++++++--------- glide-core/redis-rs/redis/src/pipeline.rs | 2 +- 3 files changed, 282 insertions(+), 322 deletions(-) diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index d334d6b682..e008aad4a1 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -44,7 +44,7 @@ use crate::{ }; use dashmap::DashMap; use pipeline_routing::{ - 
add_pipeline_result, collect_pipeline_tasks, execute_pipeline_on_node, map_pipeline_to_nodes, + collect_pipeline_requests, map_pipeline_to_nodes, process_pipeline_responses, route_for_pipeline, PipelineResponses, }; use std::{ @@ -2093,7 +2093,7 @@ where .map_err(|err| (address.into(), err)) } - pub async fn try_pipeline_request( + async fn try_pipeline_request( pipeline: Arc, offset: usize, count: usize, @@ -2107,78 +2107,6 @@ where .map_err(|err| (OperationTarget::Node { address }, err)) } - /// Aggregates pipeline responses for multi-node commands and updates the `pipeline_responses` vector. - /// - /// Pipeline commands with multi-node routing info, will be splitted into multiple pipelines, therefor, after executing each pipeline and storing the results in `pipeline_responses`, - /// the multi-node commands will contain more than one response (one for each sub-pipeline that contained the command). This responses must be aggregated into a single response, based on the proper response policy. - /// - /// This function processes the provided `response_policies`, which contain information about how responses - /// from multiple nodes should be aggregated. For each policy: - /// - It collects the multiple responses and their source node addresses from the corresponding entry in `pipeline_responses`. - /// - Uses the routing information and optional response policy to aggregate the responses into a single result. - /// - /// The aggregated result replaces the existing entries in `pipeline_responses` for the given command index, changing the multiple responses to the command into a single aggregated response. - /// - /// After the execution of this function, all entries in `pipeline_responses` will contain a single response for each command. - /// - /// # Arguments - /// * `pipeline_responses` - A mutable reference to a vector of vectors, where each inner vector contains tuples of responses and their corresponding node addresses. 
- /// * `response_policies` - A vector of tuples, each containing: - /// - The index of the command in the pipeline that has a multi-node routing info. - /// - The routing information for the command. - /// - An optional response policy that dictates how the responses should be aggregated. - /// - /// # Returns - /// * `Result<(), (OperationTarget, RedisError)>` - Returns `Ok(())` if the aggregation is successful, or an error tuple containing the operation target and the Redis error if it fails. - /// - /// # Example - /// Suppose we have a pipeline with multiple commands that were split and executed on different nodes. - /// This function will aggregate the responses for commands that were split across multiple nodes. - /// - /// ```rust,no_run - /// // Example pipeline with commands that might be split across nodes - /// let mut pipeline_responses = vec![ - /// vec![(Value::Int(1), "node1".to_string()), (Value::Int(2), "node2".to_string())], - /// vec![(Value::Int(3), "node3".to_string())], - /// ]; - /// let response_policies = vec![ - /// (0, MultipleNodeRoutingInfo::AllNodes, Some(ResponsePolicy::Aggregate(AggregateOp::Sum))), - /// (1, MultipleNodeRoutingInfo::AllNodes, None), - /// ]; - /// - /// // Aggregating the responses - /// aggregate_pipeline_multi_node_commands(&mut pipeline_responses, response_policies).await.unwrap(); - /// - /// // After aggregation, pipeline_responses will be updated with aggregated results - /// assert_eq!(pipeline_responses[0], vec![(Value::Int(3), "".to_string())]); - /// assert_eq!(pipeline_responses[1], vec![(Value::Int(3), "".to_string())]); - /// ``` - /// - /// This function is essential for handling multi-node commands in a Redis cluster, ensuring that responses from different nodes are correctly aggregated and processed. 
- async fn aggregate_pipeline_multi_node_commands( - pipeline_responses: &mut PipelineResponses, - response_policies: Vec<(usize, MultipleNodeRoutingInfo, Option)>, - ) -> Result<(), (OperationTarget, RedisError)> { - for (index, routing_info, response_policy) in response_policies { - let response_receivers = pipeline_responses[index] - .iter() - .map(|(value, address)| { - let (sender, receiver) = oneshot::channel(); - let _ = sender.send(Ok(Response::Single(value.clone()))); - (Some(address.clone()), receiver) - }) - .collect(); - - let aggregated_response = - Self::aggregate_results(response_receivers, &routing_info, response_policy) - .await - .map_err(|err| (OperationTarget::FanOut, err))?; - - pipeline_responses[index] = vec![(aggregated_response, "".to_string())]; - } - Ok(()) - } - async fn try_request(info: RequestInfo, core: Core) -> OperationResult { match info.cmd { CmdArg::Cmd { cmd, routing } => Self::try_cmd_request(cmd, routing, core).await, @@ -2189,7 +2117,7 @@ where route, } => { if pipeline.is_atomic() || pipeline.is_sub_pipeline() { - // If the pipeline is atomic (i.e., a transaction), we can send it as is, with no need to split it into sub-pipelines. + // If the pipeline is atomic (i.e., a transaction) or if the pipeline is already splitted into sub-pipelines, we can send it as is, with no need to split it into sub-pipelines. Self::try_pipeline_request( pipeline, offset, @@ -2198,7 +2126,7 @@ where ) .await } else { - // The pipeline is not atomic, we need to split it into sub-pipelines and send them separately. + // The pipeline is not atomic and not already splitted, we need to split it into sub-pipelines and send them separately. // Distribute pipeline commands across cluster nodes based on routing information. 
// Returns: @@ -2218,94 +2146,38 @@ where vec![Vec::new(); pipeline.len()]; let mut final_responses: Vec = Vec::with_capacity(pipeline.len()); - //let mut join_set = tokio::task::JoinSet::new(); // Manage spawned tasks - - let mut receivers = Vec::new(); - let mut pending_requests = Vec::new(); - let mut addresses_and_indices = Vec::new(); - - for (address, node_context) in pipelines_by_connection { - let (sender, receiver) = oneshot::channel(); - let count = node_context.pipeline.len(); - receivers.push((Some(address.clone()), receiver)); - pending_requests.push(Some(PendingRequest { - retry: 0, - sender, - info: RequestInfo { - cmd: CmdArg::Pipeline { - pipeline: node_context.pipeline.into(), - offset: 0, - count, - route: InternalSingleNodeRouting::::ByAddress( - address.clone(), - ), - }, - }, - })); - addresses_and_indices.push((address, node_context.command_indices)); - } + // Processes the sub-pipelines to generate pending requests for execution on specific nodes. + // Each pending request encapsulates all the necessary details for executing commands on a node. + // + // Returns: + // - `receivers`: A vector of `oneshot::Receiver` instances, enabling asynchronous retrieval + // of the results from the execution of each sub-pipeline. + // - `pending_requests`: A vector of `PendingRequest` objects, each representing a scheduled command + // for execution on a node. + // - `addresses_and_indices`: A vector of tuples where each tuple contains a node address and a list + // of command indices for each sub-pipeline, allowing the results to be mapped back to their original command within the original pipeline. 
+ let (receivers, pending_requests, addresses_and_indices) = + collect_pipeline_requests(pipelines_by_connection); + + // Add the pending requests to the pending_requests queue core.pending_requests .lock() .unwrap() - .extend(pending_requests.into_iter().flatten()); - - // Spawn a new task to execute the pipeline on the node - // join_set.spawn(execute_pipeline_on_node(address, node_context)); + .extend(pending_requests.into_iter()); // Wait for all receivers to complete and collect the responses - let responses: Vec<_> = futures::future::join_all( - receivers.into_iter().map(|(_, receiver)| receiver), - ) - .await - .into_iter() - .collect(); + let responses: Vec<_> = futures::future::join_all(receivers.into_iter()) + .await + .into_iter() + .collect(); // Process the responses and update the pipeline_responses - for (i, response) in responses.into_iter().enumerate() { - match response { - Ok(Ok(Response::Multiple(values))) => { - for ((index, inner_index), value) in - addresses_and_indices[i].1.iter().cloned().zip(values) - { - add_pipeline_result( - &mut pipeline_responses, - index, - inner_index, - value, - addresses_and_indices[i].0.clone(), - ); - } - } - Ok(Err(err)) => { - return Err(( - OperationTarget::Node { - address: addresses_and_indices[i].0.clone(), - }, - err, - )); - } - _ => { - return Err(( - OperationTarget::Node { - address: addresses_and_indices[i].0.clone(), - }, - RedisError::from(( - ErrorKind::ResponseError, - "Failed to receive response", - )), - )); - } - } - } - // Wait for all spawned tasks to complete - //let first_error = - // collect_pipeline_tasks(&mut join_set, &mut pipeline_responses).await?; - - // Check for errors - //if let Some(first_error) = first_error { - // return Err(first_error); - //} + process_pipeline_responses( + &mut pipeline_responses, + responses, + addresses_and_indices, + )?; // Process response policies after all tasks are complete Self::aggregate_pipeline_multi_node_commands( @@ -2348,7 +2220,81 @@ where } 
    }

-    pub async fn get_connection(
+    /// Aggregates pipeline responses for multi-node commands and updates the `pipeline_responses` vector.
+    ///
+    /// Pipeline commands with multi-node routing info will be split into multiple pipelines; therefore, after executing each pipeline and storing the results in `pipeline_responses`,
+    /// the multi-node commands will contain more than one response (one for each sub-pipeline that contained the command). These responses must be aggregated into a single response, based on the proper response policy.
+    ///
+    /// This function processes the provided `response_policies`, which contain information about how responses from multiple nodes should be aggregated.
+    /// For each policy:
+    /// - It collects the multiple responses and their source node addresses from the corresponding entry in `pipeline_responses`.
+    /// - Uses the routing information and optional response policy to aggregate the responses into a single result.
+    ///
+    /// The aggregated result replaces the existing entries in `pipeline_responses` for the given command index, changing the multiple responses to the command into a single aggregated response.
+    ///
+    /// After the execution of this function, all entries in `pipeline_responses` will contain a single response for each command.
+    ///
+    /// # Arguments
+    /// * `pipeline_responses` - A mutable reference to a vector of vectors, where each inner vector contains tuples of responses and their corresponding node addresses.
+    /// * `response_policies` - A vector of tuples, each containing:
+    ///   - The index of the command in the pipeline that has a multi-node routing info.
+    ///   - The routing information for the command.
+    ///   - An optional response policy that dictates how the responses should be aggregated.
+    ///
+    /// # Returns
+    /// * `Result<(), (OperationTarget, RedisError)>` - Returns `Ok(())` if the aggregation is successful, or an error tuple containing the operation target and the Redis error if it fails.
+    ///
+    /// # Example
+    /// Suppose we have a pipeline with multiple commands that were split and executed on different nodes.
+    /// This function will aggregate the responses for commands that were split across multiple nodes.
+    ///
+    /// ```rust,no_run
+    /// // Example pipeline with commands that might be split across nodes
+    /// let mut pipeline_responses = vec![
+    ///     vec![(Value::Int(1), "node1".to_string()), (Value::Int(2), "node2".to_string()), (Value::Int(3), "node3".to_string())], // represents `DBSIZE`
+    ///     vec![(Value::Int(3), "node3".to_string())],
+    ///     vec![(Value::SimpleString("PONG"), "node1".to_string()), (Value::SimpleString("PONG"), "node2".to_string()), (Value::SimpleString("PONG"), "node3".to_string())], // represents `PING`
+    /// ];
+    /// let response_policies = vec![
+    ///     (0, MultipleNodeRoutingInfo::AllNodes, Some(ResponsePolicy::Aggregate(AggregateOp::Sum))),
+    ///     (2, MultipleNodeRoutingInfo::AllNodes, Some(ResponsePolicy::AllSucceeded)),
+    /// ];
+    ///
+    /// // Aggregating the responses
+    /// aggregate_pipeline_multi_node_commands(&mut pipeline_responses, response_policies).await.unwrap();
+    ///
+    /// // After aggregation, pipeline_responses will be updated with aggregated results
+    /// assert_eq!(pipeline_responses[0], vec![(Value::Int(6), "".to_string())]);
+    /// assert_eq!(pipeline_responses[1], vec![(Value::Int(3), "node3".to_string())]);
+    /// assert_eq!(pipeline_responses[2], vec![(Value::SimpleString("PONG"), "".to_string())]);
+    /// ```
+    ///
+    /// This function is essential for handling multi-node commands in a Redis cluster, ensuring that responses from different nodes are correctly aggregated and processed.
+ async fn aggregate_pipeline_multi_node_commands( + pipeline_responses: &mut PipelineResponses, + response_policies: Vec<(usize, MultipleNodeRoutingInfo, Option)>, + ) -> Result<(), (OperationTarget, RedisError)> { + for (index, routing_info, response_policy) in response_policies { + let response_receivers = pipeline_responses[index] + .iter() + .map(|(value, address)| { + let (sender, receiver) = oneshot::channel(); + let _ = sender.send(Ok(Response::Single(value.clone()))); + (Some(address.clone()), receiver) + }) + .collect(); + + let aggregated_response = + Self::aggregate_results(response_receivers, &routing_info, response_policy) + .await + .map_err(|err| (OperationTarget::FanOut, err))?; + + pipeline_responses[index] = vec![(aggregated_response, "".to_string())]; + } + Ok(()) + } + + async fn get_connection( routing: InternalSingleNodeRouting, core: Core, cmd: Option>, diff --git a/glide-core/redis-rs/redis/src/cluster_async/pipeline_routing.rs b/glide-core/redis-rs/redis/src/cluster_async/pipeline_routing.rs index 4a2fa411db..49e6422b86 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/pipeline_routing.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/pipeline_routing.rs @@ -11,8 +11,14 @@ use std::sync::Arc; use crate::cluster_async::MUTEX_READ_ERR; use crate::Pipeline; +use futures::FutureExt; use rand::prelude::IteratorRandom; +use tokio::sync::oneshot; +use tokio::sync::oneshot::error::RecvError; +use super::CmdArg; +use super::PendingRequest; +use super::RequestInfo; use super::{Core, InternalSingleNodeRouting, OperationTarget, Response}; /// Represents a pipeline command execution context for a specific node @@ -49,15 +55,18 @@ impl NodePipelineContext { } } -// `NodeResponse` represents a response from a node along with its source node address. -// `PipelineResponses` represents the responses for each pipeline command. -// The outer `Vec` represents the pipeline commands, and each inner `Vec` contains (response, address) pairs. 
-// Since some commands can be executed across multiple nodes (e.g., multi-node commands), a single command -// might produce multiple responses, each from a different node. By storing the responses with their -// respective node addresses, we ensure that we have all the information needed to aggregate the results later. +/// `NodeResponse` represents a response from a node along with its source node address. type NodeResponse = (Value, String); +/// `PipelineResponses` represents the responses for each pipeline command. +/// The outer `Vec` represents the pipeline commands, and each inner `Vec` contains (response, address) pairs. +/// Since some commands can be executed across multiple nodes (e.g., multi-node commands), a single command +/// might produce multiple responses, each from a different node. By storing the responses with their +/// respective node addresses, we ensure that we have all the information needed to aggregate the results later. pub type PipelineResponses = Vec>; +/// `AddressAndIndices` represents the address of a node and the indices of commands associated with that node. +type AddressAndIndices = Vec<(String, Vec<(usize, Option)>)>; + /// Adds a command to the pipeline map for a specific node address. pub fn add_command_to_node_pipeline_map( pipeline_map: &mut NodePipelineMap, @@ -88,86 +97,6 @@ pub fn add_command_to_random_existing_node( } } -/// Handles multi-slot commands within a pipeline. -/// -/// This function processes commands with routing information indicating multiple slots -/// (e.g., `MSET` or `MGET`), splits them into sub-commands based on their target slots, -/// and assigns these sub-commands to the appropriate pipelines for the corresponding nodes. -/// -/// ### Parameters: -/// - `pipelines_by_connection`: A mutable map of node pipelines where the commands will be added. -/// - `core`: The core structure that provides access to connection management. -/// - `cmd`: The original multi-slot command that needs to be split. 
-/// - `index`: The index of the original command within the pipeline. -/// - `slots`: A vector containing routing information. Each entry includes: -/// - `Route`: The specific route for the slot. -/// - `Vec`: Indices of the keys within the command that map to this slot. -pub async fn handle_pipeline_multi_slot_routing( - pipelines_by_connection: &mut NodePipelineMap, - core: Core, - cmd: &Cmd, - index: usize, - slots: Vec<(Route, Vec)>, -) where - C: Clone, -{ - // inner_index is used to keep track of the index of the sub-command inside cmd - for (inner_index, (route, indices)) in slots.iter().enumerate() { - let conn = { - let lock = core.conn_lock.read().expect(MUTEX_READ_ERR); - lock.connection_for_route(route) - }; - if let Some((address, conn)) = conn { - // create the sub-command for the slot - let new_cmd = command_for_multi_slot_indices(cmd, indices.iter()); - add_command_to_node_pipeline_map( - pipelines_by_connection, - address, - conn.await, - new_cmd, - index, - Some(inner_index), - ); - } - } -} - -/// Handles pipeline commands that require single-node routing. -/// -/// This function processes commands with `SingleNode` routing information and determines -/// the appropriate handling based on the routing type. -/// -/// ### Parameters: -/// - `pipeline_map`: A mutable reference to the `NodePipelineMap`, representing the pipelines grouped by nodes. -/// - `cmd`: The command to process and add to the appropriate node pipeline. -/// - `routing`: The single-node routing information, which determines how the command is routed. -/// - `core`: The core object responsible for managing connections and routing logic. -/// - `index`: The position of the command in the overall pipeline. 
-pub async fn handle_pipeline_single_node_routing( - pipeline_map: &mut NodePipelineMap, - cmd: Cmd, - routing: InternalSingleNodeRouting, - core: Core, - index: usize, -) -> Result<(), (OperationTarget, RedisError)> -where - C: Clone + ConnectionLike + Connect + Send + Sync + 'static, -{ - if matches!(routing, InternalSingleNodeRouting::Random) && !pipeline_map.is_empty() { - // The routing info is to a random node, and we already have sub-pipelines within our pipelines map, so just add it to a random sub-pipeline - add_command_to_random_existing_node(pipeline_map, cmd, index) - .map_err(|err| (OperationTarget::NotFound, err))?; - Ok(()) - } else { - let (address, conn) = - ClusterConnInner::get_connection(routing, core, Some(Arc::new(cmd.clone()))) - .await - .map_err(|err| (OperationTarget::NotFound, err))?; - add_command_to_node_pipeline_map(pipeline_map, address, conn, cmd, index, None); - Ok(()) - } -} - /// Maps the commands in a pipeline to the appropriate nodes based on their routing information. /// /// This function processes each command in the given pipeline, determines its routing information, @@ -259,59 +188,154 @@ where Ok((pipelines_by_connection, response_policies)) } -/// Executes a pipeline of commands on a specified node. +/// Handles pipeline commands that require single-node routing. /// -/// This function sends a batch of commands (pipeline) to the specified node for execution. +/// This function processes commands with `SingleNode` routing information and determines +/// the appropriate handling based on the routing type. /// /// ### Parameters: -/// - `address`: The address of the target node where the pipeline commands should be executed. -/// - `node_context`: The `NodePipelineContext` containing the pipeline commands and the associated connection. +/// - `pipeline_map`: A mutable reference to the `NodePipelineMap`, representing the pipelines grouped by nodes. +/// - `cmd`: The command to process and add to the appropriate node pipeline. 
+/// - `routing`: The single-node routing information, which determines how the command is routed. +/// - `core`: The core object responsible for managing connections and routing logic. +/// - `index`: The position of the command in the overall pipeline. +pub async fn handle_pipeline_single_node_routing( + pipeline_map: &mut NodePipelineMap, + cmd: Cmd, + routing: InternalSingleNodeRouting, + core: Core, + index: usize, +) -> Result<(), (OperationTarget, RedisError)> +where + C: Clone + ConnectionLike + Connect + Send + Sync + 'static, +{ + if matches!(routing, InternalSingleNodeRouting::Random) && !pipeline_map.is_empty() { + // The routing info is to a random node, and we already have sub-pipelines within our pipelines map, so just add it to a random sub-pipeline + add_command_to_random_existing_node(pipeline_map, cmd, index) + .map_err(|err| (OperationTarget::NotFound, err))?; + Ok(()) + } else { + let (address, conn) = + ClusterConnInner::get_connection(routing, core, Some(Arc::new(cmd.clone()))) + .await + .map_err(|err| (OperationTarget::NotFound, err))?; + add_command_to_node_pipeline_map(pipeline_map, address, conn, cmd, index, None); + Ok(()) + } +} + +/// Handles multi-slot commands within a pipeline. /// -/// ### Returns: -/// - `Ok((Vec<(usize, Option)>, Vec, String))`: -/// - A vector of command indices (`usize`) and their respective inner indices (`Option`) in the pipeline. -/// - A vector of `Value` objects representing the responses from the executed pipeline. -/// - The address of the node where the pipeline was executed. -/// - `Err((OperationTarget, RedisError))`: -/// - An error tuple containing the target operation and the corresponding error details if execution fails. 
-pub async fn execute_pipeline_on_node( - address: String, - node_context: NodePipelineContext, -) -> Result<(Vec<(usize, Option)>, Vec, String), (OperationTarget, RedisError)> +/// This function processes commands with routing information indicating multiple slots +/// (e.g., `MSET` or `MGET`), splits them into sub-commands based on their target slots, +/// and assigns these sub-commands to the appropriate pipelines for the corresponding nodes. +/// +/// ### Parameters: +/// - `pipelines_by_connection`: A mutable map of node pipelines where the commands will be added. +/// - `core`: The core structure that provides access to connection management. +/// - `cmd`: The original multi-slot command that needs to be split. +/// - `index`: The index of the original command within the pipeline. +/// - `slots`: A vector containing routing information. Each entry includes: +/// - `Route`: The specific route for the slot. +/// - `Vec`: Indices of the keys within the command that map to this slot. +pub async fn handle_pipeline_multi_slot_routing( + pipelines_by_connection: &mut NodePipelineMap, + core: Core, + cmd: &Cmd, + index: usize, + slots: Vec<(Route, Vec)>, +) where + C: Clone, +{ + // inner_index is used to keep track of the index of the sub-command inside cmd + for (inner_index, (route, indices)) in slots.iter().enumerate() { + let conn = { + let lock = core.conn_lock.read().expect(MUTEX_READ_ERR); + lock.connection_for_route(route) + }; + if let Some((address, conn)) = conn { + // create the sub-command for the slot + let new_cmd = command_for_multi_slot_indices(cmd, indices.iter()); + add_command_to_node_pipeline_map( + pipelines_by_connection, + address, + conn.await, + new_cmd, + index, + Some(inner_index), + ); + } + } +} + +/// Creates `PendingRequest` objects for each pipeline in the provided pipeline map. 
+/// +/// This function processes the given map of node pipelines and prepares each sub-pipeline for execution +/// by creating a `PendingRequest` containing all necessary details for execution. +/// Additionally, it sets up communication channels to asynchronously receive the results of each sub-pipeline's execution. +/// +/// Returns a tuple containing: +/// - **receivers**: A vector of `oneshot::Receiver` objects to receive the responses of the sub-pipeline executions. +/// - **pending_requests**: A vector of `PendingRequest` objects, each representing a pipeline scheduled for execution on a node. +/// - **addresses_and_indices**: A vector of tuples containing node addresses and their associated command indices for each sub-pipeline, +/// allowing the results to be mapped back to their original command within the original pipeline. +#[allow(clippy::type_complexity)] +pub fn collect_pipeline_requests( + pipelines_by_connection: NodePipelineMap, +) -> ( + Vec>>, + Vec>, + Vec<(String, Vec<(usize, Option)>)>, +) where C: Clone + ConnectionLike + Connect + Send + Sync + 'static, { - let count = node_context.pipeline.len(); - let result = - ClusterConnInner::try_pipeline_request(Arc::new(node_context.pipeline), 0, count, async { - Ok((address.clone(), node_context.connection)) - }) - .await?; + let mut receivers = Vec::new(); + let mut pending_requests = Vec::new(); + let mut addresses_and_indices = Vec::new(); - match result { - Response::Multiple(values) => Ok((node_context.command_indices, values, address)), - _ => Err(( - OperationTarget::FanOut, - RedisError::from((ErrorKind::ResponseError, "Unsupported response type")), - )), + for (address, context) in pipelines_by_connection { + // Create a channel to receive the pipeline execution results + let (sender, receiver) = oneshot::channel(); + // Add the receiver to the list of receivers + receivers.push(receiver); + pending_requests.push(PendingRequest { + retry: 0, + sender, + info: RequestInfo { + cmd: 
CmdArg::Pipeline { + count: context.pipeline.len(), + pipeline: context.pipeline.into(), + offset: 0, + route: InternalSingleNodeRouting::Connection { + address: address.clone(), + conn: async { context.connection }.boxed().shared(), + }, + }, + }, + }); + // Record the node address and its associated command indices for result mapping + addresses_and_indices.push((address, context.command_indices)); } + + (receivers, pending_requests, addresses_and_indices) } -/// Adds the result of a pipeline command to the `values_and_addresses` collection. +/// Adds the result of a pipeline command to the `pipeline_responses` collection. /// -/// This function updates the `values_and_addresses` vector at the given `index` and optionally at the +/// This function updates the `pipeline_responses` vector at the given `index` and optionally at the /// `inner_index` if provided. If `inner_index` is `Some`, it ensures that the vector at that index is large enough /// to hold the value and address at the specified position, resizing it if necessary. If `inner_index` is `None`, /// the value and address are simply appended to the vector. /// /// # Parameters -/// - `values_and_addresses`: A mutable reference to a vector of vectors that stores the results of pipeline commands. -/// - `index`: The index in `values_and_addresses` where the result should be stored. +/// - `pipeline_responses`: A mutable reference to a vector of vectors that stores the results of pipeline commands. +/// - `index`: The index in `pipeline_responses` where the result should be stored. /// - `inner_index`: An optional index within the vector at `index`, used to store the result at a specific position. /// - `value`: The result value to store. /// - `address`: The address associated with the result. 
pub fn add_pipeline_result( - values_and_addresses: &mut [Vec<(Value, String)>], + pipeline_responses: &mut PipelineResponses, index: usize, inner_index: Option, value: Value, @@ -320,58 +344,46 @@ pub fn add_pipeline_result( match inner_index { Some(inner_index) => { // Ensure the vector at the given index is large enough to hold the value and address at the specified position - if values_and_addresses[index].len() <= inner_index { - values_and_addresses[index].resize(inner_index + 1, (Value::Nil, "".to_string())); + if pipeline_responses[index].len() <= inner_index { + pipeline_responses[index].resize(inner_index + 1, (Value::Nil, "".to_string())); } - values_and_addresses[index][inner_index] = (value, address); + pipeline_responses[index][inner_index] = (value, address); } - None => values_and_addresses[index].push((value, address)), + None => pipeline_responses[index].push((value, address)), } } -/// Collects and processes the results of pipeline tasks from a `tokio::task::JoinSet`. +/// Processes the responses of pipeline commands and updates the given `pipeline_responses` +/// with the corresponding results. /// -/// This function iteratively retrieves completed tasks from the provided `join_set` and processes -/// their results. Successful results are added to the `values_and_addresses` vector using the -/// indices and values provided. If an error occurs in any task, it is recorded and returned as -/// the first encountered error. +/// The function iterates over the responses along with the `addresses_and_indices` list, +/// ensuring that each response is added to its appropriate position in `pipeline_responses` along with the associated address. +/// If any response indicates an error, the function terminates early and returns the first encountered error. 
/// /// # Parameters -/// - `join_set`: A mutable reference to a `tokio::task::JoinSet` containing tasks that return: -/// - `Ok((Vec<(usize, Option)>, Vec, String))`: On success, a tuple of: -/// - A list of indices and optional inner indices corresponding to pipeline commands. -/// - A list of `Value` results from the executed pipeline. -/// - The `String` address where the task was executed. -/// - `Err((OperationTarget, RedisError))`: On failure, an error detailing the operation target and the Redis error. -/// - `values_and_addresses`: A mutable slice of vectors, where each vector corresponds to a pipeline -/// command's results. This is updated with the values and addresses from successful tasks. /// -/// # Returns -/// - `Ok(Some((OperationTarget, RedisError)))`: If one or more tasks encountered an error, returns the first error. -/// - `Ok(None)`: If all tasks completed successfully. -/// - `Err((OperationTarget::FanOut, RedisError))`: If a task failed unexpectedly (e.g., due to a panic). +/// - `pipeline_responses`: A vec that holds the original pipeline commands responses. +/// - `responses`: A list of responses corresponding to each sub-pipeline. +/// - `addresses_and_indices`: A list of (address, indices) pairs indicating where each response should be placed. /// +/// # Returns /// -/// # Behavior -/// - Processes successful results by calling `add_pipeline_result` to update the -/// `values_and_addresses` vector with the indices, values, and addresses. -/// - Records the first error encountered and continues processing the remaining tasks. -/// - Returns `Ok(None)` if all tasks complete successfully. 
-#[allow(clippy::type_complexity)] -pub async fn collect_pipeline_tasks( - join_set: &mut tokio::task::JoinSet< - Result<(Vec<(usize, Option)>, Vec, String), (OperationTarget, RedisError)>, - >, - values_and_addresses: &mut [Vec<(Value, String)>], -) -> Result, (OperationTarget, RedisError)> { - let mut first_error = None; - - while let Some(future_result) = join_set.join_next().await { - match future_result { - Ok(Ok((indices, values, address))) => { - for ((index, inner_index), value) in indices.into_iter().zip(values) { +/// - `Ok(())` if all responses are processed successfully. +/// - `Err((OperationTarget, RedisError))` if a node-level or reception error occurs. +pub fn process_pipeline_responses( + pipeline_responses: &mut PipelineResponses, + responses: Vec, RecvError>>, + addresses_and_indices: AddressAndIndices, +) -> Result<(), (OperationTarget, RedisError)> { + for ((address, command_indices), response_result) in + addresses_and_indices.into_iter().zip(responses) + { + match response_result { + Ok(Ok(Response::Multiple(values))) => { + // Add each response to the pipeline_responses vector at the appropriate index + for ((index, inner_index), value) in command_indices.into_iter().zip(values) { add_pipeline_result( - values_and_addresses, + pipeline_responses, index, inner_index, value, @@ -379,16 +391,18 @@ pub async fn collect_pipeline_tasks( ); } } - Ok(Err(e)) => first_error = first_error.or(Some(e)), - Err(e) => { + Ok(Err(err)) => { + return Err((OperationTarget::Node { address }, err)); + } + _ => { return Err(( - OperationTarget::FanOut, - std::io::Error::new(std::io::ErrorKind::Other, e.to_string()).into(), - )) + OperationTarget::Node { address }, + RedisError::from((ErrorKind::ResponseError, "Failed to receive response")), + )); } } } - Ok(first_error) + Ok(()) } /// This function returns the route for a given pipeline. 
diff --git a/glide-core/redis-rs/redis/src/pipeline.rs b/glide-core/redis-rs/redis/src/pipeline.rs index 1806ff5605..fea54ee7ff 100644 --- a/glide-core/redis-rs/redis/src/pipeline.rs +++ b/glide-core/redis-rs/redis/src/pipeline.rs @@ -241,7 +241,7 @@ impl Pipeline { self.commands.is_empty() } - /// + /// Returns whether the pipeline is in sub-pipeline mode. pub fn is_sub_pipeline(&self) -> bool { self.is_sub_pipeline } From 0050c403eae16bcdde6a3d386ab78146b177d160 Mon Sep 17 00:00:00 2001 From: Shoham Elias Date: Tue, 28 Jan 2025 17:28:32 +0000 Subject: [PATCH 08/15] fix doc Signed-off-by: Shoham Elias --- glide-core/redis-rs/redis/src/cluster_async/mod.rs | 11 ++++++----- glide-core/redis-rs/redis/src/pipeline.rs | 14 ++------------ 2 files changed, 8 insertions(+), 17 deletions(-) diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index e008aad4a1..cb408d13f5 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -2248,16 +2248,17 @@ where /// Suppose we have a pipeline with multiple commands that were split and executed on different nodes. /// This function will aggregate the responses for commands that were split across multiple nodes. 
/// - /// ```rust,no_run + /// ```rust,compile_fail /// // Example pipeline with commands that might be split across nodes + /// /// let mut pipeline_responses = vec![ - /// vec![(Value::Int(1), "node1".to_string()), (Value::Int(2), "node2".to_string())], (Value::Int(3), "node3".to_string())] // represents `DBSIZE`` + /// vec![(Value::Int(1), "node1".to_string()), (Value::Int(2), "node2".to_string()), (Value::Int(3), "node3".to_string())], // represents `DBSIZE` /// vec![(Value::Int(3), "node3".to_string())], - /// vec![(Value::SimpleString("PONG"), "node1".to_string()), (Value::SimpleString("PONG"), "node2".to_string()), (Value::SimpleString("PONG"), "node3".to_string())], // represents `PING` + /// vec![(Value::SimpleString("PONG".to_string()), "node1".to_string()), (Value::SimpleString("PONG".to_string()), "node2".to_string()), (Value::SimpleString("PONG".to_string()), "node3".to_string())], // represents `PING` /// ]; /// let response_policies = vec![ /// (0, MultipleNodeRoutingInfo::AllNodes, Some(ResponsePolicy::Aggregate(AggregateOp::Sum))), - /// (2, MultipleNodeRoutingInfo::AllNodes, Some(ResponsePolicy::AllSuceeded), + /// (2, MultipleNodeRoutingInfo::AllNodes, Some(ResponsePolicy::AllSucceeded)), /// ]; /// /// // Aggregating the responses @@ -2266,7 +2267,7 @@ where /// // After aggregation, pipeline_responses will be updated with aggregated results /// assert_eq!(pipeline_responses[0], vec![(Value::Int(6), "".to_string())]); /// assert_eq!(pipeline_responses[1], vec![(Value::Int(3), "node3".to_string())]); - /// assert_eq!(pipeline_responses[2], vec![(Value::SimpleString("PONG"), "".to_string())]); + /// assert_eq!(pipeline_responses[2], vec![(Value::SimpleString("PONG".to_string()), "".to_string())]); /// ``` /// /// This function is essential for handling multi-node commands in a Redis cluster, ensuring that responses from different nodes are correctly aggregated and processed. 
diff --git a/glide-core/redis-rs/redis/src/pipeline.rs b/glide-core/redis-rs/redis/src/pipeline.rs index fea54ee7ff..c0dc068ead 100644 --- a/glide-core/redis-rs/redis/src/pipeline.rs +++ b/glide-core/redis-rs/redis/src/pipeline.rs @@ -72,18 +72,8 @@ impl Pipeline { self } - /// This enables sub-pipeline mode. In sub-pipeline mode, the whole pipeline is enclosed in - /// `MULTI`/`EXEC` and the return value is a nested array of results. This is useful when - /// you want to execute a pipeline inside another pipeline. - /// ```rust,no_run - /// # let client = redis::Client::open("redis:// - /// 127.0.0.1/").unwrap(); - /// # let mut con = client.get_connection(None).unwrap(); - /// let (k1, k2) : (i32, i32) = redis::pipe() - /// .atomic() - /// .cmd("SET").arg("key_1").arg(42).ignore() - /// - /// + /// Enables sub-pipeline mode, indicating that this pipeline is part of a larger pipeline + /// split across multiple nodes. pub fn sub_pipeline(&mut self) -> &mut Pipeline { self.is_sub_pipeline = true; self From 9feae97e8690574c2495135b4215772e4239778f Mon Sep 17 00:00:00 2001 From: Shoham Elias Date: Wed, 29 Jan 2025 09:58:50 +0000 Subject: [PATCH 09/15] remove sub-pipeline Signed-off-by: Shoham Elias --- glide-core/redis-rs/redis/src/cluster_async/mod.rs | 5 ++++- .../redis/src/cluster_async/pipeline_routing.rs | 6 +++--- glide-core/redis-rs/redis/src/pipeline.rs | 14 -------------- 3 files changed, 7 insertions(+), 18 deletions(-) diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index cb408d13f5..f5c481ea2c 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -291,6 +291,7 @@ where offset, count, route: route.into(), + sub_pipeline: false, }, sender, }) @@ -612,6 +613,7 @@ enum CmdArg { offset: usize, count: usize, route: InternalSingleNodeRouting, + sub_pipeline: bool, }, ClusterScan { // struct containing the arguments for the 
cluster scan command - scan state cursor, match pattern, count and object type. @@ -2115,8 +2117,9 @@ where offset, count, route, + sub_pipeline, } => { - if pipeline.is_atomic() || pipeline.is_sub_pipeline() { + if pipeline.is_atomic() || sub_pipeline { // If the pipeline is atomic (i.e., a transaction) or if the pipeline is already splitted into sub-pipelines, we can send it as is, with no need to split it into sub-pipelines. Self::try_pipeline_request( pipeline, diff --git a/glide-core/redis-rs/redis/src/cluster_async/pipeline_routing.rs b/glide-core/redis-rs/redis/src/cluster_async/pipeline_routing.rs index 49e6422b86..a864e34316 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/pipeline_routing.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/pipeline_routing.rs @@ -39,10 +39,8 @@ pub type NodePipelineMap = HashMap>; impl NodePipelineContext { fn new(connection: C) -> Self { - let mut pipeline = Pipeline::new(); - pipeline.sub_pipeline(); Self { - pipeline, + pipeline: Pipeline::new(), connection, command_indices: Vec::new(), } @@ -311,6 +309,8 @@ where address: address.clone(), conn: async { context.connection }.boxed().shared(), }, + // mark it as a sub-pipeline mode + sub_pipeline: true, }, }, }); diff --git a/glide-core/redis-rs/redis/src/pipeline.rs b/glide-core/redis-rs/redis/src/pipeline.rs index c0dc068ead..813961156a 100644 --- a/glide-core/redis-rs/redis/src/pipeline.rs +++ b/glide-core/redis-rs/redis/src/pipeline.rs @@ -12,7 +12,6 @@ pub struct Pipeline { commands: Vec, transaction_mode: bool, ignored_commands: HashSet, - is_sub_pipeline: bool, } /// A pipeline allows you to send multiple commands in one go to the @@ -49,7 +48,6 @@ impl Pipeline { commands: Vec::with_capacity(capacity), transaction_mode: false, ignored_commands: HashSet::new(), - is_sub_pipeline: false, } } @@ -72,13 +70,6 @@ impl Pipeline { self } - /// Enables sub-pipeline mode, indicating that this pipeline is part of a larger pipeline - /// split across multiple 
nodes. - pub fn sub_pipeline(&mut self) -> &mut Pipeline { - self.is_sub_pipeline = true; - self - } - /// Returns the encoded pipeline commands. pub fn get_packed_pipeline(&self) -> Vec { encode_pipeline(&self.commands, self.transaction_mode) @@ -230,11 +221,6 @@ impl Pipeline { pub fn is_empty(&self) -> bool { self.commands.is_empty() } - - /// Returns whether the pipeline is in sub-pipeline mode. - pub fn is_sub_pipeline(&self) -> bool { - self.is_sub_pipeline - } } fn encode_pipeline(cmds: &[Cmd], atomic: bool) -> Vec { From eef58d8ea08d0c71388f697144bb79955c9deb70 Mon Sep 17 00:00:00 2001 From: Shoham Elias Date: Thu, 6 Feb 2025 12:34:28 +0000 Subject: [PATCH 10/15] pr comments Signed-off-by: Shoham Elias --- .../redis-rs/redis/src/cluster_async/mod.rs | 134 ++++++++------- .../src/cluster_async/pipeline_routing.rs | 155 ++++++++++++------ glide-core/redis-rs/redis/src/pipeline.rs | 5 + glide-core/src/client/mod.rs | 6 +- 4 files changed, 182 insertions(+), 118 deletions(-) diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index f5c481ea2c..4bcc6adb91 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -44,7 +44,7 @@ use crate::{ }; use dashmap::DashMap; use pipeline_routing::{ - collect_pipeline_requests, map_pipeline_to_nodes, process_pipeline_responses, + collect_and_send_pending_requests, map_pipeline_to_nodes, process_pipeline_responses, route_for_pipeline, PipelineResponses, }; use std::{ @@ -2130,72 +2130,7 @@ where .await } else { // The pipeline is not atomic and not already splitted, we need to split it into sub-pipelines and send them separately. - - // Distribute pipeline commands across cluster nodes based on routing information. 
- // Returns: - // - pipelines_by_connection: Map of node addresses to their pipeline contexts - // - response_policies: List of response aggregation policies for multi-node operations - let (pipelines_by_connection, response_policies) = - map_pipeline_to_nodes(&pipeline, core.clone()) - .await - .map_err(|err| (OperationTarget::FanOut, err))?; - - // Initialize `PipelineResponses` to store responses for each pipeline command. - // This will be used to store the responses from the different sub-pipelines to the pipeline commands. - // A command can have one or more responses (e.g MultiNode commands). - // Each entry in `PipelineResponses` corresponds to a command in the original pipeline and contains - // a vector of tuples where each tuple holds a response to the command and the address of the node that provided it. - let mut pipeline_responses: PipelineResponses = - vec![Vec::new(); pipeline.len()]; - - let mut final_responses: Vec = Vec::with_capacity(pipeline.len()); - - // Processes the sub-pipelines to generate pending requests for execution on specific nodes. - // Each pending request encapsulates all the necessary details for executing commands on a node. - // - // Returns: - // - `receivers`: A vector of `oneshot::Receiver` instances, enabling asynchronous retrieval - // of the results from the execution of each sub-pipeline. - // - `pending_requests`: A vector of `PendingRequest` objects, each representing a scheduled command - // for execution on a node. - // - `addresses_and_indices`: A vector of tuples where each tuple contains a node address and a list - // of command indices for each sub-pipeline, allowing the results to be mapped back to their original command within the original pipeline. 
- let (receivers, pending_requests, addresses_and_indices) = - collect_pipeline_requests(pipelines_by_connection); - - // Add the pending requests to the pending_requests queue - core.pending_requests - .lock() - .unwrap() - .extend(pending_requests.into_iter()); - - // Wait for all receivers to complete and collect the responses - let responses: Vec<_> = futures::future::join_all(receivers.into_iter()) - .await - .into_iter() - .collect(); - - // Process the responses and update the pipeline_responses - process_pipeline_responses( - &mut pipeline_responses, - responses, - addresses_and_indices, - )?; - - // Process response policies after all tasks are complete - Self::aggregate_pipeline_multi_node_commands( - &mut pipeline_responses, - response_policies, - ) - .await?; - - // Collect final responses - for mut value in pipeline_responses.into_iter() { - // unwrap() is safe here because we know that the vector is not empty - final_responses.push(value.pop().unwrap().0); - } - - Ok(Response::Multiple(final_responses)) + Self::handle_pipeline_request(&pipeline, core).await } } @@ -2223,6 +2158,68 @@ where } } + /// Handles the execution of a non-atomic pipeline request by splitting it into sub-pipelines and sending them to the appropriate cluster nodes. + /// + /// This function distributes the commands in the pipeline across the cluster nodes based on routing information, collects the responses, + /// and aggregates them if necessary according to the specified response policies. + /// + /// # Arguments + /// * `pipeline` - A reference to the pipeline to be executed. + /// * `core` - A reference to the core cluster connection state. + /// + /// # Returns + /// * `OperationResult` - Returns a result containing the aggregated responses from the sub-pipelines, or an error if the operation fails. 
+ async fn handle_pipeline_request(pipeline: &crate::Pipeline, core: Core) -> OperationResult {
+ // Distribute pipeline commands across cluster nodes based on routing information.
+ // Returns:
+ // - pipelines_by_node: Map of node addresses to their pipeline contexts
+ // - response_policies: List of response aggregation policies for multi-node operations
+ let (pipelines_by_node, response_policies) =
+ map_pipeline_to_nodes(pipeline, core.clone()).await?;
+
+ // Initialize `PipelineResponses` to store responses for each pipeline command.
+ // This will be used to store the responses from the different sub-pipelines to the pipeline commands.
+ // A command can have one or more responses (e.g MultiNode commands).
+ // Each entry in `PipelineResponses` corresponds to a command in the original pipeline and contains
+ // a vector of tuples where each tuple holds a response to the command and the address of the node that provided it.
+ let mut pipeline_responses: PipelineResponses = vec![Vec::new(); pipeline.len()];
+
+ // Send the requests to each node and collect the responses
+ let (responses, addresses_and_indices) =
+ collect_and_send_pending_requests(pipelines_by_node, core.clone()).await;
+
+ // Process the responses and update the pipeline_responses
+ process_pipeline_responses(&mut pipeline_responses, responses, addresses_and_indices)?;
+
+ // Process response policies after all tasks are complete
+ Self::aggregate_pipeline_multi_node_commands(&mut pipeline_responses, response_policies)
+ .await?;
+
+ // Collect final responses, ensuring no missing or invalid responses.
+ let final_responses: Result, (OperationTarget, RedisError)> = pipeline_responses + .into_iter() + .enumerate() + .map(|(index, mut value)| { + value.pop().map(|(response, _)| response).ok_or_else(|| { + ( + OperationTarget::FanOut, + RedisError::from(( + ErrorKind::ResponseError, + "No response found for command: ", + pipeline + .get_command(index) + .map_or("no command available".to_string(), |cmd| { + format!("{:?}", cmd) + }), + )), + ) + }) + }) + .collect(); + + Ok(Response::Multiple(final_responses?)) + } + /// Aggregates pipeline responses for multi-node commands and updates the `pipeline_responses` vector. /// /// Pipeline commands with multi-node routing info, will be splitted into multiple pipelines, therefore, after executing each pipeline and storing the results in `pipeline_responses`, @@ -2244,9 +2241,6 @@ where /// - The routing information for the command. /// - An optional response policy that dictates how the responses should be aggregated. /// - /// # Returns - /// * `Result<(), (OperationTarget, RedisError)>` - Returns `Ok(())` if the aggregation is successful, or an error tuple containing the operation target and the Redis error if it fails. - /// /// # Example /// Suppose we have a pipeline with multiple commands that were split and executed on different nodes. /// This function will aggregate the responses for commands that were split across multiple nodes. 
diff --git a/glide-core/redis-rs/redis/src/cluster_async/pipeline_routing.rs b/glide-core/redis-rs/redis/src/cluster_async/pipeline_routing.rs index a864e34316..e45abc025e 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/pipeline_routing.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/pipeline_routing.rs @@ -1,6 +1,9 @@ use crate::aio::ConnectionLike; use crate::cluster_async::ClusterConnInner; use crate::cluster_async::Connect; +use crate::cluster_routing::RoutingInfo; +use cluster_routing::RoutingInfo::{MultiNode, SingleNode}; + use crate::cluster_routing::{ command_for_multi_slot_indices, MultipleNodeRoutingInfo, ResponsePolicy, SingleNodeRoutingInfo, }; @@ -80,21 +83,6 @@ pub fn add_command_to_node_pipeline_map( .add_command(cmd, index, inner_index); } -/// Adds a command to a random existing node pipeline in the pipeline map -pub fn add_command_to_random_existing_node( - pipeline_map: &mut NodePipelineMap, - cmd: Cmd, - index: usize, -) -> RedisResult<()> { - let mut rng = rand::thread_rng(); - if let Some(node_context) = pipeline_map.values_mut().choose(&mut rng) { - node_context.add_command(cmd, index, None); - Ok(()) - } else { - Err(RedisError::from((ErrorKind::IoError, "No nodes available"))) - } -} - /// Maps the commands in a pipeline to the appropriate nodes based on their routing information. /// /// This function processes each command in the given pipeline, determines its routing information, @@ -111,41 +99,39 @@ pub fn add_command_to_random_existing_node( /// /// # Returns /// -/// A `RedisResult` containing a tuple: -/// +/// A `Result` containing a tuple: /// - A `NodePipelineMap` where commands are grouped by their corresponding nodes (as pipelines). /// - A `Vec<(usize, MultipleNodeRoutingInfo, Option)>` containing the routing information /// and response policies for multi-node commands, along with the index of the command in the pipeline, for aggregating the responses later. 
pub async fn map_pipeline_to_nodes( pipeline: &crate::Pipeline, core: Core, -) -> RedisResult<( - NodePipelineMap, - Vec<(usize, MultipleNodeRoutingInfo, Option)>, -)> +) -> Result< + ( + NodePipelineMap, + Vec<(usize, MultipleNodeRoutingInfo, Option)>, + ), + (OperationTarget, RedisError), +> where C: Clone + ConnectionLike + Connect + Send + Sync + 'static, { - let mut pipelines_by_connection = NodePipelineMap::new(); + let mut pipelines_per_node = NodePipelineMap::new(); let mut response_policies = Vec::new(); for (index, cmd) in pipeline.cmd_iter().enumerate() { - match cluster_routing::RoutingInfo::for_routable(cmd).unwrap_or( - cluster_routing::RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random), - ) { - cluster_routing::RoutingInfo::SingleNode(route) => { + match RoutingInfo::for_routable(cmd).unwrap_or(SingleNode(SingleNodeRoutingInfo::Random)) { + SingleNode(route) => { handle_pipeline_single_node_routing( - &mut pipelines_by_connection, + &mut pipelines_per_node, cmd.clone(), route.into(), core.clone(), index, ) - .await - .map_err(|(_target, err)| err)?; + .await?; } - - cluster_routing::RoutingInfo::MultiNode((multi_node_routing, response_policy)) => { + MultiNode((multi_node_routing, response_policy)) => { //save the routing info and response policy, so we will be able to aggregate the results later response_policies.push((index, multi_node_routing.clone(), response_policy)); match multi_node_routing { @@ -158,9 +144,19 @@ where lock.all_primary_connections().collect() } }; + + if connections.is_empty() { + return Err(( + OperationTarget::NotFound, + RedisError::from(( + ErrorKind::AllConnectionsUnavailable, // should use different kind? 
ConnectionNotFoundForRoute + "No available connections", + )), + )); + } for (inner_index, (address, conn)) in connections.into_iter().enumerate() { add_command_to_node_pipeline_map( - &mut pipelines_by_connection, + &mut pipelines_per_node, address, conn.await, cmd.clone(), @@ -171,19 +167,19 @@ where } MultipleNodeRoutingInfo::MultiSlot((slots, _)) => { handle_pipeline_multi_slot_routing( - &mut pipelines_by_connection, + &mut pipelines_per_node, core.clone(), cmd, index, slots, ) - .await; + .await?; } } } } } - Ok((pipelines_by_connection, response_policies)) + Ok((pipelines_per_node, response_policies)) } /// Handles pipeline commands that require single-node routing. @@ -208,18 +204,20 @@ where C: Clone + ConnectionLike + Connect + Send + Sync + 'static, { if matches!(routing, InternalSingleNodeRouting::Random) && !pipeline_map.is_empty() { - // The routing info is to a random node, and we already have sub-pipelines within our pipelines map, so just add it to a random sub-pipeline - add_command_to_random_existing_node(pipeline_map, cmd, index) - .map_err(|err| (OperationTarget::NotFound, err))?; - Ok(()) - } else { - let (address, conn) = - ClusterConnInner::get_connection(routing, core, Some(Arc::new(cmd.clone()))) - .await - .map_err(|err| (OperationTarget::NotFound, err))?; - add_command_to_node_pipeline_map(pipeline_map, address, conn, cmd, index, None); - Ok(()) + // Adds the command to a random existing node pipeline in the pipeline map + let mut rng = rand::thread_rng(); + if let Some(node_context) = pipeline_map.values_mut().choose(&mut rng) { + node_context.add_command(cmd, index, None); + return Ok(()); + } } + + let (address, conn) = + ClusterConnInner::get_connection(routing, core, Some(Arc::new(cmd.clone()))) + .await + .map_err(|err| (OperationTarget::NotFound, err))?; + add_command_to_node_pipeline_map(pipeline_map, address, conn, cmd, index, None); + Ok(()) } /// Handles multi-slot commands within a pipeline. 
@@ -242,7 +240,8 @@ pub async fn handle_pipeline_multi_slot_routing( cmd: &Cmd, index: usize, slots: Vec<(Route, Vec)>, -) where +) -> Result<(), (OperationTarget, RedisError)> +where C: Clone, { // inner_index is used to keep track of the index of the sub-command inside cmd @@ -262,8 +261,70 @@ pub async fn handle_pipeline_multi_slot_routing( index, Some(inner_index), ); + } else { + return Err(( + OperationTarget::NotFound, + RedisError::from(( + ErrorKind::ConnectionNotFoundForRoute, + "No available connections", + )), + )); } } + Ok(()) +} + +/// Collects and sends pending requests for the given pipeline map, and waits for their responses. +/// +/// This function creates `PendingRequest` objects for each pipeline in the provided pipeline map, +/// adds them to the core's pending requests queue, and waits for all responses to be received. +/// +/// # Arguments +/// +/// * `pipeline_map` - A map of node pipelines where the commands are grouped by their corresponding nodes. +/// * `core` - The core object that provides access to connection locks and other resources. +/// +/// # Returns +/// +/// A tuple containing: +/// - A vector of results for each sub-pipeline execution. +/// - A vector of (address, indices) pairs indicating where each response should be placed. +pub async fn collect_and_send_pending_requests( + pipeline_map: NodePipelineMap, + core: Core, +) -> ( + Vec, RecvError>>, + Vec<(String, Vec<(usize, Option)>)>, +) +where + C: Clone + ConnectionLike + Connect + Send + Sync + 'static, +{ + // Processes the sub-pipelines to generate pending requests for execution on specific nodes. + // Each pending request encapsulates all the necessary details for executing commands on a node. + // + // Returns: + // - `receivers`: A vector of `oneshot::Receiver` instances, enabling asynchronous retrieval + // of the results from the execution of each sub-pipeline. 
+ // - `pending_requests`: A vector of `PendingRequest` objects, each representing a scheduled command + // for execution on a node. + // - `addresses_and_indices`: A vector of tuples where each tuple contains a node address and a list + // of command indices for each sub-pipeline, allowing the results to be mapped back to their original command within the original pipeline. + let (receivers, pending_requests, addresses_and_indices) = + collect_pipeline_requests(pipeline_map); + + // Add the pending requests to the pending_requests queue + core.pending_requests + .lock() + .unwrap() + .extend(pending_requests.into_iter()); + + // Wait for all receivers to complete and collect the responses + let responses: Vec<_> = futures::future::join_all(receivers.into_iter()) + .await + .into_iter() + .collect(); + + (responses, addresses_and_indices) } /// Creates `PendingRequest` objects for each pipeline in the provided pipeline map. diff --git a/glide-core/redis-rs/redis/src/pipeline.rs b/glide-core/redis-rs/redis/src/pipeline.rs index 813961156a..6773d8895b 100644 --- a/glide-core/redis-rs/redis/src/pipeline.rs +++ b/glide-core/redis-rs/redis/src/pipeline.rs @@ -221,6 +221,11 @@ impl Pipeline { pub fn is_empty(&self) -> bool { self.commands.is_empty() } + + /// Returns the command at the given index, or `None` if the index is out of bounds. + pub fn get_command(&self, index: usize) -> Option<&Cmd> { + self.commands.get(index) + } } fn encode_pipeline(cmds: &[Cmd], atomic: bool) -> Vec { diff --git a/glide-core/src/client/mod.rs b/glide-core/src/client/mod.rs index 693c6da6c8..37967c746a 100644 --- a/glide-core/src/client/mod.rs +++ b/glide-core/src/client/mod.rs @@ -395,6 +395,11 @@ impl Client { routing: Option, ) -> redis::RedisFuture<'a, Value> { let command_count = pipeline.cmd_iter().count(); + // The offset is set to command_count + 1 to account for: + // 1. The first command, which is the "MULTI" command, that returns "OK" + // 2. 
The "QUEUED" responses for each of the commands in the pipeline (before EXEC) + // After these initial responses (OK and QUEUED), we expect a single response, + // which is an array containing the results of all the commands in the pipeline. let offset = command_count + 1; run_with_timeout(Some(self.request_timeout), async move { let values = match self.internal_client { @@ -420,7 +425,6 @@ impl Client { pipeline: &'a redis::Pipeline, ) -> redis::RedisFuture<'a, Value> { let command_count = pipeline.cmd_iter().count(); - let _offset = command_count + 1; //TODO: check run_with_timeout(Some(self.request_timeout), async move { let values = match self.internal_client { From 3c89fd6396c8fc059b373457d004a2c791a73fcd Mon Sep 17 00:00:00 2001 From: Shoham Elias Date: Thu, 6 Feb 2025 13:42:50 +0000 Subject: [PATCH 11/15] expend the tests a bit Signed-off-by: Shoham Elias --- glide-core/tests/test_socket_listener.rs | 48 ++++++++++++++++++++++-- 1 file changed, 44 insertions(+), 4 deletions(-) diff --git a/glide-core/tests/test_socket_listener.rs b/glide-core/tests/test_socket_listener.rs index 6a897e1cae..caa6358396 100644 --- a/glide-core/tests/test_socket_listener.rs +++ b/glide-core/tests/test_socket_listener.rs @@ -30,6 +30,7 @@ mod socket_listener { use redis::{Cmd, ConnectionAddr, FromRedisValue, Value}; use rstest::rstest; use std::mem::size_of; + use std::vec; use tokio::{net::UnixListener, runtime::Builder}; /// An enum representing the values of the request type field for testing purposes @@ -1246,8 +1247,9 @@ mod socket_listener { .expect("Failed to clone socket"); const CALLBACK_INDEX: u32 = 0; - let key = generate_random_string(KEY_LENGTH); - let key2 = generate_random_string(KEY_LENGTH); + // making sure both keys are in a different slot + let key = format!("{{abc}}f{}", generate_random_string(KEY_LENGTH)); + let key2 = format!("{{xyz}}f{}", generate_random_string(KEY_LENGTH)); let commands = vec![ CommandComponents { @@ -1261,7 +1263,7 @@ mod socket_listener 
{ request_type: RequestType::CustomCommand.into(), }, CommandComponents { - args: vec![key.clone().into(), key2.into()], + args: vec![key.clone().into(), key2.clone().into()], args_pointer: false, request_type: RequestType::MGet.into(), }, @@ -1271,7 +1273,12 @@ mod socket_listener { request_type: RequestType::CustomCommand.into(), }, CommandComponents { - args: vec![key.into()], + args: vec![], + args_pointer: false, + request_type: RequestType::DBSize.into(), + }, + CommandComponents { + args: vec![key.clone().into()], args_pointer: false, request_type: RequestType::Get.into(), }, @@ -1280,6 +1287,31 @@ mod socket_listener { args_pointer: false, request_type: RequestType::Ping.into(), }, + CommandComponents { + args: vec![ + key.into(), + "bar".to_string().into(), + key2.into(), + "baz".to_string().into(), + ], + args_pointer: false, + request_type: RequestType::MSet.into(), + }, + CommandComponents { + args: vec![], + args_pointer: false, + request_type: RequestType::DBSize.into(), + }, + CommandComponents { + args: vec!["maxmemory".to_string().into(), "1000".to_string().into()], + args_pointer: false, + request_type: RequestType::ConfigSet.into(), + }, + CommandComponents { + args: vec!["maxmemory".to_string().into()], + args_pointer: false, + request_type: RequestType::ConfigGet.into(), + }, ]; let mut buffer = Vec::with_capacity(200); write_pipeline_request(&mut buffer, &mut socket, CALLBACK_INDEX, commands); @@ -1293,8 +1325,16 @@ mod socket_listener { Value::BulkString(vec![b'b', b'a', b'r']), Value::Array(vec![Value::BulkString(vec![b'b', b'a', b'r']), Value::Nil]), Value::Okay, + Value::Int(0), Value::Nil, Value::BulkString(vec![b'H', b'E', b'L', b'L', b'O']), + Value::Okay, + Value::Int(2), + Value::Okay, + Value::Map(vec![( + Value::BulkString(vec![b'm', b'a', b'x', b'm', b'e', b'm', b'o', b'r', b'y']), + Value::BulkString(vec![b'1', b'0', b'0', b'0']), + )]), ]), ); } From cf273f3d39e5a3446b3efa96adbc727c86e5fb83 Mon Sep 17 00:00:00 2001 From: Shoham 
Elias Date: Thu, 6 Feb 2025 13:57:36 +0000 Subject: [PATCH 12/15] fix test Signed-off-by: Shoham Elias --- glide-core/tests/test_socket_listener.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/glide-core/tests/test_socket_listener.rs b/glide-core/tests/test_socket_listener.rs index caa6358396..b9054e3bea 100644 --- a/glide-core/tests/test_socket_listener.rs +++ b/glide-core/tests/test_socket_listener.rs @@ -1303,12 +1303,12 @@ mod socket_listener { request_type: RequestType::DBSize.into(), }, CommandComponents { - args: vec!["maxmemory".to_string().into(), "1000".to_string().into()], + args: vec!["appendonly".to_string().into(), "no".to_string().into()], args_pointer: false, request_type: RequestType::ConfigSet.into(), }, CommandComponents { - args: vec!["maxmemory".to_string().into()], + args: vec!["appendonly".to_string().into()], args_pointer: false, request_type: RequestType::ConfigGet.into(), }, @@ -1332,8 +1332,10 @@ mod socket_listener { Value::Int(2), Value::Okay, Value::Map(vec![( - Value::BulkString(vec![b'm', b'a', b'x', b'm', b'e', b'm', b'o', b'r', b'y']), - Value::BulkString(vec![b'1', b'0', b'0', b'0']), + Value::BulkString(vec![ + b'a', b'p', b'p', b'e', b'n', b'd', b'o', b'n', b'l', b'y', + ]), + Value::BulkString(vec![b'n', b'o']), )]), ]), ); From 87740fdc616ebe4f13ab144559fa9fd431267a39 Mon Sep 17 00:00:00 2001 From: Shoham Elias Date: Sun, 9 Feb 2025 14:57:11 +0000 Subject: [PATCH 13/15] improve comments Signed-off-by: Shoham Elias --- .../redis-rs/redis/src/cluster_async/mod.rs | 21 +++++++------------ glide-core/tests/test_socket_listener.rs | 8 +++---- 2 files changed, 12 insertions(+), 17 deletions(-) diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index 4bcc6adb91..a2743efc3c 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -23,9 +23,8 @@ //! 
``` mod connections_container; -mod pipeline_routing; - mod connections_logic; +mod pipeline_routing; /// Exposed only for testing. pub mod testing { pub use super::connections_container::ConnectionDetails; @@ -2133,7 +2132,6 @@ where Self::handle_pipeline_request(&pipeline, core).await } } - CmdArg::ClusterScan { cluster_scan_args, .. } => { @@ -2162,18 +2160,11 @@ where /// /// This function distributes the commands in the pipeline across the cluster nodes based on routing information, collects the responses, /// and aggregates them if necessary according to the specified response policies. - /// - /// # Arguments - /// * `pipeline` - A reference to the pipeline to be executed. - /// * `core` - A reference to the core cluster connection state. - /// - /// # Returns - /// * `OperationResult` - Returns a result containing the aggregated responses from the sub-pipelines, or an error if the operation fails. async fn handle_pipeline_request(pipeline: &crate::Pipeline, core: Core) -> OperationResult { // Distribute pipeline commands across cluster nodes based on routing information. // Returns: // - pipelines_by_node: Map of node addresses to their pipeline contexts - // - response_policies: List of response aggregation policies for multi-node operations + // - response_policies: List of response aggregation policies for multi-node commands let (pipelines_by_node, response_policies) = map_pipeline_to_nodes(pipeline, core.clone()).await?; @@ -2184,14 +2175,17 @@ where // a vector of tuples where each tuple holds a response to the command and the address of the node that provided it. let mut pipeline_responses: PipelineResponses = vec![Vec::new(); pipeline.len()]; - // Send the requests to each node and collect thw responses + // Send the requests to each node and collect the responses + // Returns a tuple containing: + // - A vector of results for each sub-pipeline execution. + // - A vector of (address, indices) pairs indicating where each response should be placed. 
let (responses, addresses_and_indices) = collect_and_send_pending_requests(pipelines_by_node, core.clone()).await; // Process the responses and update the pipeline_responses process_pipeline_responses(&mut pipeline_responses, responses, addresses_and_indices)?; - // Process response policies after all tasks are complete + // Process response policies after all tasks are complete and aggregate the relevant commands. Self::aggregate_pipeline_multi_node_commands(&mut pipeline_responses, response_policies) .await?; @@ -2272,6 +2266,7 @@ where pipeline_responses: &mut PipelineResponses, response_policies: Vec<(usize, MultipleNodeRoutingInfo, Option)>, ) -> Result<(), (OperationTarget, RedisError)> { + // Go over the multi-node commands for (index, routing_info, response_policy) in response_policies { let response_receivers = pipeline_responses[index] .iter() diff --git a/glide-core/tests/test_socket_listener.rs b/glide-core/tests/test_socket_listener.rs index b9054e3bea..870f0869dd 100644 --- a/glide-core/tests/test_socket_listener.rs +++ b/glide-core/tests/test_socket_listener.rs @@ -1270,12 +1270,12 @@ mod socket_listener { CommandComponents { args: vec!["FLUSHALL".to_string().into()], args_pointer: false, - request_type: RequestType::CustomCommand.into(), + request_type: RequestType::CustomCommand.into(), // AllPrimaries command }, CommandComponents { args: vec![], args_pointer: false, - request_type: RequestType::DBSize.into(), + request_type: RequestType::DBSize.into(), // Aggregation of sum }, CommandComponents { args: vec![key.clone().into()], @@ -1305,12 +1305,12 @@ mod socket_listener { CommandComponents { args: vec!["appendonly".to_string().into(), "no".to_string().into()], args_pointer: false, - request_type: RequestType::ConfigSet.into(), + request_type: RequestType::ConfigSet.into(), // AllNodes command }, CommandComponents { args: vec!["appendonly".to_string().into()], args_pointer: false, - request_type: RequestType::ConfigGet.into(), + request_type: 
RequestType::ConfigGet.into(), // RandomNode command }, ]; let mut buffer = Vec::with_capacity(200); From 3b72abab8e2e1b8284f1d316d630d681bebadbdf Mon Sep 17 00:00:00 2001 From: Shoham Elias Date: Sun, 9 Feb 2025 17:20:36 +0000 Subject: [PATCH 14/15] refactor handle_pipeline_request to accept Arc instead of &Pipeline Signed-off-by: Shoham Elias --- glide-core/redis-rs/redis/src/cluster_async/mod.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index a2743efc3c..b910deeda6 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -2129,7 +2129,7 @@ where .await } else { // The pipeline is not atomic and not already splitted, we need to split it into sub-pipelines and send them separately. - Self::handle_pipeline_request(&pipeline, core).await + Self::handle_pipeline_request(pipeline, core).await } } CmdArg::ClusterScan { @@ -2160,13 +2160,16 @@ where /// /// This function distributes the commands in the pipeline across the cluster nodes based on routing information, collects the responses, /// and aggregates them if necessary according to the specified response policies. - async fn handle_pipeline_request(pipeline: &crate::Pipeline, core: Core) -> OperationResult { + async fn handle_pipeline_request( + pipeline: Arc, + core: Core, + ) -> OperationResult { // Distribute pipeline commands across cluster nodes based on routing information. // Returns: // - pipelines_by_node: Map of node addresses to their pipeline contexts // - response_policies: List of response aggregation policies for multi-node commands let (pipelines_by_node, response_policies) = - map_pipeline_to_nodes(pipeline, core.clone()).await?; + map_pipeline_to_nodes(&pipeline, core.clone()).await?; // Initialize `PipelineResponses` to store responses for each pipeline command. 
// This will be used to store the responses from the different sub-pipelines to the pipeline commands. From 76ad4fc726c103ed121ae4ff13fe9851f718c05c Mon Sep 17 00:00:00 2001 From: Shoham Elias Date: Sun, 9 Feb 2025 21:20:01 +0000 Subject: [PATCH 15/15] start adding tests Signed-off-by: Shoham Elias --- .../redis-rs/redis/src/cluster_async/mod.rs | 140 +++++++++++++++++- 1 file changed, 138 insertions(+), 2 deletions(-) diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index b910deeda6..8d03c0a3be 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -2931,10 +2931,17 @@ impl Connect for MultiplexedConnection { #[cfg(test)] mod pipeline_routing_tests { + use futures::executor::block_on; + use super::pipeline_routing::route_for_pipeline; use crate::{ - cluster_routing::{Route, SlotAddr}, - cmd, + aio::MultiplexedConnection, + cluster_async::{pipeline_routing::PipelineResponses, ClusterConnInner}, + cluster_routing::{ + AggregateOp, MultiSlotArgPattern, MultipleNodeRoutingInfo, ResponsePolicy, Route, + SlotAddr, + }, + cmd, Value, }; #[test] @@ -2953,6 +2960,135 @@ mod pipeline_routing_tests { ); } + #[test] + fn test_numerical_response_aggregation_logic() { + let mut pipeline_responses: PipelineResponses = vec![ + vec![ + (Value::Int(3), "node1".to_string()), + (Value::Int(7), "node2".to_string()), + (Value::Int(0), "node3".to_string()), + ], + vec![( + Value::BulkString(b"unchanged".to_vec()), + "node3".to_string(), + )], + vec![ + (Value::Int(5), "node1".to_string()), + (Value::Int(11), "node2".to_string()), + ], + ]; + let response_policies = vec![ + ( + 0, + MultipleNodeRoutingInfo::AllNodes, + Some(ResponsePolicy::Aggregate(AggregateOp::Sum)), + ), + ( + 2, + MultipleNodeRoutingInfo::AllMasters, + Some(ResponsePolicy::Aggregate(AggregateOp::Min)), + ), + ]; + block_on( + 
ClusterConnInner::::aggregate_pipeline_multi_node_commands( + &mut pipeline_responses, + response_policies, + ), + ) + .expect("Aggregation failed"); + + // Command 0 should be aggregated to 3 + 7 + 0 = 10. + // Command 1 should remain unchanged. + assert_eq!( + pipeline_responses[0], + vec![(Value::Int(10), "".to_string())], + "Expected command 0 aggregation to yield 10" + ); + assert_eq!( + pipeline_responses[1], + vec![( + Value::BulkString(b"unchanged".to_vec()), + "node3".to_string() + )], + "Expected command 1 to remain unchanged" + ); + assert_eq!( + pipeline_responses[2], + vec![(Value::Int(5), "".to_string())], + "Expected command 2 aggregation to yield 5 as the minimum value" + ); + } + + #[test] + fn test_combine_arrays_response_aggregation_logic() { + let mut pipeline_responses: PipelineResponses = vec![ + vec![ + (Value::Array(vec![Value::Int(1)]), "node1".to_string()), + (Value::Array(vec![Value::Int(2)]), "node2".to_string()), + ], + vec![ + ( + Value::Array(vec![ + Value::BulkString("key1".into()), + Value::BulkString("key3".into()), + ]), + "node2".to_string(), + ), + ( + Value::Array(vec![ + Value::BulkString("key2".into()), + Value::BulkString("key4".into()), + ]), + "node1".to_string(), + ), + ], + ]; + let response_policies = vec![ + ( + 0, + MultipleNodeRoutingInfo::AllNodes, + Some(ResponsePolicy::CombineArrays), + ), + ( + 1, + MultipleNodeRoutingInfo::MultiSlot(( + vec![ + (Route::new(1, SlotAddr::Master), vec![0, 2]), + (Route::new(2, SlotAddr::Master), vec![1, 3]), + ], + MultiSlotArgPattern::KeysOnly, + )), + Some(ResponsePolicy::CombineArrays), + ), + ]; + + block_on( + ClusterConnInner::::aggregate_pipeline_multi_node_commands( + &mut pipeline_responses, + response_policies, + ), + ) + .expect("CombineArrays aggregation should succeed"); + + let mut expected = Value::Array(vec![Value::Int(1), Value::Int(2)]); + assert_eq!( + pipeline_responses[0], + vec![(expected, "".to_string())], + "Expected combined array to include all elements" + 
); + expected = Value::Array(vec![ + Value::BulkString("key1".into()), + Value::BulkString("key2".into()), + Value::BulkString("key3".into()), + Value::BulkString("key4".into()), + ]); + assert_eq!( + pipeline_responses[1], + vec![(expected, "".to_string())], + "Expected combined array to include all elements" + ); + } + #[test] fn test_return_none_if_no_route_is_found() { let mut pipeline = crate::Pipeline::new();