Skip to content

Commit

Permalink
Support looking up default fs if hostname isn't provided (#139)
Browse files Browse the repository at this point in the history
  • Loading branch information
Kimahriman authored Sep 28, 2024
1 parent de0e9ae commit 606249b
Showing 1 changed file with 60 additions and 8 deletions.
68 changes: 60 additions & 8 deletions rust/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,33 +163,43 @@ impl Client {

pub fn default_with_config(config: HashMap<String, String>) -> Result<Self> {
let config = Configuration::new_with_config(config)?;
Self::with_config(&Self::default_fs(&config)?, config)
}

fn default_fs(config: &Configuration) -> Result<Url> {
let url = config
.get(config::DEFAULT_FS)
.ok_or(HdfsError::InvalidArgument(format!(
"No {} setting found",
config::DEFAULT_FS
)))?;
Self::with_config(&Url::parse(&url)?, config)
Ok(Url::parse(&url)?)
}

fn with_config(url: &Url, config: Configuration) -> Result<Self> {
if !url.has_host() {
return Err(HdfsError::InvalidArgument(
"URL must contain a host".to_string(),
));
}
let resolved_url = if !url.has_host() {
let default_url = Self::default_fs(&config)?;
if url.scheme() != default_url.scheme() || !default_url.has_host() {
return Err(HdfsError::InvalidArgument(
"URL must contain a host".to_string(),
));
}
default_url
} else {
url.clone()
};

let mount_table = match url.scheme() {
"hdfs" => {
let proxy = NameServiceProxy::new(url, &config)?;
let proxy = NameServiceProxy::new(&resolved_url, &config)?;
let protocol = Arc::new(NamenodeProtocol::new(proxy));

MountTable {
mounts: Vec::new(),
fallback: MountLink::new("/", "/", protocol),
}
}
"viewfs" => Self::build_mount_table(url.host_str().unwrap(), &config)?,
"viewfs" => Self::build_mount_table(resolved_url.host_str().unwrap(), &config)?,
_ => {
return Err(HdfsError::InvalidArgument(
"Only `hdfs` and `viewfs` schemes are supported".to_string(),
Expand Down Expand Up @@ -710,6 +720,7 @@ mod test {
use crate::{
common::config::Configuration,
hdfs::{protocol::NamenodeProtocol, proxy::NameServiceProxy},
Client,
};

use super::{MountLink, MountTable};
Expand All @@ -721,6 +732,47 @@ mod test {
Arc::new(NamenodeProtocol::new(proxy))
}

#[test]
fn test_default_fs() {
assert!(Client::default_with_config(
vec![("fs.defaultFS".to_string(), "hdfs://test".to_string())]
.into_iter()
.collect(),
)
.is_ok());

assert!(Client::default_with_config(
vec![("fs.defaultFS".to_string(), "hdfs://".to_string())]
.into_iter()
.collect(),
)
.is_err());

assert!(Client::new_with_config(
"hdfs://",
vec![("fs.defaultFS".to_string(), "hdfs://test".to_string())]
.into_iter()
.collect(),
)
.is_ok());

assert!(Client::new_with_config(
"hdfs://",
vec![("fs.defaultFS".to_string(), "hdfs://".to_string())]
.into_iter()
.collect(),
)
.is_err());

assert!(Client::new_with_config(
"hdfs://",
vec![("fs.defaultFS".to_string(), "viewfs://test".to_string())]
.into_iter()
.collect(),
)
.is_err());
}

#[test]
fn test_mount_link_resolve() {
let protocol = create_protocol("hdfs://127.0.0.1:9000");
Expand Down

0 comments on commit 606249b

Please sign in to comment.