Skip to content

Commit

Permalink
Worker: Fix memory leak when using WebRtcServer with TCP enabled
Browse files Browse the repository at this point in the history
Fixes #1381

### Details

- Problem was that, when uusing `WebRtcServer`, if a `WebRtcTransport` was closed from server side (by calling `transport.close()` the underlying `RTC::TcpConnection` instances so also their parent `TcpConnectionHandlep` instances) were not freed.
- In `WebRtcTransport::OnIceServerTupleRemoved()` we were calling `Close` on the removed `TransportTuple` which calls `Close()` on the underlaying `RTC::TcpConnection`. However the `RTC::TcpConnection` was never deleted/freed.
- Fixed by refactoring the class so now we call `tuple->CloseTcpConnection()` which calls `tcpConnection->TriggerClose()` which ensures `TcpConnectionHandle::InternalClose()` is called (formed `Close()` method) and then calls `this->listener->OnTcpConnectionClosed()` so `TcpServerHandle` deletes/frees it. Note that indeed `TcpServerHandle` is the only one that should delete/free its `TcpConnectionHandles` instances.
- Bonus Track: Rename all `Close()` methods in handles and similar classes to `InternalClose()` and make it private to avoid these kind of issues.
  • Loading branch information
ibc committed May 3, 2024
1 parent 98cf79e commit b78ddae
Show file tree
Hide file tree
Showing 22 changed files with 275 additions and 247 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
# Changelog

### NEXT

