import type { types } from 'mediasoup'
import type { SerializedClient } from '../types.ts'
import { EventEmitter } from 'node:events'
import { consola } from 'consola'

/**
 * Payload emitted with `signal:new-consumer`, describing a consumer that was
 * just created for this client.
 */
export interface NewConsumerSignal {
  /** The `producerSocketId` value that was passed to `createConsumerFor()`. */
  socketId: string
  /** Id of the producer being consumed. */
  producerId: string
  /** Id of the newly created consumer. */
  id: string
  kind: types.MediaKind
  rtpParameters: types.RtpParameters
  type: types.ConsumerType
  /** App data copied from the producer (not from the consumer). */
  appData: types.Producer['appData']
  producerPaused: boolean
}

/** Event name -> listener argument tuple for everything `Client` emits. */
interface ClientEvents {
  /** A consumer was created; the listener must call `onAcked` before the consumer is resumed. */
  'signal:new-consumer': [data: NewConsumerSignal, onAcked: () => Promise<void>]
  'consumer:closed': [consumerId: string]
  'consumer:paused': [consumerId: string]
  'consumer:resumed': [consumerId: string]
  /** Emitted on ICE `disconnected`/`closed` and DTLS `failed`/`closed` state changes. */
  'transport:closed': []
  'closed': []
  'updated': []
}

/**
 * Per-socket state holder: owns the transports, producers and consumers
 * created for one connected client and re-emits their lifecycle as
 * `ClientEvents`.
 */
export class Client extends EventEmitter<ClientEvents> {
  readonly socketId: string
  readonly userId: string
  channelId: string = ''

  #inputMuted = false
  #outputMuted = false

  readonly #router: types.Router
  readonly #transports = new Map<string, types.WebRtcTransport>()
  readonly #producers = new Map<string, types.Producer>()
  readonly #consumers = new Map<string, types.Consumer>()

  constructor(socketId: string, userId: string, router: types.Router) {
    super()
    this.socketId = socketId
    this.userId = userId
    this.#router = router
  }

  /** Read-only view of the producers created through `produce()`, keyed by producer id. */
  get producers(): ReadonlyMap<string, types.Producer> {
    return this.#producers
  }

  /** Read-only view of the currently registered consumers, keyed by consumer id. */
  get consumers(): ReadonlyMap<string, types.Consumer> {
    return this.#consumers
  }

  /** Read-only view of the transports created through `createTransport()`, keyed by transport id. */
  get transports(): ReadonlyMap<string, types.WebRtcTransport> {
    return this.#transports
  }

  get inputMuted(): boolean {
    return this.#inputMuted
  }

  get outputMuted(): boolean {
    return this.#outputMuted
  }

  /** `true` while at least one registered producer is a video producer whose `appData.source` is `'share'`. */
  get streaming(): boolean {
    return Array.from(this.#producers.values()).some(
      producer => producer.kind === 'video' && producer.appData.source === 'share',
    )
  }

