Skip to content

Commit

Permalink
[mirotalksfu] - refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
miroslavpejic85 committed Dec 22, 2024
1 parent 0f45190 commit ec26d0b
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 113 deletions.
2 changes: 1 addition & 1 deletion app/api/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ definitions:
timestamp:
type: string
format: date-time
example: "2024-12-21T12:00:00Z"
example: '2024-12-21T12:00:00Z'
totalRooms:
type: integer
totalPeers:
Expand Down
72 changes: 48 additions & 24 deletions app/src/Peer.js
Original file line number Diff line number Diff line change
Expand Up @@ -108,25 +108,33 @@ module.exports = class Peer {

async connectTransport(transport_id, dtlsParameters) {
if (!this.transports.has(transport_id)) {
return false;
throw new Error(`Transport with ID ${transport_id} not found`);
}

await this.transports.get(transport_id).connect({
dtlsParameters: dtlsParameters,
});
try {
await this.transports.get(transport_id).connect({
dtlsParameters: dtlsParameters,
});
} catch (error) {
log.error(`Failed to connect transport with ID ${transport_id}`, error);
throw new Error(`Failed to connect transport with ID ${transport_id}`);
}

return true;
}

close() {
this.transports.forEach((transport, transport_id) => {
transport.close();
this.delTransport(transport_id);
log.debug('Closed and deleted peer transport', {
//transport_id: transport_id,
transportInternal: transport.internal,
transport_closed: transport.closed,
});
try {
transport.close();
this.delTransport(transport_id);
log.debug('Closed and deleted peer transport', {
transportInternal: transport.internal,
transport_closed: transport.closed,
});
} catch (error) {
log.warn(`Error closing transport with ID ${transport_id}`, error.message);
}
});

const peerTransports = this.getTransports();
Expand Down Expand Up @@ -159,14 +167,22 @@ module.exports = class Peer {
}

async createProducer(producerTransportId, producer_rtpParameters, producer_kind, producer_type) {
if (!this.transports.has(producerTransportId)) return;
if (!this.transports.has(producerTransportId)) {
throw new Error(`Producer transport with ID ${producerTransportId} not found`);
}

const producerTransport = this.transports.get(producerTransportId);

const producer = await producerTransport.produce({
kind: producer_kind,
rtpParameters: producer_rtpParameters,
});
let producer;
try {
producer = await producerTransport.produce({
kind: producer_kind,
rtpParameters: producer_rtpParameters,
});
} catch (error) {
log.error(`Error creating producer for transport ID ${producerTransportId}:`, error);
throw new Error(`Failed to create producer for transport ID ${producerTransportId}`);
}

const { id, appData, type, kind, rtpParameters } = producer;

Expand Down Expand Up @@ -236,17 +252,25 @@ module.exports = class Peer {
}

async createConsumer(consumer_transport_id, producer_id, rtpCapabilities) {
if (!this.transports.has(consumer_transport_id)) return;
if (!this.transports.has(consumer_transport_id)) {
throw new Error(`Consumer transport with ID ${consumer_transport_id} not found`);
}

const consumerTransport = this.transports.get(consumer_transport_id);

const consumer = await consumerTransport.consume({
producerId: producer_id,
rtpCapabilities,
enableRtx: true, // Enable NACK for OPUS.
paused: true,
ignoreDtx: true,
});
let consumer;
try {
consumer = await consumerTransport.consume({
producerId: producer_id,
rtpCapabilities,
enableRtx: true, // Enable NACK for OPUS.
paused: true,
ignoreDtx: true,
});
} catch (error) {
log.error(`Error creating consumer for transport ID ${consumer_transport_id}`, error);
throw new Error(`Failed to create consumer for transport ID ${consumer_transport_id}`);
}

const { id, type, kind, rtpParameters, producerPaused } = consumer;

Expand Down
81 changes: 55 additions & 26 deletions app/src/Room.js
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,9 @@ module.exports = class Room {
// ####################################################

async createWebRtcTransport(socket_id) {
if (!this.peers.has(socket_id)) return;
if (!this.peers.has(socket_id)) {
throw new Error(`Peer with socket ID ${socket_id} not found in the room`);
}

const { maxIncomingBitrate, initialAvailableOutgoingBitrate, listenInfos } = this.webRtcTransport;

Expand All @@ -550,15 +552,18 @@ module.exports = class Room {
const transport = await this.router.createWebRtcTransport(webRtcTransportOptions);

if (!transport) {
throw new Error('Create WebRtc Transport failed!');
throw new Error('Failed to create WebRtc Transport');
}

const { id, type, iceParameters, iceCandidates, dtlsParameters } = transport;

if (maxIncomingBitrate) {
try {
await transport.setMaxIncomingBitrate(maxIncomingBitrate);
} catch (error) {}
} catch (error) {
log.error('Failed to set max incoming bitrate', error);
throw new Error(`Failed to set max incoming bitrate for transport ${id}`);
}
}

const peer = this.getPeer(socket_id);
Expand Down Expand Up @@ -615,11 +620,18 @@ module.exports = class Room {
}

async connectPeerTransport(socket_id, transport_id, dtlsParameters) {
if (!this.peers.has(socket_id)) return;
if (!this.peers.has(socket_id)) {
throw new Error(`Peer with socket ID ${socket_id} not found in the room`);
}

const peer = this.getPeer(socket_id);

await peer.connectTransport(transport_id, dtlsParameters);
try {
await peer.connectTransport(transport_id, dtlsParameters);
} catch (error) {
log.error(`Failed to connect peer transport for socket ID ${socket_id}`, error);
throw new Error(`Failed to connect transport for peer with socket ID ${socket_id}`);
}

return '[Room|connectPeerTransport] done';
}
Expand All @@ -629,14 +641,22 @@ module.exports = class Room {
// ####################################################

async produce(socket_id, producerTransportId, rtpParameters, kind, type) {
if (!this.peers.has(socket_id)) return;
if (!this.peers.has(socket_id)) {
throw new Error(`Peer with socket ID ${socket_id} not found in the room`);
}

const peer = this.getPeer(socket_id);

const peerProducer = await peer.createProducer(producerTransportId, rtpParameters, kind, type);
let peerProducer;
try {
peerProducer = await peer.createProducer(producerTransportId, rtpParameters, kind, type);
} catch (error) {
log.error(`Error creating producer for peer with socket ID ${socket_id}`, error);
throw new Error(`Error creating producer with transport ID ${producerTransportId} for peer ${socket_id}`);
}

if (!peerProducer) {
throw new Error(`Peer producer kind ${kind} with id ${producerTransportId} not found`);
throw new Error(`Failed to create producer with ID ${producerTransportId} for peer ${socket_id}`);
}

const { id } = peerProducer;
Expand All @@ -657,40 +677,49 @@ module.exports = class Room {
}

closeProducer(socket_id, producer_id) {
if (!this.peers.has(socket_id)) return;
if (!this.peers.has(socket_id)) {
throw new Error(`Peer with socket ID ${socket_id} not found in the room`);
}

const peer = this.getPeer(socket_id);

peer.closeProducer(producer_id);
try {
peer.closeProducer(producer_id);
} catch (error) {
log.error(`Error closing producer for peer ${socket_id}`, error);
throw new Error(`Error closing producer with ID ${producer_id} for peer ${socket_id}`);
}
}

// ####################################################
// CONSUME
// ####################################################

async consume(socket_id, consumer_transport_id, producer_id, rtpCapabilities) {
if (!this.peers.has(socket_id)) return;
if (!this.peers.has(socket_id)) {
throw new Error(`Peer with socket ID ${socket_id} not found in the room`);
}

if (
!this.router.canConsume({
producerId: producer_id,
rtpCapabilities,
})
) {
log.warn('Cannot consume', {
socket_id,
consumer_transport_id,
producer_id,
});
return;
if (!this.router.canConsume({ producerId: producer_id, rtpCapabilities })) {
throw new Error(`Cannot consume producer with ID ${producer_id}, router validation failed`);
}

const peer = this.getPeer(socket_id);

const peerConsumer = await peer.createConsumer(consumer_transport_id, producer_id, rtpCapabilities);
let peerConsumer;
try {
peerConsumer = await peer.createConsumer(consumer_transport_id, producer_id, rtpCapabilities);
} catch (error) {
log.error(`Error creating consumer for peer with socket ID ${socket_id}`, error);
throw new Error(
`Failed to create consumer with transport ID ${consumer_transport_id} and producer ID ${producer_id} for peer ${socket_id}`,
);
}

if (!peerConsumer) {
throw new Error(`Peer consumer kind ${kind} with id ${consumer_transport_id} not found`);
throw new Error(
`Consumer creation failed for transport ID ${consumer_transport_id} and producer ID ${producer_id}`,
);
}

const { consumer, params } = peerConsumer;
Expand All @@ -702,7 +731,7 @@ module.exports = class Room {

peer.removeConsumer(id);

// Notify the client that consumer is closed
// Notify the client that the consumer is closed
this.send(socket_id, 'consumerClosed', {
consumer_id: id,
consumer_kind: kind,
Expand Down
Loading

0 comments on commit ec26d0b

Please sign in to comment.