Skip to content

Commit

Permalink
Do not create a LibUring instance if not supported by kernel
Browse files Browse the repository at this point in the history
  • Loading branch information
jmillan committed Nov 15, 2023
1 parent e8f59f4 commit 808c2c1
Showing 6 changed files with 105 additions and 26 deletions.
30 changes: 15 additions & 15 deletions worker/include/DepLibUring.hpp
Original file line number Diff line number Diff line change
@@ -26,6 +26,15 @@ class DepLibUring

static void ClassInit();
static void ClassDestroy();
static void StartPollingCQEs();
static void StopPollingCQEs();
static bool PrepareSend(
int sockfd, const void* data, size_t len, const struct sockaddr* addr, onSendCallback* cb);
static bool PrepareWrite(
int sockfd, const void* data1, size_t len1, const void* data2, size_t len2, onSendCallback* cb);
static void Submit();
static void SetActive();
static bool IsActive();

class LibUring;

@@ -40,24 +49,17 @@ class DepLibUring
void StartPollingCQEs();
void StopPollingCQEs();
bool PrepareSend(
int sockfd, const void* data, size_t len, const struct sockaddr* addr, onSendCallback* cb);
int sockfd, const void* data, size_t len, const struct sockaddr* addr, onSendCallback* cb);
bool PrepareWrite(
int sockfd, const void* data1, size_t len1, const void* data2, size_t len2, onSendCallback* cb);
int sockfd, const void* data1, size_t len1, const void* data2, size_t len2, onSendCallback* cb);
void Submit();
void Enable()
{
this->enabled = true;
}
void SetActive()
{
if (this->enabled)
{
this->active = true;
}
this->active = true;
}
bool IsActive() const
{
return this->enabled && this->active;
return this->active;
}
io_uring* GetRing()
{
@@ -72,10 +74,10 @@ class DepLibUring
this->availableUserDataEntries.push(idx);
}

private:
private:
UserData* GetUserData();

private:
private:
// io_uring instance.
io_uring ring;
// Event file descriptor to watch for completions.
@@ -84,8 +86,6 @@ class DepLibUring
uv_poll_t* uvHandle{ nullptr };
// Whether we are currently sending RTP over io_uring.
bool active{ false };
// Whether io_uring is enabled in runtime.
bool enabled{ false };
// Pre-allocated UserData entries.
UserData userDataBuffer[QueueDepth]{};
// Indexes of available UserData entries.
85 changes: 82 additions & 3 deletions worker/src/DepLibUring.cpp
Original file line number Diff line number Diff line change
@@ -75,8 +75,6 @@ void DepLibUring::ClassInit()

MS_DEBUG_TAG(info, "liburing version: \"%i.%i\"", mayor, minor);

DepLibUring::liburing = new LibUring();

// clang-format off
struct utsname buffer{};
// clang-format on
@@ -96,7 +94,7 @@ void DepLibUring::ClassInit()
// Enable liburing for kernel versions greather than or equal to 6.
if (kernelMayorLong >= 6)
{
DepLibUring::liburing->Enable();
DepLibUring::liburing = new LibUring();
}
else
{
@@ -111,6 +109,87 @@ void DepLibUring::ClassDestroy()
delete DepLibUring::liburing;
}

void DepLibUring::StartPollingCQEs()
{
MS_TRACE();

if (!DepLibUring::liburing)
{
return;
}

DepLibUring::liburing->StartPollingCQEs();
}

void DepLibUring::StopPollingCQEs()
{
MS_TRACE();

if (!DepLibUring::liburing)
{
return;
}

DepLibUring::liburing->StopPollingCQEs();
}

bool DepLibUring::PrepareSend(
int sockfd, const void* data, size_t len, const struct sockaddr* addr, onSendCallback* cb)
{
MS_TRACE();

MS_ASSERT(DepLibUring::liburing, "DepLibUring::liburing is not set");

return DepLibUring::liburing->PrepareSend(sockfd, data, len, addr, cb);
}

