Files
chad/server/plugins/socket/webrtc/Client.ts
2026-05-22 05:08:02 +06:00

289 lines
7.8 KiB
TypeScript

import type { types } from 'mediasoup'
import type { SerializedClient } from '../types.ts'
import { EventEmitter } from 'node:events'
import { consola } from 'consola'
export interface NewConsumerSignal {
socketId: string
producerId: string
id: string
kind: types.MediaKind
rtpParameters: types.RtpParameters
type: types.ConsumerType
appData: types.Producer['appData']
producerPaused: boolean
}
interface ClientEvents {
'signal:new-consumer': [data: NewConsumerSignal, onAcked: () => Promise<void>]
'consumer:closed': [consumerId: string]
'consumer:paused': [consumerId: string]
'consumer:resumed': [consumerId: string]
'transport:closed': []
'closed': []
'updated': []
}
export class Client extends EventEmitter<ClientEvents> {
readonly socketId: string
readonly username: string
channelId: string = ''
#inputMuted = false
#outputMuted = false
readonly #router: types.Router
readonly #transports = new Map<string, types.WebRtcTransport>()
readonly #producers = new Map<string, types.Producer>()
readonly #consumers = new Map<string, types.Consumer>()
constructor(socketId: string, username: string, router: types.Router) {
super()
this.socketId = socketId
this.username = username
this.#router = router
}
get producers(): ReadonlyMap<string, types.Producer> { return this.#producers }
get consumers(): ReadonlyMap<string, types.Consumer> { return this.#consumers }
get transports(): ReadonlyMap<string, types.WebRtcTransport> { return this.#transports }
get inputMuted(): boolean { return this.#inputMuted }
get outputMuted(): boolean { return this.#outputMuted }
get streaming(): boolean {
return Array.from(this.#producers.values()).some(
producer => producer.kind === 'video' && producer.appData.source === 'share',
)
}
async createTransport(options: { producing: boolean, consuming: boolean }) {
const transport = await this.#router.createWebRtcTransport({
listenInfos: [{
protocol: 'udp',
ip: '0.0.0.0',
announcedAddress: process.env.ANNOUNCED_ADDRESS || '127.0.0.1',
portRange: { min: 40000, max: 40100 },
}],
enableUdp: true,
preferUdp: true,
appData: options,
})
this.#transports.set(transport.id, transport)
transport.on('icestatechange', (iceState: types.IceState) => {
if (iceState === 'disconnected' || iceState === 'closed') {
consola.info('[Client]', `[${this.socketId}]`, `iceState=${iceState}`)
this.emit('transport:closed')
}
})
transport.on('dtlsstatechange', (dtlsState: types.DtlsState) => {
if (dtlsState === 'failed' || dtlsState === 'closed') {
consola.warn('[Client]', `[${this.socketId}]`, `dtlsState=${dtlsState}`)
this.emit('transport:closed')
}
})
return {
id: transport.id,
iceParameters: transport.iceParameters,
iceCandidates: transport.iceCandidates,
dtlsParameters: transport.dtlsParameters,
}
}
async connectTransport(transportId: string, dtlsParameters: types.DtlsParameters): Promise<void> {
const transport = this.#transports.get(transportId)
if (!transport)
throw new Error(`Transport not found: ${transportId}`)
await transport.connect({ dtlsParameters })
}
async produce(
transportId: string,
kind: types.MediaKind,
rtpParameters: types.RtpParameters,
appData: object,
): Promise<types.Producer> {
const transport = this.#transports.get(transportId)
if (!transport)
throw new Error(`Transport not found: ${transportId}`)
const streamingBefore = this.streaming
const producer = await transport.produce({
kind,
rtpParameters,
appData: { ...appData, socketId: this.socketId },
})
this.#producers.set(producer.id, producer)
if (this.streaming !== streamingBefore)
this.emit('updated')
return producer
}
closeProducer(producerId: string): void {
const producer = this.#producers.get(producerId)
if (!producer)
throw new Error(`Producer not found: ${producerId}`)
const streamingBefore = this.streaming
producer.close()
this.#producers.delete(producerId)
if (this.streaming !== streamingBefore)
this.emit('updated')
}
async pauseProducer(producerId: string): Promise<void> {
const producer = this.#producers.get(producerId)
if (!producer)
throw new Error(`Producer not found: ${producerId}`)
if (!producer.paused)
await producer.pause()
}
async resumeProducer(producerId: string): Promise<void> {
const producer = this.#producers.get(producerId)
if (!producer)
throw new Error(`Producer not found: ${producerId}`)
await producer.resume()
}
async createConsumerFor(producer: types.Producer, producerSocketId: string): Promise<types.Consumer | null> {
const transport = Array.from(this.#transports.values()).find(t => t.appData.consuming)
if (!transport) {
consola.warn('[Client]', `[${this.socketId}]`, 'No consuming transport, skipping consumer creation')
return null
}
try {
const consumer = await transport.consume({
producerId: producer.id,
rtpCapabilities: this.#router.rtpCapabilities,
enableRtx: true,
paused: true,
ignoreDtx: true,
})
this.#consumers.set(consumer.id, consumer)
consumer.observer.on('close', () => {
this.#consumers.delete(consumer.id)
this.emit('consumer:closed', consumer.id)
})
consumer.on('transportclose', () => {
consumer.close()
})
consumer.on('producerclose', () => {
consumer.close()
})
consumer.on('producerpause', () => {
this.emit('consumer:paused', consumer.id)
})
consumer.on('producerresume', () => {
this.emit('consumer:resumed', consumer.id)
})
await new Promise<void>((resolve) => {
this.emit('signal:new-consumer', {
socketId: producerSocketId,
producerId: producer.id,
id: consumer.id,
kind: consumer.kind,
rtpParameters: consumer.rtpParameters,
type: consumer.type,
appData: producer.appData,
producerPaused: consumer.producerPaused,
}, async () => { resolve() })
})
await consumer.resume()
return consumer
}
catch (error) {
consola.error('[Client]', `[${this.socketId}]`, 'createConsumerFor() failed:', error)
return null
}
}
removeConsumersOf(producerId: string): void {
for (const consumer of this.#consumers.values()) {
if (consumer.producerId === producerId)
consumer.close()
}
}
clearConsumers(): void {
for (const consumer of this.#consumers.values()) {
consumer.close()
}
this.#consumers.clear()
}
async pauseConsumer(consumerId: string): Promise<void> {
const consumer = this.#consumers.get(consumerId)
if (!consumer)
throw new Error(`Consumer not found: ${consumerId}`)
await consumer.pause()
}
async resumeConsumer(consumerId: string): Promise<void> {
const consumer = this.#consumers.get(consumerId)
if (!consumer)
throw new Error(`Consumer not found: ${consumerId}`)
await consumer.resume()
}
update(patch: { inputMuted?: boolean, outputMuted?: boolean }): void {
if (typeof patch.inputMuted === 'boolean')
this.#inputMuted = patch.inputMuted
if (typeof patch.outputMuted === 'boolean')
this.#outputMuted = patch.outputMuted
this.emit('updated')
}
close(): void {
for (const transport of this.#transports.values()) {
transport.close()
}
this.emit('closed')
}
serialize(): SerializedClient {
return {
socketId: this.socketId,
username: this.username,
channelId: this.channelId,
inputMuted: this.#inputMuted,
outputMuted: this.#outputMuted,
streaming: this.streaming,
}
}
}