
Merge pull request #1094 from Lorak-mmk/fix-unnamable-types
Fix unnameable types
Lorak-mmk authored Oct 21, 2024
2 parents 4314d7a + 8a3e60d commit 025342c
Showing 11 changed files with 67 additions and 91 deletions.
1 change: 1 addition & 0 deletions scylla-cql/Cargo.toml
@@ -56,4 +56,5 @@ full-serialization = [
]

[lints.rust]
unnameable_types = "warn"
unreachable_pub = "warn"
1 change: 1 addition & 0 deletions scylla-macros/Cargo.toml
@@ -19,4 +19,5 @@ quote = "1.0"
proc-macro2 = "1.0"

[lints.rust]
unnameable_types = "warn"
unreachable_pub = "warn"
1 change: 1 addition & 0 deletions scylla/Cargo.toml
@@ -96,5 +96,6 @@ name = "benchmark"
harness = false

[lints.rust]
unnameable_types = "warn"
unreachable_pub = "warn"
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(scylla_cloud_tests)'] }
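For context on the lint being enabled across these manifests: `unnameable_types` warns about types that leak into a crate's public API even though their module path is private, so downstream code can obtain values of the type but cannot write its name. A minimal, hypothetical sketch of the situation the lint catches (not code from this repository):

mod private_module {
    // Reachable through `make()` below, but the module itself is not `pub`,
    // so code outside the crate cannot spell this type's path.
    pub struct Unnameable;
}

// Public function leaking the unnameable type: callers can hold the value,
// but cannot use the type in their own signatures.
pub fn make() -> private_module::Unnameable {
    private_module::Unnameable
}

The commit fixes such cases either by re-exporting the type publicly (e.g. `CloudConfig`, `MetricsError`) or by downgrading it to `pub(crate)` (e.g. `CloudEndpoint`, `ResolvedContactPoint`).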
2 changes: 1 addition & 1 deletion scylla/src/cloud/mod.rs
@@ -2,7 +2,7 @@ mod config;

use std::net::SocketAddr;

pub(crate) use config::CloudConfig;
pub use config::CloudConfig;
pub use config::CloudConfigError;
use openssl::{
error::ErrorStack,
2 changes: 1 addition & 1 deletion scylla/src/lib.rs
@@ -271,4 +271,4 @@ pub use transport::load_balancing;
pub use transport::retry_policy;
pub use transport::speculative_execution;

pub use transport::metrics::Metrics;
pub use transport::metrics::{Metrics, MetricsError};
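With `MetricsError` re-exported next to `Metrics`, applications can finally name the error type that the metrics accessors return. A rough usage sketch, assuming a `get_latency_avg_ms` accessor as in the driver's docs (verify the exact method against the release you use):

use scylla::{Metrics, MetricsError};

// The error type can now appear in downstream signatures instead of being
// an unnameable implementation detail of the driver.
fn average_latency_ms(metrics: &Metrics) -> Result<u64, MetricsError> {
    metrics.get_latency_avg_ms()
}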
57 changes: 2 additions & 55 deletions scylla/src/transport/cluster.rs
@@ -27,7 +27,7 @@ use tracing::{debug, warn};
use uuid::Uuid;

use super::locator::tablets::{RawTablet, Tablet, TabletsInfo};
use super::node::{KnownNode, NodeAddr};
use super::node::{InternalKnownNode, NodeAddr};
use super::NodeRef;

use super::locator::ReplicaLocator;
@@ -59,12 +59,6 @@ impl<'a> std::fmt::Debug for ClusterNeatDebug<'a> {
}
}

#[derive(Clone, Debug)]
pub struct Datacenter {
pub nodes: Vec<Arc<Node>>,
pub rack_count: usize,
}

#[derive(Clone)]
pub struct ClusterData {
pub(crate) known_peers: HashMap<Uuid, Arc<Node>>, // Invariant: nonempty after Cluster::new()
@@ -145,7 +139,7 @@ struct UseKeyspaceRequest {

impl Cluster {
pub(crate) async fn new(
known_nodes: Vec<KnownNode>,
known_nodes: Vec<InternalKnownNode>,
pool_config: PoolConfig,
keyspaces_to_fetch: Vec<String>,
fetch_schema_metadata: bool,
@@ -257,18 +251,6 @@ impl Cluster {
}

impl ClusterData {
// Updates information about rack count in each datacenter
fn update_rack_count(datacenters: &mut HashMap<String, Datacenter>) {
for datacenter in datacenters.values_mut() {
datacenter.rack_count = datacenter
.nodes
.iter()
.filter_map(|node| node.rack.as_ref())
.unique()
.count();
}
}

pub(crate) async fn wait_until_all_pools_are_initialized(&self) {
for node in self.locator.unique_nodes_in_global_ring().iter() {
node.wait_until_pool_initialized().await;
@@ -289,7 +271,6 @@ impl ClusterData {
let mut new_known_peers: HashMap<Uuid, Arc<Node>> =
HashMap::with_capacity(metadata.peers.len());
let mut ring: Vec<(Token, Arc<Node>)> = Vec::new();
let mut datacenters: HashMap<String, Datacenter> = HashMap::new();

for peer in metadata.peers {
// Take existing Arc<Node> if possible, otherwise create new one
@@ -325,19 +306,6 @@

new_known_peers.insert(peer_host_id, node.clone());

if let Some(dc) = &node.datacenter {
match datacenters.get_mut(dc) {
Some(v) => v.nodes.push(node.clone()),
None => {
let v = Datacenter {
nodes: vec![node.clone()],
rack_count: 0,
};
datacenters.insert(dc.clone(), v);
}
}
}

for token in peer_tokens {
ring.push((token, node.clone()));
}
@@ -384,8 +352,6 @@ impl ClusterData {
)
}

Self::update_rack_count(&mut datacenters);

let keyspaces = metadata.keyspaces;
let (locator, keyspaces) = tokio::task::spawn_blocking(move || {
let keyspace_strategies = keyspaces.values().map(|ks| &ks.strategy);
@@ -409,25 +375,6 @@
&self.keyspaces
}

/// Access datacenter details collected by the driver
/// Returned `HashMap` is indexed by names of datacenters
pub fn get_datacenters_info(&self) -> HashMap<String, Datacenter> {
self.locator
.datacenter_names()
.iter()
.map(|dc_name| {
let nodes = self
.locator
.unique_nodes_in_datacenter_ring(dc_name)
.unwrap()
.to_vec();
let rack_count = nodes.iter().map(|node| node.rack.as_ref()).unique().count();

(dc_name.clone(), Datacenter { nodes, rack_count })
})
.collect()
}

/// Access details about nodes known to the driver
pub fn get_nodes_info(&self) -> &[Arc<Node>] {
self.locator.unique_nodes_in_global_ring()
44 changes: 31 additions & 13 deletions scylla/src/transport/node.rs
@@ -227,25 +227,43 @@ impl Hash for Node {
pub enum KnownNode {
Hostname(String),
Address(SocketAddr),
}

/// Describes a database server known on `Session` startup.
/// It is similar to [KnownNode], but it also includes the `CloudEndpoint` variant,
/// which is created by the driver if the session is configured to connect to
/// a serverless cluster.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub(crate) enum InternalKnownNode {
Hostname(String),
Address(SocketAddr),
#[cfg(feature = "cloud")]
CloudEndpoint(CloudEndpoint),
}

impl From<KnownNode> for InternalKnownNode {
fn from(value: KnownNode) -> Self {
match value {
KnownNode::Hostname(s) => InternalKnownNode::Hostname(s),
KnownNode::Address(s) => InternalKnownNode::Address(s),
}
}
}

/// Describes a database server in the serverless Scylla Cloud.
#[cfg(feature = "cloud")]
#[non_exhaustive]
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct CloudEndpoint {
pub hostname: String,
pub datacenter: String,
pub(crate) struct CloudEndpoint {
pub(crate) hostname: String,
pub(crate) datacenter: String,
}

/// Describes a database server known on Session startup, with already resolved address.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ResolvedContactPoint {
pub address: SocketAddr,
pub datacenter: Option<String>,
pub(crate) struct ResolvedContactPoint {
pub(crate) address: SocketAddr,
#[cfg_attr(not(feature = "cloud"), allow(unused))]
pub(crate) datacenter: Option<String>,
}

// Resolve the given hostname using a DNS lookup if necessary.
@@ -275,12 +293,12 @@ pub(crate) async fn resolve_hostname(hostname: &str) -> Result<SocketAddr, io::E
})
}

/// Transforms the given [`KnownNode`]s into [`ContactPoint`]s.
/// Transforms the given [`InternalKnownNode`]s into [`ContactPoint`]s.
///
/// In case of a hostname, resolves it using a DNS lookup.
/// In case of a plain IP address, parses it and uses it directly.
pub(crate) async fn resolve_contact_points(
known_nodes: &[KnownNode],
known_nodes: &[InternalKnownNode],
) -> (Vec<ResolvedContactPoint>, Vec<String>) {
// Find IP addresses of all known nodes passed in the config
let mut initial_peers: Vec<ResolvedContactPoint> = Vec::with_capacity(known_nodes.len());
@@ -290,16 +308,16 @@ pub(crate) async fn resolve_contact_points(

for node in known_nodes.iter() {
match node {
KnownNode::Hostname(hostname) => {
InternalKnownNode::Hostname(hostname) => {
to_resolve.push((hostname, None));
hostnames.push(hostname.clone());
}
KnownNode::Address(address) => initial_peers.push(ResolvedContactPoint {
InternalKnownNode::Address(address) => initial_peers.push(ResolvedContactPoint {
address: *address,
datacenter: None,
}),
#[cfg(feature = "cloud")]
KnownNode::CloudEndpoint(CloudEndpoint {
InternalKnownNode::CloudEndpoint(CloudEndpoint {
hostname,
datacenter,
}) => to_resolve.push((hostname, Some(datacenter.clone()))),
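The shape of the change in this file: the user-facing `KnownNode` keeps only variants a caller can construct, while the crate-private `InternalKnownNode` is a superset that additionally carries cloud endpoints, and the `From` impl widens one into the other. A simplified sketch of how a call site performs that widening (a hypothetical helper, not the driver's exact code):

// Hypothetical call-site helper: user-supplied contact points are widened
// into the internal representation, which may also hold cloud endpoints.
fn to_internal(user_nodes: Vec<KnownNode>) -> Vec<InternalKnownNode> {
    user_nodes.into_iter().map(InternalKnownNode::from).collect()
}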
27 changes: 16 additions & 11 deletions scylla/src/transport/session.rs
@@ -44,7 +44,7 @@ use super::errors::TracingProtocolError;
use super::execution_profile::{ExecutionProfile, ExecutionProfileHandle, ExecutionProfileInner};
#[cfg(feature = "cloud")]
use super::node::CloudEndpoint;
use super::node::KnownNode;
use super::node::{InternalKnownNode, KnownNode};
use super::partitioner::PartitionerName;
use super::query_result::MaybeFirstRowTypedError;
use super::topology::UntranslatedPeer;
@@ -489,23 +489,28 @@ impl Session {
let known_nodes = config.known_nodes;

#[cfg(feature = "cloud")]
let known_nodes = if let Some(cloud_servers) =
config.cloud_config.as_ref().map(|cloud_config| {
cloud_config
let cloud_known_nodes: Option<Vec<InternalKnownNode>> =
if let Some(ref cloud_config) = config.cloud_config {
let cloud_servers = cloud_config
.get_datacenters()
.iter()
.map(|(dc_name, dc_data)| {
KnownNode::CloudEndpoint(CloudEndpoint {
InternalKnownNode::CloudEndpoint(CloudEndpoint {
hostname: dc_data.get_server().to_owned(),
datacenter: dc_name.clone(),
})
})
.collect()
}) {
cloud_servers
} else {
known_nodes
};
.collect();
Some(cloud_servers)
} else {
None
};

#[cfg(not(feature = "cloud"))]
let cloud_known_nodes: Option<Vec<InternalKnownNode>> = None;

let known_nodes = cloud_known_nodes
.unwrap_or_else(|| known_nodes.into_iter().map(|node| node.into()).collect());

// Ensure there is at least one known node
if known_nodes.is_empty() {
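One detail worth noting in the rewrite above: both `cfg` branches bind `cloud_known_nodes` with the same explicit `Option<Vec<InternalKnownNode>>` type, so the code following the branches type-checks whether or not the `cloud` feature is enabled. Stripped down to its shape, with hypothetical names and a made-up `extra` feature, the trick looks roughly like this:

// Hypothetical illustration of the cfg-gated Option pattern used above.
fn contact_points(user_nodes: Vec<String>) -> Vec<String> {
    #[cfg(feature = "extra")]
    let extra_nodes: Option<Vec<String>> = Some(vec!["extra-endpoint".to_owned()]);

    #[cfg(not(feature = "extra"))]
    let extra_nodes: Option<Vec<String>> = None;

    // Fall back to the caller-provided nodes when the feature adds nothing.
    extra_nodes.unwrap_or(user_nodes)
}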
4 changes: 4 additions & 0 deletions scylla/src/transport/session_builder.rs
@@ -29,6 +29,10 @@ use openssl::ssl::SslContext;
use tracing::warn;

mod sealed {
// This is a sealed trait - its whole purpose is to be unnameable.
// This means we need to disable the check.
#[allow(unknown_lints)] // Rust 1.66 doesn't know this lint
#[allow(unnameable_types)]
pub trait Sealed {}
}
pub trait SessionBuilderKind: sealed::Sealed + Clone {}
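For readers unfamiliar with the pattern being annotated here: a sealed trait lives in a private module precisely so that downstream crates can use it in bounds but never implement it, which is why `unnameable_types` has to be allowed rather than "fixed". A generic sketch of the pattern (not the driver's actual trait hierarchy):

mod sealed {
    // Intentionally unnameable outside the crate: external code cannot
    // implement `Sealed`, and therefore cannot implement `Kind` below.
    pub trait Sealed {}
}

pub trait Kind: sealed::Sealed {}

pub struct DefaultKind;
impl sealed::Sealed for DefaultKind {}
impl Kind for DefaultKind {}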
10 changes: 5 additions & 5 deletions scylla/src/transport/session_test.rs
@@ -8,7 +8,6 @@ use crate::routing::Token;
use crate::statement::Consistency;
use crate::test_utils::{scylla_supports_tablets, setup_tracing};
use crate::tracing::TracingInfo;
use crate::transport::cluster::Datacenter;
use crate::transport::errors::{BadKeyspaceName, BadQuery, DbError, QueryError};
use crate::transport::partitioner::{
calculate_token_for_partition_key, Murmur3Partitioner, Partitioner, PartitionerName,
@@ -1874,10 +1873,11 @@ async fn test_turning_off_schema_fetching() {
let cluster_data = &session.get_cluster_data();
let keyspace = &cluster_data.get_keyspace_info()[&ks];

let datacenters: HashMap<String, Datacenter> = cluster_data.get_datacenters_info();
let datacenter_repfactors: HashMap<String, usize> = datacenters
.into_keys()
.map(|dc_name| (dc_name, 1))
let datacenter_repfactors: HashMap<String, usize> = cluster_data
.replica_locator()
.datacenter_names()
.iter()
.map(|dc_name| (dc_name.to_owned(), 1))
.collect();

assert_eq!(
9 changes: 4 additions & 5 deletions scylla/src/transport/topology.rs
@@ -33,7 +33,7 @@ use super::errors::{
KeyspaceStrategyError, KeyspacesMetadataError, MetadataError, PeersMetadataError,
ProtocolError, TablesMetadataError, UdtMetadataError, ViewsMetadataError,
};
use super::node::{KnownNode, NodeAddr, ResolvedContactPoint};
use super::node::{InternalKnownNode, NodeAddr, ResolvedContactPoint};

/// Allows to read current metadata from the cluster
pub(crate) struct MetadataReader {
@@ -51,7 +51,7 @@ pub(crate) struct MetadataReader {

// When no known peer is reachable, initial known nodes are resolved once again as a fallback
// and establishing control connection to them is attempted.
initial_known_nodes: Vec<KnownNode>,
initial_known_nodes: Vec<InternalKnownNode>,

// When a control connection breaks, the PoolRefiller of its pool uses the requester
// to signal ClusterWorker that an immediate metadata refresh is advisable.
Expand All @@ -75,9 +75,8 @@ pub struct Peer {

/// An endpoint for a node that the driver is to issue connections to,
/// possibly after prior address translation.
#[non_exhaustive] // <- so that we can add more fields in a backwards-compatible way
#[derive(Clone, Debug)]
pub enum UntranslatedEndpoint {
pub(crate) enum UntranslatedEndpoint {
/// Provided by user in SessionConfig (initial contact points).
ContactPoint(ResolvedContactPoint),
/// Fetched in Metadata with `query_peers()`
@@ -452,7 +451,7 @@ impl MetadataReader {
/// Creates new MetadataReader, which connects to initially_known_peers in the background
#[allow(clippy::too_many_arguments)]
pub(crate) async fn new(
initial_known_nodes: Vec<KnownNode>,
initial_known_nodes: Vec<InternalKnownNode>,
control_connection_repair_requester: broadcast::Sender<()>,
mut connection_config: ConnectionConfig,
keepalive_interval: Option<Duration>,
