diff --git a/scylla/src/network/connection.rs b/scylla/src/network/connection.rs index 7482687a96..a912d0ce11 100644 --- a/scylla/src/network/connection.rs +++ b/scylla/src/network/connection.rs @@ -2074,16 +2074,16 @@ impl Connection { } async fn maybe_translated_addr( - endpoint: UntranslatedEndpoint, + endpoint: &UntranslatedEndpoint, address_translator: Option<&dyn AddressTranslator>, ) -> Result { - match endpoint { - UntranslatedEndpoint::ContactPoint(addr) => Ok(addr.address), + match *endpoint { + UntranslatedEndpoint::ContactPoint(ref addr) => Ok(addr.address), UntranslatedEndpoint::Peer(PeerEndpoint { host_id, address, - datacenter, - rack, + ref datacenter, + ref rack, }) => match address { NodeAddr::Translatable(addr) => { // In this case, addr is subject to AddressTranslator. @@ -2118,7 +2118,7 @@ async fn maybe_translated_addr( /// /// At the beginning, translates node's address, if it is subject to address translation. pub(super) async fn open_connection( - endpoint: UntranslatedEndpoint, + endpoint: &UntranslatedEndpoint, source_port: Option, config: &HostConnectionConfig, ) -> Result<(Connection, ErrorReceiver), ConnectionError> { @@ -2236,7 +2236,7 @@ pub(super) async fn open_connection( } pub(super) async fn open_connection_to_shard_aware_port( - endpoint: UntranslatedEndpoint, + endpoint: &UntranslatedEndpoint, shard: Shard, sharder: Sharder, config: &HostConnectionConfig, @@ -2245,7 +2245,7 @@ pub(super) async fn open_connection_to_shard_aware_port( let source_port_iter = sharder.iter_source_ports_for_shard(shard); for port in source_port_iter { - let connect_result = open_connection(endpoint.clone(), Some(port), config).await; + let connect_result = open_connection(endpoint, Some(port), config).await; match connect_result { Err(err) if err.is_address_unavailable_for_use() => continue, // If we can't use this port, try the next one @@ -2554,7 +2554,7 @@ mod tests { let addr: SocketAddr = resolve_hostname(&uri).await; let (connection, _) = super::open_connection( - UntranslatedEndpoint::ContactPoint(ResolvedContactPoint { + &UntranslatedEndpoint::ContactPoint(ResolvedContactPoint { address: addr, datacenter: None, }), @@ -2679,7 +2679,7 @@ mod tests { let subtest = |enable_coalescing: bool, ks: String| async move { let (connection, _) = super::open_connection( - UntranslatedEndpoint::ContactPoint(ResolvedContactPoint { + &UntranslatedEndpoint::ContactPoint(ResolvedContactPoint { address: addr, datacenter: None, }), @@ -2811,8 +2811,12 @@ mod tests { .unwrap(); // We must interrupt the driver's full connection opening, because our proxy does not interact further after Startup. + let endpoint = UntranslatedEndpoint::ContactPoint(ResolvedContactPoint { + address: proxy_addr, + datacenter: None, + }); let (startup_without_lwt_optimisation, _shard) = select! { - _ = open_connection(UntranslatedEndpoint::ContactPoint(ResolvedContactPoint{address: proxy_addr, datacenter: None}), None, &config) => unreachable!(), + _ = open_connection(&endpoint, None, &config) => unreachable!(), startup = startup_rx.recv() => startup.unwrap(), }; @@ -2820,7 +2824,7 @@ mod tests { .change_request_rules(Some(make_rules(options_with_lwt_optimisation_support))); let (startup_with_lwt_optimisation, _shard) = select! { - _ = open_connection(UntranslatedEndpoint::ContactPoint(ResolvedContactPoint{address: proxy_addr, datacenter: None}), None, &config) => unreachable!(), + _ = open_connection(&endpoint, None, &config) => unreachable!(), startup = startup_rx.recv() => startup.unwrap(), }; @@ -2879,7 +2883,7 @@ mod tests { // Setup connection normally, without obstruction let (conn, mut error_receiver) = open_connection( - UntranslatedEndpoint::ContactPoint(ResolvedContactPoint { + &UntranslatedEndpoint::ContactPoint(ResolvedContactPoint { address: proxy_addr, datacenter: None, }), diff --git a/scylla/src/network/connection_pool.rs b/scylla/src/network/connection_pool.rs index a6bea1c760..90e5e7410b 100644 --- a/scylla/src/network/connection_pool.rs +++ b/scylla/src/network/connection_pool.rs @@ -862,7 +862,7 @@ impl PoolRefiller { endpoint }; let result = open_connection_to_shard_aware_port( - shard_aware_endpoint, + &shard_aware_endpoint, shard, sharder.clone(), &cfg, @@ -877,7 +877,7 @@ impl PoolRefiller { .boxed(), _ => async move { let non_shard_aware_endpoint = endpoint; - let result = open_connection(non_shard_aware_endpoint, None, &cfg).await; + let result = open_connection(&non_shard_aware_endpoint, None, &cfg).await; OpenedConnectionEvent { result, requested_shard: None, @@ -1162,20 +1162,15 @@ mod tests { // to the right shard let sharder = Sharder::new(ShardCount::new(3).unwrap(), 12); + let endpoint = UntranslatedEndpoint::ContactPoint(ResolvedContactPoint { + address: connect_address, + datacenter: None, + }); + // Open the connections - let mut conns = Vec::new(); - - for _ in 0..connections_number { - conns.push(open_connection_to_shard_aware_port( - UntranslatedEndpoint::ContactPoint(ResolvedContactPoint { - address: connect_address, - datacenter: None, - }), - 0, - sharder.clone(), - &connection_config, - )); - } + let conns = (0..connections_number).map(|_| { + open_connection_to_shard_aware_port(&endpoint, 0, sharder.clone(), &connection_config) + }); let joined = futures::future::join_all(conns).await;