Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core: Add new CommandRequest - Pipeline #2954

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
213 changes: 163 additions & 50 deletions glide-core/redis-rs/redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
//! ```

mod connections_container;
mod pipeline_routing;

mod connections_logic;
/// Exposed only for testing.
pub mod testing {
Expand All @@ -41,6 +43,10 @@ use crate::{
FromRedisValue, InfoDict,
};
use dashmap::DashMap;
use pipeline_routing::{
collect_and_send_pending_requests, map_pipeline_to_nodes, process_pipeline_responses,
route_for_pipeline, PipelineResponses,
};
use std::{
collections::{HashMap, HashSet},
fmt, io, mem,
Expand Down Expand Up @@ -285,6 +291,7 @@ where
offset,
count,
route: route.into(),
sub_pipeline: false,
},
sender,
})
Expand Down Expand Up @@ -606,6 +613,7 @@ enum CmdArg<C> {
offset: usize,
count: usize,
route: InternalSingleNodeRouting<C>,
sub_pipeline: bool,
},
ClusterScan {
// struct containing the arguments for the cluster scan command - scan state cursor, match pattern, count and object type.
Expand All @@ -621,44 +629,6 @@ enum Operation {
UpdateConnectionPassword(Option<String>),
}

fn route_for_pipeline(pipeline: &crate::Pipeline) -> RedisResult<Option<Route>> {
fn route_for_command(cmd: &Cmd) -> Option<Route> {
match cluster_routing::RoutingInfo::for_routable(cmd) {
Some(cluster_routing::RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)) => None,
Some(cluster_routing::RoutingInfo::SingleNode(
SingleNodeRoutingInfo::SpecificNode(route),
)) => Some(route),
Some(cluster_routing::RoutingInfo::SingleNode(
SingleNodeRoutingInfo::RandomPrimary,
)) => Some(Route::new_random_primary()),
Some(cluster_routing::RoutingInfo::MultiNode(_)) => None,
Some(cluster_routing::RoutingInfo::SingleNode(SingleNodeRoutingInfo::ByAddress {
..
})) => None,
None => None,
}
}

// Find first specific slot and send to it. There's no need to check If later commands
// should be routed to a different slot, since the server will return an error indicating this.
pipeline.cmd_iter().map(route_for_command).try_fold(
None,
|chosen_route, next_cmd_route| match (chosen_route, next_cmd_route) {
(None, _) => Ok(next_cmd_route),
(_, None) => Ok(chosen_route),
(Some(chosen_route), Some(next_cmd_route)) => {
if chosen_route.slot() != next_cmd_route.slot() {
Err((ErrorKind::CrossSlot, "Received crossed slots in pipeline").into())
} else if chosen_route.slot_addr() == SlotAddr::ReplicaOptional {
Ok(Some(next_cmd_route))
} else {
Ok(Some(chosen_route))
}
}
},
)
}

/// Returns a type-erased (`BoxFuture`) future that completes after `duration`,
/// backed by the tokio timer. Boxing lets callers store/await it uniformly.
fn boxed_sleep(duration: Duration) -> BoxFuture<'static, ()> {
Box::pin(tokio::time::sleep(duration))
}
Expand Down Expand Up @@ -2147,15 +2117,23 @@ where
offset,
count,
route,
sub_pipeline,
} => {
Self::try_pipeline_request(
pipeline,
offset,
count,
Self::get_connection(route, core, None),
)
.await
if pipeline.is_atomic() || sub_pipeline {
// If the pipeline is atomic (i.e., a transaction) or has already been split into sub-pipelines, we can send it as is, with no need to split it further.
Self::try_pipeline_request(
pipeline,
offset,
count,
Self::get_connection(route, core, None),
)
.await
} else {
// The pipeline is not atomic and has not yet been split, so we need to split it into sub-pipelines and send them separately.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please move all of this logic into a dedicated function, shouldn't be directly in try_request

Self::handle_pipeline_request(&pipeline, core).await
}
}

CmdArg::ClusterScan {
cluster_scan_args, ..
} => {
Expand All @@ -2180,6 +2158,140 @@ where
}
}