bool DepLibUring::PrepareWrite(
int sockfd, const void* data1, size_t len1, const void* data2, size_t len2, onSendCallback* cb)
{
MS_TRACE();

MS_ASSERT(DepLibUring::liburing, "DepLibUring::liburing is not set");

return DepLibUring::liburing->PrepareWrite(sockfd, data1, len1, data2, len2, cb);
}

void DepLibUring::Submit()
{
MS_TRACE();

if (!DepLibUring::liburing)
{
return;
}

DepLibUring::liburing->Submit();
}

void DepLibUring::SetActive()
{
MS_TRACE();

if (!DepLibUring::liburing)
{
return;
}

DepLibUring::liburing->SetActive();
}

bool DepLibUring::IsActive()
{
MS_TRACE();

if (!DepLibUring::liburing)
{
return false;
}

return DepLibUring::liburing->IsActive();
}

/* Instance methods. */
DepLibUring::LibUring::LibUring()
{
MS_TRACE();
4 changes: 2 additions & 2 deletions worker/src/RTC/Router.cpp
Original file line number Diff line number Diff line change
@@ -653,7 +653,7 @@ namespace RTC

#ifdef MS_LIBURING_ENABLED
// Activate liburing usage.
DepLibUring::liburing->SetActive();
DepLibUring::SetActive();
#endif

for (auto* consumer : consumers)
@@ -669,7 +669,7 @@ namespace RTC

#ifdef MS_LIBURING_ENABLED
// Submit all prepared submission entries.
DepLibUring::liburing->Submit();
DepLibUring::Submit();
#endif
}

4 changes: 2 additions & 2 deletions worker/src/Worker.cpp
Original file line number Diff line number Diff line change
@@ -45,7 +45,7 @@ Worker::Worker(::Channel::ChannelSocket* channel) : channel(channel)

#ifdef MS_LIBURING_ENABLED
// Start polling CQEs, which will create a uv_pool_t handle.
DepLibUring::liburing->StartPollingCQEs();
DepLibUring::StartPollingCQEs();
#endif

// Tell the Node process that we are running.
@@ -103,7 +103,7 @@ void Worker::Close()

#ifdef MS_LIBURING_ENABLED
// Stop polling CQEs, which will close the uv_pool_t handle.
DepLibUring::liburing->StopPollingCQEs();
DepLibUring::StopPollingCQEs();
#endif

// Close the Channel.
4 changes: 2 additions & 2 deletions worker/src/handles/TcpConnectionHandle.cpp
Original file line number Diff line number Diff line change
@@ -235,15 +235,15 @@ void TcpConnectionHandle::Write(

#ifdef MS_LIBURING_ENABLED
{
if (!DepLibUring::liburing->IsActive())
if (!DepLibUring::IsActive())
{
goto write_libuv;
}

// Prepare the data to be sent.
// NOTE: If all SQEs are currently in use or no UserData entry is available we'll
// fall back to libuv.
auto prepared = DepLibUring::liburing->PrepareWrite(this->fd, data1, len1, data2, len2, cb);
auto prepared = DepLibUring::PrepareWrite(this->fd, data1, len1, data2, len2, cb);

if (!prepared)
{
4 changes: 2 additions & 2 deletions worker/src/handles/UdpSocketHandle.cpp
Original file line number Diff line number Diff line change
@@ -160,15 +160,15 @@ void UdpSocketHandle::Send(

#ifdef MS_LIBURING_ENABLED
{
if (!DepLibUring::liburing->IsActive())
if (!DepLibUring::IsActive())
{
goto send_libuv;
}

// Prepare the data to be sent.
// NOTE: If all SQEs are currently in use or no UserData entry is available we'll
// fall back to libuv.
auto prepared = DepLibUring::liburing->PrepareSend(this->fd, data, len, addr, cb);
auto prepared = DepLibUring::PrepareSend(this->fd, data, len, addr, cb);

if (!prepared)
{

0 comments on commit 808c2c1

Please sign in to comment.