Skip to content

Commit

Permalink
Use a vector to store sending messages and clean it once messages are…
Browse files Browse the repository at this point in the history
… sent in async cb
  • Loading branch information
ibc committed Mar 6, 2024
1 parent 069f78e commit 1a25bed
Show file tree
Hide file tree
Showing 9 changed files with 167 additions and 23 deletions.
13 changes: 8 additions & 5 deletions node/src/test/test-node-sctp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ import * as dgram from 'node:dgram';
import * as sctp from 'sctp';
import * as mediasoup from '../';

// Trick to avoid that Jest overrides console.
console.log('TOOD: REMOVE console hack');
import { log } from 'node:console';
console.log = log;

type TestContext = {
worker?: mediasoup.types.Worker;
router?: mediasoup.types.Router;
Expand Down Expand Up @@ -114,7 +119,7 @@ afterEach(async () => {
test('ordered DataProducer delivers all SCTP messages to the DataConsumer', async () => {
const onStream = jest.fn();
console.log('TODO: Revert numMessages to 200');
const numMessages = 3;
const numMessages = 2;
// const numMessages = 200;
let sentMessageBytes = 0;
let recvMessageBytes = 0;
Expand Down Expand Up @@ -142,7 +147,7 @@ test('ordered DataProducer delivers all SCTP messages to the DataConsumer', asyn
data.ppid = sctp.PPID.WEBRTC_BINARY;
}

console.log('---- sending id %s', id);
console.log('---- test | sending id %s', id);

ctx.sctpSendStream!.write(data);
sentMessageBytes += data.byteLength;
Expand Down Expand Up @@ -174,7 +179,7 @@ test('ordered DataProducer delivers all SCTP messages to the DataConsumer', asyn
// @ts-ignore
const ppid = data.ppid;

console.log('---- received id %s', id);
console.log('---- test | received id %s', id);

if (id !== numReceivedMessages) {
reject(
Expand All @@ -196,8 +201,6 @@ test('ordered DataProducer delivers all SCTP messages to the DataConsumer', asyn
`ppid in message with id ${id} should be ${sctp.PPID.WEBRTC_BINARY} but it is ${ppid}`
)
);

return;
}
});
});
Expand Down
53 changes: 51 additions & 2 deletions worker/include/DepUsrSCTP.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,63 @@
#include "RTC/SctpAssociation.hpp"
#include "handles/TimerHandle.hpp"
#include <absl/container/flat_hash_map.h>
#include <vector>

class DepUsrSCTP
{
private:
/* Struct for storing a pending SCTP message to be sent. */
struct SendSctpDataItem
{
// NOTE: We keep this struct simple, without explicit allocation
// or deallocation in constructor/destructor, and instead rely on
// the destructor of the main container SendSctpDataStore.

SendSctpDataItem() noexcept;

// SendSctpDataItem(uint8_t* data, size_t len) : data(new uint8_t[len]), len(len)
// {
// std::memcpy(this->data, data, len);
// }
// SendSctpDataItem(uint8_t* data, size_t len);

// Disable copy constructor because of the dynamically allocated data.
// SendSctpDataItem(const SendSctpDataItem&) = delete;

// ~SendSctpDataItem()
// {
// delete[] this->data;
// }
~SendSctpDataItem();

uint8_t* data{ nullptr };
size_t len{ 0u };

int fooId{ 0 };
};

public:
/* Struct for storing pending datas to be sent. */
struct SendSctpDataStore
{
explicit SendSctpDataStore(RTC::SctpAssociation* sctpAssociation);

// Disable copy constructor.
// SendSctpDataStore(const SendSctpDataStore&) = delete;

// ~SendSctpDataStore()
// {
// this->items.clear();
// }
~SendSctpDataStore();

void ClearItems();

RTC::SctpAssociation* sctpAssociation;
uint8_t* data;
size_t len;

int fooId{ 0 };

std::vector<SendSctpDataItem> items;
};

private:
Expand Down Expand Up @@ -53,6 +101,7 @@ class DepUsrSCTP
static uint64_t numSctpAssociations;
static uintptr_t nextSctpAssociationId;
static absl::flat_hash_map<uintptr_t, RTC::SctpAssociation*> mapIdSctpAssociation;
// Map of SendSctpDataStores indexed by uv_async_t*.
static absl::flat_hash_map<const uv_async_t*, SendSctpDataStore> mapAsyncHandlerSendSctpData;
};

Expand Down
3 changes: 2 additions & 1 deletion worker/include/handles/UdpSocketHandle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ class UdpSocketHandle
{
}

// Disable copy constructor because of the dynamically allocated data (store).
// Disable copy constructor because of the dynamically allocated data
// (store).
UvSendData(const UvSendData&) = delete;

~UvSendData()
Expand Down
98 changes: 85 additions & 13 deletions worker/src/DepUsrSCTP.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#define MS_CLASS "DepUsrSCTP"
// TODO: Comment
#define MS_LOG_DEV_LEVEL 3

#include "DepUsrSCTP.hpp"
Expand All @@ -18,6 +19,10 @@ static constexpr size_t CheckerInterval{ 10u }; // In ms.
static std::mutex GlobalSyncMutex;
static size_t GlobalInstances{ 0u };

// TODO: REMOVE
static int FOO_STORE_ID{ 0u };
static int FOO_ITEM_ID{ 0u };

/* Static methods for UV callbacks. */

inline static void onAsync(uv_async_t* handle)
Expand All @@ -26,6 +31,8 @@ inline static void onAsync(uv_async_t* handle)

const std::lock_guard<std::mutex> lock(GlobalSyncMutex);

MS_DUMP("**** onAsync() called");

// Get the sending data from the map.
auto* store = DepUsrSCTP::GetSendSctpDataStore(handle);

Expand All @@ -37,13 +44,20 @@ inline static void onAsync(uv_async_t* handle)
}

auto* sctpAssociation = store->sctpAssociation;
auto* data = store->data;
auto len = store->len;
auto& items = store->items;

MS_DUMP("------------- sending pening messages [items.size:%zu]", items.size());

for (auto& item : items)
{
auto* data = item.data;
auto len = item.len;

sctpAssociation->OnUsrSctpSendSctpData(data, len);
sctpAssociation->OnUsrSctpSendSctpData(data, len);
}

// Must delete the mem copied data once sent.
delete[] store->data;
// Must clear send data items once they have been sent.
store->ClearItems();
}