/// Handles the execution of a non-atomic pipeline request by splitting it into sub-pipelines and sending them to the appropriate cluster nodes.
///
/// This function distributes the commands in the pipeline across the cluster nodes based on routing information, collects the responses,
/// and aggregates them if necessary according to the specified response policies.
///
/// # Arguments
/// * `pipeline` - A reference to the pipeline to be executed.
/// * `core` - A reference to the core cluster connection state.
///
/// # Returns
/// * `OperationResult` - Returns a result containing the aggregated responses from the sub-pipelines, or an error if the operation fails.
async fn handle_pipeline_request(pipeline: &crate::Pipeline, core: Core<C>) -> OperationResult {
// Distribute pipeline commands across cluster nodes based on routing information.
// Returns:
// - pipelines_by_node: Map of node addresses to their pipeline contexts
// - response_policies: List of response aggregation policies for multi-node operations
let (pipelines_by_node, response_policies) =
map_pipeline_to_nodes(pipeline, core.clone()).await?;

// Initialize `PipelineResponses` to store responses for each pipeline command.
// This will be used to store the responses from the different sub-pipelines to the pipeline commands.
// A command can have one or more responses (e.g. MultiNode commands).
// Each entry in `PipelineResponses` corresponds to a command in the original pipeline and contains
// a vector of tuples where each tuple holds a response to the command and the address of the node that provided it.
let mut pipeline_responses: PipelineResponses = vec![Vec::new(); pipeline.len()];

// Send the sub-pipeline requests to each node and collect the responses.
let (responses, addresses_and_indices) =
collect_and_send_pending_requests(pipelines_by_node, core.clone()).await;

// Process the responses and update `pipeline_responses` accordingly.
process_pipeline_responses(&mut pipeline_responses, responses, addresses_and_indices)?;

// Apply response policies after all tasks are complete, collapsing multi-node
// responses into a single value per command.
Self::aggregate_pipeline_multi_node_commands(&mut pipeline_responses, response_policies)
.await?;

// Collect final responses, ensuring no missing or invalid responses.
let final_responses: Result<Vec<Value>, (OperationTarget, RedisError)> = pipeline_responses
.into_iter()
.enumerate()
.map(|(index, mut value)| {
value.pop().map(|(response, _)| response).ok_or_else(|| {
(
OperationTarget::FanOut,
RedisError::from((
ErrorKind::ResponseError,
"No response found for command: ",
pipeline
.get_command(index)
.map_or("no command available".to_string(), |cmd| {
format!("{:?}", cmd)
}),
)),
)
})
})
.collect();

Ok(Response::Multiple(final_responses?))
}

/// Aggregates pipeline responses for multi-node commands and updates the `pipeline_responses` vector.
///
/// Pipeline commands with multi-node routing info will be split into multiple pipelines; therefore, after executing each pipeline and storing the results in `pipeline_responses`,
/// the multi-node commands will contain more than one response (one for each sub-pipeline that contained the command). These responses must be aggregated into a single response, based on the proper response policy.
///
/// This function processes the provided `response_policies`, which contain information about how responses from multiple nodes should be aggregated.
/// For each policy:
/// - It collects the multiple responses and their source node addresses from the corresponding entry in `pipeline_responses`.
/// - Uses the routing information and optional response policy to aggregate the responses into a single result.
///
/// The aggregated result replaces the existing entries in `pipeline_responses` for the given command index, changing the multiple responses to the command into a single aggregated response.
///
/// After the execution of this function, all entries in `pipeline_responses` will contain a single response for each command.
///
/// # Arguments
/// * `pipeline_responses` - A mutable reference to a vector of vectors, where each inner vector contains tuples of responses and their corresponding node addresses.
/// * `response_policies` - A vector of tuples, each containing:
/// - The index of the command in the pipeline that has a multi-node routing info.
/// - The routing information for the command.
/// - An optional response policy that dictates how the responses should be aggregated.
///
/// # Example
/// Suppose we have a pipeline with multiple commands that were split and executed on different nodes.
/// This function will aggregate the responses for commands that were split across multiple nodes.
///
/// ```rust,compile_fail
/// // Example pipeline with commands that might be split across nodes
///
/// let mut pipeline_responses = vec![
/// vec![(Value::Int(1), "node1".to_string()), (Value::Int(2), "node2".to_string()), (Value::Int(3), "node3".to_string())], // represents `DBSIZE`
/// vec![(Value::Int(3), "node3".to_string())],
/// vec![(Value::SimpleString("PONG".to_string()), "node1".to_string()), (Value::SimpleString("PONG".to_string()), "node2".to_string()), (Value::SimpleString("PONG".to_string()), "node3".to_string())], // represents `PING`
/// ];
/// let response_policies = vec![
/// (0, MultipleNodeRoutingInfo::AllNodes, Some(ResponsePolicy::Aggregate(AggregateOp::Sum))),
/// (2, MultipleNodeRoutingInfo::AllNodes, Some(ResponsePolicy::AllSucceeded)),
/// ];
///
/// // Aggregating the responses
/// aggregate_pipeline_multi_node_commands(&mut pipeline_responses, response_policies).await.unwrap();
///
/// // After aggregation, pipeline_responses will be updated with aggregated results
/// assert_eq!(pipeline_responses[0], vec![(Value::Int(6), "".to_string())]);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"".to_string()?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does it return an empty string instead of some representing enum/None

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you still need this field at this stage? can't you completely filter it out?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I cant filter it out since I am only going through the commands in the pipeline with multi-node routing, the pipeline_responses still contains the rest of the commands (with single node routing) that are with the type of (response, address)