- Worker: Fix memory leak when using `WebRtcServer` with TCP enabled ([PR #1388](https://github.com/versatica/mediasoup/pull/1388)).

### 3.14.4

- Worker: Fix crash. RtcpFeedback parameter is optional ([PR #1387](https://github.com/versatica/mediasoup/pull/1387), credits to @Lynnworld).
- Worker: Fix crash. `RtcpFeedback` parameter is optional ([PR #1387](https://github.com/versatica/mediasoup/pull/1387), credits to @Lynnworld).

### 3.14.3

Expand Down
2 changes: 1 addition & 1 deletion worker/include/Channel/ChannelSocket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,13 @@ namespace Channel
~ChannelSocket() override;

public:
void Close();
void SetListener(Listener* listener);
void Send(const uint8_t* data, uint32_t dataLen);
void SendLog(const char* data, uint32_t dataLen);
bool CallbackRead();

private:
void InternalClose();
void SendImpl(const uint8_t* payload, uint32_t payloadLen);

/* Pure virtual methods inherited from ConsumerSocket::Listener. */
Expand Down
1 change: 1 addition & 0 deletions worker/include/RTC/IceServer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ namespace RTC
std::string password;
uint16_t consentTimeoutMs{ 30000u };
// Others.
bool destroying{ false };
std::string oldUsernameFragment;
std::string oldPassword;
IceState state{ IceState::NEW };
Expand Down
24 changes: 1 addition & 23 deletions worker/include/RTC/TransportTuple.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,29 +51,7 @@ namespace RTC
}

public:
void Close()
{
if (this->protocol == Protocol::UDP)
{
this->udpSocket->Close();
}
else
{
this->tcpConnection->Close();
}
}

bool IsClosed()
{
if (this->protocol == Protocol::UDP)
{
return this->udpSocket->IsClosed();
}
else
{
return this->tcpConnection->IsClosed();
}
}
void CloseTcpConnection();

flatbuffers::Offset<FBS::Transport::Tuple> FillBuffer(flatbuffers::FlatBufferBuilder& builder) const;

Expand Down
4 changes: 3 additions & 1 deletion worker/include/handles/SignalHandle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ class SignalHandle
~SignalHandle();

public:
void Close();
void AddSignal(int signum, const std::string& name);

private:
void InternalClose();

/* Callbacks fired by UV events. */
public:
void OnUvSignal(int signum);
Expand Down
3 changes: 2 additions & 1 deletion worker/include/handles/TcpConnectionHandle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class TcpConnectionHandle
virtual ~TcpConnectionHandle();

public:
void Close();
void TriggerClose();
bool IsClosed() const
{
return this->closed;
Expand Down Expand Up @@ -110,6 +110,7 @@ class TcpConnectionHandle
}

private:
void InternalClose();
bool SetPeerAddress();

/* Callbacks fired by UV events. */
Expand Down
2 changes: 1 addition & 1 deletion worker/include/handles/TcpServerHandle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ class TcpServerHandle : public TcpConnectionHandle::Listener
~TcpServerHandle() override;

public:
void Close();
void Dump() const;
const struct sockaddr* GetLocalAddress() const
{
Expand Down Expand Up @@ -48,6 +47,7 @@ class TcpServerHandle : public TcpConnectionHandle::Listener
void AcceptTcpConnection(TcpConnectionHandle* connection);

private:
void InternalClose();
bool SetLocalAddress();

/* Pure virtual methods that must be implemented by the subclass. */
Expand Down
4 changes: 3 additions & 1 deletion worker/include/handles/TimerHandle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ class TimerHandle
~TimerHandle();

public:
void Close();
void Start(uint64_t timeout, uint64_t repeat = 0);
void Stop();
void Reset();
Expand All @@ -41,6 +40,9 @@ class TimerHandle
return uv_is_active(reinterpret_cast<uv_handle_t*>(this->uvHandle)) != 0;
}

private:
void InternalClose();

/* Callbacks fired by UV events. */
public:
void OnUvTimer();
Expand Down
2 changes: 1 addition & 1 deletion worker/include/handles/UdpSocketHandle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ class UdpSocketHandle
virtual ~UdpSocketHandle();

public:
void Close();
bool IsClosed() const
{
return this->closed;
Expand Down Expand Up @@ -80,6 +79,7 @@ class UdpSocketHandle
void SetRecvBufferSize(uint32_t size);

private:
void InternalClose();
bool SetLocalAddress();

/* Callbacks fired by UV events. */
Expand Down
4 changes: 3 additions & 1 deletion worker/include/handles/UnixStreamSocketHandle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ class UnixStreamSocketHandle
virtual ~UnixStreamSocketHandle();

public:
void Close();
bool IsClosed() const
{
return this->closed;
Expand All @@ -51,6 +50,9 @@ class UnixStreamSocketHandle
uint32_t GetRecvBufferSize() const;
void SetRecvBufferSize(uint32_t size);

private:
void InternalClose();

/* Callbacks fired by UV events. */
public:
void OnUvReadAlloc(size_t suggestedSize, uv_buf_t* buf);
Expand Down
65 changes: 32 additions & 33 deletions worker/src/Channel/ChannelSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,38 +79,7 @@ namespace Channel

if (!this->closed)
{
Close();
}

delete this->consumerSocket;
delete this->producerSocket;
}

void ChannelSocket::Close()
{
MS_TRACE_STD();

if (this->closed)
{
return;
}

this->closed = true;

if (this->uvReadHandle)
{
uv_close(
reinterpret_cast<uv_handle_t*>(this->uvReadHandle), static_cast<uv_close_cb>(onCloseAsync));
}

if (this->consumerSocket)
{
this->consumerSocket->Close();
}

if (this->producerSocket)
{
this->producerSocket->Close();
InternalClose();
}
}

Expand Down Expand Up @@ -247,7 +216,37 @@ namespace Channel
return free != nullptr;
}

inline void ChannelSocket::SendImpl(const uint8_t* payload, uint32_t payloadLen)
void ChannelSocket::InternalClose()
{
MS_TRACE_STD();

if (this->closed)
{
return;
}

this->closed = true;

if (this->uvReadHandle)
{
uv_close(
reinterpret_cast<uv_handle_t*>(this->uvReadHandle), static_cast<uv_close_cb>(onCloseAsync));
}

if (this->consumerSocket)
{
delete this->consumerSocket;
this->consumerSocket = nullptr;
}

if (this->producerSocket)
{
delete this->producerSocket;
this->producerSocket = nullptr;
}
}

void ChannelSocket::SendImpl(const uint8_t* payload, uint32_t payloadLen)
{
MS_TRACE_STD();

Expand Down
9 changes: 9 additions & 0 deletions worker/src/RTC/IceServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ namespace RTC
{
MS_TRACE();

this->destroying = true;

// Here we must notify the listener about the removal of current
// usernameFragments (and also the old one if any) and all tuples.

Expand Down Expand Up @@ -223,6 +225,13 @@ namespace RTC
{
MS_TRACE();

// While IceServer is being destroyed, it may call listener methods that may
// end calling RemoveTuple(). We must ignore it to avoid double-free issues.
if (this->destroying)
{
return;
}

RTC::TransportTuple* removedTuple{ nullptr };

// Find the removed tuple.
Expand Down
16 changes: 16 additions & 0 deletions worker/src/RTC/TransportTuple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ namespace RTC

TransportTuple::Protocol TransportTuple::ProtocolFromFbs(FBS::Transport::Protocol protocol)
{
MS_TRACE();

switch (protocol)
{
case FBS::Transport::Protocol::UDP:
Expand All @@ -23,6 +25,8 @@ namespace RTC

FBS::Transport::Protocol TransportTuple::ProtocolToFbs(TransportTuple::Protocol protocol)
{
MS_TRACE();

switch (protocol)
{
case TransportTuple::Protocol::UDP:
Expand All @@ -35,6 +39,18 @@ namespace RTC

/* Instance methods. */

void TransportTuple::CloseTcpConnection()
{
MS_TRACE();

if (this->protocol == Protocol::UDP)
{
MS_ABORT("cannot delete a UDP socket");
}

this->tcpConnection->TriggerClose();
}

flatbuffers::Offset<FBS::Transport::Tuple> TransportTuple::FillBuffer(
flatbuffers::FlatBufferBuilder& builder) const
{
Expand Down
4 changes: 2 additions & 2 deletions worker/src/RTC/WebRtcTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1180,9 +1180,9 @@ namespace RTC
}

// If this is a TCP tuple, close its underlaying TCP connection.
if (tuple->GetProtocol() == RTC::TransportTuple::Protocol::TCP && !tuple->IsClosed())
if (tuple->GetProtocol() == RTC::TransportTuple::Protocol::TCP)
{
tuple->Close();
tuple->CloseTcpConnection();
}
}

Expand Down
2 changes: 1 addition & 1 deletion worker/src/Worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ void Worker::Close()
#endif

// Close the Channel.
this->channel->Close();
delete this->channel;
}

flatbuffers::Offset<FBS::Worker::DumpResponse> Worker::FillBuffer(
Expand Down
38 changes: 19 additions & 19 deletions worker/src/handles/SignalHandle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,7 @@ SignalHandle::~SignalHandle()

if (!this->closed)
{
Close();
}
}

void SignalHandle::Close()
{
MS_TRACE();

if (this->closed)
{
return;
}

this->closed = true;

for (auto* uvHandle : this->uvHandles)
{
uv_close(reinterpret_cast<uv_handle_t*>(uvHandle), static_cast<uv_close_cb>(onCloseSignal));
InternalClose();
}
}

Expand Down Expand Up @@ -86,7 +69,24 @@ void SignalHandle::AddSignal(int signum, const std::string& name)
this->uvHandles.push_back(uvHandle);
}

inline void SignalHandle::OnUvSignal(int signum)
void SignalHandle::InternalClose()
{
MS_TRACE();

if (this->closed)
{
return;
}

this->closed = true;

for (auto* uvHandle : this->uvHandles)
{
uv_close(reinterpret_cast<uv_handle_t*>(uvHandle), static_cast<uv_close_cb>(onCloseSignal));
}
}

void SignalHandle::OnUvSignal(int signum)
{
MS_TRACE();

Expand Down
Loading

0 comments on commit b78ddae

Please sign in to comment.