Skip to content

Commit

Permalink
Merge pull request #195 from muzarski/rack-awareness
Browse files Browse the repository at this point in the history
lbp: rack awareness
  • Loading branch information
muzarski authored Nov 20, 2024
2 parents 6ac2ec3 + 8ed8d15 commit 74c1cc9
Show file tree
Hide file tree
Showing 3 changed files with 288 additions and 25 deletions.
91 changes: 84 additions & 7 deletions include/cassandra.h
Original file line number Diff line number Diff line change
Expand Up @@ -1087,6 +1087,50 @@ cass_execution_profile_set_load_balance_dc_aware_n(CassExecProfile* profile,
unsigned used_hosts_per_remote_dc,
cass_bool_t allow_remote_dcs_for_local_cl);


/**
* Configures the execution profile to use Rack-aware load balancing.
* For each query, all live nodes in a primary 'local' rack are tried first,
* followed by nodes from local DC and then nodes from other DCs.
*
* <b>Note:</b> Profile-based load balancing policy is disabled by default.
* cluster load balancing policy is used when profile does not contain a policy.
*
* @public @memberof CassExecProfile
*
* @param[in] profile
* @param[in] local_dc The primary data center to try first
* @param[in] local_rack The primary rack to try first
* @return CASS_OK if successful, otherwise an error occurred
*/
CASS_EXPORT CassError
cass_execution_profile_set_load_balance_rack_aware(CassExecProfile* profile,
const char* local_dc,
const char* local_rack);


/**
* Same as cass_execution_profile_set_load_balance_rack_aware(), but with lengths for string
* parameters.
*
* @public @memberof CassExecProfile
*
* @param[in] profile
* @param[in] local_dc
* @param[in] local_dc_length
* @return same cass_execution_profile_set_load_balance_rack_aware()
*
* @see cass_execution_profile_set_load_balance_rack_aware()
* @see cass_cluster_set_load_balance_rack_aware_n()
*/
CASS_EXPORT CassError
cass_execution_profile_set_load_balance_rack_aware_n(CassExecProfile* profile,
const char* local_dc,
size_t local_dc_length,
const char* local_rack,
size_t local_rack_length);


/**
* Configures the execution profile to use token-aware request routing or not.
*
Expand Down Expand Up @@ -2176,13 +2220,6 @@ cass_cluster_set_load_balance_round_robin(CassCluster* cluster);
* For each query, all live nodes in a primary 'local' DC are tried first,
* followed by any node from other DCs.
*
* <b>Note:</b> This is the default, and does not need to be called unless
* switching an existing from another policy or changing settings.
* Without further configuration, a default local_dc is chosen from the
* first connected contact point, and no remote hosts are considered in
* query plans. If relying on this mechanism, be sure to use only contact
* points from the local DC.
*
* @deprecated The remote DC settings for DC-aware are not suitable for most
* scenarios that require DC failover. There is also unhandled gap between
* replication factor number of nodes failing and the full cluster failing. Only
Expand Down Expand Up @@ -2233,6 +2270,46 @@ cass_cluster_set_load_balance_dc_aware_n(CassCluster* cluster,
unsigned used_hosts_per_remote_dc,
cass_bool_t allow_remote_dcs_for_local_cl);


/**
* Configures the cluster to use Rack-aware load balancing.
* For each query, all live nodes in a primary 'local' rack are tried first,
* followed by nodes from local DC and then nodes from other DCs.
*
* @public @memberof CassCluster
*
* @param[in] cluster
* @param[in] local_dc The primary data center to try first
* @param[in] local_rack The primary rack to try first
* @return CASS_OK if successful, otherwise an error occurred
*/
CASS_EXPORT CassError
cass_cluster_set_load_balance_rack_aware(CassCluster* cluster,
const char* local_dc,
const char* local_rack);


/**
* Same as cass_cluster_set_load_balance_rack_aware(), but with lengths for string
* parameters.
*
* @public @memberof CassCluster
*
* @param[in] cluster
* @param[in] local_dc
* @param[in] local_dc_length
* @return same as cass_cluster_set_load_balance_dc_aware()
*
* @see cass_cluster_set_load_balance_dc_aware()
*/
CASS_EXPORT CassError
cass_cluster_set_load_balance_rack_aware_n(CassCluster* cluster,
const char* local_dc,
size_t local_dc_length,
const char* local_rack,
size_t local_rack_length);


