Skip to content

Commit

Permalink
RtpStreamSend: sanitization (#10)
Browse files Browse the repository at this point in the history
RtpStreamSend: sanitization
  • Loading branch information
jmillan authored Mar 3, 2022
1 parent af3b681 commit 19f1016
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 92 deletions.
16 changes: 11 additions & 5 deletions worker/include/RTC/RtpStreamSend.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@ namespace RTC
public:
struct StorageItem
{
void Dump() const;

// Original packet.
RTC::RtpPacket::SharedPtr originalPacket{ nullptr };
// Correct SSRC since original packet will have original ssrc.
// Correct SSRC since original packet may not have the same.
uint32_t ssrc{ 0 };
// Correct sequence number since original packet will have original sequence number.
// Correct sequence number since original packet may not have the same.
uint16_t sequenceNumber{ 0 };
// Correct timestamp since original packet may not have the same.
uint32_t timestamp{ 0 };
// Cloned packet.
RTC::RtpPacket::SharedPtr clonedPacket{ nullptr };
// Last time this packet was resent.
Expand All @@ -47,10 +51,11 @@ namespace RTC

StorageItem* GetFirst() const;
StorageItem* Get(uint16_t seq) const;
size_t GetBufferSize() const;
bool Insert(uint16_t seq, StorageItem* storageItem);
bool RemoveFirst();
bool Remove(uint16_t seq);
void RemoveFirst();
void Clear();
void Dump();

private:
uint16_t startSeq{ 0 };
Expand Down Expand Up @@ -84,7 +89,8 @@ namespace RTC
uint32_t GetLayerBitrate(uint64_t nowMs, uint8_t spatialLayer, uint8_t temporalLayer) override;

private:
void StorePacket(RTC::RtpPacket* packet, RTC::RtpPacket::SharedPtr& clonedPacket);
void StorePacket(const RTC::RtpPacket* packet, RTC::RtpPacket::SharedPtr& clonedPacket);
void ClearOldPackets(const RtpPacket* packet);
void ClearBuffer();
void FillRetransmissionContainer(uint16_t seq, uint16_t bitmask);
void UpdateScore(RTC::RTCP::ReceiverReport* report);
Expand Down
7 changes: 7 additions & 0 deletions worker/include/Utils.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#ifndef MS_UTILS_HPP
#define MS_UTILS_HPP
// #define MS_MEM_POOL_FREE_ON_RETURN 1

#include "common.hpp"
#include <openssl/hmac.h>
Expand Down Expand Up @@ -382,7 +383,13 @@ namespace Utils
void Return(T* ptr)
{
if (ptr)
{
#ifdef MS_MEM_POOL_FREE_ON_RETURN
std::free(ptr);
#else
this->pool.push_back(ptr);
#endif
}
}

private:
Expand Down
195 changes: 109 additions & 86 deletions worker/src/RTC/RtpStreamSend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ namespace RTC
static constexpr uint32_t MaxRetransmissionDelay{ 2000 };
static constexpr uint32_t DefaultRtt{ 100 };
static constexpr uint16_t MaxSeq = std::numeric_limits<uint16_t>::max();
static constexpr uint32_t MaxTs = std::numeric_limits<uint32_t>::max();

static void resetStorageItem(RTC::RtpStreamSend::StorageItem* storageItem)
{
Expand All @@ -31,9 +30,21 @@ namespace RTC

storageItem->clonedPacket.reset();
storageItem->originalPacket.reset();
storageItem->resentAtMs = 0;
storageItem->sentTimes = 0;
storageItem->rtxEncoded = false;
storageItem->ssrc = 0;
storageItem->sequenceNumber = 0;
storageItem->timestamp = 0;
storageItem->resentAtMs = 0;
storageItem->sentTimes = 0;
storageItem->rtxEncoded = false;
}

void RtpStreamSend::StorageItem::Dump() const
{
MS_DUMP(
"ssrc:%" PRIu32 ", seq:%" PRIu16 ", timestamp:%" PRIu32,
this->ssrc,
this->sequenceNumber,
this->timestamp);
}

RtpStreamSend::StorageItem* RtpStreamSend::StorageItemBuffer::GetFirst() const
Expand All @@ -51,6 +62,11 @@ namespace RTC
return this->buffer.at(idx);
}

size_t RtpStreamSend::StorageItemBuffer::GetBufferSize() const
{
return this->buffer.size();
}

bool RtpStreamSend::StorageItemBuffer::Insert(uint16_t seq, StorageItem* storageItem)
{
if (this->buffer.empty())
Expand All @@ -61,7 +77,7 @@ namespace RTC
return true;
}

if (seq > this->startSeq)
if (RTC::SeqManager<uint16_t>::IsSeqHigherThan(seq, this->startSeq))
{
auto idx{ static_cast<uint16_t>(seq - this->startSeq) };

Expand All @@ -73,18 +89,11 @@ namespace RTC

return true;
}
}

