Skip to content

Commit

Permalink
Node: add enhancedOnce() typed function (#1375)
Browse files Browse the repository at this point in the history
### Details

- Same as [events.once(emmiter, eventName)](https://nodejs.org/api/events.html#eventsonceemitter-name-options) but with TypeScript typed events.
- Rename `EnhancedEventEmitter.ts` file to `enhancedEvents.ts` for consistency with the Node `events` module.
  • Loading branch information
ibc authored Apr 10, 2024
1 parent 8be8493 commit e516ebc
Show file tree
Hide file tree
Showing 29 changed files with 199 additions and 159 deletions.
2 changes: 1 addition & 1 deletion node/src/ActiveSpeakerObserver.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Logger } from './Logger';
import { EnhancedEventEmitter } from './EnhancedEventEmitter';
import { EnhancedEventEmitter } from './enhancedEvents';
import {
RtpObserver,
RtpObserverEvents,
Expand Down
2 changes: 1 addition & 1 deletion node/src/AudioLevelObserver.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Logger } from './Logger';
import { EnhancedEventEmitter } from './EnhancedEventEmitter';
import { EnhancedEventEmitter } from './enhancedEvents';
import {
RtpObserver,
RtpObserverEvents,
Expand Down
2 changes: 1 addition & 1 deletion node/src/Channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Duplex } from 'node:stream';
import { info, warn } from 'node:console';
import * as flatbuffers from 'flatbuffers';
import { Logger } from './Logger';
import { EnhancedEventEmitter } from './EnhancedEventEmitter';
import { EnhancedEventEmitter } from './enhancedEvents';
import { InvalidStateError } from './errors';
import { Body as RequestBody, Method, Request } from './fbs/request';
import { Response } from './fbs/response';
Expand Down
2 changes: 1 addition & 1 deletion node/src/Consumer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Logger } from './Logger';
import { EnhancedEventEmitter } from './EnhancedEventEmitter';
import { EnhancedEventEmitter } from './enhancedEvents';
import { Channel } from './Channel';
import { TransportInternal } from './Transport';
import { ProducerStat } from './Producer';
Expand Down
2 changes: 1 addition & 1 deletion node/src/DataConsumer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Logger } from './Logger';
import { EnhancedEventEmitter } from './EnhancedEventEmitter';
import { EnhancedEventEmitter } from './enhancedEvents';
import { Channel } from './Channel';
import { TransportInternal } from './Transport';
import {
Expand Down
2 changes: 1 addition & 1 deletion node/src/DataProducer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Logger } from './Logger';
import { EnhancedEventEmitter } from './EnhancedEventEmitter';
import { EnhancedEventEmitter } from './enhancedEvents';
import { Channel } from './Channel';
import { TransportInternal } from './Transport';
import {
Expand Down
2 changes: 1 addition & 1 deletion node/src/Producer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Logger } from './Logger';
import { EnhancedEventEmitter } from './EnhancedEventEmitter';
import { EnhancedEventEmitter } from './enhancedEvents';
import { Channel } from './Channel';
import { TransportInternal } from './Transport';
import { MediaKind, RtpParameters, parseRtpParameters } from './RtpParameters';
Expand Down
2 changes: 1 addition & 1 deletion node/src/Router.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Logger } from './Logger';
import { EnhancedEventEmitter } from './EnhancedEventEmitter';
import { EnhancedEventEmitter } from './enhancedEvents';
import * as ortc from './ortc';
import { InvalidStateError } from './errors';
import { Channel } from './Channel';
Expand Down
2 changes: 1 addition & 1 deletion node/src/RtpObserver.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Logger } from './Logger';
import { EnhancedEventEmitter } from './EnhancedEventEmitter';
import { EnhancedEventEmitter } from './enhancedEvents';
import { Channel } from './Channel';
import { RouterInternal } from './Router';
import { Producer } from './Producer';
Expand Down
2 changes: 1 addition & 1 deletion node/src/Transport.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as flatbuffers from 'flatbuffers';
import { Logger } from './Logger';
import { EnhancedEventEmitter } from './EnhancedEventEmitter';
import { EnhancedEventEmitter } from './enhancedEvents';
import * as ortc from './ortc';
import { Channel } from './Channel';
import { RouterInternal } from './Router';
Expand Down
2 changes: 1 addition & 1 deletion node/src/WebRtcServer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Logger } from './Logger';
import { EnhancedEventEmitter } from './EnhancedEventEmitter';
import { EnhancedEventEmitter } from './enhancedEvents';
import { Channel } from './Channel';
import { TransportListenInfo } from './Transport';
import { WebRtcTransport } from './WebRtcTransport';
Expand Down
2 changes: 1 addition & 1 deletion node/src/Worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import * as path from 'node:path';
import { spawn, ChildProcess } from 'node:child_process';
import { version } from './';
import { Logger } from './Logger';
import { EnhancedEventEmitter } from './EnhancedEventEmitter';
import { EnhancedEventEmitter } from './enhancedEvents';
import * as ortc from './ortc';
import { Channel } from './Channel';
import { Router, RouterOptions } from './Router';
Expand Down
23 changes: 20 additions & 3 deletions node/src/EnhancedEventEmitter.ts → node/src/enhancedEvents.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { EventEmitter } from 'node:events';
import { EventEmitter, once } from 'node:events';
import { Logger } from './Logger';

