diff --git a/node/src/test/test-Consumer.ts b/node/src/test/test-Consumer.ts index 829ee29fe1..620d2500aa 100644 --- a/node/src/test/test-Consumer.ts +++ b/node/src/test/test-Consumer.ts @@ -252,7 +252,8 @@ afterEach(async () => { } }); -test('transport.consume() succeeds', async () => { +console.log('REMOVE test.only'); +test.only('transport.consume() succeeds', async () => { const onObserverNewConsumer1 = jest.fn(); ctx.webRtcTransport2!.observer.once('newconsumer', onObserverNewConsumer1); diff --git a/node/src/test/test-node-sctp.ts b/node/src/test/test-node-sctp.ts index f5e402f082..323b5a310f 100644 --- a/node/src/test/test-node-sctp.ts +++ b/node/src/test/test-node-sctp.ts @@ -190,8 +190,6 @@ test('ordered DataProducer delivers all SCTP messages to the DataConsumer', asyn `ppid in message with id ${id} should be ${sctp.PPID.WEBRTC_BINARY} but it is ${ppid}` ) ); - - return; } }); }); diff --git a/worker/build.rs b/worker/build.rs index 920d185d7b..ac387f84f5 100644 --- a/worker/build.rs +++ b/worker/build.rs @@ -144,8 +144,11 @@ fn main() { .env("PYTHONPATH", &pythonpath) .env("MEDIASOUP_OUT_DIR", &mediasoup_out_dir) .env("MEDIASOUP_BUILDTYPE", build_type) - // Force forward slashes on Windows too, otherwise Meson thinks path is not absolute 🤷 + // Force forward slashes on Windows too, otherwise Meson thinks path is + // not absolute 🤷. .env("MEDIASOUP_INSTALL_DIR", &out_dir.replace('\\', "/")) + // In Rust we want to enable worker multi-thread usage. + .env("MEDIASOUP_ENABLE_MULTITHREAD", "true") .spawn() .expect("Failed to start") .wait() diff --git a/worker/include/DepUsrSCTP.hpp b/worker/include/DepUsrSCTP.hpp index 122cb63c47..c5ceb119b5 100644 --- a/worker/include/DepUsrSCTP.hpp +++ b/worker/include/DepUsrSCTP.hpp @@ -4,10 +4,50 @@ #include "common.hpp" #include "RTC/SctpAssociation.hpp" #include "handles/TimerHandle.hpp" +#include #include +#include class DepUsrSCTP { +private: + /* Struct for storing a pending SCTP message to be sent. */ + struct SendSctpDataItem + { + // NOTE: We keep this struct simple, without explicit allocation or + // deallocation of members in constructor/destructor, and instead rely on + // the destructor of the main container SendSctpDataStore. + + uint8_t* data{ nullptr }; + size_t len{ 0u }; + }; + +public: + /* Struct for storing pending datas to be sent. */ + struct SendSctpDataStore + { + explicit SendSctpDataStore(RTC::SctpAssociation* sctpAssociation) + : sctpAssociation(sctpAssociation) + { + } + ~SendSctpDataStore() + { + ClearItems(); + } + + void ClearItems() + { + for (auto& item : this->items) + { + delete[] item.data; + } + this->items.clear(); + } + + RTC::SctpAssociation* sctpAssociation{ nullptr }; + std::vector items; + }; + private: class Checker : public TimerHandle::Listener { @@ -33,16 +73,24 @@ class DepUsrSCTP static void ClassDestroy(); static void CreateChecker(); static void CloseChecker(); + static bool HasChecker() + { + return DepUsrSCTP::checker != nullptr; + } static uintptr_t GetNextSctpAssociationId(); static void RegisterSctpAssociation(RTC::SctpAssociation* sctpAssociation); static void DeregisterSctpAssociation(RTC::SctpAssociation* sctpAssociation); static RTC::SctpAssociation* RetrieveSctpAssociation(uintptr_t id); + static void SendSctpData(RTC::SctpAssociation* sctpAssociation, uint8_t* data, size_t len); + static SendSctpDataStore* GetSendSctpDataStore(uv_async_t* handle); private: - thread_local static Checker* checker; + static Checker* checker; static uint64_t numSctpAssociations; static uintptr_t nextSctpAssociationId; static absl::flat_hash_map mapIdSctpAssociation; + // Map of SendSctpDataStores indexed by uv_async_t*. + static absl::flat_hash_map mapAsyncHandlerSendSctpData; }; #endif diff --git a/worker/include/RTC/SctpAssociation.hpp b/worker/include/RTC/SctpAssociation.hpp index e2f3fe7922..d78d6a12af 100644 --- a/worker/include/RTC/SctpAssociation.hpp +++ b/worker/include/RTC/SctpAssociation.hpp @@ -6,6 +6,7 @@ #include "RTC/DataConsumer.hpp" #include "RTC/DataProducer.hpp" #include +#include namespace RTC { @@ -80,7 +81,11 @@ namespace RTC public: flatbuffers::Offset FillBuffer( flatbuffers::FlatBufferBuilder& builder) const; - void TransportConnected(); + uv_async_t* GetAsyncHandle() const + { + return this->uvAsyncHandle; + } + void InitializeSyncHandle(uv_async_cb callback); SctpState GetState() const { return this->state; @@ -89,6 +94,7 @@ namespace RTC { return this->sctpBufferedAmount; } + void TransportConnected(); void ProcessSctpData(const uint8_t* data, size_t len) const; void SendSctpMessage( RTC::DataConsumer* dataConsumer, @@ -106,7 +112,7 @@ namespace RTC /* Callbacks fired by usrsctp events. */ public: - void OnUsrSctpSendSctpData(void* buffer, size_t len); + void OnUsrSctpSendSctpData(uint8_t* data, size_t len); void OnUsrSctpReceiveSctpData( uint16_t streamId, uint16_t ssn, uint32_t ppid, int flags, const uint8_t* data, size_t len); void OnUsrSctpReceiveSctpNotification(union sctp_notification* notification, size_t len); @@ -125,6 +131,7 @@ namespace RTC size_t sctpBufferedAmount{ 0u }; bool isDataChannel{ false }; // Allocated by this. + uv_async_t* uvAsyncHandle{ nullptr }; uint8_t* messageBuffer{ nullptr }; // Others. SctpState state{ SctpState::NEW }; diff --git a/worker/include/SCTP/UsrSctpChecker.hpp b/worker/include/SCTP/UsrSctpChecker.hpp new file mode 100644 index 0000000000..da4d570ac2 --- /dev/null +++ b/worker/include/SCTP/UsrSctpChecker.hpp @@ -0,0 +1,30 @@ +#ifndef MS_SCTP_USRSCTP_CHECKER_HPP +#define MS_SCTP_USRSCTP_CHECKER_HPP + +#include "common.hpp" +#include "handles/TimerHandle.hpp" +#include + +namespace SCTP +{ + class UsrSctpChecker : public TimerHandle::Listener + { + public: + UsrSctpChecker(); + ~UsrSctpChecker() override; + + public: + void Start(); + void Stop(); + + /* Pure virtual methods inherited from TimerHandle::Listener. */ + public: + void OnTimer(TimerHandle* timer) override; + + private: + TimerHandle* timer{ nullptr }; + uint64_t lastCalledAtMs{ 0u }; + }; +} // namespace SCTP + +#endif diff --git a/worker/include/Worker.hpp b/worker/include/Worker.hpp index 24483766b6..9a0b886981 100644 --- a/worker/include/Worker.hpp +++ b/worker/include/Worker.hpp @@ -9,13 +9,15 @@ #include "RTC/Shared.hpp" #include "RTC/WebRtcServer.hpp" #include "handles/SignalHandle.hpp" +#include "handles/TimerHandle.hpp" #include #include #include class Worker : public Channel::ChannelSocket::Listener, public SignalHandle::Listener, - public RTC::Router::Listener + public RTC::Router::Listener, + public TimerHandle::Listener { public: explicit Worker(Channel::ChannelSocket* channel); @@ -49,6 +51,10 @@ class Worker : public Channel::ChannelSocket::Listener, public: RTC::WebRtcServer* OnRouterNeedWebRtcServer(RTC::Router* router, std::string& webRtcServerId) override; + /* Pure virtual methods inherited from TimerHandle::Listener. */ +public: + void OnTimer(TimerHandle* timer) override; + private: // Passed by argument. Channel::ChannelSocket* channel{ nullptr }; diff --git a/worker/include/handles/UdpSocketHandle.hpp b/worker/include/handles/UdpSocketHandle.hpp index 2b7a708105..637354216d 100644 --- a/worker/include/handles/UdpSocketHandle.hpp +++ b/worker/include/handles/UdpSocketHandle.hpp @@ -18,7 +18,8 @@ class UdpSocketHandle { } - // Disable copy constructor because of the dynamically allocated data (store). + // Disable copy constructor because of the dynamically allocated data + // (store). UvSendData(const UvSendData&) = delete; ~UvSendData() diff --git a/worker/meson.build b/worker/meson.build index bc4ff9eb00..39000be951 100644 --- a/worker/meson.build +++ b/worker/meson.build @@ -173,6 +173,7 @@ common_sources = [ 'src/RTC/RTCP/XR.cpp', 'src/RTC/RTCP/XrDelaySinceLastRr.cpp', 'src/RTC/RTCP/XrReceiverReferenceTime.cpp', + 'src/SCTP/UsrSctpChecker.cpp', ] openssl_proj = subproject( @@ -298,6 +299,12 @@ if host_machine.system() == 'linux' and not get_option('ms_disable_liburing') endif endif +if get_option('ms_enable_multithread') + cpp_args += [ + '-DMS_MULTITHREAD_ENABLED', + ] +endif + libmediasoup_worker = library( 'libmediasoup-worker', name_prefix: '', diff --git a/worker/meson_options.txt b/worker/meson_options.txt index 0d1400e97a..636747dea8 100644 --- a/worker/meson_options.txt +++ b/worker/meson_options.txt @@ -2,3 +2,4 @@ option('ms_log_trace', type : 'boolean', value : false, description : 'When set option('ms_log_file_line', type : 'boolean', value : false, description : 'When set to true, all the logging macros print more verbose information, including current file and line') option('ms_rtc_logger_rtp', type : 'boolean', value : false, description : 'When set to true, prints a line with information for each RTP packet') option('ms_disable_liburing', type : 'boolean', value : false, description : 'When set to true, disables liburing integration despite current host supports it') +option('ms_enable_multithread', type : 'boolean', value : false, description : 'When set to true, mediasoup worker is built assuming multi-thread usage') diff --git a/worker/src/DepLibUV.cpp b/worker/src/DepLibUV.cpp index 12af49ba59..0bc463fd4b 100644 --- a/worker/src/DepLibUV.cpp +++ b/worker/src/DepLibUV.cpp @@ -4,6 +4,9 @@ #include "DepLibUV.hpp" #include "Logger.hpp" +// TODO: REMOVE +#include + /* Static variables. */ thread_local uv_loop_t* DepLibUV::loop{ nullptr }; @@ -17,6 +20,11 @@ inline static void onCloseLoop(uv_handle_t* handle) inline static void onWalk(uv_handle_t* handle, void* /*arg*/) { + std::ofstream outfile; + outfile.open("/tmp/ms_log.txt", std::ios_base::app); + outfile << "---- onWalk\n"; + outfile.flush(); + // Must use MS_ERROR_STD since at this point the Channel is already closed. MS_ERROR_STD( "alive UV handle found (this shouldn't happen) [type:%s, active:%d, closing:%d, has_ref:%d]", diff --git a/worker/src/DepUsrSCTP.cpp b/worker/src/DepUsrSCTP.cpp index 700bac3348..78e5d1bf35 100644 --- a/worker/src/DepUsrSCTP.cpp +++ b/worker/src/DepUsrSCTP.cpp @@ -1,5 +1,6 @@ #define MS_CLASS "DepUsrSCTP" -// #define MS_LOG_DEV_LEVEL 3 +// TODO: Comment +#define MS_LOG_DEV_LEVEL 3 #include "DepUsrSCTP.hpp" #ifdef MS_LIBURING_SUPPORTED @@ -8,19 +9,86 @@ #include "DepLibUV.hpp" #include "Logger.hpp" #include -#include // std::vsnprintf() +#include // std::vsnprintf() +#include // std::memcpy() #include +// TODO: REMOVE +#include + /* Static. */ static constexpr size_t CheckerInterval{ 10u }; // In ms. static std::mutex GlobalSyncMutex; static size_t GlobalInstances{ 0u }; +/* Static methods for UV callbacks. */ + +inline static void onAsyncCreateChecker(uv_async_t* handle) +{ + MS_TRACE(); + + const std::lock_guard lock(GlobalSyncMutex); + + MS_DUMP("**** onAsyncCreateChecker() called"); + + DepUsrSCTP::CreateChecker(); +} + +inline static void onAsyncCloseChecker(uv_async_t* handle) +{ + MS_TRACE(); + + const std::lock_guard lock(GlobalSyncMutex); + + MS_DUMP("**** onAsyncCloseChecker() called"); + + DepUsrSCTP::CloseChecker(); +} + +inline static void onAsyncSendSctpData(uv_async_t* handle) +{ + MS_TRACE(); + + const std::lock_guard lock(GlobalSyncMutex); + + MS_DUMP("**** onAsyncSendSctpData() called"); + + // Get the sending data from the map. + auto* store = DepUsrSCTP::GetSendSctpDataStore(handle); + + if (!store) + { + MS_WARN_DEV("store not found"); + + return; + } + + auto* sctpAssociation = store->sctpAssociation; + auto& items = store->items; + + MS_DUMP("------------- sending pening messages [items.size:%zu]", items.size()); + + for (auto& item : items) + { + auto* data = item.data; + auto len = item.len; + + sctpAssociation->OnUsrSctpSendSctpData(data, len); + } + + // Must clear send data items once they have been sent. + store->ClearItems(); +} + /* Static methods for usrsctp global callbacks. */ inline static int onSendSctpData(void* addr, void* data, size_t len, uint8_t /*tos*/, uint8_t /*setDf*/) { + MS_TRACE(); + + MS_DUMP("**** onSendSctpData() called"); + auto* sctpAssociation = DepUsrSCTP::RetrieveSctpAssociation(reinterpret_cast(addr)); if (!sctpAssociation) @@ -30,7 +98,7 @@ inline static int onSendSctpData(void* addr, void* data, size_t len, uint8_t /*t return -1; } - sctpAssociation->OnUsrSctpSendSctpData(data, len); + DepUsrSCTP::SendSctpData(sctpAssociation, static_cast(data), len); // NOTE: Must not free data, usrsctp lib does it. @@ -56,10 +124,11 @@ inline static void sctpDebug(const char* format, ...) /* Static variables. */ -thread_local DepUsrSCTP::Checker* DepUsrSCTP::checker{ nullptr }; +DepUsrSCTP::Checker* DepUsrSCTP::checker{ nullptr }; uint64_t DepUsrSCTP::numSctpAssociations{ 0u }; uintptr_t DepUsrSCTP::nextSctpAssociationId{ 0u }; absl::flat_hash_map DepUsrSCTP::mapIdSctpAssociation; +absl::flat_hash_map DepUsrSCTP::mapAsyncHandlerSendSctpData; /* Static methods. */ @@ -81,6 +150,9 @@ void DepUsrSCTP::ClassInit() #ifdef SCTP_DEBUG usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_ALL); #endif + + // TODO: Create checker (not this way). + DepUsrSCTP::CreateChecker(); } ++GlobalInstances; @@ -88,38 +160,34 @@ void DepUsrSCTP::ClassInit() void DepUsrSCTP::ClassDestroy() { + std::ofstream outfile; + outfile.open("/tmp/ms_log.txt", std::ios_base::app); + outfile << "---- DepUsrSCTP::ClassDestroy()"; + outfile.flush(); + MS_TRACE(); const std::lock_guard lock(GlobalSyncMutex); + --GlobalInstances; if (GlobalInstances == 0) { usrsctp_finish(); - numSctpAssociations = 0u; - nextSctpAssociationId = 0u; + DepUsrSCTP::numSctpAssociations = 0u; + DepUsrSCTP::nextSctpAssociationId = 0u; DepUsrSCTP::mapIdSctpAssociation.clear(); - } -} - -void DepUsrSCTP::CreateChecker() -{ - MS_TRACE(); - - MS_ASSERT(DepUsrSCTP::checker == nullptr, "Checker already created"); - - DepUsrSCTP::checker = new DepUsrSCTP::Checker(); -} -void DepUsrSCTP::CloseChecker() -{ - MS_TRACE(); + outfile << "---- DepUsrSCTP::ClassDestroy() | calling mapAsyncHandlerSendSctpData.clear()\n"; + outfile.flush(); - MS_ASSERT(DepUsrSCTP::checker != nullptr, "Checker not created"); + DepUsrSCTP::mapAsyncHandlerSendSctpData.clear(); - delete DepUsrSCTP::checker; + // TODO: Close checker (not this way). + DepUsrSCTP::CloseChecker(); + } } uintptr_t DepUsrSCTP::GetNextSctpAssociationId() @@ -156,15 +224,22 @@ void DepUsrSCTP::RegisterSctpAssociation(RTC::SctpAssociation* sctpAssociation) const std::lock_guard lock(GlobalSyncMutex); - MS_ASSERT(DepUsrSCTP::checker != nullptr, "Checker not created"); + MS_ASSERT(DepUsrSCTP::HasChecker(), "Checker not created"); - auto it = DepUsrSCTP::mapIdSctpAssociation.find(sctpAssociation->id); + auto it = DepUsrSCTP::mapIdSctpAssociation.find(sctpAssociation->id); + auto it2 = DepUsrSCTP::mapAsyncHandlerSendSctpData.find(sctpAssociation->GetAsyncHandle()); MS_ASSERT( it == DepUsrSCTP::mapIdSctpAssociation.end(), - "the id of the SctpAssociation is already in the map"); + "the id of the SctpAssociation is already in the mapIdSctpAssociation map"); + MS_ASSERT( + it2 == DepUsrSCTP::mapAsyncHandlerSendSctpData.end(), + "the id of the SctpAssociation is already in the mapAsyncHandlerSendSctpData map"); DepUsrSCTP::mapIdSctpAssociation[sctpAssociation->id] = sctpAssociation; + DepUsrSCTP::mapAsyncHandlerSendSctpData.emplace(sctpAssociation->GetAsyncHandle(), sctpAssociation); + + sctpAssociation->InitializeSyncHandle(onAsyncSendSctpData); if (++DepUsrSCTP::numSctpAssociations == 1u) { @@ -178,11 +253,13 @@ void DepUsrSCTP::DeregisterSctpAssociation(RTC::SctpAssociation* sctpAssociation const std::lock_guard lock(GlobalSyncMutex); - MS_ASSERT(DepUsrSCTP::checker != nullptr, "Checker not created"); + MS_ASSERT(DepUsrSCTP::HasChecker(), "Checker not created"); - auto found = DepUsrSCTP::mapIdSctpAssociation.erase(sctpAssociation->id); + auto found1 = DepUsrSCTP::mapIdSctpAssociation.erase(sctpAssociation->id); + auto found2 = DepUsrSCTP::mapAsyncHandlerSendSctpData.erase(sctpAssociation->GetAsyncHandle()); - MS_ASSERT(found > 0, "SctpAssociation not found"); + MS_ASSERT(found1 > 0, "SctpAssociation not found in mapIdSctpAssociation map"); + MS_ASSERT(found2 > 0, "SctpAssociation not found in mapAsyncHandlerSendSctpData map"); MS_ASSERT(DepUsrSCTP::numSctpAssociations > 0u, "numSctpAssociations was not higher than 0"); if (--DepUsrSCTP::numSctpAssociations == 0u) @@ -207,6 +284,74 @@ RTC::SctpAssociation* DepUsrSCTP::RetrieveSctpAssociation(uintptr_t id) return it->second; } +void DepUsrSCTP::SendSctpData(RTC::SctpAssociation* sctpAssociation, uint8_t* data, size_t len) +{ + MS_TRACE(); + + const std::lock_guard lock(GlobalSyncMutex); + + // Store the sending data into the map. + + auto it = DepUsrSCTP::mapAsyncHandlerSendSctpData.find(sctpAssociation->GetAsyncHandle()); + + MS_ASSERT( + it != DepUsrSCTP::mapAsyncHandlerSendSctpData.end(), + "SctpAssociation not found in mapAsyncHandlerSendSctpData map"); + + auto& store = it->second; + + // NOTE: In Rust, DepUsrSCTP::SendSctpData() is called from onSendSctpData() + // callback from a different thread and usrsctp immediately frees |data| when + // the callback execution finishes. So we have to mem copy it. + auto& item = store.items.emplace_back(); + + item.data = new uint8_t[len]; + item.len = len; + std::memcpy(item.data, data, len); + + // Invoke UV async send. + int err = uv_async_send(sctpAssociation->GetAsyncHandle()); + + if (err != 0) + { + MS_WARN_TAG(sctp, "uv_async_send() failed: %s", uv_strerror(err)); + } +} + +DepUsrSCTP::SendSctpDataStore* DepUsrSCTP::GetSendSctpDataStore(uv_async_t* handle) +{ + MS_TRACE(); + + auto it = DepUsrSCTP::mapAsyncHandlerSendSctpData.find(handle); + + if (it == DepUsrSCTP::mapAsyncHandlerSendSctpData.end()) + { + return nullptr; + } + + SendSctpDataStore& store = it->second; + + return std::addressof(store); +} + +void DepUsrSCTP::CreateChecker() +{ + MS_TRACE(); + + MS_ASSERT(!DepUsrSCTP::HasChecker(), "Checker already created"); + + DepUsrSCTP::checker = new DepUsrSCTP::Checker(); +} + +void DepUsrSCTP::CloseChecker() +{ + MS_TRACE(); + + MS_ASSERT(DepUsrSCTP::HasChecker(), "Checker not created"); + + delete DepUsrSCTP::checker; +} + /* DepUsrSCTP::Checker instance methods. */ DepUsrSCTP::Checker::Checker() : timer(new TimerHandle(this)) diff --git a/worker/src/RTC/SctpAssociation.cpp b/worker/src/RTC/SctpAssociation.cpp index 499ec2a053..111f9f1dee 100644 --- a/worker/src/RTC/SctpAssociation.cpp +++ b/worker/src/RTC/SctpAssociation.cpp @@ -2,6 +2,7 @@ // #define MS_LOG_DEV_LEVEL 3 #include "RTC/SctpAssociation.hpp" +#include "DepLibUV.hpp" #include "DepUsrSCTP.hpp" #include "Logger.hpp" #include "MediaSoupErrors.hpp" @@ -29,6 +30,13 @@ const uint16_t EventTypes[] = }; /* clang-format on */ +/* Static methods for UV callbacks. */ + +inline static void onCloseAsync(uv_handle_t* handle) +{ + delete reinterpret_cast(handle); +} + /* Static methods for usrsctp callbacks. */ inline static int onRecvSctpData( @@ -121,6 +129,9 @@ namespace RTC { MS_TRACE(); + // Create a uv_async_t handle. + this->uvAsyncHandle = new uv_async_t; + // Register ourselves in usrsctp. // NOTE: This must be done before calling usrsctp_bind(). usrsctp_register_address(reinterpret_cast(this->id)); @@ -293,6 +304,9 @@ namespace RTC // Register the SctpAssociation from the global map. DepUsrSCTP::DeregisterSctpAssociation(this); + uv_close( + reinterpret_cast(this->uvAsyncHandle), static_cast(onCloseAsync)); + delete[] this->messageBuffer; } @@ -381,6 +395,18 @@ namespace RTC this->isDataChannel); } + void SctpAssociation::InitializeSyncHandle(uv_async_cb callback) + { + MS_TRACE(); + + int err = uv_async_init(DepLibUV::GetLoop(), this->uvAsyncHandle, callback); + + if (err != 0) + { + MS_ABORT("uv_async_init() failed: %s", uv_strerror(err)); + } + } + void SctpAssociation::ProcessSctpData(const uint8_t* data, size_t len) const { MS_TRACE(); @@ -667,12 +693,10 @@ namespace RTC } } - void SctpAssociation::OnUsrSctpSendSctpData(void* buffer, size_t len) + void SctpAssociation::OnUsrSctpSendSctpData(uint8_t* data, size_t len) { MS_TRACE(); - const uint8_t* data = static_cast(buffer); - #if MS_LOG_DEV_LEVEL == 3 MS_DUMP_DATA(data, len); #endif @@ -729,7 +753,8 @@ namespace RTC this->listener->OnSctpAssociationMessageReceived(this, streamId, data, len, ppid); } - // If end of message and there is buffered data, append data and notify buffer. + // If end of message and there is buffered data, append data and notify + // buffer. else if (eor && this->messageBufferLen != 0) { std::memcpy(this->messageBuffer + this->messageBufferLen, data, len); diff --git a/worker/src/SCTP/UsrSctpChecker.cpp b/worker/src/SCTP/UsrSctpChecker.cpp new file mode 100644 index 0000000000..6daa863346 --- /dev/null +++ b/worker/src/SCTP/UsrSctpChecker.cpp @@ -0,0 +1,74 @@ +/** + * NOTE: The UsrSctpChecker singleton must run in its own thread so it cannot + * use Logger.hpp since Logger communicates via UnixSocket with Node or Rust + * and there must be a single thread writing into that socket in worker side. + */ + +#include "SCTP/UsrSctpChecker.hpp" +#include "DepLibUV.hpp" +#include + +namespace SCTP +{ + /* Static. */ + + static constexpr size_t CheckerInterval{ 10u }; // In ms. + + /* UsrSctpChecker instance methods. */ + + UsrSctpChecker::UsrSctpChecker() + { + this->timer = new TimerHandle(this); + + DepLibUV::RunLoop(); + } + + UsrSctpChecker::~UsrSctpChecker() + { + delete this->timer; + } + + void UsrSctpChecker::Start() + { + this->lastCalledAtMs = 0u; + + this->timer->Start(CheckerInterval, CheckerInterval); + } + + void UsrSctpChecker::Stop() + { + this->lastCalledAtMs = 0u; + + this->timer->Stop(); + } + + void UsrSctpChecker::OnTimer(TimerHandle* /*timer*/) + { + auto nowMs = DepLibUV::GetTimeMs(); + const int elapsedMs = this->lastCalledAtMs ? static_cast(nowMs - this->lastCalledAtMs) : 0; + + // TODO: This must run in the worker thread obviously. How3ver this is not easy + // at all. Note that usrsctp_handle_timers() may trigger many calls to the + // usrsctp onSendSctpData() callback, but each of those call may be intended + // for a SctpAssociation in whatever other worker/thread, so we cannot just + // try to group all them together. + // #ifdef MS_LIBURING_SUPPORTED + // Activate liburing usage. + // 'usrsctp_handle_timers()' will synchronously call the send/recv + // callbacks for the pending data. If there are multiple messages to be + // sent over the network then we will send those messages within a single + // system call. + // DepLibUring::SetActive(); + // #endif + + usrsctp_handle_timers(elapsedMs); + + // TODO: This must run in the worker thread obviously. + // #ifdef MS_LIBURING_SUPPORTED + // Submit all prepared submission entries. + // DepLibUring::Submit(); + // #endif + + this->lastCalledAtMs = nowMs; + } +} // namespace SCTP diff --git a/worker/src/Worker.cpp b/worker/src/Worker.cpp index 1bd5468f88..7a993e502a 100644 --- a/worker/src/Worker.cpp +++ b/worker/src/Worker.cpp @@ -7,7 +7,6 @@ #include "DepLibUring.hpp" #endif #include "DepLibUV.hpp" -#include "DepUsrSCTP.hpp" #include "Logger.hpp" #include "MediaSoupErrors.hpp" #include "Settings.hpp" @@ -15,6 +14,9 @@ #include "FBS/response.h" #include "FBS/worker.h" +// TODO: REMOVE +#include + /* Instance methods. */ Worker::Worker(::Channel::ChannelSocket* channel) : channel(channel) @@ -40,9 +42,6 @@ Worker::Worker(::Channel::ChannelSocket* channel) : channel(channel) } #endif - // Create the Checker instance in DepUsrSCTP. - DepUsrSCTP::CreateChecker(); - #ifdef MS_LIBURING_SUPPORTED // Start polling CQEs, which will create a uv_pool_t handle. DepLibUring::StartPollingCQEs(); @@ -52,6 +51,9 @@ Worker::Worker(::Channel::ChannelSocket* channel) : channel(channel) this->shared->channelNotifier->Emit( std::to_string(Logger::Pid), FBS::Notification::Event::WORKER_RUNNING); + auto* timer = new TimerHandle(this); + // timer->Start(1000, 1000); + MS_DEBUG_DEV("starting libuv loop"); DepLibUV::RunLoop(); MS_DEBUG_DEV("libuv loop ended"); @@ -102,9 +104,6 @@ void Worker::Close() // Delete the RTC::Shared singleton. delete this->shared; - // Close the Checker instance in DepUsrSCTP. - DepUsrSCTP::CloseChecker(); - #ifdef MS_LIBURING_SUPPORTED // Stop polling CQEs, which will close the uv_pool_t handle. DepLibUring::StopPollingCQEs(); @@ -554,3 +553,12 @@ inline RTC::WebRtcServer* Worker::OnRouterNeedWebRtcServer( return webRtcServer; } + +void Worker::OnTimer(TimerHandle* /*timer*/) +{ + MS_DUMP_STD("---- Worker::OnTimer()"); + std::ofstream outfile; + outfile.open("/tmp/ms_log.txt", std::ios_base::app); + outfile << "---- Worker::OnTimer()\n"; + outfile.flush(); +} diff --git a/worker/src/handles/TimerHandle.cpp b/worker/src/handles/TimerHandle.cpp index e40ae8efb1..57022b72c7 100644 --- a/worker/src/handles/TimerHandle.cpp +++ b/worker/src/handles/TimerHandle.cpp @@ -6,6 +6,9 @@ #include "Logger.hpp" #include "MediaSoupErrors.hpp" +// TODO: REMOVE +#include + /* Static methods for UV callbacks. */ inline static void onTimer(uv_timer_t* handle) @@ -15,6 +18,11 @@ inline static void onTimer(uv_timer_t* handle) inline static void onCloseTimer(uv_handle_t* handle) { + std::ofstream outfile; + outfile.open("/tmp/ms_log.txt", std::ios_base::app); + outfile << "---- onCloseTimer\n"; + outfile.flush(); + delete reinterpret_cast(handle); } @@ -24,6 +32,11 @@ TimerHandle::TimerHandle(Listener* listener) : listener(listener), uvHandle(new { MS_TRACE(); + std::ofstream outfile; + outfile.open("/tmp/ms_log.txt", std::ios_base::app); + outfile << "---- Timer created\n"; + outfile.flush(); + this->uvHandle->data = static_cast(this); const int err = uv_timer_init(DepLibUV::GetLoop(), this->uvHandle); diff --git a/worker/src/handles/UnixStreamSocketHandle.cpp b/worker/src/handles/UnixStreamSocketHandle.cpp index bf024d1d8e..6dbae0c6a2 100644 --- a/worker/src/handles/UnixStreamSocketHandle.cpp +++ b/worker/src/handles/UnixStreamSocketHandle.cpp @@ -216,7 +216,7 @@ void UnixStreamSocketHandle::Write(const uint8_t* data, size_t len) // Any other error. else if (written < 0) { - MS_ERROR_STD("uv_try_write() failed, trying uv_write(): %s", uv_strerror(written)); + MS_WARN_DEV_STD("uv_try_write() failed, trying uv_write(): %s", uv_strerror(written)); // Set written to 0 so pendingLen can be properly calculated. written = 0; diff --git a/worker/src/lib.cpp b/worker/src/lib.cpp index 8387e5f35f..525ff9d6fe 100644 --- a/worker/src/lib.cpp +++ b/worker/src/lib.cpp @@ -23,6 +23,9 @@ #include // sigaction() #include +// TODO: REMOVE +#include + void IgnoreSignals(); // NOLINTNEXTLINE @@ -138,9 +141,16 @@ extern "C" int mediasoup_worker_run( IgnoreSignals(); #endif + MS_DUMP("------------- CREATING WORKER..."); + // Run the Worker. const Worker worker(channel.get()); + std::ofstream outfile; + outfile.open("/tmp/ms_log.txt", std::ios_base::app); + outfile << "---- WORKER CLOSED\n"; + outfile.flush(); + // Free static stuff. DepLibSRTP::ClassDestroy(); Utils::Crypto::ClassDestroy(); diff --git a/worker/tasks.py b/worker/tasks.py index 728e7cf962..a14d59f803 100644 --- a/worker/tasks.py +++ b/worker/tasks.py @@ -139,6 +139,10 @@ def setup(ctx, meson_args=MESON_ARGS): """ Run meson setup """ + enable_multithread = os.getenv('MEDIASOUP_ENABLE_MULTITHREAD'); + if enable_multithread: + meson_args += ' -Dms_enable_multithread=true'; + if MEDIASOUP_BUILDTYPE == 'Release': with ctx.cd(f'"{WORKER_DIR}"'): ctx.run(