Skip to content

Commit

Permalink
Use PathBuf to simplify things and add some tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Kimahriman committed Oct 13, 2023
1 parent 0603053 commit bee51b4
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 52 deletions.
177 changes: 125 additions & 52 deletions rust/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::collections::VecDeque;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use futures::stream::BoxStream;
Expand Down Expand Up @@ -44,24 +44,32 @@ impl Default for WriteOptions {

#[derive(Debug, Clone)]
struct MountLink {
viewfs_path: String,
viewfs_path: PathBuf,
hdfs_path: PathBuf,
protocol: Arc<NamenodeProtocol>,
hdfs_path: String,
}

impl MountLink {
fn new(viewfs_path: &str, hdfs_path: &str, protocol: Arc<NamenodeProtocol>) -> Self {
// We should never have an empty path, we always want things mounted at root ("/") by default.
Self {
viewfs_path: PathBuf::from(if viewfs_path.is_empty() {
"/"
} else {
viewfs_path
}),
hdfs_path: PathBuf::from(if hdfs_path.is_empty() { "/" } else { hdfs_path }),
protocol,
}
}
/// Convert a viewfs path into a name service path if it matches this link
fn resolve(&self, path: &str) -> Option<String> {
if path == self.viewfs_path {
// First check for an exact match in which case we should just return the directory match exactly
Some(self.hdfs_path.clone())
} else if let Some(relative_path) = path.strip_prefix(&format!("{}/", self.viewfs_path)) {
// Next check for a path inside
let mut resolved_path = PathBuf::from(&self.hdfs_path);
if !relative_path.is_empty() {
resolved_path.push(relative_path);
fn resolve(&self, path: &Path) -> Option<PathBuf> {
if let Ok(relative_path) = path.strip_prefix(&self.viewfs_path) {
if relative_path.components().count() == 0 {
Some(self.hdfs_path.clone())
} else {
Some(self.hdfs_path.join(relative_path))
}
Some(resolved_path.to_string_lossy().into())
} else {
None
}
Expand All @@ -75,13 +83,21 @@ struct MountTable {
}

impl MountTable {
fn resolve(&self, path: &str) -> (&MountLink, String) {
fn resolve(&self, src: &str) -> (&MountLink, String) {
let path = Path::new(src);
for link in self.mounts.iter() {
if let Some(resolved) = link.resolve(path) {
return (&link, resolved);
return (&link, resolved.to_string_lossy().into());
}
}
(&self.fallback, self.fallback.resolve(path).unwrap())
(
&self.fallback,
self.fallback
.resolve(path)
.unwrap()
.to_string_lossy()
.into(),
)
}
}

Expand Down Expand Up @@ -127,11 +143,7 @@ impl Client {

let mount_table = Arc::new(MountTable {
mounts: Vec::new(),
fallback: MountLink {
viewfs_path: String::new(),
protocol,
hdfs_path: String::new(),
},
fallback: MountLink::new("/", "/", protocol),
});
Ok(Self { mount_table })
}
Expand Down Expand Up @@ -164,39 +176,28 @@ impl Client {
let protocol = Arc::new(NamenodeProtocol::new(proxy));

if let Some(prefix) = viewfs_path {
mounts.push(MountLink {
viewfs_path: prefix.to_string(),
protocol,
hdfs_path: url.path().to_string(),
})
mounts.push(MountLink::new(prefix, url.path(), protocol));
} else {
if fallback.is_some() {
return Err(HdfsError::InvalidArgument(
"Multiple viewfs fallback links found".to_string(),
));
}
fallback = Some(MountLink {
viewfs_path: String::new(),
protocol,
hdfs_path: url.path().to_string(),
})
fallback = Some(MountLink::new("/", url.path(), protocol));
}
}

if fallback.is_none() {
return Err(HdfsError::InvalidArgument(
if let Some(fallback) = fallback {
// Sort the mount table from longest viewfs path to shortest. This makes sure more specific paths are considered first.
mounts.sort_by_key(|m| m.viewfs_path.components().count());
mounts.reverse();

Ok(MountTable { mounts, fallback })
} else {
Err(HdfsError::InvalidArgument(
"No viewfs fallback mount found".to_string(),
));
))
}

// Sort the mount table from longest viewfs path to shortest. This makes sure more specific paths are considered first.
mounts.sort_by_key(|m| m.viewfs_path.len());
mounts.reverse();

Ok(MountTable {
mounts,
fallback: fallback.unwrap(),
})
}

/// Retrieve the file status for the file at `path`.
Expand Down Expand Up @@ -506,7 +507,10 @@ impl FileStatus {

#[cfg(test)]
mod test {
use std::sync::Arc;
use std::{
path::{Path, PathBuf},
sync::Arc,
};

use url::Url;

Expand All @@ -515,7 +519,7 @@ mod test {
hdfs::{protocol::NamenodeProtocol, proxy::NameServiceProxy},
};

use super::MountLink;
use super::{MountLink, MountTable};

fn create_protocol(url: &str) -> Arc<NamenodeProtocol> {
let proxy =
Expand All @@ -526,14 +530,83 @@ mod test {
#[test]
fn test_mount_link_resolve() {
let protocol = create_protocol("hdfs://127.0.0.1:9000");
let link = MountLink {
viewfs_path: "/view".to_string(),
protocol,
hdfs_path: "/hdfs".to_string(),
let link = MountLink::new("/view", "/hdfs", protocol);

assert_eq!(
link.resolve(Path::new("/view/dir/file")).unwrap(),
PathBuf::from("/hdfs/dir/file")
);
assert_eq!(
link.resolve(Path::new("/view")).unwrap(),
PathBuf::from("/hdfs")
);
assert!(link.resolve(Path::new("/hdfs/path")).is_none());
}

#[test]
fn test_fallback_link() {
let protocol = create_protocol("hdfs://127.0.0.1:9000");
let link = MountLink::new("", "/hdfs", protocol);

assert_eq!(
link.resolve(Path::new("/path/to/file")).unwrap(),
PathBuf::from("/hdfs/path/to/file")
);
assert_eq!(
link.resolve(Path::new("/")).unwrap(),
PathBuf::from("/hdfs")
);
assert_eq!(
link.resolve(Path::new("/hdfs/path")).unwrap(),
PathBuf::from("/hdfs/hdfs/path")
);
}

#[test]
fn test_mount_table_resolve() {
let link1 = MountLink::new(
"/mount1",
"/path1/nested",
create_protocol("hdfs://127.0.0.1:9000"),
);
let link2 = MountLink::new(
"/mount2",
"/path2",
create_protocol("hdfs://127.0.0.1:9001"),
);
let link3 = MountLink::new(
"/mount3/nested",
"/path3",
create_protocol("hdfs://127.0.0.1:9002"),
);
let fallback = MountLink::new("/", "/path4", create_protocol("hdfs://127.0.0.1:9003"));

let mount_table = MountTable {
mounts: vec![link1, link2, link3],
fallback,
};

assert_eq!(link.resolve("/view/dir/file").unwrap(), "/hdfs/dir/file");
assert_eq!(link.resolve("/view").unwrap(), "/hdfs");
assert!(link.resolve("/hdfs/path").is_none())
// Exact mount path resolves to the exact HDFS path
let (link, resolved) = mount_table.resolve("/mount1");
assert_eq!(link.viewfs_path, Path::new("/mount1"));
assert_eq!(resolved, "/path1/nested");

// Trailing slash is treated the same
let (link, resolved) = mount_table.resolve("/mount1/");
assert_eq!(link.viewfs_path, Path::new("/mount1"));
assert_eq!(resolved, "/path1/nested");

// Doesn't do partial matches on a directory name
let (link, resolved) = mount_table.resolve("/mount12");
assert_eq!(link.viewfs_path, Path::new("/"));
assert_eq!(resolved, "/path4/mount12");

let (link, resolved) = mount_table.resolve("/mount3/file");
assert_eq!(link.viewfs_path, Path::new("/"));
assert_eq!(resolved, "/path4/mount3/file");

let (link, resolved) = mount_table.resolve("/mount3/nested/file");
assert_eq!(link.viewfs_path, Path::new("/mount3/nested"));
assert_eq!(resolved, "/path3/file");
}
}
26 changes: 26 additions & 0 deletions rust/src/common/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,29 @@ impl Configuration {
}
}
}

#[cfg(test)]
mod test {
use super::{Configuration, VIEWFS_MOUNTTABLE_PREFIX};

#[test]
fn test_mount_table_config() {
let config = Configuration {
map: [(
format!("{}.clusterX.link./view", VIEWFS_MOUNTTABLE_PREFIX),
"hdfs://127.0.0.1:9000/hdfs".to_string(),
)]
.into(),
};

assert_eq!(
config.get_mount_table("clusterX"),
vec![(
Some("/view".to_string()),
"hdfs://127.0.0.1:9000/hdfs".to_string()
)]
);

assert_eq!(config.get_mount_table("clusterY"), Vec::new());
}
}

0 comments on commit bee51b4

Please sign in to comment.