Skip to content

Commit

Permalink
Avoiding unnecessary peer connection renegotiation in consumer resume…
Browse files Browse the repository at this point in the history
…/pause methods (versatica#212)

Avoid calling handler methods if there is no need
  • Loading branch information
douglaseel authored Jul 22, 2022
1 parent 17dfa08 commit d510a4d
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 10 deletions.
14 changes: 14 additions & 0 deletions src/Consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,13 @@ export class Consumer extends EnhancedEventEmitter<ConsumerEvents>
return;
}

if (this._paused)
{
logger.debug('pause() | Consumer is already paused');

return;
}

this._paused = true;
this._track.enabled = false;

Expand All @@ -283,6 +290,13 @@ export class Consumer extends EnhancedEventEmitter<ConsumerEvents>
return;
}

if (!this._paused)
{
logger.debug('resume() | Consumer is already resumed');

return;
}

this._paused = false;
this._track.enabled = true;

Expand Down
50 changes: 40 additions & 10 deletions src/Transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -830,6 +830,13 @@ export class Transport extends EnhancedEventEmitter<TransportEvents>
this._awaitQueue.push(
async () =>
{
if (this._pendingConsumerTasks.length === 0)
{
logger.debug('_createPendingConsumers() | there is no Consumer to be created');

return;
}

const pendingConsumerTasks = [ ...this._pendingConsumerTasks ];

// Clear pending Consumer tasks.
Expand Down Expand Up @@ -923,12 +930,12 @@ export class Transport extends EnhancedEventEmitter<TransportEvents>
error);
}
}

this._consumerCreationInProgress = false;
},
'transport._createPendingConsumers()')
.then(() =>
{
this._consumerCreationInProgress = false;

// There are pending Consumer tasks, enqueue their creation.
if (this._pendingConsumerTasks.length > 0)
{
Expand All @@ -946,27 +953,35 @@ export class Transport extends EnhancedEventEmitter<TransportEvents>
this._awaitQueue.push(
async () =>
{
if (this._pendingPauseConsumers.size === 0)
{
logger.debug('_pausePendingConsumers() | there is no Consumer to be paused');

return;
}

const pendingPauseConsumers = Array.from(this._pendingPauseConsumers.values());

// Clear pending pause Consumer map.
this._pendingPauseConsumers.clear();

try
{
await this._handler.pauseReceiving(
pendingPauseConsumers.map((consumer) => consumer.localId)
);
const localIds = pendingPauseConsumers
.map((consumer) => consumer.localId);

await this._handler.pauseReceiving(localIds);
}
catch (error)
{
logger.error('_pausePendingConsumers() | failed to pause Consumers:', error);
}

this._consumerPauseInProgress = false;
},
'transport._pausePendingConsumers')
.then(() =>
{
this._consumerPauseInProgress = false;

// There are pending Consumers to be paused, do it.
if (this._pendingPauseConsumers.size > 0)
{
Expand All @@ -984,16 +999,24 @@ export class Transport extends EnhancedEventEmitter<TransportEvents>
this._awaitQueue.push(
async () =>
{
if (this._pendingResumeConsumers.size === 0)
{
logger.debug('_resumePendingConsumers() | there is no Consumer to be resumed');

return;
}

const pendingResumeConsumers = Array.from(this._pendingResumeConsumers.values());

// Clear pending resume Consumer map.
this._pendingResumeConsumers.clear();

try
{
await this._handler.resumeReceiving(
pendingResumeConsumers.map((consumer) => consumer.localId)
);
const localIds = pendingResumeConsumers
.map((consumer) => consumer.localId);

await this._handler.resumeReceiving(localIds);
}
catch (error)
{
Expand Down Expand Up @@ -1022,6 +1045,13 @@ export class Transport extends EnhancedEventEmitter<TransportEvents>
this._awaitQueue.push(
async () =>
{
if (this._pendingCloseConsumers.size === 0)
{
logger.debug('_closePendingConsumers() | there is no Consumer to be closed');

return;
}

const pendingCloseConsumers = Array.from(this._pendingCloseConsumers.values());

// Clear pending close Consumer map.
Expand Down

0 comments on commit d510a4d

Please sign in to comment.