Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
sleepy-monax committed Jan 23, 2025
1 parent dbac81f commit 11fe278
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 0 deletions.
109 changes: 109 additions & 0 deletions src/impls/impl-posix/epoll/async.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
#include <impl-posix/fd.h>
#include <impl-posix/utils.h>
#include <karm-async/promise.h>
#include <karm-base/map.h>
#include <karm-sys/_embed.h>
#include <karm-sys/async.h>
#include <karm-sys/time.h>
#include <sys/epoll.h>
#include <unistd.h>

namespace Karm::Sys::_Embed {

struct EpollSched : public Sys::Sched {
int _epollFd;
usize _id = 0;
Map<usize, Async::Promise<>> _promises;

EpollSched(int epoll_fd) : _epollFd(epoll_fd) {}

~EpollSched() { close(_epollFd); }

Async::Task<> waitFor(epoll_event ev, int fd) {
usize id = _id++;
auto promise = Async::Promise<>();
auto future = promise.future();

ev.data.u64 = id;
if (::epoll_ctl(_epollFd, EPOLL_CTL_ADD, fd, &ev) < 0)
panic("epoll_ctl");

_promises.put(id, std::move(promise));
return Async::makeTask(future);
}

Async::Task<usize> readAsync(Rc<Fd> fd, MutBytes buf) override {
co_trya$(waitFor({.events = EPOLLIN | EPOLLET, .data = {}}, fd->handle().value()));
co_return Ok(co_try$(fd->read(buf)));
}

Async::Task<usize> writeAsync(Rc<Fd> fd, Bytes buf) override {
co_trya$(waitFor({.events = EPOLLOUT | EPOLLET, .data = {}}, fd->handle().value()));
co_return Ok(co_try$(fd->write(buf)));
}

Async::Task<usize> flushAsync(Rc<Fd> fd) override {
co_trya$(waitFor({.events = EPOLLOUT | EPOLLET, .data = {}}, fd->handle().value()));
co_return Ok(co_try$(fd->flush()));
}

Async::Task<_Accepted> acceptAsync(Rc<Fd> fd) override {
co_trya$(waitFor({.events = EPOLLIN | EPOLLET, .data = {}}, fd->handle().value()));
co_return Ok(co_try$(fd->accept()));
}

Async::Task<_Sent> sendAsync(Rc<Fd> fd, Bytes buf, Slice<Handle> handles, SocketAddr addr) override {
co_trya$(waitFor({.events = EPOLLOUT | EPOLLET, .data = {}}, fd->handle().value()));
co_return Ok(co_try$(fd->send(buf, handles, addr)));
}

Async::Task<_Received> recvAsync(Rc<Fd> fd, MutBytes buf, MutSlice<Handle> hnds) override {
co_trya$(waitFor({.events = EPOLLIN | EPOLLET, .data = {}}, fd->handle().value()));
co_return Ok(co_try$(fd->recv(buf, hnds)));
}

Async::Task<> sleepAsync(TimeStamp until) override {
TimeStamp now = Sys::now();
TimeSpan delta = TimeSpan::zero();
if (now < until)
delta = until - now;

int timer_fd = Posix::createTimerFd(delta);
if (timer_fd < 0)
panic("createTimerFd");

co_trya$(waitFor({.events = EPOLLIN, .data = {}}, timer_fd));
close(timer_fd);
co_return Ok();
}

Res<> wait(TimeStamp until) override {
epoll_event ev;
int timeout = until.isEndOfTime() ? -1 : (until - Sys::now()).asMillis();

int n = ::epoll_wait(_epollFd, &ev, 1, timeout);

if (n < 0)
return Posix::fromLastErrno();

if (n == 0)
return Ok();

usize id = ev.data.u64;
auto promise = _promises.take(id);
promise.resolve(Ok());
return Ok();
}
};

Sched& globalSched() {
static EpollSched sched = [] {
int epoll_fd = ::epoll_create1(0);
if (epoll_fd < 0)
panic("epoll_create1");
return EpollSched(epoll_fd);
}();
return sched;
}

} // namespace Karm::Sys::_Embed
16 changes: 16 additions & 0 deletions src/impls/impl-posix/epoll/manifest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"$schema": "https://schemas.cute.engineering/stable/cutekit.manifest.component.v1",
"id": "impl-posix.epoll",
"type": "lib",
"props": {
"cpp-excluded": true
},
"enableIf": {
"sys": [
"linux"
]
},
"provides": [
"karm-sys-async-impl"
]
}

0 comments on commit 11fe278

Please sign in to comment.