diff --git a/bin/server.js b/bin/server.js index 29ae90d..2b5b0e0 100755 --- a/bin/server.js +++ b/bin/server.js @@ -1,187 +1,20 @@ -/* -Unlike stated in the LICENSE file, it is not necessary to include the copyright notice and permission notice when you copy code from this file. -*/ -const Y = require('yjs') -const syncProtocol = require('y-protocols/dist/sync.js') -const awarenessProtocol = require('y-protocols/dist/awareness.js') /** * @type {any} */ const WebSocket = require('ws') const http = require('http') -const encoding = require('lib0/dist/encoding.js') -const decoding = require('lib0/dist/decoding.js') -const mutex = require('lib0/dist/mutex.js') -const map = require('lib0/dist/map.js') +const wss = new WebSocket.Server({ noServer: true }) +const setupWSConnection = require('./utils.js').setupWSConnection const port = process.env.PORT || 1234 -// disable gc when using snapshots! -const gcEnabled = process.env.GC !== 'false' && process.env.GC !== '0' -const persistenceDir = process.env.YPERSISTENCE -/** - * @type {{bindState: function(string,WSSharedDoc):void, writeState:function(string,WSSharedDoc):Promise}|null} - */ -let persistence = null -if (typeof persistenceDir === 'string') { - // @ts-ignore - const LevelDbPersistence = require('y-leveldb').LevelDbPersistence - persistence = new LevelDbPersistence(persistenceDir) -} - -const server = http.createServer((req, res) => { - res.writeHead(200, { 'Content-Type': 'text/plain' }) - res.end('okay') +const server = http.createServer((request, response) => { + response.writeHead(200, { 'Content-Type': 'text/plain' }) + response.end('okay') }) -const wss = new WebSocket.Server({ noServer: true }) - -/** - * @type {Map} - */ -const docs = new Map() - -const messageSync = 0 -const messageAwareness = 1 -// const messageAuth = 2 - -/** - * @param {Y.Transaction} transaction - * @param {WSSharedDoc} doc - */ -const afterTransaction = (transaction, doc) => { - if (transaction.updateMessage !== null) { - const encoder = encoding.createEncoder() - encoding.writeVarUint(encoder, messageSync) - syncProtocol.writeUpdate(encoder, transaction.updateMessage) - const message = encoding.toBuffer(encoder) - doc.conns.forEach((_, conn) => conn.send(message)) - } -} - -class WSSharedDoc extends Y.Y { - constructor () { - super({ gc: gcEnabled }) - this.mux = mutex.createMutex() - /** - * Maps from conn to set of controlled user ids. Delete all user ids from awareness when this conn is closed - * @type {Map>} - */ - this.conns = new Map() - /** - * @type {Map} - */ - this.awareness = new Map() - /** - * @type {Map} - */ - this.awarenessClock = new Map() - this.on('afterTransaction', afterTransaction) - } -} - -/** - * @param {any} conn - * @param {WSSharedDoc} doc - * @param {ArrayBuffer} message - */ -const messageListener = (conn, doc, message) => { - const encoder = encoding.createEncoder() - const decoder = decoding.createDecoder(message) - const messageType = decoding.readVarUint(decoder) - switch (messageType) { - case messageSync: - encoding.writeVarUint(encoder, messageSync) - syncProtocol.readSyncMessage(decoder, encoder, doc) - if (encoding.length(encoder) > 1) { - conn.send(encoding.toBuffer(encoder)) - } - break - case messageAwareness: { - encoding.writeVarUint(encoder, messageAwareness) - const updates = awarenessProtocol.forwardAwarenessMessage(decoder, encoder) - updates.forEach(update => { - doc.awareness.set(update.clientID, update.state) - doc.awarenessClock.set(update.clientID, update.clock) - // @ts-ignore we received an update => so conn exists - doc.conns.get(conn).add(update.clientID) - }) - const buff = encoding.toBuffer(encoder) - doc.conns.forEach((_, c) => { - c.send(buff) - }) - break - } - } -} - -/** - * @param {any} conn - * @param {any} req - */ -const setupConnection = (conn, req) => { - conn.binaryType = 'arraybuffer' - // get doc, create if it does not exist yet - const docName = req.url.slice(1) - const doc = map.setIfUndefined(docs, docName, () => { - const doc = new WSSharedDoc() - if (persistence !== null) { - persistence.bindState(docName, doc) - } - docs.set(docName, doc) - return doc - }) - doc.conns.set(conn, new Set()) - // listen and reply to events - // @ts-ignore - conn.on('message', message => messageListener(conn, doc, message)) - conn.on('close', () => { - /** - * @type {Set} - */ - // @ts-ignore - const controlledIds = doc.conns.get(conn) - doc.conns.delete(conn) - const encoder = encoding.createEncoder() - encoding.writeVarUint(encoder, messageAwareness) - awarenessProtocol.writeUsersStateChange(encoder, Array.from(controlledIds).map(clientID => { - const clock = (doc.awarenessClock.get(clientID) || 0) + 1 - doc.awareness.delete(clientID) - doc.awarenessClock.delete(clientID) - return { clientID, state: null, clock } - })) - const buf = encoding.toBuffer(encoder) - doc.conns.forEach((_, conn) => conn.send(buf)) - if (doc.conns.size === 0 && persistence !== null) { - // if persisted, we store state and destroy ydocument - persistence.writeState(docName, doc).then(() => { - doc.destroy() - }) - docs.delete(docName) - } - }) - // send sync step 1 - const encoder = encoding.createEncoder() - encoding.writeVarUint(encoder, messageSync) - syncProtocol.writeSyncStep1(encoder, doc.store) - conn.send(encoding.toBuffer(encoder)) - if (doc.awareness.size > 0) { - const encoder = encoding.createEncoder() - /** - * @type {Array} - */ - const userStates = [] - doc.awareness.forEach((state, clientID) => { - userStates.push({ state, clientID, clock: (doc.awarenessClock.get(clientID) || 0) }) - }) - encoding.writeVarUint(encoder, messageAwareness) - awarenessProtocol.writeUsersStateChange(encoder, userStates) - conn.send(encoding.toBuffer(encoder)) - } -} - -wss.on('connection', setupConnection) +wss.on('connection', setupWSConnection) server.on('upgrade', (request, socket, head) => { // You may check auth of request here.. diff --git a/bin/utils.js b/bin/utils.js new file mode 100644 index 0000000..83e6eb5 --- /dev/null +++ b/bin/utils.js @@ -0,0 +1,165 @@ +const Y = require('yjs') +const syncProtocol = require('y-protocols/dist/sync.js') +const awarenessProtocol = require('y-protocols/dist/awareness.js') + +const encoding = require('lib0/dist/encoding.js') +const decoding = require('lib0/dist/decoding.js') +const mutex = require('lib0/dist/mutex.js') +const map = require('lib0/dist/map.js') + +// disable gc when using snapshots! +const gcEnabled = process.env.GC !== 'false' && process.env.GC !== '0' +const persistenceDir = process.env.YPERSISTENCE +/** + * @type {{bindState: function(string,WSSharedDoc):void, writeState:function(string,WSSharedDoc):Promise}|null} + */ +let persistence = null +if (typeof persistenceDir === 'string') { + // @ts-ignore + const LevelDbPersistence = require('y-leveldb').LevelDbPersistence + persistence = new LevelDbPersistence(persistenceDir) +} + +/** + * @type {Map} + */ +const docs = new Map() + +const messageSync = 0 +const messageAwareness = 1 +// const messageAuth = 2 + +/** + * @param {Y.Transaction} transaction + * @param {WSSharedDoc} doc + */ +const afterTransaction = (transaction, doc) => { + if (transaction.updateMessage !== null) { + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, messageSync) + syncProtocol.writeUpdate(encoder, transaction.updateMessage) + const message = encoding.toBuffer(encoder) + doc.conns.forEach((_, conn) => conn.send(message)) + } +} + +class WSSharedDoc extends Y.Y { + constructor () { + super({ gc: gcEnabled }) + this.mux = mutex.createMutex() + /** + * Maps from conn to set of controlled user ids. Delete all user ids from awareness when this conn is closed + * @type {Map>} + */ + this.conns = new Map() + /** + * @type {Map} + */ + this.awareness = new Map() + /** + * @type {Map} + */ + this.awarenessClock = new Map() + this.on('afterTransaction', afterTransaction) + } +} + +/** + * @param {any} conn + * @param {WSSharedDoc} doc + * @param {ArrayBuffer} message + */ +const messageListener = (conn, doc, message) => { + const encoder = encoding.createEncoder() + const decoder = decoding.createDecoder(message) + const messageType = decoding.readVarUint(decoder) + switch (messageType) { + case messageSync: + encoding.writeVarUint(encoder, messageSync) + syncProtocol.readSyncMessage(decoder, encoder, doc, null) + if (encoding.length(encoder) > 1) { + conn.send(encoding.toBuffer(encoder)) + } + break + case messageAwareness: { + encoding.writeVarUint(encoder, messageAwareness) + const updates = awarenessProtocol.forwardAwarenessMessage(decoder, encoder) + updates.forEach(update => { + doc.awareness.set(update.clientID, update.state) + doc.awarenessClock.set(update.clientID, update.clock) + // @ts-ignore we received an update => so conn exists + doc.conns.get(conn).add(update.clientID) + }) + const buff = encoding.toBuffer(encoder) + doc.conns.forEach((_, c) => { + c.send(buff) + }) + break + } + } +} + +/** + * @param {any} conn + * @param {any} req + */ +exports.setupWSConnection = (conn, req) => { + conn.binaryType = 'arraybuffer' + // get doc, create if it does not exist yet + const docName = req.url.slice(1) + const doc = map.setIfUndefined(docs, docName, () => { + const doc = new WSSharedDoc() + if (persistence !== null) { + persistence.bindState(docName, doc) + } + docs.set(docName, doc) + return doc + }) + doc.conns.set(conn, new Set()) + // listen and reply to events + // @ts-ignore + conn.on('message', message => messageListener(conn, doc, message)) + conn.on('close', () => { + /** + * @type {Set} + */ + // @ts-ignore + const controlledIds = doc.conns.get(conn) + doc.conns.delete(conn) + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, messageAwareness) + awarenessProtocol.writeUsersStateChange(encoder, Array.from(controlledIds).map(clientID => { + const clock = (doc.awarenessClock.get(clientID) || 0) + 1 + doc.awareness.delete(clientID) + doc.awarenessClock.delete(clientID) + return { clientID, state: null, clock } + })) + const buf = encoding.toBuffer(encoder) + doc.conns.forEach((_, conn) => conn.send(buf)) + if (doc.conns.size === 0 && persistence !== null) { + // if persisted, we store state and destroy ydocument + persistence.writeState(docName, doc).then(() => { + doc.destroy() + }) + docs.delete(docName) + } + }) + // send sync step 1 + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, messageSync) + syncProtocol.writeSyncStep1(encoder, doc.store) + conn.send(encoding.toBuffer(encoder)) + if (doc.awareness.size > 0) { + const encoder = encoding.createEncoder() + /** + * @type {Array} + */ + const userStates = [] + doc.awareness.forEach((state, clientID) => { + userStates.push({ state, clientID, clock: (doc.awarenessClock.get(clientID) || 0) }) + }) + encoding.writeVarUint(encoder, messageAwareness) + awarenessProtocol.writeUsersStateChange(encoder, userStates) + conn.send(encoding.toBuffer(encoder)) + } +} diff --git a/package-lock.json b/package-lock.json index fb0388e..882f6ad 100644 --- a/package-lock.json +++ b/package-lock.json @@ -967,6 +967,12 @@ "type-check": "~0.3.2" } }, + "lib0": { + "version": "0.0.2", + "resolved": "https://registry.npmjs.org/lib0/-/lib0-0.0.2.tgz", + "integrity": "sha512-7bJgV2emHGRO5kpj66Gdc9SQKVfhDBLx0UIS/aU5P8R0179nRFHKDTYGvLlNloWbeUUARlqk3ndFIO4JbUy7Sw==", + "dev": true + }, "load-json-file": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/load-json-file/-/load-json-file-2.0.0.tgz", @@ -1697,6 +1703,15 @@ "version": "0.0.2", "resolved": "https://registry.npmjs.org/y-protocols/-/y-protocols-0.0.2.tgz", "integrity": "sha512-ixAaywK7USrMX7h6H2PZ59rtNVZcfJCNO0+/gDhAV1Sizwxdt0T6wPm9RCxDJtY3pXNeWA8MgtBysMGkgGA5xA==" + }, + "yjs": { + "version": "13.0.0-80", + "resolved": "https://registry.npmjs.org/yjs/-/yjs-13.0.0-80.tgz", + "integrity": "sha512-kKS7U6JerP1MwnJ5W0aRvqWb2YLqrng8NXirLVq8b/dPzY+hYRGhmxHvMZ/zGZ053A5eTqmprCTE5M22BOm3Yw==", + "dev": true, + "requires": { + "lib0": "0.0.2" + } } } } diff --git a/package.json b/package.json index 355595a..ee145ad 100644 --- a/package.json +++ b/package.json @@ -39,7 +39,8 @@ "devDependencies": { "rollup": "^1.1.2", "rollup-cli": "^1.0.9", - "standard": "^12.0.1" + "standard": "^12.0.1", + "yjs": "13.0.0-80" }, "peerDependenies": { "lib0": "*", diff --git a/tsconfig.json b/tsconfig.json index 5941f6e..b559176 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -38,8 +38,7 @@ "moduleResolution": "node", /* Specify module resolution strategy: 'node' (Node.js) or 'classic' (TypeScript pre-1.6). */ "baseUrl": "./", /* Base directory to resolve non-absolute module names. */ "paths": { - "yjs": ["node_modules/yjs/src/index.js"], - "lib0": ["node_modules/lib0"] + "yjs": ["node_modules/yjs/src/index.js"] }, // "rootDirs": [], /* List of root folders whose combined content represents the structure of the project at runtime. */ // "typeRoots": [], /* List of folders to include type definitions from. */