Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for dfs.client.failover.resolve-needed #136

Merged
merged 3 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ The client will attempt to read Hadoop configs `core-site.xml` and `hdfs-site.xm
- `fs.defaultFS` - Client::default() support
- `dfs.ha.namenodes` - name service support
- `dfs.namenode.rpc-address.*` - name service support
- `dfs.client.failover.resolve-needed.*` - DNS-based NameNode discovery
- `fs.viewfs.mounttable.*.link.*` - ViewFS links
- `fs.viewfs.mounttable.*.linkFallback` - ViewFS link fallback

Expand Down
7 changes: 4 additions & 3 deletions rust/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ impl Client {

let mount_table = match url.scheme() {
"hdfs" => {
let proxy = NameServiceProxy::new(url, &config);
let proxy = NameServiceProxy::new(url, &config)?;
let protocol = Arc::new(NamenodeProtocol::new(proxy));

MountTable {
Expand Down Expand Up @@ -218,7 +218,7 @@ impl Client {
"Only hdfs mounts are supported for viewfs".to_string(),
));
}
let proxy = NameServiceProxy::new(&url, config);
let proxy = NameServiceProxy::new(&url, config)?;
let protocol = Arc::new(NamenodeProtocol::new(proxy));

if let Some(prefix) = viewfs_path {
Expand Down Expand Up @@ -716,7 +716,8 @@ mod test {

fn create_protocol(url: &str) -> Arc<NamenodeProtocol> {
let proxy =
NameServiceProxy::new(&Url::parse(url).unwrap(), &Configuration::new().unwrap());
NameServiceProxy::new(&Url::parse(url).unwrap(), &Configuration::new().unwrap())
.unwrap();
Arc::new(NamenodeProtocol::new(proxy))
}

Expand Down
71 changes: 67 additions & 4 deletions rust/src/common/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@ use std::collections::HashMap;
use std::env;
use std::fs;
use std::io;
use std::net::ToSocketAddrs;
use std::path::{Path, PathBuf};

use log::debug;

use crate::Result;

const HADOOP_CONF_DIR: &str = "HADOOP_CONF_DIR";
const HADOOP_HOME: &str = "HADOOP_HOME";

Expand All @@ -12,6 +17,7 @@ pub(crate) const DEFAULT_FS: &str = "fs.defaultFS";
// Name Service settings
const HA_NAMENODES_PREFIX: &str = "dfs.ha.namenodes";
const HA_NAMENODE_RPC_ADDRESS_PREFIX: &str = "dfs.namenode.rpc-address";
const HA_NAMENODE_RESOLVE_NEEDED: &str = "dfs.client.failover.resolve-needed";

// Viewfs settings
const VIEWFS_MOUNTTABLE_PREFIX: &str = "fs.viewfs.mounttable";
Expand Down Expand Up @@ -52,8 +58,9 @@ impl Configuration {
self.map.get(key).cloned()
}

pub(crate) fn get_urls_for_nameservice(&self, nameservice: &str) -> Vec<String> {
self.map
pub(crate) fn get_urls_for_nameservice(&self, nameservice: &str) -> Result<Vec<String>> {
let urls: Vec<String> = self
.map
.get(&format!("{}.{}", HA_NAMENODES_PREFIX, nameservice))
.into_iter()
.flat_map(|namenodes| {
Expand All @@ -66,7 +73,31 @@ impl Configuration {
.map(|s| s.to_string())
})
})
.collect()
.collect();

if self
.map
.get(&format!("{}.{}", HA_NAMENODE_RESOLVE_NEEDED, nameservice))
.is_some_and(|value| value.to_lowercase() == "true")
{
let mut resolved_urls: Vec<String> = Vec::new();
for url in urls {
for socket_addr in url.to_socket_addrs()? {
if socket_addr.is_ipv4() {
resolved_urls.push(socket_addr.to_string())
}
}
}
debug!(
"Namenodes for {} resolved to {:?}",
nameservice, resolved_urls
);

Ok(resolved_urls)
} else {
debug!("Namenodes for {} without resolving {:?}", nameservice, urls);
Ok(urls)
}
}

pub(crate) fn get_mount_table(&self, cluster: &str) -> Vec<(Option<String>, String)> {
Expand Down Expand Up @@ -129,7 +160,10 @@ impl Configuration {

#[cfg(test)]
mod test {
use super::{Configuration, VIEWFS_MOUNTTABLE_PREFIX};
use super::{
Configuration, HA_NAMENODES_PREFIX, HA_NAMENODE_RESOLVE_NEEDED,
HA_NAMENODE_RPC_ADDRESS_PREFIX, VIEWFS_MOUNTTABLE_PREFIX,
};

#[test]
fn test_mount_table_config() {
Expand Down Expand Up @@ -192,4 +226,33 @@ mod test {
]
);
}

/// Checks that a name service with `dfs.client.failover.resolve-needed` set to
/// `true` has its configured NameNode RPC address returned in resolved form.
#[test]
fn test_namenode_resolving() {
    let nameservice = "test";
    let namenode = "namenode";

    // One NameNode, addressed by hostname, with resolving switched on.
    let entries = vec![
        (
            format!("{}.{}", HA_NAMENODES_PREFIX, nameservice),
            namenode.to_string(),
        ),
        (
            format!(
                "{}.{}.{}",
                HA_NAMENODE_RPC_ADDRESS_PREFIX, nameservice, namenode
            ),
            "localhost:9000".to_string(),
        ),
        (
            format!("{}.{}", HA_NAMENODE_RESOLVE_NEEDED, nameservice),
            "true".to_string(),
        ),
    ];
    let config = Configuration {
        map: entries.into_iter().collect(),
    };

    // The test expects exactly one address back, in `ip:port` form.
    let resolved = config.get_urls_for_nameservice(nameservice).unwrap();
    assert_eq!(resolved.len(), 1, "{:?}", resolved);
    assert_eq!(resolved[0], "127.0.0.1:9000");
}
}
8 changes: 4 additions & 4 deletions rust/src/hdfs/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub(crate) struct NameServiceProxy {
impl NameServiceProxy {
/// Creates a new proxy for a name service. If the URL contains a port,
/// it is assumed to be for a single NameNode.
pub(crate) fn new(nameservice: &Url, config: &Configuration) -> Self {
pub(crate) fn new(nameservice: &Url, config: &Configuration) -> Result<Self> {
let alignment_context = Arc::new(Mutex::new(AlignmentContext::default()));

let proxy_connections = if let Some(port) = nameservice.port() {
Expand All @@ -88,7 +88,7 @@ impl NameServiceProxy {
} else if let Some(host) = nameservice.host_str() {
// TODO: Add check for no configured namenodes
config
.get_urls_for_nameservice(host)
.get_urls_for_nameservice(host)?
.into_iter()
.map(|url| {
Arc::new(tokio::sync::Mutex::new(ProxyConnection::new(
Expand All @@ -102,11 +102,11 @@ impl NameServiceProxy {
todo!()
};

NameServiceProxy {
Ok(NameServiceProxy {
proxy_connections,
current_index: AtomicUsize::new(0),
msycned: AtomicBool::new(false),
}
})
}

async fn msync_if_needed(&self) -> Result<()> {
Expand Down
1 change: 0 additions & 1 deletion rust/src/security/gssapi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ static LIBGSSAPI: Lazy<Option<bindings::GSSAPI>> = Lazy::new(|| {
#[cfg(not(target_os = "linux"))]
let library_name = libloading::library_filename("gssapi_krb5");

#[cfg(not(any))]
match unsafe { bindings::GSSAPI::new(library_name) } {
Ok(gssapi) => Some(gssapi),
Err(e) => {
Expand Down
Loading