chad/server/sockets/webrtc.ts
Никита Круглицкий 189404e22c
All checks were successful
Deploy / deploy (push) Successful in 44s
#4
2025-10-04 23:59:48 +06:00

507 lines
13 KiB
TypeScript

import type { types } from 'mediasoup'
import type { Namespace, RemoteSocket, Socket, Server as SocketServer } from 'socket.io'
import { consola } from 'consola'
interface ProducerShort {
producerId: types.Producer['id']
kind: types.MediaKind
}
interface ErrorCallbackResult {
error: string
}
interface SuccessCallbackResult {
ok: true
}
type EventCallback<T = SuccessCallbackResult> = (result: T | ErrorCallbackResult) => void
interface ClientToServerEvents {
join: (
options: {
username: string
rtpCapabilities: types.RtpCapabilities
},
cb: EventCallback<{ id: string, username: string }[]>
) => void
getRtpCapabilities: (
cb: EventCallback<types.RtpCapabilities>
) => void
createTransport: (
options: {
producing: boolean
consuming: boolean
},
cb: EventCallback<Pick<types.WebRtcTransport, 'id' | 'iceParameters' | 'iceCandidates' | 'dtlsParameters'>>
) => void
connectTransport: (
options: {
transportId: types.WebRtcTransport['id']
dtlsParameters: types.WebRtcTransport['dtlsParameters']
},
cb: EventCallback
) => void
produce: (
options: {
transportId: types.WebRtcTransport['id']
kind: types.MediaKind
rtpParameters: types.RtpParameters
},
cb: EventCallback<{ id: types.Producer['id'] }>
) => void
closeProducer: (
options: {
producerId: types.Producer['id']
},
cb: EventCallback
) => void
pauseProducer: (
options: {
producerId: types.Producer['id']
},
cb: EventCallback
) => void
resumeProducer: (
options: {
producerId: types.Producer['id']
},
cb: EventCallback
) => void
pauseConsumer: (
options: {
consumerId: types.Consumer['id']
},
cb: EventCallback
) => void
resumeConsumer: (
options: {
consumerId: types.Consumer['id']
},
cb: EventCallback
) => void
}
interface ServerToClientEvents {
producers: (arg: ProducerShort[]) => void
newConsumer: (arg: {
peerId: string
producerId: types.Producer['id']
id: types.Consumer['id']
kind: types.MediaKind
rtpParameters: types.RtpParameters
type: types.ConsumerType
appData: types.Producer['appData']
producerPaused: types.Consumer['producerPaused']
}, cb: EventCallback) => void
peerClosed: (arg: string) => void
consumerClosed: (arg: { consumerId: string }) => void
consumerPaused: (arg: { consumerId: string }) => void
consumerResumed: (arg: { consumerId: string }) => void
consumerScore: (arg: { consumerId: string, score: types.ConsumerScore }) => void
}
interface InterServerEvent {}
interface SocketData {
joined: boolean
username: string
rtpCapabilities: types.RtpCapabilities
transports: Map<types.WebRtcTransport['id'], types.WebRtcTransport>
producers: Map<types.Producer['id'], types.Producer>
consumers: Map<types.Consumer['id'], types.Consumer>
}
export default function (io: SocketServer, router: types.Router) {
const namespace: Namespace<ClientToServerEvents, ServerToClientEvents, InterServerEvent, SocketData> = io.of('/webrtc')
namespace.on('connection', (socket) => {
consola.info('[WebRtc]', 'Client connected', socket.id)
socket.data.joined = false
socket.data.username = socket.id
socket.data.transports = new Map()
socket.data.producers = new Map()
socket.data.consumers = new Map()
socket.on('join', async ({ username, rtpCapabilities }, cb) => {
if (socket.data.joined)
throw new Error('Already joined')
socket.data.joined = true
socket.data.username = username
socket.data.rtpCapabilities = rtpCapabilities
const joinedSockets = await getJoinedSockets()
cb(joinedSockets.map((s) => {
return {
id: s.id,
username: s.data.username,
}
}))
for (const joinedSocket of joinedSockets) {
for (const producer of joinedSocket.data.producers.values()) {
createConsumer(
socket,
joinedSocket,
producer,
)
}
}
})
socket.on('getRtpCapabilities', (cb) => {
cb(router.rtpCapabilities)
})
socket.on('createTransport', async ({ producing, consuming }, cb) => {
try {
const transport = await router.createWebRtcTransport({
listenInfos: [
{
protocol: 'udp',
ip: '0.0.0.0',
announcedAddress: process.env.ANNOUNCED_ADDRESS || '127.0.0.1',
portRange: {
min: 40000,
max: 40100,
},
},
],
enableUdp: true,
preferUdp: true,
appData: {
producing,
consuming,
},
})
socket.data.transports.set(transport.id, transport)
cb({
id: transport.id,
iceParameters: transport.iceParameters,
iceCandidates: transport.iceCandidates,
dtlsParameters: transport.dtlsParameters,
})
transport.on('icestatechange', (iceState) => {
if (iceState === 'disconnected' || iceState === 'closed') {
consola.info('[WebRtc]', '[WebRtcTransport]', `"icestatechange" event [iceState:${iceState}], closing peer`, transport.id)
socket.disconnect()
}
})
}
catch (error) {
if (error instanceof Error) {
consola.error('[WebRtc]', '[createTransport]', error.message)
cb({ error: error.message })
}
}
})
socket.on('connectTransport', async ({ transportId, dtlsParameters }, cb) => {
const transport = socket.data.transports.get(transportId)
if (!transport) {
consola.error('[WebRtc]', '[connectTransport]', `Transport with id ${transportId} not found`)
cb({ error: 'Transport not found' })
return
}
try {
await transport.connect({ dtlsParameters })
cb({ ok: true })
}
catch (error) {
if (error instanceof Error) {
consola.error('[WebRtc]', '[connectTransport]', error.message)
cb({ error: error.message })
}
}
})
socket.on('produce', async ({ transportId, kind, rtpParameters }, cb) => {
if (!socket.data.joined) {
consola.error('Peer not joined yet')
cb({ error: 'Peer not joined yet' })
return
}
const transport = socket.data.transports.get(transportId)
if (!transport) {
consola.error('[WebRtc]', '[produce]', `Transport with id ${transportId} not found`)
cb({ error: 'Transport not found' })
return
}
try {
const producer = await transport.produce({ kind, rtpParameters, appData: { socketId: socket.id } })
socket.data.producers.set(producer.id, producer)
cb({ id: producer.id })
const sockets = await namespace.fetchSockets()
const otherSockets = sockets.filter(s => s.id !== socket.id)
for (const otherSocket of otherSockets) {
createConsumer(
socket,
otherSocket,
producer,
)
}
// TODO: Add into the AudioLevelObserver and ActiveSpeakerObserver.
// https://github.com/versatica/mediasoup-demo/blob/v3/server/lib/Room.js#L1276
}
catch (error) {
if (error instanceof Error) {
consola.error('[WebRtc]', '[produce]', error.message)
cb({ error: error.message })
}
}
})
socket.on('closeProducer', async ({ producerId }, cb) => {
if (!socket.data.joined) {
consola.error('Peer not joined yet')
cb({ error: 'Peer not joined yet' })
return
}
const producer = socket.data.producers.get(producerId)
if (!producer) {
consola.error(`producer with id "${producerId}" not found`)
cb({ error: `producer with id "${producerId}" not found` })
return
}
producer.close()
socket.data.producers.delete(producer.id)
cb({ ok: true })
})
socket.on('pauseProducer', async ({ producerId }, cb) => {
if (!socket.data.joined) {
consola.error('Peer not joined yet')
cb({ error: 'Peer not joined yet' })
return
}
const producer = socket.data.producers.get(producerId)
if (!producer) {
consola.error(`producer with id "${producerId}" not found`)
cb({ error: `producer with id "${producerId}" not found` })
return
}
await producer.pause()
cb({ ok: true })
})
socket.on('resumeProducer', async ({ producerId }, cb) => {
if (!socket.data.joined) {
consola.error('Peer not joined yet')
cb({ error: 'Peer not joined yet' })
return
}
const producer = socket.data.producers.get(producerId)
if (!producer) {
consola.error(`producer with id "${producerId}" not found`)
cb({ error: `producer with id "${producerId}" not found` })
return
}
await producer.resume()
cb({ ok: true })
})
socket.on('pauseConsumer', async ({ consumerId }, cb) => {
if (!socket.data.joined) {
consola.error('Peer not joined yet')
cb({ error: 'Peer not joined yet' })
return
}
const consumer = socket.data.consumers.get(consumerId)
if (!consumer) {
consola.error(`consumer with id "${consumerId}" not found`)
cb({ error: `consumer with id "${consumerId}" not found` })
return
}
await consumer.pause()
cb({ ok: true })
})
socket.on('resumeConsumer', async ({ consumerId }, cb) => {
if (!socket.data.joined) {
consola.error('Peer not joined yet')
cb({ error: 'Peer not joined yet' })
return
}
const consumer = socket.data.consumers.get(consumerId)
if (!consumer) {
consola.error(`consumer with id "${consumerId}" not found`)
cb({ error: `consumer with id "${consumerId}" not found` })
return
}
await consumer.resume()
cb({ ok: true })
})
socket.on('disconnect', () => {
consola.info('Client disconnected:', socket.id)
if (socket.data.joined) {
socket.broadcast.emit('peerClosed', socket.id)
}
for (const transport of socket.data.transports.values()) {
transport.close()
}
})
})
async function getJoinedSockets(excludeId?: string) {
const sockets = await namespace.fetchSockets()
return sockets.filter(socket => socket.data.joined && (excludeId ? excludeId !== socket.id : true))
}
async function createConsumer(
consumerSocket: Socket<ClientToServerEvents, ServerToClientEvents, InterServerEvent, SocketData>,
producerSocket: RemoteSocket<ServerToClientEvents, SocketData>,
producer: types.Producer,
) {
if (
!consumerSocket.data.rtpCapabilities
|| !router.canConsume(
{
producerId: producer.id,
rtpCapabilities: consumerSocket.data.rtpCapabilities,
},
)
) {
return
}
const transport = Array.from(consumerSocket.data.transports.values())
.find(t => t.appData.consuming)
if (!transport) {
consola.error('createConsumer() | Transport for consuming not found')
return
}
let consumer: types.Consumer
try {
consumer = await transport.consume(
{
producerId: producer.id,
rtpCapabilities: consumerSocket.data.rtpCapabilities,
// Enable NACK for OPUS.
enableRtx: true,
paused: true,
ignoreDtx: true,
},
)
}
catch (error) {
consola.error('_createConsumer() | transport.consume():%o', error)
return
}
consumerSocket.data.consumers.set(consumer.id, consumer)
consumer.on('transportclose', () => {
consumerSocket.data.consumers.delete(consumer.id)
})
consumer.on('producerclose', () => {
consumerSocket.data.consumers.delete(consumer.id)
consumerSocket.emit('consumerClosed', { consumerId: consumer.id })
})
consumer.on('producerpause', () => {
consumerSocket.emit('consumerPaused', { consumerId: consumer.id })
})
consumer.on('producerresume', () => {
consumerSocket.emit('consumerResumed', { consumerId: consumer.id })
})
consumer.on('score', (score) => {
consumerSocket.emit('consumerScore', { consumerId: consumer.id, score })
})
try {
await consumerSocket.emitWithAck(
'newConsumer',
{
peerId: producerSocket.id,
producerId: producer.id,
id: consumer.id,
kind: consumer.kind,
rtpParameters: consumer.rtpParameters,
type: consumer.type,
appData: producer.appData,
producerPaused: consumer.producerPaused,
},
)
await consumer.resume()
consumerSocket.emit(
'consumerScore',
{
consumerId: consumer.id,
score: consumer.score,
},
)
}
catch (error) {
consola.error('_createConsumer() | failed:%o', error)
}
}
}