diff --git a/worker/include/DepLibUring.hpp b/worker/include/DepLibUring.hpp index 71ed329f13..973acaff42 100644 --- a/worker/include/DepLibUring.hpp +++ b/worker/include/DepLibUring.hpp @@ -80,6 +80,10 @@ class DepLibUring { return this->active; } + bool IsZeroCopyEnabled() const + { + return this->zeroCopyEnabled; + } io_uring* GetRing() { return std::addressof(this->ring); @@ -109,6 +113,8 @@ class DepLibUring uv_poll_t* uvHandle{ nullptr }; // Whether we are currently sending RTP over io_uring. bool active{ false }; + // Whether Zero Copy feature is enabled. + bool zeroCopyEnabled{ true }; // Pre-allocated UserData's. UserData userDatas[QueueDepth]{}; // Indexes of available UserData entries. diff --git a/worker/src/DepLibUring.cpp b/worker/src/DepLibUring.cpp index 12cb75ccba..a42b03fd16 100644 --- a/worker/src/DepLibUring.cpp +++ b/worker/src/DepLibUring.cpp @@ -41,41 +41,44 @@ inline static void onFdEvent(uv_poll_t* handle, int status, int events) struct io_uring_cqe* cqe = cqes[i]; auto* userData = static_cast(io_uring_cqe_get_data(cqe)); - // CQE notification for a zero-copy submission. - if (cqe->flags & IORING_CQE_F_NOTIF) + if (liburing->IsZeroCopyEnabled()) { - // The send buffer is now in the network card, run the send callback. - if (userData->cb) - { - (*userData->cb)(true); - delete userData->cb; - userData->cb = nullptr; - } - - liburing->ReleaseUserDataEntry(userData->idx); - io_uring_cqe_seen(liburing->GetRing(), cqe); - - continue; - } - - // CQE for a zero-copy submission, a CQE notification will follow. - if (cqe->flags & IORING_CQE_F_MORE) - { - if (cqe->res < 0) + // CQE notification for a zero-copy submission. + if (cqe->flags & IORING_CQE_F_NOTIF) { + // The send buffer is now in the network card, run the send callback. if (userData->cb) { - (*userData->cb)(false); + (*userData->cb)(true); delete userData->cb; userData->cb = nullptr; } + + liburing->ReleaseUserDataEntry(userData->idx); + io_uring_cqe_seen(liburing->GetRing(), cqe); + + continue; } - // NOTE: Do not release the user data as it will be done upon reception - // of CQE notification. - io_uring_cqe_seen(liburing->GetRing(), cqe); + // CQE for a zero-copy submission, a CQE notification will follow. + if (cqe->flags & IORING_CQE_F_MORE) + { + if (cqe->res < 0) + { + if (userData->cb) + { + (*userData->cb)(false); + delete userData->cb; + userData->cb = nullptr; + } + } + + // NOTE: Do not release the user data as it will be done upon reception + // of CQE notification. + io_uring_cqe_seen(liburing->GetRing(), cqe); - continue; + continue; + } } // Successfull SQE. @@ -310,7 +313,19 @@ DepLibUring::LibUring::LibUring() if (err < 0) { - MS_THROW_ERROR("io_uring_register_buffers() failed: %s", std::strerror(-err)); + if (err == ENOMEM) + { + this->zeroCopyEnabled = false; + + MS_WARN_TAG( + info, + "io_uring_register_buffers() failed due to low memlock limit (ulimit -l), disabling zero copy", + std::strerror(-err)); + } + else + { + MS_THROW_ERROR("io_uring_register_buffers() failed: %s", std::strerror(-err)); + } } } @@ -442,16 +457,23 @@ bool DepLibUring::LibUring::PrepareSend( socklen_t addrlen = Utils::IP::GetAddressLen(addr); - auto iovec = this->iovecs[userData->idx]; - iovec.iov_len = len; + if (this->zeroCopyEnabled) + { + auto iovec = this->iovecs[userData->idx]; + iovec.iov_len = len; - io_uring_prep_send_zc(sqe, sockfd, iovec.iov_base, iovec.iov_len, 0, 0); - io_uring_prep_send_set_addr(sqe, addr, addrlen); + io_uring_prep_send_zc(sqe, sockfd, iovec.iov_base, iovec.iov_len, 0, 0); + io_uring_prep_send_set_addr(sqe, addr, addrlen); - // Tell io_uring that we are providing the already registered send buffer - // for zero copy. - sqe->ioprio |= IORING_RECVSEND_FIXED_BUF; - sqe->buf_index = userData->idx; + // Tell io_uring that we are providing the already registered send buffer + // for zero copy. + sqe->ioprio |= IORING_RECVSEND_FIXED_BUF; + sqe->buf_index = userData->idx; + } + else + { + io_uring_prep_sendto(sqe, sockfd, userData->store, len, 0, addr, addrlen); + } this->sqeProcessCount++;