  /**
   * Creates a WebRTC transport on the router and registers it on this client.
   *
   * The transport listens on UDP `0.0.0.0` (ports 40000-40100) and announces
   * `process.env.ANNOUNCED_ADDRESS`, falling back to `127.0.0.1` when that is
   * unset or empty.
   *
   * @param options Stored as the transport's `appData`; `consuming` is what
   *   `createConsumerFor()` later looks for.
   * @returns The connection parameters (id, ICE parameters/candidates, DTLS parameters).
   */
  async createTransport(options: { producing: boolean, consuming: boolean }) {
    const transport = await this.#router.createWebRtcTransport({
      listenInfos: [{
        protocol: 'udp',
        ip: '0.0.0.0',
        announcedAddress: process.env.ANNOUNCED_ADDRESS || '127.0.0.1',
        portRange: { min: 40000, max: 40100 },
      }],
      enableUdp: true,
      preferUdp: true,
      appData: options,
    })

    this.#transports.set(transport.id, transport)

    // Both handlers only notify listeners; the transport itself is not closed here.
    transport.on('icestatechange', (iceState: types.IceState) => {
      if (iceState === 'disconnected' || iceState === 'closed') {
        consola.info('[Client]', `[${this.socketId}]`, `iceState=${iceState}`)
        this.emit('transport:closed')
      }
    })

    transport.on('dtlsstatechange', (dtlsState: types.DtlsState) => {
      if (dtlsState === 'failed' || dtlsState === 'closed') {
        consola.warn('[Client]', `[${this.socketId}]`, `dtlsState=${dtlsState}`)
        this.emit('transport:closed')
      }
    })

    return {
      id: transport.id,
      iceParameters: transport.iceParameters,
      iceCandidates: transport.iceCandidates,
      dtlsParameters: transport.dtlsParameters,
    }
  }

  /**
   * Connects a previously created transport with the client's DTLS parameters.
   *
   * @throws Error if no transport with `transportId` is registered.
   */
  async connectTransport(transportId: string, dtlsParameters: types.DtlsParameters): Promise<void> {
    const transport = this.#transports.get(transportId)
    if (!transport)
      throw new Error(`Transport not found: ${transportId}`)

    await transport.connect({ dtlsParameters })
  }

  /**
   * Creates a producer on the given transport and registers it.
   *
   * The producer's `appData` is the caller's `appData` plus this client's
   * `socketId`. Emits `updated` when the call changes the `streaming` flag.
   *
   * @throws Error if no transport with `transportId` is registered.
   */
  async produce(
    transportId: string,
    kind: types.MediaKind,
    rtpParameters: types.RtpParameters,
    appData: object,
  ): Promise<types.Producer> {
    const transport = this.#transports.get(transportId)
    if (!transport)
      throw new Error(`Transport not found: ${transportId}`)

    const streamingBefore = this.streaming

    const producer = await transport.produce({
      kind,
      rtpParameters,
      appData: { ...appData, socketId: this.socketId },
    })
    this.#producers.set(producer.id, producer)

    if (this.streaming !== streamingBefore)
      this.emit('updated')

    return producer
  }

  /**
   * Closes and unregisters a producer. Emits `updated` when the call changes
   * the `streaming` flag.
   *
   * @throws Error if no producer with `producerId` is registered.
   */
  closeProducer(producerId: string): void {
    const producer = this.#producers.get(producerId)
    if (!producer)
      throw new Error(`Producer not found: ${producerId}`)

    const streamingBefore = this.streaming

    producer.close()
    this.#producers.delete(producerId)

    if (this.streaming !== streamingBefore)
      this.emit('updated')
  }

  /**
   * Pauses a producer unless it is already paused.
   *
   * @throws Error if no producer with `producerId` is registered.
   */
  async pauseProducer(producerId: string): Promise<void> {
    const producer = this.#producers.get(producerId)
    if (!producer)
      throw new Error(`Producer not found: ${producerId}`)

    if (!producer.paused)
      await producer.pause()
  }

  /**
   * Resumes a producer.
   *
   * @throws Error if no producer with `producerId` is registered.
   */
  async resumeProducer(producerId: string): Promise<void> {
    const producer = this.#producers.get(producerId)
    if (!producer)
      throw new Error(`Producer not found: ${producerId}`)

    await producer.resume()
  }

  /**
   * Creates a consumer of `producer` on this client's consuming transport.
   *
   * The consumer is created paused, announced through `signal:new-consumer`,
   * and resumed only after the listener invokes the `onAcked` callback.
   *
   * @param producer The producer to consume.
   * @param producerSocketId Forwarded as `socketId` in the signal payload.
   * @returns The resumed consumer, or `null` when there is no consuming
   *   transport or any step fails (failures are logged, never thrown).
   */
  async createConsumerFor(producer: types.Producer, producerSocketId: string): Promise<types.Consumer | null> {
    const transport = Array.from(this.#transports.values()).find(t => t.appData.consuming)
    if (!transport) {
      consola.warn('[Client]', `[${this.socketId}]`, 'No consuming transport, skipping consumer creation')
      return null
    }

    try {
      const consumer = await transport.consume({
        producerId: producer.id,
        rtpCapabilities: this.#router.rtpCapabilities,
        enableRtx: true,
        paused: true,
        ignoreDtx: true,
      })

      this.#consumers.set(consumer.id, consumer)

      // Single place where a consumer leaves the registry, whatever closed it.
      consumer.observer.on('close', () => {
        this.#consumers.delete(consumer.id)
        this.emit('consumer:closed', consumer.id)
      })

      consumer.on('transportclose', () => {
        consumer.close()
      })

      consumer.on('producerclose', () => {
        consumer.close()
      })

      consumer.on('producerpause', () => {
        this.emit('consumer:paused', consumer.id)
      })

      consumer.on('producerresume', () => {
        this.emit('consumer:resumed', consumer.id)
      })

      try {
        await this.#signalNewConsumer(consumer, producer, producerSocketId)
        await consumer.resume()
      }
      catch (error) {
        // Do not leave a half-initialised consumer registered and open.
        if (!consumer.closed)
          consumer.close()
        throw error
      }

      return consumer
    }
    catch (error) {
      consola.error('[Client]', `[${this.socketId}]`, 'createConsumerFor() failed:', error)
      return null
    }
  }

  /**
   * Emits `signal:new-consumer` and settles once the listener acknowledges it.
   *
   * Rejects instead of waiting forever when nobody listens for the signal or
   * when the consumer closes before the acknowledgement arrives.
   */
  #signalNewConsumer(
    consumer: types.Consumer,
    producer: types.Producer,
    producerSocketId: string,
  ): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      // If the ack already resolved the promise, this later reject is a no-op.
      consumer.observer.once('close', () => {
        reject(new Error(`Consumer closed before it was acknowledged: ${consumer.id}`))
      })

      const delivered = this.emit('signal:new-consumer', {
        socketId: producerSocketId,
        producerId: producer.id,
        id: consumer.id,
        kind: consumer.kind,
        rtpParameters: consumer.rtpParameters,
        type: consumer.type,
        appData: producer.appData,
        producerPaused: consumer.producerPaused,
      }, async () => {
        resolve()
      })

      // emit() returns false when no listener is attached, so no ack can ever come.
      if (!delivered)
        reject(new Error('No listener registered for signal:new-consumer'))
    })
  }

  /** Closes every registered consumer that consumes `producerId`. */
  removeConsumersOf(producerId: string): void {
    for (const consumer of this.#consumers.values()) {
      if (consumer.producerId === producerId)
        consumer.close()
    }
  }

  /** Closes every registered consumer and empties the consumer registry. */
  clearConsumers(): void {
    for (const consumer of this.#consumers.values()) {
      consumer.close()
    }
    this.#consumers.clear()
  }

  /**
   * Pauses a consumer.
   *
   * @throws Error if no consumer with `consumerId` is registered.
   */
  async pauseConsumer(consumerId: string): Promise<void> {
    const consumer = this.#consumers.get(consumerId)
    if (!consumer)
      throw new Error(`Consumer not found: ${consumerId}`)

    await consumer.pause()
  }

  /**
   * Resumes a consumer.
   *
   * @throws Error if no consumer with `consumerId` is registered.
   */
  async resumeConsumer(consumerId: string): Promise<void> {
    const consumer = this.#consumers.get(consumerId)
    if (!consumer)
      throw new Error(`Consumer not found: ${consumerId}`)

    await consumer.resume()
  }

  /**
   * Applies the boolean fields present in `patch` to the mute flags and then
   * emits `updated` (the event fires even when no flag was changed).
   */
  update(patch: { inputMuted?: boolean, outputMuted?: boolean }): void {
    if (typeof patch.inputMuted === 'boolean')
      this.#inputMuted = patch.inputMuted
    if (typeof patch.outputMuted === 'boolean')
      this.#outputMuted = patch.outputMuted

    this.emit('updated')
  }

  /** Closes every registered transport, then emits `closed`. */
  close(): void {
    for (const transport of this.#transports.values()) {
      transport.close()
    }
    this.emit('closed')
  }

  /** Returns a plain snapshot of the client's identity, channel, mute flags and `streaming` state. */
  serialize(): SerializedClient {
    return {
      socketId: this.socketId,
      userId: this.userId,
      channelId: this.channelId,
      inputMuted: this.#inputMuted,
      outputMuted: this.#outputMuted,
      streaming: this.streaming,
    }
  }
}