Skip to content

Commit

Permalink
Merge branch 'master' into dependency-bump
Browse files Browse the repository at this point in the history
  • Loading branch information
Kimahriman committed Nov 14, 2024
2 parents 7707a1f + 8617932 commit 6ea2d70
Show file tree
Hide file tree
Showing 10 changed files with 822 additions and 219 deletions.
11 changes: 7 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
# Native Rust HDFS client
This is a proof-of-concept HDFS client written natively in Rust. All other clients I have found in any other language are simply wrappers around libhdfs and require all the same Java dependencies, so I wanted to see if I could write one from scratch given that HDFS isn't really changing very often anymore. Several basic features are working, however it is not nearly as robust and the real HDFS client.

What this is not trying to do is implement all HDFS client/FileSystem interfaces, just things involving reading and writing data.
This is an experimental HDFS client written natively in Rust. Several basic features are working, however it is not nearly as robust and the real HDFS client.

## Supported HDFS features
Here is a list of currently supported and unsupported but possible future features.
Expand All @@ -12,10 +10,15 @@ Here is a list of currently supported and unsupported but possible future featur
- [x] Writing
- [x] Rename
- [x] Delete
- [x] Basic Permissions and ownership
- [x] ACLs
- [x] Content summary
- [x] Set replication
- [x] Set timestamps

### HDFS Features
- [x] Name Services
- [ ] Observer reads (state ID tracking is supported, but needs improvements on tracking Observer/Active NameNode)
- [x] Observer reads
- [x] ViewFS
- [x] Router based federation
- [x] Erasure coded reads and writes
Expand Down
60 changes: 54 additions & 6 deletions python/hdfs_native/__init__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
import io
import os
from typing import TYPE_CHECKING, Dict, Iterator, Optional
from typing import TYPE_CHECKING, Dict, Iterator, List, Optional

# For some reason mypy doesn't think this exists
from typing_extensions import Buffer # type: ignore

from ._internal import RawClient, WriteOptions
from ._internal import (
AclEntry,
AclStatus,
ContentSummary,
FileStatus,
RawClient,
WriteOptions,
)

if TYPE_CHECKING:
from ._internal import ContentSummary, FileStatus, RawFileReader, RawFileWriter
from ._internal import (
RawFileReader,
RawFileWriter,
)


class FileReader(io.RawIOBase):
Expand Down Expand Up @@ -95,11 +105,11 @@ def __init__(
):
self.inner = RawClient(url, config)

def get_file_info(self, path: str) -> "FileStatus":
def get_file_info(self, path: str) -> FileStatus:
"""Gets the file status for the file at `path`"""
return self.inner.get_file_info(path)

def list_status(self, path: str, recursive: bool = False) -> Iterator["FileStatus"]:
def list_status(self, path: str, recursive: bool = False) -> Iterator[FileStatus]:
"""Gets the status of files rooted at `path`. If `recursive` is true, lists all files recursively."""
return self.inner.list_status(path, recursive)

