import type { types } from 'mediasoup' import type { Namespace, RemoteSocket, Socket, Server as SocketServer } from 'socket.io' import { consola } from 'consola' interface ChadClient { id: string username: string inputMuted: boolean outputMuted: boolean } interface ProducerShort { producerId: types.Producer['id'] kind: types.MediaKind } interface ErrorCallbackResult { error: string } interface SuccessCallbackResult { ok: true } type EventCallback = (result: T | ErrorCallbackResult) => void interface ClientToServerEvents { join: ( options: { username: string rtpCapabilities: types.RtpCapabilities }, cb: EventCallback<{ id: string, username: string }[]> ) => void getRtpCapabilities: ( cb: EventCallback ) => void createTransport: ( options: { producing: boolean consuming: boolean }, cb: EventCallback> ) => void connectTransport: ( options: { transportId: types.WebRtcTransport['id'] dtlsParameters: types.WebRtcTransport['dtlsParameters'] }, cb: EventCallback ) => void produce: ( options: { transportId: types.WebRtcTransport['id'] kind: types.MediaKind rtpParameters: types.RtpParameters }, cb: EventCallback<{ id: types.Producer['id'] }> ) => void closeProducer: ( options: { producerId: types.Producer['id'] }, cb: EventCallback ) => void pauseProducer: ( options: { producerId: types.Producer['id'] }, cb: EventCallback ) => void resumeProducer: ( options: { producerId: types.Producer['id'] }, cb: EventCallback ) => void pauseConsumer: ( options: { consumerId: types.Consumer['id'] }, cb: EventCallback ) => void resumeConsumer: ( options: { consumerId: types.Consumer['id'] }, cb: EventCallback ) => void updateClient: ( options: Partial>, cb: EventCallback ) => void } interface ServerToClientEvents { newPeer: (arg: ChadClient) => void producers: (arg: ProducerShort[]) => void newConsumer: ( arg: { peerId: string producerId: types.Producer['id'] id: types.Consumer['id'] kind: types.MediaKind rtpParameters: types.RtpParameters type: types.ConsumerType appData: types.Producer['appData'] producerPaused: types.Consumer['producerPaused'] }, cb: EventCallback ) => void peerClosed: (arg: string) => void consumerClosed: (arg: { consumerId: string }) => void consumerPaused: (arg: { consumerId: string }) => void consumerResumed: (arg: { consumerId: string }) => void consumerScore: (arg: { consumerId: string, score: types.ConsumerScore }) => void clientChanged: (clientId: ChadClient['id'], client: ChadClient) => void } interface InterServerEvent {} interface SocketData { joined: boolean username: string inputMuted: boolean outputMuted: boolean rtpCapabilities: types.RtpCapabilities transports: Map producers: Map consumers: Map } export default function (io: SocketServer, router: types.Router) { const namespace: Namespace = io.of('/webrtc') namespace.on('connection', (socket) => { consola.info('[WebRtc]', 'Client connected', socket.id) socket.data.joined = false socket.data.username = socket.id socket.data.inputMuted = false socket.data.outputMuted = false socket.data.transports = new Map() socket.data.producers = new Map() socket.data.consumers = new Map() socket.on('join', async ({ username, rtpCapabilities }, cb) => { if (socket.data.joined) { consola.error('[WebRtc]', 'Already joined') cb({ error: 'Already joined' }) } socket.data.joined = true socket.data.username = username socket.data.rtpCapabilities = rtpCapabilities const joinedSockets = await getJoinedSockets() cb(joinedSockets.map((s) => { return { id: s.id, username: s.data.username, } })) for (const joinedSocket of joinedSockets.filter(joinedSocket => joinedSocket.id !== socket.id)) { for (const producer of joinedSocket.data.producers.values()) { createConsumer( socket, joinedSocket, producer, ) } } socket.broadcast.emit('newPeer', socketToClient(socket)) }) socket.on('getRtpCapabilities', (cb) => { cb(router.rtpCapabilities) }) socket.on('createTransport', async ({ producing, consuming }, cb) => { try { const transport = await router.createWebRtcTransport({ listenInfos: [ { protocol: 'udp', ip: '0.0.0.0', announcedAddress: process.env.ANNOUNCED_ADDRESS || '127.0.0.1', portRange: { min: 40000, max: 40100, }, }, ], enableUdp: true, preferUdp: true, appData: { producing, consuming, }, }) socket.data.transports.set(transport.id, transport) cb({ id: transport.id, iceParameters: transport.iceParameters, iceCandidates: transport.iceCandidates, dtlsParameters: transport.dtlsParameters, }) transport.on('icestatechange', (iceState) => { if (iceState === 'disconnected' || iceState === 'closed') { consola.info('[WebRtc]', '[WebRtcTransport]', `"icestatechange" event [iceState:${iceState}], closing peer`, transport.id) socket.disconnect() } }) transport.on('dtlsstatechange', (dtlsState) => { if (dtlsState === 'failed' || dtlsState === 'closed') { consola.warn('WebRtcTransport "dtlsstatechange" event [dtlsState:%s], closing peer', dtlsState) socket.disconnect() } }) } catch (error) { if (error instanceof Error) { consola.error('[WebRtc]', '[createTransport]', error.message) cb({ error: error.message }) } } }) socket.on('connectTransport', async ({ transportId, dtlsParameters }, cb) => { const transport = socket.data.transports.get(transportId) if (!transport) { consola.error('[WebRtc]', '[connectTransport]', `Transport with id ${transportId} not found`) cb({ error: 'Transport not found' }) return } try { await transport.connect({ dtlsParameters }) cb({ ok: true }) } catch (error) { if (error instanceof Error) { consola.error('[WebRtc]', '[connectTransport]', error.message) cb({ error: error.message }) } } }) socket.on('produce', async ({ transportId, kind, rtpParameters }, cb) => { if (!socket.data.joined) { consola.error('Peer not joined yet') cb({ error: 'Peer not joined yet' }) return } const transport = socket.data.transports.get(transportId) if (!transport) { consola.error('[WebRtc]', '[produce]', `Transport with id ${transportId} not found`) cb({ error: 'Transport not found' }) return } try { const producer = await transport.produce({ kind, rtpParameters, appData: { socketId: socket.id } }) socket.data.producers.set(producer.id, producer) cb({ id: producer.id }) const otherSockets = await getJoinedSockets(socket.id) for (const otherSocket of otherSockets) { createConsumer( otherSocket, socket, producer, ) } // TODO: Add into the AudioLevelObserver and ActiveSpeakerObserver. // https://github.com/versatica/mediasoup-demo/blob/v3/server/lib/Room.js#L1276 } catch (error) { if (error instanceof Error) { consola.error('[WebRtc]', '[produce]', error.message) cb({ error: error.message }) } } }) socket.on('closeProducer', async ({ producerId }, cb) => { if (!socket.data.joined) { consola.error('Peer not joined yet') cb({ error: 'Peer not joined yet' }) return } const producer = socket.data.producers.get(producerId) if (!producer) { consola.error(`producer with id "${producerId}" not found`) cb({ error: `producer with id "${producerId}" not found` }) return } producer.close() socket.data.producers.delete(producerId) cb({ ok: true }) }) socket.on('pauseProducer', async ({ producerId }, cb) => { if (!socket.data.joined) { consola.error('Peer not joined yet') cb({ error: 'Peer not joined yet' }) return } const producer = socket.data.producers.get(producerId) if (!producer) { consola.error(`producer with id "${producerId}" not found`) cb({ error: `producer with id "${producerId}" not found` }) return } await producer.pause() cb({ ok: true }) }) socket.on('resumeProducer', async ({ producerId }, cb) => { if (!socket.data.joined) { consola.error('Peer not joined yet') cb({ error: 'Peer not joined yet' }) return } const producer = socket.data.producers.get(producerId) if (!producer) { consola.error(`producer with id "${producerId}" not found`) cb({ error: `producer with id "${producerId}" not found` }) return } await producer.resume() cb({ ok: true }) }) socket.on('pauseConsumer', async ({ consumerId }, cb) => { if (!socket.data.joined) { consola.error('Peer not joined yet') cb({ error: 'Peer not joined yet' }) return } const consumer = socket.data.consumers.get(consumerId) if (!consumer) { consola.error(`consumer with id "${consumerId}" not found`) cb({ error: `consumer with id "${consumerId}" not found` }) return } await consumer.pause() cb({ ok: true }) }) socket.on('resumeConsumer', async ({ consumerId }, cb) => { if (!socket.data.joined) { consola.error('Peer not joined yet') cb({ error: 'Peer not joined yet' }) return } const consumer = socket.data.consumers.get(consumerId) if (!consumer) { consola.error(`consumer with id "${consumerId}" not found`) cb({ error: `consumer with id "${consumerId}" not found` }) return } await consumer.resume() cb({ ok: true }) }) socket.on('updateClient', (updatedClient, cb) => { if (updatedClient.username) { socket.data.username = updatedClient.username } if (updatedClient.inputMuted) { socket.data.inputMuted = updatedClient.inputMuted } if (updatedClient.outputMuted) { socket.data.outputMuted = updatedClient.outputMuted } cb({ ok: true }) namespace.emit('clientChanged', socket.id, socketToClient(socket)) }) socket.on('disconnect', () => { consola.info('Client disconnected:', socket.id) if (socket.data.joined) { socket.broadcast.emit('peerClosed', socket.id) } for (const transport of socket.data.transports.values()) { transport.close() } }) }) async function getJoinedSockets(excludeId?: string) { const sockets = await namespace.fetchSockets() return sockets.filter(socket => socket.data.joined && (excludeId ? excludeId !== socket.id : true)) } async function createConsumer( consumerSocket: Socket, producerSocket: RemoteSocket, producer: types.Producer, ) { if ( !consumerSocket.data.rtpCapabilities || !router.canConsume( { producerId: producer.id, rtpCapabilities: consumerSocket.data.rtpCapabilities, }, ) ) { return } const transport = Array.from(consumerSocket.data.transports.values()) .find(t => t.appData.consuming) if (!transport) { consola.error('createConsumer() | Transport for consuming not found') return } let consumer: types.Consumer try { consumer = await transport.consume( { producerId: producer.id, rtpCapabilities: consumerSocket.data.rtpCapabilities, // Enable NACK for OPUS. enableRtx: true, paused: true, ignoreDtx: true, }, ) } catch (error) { consola.error('_createConsumer() | transport.consume():%o', error) return } consumerSocket.data.consumers.set(consumer.id, consumer) consumer.on('transportclose', () => { consumerSocket.data.consumers.delete(consumer.id) }) consumer.on('producerclose', () => { consumerSocket.data.consumers.delete(consumer.id) consumerSocket.emit('consumerClosed', { consumerId: consumer.id }) }) consumer.on('producerpause', () => { consumerSocket.emit('consumerPaused', { consumerId: consumer.id }) }) consumer.on('producerresume', () => { consumerSocket.emit('consumerResumed', { consumerId: consumer.id }) }) consumer.on('score', (score) => { consumerSocket.emit('consumerScore', { consumerId: consumer.id, score }) }) try { await consumerSocket.emitWithAck( 'newConsumer', { peerId: producerSocket.id, producerId: producer.id, id: consumer.id, kind: consumer.kind, rtpParameters: consumer.rtpParameters, type: consumer.type, appData: producer.appData, producerPaused: consumer.producerPaused, }, ) await consumer.resume() consumerSocket.emit( 'consumerScore', { consumerId: consumer.id, score: consumer.score, }, ) } catch (error) { consola.error('_createConsumer() | failed:%o', error) } } function socketToClient(socket: Socket): ChadClient { return { id: socket.id, username: socket.data.username, inputMuted: socket.data.inputMuted, outputMuted: socket.data.outputMuted, } } }