From 80c53a079b83b992dad2f74ab5bbebc026c93a6e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9D=D0=B8=D0=BA=D0=B8=D1=82=D0=B0=20=D0=9A=D1=80=D1=83?= =?UTF-8?q?=D0=B3=D0=BB=D0=B8=D1=86=D0=BA=D0=B8=D0=B9?= Date: Sat, 4 Oct 2025 22:44:55 +0600 Subject: [PATCH] #4 --- server/index.ts | 1 + server/sockets/webrtc.ts | 419 ++++++++++++++++++++++++++++++++------- 2 files changed, 345 insertions(+), 75 deletions(-) diff --git a/server/index.ts b/server/index.ts index 37c4a7b..2423c97 100644 --- a/server/index.ts +++ b/server/index.ts @@ -27,6 +27,7 @@ import { webrtcSocket } from './sockets' mimeType: 'audio/opus', clockRate: 48000, channels: 2, + parameters: { useinbandfec: 1, stereo: 1 }, }, ], }) diff --git a/server/sockets/webrtc.ts b/server/sockets/webrtc.ts index b77673b..c4393c3 100644 --- a/server/sockets/webrtc.ts +++ b/server/sockets/webrtc.ts @@ -1,5 +1,5 @@ import type { types } from 'mediasoup' -import type { Namespace, Server as SocketServer } from 'socket.io' +import type { Namespace, RemoteSocket, Socket, Server as SocketServer } from 'socket.io' import { consola } from 'consola' interface ProducerShort { @@ -17,11 +17,22 @@ interface SuccessCallbackResult { type EventCallback = (result: T | ErrorCallbackResult) => void -interface ClientToServerEvents extends Record { +interface ClientToServerEvents { + join: ( + options: { + username: string + rtpCapabilities: types.RtpCapabilities + }, + cb: EventCallback<{ id: string, username: string }[]> + ) => void getRtpCapabilities: ( cb: EventCallback ) => void createTransport: ( + options: { + producing: boolean + consuming: boolean + }, cb: EventCallback> ) => void connectTransport: ( @@ -37,57 +48,116 @@ interface ClientToServerEvents extends Record { kind: types.MediaKind rtpParameters: types.RtpParameters }, - cb: EventCallback<{ producerId: types.Producer['id'] }> + cb: EventCallback<{ id: types.Producer['id'] }> ) => void - consume: ( + closeProducer: ( options: { producerId: types.Producer['id'] - transportId: types.WebRtcTransport['id'] - rtpCapabilities: types.RtpCapabilities }, - cb: EventCallback<{ - consumerId: types.Consumer['id'] + cb: EventCallback + ) => void + pauseProducer: ( + options: { producerId: types.Producer['id'] - kind: types.MediaKind - rtpParameters: types.RtpParameters - }> + }, + cb: EventCallback + ) => void + resumeProducer: ( + options: { + producerId: types.Producer['id'] + }, + cb: EventCallback + ) => void + pauseConsumer: ( + options: { + consumerId: types.Consumer['id'] + }, + cb: EventCallback + ) => void + resumeConsumer: ( + options: { + consumerId: types.Consumer['id'] + }, + cb: EventCallback ) => void } interface ServerToClientEvents { producers: (arg: ProducerShort[]) => void - newProducer: (arg: ProducerShort) => void - producerClosed: (arg: types.Producer['id']) => void + newConsumer: (arg: { + peerId: string + producerId: types.Producer['id'] + id: types.Consumer['id'] + kind: types.MediaKind + rtpParameters: types.RtpParameters + type: types.ConsumerType + appData: types.Producer['appData'] + producerPaused: types.Consumer['producerPaused'] + }, cb: EventCallback) => void + peerClosed: (arg: string) => void + consumerClosed: (arg: { consumerId: string }) => void + consumerPaused: (arg: { consumerId: string }) => void + consumerResumed: (arg: { consumerId: string }) => void + consumerScore: (arg: { consumerId: string, score: types.ConsumerScore }) => void } -const transports = new Map() -const producers = new Map() -const consumers = new Map() +interface InterServerEvent {} + +interface SocketData { + joined: boolean + username: string + rtpCapabilities: types.RtpCapabilities + transports: Map + producers: Map + consumers: Map +} export default function (io: SocketServer, router: types.Router) { - const namespace: Namespace = io.of('/webrtc') + const namespace: Namespace = io.of('/webrtc') namespace.on('connection', (socket) => { consola.info('[WebRtc]', 'Client connected', socket.id) - transports.set(socket.id, []) - producers.set(socket.id, []) - consumers.set(socket.id, []) + socket.data.joined = false + socket.data.username = socket.id - socket.emit('producers', Array.from(producers.values()).flatMap((producers) => { - return producers.map((producer) => { + socket.data.transports = new Map() + socket.data.producers = new Map() + socket.data.consumers = new Map() + + socket.on('join', async ({ username, rtpCapabilities }, cb) => { + if (socket.data.joined) + throw new Error('Already joined') + + socket.data.joined = true + socket.data.username = username + socket.data.rtpCapabilities = rtpCapabilities + + const joinedSockets = await getJoinedSockets() + + cb(joinedSockets.map((s) => { return { - producerId: producer.id, - kind: producer.kind, + id: s.id, + username: s.data.username, } - }) - })) + })) + + for (const joinedSocket of joinedSockets) { + for (const producer of joinedSocket.data.producers.values()) { + createConsumer( + socket, + joinedSocket, + producer, + ) + } + } + }) socket.on('getRtpCapabilities', (cb) => { cb(router.rtpCapabilities) }) - socket.on('createTransport', async (cb) => { + socket.on('createTransport', async ({ producing, consuming }, cb) => { try { const transport = await router.createWebRtcTransport({ listenInfos: [ @@ -103,9 +173,13 @@ export default function (io: SocketServer, router: types.Router) { ], enableUdp: true, preferUdp: true, + appData: { + producing, + consuming, + }, }) - transports.get(socket.id)!.push(transport) + socket.data.transports.set(transport.id, transport) cb({ id: transport.id, @@ -114,10 +188,12 @@ export default function (io: SocketServer, router: types.Router) { dtlsParameters: transport.dtlsParameters, }) - transport.observer.on('close', () => { - transports.set(socket.id, transports.get(socket.id)!.filter(t => t.id === transport.id)) + transport.on('icestatechange', (iceState) => { + if (iceState === 'disconnected' || iceState === 'closed') { + consola.info('[WebRtc]', '[WebRtcTransport]', `"icestatechange" event [iceState:${iceState}], closing peer`, transport.id) - consola.info('[WebRtc]', 'Transport closed', transport.id) + socket.disconnect() + } }) } catch (error) { @@ -129,10 +205,10 @@ export default function (io: SocketServer, router: types.Router) { }) socket.on('connectTransport', async ({ transportId, dtlsParameters }, cb) => { - const transport = transports.get(socket.id)!.find(t => t.id === transportId) + const transport = socket.data.transports.get(transportId) if (!transport) { - consola.error('[WebRtc]', '[connectTransport]', 'Transport not found') + consola.error('[WebRtc]', '[connectTransport]', `Transport with id ${transportId} not found`) cb({ error: 'Transport not found' }) return @@ -152,26 +228,42 @@ export default function (io: SocketServer, router: types.Router) { }) socket.on('produce', async ({ transportId, kind, rtpParameters }, cb) => { - const transport = transports.get(socket.id)!.find(t => t.id === transportId) + if (!socket.data.joined) { + consola.error('Peer not joined yet') + cb({ error: 'Peer not joined yet' }) + + return + } + + const transport = socket.data.transports.get(transportId) if (!transport) { - consola.error('[WebRtc]', '[produce]', 'Transport not found') + consola.error('[WebRtc]', '[produce]', `Transport with id ${transportId} not found`) cb({ error: 'Transport not found' }) return } try { - const producer = await transport.produce({ kind, rtpParameters }) - producers.get(socket.id)!.push(producer) + const producer = await transport.produce({ kind, rtpParameters, appData: { socketId: socket.id } }) - cb({ producerId: producer.id }) - socket.broadcast.emit('newProducer', { producerId: producer.id, kind: producer.kind }) + socket.data.producers.set(producer.id, producer) - producer.on('transportclose', () => { - socket.broadcast.emit('producerClosed', producer.id) - consola.log('[WebRtc]', 'Producer closed', producer.id) - }) + cb({ id: producer.id }) + + const sockets = await namespace.fetchSockets() + const otherSockets = sockets.filter(s => s.id !== socket.id) + + for (const otherSocket of otherSockets) { + createConsumer( + socket, + otherSocket, + producer, + ) + } + + // TODO: Add into the AudioLevelObserver and ActiveSpeakerObserver. + // https://github.com/versatica/mediasoup-demo/blob/v3/server/lib/Room.js#L1276 } catch (error) { if (error instanceof Error) { @@ -181,57 +273,234 @@ export default function (io: SocketServer, router: types.Router) { } }) - socket.on('consume', async ({ producerId, transportId, rtpCapabilities }, cb) => { - if (!router.canConsume({ producerId, rtpCapabilities })) { - consola.error('[WebRtc]', '[consume]', 'Cannot consume') - cb({ error: 'Cannot consume' }) + socket.on('closeProducer', async ({ producerId }, cb) => { + if (!socket.data.joined) { + consola.error('Peer not joined yet') + cb({ error: 'Peer not joined yet' }) return } - const transport = transports.get(socket.id)?.find(t => t.id === transportId) + const producer = socket.data.producers.get(producerId) - if (!transport) { - consola.error('[WebRtc]', '[consume]', 'Transport not found') - cb({ error: 'Transport not found' }) + if (!producer) { + consola.error(`producer with id "${producerId}" not found`) + cb({ error: `producer with id "${producerId}" not found` }) return } - try { - const consumer = await transport.consume({ - producerId, - rtpCapabilities, - paused: false, - }) + producer.close() - consumers.get(socket.id)!.push(consumer) + socket.data.producers.delete(producer.id) - cb({ - consumerId: consumer.id, - producerId, - kind: consumer.kind, - rtpParameters: consumer.rtpParameters, - }) + cb({ ok: true }) + }) + + socket.on('pauseProducer', async ({ producerId }, cb) => { + if (!socket.data.joined) { + consola.error('Peer not joined yet') + cb({ error: 'Peer not joined yet' }) + + return } - catch (error) { - if (error instanceof Error) { - consola.error('[WebRtc]', '[consume]', error.message) - cb({ error: error.message }) - } + + const producer = socket.data.producers.get(producerId) + + if (!producer) { + consola.error(`producer with id "${producerId}" not found`) + cb({ error: `producer with id "${producerId}" not found` }) + + return } + + await producer.pause() + + cb({ ok: true }) + }) + + socket.on('resumeProducer', async ({ producerId }, cb) => { + if (!socket.data.joined) { + consola.error('Peer not joined yet') + cb({ error: 'Peer not joined yet' }) + + return + } + + const producer = socket.data.producers.get(producerId) + + if (!producer) { + consola.error(`producer with id "${producerId}" not found`) + cb({ error: `producer with id "${producerId}" not found` }) + + return + } + + await producer.resume() + + cb({ ok: true }) + }) + + socket.on('pauseConsumer', async ({ consumerId }, cb) => { + if (!socket.data.joined) { + consola.error('Peer not joined yet') + cb({ error: 'Peer not joined yet' }) + + return + } + + const consumer = socket.data.consumers.get(consumerId) + + if (!consumer) { + consola.error(`consumer with id "${consumerId}" not found`) + cb({ error: `consumer with id "${consumerId}" not found` }) + + return + } + + await consumer.pause() + + cb({ ok: true }) + }) + + socket.on('resumeConsumer', async ({ consumerId }, cb) => { + if (!socket.data.joined) { + consola.error('Peer not joined yet') + cb({ error: 'Peer not joined yet' }) + + return + } + + const consumer = socket.data.consumers.get(consumerId) + + if (!consumer) { + consola.error(`consumer with id "${consumerId}" not found`) + cb({ error: `consumer with id "${consumerId}" not found` }) + + return + } + + await consumer.resume() + + cb({ ok: true }) }) socket.on('disconnect', () => { consola.info('Client disconnected:', socket.id) - transports.get(socket.id)!.forEach(t => t.close()) - producers.get(socket.id)!.forEach(p => p.close()) - consumers.get(socket.id)!.forEach(c => c.close()) + if (socket.data.joined) { + socket.broadcast.emit('peerClosed', socket.id) + } - transports.delete(socket.id) - producers.delete(socket.id) - consumers.delete(socket.id) + for (const transport of socket.data.transports.values()) { + transport.close() + } }) }) + + async function getJoinedSockets(excludeId?: string) { + const sockets = await namespace.fetchSockets() + + return sockets.filter(socket => socket.data.joined && (excludeId && excludeId !== socket.id)) + } + + async function createConsumer( + consumerSocket: Socket, + producerSocket: RemoteSocket, + producer: types.Producer, + ) { + if ( + !consumerSocket.data.rtpCapabilities + || !router.canConsume( + { + producerId: producer.id, + rtpCapabilities: consumerSocket.data.rtpCapabilities, + }, + ) + ) { + return + } + + const transport = Array.from(consumerSocket.data.transports.values()) + .find(t => t.appData.consuming) + + if (!transport) { + consola.error('createConsumer() | Transport for consuming not found') + + return + } + + let consumer: types.Consumer + + try { + consumer = await transport.consume( + { + producerId: producer.id, + rtpCapabilities: consumerSocket.data.rtpCapabilities, + // Enable NACK for OPUS. + enableRtx: true, + paused: true, + ignoreDtx: true, + }, + ) + } + catch (error) { + consola.error('_createConsumer() | transport.consume():%o', error) + + return + } + + consumerSocket.data.consumers.set(consumer.id, consumer) + + consumer.on('transportclose', () => { + consumerSocket.data.consumers.delete(consumer.id) + }) + + consumer.on('producerclose', () => { + consumerSocket.data.consumers.delete(consumer.id) + + consumerSocket.emit('consumerClosed', { consumerId: consumer.id }) + }) + + consumer.on('producerpause', () => { + consumerSocket.emit('consumerPaused', { consumerId: consumer.id }) + }) + + consumer.on('producerresume', () => { + consumerSocket.emit('consumerResumed', { consumerId: consumer.id }) + }) + + consumer.on('score', (score) => { + consumerSocket.emit('consumerScore', { consumerId: consumer.id, score }) + }) + + try { + await consumerSocket.emitWithAck( + 'newConsumer', + { + peerId: producerSocket.id, + producerId: producer.id, + id: consumer.id, + kind: consumer.kind, + rtpParameters: consumer.rtpParameters, + type: consumer.type, + appData: producer.appData, + producerPaused: consumer.producerPaused, + }, + ) + + await consumer.resume() + + consumerSocket.emit( + 'consumerScore', + { + consumerId: consumer.id, + score: consumer.score, + }, + ) + } + catch (error) { + consola.error('_createConsumer() | failed:%o', error) + } + } }