import type { types } from 'mediasoup'
import type { Namespace, Server as SocketServer } from 'socket.io'
import { consola } from 'consola'

/** Compact description of a producer as sent to clients. */
interface ProducerShort {
  producerId: types.Producer['id']
  kind: types.MediaKind
}

/** Acknowledgement payload used when a request fails. */
interface ErrorCallbackResult {
  error: string
}

/** Acknowledgement payload for requests that have nothing else to return. */
interface SuccessCallbackResult {
  ok: true
}

/** Socket.IO acknowledgement callback: receives either the result `T` or an error. */
type EventCallback<T> = (result: T | ErrorCallbackResult) => void

/**
 * Requests a client may send on the `/webrtc` namespace.
 *
 * NOTE(review): the type arguments of `Record` were not recoverable from the
 * reviewed source; a generic listener map is assumed here — confirm.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
interface ClientToServerEvents extends Record<string, (...args: any[]) => void> {
  getRtpCapabilities: (
    cb: EventCallback<types.RtpCapabilities>
  ) => void
  // NOTE(review): payload type reconstructed from the fields the handler sends — confirm.
  createTransport: (
    cb: EventCallback<Pick<types.WebRtcTransport, 'id' | 'iceParameters' | 'iceCandidates' | 'dtlsParameters'>>
  ) => void
  connectTransport: (
    options: {
      transportId: types.WebRtcTransport['id']
      dtlsParameters: types.WebRtcTransport['dtlsParameters']
    },
    cb: EventCallback<SuccessCallbackResult>
  ) => void
  produce: (
    options: {
      transportId: types.WebRtcTransport['id']
      kind: types.MediaKind
      rtpParameters: types.RtpParameters
    },
    cb: EventCallback<{ producerId: types.Producer['id'] }>
  ) => void
  consume: (
    options: {
      producerId: types.Producer['id']
      transportId: types.WebRtcTransport['id']
      rtpCapabilities: types.RtpCapabilities
    },
    cb: EventCallback<{
      consumerId: types.Consumer['id']
      producerId: types.Producer['id']
      kind: types.MediaKind
      rtpParameters: types.RtpParameters
    }>
  ) => void
}

/** Notifications the server pushes to clients. */
interface ServerToClientEvents {
  producers: (arg: ProducerShort[]) => void
  newProducer: (arg: ProducerShort) => void
  producerClosed: (arg: types.Producer['id']) => void
}

// Per-socket registries, keyed by socket id. An entry exists from 'connection'
// until 'disconnect'.
const transports = new Map<string, types.WebRtcTransport[]>()
const producers = new Map<string, types.Producer[]>()
const consumers = new Map<string, types.Consumer[]>()

/** Turns any thrown value into a message that can be logged and sent to the client. */
function describeError(error: unknown): string {
  return error instanceof Error ? error.message : String(error)
}

/**
 * Drops the entry with the given id from a socket's list.
 * Does nothing when the socket has no list (i.e. it already disconnected).
 */
function removeById<T extends { id: string }>(
  registry: Map<string, T[]>,
  socketId: string,
  id: string,
): void {
  const entries = registry.get(socketId)
  if (!entries)
    return
  registry.set(socketId, entries.filter(entry => entry.id !== id))
}

/**
 * Registers the `/webrtc` signalling namespace on the given Socket.IO server.
 *
 * Each connected socket can query the router's RTP capabilities, create and
 * connect WebRTC transports, and produce or consume media on them. Everything
 * a socket created is closed and forgotten when it disconnects.
 *
 * @param io - Socket.IO server to attach the namespace to.
 * @param router - mediasoup router that creates the transports.
 */