/// assert_eq!(pipeline_responses[1], vec![(Value::Int(3), "node3".to_string())]);
/// assert_eq!(pipeline_responses[2], vec![(Value::SimpleString("PONG".to_string()), "".to_string())]);
/// ```
///
/// This function is essential for handling multi-node commands in a Redis cluster, ensuring that responses from different nodes are correctly aggregated and processed.
async fn aggregate_pipeline_multi_node_commands(
pipeline_responses: &mut PipelineResponses,
response_policies: Vec<(usize, MultipleNodeRoutingInfo, Option<ResponsePolicy>)>,
) -> Result<(), (OperationTarget, RedisError)> {
for (index, routing_info, response_policy) in response_policies {
// Wrap each stored (value, address) pair in an already-resolved oneshot receiver so
// that the existing `aggregate_results` helper, which expects receivers, can be reused.
let response_receivers = pipeline_responses[index]
.iter()
.map(|(value, address)| {
let (sender, receiver) = oneshot::channel();
// Sending cannot fail here: the receiver is still alive in this scope,
// so the result is deliberately ignored.
let _ = sender.send(Ok(Response::Single(value.clone())));
(Some(address.clone()), receiver)
})
.collect();

let aggregated_response =
Self::aggregate_results(response_receivers, &routing_info, response_policy)
.await
.map_err(|err| (OperationTarget::FanOut, err))?;

// Replace the per-node responses with the single aggregated value. The address is
// left empty because the aggregate no longer belongs to any one node.
pipeline_responses[index] = vec![(aggregated_response, "".to_string())];
}
Ok(())
}

async fn get_connection(
routing: InternalSingleNodeRouting<C>,
core: Core<C>,
Expand Down Expand Up @@ -2821,7 +2933,7 @@ impl Connect for MultiplexedConnection {

#[cfg(test)]
mod pipeline_routing_tests {
use super::route_for_pipeline;
use super::pipeline_routing::route_for_pipeline;
use crate::{
cluster_routing::{Route, SlotAddr},
cmd,
Expand All @@ -2830,6 +2942,7 @@ mod pipeline_routing_tests {
#[test]
fn test_first_route_is_found() {
let mut pipeline = crate::Pipeline::new();
pipeline.atomic();

pipeline
.add_command(cmd("FLUSHALL")) // route to all masters
Expand All @@ -2845,7 +2958,7 @@ mod pipeline_routing_tests {
#[test]
fn test_return_none_if_no_route_is_found() {
let mut pipeline = crate::Pipeline::new();

pipeline.atomic();
pipeline
.add_command(cmd("FLUSHALL")) // route to all masters
.add_command(cmd("EVAL")); // route randomly
Expand All @@ -2856,7 +2969,7 @@ mod pipeline_routing_tests {
#[test]
fn test_prefer_primary_route_over_replica() {
let mut pipeline = crate::Pipeline::new();

pipeline.atomic();
pipeline
.get("foo") // route to replica of slot 12182
.add_command(cmd("FLUSHALL")) // route to all masters
Expand All @@ -2873,7 +2986,7 @@ mod pipeline_routing_tests {
#[test]
fn test_raise_cross_slot_error_on_conflicting_slots() {
let mut pipeline = crate::Pipeline::new();

pipeline.atomic();
pipeline
.add_command(cmd("FLUSHALL")) // route to all masters
.set("baz", "bar") // route to slot 4813
Expand All @@ -2888,7 +3001,7 @@ mod pipeline_routing_tests {
#[test]
fn unkeyed_commands_dont_affect_route() {
let mut pipeline = crate::Pipeline::new();

pipeline.atomic();
pipeline
.set("{foo}bar", "baz") // route to primary of slot 12182
.cmd("CONFIG").arg("GET").arg("timeout") // unkeyed command
Expand Down
Loading
Loading