работаем бля работаем

This commit is contained in:
2026-05-09 03:21:44 +06:00
parent f845777bac
commit 0b148c6a7d
169 changed files with 15816 additions and 1005 deletions

View File

@@ -1,7 +1,8 @@
import type { FastifyPluginAsync } from 'fastify'
import type { Type } from 'typebox'
import type { UserSchema } from '../schemas/auth.ts'
import type { ChatMessageSchema } from '../schemas/chat.ts'
import type { Channel } from '../prisma/generated-client/client.ts'
import type { UserSchema } from './schemas/auth.ts'
import type { ChatMessageSchema } from './schemas/chat.ts'
import { EventEmitter } from 'node:events'
import fp from 'fastify-plugin'
@@ -14,6 +15,8 @@ declare module 'fastify' {
interface EventMap {
'chat:new-message': [Type.Static<typeof ChatMessageSchema>]
'user:profile-updated': [Type.Static<typeof UserSchema>]
'channel:created': [Channel]
'channel:removed': [Channel]
}
const plugin: FastifyPluginAsync = fp(async (fastify) => {

View File

@@ -27,20 +27,9 @@ export const autoConfig: mediasoup.types.RouterOptions = {
},
{
kind: 'video',
mimeType: 'video/VP8',
mimeType: 'video/AV1',
clockRate: 90000,
parameters: {
'x-google-start-bitrate': 1000,
},
},
{
kind: 'video',
mimeType: 'video/VP9',
clockRate: 90000,
parameters: {
'profile-id': 2,
'x-google-start-bitrate': 1000,
},
parameters: {},
},
{
kind: 'video',
@@ -66,9 +55,20 @@ export const autoConfig: mediasoup.types.RouterOptions = {
},
{
kind: 'video',
mimeType: 'video/AV1',
mimeType: 'video/VP8',
clockRate: 90000,
parameters: {},
parameters: {
'x-google-start-bitrate': 1000,
},
},
{
kind: 'video',
mimeType: 'video/VP9',
clockRate: 90000,
parameters: {
'profile-id': 2,
'x-google-start-bitrate': 1000,
},
},
],
}

View File

@@ -11,6 +11,7 @@ declare module 'fastify' {
export default fp(
async (fastify) => {
const worker = await mediasoup.createWorker()
worker.on('died', () => {
consola.error('[Mediasoup]', 'Worker died, exiting...')

View File

@@ -11,7 +11,7 @@ declare module 'fastify' {
const plugin: FastifyPluginAsync = fp(async (fastify) => {
const prisma = new PrismaClient({
log: ['query', 'error', 'warn'],
log: ['error'],
adapter: new PrismaBetterSqlite3({
url: process.env.DATABASE_URL!,
}),

11
server/plugins/schemas.ts Normal file
View File

@@ -0,0 +1,11 @@
import type { FastifyPluginAsync } from 'fastify'
import fp from 'fastify-plugin'
import * as schemas from './schemas/index.ts'
const plugin: FastifyPluginAsync = fp(async (fastify) => {
for (const schema of Object.values(schemas)) {
fastify.addSchema(schema)
}
})
export default plugin

View File

@@ -0,0 +1,13 @@
import { Type } from 'typebox'
export const AttachmentSchema = Type.Object({
id: Type.String(),
name: Type.String(),
mimetype: Type.String(),
size: Type.Number({ minimum: 0 }),
createdAt: Type.String({ format: 'date-time' }),
}, { $id: 'Attachment' })
export const GetAttachmentParamsSchema = Type.Object({
id: Type.String({ format: 'uuid' }),
}, { $id: 'GetAttachmentParams' })

View File

@@ -0,0 +1,18 @@
import { Type } from 'typebox'
export const UserSchema = Type.Object({
id: Type.String(),
username: Type.String(),
displayName: Type.String(),
createdAt: Type.String({ format: 'date-time' }),
}, { $id: 'User' })
export const CreateUserPayloadSchema = Type.Object({
username: Type.String({ minLength: 1 }),
password: Type.String({ minLength: 6 }),
}, { $id: 'CreateUser' })
export const LoginPayloadSchema = Type.Object({
username: Type.String({ minLength: 1 }),
password: Type.String({ minLength: 1 }),
}, { $id: 'Login' })

View File

@@ -0,0 +1,13 @@
import { Type } from 'typebox'
export const ChannelSchema = Type.Object({
id: Type.String(),
ownerId: Type.Union([Type.String(), Type.Null()]),
name: Type.String(),
persistent: Type.Boolean(),
}, { $id: 'Channel' })
export const CreateChannelPayloadSchema = Type.Object({
name: Type.String(),
persistent: Type.Boolean(),
}, { $id: 'CreateChannelPayload' })

View File

@@ -0,0 +1,25 @@
import { Type } from 'typebox'
export const ReplySchema = Type.Object({
messageId: Type.String({ format: 'uuid' }),
senderId: Type.String({ format: 'uuid' }),
text: Type.String(),
}, { $id: 'Reply' })
export const ChatMessageSchema = Type.Object({
id: Type.String({ format: 'uuid' }),
senderId: Type.String({ format: 'uuid' }),
text: Type.String({ minLength: 1 }),
createdAt: Type.String({ format: 'date-time' }),
updatedAt: Type.String({ format: 'date-time' }),
attachments: Type.Array(Type.String({ format: 'uuid' })),
}, { $id: 'ChatMessage' })
export const NewChatMessagePayloadSchema = Type.Object({
text: Type.String({ minLength: 1 }),
attachments: Type.Optional(Type.Array(Type.String({ format: 'uuid' }))),
// replyTo: Type.Object({
// messageId: Type.String({ format: 'uuid' }),
// }),
}, { $id: 'NewChatMessagePayload' })

View File

@@ -0,0 +1,7 @@
import { Type } from 'typebox'
export const ResponseErrorSchema = Type.Object({
statusCode: Type.Number(),
error: Type.String(),
message: Type.String(),
}, { $id: 'ResponseError' })

View File

@@ -0,0 +1,6 @@
export * from './attachment.ts'
export * from './auth.ts'
export * from './channel.ts'
export * from './chat.ts'
export * from './common.ts'
export * from './user.ts'

View File

@@ -0,0 +1,19 @@
import { Type } from 'typebox'
export const GetUserQuerySchema = Type.Partial(Type.Object({
username: Type.String(),
}), { $id: 'GetUserQuery' })
export const UserPreferencesSchema = Type.Object({
toggleInputHotkey: Type.String(),
toggleOutputHotkey: Type.String(),
}, { $id: 'UserPreferences' })
export const UpdateUserPreferencesPayloadSchema = Type.Partial(
UserPreferencesSchema,
{ $id: 'UpdateUserPreferencesPayload' },
)
export const UpdateUserPayloadSchema = Type.Object({
displayName: Type.String(),
}, { $id: 'UpdateUserPayload' })

View File

@@ -1,10 +1,9 @@
import type { FastifyInstance } from 'fastify'
import type { ServerOptions } from 'socket.io'
import type { MessageSelect } from '../prisma/generated-client/models/Message.ts'
import fp from 'fastify-plugin'
import { Server } from 'socket.io'
import registerChatSocket from '../socket/chat.ts'
import registerWebrtcSocket from '../socket/webrtc.ts'
import registerChatSocket from './socket/chat/index.ts'
import registerWebrtcSocket from './socket/webrtc/index.ts'
declare module 'fastify' {
interface FastifyInstance {
@@ -24,12 +23,26 @@ export default fp<Partial<ServerOptions>>(
await fastify.io.close()
})
await registerWebrtcSocket(fastify.io, fastify.mediasoupRouter, fastify.prisma)
await registerChatSocket(fastify.io)
fastify.io.use(async (socket, next) => {
const sessionId = fastify.lucia.readSessionCookie(socket.handshake.headers.cookie ?? '')
fastify.bus.on('chat:new-message', async (message: MessageSelect) => {
fastify.io.emit('chat:new-message', message)
if (!sessionId) {
return next(fastify.httpErrors.unauthorized())
}
const { user } = await fastify.lucia.validateSession(sessionId)
if (!user) {
return next(fastify.httpErrors.unauthorized())
}
socket.data.user = user
next()
})
await registerWebrtcSocket(fastify)
await registerChatSocket(fastify)
},
{ name: 'socket-io', dependencies: ['mediasoup-worker', 'mediasoup-router', 'prisma', 'event-bus'] },
)

View File

@@ -0,0 +1,10 @@
import type { FastifyInstance } from 'fastify'
import type { MessageSelect } from '../../../prisma/generated-client/models.ts'
export default async function (fastify: FastifyInstance) {
const { io, bus } = fastify
bus.on('chat:new-message', async (message: MessageSelect) => {
io.emit('chat:new-message', message)
})
}

View File

@@ -0,0 +1,136 @@
import type { types } from 'mediasoup'
import type { Server, Socket } from 'socket.io'
import type { Channel, User } from '../../prisma/generated-client/client.ts'
export interface SerializedClient {
socketId: string
userId: User['id']
channelId: Channel['id']
inputMuted: boolean
outputMuted: boolean
streaming: boolean
}
export interface ProducerAppData extends types.AppData {
source: 'mic-video' | 'share'
}
export interface ErrorCallbackResult {
error: string
}
export interface SuccessCallbackResult {
ok: true
}
export type EventCallback<T = SuccessCallbackResult> = (result: T | ErrorCallbackResult) => void
export interface ClientToServerEvents {
'join-channel': (
options: { channelId: string },
cb?: EventCallback
) => void
'create-transport': (
options: {
producing: boolean
consuming: boolean
},
cb: EventCallback<Pick<types.WebRtcTransport, 'id' | 'iceParameters' | 'iceCandidates' | 'dtlsParameters'>>
) => void
'connect-transport': (
options: {
transportId: types.WebRtcTransport['id']
dtlsParameters: types.WebRtcTransport['dtlsParameters']
},
cb: EventCallback
) => void
'produce': (
options: {
transportId: types.WebRtcTransport['id']
kind: types.MediaKind
rtpParameters: types.RtpParameters
appData: { source: 'share' | string }
},
cb: EventCallback<{ id: types.Producer['id'] }>
) => void
'close-producer': (
options: {
producerId: types.Producer['id']
},
cb: EventCallback
) => void
'pause-producer': (
options: {
producerId: types.Producer['id']
},
cb: EventCallback
) => void
'resume-producer': (
options: {
producerId: types.Producer['id']
},
cb: EventCallback
) => void
'pause-consumer': (
options: {
consumerId: types.Consumer['id']
},
cb: EventCallback
) => void
'resume-consumer': (
options: {
consumerId: types.Consumer['id']
},
cb: EventCallback
) => void
'update-client': (
options: Partial<Pick<SerializedClient, 'inputMuted' | 'outputMuted'>>,
cb: EventCallback<SerializedClient>
) => void
}
export interface ServerToClientEvents {
'initialized': (arg: {
rtpCapabilities: types.RtpCapabilities
channelId: string
clients: SerializedClient[]
}) => void
'new-client': (arg: SerializedClient) => void
'client-updated': (arg: SerializedClient) => void
'client-switched-channel': (arg: SerializedClient) => void
'client-disconnected': (arg: string) => void
'producers': (arg: {
producerId: types.Producer['id']
kind: types.MediaKind
}[]) => void
'new-consumer': (
arg: {
socketId: string
producerId: types.Producer['id']
id: types.Consumer['id']
kind: types.MediaKind
rtpParameters: types.RtpParameters
type: types.ConsumerType
appData: types.Producer['appData']
producerPaused: types.Consumer['producerPaused']
},
cb: EventCallback
) => void
'consumer-closed': (arg: { consumerId: string }) => void
'consumer-paused': (arg: { consumerId: string }) => void
'consumer-resumed': (arg: { consumerId: string }) => void
'speaking-clients': (arg: { clientId: SerializedClient['socketId'], volume: types.AudioLevelObserverVolume['volume'] }[]) => void
'active-speaker': (arg?: SerializedClient['socketId']) => void
'channel-created': (arg: Channel) => void
'channel-removed': (arg: Channel['id']) => void
'channel-updated': (arg: Channel) => void
}
export interface InterServerEvent {}
export interface SocketData {
user: User
}
export type ChadSocket = Socket<ClientToServerEvents, ServerToClientEvents, InterServerEvent, SocketData>
export type ChadSocketServer = Server<ClientToServerEvents, ServerToClientEvents, InterServerEvent, SocketData>

View File

@@ -0,0 +1,119 @@
import type { types } from 'mediasoup'
import type { ActiveSpeakerObserverDominantSpeaker } from 'mediasoup/types'
import type { Client } from './Client.ts'
import { EventEmitter } from 'node:events'
interface ChannelEvents {
'speaking-peers': [{
socketId: string
volume: number
}[]]
'silence': []
'active-speaker': [socketId: string]
'empty': []
}
export class Channel extends EventEmitter<ChannelEvents> {
readonly id: string
readonly persistent: boolean
readonly #audioLevelObserver: types.AudioLevelObserver
readonly #activeSpeakerObserver: types.ActiveSpeakerObserver
readonly #clients = new Map<string, Client>()
private constructor(
id: string,
persistent: boolean,
audioLevelObserver: types.AudioLevelObserver,
activeSpeakerObserver: types.ActiveSpeakerObserver,
) {
super()
this.id = id
this.persistent = persistent
this.#audioLevelObserver = audioLevelObserver
this.#activeSpeakerObserver = activeSpeakerObserver
this.#audioLevelObserver.on('volumes', (volumes: types.AudioLevelObserverVolume[]) => {
this.emit('speaking-peers', volumes.map(({ producer, volume }) => {
const { socketId } = producer.appData as { socketId: string }
return { socketId, volume }
}))
})
this.#audioLevelObserver.on('silence', () => {
this.emit('silence')
})
this.#activeSpeakerObserver.on('dominantspeaker', ({ producer }: ActiveSpeakerObserverDominantSpeaker) => {
const { socketId } = producer.appData as { socketId: string }
this.emit('active-speaker', socketId)
})
}
static async create(id: string, persistent: boolean, router: types.Router): Promise<Channel> {
const audioLevelObserver = await router.createAudioLevelObserver({
maxEntries: 10,
threshold: -80,
interval: 800,
})
const activeSpeakerObserver = await router.createActiveSpeakerObserver()
return new Channel(id, persistent, audioLevelObserver, activeSpeakerObserver)
}
get clients(): Client[] {
return Array.from(this.#clients.values())
}
get size(): number {
return this.#clients.size
}
getClient(socketId: string): Client | undefined {
return this.#clients.get(socketId)
}
addClient(client: Client): void {
client.channelId = this.id
this.#clients.set(client.socketId, client)
}
kickClient(client: Client): void {
this.#clients.delete(client.socketId)
if (this.#clients.size === 0)
this.emit('empty')
}
async addAudioProducer(producer: types.Producer): Promise<void> {
if (producer.kind !== 'audio')
return
await this.#audioLevelObserver.addProducer({ producerId: producer.id })
await this.#activeSpeakerObserver.addProducer({ producerId: producer.id })
}
async wireClient(client: Client): Promise<void> {
for (const otherClient of this.#clients.values()) {
if (otherClient.socketId === client.socketId)
continue
for (const producer of otherClient.producers.values()) {
await client.createConsumerFor(producer, otherClient.socketId)
}
for (const producer of client.producers.values()) {
await otherClient.createConsumerFor(producer, client.socketId)
}
}
}
unwireClient(client: Client): void {
for (const otherClient of this.#clients.values()) {
for (const producerId of client.producers.keys()) {
otherClient.removeConsumersOf(producerId)
}
}
}
}

View File

@@ -0,0 +1,33 @@
import type { Router } from 'mediasoup/types'
import type { Channel as DbChannel } from '../../../prisma/generated-client/client.ts'
import { Channel } from './Channel.ts'
export class ChannelManager {
private channels = new Map<string, Channel>()
private mediasoupRouter: Router
constructor(mediasoupRouter: Router) {
this.mediasoupRouter = mediasoupRouter
}
async create(newChannel: Channel | DbChannel) {
if (newChannel instanceof Channel) {
this.channels.set(newChannel.id, newChannel)
}
else {
this.channels.set(newChannel.id, await Channel.create(newChannel.id, newChannel.persistent, this.mediasoupRouter))
}
}
get(id: string) {
return this.channels.get(id)
}
delete(id: string) {
this.channels.delete(id)
}
get all() {
return Array.from(this.channels.values())
}
}

View File

@@ -0,0 +1,288 @@
import type { types } from 'mediasoup'
import type { SerializedClient } from '../types.ts'
import { EventEmitter } from 'node:events'
import { consola } from 'consola'
export interface NewConsumerSignal {
socketId: string
producerId: string
id: string
kind: types.MediaKind
rtpParameters: types.RtpParameters
type: types.ConsumerType
appData: types.Producer['appData']
producerPaused: boolean
}
interface ClientEvents {
'signal:new-consumer': [data: NewConsumerSignal, onAcked: () => Promise<void>]
'consumer:closed': [consumerId: string]
'consumer:paused': [consumerId: string]
'consumer:resumed': [consumerId: string]
'transport:closed': []
'closed': []
'updated': []
}
export class Client extends EventEmitter<ClientEvents> {
readonly socketId: string
readonly userId: string
channelId: string = ''
#inputMuted = false
#outputMuted = false
readonly #router: types.Router
readonly #transports = new Map<string, types.WebRtcTransport>()
readonly #producers = new Map<string, types.Producer>()
readonly #consumers = new Map<string, types.Consumer>()
constructor(socketId: string, userId: string, router: types.Router) {
super()
this.socketId = socketId
this.userId = userId
this.#router = router
}
get producers(): ReadonlyMap<string, types.Producer> { return this.#producers }
get consumers(): ReadonlyMap<string, types.Consumer> { return this.#consumers }
get transports(): ReadonlyMap<string, types.WebRtcTransport> { return this.#transports }
get inputMuted(): boolean { return this.#inputMuted }
get outputMuted(): boolean { return this.#outputMuted }
get streaming(): boolean {
return Array.from(this.#producers.values()).some(
producer => producer.kind === 'video' && producer.appData.source === 'share',
)
}
async createTransport(options: { producing: boolean, consuming: boolean }) {
const transport = await this.#router.createWebRtcTransport({
listenInfos: [{
protocol: 'udp',
ip: '0.0.0.0',
announcedAddress: process.env.ANNOUNCED_ADDRESS || '127.0.0.1',
portRange: { min: 40000, max: 40100 },
}],
enableUdp: true,
preferUdp: true,
appData: options,
})
this.#transports.set(transport.id, transport)
transport.on('icestatechange', (iceState: types.IceState) => {
if (iceState === 'disconnected' || iceState === 'closed') {
consola.info('[Client]', `[${this.socketId}]`, `iceState=${iceState}`)
this.emit('transport:closed')
}
})
transport.on('dtlsstatechange', (dtlsState: types.DtlsState) => {
if (dtlsState === 'failed' || dtlsState === 'closed') {
consola.warn('[Client]', `[${this.socketId}]`, `dtlsState=${dtlsState}`)
this.emit('transport:closed')
}
})
return {
id: transport.id,
iceParameters: transport.iceParameters,
iceCandidates: transport.iceCandidates,
dtlsParameters: transport.dtlsParameters,
}
}
async connectTransport(transportId: string, dtlsParameters: types.DtlsParameters): Promise<void> {
const transport = this.#transports.get(transportId)
if (!transport)
throw new Error(`Transport not found: ${transportId}`)
await transport.connect({ dtlsParameters })
}
async produce(
transportId: string,
kind: types.MediaKind,
rtpParameters: types.RtpParameters,
appData: object,
): Promise<types.Producer> {
const transport = this.#transports.get(transportId)
if (!transport)
throw new Error(`Transport not found: ${transportId}`)
const streamingBefore = this.streaming
const producer = await transport.produce({
kind,
rtpParameters,
appData: { ...appData, socketId: this.socketId },
})
this.#producers.set(producer.id, producer)
if (this.streaming !== streamingBefore)
this.emit('updated')
return producer
}
closeProducer(producerId: string): void {
const producer = this.#producers.get(producerId)
if (!producer)
throw new Error(`Producer not found: ${producerId}`)
const streamingBefore = this.streaming
producer.close()
this.#producers.delete(producerId)
if (this.streaming !== streamingBefore)
this.emit('updated')
}
async pauseProducer(producerId: string): Promise<void> {
const producer = this.#producers.get(producerId)
if (!producer)
throw new Error(`Producer not found: ${producerId}`)
if (!producer.paused)
await producer.pause()
}
async resumeProducer(producerId: string): Promise<void> {
const producer = this.#producers.get(producerId)
if (!producer)
throw new Error(`Producer not found: ${producerId}`)
await producer.resume()
}
async createConsumerFor(producer: types.Producer, producerSocketId: string): Promise<types.Consumer | null> {
const transport = Array.from(this.#transports.values()).find(t => t.appData.consuming)
if (!transport) {
consola.warn('[Client]', `[${this.socketId}]`, 'No consuming transport, skipping consumer creation')
return null
}
try {
const consumer = await transport.consume({
producerId: producer.id,
rtpCapabilities: this.#router.rtpCapabilities,
enableRtx: true,
paused: true,
ignoreDtx: true,
})
this.#consumers.set(consumer.id, consumer)
consumer.observer.on('close', () => {
this.#consumers.delete(consumer.id)
this.emit('consumer:closed', consumer.id)
})
consumer.on('transportclose', () => {
consumer.close()
})
consumer.on('producerclose', () => {
consumer.close()
})
consumer.on('producerpause', () => {
this.emit('consumer:paused', consumer.id)
})
consumer.on('producerresume', () => {
this.emit('consumer:resumed', consumer.id)
})
await new Promise<void>((resolve) => {
this.emit('signal:new-consumer', {
socketId: producerSocketId,
producerId: producer.id,
id: consumer.id,
kind: consumer.kind,
rtpParameters: consumer.rtpParameters,
type: consumer.type,
appData: producer.appData,
producerPaused: consumer.producerPaused,
}, async () => { resolve() })
})
await consumer.resume()
return consumer
}
catch (error) {
consola.error('[Client]', `[${this.socketId}]`, 'createConsumerFor() failed:', error)
return null
}
}
removeConsumersOf(producerId: string): void {
for (const consumer of this.#consumers.values()) {
if (consumer.producerId === producerId)
consumer.close()
}
}
clearConsumers(): void {
for (const consumer of this.#consumers.values()) {
consumer.close()
}
this.#consumers.clear()
}
async pauseConsumer(consumerId: string): Promise<void> {
const consumer = this.#consumers.get(consumerId)
if (!consumer)
throw new Error(`Consumer not found: ${consumerId}`)
await consumer.pause()
}
async resumeConsumer(consumerId: string): Promise<void> {
const consumer = this.#consumers.get(consumerId)
if (!consumer)
throw new Error(`Consumer not found: ${consumerId}`)
await consumer.resume()
}
update(patch: { inputMuted?: boolean, outputMuted?: boolean }): void {
if (typeof patch.inputMuted === 'boolean')
this.#inputMuted = patch.inputMuted
if (typeof patch.outputMuted === 'boolean')
this.#outputMuted = patch.outputMuted
this.emit('updated')
}
close(): void {
for (const transport of this.#transports.values()) {
transport.close()
}
this.emit('closed')
}
serialize(): SerializedClient {
return {
socketId: this.socketId,
userId: this.userId,
channelId: this.channelId,
inputMuted: this.#inputMuted,
outputMuted: this.#outputMuted,
streaming: this.streaming,
}
}
}

View File

@@ -0,0 +1,254 @@
import type { types } from 'mediasoup'
import type { ChadSocket, ChadSocketServer } from '../types.ts'
import type { Channel } from './Channel.ts'
import type { ChannelManager } from './ChannelManager.ts'
import type { Client } from './Client.ts'
import { consola } from 'consola'
export class WebRtcGateway {
readonly #io: ChadSocketServer
readonly #socket: ChadSocket
readonly #client: Client
readonly #channels: ChannelManager
constructor(
io: ChadSocketServer,
socket: ChadSocket,
client: Client,
channels: ChannelManager,
) {
this.#io = io
this.#socket = socket
this.#client = client
this.#channels = channels
this.register()
}
register(): void {
this.#client.on('signal:new-consumer', async (data, onAcked) => {
await this.#socket.emitWithAck('new-consumer', data)
await onAcked()
})
this.#client.on('consumer:closed', consumerId => this.#socket.emit('consumer-closed', { consumerId }))
this.#client.on('consumer:paused', consumerId => this.#socket.emit('consumer-paused', { consumerId }))
this.#client.on('consumer:resumed', consumerId => this.#socket.emit('consumer-resumed', { consumerId }))
this.#client.on('transport:closed', () => this.#socket.disconnect())
this.#client.on('updated', () => this.#io.emit('client-updated', this.#client.serialize()))
this.#socket.on('join-channel', this.#onJoinChannel.bind(this))
this.#socket.on('create-transport', this.#onCreateTransport.bind(this))
this.#socket.on('connect-transport', this.#onConnectTransport.bind(this))
this.#socket.on('produce', this.#onProduce.bind(this))
this.#socket.on('close-producer', this.#onCloseProducer.bind(this))
this.#socket.on('pause-producer', this.#onPauseProducer.bind(this))
this.#socket.on('resume-producer', this.#onResumeProducer.bind(this))
this.#socket.on('pause-consumer', this.#onPauseConsumer.bind(this))
this.#socket.on('resume-consumer', this.#onResumeConsumer.bind(this))
this.#socket.on('update-client', this.#onUpdateClient.bind(this))
this.#socket.on('disconnect', this.#onDisconnect.bind(this))
}
async #onJoinChannel({ channelId }: { channelId: string }): Promise<void> {
if (this.#client.channelId === channelId)
return
const newChannel = this.#channels.get(channelId)
if (!newChannel) {
consola.error('[Gateway]', `Channel not found: ${channelId}`)
return
}
const oldChannel = this.#channels.get(this.#client.channelId)
if (oldChannel)
this.#leaveChannel(oldChannel)
this.#client.clearConsumers()
this.#socket.join(newChannel.id)
newChannel.addClient(this.#client)
await newChannel.wireClient(this.#client)
this.#io.emit('client-switched-channel', this.#client.serialize())
}
async #onCreateTransport(
{ producing, consuming }: { producing: boolean, consuming: boolean },
cb: (result: any) => void,
): Promise<void> {
try {
const transportData = await this.#client.createTransport({ producing, consuming })
cb(transportData)
if (consuming) {
const channel = this.#channels.get(this.#client.channelId)
if (channel)
await channel.wireClient(this.#client)
}
}
catch (error) {
if (error instanceof Error) {
consola.error('[Gateway]', '[createTransport]', error.message)
cb({ error: error.message })
}
}
}
async #onConnectTransport(
{ transportId, dtlsParameters }: { transportId: string, dtlsParameters: types.DtlsParameters },
cb: (result: any) => void,
): Promise<void> {
try {
await this.#client.connectTransport(transportId, dtlsParameters)
cb({ ok: true })
}
catch (error) {
if (error instanceof Error) {
consola.error('[Gateway]', '[connectTransport]', error.message)
cb({ error: error.message })
}
}
}
async #onProduce(
{ transportId, kind, rtpParameters, appData }: {
transportId: string
kind: types.MediaKind
rtpParameters: types.RtpParameters
appData: { source: string }
},
cb: (result: any) => void,
): Promise<void> {
try {
const producer = await this.#client.produce(transportId, kind, rtpParameters, appData)
cb({ id: producer.id })
const channel = this.#channels.get(this.#client.channelId)
if (channel) {
for (const peer of channel.clients) {
if (peer.socketId !== this.#client.socketId)
await peer.createConsumerFor(producer, this.#client.socketId)
}
if (kind === 'audio')
await channel.addAudioProducer(producer)
}
}
catch (error) {
if (error instanceof Error) {
consola.error('[Gateway]', '[produce]', error.message)
cb({ error: error.message })
}
}
}
#onCloseProducer(
{ producerId }: { producerId: string },
cb: (result: any) => void,
): void {
try {
this.#client.closeProducer(producerId)
cb({ ok: true })
}
catch (error) {
if (error instanceof Error) {
consola.error('[Gateway]', '[closeProducer]', error.message)
cb({ error: error.message })
}
}
}
async #onPauseProducer(
{ producerId }: { producerId: string },
cb: (result: any) => void,
): Promise<void> {
try {
await this.#client.pauseProducer(producerId)
cb({ ok: true })
}
catch (error) {
if (error instanceof Error) {
consola.error('[Gateway]', '[pauseProducer]', error.message)
cb({ error: error.message })
}
}
}
async #onResumeProducer(
{ producerId }: { producerId: string },
cb: (result: any) => void,
): Promise<void> {
try {
await this.#client.resumeProducer(producerId)
cb({ ok: true })
}
catch (error) {
if (error instanceof Error) {
consola.error('[Gateway]', '[resumeProducer]', error.message)
cb({ error: error.message })
}
}
}
async #onPauseConsumer(
{ consumerId }: { consumerId: string },
cb: (result: any) => void,
): Promise<void> {
try {
await this.#client.pauseConsumer(consumerId)
cb({ ok: true })
}
catch (error) {
if (error instanceof Error) {
consola.error('[Gateway]', '[pauseConsumer]', error.message)
cb({ error: error.message })
}
}
}
async #onResumeConsumer(
{ consumerId }: { consumerId: string },
cb: (result: any) => void,
): Promise<void> {
try {
await this.#client.resumeConsumer(consumerId)
cb({ ok: true })
}
catch (error) {
if (error instanceof Error) {
consola.error('[Gateway]', '[resumeConsumer]', error.message)
cb({ error: error.message })
}
}
}
#onUpdateClient(
patch: { inputMuted?: boolean, outputMuted?: boolean },
cb: (result: any) => void,
): void {
this.#client.update(patch)
cb(this.#client.serialize())
}
#leaveChannel(channel: Channel): void {
channel.unwireClient(this.#client)
channel.kickClient(this.#client)
this.#socket.leave(channel.id)
}
#onDisconnect(): void {
consola.info('[Gateway]', 'Client disconnected:', this.#client.socketId)
this.#socket.broadcast.emit('client-disconnected', this.#client.socketId)
const channel = this.#channels.get(this.#client.channelId)
if (channel)
this.#leaveChannel(channel)
this.#client.close()
}
}

View File

@@ -0,0 +1,14 @@
import type { Channel } from '../../../prisma/generated-client/browser.ts'
import type { ChannelManager } from './ChannelManager.ts'
import type { Client } from './Client.ts'
export class MessagingService {
private channels: ChannelManager
constructor(channels: ChannelManager) {
this.channels = channels
}
joinChannel(client: Client, channel: Channel) {
}
}

View File

@@ -0,0 +1,96 @@
import type { FastifyInstance } from 'fastify'
import { consola } from 'consola'
import { Channel } from './Channel.ts'
import { ChannelManager } from './ChannelManager.ts'
import { Client } from './Client.ts'
import { WebRtcGateway } from './Gateway.ts'
export default async function (fastify: FastifyInstance) {
const { io, bus, mediasoupRouter, prisma } = fastify
const channels = new ChannelManager(mediasoupRouter)
const dbChannels = await prisma.channel.findMany()
for (const dbChannel of dbChannels) {
const channel = await Channel.create(dbChannel.id, dbChannel.persistent, mediasoupRouter)
channels.create(channel)
setupChannelEvents(channel)
}
const defaultChannel = channels.get('default')!
io.on('connection', async (socket) => {
consola.info('[WebRtc]', 'Client connected', socket.id)
const client = new Client(socket.id, socket.data.user.id, mediasoupRouter)
defaultChannel.addClient(client)
socket.join(defaultChannel.id)
const _gateway = new WebRtcGateway(io, socket, client, channels)
socket.emit('initialized', {
rtpCapabilities: mediasoupRouter.rtpCapabilities,
channelId: client.channelId,
clients: channels.all.flatMap(c => c.clients).map(c => c.serialize()),
})
socket.broadcast.emit('new-client', client.serialize())
})
bus.on('channel:created', async (dbChannel) => {
io.emit('channel-created', dbChannel)
const channel = await Channel.create(dbChannel.id, dbChannel.persistent, mediasoupRouter)
channels.create(channel)
setupChannelEvents(channel)
})
bus.on('channel:removed', async (dbChannel) => {
io.emit('channel-removed', dbChannel.id)
const channel = channels.get(dbChannel.id)
if (!channel)
return
for (const client of channel.clients) {
channel.unwireClient(client)
client.clearConsumers()
const socket = io.sockets.sockets.get(client.socketId)
if (socket) {
socket.leave(dbChannel.id)
defaultChannel.addClient(client)
socket.join(defaultChannel.id)
await defaultChannel.wireClient(client)
io.emit('client-switched-channel', client.serialize())
}
}
channels.delete(dbChannel.id)
})
function setupChannelEvents(channel: Channel): void {
channel.on('speaking-peers', peers => io.to(channel.id).emit('speaking-clients', peers))
channel.on('silence', () => {
io.to(channel.id).emit('speaking-clients', [])
io.to(channel.id).emit('active-speaker', undefined)
})
channel.on('active-speaker', socketId => io.to(channel.id).emit('active-speaker', socketId))
channel.on('empty', async () => {
if (channel.persistent)
return
channels.delete(channel.id)
await prisma.channel.delete({ where: { id: channel.id } })
consola.info('[WebRtc]', `Non-persistent channel "${channel.id}" deleted`)
io.emit('channel-removed', channel.id)
})
}
}