Skip to content

Commit

Permalink
handle ENOMEM error when registering buffers
Browse files Browse the repository at this point in the history
  • Loading branch information
jmillan committed Dec 20, 2023
1 parent 8841ec4 commit 7fa3e96
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 34 deletions.
6 changes: 6 additions & 0 deletions worker/include/DepLibUring.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ class DepLibUring
{
return this->active;
}
bool IsZeroCopyEnabled() const
{
return this->zeroCopyEnabled;
}
io_uring* GetRing()
{
return std::addressof(this->ring);
Expand Down Expand Up @@ -109,6 +113,8 @@ class DepLibUring
uv_poll_t* uvHandle{ nullptr };
// Whether we are currently sending RTP over io_uring.
bool active{ false };
// Whether Zero Copy feature is enabled.
bool zeroCopyEnabled{ true };
// Pre-allocated UserData's.
UserData userDatas[QueueDepth]{};
// Indexes of available UserData entries.
Expand Down
90 changes: 56 additions & 34 deletions worker/src/DepLibUring.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,41 +41,44 @@ inline static void onFdEvent(uv_poll_t* handle, int status, int events)
struct io_uring_cqe* cqe = cqes[i];
auto* userData = static_cast<DepLibUring::UserData*>(io_uring_cqe_get_data(cqe));

// CQE notification for a zero-copy submission.
if (cqe->flags & IORING_CQE_F_NOTIF)
if (liburing->IsZeroCopyEnabled())
{
// The send buffer is now in the network card, run the send callback.
if (userData->cb)
{
(*userData->cb)(true);
delete userData->cb;
userData->cb = nullptr;
}

liburing->ReleaseUserDataEntry(userData->idx);
io_uring_cqe_seen(liburing->GetRing(), cqe);

continue;
}

// CQE for a zero-copy submission, a CQE notification will follow.
if (cqe->flags & IORING_CQE_F_MORE)
{
if (cqe->res < 0)
// CQE notification for a zero-copy submission.
if (cqe->flags & IORING_CQE_F_NOTIF)
{
// The send buffer is now in the network card, run the send callback.
if (userData->cb)
{
(*userData->cb)(false);
(*userData->cb)(true);
delete userData->cb;
userData->cb = nullptr;
}

liburing->ReleaseUserDataEntry(userData->idx);
io_uring_cqe_seen(liburing->GetRing(), cqe);

continue;
}

// NOTE: Do not release the user data as it will be done upon reception
// of CQE notification.
io_uring_cqe_seen(liburing->GetRing(), cqe);
// CQE for a zero-copy submission, a CQE notification will follow.
if (cqe->flags & IORING_CQE_F_MORE)
{
if (cqe->res < 0)
{
if (userData->cb)
{
(*userData->cb)(false);
delete userData->cb;
userData->cb = nullptr;
}
}

// NOTE: Do not release the user data as it will be done upon reception
// of CQE notification.
io_uring_cqe_seen(liburing->GetRing(), cqe);

continue;
continue;
}
}

// Successfull SQE.
Expand Down Expand Up @@ -310,7 +313,19 @@ DepLibUring::LibUring::LibUring()

if (err < 0)
{
MS_THROW_ERROR("io_uring_register_buffers() failed: %s", std::strerror(-err));
if (err == ENOMEM)
{
this->zeroCopyEnabled = false;

MS_WARN_TAG(
info,
"io_uring_register_buffers() failed due to low memlock limit (ulimit -l), disabling zero copy",
std::strerror(-err));
}
else
{
MS_THROW_ERROR("io_uring_register_buffers() failed: %s", std::strerror(-err));
}
}
}

Expand Down Expand Up @@ -442,16 +457,23 @@ bool DepLibUring::LibUring::PrepareSend(

socklen_t addrlen = Utils::IP::GetAddressLen(addr);

auto iovec = this->iovecs[userData->idx];
iovec.iov_len = len;
if (this->zeroCopyEnabled)
{
auto iovec = this->iovecs[userData->idx];
iovec.iov_len = len;

io_uring_prep_send_zc(sqe, sockfd, iovec.iov_base, iovec.iov_len, 0, 0);
io_uring_prep_send_set_addr(sqe, addr, addrlen);
io_uring_prep_send_zc(sqe, sockfd, iovec.iov_base, iovec.iov_len, 0, 0);
io_uring_prep_send_set_addr(sqe, addr, addrlen);

// Tell io_uring that we are providing the already registered send buffer
// for zero copy.
sqe->ioprio |= IORING_RECVSEND_FIXED_BUF;
sqe->buf_index = userData->idx;
// Tell io_uring that we are providing the already registered send buffer
// for zero copy.
sqe->ioprio |= IORING_RECVSEND_FIXED_BUF;
sqe->buf_index = userData->idx;
}
else
{
io_uring_prep_sendto(sqe, sockfd, userData->store, len, 0, addr, addrlen);
}

this->sqeProcessCount++;

Expand Down

0 comments on commit 7fa3e96

Please sign in to comment.