Skip to content

Commit

Permalink
Add more failover config support (#138)
Browse files Browse the repository at this point in the history
  • Loading branch information
Kimahriman authored Sep 28, 2024
1 parent e217039 commit de0e9ae
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 12 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ The client will attempt to read Hadoop configs `core-site.xml` and `hdfs-site.xm
- `dfs.ha.namenodes` - name service support
- `dfs.namenode.rpc-address.*` - name service support
- `dfs.client.failover.resolve-needed.*` - DNS based NameNode discovery
- `dfs.client.failover.resolver.useFQDN.*` - DNS based NameNode discovery
- `dfs.client.failover.random.order.*` - Randomize order of NameNodes to try
- `fs.viewfs.mounttable.*.link.*` - ViewFS links
- `fs.viewfs.mounttable.*.linkFallback` - ViewFS link fallback

Expand Down
1 change: 1 addition & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ cipher = "0.4"
crc = "3.2"
ctr = "0.9"
des = "0.8"
dns-lookup = "2"
futures = "0.3"
g2p = "1"
hex = "0.4"
Expand Down
63 changes: 51 additions & 12 deletions rust/src/common/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use std::io;
use std::net::ToSocketAddrs;
use std::path::{Path, PathBuf};

use dns_lookup::lookup_addr;
use log::debug;
use rand::seq::SliceRandom;
use rand::thread_rng;

use crate::Result;

Expand All @@ -17,7 +20,9 @@ pub(crate) const DEFAULT_FS: &str = "fs.defaultFS";
// Name Service settings
const HA_NAMENODES_PREFIX: &str = "dfs.ha.namenodes";
const HA_NAMENODE_RPC_ADDRESS_PREFIX: &str = "dfs.namenode.rpc-address";
const HA_NAMENODE_RESOLVE_NEEDED: &str = "dfs.client.failover.resolve-needed";
const DFS_CLIENT_FAILOVER_RESOLVE_NEEDED: &str = "dfs.client.failover.resolve-needed";
const DFS_CLIENT_FAILOVER_RESOLVER_USE_FQDN: &str = "dfs.client.failover.resolver.useFQDN";
const DFS_CLIENT_FAILOVER_RANDOM_ORDER: &str = "dfs.client.failover.random.order";

// Viewfs settings
const VIEWFS_MOUNTTABLE_PREFIX: &str = "fs.viewfs.mounttable";
Expand Down Expand Up @@ -58,6 +63,12 @@ impl Configuration {
self.map.get(key).cloned()
}

fn get_boolean(&self, key: &str, default: bool) -> bool {
self.get(key)
.map(|v| v.to_lowercase() == "true")
.unwrap_or(default)
}

pub(crate) fn get_urls_for_nameservice(&self, nameservice: &str) -> Result<Vec<String>> {
let urls: Vec<String> = self
.map
Expand All @@ -75,16 +86,25 @@ impl Configuration {
})
.collect();

if self
.map
.get(&format!("{}.{}", HA_NAMENODE_RESOLVE_NEEDED, nameservice))
.is_some_and(|value| value.to_lowercase() == "true")
{
let mut urls = if self.get_boolean(
&format!("{}.{}", DFS_CLIENT_FAILOVER_RESOLVE_NEEDED, nameservice),
false,
) {
let use_fqdn = self.get_boolean(
&format!("{}.{}", DFS_CLIENT_FAILOVER_RESOLVER_USE_FQDN, nameservice),
true,
);

let mut resolved_urls: Vec<String> = Vec::new();
for url in urls {
for socket_addr in url.to_socket_addrs()? {
if socket_addr.is_ipv4() {
resolved_urls.push(socket_addr.to_string())
if use_fqdn {
let fqdn = lookup_addr(&socket_addr.ip())?;
resolved_urls.push(format!("{}:{}", fqdn, socket_addr.port()));
} else {
resolved_urls.push(socket_addr.to_string());
}
}
}
}
Expand All @@ -93,11 +113,19 @@ impl Configuration {
nameservice, resolved_urls
);

Ok(resolved_urls)
resolved_urls
} else {
debug!("Namenodes for {} without resolving {:?}", nameservice, urls);
Ok(urls)
urls
};

if self.get_boolean(
&format!("{}.{}", DFS_CLIENT_FAILOVER_RANDOM_ORDER, nameservice),
false,
) {
urls.shuffle(&mut thread_rng());
}
Ok(urls)
}

pub(crate) fn get_mount_table(&self, cluster: &str) -> Vec<(Option<String>, String)> {
Expand Down Expand Up @@ -160,8 +188,10 @@ impl Configuration {

#[cfg(test)]
mod test {
use crate::common::config::DFS_CLIENT_FAILOVER_RESOLVER_USE_FQDN;

use super::{
Configuration, HA_NAMENODES_PREFIX, HA_NAMENODE_RESOLVE_NEEDED,
Configuration, DFS_CLIENT_FAILOVER_RESOLVE_NEEDED, HA_NAMENODES_PREFIX,
HA_NAMENODE_RPC_ADDRESS_PREFIX, VIEWFS_MOUNTTABLE_PREFIX,
};

Expand Down Expand Up @@ -229,7 +259,7 @@ mod test {

#[test]
fn test_namenode_resolving() {
let config = Configuration {
let mut config = Configuration {
map: vec![
(
format!("{}.{}", HA_NAMENODES_PREFIX, "test"),
Expand All @@ -243,14 +273,23 @@ mod test {
"localhost:9000".to_string(),
),
(
format!("{}.{}", HA_NAMENODE_RESOLVE_NEEDED, "test"),
format!("{}.{}", DFS_CLIENT_FAILOVER_RESOLVE_NEEDED, "test"),
"true".to_string(),
),
]
.into_iter()
.collect(),
};

let urls = config.get_urls_for_nameservice("test").unwrap();
assert_eq!(urls.len(), 1, "{:?}", urls);
assert_eq!(urls[0], "localhost:9000");

config.map.insert(
format!("{}.{}", DFS_CLIENT_FAILOVER_RESOLVER_USE_FQDN, "test"),
"false".to_string(),
);

let urls = config.get_urls_for_nameservice("test").unwrap();
assert_eq!(urls.len(), 1, "{:?}", urls);
assert_eq!(urls[0], "127.0.0.1:9000");
Expand Down

0 comments on commit de0e9ae

Please sign in to comment.