Skip to content

Commit

Permalink
Compound update
Browse files Browse the repository at this point in the history
 - rename sfu-observer to sfu-monitor
  - add RestTransport
  - rewrite WebsocketTransport
  - refactored Transport
  • Loading branch information
balazskreith committed Apr 9, 2022
1 parent 4f6e178 commit c4185d2
Show file tree
Hide file tree
Showing 18 changed files with 528 additions and 414 deletions.
42 changes: 21 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
ObserveRTC Integration for Selective Forwarding Units (SFU)
---

`@observertc/sfu-observer-js` is an SFU side library to monitor your SFU and integrate with observertc components.
`@observertc/sfu-monitor-js` is an SFU side library to monitor your SFU and integrate with observertc components.

Table of Contents:
* [Quick Start](#quick-start)
Expand All @@ -22,22 +22,22 @@ Table of Contents:
## Qucik Start

```
npm i @observertc/sfu-observer-js
npm i @observertc/sfu-monitor-js
```

### Collect stats from mediasoup

If you use [mediasoup:3.x.y]() you can use the built-in integration.

```javascript
import { SfuObserver, MediasoupCollector } from "@observertc/sfu-observer-js";
import { SfuMonitor, MediasoupCollector } from "@observertc/sfu-monitor-js";
// see full config in Configuration section
const config = {
collectingPeriodInMs: 5000,
};
const observer = SfuObserver.create(config);
const monitor = SfuMonitor.create(config);
const collector = MediasoupCollector.create();
observer.addStatsCollector(collector);
monitor.addStatsCollector(collector);

// ... somewhere in your code
const transport = router.createWebRtcTransport(options);
Expand All @@ -48,14 +48,14 @@ collector.watchWebRtcTransport(transport);

You can write a stats collector by using `AuxCollector`.
```javascript
import { SfuObserver, AuxCollector } from "@observertc/sfu-observer-js";
import { SfuMonitor, AuxCollector } from "@observertc/sfu-monitor-js";
// see full config in Configuration section
const config = {
collectingPeriodInMs: 5000,
};
const observer = SfuObserver.create(config);
const monitor = SfuMonitor.create(config);
const collector = AuxCollector.create();
observer.addStatsCollector(collector);
monitor.addStatsCollector(collector);

collector.addTransportStatsSupplier("transportId", async () => {
const stats: SfuTransport = {
Expand All @@ -78,13 +78,13 @@ collector.removeSctpStreamSupplier("channelId");

### Sample and Send

Sampling means the sfu-observer creates a so-called SfuSample. SfuSample is a compound object contains a snapshot from the polled stats. SfuSample is created by a Sampler component.
Sampling means the sfu-monitor creates a so-called SfuSample. SfuSample is a compound object contains a snapshot from the polled stats. SfuSample is created by a Sampler component.
A created SfuSample is added to Samples object. Samples can be sent to the server by a Sender component.

The above shown examples can be extended to sample and send by adding the following configurations:

```javascript
import { ClientObserver } from "@observertc/client-observer-js";
import { SfuMonitor } from "@observertc/sfu-monitor-js";
// see full config in Configuration section
const config = {
collectingPeriodInMs: 5000,
Expand All @@ -96,14 +96,14 @@ const config = {
}
}
};
const observer = SfuObserver.create(config);
const monitor = SfuMonitor.create(config);
//... the rest of your code
```

## Use collected stats

```javascript
const storage = observer.stats;
const storage = monitor.storage;
for (const sfuTransportEntry of storage.transports()) {
// use SfuTransportEntry
}
Expand Down Expand Up @@ -168,14 +168,14 @@ Additionally the observer groups the collected stats into the following entities
### Number of RTP Sessions

```javascript
const storage = observer.stats;
const storage = monitor.storage;
// The total number of RTP session going through the SFU
const totalNumberOfRtpSessions = storage.getNumberOfInboundRtpPads() + storage.getNumberOfOutboundRtpPads();

// The average number of rtp session in one transport (peer connection) between the SFU and its peers.
const avgNumberOfRtpSessionsPerTransport = totalNumberOfRtpSessions / storage.getNumberOfTransports();
const rtpPadsNum = [];
for (const sfuTransportEntry of stats.transports()) {
for (const sfuTransportEntry of storage.transports()) {
const nrOfRtpSessions = sfuTransportEntry.getNumberOfOutboundRtpPads() + sfuTransportEntry.getNumberOfOutboundRtpPads();
rtpPadsNum.push(nrOfRtpSessions);
}
Expand All @@ -195,7 +195,7 @@ The following example is created for mediasoup integration, but any other integr
`streamId` for inboundRtpPads, and `sinkId` for outbound RTP pads behave similarly.

```javascript
const storage = observer.stats;
const storage = monitor.storage;

const nrOfProducers = storage.getNumberOfMediaStreams();
const nrOfAudioProducers = storage.getNumberOfAudioStreams();
Expand All @@ -211,11 +211,11 @@ const avgNrOfProducersPerTransport = storage.getNumberOfMediaStreams() / storage
### Receiver and sender bitrates

```javascript
const storage = observer.stats;
const storage = monitor.stats;

const traces = new Map();
let lastCheck = Date.now();
observer.events.onStatsCollected(() => {
monitor.events.onStatsCollected(() => {
let totalReceivedBytes = 0;
for (const sfuInboundRtpPadEntry of storage.inboundRtpPads()) {
const { bytesReceived } = sfuInboundRtpPadEntry.stats;
Expand Down Expand Up @@ -248,14 +248,14 @@ observer.events.onStatsCollected(() => {
```javascript
const config = {
/**
* By setting it, the observer calls the added statsCollectors periodically
* By setting it, the monitor calls the added statsCollectors periodically
* and pulls the stats.
*
* DEFAULT: undefined
*/
collectingPeriodInMs: 5000,
/**
* By setting it, the observer make samples periodically.
* By setting it, the monitor make samples periodically.
*
* DEFAULT: undefined
*/
Expand Down Expand Up @@ -341,11 +341,11 @@ const config = {

## API docs

https://observertc.github.io/sfu-observer-js/interfaces/SfuObserver.html
https://observertc.github.io/sfu-monitor-js/interfaces/SfuMonitor.html

## NPM package

https://www.npmjs.com/package/@observertc/sfu-observer-js
https://www.npmjs.com/package/@observertc/sfu-monitor-js

## Schema

Expand Down
7 changes: 3 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "@observertc/sfu-observer-js",
"name": "@observertc/sfu-monitor-js",
"version": "1.0.0-beta.20",
"description": "ObserveRTC SFU Integration Core Library",
"main": "lib/index.js",
Expand Down Expand Up @@ -28,11 +28,10 @@
"author": "Balazs Kreith",
"license": "Apache-2.0",
"dependencies": {
"@observertc/schemas": "^2.0.0-beta.55",
"@observertc/schemas": "^2.0.0-beta.57",
"@types/events": "^3.0.0",
"@types/uuid": "^8.3.4",
"@types/ws": "^8.5.3",
"axios": "^0.26.1",
"bowser": "^2.11.0",
"js-base64": "^3.7.2",
"js-sha256": "^0.9.0",
Expand All @@ -54,6 +53,6 @@
},
"repository": {
"type": "git",
"url": "https://github.com/observertc/sfu-observer-js"
"url": "https://github.com/observertc/sfu-monitor-js"
}
}
62 changes: 0 additions & 62 deletions puml/user-manual.md
Original file line number Diff line number Diff line change
@@ -1,62 +0,0 @@
Javascript library to create and send [SfuSample](https://github.com/ObserveRTC/schemas-2.0/blob/main/generated-schemas/samples/v2/SfuSample.md)s. SfuSamples can be processed by an [observer](https://github.com/ObserveRTC/observer). Observer match [samples originated from client applications](https://github.com/ObserveRTC/schemas-2.0/blob/main/generated-schemas/samples/v2/ClientSample.md) and samples originated from SFU. After you setup an observer, you can integrate your SFU and start watching your transports.

## Install

Add the npm package to your SFU project.

```shell
npm i sfu-observer-js
```

In order to provide and send [SfuSample]()s from your SFU, you need to
provide components for it. Currently the following SFUs are supported by the
library:
* [mediasoup]() integration

## Integrate SFU

### Mediasoup

Take a look the following example to integrate mediaosup:

```javascript
const { MediasoupSfuObserver } = require("sfu-observer-js");
const mediasoup = require('mediasoup');
// ...
const POLLING_INTERVAL_IN_MS = 10000;
const OBSERVER_ADDRESS = "localhost:7080";
const SERVICE_ID = "myServiceId";
const MEDIAUNIT_ID = "mySFU";
const sfuObserver = MediasoupSfuObserver.builder()
.withMediasoup(mediasoup)
.withPollingInterval(POLLING_INTERVAL_IN_MS)
.withEndpoint(`ws://${OBSERVER_ADDRESS}/sfusamples/${SERVICE_ID}/${MEDIAUNIT_ID}`)
.build();

// start monitoring when you want the service to start polling
sfuObserver.start();

// optionally, you can subscribe to the event emitted when a sample is ready
sfuObserver
.onSample(sample => {
console.log("sfu sample", sample);
})
.onError(error => {
console.warn("An error occurred", error);
})
;

// stop monitoring whenever you want no longer to monitor the service
sfuObserver.stop();
```

where
* `POLLING_INTERVAL_IN_MS` is the period of the sampling
* `OBSERVER_ADDRESS` is the address where an [observer]() listens for samples
* `SERVICE_ID`, and `MEDIAUNIT_ID` corresponds to the Service and Media identifiers
used matching and report generation processes.
* `sfuObserver.start();` starts making and sending samples
* `sfuObserver.stop();` stops making and seding samples.

## Appendixes

1 change: 1 addition & 0 deletions src/Accumulator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ export class Accumulator {
const buffer = this._buffer;
this._buffer = [];
buffer.forEach(samples => {
// samples.clientSamples = [];
consumer(samples);
});
}
Expand Down
32 changes: 25 additions & 7 deletions src/EventsRelayer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,24 @@ export interface EventsRegister {
onSampleCreated(listener: SampleCreatedListener): EventsRegister;
offSampleCreated(listener: SampleCreatedListener): EventsRegister;

onSampleSent(listener: SampleSentListener): EventsRegister;
offSampleSent(listener: SampleSentListener): EventsRegister;
onSamplesSent(listener: SampleSentListener): EventsRegister;
offSamplesSent(listener: SampleSentListener): EventsRegister;

onSenderDisconnected(listener: SampleSentListener): EventsRegister;
offSenderDisconnected(listener: SampleSentListener): EventsRegister;
}

export interface EventsEmitter {
emitStatsCollected(peerConnectionId: string): void;
emitSampleCreated(sfuSample: SfuSample): void;
emitSampleSent(): void;

emitSamplesSent(): void;
emitSenderDisconnected(): void;
}

const ON_STATS_COLLECTED_EVENT_NAME = "onStatsCollected";
const ON_SAMPLE_CREATED_EVENT_NAME = "onSampleCreated";
const ON_SAMPLES_SENT_EVENT_NAME = "onSamplesSent";
const ON_SENDER_DISCONNECTED_EVENT_NAME = "onSenderDisconnected";

export class EventsRelayer implements EventsRegister, EventsEmitter {
public static create(): EventsRelayer {
Expand Down Expand Up @@ -64,18 +68,32 @@ export class EventsRelayer implements EventsRegister, EventsEmitter {
return this;
}

onSampleSent(listener: SampleSentListener): EventsRegister {
onSamplesSent(listener: SampleSentListener): EventsRegister {
this._emitter.on(ON_SAMPLES_SENT_EVENT_NAME, listener);
return this;
}

emitSampleSent(): void {
emitSamplesSent(): void {
this._emitter.emit(ON_SAMPLES_SENT_EVENT_NAME);
}

offSampleSent(listener: SampleSentListener): EventsRegister {
offSamplesSent(listener: SampleSentListener): EventsRegister {
this._emitter.off(ON_SAMPLES_SENT_EVENT_NAME, listener);
return this;
}

onSenderDisconnected(listener: SampleSentListener): EventsRegister {
this._emitter.on(ON_SENDER_DISCONNECTED_EVENT_NAME, listener);
return this;
}

emitSenderDisconnected(): void {
this._emitter.emit(ON_SENDER_DISCONNECTED_EVENT_NAME);
}

offSenderDisconnected(listener: SampleSentListener): EventsRegister {
this._emitter.off(ON_SENDER_DISCONNECTED_EVENT_NAME, listener);
return this;
}

}
26 changes: 14 additions & 12 deletions src/Sampler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ export class Sampler {
private _statsReader?: StatsReader;
// private _peerConnections: Map<string, PeerConnectionEntry> = new Map();
private _sampled?: number;
private _sampleSeq = 0;
private _marker?: string;
private _timezoneOffset: number = new Date().getTimezoneOffset();
private _config: SamplerConstructorConfig;
Expand Down Expand Up @@ -102,7 +101,6 @@ export class Sampler {
return;
}
this._closed = true;
this._extensionStats = [];
}

public make(): SfuSample {
Expand All @@ -111,46 +109,50 @@ export class Sampler {
}
const sfuSample: SfuSample = {
sfuId: this.sfuId,
timeZoneOffsetInHours: this._timezoneOffset,
marker: this._marker,
timestamp: Date.now(),

extensionStats: this._extensionStats,
};
++this._sampleSeq;
this._extensionStats = undefined;
if (this._timezoneOffset) {
sfuSample.timeZoneOffsetInHours = this._timezoneOffset;
}
if (this._marker) {
sfuSample.marker = this._marker;
}
if (this._extensionStats) {
sfuSample.extensionStats = this._extensionStats;
this._extensionStats = undefined;
}
if (!this._statsReader) {
logger.warn(`No StatsProvider has been assigned to Sampler`);
this._sampled = sfuSample.timestamp;
return sfuSample;
}
const { incrementalSampling } = this._config;
for (const transportEntry of this._statsReader.transports()) {
if (incrementalSampling && this._sampled && transportEntry.updated <= this._sampled) {
if (incrementalSampling && this._sampled && transportEntry.touched <= this._sampled) {
continue;
}
if (!sfuSample.transports) sfuSample.transports = [];
const stats = { ...transportEntry.stats };
sfuSample.transports.push(stats);
}
for (const inboundRtpPadEntry of this._statsReader.inboundRtpPads()) {
if (incrementalSampling && this._sampled && inboundRtpPadEntry.updated <= this._sampled) {
if (incrementalSampling && this._sampled && inboundRtpPadEntry.touched <= this._sampled) {
continue;
}
if (!sfuSample.inboundRtpPads) sfuSample.inboundRtpPads = [];
const stats = { ...inboundRtpPadEntry.stats };
sfuSample.inboundRtpPads.push(stats);
}
for (const outboundRtpPadEntry of this._statsReader.outboundRtpPads()) {
if (incrementalSampling && this._sampled && outboundRtpPadEntry.updated <= this._sampled) {
if (incrementalSampling && this._sampled && outboundRtpPadEntry.touched <= this._sampled) {
continue;
}
if (!sfuSample.outboundRtpPads) sfuSample.outboundRtpPads = [];
const stats = { ...outboundRtpPadEntry.stats };
sfuSample.outboundRtpPads.push(stats);
}
for (const sctpChannelEntry of this._statsReader.sctpChannels()) {
if (incrementalSampling && this._sampled && sctpChannelEntry.updated <= this._sampled) {
if (incrementalSampling && this._sampled && sctpChannelEntry.touched <= this._sampled) {
continue;
}
if (!sfuSample.sctpChannels) sfuSample.sctpChannels = [];
Expand Down
Loading

0 comments on commit c4185d2

Please sign in to comment.