/* Static methods for usrsctp global callbacks. */
Expand All @@ -52,6 +66,8 @@ inline static int onSendSctpData(void* addr, void* data, size_t len, uint8_t /*t
{
MS_TRACE();

MS_DUMP("**** onSendSctpData() called");

auto* sctpAssociation = DepUsrSCTP::RetrieveSctpAssociation(reinterpret_cast<uintptr_t>(addr));

if (!sctpAssociation)
Expand Down Expand Up @@ -120,6 +136,7 @@ void DepUsrSCTP::ClassInit()

void DepUsrSCTP::ClassDestroy()
{
MS_DUMP("-------- ClassDestroy()");
MS_TRACE();

const std::lock_guard<std::mutex> lock(GlobalSyncMutex);
Expand All @@ -134,6 +151,7 @@ void DepUsrSCTP::ClassDestroy()
nextSctpAssociationId = 0u;

DepUsrSCTP::mapIdSctpAssociation.clear();
MS_DUMP("-------- ClassDestroy() calling mapAsyncHandlerSendSctpData.clear()");
DepUsrSCTP::mapAsyncHandlerSendSctpData.clear();
}
}
Expand Down Expand Up @@ -202,8 +220,10 @@ void DepUsrSCTP::RegisterSctpAssociation(RTC::SctpAssociation* sctpAssociation)
it2 == DepUsrSCTP::mapAsyncHandlerSendSctpData.end(),
"the id of the SctpAssociation is already in the mapAsyncHandlerSendSctpData map");

DepUsrSCTP::mapIdSctpAssociation[sctpAssociation->id] = sctpAssociation;
DepUsrSCTP::mapAsyncHandlerSendSctpData[sctpAssociation->GetAsyncHandle()] = { sctpAssociation };
DepUsrSCTP::mapIdSctpAssociation[sctpAssociation->id] = sctpAssociation;
// DepUsrSCTP::mapAsyncHandlerSendSctpData[sctpAssociation->GetAsyncHandle()] = { sctpAssociation,
// ++FOO_STORE_ID };
DepUsrSCTP::mapAsyncHandlerSendSctpData.emplace(sctpAssociation->GetAsyncHandle(), sctpAssociation);

sctpAssociation->InitializeSyncHandle(onAsync);

Expand Down Expand Up @@ -264,17 +284,23 @@ void DepUsrSCTP::SendSctpData(RTC::SctpAssociation* sctpAssociation, uint8_t* da
it != DepUsrSCTP::mapAsyncHandlerSendSctpData.end(),
"SctpAssociation not found in mapAsyncHandlerSendSctpData map");

SendSctpDataStore& store = it->second;
auto& store = it->second;

MS_DUMP("-------- store.items.emplace_back()... [items.size:%zu]", store.items.size());
// NOTE: In Rust, DepUsrSCTP::SendSctpData() is called from onSendSctpData()
// callback from a different thread and usrsctp immediately frees |data| when
// the callback execution finishes. So we have to mem copy it.
// TODO: This must be freed, but I'd prefer if we used a static thread_local
// buffer, but I don't know max size of this (if any).
store.data = new uint8_t[len];
store.len = len;
auto& item = store.items.emplace_back();

std::memcpy(store.data, data, len);
item.fooId = ++FOO_ITEM_ID;
item.data = new uint8_t[len];
item.len = len;
std::memcpy(item.data, data, len);

MS_DUMP(
"-------- store.items.emplace_back() DONE [item.fooId:%d, items.size:%zu]",
item.fooId,
store.items.size());

// Invoke UV async send.
int err = uv_async_send(sctpAssociation->GetAsyncHandle());
Expand Down Expand Up @@ -362,3 +388,49 @@ void DepUsrSCTP::Checker::OnTimer(TimerHandle* /*timer*/)