// Calculate how many elements would it be necessary to add when pushing new item to the back of
// the deque.
auto addToBack{ static_cast<uint16_t>(seq - (this->startSeq + this->buffer.size() - 1)) };
// Calculate how many elements would it be necessary to add when pushing new item to the front
// of the deque.
auto addToFront{ static_cast<uint16_t>(this->startSeq - seq) };
// Calculate how many elements would it be necessary to add when pushing new item
// to the back of the deque.
auto addToBack{ static_cast<uint16_t>(seq - (this->startSeq + this->buffer.size() - 1)) };

// Select the side of deque where fewer elements need to be added, while preferring the end.
if (addToBack <= addToFront)
{
// Packets can arrive out of order, add blank slots.
for (uint16_t i{ 1 }; i < addToBack; ++i)
this->buffer.push_back(nullptr);
Expand All @@ -93,6 +102,10 @@ namespace RTC
}
else
{
// Calculate how many elements would it be necessary to add when pushing new item
// to the front of the deque.
auto addToFront{ static_cast<uint16_t>(this->startSeq - seq) };

// Packets can arrive out of order, add blank slots.
for (uint16_t i{ 1 }; i < addToFront; ++i)
this->buffer.push_front(nullptr);
Expand All @@ -101,40 +114,34 @@ namespace RTC
this->startSeq = seq;
}

MS_ASSERT(
this->buffer.size() <= MaxSeq,
"StorageItemBuffer contains more than %" PRIu16 " entries",
MaxSeq);

return true;
}

bool RtpStreamSend::StorageItemBuffer::RemoveFirst()
void RtpStreamSend::StorageItemBuffer::RemoveFirst()
{
return this->Remove(this->startSeq);
}
MS_ASSERT(!this->buffer.empty(), "buffer is empty");

bool RtpStreamSend::StorageItemBuffer::Remove(uint16_t seq)
{
if (this->buffer.empty())
return false;
auto storageItem = this->buffer[0];

auto idx{ static_cast<uint16_t>(seq - this->startSeq) };
// Reset (free RTP packet) the old storage item.
resetStorageItem(storageItem);
// Return into the pool.
StorageItemPool.Return(storageItem);

this->buffer[idx] = nullptr;
this->buffer[0] = nullptr;

// If we have erased the first element, remove all `nullptr` elements from the beginning of the buffer.
if (idx == 0)
{
while (!this->buffer.front())
{
this->buffer.pop_front();
this->startSeq++;
}
}
// If we have erased the last element, remove all `nullptr` elements from the end of the buffer.
else if (idx == static_cast<uint16_t>(this->buffer.size() - 1))
// Remove all `nullptr` elements from the beginning of the buffer.
// NOTE: Calling front on an empty container is undefined.
while (!this->buffer.empty() && !this->buffer.front())
{
while (!this->buffer.back())
this->buffer.pop_back();
this->buffer.pop_front();
this->startSeq++;
}

return true;
}

void RtpStreamSend::StorageItemBuffer::Clear()
Expand All @@ -154,6 +161,23 @@ namespace RTC
this->startSeq = 0;
}

void RtpStreamSend::StorageItemBuffer::Dump()
{
for (size_t i{ 0 }; i < this->buffer.size(); i++)
{
const auto* item = this->buffer.at(i);

if (item == nullptr)
{
MS_DUMP("nullptr item at possition: %zu", i);

continue;
}

item->Dump();
}
}

RtpStreamSend::StorageItemBuffer::~StorageItemBuffer()
{
Clear();
Expand Down Expand Up @@ -400,10 +424,13 @@ namespace RTC
MS_ABORT("invalid method call");
}

