From 1a49df1ccb668210c4cfd0ef24aebe8904a1851c Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Thu, 7 Nov 2024 12:02:43 -0500 Subject: [PATCH] Allow using default fs for fsspec and pass storage options to client (#158) --- python/hdfs_native/fsspec.py | 26 ++++++++++++++++++-------- python/tests/test_fsspec.py | 6 ++++++ rust/src/client.rs | 4 ++-- rust/src/hdfs/proxy.rs | 18 ++++++++++++------ rust/src/lib.rs | 8 +++++++- 5 files changed, 45 insertions(+), 17 deletions(-) diff --git a/python/hdfs_native/fsspec.py b/python/hdfs_native/fsspec.py index 4970005..134eebc 100644 --- a/python/hdfs_native/fsspec.py +++ b/python/hdfs_native/fsspec.py @@ -19,14 +19,22 @@ class HdfsFileSystem(AbstractFileSystem): root_marker = "/" - def __init__(self, host: str, port: Optional[int] = None, *args, **storage_options): + def __init__( + self, + host: Optional[str] = None, + port: Optional[int] = None, + *args, + **storage_options, + ): super().__init__(host, port, *args, **storage_options) self.host = host self.port = port - url = f"{self.protocol}://{host}" - if port: - url += f":{port}" - self.client = Client(url) + url = f"{self.protocol}://" + if host: + url += host + if port: + url += f":{port}" + self.client = Client(url, storage_options) @property def fsid(self): @@ -40,9 +48,11 @@ def _strip_protocol(cls, path: str) -> str: def unstrip_protocol(self, name: str) -> str: path = self._strip_protocol(name) - url = f"{self.protocol}://{self.host}" - if self.port: - url += f":{self.port}" + url = f"{self.protocol}://" + if self.host: + url += self.host + if self.port: + url += f":{self.port}" return f"{url}{path}" diff --git a/python/tests/test_fsspec.py b/python/tests/test_fsspec.py index d4d4e02..7d7bdc3 100644 --- a/python/tests/test_fsspec.py +++ b/python/tests/test_fsspec.py @@ -6,6 +6,12 @@ from hdfs_native.fsspec import HdfsFileSystem +def test_config(minidfs: str): + url = urllib.parse.urlparse(minidfs) + fs: HdfsFileSystem = fsspec.filesystem(url.scheme, **{"fs.defaultFS": minidfs}) + assert len(fs.ls("/")) == 0 + + def test_dirs(fs: HdfsFileSystem): fs.mkdir("/testdir") assert fs.info("/testdir")["type"] == "directory" diff --git a/rust/src/client.rs b/rust/src/client.rs index 82975f0..7f0d48e 100644 --- a/rust/src/client.rs +++ b/rust/src/client.rs @@ -736,7 +736,7 @@ mod test { #[test] fn test_default_fs() { assert!(Client::default_with_config( - vec![("fs.defaultFS".to_string(), "hdfs://test".to_string())] + vec![("fs.defaultFS".to_string(), "hdfs://test:9000".to_string())] .into_iter() .collect(), ) @@ -751,7 +751,7 @@ mod test { assert!(Client::new_with_config( "hdfs://", - vec![("fs.defaultFS".to_string(), "hdfs://test".to_string())] + vec![("fs.defaultFS".to_string(), "hdfs://test:9000".to_string())] .into_iter() .collect(), ) diff --git a/rust/src/hdfs/proxy.rs b/rust/src/hdfs/proxy.rs index 3875ab3..2b874b6 100644 --- a/rust/src/hdfs/proxy.rs +++ b/rust/src/hdfs/proxy.rs @@ -103,12 +103,18 @@ impl NameServiceProxy { todo!() }; - Ok(NameServiceProxy { - proxy_connections, - current_active: AtomicUsize::new(0), - current_observers: Arc::new(Mutex::new(HashSet::new())), - msycned: AtomicBool::new(false), - }) + if proxy_connections.is_empty() { + Err(HdfsError::InvalidArgument( + "No NameNode hosts found".to_string(), + )) + } else { + Ok(NameServiceProxy { + proxy_connections, + current_active: AtomicUsize::new(0), + current_observers: Arc::new(Mutex::new(HashSet::new())), + msycned: AtomicBool::new(false), + }) + } } async fn msync_if_needed(&self, write: bool) -> Result<()> { diff --git a/rust/src/lib.rs b/rust/src/lib.rs index 49b2b38..f4cffdf 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -15,10 +15,16 @@ //! //! Create a client for a Name Service //! ```rust +//! use std::collections::HashMap; //! use hdfs_native::Client; //! # use hdfs_native::Result; //! # fn main() -> Result<()> { -//! let client = Client::new("hdfs://ns")?; +//! let config = HashMap::from([ +//! ("dfs.ha.namenodes.ns".to_string(), "nn-1,nn-2".to_string()), +//! ("dfs.namenode.rpc-address.ns.nn-1".to_string(), "nn-1:9000".to_string()), +//! ("dfs.namenode.rpc-address.ns.nn-2".to_string(), "nn-2:9000".to_string()), +//! ]); +//! let client = Client::new_with_config("hdfs://ns", config)?; //! # Ok(()) //! # } //! ```