this->lastCalledAtMs = nowMs;
}

// DepUsrSCTP::SendSctpDataItem::SendSctpDataItem(uint8_t* data, size_t len)
// : data(new uint8_t[len]), len(len)
// {
// this->fooId = ++FOO_ITEM_ID;
// MS_DUMP("---------- item constructor [fooId:%d, data:%p]", this->fooId, this->data);
// std::memcpy(this->data, data, len);
// }

DepUsrSCTP::SendSctpDataItem::SendSctpDataItem() noexcept
{
MS_DUMP("---------- item constructor");
}

DepUsrSCTP::SendSctpDataItem::~SendSctpDataItem()
{
MS_DUMP("---------- item destructor [fooId:%d, data:%p]", this->fooId, this->data);
// delete[] this->data;
}

DepUsrSCTP::SendSctpDataStore::SendSctpDataStore(RTC::SctpAssociation* sctpAssociation)
: sctpAssociation(sctpAssociation)
{
this->fooId = ++FOO_STORE_ID;

MS_DUMP("---------- store constructor [fooId:%d]", this->fooId);
}

DepUsrSCTP::SendSctpDataStore::~SendSctpDataStore()
{
MS_DUMP("---------- store destructor [fooId:%d, items.size():%zu]", this->fooId, this->items.size());

ClearItems();
}

void DepUsrSCTP::SendSctpDataStore::ClearItems()
{
MS_DUMP(
"---------- store ClearItems() [fooId:%d, items.size():%zu]", this->fooId, this->items.size());

for (auto& item : this->items)
{
delete[] item.data;
}
this->items.clear();
}
5 changes: 5 additions & 0 deletions worker/src/RTC/DataConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,11 @@ namespace RTC
{
MS_TRACE();

MS_DUMP(
"------ [ppid:%" PRIu32 ", msg:'%s']",
ppid,
std::string(reinterpret_cast<const char*>(msg), len).c_str());

if (!IsActive())
{
return;
Expand Down
2 changes: 2 additions & 0 deletions worker/src/RTC/DataProducer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ namespace RTC
return;
}

MS_DUMP("------ [msg:'%s']", std::string(reinterpret_cast<const char*>(msg), len).c_str());

this->listener->OnDataProducerMessageReceived(
this, msg, len, ppid, subchannels, requiredSubchannel);
}
Expand Down
8 changes: 7 additions & 1 deletion worker/src/RTC/SctpAssociation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,11 @@ namespace RTC
{
MS_TRACE();

MS_DUMP(
"------ [ppid:%" PRIu32 ", msg:'%s']",
ppid,
std::string(reinterpret_cast<const char*>(msg), len).c_str());

// This must be controlled by the DataConsumer.
MS_ASSERT(
len <= this->maxSctpMessageSize,
Expand Down Expand Up @@ -753,7 +758,8 @@ namespace RTC

this->listener->OnSctpAssociationMessageReceived(this, streamId, data, len, ppid);
}
// If end of message and there is buffered data, append data and notify buffer.
// If end of message and there is buffered data, append data and notify
// buffer.
else if (eor && this->messageBufferLen != 0)
{
std::memcpy(this->messageBuffer + this->messageBufferLen, data, len);
Expand Down
6 changes: 6 additions & 0 deletions worker/src/RTC/Transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2880,6 +2880,12 @@ namespace RTC
{
MS_TRACE();

MS_DUMP(
"------ [streamId:%" PRIu16 ", ppid:%" PRIu32 ", msg:'%s']",
streamId,
ppid,
std::string(reinterpret_cast<const char*>(msg), len).c_str());

RTC::DataProducer* dataProducer = this->sctpListener.GetDataProducer(streamId);

if (!dataProducer)
Expand Down
2 changes: 1 addition & 1 deletion worker/src/handles/UnixStreamSocketHandle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ void UnixStreamSocketHandle::Write(const uint8_t* data, size_t len)
// Any other error.
else if (written < 0)
{
MS_ERROR_STD("uv_try_write() failed, trying uv_write(): %s", uv_strerror(written));
MS_WARN_DEV_STD("uv_try_write() failed, trying uv_write(): %s", uv_strerror(written));

// Set written to 0 so pendingLen can be properly calculated.
written = 0;
Expand Down

0 comments on commit 1a25bed

Please sign in to comment.