From b85120f97b60f67ccbe4ae89de17f3f7cbfcca8c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Tue, 5 Mar 2024 20:21:01 +0100 Subject: [PATCH 01/13] Fix usrsctp usage in Rust **WIP** Fixes https://github.com/versatica/mediasoup/issues/1352 ### Details - Basically as described in the ticket. But not everything is done at all. - Also, I'm testing this in Node by using UV async stuff (which doesn't make sense in mediasoup for Node but anyway). ### TODO - None of these changes should take effect when in Node, so we need to pass (or to NOT pass) some `define` only from Rust to enable this in the C++ code. We don't want to deal with UV async stuff when in Node because it's not needed at all, so let's see how to do it. - Missing thread X to initialize usrsctp and run the `Checker` singleton. And many other things. - Crash when a `SctpAssociation` is closed. I think it's because somehow the `onAsync` callback is invoked asynchronously (of course) so when it calls `sctpAssociation->OnUsrSctpSendSctpData()` it happens that such a `SctpAssociation` has already been freed. Not sure how to resolve it. Here the logs: ``` mediasoup:Transport close() +18s mediasoup:Channel request() [method:ROUTER_CLOSE_TRANSPORT] +8s mediasoup:Producer transportClosed() +19s mediasoup:DataProducer transportClosed() +18s mediasoup:DataProducer transportClosed() +0ms mediasoup:Transport close() +1ms mediasoup:Channel request() [method:ROUTER_CLOSE_TRANSPORT] +1ms mediasoup:Consumer transportClosed() +19s mediasoup:DataConsumer transportClosed() +18s mediasoup:DataConsumer transportClosed() +1ms mediasoup:Channel [pid:98040] RTC::SctpAssociation::ResetSctpStream() | SCTP_RESET_STREAMS sent [streamId:1] +1ms mediasoup:Channel request succeeded [method:ROUTER_CLOSE_TRANSPORT, id:39] +0ms DepUsrSCTP::onAsync() | ---------- onAsync!! DepUsrSCTP::onAsync() | ---------- onAsync, sending SCTP data!! mediasoup:Channel Producer Channel ended by the worker process +1ms mediasoup:ERROR:Worker worker process died unexpectedly [pid:98040, code:null, signal:SIGSEGV] +0ms ``` --- worker/include/DepUsrSCTP.hpp | 11 +++ worker/include/RTC/SctpAssociation.hpp | 11 ++- worker/src/DepUsrSCTP.cpp | 107 +++++++++++++++++++++++-- worker/src/RTC/SctpAssociation.cpp | 21 ++++- 4 files changed, 138 insertions(+), 12 deletions(-) diff --git a/worker/include/DepUsrSCTP.hpp b/worker/include/DepUsrSCTP.hpp index 122cb63c47..450ddeefe6 100644 --- a/worker/include/DepUsrSCTP.hpp +++ b/worker/include/DepUsrSCTP.hpp @@ -8,6 +8,14 @@ class DepUsrSCTP { +public: + struct SendSctpDataStore + { + RTC::SctpAssociation* sctpAssociation; + uint8_t* data; + size_t len; + }; + private: class Checker : public TimerHandle::Listener { @@ -37,12 +45,15 @@ class DepUsrSCTP static void RegisterSctpAssociation(RTC::SctpAssociation* sctpAssociation); static void DeregisterSctpAssociation(RTC::SctpAssociation* sctpAssociation); static RTC::SctpAssociation* RetrieveSctpAssociation(uintptr_t id); + static void SendSctpData(RTC::SctpAssociation* sctpAssociation, uint8_t* data, size_t len); + static SendSctpDataStore* GetSendSctpDataStore(uv_async_t* handle); private: thread_local static Checker* checker; static uint64_t numSctpAssociations; static uintptr_t nextSctpAssociationId; static absl::flat_hash_map mapIdSctpAssociation; + static absl::flat_hash_map mapAsyncHandlerSendSctpData; }; #endif diff --git a/worker/include/RTC/SctpAssociation.hpp b/worker/include/RTC/SctpAssociation.hpp index e2f3fe7922..d78d6a12af 100644 --- a/worker/include/RTC/SctpAssociation.hpp +++ b/worker/include/RTC/SctpAssociation.hpp @@ -6,6 +6,7 @@ #include "RTC/DataConsumer.hpp" #include "RTC/DataProducer.hpp" #include +#include namespace RTC { @@ -80,7 +81,11 @@ namespace RTC public: flatbuffers::Offset FillBuffer( flatbuffers::FlatBufferBuilder& builder) const; - void TransportConnected(); + uv_async_t* GetAsyncHandle() const + { + return this->uvAsyncHandle; + } + void InitializeSyncHandle(uv_async_cb callback); SctpState GetState() const { return this->state; @@ -89,6 +94,7 @@ namespace RTC { return this->sctpBufferedAmount; } + void TransportConnected(); void ProcessSctpData(const uint8_t* data, size_t len) const; void SendSctpMessage( RTC::DataConsumer* dataConsumer, @@ -106,7 +112,7 @@ namespace RTC /* Callbacks fired by usrsctp events. */ public: - void OnUsrSctpSendSctpData(void* buffer, size_t len); + void OnUsrSctpSendSctpData(uint8_t* data, size_t len); void OnUsrSctpReceiveSctpData( uint16_t streamId, uint16_t ssn, uint32_t ppid, int flags, const uint8_t* data, size_t len); void OnUsrSctpReceiveSctpNotification(union sctp_notification* notification, size_t len); @@ -125,6 +131,7 @@ namespace RTC size_t sctpBufferedAmount{ 0u }; bool isDataChannel{ false }; // Allocated by this. + uv_async_t* uvAsyncHandle{ nullptr }; uint8_t* messageBuffer{ nullptr }; // Others. SctpState state{ SctpState::NEW }; diff --git a/worker/src/DepUsrSCTP.cpp b/worker/src/DepUsrSCTP.cpp index 700bac3348..8b6f78d611 100644 --- a/worker/src/DepUsrSCTP.cpp +++ b/worker/src/DepUsrSCTP.cpp @@ -1,5 +1,5 @@ #define MS_CLASS "DepUsrSCTP" -// #define MS_LOG_DEV_LEVEL 3 +#define MS_LOG_DEV_LEVEL 3 #include "DepUsrSCTP.hpp" #ifdef MS_LIBURING_SUPPORTED @@ -8,7 +8,8 @@ #include "DepLibUV.hpp" #include "Logger.hpp" #include -#include // std::vsnprintf() +#include // std::vsnprintf() +#include // std::memcpy() #include /* Static. */ @@ -17,10 +18,40 @@ static constexpr size_t CheckerInterval{ 10u }; // In ms. static std::mutex GlobalSyncMutex; static size_t GlobalInstances{ 0u }; +/* Static methods for UV callbacks. */ + +inline static void onAsync(uv_async_t* handle) +{ + MS_TRACE(); + MS_DUMP("---------- onAsync!!"); + + const std::lock_guard lock(GlobalSyncMutex); + + // Get the sending data from the map. + auto* store = DepUsrSCTP::GetSendSctpDataStore(handle); + + if (!store) + { + MS_WARN_DEV("store not found"); + + return; + } + + auto* sctpAssociation = store->sctpAssociation; + auto* data = store->data; + auto len = store->len; + + MS_DUMP("---------- onAsync, sending SCTP data!!"); + + sctpAssociation->OnUsrSctpSendSctpData(data, len); +} + /* Static methods for usrsctp global callbacks. */ inline static int onSendSctpData(void* addr, void* data, size_t len, uint8_t /*tos*/, uint8_t /*setDf*/) { + MS_TRACE(); + auto* sctpAssociation = DepUsrSCTP::RetrieveSctpAssociation(reinterpret_cast(addr)); if (!sctpAssociation) @@ -30,7 +61,7 @@ inline static int onSendSctpData(void* addr, void* data, size_t len, uint8_t /*t return -1; } - sctpAssociation->OnUsrSctpSendSctpData(data, len); + DepUsrSCTP::SendSctpData(sctpAssociation, static_cast(data), len); // NOTE: Must not free data, usrsctp lib does it. @@ -60,6 +91,7 @@ thread_local DepUsrSCTP::Checker* DepUsrSCTP::checker{ nullptr }; uint64_t DepUsrSCTP::numSctpAssociations{ 0u }; uintptr_t DepUsrSCTP::nextSctpAssociationId{ 0u }; absl::flat_hash_map DepUsrSCTP::mapIdSctpAssociation; +absl::flat_hash_map DepUsrSCTP::mapAsyncHandlerSendSctpData; /* Static methods. */ @@ -91,6 +123,7 @@ void DepUsrSCTP::ClassDestroy() MS_TRACE(); const std::lock_guard lock(GlobalSyncMutex); + --GlobalInstances; if (GlobalInstances == 0) @@ -101,6 +134,7 @@ void DepUsrSCTP::ClassDestroy() nextSctpAssociationId = 0u; DepUsrSCTP::mapIdSctpAssociation.clear(); + DepUsrSCTP::mapAsyncHandlerSendSctpData.clear(); } } @@ -158,13 +192,20 @@ void DepUsrSCTP::RegisterSctpAssociation(RTC::SctpAssociation* sctpAssociation) MS_ASSERT(DepUsrSCTP::checker != nullptr, "Checker not created"); - auto it = DepUsrSCTP::mapIdSctpAssociation.find(sctpAssociation->id); + auto it = DepUsrSCTP::mapIdSctpAssociation.find(sctpAssociation->id); + auto it2 = DepUsrSCTP::mapAsyncHandlerSendSctpData.find(sctpAssociation->GetAsyncHandle()); MS_ASSERT( it == DepUsrSCTP::mapIdSctpAssociation.end(), - "the id of the SctpAssociation is already in the map"); + "the id of the SctpAssociation is already in the mapIdSctpAssociation map"); + MS_ASSERT( + it2 == DepUsrSCTP::mapAsyncHandlerSendSctpData.end(), + "the id of the SctpAssociation is already in the mapAsyncHandlerSendSctpData map"); DepUsrSCTP::mapIdSctpAssociation[sctpAssociation->id] = sctpAssociation; + DepUsrSCTP::mapAsyncHandlerSendSctpData[sctpAssociation->GetAsyncHandle()]; + + sctpAssociation->InitializeSyncHandle(onAsync); if (++DepUsrSCTP::numSctpAssociations == 1u) { @@ -180,9 +221,11 @@ void DepUsrSCTP::DeregisterSctpAssociation(RTC::SctpAssociation* sctpAssociation MS_ASSERT(DepUsrSCTP::checker != nullptr, "Checker not created"); - auto found = DepUsrSCTP::mapIdSctpAssociation.erase(sctpAssociation->id); + auto found1 = DepUsrSCTP::mapIdSctpAssociation.erase(sctpAssociation->id); + auto found2 = DepUsrSCTP::mapAsyncHandlerSendSctpData.erase(sctpAssociation->GetAsyncHandle()); - MS_ASSERT(found > 0, "SctpAssociation not found"); + MS_ASSERT(found1 > 0, "SctpAssociation not found in mapIdSctpAssociation map"); + MS_ASSERT(found2 > 0, "SctpAssociation not found in mapAsyncHandlerSendSctpData map"); MS_ASSERT(DepUsrSCTP::numSctpAssociations > 0u, "numSctpAssociations was not higher than 0"); if (--DepUsrSCTP::numSctpAssociations == 0u) @@ -207,6 +250,56 @@ RTC::SctpAssociation* DepUsrSCTP::RetrieveSctpAssociation(uintptr_t id) return it->second; } +void DepUsrSCTP::SendSctpData(RTC::SctpAssociation* sctpAssociation, uint8_t* data, size_t len) +{ + MS_TRACE(); + + const std::lock_guard lock(GlobalSyncMutex); + + // Store the sending data into the map. + + auto it = DepUsrSCTP::mapAsyncHandlerSendSctpData.find(sctpAssociation->GetAsyncHandle()); + + MS_ASSERT( + it != DepUsrSCTP::mapAsyncHandlerSendSctpData.end(), + "SctpAssociation not found in mapAsyncHandlerSendSctpData map"); + + SendSctpDataStore& store = it->second; + + // NOTE: In Rust, DepUsrSCTP::SendSctpData() is called from onSendSctpData() + // callback from a different thread and usrsctp immediately frees |data| when + // the callback execution finishes. So we have to mem copy it. + store.sctpAssociation = sctpAssociation; + store.data = new uint8_t[len]; + store.len = len; + + std::memcpy(store.data, data, len); + + // Invoke UV async send. + int err = uv_async_send(sctpAssociation->GetAsyncHandle()); + + if (err != 0) + { + MS_WARN_TAG(sctp, "uv_async_send() failed: %s", uv_strerror(err)); + } +} + +DepUsrSCTP::SendSctpDataStore* DepUsrSCTP::GetSendSctpDataStore(uv_async_t* handle) +{ + MS_TRACE(); + + auto it = DepUsrSCTP::mapAsyncHandlerSendSctpData.find(handle); + + if (it == DepUsrSCTP::mapAsyncHandlerSendSctpData.end()) + { + return nullptr; + } + + SendSctpDataStore& store = it->second; + + return std::addressof(store); +} + /* DepUsrSCTP::Checker instance methods. */ DepUsrSCTP::Checker::Checker() : timer(new TimerHandle(this)) diff --git a/worker/src/RTC/SctpAssociation.cpp b/worker/src/RTC/SctpAssociation.cpp index 499ec2a053..f64edfcf05 100644 --- a/worker/src/RTC/SctpAssociation.cpp +++ b/worker/src/RTC/SctpAssociation.cpp @@ -2,6 +2,7 @@ // #define MS_LOG_DEV_LEVEL 3 #include "RTC/SctpAssociation.hpp" +#include "DepLibUV.hpp" #include "DepUsrSCTP.hpp" #include "Logger.hpp" #include "MediaSoupErrors.hpp" @@ -121,6 +122,9 @@ namespace RTC { MS_TRACE(); + // Create a uv_async_t handle. + this->uvAsyncHandle = new uv_async_t; + // Register ourselves in usrsctp. // NOTE: This must be done before calling usrsctp_bind(). usrsctp_register_address(reinterpret_cast(this->id)); @@ -293,6 +297,7 @@ namespace RTC // Register the SctpAssociation from the global map. DepUsrSCTP::DeregisterSctpAssociation(this); + delete this->uvAsyncHandle; delete[] this->messageBuffer; } @@ -381,6 +386,18 @@ namespace RTC this->isDataChannel); } + void SctpAssociation::InitializeSyncHandle(uv_async_cb callback) + { + MS_TRACE(); + + int err = uv_async_init(DepLibUV::GetLoop(), this->uvAsyncHandle, callback); + + if (err != 0) + { + MS_ABORT("uv_async_init() failed: %s", uv_strerror(err)); + } + } + void SctpAssociation::ProcessSctpData(const uint8_t* data, size_t len) const { MS_TRACE(); @@ -667,12 +684,10 @@ namespace RTC } } - void SctpAssociation::OnUsrSctpSendSctpData(void* buffer, size_t len) + void SctpAssociation::OnUsrSctpSendSctpData(uint8_t* data, size_t len) { MS_TRACE(); - const uint8_t* data = static_cast(buffer); - #if MS_LOG_DEV_LEVEL == 3 MS_DUMP_DATA(data, len); #endif From 6d19087d642c570b42f2d634bec910713108a9d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Tue, 5 Mar 2024 21:14:20 +0100 Subject: [PATCH 02/13] debugging logs --- worker/src/DepUsrSCTP.cpp | 16 ++++++++++++---- worker/src/RTC/SctpAssociation.cpp | 1 + 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/worker/src/DepUsrSCTP.cpp b/worker/src/DepUsrSCTP.cpp index 8b6f78d611..e0643a65d7 100644 --- a/worker/src/DepUsrSCTP.cpp +++ b/worker/src/DepUsrSCTP.cpp @@ -23,7 +23,7 @@ static size_t GlobalInstances{ 0u }; inline static void onAsync(uv_async_t* handle) { MS_TRACE(); - MS_DUMP("---------- onAsync!!"); + MS_DUMP_STD("---------- onAsync!!"); const std::lock_guard lock(GlobalSyncMutex); @@ -41,7 +41,7 @@ inline static void onAsync(uv_async_t* handle) auto* data = store->data; auto len = store->len; - MS_DUMP("---------- onAsync, sending SCTP data!!"); + MS_DUMP_STD("---------- onAsync, sending SCTP data!!"); sctpAssociation->OnUsrSctpSendSctpData(data, len); } @@ -99,6 +99,7 @@ void DepUsrSCTP::ClassInit() { MS_TRACE(); + MS_DUMP_STD("---------- DepUsrSCTP::ClassInit()"); MS_DEBUG_TAG(info, "usrsctp"); const std::lock_guard lock(GlobalSyncMutex); @@ -122,6 +123,8 @@ void DepUsrSCTP::ClassDestroy() { MS_TRACE(); + MS_DUMP_STD("---------- DepUsrSCTP::ClassDestroy()"); + const std::lock_guard lock(GlobalSyncMutex); --GlobalInstances; @@ -188,6 +191,8 @@ void DepUsrSCTP::RegisterSctpAssociation(RTC::SctpAssociation* sctpAssociation) { MS_TRACE(); + MS_DUMP_STD("------ DepUsrSCTP::RegisterSctpAssociation()"); + const std::lock_guard lock(GlobalSyncMutex); MS_ASSERT(DepUsrSCTP::checker != nullptr, "Checker not created"); @@ -203,7 +208,7 @@ void DepUsrSCTP::RegisterSctpAssociation(RTC::SctpAssociation* sctpAssociation) "the id of the SctpAssociation is already in the mapAsyncHandlerSendSctpData map"); DepUsrSCTP::mapIdSctpAssociation[sctpAssociation->id] = sctpAssociation; - DepUsrSCTP::mapAsyncHandlerSendSctpData[sctpAssociation->GetAsyncHandle()]; + DepUsrSCTP::mapAsyncHandlerSendSctpData[sctpAssociation->GetAsyncHandle()] = { sctpAssociation }; sctpAssociation->InitializeSyncHandle(onAsync); @@ -217,6 +222,8 @@ void DepUsrSCTP::DeregisterSctpAssociation(RTC::SctpAssociation* sctpAssociation { MS_TRACE(); + MS_DUMP_STD("------ DepUsrSCTP::DeregisterSctpAssociation() !!!!!!!!"); + const std::lock_guard lock(GlobalSyncMutex); MS_ASSERT(DepUsrSCTP::checker != nullptr, "Checker not created"); @@ -269,7 +276,8 @@ void DepUsrSCTP::SendSctpData(RTC::SctpAssociation* sctpAssociation, uint8_t* da // NOTE: In Rust, DepUsrSCTP::SendSctpData() is called from onSendSctpData() // callback from a different thread and usrsctp immediately frees |data| when // the callback execution finishes. So we have to mem copy it. - store.sctpAssociation = sctpAssociation; + // TODO: This must be freed, but I'd prefer if we used a static thread_local + // buffer, but I don't know max size of this (if any). store.data = new uint8_t[len]; store.len = len; diff --git a/worker/src/RTC/SctpAssociation.cpp b/worker/src/RTC/SctpAssociation.cpp index f64edfcf05..37f909db88 100644 --- a/worker/src/RTC/SctpAssociation.cpp +++ b/worker/src/RTC/SctpAssociation.cpp @@ -287,6 +287,7 @@ namespace RTC SctpAssociation::~SctpAssociation() { MS_TRACE(); + MS_DUMP_STD("----------------- SctpAssociation destructor!!!"); usrsctp_set_ulpinfo(this->socket, nullptr); usrsctp_close(this->socket); From 711790eef245acc5cc68f6c51acd6756b0194d46 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Tue, 5 Mar 2024 21:21:53 +0100 Subject: [PATCH 03/13] Delete mem copied data --- worker/src/DepUsrSCTP.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/worker/src/DepUsrSCTP.cpp b/worker/src/DepUsrSCTP.cpp index e0643a65d7..32d1e6e6a3 100644 --- a/worker/src/DepUsrSCTP.cpp +++ b/worker/src/DepUsrSCTP.cpp @@ -44,6 +44,9 @@ inline static void onAsync(uv_async_t* handle) MS_DUMP_STD("---------- onAsync, sending SCTP data!!"); sctpAssociation->OnUsrSctpSendSctpData(data, len); + + // Must delete the mem copied data once sent. + delete[] store->data; } /* Static methods for usrsctp global callbacks. */ From 06908d888cbca125745542d339e5d09aaa02f7b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Tue, 5 Mar 2024 21:25:11 +0100 Subject: [PATCH 04/13] more logs --- worker/src/DepUsrSCTP.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/worker/src/DepUsrSCTP.cpp b/worker/src/DepUsrSCTP.cpp index 32d1e6e6a3..92b6bdb710 100644 --- a/worker/src/DepUsrSCTP.cpp +++ b/worker/src/DepUsrSCTP.cpp @@ -55,6 +55,8 @@ inline static int onSendSctpData(void* addr, void* data, size_t len, uint8_t /*t { MS_TRACE(); + MS_DUMP_STD("---------- onSendSctpData!!"); + auto* sctpAssociation = DepUsrSCTP::RetrieveSctpAssociation(reinterpret_cast(addr)); if (!sctpAssociation) From a62fe3c9a090ebd14e04d8ccaa9260932153bf31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Tue, 5 Mar 2024 21:29:06 +0100 Subject: [PATCH 05/13] debugging --- worker/src/RTC/SctpAssociation.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/worker/src/RTC/SctpAssociation.cpp b/worker/src/RTC/SctpAssociation.cpp index 37f909db88..a505e4fca0 100644 --- a/worker/src/RTC/SctpAssociation.cpp +++ b/worker/src/RTC/SctpAssociation.cpp @@ -298,7 +298,9 @@ namespace RTC // Register the SctpAssociation from the global map. DepUsrSCTP::DeregisterSctpAssociation(this); - delete this->uvAsyncHandle; + MS_DUMP_STD("---- HERE we should delete the handler but if so the app crashes"); + // TODO: We should delete teh async handle, but then a crash happens. + // delete this->uvAsyncHandle; delete[] this->messageBuffer; } From 1c2b23630767f4f8f56065ffa3233952ec3342a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Wed, 6 Mar 2024 10:12:31 +0100 Subject: [PATCH 06/13] properly close the uv_async_t handle --- worker/src/DepUsrSCTP.cpp | 18 +++--------------- worker/src/RTC/SctpAssociation.cpp | 14 ++++++++++---- 2 files changed, 13 insertions(+), 19 deletions(-) diff --git a/worker/src/DepUsrSCTP.cpp b/worker/src/DepUsrSCTP.cpp index 92b6bdb710..2c15ed2ee8 100644 --- a/worker/src/DepUsrSCTP.cpp +++ b/worker/src/DepUsrSCTP.cpp @@ -23,7 +23,6 @@ static size_t GlobalInstances{ 0u }; inline static void onAsync(uv_async_t* handle) { MS_TRACE(); - MS_DUMP_STD("---------- onAsync!!"); const std::lock_guard lock(GlobalSyncMutex); @@ -41,8 +40,6 @@ inline static void onAsync(uv_async_t* handle) auto* data = store->data; auto len = store->len; - MS_DUMP_STD("---------- onAsync, sending SCTP data!!"); - sctpAssociation->OnUsrSctpSendSctpData(data, len); // Must delete the mem copied data once sent. @@ -55,8 +52,6 @@ inline static int onSendSctpData(void* addr, void* data, size_t len, uint8_t /*t { MS_TRACE(); - MS_DUMP_STD("---------- onSendSctpData!!"); - auto* sctpAssociation = DepUsrSCTP::RetrieveSctpAssociation(reinterpret_cast(addr)); if (!sctpAssociation) @@ -104,7 +99,6 @@ void DepUsrSCTP::ClassInit() { MS_TRACE(); - MS_DUMP_STD("---------- DepUsrSCTP::ClassInit()"); MS_DEBUG_TAG(info, "usrsctp"); const std::lock_guard lock(GlobalSyncMutex); @@ -128,8 +122,6 @@ void DepUsrSCTP::ClassDestroy() { MS_TRACE(); - MS_DUMP_STD("---------- DepUsrSCTP::ClassDestroy()"); - const std::lock_guard lock(GlobalSyncMutex); --GlobalInstances; @@ -196,8 +188,6 @@ void DepUsrSCTP::RegisterSctpAssociation(RTC::SctpAssociation* sctpAssociation) { MS_TRACE(); - MS_DUMP_STD("------ DepUsrSCTP::RegisterSctpAssociation()"); - const std::lock_guard lock(GlobalSyncMutex); MS_ASSERT(DepUsrSCTP::checker != nullptr, "Checker not created"); @@ -212,7 +202,7 @@ void DepUsrSCTP::RegisterSctpAssociation(RTC::SctpAssociation* sctpAssociation) it2 == DepUsrSCTP::mapAsyncHandlerSendSctpData.end(), "the id of the SctpAssociation is already in the mapAsyncHandlerSendSctpData map"); - DepUsrSCTP::mapIdSctpAssociation[sctpAssociation->id] = sctpAssociation; + DepUsrSCTP::mapIdSctpAssociation[sctpAssociation->id] = sctpAssociation; DepUsrSCTP::mapAsyncHandlerSendSctpData[sctpAssociation->GetAsyncHandle()] = { sctpAssociation }; sctpAssociation->InitializeSyncHandle(onAsync); @@ -227,8 +217,6 @@ void DepUsrSCTP::DeregisterSctpAssociation(RTC::SctpAssociation* sctpAssociation { MS_TRACE(); - MS_DUMP_STD("------ DepUsrSCTP::DeregisterSctpAssociation() !!!!!!!!"); - const std::lock_guard lock(GlobalSyncMutex); MS_ASSERT(DepUsrSCTP::checker != nullptr, "Checker not created"); @@ -283,8 +271,8 @@ void DepUsrSCTP::SendSctpData(RTC::SctpAssociation* sctpAssociation, uint8_t* da // the callback execution finishes. So we have to mem copy it. // TODO: This must be freed, but I'd prefer if we used a static thread_local // buffer, but I don't know max size of this (if any). - store.data = new uint8_t[len]; - store.len = len; + store.data = new uint8_t[len]; + store.len = len; std::memcpy(store.data, data, len); diff --git a/worker/src/RTC/SctpAssociation.cpp b/worker/src/RTC/SctpAssociation.cpp index a505e4fca0..168311aec1 100644 --- a/worker/src/RTC/SctpAssociation.cpp +++ b/worker/src/RTC/SctpAssociation.cpp @@ -30,6 +30,13 @@ const uint16_t EventTypes[] = }; /* clang-format on */ +/* Static methods for UV callbacks. */ + +inline static void onCloseAsync(uv_handle_t* handle) +{ + delete reinterpret_cast(handle); +} + /* Static methods for usrsctp callbacks. */ inline static int onRecvSctpData( @@ -287,7 +294,6 @@ namespace RTC SctpAssociation::~SctpAssociation() { MS_TRACE(); - MS_DUMP_STD("----------------- SctpAssociation destructor!!!"); usrsctp_set_ulpinfo(this->socket, nullptr); usrsctp_close(this->socket); @@ -298,9 +304,9 @@ namespace RTC // Register the SctpAssociation from the global map. DepUsrSCTP::DeregisterSctpAssociation(this); - MS_DUMP_STD("---- HERE we should delete the handler but if so the app crashes"); - // TODO: We should delete teh async handle, but then a crash happens. - // delete this->uvAsyncHandle; + uv_close( + reinterpret_cast(this->uvAsyncHandle), static_cast(onCloseAsync)); + delete[] this->messageBuffer; } From 069f78e62c618afa37622e119b4ea0d522fc9da1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Wed, 6 Mar 2024 10:43:10 +0100 Subject: [PATCH 07/13] test-node-sctp.ts is failing, add debug --- node/src/test/test-node-sctp.ts | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/node/src/test/test-node-sctp.ts b/node/src/test/test-node-sctp.ts index f5e402f082..641522bd19 100644 --- a/node/src/test/test-node-sctp.ts +++ b/node/src/test/test-node-sctp.ts @@ -113,7 +113,9 @@ afterEach(async () => { test('ordered DataProducer delivers all SCTP messages to the DataConsumer', async () => { const onStream = jest.fn(); - const numMessages = 200; + console.log('TODO: Revert numMessages to 200'); + const numMessages = 3; + // const numMessages = 200; let sentMessageBytes = 0; let recvMessageBytes = 0; let numSentMessages = 0; @@ -140,6 +142,8 @@ test('ordered DataProducer delivers all SCTP messages to the DataConsumer', asyn data.ppid = sctp.PPID.WEBRTC_BINARY; } + console.log('---- sending id %s', id); + ctx.sctpSendStream!.write(data); sentMessageBytes += data.byteLength; @@ -170,6 +174,8 @@ test('ordered DataProducer delivers all SCTP messages to the DataConsumer', asyn // @ts-ignore const ppid = data.ppid; + console.log('---- received id %s', id); + if (id !== numReceivedMessages) { reject( new Error( From 1a25bedd503f2a96f3d745f7f9af203e02688ab9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Wed, 6 Mar 2024 14:30:10 +0100 Subject: [PATCH 08/13] Use a vector to store sending messages and clean it once messages are sent in async cb --- node/src/test/test-node-sctp.ts | 13 ++- worker/include/DepUsrSCTP.hpp | 53 +++++++++- worker/include/handles/UdpSocketHandle.hpp | 3 +- worker/src/DepUsrSCTP.cpp | 98 ++++++++++++++++--- worker/src/RTC/DataConsumer.cpp | 5 + worker/src/RTC/DataProducer.cpp | 2 + worker/src/RTC/SctpAssociation.cpp | 8 +- worker/src/RTC/Transport.cpp | 6 ++ worker/src/handles/UnixStreamSocketHandle.cpp | 2 +- 9 files changed, 167 insertions(+), 23 deletions(-) diff --git a/node/src/test/test-node-sctp.ts b/node/src/test/test-node-sctp.ts index 641522bd19..1c2dd2bd93 100644 --- a/node/src/test/test-node-sctp.ts +++ b/node/src/test/test-node-sctp.ts @@ -3,6 +3,11 @@ import * as dgram from 'node:dgram'; import * as sctp from 'sctp'; import * as mediasoup from '../'; +// Trick to avoid that Jest overrides console. +console.log('TOOD: REMOVE console hack'); +import { log } from 'node:console'; +console.log = log; + type TestContext = { worker?: mediasoup.types.Worker; router?: mediasoup.types.Router; @@ -114,7 +119,7 @@ afterEach(async () => { test('ordered DataProducer delivers all SCTP messages to the DataConsumer', async () => { const onStream = jest.fn(); console.log('TODO: Revert numMessages to 200'); - const numMessages = 3; + const numMessages = 2; // const numMessages = 200; let sentMessageBytes = 0; let recvMessageBytes = 0; @@ -142,7 +147,7 @@ test('ordered DataProducer delivers all SCTP messages to the DataConsumer', asyn data.ppid = sctp.PPID.WEBRTC_BINARY; } - console.log('---- sending id %s', id); + console.log('---- test | sending id %s', id); ctx.sctpSendStream!.write(data); sentMessageBytes += data.byteLength; @@ -174,7 +179,7 @@ test('ordered DataProducer delivers all SCTP messages to the DataConsumer', asyn // @ts-ignore const ppid = data.ppid; - console.log('---- received id %s', id); + console.log('---- test | received id %s', id); if (id !== numReceivedMessages) { reject( @@ -196,8 +201,6 @@ test('ordered DataProducer delivers all SCTP messages to the DataConsumer', asyn `ppid in message with id ${id} should be ${sctp.PPID.WEBRTC_BINARY} but it is ${ppid}` ) ); - - return; } }); }); diff --git a/worker/include/DepUsrSCTP.hpp b/worker/include/DepUsrSCTP.hpp index 450ddeefe6..0201750946 100644 --- a/worker/include/DepUsrSCTP.hpp +++ b/worker/include/DepUsrSCTP.hpp @@ -5,15 +5,63 @@ #include "RTC/SctpAssociation.hpp" #include "handles/TimerHandle.hpp" #include +#include class DepUsrSCTP { +private: + /* Struct for storing a pending SCTP message to be sent. */ + struct SendSctpDataItem + { + // NOTE: We keep this struct simple, without explicit allocation + // or deallocation in constructor/destructor, and instead rely on + // the destructor of the main container SendSctpDataStore. + + SendSctpDataItem() noexcept; + + // SendSctpDataItem(uint8_t* data, size_t len) : data(new uint8_t[len]), len(len) + // { + // std::memcpy(this->data, data, len); + // } + // SendSctpDataItem(uint8_t* data, size_t len); + + // Disable copy constructor because of the dynamically allocated data. + // SendSctpDataItem(const SendSctpDataItem&) = delete; + + // ~SendSctpDataItem() + // { + // delete[] this->data; + // } + ~SendSctpDataItem(); + + uint8_t* data{ nullptr }; + size_t len{ 0u }; + + int fooId{ 0 }; + }; + public: + /* Struct for storing pending datas to be sent. */ struct SendSctpDataStore { + explicit SendSctpDataStore(RTC::SctpAssociation* sctpAssociation); + + // Disable copy constructor. + // SendSctpDataStore(const SendSctpDataStore&) = delete; + + // ~SendSctpDataStore() + // { + // this->items.clear(); + // } + ~SendSctpDataStore(); + + void ClearItems(); + RTC::SctpAssociation* sctpAssociation; - uint8_t* data; - size_t len; + + int fooId{ 0 }; + + std::vector items; }; private: @@ -53,6 +101,7 @@ class DepUsrSCTP static uint64_t numSctpAssociations; static uintptr_t nextSctpAssociationId; static absl::flat_hash_map mapIdSctpAssociation; + // Map of SendSctpDataStores indexed by uv_async_t*. static absl::flat_hash_map mapAsyncHandlerSendSctpData; }; diff --git a/worker/include/handles/UdpSocketHandle.hpp b/worker/include/handles/UdpSocketHandle.hpp index 2b7a708105..637354216d 100644 --- a/worker/include/handles/UdpSocketHandle.hpp +++ b/worker/include/handles/UdpSocketHandle.hpp @@ -18,7 +18,8 @@ class UdpSocketHandle { } - // Disable copy constructor because of the dynamically allocated data (store). + // Disable copy constructor because of the dynamically allocated data + // (store). UvSendData(const UvSendData&) = delete; ~UvSendData() diff --git a/worker/src/DepUsrSCTP.cpp b/worker/src/DepUsrSCTP.cpp index 2c15ed2ee8..1efb47e8ce 100644 --- a/worker/src/DepUsrSCTP.cpp +++ b/worker/src/DepUsrSCTP.cpp @@ -1,4 +1,5 @@ #define MS_CLASS "DepUsrSCTP" +// TODO: Comment #define MS_LOG_DEV_LEVEL 3 #include "DepUsrSCTP.hpp" @@ -18,6 +19,10 @@ static constexpr size_t CheckerInterval{ 10u }; // In ms. static std::mutex GlobalSyncMutex; static size_t GlobalInstances{ 0u }; +// TODO: REMOVE +static int FOO_STORE_ID{ 0u }; +static int FOO_ITEM_ID{ 0u }; + /* Static methods for UV callbacks. */ inline static void onAsync(uv_async_t* handle) @@ -26,6 +31,8 @@ inline static void onAsync(uv_async_t* handle) const std::lock_guard lock(GlobalSyncMutex); + MS_DUMP("**** onAsync() called"); + // Get the sending data from the map. auto* store = DepUsrSCTP::GetSendSctpDataStore(handle); @@ -37,13 +44,20 @@ inline static void onAsync(uv_async_t* handle) } auto* sctpAssociation = store->sctpAssociation; - auto* data = store->data; - auto len = store->len; + auto& items = store->items; + + MS_DUMP("------------- sending pening messages [items.size:%zu]", items.size()); + + for (auto& item : items) + { + auto* data = item.data; + auto len = item.len; - sctpAssociation->OnUsrSctpSendSctpData(data, len); + sctpAssociation->OnUsrSctpSendSctpData(data, len); + } - // Must delete the mem copied data once sent. - delete[] store->data; + // Must clear send data items once they have been sent. + store->ClearItems(); } /* Static methods for usrsctp global callbacks. */ @@ -52,6 +66,8 @@ inline static int onSendSctpData(void* addr, void* data, size_t len, uint8_t /*t { MS_TRACE(); + MS_DUMP("**** onSendSctpData() called"); + auto* sctpAssociation = DepUsrSCTP::RetrieveSctpAssociation(reinterpret_cast(addr)); if (!sctpAssociation) @@ -120,6 +136,7 @@ void DepUsrSCTP::ClassInit() void DepUsrSCTP::ClassDestroy() { + MS_DUMP("-------- ClassDestroy()"); MS_TRACE(); const std::lock_guard lock(GlobalSyncMutex); @@ -134,6 +151,7 @@ void DepUsrSCTP::ClassDestroy() nextSctpAssociationId = 0u; DepUsrSCTP::mapIdSctpAssociation.clear(); + MS_DUMP("-------- ClassDestroy() calling mapAsyncHandlerSendSctpData.clear()"); DepUsrSCTP::mapAsyncHandlerSendSctpData.clear(); } } @@ -202,8 +220,10 @@ void DepUsrSCTP::RegisterSctpAssociation(RTC::SctpAssociation* sctpAssociation) it2 == DepUsrSCTP::mapAsyncHandlerSendSctpData.end(), "the id of the SctpAssociation is already in the mapAsyncHandlerSendSctpData map"); - DepUsrSCTP::mapIdSctpAssociation[sctpAssociation->id] = sctpAssociation; - DepUsrSCTP::mapAsyncHandlerSendSctpData[sctpAssociation->GetAsyncHandle()] = { sctpAssociation }; + DepUsrSCTP::mapIdSctpAssociation[sctpAssociation->id] = sctpAssociation; + // DepUsrSCTP::mapAsyncHandlerSendSctpData[sctpAssociation->GetAsyncHandle()] = { sctpAssociation, + // ++FOO_STORE_ID }; + DepUsrSCTP::mapAsyncHandlerSendSctpData.emplace(sctpAssociation->GetAsyncHandle(), sctpAssociation); sctpAssociation->InitializeSyncHandle(onAsync); @@ -264,17 +284,23 @@ void DepUsrSCTP::SendSctpData(RTC::SctpAssociation* sctpAssociation, uint8_t* da it != DepUsrSCTP::mapAsyncHandlerSendSctpData.end(), "SctpAssociation not found in mapAsyncHandlerSendSctpData map"); - SendSctpDataStore& store = it->second; + auto& store = it->second; + MS_DUMP("-------- store.items.emplace_back()... [items.size:%zu]", store.items.size()); // NOTE: In Rust, DepUsrSCTP::SendSctpData() is called from onSendSctpData() // callback from a different thread and usrsctp immediately frees |data| when // the callback execution finishes. So we have to mem copy it. - // TODO: This must be freed, but I'd prefer if we used a static thread_local - // buffer, but I don't know max size of this (if any). - store.data = new uint8_t[len]; - store.len = len; + auto& item = store.items.emplace_back(); - std::memcpy(store.data, data, len); + item.fooId = ++FOO_ITEM_ID; + item.data = new uint8_t[len]; + item.len = len; + std::memcpy(item.data, data, len); + + MS_DUMP( + "-------- store.items.emplace_back() DONE [item.fooId:%d, items.size:%zu]", + item.fooId, + store.items.size()); // Invoke UV async send. int err = uv_async_send(sctpAssociation->GetAsyncHandle()); @@ -362,3 +388,49 @@ void DepUsrSCTP::Checker::OnTimer(TimerHandle* /*timer*/) this->lastCalledAtMs = nowMs; } + +// DepUsrSCTP::SendSctpDataItem::SendSctpDataItem(uint8_t* data, size_t len) +// : data(new uint8_t[len]), len(len) +// { +// this->fooId = ++FOO_ITEM_ID; +// MS_DUMP("---------- item constructor [fooId:%d, data:%p]", this->fooId, this->data); +// std::memcpy(this->data, data, len); +// } + +DepUsrSCTP::SendSctpDataItem::SendSctpDataItem() noexcept +{ + MS_DUMP("---------- item constructor"); +} + +DepUsrSCTP::SendSctpDataItem::~SendSctpDataItem() +{ + MS_DUMP("---------- item destructor [fooId:%d, data:%p]", this->fooId, this->data); + // delete[] this->data; +} + +DepUsrSCTP::SendSctpDataStore::SendSctpDataStore(RTC::SctpAssociation* sctpAssociation) + : sctpAssociation(sctpAssociation) +{ + this->fooId = ++FOO_STORE_ID; + + MS_DUMP("---------- store constructor [fooId:%d]", this->fooId); +} + +DepUsrSCTP::SendSctpDataStore::~SendSctpDataStore() +{ + MS_DUMP("---------- store destructor [fooId:%d, items.size():%zu]", this->fooId, this->items.size()); + + ClearItems(); +} + +void DepUsrSCTP::SendSctpDataStore::ClearItems() +{ + MS_DUMP( + "---------- store ClearItems() [fooId:%d, items.size():%zu]", this->fooId, this->items.size()); + + for (auto& item : this->items) + { + delete[] item.data; + } + this->items.clear(); +} diff --git a/worker/src/RTC/DataConsumer.cpp b/worker/src/RTC/DataConsumer.cpp index 1aaf97df12..bfe663ebbd 100644 --- a/worker/src/RTC/DataConsumer.cpp +++ b/worker/src/RTC/DataConsumer.cpp @@ -525,6 +525,11 @@ namespace RTC { MS_TRACE(); + MS_DUMP( + "------ [ppid:%" PRIu32 ", msg:'%s']", + ppid, + std::string(reinterpret_cast(msg), len).c_str()); + if (!IsActive()) { return; diff --git a/worker/src/RTC/DataProducer.cpp b/worker/src/RTC/DataProducer.cpp index eebd66ebec..f741becf54 100644 --- a/worker/src/RTC/DataProducer.cpp +++ b/worker/src/RTC/DataProducer.cpp @@ -262,6 +262,8 @@ namespace RTC return; } + MS_DUMP("------ [msg:'%s']", std::string(reinterpret_cast(msg), len).c_str()); + this->listener->OnDataProducerMessageReceived( this, msg, len, ppid, subchannels, requiredSubchannel); } diff --git a/worker/src/RTC/SctpAssociation.cpp b/worker/src/RTC/SctpAssociation.cpp index 168311aec1..f6e5478e30 100644 --- a/worker/src/RTC/SctpAssociation.cpp +++ b/worker/src/RTC/SctpAssociation.cpp @@ -423,6 +423,11 @@ namespace RTC { MS_TRACE(); + MS_DUMP( + "------ [ppid:%" PRIu32 ", msg:'%s']", + ppid, + std::string(reinterpret_cast(msg), len).c_str()); + // This must be controlled by the DataConsumer. MS_ASSERT( len <= this->maxSctpMessageSize, @@ -753,7 +758,8 @@ namespace RTC this->listener->OnSctpAssociationMessageReceived(this, streamId, data, len, ppid); } - // If end of message and there is buffered data, append data and notify buffer. + // If end of message and there is buffered data, append data and notify + // buffer. else if (eor && this->messageBufferLen != 0) { std::memcpy(this->messageBuffer + this->messageBufferLen, data, len); diff --git a/worker/src/RTC/Transport.cpp b/worker/src/RTC/Transport.cpp index ea5426970f..49f09ee967 100644 --- a/worker/src/RTC/Transport.cpp +++ b/worker/src/RTC/Transport.cpp @@ -2880,6 +2880,12 @@ namespace RTC { MS_TRACE(); + MS_DUMP( + "------ [streamId:%" PRIu16 ", ppid:%" PRIu32 ", msg:'%s']", + streamId, + ppid, + std::string(reinterpret_cast(msg), len).c_str()); + RTC::DataProducer* dataProducer = this->sctpListener.GetDataProducer(streamId); if (!dataProducer) diff --git a/worker/src/handles/UnixStreamSocketHandle.cpp b/worker/src/handles/UnixStreamSocketHandle.cpp index bf024d1d8e..6dbae0c6a2 100644 --- a/worker/src/handles/UnixStreamSocketHandle.cpp +++ b/worker/src/handles/UnixStreamSocketHandle.cpp @@ -216,7 +216,7 @@ void UnixStreamSocketHandle::Write(const uint8_t* data, size_t len) // Any other error. else if (written < 0) { - MS_ERROR_STD("uv_try_write() failed, trying uv_write(): %s", uv_strerror(written)); + MS_WARN_DEV_STD("uv_try_write() failed, trying uv_write(): %s", uv_strerror(written)); // Set written to 0 so pendingLen can be properly calculated. written = 0; From d80367e756966549a3bb86e04a76a1c355552ba5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Wed, 6 Mar 2024 14:33:19 +0100 Subject: [PATCH 09/13] remove debugging changes in test file --- node/src/test/test-node-sctp.ts | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/node/src/test/test-node-sctp.ts b/node/src/test/test-node-sctp.ts index 1c2dd2bd93..323b5a310f 100644 --- a/node/src/test/test-node-sctp.ts +++ b/node/src/test/test-node-sctp.ts @@ -3,11 +3,6 @@ import * as dgram from 'node:dgram'; import * as sctp from 'sctp'; import * as mediasoup from '../'; -// Trick to avoid that Jest overrides console. -console.log('TOOD: REMOVE console hack'); -import { log } from 'node:console'; -console.log = log; - type TestContext = { worker?: mediasoup.types.Worker; router?: mediasoup.types.Router; @@ -118,9 +113,7 @@ afterEach(async () => { test('ordered DataProducer delivers all SCTP messages to the DataConsumer', async () => { const onStream = jest.fn(); - console.log('TODO: Revert numMessages to 200'); - const numMessages = 2; - // const numMessages = 200; + const numMessages = 200; let sentMessageBytes = 0; let recvMessageBytes = 0; let numSentMessages = 0; @@ -147,8 +140,6 @@ test('ordered DataProducer delivers all SCTP messages to the DataConsumer', asyn data.ppid = sctp.PPID.WEBRTC_BINARY; } - console.log('---- test | sending id %s', id); - ctx.sctpSendStream!.write(data); sentMessageBytes += data.byteLength; @@ -179,8 +170,6 @@ test('ordered DataProducer delivers all SCTP messages to the DataConsumer', asyn // @ts-ignore const ppid = data.ppid; - console.log('---- test | received id %s', id); - if (id !== numReceivedMessages) { reject( new Error( From 7b119b2a202d548df76981cf6520a44b7d18c06e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Wed, 6 Mar 2024 14:35:51 +0100 Subject: [PATCH 10/13] cosmetic --- worker/include/DepUsrSCTP.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/worker/include/DepUsrSCTP.hpp b/worker/include/DepUsrSCTP.hpp index 0201750946..6684f6102a 100644 --- a/worker/include/DepUsrSCTP.hpp +++ b/worker/include/DepUsrSCTP.hpp @@ -13,8 +13,8 @@ class DepUsrSCTP /* Struct for storing a pending SCTP message to be sent. */ struct SendSctpDataItem { - // NOTE: We keep this struct simple, without explicit allocation - // or deallocation in constructor/destructor, and instead rely on + // NOTE: We keep this struct simple, without explicit allocation or + // deallocation of members in constructor/destructor, and instead rely on // the destructor of the main container SendSctpDataStore. SendSctpDataItem() noexcept; From ca5f222e914cde25622de368ca927a9b3dbaf1a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Wed, 6 Mar 2024 19:28:47 +0100 Subject: [PATCH 11/13] Add logs to file (because otherwise it's impossible to see them once the Worker is closed) and more things Also make checker static rather than thread_local static. Remove many debugging logs and stuff. TODO: Rust tests do not finish. --- worker/include/DepUsrSCTP.hpp | 62 +++++------- worker/src/DepLibUV.cpp | 8 ++ worker/src/DepUsrSCTP.cpp | 151 +++++++++++++---------------- worker/src/RTC/DataConsumer.cpp | 5 - worker/src/RTC/DataProducer.cpp | 2 - worker/src/RTC/SctpAssociation.cpp | 5 - worker/src/RTC/Transport.cpp | 6 -- worker/src/Worker.cpp | 7 -- worker/src/handles/TimerHandle.cpp | 13 +++ worker/src/lib.cpp | 10 ++ 10 files changed, 121 insertions(+), 148 deletions(-) diff --git a/worker/include/DepUsrSCTP.hpp b/worker/include/DepUsrSCTP.hpp index 6684f6102a..c5ceb119b5 100644 --- a/worker/include/DepUsrSCTP.hpp +++ b/worker/include/DepUsrSCTP.hpp @@ -4,6 +4,7 @@ #include "common.hpp" #include "RTC/SctpAssociation.hpp" #include "handles/TimerHandle.hpp" +#include #include #include @@ -17,50 +18,33 @@ class DepUsrSCTP // deallocation of members in constructor/destructor, and instead rely on // the destructor of the main container SendSctpDataStore. - SendSctpDataItem() noexcept; - - // SendSctpDataItem(uint8_t* data, size_t len) : data(new uint8_t[len]), len(len) - // { - // std::memcpy(this->data, data, len); - // } - // SendSctpDataItem(uint8_t* data, size_t len); - - // Disable copy constructor because of the dynamically allocated data. - // SendSctpDataItem(const SendSctpDataItem&) = delete; - - // ~SendSctpDataItem() - // { - // delete[] this->data; - // } - ~SendSctpDataItem(); - uint8_t* data{ nullptr }; size_t len{ 0u }; - - int fooId{ 0 }; }; public: /* Struct for storing pending datas to be sent. */ struct SendSctpDataStore { - explicit SendSctpDataStore(RTC::SctpAssociation* sctpAssociation); - - // Disable copy constructor. - // SendSctpDataStore(const SendSctpDataStore&) = delete; - - // ~SendSctpDataStore() - // { - // this->items.clear(); - // } - ~SendSctpDataStore(); - - void ClearItems(); - - RTC::SctpAssociation* sctpAssociation; - - int fooId{ 0 }; - + explicit SendSctpDataStore(RTC::SctpAssociation* sctpAssociation) + : sctpAssociation(sctpAssociation) + { + } + ~SendSctpDataStore() + { + ClearItems(); + } + + void ClearItems() + { + for (auto& item : this->items) + { + delete[] item.data; + } + this->items.clear(); + } + + RTC::SctpAssociation* sctpAssociation{ nullptr }; std::vector items; }; @@ -89,6 +73,10 @@ class DepUsrSCTP static void ClassDestroy(); static void CreateChecker(); static void CloseChecker(); + static bool HasChecker() + { + return DepUsrSCTP::checker != nullptr; + } static uintptr_t GetNextSctpAssociationId(); static void RegisterSctpAssociation(RTC::SctpAssociation* sctpAssociation); static void DeregisterSctpAssociation(RTC::SctpAssociation* sctpAssociation); @@ -97,7 +85,7 @@ class DepUsrSCTP static SendSctpDataStore* GetSendSctpDataStore(uv_async_t* handle); private: - thread_local static Checker* checker; + static Checker* checker; static uint64_t numSctpAssociations; static uintptr_t nextSctpAssociationId; static absl::flat_hash_map mapIdSctpAssociation; diff --git a/worker/src/DepLibUV.cpp b/worker/src/DepLibUV.cpp index 12af49ba59..0bc463fd4b 100644 --- a/worker/src/DepLibUV.cpp +++ b/worker/src/DepLibUV.cpp @@ -4,6 +4,9 @@ #include "DepLibUV.hpp" #include "Logger.hpp" +// TODO: REMOVE +#include + /* Static variables. */ thread_local uv_loop_t* DepLibUV::loop{ nullptr }; @@ -17,6 +20,11 @@ inline static void onCloseLoop(uv_handle_t* handle) inline static void onWalk(uv_handle_t* handle, void* /*arg*/) { + std::ofstream outfile; + outfile.open("/tmp/ms_log.txt", std::ios_base::app); + outfile << "---- onWalk\n"; + outfile.flush(); + // Must use MS_ERROR_STD since at this point the Channel is already closed. MS_ERROR_STD( "alive UV handle found (this shouldn't happen) [type:%s, active:%d, closing:%d, has_ref:%d]", diff --git a/worker/src/DepUsrSCTP.cpp b/worker/src/DepUsrSCTP.cpp index 1efb47e8ce..1005ba03f9 100644 --- a/worker/src/DepUsrSCTP.cpp +++ b/worker/src/DepUsrSCTP.cpp @@ -13,25 +13,46 @@ #include // std::memcpy() #include +// TODO: REMOVE +#include + /* Static. */ static constexpr size_t CheckerInterval{ 10u }; // In ms. static std::mutex GlobalSyncMutex; static size_t GlobalInstances{ 0u }; -// TODO: REMOVE -static int FOO_STORE_ID{ 0u }; -static int FOO_ITEM_ID{ 0u }; - /* Static methods for UV callbacks. */ -inline static void onAsync(uv_async_t* handle) +inline static void onAsyncCreateChecker(uv_async_t* handle) +{ + MS_TRACE(); + + const std::lock_guard lock(GlobalSyncMutex); + + MS_DUMP("**** onAsyncCreateChecker() called"); + + DepUsrSCTP::CreateChecker(); +} + +inline static void onAsyncCloseChecker(uv_async_t* handle) +{ + MS_TRACE(); + + const std::lock_guard lock(GlobalSyncMutex); + + MS_DUMP("**** onAsyncCloseChecker() called"); + + DepUsrSCTP::CloseChecker(); +} + +inline static void onAsyncSendSctpData(uv_async_t* handle) { MS_TRACE(); const std::lock_guard lock(GlobalSyncMutex); - MS_DUMP("**** onAsync() called"); + MS_DUMP("**** onAsyncSendSctpData() called"); // Get the sending data from the map. auto* store = DepUsrSCTP::GetSendSctpDataStore(handle); @@ -103,7 +124,7 @@ inline static void sctpDebug(const char* format, ...) /* Static variables. */ -thread_local DepUsrSCTP::Checker* DepUsrSCTP::checker{ nullptr }; +DepUsrSCTP::Checker* DepUsrSCTP::checker{ nullptr }; uint64_t DepUsrSCTP::numSctpAssociations{ 0u }; uintptr_t DepUsrSCTP::nextSctpAssociationId{ 0u }; absl::flat_hash_map DepUsrSCTP::mapIdSctpAssociation; @@ -129,6 +150,9 @@ void DepUsrSCTP::ClassInit() #ifdef SCTP_DEBUG usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_ALL); #endif + + // TODO: Create checker (not this way). + DepUsrSCTP::CreateChecker(); } ++GlobalInstances; @@ -136,7 +160,11 @@ void DepUsrSCTP::ClassInit() void DepUsrSCTP::ClassDestroy() { - MS_DUMP("-------- ClassDestroy()"); + std::ofstream outfile; + outfile.open("/tmp/ms_log.txt", std::ios_base::app); + outfile << "---- DepUsrSCTP::ClassDestroy()"; + outfile.flush(); + MS_TRACE(); const std::lock_guard lock(GlobalSyncMutex); @@ -147,31 +175,19 @@ void DepUsrSCTP::ClassDestroy() { usrsctp_finish(); - numSctpAssociations = 0u; - nextSctpAssociationId = 0u; + DepUsrSCTP::numSctpAssociations = 0u; + DepUsrSCTP::nextSctpAssociationId = 0u; DepUsrSCTP::mapIdSctpAssociation.clear(); - MS_DUMP("-------- ClassDestroy() calling mapAsyncHandlerSendSctpData.clear()"); - DepUsrSCTP::mapAsyncHandlerSendSctpData.clear(); - } -} -void DepUsrSCTP::CreateChecker() -{ - MS_TRACE(); + outfile << "---- DepUsrSCTP::ClassDestroy() | calling mapAsyncHandlerSendSctpData.clear()\n"; + outfile.flush(); - MS_ASSERT(DepUsrSCTP::checker == nullptr, "Checker already created"); - - DepUsrSCTP::checker = new DepUsrSCTP::Checker(); -} - -void DepUsrSCTP::CloseChecker() -{ - MS_TRACE(); - - MS_ASSERT(DepUsrSCTP::checker != nullptr, "Checker not created"); + DepUsrSCTP::mapAsyncHandlerSendSctpData.clear(); - delete DepUsrSCTP::checker; + // TODO: Close checker (not this way). + DepUsrSCTP::CloseChecker(); + } } uintptr_t DepUsrSCTP::GetNextSctpAssociationId() @@ -208,7 +224,7 @@ void DepUsrSCTP::RegisterSctpAssociation(RTC::SctpAssociation* sctpAssociation) const std::lock_guard lock(GlobalSyncMutex); - MS_ASSERT(DepUsrSCTP::checker != nullptr, "Checker not created"); + MS_ASSERT(DepUsrSCTP::HasChecker(), "Checker not created"); auto it = DepUsrSCTP::mapIdSctpAssociation.find(sctpAssociation->id); auto it2 = DepUsrSCTP::mapAsyncHandlerSendSctpData.find(sctpAssociation->GetAsyncHandle()); @@ -221,11 +237,9 @@ void DepUsrSCTP::RegisterSctpAssociation(RTC::SctpAssociation* sctpAssociation) "the id of the SctpAssociation is already in the mapAsyncHandlerSendSctpData map"); DepUsrSCTP::mapIdSctpAssociation[sctpAssociation->id] = sctpAssociation; - // DepUsrSCTP::mapAsyncHandlerSendSctpData[sctpAssociation->GetAsyncHandle()] = { sctpAssociation, - // ++FOO_STORE_ID }; DepUsrSCTP::mapAsyncHandlerSendSctpData.emplace(sctpAssociation->GetAsyncHandle(), sctpAssociation); - sctpAssociation->InitializeSyncHandle(onAsync); + sctpAssociation->InitializeSyncHandle(onAsyncSendSctpData); if (++DepUsrSCTP::numSctpAssociations == 1u) { @@ -239,7 +253,7 @@ void DepUsrSCTP::DeregisterSctpAssociation(RTC::SctpAssociation* sctpAssociation const std::lock_guard lock(GlobalSyncMutex); - MS_ASSERT(DepUsrSCTP::checker != nullptr, "Checker not created"); + MS_ASSERT(DepUsrSCTP::HasChecker(), "Checker not created"); auto found1 = DepUsrSCTP::mapIdSctpAssociation.erase(sctpAssociation->id); auto found2 = DepUsrSCTP::mapAsyncHandlerSendSctpData.erase(sctpAssociation->GetAsyncHandle()); @@ -286,22 +300,15 @@ void DepUsrSCTP::SendSctpData(RTC::SctpAssociation* sctpAssociation, uint8_t* da auto& store = it->second; - MS_DUMP("-------- store.items.emplace_back()... [items.size:%zu]", store.items.size()); // NOTE: In Rust, DepUsrSCTP::SendSctpData() is called from onSendSctpData() // callback from a different thread and usrsctp immediately frees |data| when // the callback execution finishes. So we have to mem copy it. auto& item = store.items.emplace_back(); - item.fooId = ++FOO_ITEM_ID; item.data = new uint8_t[len]; item.len = len; std::memcpy(item.data, data, len); - MS_DUMP( - "-------- store.items.emplace_back() DONE [item.fooId:%d, items.size:%zu]", - item.fooId, - store.items.size()); - // Invoke UV async send. int err = uv_async_send(sctpAssociation->GetAsyncHandle()); @@ -327,6 +334,24 @@ DepUsrSCTP::SendSctpDataStore* DepUsrSCTP::GetSendSctpDataStore(uv_async_t* hand return std::addressof(store); } +void DepUsrSCTP::CreateChecker() +{ + MS_TRACE(); + + MS_ASSERT(!DepUsrSCTP::HasChecker(), "Checker already created"); + + DepUsrSCTP::checker = new DepUsrSCTP::Checker(); +} + +void DepUsrSCTP::CloseChecker() +{ + MS_TRACE(); + + MS_ASSERT(DepUsrSCTP::HasChecker(), "Checker not created"); + + delete DepUsrSCTP::checker; +} + /* DepUsrSCTP::Checker instance methods. */ DepUsrSCTP::Checker::Checker() : timer(new TimerHandle(this)) @@ -388,49 +413,3 @@ void DepUsrSCTP::Checker::OnTimer(TimerHandle* /*timer*/) this->lastCalledAtMs = nowMs; } - -// DepUsrSCTP::SendSctpDataItem::SendSctpDataItem(uint8_t* data, size_t len) -// : data(new uint8_t[len]), len(len) -// { -// this->fooId = ++FOO_ITEM_ID; -// MS_DUMP("---------- item constructor [fooId:%d, data:%p]", this->fooId, this->data); -// std::memcpy(this->data, data, len); -// } - -DepUsrSCTP::SendSctpDataItem::SendSctpDataItem() noexcept -{ - MS_DUMP("---------- item constructor"); -} - -DepUsrSCTP::SendSctpDataItem::~SendSctpDataItem() -{ - MS_DUMP("---------- item destructor [fooId:%d, data:%p]", this->fooId, this->data); - // delete[] this->data; -} - -DepUsrSCTP::SendSctpDataStore::SendSctpDataStore(RTC::SctpAssociation* sctpAssociation) - : sctpAssociation(sctpAssociation) -{ - this->fooId = ++FOO_STORE_ID; - - MS_DUMP("---------- store constructor [fooId:%d]", this->fooId); -} - -DepUsrSCTP::SendSctpDataStore::~SendSctpDataStore() -{ - MS_DUMP("---------- store destructor [fooId:%d, items.size():%zu]", this->fooId, this->items.size()); - - ClearItems(); -} - -void DepUsrSCTP::SendSctpDataStore::ClearItems() -{ - MS_DUMP( - "---------- store ClearItems() [fooId:%d, items.size():%zu]", this->fooId, this->items.size()); - - for (auto& item : this->items) - { - delete[] item.data; - } - this->items.clear(); -} diff --git a/worker/src/RTC/DataConsumer.cpp b/worker/src/RTC/DataConsumer.cpp index bfe663ebbd..1aaf97df12 100644 --- a/worker/src/RTC/DataConsumer.cpp +++ b/worker/src/RTC/DataConsumer.cpp @@ -525,11 +525,6 @@ namespace RTC { MS_TRACE(); - MS_DUMP( - "------ [ppid:%" PRIu32 ", msg:'%s']", - ppid, - std::string(reinterpret_cast(msg), len).c_str()); - if (!IsActive()) { return; diff --git a/worker/src/RTC/DataProducer.cpp b/worker/src/RTC/DataProducer.cpp index f741becf54..eebd66ebec 100644 --- a/worker/src/RTC/DataProducer.cpp +++ b/worker/src/RTC/DataProducer.cpp @@ -262,8 +262,6 @@ namespace RTC return; } - MS_DUMP("------ [msg:'%s']", std::string(reinterpret_cast(msg), len).c_str()); - this->listener->OnDataProducerMessageReceived( this, msg, len, ppid, subchannels, requiredSubchannel); } diff --git a/worker/src/RTC/SctpAssociation.cpp b/worker/src/RTC/SctpAssociation.cpp index f6e5478e30..111f9f1dee 100644 --- a/worker/src/RTC/SctpAssociation.cpp +++ b/worker/src/RTC/SctpAssociation.cpp @@ -423,11 +423,6 @@ namespace RTC { MS_TRACE(); - MS_DUMP( - "------ [ppid:%" PRIu32 ", msg:'%s']", - ppid, - std::string(reinterpret_cast(msg), len).c_str()); - // This must be controlled by the DataConsumer. MS_ASSERT( len <= this->maxSctpMessageSize, diff --git a/worker/src/RTC/Transport.cpp b/worker/src/RTC/Transport.cpp index 49f09ee967..ea5426970f 100644 --- a/worker/src/RTC/Transport.cpp +++ b/worker/src/RTC/Transport.cpp @@ -2880,12 +2880,6 @@ namespace RTC { MS_TRACE(); - MS_DUMP( - "------ [streamId:%" PRIu16 ", ppid:%" PRIu32 ", msg:'%s']", - streamId, - ppid, - std::string(reinterpret_cast(msg), len).c_str()); - RTC::DataProducer* dataProducer = this->sctpListener.GetDataProducer(streamId); if (!dataProducer) diff --git a/worker/src/Worker.cpp b/worker/src/Worker.cpp index 1bd5468f88..6969e42ecb 100644 --- a/worker/src/Worker.cpp +++ b/worker/src/Worker.cpp @@ -7,7 +7,6 @@ #include "DepLibUring.hpp" #endif #include "DepLibUV.hpp" -#include "DepUsrSCTP.hpp" #include "Logger.hpp" #include "MediaSoupErrors.hpp" #include "Settings.hpp" @@ -40,9 +39,6 @@ Worker::Worker(::Channel::ChannelSocket* channel) : channel(channel) } #endif - // Create the Checker instance in DepUsrSCTP. - DepUsrSCTP::CreateChecker(); - #ifdef MS_LIBURING_SUPPORTED // Start polling CQEs, which will create a uv_pool_t handle. DepLibUring::StartPollingCQEs(); @@ -102,9 +98,6 @@ void Worker::Close() // Delete the RTC::Shared singleton. delete this->shared; - // Close the Checker instance in DepUsrSCTP. - DepUsrSCTP::CloseChecker(); - #ifdef MS_LIBURING_SUPPORTED // Stop polling CQEs, which will close the uv_pool_t handle. DepLibUring::StopPollingCQEs(); diff --git a/worker/src/handles/TimerHandle.cpp b/worker/src/handles/TimerHandle.cpp index e40ae8efb1..57022b72c7 100644 --- a/worker/src/handles/TimerHandle.cpp +++ b/worker/src/handles/TimerHandle.cpp @@ -6,6 +6,9 @@ #include "Logger.hpp" #include "MediaSoupErrors.hpp" +// TODO: REMOVE +#include + /* Static methods for UV callbacks. */ inline static void onTimer(uv_timer_t* handle) @@ -15,6 +18,11 @@ inline static void onTimer(uv_timer_t* handle) inline static void onCloseTimer(uv_handle_t* handle) { + std::ofstream outfile; + outfile.open("/tmp/ms_log.txt", std::ios_base::app); + outfile << "---- onCloseTimer\n"; + outfile.flush(); + delete reinterpret_cast(handle); } @@ -24,6 +32,11 @@ TimerHandle::TimerHandle(Listener* listener) : listener(listener), uvHandle(new { MS_TRACE(); + std::ofstream outfile; + outfile.open("/tmp/ms_log.txt", std::ios_base::app); + outfile << "---- Timer created\n"; + outfile.flush(); + this->uvHandle->data = static_cast(this); const int err = uv_timer_init(DepLibUV::GetLoop(), this->uvHandle); diff --git a/worker/src/lib.cpp b/worker/src/lib.cpp index 8387e5f35f..525ff9d6fe 100644 --- a/worker/src/lib.cpp +++ b/worker/src/lib.cpp @@ -23,6 +23,9 @@ #include // sigaction() #include +// TODO: REMOVE +#include + void IgnoreSignals(); // NOLINTNEXTLINE @@ -138,9 +141,16 @@ extern "C" int mediasoup_worker_run( IgnoreSignals(); #endif + MS_DUMP("------------- CREATING WORKER..."); + // Run the Worker. const Worker worker(channel.get()); + std::ofstream outfile; + outfile.open("/tmp/ms_log.txt", std::ios_base::app); + outfile << "---- WORKER CLOSED\n"; + outfile.flush(); + // Free static stuff. DepLibSRTP::ClassDestroy(); Utils::Crypto::ClassDestroy(); From d4c4dd6b9dbba37efd0102ae9a3267d0ebde4ef0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Wed, 6 Mar 2024 19:43:32 +0100 Subject: [PATCH 12/13] make format --- worker/src/DepUsrSCTP.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/worker/src/DepUsrSCTP.cpp b/worker/src/DepUsrSCTP.cpp index 1005ba03f9..78e5d1bf35 100644 --- a/worker/src/DepUsrSCTP.cpp +++ b/worker/src/DepUsrSCTP.cpp @@ -305,8 +305,8 @@ void DepUsrSCTP::SendSctpData(RTC::SctpAssociation* sctpAssociation, uint8_t* da // the callback execution finishes. So we have to mem copy it. auto& item = store.items.emplace_back(); - item.data = new uint8_t[len]; - item.len = len; + item.data = new uint8_t[len]; + item.len = len; std::memcpy(item.data, data, len); // Invoke UV async send. From 23cab16aa3b896af5dafd4521f5523930ed3aa20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1aki=20Baz=20Castillo?= Date: Thu, 21 Mar 2024 17:23:00 +0100 Subject: [PATCH 13/13] WIP multi thread worker usage --- node/src/test/test-Consumer.ts | 3 +- worker/build.rs | 5 +- worker/include/SCTP/UsrSctpChecker.hpp | 30 +++++++++++ worker/include/Worker.hpp | 8 ++- worker/meson.build | 7 +++ worker/meson_options.txt | 1 + worker/src/SCTP/UsrSctpChecker.cpp | 74 ++++++++++++++++++++++++++ worker/src/Worker.cpp | 15 ++++++ worker/tasks.py | 4 ++ 9 files changed, 144 insertions(+), 3 deletions(-) create mode 100644 worker/include/SCTP/UsrSctpChecker.hpp create mode 100644 worker/src/SCTP/UsrSctpChecker.cpp diff --git a/node/src/test/test-Consumer.ts b/node/src/test/test-Consumer.ts index 829ee29fe1..620d2500aa 100644 --- a/node/src/test/test-Consumer.ts +++ b/node/src/test/test-Consumer.ts @@ -252,7 +252,8 @@ afterEach(async () => { } }); -test('transport.consume() succeeds', async () => { +console.log('REMOVE test.only'); +test.only('transport.consume() succeeds', async () => { const onObserverNewConsumer1 = jest.fn(); ctx.webRtcTransport2!.observer.once('newconsumer', onObserverNewConsumer1); diff --git a/worker/build.rs b/worker/build.rs index 920d185d7b..ac387f84f5 100644 --- a/worker/build.rs +++ b/worker/build.rs @@ -144,8 +144,11 @@ fn main() { .env("PYTHONPATH", &pythonpath) .env("MEDIASOUP_OUT_DIR", &mediasoup_out_dir) .env("MEDIASOUP_BUILDTYPE", build_type) - // Force forward slashes on Windows too, otherwise Meson thinks path is not absolute 🤷 + // Force forward slashes on Windows too, otherwise Meson thinks path is + // not absolute 🤷. .env("MEDIASOUP_INSTALL_DIR", &out_dir.replace('\\', "/")) + // In Rust we want to enable worker multi-thread usage. + .env("MEDIASOUP_ENABLE_MULTITHREAD", "true") .spawn() .expect("Failed to start") .wait() diff --git a/worker/include/SCTP/UsrSctpChecker.hpp b/worker/include/SCTP/UsrSctpChecker.hpp new file mode 100644 index 0000000000..da4d570ac2 --- /dev/null +++ b/worker/include/SCTP/UsrSctpChecker.hpp @@ -0,0 +1,30 @@ +#ifndef MS_SCTP_USRSCTP_CHECKER_HPP +#define MS_SCTP_USRSCTP_CHECKER_HPP + +#include "common.hpp" +#include "handles/TimerHandle.hpp" +#include + +namespace SCTP +{ + class UsrSctpChecker : public TimerHandle::Listener + { + public: + UsrSctpChecker(); + ~UsrSctpChecker() override; + + public: + void Start(); + void Stop(); + + /* Pure virtual methods inherited from TimerHandle::Listener. */ + public: + void OnTimer(TimerHandle* timer) override; + + private: + TimerHandle* timer{ nullptr }; + uint64_t lastCalledAtMs{ 0u }; + }; +} // namespace SCTP + +#endif diff --git a/worker/include/Worker.hpp b/worker/include/Worker.hpp index 24483766b6..9a0b886981 100644 --- a/worker/include/Worker.hpp +++ b/worker/include/Worker.hpp @@ -9,13 +9,15 @@ #include "RTC/Shared.hpp" #include "RTC/WebRtcServer.hpp" #include "handles/SignalHandle.hpp" +#include "handles/TimerHandle.hpp" #include #include #include class Worker : public Channel::ChannelSocket::Listener, public SignalHandle::Listener, - public RTC::Router::Listener + public RTC::Router::Listener, + public TimerHandle::Listener { public: explicit Worker(Channel::ChannelSocket* channel); @@ -49,6 +51,10 @@ class Worker : public Channel::ChannelSocket::Listener, public: RTC::WebRtcServer* OnRouterNeedWebRtcServer(RTC::Router* router, std::string& webRtcServerId) override; + /* Pure virtual methods inherited from TimerHandle::Listener. */ +public: + void OnTimer(TimerHandle* timer) override; + private: // Passed by argument. Channel::ChannelSocket* channel{ nullptr }; diff --git a/worker/meson.build b/worker/meson.build index bc4ff9eb00..39000be951 100644 --- a/worker/meson.build +++ b/worker/meson.build @@ -173,6 +173,7 @@ common_sources = [ 'src/RTC/RTCP/XR.cpp', 'src/RTC/RTCP/XrDelaySinceLastRr.cpp', 'src/RTC/RTCP/XrReceiverReferenceTime.cpp', + 'src/SCTP/UsrSctpChecker.cpp', ] openssl_proj = subproject( @@ -298,6 +299,12 @@ if host_machine.system() == 'linux' and not get_option('ms_disable_liburing') endif endif +if get_option('ms_enable_multithread') + cpp_args += [ + '-DMS_MULTITHREAD_ENABLED', + ] +endif + libmediasoup_worker = library( 'libmediasoup-worker', name_prefix: '', diff --git a/worker/meson_options.txt b/worker/meson_options.txt index 0d1400e97a..636747dea8 100644 --- a/worker/meson_options.txt +++ b/worker/meson_options.txt @@ -2,3 +2,4 @@ option('ms_log_trace', type : 'boolean', value : false, description : 'When set option('ms_log_file_line', type : 'boolean', value : false, description : 'When set to true, all the logging macros print more verbose information, including current file and line') option('ms_rtc_logger_rtp', type : 'boolean', value : false, description : 'When set to true, prints a line with information for each RTP packet') option('ms_disable_liburing', type : 'boolean', value : false, description : 'When set to true, disables liburing integration despite current host supports it') +option('ms_enable_multithread', type : 'boolean', value : false, description : 'When set to true, mediasoup worker is built assuming multi-thread usage') diff --git a/worker/src/SCTP/UsrSctpChecker.cpp b/worker/src/SCTP/UsrSctpChecker.cpp new file mode 100644 index 0000000000..6daa863346 --- /dev/null +++ b/worker/src/SCTP/UsrSctpChecker.cpp @@ -0,0 +1,74 @@ +/** + * NOTE: The UsrSctpChecker singleton must run in its own thread so it cannot + * use Logger.hpp since Logger communicates via UnixSocket with Node or Rust + * and there must be a single thread writing into that socket in worker side. + */ + +#include "SCTP/UsrSctpChecker.hpp" +#include "DepLibUV.hpp" +#include + +namespace SCTP +{ + /* Static. */ + + static constexpr size_t CheckerInterval{ 10u }; // In ms. + + /* UsrSctpChecker instance methods. */ + + UsrSctpChecker::UsrSctpChecker() + { + this->timer = new TimerHandle(this); + + DepLibUV::RunLoop(); + } + + UsrSctpChecker::~UsrSctpChecker() + { + delete this->timer; + } + + void UsrSctpChecker::Start() + { + this->lastCalledAtMs = 0u; + + this->timer->Start(CheckerInterval, CheckerInterval); + } + + void UsrSctpChecker::Stop() + { + this->lastCalledAtMs = 0u; + + this->timer->Stop(); + } + + void UsrSctpChecker::OnTimer(TimerHandle* /*timer*/) + { + auto nowMs = DepLibUV::GetTimeMs(); + const int elapsedMs = this->lastCalledAtMs ? static_cast(nowMs - this->lastCalledAtMs) : 0; + + // TODO: This must run in the worker thread obviously. How3ver this is not easy + // at all. Note that usrsctp_handle_timers() may trigger many calls to the + // usrsctp onSendSctpData() callback, but each of those call may be intended + // for a SctpAssociation in whatever other worker/thread, so we cannot just + // try to group all them together. + // #ifdef MS_LIBURING_SUPPORTED + // Activate liburing usage. + // 'usrsctp_handle_timers()' will synchronously call the send/recv + // callbacks for the pending data. If there are multiple messages to be + // sent over the network then we will send those messages within a single + // system call. + // DepLibUring::SetActive(); + // #endif + + usrsctp_handle_timers(elapsedMs); + + // TODO: This must run in the worker thread obviously. + // #ifdef MS_LIBURING_SUPPORTED + // Submit all prepared submission entries. + // DepLibUring::Submit(); + // #endif + + this->lastCalledAtMs = nowMs; + } +} // namespace SCTP diff --git a/worker/src/Worker.cpp b/worker/src/Worker.cpp index 6969e42ecb..7a993e502a 100644 --- a/worker/src/Worker.cpp +++ b/worker/src/Worker.cpp @@ -14,6 +14,9 @@ #include "FBS/response.h" #include "FBS/worker.h" +// TODO: REMOVE +#include + /* Instance methods. */ Worker::Worker(::Channel::ChannelSocket* channel) : channel(channel) @@ -48,6 +51,9 @@ Worker::Worker(::Channel::ChannelSocket* channel) : channel(channel) this->shared->channelNotifier->Emit( std::to_string(Logger::Pid), FBS::Notification::Event::WORKER_RUNNING); + auto* timer = new TimerHandle(this); + // timer->Start(1000, 1000); + MS_DEBUG_DEV("starting libuv loop"); DepLibUV::RunLoop(); MS_DEBUG_DEV("libuv loop ended"); @@ -547,3 +553,12 @@ inline RTC::WebRtcServer* Worker::OnRouterNeedWebRtcServer( return webRtcServer; } + +void Worker::OnTimer(TimerHandle* /*timer*/) +{ + MS_DUMP_STD("---- Worker::OnTimer()"); + std::ofstream outfile; + outfile.open("/tmp/ms_log.txt", std::ios_base::app); + outfile << "---- Worker::OnTimer()\n"; + outfile.flush(); +} diff --git a/worker/tasks.py b/worker/tasks.py index 728e7cf962..a14d59f803 100644 --- a/worker/tasks.py +++ b/worker/tasks.py @@ -139,6 +139,10 @@ def setup(ctx, meson_args=MESON_ARGS): """ Run meson setup """ + enable_multithread = os.getenv('MEDIASOUP_ENABLE_MULTITHREAD'); + if enable_multithread: + meson_args += ' -Dms_enable_multithread=true'; + if MEDIASOUP_BUILDTYPE == 'Release': with ctx.cd(f'"{WORKER_DIR}"'): ctx.run(