Skip to content

Commit

Permalink
Use memory address itself for 'hash', add a mechanism to wait for any…
Browse files Browse the repository at this point in the history
… write operations on a memory address to finish (optional memoryview kwarg on sync())
  • Loading branch information
Tremeschin committed Aug 19, 2024
1 parent 24a70be commit a9bb4a9
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 31 deletions.
15 changes: 9 additions & 6 deletions turbopipe/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Union
from typing import Optional, Union

from moderngl import Buffer

Expand All @@ -7,8 +7,8 @@

def pipe(buffer: Union[Buffer, memoryview], fileno: int) -> None:
"""
Pipe the content of a moderngl.Buffer or memoryview to a file descriptor,
Fast, threaded and non-blocking. Call `sync()` when done!
Pipe the content of a moderngl.Buffer or memoryview to a file descriptor, fast, threaded and
blocking when needed. Call `sync(buffer)` before this, and `sync()` when done for
Usage:
```python
Expand All @@ -29,9 +29,12 @@ def pipe(buffer: Union[Buffer, memoryview], fileno: int) -> None:
_turbopipe.pipe(buffer, fileno)
del buffer

def sync(buffer: Optional[Union[Buffer, memoryview]]=None) -> None:
    """
    Wait for pending write operations to finish.

    Args:
        buffer: A moderngl.Buffer or memoryview previously sent to pipe()
            to wait on that buffer's writes only, or None (default) to
            wait for all pending writes on all buffers.
    """
    if isinstance(buffer, Buffer):
        # The C extension identifies work by memory address, so hand it a
        # memoryview over the Buffer's underlying moderngl object
        buffer = memoryview(buffer.mglo)
    _turbopipe.sync(buffer)
    # Drop our reference to the memoryview promptly (matches pipe())
    del buffer

def close() -> None:
"""Syncs and deletes objects"""
Expand Down
61 changes: 36 additions & 25 deletions turbopipe/_turbopipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,9 @@ using namespace std;
// TurboPipe internals

// A single queued write request: the mapped buffer's raw pointer, the
// destination file descriptor, and the number of bytes to write. The
// data pointer itself is the identity key in TurboPipe's queue/pending
// maps, so no separate hash member is needed.
struct Work {
    void* data;
    int file;
    size_t size;
};

class TurboPipe {
Expand All @@ -49,12 +45,23 @@ class TurboPipe {
this->_pipe(view.buf, view.len, file);
}

void sync() {
// Wait for all queues to be empty, as they are erased when
void sync(PyObject* memoryview=nullptr) {
void* data = nullptr;

if (memoryview != nullptr) {
Py_buffer view = *PyMemoryView_GET_BUFFER(memoryview);
data = view.buf;
}

// Wait for some or all queues to be empty, as they are erased when
// each thread's writing loop is done, guaranteeing finish
for (auto& values: queue) {
while (!values.second.empty()) {
this_thread::sleep_for(chrono::milliseconds(1));
while (true) {
if (data != nullptr && values.second.find(data) == values.second.end())
break;
if (data == nullptr && values.second.empty())
break;
this_thread::sleep_for(chrono::microseconds(200));
}
}
}
Expand All @@ -69,8 +76,8 @@ class TurboPipe {
}

private:
    // Per-file bookkeeping, all keyed by file descriptor. Buffers are
    // identified by their memory address (void*), not by a hash.
    dict<int, dict<void*, condition_variable>> pending; // wakes pipe() callers waiting on a busy address
    dict<int, unordered_set<void*>> queue;              // addresses currently queued or being written
    dict<int, deque<Work>> stream;                      // FIFO of pending Work items per file
    dict<int, thread> threads;                          // one writer thread per file
    dict<int, mutex> mutexes;                           // guards queue/stream/pending for that file
Expand All @@ -79,20 +86,18 @@ class TurboPipe {

void _pipe(void* data, size_t size, int file) {
Work work = {data, file, size};
int hash = work.hash();

unique_lock<mutex> lock(mutexes[file]);

// Notify this hash is queued, wait if pending
if (!queue[file].insert(hash).second) {
pending[file][hash].wait(lock, [this, file, hash] {
return queue[file].find(hash) == queue[file].end();
// Notify this memory is queued, wait if pending
if (!queue[file].insert(data).second) {
pending[file][data].wait(lock, [this, file, data] {
return queue[file].find(data) == queue[file].end();
});
}

// Add another job to the queue
stream[file].push_back(work);
queue[file].insert(hash);
queue[file].insert(data);
this->running = true;
lock.unlock();

Expand Down Expand Up @@ -128,17 +133,16 @@ class TurboPipe {
size_t tell = 0;
while (tell < work.size) {
size_t chunk = min(work.size - tell, static_cast<size_t>(4096));
size_t written = write(work.file, (char*) work.map + tell, chunk);
size_t written = write(work.file, (char*) work.data + tell, chunk);
if (written == -1) break;
tell += written;
}
#endif

// Signal work is done
lock.lock();
int hash = work.hash();
pending[file][hash].notify_all();
queue[file].erase(hash);
pending[file][work.data].notify_all();
queue[file].erase(work.data);
signal.notify_all();
}
}
Expand Down Expand Up @@ -168,9 +172,16 @@ static PyObject* turbopipe_pipe(

static PyObject* turbopipe_sync(
    PyObject* Py_UNUSED(self),
    PyObject* args
) {
    // Optional argument: a memoryview to wait on, or None/absent for all
    // buffers. Must be initialized: with the "|O" format PyArg_ParseTuple
    // leaves the slot untouched when the argument is omitted, and the
    // checks below would then read an indeterminate pointer.
    PyObject* memoryview = nullptr;
    if (!PyArg_ParseTuple(args, "|O", &memoryview))
        return NULL;

    // Treat an explicit None like an omitted argument, so the Python
    // wrapper can forward its default (sync(buffer=None)) unchanged.
    if (memoryview == Py_None)
        memoryview = nullptr;

    if (memoryview != nullptr && !PyMemoryView_Check(memoryview)) {
        PyErr_SetString(PyExc_TypeError, "Expected a memoryview object or None");
        return NULL;
    }
    turbopipe->sync(memoryview);
    Py_RETURN_NONE;
}

Expand All @@ -191,7 +202,7 @@ static void turbopipe_exit() {

// Python-visible method table. sync is METH_VARARGS (not METH_NOARGS)
// so it can accept its optional memoryview argument.
static PyMethodDef TurboPipeMethods[] = {
    {"pipe", (PyCFunction) turbopipe_pipe, METH_VARARGS, ""},
    {"sync", (PyCFunction) turbopipe_sync, METH_VARARGS, ""},
    {"close", (PyCFunction) turbopipe_close, METH_NOARGS, ""},
    {NULL, NULL, 0, NULL} // sentinel terminating the table
};
Expand Down

0 comments on commit a9bb4a9

Please sign in to comment.