Skip to content

Commit

Permalink
HA update
Browse files Browse the repository at this point in the history
  • Loading branch information
balazskreith committed Nov 2, 2024
1 parent 9946256 commit 39ded55
Show file tree
Hide file tree
Showing 41 changed files with 2,320 additions and 624 deletions.
15 changes: 15 additions & 0 deletions charts/webrtc-observer-org/templates/mediaserver.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,21 @@ data:
port: 9081
serverIp: {{ .Values.publicIP }}
announcedIp: {{ .Values.publicIP }}
maxTransportsPerRouter: 6
maxProducerPerClients: 2
maxClientLifeTimeInMins: 15
hamok:
clientsMap:
mapId: "webrtc-observer-clients-map"
eventEmitter:
emitterId: "webrtc-observer-media-service-events"
roomsMap:
mapId: "webrtc-observer-rooms-map"
redis:
host: "redis"
port: 6379
redisChannelId: "webrtc-observer-hamok-message-channel"
devMode: false
mediasoup:
numberOfWorkers: 2
workerSettings:
Expand Down
51 changes: 51 additions & 0 deletions media-server/configs/local-2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
server:
port: 9081
serverIp: "127.0.0.1"
# announcedIp: "127.0.0.1"

hamok:
clientsMap:
mapId: "webrtc-observer-clients-map"
eventEmitter:
emitterId: "webrtc-observer-media-service-events"
roomsMap:
mapId: "webrtc-observer-rooms-map"
redis:
host: "localhost"
port: 6379
redisChannelId: "webrtc-observer-hamok-message-channel"
devMode: true

# stunnerAuthUrl: "http://stunner-auth.stunner-system:8088?service=turn"
maxTransportsPerRouter: 6
maxProducerPerClients: 2
maxClientLifeTimeInMins: 15

