Skip to content

Commit

Permalink
Fix memory leaks and optimize code (lynckia#1636)
Browse files Browse the repository at this point in the history
  • Loading branch information
jcague authored Sep 24, 2020
1 parent e898774 commit 57965d8
Show file tree
Hide file tree
Showing 12 changed files with 131 additions and 48 deletions.
4 changes: 2 additions & 2 deletions erizo/src/erizo/WebRtcConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,10 @@ void WebRtcConnection::syncClose() {
ELOG_DEBUG("%s message: Close ended", toLog());
}

void WebRtcConnection::close() {
boost::future<void> WebRtcConnection::close() {
ELOG_DEBUG("%s message: Async close called", toLog());
std::shared_ptr<WebRtcConnection> shared_this = shared_from_this();
asyncTask([shared_this] (std::shared_ptr<WebRtcConnection> connection) {
return asyncTask([shared_this] (std::shared_ptr<WebRtcConnection> connection) {
shared_this->syncClose();
});
}
Expand Down
2 changes: 1 addition & 1 deletion erizo/src/erizo/WebRtcConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class WebRtcConnection: public TransportListener, public LogContext, public Hand
* @return True if the candidates are gathered.
*/
bool init();
void close();
boost::future<void> close();
void syncClose();

boost::future<void> setRemoteSdpInfo(std::shared_ptr<SdpInfo> sdp, int received_session_version);
Expand Down
2 changes: 2 additions & 0 deletions erizo/src/erizo/dtls/DtlsClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,8 @@ int createCert(const std::string& pAor, int expireDays, int keyLen, X509*& outCe
delete[] client_key_buffer;
delete[] server_key_buffer;
delete keys;
delete client_key;
delete server_key;

srtp_profile = mSocket->getSrtpProfile();

Expand Down
2 changes: 1 addition & 1 deletion erizoAPI/MediaStream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ NAN_METHOD(MediaStream::close) {
obj->Ref();
obj->close().then(
[persistent, obj] (boost::future<void>) {
ELOG_DEBUG("%s, MediaStream Close is finishied, resolving promise", obj->toLog());
ELOG_DEBUG("%s, MediaStream Close is finished, resolving promise", obj->toLog());
obj->notifyFuture(persistent);
});
info.GetReturnValue().Set(resolver->GetPromise());
Expand Down
115 changes: 83 additions & 32 deletions erizoAPI/WebRtcConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,24 +69,11 @@ WebRtcConnection::WebRtcConnection() : closed_{false}, id_{"undefined"} {

WebRtcConnection::~WebRtcConnection() {
close();
delete event_callback_;
ELOG_DEBUG("%s, message: Destroyed", toLog());
}

void WebRtcConnection::close() {
ELOG_DEBUG("%s, message: Trying to close", toLog());
if (closed_) {
ELOG_DEBUG("%s, message: Already closed", toLog());
return;
}
ELOG_DEBUG("%s, message: Closing", toLog());
if (me) {
me->setWebRtcConnectionEventListener(nullptr);
me->close();
me.reset();
}

boost::mutex::scoped_lock lock(mutex);

void WebRtcConnection::closeEvents() {
if (!uv_is_closing(reinterpret_cast<uv_handle_t*>(async_))) {
ELOG_DEBUG("%s, message: Closing handle", toLog());
uv_close(reinterpret_cast<uv_handle_t*>(async_), destroyWebRtcConnectionAsyncHandle);
Expand All @@ -98,8 +85,29 @@ void WebRtcConnection::close() {
}
async_ = nullptr;
future_async_ = nullptr;
ELOG_DEBUG("%s, message: Closed Events, pendingRefs: %d", toLog(), refs_);
}

boost::future<std::string> WebRtcConnection::close() {
auto close_promise = std::make_shared<boost::promise<std::string>>();
ELOG_DEBUG("%s, message: Trying to close", toLog());
if (closed_) {
ELOG_DEBUG("%s, message: Already closed", toLog());
close_promise->set_value("");
return close_promise->get_future();
}
closed_ = true;

ELOG_DEBUG("%s, message: Closing", toLog());
if (me) {
me->setWebRtcConnectionEventListener(nullptr);
me->close().then([this, close_promise] (boost::future<void>) {
close_promise->set_value(std::string("webrtcconnection_closed"));
me.reset();
});
}
ELOG_DEBUG("%s, message: Closed", toLog());
return close_promise->get_future();
}

std::string WebRtcConnection::toLog() {
Expand Down Expand Up @@ -252,6 +260,7 @@ NAN_METHOD(WebRtcConnection::New) {
rtp_mappings, ext_mappings, enable_connection_quality_check,
obj);
obj->Wrap(info.This());
obj->Ref();
info.GetReturnValue().Set(info.This());
} else {
// TODO(pedro) Check what happens here
Expand All @@ -260,7 +269,22 @@ NAN_METHOD(WebRtcConnection::New) {

NAN_METHOD(WebRtcConnection::close) {
WebRtcConnection* obj = Nan::ObjectWrap::Unwrap<WebRtcConnection>(info.Holder());
obj->close();
std::shared_ptr<erizo::WebRtcConnection> me = obj->me;
v8::Local<v8::Promise::Resolver> resolver = v8::Promise::Resolver::New(Nan::GetCurrentContext()).ToLocalChecked();
if (!me) {
resolver->Resolve(Nan::GetCurrentContext(), Nan::New("").ToLocalChecked()).IsNothing();
info.GetReturnValue().Set(resolver->GetPromise());
return;
}

Nan::Persistent<v8::Promise::Resolver> *persistent = new Nan::Persistent<v8::Promise::Resolver>(resolver);
obj->close().then(
[persistent, obj] (boost::future<std::string> fut) {
ELOG_DEBUG("%s, message: WebRTCConnection Close is finished, resolving promise", obj->toLog());
ResultVariant result = fut.get();
obj->notifyFuture(persistent, result);
});
info.GetReturnValue().Set(resolver->GetPromise());
}

NAN_METHOD(WebRtcConnection::init) {
Expand All @@ -279,7 +303,10 @@ NAN_METHOD(WebRtcConnection::init) {
NAN_METHOD(WebRtcConnection::createOffer) {
WebRtcConnection* obj = Nan::ObjectWrap::Unwrap<WebRtcConnection>(info.Holder());
std::shared_ptr<erizo::WebRtcConnection> me = obj->me;
v8::Local<v8::Promise::Resolver> resolver = v8::Promise::Resolver::New(Nan::GetCurrentContext()).ToLocalChecked();
if (!me) {
resolver->Resolve(Nan::GetCurrentContext(), Nan::New("").ToLocalChecked()).IsNothing();
info.GetReturnValue().Set(resolver->GetPromise());
return;
}

Expand All @@ -290,9 +317,8 @@ NAN_METHOD(WebRtcConnection::createOffer) {
bool audio_enabled = Nan::To<bool>(info[1]).FromJust();
bool bundle = Nan::To<bool>(info[2]).FromJust();

v8::Local<v8::Promise::Resolver> resolver = v8::Promise::Resolver::New(Nan::GetCurrentContext()).ToLocalChecked();
Nan::Persistent<v8::Promise::Resolver> *persistent = new Nan::Persistent<v8::Promise::Resolver>(resolver);
obj->Ref();

me->createOffer(video_enabled, audio_enabled, bundle).then(
[persistent, obj] (boost::future<void>) {
obj->notifyFuture(persistent);
Expand Down Expand Up @@ -331,8 +357,11 @@ NAN_METHOD(WebRtcConnection::setMetadata) {
NAN_METHOD(WebRtcConnection::setRemoteDescription) {
WebRtcConnection* obj = Nan::ObjectWrap::Unwrap<WebRtcConnection>(info.Holder());
std::shared_ptr<erizo::WebRtcConnection> me = obj->me;

v8::Local<v8::Promise::Resolver> resolver = v8::Promise::Resolver::New(Nan::GetCurrentContext()).ToLocalChecked();
if (!me) {
info.GetReturnValue().Set(Nan::New(false));
resolver->Resolve(Nan::GetCurrentContext(), Nan::New("").ToLocalChecked()).IsNothing();
info.GetReturnValue().Set(resolver->GetPromise());
return;
}

Expand All @@ -341,10 +370,8 @@ NAN_METHOD(WebRtcConnection::setRemoteDescription) {
int received_session_version = Nan::To<int>(info[1]).FromJust();
auto sdp = std::make_shared<erizo::SdpInfo>(*param->me.get());

v8::Local<v8::Promise::Resolver> resolver = v8::Promise::Resolver::New(Nan::GetCurrentContext()).ToLocalChecked();
Nan::Persistent<v8::Promise::Resolver> *persistent = new Nan::Persistent<v8::Promise::Resolver>(resolver);

obj->Ref();
me->setRemoteSdpInfo(sdp, received_session_version).then(
[persistent, obj] (boost::future<void>) {
obj->notifyFuture(persistent);
Expand All @@ -356,13 +383,16 @@ NAN_METHOD(WebRtcConnection::setRemoteDescription) {
NAN_METHOD(WebRtcConnection::getLocalDescription) {
WebRtcConnection* obj = Nan::ObjectWrap::Unwrap<WebRtcConnection>(info.Holder());
std::shared_ptr<erizo::WebRtcConnection> me = obj->me;

v8::Local<v8::Promise::Resolver> resolver = v8::Promise::Resolver::New(Nan::GetCurrentContext()).ToLocalChecked();
if (!me) {
resolver->Resolve(Nan::GetCurrentContext(), Nan::New("").ToLocalChecked()).IsNothing();
info.GetReturnValue().Set(resolver->GetPromise());
return;
}
v8::Local<v8::Promise::Resolver> resolver = v8::Promise::Resolver::New(Nan::GetCurrentContext()).ToLocalChecked();
Nan::Persistent<v8::Promise::Resolver> *persistent = new Nan::Persistent<v8::Promise::Resolver>(resolver);
obj->Ref();

Nan::Persistent<v8::Promise::Resolver> *persistent = new Nan::Persistent<v8::Promise::Resolver>(resolver);
ELOG_DEBUG("%s, message: getLocalDescription", obj->toLog());
me->getLocalSdpInfo().then(
[persistent, obj] (boost::future<std::shared_ptr<erizo::SdpInfo>> fut) {
std::shared_ptr<erizo::SdpInfo> sdp_info = fut.get();
Expand Down Expand Up @@ -474,16 +504,19 @@ NAN_METHOD(WebRtcConnection::getConnectionQualityLevel) {
NAN_METHOD(WebRtcConnection::addMediaStream) {
WebRtcConnection* obj = Nan::ObjectWrap::Unwrap<WebRtcConnection>(info.Holder());
std::shared_ptr<erizo::WebRtcConnection> me = obj->me;

v8::Local<v8::Promise::Resolver> resolver = v8::Promise::Resolver::New(Nan::GetCurrentContext()).ToLocalChecked();
if (!me) {
resolver->Resolve(Nan::GetCurrentContext(), Nan::New("").ToLocalChecked()).IsNothing();
info.GetReturnValue().Set(resolver->GetPromise());
return;
}

MediaStream* param = Nan::ObjectWrap::Unwrap<MediaStream>(Nan::To<v8::Object>(info[0]).ToLocalChecked());
auto ms = std::shared_ptr<erizo::MediaStream>(param->me);

v8::Local<v8::Promise::Resolver> resolver = v8::Promise::Resolver::New(Nan::GetCurrentContext()).ToLocalChecked();
Nan::Persistent<v8::Promise::Resolver> *persistent = new Nan::Persistent<v8::Promise::Resolver>(resolver);
obj->Ref();

me->addMediaStream(ms).then(
[persistent, obj] (boost::future<void>) {
obj->notifyFuture(persistent);
Expand All @@ -495,16 +528,19 @@ NAN_METHOD(WebRtcConnection::addMediaStream) {
NAN_METHOD(WebRtcConnection::removeMediaStream) {
WebRtcConnection* obj = Nan::ObjectWrap::Unwrap<WebRtcConnection>(info.Holder());
std::shared_ptr<erizo::WebRtcConnection> me = obj->me;

v8::Local<v8::Promise::Resolver> resolver = v8::Promise::Resolver::New(Nan::GetCurrentContext()).ToLocalChecked();
if (!me) {
resolver->Resolve(Nan::GetCurrentContext(), Nan::New("").ToLocalChecked()).IsNothing();
info.GetReturnValue().Set(resolver->GetPromise());
return;
}

Nan::Utf8String param(Nan::To<v8::String>(info[0]).ToLocalChecked());
std::string stream_id = std::string(*param);

v8::Local<v8::Promise::Resolver> resolver = v8::Promise::Resolver::New(Nan::GetCurrentContext()).ToLocalChecked();
Nan::Persistent<v8::Promise::Resolver> *persistent = new Nan::Persistent<v8::Promise::Resolver>(resolver);
obj->Ref();

me->removeMediaStream(stream_id).then(
[persistent, obj] (boost::future<void>) {
obj->notifyFuture(persistent);
Expand Down Expand Up @@ -562,27 +598,36 @@ NAUV_WORK_CB(WebRtcConnection::eventsCallback) {
void WebRtcConnection::notifyFuture(Nan::Persistent<v8::Promise::Resolver> *persistent, ResultVariant result) {
boost::mutex::scoped_lock lock(mutex);
if (!future_async_) {
ELOG_DEBUG("%s, message: Future async does not exist anymore", toLog());
return;
}
ELOG_DEBUG("%s, message: Added future to async send", toLog());
ResultPair result_pair(persistent, result);
futures.push(result_pair);
future_async_->data = this;
Ref();
uv_async_send(future_async_);
}

NAUV_WORK_CB(WebRtcConnection::promiseResolver) {
Nan::HandleScope scope;
WebRtcConnection* obj = reinterpret_cast<WebRtcConnection*>(async->data);
if (!obj || !obj->me) {
if (!obj) {
ELOG_DEBUG("message: promiseResolver with null object");
return;
}
bool closed = false;
boost::mutex::scoped_lock lock(obj->mutex);
ELOG_DEBUG("%s, message: promiseResolver", obj->toLog());
ELOG_DEBUG("%s, message: promiseResolver, refs: %d", obj->toLog(), obj->futures.size());
while (!obj->futures.empty()) {
auto persistent = obj->futures.front().first;
v8::Local<v8::Promise::Resolver> resolver = Nan::New(*persistent);
ResultVariant r = obj->futures.front().second;
if (boost::get<std::string>(&r) != nullptr) {
std::string result = boost::get<std::string>(r);
if (result == "webrtcconnection_closed") {
closed = true;
}
resolver->Resolve(Nan::GetCurrentContext(), Nan::New(boost::get<std::string>(r).c_str()).ToLocalChecked())
.IsNothing();
} else if (boost::get<std::shared_ptr<erizo::SdpInfo>>(&r) != nullptr) {
Expand All @@ -598,8 +643,14 @@ NAUV_WORK_CB(WebRtcConnection::promiseResolver) {
persistent->Reset();
delete persistent;
obj->futures.pop();
obj->Unref();
v8::Isolate::GetCurrent()->RunMicrotasks();
obj->Unref();
}

ELOG_DEBUG("%s, message: promiseResolver finished, refs: %d, closed: %d", obj->toLog(),
obj->refs_, obj->closed_);
if (closed) {
obj->closeEvents();
obj->Unref();
}
ELOG_DEBUG("%s, message: promiseResolver finished", obj->toLog());
}
3 changes: 2 additions & 1 deletion erizoAPI/WebRtcConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ class WebRtcConnection : public erizo::WebRtcConnectionEventListener,
~WebRtcConnection();

std::string toLog();
void close();
void closeEvents();
boost::future<std::string> close();

Nan::Callback *event_callback_;
uv_async_t *async_;
Expand Down
1 change: 1 addition & 0 deletions erizo_controller/erizoJS/erizoJSController.js
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ exports.ErizoJSController = (erizoJSId, threadPool, ioThreadPool) => {
const closePromise = node.close(sendOffer);

return closePromise.then(() => {
log.debug(`message: Node Closed, clientId: ${node.clientId}, streamId: ${node.streamId}`);
const client = clients.get(clientId);
if (client === undefined) {
log.debug('message: trying to close node with no associated client,' +
Expand Down
18 changes: 13 additions & 5 deletions erizo_controller/erizoJS/models/Connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,11 @@ class Connection extends events.EventEmitter {
}

getLocalSdp() {
if (!this.wrtc) {
return Promise.resolve();
}
return this.wrtc.getLocalDescription().then((desc) => {
if (!desc) {
if (!this.wrtc || !desc) {
log.error('Cannot get local description,',
logger.objectToLog(this.options), logger.objectToLog(this.options.metadata));
return '';
Expand Down Expand Up @@ -314,12 +317,12 @@ class Connection extends events.EventEmitter {
this.mediaStreams.delete(id);
return Promise.all([removePromise, closePromise]).then(() => {
if (sendOffer) {
return this.sendOffer();
this.sendOffer();
}
return Promise.resolve();
});
}
log.error(`message: Trying to remove mediaStream not found, id: ${id},`,
log.error(`message: Trying to remove mediaStream not found, clientId: ${this.clientId}, streamId: ${id}`,
logger.objectToLog(this.options), logger.objectToLog(this.options.metadata));
return promise;
}
Expand Down Expand Up @@ -488,9 +491,14 @@ class Connection extends events.EventEmitter {
promises.push(mediaStream.close());
});
Promise.all(promises).then(() => {
this.wrtc.close();
log.debug(`message: Closing WRTC, id: ${this.id},`,
logger.objectToLog(this.options), logger.objectToLog(this.options.metadata));
this.wrtc.close().then(() => {
log.debug(`message: WRTC closed, id: ${this.id},`,
logger.objectToLog(this.options), logger.objectToLog(this.options.metadata));
delete this.wrtc;
});
this.mediaStreams.clear();
delete this.wrtc;
});
}
}
Expand Down
2 changes: 1 addition & 1 deletion erizo_controller/erizoJS/models/Publisher.js
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ class Publisher extends Source {
}

close() {
const removeMediaStreamPromise = this.connection.removeMediaStream(this.mediaStream.id);
const removeMediaStreamPromise = this.connection.removeMediaStream(this.mediaStream.id, false);
if (this.mediaStream.monitorInterval) {
clearInterval(this.mediaStream.monitorInterval);
}
Expand Down
3 changes: 2 additions & 1 deletion erizo_controller/erizoJS/models/Subscriber.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,12 @@ class Subscriber extends NodeClass {
}

close(sendOffer = true) {
log.debug(`message: Closing subscriber, streamId:${this.streamId}, `,
log.debug(`message: Closing subscriber, clientId: ${this.clientId}, streamId: ${this.streamId}, `,
logger.objectToLog(this.options), logger.objectToLog(this.options.metadata));
this.publisher = undefined;
let promise = Promise.resolve();
if (this.connection) {
log.debug(`message: Removing Media Stream, clientId: ${this.clientId}, streamId: ${this.streamId}`);
promise = this.connection.removeMediaStream(this.mediaStream.id, sendOffer);
this.connection.removeListener('media_stream_event', this._mediaStreamListener);
}
Expand Down
3 changes: 2 additions & 1 deletion test/negotiation/utils/ClientStream.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
let currentClientStreamId = 0;
class ClientStream {
constructor(page) {
this.page = page;
this.id = parseInt(Math.random() * 10000);
this.id = currentClientStreamId++;
this.audio = true;
this.video = true;
this.data = true;
Expand Down
Loading

0 comments on commit 57965d8

Please sign in to comment.