Expand Down Expand Up @@ -181,8 +191,46 @@ def set_replication(self, path: str, replication: int) -> bool:
"""
return self.inner.set_replication(path, replication)

def get_content_summary(self, path: str) -> "ContentSummary":
def get_content_summary(self, path: str) -> ContentSummary:
"""
Gets a content summary for `path`
"""
return self.inner.get_content_summary(path)

def modify_acl_entries(self, path: str, entries: List[AclEntry]) -> None:
"""
Update ACL entries for file or directory at `path`. Existing entries will remain.
"""
return self.inner.modify_acl_entries(path, entries)

def remove_acl_entries(self, path: str, entries: List[AclEntry]) -> None:
"""
Remove specific ACL entries for file or directory at `path`.
"""
return self.inner.remove_acl_entries(path, entries)

def remove_default_acl(self, path: str) -> None:
"""
Remove all default ACLs for file or directory at `path`.
"""
return self.inner.remove_default_acl(path)

def remove_acl(self, path: str) -> None:
"""
Remove all ACL entries for file or directory at `path`.
"""
return self.inner.remove_acl(path)

def set_acl(self, path: str, entries: List[AclEntry]) -> None:
"""
Override all ACL entries for file or directory at `path`. If only access ACLs are provided,
default ACLs are maintained. Likewise if only default ACLs are provided, access ACLs are
maintained.
"""
return self.inner.set_acl(path, entries)

def get_acl_status(self, path: str) -> AclStatus:
"""
Get the ACL status for the file or directory at `path`.
"""
return self.inner.get_acl_status(path)
35 changes: 33 additions & 2 deletions python/hdfs_native/_internal.pyi
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict, Iterator, Optional
from typing import Dict, Iterator, List, Literal, Optional

# For some reason mypy doesn't think this exists
from typing_extensions import Buffer # type: ignore
Expand All @@ -23,6 +23,31 @@ class ContentSummary:
space_consumed: int
space_quota: int

AclEntryType = Literal["user", "group", "mask", "other"]
AclEntryScope = Literal["access", "default"]
FsAction = Literal["---", "--x", "-w-", "-wx", "r--", "r-x", "rw-", "rwx"]

class AclEntry:
type: AclEntryType
scope: AclEntryScope
permissions: FsAction
name: Optional[str]

def __init__(
self,
type: AclEntryType,
scope: AclEntryScope,
permissions: FsAction,
name: Optional[str] = None,
): ...

class AclStatus:
owner: str
group: str
sticky: bool
entries: List[AclEntry]
permission: int

class WriteOptions:
block_size: Optional[int]
replication: Optional[int]
Expand Down Expand Up @@ -85,4 +110,10 @@ class RawClient:
) -> None: ...
def set_permission(self, path: str, permission: int) -> None: ...
def set_replication(self, path: str, replication: int) -> bool: ...
def get_content_summary(self, path) -> ContentSummary: ...
def get_content_summary(self, path: str) -> ContentSummary: ...
def modify_acl_entries(self, path: str, entries: List[AclEntry]) -> None: ...
def remove_acl_entries(self, path: str, entries: List[AclEntry]) -> None: ...
def remove_default_acl(self, path: str) -> None: ...
def remove_acl(self, path: str) -> None: ...
def set_acl(self, path: str, entries: List[AclEntry]) -> None: ...
def get_acl_status(self, path: str) -> AclStatus: ...
141 changes: 141 additions & 0 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use ::hdfs_native::{
Client,
};
use bytes::Bytes;
use hdfs_native::acl::{AclEntry, AclStatus};
use hdfs_native::client::ContentSummary;
use pyo3::{exceptions::PyRuntimeError, prelude::*};
use tokio::runtime::Runtime;
Expand Down Expand Up @@ -131,6 +132,93 @@ impl PyContentSummary {
}
}

#[pyclass(get_all, frozen, name = "AclStatus")]
struct PyAclStatus {
owner: String,
group: String,
sticky: bool,
entries: Vec<PyAclEntry>,
permission: u16,
}

impl From<AclStatus> for PyAclStatus {
fn from(value: AclStatus) -> Self {
Self {
owner: value.owner,
group: value.group,
sticky: value.sticky,
entries: value.entries.into_iter().map(PyAclEntry::from).collect(),
permission: value.permission,
}
}
}

#[pymethods]
impl PyAclStatus {
/// Return a dataclass-esque format for the repr
fn __repr__(&self) -> String {
format!("AclStatus(owner='{}')", self.owner)
}
}

#[pyclass(get_all, set_all, name = "AclEntry")]
#[derive(Clone, Default)]
struct PyAclEntry {
r#type: String,
scope: String,
permissions: String,
name: Option<String>,
}

impl From<AclEntry> for PyAclEntry {
fn from(value: AclEntry) -> Self {
Self {
r#type: value.r#type.to_string(),
scope: value.scope.to_string(),
permissions: value.permissions.to_string(),
name: value.name,
}
}
}

impl From<PyAclEntry> for AclEntry {
fn from(value: PyAclEntry) -> Self {
Self {
r#type: value.r#type.into(),
scope: value.scope.into(),
permissions: value.permissions.into(),
name: value.name,
}
}
}

impl FromIterator<PyAclEntry> for Vec<AclEntry> {
fn from_iter<T: IntoIterator<Item = PyAclEntry>>(iter: T) -> Self {
iter.into_iter().map(AclEntry::from).collect()
}
}

#[pymethods]
impl PyAclEntry {
#[new]
pub fn new(r#type: String, scope: String, permissions: String, name: Option<String>) -> Self {
Self {
r#type,
scope,
permissions,
name,
}
}

/// Return a dataclass-esque format for the repr
fn __repr__(&self) -> String {
format!(
"AclEntry(type='{}', scope='{}', permissions='{}', name='{:?}')",
self.r#type, self.scope, self.permissions, self.name
)
}
}

#[pyclass]
struct RawFileReader {
inner: FileReader,
Expand Down Expand Up @@ -402,12 +490,65 @@ impl RawClient {
.allow_threads(|| self.rt.block_on(self.inner.get_content_summary(path)))?
.into())
}

pub fn modify_acl_entries(
&self,
path: &str,
acl_spec: Vec<PyAclEntry>,
py: Python,
) -> PyHdfsResult<()> {
Ok(py.allow_threads(|| {
self.rt.block_on(
self.inner
.modify_acl_entries(path, acl_spec.into_iter().collect()),
)
})?)
}

pub fn remove_acl_entries(
&self,
path: &str,
acl_spec: Vec<PyAclEntry>,
py: Python,
) -> PyHdfsResult<()> {
Ok(py.allow_threads(|| {
self.rt.block_on(
self.inner
.remove_acl_entries(path, acl_spec.into_iter().collect()),
)
})?)
}

pub fn remove_default_acl(&self, path: &str, py: Python) -> PyHdfsResult<()> {
Ok(py.allow_threads(|| self.rt.block_on(self.inner.remove_default_acl(path)))?)
}

pub fn remove_acl(&self, path: &str, py: Python) -> PyHdfsResult<()> {
Ok(py.allow_threads(|| self.rt.block_on(self.inner.remove_acl(path)))?)
}

pub fn set_acl(&self, path: &str, acl_spec: Vec<PyAclEntry>, py: Python) -> PyHdfsResult<()> {
Ok(py.allow_threads(|| {
self.rt
.block_on(self.inner.set_acl(path, acl_spec.into_iter().collect()))
})?)
}

pub fn get_acl_status(&self, path: &str, py: Python) -> PyHdfsResult<PyAclStatus> {
Ok(py
.allow_threads(|| self.rt.block_on(self.inner.get_acl_status(path)))?
.into())
}
}

/// A Python module implemented in Rust.
#[pymodule]
fn _internal(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<RawClient>()?;
m.add_class::<PyFileStatus>()?;
m.add_class::<PyContentSummary>()?;
m.add_class::<PyWriteOptions>()?;
m.add_class::<PyAclEntry>()?;
m.add_class::<PyAclStatus>()?;
Ok(())
}
33 changes: 32 additions & 1 deletion python/tests/test_integration.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import io

from hdfs_native import Client, WriteOptions
from hdfs_native import AclEntry, Client, WriteOptions


def test_integration(client: Client):
Expand Down Expand Up @@ -115,3 +115,34 @@ def test_write_options(client: Client):
assert file_info.length == 0
assert file_info.permission == 0o700
assert file_info.blocksize == 1024 * 1024


def test_acls(client: Client):
client.create("/test").close()

acl_status = client.get_acl_status("/test")
assert len(acl_status.entries) == 0

client.modify_acl_entries("/test", [AclEntry("user", "access", "r-x", "testuser")])
# Should be 2 entries now, a default group entry gets added as well
acl_status = client.get_acl_status("/test")
assert len(acl_status.entries) == 2

client.remove_acl("/test")
acl_status = client.get_acl_status("/test")
assert len(acl_status.entries) == 0

client.delete("/test")

client.mkdirs("/testdir")

client.modify_acl_entries(
"/testdir", [AclEntry("user", "default", "rwx", "testuser")]
)
# 4 other defaults get added automatically
acl_status = client.get_acl_status("/testdir")
assert len(acl_status.entries) == 5

client.remove_default_acl("/testdir")
acl_status = client.get_acl_status("/testdir")
assert len(acl_status.entries) == 0
Loading

0 comments on commit 6ea2d70

Please sign in to comment.