mediasoup:
numberOfWorkers: 1
workerSettings:
logLevel: "warn"
logTags:
- "info"
- "ice"
- "dtls"
- "rtp"
- "srtp"
- "rtcp"
rtcMinPort: 42000
rtcMaxPort: 43000
mediaCodecs:
- kind: "audio"
mimeType: "audio/opus"
clockRate: 48000
channels: 2
- kind: "video"
mimeType: "video/VP8"
clockRate: 90000
webRtcServerSettings:
- listenInfos:
- ip: "127.0.0.1"
# ip: "0.0.0.0"
protocol: "udp"
# announcedAddress: "127.0.0.1"
port: 5001
6 changes: 5 additions & 1 deletion media-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"build": "tsc --build",
"clean": "tsc --build --clean",
"dev": "nodemon -x ts-node src/main.ts | pino-pretty",
"dev:2": "CONFIG_PATH=./configs/local-2.yaml nodemon --ignore ./sfu-local-files -x ts-node src/main.ts | pino-pretty",
"test": "echo \"Error: no test specified\" && exit 1"
},
"repository": {
Expand All @@ -32,6 +33,8 @@
"@observertc/sfu-monitor-js": "^2.0.0",
"@types/ws": "^8.2.2",
"events": "^3.3.0",
"hamok": "^2.6.1-76ef244.0",
"ioredis": "^5.4.1",
"jsonwebtoken": "^9.0.0",
"kafkajs": "^2.2.4",
"mediasoup": "^3.14.7",
Expand All @@ -45,11 +48,12 @@
"@tsconfig/node20": "^1.0.2",
"@types/events": "^3.0.0",
"@types/jest": "^29.5.1",
"@types/node": "^22.8.4",
"@types/pino": "^7.0.5",
"@types/uuid": "^10.0.0",
"@types/yaml": "^1.9.7",
"pino-pretty": "^11.2.2",
"nodemon": "^3.0.1",
"pino-pretty": "^11.2.2",
"ts-jest": "^29.1.0",
"ts-node": "^10.9.1",
"ts-node-dev": "^2.0.0",
Expand Down
57 changes: 38 additions & 19 deletions media-server/src/Server.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import * as http from 'http';
import { WebSocketServer } from 'ws';
import { WebSocketServer, WebSocket } from 'ws';
import { createLogger } from "./common/logger";
import url from 'url';
import { ClientContext } from './common/ClientContext';
import { ClientMessageContext } from './client-listeners/ClientMessageListener';
import { ClientMessageContext, ClientMessageListener } from './client-listeners/ClientMessageListener';
import { ClientMessage } from './protocols/MessageProtocol';
import { EventEmitter } from 'events';

Expand Down Expand Up @@ -49,7 +49,11 @@ export class Server extends EventEmitter {
private _state: ServerState = 'idle';
private _httpServer?: http.Server;
private _wsServer?: WebSocketServer;


public readonly clients = new Map<string, ClientContext>();
public readonly messageListeners = new Map<string, ClientMessageListener>();
public createClientContext?: (base: Pick<ClientContext, 'clientId' | 'send' | 'webSocket' | 'userId'> & { callId?: string }) => Promise<ClientContext>;

public constructor(
public readonly config: ServerConfig,
) {
Expand Down Expand Up @@ -139,38 +143,53 @@ export class Server extends EventEmitter {
});

wsServer.on('connection', async (ws, req) => {
if (!this.createClientContext) {
logger.error(`createClientContext is not set`);
return ws.close(4001, 'Server error');
}

// console.warn("\n\n", url.parse(req.url, true).query, "\n\n");
const query = url.parse(req.url ?? '', true).query;
const clientId = query.clientId as string;
const schemaVersion = query.schemaVersion as string;
// const schemaVersion = query.schemaVersion as string;
const send = (message: ClientMessage) => {
const data = JSON.stringify(message);
ws.send(data);
}
const clientContext: ClientContext = {
userId: query.userId as string,
clientId,
schemaVersion,
webSocket: ws,
send,
mediaProducers: new Set(),
mediaConsumers: new Set(),
}
const clientContext = await this.createClientContext({
callId: typeof query.callId === 'string' ? query.callId : undefined,
webSocket: ws,
clientId,
send,
userId: typeof query.userId === 'string' ? query.userId : 'unknown-user',
});

this.emit('newclient', clientContext);

ws.on('message', data => {
ws.on('message', async data => {
const message = JSON.parse(data.toString());
const messageContext: ClientMessageContext = {
clientId,
client: clientContext,
message,
send,
get callId() {
return clientContext.routerId;
},
}
this.emit('newmessage', messageContext);
const listener = this.messageListeners.get(message.type);

if (!listener) return logger.warn(`No listener found for message type ${message.type}`);

try {
await listener(messageContext);
} catch (err) {
logger.error(`Error occurred while processing message: %o`, err);
}
// this.emit('newmessage', messageContext);
});

clientContext.webSocket.once('close', () => {
this.clients.delete(clientId);
});
this.clients.set(clientId, clientContext);

logger.info(`Websocket connection is requested from ${req.socket.remoteAddress}, query:`, query);
});
wsServer.on('error', error => {
Expand Down
4 changes: 2 additions & 2 deletions media-server/src/client-listeners/ClientMessageListener.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { ClientContext } from "../common/ClientContext"
import { ClientMessage } from "../protocols/MessageProtocol"

export type ClientMessageContext = {
readonly callId?: string,
clientId: string,
client: ClientContext,
message: ClientMessage,
send: (message: ClientMessage) => void,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,109 +5,41 @@ import { ClientMessageContext } from "./ClientMessageListener";
import { Kafka, Producer } from "kafkajs";
import { SampleMessage } from "../protocols/SampleMessage";
import { MainEmitter } from "../common/MainEmitter";
import { HamokService } from "../services/HamokService";

const logger = createLogger('ClientMonitorSampleNotificatinListener');

export type ClientMonitorSampleNotificatinListenerContext = {
clients: Map<string, ClientContext>,
observer: Observer,
mainEmitter: MainEmitter
// clients: Map<string, ClientContext>,
// observer: Observer,
// mainEmitter: MainEmitter
hamokService: HamokService,
}

export function createClientMonitorSampleNotificatinListener(listenerContext: ClientMonitorSampleNotificatinListenerContext) {
const {
observer,
clients,
mainEmitter,
hamokService,
} = listenerContext;
const result = async (messageContext: ClientMessageContext) => {
const {
client,
message: notification,
} = messageContext;
const client = clients.get(messageContext.clientId);

if (notification.type !== 'client-monitor-sample-notification') {
return console.warn(`Invalid message type ${notification.type}`);
} else if (!client) {
return console.warn(`Client ${messageContext.clientId} not found`);
} else if (!client.decoder) {
return console.warn(`Client ${messageContext.clientId} decoder not found`);
} else if (!client.routerId) {
return console.warn(`Client ${messageContext.clientId} routerId not found`);
}

let observedCall = observer.observedCalls.get(client.routerId);
if (!observedCall) {
observedCall = observer.createObservedCall({
roomId: client.routerId,
callId: client.routerId,
serviceId: 'demo-service',
appData: {

},
});

observedCall.once('close', () => {
// once it is closed
});
}

let observedClient = observedCall.clients.get(client.clientId);
if (!observedClient) {
const newObservedClient = observedCall.createClient({
clientId: client.clientId,
userId: client.userId,
mediaUnitId: 'webapp',
appData: {

},
});

client.webSocket.once('close', () => {
newObservedClient.close();
if (newObservedClient.call.clients.size === 0) {
newObservedClient.call.close();
}
});
observedClient = newObservedClient;
}

const sample = client.decoder.decodeFromBase64(notification.sample);

observedClient.accept(sample);

mainEmitter.emit('sample', {

hamokService.publishClientSample({
callId: client.callId,
clientId: client.clientId,
callId: client.routerId,
mediaUnitId: 'webapp',
roomId: client.routerId,
sampleInBase64: notification.sample,
serviceId: 'demo-service',
mediaUnitId: 'webapp',
roomId: client.roomId,
serviceId: 'webrtc-observer',
userId: client.userId,
});
};




// const consumer = kafka.consumer({
// groupId: 'client-monitor-sample-notification',
// });

// consumer.connect().then(() => {
// consumer.subscribe({ topic: 'client-sample' }).then(() => {
// consumer.run({
// eachMessage: async ({ topic, partition, message, heartbeat, pause }) => {
// console.log({
// topic,
// partition,
// key: message.key?.toString(),
// value: message.value?.toString(),
// headers: message.headers,
// })
// },
// });
// });
// });

return result;
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,22 @@ import { ClientMessageContext } from "./ClientMessageListener";
const logger = createLogger('ConnectTransportRequestListener');

export type ConnectTransportRequestListenerContext = {
mediasoupService: MediasoupService;
clients: Map<string, ClientContext>;
}

export function createConnectTransportRequestListener(listenerContext: ConnectTransportRequestListenerContext) {
const {
mediasoupService,
clients,
} = listenerContext;

const result = async (messageContext: ClientMessageContext) => {
const {
client,
message: request,
} = messageContext;
const client = clients.get(messageContext.clientId);

if (request.type !== 'connect-transport-request') {
return console.warn(`Invalid message type ${request.type}`);
} else if (!client) {
return console.warn(`Client ${messageContext.clientId} not found`);
}

logger.debug(`Transport ${request.transportId} attempt to connect`);

let response: ConnectTransportResponsePayload | undefined;
let error: string | undefined;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,23 @@ const logger = createLogger('ControlConsumerNotificationListener');

export type ControlConsumerNotificationListenerContext = {
mediasoupService: MediasoupService,
clients: Map<string, ClientContext>,
}

export function createControlConsumerNotificationListener(listenerContext: ControlConsumerNotificationListenerContext): ClientMessageListener {
const {
mediasoupService,
clients,
} = listenerContext;

const result = async (messageContext: ClientMessageContext) => {
const {
message: request,
client,
} = messageContext;
const client = clients.get(messageContext.clientId);

if (request.type !== 'control-consumer-notification') {
return console.warn(`Invalid message type ${request.type}`);
} else if (!client?.mediaConsumers.has(request.consumerId)) {
return console.warn(`Consumer ${request.consumerId} not found for client ${messageContext.clientId}`);
return console.warn(`Consumer ${request.consumerId} not found for client ${client.clientId}`);
}

const consumer = mediasoupService.mediaConsumers.get(request.consumerId);
Expand Down
Loading

0 comments on commit 39ded55

Please sign in to comment.