diff --git a/rust/src/client.rs b/rust/src/client.rs index e11f982..3ab1936 100644 --- a/rust/src/client.rs +++ b/rust/src/client.rs @@ -1,5 +1,5 @@ use std::collections::VecDeque; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::sync::Arc; use futures::stream::BoxStream; @@ -44,24 +44,32 @@ impl Default for WriteOptions { #[derive(Debug, Clone)] struct MountLink { - viewfs_path: String, + viewfs_path: PathBuf, + hdfs_path: PathBuf, protocol: Arc, - hdfs_path: String, } impl MountLink { + fn new(viewfs_path: &str, hdfs_path: &str, protocol: Arc) -> Self { + // We should never have an empty path, we always want things mounted at root ("/") by default. + Self { + viewfs_path: PathBuf::from(if viewfs_path.is_empty() { + "/" + } else { + viewfs_path + }), + hdfs_path: PathBuf::from(if hdfs_path.is_empty() { "/" } else { hdfs_path }), + protocol, + } + } /// Convert a viewfs path into a name service path if it matches this link - fn resolve(&self, path: &str) -> Option { - if path == self.viewfs_path { - // First check for an exact match in which case we should just return the directory match exactly - Some(self.hdfs_path.clone()) - } else if let Some(relative_path) = path.strip_prefix(&format!("{}/", self.viewfs_path)) { - // Next check for a path inside - let mut resolved_path = PathBuf::from(&self.hdfs_path); - if !relative_path.is_empty() { - resolved_path.push(relative_path); + fn resolve(&self, path: &Path) -> Option { + if let Ok(relative_path) = path.strip_prefix(&self.viewfs_path) { + if relative_path.components().count() == 0 { + Some(self.hdfs_path.clone()) + } else { + Some(self.hdfs_path.join(relative_path)) } - Some(resolved_path.to_string_lossy().into()) } else { None } @@ -75,13 +83,21 @@ struct MountTable { } impl MountTable { - fn resolve(&self, path: &str) -> (&MountLink, String) { + fn resolve(&self, src: &str) -> (&MountLink, String) { + let path = Path::new(src); for link in self.mounts.iter() { if let Some(resolved) = link.resolve(path) { - return (&link, resolved); + return (&link, resolved.to_string_lossy().into()); } } - (&self.fallback, self.fallback.resolve(path).unwrap()) + ( + &self.fallback, + self.fallback + .resolve(path) + .unwrap() + .to_string_lossy() + .into(), + ) } } @@ -127,11 +143,7 @@ impl Client { let mount_table = Arc::new(MountTable { mounts: Vec::new(), - fallback: MountLink { - viewfs_path: String::new(), - protocol, - hdfs_path: String::new(), - }, + fallback: MountLink::new("/", "/", protocol), }); Ok(Self { mount_table }) } @@ -164,39 +176,28 @@ impl Client { let protocol = Arc::new(NamenodeProtocol::new(proxy)); if let Some(prefix) = viewfs_path { - mounts.push(MountLink { - viewfs_path: prefix.to_string(), - protocol, - hdfs_path: url.path().to_string(), - }) + mounts.push(MountLink::new(prefix, url.path(), protocol)); } else { if fallback.is_some() { return Err(HdfsError::InvalidArgument( "Multiple viewfs fallback links found".to_string(), )); } - fallback = Some(MountLink { - viewfs_path: String::new(), - protocol, - hdfs_path: url.path().to_string(), - }) + fallback = Some(MountLink::new("/", url.path(), protocol)); } } - if fallback.is_none() { - return Err(HdfsError::InvalidArgument( + if let Some(fallback) = fallback { + // Sort the mount table from longest viewfs path to shortest. This makes sure more specific paths are considered first. + mounts.sort_by_key(|m| m.viewfs_path.components().count()); + mounts.reverse(); + + Ok(MountTable { mounts, fallback }) + } else { + Err(HdfsError::InvalidArgument( "No viewfs fallback mount found".to_string(), - )); + )) } - - // Sort the mount table from longest viewfs path to shortest. This makes sure more specific paths are considered first. - mounts.sort_by_key(|m| m.viewfs_path.len()); - mounts.reverse(); - - Ok(MountTable { - mounts, - fallback: fallback.unwrap(), - }) } /// Retrieve the file status for the file at `path`. @@ -506,7 +507,10 @@ impl FileStatus { #[cfg(test)] mod test { - use std::sync::Arc; + use std::{ + path::{Path, PathBuf}, + sync::Arc, + }; use url::Url; @@ -515,7 +519,7 @@ mod test { hdfs::{protocol::NamenodeProtocol, proxy::NameServiceProxy}, }; - use super::MountLink; + use super::{MountLink, MountTable}; fn create_protocol(url: &str) -> Arc { let proxy = @@ -526,14 +530,83 @@ mod test { #[test] fn test_mount_link_resolve() { let protocol = create_protocol("hdfs://127.0.0.1:9000"); - let link = MountLink { - viewfs_path: "/view".to_string(), - protocol, - hdfs_path: "/hdfs".to_string(), + let link = MountLink::new("/view", "/hdfs", protocol); + + assert_eq!( + link.resolve(Path::new("/view/dir/file")).unwrap(), + PathBuf::from("/hdfs/dir/file") + ); + assert_eq!( + link.resolve(Path::new("/view")).unwrap(), + PathBuf::from("/hdfs") + ); + assert!(link.resolve(Path::new("/hdfs/path")).is_none()); + } + + #[test] + fn test_fallback_link() { + let protocol = create_protocol("hdfs://127.0.0.1:9000"); + let link = MountLink::new("", "/hdfs", protocol); + + assert_eq!( + link.resolve(Path::new("/path/to/file")).unwrap(), + PathBuf::from("/hdfs/path/to/file") + ); + assert_eq!( + link.resolve(Path::new("/")).unwrap(), + PathBuf::from("/hdfs") + ); + assert_eq!( + link.resolve(Path::new("/hdfs/path")).unwrap(), + PathBuf::from("/hdfs/hdfs/path") + ); + } + + #[test] + fn test_mount_table_resolve() { + let link1 = MountLink::new( + "/mount1", + "/path1/nested", + create_protocol("hdfs://127.0.0.1:9000"), + ); + let link2 = MountLink::new( + "/mount2", + "/path2", + create_protocol("hdfs://127.0.0.1:9001"), + ); + let link3 = MountLink::new( + "/mount3/nested", + "/path3", + create_protocol("hdfs://127.0.0.1:9002"), + ); + let fallback = MountLink::new("/", "/path4", create_protocol("hdfs://127.0.0.1:9003")); + + let mount_table = MountTable { + mounts: vec![link1, link2, link3], + fallback, }; - assert_eq!(link.resolve("/view/dir/file").unwrap(), "/hdfs/dir/file"); - assert_eq!(link.resolve("/view").unwrap(), "/hdfs"); - assert!(link.resolve("/hdfs/path").is_none()) + // Exact mount path resolves to the exact HDFS path + let (link, resolved) = mount_table.resolve("/mount1"); + assert_eq!(link.viewfs_path, Path::new("/mount1")); + assert_eq!(resolved, "/path1/nested"); + + // Trailing slash is treated the same + let (link, resolved) = mount_table.resolve("/mount1/"); + assert_eq!(link.viewfs_path, Path::new("/mount1")); + assert_eq!(resolved, "/path1/nested"); + + // Doesn't do partial matches on a directory name + let (link, resolved) = mount_table.resolve("/mount12"); + assert_eq!(link.viewfs_path, Path::new("/")); + assert_eq!(resolved, "/path4/mount12"); + + let (link, resolved) = mount_table.resolve("/mount3/file"); + assert_eq!(link.viewfs_path, Path::new("/")); + assert_eq!(resolved, "/path4/mount3/file"); + + let (link, resolved) = mount_table.resolve("/mount3/nested/file"); + assert_eq!(link.viewfs_path, Path::new("/mount3/nested")); + assert_eq!(resolved, "/path3/file"); } } diff --git a/rust/src/common/config.rs b/rust/src/common/config.rs index 25e2013..8244100 100644 --- a/rust/src/common/config.rs +++ b/rust/src/common/config.rs @@ -123,3 +123,29 @@ impl Configuration { } } } + +#[cfg(test)] +mod test { + use super::{Configuration, VIEWFS_MOUNTTABLE_PREFIX}; + + #[test] + fn test_mount_table_config() { + let config = Configuration { + map: [( + format!("{}.clusterX.link./view", VIEWFS_MOUNTTABLE_PREFIX), + "hdfs://127.0.0.1:9000/hdfs".to_string(), + )] + .into(), + }; + + assert_eq!( + config.get_mount_table("clusterX"), + vec![( + Some("/view".to_string()), + "hdfs://127.0.0.1:9000/hdfs".to_string() + )] + ); + + assert_eq!(config.get_mount_table("clusterY"), Vec::new()); + } +}