export default function (io: SocketServer, router: types.Router): void {
  const namespace: Namespace<ClientToServerEvents, ServerToClientEvents> = io.of('/webrtc')

  namespace.on('connection', (socket) => {
    consola.info('[WebRtc]', 'Client connected', socket.id)

    transports.set(socket.id, [])
    producers.set(socket.id, [])
    consumers.set(socket.id, [])

    // Tell the newcomer about every producer currently registered.
    socket.emit('producers', Array.from(producers.values()).flatMap((list) => {
      return list.map((producer) => {
        return {
          producerId: producer.id,
          kind: producer.kind,
        }
      })
    }))

    socket.on('getRtpCapabilities', (cb) => {
      cb(router.rtpCapabilities)
    })

    socket.on('createTransport', async (cb) => {
      try {
        const transport = await router.createWebRtcTransport({
          listenInfos: [
            {
              protocol: 'udp',
              ip: '0.0.0.0',
              announcedAddress: process.env.ANNOUNCED_ADDRESS || '127.0.0.1',
              portRange: {
                min: 40000,
                max: 40100,
              },
            },
          ],
          enableUdp: true,
          preferUdp: true,
        })

        const owned = transports.get(socket.id)
        if (!owned) {
          // The socket disconnected while the transport was being created:
          // nobody will ever close it, so release it (and its port) now.
          transport.close()
          return
        }
        owned.push(transport)

        transport.observer.on('close', () => {
          // Forget only the transport that closed; keep the socket's others.
          removeById(transports, socket.id, transport.id)
          consola.info('[WebRtc]', 'Transport closed', transport.id)
        })

        cb({
          id: transport.id,
          iceParameters: transport.iceParameters,
          iceCandidates: transport.iceCandidates,
          dtlsParameters: transport.dtlsParameters,
        })
      }
      catch (error) {
        // Always answer, even for non-Error throwables, so the client never hangs.
        const message = describeError(error)
        consola.error('[WebRtc]', '[createTransport]', message)
        cb({ error: message })
      }
    })

    socket.on('connectTransport', async ({ transportId, dtlsParameters }, cb) => {
      const transport = transports.get(socket.id)?.find(t => t.id === transportId)
      if (!transport) {
        consola.error('[WebRtc]', '[connectTransport]', 'Transport not found')
        cb({ error: 'Transport not found' })
        return
      }

      try {
        await transport.connect({ dtlsParameters })
        cb({ ok: true })
      }
      catch (error) {
        const message = describeError(error)
        consola.error('[WebRtc]', '[connectTransport]', message)
        cb({ error: message })
      }
    })

    socket.on('produce', async ({ transportId, kind, rtpParameters }, cb) => {
      const transport = transports.get(socket.id)?.find(t => t.id === transportId)
      if (!transport) {
        consola.error('[WebRtc]', '[produce]', 'Transport not found')
        cb({ error: 'Transport not found' })
        return
      }

      try {
        const producer = await transport.produce({ kind, rtpParameters })

        const owned = producers.get(socket.id)
        if (!owned) {
          // Socket went away during the await; do not keep an orphan producer.
          producer.close()
          return
        }
        owned.push(producer)

        producer.on('transportclose', () => {
          // Remove it so later connections are not told about a dead producer.
          removeById(producers, socket.id, producer.id)
          socket.broadcast.emit('producerClosed', producer.id)
          consola.log('[WebRtc]', 'Producer closed', producer.id)
        })

        cb({ producerId: producer.id })
        socket.broadcast.emit('newProducer', { producerId: producer.id, kind: producer.kind })
      }
      catch (error) {
        const message = describeError(error)
        consola.error('[WebRtc]', '[produce]', message)
        cb({ error: message })
      }
    })

    socket.on('consume', async ({ producerId, transportId, rtpCapabilities }, cb) => {
      if (!router.canConsume({ producerId, rtpCapabilities })) {
        consola.error('[WebRtc]', '[consume]', 'Cannot consume')
        cb({ error: 'Cannot consume' })
        return
      }

      const transport = transports.get(socket.id)?.find(t => t.id === transportId)
      if (!transport) {
        consola.error('[WebRtc]', '[consume]', 'Transport not found')
        cb({ error: 'Transport not found' })
        return
      }

      try {
        const consumer = await transport.consume({
          producerId,
          rtpCapabilities,
          paused: false,
        })

        const owned = consumers.get(socket.id)
        if (!owned) {
          // Socket went away during the await; do not keep an orphan consumer.
          consumer.close()
          return
        }
        owned.push(consumer)

        // Stop tracking the consumer once it is closed for any reason.
        consumer.observer.on('close', () => {
          removeById(consumers, socket.id, consumer.id)
        })

        cb({
          consumerId: consumer.id,
          producerId,
          kind: consumer.kind,
          rtpParameters: consumer.rtpParameters,
        })
      }
      catch (error) {
        const message = describeError(error)
        consola.error('[WebRtc]', '[consume]', message)
        cb({ error: message })
      }
    })

    socket.on('disconnect', () => {
      consola.info('Client disconnected:', socket.id)

      // Close handlers replace the registry arrays while these loops run, but
      // each loop iterates the array it fetched, so every item is still visited.
      transports.get(socket.id)?.forEach(t => t.close())
      producers.get(socket.id)?.forEach(p => p.close())
      consumers.get(socket.id)?.forEach(c => c.close())

      transports.delete(socket.id)
      producers.delete(socket.id)
      consumers.delete(socket.id)
    })
  })
}