Release GIL (#153)
Kimahriman authored Nov 6, 2024
1 parent f348e95 commit ffd719a
Showing 4 changed files with 113 additions and 57 deletions.
2 changes: 2 additions & 0 deletions python/pyproject.toml
@@ -23,6 +23,7 @@ devel = [
     "mypy~=1.13.0",
     "ruff~=0.7.2",
     "pytest~=8.3",
+    "pytest-benchmark<5"
 ]
 
 [project.urls]
@@ -51,6 +52,7 @@ testpaths = [
     "tests",
     "hdfs_native",
 ]
+addopts = "-m 'not benchmark'"
 
 [tool.ruff.lint]
extend-select = ["I"]
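
With this configuration, a plain `pytest` run skips anything marked `benchmark`, since `addopts` is prepended to the command-line arguments. To my understanding, passing `-m benchmark` explicitly overrides it (the last `-m` wins), so the new benchmarks can still be run on demand.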
117 changes: 67 additions & 50 deletions python/src/lib.rs
@@ -71,7 +71,7 @@ impl PyFileStatus
 
 #[pyclass(name = "FileStatusIter")]
 struct PyFileStatusIter {
-    inner: ListStatusIterator,
+    inner: Arc<ListStatusIterator>,
     rt: Arc<Runtime>,
 }
 
@@ -81,10 +81,11 @@ impl PyFileStatusIter {
         slf
     }
 
-    fn __next__(mut slf: PyRefMut<'_, Self>) -> PyHdfsResult<Option<PyFileStatus>> {
-        // This is dumb, figure out how to get around the double borrow here
+    fn __next__(slf: PyRefMut<'_, Self>) -> PyHdfsResult<Option<PyFileStatus>> {
+        // Kinda dumb, but lets us release the GIL while getting the next value
+        let inner = Arc::clone(&slf.inner);
         let rt = Arc::clone(&slf.rt);
-        if let Some(result) = rt.block_on(slf.inner.next()) {
+        if let Some(result) = slf.py().allow_threads(|| rt.block_on(inner.next())) {
             Ok(Some(PyFileStatus::from(result?)))
         } else {
             Ok(None)
@@ -150,21 +151,21 @@ impl RawFileReader {
         self.inner.tell()
     }
 
-    pub fn read(&mut self, len: i64) -> PyHdfsResult<Cow<[u8]>> {
+    pub fn read(&mut self, len: i64, py: Python) -> PyHdfsResult<Cow<[u8]>> {
         let read_len = if len < 0 {
             self.inner.remaining()
         } else {
             len as usize
         };
         Ok(Cow::from(
-            self.rt.block_on(self.inner.read(read_len))?.to_vec(),
+            py.allow_threads(|| self.rt.block_on(self.inner.read(read_len)))?
+                .to_vec(),
         ))
     }
 
-    pub fn read_range(&self, offset: usize, len: usize) -> PyHdfsResult<Cow<[u8]>> {
+    pub fn read_range(&self, offset: usize, len: usize, py: Python) -> PyHdfsResult<Cow<[u8]>> {
         Ok(Cow::from(
-            self.rt
-                .block_on(self.inner.read_range(offset, len))?
+            py.allow_threads(|| self.rt.block_on(self.inner.read_range(offset, len)))?
                 .to_vec(),
         ))
     }
@@ -254,12 +255,12 @@ struct RawFileWriter {
 
 #[pymethods]
 impl RawFileWriter {
-    pub fn write(&mut self, buf: Vec<u8>) -> PyHdfsResult<usize> {
-        Ok(self.rt.block_on(self.inner.write(Bytes::from(buf)))?)
+    pub fn write(&mut self, buf: Vec<u8>, py: Python) -> PyHdfsResult<usize> {
+        Ok(py.allow_threads(|| self.rt.block_on(self.inner.write(Bytes::from(buf))))?)
     }
 
-    pub fn close(&mut self) -> PyHdfsResult<()> {
-        Ok(self.rt.block_on(self.inner.close())?)
+    pub fn close(&mut self, py: Python) -> PyHdfsResult<()> {
+        Ok(py.allow_threads(|| self.rt.block_on(self.inner.close()))?)
     }
 }
 
@@ -294,93 +295,109 @@ impl RawClient {
         })
     }
 
-    pub fn get_file_info(&self, path: &str) -> PyHdfsResult<PyFileStatus> {
-        Ok(self
-            .rt
-            .block_on(self.inner.get_file_info(path))
-            .map(PyFileStatus::from)?)
+    pub fn get_file_info(&self, path: &str, py: Python) -> PyHdfsResult<PyFileStatus> {
+        Ok(py.allow_threads(|| {
+            self.rt
+                .block_on(self.inner.get_file_info(path))
+                .map(PyFileStatus::from)
+        })?)
     }
 
     pub fn list_status(&self, path: &str, recursive: bool) -> PyFileStatusIter {
         let inner = self.inner.list_status_iter(path, recursive);
         PyFileStatusIter {
-            inner,
+            inner: Arc::new(inner),
             rt: Arc::clone(&self.rt),
         }
     }
 
-    pub fn read(&self, path: &str) -> PyHdfsResult<RawFileReader> {
-        let file_reader = self.rt.block_on(self.inner.read(path))?;
+    pub fn read(&self, path: &str, py: Python) -> PyHdfsResult<RawFileReader> {
+        let file_reader = py.allow_threads(|| self.rt.block_on(self.inner.read(path)))?;
 
         Ok(RawFileReader {
             inner: file_reader,
             rt: Arc::clone(&self.rt),
         })
     }
 
-    pub fn create(&self, src: &str, write_options: PyWriteOptions) -> PyHdfsResult<RawFileWriter> {
-        let file_writer = self
-            .rt
-            .block_on(self.inner.create(src, WriteOptions::from(write_options)))?;
+    pub fn create(
+        &self,
+        src: &str,
+        write_options: PyWriteOptions,
+        py: Python,
+    ) -> PyHdfsResult<RawFileWriter> {
+        let file_writer = py.allow_threads(|| {
+            self.rt
+                .block_on(self.inner.create(src, WriteOptions::from(write_options)))
+        })?;
 
         Ok(RawFileWriter {
             inner: file_writer,
             rt: Arc::clone(&self.rt),
         })
     }
 
-    pub fn append(&self, src: &str) -> PyHdfsResult<RawFileWriter> {
-        let file_writer = self.rt.block_on(self.inner.append(src))?;
+    pub fn append(&self, src: &str, py: Python) -> PyHdfsResult<RawFileWriter> {
+        let file_writer = py.allow_threads(|| self.rt.block_on(self.inner.append(src)))?;
 
         Ok(RawFileWriter {
             inner: file_writer,
             rt: Arc::clone(&self.rt),
        })
     }
 
-    pub fn mkdirs(&self, path: &str, permission: u32, create_parent: bool) -> PyHdfsResult<()> {
-        Ok(self
-            .rt
-            .block_on(self.inner.mkdirs(path, permission, create_parent))?)
+    pub fn mkdirs(
+        &self,
+        path: &str,
+        permission: u32,
+        create_parent: bool,
+        py: Python,
+    ) -> PyHdfsResult<()> {
+        Ok(py.allow_threads(|| {
+            self.rt
+                .block_on(self.inner.mkdirs(path, permission, create_parent))
+        })?)
     }
 
-    pub fn rename(&self, src: &str, dst: &str, overwrite: bool) -> PyHdfsResult<()> {
-        Ok(self.rt.block_on(self.inner.rename(src, dst, overwrite))?)
+    pub fn rename(&self, src: &str, dst: &str, overwrite: bool, py: Python) -> PyHdfsResult<()> {
+        Ok(py.allow_threads(|| self.rt.block_on(self.inner.rename(src, dst, overwrite)))?)
     }
 
-    pub fn delete(&self, path: &str, recursive: bool) -> PyHdfsResult<bool> {
-        Ok(self.rt.block_on(self.inner.delete(path, recursive))?)
+    pub fn delete(&self, path: &str, recursive: bool, py: Python) -> PyHdfsResult<bool> {
+        Ok(py.allow_threads(|| self.rt.block_on(self.inner.delete(path, recursive)))?)
     }
 
-    pub fn set_times(&self, path: &str, mtime: u64, atime: u64) -> PyHdfsResult<()> {
-        Ok(self.rt.block_on(self.inner.set_times(path, mtime, atime))?)
+    pub fn set_times(&self, path: &str, mtime: u64, atime: u64, py: Python) -> PyHdfsResult<()> {
+        Ok(py.allow_threads(|| self.rt.block_on(self.inner.set_times(path, mtime, atime)))?)
     }
 
     pub fn set_owner(
         &self,
         path: &str,
         owner: Option<&str>,
         group: Option<&str>,
+        py: Python,
     ) -> PyHdfsResult<()> {
-        Ok(self.rt.block_on(self.inner.set_owner(path, owner, group))?)
+        Ok(py.allow_threads(|| self.rt.block_on(self.inner.set_owner(path, owner, group)))?)
     }
 
-    pub fn set_permission(&self, path: &str, permission: u32) -> PyHdfsResult<()> {
-        Ok(self
-            .rt
-            .block_on(self.inner.set_permission(path, permission))?)
+    pub fn set_permission(&self, path: &str, permission: u32, py: Python) -> PyHdfsResult<()> {
+        Ok(py.allow_threads(|| {
+            self.rt
+                .block_on(self.inner.set_permission(path, permission))
+        })?)
     }
 
-    pub fn set_replication(&self, path: &str, replication: u32) -> PyHdfsResult<bool> {
-        Ok(self
-            .rt
-            .block_on(self.inner.set_replication(path, replication))?)
+    pub fn set_replication(&self, path: &str, replication: u32, py: Python) -> PyHdfsResult<bool> {
+        Ok(py.allow_threads(|| {
+            self.rt
+                .block_on(self.inner.set_replication(path, replication))
+        })?)
     }
 
-    pub fn get_content_summary(&self, path: &str) -> PyHdfsResult<PyContentSummary> {
-        Ok(self
-            .rt
-            .block_on(self.inner.get_content_summary(path))?
+    pub fn get_content_summary(&self, path: &str, py: Python) -> PyHdfsResult<PyContentSummary> {
+        Ok(py
+            .allow_threads(|| self.rt.block_on(self.inner.get_content_summary(path)))?
            .into())
     }
 }
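
Every wrapper above follows the same pattern: the method gains a trailing `py: Python` argument (PyO3 supplies the token automatically, so Python-side signatures don't change), and the blocking `rt.block_on(...)` moves inside `py.allow_threads(...)`, which releases the GIL while the tokio runtime drives the future; `__next__` does the same through `slf.py()`. Below is a minimal self-contained sketch of the pattern, assuming tokio's `full` feature — the `Worker` type and the `slow_io` future are illustrative stand-ins for the real client and its HDFS RPCs, not part of this crate:

use std::sync::Arc;
use std::time::Duration;

use pyo3::prelude::*;
use tokio::runtime::Runtime;

// Hypothetical async operation standing in for an HDFS RPC.
async fn slow_io() -> String {
    tokio::time::sleep(Duration::from_millis(100)).await;
    "done".to_string()
}

#[pyclass]
struct Worker {
    rt: Arc<Runtime>,
}

#[pymethods]
impl Worker {
    #[new]
    fn new() -> Self {
        Worker {
            rt: Arc::new(Runtime::new().unwrap()),
        }
    }

    // `allow_threads` drops the GIL for the closure's duration, so other
    // Python threads can run while this one blocks on the runtime.
    fn run(&self, py: Python) -> String {
        py.allow_threads(|| self.rt.block_on(slow_io()))
    }
}

Without `allow_threads`, a method like `run` would hold the GIL for the full duration of `block_on`, serializing every caller in a thread pool like the one in the benchmark below.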
36 changes: 36 additions & 0 deletions python/tests/test_benchmark.py
@@ -0,0 +1,36 @@
+from concurrent.futures import ThreadPoolExecutor, as_completed
+
+import pytest
+from pytest_benchmark.fixture import BenchmarkFixture
+
+from hdfs_native import Client
+
+
+@pytest.mark.benchmark
+def test_benchmark_threading(client: Client, benchmark: BenchmarkFixture):
+    def do_work():
+        def func(path: str):
+            client.create(path).close()
+            return client.delete(path)
+
+        with ThreadPoolExecutor(100) as executor:
+            futures = []
+            for i in range(1000):
+                futures.append(executor.submit(func, f"/bench{i}"))
+
+            for future in as_completed(futures):
+                assert future.result()
+
+    benchmark(do_work)
+
+
+@pytest.mark.benchmark
+def test_benchmark_listing(client: Client, benchmark: BenchmarkFixture):
+    for i in range(1000):
+        client.create(f"/bench{i}").close()
+
+    def do_work():
+        for _ in client.list_status("/"):
+            pass
+
+    benchmark(do_work)
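
Both tests assume a `client` fixture provided elsewhere in the suite (presumably a conftest that points at a test cluster). The threading benchmark is the one this change targets: with the GIL held across `block_on`, the 100 worker threads would effectively run one at a time, so releasing it should show up directly in that benchmark's numbers.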
15 changes: 8 additions & 7 deletions rust/src/client.rs
@@ -588,7 +588,7 @@ impl DirListingIterator {
 pub struct ListStatusIterator {
     mount_table: Arc<MountTable>,
     recursive: bool,
-    iters: Vec<DirListingIterator>,
+    iters: Arc<tokio::sync::Mutex<Vec<DirListingIterator>>>,
 }
 
 impl ListStatusIterator {
@@ -598,20 +598,21 @@ impl ListStatusIterator {
         ListStatusIterator {
             mount_table,
             recursive,
-            iters: vec![initial],
+            iters: Arc::new(tokio::sync::Mutex::new(vec![initial])),
         }
     }
 
-    pub async fn next(&mut self) -> Option<Result<FileStatus>> {
+    pub async fn next(&self) -> Option<Result<FileStatus>> {
         let mut next_file: Option<Result<FileStatus>> = None;
+        let mut iters = self.iters.lock().await;
         while next_file.is_none() {
-            if let Some(iter) = self.iters.last_mut() {
+            if let Some(iter) = iters.last_mut() {
                 if let Some(file_result) = iter.next().await {
                     if let Ok(file) = file_result {
                         // Return the directory as the next result, but start traversing into that directory
                         // next if we're doing a recursive listing
                         if file.isdir && self.recursive {
-                            self.iters.push(DirListingIterator::new(
+                            iters.push(DirListingIterator::new(
                                 file.path.clone(),
                                 &self.mount_table,
                                 false,
@@ -624,7 +625,7 @@ impl ListStatusIterator {
                     }
                 } else {
                     // We've exhausted this directory
-                    self.iters.pop();
+                    iters.pop();
                 }
             } else {
                 // There's nothing left, just return None
@@ -636,7 +637,7 @@ impl ListStatusIterator {
     }
 
     pub fn into_stream(self) -> BoxStream<'static, Result<FileStatus>> {
-        let listing = stream::unfold(self, |mut state| async move {
+        let listing = stream::unfold(self, |state| async move {
             let next = state.next().await;
             next.map(|n| (n, state))
         });
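
Moving the iterator stack behind `Arc<tokio::sync::Mutex<...>>` is what allows `next` to take `&self` instead of `&mut self`, which in turn lets the Python side hold the iterator in an `Arc` and drive it from inside `allow_threads`. The async-aware `tokio::sync::Mutex` is needed rather than `std::sync::Mutex` because the guard stays held across the `iter.next().await` point. A stripped-down sketch of the interior-mutability pattern, with a hypothetical `Counter` in place of the real iterator:

use std::sync::Arc;
use tokio::sync::Mutex;

// The mutable state lives behind an async-aware Mutex, so `next` can
// take `&self` even though each call mutates the counter.
struct Counter {
    state: Arc<Mutex<u32>>,
}

impl Counter {
    async fn next(&self) -> Option<u32> {
        let mut n = self.state.lock().await;
        if *n < 3 {
            *n += 1;
            Some(*n)
        } else {
            None
        }
    }
}

#[tokio::main]
async fn main() {
    let counter = Counter {
        state: Arc::new(Mutex::new(0)),
    };
    // Iteration works through a shared reference; no &mut borrow needed.
    while let Some(n) = counter.next().await {
        println!("{n}");
    }
}

This is also why `into_stream` no longer needs `|mut state|`: the closure only ever calls `next(&self)` on the shared state.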
