diff --git a/README.md b/README.md index 66fc9b1..515aaa6 100644 --- a/README.md +++ b/README.md @@ -52,6 +52,7 @@ The client will attempt to read Hadoop configs `core-site.xml` and `hdfs-site.xm - `fs.defaultFS` - Client::default() support - `dfs.ha.namenodes` - name service support - `dfs.namenode.rpc-address.*` - name service support +- `dfs.client.failover.resolve-needed.*` - DNS based NameNode discovery - `fs.viewfs.mounttable.*.link.*` - ViewFS links - `fs.viewfs.mounttable.*.linkFallback` - ViewFS link fallback diff --git a/rust/src/client.rs b/rust/src/client.rs index ec0c0bb..d1bf435 100644 --- a/rust/src/client.rs +++ b/rust/src/client.rs @@ -181,7 +181,7 @@ impl Client { let mount_table = match url.scheme() { "hdfs" => { - let proxy = NameServiceProxy::new(url, &config); + let proxy = NameServiceProxy::new(url, &config)?; let protocol = Arc::new(NamenodeProtocol::new(proxy)); MountTable { @@ -218,7 +218,7 @@ impl Client { "Only hdfs mounts are supported for viewfs".to_string(), )); } - let proxy = NameServiceProxy::new(&url, config); + let proxy = NameServiceProxy::new(&url, config)?; let protocol = Arc::new(NamenodeProtocol::new(proxy)); if let Some(prefix) = viewfs_path { @@ -716,7 +716,8 @@ mod test { fn create_protocol(url: &str) -> Arc { let proxy = - NameServiceProxy::new(&Url::parse(url).unwrap(), &Configuration::new().unwrap()); + NameServiceProxy::new(&Url::parse(url).unwrap(), &Configuration::new().unwrap()) + .unwrap(); Arc::new(NamenodeProtocol::new(proxy)) } diff --git a/rust/src/common/config.rs b/rust/src/common/config.rs index e6ab607..658fab5 100644 --- a/rust/src/common/config.rs +++ b/rust/src/common/config.rs @@ -2,8 +2,13 @@ use std::collections::HashMap; use std::env; use std::fs; use std::io; +use std::net::ToSocketAddrs; use std::path::{Path, PathBuf}; +use log::debug; + +use crate::Result; + const HADOOP_CONF_DIR: &str = "HADOOP_CONF_DIR"; const HADOOP_HOME: &str = "HADOOP_HOME"; @@ -12,6 +17,7 @@ pub(crate) const DEFAULT_FS: &str = "fs.defaultFS"; // Name Service settings const HA_NAMENODES_PREFIX: &str = "dfs.ha.namenodes"; const HA_NAMENODE_RPC_ADDRESS_PREFIX: &str = "dfs.namenode.rpc-address"; +const HA_NAMENODE_RESOLVE_NEEDED: &str = "dfs.client.failover.resolve-needed"; // Viewfs settings const VIEWFS_MOUNTTABLE_PREFIX: &str = "fs.viewfs.mounttable"; @@ -52,8 +58,9 @@ impl Configuration { self.map.get(key).cloned() } - pub(crate) fn get_urls_for_nameservice(&self, nameservice: &str) -> Vec { - self.map + pub(crate) fn get_urls_for_nameservice(&self, nameservice: &str) -> Result> { + let urls: Vec = self + .map .get(&format!("{}.{}", HA_NAMENODES_PREFIX, nameservice)) .into_iter() .flat_map(|namenodes| { @@ -66,7 +73,31 @@ impl Configuration { .map(|s| s.to_string()) }) }) - .collect() + .collect(); + + if self + .map + .get(&format!("{}.{}", HA_NAMENODE_RESOLVE_NEEDED, nameservice)) + .is_some_and(|value| value.to_lowercase() == "true") + { + let mut resolved_urls: Vec = Vec::new(); + for url in urls { + for socket_addr in url.to_socket_addrs()? { + if socket_addr.is_ipv4() { + resolved_urls.push(socket_addr.to_string()) + } + } + } + debug!( + "Namenodes for {} resolved to {:?}", + nameservice, resolved_urls + ); + + Ok(resolved_urls) + } else { + debug!("Namenodes for {} without resolving {:?}", nameservice, urls); + Ok(urls) + } } pub(crate) fn get_mount_table(&self, cluster: &str) -> Vec<(Option, String)> { @@ -129,7 +160,10 @@ impl Configuration { #[cfg(test)] mod test { - use super::{Configuration, VIEWFS_MOUNTTABLE_PREFIX}; + use super::{ + Configuration, HA_NAMENODES_PREFIX, HA_NAMENODE_RESOLVE_NEEDED, + HA_NAMENODE_RPC_ADDRESS_PREFIX, VIEWFS_MOUNTTABLE_PREFIX, + }; #[test] fn test_mount_table_config() { @@ -192,4 +226,33 @@ mod test { ] ); } + + #[test] + fn test_namenode_resolving() { + let config = Configuration { + map: vec![ + ( + format!("{}.{}", HA_NAMENODES_PREFIX, "test"), + "namenode".to_string(), + ), + ( + format!( + "{}.{}.{}", + HA_NAMENODE_RPC_ADDRESS_PREFIX, "test", "namenode" + ), + "localhost:9000".to_string(), + ), + ( + format!("{}.{}", HA_NAMENODE_RESOLVE_NEEDED, "test"), + "true".to_string(), + ), + ] + .into_iter() + .collect(), + }; + + let urls = config.get_urls_for_nameservice("test").unwrap(); + assert_eq!(urls.len(), 1, "{:?}", urls); + assert_eq!(urls[0], "127.0.0.1:9000"); + } } diff --git a/rust/src/hdfs/proxy.rs b/rust/src/hdfs/proxy.rs index b69dd80..f2830ea 100644 --- a/rust/src/hdfs/proxy.rs +++ b/rust/src/hdfs/proxy.rs @@ -75,7 +75,7 @@ pub(crate) struct NameServiceProxy { impl NameServiceProxy { /// Creates a new proxy for a name service. If the URL contains a port, /// it is assumed to be for a single NameNode. - pub(crate) fn new(nameservice: &Url, config: &Configuration) -> Self { + pub(crate) fn new(nameservice: &Url, config: &Configuration) -> Result { let alignment_context = Arc::new(Mutex::new(AlignmentContext::default())); let proxy_connections = if let Some(port) = nameservice.port() { @@ -88,7 +88,7 @@ impl NameServiceProxy { } else if let Some(host) = nameservice.host_str() { // TODO: Add check for no configured namenodes config - .get_urls_for_nameservice(host) + .get_urls_for_nameservice(host)? .into_iter() .map(|url| { Arc::new(tokio::sync::Mutex::new(ProxyConnection::new( @@ -102,11 +102,11 @@ impl NameServiceProxy { todo!() }; - NameServiceProxy { + Ok(NameServiceProxy { proxy_connections, current_index: AtomicUsize::new(0), msycned: AtomicBool::new(false), - } + }) } async fn msync_if_needed(&self) -> Result<()> { diff --git a/rust/src/security/gssapi.rs b/rust/src/security/gssapi.rs index 84eff17..8d4ed6a 100644 --- a/rust/src/security/gssapi.rs +++ b/rust/src/security/gssapi.rs @@ -63,7 +63,6 @@ static LIBGSSAPI: Lazy> = Lazy::new(|| { #[cfg(not(target_os = "linux"))] let library_name = libloading::library_filename("gssapi_krb5"); - #[cfg(not(any))] match unsafe { bindings::GSSAPI::new(library_name) } { Ok(gssapi) => Some(gssapi), Err(e) => {