Skip to content

Commit

Permalink
connection[_pool]: propagate UntranslatedEndpoint borrowing
Browse files Browse the repository at this point in the history
As address translation no longer needs owned Strings, it's enough to
pass references to UntranslatedEndpoint to `open_connection` & friends.

Note that, unfortunately, we can't fully elide the clone in
`start_opening_connection()`, because the endpoint may be mutated (in
the shard-aware case) for the purpose of opening a connection to the
shard-aware port. Nonetheless, now the limitation that requires us to
clone is in internal code, not in the user-facing API, which is better
for us.
  • Loading branch information
wprzytula committed Feb 3, 2025
1 parent e7e62e7 commit c3a42a6
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 28 deletions.
30 changes: 17 additions & 13 deletions scylla/src/network/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2074,16 +2074,16 @@ impl Connection {
}

async fn maybe_translated_addr(
endpoint: UntranslatedEndpoint,
endpoint: &UntranslatedEndpoint,
address_translator: Option<&dyn AddressTranslator>,
) -> Result<SocketAddr, TranslationError> {
match endpoint {
UntranslatedEndpoint::ContactPoint(addr) => Ok(addr.address),
match *endpoint {
UntranslatedEndpoint::ContactPoint(ref addr) => Ok(addr.address),
UntranslatedEndpoint::Peer(PeerEndpoint {
host_id,
address,
datacenter,
rack,
ref datacenter,
ref rack,
}) => match address {
NodeAddr::Translatable(addr) => {
// In this case, addr is subject to AddressTranslator.
Expand Down Expand Up @@ -2118,7 +2118,7 @@ async fn maybe_translated_addr(
///
/// At the beginning, translates node's address, if it is subject to address translation.
pub(super) async fn open_connection(
endpoint: UntranslatedEndpoint,
endpoint: &UntranslatedEndpoint,
source_port: Option<u16>,
config: &HostConnectionConfig,
) -> Result<(Connection, ErrorReceiver), ConnectionError> {
Expand Down Expand Up @@ -2236,7 +2236,7 @@ pub(super) async fn open_connection(
}

pub(super) async fn open_connection_to_shard_aware_port(
endpoint: UntranslatedEndpoint,
endpoint: &UntranslatedEndpoint,
shard: Shard,
sharder: Sharder,
config: &HostConnectionConfig,
Expand All @@ -2245,7 +2245,7 @@ pub(super) async fn open_connection_to_shard_aware_port(
let source_port_iter = sharder.iter_source_ports_for_shard(shard);

for port in source_port_iter {
let connect_result = open_connection(endpoint.clone(), Some(port), config).await;
let connect_result = open_connection(endpoint, Some(port), config).await;

match connect_result {
Err(err) if err.is_address_unavailable_for_use() => continue, // If we can't use this port, try the next one
Expand Down Expand Up @@ -2554,7 +2554,7 @@ mod tests {
let addr: SocketAddr = resolve_hostname(&uri).await;

let (connection, _) = super::open_connection(
UntranslatedEndpoint::ContactPoint(ResolvedContactPoint {
&UntranslatedEndpoint::ContactPoint(ResolvedContactPoint {
address: addr,
datacenter: None,
}),
Expand Down Expand Up @@ -2679,7 +2679,7 @@ mod tests {

let subtest = |enable_coalescing: bool, ks: String| async move {
let (connection, _) = super::open_connection(
UntranslatedEndpoint::ContactPoint(ResolvedContactPoint {
&UntranslatedEndpoint::ContactPoint(ResolvedContactPoint {
address: addr,
datacenter: None,
}),
Expand Down Expand Up @@ -2811,16 +2811,20 @@ mod tests {
.unwrap();

// We must interrupt the driver's full connection opening, because our proxy does not interact further after Startup.
let endpoint = UntranslatedEndpoint::ContactPoint(ResolvedContactPoint {
address: proxy_addr,
datacenter: None,
});
let (startup_without_lwt_optimisation, _shard) = select! {
_ = open_connection(UntranslatedEndpoint::ContactPoint(ResolvedContactPoint{address: proxy_addr, datacenter: None}), None, &config) => unreachable!(),
_ = open_connection(&endpoint, None, &config) => unreachable!(),
startup = startup_rx.recv() => startup.unwrap(),
};

proxy.running_nodes[0]
.change_request_rules(Some(make_rules(options_with_lwt_optimisation_support)));

let (startup_with_lwt_optimisation, _shard) = select! {
_ = open_connection(UntranslatedEndpoint::ContactPoint(ResolvedContactPoint{address: proxy_addr, datacenter: None}), None, &config) => unreachable!(),
_ = open_connection(&endpoint, None, &config) => unreachable!(),
startup = startup_rx.recv() => startup.unwrap(),
};

Expand Down Expand Up @@ -2879,7 +2883,7 @@ mod tests {

// Setup connection normally, without obstruction
let (conn, mut error_receiver) = open_connection(
UntranslatedEndpoint::ContactPoint(ResolvedContactPoint {
&UntranslatedEndpoint::ContactPoint(ResolvedContactPoint {
address: proxy_addr,
datacenter: None,
}),
Expand Down
25 changes: 10 additions & 15 deletions scylla/src/network/connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,7 @@ impl PoolRefiller {
endpoint
};
let result = open_connection_to_shard_aware_port(
shard_aware_endpoint,
&shard_aware_endpoint,
shard,
sharder.clone(),
&cfg,
Expand All @@ -877,7 +877,7 @@ impl PoolRefiller {
.boxed(),
_ => async move {
let non_shard_aware_endpoint = endpoint;
let result = open_connection(non_shard_aware_endpoint, None, &cfg).await;
let result = open_connection(&non_shard_aware_endpoint, None, &cfg).await;
OpenedConnectionEvent {
result,
requested_shard: None,
Expand Down Expand Up @@ -1162,20 +1162,15 @@ mod tests {
// to the right shard
let sharder = Sharder::new(ShardCount::new(3).unwrap(), 12);

let endpoint = UntranslatedEndpoint::ContactPoint(ResolvedContactPoint {
address: connect_address,
datacenter: None,
});

// Open the connections
let mut conns = Vec::new();

for _ in 0..connections_number {
conns.push(open_connection_to_shard_aware_port(
UntranslatedEndpoint::ContactPoint(ResolvedContactPoint {
address: connect_address,
datacenter: None,
}),
0,
sharder.clone(),
&connection_config,
));
}
let conns = (0..connections_number).map(|_| {
open_connection_to_shard_aware_port(&endpoint, 0, sharder.clone(), &connection_config)
});

let joined = futures::future::join_all(conns).await;

Expand Down

0 comments on commit c3a42a6

Please sign in to comment.