Skip to content

Commit

Permalink
Fix or improve pause/resume event logic in Consumer and DataConsumer …
Browse files Browse the repository at this point in the history
…JS classes
  • Loading branch information
ibc committed Nov 20, 2023
1 parent 68905fb commit 4d01818
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 17 deletions.
14 changes: 5 additions & 9 deletions node/src/Consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -619,12 +619,12 @@ export class Consumer<ConsumerAppData extends AppData = AppData>
this.#internal.consumerId
);

const wasPaused = this.#paused || this.#producerPaused;
const wasPaused = this.#paused;

this.#paused = true;

// Emit observer event.
if (!wasPaused)
if (!wasPaused && !this.#producerPaused)
{
this.#observer.safeEmit('pause');
}
Expand All @@ -644,7 +644,7 @@ export class Consumer<ConsumerAppData extends AppData = AppData>
this.#internal.consumerId
);

const wasPaused = this.#paused || this.#producerPaused;
const wasPaused = this.#paused;

this.#paused = false;

Expand Down Expand Up @@ -853,14 +853,12 @@ export class Consumer<ConsumerAppData extends AppData = AppData>
break;
}

const wasPaused = this.#paused || this.#producerPaused;

this.#producerPaused = true;

this.safeEmit('producerpause');

// Emit observer event.
if (!wasPaused)
if (!this.#paused)
{
this.#observer.safeEmit('pause');
}
Expand All @@ -875,14 +873,12 @@ export class Consumer<ConsumerAppData extends AppData = AppData>
break;
}

const wasPaused = this.#paused || this.#producerPaused;

this.#producerPaused = false;

this.safeEmit('producerresume');

// Emit observer event.
if (wasPaused && !this.#paused)
if (!this.#paused)
{
this.#observer.safeEmit('resume');
}
Expand Down
12 changes: 4 additions & 8 deletions node/src/DataConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ export class DataConsumer<DataConsumerAppData extends AppData = AppData>
this.#paused = true;

// Emit observer event.
if (!wasPaused)
if (!wasPaused && !this.#dataProducerPaused)
{
this.#observer.safeEmit('pause');
}
Expand All @@ -444,7 +444,7 @@ export class DataConsumer<DataConsumerAppData extends AppData = AppData>
this.#paused = false;

// Emit observer event.
if (wasPaused)
if (wasPaused && !this.#dataProducerPaused)
{
this.#observer.safeEmit('resume');
}
Expand Down Expand Up @@ -620,14 +620,12 @@ export class DataConsumer<DataConsumerAppData extends AppData = AppData>
break;
}

const wasPaused = this.#paused || this.#dataProducerPaused;

this.#dataProducerPaused = true;

this.safeEmit('dataproducerpause');

// Emit observer event.
if (!wasPaused)
if (!this.#paused)
{
this.#observer.safeEmit('pause');
}
Expand All @@ -642,14 +640,12 @@ export class DataConsumer<DataConsumerAppData extends AppData = AppData>
break;
}

const wasPaused = this.#paused || this.#dataProducerPaused;

this.#dataProducerPaused = false;

this.safeEmit('dataproducerresume');

// Emit observer event.
if (wasPaused && !this.#paused)
if (!this.#paused)
{
this.#observer.safeEmit('resume');
}
Expand Down

0 comments on commit 4d01818

Please sign in to comment.