diff --git a/node/lib/Consumer.d.ts b/node/lib/Consumer.d.ts index 208fbb47e2..483263aaf9 100644 --- a/node/lib/Consumer.d.ts +++ b/node/lib/Consumer.d.ts @@ -267,6 +267,10 @@ export declare class Consumer extends EnhancedEventEmitter { * Enable 'trace' event. */ enableTraceEvent(types?: ConsumerTraceEventType[]): Promise; + /** + * Replaces the producer associated with this consumer. + */ + changeProducer(producerId: string): Promise; private handleWorkerNotifications; } //# sourceMappingURL=Consumer.d.ts.map \ No newline at end of file diff --git a/node/lib/Consumer.d.ts.map b/node/lib/Consumer.d.ts.map index 417515e5d9..19d26363cc 100644 --- a/node/lib/Consumer.d.ts.map +++ b/node/lib/Consumer.d.ts.map @@ -1 +1 @@ -{"version":3,"file":"Consumer.d.ts","sourceRoot":"","sources":["../src/Consumer.ts"],"names":[],"mappings":"AACA,OAAO,EAAE,oBAAoB,EAAE,MAAM,wBAAwB,CAAC;AAC9D,OAAO,EAAE,OAAO,EAAE,MAAM,WAAW,CAAC;AACpC,OAAO,EAAE,cAAc,EAAE,MAAM,kBAAkB,CAAC;AAClD,OAAO,EAAE,YAAY,EAAE,MAAM,YAAY,CAAC;AAC1C,OAAO,EACN,SAAS,EACT,eAAe,EACf,aAAa,EACb,MAAM,iBAAiB,CAAC;AAEzB,oBAAY,eAAe,GAC3B;IACC;;OAEG;IACH,UAAU,EAAE,MAAM,CAAC;IAEnB;;OAEG;IACH,eAAe,EAAE,eAAe,CAAC;IAEjC;;;;;;;;;;;;;OAaG;IACH,MAAM,CAAC,EAAE,OAAO,CAAC;IAEjB;;;OAGG;IACH,GAAG,CAAC,EAAE,MAAM,CAAC;IAEb;;;OAGG;IACH,eAAe,CAAC,EAAE,cAAc,CAAC;IAEjC;;;OAGG;IACH,IAAI,CAAC,EAAE,OAAO,CAAC;IAEf;;OAEG;IACH,OAAO,CAAC,EAAE,GAAG,CAAC;CACd,CAAA;AAED;;GAEG;AACH,oBAAY,sBAAsB,GAAG,KAAK,GAAG,UAAU,GAAG,MAAM,GAAG,KAAK,GAAG,KAAK,CAAC;AAEjF;;GAEG;AACH,oBAAY,sBAAsB,GAClC;IACC;;OAEG;IACH,IAAI,EAAE,sBAAsB,CAAC;IAE7B;;OAEG;IACH,SAAS,EAAE,MAAM,CAAC;IAElB;;OAEG;IACH,SAAS,EAAE,IAAI,GAAG,KAAK,CAAC;IAExB;;OAEG;IACH,IAAI,EAAE,GAAG,CAAC;CACV,CAAA;AAED,oBAAY,aAAa,GACzB;IACC;;OAEG;IACH,KAAK,EAAE,MAAM,CAAC;IAEd;;OAEG;IACH,aAAa,EAAE,MAAM,CAAC;IAEtB;;;OAGG;IACH,cAAc,EAAE,MAAM,EAAE,CAAC;CACzB,CAAA;AAED,oBAAY,cAAc,GAC1B;IACC;;OAEG;IACH,YAAY,EAAE,MAAM,CAAC;IAErB;;OAEG;IACH,aAAa,CAAC,EAAE,MAAM,CAAC;CACvB,CAAA;AAED,oBAAY,YAAY,GACxB;IAEC,IAAI,EAAE,MAAM,CAAC;IACb,SAAS,EAAE,MAAM,CAAC;IAClB,IAAI,EAAE,MAAM,CAAC;IACb,OAAO,CAAC,EAAE,MAAM,CAAC;IACjB,IAAI,EAAE,MAAM,CAAC;IACb,QAAQ,EAAE,MAAM,CAAC;IACjB,WAAW,EAAE,MAAM,CAAC;IACpB,YAAY,EAAE,MAAM,CAAC;IACrB,gBAAgB,EAAE,MAAM,CAAC;IACzB,oBAAoB,EAAE,MAAM,CAAC;IAC7B,eAAe,EAAE,MAAM,CAAC;IACxB,SAAS,EAAE,MAAM,CAAC;IAClB,eAAe,EAAE,MAAM,CAAC;IACxB,QAAQ,EAAE,MAAM,CAAC;IACjB,QAAQ,EAAE,MAAM,CAAC;IACjB,KAAK,EAAE,MAAM,CAAC;IACd,WAAW,EAAE,MAAM,CAAC;IACpB,SAAS,EAAE,MAAM,CAAC;IAClB,OAAO,EAAE,MAAM,CAAC;IAChB,aAAa,CAAC,EAAE,MAAM,CAAC;CACvB,CAAA;AAED;;GAEG;AACH,oBAAY,YAAY,GAAG,QAAQ,GAAG,WAAW,GAAG,KAAK,GAAG,MAAM,CAAC;AAInE,qBAAa,QAAS,SAAQ,oBAAoB;;IAoDjD;;;;;;;;;;;;OAYG;gBAEF,EACC,QAAQ,EACR,IAAI,EACJ,OAAO,EACP,cAAc,EACd,OAAO,EACP,MAAM,EACN,cAAc,EACd,KAA4D,EAC5D,eAAe,EACf,EACD;QACC,QAAQ,EAAE,GAAG,CAAC;QACd,IAAI,EAAE,GAAG,CAAC;QACV,OAAO,EAAE,OAAO,CAAC;QACjB,cAAc,EAAE,cAAc,CAAC;QAC/B,OAAO,CAAC,EAAE,GAAG,CAAC;QACd,MAAM,EAAE,OAAO,CAAC;QAChB,cAAc,EAAE,OAAO,CAAC;QACxB,KAAK,CAAC,EAAE,aAAa,CAAC;QACtB,eAAe,CAAC,EAAE,cAAc,CAAC;KACjC;IAmBF;;OAEG;IACH,IAAI,EAAE,IAAI,MAAM,CAGf;IAED;;OAEG;IACH,IAAI,UAAU,IAAI,MAAM,CAGvB;IAED;;OAEG;IACH,IAAI,MAAM,IAAI,OAAO,CAGpB;IAED;;OAEG;IACH,IAAI,IAAI,IAAI,SAAS,CAGpB;IAED;;OAEG;IACH,IAAI,aAAa,IAAI,aAAa,CAGjC;IAED;;OAEG;IACH,IAAI,IAAI,IAAI,YAAY,CAGvB;IAED;;OAEG;IACH,IAAI,MAAM,IAAI,OAAO,CAGpB;IAED;;OAEG;IACH,IAAI,cAAc,IAAI,OAAO,CAG5B;IAED;;OAEG;IACH,IAAI,QAAQ,IAAI,MAAM,CAGrB;IAED;;OAEG;IACH,IAAI,KAAK,IAAI,aAAa,CAGzB;IAED;;OAEG;IACH,IAAI,eAAe,IAAI,cAAc,GAAG,SAAS,CAGhD;IAED;;OAEG;IACH,IAAI,aAAa,IAAI,cAAc,GAAG,SAAS,CAG9C;IAED;;OAEG;IACH,IAAI,OAAO,IAAI,GAAG,CAGjB;IAED;;OAEG;IACH,IAAI,OAAO,CAAC,OAAO,EARJ,GAQI,EAGlB;IAED;;;;;;;;;OASG;IACH,IAAI,QAAQ,IAAI,oBAAoB,CAGnC;IAED;;;OAGG;IACH,IAAI,iBAAiB,IAAI,OAAO,CAG/B;IAED;;OAEG;IACH,KAAK,IAAI,IAAI;IAsBb;;;;OAIG;IACH,eAAe,IAAI,IAAI;IAmBvB;;OAEG;IACG,IAAI,IAAI,OAAO,CAAC,GAAG,CAAC;IAO1B;;OAEG;IACG,QAAQ,IAAI,OAAO,CAAC,KAAK,CAAC,YAAY,GAAG,YAAY,CAAC,CAAC;IAO7D;;OAEG;IACG,KAAK,IAAI,OAAO,CAAC,IAAI,CAAC;IAe5B;;OAEG;IACG,MAAM,IAAI,OAAO,CAAC,IAAI,CAAC;IAe7B;;OAEG;IACG,kBAAkB,CACvB,EACC,YAAY,EACZ,aAAa,EACb,EAAE,cAAc,GACf,OAAO,CAAC,IAAI,CAAC;IAYhB;;OAEG;IACG,WAAW,CAAC,QAAQ,EAAE,MAAM,GAAG,OAAO,CAAC,IAAI,CAAC;IAYlD;;OAEG;IACG,aAAa,IAAI,OAAO,CAAC,IAAI,CAAC;IAYpC;;OAEG;IACG,eAAe,IAAI,OAAO,CAAC,IAAI,CAAC;IAOtC;;OAEG;IACG,gBAAgB,CAAC,KAAK,GAAE,sBAAsB,EAAO,GAAG,OAAO,CAAC,IAAI,CAAC;IAU3E,OAAO,CAAC,yBAAyB;CAsIjC"} \ No newline at end of file +{"version":3,"file":"Consumer.d.ts","sourceRoot":"","sources":["../src/Consumer.ts"],"names":[],"mappings":"AACA,OAAO,EAAE,oBAAoB,EAAE,MAAM,wBAAwB,CAAC;AAC9D,OAAO,EAAE,OAAO,EAAE,MAAM,WAAW,CAAC;AACpC,OAAO,EAAE,cAAc,EAAE,MAAM,kBAAkB,CAAC;AAClD,OAAO,EAAE,YAAY,EAAE,MAAM,YAAY,CAAC;AAC1C,OAAO,EACN,SAAS,EACT,eAAe,EACf,aAAa,EACb,MAAM,iBAAiB,CAAC;AAEzB,oBAAY,eAAe,GAC3B;IACC;;OAEG;IACH,UAAU,EAAE,MAAM,CAAC;IAEnB;;OAEG;IACH,eAAe,EAAE,eAAe,CAAC;IAEjC;;;;;;;;;;;;;OAaG;IACH,MAAM,CAAC,EAAE,OAAO,CAAC;IAEjB;;;OAGG;IACH,GAAG,CAAC,EAAE,MAAM,CAAC;IAEb;;;OAGG;IACH,eAAe,CAAC,EAAE,cAAc,CAAC;IAEjC;;;OAGG;IACH,IAAI,CAAC,EAAE,OAAO,CAAC;IAEf;;OAEG;IACH,OAAO,CAAC,EAAE,GAAG,CAAC;CACd,CAAA;AAED;;GAEG;AACH,oBAAY,sBAAsB,GAAG,KAAK,GAAG,UAAU,GAAG,MAAM,GAAG,KAAK,GAAG,KAAK,CAAC;AAEjF;;GAEG;AACH,oBAAY,sBAAsB,GAClC;IACC;;OAEG;IACH,IAAI,EAAE,sBAAsB,CAAC;IAE7B;;OAEG;IACH,SAAS,EAAE,MAAM,CAAC;IAElB;;OAEG;IACH,SAAS,EAAE,IAAI,GAAG,KAAK,CAAC;IAExB;;OAEG;IACH,IAAI,EAAE,GAAG,CAAC;CACV,CAAA;AAED,oBAAY,aAAa,GACzB;IACC;;OAEG;IACH,KAAK,EAAE,MAAM,CAAC;IAEd;;OAEG;IACH,aAAa,EAAE,MAAM,CAAC;IAEtB;;;OAGG;IACH,cAAc,EAAE,MAAM,EAAE,CAAC;CACzB,CAAA;AAED,oBAAY,cAAc,GAC1B;IACC;;OAEG;IACH,YAAY,EAAE,MAAM,CAAC;IAErB;;OAEG;IACH,aAAa,CAAC,EAAE,MAAM,CAAC;CACvB,CAAA;AAED,oBAAY,YAAY,GACxB;IAEC,IAAI,EAAE,MAAM,CAAC;IACb,SAAS,EAAE,MAAM,CAAC;IAClB,IAAI,EAAE,MAAM,CAAC;IACb,OAAO,CAAC,EAAE,MAAM,CAAC;IACjB,IAAI,EAAE,MAAM,CAAC;IACb,QAAQ,EAAE,MAAM,CAAC;IACjB,WAAW,EAAE,MAAM,CAAC;IACpB,YAAY,EAAE,MAAM,CAAC;IACrB,gBAAgB,EAAE,MAAM,CAAC;IACzB,oBAAoB,EAAE,MAAM,CAAC;IAC7B,eAAe,EAAE,MAAM,CAAC;IACxB,SAAS,EAAE,MAAM,CAAC;IAClB,eAAe,EAAE,MAAM,CAAC;IACxB,QAAQ,EAAE,MAAM,CAAC;IACjB,QAAQ,EAAE,MAAM,CAAC;IACjB,KAAK,EAAE,MAAM,CAAC;IACd,WAAW,EAAE,MAAM,CAAC;IACpB,SAAS,EAAE,MAAM,CAAC;IAClB,OAAO,EAAE,MAAM,CAAC;IAChB,aAAa,CAAC,EAAE,MAAM,CAAC;CACvB,CAAA;AAED;;GAEG;AACH,oBAAY,YAAY,GAAG,QAAQ,GAAG,WAAW,GAAG,KAAK,GAAG,MAAM,CAAC;AAInE,qBAAa,QAAS,SAAQ,oBAAoB;;IAoDjD;;;;;;;;;;;;OAYG;gBAEF,EACC,QAAQ,EACR,IAAI,EACJ,OAAO,EACP,cAAc,EACd,OAAO,EACP,MAAM,EACN,cAAc,EACd,KAA4D,EAC5D,eAAe,EACf,EACD;QACC,QAAQ,EAAE,GAAG,CAAC;QACd,IAAI,EAAE,GAAG,CAAC;QACV,OAAO,EAAE,OAAO,CAAC;QACjB,cAAc,EAAE,cAAc,CAAC;QAC/B,OAAO,CAAC,EAAE,GAAG,CAAC;QACd,MAAM,EAAE,OAAO,CAAC;QAChB,cAAc,EAAE,OAAO,CAAC;QACxB,KAAK,CAAC,EAAE,aAAa,CAAC;QACtB,eAAe,CAAC,EAAE,cAAc,CAAC;KACjC;IAmBF;;OAEG;IACH,IAAI,EAAE,IAAI,MAAM,CAGf;IAED;;OAEG;IACH,IAAI,UAAU,IAAI,MAAM,CAGvB;IAED;;OAEG;IACH,IAAI,MAAM,IAAI,OAAO,CAGpB;IAED;;OAEG;IACH,IAAI,IAAI,IAAI,SAAS,CAGpB;IAED;;OAEG;IACH,IAAI,aAAa,IAAI,aAAa,CAGjC;IAED;;OAEG;IACH,IAAI,IAAI,IAAI,YAAY,CAGvB;IAED;;OAEG;IACH,IAAI,MAAM,IAAI,OAAO,CAGpB;IAED;;OAEG;IACH,IAAI,cAAc,IAAI,OAAO,CAG5B;IAED;;OAEG;IACH,IAAI,QAAQ,IAAI,MAAM,CAGrB;IAED;;OAEG;IACH,IAAI,KAAK,IAAI,aAAa,CAGzB;IAED;;OAEG;IACH,IAAI,eAAe,IAAI,cAAc,GAAG,SAAS,CAGhD;IAED;;OAEG;IACH,IAAI,aAAa,IAAI,cAAc,GAAG,SAAS,CAG9C;IAED;;OAEG;IACH,IAAI,OAAO,IAAI,GAAG,CAGjB;IAED;;OAEG;IACH,IAAI,OAAO,CAAC,OAAO,EARJ,GAQI,EAGlB;IAED;;;;;;;;;OASG;IACH,IAAI,QAAQ,IAAI,oBAAoB,CAGnC;IAED;;;OAGG;IACH,IAAI,iBAAiB,IAAI,OAAO,CAG/B;IAED;;OAEG;IACH,KAAK,IAAI,IAAI;IAsBb;;;;OAIG;IACH,eAAe,IAAI,IAAI;IAmBvB;;OAEG;IACG,IAAI,IAAI,OAAO,CAAC,GAAG,CAAC;IAO1B;;OAEG;IACG,QAAQ,IAAI,OAAO,CAAC,KAAK,CAAC,YAAY,GAAG,YAAY,CAAC,CAAC;IAO7D;;OAEG;IACG,KAAK,IAAI,OAAO,CAAC,IAAI,CAAC;IAe5B;;OAEG;IACG,MAAM,IAAI,OAAO,CAAC,IAAI,CAAC;IAe7B;;OAEG;IACG,kBAAkB,CACvB,EACC,YAAY,EACZ,aAAa,EACb,EAAE,cAAc,GACf,OAAO,CAAC,IAAI,CAAC;IAYhB;;OAEG;IACG,WAAW,CAAC,QAAQ,EAAE,MAAM,GAAG,OAAO,CAAC,IAAI,CAAC;IAYlD;;OAEG;IACG,aAAa,IAAI,OAAO,CAAC,IAAI,CAAC;IAYpC;;OAEG;IACG,eAAe,IAAI,OAAO,CAAC,IAAI,CAAC;IAOtC;;OAEG;IACG,gBAAgB,CAAC,KAAK,GAAE,sBAAsB,EAAO,GAAG,OAAO,CAAC,IAAI,CAAC;IAU3E;;OAEG;IACG,cAAc,CAAC,UAAU,EAAE,MAAM,GAAG,OAAO,CAAC,GAAG,CAAC;IAYtD,OAAO,CAAC,yBAAyB;CAsIjC"} \ No newline at end of file diff --git a/node/lib/Consumer.js b/node/lib/Consumer.js index 01db4f39a8..b49b004012 100644 --- a/node/lib/Consumer.js +++ b/node/lib/Consumer.js @@ -276,6 +276,15 @@ class Consumer extends EnhancedEventEmitter_1.EnhancedEventEmitter { const reqData = { types }; await this.#channel.request('consumer.enableTraceEvent', this.#internal, reqData); } + /** + * Replaces the producer associated with this consumer. + */ + async changeProducer(producerId) { + logger.debug('changeProducer()'); + const data = await this.#channel.request('consumer.changeProducer', this.#internal, { producerId }); + this.#internal.producerId = producerId; + return data; + } handleWorkerNotifications() { this.#channel.on(this.#internal.consumerId, (event, data) => { switch (event) { diff --git a/node/src/Consumer.ts b/node/src/Consumer.ts index bae9001b90..7ebc710393 100644 --- a/node/src/Consumer.ts +++ b/node/src/Consumer.ts @@ -576,6 +576,21 @@ export class Consumer extends EnhancedEventEmitter 'consumer.enableTraceEvent', this.#internal, reqData); } + /** + * Replaces the producer associated with this consumer. + */ + async changeProducer(producerId: string): Promise + { + logger.debug('changeProducer()'); + + const data = + await this.#channel.request('consumer.changeProducer', this.#internal, { producerId }); + + this.#internal.producerId = producerId; + + return data; + } + private handleWorkerNotifications(): void { this.#channel.on(this.#internal.consumerId, (event: string, data?: any) => diff --git a/worker/include/Channel/ChannelRequest.hpp b/worker/include/Channel/ChannelRequest.hpp index 6e421418e6..e5cb47e562 100644 --- a/worker/include/Channel/ChannelRequest.hpp +++ b/worker/include/Channel/ChannelRequest.hpp @@ -59,6 +59,7 @@ namespace Channel CONSUMER_SET_PRIORITY, CONSUMER_REQUEST_KEY_FRAME, CONSUMER_ENABLE_TRACE_EVENT, + CONSUMER_CHANGE_PRODUCER, DATA_PRODUCER_CLOSE, DATA_PRODUCER_DUMP, DATA_PRODUCER_GET_STATS, diff --git a/worker/include/RTC/Consumer.hpp b/worker/include/RTC/Consumer.hpp index 5ebc0daadc..db69fa4e1b 100644 --- a/worker/include/RTC/Consumer.hpp +++ b/worker/include/RTC/Consumer.hpp @@ -34,10 +34,11 @@ namespace RTC public: virtual void OnConsumerSendRtpPacket(RTC::Consumer* consumer, RTC::RtpPacket* packet) = 0; virtual void OnConsumerRetransmitRtpPacket(RTC::Consumer* consumer, RTC::RtpPacket* packet) = 0; - virtual void OnConsumerKeyFrameRequested(RTC::Consumer* consumer, uint32_t mappedSsrc) = 0; - virtual void OnConsumerNeedBitrateChange(RTC::Consumer* consumer) = 0; - virtual void OnConsumerNeedZeroBitrate(RTC::Consumer* consumer) = 0; - virtual void OnConsumerProducerClosed(RTC::Consumer* consumer) = 0; + virtual void OnConsumerKeyFrameRequested(RTC::Consumer* consumer, uint32_t mappedSsrc) = 0; + virtual void OnConsumerNeedBitrateChange(RTC::Consumer* consumer) = 0; + virtual void OnConsumerNeedZeroBitrate(RTC::Consumer* consumer) = 0; + virtual void OnConsumerProducerClosed(RTC::Consumer* consumer) = 0; + virtual void OnConsumerChangeProducer(RTC::Consumer* consumer, std::string& producerId) = 0; }; public: @@ -117,6 +118,8 @@ namespace RTC } void TransportConnected(); void TransportDisconnected(); + void Pause(); + void Resume(); bool IsPaused() const { return this->paused; @@ -170,7 +173,7 @@ namespace RTC public: // Passed by argument. const std::string id; - const std::string producerId; + std::string producerId; protected: // Passed by argument. diff --git a/worker/include/RTC/Router.hpp b/worker/include/RTC/Router.hpp index abaebeaf32..da1b96959e 100644 --- a/worker/include/RTC/Router.hpp +++ b/worker/include/RTC/Router.hpp @@ -69,6 +69,8 @@ namespace RTC uint8_t& worstRemoteFractionLost) override; void OnTransportNewConsumer( RTC::Transport* transport, RTC::Consumer* consumer, std::string& producerId) override; + void OnTransportConsumerChangeProducer( + RTC::Transport* transport, RTC::Consumer* consumer, std::string& producerId) override; void OnTransportConsumerClosed(RTC::Transport* transport, RTC::Consumer* consumer) override; void OnTransportConsumerProducerClosed(RTC::Transport* transport, RTC::Consumer* consumer) override; void OnTransportConsumerKeyFrameRequested( diff --git a/worker/include/RTC/Transport.hpp b/worker/include/RTC/Transport.hpp index ecfaf73424..4bbb79e571 100644 --- a/worker/include/RTC/Transport.hpp +++ b/worker/include/RTC/Transport.hpp @@ -104,6 +104,8 @@ namespace RTC RTC::Transport* transport, RTC::DataConsumer* dataConsumer) = 0; virtual void OnTransportDataConsumerDataProducerClosed( RTC::Transport* transport, RTC::DataConsumer* dataConsumer) = 0; + virtual void OnTransportConsumerChangeProducer( + RTC::Transport* transport, RTC::Consumer* consumer, std::string& producerId) = 0; }; private: @@ -199,6 +201,7 @@ namespace RTC void OnConsumerNeedBitrateChange(RTC::Consumer* consumer) override; void OnConsumerNeedZeroBitrate(RTC::Consumer* consumer) override; void OnConsumerProducerClosed(RTC::Consumer* consumer) override; + void OnConsumerChangeProducer(RTC::Consumer* consumer, std::string& producerId) override; /* Pure virtual methods inherited from RTC::DataProducer::Listener. */ public: diff --git a/worker/src/Channel/ChannelRequest.cpp b/worker/src/Channel/ChannelRequest.cpp index 1e40aab5d2..237ee16eca 100644 --- a/worker/src/Channel/ChannelRequest.cpp +++ b/worker/src/Channel/ChannelRequest.cpp @@ -53,6 +53,7 @@ namespace Channel { "consumer.setPriority", ChannelRequest::MethodId::CONSUMER_SET_PRIORITY }, { "consumer.requestKeyFrame", ChannelRequest::MethodId::CONSUMER_REQUEST_KEY_FRAME }, { "consumer.enableTraceEvent", ChannelRequest::MethodId::CONSUMER_ENABLE_TRACE_EVENT }, + { "consumer.changeProducer", ChannelRequest::MethodId::CONSUMER_CHANGE_PRODUCER }, { "dataProducer.close", ChannelRequest::MethodId::DATA_PRODUCER_CLOSE }, { "dataProducer.dump", ChannelRequest::MethodId::DATA_PRODUCER_DUMP }, { "dataProducer.getStats", ChannelRequest::MethodId::DATA_PRODUCER_GET_STATS }, diff --git a/worker/src/RTC/Consumer.cpp b/worker/src/RTC/Consumer.cpp index 52b3f2257d..7c6d80eb99 100644 --- a/worker/src/RTC/Consumer.cpp +++ b/worker/src/RTC/Consumer.cpp @@ -267,14 +267,7 @@ namespace RTC return; } - bool wasActive = IsActive(); - - this->paused = true; - - MS_DEBUG_DEV("Consumer paused [consumerId:%s]", this->id.c_str()); - - if (wasActive) - UserOnPaused(); + this->Pause(); request->Accept(); @@ -290,12 +283,7 @@ namespace RTC return; } - this->paused = false; - - MS_DEBUG_DEV("Consumer resumed [consumerId:%s]", this->id.c_str()); - - if (IsActive()) - UserOnResumed(); + this->Resume(); request->Accept(); @@ -362,6 +350,38 @@ namespace RTC break; } + case Channel::ChannelRequest::MethodId::CONSUMER_CHANGE_PRODUCER: + { + auto jsonProducerIdIt = request->data.find("producerId"); + + if (jsonProducerIdIt == request->data.end() || !jsonProducerIdIt->is_string()) + MS_THROW_ERROR("missing data.producerId"); + + std::string producerId = jsonProducerIdIt->get(); + + // Notify the listener. + // This may throw if no Producer is found. + try + { + this->listener->OnConsumerChangeProducer(this, producerId); + } + catch (const MediaSoupError& error) + { + throw; + } + + MS_DEBUG_DEV( + "Consumer Producer changed [consumerId:%s producerId:%s]", + this->id.c_str(), + producerId.c_str()); + + json data = json::object(); + + request->Accept(); + + break; + } + default: { MS_THROW_ERROR("unknown method '%s'", request->method.c_str()); @@ -369,6 +389,42 @@ namespace RTC } } + void Consumer::Pause() + { + MS_TRACE(); + + if (this->paused) + { + return; + } + + bool wasActive = IsActive(); + + this->paused = true; + + MS_DEBUG_DEV("Consumer paused [consumerId:%s]", this->id.c_str()); + + if (wasActive) + UserOnPaused(); + } + + void Consumer::Resume() + { + MS_TRACE(); + + if (!this->paused) + { + return; + } + + this->paused = false; + + MS_DEBUG_DEV("Consumer resumed [consumerId:%s]", this->id.c_str()); + + if (IsActive()) + UserOnResumed(); + } + void Consumer::TransportConnected() { MS_TRACE(); diff --git a/worker/src/RTC/Router.cpp b/worker/src/RTC/Router.cpp index 17eac9e9e0..d1ad62adb0 100644 --- a/worker/src/RTC/Router.cpp +++ b/worker/src/RTC/Router.cpp @@ -778,6 +778,96 @@ namespace RTC consumer->ProducerRtpStreamScores(producer->GetRtpStreamScores()); } + inline void Router::OnTransportConsumerChangeProducer( + RTC::Transport* /*transport*/, RTC::Consumer* consumer, std::string& producerId) + { + MS_TRACE(); + + MS_ASSERT( + consumer->GetType() == RTC::RtpParameters::Type::SIMPLE, + "Invalid consumer type [consumerId:%s type:%s]", + consumer->id.c_str(), + RTC::RtpParameters::GetTypeString(consumer->GetType()).c_str()) + + consumer->Pause(); + + // Remove current producer. + auto mapCurrentProducersIt = this->mapProducers.find(consumer->producerId); + if (mapCurrentProducersIt == this->mapProducers.end()) + MS_THROW_ERROR("Current Producer not found [producerId:%s]", consumer->producerId.c_str()); + + auto* currentProducer = mapCurrentProducersIt->second; + auto mapCurrentProducerConsumersIt = this->mapProducerConsumers.find(currentProducer); + + MS_ASSERT( + mapCurrentProducerConsumersIt != this->mapProducerConsumers.end(), + "Current Producer not present in mapProducerConsumers [producerId: %s]", + consumer->producerId.c_str()); + + // Remove the Consumer from the current consumers map. + auto& currentConsumers = mapCurrentProducerConsumersIt->second; + auto currentConsumersConsumerIt = currentConsumers.find(consumer); + + if (currentConsumersConsumerIt != currentConsumers.end()) + { + currentConsumers.erase(currentConsumersConsumerIt); + } + else + { + MS_WARN_TAG( + rtp, "Consumer not present in current consumers list [consumerId:%s]", consumer->id.c_str()); + } + + // Add new producer + auto mapProducersIt = this->mapProducers.find(producerId); + if (mapProducersIt == this->mapProducers.end()) + MS_THROW_ERROR("Producer not found [producerId:%s]", producerId.c_str()); + + auto* producer = mapProducersIt->second; + auto mapProducerConsumersIt = this->mapProducerConsumers.find(producer); + + MS_ASSERT( + mapProducerConsumersIt != this->mapProducerConsumers.end(), + "Producer not present in mapProducerConsumers [producerId:%s]", + producerId.c_str()); + + consumer->producerId = producerId; + + // Update the Consumer status based on the Producer status. + if (producer->IsPaused()) + consumer->ProducerPaused(); + + // Insert the Consumer in the maps. + auto& consumers = mapProducerConsumersIt->second; + + auto consumersConsumerIt = consumers.find(consumer); + if (consumersConsumerIt == consumers.end()) + { + consumers.insert(consumer); + } + else + { + MS_WARN_TAG( + rtp, "consumer already present in consumers list [consumerId:%s]", consumer->id.c_str()); + } + + this->mapConsumerProducer[consumer] = producer; + + // Get all streams in the Producer and provide the Consumer with them. + for (const auto& kv : producer->GetRtpStreams()) + { + auto* rtpStream = kv.first; + uint32_t mappedSsrc = kv.second; + + consumer->ProducerRtpStream(rtpStream, mappedSsrc); + } + + // Provide the Consumer with the scores of all streams in the Producer. + consumer->ProducerRtpStreamScores(producer->GetRtpStreamScores()); + + consumer->Resume(); + } + inline void Router::OnTransportConsumerClosed(RTC::Transport* /*transport*/, RTC::Consumer* consumer) { MS_TRACE(); diff --git a/worker/src/RTC/Transport.cpp b/worker/src/RTC/Transport.cpp index b6cf997408..81e14a0a31 100644 --- a/worker/src/RTC/Transport.cpp +++ b/worker/src/RTC/Transport.cpp @@ -1385,6 +1385,7 @@ namespace RTC case Channel::ChannelRequest::MethodId::CONSUMER_SET_PRIORITY: case Channel::ChannelRequest::MethodId::CONSUMER_REQUEST_KEY_FRAME: case Channel::ChannelRequest::MethodId::CONSUMER_ENABLE_TRACE_EVENT: + case Channel::ChannelRequest::MethodId::CONSUMER_CHANGE_PRODUCER: { // This may throw. RTC::Consumer* consumer = GetConsumerFromInternal(request->internal); @@ -2739,6 +2740,13 @@ namespace RTC ComputeOutgoingDesiredBitrate(/*forceBitrate*/ true); } + inline void Transport::OnConsumerChangeProducer(RTC::Consumer* consumer, std::string& producerId) + { + MS_TRACE(); + + this->listener->OnTransportConsumerChangeProducer(this, consumer, producerId); + } + inline void Transport::OnConsumerProducerClosed(RTC::Consumer* consumer) { MS_TRACE();