Skip to content

Commit

Permalink
Allow using default fs for fsspec and pass storage options to client (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Kimahriman authored Nov 7, 2024
1 parent 0db3e77 commit 1a49df1
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 17 deletions.
26 changes: 18 additions & 8 deletions python/hdfs_native/fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,22 @@
class HdfsFileSystem(AbstractFileSystem):
root_marker = "/"

def __init__(self, host: str, port: Optional[int] = None, *args, **storage_options):
def __init__(
self,
host: Optional[str] = None,
port: Optional[int] = None,
*args,
**storage_options,
):
super().__init__(host, port, *args, **storage_options)
self.host = host
self.port = port
url = f"{self.protocol}://{host}"
if port:
url += f":{port}"
self.client = Client(url)
url = f"{self.protocol}://"
if host:
url += host
if port:
url += f":{port}"
self.client = Client(url, storage_options)

@property
def fsid(self):
Expand All @@ -40,9 +48,11 @@ def _strip_protocol(cls, path: str) -> str:
def unstrip_protocol(self, name: str) -> str:
path = self._strip_protocol(name)

url = f"{self.protocol}://{self.host}"
if self.port:
url += f":{self.port}"
url = f"{self.protocol}://"
if self.host:
url += self.host
if self.port:
url += f":{self.port}"

return f"{url}{path}"

Expand Down
6 changes: 6 additions & 0 deletions python/tests/test_fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@
from hdfs_native.fsspec import HdfsFileSystem


def test_config(minidfs: str):
url = urllib.parse.urlparse(minidfs)
fs: HdfsFileSystem = fsspec.filesystem(url.scheme, **{"fs.defaultFS": minidfs})
assert len(fs.ls("/")) == 0


def test_dirs(fs: HdfsFileSystem):
fs.mkdir("/testdir")
assert fs.info("/testdir")["type"] == "directory"
Expand Down
4 changes: 2 additions & 2 deletions rust/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ mod test {
#[test]
fn test_default_fs() {
assert!(Client::default_with_config(
vec![("fs.defaultFS".to_string(), "hdfs://test".to_string())]
vec![("fs.defaultFS".to_string(), "hdfs://test:9000".to_string())]
.into_iter()
.collect(),
)
Expand All @@ -751,7 +751,7 @@ mod test {

assert!(Client::new_with_config(
"hdfs://",
vec![("fs.defaultFS".to_string(), "hdfs://test".to_string())]
vec![("fs.defaultFS".to_string(), "hdfs://test:9000".to_string())]
.into_iter()
.collect(),
)
Expand Down
18 changes: 12 additions & 6 deletions rust/src/hdfs/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,18 @@ impl NameServiceProxy {
todo!()
};

Ok(NameServiceProxy {
proxy_connections,
current_active: AtomicUsize::new(0),
current_observers: Arc::new(Mutex::new(HashSet::new())),
msycned: AtomicBool::new(false),
})
if proxy_connections.is_empty() {
Err(HdfsError::InvalidArgument(
"No NameNode hosts found".to_string(),
))
} else {
Ok(NameServiceProxy {
proxy_connections,
current_active: AtomicUsize::new(0),
current_observers: Arc::new(Mutex::new(HashSet::new())),
msycned: AtomicBool::new(false),
})
}
}

async fn msync_if_needed(&self, write: bool) -> Result<()> {
Expand Down
8 changes: 7 additions & 1 deletion rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,16 @@
//!
//! Create a client for a Name Service
//! ```rust
//! use std::collections::HashMap;
//! use hdfs_native::Client;
//! # use hdfs_native::Result;
//! # fn main() -> Result<()> {
//! let client = Client::new("hdfs://ns")?;
//! let config = HashMap::from([
//! ("dfs.ha.namenodes.ns".to_string(), "nn-1,nn-2".to_string()),
//! ("dfs.namenode.rpc-address.ns.nn-1".to_string(), "nn-1:9000".to_string()),
//! ("dfs.namenode.rpc-address.ns.nn-2".to_string(), "nn-2:9000".to_string()),
//! ]);
//! let client = Client::new_with_config("hdfs://ns", config)?;
//! # Ok(())
//! # }
//! ```
Expand Down

0 comments on commit 1a49df1

Please sign in to comment.