From c3f55e414ac91bb68ebeba7e25ea70f2a7d50d43 Mon Sep 17 00:00:00 2001 From: Shachar Langbeheim Date: Wed, 21 Feb 2024 21:31:07 +0200 Subject: [PATCH] Core: Add support to route by address. (#971) * Core: Add support to route by address. * Update test. --- glide-core/src/protobuf/redis_request.proto | 6 ++ glide-core/src/socket_listener.rs | 12 ++++ glide-core/tests/test_socket_listener.rs | 71 ++++++++++++++++++++- 3 files changed, 88 insertions(+), 1 deletion(-) diff --git a/glide-core/src/protobuf/redis_request.proto b/glide-core/src/protobuf/redis_request.proto index 034fa9e518..06380f511c 100644 --- a/glide-core/src/protobuf/redis_request.proto +++ b/glide-core/src/protobuf/redis_request.proto @@ -22,11 +22,17 @@ message SlotKeyRoute { string slot_key = 2; } +message ByAddressRoute { + string host = 1; + int32 port = 2; +} + message Routes { oneof value { SimpleRoutes simple_routes = 1; SlotKeyRoute slot_key_route = 2; SlotIdRoute slot_id_route = 3; + ByAddressRoute by_address_route = 4; } } diff --git a/glide-core/src/socket_listener.rs b/glide-core/src/socket_listener.rs index 338e689307..06879e8e91 100644 --- a/glide-core/src/socket_listener.rs +++ b/glide-core/src/socket_listener.rs @@ -488,6 +488,18 @@ fn get_route( get_slot_addr(&slot_id_route.slot_type)?, )), ))), + Value::ByAddressRoute(by_address_route) => match u16::try_from(by_address_route.port) { + Ok(port) => Ok(Some(RoutingInfo::SingleNode( + SingleNodeRoutingInfo::ByAddress { + host: by_address_route.host.to_string(), + port, + }, + ))), + Err(err) => { + log_warn("get route", format!("Failed to parse port: {err:?}")); + Ok(None) + } + }, } } diff --git a/glide-core/tests/test_socket_listener.rs b/glide-core/tests/test_socket_listener.rs index b909151a0c..b7d7e6e6e3 100644 --- a/glide-core/tests/test_socket_listener.rs +++ b/glide-core/tests/test_socket_listener.rs @@ -25,7 +25,7 @@ mod socket_listener { use glide_core::response::{response, ConstantResponse, Response}; use glide_core::scripts_container::add_script; use protobuf::{EnumOrUnknown, Message}; - use redis::{Cmd, ConnectionAddr, Value}; + use redis::{Cmd, ConnectionAddr, FromRedisValue, Value}; use redis_request::{RedisRequest, RequestType}; use rstest::rstest; use std::mem::size_of; @@ -619,6 +619,75 @@ mod socket_listener { } } + #[rstest] + #[timeout(SHORT_CLUSTER_TEST_TIMEOUT)] + fn test_socket_pass_manual_route_by_address() { + // We send a request to a random node, get that node's hostname & port, and then + // route the same request to the hostname & port, and verify that we've received the same value. + let mut test_basics = setup_cluster_test_basics(false, true); + + const CALLBACK_INDEX: u32 = 100; + let approx_message_length = 4 + APPROX_RESP_HEADER_LEN; + let mut buffer = Vec::with_capacity(approx_message_length); + let mut request = get_command_request( + CALLBACK_INDEX, + vec!["CLUSTER".to_string(), "NODES".to_string()], + RequestType::CustomCommand.into(), + false, + ); + let mut routes = redis_request::Routes::default(); + routes.set_simple_routes(redis_request::SimpleRoutes::Random); + request.route = Some(routes).into(); + write_message(&mut buffer, request.clone()); + test_basics.socket.write_all(&buffer).unwrap(); + + let _size = read_from_socket(&mut buffer, &mut test_basics.socket); + let (message_length, header_bytes) = parse_header(&buffer); + let response = decode_response(&buffer, header_bytes, message_length as usize); + + assert_eq!(response.callback_idx, CALLBACK_INDEX); + let Some(response::Value::RespPointer(pointer)) = response.value else { + panic!("Unexpected response {:?}", response.value); + }; + let pointer = pointer as *mut Value; + let received_value = unsafe { Box::from_raw(pointer) }; + let first_value = String::from_redis_value(&received_value).unwrap(); + let (host, port) = first_value + .split('\n') + .find(|line| line.contains("myself")) + .and_then(|line| line.split_once(' ')) + .and_then(|(_, second)| second.split_once('@')) + .and_then(|(first, _)| first.split_once(':')) + .and_then(|(host, port)| port.parse::().map(|port| (host, port)).ok()) + .unwrap(); + + buffer.clear(); + let mut routes = redis_request::Routes::default(); + let by_address_route = glide_core::redis_request::ByAddressRoute { + host: host.into(), + port, + ..Default::default() + }; + routes.set_by_address_route(by_address_route); + request.route = Some(routes).into(); + write_message(&mut buffer, request); + test_basics.socket.write_all(&buffer).unwrap(); + + let _size = read_from_socket(&mut buffer, &mut test_basics.socket); + let (message_length, header_bytes) = parse_header(&buffer); + let response = decode_response(&buffer, header_bytes, message_length as usize); + + assert_eq!(response.callback_idx, CALLBACK_INDEX); + let Some(response::Value::RespPointer(pointer)) = response.value else { + panic!("Unexpected response {:?}", response.value); + }; + let pointer = pointer as *mut Value; + let received_value = unsafe { Box::from_raw(pointer) }; + let second_value = String::from_redis_value(&received_value).unwrap(); + + assert_eq!(first_value, second_value); + } + #[rstest] #[timeout(SHORT_STANDALONE_TEST_TIMEOUT)] fn test_socket_get_returns_null(#[values(false, true)] use_arg_pointer: bool) {