import type { types } from 'mediasoup' import type { ActiveSpeakerObserver, AudioLevelObserver } from 'mediasoup/types' import type { Server as SocketServer } from 'socket.io' import type { ChadSocket, SomeSocket, } from '../types/socket.ts' import { consola } from 'consola' import { socketToClient } from '../utils/socket-to-client.ts' export default async function ( io: SocketServer, socket: ChadSocket, router: types.Router, audioLevelObserver: AudioLevelObserver, activeSpeakerObserver: ActiveSpeakerObserver, ) { io.on('connection', async (socket) => { socket.data.inputMuted = false socket.data.outputMuted = false socket.data.transports = new Map() socket.data.producers = new Map() socket.data.consumers = new Map() socket.on('webrtc:get-rtp-capabilities', (cb) => { cb(router.rtpCapabilities) }) socket.on('webrtc:create-transport', async ({ producing, consuming }, cb) => { try { const transport = await router.createWebRtcTransport({ listenInfos: [ { protocol: 'udp', ip: '0.0.0.0', announcedAddress: process.env.ANNOUNCED_ADDRESS || '127.0.0.1', portRange: { min: 40000, max: 40100, }, }, ], enableUdp: true, preferUdp: true, appData: { producing, consuming, }, }) socket.data.transports.set(transport.id, transport) cb({ id: transport.id, iceParameters: transport.iceParameters, iceCandidates: transport.iceCandidates, dtlsParameters: transport.dtlsParameters, }) transport.on('icestatechange', (iceState: types.IceState) => { if (iceState === 'disconnected' || iceState === 'closed') { consola.info('[WebRtc]', '[WebRtcTransport]', `"icestatechange" event [iceState:${iceState}], closing peer`, transport.id) socket.disconnect() } }) transport.on('dtlsstatechange', (dtlsState: types.DtlsState) => { if (dtlsState === 'failed' || dtlsState === 'closed') { consola.warn('WebRtcTransport "dtlsstatechange" event [dtlsState:%s], closing peer', dtlsState) socket.disconnect() } }) } catch (error) { if (error instanceof Error) { consola.error('[WebRtc]', '[createTransport]', error.message) cb({ error: error.message }) } } }) socket.on('webrtc:connect-transport', async ({ transportId, dtlsParameters }, cb) => { const transport = socket.data.transports.get(transportId) if (!transport) { consola.error('[WebRtc]', '[connectTransport]', `Transport with id ${transportId} not found`) cb({ error: 'Transport not found' }) return } try { await transport.connect({ dtlsParameters }) cb({ ok: true }) } catch (error) { if (error instanceof Error) { consola.error('[WebRtc]', '[connectTransport]', error.message) cb({ error: error.message }) } } }) socket.on('webrtc:produce', async ({ transportId, kind, rtpParameters, appData }, cb) => { if (!socket.data.joined) { consola.error('Peer not joined yet') cb({ error: 'Peer not joined yet' }) return } // Block production in default channel const currentChannelId = Array.from(socket.rooms).find(room => room !== socket.id) || 'default' if (currentChannelId === 'default') { consola.error('Cannot produce in default channel') cb({ error: 'Cannot produce media in default channel' }) return } const transport = socket.data.transports.get(transportId) if (!transport) { consola.error('[WebRtc]', '[produce]', `Transport with id ${transportId} not found`) cb({ error: 'Transport not found' }) return } try { const producer = await transport.produce({ kind, rtpParameters, appData: { ...appData, socketId: socket.id } }) socket.data.producers.set(producer.id, producer) cb({ id: producer.id }) // Filter by channel when creating consumers const otherSockets = await getJoinedSockets(socket.id, currentChannelId) for (const otherSocket of otherSockets) { createConsumer( otherSocket, socket, producer, ) } await audioLevelObserver.addProducer({ producerId: producer.id }) await activeSpeakerObserver.addProducer({ producerId: producer.id }) } catch (error) { if (error instanceof Error) { consola.error('[WebRtc]', '[produce]', error.message) cb({ error: error.message }) } } }) socket.on('webrtc:close-producer', async ({ producerId }, cb) => { if (!socket.data.joined) { consola.error('Peer not joined yet') cb({ error: 'Peer not joined yet' }) return } const producer = socket.data.producers.get(producerId) if (!producer) { consola.error(`producer with id "${producerId}" not found`) cb({ error: `producer with id "${producerId}" not found` }) return } producer.close() socket.data.producers.delete(producerId) cb({ ok: true }) }) socket.on('webrtc:pause-producer', async ({ producerId }, cb) => { if (!socket.data.joined) { consola.error('Peer not joined yet') cb({ error: 'Peer not joined yet' }) return } const producer = socket.data.producers.get(producerId) if (!producer) { consola.error(`producer with id "${producerId}" not found`) cb({ error: `producer with id "${producerId}" not found` }) return } if (producer.paused) return await producer.pause() cb({ ok: true }) }) socket.on('webrtc:resume-producer', async ({ producerId }, cb) => { if (!socket.data.joined) { consola.error('Peer not joined yet') cb({ error: 'Peer not joined yet' }) return } const producer = socket.data.producers.get(producerId) if (!producer) { consola.error(`producer with id "${producerId}" not found`) cb({ error: `producer with id "${producerId}" not found` }) return } await producer.resume() cb({ ok: true }) }) socket.on('webrtc:pause-consumer', async ({ consumerId }, cb) => { if (!socket.data.joined) { consola.error('Peer not joined yet') cb({ error: 'Peer not joined yet' }) return } const consumer = socket.data.consumers.get(consumerId) if (!consumer) { consola.error(`consumer with id "${consumerId}" not found`) cb({ error: `consumer with id "${consumerId}" not found` }) return } await consumer.pause() cb({ ok: true }) }) socket.on('webrtc:resume-consumer', async ({ consumerId }, cb) => { if (!socket.data.joined) { consola.error('Peer not joined yet') cb({ error: 'Peer not joined yet' }) return } const consumer = socket.data.consumers.get(consumerId) if (!consumer) { consola.error(`consumer with id "${consumerId}" not found`) cb({ error: `consumer with id "${consumerId}" not found` }) return } await consumer.resume() cb({ ok: true }) }) socket.on('webrtc:update-client', async (updatedClient, cb) => { if (typeof updatedClient.inputMuted === 'boolean') { socket.data.inputMuted = updatedClient.inputMuted } if (typeof updatedClient.outputMuted === 'boolean') { socket.data.outputMuted = updatedClient.outputMuted } cb(socketToClient(socket)) io.emit('webrtc:client-changed', socket.id, socketToClient(socket)) }) socket.on('disconnect', () => { consola.info('Client disconnected:', socket.id) // Get current channel from Socket.IO rooms const channelId = Array.from(socket.rooms).find(room => room !== socket.id) if (socket.data.joined) { // Notify only same channel using Socket.IO room if (channelId) { socket.to(channelId).emit('webrtc:peer-closed', socket.id) io.emit('channelUserLeft', { channelId, clientId: socket.id }) } } for (const transport of socket.data.transports.values()) { transport.close() } }) }) async function getJoinedSockets(excludeId?: string, channelId?: string) { let sockets = await io.fetchSockets() // Filter by channel using Socket.IO rooms if (channelId) { sockets = await io.in(channelId).fetchSockets() } return sockets.filter((socket) => { if (!socket.data.joined) return false if (excludeId && socket.id === excludeId) return false return true }) } async function createConsumer( consumerSocket: SomeSocket, producerSocket: SomeSocket, producer: types.Producer, ) { if ( !consumerSocket.data.rtpCapabilities || !router.canConsume( { producerId: producer.id, rtpCapabilities: consumerSocket.data.rtpCapabilities, }, ) ) { return } const transport = Array.from(consumerSocket.data.transports.values()) .find(t => t.appData.consuming) if (!transport) { consola.error('createConsumer() | Transport for consuming not found') return } let consumer: types.Consumer try { consumer = await transport.consume( { producerId: producer.id, rtpCapabilities: consumerSocket.data.rtpCapabilities, // Enable NACK for OPUS. enableRtx: true, paused: true, ignoreDtx: true, }, ) } catch (error) { consola.error('_createConsumer() | transport.consume():%o', error) return } consumerSocket.data.consumers.set(consumer.id, consumer) consumer.on('transportclose', () => { consumerSocket.data.consumers.delete(consumer.id) }) consumer.on('producerclose', () => { consumerSocket.data.consumers.delete(consumer.id) consumerSocket.emit('webrtc:consumer-closed', { consumerId: consumer.id }) }) consumer.on('producerpause', () => { consumerSocket.emit('webrtc:consumer-paused', { consumerId: consumer.id }) }) consumer.on('producerresume', () => { consumerSocket.emit('webrtc:consumer-resumed', { consumerId: consumer.id }) }) consumer.on('score', (score: types.ConsumerScore) => { consumerSocket.emit('webrtc:consumer-score', { consumerId: consumer.id, score }) }) try { await consumerSocket.emitWithAck( 'webrtc:new-consumer', { socketId: producerSocket.id, producerId: producer.id, id: consumer.id, kind: consumer.kind, rtpParameters: consumer.rtpParameters, type: consumer.type, appData: producer.appData, producerPaused: consumer.producerPaused, }, ) await consumer.resume() consumerSocket.emit( 'webrtc:consumer-score', { consumerId: consumer.id, score: consumer.score, }, ) } catch (error) { consola.error('_createConsumer() | failed:%o', error) } } }