const logger = new Logger('EnhancedEventEmitter');
const enhancedEventEmitterLogger = new Logger('EnhancedEventEmitter');

type Events = Record<string, any[]>;

Expand All @@ -24,7 +24,7 @@ export class EnhancedEventEmitter<
try {
return super.emit(eventName, ...args);
} catch (error) {
logger.error(
enhancedEventEmitterLogger.error(
'safeEmit() | event listener threw an error [eventName:%s]:%o',
eventName,
error
Expand Down Expand Up @@ -121,3 +121,20 @@ export class EnhancedEventEmitter<
return super.rawListeners(eventName);
}
}

/**
* TypeScript version of events.once():
* https://nodejs.org/api/events.html#eventsonceemitter-name-options
*
* Usage example:
* ```ts
* await enhancedOnce<ConsumerEvents>(videoConsumer, 'producerpause');
* ````
*/
export async function enhancedOnce<E extends Events = Events>(
emmiter: EnhancedEventEmitter<E>,
eventName: keyof E & string,
options?: any
): Promise<any[]> {
return once(emmiter, eventName, options);
}
2 changes: 1 addition & 1 deletion node/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Logger } from './Logger';
import { EnhancedEventEmitter } from './EnhancedEventEmitter';
import { EnhancedEventEmitter } from './enhancedEvents';
import { workerBin, Worker, WorkerSettings } from './Worker';
import * as utils from './utils';
import { supportedRtpCapabilities } from './supportedRtpCapabilities';
Expand Down
28 changes: 17 additions & 11 deletions node/src/test/test-ActiveSpeakerObserver.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import * as mediasoup from '../';
import { enhancedOnce } from '../enhancedEvents';
import { WorkerEvents, ActiveSpeakerObserverEvents } from '../types';
import * as utils from '../utils';