/**
* Configures the cluster to use token-aware request routing or not.
*
Expand Down
183 changes: 166 additions & 17 deletions scylla-rust-wrapper/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,22 @@ impl LoadBalancingConfig {
builder =
builder.enable_shuffling_replicas(self.token_aware_shuffling_replicas_enabled);
}
if let LoadBalancingKind::DcAware { local_dc } = load_balancing_kind {
builder = builder.prefer_datacenter(local_dc).permit_dc_failover(true)

match load_balancing_kind {
LoadBalancingKind::DcAware { local_dc } => {
builder = builder.prefer_datacenter(local_dc).permit_dc_failover(true)
}
LoadBalancingKind::RackAware {
local_dc,
local_rack,
} => {
builder = builder
.prefer_datacenter_and_rack(local_dc, local_rack)
.permit_dc_failover(true)
}
LoadBalancingKind::RoundRobin => {}
}

if self.latency_awareness_enabled {
builder = builder.latency_awareness(self.latency_awareness_builder);
}
Expand All @@ -99,7 +112,13 @@ impl Default for LoadBalancingConfig {
#[derive(Clone, Debug)]
pub(crate) enum LoadBalancingKind {
RoundRobin,
DcAware { local_dc: String },
DcAware {
local_dc: String,
},
RackAware {
local_dc: String,
local_rack: String,
},
}

#[derive(Clone)]
Expand Down Expand Up @@ -553,6 +572,68 @@ pub unsafe extern "C" fn cass_cluster_set_load_balance_dc_aware_n(
)
}

#[no_mangle]
pub unsafe extern "C" fn cass_cluster_set_load_balance_rack_aware(
cluster_raw: *mut CassCluster,
local_dc_raw: *const c_char,
local_rack_raw: *const c_char,
) -> CassError {
cass_cluster_set_load_balance_rack_aware_n(
cluster_raw,
local_dc_raw,
strlen(local_dc_raw),
local_rack_raw,
strlen(local_rack_raw),
)
}

#[no_mangle]
pub unsafe extern "C" fn cass_cluster_set_load_balance_rack_aware_n(
cluster_raw: *mut CassCluster,
local_dc_raw: *const c_char,
local_dc_length: size_t,
local_rack_raw: *const c_char,
local_rack_length: size_t,
) -> CassError {
let cluster = ptr_to_ref_mut(cluster_raw);

set_load_balance_rack_aware_n(
&mut cluster.load_balancing_config,
local_dc_raw,
local_dc_length,
local_rack_raw,
local_rack_length,
)
}

pub(crate) unsafe fn set_load_balance_rack_aware_n(
load_balancing_config: &mut LoadBalancingConfig,
local_dc_raw: *const c_char,
local_dc_length: size_t,
local_rack_raw: *const c_char,
local_rack_length: size_t,
) -> CassError {
let (local_dc, local_rack) = match (
ptr_to_cstr_n(local_dc_raw, local_dc_length),
ptr_to_cstr_n(local_rack_raw, local_rack_length),
) {
(Some(local_dc_str), Some(local_rack_str))
if local_dc_length > 0 && local_rack_length > 0 =>
{
(local_dc_str.to_owned(), local_rack_str.to_owned())
}
// One of them either is a null pointer, is an empty string or is not a proper utf-8.
_ => return CassError::CASS_ERROR_LIB_BAD_PARAMS,
};

load_balancing_config.load_balancing_kind = Some(LoadBalancingKind::RackAware {
local_dc,
local_rack,
});

CassError::CASS_OK
}

#[no_mangle]
pub unsafe extern "C" fn cass_cluster_set_cloud_secure_connection_bundle_n(
_cluster_raw: *mut CassCluster,
Expand Down Expand Up @@ -891,12 +972,7 @@ mod tests {
{
cass_cluster_set_token_aware_routing(cluster_raw, 0);
assert_cass_error_eq!(
cass_cluster_set_load_balance_dc_aware(
cluster_raw,
"eu\0".as_ptr() as *const i8,
0,
0
),
cass_cluster_set_load_balance_dc_aware(cluster_raw, c"eu".as_ptr(), 0, 0),
CassError::CASS_OK
);
cass_cluster_set_latency_aware_routing(cluster_raw, 1);
Expand All @@ -920,25 +996,98 @@ mod tests {
}
assert!(!cluster.load_balancing_config.token_awareness_enabled);
assert!(cluster.load_balancing_config.latency_awareness_enabled);

// set preferred rack+dc
assert_cass_error_eq!(
cass_cluster_set_load_balance_rack_aware(
cluster_raw,
c"eu-east".as_ptr(),
c"rack1".as_ptr(),
),
CassError::CASS_OK
);

let node_location_preference =
&cluster.load_balancing_config.load_balancing_kind;
match node_location_preference {
Some(LoadBalancingKind::RackAware {
local_dc,
local_rack,
}) => {
assert_eq!(local_dc, "eu-east");
assert_eq!(local_rack, "rack1");
}
_ => panic!("Expected preferred dc and rack"),
}

// set back to preferred dc
assert_cass_error_eq!(
cass_cluster_set_load_balance_dc_aware(cluster_raw, c"eu".as_ptr(), 0, 0),
CassError::CASS_OK
);

let node_location_preference =
&cluster.load_balancing_config.load_balancing_kind;
match node_location_preference {
Some(LoadBalancingKind::DcAware { local_dc }) => {
assert_eq!(local_dc, "eu")
}
_ => panic!("Expected preferred dc"),
}
}
/* Test invalid configurations */
{
// Nonzero deprecated parameters
assert_cass_error_eq!(
cass_cluster_set_load_balance_dc_aware(
cass_cluster_set_load_balance_dc_aware(cluster_raw, c"eu".as_ptr(), 1, 0),
CassError::CASS_ERROR_LIB_BAD_PARAMS
);
assert_cass_error_eq!(
cass_cluster_set_load_balance_dc_aware(cluster_raw, c"eu".as_ptr(), 0, 1),
CassError::CASS_ERROR_LIB_BAD_PARAMS
);

// null pointers
assert_cass_error_eq!(
cass_cluster_set_load_balance_dc_aware(cluster_raw, std::ptr::null(), 0, 0),
CassError::CASS_ERROR_LIB_BAD_PARAMS
);
assert_cass_error_eq!(
cass_cluster_set_load_balance_rack_aware(
cluster_raw,
c"eu".as_ptr(),
std::ptr::null(),
),
CassError::CASS_ERROR_LIB_BAD_PARAMS
);
assert_cass_error_eq!(
cass_cluster_set_load_balance_rack_aware(
cluster_raw,
std::ptr::null(),
c"rack".as_ptr(),
),
CassError::CASS_ERROR_LIB_BAD_PARAMS
);

// empty strings
let empty_str = "\0".as_ptr() as *const i8;
assert_cass_error_eq!(
cass_cluster_set_load_balance_dc_aware(cluster_raw, std::ptr::null(), 0, 0),
CassError::CASS_ERROR_LIB_BAD_PARAMS
);
assert_cass_error_eq!(
cass_cluster_set_load_balance_rack_aware(
cluster_raw,
"eu\0".as_ptr() as *const i8,
1,
0
c"eu".as_ptr(),
empty_str,
),
CassError::CASS_ERROR_LIB_BAD_PARAMS
);
assert_cass_error_eq!(
cass_cluster_set_load_balance_dc_aware(
cass_cluster_set_load_balance_rack_aware(
cluster_raw,
"eu\0".as_ptr() as *const i8,
0,
1
empty_str,
c"rack".as_ptr(),
),
CassError::CASS_ERROR_LIB_BAD_PARAMS
);
Expand Down
39 changes: 38 additions & 1 deletion scylla-rust-wrapper/src/exec_profile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ use crate::argconv::{free_boxed, ptr_to_cstr_n, ptr_to_ref, ptr_to_ref_mut, strl
use crate::batch::CassBatch;
use crate::cass_error::CassError;
use crate::cass_types::CassConsistency;
use crate::cluster::{set_load_balance_dc_aware_n, LoadBalancingConfig, LoadBalancingKind};
use crate::cluster::{
set_load_balance_dc_aware_n, set_load_balance_rack_aware_n, LoadBalancingConfig,
LoadBalancingKind,
};
use crate::retry_policy::CassRetryPolicy;
use crate::retry_policy::RetryPolicy::{
DefaultRetryPolicy, DowngradingConsistencyRetryPolicy, FallthroughRetryPolicy,
Expand Down Expand Up @@ -357,6 +360,40 @@ pub unsafe extern "C" fn cass_execution_profile_set_load_balance_dc_aware_n(
)
}

#[no_mangle]
pub unsafe extern "C" fn cass_execution_profile_set_load_balance_rack_aware(
profile: *mut CassExecProfile,
local_dc_raw: *const c_char,
local_rack_raw: *const c_char,
) -> CassError {
cass_execution_profile_set_load_balance_rack_aware_n(
profile,
local_dc_raw,
strlen(local_dc_raw),
local_rack_raw,
strlen(local_rack_raw),
)
}

#[no_mangle]
pub unsafe extern "C" fn cass_execution_profile_set_load_balance_rack_aware_n(
profile: *mut CassExecProfile,
local_dc_raw: *const c_char,
local_dc_length: size_t,
local_rack_raw: *const c_char,
local_rack_length: size_t,
) -> CassError {
let profile_builder = ptr_to_ref_mut(profile);

set_load_balance_rack_aware_n(
&mut profile_builder.load_balancing_config,
local_dc_raw,
local_dc_length,
local_rack_raw,
local_rack_length,
)
}

#[no_mangle]
pub unsafe extern "C" fn cass_execution_profile_set_load_balance_round_robin(
profile: *mut CassExecProfile,
Expand Down

0 comments on commit 74c1cc9

Please sign in to comment.