void RtpStreamSend::StorePacket(RTC::RtpPacket* packet, RTC::RtpPacket::SharedPtr& clonedPacket)
void RtpStreamSend::StorePacket(const RTC::RtpPacket* packet, RTC::RtpPacket::SharedPtr& clonedPacket)
{
MS_TRACE();

MS_ASSERT(
packet->GetSsrc() == this->params.ssrc, "RTP packet SSRC does not match the encodings SSRC");

if (packet->GetSize() > RTC::MtuSize)
{
MS_WARN_TAG(
Expand All @@ -419,13 +446,13 @@ namespace RTC
auto seq = packet->GetSequenceNumber();
auto* storageItem = this->storageItemBuffer.Get(seq);

this->ClearOldPackets(packet);

// The buffer item is already used. Check whether we should replace its
// storage with the new packet or just ignore it (if duplicated packet).
if (storageItem)
{
auto storedPacket = storageItem->originalPacket;

if (packet->GetTimestamp() == storedPacket->GetTimestamp())
if (packet->GetTimestamp() == storageItem->timestamp)
return;

// Reset the storage item.
Expand All @@ -439,48 +466,8 @@ namespace RTC
// Memory is not initialized in any way, reset it. Create a new StorageItem instance
// in this memory.
new (storageItem) StorageItem{};
MS_ASSERT(this->storageItemBuffer.Insert(seq, storageItem), "sequence number must be empty");

auto packetTs{ packet->GetTimestamp() };
auto clockRate{ this->params.clockRate };

// Go through all buffer items starting with the first and free all storage
// items that contain packets older than `MaxRetransmissionDelay`.
for (uint32_t i{ 0 }; i <= static_cast<uint32_t>(MaxSeq); ++i)
{
auto* checkedStorageItem = this->storageItemBuffer.GetFirst();

// Packets can arrive out of order, in which case we'll miss some storage items.
if (checkedStorageItem)
{
// This is the storage item we have just inserted, no need to go further.
if (!checkedStorageItem->originalPacket)
break;

auto checkedPacketTs{ checkedStorageItem->originalPacket->GetTimestamp() };
auto diffTs{ packetTs - checkedPacketTs };

// Account for wrapping around.
if (diffTs > MaxTs / 2)
{
diffTs = MaxTs - diffTs;
}

// Cleanup is finished if we found an item with recent enough packet, but also account
// for out-of-order packets.
if (
static_cast<uint32_t>(diffTs * 1000 / clockRate) < this->retransmissionBufferSize ||
RTC::SeqManager<uint32_t>::IsSeqLowerThan(packetTs, checkedPacketTs))
break;

// Reset (free RTP packet) the old storage item.
resetStorageItem(checkedStorageItem);
// Return into the pool.
StorageItemPool.Return(checkedStorageItem);
// Unfill the buffer start item.
MS_ASSERT(this->storageItemBuffer.RemoveFirst(), "Storage item must be used");
}
}
MS_ASSERT(this->storageItemBuffer.Insert(seq, storageItem), "sequence number must be empty");
}

// Only clone once and only if necessary.
Expand All @@ -494,8 +481,43 @@ namespace RTC

// Store original packet and some extra info into the retrieved storage item.
storageItem->originalPacket = clonedPacket;
storageItem->ssrc = clonedPacket->GetSsrc();
storageItem->sequenceNumber = clonedPacket->GetSequenceNumber();
storageItem->ssrc = packet->GetSsrc();
storageItem->sequenceNumber = packet->GetSequenceNumber();
storageItem->timestamp = packet->GetTimestamp();
}

void RtpStreamSend::ClearOldPackets(const RtpPacket* packet)
{
MS_TRACE();

auto packetTs{ packet->GetTimestamp() };
auto clockRate{ this->params.clockRate };

const auto bufferSize = this->storageItemBuffer.GetBufferSize();

// Go through all buffer items starting with the first and free all storage
// items that contain packets older than `MaxRetransmissionDelay`.
for (size_t i{ 0 }; i < bufferSize && this->storageItemBuffer.GetBufferSize() != 0; ++i)
{
auto* firstStorageItem = this->storageItemBuffer.GetFirst();

MS_ASSERT(firstStorageItem, "first storage item is missing");
MS_ASSERT(firstStorageItem->originalPacket, "storage item does not contain original packet");

auto firstPacketTs{ firstStorageItem->timestamp };
uint32_t diffTs{ packetTs - firstPacketTs };

// RTP packet is older than first RTP packet.
if (RTC::SeqManager<uint32_t>::IsSeqLowerThan(packetTs, firstPacketTs))
break;

// First RTP packet is recent enough.
if (static_cast<uint32_t>(diffTs * 1000 / clockRate) < this->retransmissionBufferSize)
break;

// Unfill the buffer start item.
this->storageItemBuffer.RemoveFirst();
}
}

void RtpStreamSend::ClearBuffer()
Expand Down Expand Up @@ -559,6 +581,7 @@ namespace RTC
// Put correct SSRC and sequence number into cloned packet.
packet->SetSsrc(storageItem->ssrc);
packet->SetSequenceNumber(storageItem->sequenceNumber);
packet->SetTimestamp(storageItem->timestamp);

// Update MID RTP extension value.
if (!this->mid.empty())
Expand Down
Loading

0 comments on commit 19f1016

Please sign in to comment.