type TestContext = {
Expand Down Expand Up @@ -31,9 +33,7 @@ afterEach(async () => {
ctx.worker?.close();

if (ctx.worker?.subprocessClosed === false) {
await new Promise<void>(resolve =>
ctx.worker?.on('subprocessclose', resolve)
);
await enhancedOnce<WorkerEvents>(ctx.worker, 'subprocessclose');
}
});

Expand Down Expand Up @@ -105,21 +105,27 @@ test('activeSpeakerObserver.close() succeeds', async () => {
test('ActiveSpeakerObserver emits "routerclose" if Router is closed', async () => {
const activeSpeakerObserver = await ctx.router!.createAudioLevelObserver();

await new Promise<void>(resolve => {
activeSpeakerObserver.on('routerclose', resolve);
ctx.router!.close();
});
const promise = enhancedOnce<ActiveSpeakerObserverEvents>(
activeSpeakerObserver,
'routerclose'
);

ctx.router!.close();
await promise;

expect(activeSpeakerObserver.closed).toBe(true);
}, 2000);

test('ActiveSpeakerObserver emits "routerclose" if Worker is closed', async () => {
const activeSpeakerObserver = await ctx.router!.createAudioLevelObserver();

await new Promise<void>(resolve => {
activeSpeakerObserver.on('routerclose', resolve);
ctx.worker!.close();
});
const promise = enhancedOnce<ActiveSpeakerObserverEvents>(
activeSpeakerObserver,
'routerclose'
);

ctx.worker!.close();
await promise;

expect(activeSpeakerObserver.closed).toBe(true);
}, 2000);
28 changes: 17 additions & 11 deletions node/src/test/test-AudioLevelObserver.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import * as mediasoup from '../';
import { enhancedOnce } from '../enhancedEvents';
import { WorkerEvents, AudioLevelObserverEvents } from '../types';
import * as utils from '../utils';

type TestContext = {
Expand Down Expand Up @@ -31,9 +33,7 @@ afterEach(async () => {
ctx.worker?.close();

if (ctx.worker?.subprocessClosed === false) {
await new Promise<void>(resolve =>
ctx.worker?.on('subprocessclose', resolve)
);
await enhancedOnce<WorkerEvents>(ctx.worker, 'subprocessclose');
}
});

Expand Down Expand Up @@ -114,21 +114,27 @@ test('audioLevelObserver.close() succeeds', async () => {
test('AudioLevelObserver emits "routerclose" if Router is closed', async () => {
const audioLevelObserver = await ctx.router!.createAudioLevelObserver();

await new Promise<void>(resolve => {
audioLevelObserver.on('routerclose', resolve);
ctx.router!.close();
});
const promise = enhancedOnce<AudioLevelObserverEvents>(
audioLevelObserver,
'routerclose'
);

ctx.router!.close();
await promise;

expect(audioLevelObserver.closed).toBe(true);
}, 2000);

test('AudioLevelObserver emits "routerclose" if Worker is closed', async () => {
const audioLevelObserver = await ctx.router!.createAudioLevelObserver();

await new Promise<void>(resolve => {
audioLevelObserver.on('routerclose', resolve);
ctx.worker!.close();
});
const promise = enhancedOnce<AudioLevelObserverEvents>(
audioLevelObserver,
'routerclose'
);

ctx.worker!.close();
await promise;

expect(audioLevelObserver.closed).toBe(true);
}, 2000);
27 changes: 13 additions & 14 deletions node/src/test/test-Consumer.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { once } from 'node:events';
import * as flatbuffers from 'flatbuffers';
import * as mediasoup from '../';
import { enhancedOnce } from '../enhancedEvents';
import { WorkerEvents, ConsumerEvents } from '../types';
import { UnsupportedError } from '../errors';
import * as utils from '../utils';
import {
Expand Down Expand Up @@ -248,9 +249,7 @@ afterEach(async () => {
ctx.worker?.close();

if (ctx.worker?.subprocessClosed === false) {
await new Promise<void>(resolve =>
ctx.worker?.on('subprocessclose', resolve)
);
await enhancedOnce<WorkerEvents>(ctx.worker, 'subprocessclose');
}
});

Expand Down Expand Up @@ -1025,7 +1024,7 @@ test('Consumer emits "producerpause" and "producerresume"', async () => {
});

await Promise.all([
once(audioConsumer, 'producerpause'),
enhancedOnce<ConsumerEvents>(audioConsumer, 'producerpause'),

// Let's await for pause() to resolve to avoid aborted channel requests
// due to worker closure.
Expand All @@ -1036,7 +1035,7 @@ test('Consumer emits "producerpause" and "producerresume"', async () => {
expect(audioConsumer.producerPaused).toBe(true);

await Promise.all([
once(audioConsumer, 'producerresume'),
enhancedOnce<ConsumerEvents>(audioConsumer, 'producerresume'),

// Let's await for resume() to resolve to avoid aborted channel requests
// due to worker closure.
Expand Down Expand Up @@ -1162,10 +1161,10 @@ test('Consumer emits "producerclose" if Producer is closed', async () => {

audioConsumer.observer.once('close', onObserverClose);

await new Promise<void>(resolve => {
audioConsumer.on('producerclose', resolve);
ctx.audioProducer!.close();
});
const promise = enhancedOnce<ConsumerEvents>(audioConsumer, 'producerclose');

ctx.audioProducer!.close();
await promise;

expect(onObserverClose).toHaveBeenCalledTimes(1);
expect(audioConsumer.closed).toBe(true);
Expand All @@ -1181,10 +1180,10 @@ test('Consumer emits "transportclose" if Transport is closed', async () => {

videoConsumer.observer.once('close', onObserverClose);

await new Promise<void>(resolve => {
videoConsumer.on('transportclose', resolve);
ctx.webRtcTransport2!.close();
});
const promise = enhancedOnce<ConsumerEvents>(videoConsumer, 'transportclose');

ctx.webRtcTransport2!.close();
await promise;

expect(onObserverClose).toHaveBeenCalledTimes(1);
expect(videoConsumer.closed).toBe(true);
Expand Down
26 changes: 15 additions & 11 deletions node/src/test/test-DataConsumer.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import * as mediasoup from '../';
import { enhancedOnce } from '../enhancedEvents';
import { WorkerEvents, DataConsumerEvents } from '../types';
import * as utils from '../utils';

type TestContext = {
Expand Down Expand Up @@ -44,9 +46,7 @@ afterEach(async () => {
ctx.worker?.close();

if (ctx.worker?.subprocessClosed === false) {
await new Promise<void>(resolve =>
ctx.worker?.on('subprocessclose', resolve)
);
await enhancedOnce<WorkerEvents>(ctx.worker, 'subprocessclose');
}
});

Expand Down Expand Up @@ -374,11 +374,13 @@ test('DataConsumer emits "dataproducerclose" if DataProducer is closed', async (

dataConsumer.observer.once('close', onObserverClose);

await new Promise<void>(resolve => {
dataConsumer.on('dataproducerclose', resolve);
const promise = enhancedOnce<DataConsumerEvents>(
dataConsumer,
'dataproducerclose'
);

ctx.dataProducer!.close();
});
ctx.dataProducer!.close();
await promise;

expect(onObserverClose).toHaveBeenCalledTimes(1);
expect(dataConsumer.closed).toBe(true);
Expand All @@ -392,11 +394,13 @@ test('DataConsumer emits "transportclose" if Transport is closed', async () => {

dataConsumer.observer.once('close', onObserverClose);

await new Promise<void>(resolve => {
dataConsumer.on('transportclose', resolve);
const promise = enhancedOnce<DataConsumerEvents>(
dataConsumer,
'transportclose'
);

ctx.webRtcTransport2!.close();
});
ctx.webRtcTransport2!.close();
await promise;

expect(onObserverClose).toHaveBeenCalledTimes(1);
expect(dataConsumer.closed).toBe(true);
Expand Down
Loading

0 comments on commit e516ebc

Please sign in to comment.