diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs
index 3d61efce29..8d03c0a3be 100644
--- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs
+++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs
@@ -24,6 +24,7 @@
 mod connections_container;
 mod connections_logic;
+mod pipeline_routing;
 /// Exposed only for testing.
 pub mod testing {
     pub use super::connections_container::ConnectionDetails;
@@ -41,6 +42,10 @@ use crate::{
     FromRedisValue, InfoDict,
 };
 use dashmap::DashMap;
+use pipeline_routing::{
+    collect_and_send_pending_requests, map_pipeline_to_nodes, process_pipeline_responses,
+    route_for_pipeline, PipelineResponses,
+};
 use std::{
     collections::{HashMap, HashSet},
     fmt, io, mem,
@@ -285,6 +290,7 @@ where
                     offset,
                     count,
                     route: route.into(),
+                    sub_pipeline: false,
                 },
                 sender,
             })
@@ -606,6 +612,7 @@ enum CmdArg<C> {
         offset: usize,
         count: usize,
         route: InternalSingleNodeRouting<C>,
+        sub_pipeline: bool,
     },
     ClusterScan {
         // struct containing the arguments for the cluster scan command - scan state cursor, match pattern, count and object type.
@@ -621,44 +628,6 @@ enum Operation {
     UpdateConnectionPassword(Option<String>),
 }
 
-fn route_for_pipeline(pipeline: &crate::Pipeline) -> RedisResult<Option<Route>> {
-    fn route_for_command(cmd: &Cmd) -> Option<Route> {
-        match cluster_routing::RoutingInfo::for_routable(cmd) {
-            Some(cluster_routing::RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)) => None,
-            Some(cluster_routing::RoutingInfo::SingleNode(
-                SingleNodeRoutingInfo::SpecificNode(route),
-            )) => Some(route),
-            Some(cluster_routing::RoutingInfo::SingleNode(
-                SingleNodeRoutingInfo::RandomPrimary,
-            )) => Some(Route::new_random_primary()),
-            Some(cluster_routing::RoutingInfo::MultiNode(_)) => None,
-            Some(cluster_routing::RoutingInfo::SingleNode(SingleNodeRoutingInfo::ByAddress {
-                ..
-            })) => None,
-            None => None,
-        }
-    }
-
-    // Find first specific slot and send to it. There's no need to check If later commands
-    // should be routed to a different slot, since the server will return an error indicating this.
-    pipeline.cmd_iter().map(route_for_command).try_fold(
-        None,
-        |chosen_route, next_cmd_route| match (chosen_route, next_cmd_route) {
-            (None, _) => Ok(next_cmd_route),
-            (_, None) => Ok(chosen_route),
-            (Some(chosen_route), Some(next_cmd_route)) => {
-                if chosen_route.slot() != next_cmd_route.slot() {
-                    Err((ErrorKind::CrossSlot, "Received crossed slots in pipeline").into())
-                } else if chosen_route.slot_addr() == SlotAddr::ReplicaOptional {
-                    Ok(Some(next_cmd_route))
-                } else {
-                    Ok(Some(chosen_route))
-                }
-            }
-        },
-    )
-}
-
 fn boxed_sleep(duration: Duration) -> BoxFuture<'static, ()> {
     Box::pin(tokio::time::sleep(duration))
 }
@@ -2147,14 +2116,21 @@ where
                 offset,
                 count,
                 route,
+                sub_pipeline,
             } => {
-                Self::try_pipeline_request(
-                    pipeline,
-                    offset,
-                    count,
-                    Self::get_connection(route, core, None),
-                )
-                .await
+                if pipeline.is_atomic() || sub_pipeline {
+                    // If the pipeline is atomic (i.e., a transaction) or is already a sub-pipeline, send it as is; there is no need to split it further.
+                    Self::try_pipeline_request(
+                        pipeline,
+                        offset,
+                        count,
+                        Self::get_connection(route, core, None),
+                    )
+                    .await
+                } else {
+                    // The pipeline is not atomic and has not been split yet, so split it into sub-pipelines and send them separately.
+                    Self::handle_pipeline_request(pipeline, core).await
+                }
             }
             CmdArg::ClusterScan {
                 cluster_scan_args, ..
@@ -2180,6 +2156,140 @@ where
         }
     }
 
+    /// Handles the execution of a non-atomic pipeline request by splitting it into sub-pipelines and sending them to the appropriate cluster nodes.
+    ///
+    /// This function distributes the commands in the pipeline across the cluster nodes based on routing information, collects the responses,
+    /// and aggregates them if necessary according to the specified response policies.
+    async fn handle_pipeline_request(
+        pipeline: Arc<crate::Pipeline>,
+        core: Core<C>,
+    ) -> OperationResult {
+        // Distribute pipeline commands across cluster nodes based on routing information.
+        // Returns:
+        // - pipelines_by_node: Map of node addresses to their pipeline contexts
+        // - response_policies: List of response aggregation policies for multi-node commands
+        let (pipelines_by_node, response_policies) =
+            map_pipeline_to_nodes(&pipeline, core.clone()).await?;
+
+        // Initialize `PipelineResponses` to store the responses for each pipeline command.
+        // It holds the responses returned by the different sub-pipelines for each command in the original pipeline.
+        // A command can have one or more responses (e.g., multi-node commands).
+        // Each entry in `PipelineResponses` corresponds to a command in the original pipeline and contains
+        // a vector of tuples, where each tuple holds a response to the command and the address of the node that provided it.
+        let mut pipeline_responses: PipelineResponses = vec![Vec::new(); pipeline.len()];
+
+        // Send the requests to each node and collect the responses.
+        // Returns a tuple containing:
+        // - A vector of results for each sub-pipeline execution.
+        // - A vector of (address, indices) pairs indicating where each response should be placed.
+        let (responses, addresses_and_indices) =
+            collect_and_send_pending_requests(pipelines_by_node, core.clone()).await;
+
+        // Process the responses and update `pipeline_responses`.
+        process_pipeline_responses(&mut pipeline_responses, responses, addresses_and_indices)?;
+
+        // Process response policies after all tasks are complete and aggregate the relevant commands.
+        Self::aggregate_pipeline_multi_node_commands(&mut pipeline_responses, response_policies)
+            .await?;
+
+        // Collect final responses, ensuring no missing or invalid responses.
+        let final_responses: Result<Vec<Value>, (OperationTarget, RedisError)> = pipeline_responses
+            .into_iter()
+            .enumerate()
+            .map(|(index, mut value)| {
+                value.pop().map(|(response, _)| response).ok_or_else(|| {
+                    (
+                        OperationTarget::FanOut,
+                        RedisError::from((
+                            ErrorKind::ResponseError,
+                            "No response found for command: ",
+                            pipeline
+                                .get_command(index)
+                                .map_or("no command available".to_string(), |cmd| {
+                                    format!("{:?}", cmd)
+                                }),
+                        )),
+                    )
+                })
+            })
+            .collect();
+
+        Ok(Response::Multiple(final_responses?))
+    }
+
+    /// Aggregates pipeline responses for multi-node commands and updates the `pipeline_responses` vector.
+    ///
+    /// Pipeline commands with multi-node routing info are split into multiple sub-pipelines; therefore, after executing each sub-pipeline and storing the results in `pipeline_responses`,
+    /// a multi-node command will have more than one response (one for each sub-pipeline that contained it). These responses must be aggregated into a single response, based on the relevant response policy.
+    ///
+    /// This function processes the provided `response_policies`, which contain information about how responses from multiple nodes should be aggregated.
+ /// For each policy: + /// - It collects the multiple responses and their source node addresses from the corresponding entry in `pipeline_responses`. + /// - Uses the routing information and optional response policy to aggregate the responses into a single result. + /// + /// The aggregated result replaces the existing entries in `pipeline_responses` for the given command index, changing the multiple responses to the command into a single aggregated response. + /// + /// After the execution of this function, all entries in `pipeline_responses` will contain a single response for each command. + /// + /// # Arguments + /// * `pipeline_responses` - A mutable reference to a vector of vectors, where each inner vector contains tuples of responses and their corresponding node addresses. + /// * `response_policies` - A vector of tuples, each containing: + /// - The index of the command in the pipeline that has a multi-node routing info. + /// - The routing information for the command. + /// - An optional response policy that dictates how the responses should be aggregated. + /// + /// # Example + /// Suppose we have a pipeline with multiple commands that were split and executed on different nodes. + /// This function will aggregate the responses for commands that were split across multiple nodes. + /// + /// ```rust,compile_fail + /// // Example pipeline with commands that might be split across nodes + /// + /// let mut pipeline_responses = vec![ + /// vec![(Value::Int(1), "node1".to_string()), (Value::Int(2), "node2".to_string()), (Value::Int(3), "node3".to_string())], // represents `DBSIZE` + /// vec![(Value::Int(3), "node3".to_string())], + /// vec![(Value::SimpleString("PONG".to_string()), "node1".to_string()), (Value::SimpleString("PONG".to_string()), "node2".to_string()), (Value::SimpleString("PONG".to_string()), "node3".to_string())], // represents `PING` + /// ]; + /// let response_policies = vec![ + /// (0, MultipleNodeRoutingInfo::AllNodes, Some(ResponsePolicy::Aggregate(AggregateOp::Sum))), + /// (2, MultipleNodeRoutingInfo::AllNodes, Some(ResponsePolicy::AllSucceeded)), + /// ]; + /// + /// // Aggregating the responses + /// aggregate_pipeline_multi_node_commands(&mut pipeline_responses, response_policies).await.unwrap(); + /// + /// // After aggregation, pipeline_responses will be updated with aggregated results + /// assert_eq!(pipeline_responses[0], vec![(Value::Int(6), "".to_string())]); + /// assert_eq!(pipeline_responses[1], vec![(Value::Int(3), "node3".to_string())]); + /// assert_eq!(pipeline_responses[2], vec![(Value::SimpleString("PONG".to_string()), "".to_string())]); + /// ``` + /// + /// This function is essential for handling multi-node commands in a Redis cluster, ensuring that responses from different nodes are correctly aggregated and processed. 
+ async fn aggregate_pipeline_multi_node_commands( + pipeline_responses: &mut PipelineResponses, + response_policies: Vec<(usize, MultipleNodeRoutingInfo, Option)>, + ) -> Result<(), (OperationTarget, RedisError)> { + // Go over the multi-node commands + for (index, routing_info, response_policy) in response_policies { + let response_receivers = pipeline_responses[index] + .iter() + .map(|(value, address)| { + let (sender, receiver) = oneshot::channel(); + let _ = sender.send(Ok(Response::Single(value.clone()))); + (Some(address.clone()), receiver) + }) + .collect(); + + let aggregated_response = + Self::aggregate_results(response_receivers, &routing_info, response_policy) + .await + .map_err(|err| (OperationTarget::FanOut, err))?; + + pipeline_responses[index] = vec![(aggregated_response, "".to_string())]; + } + Ok(()) + } + async fn get_connection( routing: InternalSingleNodeRouting, core: Core, @@ -2821,15 +2931,23 @@ impl Connect for MultiplexedConnection { #[cfg(test)] mod pipeline_routing_tests { - use super::route_for_pipeline; + use futures::executor::block_on; + + use super::pipeline_routing::route_for_pipeline; use crate::{ - cluster_routing::{Route, SlotAddr}, - cmd, + aio::MultiplexedConnection, + cluster_async::{pipeline_routing::PipelineResponses, ClusterConnInner}, + cluster_routing::{ + AggregateOp, MultiSlotArgPattern, MultipleNodeRoutingInfo, ResponsePolicy, Route, + SlotAddr, + }, + cmd, Value, }; #[test] fn test_first_route_is_found() { let mut pipeline = crate::Pipeline::new(); + pipeline.atomic(); pipeline .add_command(cmd("FLUSHALL")) // route to all masters @@ -2842,10 +2960,139 @@ mod pipeline_routing_tests { ); } + #[test] + fn test_numerical_response_aggregation_logic() { + let mut pipeline_responses: PipelineResponses = vec![ + vec![ + (Value::Int(3), "node1".to_string()), + (Value::Int(7), "node2".to_string()), + (Value::Int(0), "node3".to_string()), + ], + vec![( + Value::BulkString(b"unchanged".to_vec()), + "node3".to_string(), + )], + vec![ + (Value::Int(5), "node1".to_string()), + (Value::Int(11), "node2".to_string()), + ], + ]; + let response_policies = vec![ + ( + 0, + MultipleNodeRoutingInfo::AllNodes, + Some(ResponsePolicy::Aggregate(AggregateOp::Sum)), + ), + ( + 2, + MultipleNodeRoutingInfo::AllMasters, + Some(ResponsePolicy::Aggregate(AggregateOp::Min)), + ), + ]; + block_on( + ClusterConnInner::::aggregate_pipeline_multi_node_commands( + &mut pipeline_responses, + response_policies, + ), + ) + .expect("Aggregation failed"); + + // Command 0 should be aggregated to 3 + 7 + 0 = 10. + // Command 1 should remain unchanged. 
+ assert_eq!( + pipeline_responses[0], + vec![(Value::Int(10), "".to_string())], + "Expected command 0 aggregation to yield 10" + ); + assert_eq!( + pipeline_responses[1], + vec![( + Value::BulkString(b"unchanged".to_vec()), + "node3".to_string() + )], + "Expected command 1 to remain unchanged" + ); + assert_eq!( + pipeline_responses[2], + vec![(Value::Int(5), "".to_string())], + "Expected command 2 aggregation to yield 5 as the minimum value" + ); + } + + #[test] + fn test_combine_arrays_response_aggregation_logic() { + let mut pipeline_responses: PipelineResponses = vec![ + vec![ + (Value::Array(vec![Value::Int(1)]), "node1".to_string()), + (Value::Array(vec![Value::Int(2)]), "node2".to_string()), + ], + vec![ + ( + Value::Array(vec![ + Value::BulkString("key1".into()), + Value::BulkString("key3".into()), + ]), + "node2".to_string(), + ), + ( + Value::Array(vec![ + Value::BulkString("key2".into()), + Value::BulkString("key4".into()), + ]), + "node1".to_string(), + ), + ], + ]; + let response_policies = vec![ + ( + 0, + MultipleNodeRoutingInfo::AllNodes, + Some(ResponsePolicy::CombineArrays), + ), + ( + 1, + MultipleNodeRoutingInfo::MultiSlot(( + vec![ + (Route::new(1, SlotAddr::Master), vec![0, 2]), + (Route::new(2, SlotAddr::Master), vec![1, 3]), + ], + MultiSlotArgPattern::KeysOnly, + )), + Some(ResponsePolicy::CombineArrays), + ), + ]; + + block_on( + ClusterConnInner::::aggregate_pipeline_multi_node_commands( + &mut pipeline_responses, + response_policies, + ), + ) + .expect("CombineArrays aggregation should succeed"); + + let mut expected = Value::Array(vec![Value::Int(1), Value::Int(2)]); + assert_eq!( + pipeline_responses[0], + vec![(expected, "".to_string())], + "Expected combined array to include all elements" + ); + expected = Value::Array(vec![ + Value::BulkString("key1".into()), + Value::BulkString("key2".into()), + Value::BulkString("key3".into()), + Value::BulkString("key4".into()), + ]); + assert_eq!( + pipeline_responses[1], + vec![(expected, "".to_string())], + "Expected combined array to include all elements" + ); + } + #[test] fn test_return_none_if_no_route_is_found() { let mut pipeline = crate::Pipeline::new(); - + pipeline.atomic(); pipeline .add_command(cmd("FLUSHALL")) // route to all masters .add_command(cmd("EVAL")); // route randomly @@ -2856,7 +3103,7 @@ mod pipeline_routing_tests { #[test] fn test_prefer_primary_route_over_replica() { let mut pipeline = crate::Pipeline::new(); - + pipeline.atomic(); pipeline .get("foo") // route to replica of slot 12182 .add_command(cmd("FLUSHALL")) // route to all masters @@ -2873,7 +3120,7 @@ mod pipeline_routing_tests { #[test] fn test_raise_cross_slot_error_on_conflicting_slots() { let mut pipeline = crate::Pipeline::new(); - + pipeline.atomic(); pipeline .add_command(cmd("FLUSHALL")) // route to all masters .set("baz", "bar") // route to slot 4813 @@ -2888,7 +3135,7 @@ mod pipeline_routing_tests { #[test] fn unkeyed_commands_dont_affect_route() { let mut pipeline = crate::Pipeline::new(); - + pipeline.atomic(); pipeline .set("{foo}bar", "baz") // route to primary of slot 12182 .cmd("CONFIG").arg("GET").arg("timeout") // unkeyed command diff --git a/glide-core/redis-rs/redis/src/cluster_async/pipeline_routing.rs b/glide-core/redis-rs/redis/src/cluster_async/pipeline_routing.rs new file mode 100644 index 0000000000..e45abc025e --- /dev/null +++ b/glide-core/redis-rs/redis/src/cluster_async/pipeline_routing.rs @@ -0,0 +1,519 @@ +use crate::aio::ConnectionLike; +use crate::cluster_async::ClusterConnInner; +use 
crate::cluster_async::Connect; +use crate::cluster_routing::RoutingInfo; +use cluster_routing::RoutingInfo::{MultiNode, SingleNode}; + +use crate::cluster_routing::{ + command_for_multi_slot_indices, MultipleNodeRoutingInfo, ResponsePolicy, SingleNodeRoutingInfo, +}; +use crate::{cluster_routing, RedisResult, Value}; +use crate::{cluster_routing::Route, Cmd, ErrorKind, RedisError}; +use std::collections::HashMap; +use std::sync::Arc; + +use crate::cluster_async::MUTEX_READ_ERR; +use crate::Pipeline; +use futures::FutureExt; +use rand::prelude::IteratorRandom; +use tokio::sync::oneshot; +use tokio::sync::oneshot::error::RecvError; + +use super::CmdArg; +use super::PendingRequest; +use super::RequestInfo; +use super::{Core, InternalSingleNodeRouting, OperationTarget, Response}; + +/// Represents a pipeline command execution context for a specific node +#[derive(Default)] +pub struct NodePipelineContext { + /// The pipeline of commands to be executed + pub pipeline: Pipeline, + /// The connection to the node + pub connection: C, + /// Vector of (command_index, inner_index) pairs tracking command order + /// command_index: Position in the original pipeline + /// inner_index: Optional sub-index for multi-node operations (e.g. MSET) + pub command_indices: Vec<(usize, Option)>, +} + +/// Maps node addresses to their pipeline execution contexts +pub type NodePipelineMap = HashMap>; + +impl NodePipelineContext { + fn new(connection: C) -> Self { + Self { + pipeline: Pipeline::new(), + connection, + command_indices: Vec::new(), + } + } + + // Adds a command to the pipeline and records its index + fn add_command(&mut self, cmd: Cmd, index: usize, inner_index: Option) { + self.pipeline.add_command(cmd); + self.command_indices.push((index, inner_index)); + } +} + +/// `NodeResponse` represents a response from a node along with its source node address. +type NodeResponse = (Value, String); +/// `PipelineResponses` represents the responses for each pipeline command. +/// The outer `Vec` represents the pipeline commands, and each inner `Vec` contains (response, address) pairs. +/// Since some commands can be executed across multiple nodes (e.g., multi-node commands), a single command +/// might produce multiple responses, each from a different node. By storing the responses with their +/// respective node addresses, we ensure that we have all the information needed to aggregate the results later. +pub type PipelineResponses = Vec>; + +/// `AddressAndIndices` represents the address of a node and the indices of commands associated with that node. +type AddressAndIndices = Vec<(String, Vec<(usize, Option)>)>; + +/// Adds a command to the pipeline map for a specific node address. +pub fn add_command_to_node_pipeline_map( + pipeline_map: &mut NodePipelineMap, + address: String, + connection: C, + cmd: Cmd, + index: usize, + inner_index: Option, +) { + pipeline_map + .entry(address) + .or_insert_with(|| NodePipelineContext::new(connection)) + .add_command(cmd, index, inner_index); +} + +/// Maps the commands in a pipeline to the appropriate nodes based on their routing information. +/// +/// This function processes each command in the given pipeline, determines its routing information, +/// and organizes it into a map of node pipelines. It handles both single-node and multi-node routing +/// strategies and ensures that the commands are distributed accordingly. +/// +/// It also collects response policies for multi-node routing and returns them along with the pipeline map. 
+/// This is to ensure we can properly aggregate responses from the different nodes.
+///
+/// # Arguments
+///
+/// * `pipeline` - A reference to the pipeline containing the commands to route.
+/// * `core` - The core object that provides access to connection locks and other resources.
+///
+/// # Returns
+///
+/// A `Result` containing a tuple:
+/// - A `NodePipelineMap` where commands are grouped by their corresponding nodes (as pipelines).
+/// - A `Vec<(usize, MultipleNodeRoutingInfo, Option<ResponsePolicy>)>` containing the routing information
+///   and response policies for multi-node commands, along with the index of each command in the pipeline, for aggregating the responses later.
+pub async fn map_pipeline_to_nodes<C>(
+    pipeline: &crate::Pipeline,
+    core: Core<C>,
+) -> Result<
+    (
+        NodePipelineMap<C>,
+        Vec<(usize, MultipleNodeRoutingInfo, Option<ResponsePolicy>)>,
+    ),
+    (OperationTarget, RedisError),
+>
+where
+    C: Clone + ConnectionLike + Connect + Send + Sync + 'static,
+{
+    let mut pipelines_per_node = NodePipelineMap::new();
+    let mut response_policies = Vec::new();
+
+    for (index, cmd) in pipeline.cmd_iter().enumerate() {
+        match RoutingInfo::for_routable(cmd).unwrap_or(SingleNode(SingleNodeRoutingInfo::Random)) {
+            SingleNode(route) => {
+                handle_pipeline_single_node_routing(
+                    &mut pipelines_per_node,
+                    cmd.clone(),
+                    route.into(),
+                    core.clone(),
+                    index,
+                )
+                .await?;
+            }
+            MultiNode((multi_node_routing, response_policy)) => {
+                // Save the routing info and response policy, so we will be able to aggregate the results later.
+                response_policies.push((index, multi_node_routing.clone(), response_policy));
+                match multi_node_routing {
+                    MultipleNodeRoutingInfo::AllNodes | MultipleNodeRoutingInfo::AllMasters => {
+                        let connections: Vec<_> = {
+                            let lock = core.conn_lock.read().expect(MUTEX_READ_ERR);
+                            if matches!(multi_node_routing, MultipleNodeRoutingInfo::AllNodes) {
+                                lock.all_node_connections().collect()
+                            } else {
+                                lock.all_primary_connections().collect()
+                            }
+                        };
+
+                        if connections.is_empty() {
+                            return Err((
+                                OperationTarget::NotFound,
+                                RedisError::from((
+                                    ErrorKind::AllConnectionsUnavailable, // should use different kind? ConnectionNotFoundForRoute
+                                    "No available connections",
+                                )),
+                            ));
+                        }
+                        for (inner_index, (address, conn)) in connections.into_iter().enumerate() {
+                            add_command_to_node_pipeline_map(
+                                &mut pipelines_per_node,
+                                address,
+                                conn.await,
+                                cmd.clone(),
+                                index,
+                                Some(inner_index),
+                            );
+                        }
+                    }
+                    MultipleNodeRoutingInfo::MultiSlot((slots, _)) => {
+                        handle_pipeline_multi_slot_routing(
+                            &mut pipelines_per_node,
+                            core.clone(),
+                            cmd,
+                            index,
+                            slots,
+                        )
+                        .await?;
+                    }
+                }
+            }
+        }
+    }
+    Ok((pipelines_per_node, response_policies))
+}
+
+/// Handles pipeline commands that require single-node routing.
+///
+/// This function processes commands with `SingleNode` routing information and determines
+/// the appropriate handling based on the routing type.
+///
+/// ### Parameters:
+/// - `pipeline_map`: A mutable reference to the `NodePipelineMap`, representing the pipelines grouped by nodes.
+/// - `cmd`: The command to process and add to the appropriate node pipeline.
+/// - `routing`: The single-node routing information, which determines how the command is routed.
+/// - `core`: The core object responsible for managing connections and routing logic.
+/// - `index`: The position of the command in the overall pipeline.
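+///
+/// ### Example (illustrative sketch)
+/// A hedged sketch of the behaviour described above; the variable names are hypothetical and
+/// the snippet is not compiled as a doctest.
+///
+/// ```rust,compile_fail
+/// // A non-random route resolves its connection via `ClusterConnInner::get_connection` and
+/// // appends the command to that node's sub-pipeline.
+/// handle_pipeline_single_node_routing(&mut pipeline_map, get_cmd, specific_route, core.clone(), 0).await?;
+///
+/// // A `Random` route reuses one of the node pipelines that already exist in the map, so an
+/// // unkeyed command does not force an extra connection lookup; it only falls back to
+/// // `get_connection` when the map is still empty.
+/// handle_pipeline_single_node_routing(&mut pipeline_map, ping_cmd, InternalSingleNodeRouting::Random, core, 1).await?;
+/// ```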
+pub async fn handle_pipeline_single_node_routing( + pipeline_map: &mut NodePipelineMap, + cmd: Cmd, + routing: InternalSingleNodeRouting, + core: Core, + index: usize, +) -> Result<(), (OperationTarget, RedisError)> +where + C: Clone + ConnectionLike + Connect + Send + Sync + 'static, +{ + if matches!(routing, InternalSingleNodeRouting::Random) && !pipeline_map.is_empty() { + // Adds the command to a random existing node pipeline in the pipeline map + let mut rng = rand::thread_rng(); + if let Some(node_context) = pipeline_map.values_mut().choose(&mut rng) { + node_context.add_command(cmd, index, None); + return Ok(()); + } + } + + let (address, conn) = + ClusterConnInner::get_connection(routing, core, Some(Arc::new(cmd.clone()))) + .await + .map_err(|err| (OperationTarget::NotFound, err))?; + add_command_to_node_pipeline_map(pipeline_map, address, conn, cmd, index, None); + Ok(()) +} + +/// Handles multi-slot commands within a pipeline. +/// +/// This function processes commands with routing information indicating multiple slots +/// (e.g., `MSET` or `MGET`), splits them into sub-commands based on their target slots, +/// and assigns these sub-commands to the appropriate pipelines for the corresponding nodes. +/// +/// ### Parameters: +/// - `pipelines_by_connection`: A mutable map of node pipelines where the commands will be added. +/// - `core`: The core structure that provides access to connection management. +/// - `cmd`: The original multi-slot command that needs to be split. +/// - `index`: The index of the original command within the pipeline. +/// - `slots`: A vector containing routing information. Each entry includes: +/// - `Route`: The specific route for the slot. +/// - `Vec`: Indices of the keys within the command that map to this slot. +pub async fn handle_pipeline_multi_slot_routing( + pipelines_by_connection: &mut NodePipelineMap, + core: Core, + cmd: &Cmd, + index: usize, + slots: Vec<(Route, Vec)>, +) -> Result<(), (OperationTarget, RedisError)> +where + C: Clone, +{ + // inner_index is used to keep track of the index of the sub-command inside cmd + for (inner_index, (route, indices)) in slots.iter().enumerate() { + let conn = { + let lock = core.conn_lock.read().expect(MUTEX_READ_ERR); + lock.connection_for_route(route) + }; + if let Some((address, conn)) = conn { + // create the sub-command for the slot + let new_cmd = command_for_multi_slot_indices(cmd, indices.iter()); + add_command_to_node_pipeline_map( + pipelines_by_connection, + address, + conn.await, + new_cmd, + index, + Some(inner_index), + ); + } else { + return Err(( + OperationTarget::NotFound, + RedisError::from(( + ErrorKind::ConnectionNotFoundForRoute, + "No available connections", + )), + )); + } + } + Ok(()) +} + +/// Collects and sends pending requests for the given pipeline map, and waits for their responses. +/// +/// This function creates `PendingRequest` objects for each pipeline in the provided pipeline map, +/// adds them to the core's pending requests queue, and waits for all responses to be received. +/// +/// # Arguments +/// +/// * `pipeline_map` - A map of node pipelines where the commands are grouped by their corresponding nodes. +/// * `core` - The core object that provides access to connection locks and other resources. +/// +/// # Returns +/// +/// A tuple containing: +/// - A vector of results for each sub-pipeline execution. +/// - A vector of (address, indices) pairs indicating where each response should be placed. 
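+///
+/// # Example (illustrative sketch)
+/// A hedged sketch of how this function is used from `handle_pipeline_request`; the surrounding
+/// variables mirror that caller, and the snippet is not compiled as a doctest.
+///
+/// ```rust,compile_fail
+/// let (pipelines_by_node, _response_policies) = map_pipeline_to_nodes(&pipeline, core.clone()).await?;
+/// let (responses, addresses_and_indices) =
+///     collect_and_send_pending_requests(pipelines_by_node, core.clone()).await;
+/// // Each response is then written back into `pipeline_responses` at the indices recorded above.
+/// process_pipeline_responses(&mut pipeline_responses, responses, addresses_and_indices)?;
+/// ```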
+pub async fn collect_and_send_pending_requests( + pipeline_map: NodePipelineMap, + core: Core, +) -> ( + Vec, RecvError>>, + Vec<(String, Vec<(usize, Option)>)>, +) +where + C: Clone + ConnectionLike + Connect + Send + Sync + 'static, +{ + // Processes the sub-pipelines to generate pending requests for execution on specific nodes. + // Each pending request encapsulates all the necessary details for executing commands on a node. + // + // Returns: + // - `receivers`: A vector of `oneshot::Receiver` instances, enabling asynchronous retrieval + // of the results from the execution of each sub-pipeline. + // - `pending_requests`: A vector of `PendingRequest` objects, each representing a scheduled command + // for execution on a node. + // - `addresses_and_indices`: A vector of tuples where each tuple contains a node address and a list + // of command indices for each sub-pipeline, allowing the results to be mapped back to their original command within the original pipeline. + let (receivers, pending_requests, addresses_and_indices) = + collect_pipeline_requests(pipeline_map); + + // Add the pending requests to the pending_requests queue + core.pending_requests + .lock() + .unwrap() + .extend(pending_requests.into_iter()); + + // Wait for all receivers to complete and collect the responses + let responses: Vec<_> = futures::future::join_all(receivers.into_iter()) + .await + .into_iter() + .collect(); + + (responses, addresses_and_indices) +} + +/// Creates `PendingRequest` objects for each pipeline in the provided pipeline map. +/// +/// This function processes the given map of node pipelines and prepares each sub-pipeline for execution +/// by creating a `PendingRequest` containing all necessary details for execution. +/// Additionally, it sets up communication channels to asynchronously receive the results of each sub-pipeline's execution. +/// +/// Returns a tuple containing: +/// - **receivers**: A vector of `oneshot::Receiver` objects to receive the responses of the sub-pipeline executions. +/// - **pending_requests**: A vector of `PendingRequest` objects, each representing a pipeline scheduled for execution on a node. +/// - **addresses_and_indices**: A vector of tuples containing node addresses and their associated command indices for each sub-pipeline, +/// allowing the results to be mapped back to their original command within the original pipeline. 
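+///
+/// # Example (illustrative sketch)
+/// A hedged sketch of how the three return values line up with one another; the snippet is not
+/// compiled as a doctest.
+///
+/// ```rust,compile_fail
+/// let (receivers, pending_requests, addresses_and_indices) = collect_pipeline_requests(pipeline_map);
+/// // The vectors are index-aligned: receivers[i] yields the result of pending_requests[i],
+/// // and addresses_and_indices[i] maps that node's responses back to the original pipeline.
+/// assert_eq!(receivers.len(), pending_requests.len());
+/// assert_eq!(receivers.len(), addresses_and_indices.len());
+/// ```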
+#[allow(clippy::type_complexity)] +pub fn collect_pipeline_requests( + pipelines_by_connection: NodePipelineMap, +) -> ( + Vec>>, + Vec>, + Vec<(String, Vec<(usize, Option)>)>, +) +where + C: Clone + ConnectionLike + Connect + Send + Sync + 'static, +{ + let mut receivers = Vec::new(); + let mut pending_requests = Vec::new(); + let mut addresses_and_indices = Vec::new(); + + for (address, context) in pipelines_by_connection { + // Create a channel to receive the pipeline execution results + let (sender, receiver) = oneshot::channel(); + // Add the receiver to the list of receivers + receivers.push(receiver); + pending_requests.push(PendingRequest { + retry: 0, + sender, + info: RequestInfo { + cmd: CmdArg::Pipeline { + count: context.pipeline.len(), + pipeline: context.pipeline.into(), + offset: 0, + route: InternalSingleNodeRouting::Connection { + address: address.clone(), + conn: async { context.connection }.boxed().shared(), + }, + // mark it as a sub-pipeline mode + sub_pipeline: true, + }, + }, + }); + // Record the node address and its associated command indices for result mapping + addresses_and_indices.push((address, context.command_indices)); + } + + (receivers, pending_requests, addresses_and_indices) +} + +/// Adds the result of a pipeline command to the `pipeline_responses` collection. +/// +/// This function updates the `pipeline_responses` vector at the given `index` and optionally at the +/// `inner_index` if provided. If `inner_index` is `Some`, it ensures that the vector at that index is large enough +/// to hold the value and address at the specified position, resizing it if necessary. If `inner_index` is `None`, +/// the value and address are simply appended to the vector. +/// +/// # Parameters +/// - `pipeline_responses`: A mutable reference to a vector of vectors that stores the results of pipeline commands. +/// - `index`: The index in `pipeline_responses` where the result should be stored. +/// - `inner_index`: An optional index within the vector at `index`, used to store the result at a specific position. +/// - `value`: The result value to store. +/// - `address`: The address associated with the result. +pub fn add_pipeline_result( + pipeline_responses: &mut PipelineResponses, + index: usize, + inner_index: Option, + value: Value, + address: String, +) { + match inner_index { + Some(inner_index) => { + // Ensure the vector at the given index is large enough to hold the value and address at the specified position + if pipeline_responses[index].len() <= inner_index { + pipeline_responses[index].resize(inner_index + 1, (Value::Nil, "".to_string())); + } + pipeline_responses[index][inner_index] = (value, address); + } + None => pipeline_responses[index].push((value, address)), + } +} + +/// Processes the responses of pipeline commands and updates the given `pipeline_responses` +/// with the corresponding results. +/// +/// The function iterates over the responses along with the `addresses_and_indices` list, +/// ensuring that each response is added to its appropriate position in `pipeline_responses` along with the associated address. +/// If any response indicates an error, the function terminates early and returns the first encountered error. +/// +/// # Parameters +/// +/// - `pipeline_responses`: A vec that holds the original pipeline commands responses. +/// - `responses`: A list of responses corresponding to each sub-pipeline. +/// - `addresses_and_indices`: A list of (address, indices) pairs indicating where each response should be placed. 
+/// +/// # Returns +/// +/// - `Ok(())` if all responses are processed successfully. +/// - `Err((OperationTarget, RedisError))` if a node-level or reception error occurs. +pub fn process_pipeline_responses( + pipeline_responses: &mut PipelineResponses, + responses: Vec, RecvError>>, + addresses_and_indices: AddressAndIndices, +) -> Result<(), (OperationTarget, RedisError)> { + for ((address, command_indices), response_result) in + addresses_and_indices.into_iter().zip(responses) + { + match response_result { + Ok(Ok(Response::Multiple(values))) => { + // Add each response to the pipeline_responses vector at the appropriate index + for ((index, inner_index), value) in command_indices.into_iter().zip(values) { + add_pipeline_result( + pipeline_responses, + index, + inner_index, + value, + address.clone(), + ); + } + } + Ok(Err(err)) => { + return Err((OperationTarget::Node { address }, err)); + } + _ => { + return Err(( + OperationTarget::Node { address }, + RedisError::from((ErrorKind::ResponseError, "Failed to receive response")), + )); + } + } + } + Ok(()) +} + +/// This function returns the route for a given pipeline. +/// The function goes over the commands in the pipeline, checks that all key-based commands are routed to the same slot, +/// and returns the route for that specific node. +/// If the pipeline contains no key-based commands, the function returns None. +/// For non-atomic pipelines, the function will return None, regardless of the commands in it. +pub fn route_for_pipeline(pipeline: &crate::Pipeline) -> RedisResult> { + fn route_for_command(cmd: &Cmd) -> Option { + match cluster_routing::RoutingInfo::for_routable(cmd) { + Some(cluster_routing::RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)) => None, + Some(cluster_routing::RoutingInfo::SingleNode( + SingleNodeRoutingInfo::SpecificNode(route), + )) => Some(route), + Some(cluster_routing::RoutingInfo::SingleNode( + SingleNodeRoutingInfo::RandomPrimary, + )) => Some(Route::new_random_primary()), + Some(cluster_routing::RoutingInfo::MultiNode(_)) => None, + Some(cluster_routing::RoutingInfo::SingleNode(SingleNodeRoutingInfo::ByAddress { + .. + })) => None, + None => None, + } + } + + if pipeline.is_atomic() { + // Find first specific slot and send to it. There's no need to check If later commands + // should be routed to a different slot, since the server will return an error indicating this. + pipeline + .cmd_iter() + .map(route_for_command) + .try_fold(None, |chosen_route, next_cmd_route| { + match (chosen_route, next_cmd_route) { + (None, _) => Ok(next_cmd_route), + (_, None) => Ok(chosen_route), + (Some(chosen_route), Some(next_cmd_route)) => { + if chosen_route.slot() != next_cmd_route.slot() { + Err(( + ErrorKind::CrossSlot, + "Received crossed slots in transaction", + ) + .into()) + } else { + Ok(Some(chosen_route)) + } + } + } + }) + } else { + // Pipeline is not atomic, so we can have commands with different slots. + Ok(None) + } +} diff --git a/glide-core/redis-rs/redis/src/pipeline.rs b/glide-core/redis-rs/redis/src/pipeline.rs index babb57a1ff..6773d8895b 100644 --- a/glide-core/redis-rs/redis/src/pipeline.rs +++ b/glide-core/redis-rs/redis/src/pipeline.rs @@ -203,6 +203,29 @@ impl Pipeline { pub fn execute(&self, con: &mut dyn ConnectionLike) { self.query::<()>(con).unwrap(); } + + /// Returns whether the pipeline is in transaction mode (atomic). + /// + /// When in transaction mode, all commands in the pipeline are executed + /// as a single atomic operation. 
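+    ///
+    /// # Example
+    /// A small sketch of the expected behaviour, using the crate's existing `atomic()` toggle:
+    ///
+    /// ```rust,no_run
+    /// let mut pipeline = redis::Pipeline::new();
+    /// assert!(!pipeline.is_atomic());
+    /// pipeline.atomic(); // switch the pipeline into transaction (MULTI/EXEC) mode
+    /// assert!(pipeline.is_atomic());
+    /// ```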
+ pub fn is_atomic(&self) -> bool { + self.transaction_mode + } + + /// Returns the number of commands in the pipeline. + pub fn len(&self) -> usize { + self.commands.len() + } + + /// Returns `true` if the pipeline contains no commands. + pub fn is_empty(&self) -> bool { + self.commands.is_empty() + } + + /// Returns the command at the given index, or `None` if the index is out of bounds. + pub fn get_command(&self, index: usize) -> Option<&Cmd> { + self.commands.get(index) + } } fn encode_pipeline(cmds: &[Cmd], atomic: bool) -> Vec { diff --git a/glide-core/src/client/mod.rs b/glide-core/src/client/mod.rs index 3f05092a64..37967c746a 100644 --- a/glide-core/src/client/mod.rs +++ b/glide-core/src/client/mod.rs @@ -367,10 +367,10 @@ impl Client { .into()); } }; - Self::convert_transaction_values_to_expected_types(pipeline, values, command_count) + Self::convert_pipeline_values_to_expected_types(pipeline, values, command_count) } - fn convert_transaction_values_to_expected_types( + fn convert_pipeline_values_to_expected_types( pipeline: &redis::Pipeline, values: Vec, command_count: usize, @@ -395,6 +395,11 @@ impl Client { routing: Option, ) -> redis::RedisFuture<'a, Value> { let command_count = pipeline.cmd_iter().count(); + // The offset is set to command_count + 1 to account for: + // 1. The first command, which is the "MULTI" command, that returns "OK" + // 2. The "QUEUED" responses for each of the commands in the pipeline (before EXEC) + // After these initial responses (OK and QUEUED), we expect a single response, + // which is an array containing the results of all the commands in the pipeline. let offset = command_count + 1; run_with_timeout(Some(self.request_timeout), async move { let values = match self.internal_client { @@ -415,6 +420,28 @@ impl Client { .boxed() } + pub fn send_pipeline<'a>( + &'a mut self, + pipeline: &'a redis::Pipeline, + ) -> redis::RedisFuture<'a, Value> { + let command_count = pipeline.cmd_iter().count(); + + run_with_timeout(Some(self.request_timeout), async move { + let values = match self.internal_client { + ClientWrapper::Standalone(ref mut client) => { + client.send_pipeline(pipeline, 0, command_count).await + } + + ClientWrapper::Cluster { ref mut client } => { + client.req_packed_commands(pipeline, 0, command_count).await + } + }?; + + Self::convert_pipeline_values_to_expected_types(pipeline, values, command_count) + }) + .boxed() + } + pub async fn invoke_script<'a>( &'a mut self, hash: &'a str, diff --git a/glide-core/src/protobuf/command_request.proto b/glide-core/src/protobuf/command_request.proto index d7c693cfd6..8b7887bc2d 100644 --- a/glide-core/src/protobuf/command_request.proto +++ b/glide-core/src/protobuf/command_request.proto @@ -501,6 +501,10 @@ message Transaction { repeated Command commands = 1; } +message Pipeline { + repeated Command commands = 1; +} + message ClusterScan { string cursor = 1; optional bytes match_pattern = 2; @@ -524,6 +528,7 @@ message CommandRequest { ScriptInvocationPointers script_invocation_pointers = 5; ClusterScan cluster_scan = 6; UpdateConnectionPassword update_connection_password = 7; + Pipeline pipeline = 8; } - Routes route = 8; + Routes route = 9; } diff --git a/glide-core/src/socket_listener.rs b/glide-core/src/socket_listener.rs index 0b034e48c3..b63a0968fc 100644 --- a/glide-core/src/socket_listener.rs +++ b/glide-core/src/socket_listener.rs @@ -4,7 +4,8 @@ use super::rotating_buffer::RotatingBuffer; use crate::client::Client; use crate::cluster_scan_container::get_cluster_scan_cursor; use 
crate::command_request::{ - command, command_request, ClusterScan, Command, CommandRequest, Routes, SlotTypes, Transaction, + command, command_request, ClusterScan, Command, CommandRequest, Pipeline, Routes, SlotTypes, + Transaction, }; use crate::connection_request::ConnectionRequest; use crate::errors::{error_message, error_type, RequestErrorType}; @@ -388,6 +389,18 @@ async fn send_transaction( .map_err(|err| err.into()) } +async fn send_pipeline(request: Pipeline, client: &mut Client) -> ClientUsageResult { + let mut pipeline = redis::Pipeline::with_capacity(request.commands.capacity()); + for command in request.commands { + pipeline.add_command(get_redis_command(&command)?); + } + + client + .send_pipeline(&pipeline) + .await + .map_err(|err| err.into()) +} + fn get_slot_addr(slot_type: &protobuf::EnumOrUnknown) -> ClientUsageResult { slot_type .enum_value() @@ -491,6 +504,9 @@ fn handle_request(request: CommandRequest, mut client: Client, writer: Rc Err(e), } } + command_request::Command::Pipeline(pipeline) => { + send_pipeline(pipeline, &mut client).await + } command_request::Command::ScriptInvocation(script) => { match get_route(request.route.0, None) { Ok(routes) => { diff --git a/glide-core/tests/test_socket_listener.rs b/glide-core/tests/test_socket_listener.rs index 5921236e36..870f0869dd 100644 --- a/glide-core/tests/test_socket_listener.rs +++ b/glide-core/tests/test_socket_listener.rs @@ -21,7 +21,7 @@ mod socket_listener { use crate::utilities::mocks::{Mock, ServerMock}; use super::*; - use command_request::{CommandRequest, RequestType}; + use command_request::{CommandRequest, Pipeline, RequestType}; use glide_core::command_request::command::{Args, ArgsArray}; use glide_core::command_request::{Command, Transaction}; use glide_core::response::{response, ConstantResponse, Response}; @@ -30,6 +30,7 @@ mod socket_listener { use redis::{Cmd, ConnectionAddr, FromRedisValue, Value}; use rstest::rstest; use std::mem::size_of; + use std::vec; use tokio::{net::UnixListener, runtime::Builder}; /// An enum representing the values of the request type field for testing purposes @@ -309,6 +310,28 @@ mod socket_listener { write_request(buffer, socket, request); } + fn write_pipeline_request( + buffer: &mut Vec, + socket: &mut UnixStream, + callback_index: u32, + commands_components: Vec, + ) { + let mut request = CommandRequest::new(); + request.callback_idx = callback_index; + let mut pipeline = Pipeline::new(); + pipeline.commands.reserve(commands_components.len()); + + for components in commands_components { + pipeline.commands.push(get_command(components)); + } + + request.command = Some(command_request::command_request::Command::Pipeline( + pipeline, + )); + + write_request(buffer, socket, request); + } + fn write_get( buffer: &mut Vec, socket: &mut UnixStream, @@ -1211,6 +1234,169 @@ mod socket_listener { ); } + #[rstest] + #[serial_test::serial] + #[timeout(SHORT_CLUSTER_TEST_TIMEOUT)] + fn test_send_pipeline_and_get_array_of_results( + #[values(RedisType::Cluster, RedisType::Standalone)] use_cluster: RedisType, + ) { + let test_basics = setup_test_basics(Tls::NoTls, TestServer::Shared, use_cluster); + let mut socket = test_basics + .socket + .try_clone() + .expect("Failed to clone socket"); + + const CALLBACK_INDEX: u32 = 0; + // making sure both keys are in a different slot + let key = format!("{{abc}}f{}", generate_random_string(KEY_LENGTH)); + let key2 = format!("{{xyz}}f{}", generate_random_string(KEY_LENGTH)); + + let commands = vec![ + CommandComponents { + args: 
vec![key.clone().into(), "bar".to_string().into()], + args_pointer: true, + request_type: RequestType::Set.into(), + }, + CommandComponents { + args: vec!["GET".to_string().into(), key.clone().into()], + args_pointer: false, + request_type: RequestType::CustomCommand.into(), + }, + CommandComponents { + args: vec![key.clone().into(), key2.clone().into()], + args_pointer: false, + request_type: RequestType::MGet.into(), + }, + CommandComponents { + args: vec!["FLUSHALL".to_string().into()], + args_pointer: false, + request_type: RequestType::CustomCommand.into(), // AllPrimaries command + }, + CommandComponents { + args: vec![], + args_pointer: false, + request_type: RequestType::DBSize.into(), // Aggregation of sum + }, + CommandComponents { + args: vec![key.clone().into()], + args_pointer: false, + request_type: RequestType::Get.into(), + }, + CommandComponents { + args: vec!["HELLO".into()], + args_pointer: false, + request_type: RequestType::Ping.into(), + }, + CommandComponents { + args: vec![ + key.into(), + "bar".to_string().into(), + key2.into(), + "baz".to_string().into(), + ], + args_pointer: false, + request_type: RequestType::MSet.into(), + }, + CommandComponents { + args: vec![], + args_pointer: false, + request_type: RequestType::DBSize.into(), + }, + CommandComponents { + args: vec!["appendonly".to_string().into(), "no".to_string().into()], + args_pointer: false, + request_type: RequestType::ConfigSet.into(), // AllNodes command + }, + CommandComponents { + args: vec!["appendonly".to_string().into()], + args_pointer: false, + request_type: RequestType::ConfigGet.into(), // RandomNode command + }, + ]; + let mut buffer = Vec::with_capacity(200); + write_pipeline_request(&mut buffer, &mut socket, CALLBACK_INDEX, commands); + + assert_value_response( + &mut buffer, + Some(&mut socket), + CALLBACK_INDEX, + Value::Array(vec![ + Value::Okay, + Value::BulkString(vec![b'b', b'a', b'r']), + Value::Array(vec![Value::BulkString(vec![b'b', b'a', b'r']), Value::Nil]), + Value::Okay, + Value::Int(0), + Value::Nil, + Value::BulkString(vec![b'H', b'E', b'L', b'L', b'O']), + Value::Okay, + Value::Int(2), + Value::Okay, + Value::Map(vec![( + Value::BulkString(vec![ + b'a', b'p', b'p', b'e', b'n', b'd', b'o', b'n', b'l', b'y', + ]), + Value::BulkString(vec![b'n', b'o']), + )]), + ]), + ); + } + + #[rstest] + #[serial_test::serial] + #[timeout(SHORT_CLUSTER_TEST_TIMEOUT)] + fn test_send_pipeline_and_get_error( + #[values(RedisType::Cluster, RedisType::Standalone)] use_cluster: RedisType, + ) { + let mut test_basics = setup_test_basics(Tls::NoTls, TestServer::Shared, use_cluster); + let mut socket = test_basics + .socket + .try_clone() + .expect("Failed to clone socket"); + + const CALLBACK_INDEX: u32 = 0; + let key = generate_random_string(KEY_LENGTH); + let commands = vec![ + CommandComponents { + args: vec![key.clone().into(), "bar".to_string().into()], + args_pointer: true, + request_type: RequestType::Set.into(), + }, + CommandComponents { + args: vec!["GET".to_string().into(), key.clone().into()], + args_pointer: false, + request_type: RequestType::CustomCommand.into(), + }, + CommandComponents { + args: vec![key.clone().into(), "random_key".into()], + args_pointer: false, + request_type: RequestType::MGet.into(), + }, + CommandComponents { + args: vec![key.clone().into()], + args_pointer: false, + request_type: RequestType::LLen.into(), + }, + CommandComponents { + args: vec!["FLUSHALL".to_string().into()], + args_pointer: false, + request_type: RequestType::CustomCommand.into(), + }, + 
CommandComponents { + args: vec![key.into()], + args_pointer: false, + request_type: RequestType::Get.into(), + }, + ]; + let mut buffer = Vec::with_capacity(200); + write_pipeline_request(&mut buffer, &mut socket, CALLBACK_INDEX, commands); + + assert_error_response( + &mut buffer, + &mut test_basics.socket, + CALLBACK_INDEX, + ResponseType::RequestError, + ); + } #[rstest] #[serial_test::serial] #[timeout(SHORT_CLUSTER_TEST_TIMEOUT)]