From 92f7bae50dfc05dbda2fcdbda4b3da487ab4bef9 Mon Sep 17 00:00:00 2001 From: opti1337 Date: Sun, 12 Apr 2026 22:35:47 +0600 Subject: [PATCH] cringe sfx --- client/app/components.d.ts | 5 - client/app/composables/use-signaling.ts | 2 +- client/app/layouts/auth.vue | 29 +- client/app/pages/preferences.vue | 1 + server/CLAUDE.md | 73 +++ server/dto/channel.dto.ts | 2 +- server/plugins/socket.ts | 81 +++- server/routes/channels.ts | 255 +++++++++++ server/routes/user.ts | 2 +- server/socket/channel.ts | 569 ++++++------------------ server/socket/webrtc.ts | 168 +++---- server/types/socket.ts | 86 ++-- server/types/utils.ts | 4 + server/utils/fetch-sockets.ts | 16 + server/utils/socket-to-client.ts | 4 + 15 files changed, 700 insertions(+), 597 deletions(-) create mode 100644 server/CLAUDE.md create mode 100644 server/routes/channels.ts create mode 100644 server/types/utils.ts create mode 100644 server/utils/fetch-sockets.ts diff --git a/client/app/components.d.ts b/client/app/components.d.ts index 7300bc5..cc1871e 100644 --- a/client/app/components.d.ts +++ b/client/app/components.d.ts @@ -13,18 +13,13 @@ declare module 'vue' { PrimeButton: typeof import('primevue/button')['default'] PrimeButtonGroup: typeof import('primevue/buttongroup')['default'] PrimeCard: typeof import('primevue/card')['default'] - PrimeDivider: typeof import('primevue/divider')['default'] PrimeFloatLabel: typeof import('primevue/floatlabel')['default'] PrimeInputText: typeof import('primevue/inputtext')['default'] PrimePassword: typeof import('primevue/password')['default'] - PrimeProgressBar: typeof import('primevue/progressbar')['default'] PrimeScrollPanel: typeof import('primevue/scrollpanel')['default'] - PrimeSelect: typeof import('primevue/select')['default'] PrimeSelectButton: typeof import('primevue/selectbutton')['default'] PrimeSlider: typeof import('primevue/slider')['default'] - PrimeTag: typeof import('primevue/tag')['default'] PrimeToast: typeof import('primevue/toast')['default'] - PrimeToggleSwitch: typeof import('primevue/toggleswitch')['default'] RouterLink: typeof import('vue-router')['RouterLink'] RouterView: typeof import('vue-router')['RouterView'] } diff --git a/client/app/composables/use-signaling.ts b/client/app/composables/use-signaling.ts index 9e21462..5b0b53c 100644 --- a/client/app/composables/use-signaling.ts +++ b/client/app/composables/use-signaling.ts @@ -66,7 +66,7 @@ export const useSignaling = createSharedComposable(() => { const uri = host ? `${protocol}//${host}` : `` - socket.value = io(`${uri}/webrtc`, { + socket.value = io(`${uri}`, { path: `${pathname}/ws`, transports: ['websocket'], withCredentials: true, diff --git a/client/app/layouts/auth.vue b/client/app/layouts/auth.vue index 71bcbb3..2ee63cb 100644 --- a/client/app/layouts/auth.vue +++ b/client/app/layouts/auth.vue @@ -4,7 +4,7 @@
- +
@@ -15,28 +15,35 @@ diff --git a/client/app/pages/preferences.vue b/client/app/pages/preferences.vue index cf8a570..cedde5e 100644 --- a/client/app/pages/preferences.vue +++ b/client/app/pages/preferences.vue @@ -81,6 +81,7 @@ size="small" option-label="label" option-value="value" + :allow-empty="false" />
diff --git a/server/CLAUDE.md b/server/CLAUDE.md new file mode 100644 index 0000000..c5d9979 --- /dev/null +++ b/server/CLAUDE.md @@ -0,0 +1,73 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +Chad is a real-time voice/video chat server (think Discord-like) built with Fastify, Socket.IO, and mediasoup for WebRTC media handling. It uses SQLite via Prisma ORM and Lucia for session-based authentication. The client is a Tauri desktop app (separate repo). + +## Commands + +- **Start server:** `yarn start` (runs `ts-node --transpile-only server.ts`) +- **Deploy DB (migrate + seed + generate):** `yarn db:deploy` +- **Generate Prisma client after schema changes:** `npx prisma generate` +- **Create a migration:** `npx prisma migrate dev --name ` +- **Lint:** `npx eslint .` +- **Package manager:** Yarn 4 (corepack). Do not use npm. + +## Architecture + +### Entry Point & Plugin System + +`server.ts` creates a Fastify instance and uses `@fastify/autoload` to auto-register everything in `plugins/` and `routes/` (prefixed under `/chad`). Plugins use `fastify-plugin` (`fp`) with named dependencies to control load order. + +### Plugin Load Order (dependency chain) + +1. `plugins/auth.ts` — Adds `req.user` / `req.session` via Lucia cookie validation on every request (preHandler hook) +2. `plugins/mediasoup-worker.ts` — Creates a mediasoup Worker, decorates `fastify.mediasoupWorker` +3. `plugins/mediasoup-router.ts` — Creates a mediasoup Router (depends on worker), decorates `fastify.mediasoupRouter`. Configures supported audio/video codecs (Opus, VP8, VP9, H.264, AV1) +4. `plugins/socket.ts` — Creates Socket.IO server at `/chad/ws` (depends on worker + router), decorates `fastify.io`. Registers socket handlers on `fastify.ready()` + +### Socket Handlers (`socket/`) + +- `socket/webrtc.ts` — Main WebRTC signaling: join/leave, transport creation, producer/consumer lifecycle, audio level observation, active speaker detection +- `socket/channel.ts` — Channel-based socket logic (in development on `channels` branch) + +Both handlers authenticate by looking up `socket.handshake.auth.userId` against the DB. + +### REST Routes (`routes/`) + +All routes are prefixed with `/chad` via autoload config. + +- `routes/auth.ts` — `/register`, `/login`, `/logout`, `/me` +- `routes/user.ts` — `/preferences` (GET/PATCH), `/profile` (PATCH). Profile changes broadcast to connected socket peers via `clientChanged` event. + +### Type System (`types/socket.ts`) + +Fully typed Socket.IO events: `ClientToServerEvents`, `ServerToClientEvents`, `SocketData`. The `SomeSocket` union type covers both live `Socket` and `RemoteSocket` (from `fetchSockets()`). + +### Database + +- SQLite with Prisma 7 + `@prisma/adapter-better-sqlite3` +- Schema at `prisma/schema.prisma`, generated client at `prisma/generated/client/` +- Models: `User`, `Session` (Lucia), `UserPreferences`, `Channel` +- Seed creates a persistent "Default" channel +- Config in `prisma.config.ts` using `dotenv` for `DATABASE_URL` + +### Key Patterns + +- Request validation uses Zod schemas defined inline in route handlers +- DTOs in `dto/` define Prisma select objects with `satisfies Prisma.*Select` for type-safe projections +- `utils/socket-to-client.ts` maps socket data to the `ChadClient` interface sent to clients +- ESLint uses `@antfu/eslint-config` (flat config) with `no-console` and `n/prefer-global/process` disabled + +### Environment Variables + +- `PORT` — Server port (default: 4000, Docker: 80) +- `DATABASE_URL` — SQLite path (e.g., `file:../data/database.db`) +- `ANNOUNCED_ADDRESS` — Public IP for WebRTC ICE candidates (default: `127.0.0.1`) +- `CORS_ORIGIN` — Socket.IO CORS origin (default: `*`) + +### Docker + +`Dockerfile` requires python3 and build-essential for mediasoup native compilation. Runs `yarn db:deploy && yarn start`. diff --git a/server/dto/channel.dto.ts b/server/dto/channel.dto.ts index b7c4cca..32931ee 100644 --- a/server/dto/channel.dto.ts +++ b/server/dto/channel.dto.ts @@ -7,7 +7,7 @@ export const channelPublicSelect = { owner_id: true, persistent: true, maxClients: true, -} satisfies Prisma.ChannelSelect +} as Prisma.ChannelSelect export type ChannelPublicDTO = Prisma.ChannelGetPayload<{ select: typeof channelPublicSelect diff --git a/server/plugins/socket.ts b/server/plugins/socket.ts index fe96591..1a1b1ca 100644 --- a/server/plugins/socket.ts +++ b/server/plugins/socket.ts @@ -1,8 +1,12 @@ import type { FastifyInstance } from 'fastify' import type { ServerOptions } from 'socket.io' +import type { ChadClient } from '../types/socket.ts' +import { consola } from 'consola' import fp from 'fastify-plugin' import { Server } from 'socket.io' -import registerWebrtcSocket from '../socket/webrtc.ts' +import prisma from '../prisma/client.ts' +import registerChannelHandlers from '../socket/channel.ts' +import registerWebrtcHandlers from '../socket/webrtc.ts' declare module 'fastify' { interface FastifyInstance { @@ -23,8 +27,79 @@ export default fp>( }) fastify.ready(() => { - registerWebrtcSocket(fastify.io, fastify.mediasoupRouter) - registerChannelSocket(fastify.io, fastify.mediasoupRouter) + const audioLevelObserver = await fastify.mediasoupRouter.createAudioLevelObserver({ + maxEntries: 10, + threshold: -80, + interval: 800, + }) + + const activeSpeakerObserver = await fastify.mediasoupRouter.createActiveSpeakerObserver() + + audioLevelObserver.on('volumes', async (volumes: types.AudioLevelObserverVolume[]) => { + fastify.io.emit('webrtc:speaking-peers', volumes.map(({ producer, volume }) => { + const { socketId } = producer.appData as { socketId: ChadClient['socketId'] } + + return { + clientId: socketId, + volume, + } + })) + }) + + audioLevelObserver.on('silence', () => { + fastify.io.emit('webrtc:speaking-peers', []) + fastify.io.emit('webrtc:active-speaker', undefined) + }) + + activeSpeakerObserver.on('dominantspeaker', ({ producer }) => { + const { socketId } = producer.appData as { socketId: ChadClient['socketId'] } + + fastify.io.emit('webrtc:active-speaker', socketId) + }) + + fastify.io.on('connection', async (socket) => { + consola.info('New connection', socket.id) + + const user = await prisma.user.findUnique({ + where: { + id: socket.handshake.auth.userId, + }, + select: { + id: true, + username: true, + displayName: true, + }, + }) + + if (!user) { + socket.disconnect() + + return + } + + const { id, username, displayName } = user + + socket.data.userId = id + socket.data.username = username + socket.data.displayName = displayName + + consola.info('User authorized', ...Object.values(user)) + + const channel = await registerChannelHandlers(fastify.io, socket) + const webrtc = await registerWebrtcHandlers( + fastify.io, + socket, + fastify.mediasoupRouter, + audioLevelObserver, + activeSpeakerObserver, + ) + + socket.emit('webrtc:authenticated', { + ...channel, + ...webrtc, + rtpCapabilities: fastify.mediasoupRouter.rtpCapabilities, + }) + }) }) }, { name: 'socket-io', dependencies: ['mediasoup-worker', 'mediasoup-router'] }, diff --git a/server/routes/channels.ts b/server/routes/channels.ts new file mode 100644 index 0000000..0876c1f --- /dev/null +++ b/server/routes/channels.ts @@ -0,0 +1,255 @@ +import type { FastifyInstance } from 'fastify' +import { z } from 'zod' +import prisma from '../prisma/client.ts' +import { channelPublicSelect } from '../dto/channel.dto.ts' + +export default function (fastify: FastifyInstance) { + // GET /chad/channels - List all channels with client counts + fastify.get('/channels', async (req, reply) => { + if (!req.user) { + return reply.code(401).send({ error: 'Unauthorized' }) + } + + const channels = await prisma.channel.findMany({ + select: channelPublicSelect, + orderBy: { name: 'asc' }, + }) + + // Add client count to each channel using Socket.IO rooms + const channelsWithCounts = await Promise.all( + channels.map(async channel => { + const socketsInChannel = await fastify.io.in(channel.id).fetchSockets() + return { + ...channel, + clientCount: socketsInChannel.length, + } + }), + ) + + return channelsWithCounts + }) + + // GET /chad/channels/:id - Get specific channel with client list + fastify.get('/channels/:id', async (req, reply) => { + if (!req.user) { + return reply.code(401).send({ error: 'Unauthorized' }) + } + + try { + const paramsSchema = z.object({ + id: z.string(), + }) + const params = paramsSchema.parse(req.params) + + const channel = await prisma.channel.findUnique({ + where: { id: params.id }, + select: channelPublicSelect, + }) + + if (!channel) { + return reply.code(404).send({ error: 'Channel not found' }) + } + + // Get clients in this channel using Socket.IO rooms + const sockets = await fastify.io.in(params.id).fetchSockets() + const clients = sockets + .filter(s => s.data.joined) + .map(s => { + const channelId = Array.from(s.rooms).find(room => room !== s.id) || 'default' + return { + socketId: s.id, + userId: s.data.userId, + username: s.data.username, + displayName: s.data.displayName, + inputMuted: s.data.inputMuted, + outputMuted: s.data.outputMuted, + currentChannelId: channelId, + } + }) + + return { + ...channel, + clients, + } + } + catch (err) { + fastify.log.error(err) + reply.code(400) + if (err instanceof z.ZodError) { + reply.send({ error: z.prettifyError(err) }) + } + else { + reply.send({ error: err.message }) + } + } + }) + + // POST /chad/channels - Create channel + fastify.post('/channels', async (req, reply) => { + if (!req.user) { + return reply.code(401).send({ error: 'Unauthorized' }) + } + + try { + const schema = z.object({ + name: z.string().min(1).max(50), + maxClients: z.number().int().positive().optional().nullable(), + persistent: z.boolean().default(false), + }) + const input = schema.parse(req.body) + + const channel = await prisma.channel.create({ + data: { + name: input.name, + maxClients: input.maxClients, + persistent: input.persistent, + owner_id: req.user.id, + }, + select: channelPublicSelect, + }) + + // Notify all connected clients about new channel + fastify.io.emit('channelCreated', channel) + + return channel + } + catch (err) { + fastify.log.error(err) + reply.code(400) + if (err instanceof z.ZodError) { + reply.send({ error: z.prettifyError(err) }) + } + else { + reply.send({ error: err.message }) + } + } + }) + + // PATCH /chad/channels/:id - Update channel + fastify.patch('/channels/:id', async (req, reply) => { + if (!req.user) { + return reply.code(401).send({ error: 'Unauthorized' }) + } + + try { + const paramsSchema = z.object({ + id: z.string(), + }) + const params = paramsSchema.parse(req.params) + + const schema = z.object({ + name: z.string().min(1).max(50).optional(), + maxClients: z.number().int().positive().optional().nullable(), + }) + const input = schema.parse(req.body) + + // Cannot update default channel + if (params.id === 'default') { + return reply.code(403).send({ error: 'Cannot modify default channel' }) + } + + const existing = await prisma.channel.findUnique({ + where: { id: params.id }, + }) + + if (!existing) { + return reply.code(404).send({ error: 'Channel not found' }) + } + + if (existing.owner_id !== req.user.id) { + return reply.code(403).send({ error: 'Not channel owner' }) + } + + const channel = await prisma.channel.update({ + where: { id: params.id }, + data: input, + select: channelPublicSelect, + }) + + fastify.io.emit('channelUpdated', channel) + + return channel + } + catch (err) { + fastify.log.error(err) + reply.code(400) + if (err instanceof z.ZodError) { + reply.send({ error: z.prettifyError(err) }) + } + else { + reply.send({ error: err.message }) + } + } + }) + + // DELETE /chad/channels/:id - Delete channel + fastify.delete('/channels/:id', async (req, reply) => { + if (!req.user) { + return reply.code(401).send({ error: 'Unauthorized' }) + } + + try { + const paramsSchema = z.object({ + id: z.string(), + }) + const params = paramsSchema.parse(req.params) + + if (params.id === 'default') { + return reply.code(403).send({ error: 'Cannot delete default channel' }) + } + + const existing = await prisma.channel.findUnique({ + where: { id: params.id }, + }) + + if (!existing) { + return reply.code(404).send({ error: 'Channel not found' }) + } + + if (existing.owner_id !== req.user.id) { + return reply.code(403).send({ error: 'Not channel owner' }) + } + + // Move all users in this channel back to default using Socket.IO rooms + const sockets = await fastify.io.in(params.id).fetchSockets() + + for (const socket of sockets) { + // Close all their producers + for (const producer of socket.data.producers.values()) { + producer.close() + } + socket.data.producers.clear() + + // Close all their consumers + for (const consumer of socket.data.consumers.values()) { + consumer.close() + } + socket.data.consumers.clear() + + // Move to default room + await socket.leave(params.id) + await socket.join('default') + + socket.emit('forcedChannelSwitch', { channelId: 'default' }) + } + + await prisma.channel.delete({ + where: { id: params.id }, + }) + + fastify.io.emit('channelDeleted', { channelId: params.id }) + + return { success: true } + } + catch (err) { + fastify.log.error(err) + reply.code(400) + if (err instanceof z.ZodError) { + reply.send({ error: z.prettifyError(err) }) + } + else { + reply.send({ error: err.message }) + } + } + }) +} diff --git a/server/routes/user.ts b/server/routes/user.ts index cc3934d..72157d5 100644 --- a/server/routes/user.ts +++ b/server/routes/user.ts @@ -77,7 +77,7 @@ export default function (fastify: FastifyInstance) { if (found) { found.data.displayName = input.displayName - namespace.emit('clientChanged', found.id, socketToClient(found)) + namespace.emit('webrtc:client-changed', found.id, socketToClient(found)) } return updatedUser diff --git a/server/socket/channel.ts b/server/socket/channel.ts index 18dceba..084d8f4 100644 --- a/server/socket/channel.ts +++ b/server/socket/channel.ts @@ -1,449 +1,150 @@ -import type { types } from 'mediasoup' -import type { Server as SocketServer } from 'socket.io' import type { - Namespace, - SomeSocket, + ChadSocket, + ClientToServerEvents, + ExtractCallbackPayload, + SocketServer, } from '../types/socket.ts' import { consola } from 'consola' +import { channelPublicSelect } from '../dto/channel.dto.ts' import prisma from '../prisma/client.ts' import { socketToClient } from '../utils/socket-to-client.ts' -export default function (io: SocketServer) { - io.on('connection', async (socket) => { - consola.info('[WebRtc]', 'Client connected', socket.id) +export default async function (io: SocketServer, socket: ChadSocket) { + // io.on('channel:join', async (cb) => { + // if (socket.data.joined) { + // consola.error('[WebRtc]', 'Already joined') + // cb({ error: 'Already joined' }) + // return + // } + // + // socket.data.joined = true + // socket.data.rtpCapabilities = rtpCapabilities + // + // // Get current channel from Socket.IO rooms + // const currentChannelId = Array.from(socket.rooms).find(room => room !== socket.id) || 'default' + // const joinedSockets = await getJoinedSockets(socket.id, currentChannelId) + // + // cb(joinedSockets.map(socketToClient)) + // + // for (const joinedSocket of joinedSockets) { + // for (const producer of joinedSocket.data.producers.values()) { + // createConsumer( + // socket, + // joinedSocket, + // producer, + // ) + // } + // } + // + // // Broadcast only to same channel using Socket.IO room + // socket.to(currentChannelId).emit('newPeer', socketToClient(socket)) + // }) - socket.data.joined = false - - socket.data.inputMuted = false - socket.data.outputMuted = false - - socket.data.transports = new Map() - socket.data.producers = new Map() - socket.data.consumers = new Map() - - const user = await prisma.user.findUnique({ - where: { - id: socket.handshake.auth.userId, - }, - select: { - id: true, - username: true, - displayName: true, - }, - }) - - if (!user) { - socket.disconnect() - - return + socket.on('channel:join', async ({ channelId }, cb) => { + try { + cb(await handleJoin(channelId)) + } + catch (e) { + cb(e) } - - const { id, username, displayName } = user - - socket.data.userId = id - socket.data.username = username - socket.data.displayName = displayName - - socket.emit('authenticated', { channels }) - - socket.on('join', async ({ rtpCapabilities }, cb) => { - if (socket.data.joined) { - consola.error('[WebRtc]', 'Already joined') - cb({ error: 'Already joined' }) - } - - socket.data.joined = true - socket.data.rtpCapabilities = rtpCapabilities - - const joinedSockets = await getJoinedSockets() - - cb(joinedSockets.map(socketToClient)) - - for (const joinedSocket of joinedSockets.filter(joinedSocket => joinedSocket.id !== socket.id)) { - for (const producer of joinedSocket.data.producers.values()) { - createConsumer( - socket, - joinedSocket, - producer, - ) - } - } - - socket.broadcast.emit('newPeer', socketToClient(socket)) - }) - - socket.on('getRtpCapabilities', (cb) => { - cb(router.rtpCapabilities) - }) - - socket.on('createTransport', async ({ producing, consuming }, cb) => { - try { - const transport = await router.createWebRtcTransport({ - listenInfos: [ - { - protocol: 'udp', - ip: '0.0.0.0', - announcedAddress: process.env.ANNOUNCED_ADDRESS || '127.0.0.1', - portRange: { - min: 40000, - max: 40100, - }, - }, - ], - enableUdp: true, - preferUdp: true, - appData: { - producing, - consuming, - }, - }) - - socket.data.transports.set(transport.id, transport) - - cb({ - id: transport.id, - iceParameters: transport.iceParameters, - iceCandidates: transport.iceCandidates, - dtlsParameters: transport.dtlsParameters, - }) - - transport.on('icestatechange', (iceState: types.IceState) => { - if (iceState === 'disconnected' || iceState === 'closed') { - consola.info('[WebRtc]', '[WebRtcTransport]', `"icestatechange" event [iceState:${iceState}], closing peer`, transport.id) - - socket.disconnect() - } - }) - - transport.on('dtlsstatechange', (dtlsState: types.DtlsState) => { - if (dtlsState === 'failed' || dtlsState === 'closed') { - consola.warn('WebRtcTransport "dtlsstatechange" event [dtlsState:%s], closing peer', dtlsState) - - socket.disconnect() - } - }) - } - catch (error) { - if (error instanceof Error) { - consola.error('[WebRtc]', '[createTransport]', error.message) - cb({ error: error.message }) - } - } - }) - - socket.on('connectTransport', async ({ transportId, dtlsParameters }, cb) => { - const transport = socket.data.transports.get(transportId) - - if (!transport) { - consola.error('[WebRtc]', '[connectTransport]', `Transport with id ${transportId} not found`) - cb({ error: 'Transport not found' }) - - return - } - - try { - await transport.connect({ dtlsParameters }) - - cb({ ok: true }) - } - catch (error) { - if (error instanceof Error) { - consola.error('[WebRtc]', '[connectTransport]', error.message) - cb({ error: error.message }) - } - } - }) - - socket.on('produce', async ({ transportId, kind, rtpParameters, appData }, cb) => { - if (!socket.data.joined) { - consola.error('Peer not joined yet') - cb({ error: 'Peer not joined yet' }) - - return - } - - const transport = socket.data.transports.get(transportId) - - if (!transport) { - consola.error('[WebRtc]', '[produce]', `Transport with id ${transportId} not found`) - cb({ error: 'Transport not found' }) - - return - } - - try { - const producer = await transport.produce({ kind, rtpParameters, appData: { ...appData, socketId: socket.id } }) - - socket.data.producers.set(producer.id, producer) - - cb({ id: producer.id }) - - const otherSockets = await getJoinedSockets(socket.id) - - for (const otherSocket of otherSockets) { - createConsumer( - otherSocket, - socket, - producer, - ) - } - - // TODO: Add into the AudioLevelObserver and ActiveSpeakerObserver. - // https://github.com/versatica/mediasoup-demo/blob/v3/server/lib/Room.js#L1276 - } - catch (error) { - if (error instanceof Error) { - consola.error('[WebRtc]', '[produce]', error.message) - cb({ error: error.message }) - } - } - }) - - socket.on('closeProducer', async ({ producerId }, cb) => { - if (!socket.data.joined) { - consola.error('Peer not joined yet') - cb({ error: 'Peer not joined yet' }) - - return - } - - const producer = socket.data.producers.get(producerId) - - if (!producer) { - consola.error(`producer with id "${producerId}" not found`) - cb({ error: `producer with id "${producerId}" not found` }) - - return - } - - producer.close() - - socket.data.producers.delete(producerId) - - cb({ ok: true }) - }) - - socket.on('pauseProducer', async ({ producerId }, cb) => { - if (!socket.data.joined) { - consola.error('Peer not joined yet') - cb({ error: 'Peer not joined yet' }) - - return - } - - const producer = socket.data.producers.get(producerId) - - if (!producer) { - consola.error(`producer with id "${producerId}" not found`) - cb({ error: `producer with id "${producerId}" not found` }) - - return - } - - if (producer.paused) - return - - await producer.pause() - - cb({ ok: true }) - }) - - socket.on('resumeProducer', async ({ producerId }, cb) => { - if (!socket.data.joined) { - consola.error('Peer not joined yet') - cb({ error: 'Peer not joined yet' }) - - return - } - - const producer = socket.data.producers.get(producerId) - - if (!producer) { - consola.error(`producer with id "${producerId}" not found`) - cb({ error: `producer with id "${producerId}" not found` }) - - return - } - - await producer.resume() - - cb({ ok: true }) - }) - - socket.on('pauseConsumer', async ({ consumerId }, cb) => { - if (!socket.data.joined) { - consola.error('Peer not joined yet') - cb({ error: 'Peer not joined yet' }) - - return - } - - const consumer = socket.data.consumers.get(consumerId) - - if (!consumer) { - consola.error(`consumer with id "${consumerId}" not found`) - cb({ error: `consumer with id "${consumerId}" not found` }) - - return - } - - await consumer.pause() - - cb({ ok: true }) - }) - - socket.on('resumeConsumer', async ({ consumerId }, cb) => { - if (!socket.data.joined) { - consola.error('Peer not joined yet') - cb({ error: 'Peer not joined yet' }) - - return - } - - const consumer = socket.data.consumers.get(consumerId) - - if (!consumer) { - consola.error(`consumer with id "${consumerId}" not found`) - cb({ error: `consumer with id "${consumerId}" not found` }) - - return - } - - await consumer.resume() - - cb({ ok: true }) - }) - - socket.on('updateClient', async (updatedClient, cb) => { - if (typeof updatedClient.inputMuted === 'boolean') { - socket.data.inputMuted = updatedClient.inputMuted - } - - if (typeof updatedClient.outputMuted === 'boolean') { - socket.data.outputMuted = updatedClient.outputMuted - } - - cb(socketToClient(socket)) - - namespace.emit('clientChanged', socket.id, socketToClient(socket)) - }) - - socket.on('disconnect', () => { - consola.info('Client disconnected:', socket.id) - - if (socket.data.joined) { - socket.broadcast.emit('peerClosed', socket.id) - } - - for (const transport of socket.data.transports.values()) { - transport.close() - } - }) }) - async function getJoinedSockets(excludeId?: string) { - const sockets = await namespace.fetchSockets() + await handleJoin('default') - return sockets.filter(socket => socket.data.joined && (excludeId ? excludeId !== socket.id : true)) + const channels = await prisma.channel.findMany({ + select: channelPublicSelect, + orderBy: { name: 'asc' }, + }) + + type ChannelJoinCallback = ExtractCallbackPayload + async function handleJoin(channelId: string): Promise { + try { + const channel = await prisma.channel.findUnique({ + where: { id: channelId }, + select: channelPublicSelect, + }) + + if (!channel) { + return { error: 'Channel not found' } + } + + if (channel.maxClients) { + const socketsInChannel = await io.in(channelId).fetchSockets() + if (socketsInChannel.length >= channel.maxClients) { + return { error: 'Channel is full' } + } + } + + const oldChannelId = Array.from(socket.rooms).find(room => room !== socket.id) + + // for (const producer of socket.data.producers.values()) { + // producer.close() + // } + // socket.data.producers.clear() + // + // for (const consumer of socket.data.consumers.values()) { + // consumer.close() + // } + // socket.data.consumers.clear() + + if (oldChannelId) { + await socket.leave(oldChannelId) + io.emit('channel:user-left', { channelId: oldChannelId, clientId: socket.id }) + + // // Auto-delete non-persistent empty channels + // if (isLeavingNonPersistentChannel) { + // const oldChannelSockets = await io.in(oldChannelId).fetchSockets() + // + // if (oldChannelSockets.length === 0) { + // const oldChannel = await prisma.channel.findUnique({ + // where: { id: oldChannelId }, + // select: { persistent: true, id: true }, + // }) + // + // if (oldChannel && !oldChannel.persistent) { + // await prisma.channel.delete({ where: { id: oldChannelId } }) + // io.emit('channelDeleted', { channelId: oldChannelId }) + // consola.info('[Channel]', `Auto-deleted empty non-persistent channel: ${oldChannelId}`) + // } + // } + // } + } + + await socket.join(channelId) + + // Get new channel members + // const newChannelSockets = await getJoinedSockets(socket.id, channelId) + // + // // Create consumers for existing producers in new channel + // for (const peer of newChannelSockets) { + // for (const producer of peer.data.producers.values()) { + // createConsumer(socket, peer, producer) + // } + // } + + io.emit('channel:user-joined', { + channelId, + client: socketToClient(socket), + }) + + return { + channel, + clients: newChannelSockets.map(socketToClient), + } + } + catch (error) { + consola.error('[channel:join]', error) + + if (error instanceof Error) { + return { error: error.message } + } + else { + return { error: 'Something went wrong' } + } + } } - async function createConsumer( - consumerSocket: SomeSocket, - producerSocket: SomeSocket, - producer: types.Producer, - ) { - if ( - !consumerSocket.data.rtpCapabilities - || !router.canConsume( - { - producerId: producer.id, - rtpCapabilities: consumerSocket.data.rtpCapabilities, - }, - ) - ) { - return - } - - const transport = Array.from(consumerSocket.data.transports.values()) - .find(t => t.appData.consuming) - - if (!transport) { - consola.error('createConsumer() | Transport for consuming not found') - - return - } - - let consumer: types.Consumer - - try { - consumer = await transport.consume( - { - producerId: producer.id, - rtpCapabilities: consumerSocket.data.rtpCapabilities, - // Enable NACK for OPUS. - enableRtx: true, - paused: true, - ignoreDtx: true, - }, - ) - } - catch (error) { - consola.error('_createConsumer() | transport.consume():%o', error) - - return - } - - consumerSocket.data.consumers.set(consumer.id, consumer) - - consumer.on('transportclose', () => { - consumerSocket.data.consumers.delete(consumer.id) - }) - - consumer.on('producerclose', () => { - consumerSocket.data.consumers.delete(consumer.id) - - consumerSocket.emit('consumerClosed', { consumerId: consumer.id }) - }) - - consumer.on('producerpause', () => { - consumerSocket.emit('consumerPaused', { consumerId: consumer.id }) - }) - - consumer.on('producerresume', () => { - consumerSocket.emit('consumerResumed', { consumerId: consumer.id }) - }) - - consumer.on('score', (score: types.ConsumerScore) => { - consumerSocket.emit('consumerScore', { consumerId: consumer.id, score }) - }) - - try { - await consumerSocket.emitWithAck( - 'newConsumer', - { - socketId: producerSocket.id, - producerId: producer.id, - id: consumer.id, - kind: consumer.kind, - rtpParameters: consumer.rtpParameters, - type: consumer.type, - appData: producer.appData, - producerPaused: consumer.producerPaused, - }, - ) - - await consumer.resume() - - consumerSocket.emit( - 'consumerScore', - { - consumerId: consumer.id, - score: consumer.score, - }, - ) - } - catch (error) { - consola.error('_createConsumer() | failed:%o', error) - } + return { + channels, } } diff --git a/server/socket/webrtc.ts b/server/socket/webrtc.ts index 1cef7f6..e15a5cd 100644 --- a/server/socket/webrtc.ts +++ b/server/socket/webrtc.ts @@ -1,49 +1,21 @@ import type { types } from 'mediasoup' +import type { ActiveSpeakerObserver, AudioLevelObserver } from 'mediasoup/types' import type { Server as SocketServer } from 'socket.io' import type { - ChadClient, + ChadSocket, SomeSocket, } from '../types/socket.ts' import { consola } from 'consola' -import prisma from '../prisma/client.ts' import { socketToClient } from '../utils/socket-to-client.ts' -export default async function (io: SocketServer, router: types.Router) { - const audioLevelObserver = await router.createAudioLevelObserver({ - maxEntries: 10, - threshold: -80, - interval: 800, - }) - - const activeSpeakerObserver = await router.createActiveSpeakerObserver() - - audioLevelObserver.on('volumes', async (volumes: types.AudioLevelObserverVolume[]) => { - io.emit('speakingPeers', volumes.map(({ producer, volume }) => { - const { socketId } = producer.appData as { socketId: ChadClient['socketId'] } - - return { - clientId: socketId, - volume, - } - })) - }) - - audioLevelObserver.on('silence', () => { - io.emit('speakingPeers', []) - io.emit('activeSpeaker', undefined) - }) - - activeSpeakerObserver.on('dominantspeaker', ({ producer }) => { - const { socketId } = producer.appData as { socketId: ChadClient['socketId'] } - - io.emit('activeSpeaker', socketId) - }) - +export default async function ( + io: SocketServer, + socket: ChadSocket, + router: types.Router, + audioLevelObserver: AudioLevelObserver, + activeSpeakerObserver: ActiveSpeakerObserver, +) { io.on('connection', async (socket) => { - consola.info('[WebRtc]', 'Client connected', socket.id) - - socket.data.joined = false - socket.data.inputMuted = false socket.data.outputMuted = false @@ -51,62 +23,11 @@ export default async function (io: SocketServer, router: types.Router) { socket.data.producers = new Map() socket.data.consumers = new Map() - const user = await prisma.user.findUnique({ - where: { - id: socket.handshake.auth.userId, - }, - select: { - id: true, - username: true, - displayName: true, - }, - }) - - if (!user) { - socket.disconnect() - - return - } - - const { id, username, displayName } = user - - socket.data.userId = id - socket.data.username = username - socket.data.displayName = displayName - - socket.emit('authenticated', { channels }) - - socket.on('join', async ({ rtpCapabilities }, cb) => { - if (socket.data.joined) { - consola.error('[WebRtc]', 'Already joined') - cb({ error: 'Already joined' }) - } - - socket.data.joined = true - socket.data.rtpCapabilities = rtpCapabilities - - const joinedSockets = await getJoinedSockets() - - cb(joinedSockets.map(socketToClient)) - - for (const joinedSocket of joinedSockets.filter(joinedSocket => joinedSocket.id !== socket.id)) { - for (const producer of joinedSocket.data.producers.values()) { - createConsumer( - socket, - joinedSocket, - producer, - ) - } - } - - socket.broadcast.emit('newPeer', socketToClient(socket)) - }) - - socket.on('getRtpCapabilities', (cb) => { + socket.on('webrtc:get-rtp-capabilities', (cb) => { cb(router.rtpCapabilities) }) - socket.on('createTransport', async ({ producing, consuming }, cb) => { + socket.on('webrtc:create-transport', async ({ producing, consuming }, cb) => { try { const transport = await router.createWebRtcTransport({ listenInfos: [ @@ -161,7 +82,7 @@ export default async function (io: SocketServer, router: types.Router) { } }) - socket.on('connectTransport', async ({ transportId, dtlsParameters }, cb) => { + socket.on('webrtc:connect-transport', async ({ transportId, dtlsParameters }, cb) => { const transport = socket.data.transports.get(transportId) if (!transport) { @@ -184,7 +105,7 @@ export default async function (io: SocketServer, router: types.Router) { } }) - socket.on('produce', async ({ transportId, kind, rtpParameters, appData }, cb) => { + socket.on('webrtc:produce', async ({ transportId, kind, rtpParameters, appData }, cb) => { if (!socket.data.joined) { consola.error('Peer not joined yet') cb({ error: 'Peer not joined yet' }) @@ -192,6 +113,14 @@ export default async function (io: SocketServer, router: types.Router) { return } + // Block production in default channel + const currentChannelId = Array.from(socket.rooms).find(room => room !== socket.id) || 'default' + if (currentChannelId === 'default') { + consola.error('Cannot produce in default channel') + cb({ error: 'Cannot produce media in default channel' }) + return + } + const transport = socket.data.transports.get(transportId) if (!transport) { @@ -208,7 +137,8 @@ export default async function (io: SocketServer, router: types.Router) { cb({ id: producer.id }) - const otherSockets = await getJoinedSockets(socket.id) + // Filter by channel when creating consumers + const otherSockets = await getJoinedSockets(socket.id, currentChannelId) for (const otherSocket of otherSockets) { createConsumer( @@ -229,7 +159,7 @@ export default async function (io: SocketServer, router: types.Router) { } }) - socket.on('closeProducer', async ({ producerId }, cb) => { + socket.on('webrtc:close-producer', async ({ producerId }, cb) => { if (!socket.data.joined) { consola.error('Peer not joined yet') cb({ error: 'Peer not joined yet' }) @@ -253,7 +183,7 @@ export default async function (io: SocketServer, router: types.Router) { cb({ ok: true }) }) - socket.on('pauseProducer', async ({ producerId }, cb) => { + socket.on('webrtc:pause-producer', async ({ producerId }, cb) => { if (!socket.data.joined) { consola.error('Peer not joined yet') cb({ error: 'Peer not joined yet' }) @@ -278,7 +208,7 @@ export default async function (io: SocketServer, router: types.Router) { cb({ ok: true }) }) - socket.on('resumeProducer', async ({ producerId }, cb) => { + socket.on('webrtc:resume-producer', async ({ producerId }, cb) => { if (!socket.data.joined) { consola.error('Peer not joined yet') cb({ error: 'Peer not joined yet' }) @@ -300,7 +230,7 @@ export default async function (io: SocketServer, router: types.Router) { cb({ ok: true }) }) - socket.on('pauseConsumer', async ({ consumerId }, cb) => { + socket.on('webrtc:pause-consumer', async ({ consumerId }, cb) => { if (!socket.data.joined) { consola.error('Peer not joined yet') cb({ error: 'Peer not joined yet' }) @@ -322,7 +252,7 @@ export default async function (io: SocketServer, router: types.Router) { cb({ ok: true }) }) - socket.on('resumeConsumer', async ({ consumerId }, cb) => { + socket.on('webrtc:resume-consumer', async ({ consumerId }, cb) => { if (!socket.data.joined) { consola.error('Peer not joined yet') cb({ error: 'Peer not joined yet' }) @@ -344,7 +274,7 @@ export default async function (io: SocketServer, router: types.Router) { cb({ ok: true }) }) - socket.on('updateClient', async (updatedClient, cb) => { + socket.on('webrtc:update-client', async (updatedClient, cb) => { if (typeof updatedClient.inputMuted === 'boolean') { socket.data.inputMuted = updatedClient.inputMuted } @@ -355,14 +285,21 @@ export default async function (io: SocketServer, router: types.Router) { cb(socketToClient(socket)) - io.emit('clientChanged', socket.id, socketToClient(socket)) + io.emit('webrtc:client-changed', socket.id, socketToClient(socket)) }) socket.on('disconnect', () => { consola.info('Client disconnected:', socket.id) + // Get current channel from Socket.IO rooms + const channelId = Array.from(socket.rooms).find(room => room !== socket.id) + if (socket.data.joined) { - socket.broadcast.emit('peerClosed', socket.id) + // Notify only same channel using Socket.IO room + if (channelId) { + socket.to(channelId).emit('webrtc:peer-closed', socket.id) + io.emit('channelUserLeft', { channelId, clientId: socket.id }) + } } for (const transport of socket.data.transports.values()) { @@ -371,10 +308,21 @@ export default async function (io: SocketServer, router: types.Router) { }) }) - async function getJoinedSockets(excludeId?: string) { - const sockets = await io.fetchSockets() + async function getJoinedSockets(excludeId?: string, channelId?: string) { + let sockets = await io.fetchSockets() - return sockets.filter(socket => socket.data.joined && (excludeId ? excludeId !== socket.id : true)) + // Filter by channel using Socket.IO rooms + if (channelId) { + sockets = await io.in(channelId).fetchSockets() + } + + return sockets.filter((socket) => { + if (!socket.data.joined) + return false + if (excludeId && socket.id === excludeId) + return false + return true + }) } async function createConsumer( @@ -432,24 +380,24 @@ export default async function (io: SocketServer, router: types.Router) { consumer.on('producerclose', () => { consumerSocket.data.consumers.delete(consumer.id) - consumerSocket.emit('consumerClosed', { consumerId: consumer.id }) + consumerSocket.emit('webrtc:consumer-closed', { consumerId: consumer.id }) }) consumer.on('producerpause', () => { - consumerSocket.emit('consumerPaused', { consumerId: consumer.id }) + consumerSocket.emit('webrtc:consumer-paused', { consumerId: consumer.id }) }) consumer.on('producerresume', () => { - consumerSocket.emit('consumerResumed', { consumerId: consumer.id }) + consumerSocket.emit('webrtc:consumer-resumed', { consumerId: consumer.id }) }) consumer.on('score', (score: types.ConsumerScore) => { - consumerSocket.emit('consumerScore', { consumerId: consumer.id, score }) + consumerSocket.emit('webrtc:consumer-score', { consumerId: consumer.id, score }) }) try { await consumerSocket.emitWithAck( - 'newConsumer', + 'webrtc:new-consumer', { socketId: producerSocket.id, producerId: producer.id, @@ -465,7 +413,7 @@ export default async function (io: SocketServer, router: types.Router) { await consumer.resume() consumerSocket.emit( - 'consumerScore', + 'webrtc:consumer-score', { consumerId: consumer.id, score: consumer.score, diff --git a/server/types/socket.ts b/server/types/socket.ts index bbf129a..669574b 100644 --- a/server/types/socket.ts +++ b/server/types/socket.ts @@ -1,10 +1,11 @@ import type { types } from 'mediasoup' import type { RemoteSocket, Server, Socket } from 'socket.io' +import type { ChannelPublicDTO } from '../dto/channel.dto.ts' import type { ChannelModel, UserModel } from '../prisma/generated/client/models.ts' export interface ServerInfo { owner_id: UserModel['id'] - channels: ChannelModel[] + channels: ChannelPublicDTO[] rtpCapabilities: types.RtpCapabilities } @@ -15,6 +16,7 @@ export interface ChadClient { displayName: UserModel['displayName'] inputMuted: boolean outputMuted: boolean + currentChannelId: ChannelModel['id'] } export interface ProducerShort { @@ -31,32 +33,41 @@ export interface SuccessCallbackResult { } export type EventCallback = (result: T | ErrorCallbackResult) => void +export type EventCallbackResult = Parameters>[0] + +type LastArg = T extends [...any[], infer Last] ? Last : T extends [infer Only] ? Only : never + +export type ExtractCallbackPayload< + T, + Fallback = EventCallbackResult, +> + = T extends (...args: infer Args) => any + ? LastArg extends (...inner: infer CallbackArgs) => any + ? CallbackArgs extends [infer First, ...any[]] + ? First + : Fallback + : Fallback + : Fallback export interface ClientToServerEvents { - join: ( - options: { - rtpCapabilities: types.RtpCapabilities - }, - cb: EventCallback - ) => void - getRtpCapabilities: ( + 'webrtc:get-rtp-capabilities': ( cb: EventCallback ) => void - createTransport: ( + 'webrtc:create-transport': ( options: { producing: boolean consuming: boolean }, cb: EventCallback> ) => void - connectTransport: ( + 'webrtc:connect-transport': ( options: { transportId: types.WebRtcTransport['id'] dtlsParameters: types.WebRtcTransport['dtlsParameters'] }, cb: EventCallback ) => void - produce: ( + 'webrtc:produce': ( options: { transportId: types.WebRtcTransport['id'] kind: types.MediaKind @@ -65,47 +76,51 @@ export interface ClientToServerEvents { }, cb: EventCallback<{ id: types.Producer['id'] }> ) => void - closeProducer: ( + 'webrtc:close-producer': ( options: { producerId: types.Producer['id'] }, cb: EventCallback ) => void - pauseProducer: ( + 'webrtc:pause-producer': ( options: { producerId: types.Producer['id'] }, cb: EventCallback ) => void - resumeProducer: ( + 'webrtc:resume-producer': ( options: { producerId: types.Producer['id'] }, cb: EventCallback ) => void - pauseConsumer: ( + 'webrtc:pause-consumer': ( options: { consumerId: types.Consumer['id'] }, cb: EventCallback ) => void - resumeConsumer: ( + 'webrtc:resume-consumer': ( options: { consumerId: types.Consumer['id'] }, cb: EventCallback ) => void - updateClient: ( + 'webrtc:update-client': ( options: Partial>, cb: EventCallback ) => void + + 'channel:join': ( + options: { channelId: ChannelModel['id'] }, + cb: EventCallback<{ channel: ChannelPublicDTO, clients: ChadClient[] }> + ) => void } export interface ServerToClientEvents { - authenticated: (arg: ServerInfo) => void - newPeer: (arg: ChadClient) => void - producers: (arg: ProducerShort[]) => void - newConsumer: ( + 'webrtc:authenticated': (arg: ServerInfo) => void + 'webrtc:producers': (arg: ProducerShort[]) => void + 'webrtc:new-consumer': ( arg: { socketId: string producerId: types.Producer['id'] @@ -118,14 +133,21 @@ export interface ServerToClientEvents { }, cb: EventCallback ) => void - peerClosed: (arg: string) => void - consumerClosed: (arg: { consumerId: string }) => void - consumerPaused: (arg: { consumerId: string }) => void - consumerResumed: (arg: { consumerId: string }) => void - consumerScore: (arg: { consumerId: string, score: types.ConsumerScore }) => void - clientChanged: (clientId: ChadClient['socketId'], client: ChadClient) => void - speakingPeers: (arg: { clientId: ChadClient['socketId'], volume: types.AudioLevelObserverVolume['volume'] }[]) => void - activeSpeaker: (clientId?: ChadClient['socketId']) => void + 'webrtc:peer-closed': (arg: string) => void + 'webrtc:consumer-closed': (arg: { consumerId: string }) => void + 'webrtc:consumer-paused': (arg: { consumerId: string }) => void + 'webrtc:consumer-resumed': (arg: { consumerId: string }) => void + 'webrtc:consumer-score': (arg: { consumerId: string, score: types.ConsumerScore }) => void + 'webrtc:client-changed': (clientId: ChadClient['socketId'], client: ChadClient) => void + 'webrtc:speaking-peers': (arg: { clientId: ChadClient['socketId'], volume: types.AudioLevelObserverVolume['volume'] }[]) => void + 'webrtc:active-speaker': (clientId?: ChadClient['socketId']) => void + + 'channel:user-joined': (arg: { channelId: string, client: ChadClient }) => void + 'channel:user-left': (arg: { channelId: string, clientId: string }) => void + 'channel:deleted': (arg: { channelId: string }) => void + 'channel:created': (arg: ChannelPublicDTO) => void + 'channel:updated': (arg: ChannelPublicDTO) => void + 'channel:force-switch': (arg: { channelId: string }) => void } export interface InterServerEvent {} @@ -143,7 +165,9 @@ export interface SocketData { consumers: Map } -export type SomeSocket = Socket - | RemoteSocket +export type ChadSocket = Socket +export type ChadRemoteSocket = RemoteSocket + +export type SomeSocket = ChadSocket | ChadRemoteSocket export type SocketServer = Server diff --git a/server/types/utils.ts b/server/types/utils.ts new file mode 100644 index 0000000..e3bd548 --- /dev/null +++ b/server/types/utils.ts @@ -0,0 +1,4 @@ +export type LastArgument + = T extends (...args: [...any[], infer Last]) => any + ? Last + : never diff --git a/server/utils/fetch-sockets.ts b/server/utils/fetch-sockets.ts new file mode 100644 index 0000000..a436d76 --- /dev/null +++ b/server/utils/fetch-sockets.ts @@ -0,0 +1,16 @@ +import type { SocketServer } from '../types/socket.ts' + +export async function fetchSockets(io: SocketServer, excludeId?: string, channelId?: string) { + let sockets: Awaited> + + if (channelId) { + sockets = await io.in(channelId).fetchSockets() + } + else { + sockets = await io.fetchSockets() + } + + return sockets.filter((socket) => { + return !(excludeId && socket.id === excludeId) + }) +} diff --git a/server/utils/socket-to-client.ts b/server/utils/socket-to-client.ts index 7840a9f..4d92c9a 100644 --- a/server/utils/socket-to-client.ts +++ b/server/utils/socket-to-client.ts @@ -1,6 +1,9 @@ import type { ChadClient, SomeSocket } from '../types/socket.ts' export function socketToClient(socket: SomeSocket): ChadClient { + // Socket.IO rooms: Extract channel room (filter out the socket's own room) + const channelId = Array.from(socket.rooms).find(room => room !== socket.id) || 'default' + return { socketId: socket.id, userId: socket.data.userId, @@ -8,5 +11,6 @@ export function socketToClient(socket: SomeSocket): ChadClient { displayName: socket.data.displayName, inputMuted: socket.data.inputMuted, outputMuted: socket.data.outputMuted, + currentChannelId: channelId, } }