diff --git a/worker/include/RTC/RtpStreamSend.hpp b/worker/include/RTC/RtpStreamSend.hpp index c6397f4fed..647553dfc0 100644 --- a/worker/include/RTC/RtpStreamSend.hpp +++ b/worker/include/RTC/RtpStreamSend.hpp @@ -20,12 +20,16 @@ namespace RTC public: struct StorageItem { + void Dump() const; + // Original packet. RTC::RtpPacket::SharedPtr originalPacket{ nullptr }; - // Correct SSRC since original packet will have original ssrc. + // Correct SSRC since original packet may not have the same. uint32_t ssrc{ 0 }; - // Correct sequence number since original packet will have original sequence number. + // Correct sequence number since original packet may not have the same. uint16_t sequenceNumber{ 0 }; + // Correct timestamp since original packet may not have the same. + uint32_t timestamp{ 0 }; // Cloned packet. RTC::RtpPacket::SharedPtr clonedPacket{ nullptr }; // Last time this packet was resent. @@ -47,10 +51,11 @@ namespace RTC StorageItem* GetFirst() const; StorageItem* Get(uint16_t seq) const; + size_t GetBufferSize() const; bool Insert(uint16_t seq, StorageItem* storageItem); - bool RemoveFirst(); - bool Remove(uint16_t seq); + void RemoveFirst(); void Clear(); + void Dump(); private: uint16_t startSeq{ 0 }; @@ -84,7 +89,8 @@ namespace RTC uint32_t GetLayerBitrate(uint64_t nowMs, uint8_t spatialLayer, uint8_t temporalLayer) override; private: - void StorePacket(RTC::RtpPacket* packet, RTC::RtpPacket::SharedPtr& clonedPacket); + void StorePacket(const RTC::RtpPacket* packet, RTC::RtpPacket::SharedPtr& clonedPacket); + void ClearOldPackets(const RtpPacket* packet); void ClearBuffer(); void FillRetransmissionContainer(uint16_t seq, uint16_t bitmask); void UpdateScore(RTC::RTCP::ReceiverReport* report); diff --git a/worker/include/Utils.hpp b/worker/include/Utils.hpp index 3a092e995f..e782e5ce5e 100644 --- a/worker/include/Utils.hpp +++ b/worker/include/Utils.hpp @@ -1,5 +1,6 @@ #ifndef MS_UTILS_HPP #define MS_UTILS_HPP +// #define MS_MEM_POOL_FREE_ON_RETURN 1 #include "common.hpp" #include @@ -382,7 +383,13 @@ namespace Utils void Return(T* ptr) { if (ptr) + { +#ifdef MS_MEM_POOL_FREE_ON_RETURN + std::free(ptr); +#else this->pool.push_back(ptr); +#endif + } } private: diff --git a/worker/src/RTC/RtpStreamSend.cpp b/worker/src/RTC/RtpStreamSend.cpp index 0cbfa6e551..87515c6e16 100644 --- a/worker/src/RTC/RtpStreamSend.cpp +++ b/worker/src/RTC/RtpStreamSend.cpp @@ -21,7 +21,6 @@ namespace RTC static constexpr uint32_t MaxRetransmissionDelay{ 2000 }; static constexpr uint32_t DefaultRtt{ 100 }; static constexpr uint16_t MaxSeq = std::numeric_limits::max(); - static constexpr uint32_t MaxTs = std::numeric_limits::max(); static void resetStorageItem(RTC::RtpStreamSend::StorageItem* storageItem) { @@ -31,9 +30,21 @@ namespace RTC storageItem->clonedPacket.reset(); storageItem->originalPacket.reset(); - storageItem->resentAtMs = 0; - storageItem->sentTimes = 0; - storageItem->rtxEncoded = false; + storageItem->ssrc = 0; + storageItem->sequenceNumber = 0; + storageItem->timestamp = 0; + storageItem->resentAtMs = 0; + storageItem->sentTimes = 0; + storageItem->rtxEncoded = false; + } + + void RtpStreamSend::StorageItem::Dump() const + { + MS_DUMP( + "ssrc:%" PRIu32 ", seq:%" PRIu16 ", timestamp:%" PRIu32, + this->ssrc, + this->sequenceNumber, + this->timestamp); } RtpStreamSend::StorageItem* RtpStreamSend::StorageItemBuffer::GetFirst() const @@ -51,6 +62,11 @@ namespace RTC return this->buffer.at(idx); } + size_t RtpStreamSend::StorageItemBuffer::GetBufferSize() const + { + return this->buffer.size(); + } + bool RtpStreamSend::StorageItemBuffer::Insert(uint16_t seq, StorageItem* storageItem) { if (this->buffer.empty()) @@ -61,7 +77,7 @@ namespace RTC return true; } - if (seq > this->startSeq) + if (RTC::SeqManager::IsSeqHigherThan(seq, this->startSeq)) { auto idx{ static_cast(seq - this->startSeq) }; @@ -73,18 +89,11 @@ namespace RTC return true; } - } - // Calculate how many elements would it be necessary to add when pushing new item to the back of - // the deque. - auto addToBack{ static_cast(seq - (this->startSeq + this->buffer.size() - 1)) }; - // Calculate how many elements would it be necessary to add when pushing new item to the front - // of the deque. - auto addToFront{ static_cast(this->startSeq - seq) }; + // Calculate how many elements would it be necessary to add when pushing new item + // to the back of the deque. + auto addToBack{ static_cast(seq - (this->startSeq + this->buffer.size() - 1)) }; - // Select the side of deque where fewer elements need to be added, while preferring the end. - if (addToBack <= addToFront) - { // Packets can arrive out of order, add blank slots. for (uint16_t i{ 1 }; i < addToBack; ++i) this->buffer.push_back(nullptr); @@ -93,6 +102,10 @@ namespace RTC } else { + // Calculate how many elements would it be necessary to add when pushing new item + // to the front of the deque. + auto addToFront{ static_cast(this->startSeq - seq) }; + // Packets can arrive out of order, add blank slots. for (uint16_t i{ 1 }; i < addToFront; ++i) this->buffer.push_front(nullptr); @@ -101,40 +114,34 @@ namespace RTC this->startSeq = seq; } + MS_ASSERT( + this->buffer.size() <= MaxSeq, + "StorageItemBuffer contains more than %" PRIu16 " entries", + MaxSeq); + return true; } - bool RtpStreamSend::StorageItemBuffer::RemoveFirst() + void RtpStreamSend::StorageItemBuffer::RemoveFirst() { - return this->Remove(this->startSeq); - } + MS_ASSERT(!this->buffer.empty(), "buffer is empty"); - bool RtpStreamSend::StorageItemBuffer::Remove(uint16_t seq) - { - if (this->buffer.empty()) - return false; + auto storageItem = this->buffer[0]; - auto idx{ static_cast(seq - this->startSeq) }; + // Reset (free RTP packet) the old storage item. + resetStorageItem(storageItem); + // Return into the pool. + StorageItemPool.Return(storageItem); - this->buffer[idx] = nullptr; + this->buffer[0] = nullptr; - // If we have erased the first element, remove all `nullptr` elements from the beginning of the buffer. - if (idx == 0) - { - while (!this->buffer.front()) - { - this->buffer.pop_front(); - this->startSeq++; - } - } - // If we have erased the last element, remove all `nullptr` elements from the end of the buffer. - else if (idx == static_cast(this->buffer.size() - 1)) + // Remove all `nullptr` elements from the beginning of the buffer. + // NOTE: Calling front on an empty container is undefined. + while (!this->buffer.empty() && !this->buffer.front()) { - while (!this->buffer.back()) - this->buffer.pop_back(); + this->buffer.pop_front(); + this->startSeq++; } - - return true; } void RtpStreamSend::StorageItemBuffer::Clear() @@ -154,6 +161,23 @@ namespace RTC this->startSeq = 0; } + void RtpStreamSend::StorageItemBuffer::Dump() + { + for (size_t i{ 0 }; i < this->buffer.size(); i++) + { + const auto* item = this->buffer.at(i); + + if (item == nullptr) + { + MS_DUMP("nullptr item at possition: %zu", i); + + continue; + } + + item->Dump(); + } + } + RtpStreamSend::StorageItemBuffer::~StorageItemBuffer() { Clear(); @@ -400,10 +424,13 @@ namespace RTC MS_ABORT("invalid method call"); } - void RtpStreamSend::StorePacket(RTC::RtpPacket* packet, RTC::RtpPacket::SharedPtr& clonedPacket) + void RtpStreamSend::StorePacket(const RTC::RtpPacket* packet, RTC::RtpPacket::SharedPtr& clonedPacket) { MS_TRACE(); + MS_ASSERT( + packet->GetSsrc() == this->params.ssrc, "RTP packet SSRC does not match the encodings SSRC"); + if (packet->GetSize() > RTC::MtuSize) { MS_WARN_TAG( @@ -419,13 +446,13 @@ namespace RTC auto seq = packet->GetSequenceNumber(); auto* storageItem = this->storageItemBuffer.Get(seq); + this->ClearOldPackets(packet); + // The buffer item is already used. Check whether we should replace its // storage with the new packet or just ignore it (if duplicated packet). if (storageItem) { - auto storedPacket = storageItem->originalPacket; - - if (packet->GetTimestamp() == storedPacket->GetTimestamp()) + if (packet->GetTimestamp() == storageItem->timestamp) return; // Reset the storage item. @@ -439,48 +466,8 @@ namespace RTC // Memory is not initialized in any way, reset it. Create a new StorageItem instance // in this memory. new (storageItem) StorageItem{}; - MS_ASSERT(this->storageItemBuffer.Insert(seq, storageItem), "sequence number must be empty"); - - auto packetTs{ packet->GetTimestamp() }; - auto clockRate{ this->params.clockRate }; - - // Go through all buffer items starting with the first and free all storage - // items that contain packets older than `MaxRetransmissionDelay`. - for (uint32_t i{ 0 }; i <= static_cast(MaxSeq); ++i) - { - auto* checkedStorageItem = this->storageItemBuffer.GetFirst(); - - // Packets can arrive out of order, in which case we'll miss some storage items. - if (checkedStorageItem) - { - // This is the storage item we have just inserted, no need to go further. - if (!checkedStorageItem->originalPacket) - break; - - auto checkedPacketTs{ checkedStorageItem->originalPacket->GetTimestamp() }; - auto diffTs{ packetTs - checkedPacketTs }; - - // Account for wrapping around. - if (diffTs > MaxTs / 2) - { - diffTs = MaxTs - diffTs; - } - // Cleanup is finished if we found an item with recent enough packet, but also account - // for out-of-order packets. - if ( - static_cast(diffTs * 1000 / clockRate) < this->retransmissionBufferSize || - RTC::SeqManager::IsSeqLowerThan(packetTs, checkedPacketTs)) - break; - - // Reset (free RTP packet) the old storage item. - resetStorageItem(checkedStorageItem); - // Return into the pool. - StorageItemPool.Return(checkedStorageItem); - // Unfill the buffer start item. - MS_ASSERT(this->storageItemBuffer.RemoveFirst(), "Storage item must be used"); - } - } + MS_ASSERT(this->storageItemBuffer.Insert(seq, storageItem), "sequence number must be empty"); } // Only clone once and only if necessary. @@ -494,8 +481,43 @@ namespace RTC // Store original packet and some extra info into the retrieved storage item. storageItem->originalPacket = clonedPacket; - storageItem->ssrc = clonedPacket->GetSsrc(); - storageItem->sequenceNumber = clonedPacket->GetSequenceNumber(); + storageItem->ssrc = packet->GetSsrc(); + storageItem->sequenceNumber = packet->GetSequenceNumber(); + storageItem->timestamp = packet->GetTimestamp(); + } + + void RtpStreamSend::ClearOldPackets(const RtpPacket* packet) + { + MS_TRACE(); + + auto packetTs{ packet->GetTimestamp() }; + auto clockRate{ this->params.clockRate }; + + const auto bufferSize = this->storageItemBuffer.GetBufferSize(); + + // Go through all buffer items starting with the first and free all storage + // items that contain packets older than `MaxRetransmissionDelay`. + for (size_t i{ 0 }; i < bufferSize && this->storageItemBuffer.GetBufferSize() != 0; ++i) + { + auto* firstStorageItem = this->storageItemBuffer.GetFirst(); + + MS_ASSERT(firstStorageItem, "first storage item is missing"); + MS_ASSERT(firstStorageItem->originalPacket, "storage item does not contain original packet"); + + auto firstPacketTs{ firstStorageItem->timestamp }; + uint32_t diffTs{ packetTs - firstPacketTs }; + + // RTP packet is older than first RTP packet. + if (RTC::SeqManager::IsSeqLowerThan(packetTs, firstPacketTs)) + break; + + // First RTP packet is recent enough. + if (static_cast(diffTs * 1000 / clockRate) < this->retransmissionBufferSize) + break; + + // Unfill the buffer start item. + this->storageItemBuffer.RemoveFirst(); + } } void RtpStreamSend::ClearBuffer() @@ -559,6 +581,7 @@ namespace RTC // Put correct SSRC and sequence number into cloned packet. packet->SetSsrc(storageItem->ssrc); packet->SetSequenceNumber(storageItem->sequenceNumber); + packet->SetTimestamp(storageItem->timestamp); // Update MID RTP extension value. if (!this->mid.empty()) diff --git a/worker/test/src/RTC/TestRtpStreamSend.cpp b/worker/test/src/RTC/TestRtpStreamSend.cpp index b1d1ecf074..569182283d 100644 --- a/worker/test/src/RTC/TestRtpStreamSend.cpp +++ b/worker/test/src/RTC/TestRtpStreamSend.cpp @@ -8,7 +8,7 @@ using namespace RTC; -SCENARIO("NACK and RTP packets retransmission", "[rtp][rtcp]") +SCENARIO("NACK and RTP packets retransmission", "[rtp][rtcp][nack]") { class TestRtpStreamListener : public RtpStreamSend::Listener { @@ -164,4 +164,70 @@ SCENARIO("NACK and RTP packets retransmission", "[rtp][rtcp]") delete stream; } + + SECTION("Cloned RTP packet differs from original, get retransmitted packets") + { + // clang-format off + uint8_t rtpBuffer1[] = + { + 0b10000000, 0b01111011, 0b01010010, 0b00001110, + 0b01011011, 0b01101011, 0b11001010, 0b10110101, + 0, 0, 0, 2 + }; + // clang-format on + + RtpStream::Params params; + + params.ssrc = 1111; + params.clockRate = 90000; + params.useNack = true; + + // Create a RtpStreamSend. + std::string mid{ "" }; + RtpStreamSend* stream = new RtpStreamSend(&testRtpStreamListener, params, mid, true); + + auto packet = RtpPacket::Parse(rtpBuffer1, sizeof(rtpBuffer1)); + + REQUIRE(packet); + + // Original packet. + packet->SetSsrc(1111); + packet->SetSequenceNumber(2222); + packet->SetTimestamp(3333); + + auto clonedPacket = packet->Clone(); + + REQUIRE(clonedPacket); + + // Cloned packet. + clonedPacket->SetSsrc(4444); + clonedPacket->SetSequenceNumber(5555); + clonedPacket->SetTimestamp(6666); + + stream->ReceivePacket(packet.get(), clonedPacket); + + // Create a NACK item that requests the packet. + RTCP::FeedbackRtpNackPacket nackPacket(0, params.ssrc); + auto* nackItem = new RTCP::FeedbackRtpNackItem(packet->GetSequenceNumber(), 0b0000000000000000); + + nackPacket.AddItem(nackItem); + + REQUIRE(nackItem->GetPacketId() == packet->GetSequenceNumber()); + REQUIRE(nackItem->GetLostPacketBitmask() == 0b0000000000000000); + + stream->ReceiveNack(&nackPacket); + + REQUIRE(testRtpStreamListener.retransmittedPackets.size() == 1); + + auto rtxPacket = testRtpStreamListener.retransmittedPackets[0]; + + testRtpStreamListener.retransmittedPackets.clear(); + + // Make sure RTX packets correspond match the origina packet info. + REQUIRE(rtxPacket); + REQUIRE(rtxPacket->GetSequenceNumber() == 2222); + REQUIRE(rtxPacket->GetTimestamp() == 3333); + + delete stream; + } }