diff --git a/README.md b/README.md index 30f565b..63799c7 100644 --- a/README.md +++ b/README.md @@ -42,7 +42,7 @@ PORT=1234 YPERSISTENCE=./dbDir node ./node_modules/y-websocket/bin/server.js These are mere suggestions how you could scale your server environment. -**Option 1:** Websocket servers communicate with each other via a PubSub server. A room is represented by a PubSub channel. The downside of this approach is that the same shared document may be handled by many servers. But the upside is that this approach is fault tolerant, does not have a single point of failure, and is perfectly fit for route balancing. +**Option 1:** Websocket servers communicate with each other via a PubSub server. A room is represented by a PubSub channel. The downside of this approach is that the same shared document may be handled by many servers. But the upside is that this approach is fault tolerant, does not have a single point of failure, and is fit for route balancing. **Option 2:** Sharding with *consistent hashing*. Each document is handled by a unique server. This patterns requires an entity, like etcd, that performs regular health checks and manages servers. Based on the list of available servers (which is managed by etcd) a proxy calculates which server is responsible for each requested document. The disadvantage of this approach is that it is that load distribution may not be fair. Still, this approach may be the preferred solution if you want to store the shared document in a database - e.g. for indexing. diff --git a/bin/utils.js b/bin/utils.js index bb22e5c..89d226f 100644 --- a/bin/utils.js +++ b/bin/utils.js @@ -26,7 +26,7 @@ if (typeof persistenceDir === 'string') { } /** - * @type {Map} + * @type {Map} */ const docs = new Map() @@ -61,13 +61,30 @@ class WSSharedDoc extends Y.Doc { */ this.conns = new Map() /** - * @type {Map} + * @type {awarenessProtocol.Awareness} */ - this.awareness = new Map() + this.awareness = new awarenessProtocol.Awareness(this) /** - * @type {Map} + * @param {{ added: Array, updated: Array, removed: Array }} changes + * @param {Object | null} conn Origin is the connection that made the change */ - this.awarenessClock = new Map() + const awarenessChangeHandler = ({ added, updated, removed }, conn) => { + const changedClients = added.concat(updated, removed) + if (conn !== null) { + const connControlledIDs = /** @type {Set} */ (this.conns.get(conn)) + added.forEach(clientID => { connControlledIDs.add(clientID) }) + removed.forEach(clientID => { connControlledIDs.delete(clientID) }) + } + // broadcast awareness update + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, messageAwareness) + encoding.writeVarUint8Array(encoder, awarenessProtocol.encodeAwarenessUpdate(this.awareness, changedClients)) + const buff = encoding.toUint8Array(encoder) + this.conns.forEach((_, c) => { + send(this, c, buff) + }) + } + this.awareness.on('change', awarenessChangeHandler) this.on('update', updateHandler) } } @@ -90,18 +107,7 @@ const messageListener = (conn, doc, message) => { } break case messageAwareness: { - encoding.writeVarUint(encoder, messageAwareness) - const updates = awarenessProtocol.forwardAwarenessMessage(decoder, encoder) - updates.forEach(update => { - doc.awareness.set(update.clientID, update.state) - doc.awarenessClock.set(update.clientID, update.clock) - // @ts-ignore we received an update => so conn exists - doc.conns.get(conn).add(update.clientID) - }) - const buff = encoding.toUint8Array(encoder) - doc.conns.forEach((_, c) => { - send(doc, c, buff) - }) + awarenessProtocol.applyAwarenessUpdate(doc.awareness, decoding.readVarUint8Array(decoder), conn) break } } @@ -119,24 +125,13 @@ const closeConn = (doc, conn) => { // @ts-ignore const controlledIds = doc.conns.get(conn) doc.conns.delete(conn) - const encoder = encoding.createEncoder() - encoding.writeVarUint(encoder, messageAwareness) - awarenessProtocol.writeUsersStateChange(encoder, Array.from(controlledIds).map(clientID => { - const clock = (doc.awarenessClock.get(clientID) || 0) + 1 - doc.awareness.delete(clientID) - doc.awarenessClock.delete(clientID) - return { clientID, state: null, clock } - })) - const buf = encoding.toUint8Array(encoder) - doc.conns.forEach((_, conn) => { - send(doc, conn, buf) - }) + awarenessProtocol.removeAwarenessStates(doc.awareness, Array.from(controlledIds), null) if (doc.conns.size === 0 && persistence !== null) { // if persisted, we store state and destroy ydocument persistence.writeState(doc.name, doc).then(() => { doc.destroy() }) - doc.conns.delete(doc.name) + docs.delete(doc.name) } } conn.close() @@ -167,6 +162,9 @@ const pingTimeout = 30000 exports.setupWSConnection = (conn, req) => { conn.binaryType = 'arraybuffer' // get doc, create if it does not exist yet + /** + * @type {string} + */ const docName = req.url.slice(1) const doc = map.setIfUndefined(docs, docName, () => { const doc = new WSSharedDoc(docName) @@ -207,17 +205,11 @@ exports.setupWSConnection = (conn, req) => { encoding.writeVarUint(encoder, messageSync) syncProtocol.writeSyncStep1(encoder, doc) send(doc, conn, encoding.toUint8Array(encoder)) - if (doc.awareness.size > 0) { + const awarenessStates = doc.awareness.getStates() + if (awarenessStates.size > 0) { const encoder = encoding.createEncoder() - /** - * @type {Array} - */ - const userStates = [] - doc.awareness.forEach((state, clientID) => { - userStates.push({ state, clientID, clock: (doc.awarenessClock.get(clientID) || 0) }) - }) encoding.writeVarUint(encoder, messageAwareness) - awarenessProtocol.writeUsersStateChange(encoder, userStates) + encoding.writeVarUint8Array(encoder, awarenessProtocol.encodeAwarenessUpdate(doc.awareness, Array.from(awarenessStates.keys()))) send(doc, conn, encoding.toUint8Array(encoder)) } } diff --git a/package-lock.json b/package-lock.json index 920b6fd..1c74050 100644 --- a/package-lock.json +++ b/package-lock.json @@ -968,9 +968,9 @@ } }, "lib0": { - "version": "0.0.4", - "resolved": "https://registry.npmjs.org/lib0/-/lib0-0.0.4.tgz", - "integrity": "sha512-osSGIxFM0mUuVAclVOQAio4lq0YYk1xFfj6J+1i3u5az8rXAQKDil2skA19aiiG0sfAdasOtr8Mk+9Mrw10cfQ==" + "version": "0.0.5", + "resolved": "https://registry.npmjs.org/lib0/-/lib0-0.0.5.tgz", + "integrity": "sha512-3ElV6/t5Lv0Eczlnh/05q+Uq3RxQ/Q0zdN6LVtaUERQIDDZsP/CUXEGLsV8KZTgZwVFNCPGXNWYE+3WTOo+SHw==" }, "load-json-file": { "version": "2.0.0", @@ -1699,20 +1699,20 @@ "dev": true }, "y-protocols": { - "version": "0.0.5", - "resolved": "https://registry.npmjs.org/y-protocols/-/y-protocols-0.0.5.tgz", - "integrity": "sha512-4ZKcDxM2A83ZeGi7WqqK+mS0QRFP0L7Xd8JLxjX+fZeWiUIfYin16f8/0liMaHvGOHgfpomQrQsZm4HCkObgmQ==", + "version": "0.0.6", + "resolved": "https://registry.npmjs.org/y-protocols/-/y-protocols-0.0.6.tgz", + "integrity": "sha512-XgUBKrFesfUYN3wXmVp9Exy7dOUOeX3A56gHNuI1ZNy9N7OdwoBv2TGfbvSH6+YpV1IEvEq7u5v0/je5MwXKJg==", "requires": { - "lib0": "0.0.4" + "lib0": "0.0.5" } }, "yjs": { - "version": "13.0.0-82", - "resolved": "https://registry.npmjs.org/yjs/-/yjs-13.0.0-82.tgz", - "integrity": "sha512-pU3siEW0j+pKynWtnT7wKYn6797rdM/FnxsgdjFKJhUzdmeN1vYYZ+hTtPVWg/v8cjIVWCcWebMJZ1IhqFm5yQ==", + "version": "13.0.0-83", + "resolved": "https://registry.npmjs.org/yjs/-/yjs-13.0.0-83.tgz", + "integrity": "sha512-8M1X8fZ95odf2VU8BHrKcYr0PeEsx8tspV5svh4oLp8BVcIprbp0J2hzCvJDlOFOlyJVgvNf00UJ4uiyDKmk5A==", "dev": true, "requires": { - "lib0": "0.0.4" + "lib0": "0.0.5" } } } diff --git a/package.json b/package.json index 3519f32..aff4a71 100644 --- a/package.json +++ b/package.json @@ -35,17 +35,17 @@ }, "homepage": "https://github.com/y-js/y-websocket#readme", "dependencies": { - "y-protocols": "0.0.5", - "lib0": "0.0.4" + "y-protocols": "0.0.6", + "lib0": "0.0.5" }, "devDependencies": { "rollup": "^1.1.2", "rollup-cli": "^1.0.9", "standard": "^12.0.1", - "yjs": "13.0.0-82" + "yjs": "13.0.0-83" }, "peerDependenies": { - "yjs": "13.0.0-82" + "yjs": "13.0.0-83" }, "optionalDependencies": { "ws": "^6.2.1" diff --git a/src/y-websocket.js b/src/y-websocket.js index c8354a1..10fda74 100644 --- a/src/y-websocket.js +++ b/src/y-websocket.js @@ -19,11 +19,148 @@ import * as mutex from 'lib0/mutex.js' import { Observable } from 'lib0/observable.js' const messageSync = 0 +const messageQueryAwareness = 3 const messageAwareness = 1 const messageAuth = 2 const reconnectTimeout = 3000 +/** + * Websocket Provider for Yjs. Creates a websocket connection to sync the shared document. + * The document name is attached to the provided url. I.e. the following example + * creates a websocket connection to http://localhost:1234/my-document-name + * + * @example + * import * as Y from 'yjs' + * import { WebsocketProvider } from 'y-websocket' + * const doc = new Y.Doc() + * const provider = new WebsocketProvider('http://localhost:1234', 'my-document-name', doc) + * + * @extends {Observable} + */ +export class WebsocketProvider extends Observable { + /** + * @param {string} url + * @param {string} roomname + * @param {Y.Doc} doc + */ + constructor (url, roomname, doc, awareness = new awarenessProtocol.Awareness(doc)) { + super() + window.addEventListener('beforeunload', () => { + awarenessProtocol.removeAwarenessStates(this.awareness, [this.doc.clientID], null) + }) + // ensure that url is always ends with / + while (url[url.length - 1] === '/') { + url = url.slice(0, url.length - 1) + } + this.url = url + '/' + roomname + this.roomname = roomname + this.doc = doc + /** + * @type {Object} + */ + this._localAwarenessState = {} + this.awareness = awareness + this.wsconnected = false + this.mux = mutex.createMutex() + /** + * @type {WebSocket?} + */ + this.ws = null + this.shouldReconnect = true + /** + * @param {ArrayBuffer} data + */ + this._bcSubscriber = data => { + this.mux(() => { + const encoder = readMessage(this, new Uint8Array(data)) + if (encoding.length(encoder) > 1) { + bc.publish(this.url, encoding.toUint8Array(encoder)) + } + }) + } + /** + * Listens to Yjs updates and sends them to remote peers (ws and broadcastchannel) + * @param {Uint8Array} update + * @param {any} origin + */ + this._updateHandler = (update, origin) => { + if (origin !== this.ws || origin === null) { + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, messageSync) + syncProtocol.writeUpdate(encoder, update) + const buf = encoding.toUint8Array(encoder) + if (this.wsconnected) { + // @ts-ignore We know that wsconnected = true + this.ws.send(buf) + } + this.mux(() => { + bc.publish(this.url, buf) + }) + } + } + /** + * @param {any} changed + * @param {any} origin + */ + this._awarenessUpdateHandler = ({ added, updated, removed }, origin) => { + // only broadcast local awareness information and when ws connected + const predicate = /** @param {number} id */ id => id === doc.clientID + if (added.some(predicate) || updated.some(predicate) || removed.some(predicate)) { + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, messageAwareness) + encoding.writeVarUint8Array(encoder, awarenessProtocol.encodeAwarenessUpdate(awareness, [doc.clientID])) + const buf = encoding.toUint8Array(encoder) + if (this.wsconnected && this.ws !== null) { + this.ws.send(buf) + } + this.mux(() => { + bc.publish(this.url, buf) + }) + } + } + awareness.on('change', this._awarenessUpdateHandler) + this.connect() + } + destroy () { + this.disconnect() + this.awareness.off('change', this._awarenessUpdateHandler) + super.destroy() + } + disconnect () { + this.shouldReconnect = false + if (this.ws !== null) { + this.awareness.setLocalState(null) + this.ws.close() + bc.unsubscribe(this.url, this._bcSubscriber) + this.doc.off('update', this._updateHandler) + } + } + connect () { + this.shouldReconnect = true + if (!this.wsconnected && this.ws === null) { + if (this.awareness.getLocalState() === null) { + this.awareness.setLocalState({}) + } + setupWS(this) + bc.subscribe(this.url, this._bcSubscriber) + // send sync step1 to bc + this.mux(() => { + // write sync step 1 + const encoderSync = encoding.createEncoder() + encoding.writeVarUint(encoderSync, messageSync) + syncProtocol.writeSyncStep1(encoderSync, this.doc) + bc.publish(this.url, encoding.toUint8Array(encoderSync)) + // write queryAwareness + const encoderAwareness = encoding.createEncoder() + encoding.writeVarUint(encoderAwareness, messageQueryAwareness) + bc.publish(this.url, encoding.toUint8Array(encoderAwareness)) + }) + this.doc.on('update', this._updateHandler) + } + } +} + /** * @param {WebsocketProvider} provider * @param {string} reason @@ -44,13 +181,19 @@ const readMessage = (provider, buf) => { encoding.writeVarUint(encoder, messageSync) syncProtocol.readSyncMessage(decoder, encoder, provider.doc, provider.ws) break + case messageQueryAwareness: + encoding.writeVarUint(encoder, messageAwareness) + encoding.writeVarUint8Array(encoder, awarenessProtocol.encodeAwarenessUpdate(provider.awareness, Array.from(provider.awareness.getStates().keys()))) + break case messageAwareness: - provider.mux(() => - awarenessProtocol.readAwarenessMessage(decoder, provider) - ) + awarenessProtocol.applyAwarenessUpdate(provider.awareness, decoding.readVarUint8Array(decoder), provider) break case messageAuth: authProtocol.readAuthMessage(decoder, provider, permissionDeniedHandler) + break + default: + console.error('Unable to compute message') + return encoder } return encoder } @@ -70,22 +213,14 @@ const setupWS = provider => { } websocket.onclose = () => { provider.ws = null - provider.wsconnected = false - // update awareness (all users left) - /** - * @type {Array} - */ - const removed = [] - provider.getAwarenessInfo().forEach((_, clientID) => { - removed.push(clientID) - }) - provider.awareness = new Map() - provider.emit('awareness', [{ - added: [], updated: [], removed - }]) - provider.emit('status', [{ - status: 'disconnected' - }]) + if (provider.wsconnected) { + provider.wsconnected = false + // update awareness (all users left) + awarenessProtocol.removeAwarenessStates(provider.awareness, Array.from(provider.awareness.getStates().keys()), provider) + provider.emit('status', [{ + status: 'disconnected' + }]) + } if (provider.shouldReconnect) { setTimeout(setupWS, reconnectTimeout, provider, provider.url) } @@ -100,103 +235,7 @@ const setupWS = provider => { encoding.writeVarUint(encoder, messageSync) syncProtocol.writeSyncStep1(encoder, provider.doc) websocket.send(encoding.toUint8Array(encoder)) - // force send stored awareness info - provider.setAwarenessField(null, null) - } -} - -/** - * Websocket Provider for Yjs. Creates a websocket connection to sync the shared document. - * The document name is attached to the provided url. I.e. the following example - * creates a websocket connection to http://localhost:1234/my-document-name - * - * @example - * import * as Y from 'yjs' - * import { WebsocketProvider } from 'y-websocket' - * const doc = new Y.Doc() - * const provider = new WebsocketProvider('http://localhost:1234', 'my-document-name', doc) - * - * @extends {Observable} - */ -export class WebsocketProvider extends Observable { - /** - * @param {string} url - * @param {string} roomname - * @param {Y.Doc} doc - */ - constructor (url, roomname, doc) { - super() - // ensure that url is always ends with / - while (url[url.length - 1] === '/') { - url = url.slice(0, url.length - 1) - } - this.url = url + '/' + roomname - this.roomname = roomname - this.doc = doc - /** - * @type {Object} - */ - this._localAwarenessState = {} - this.awareness = new Map() - this.awarenessClock = new Map() - this.wsconnected = false - this.mux = mutex.createMutex() - /** - * @type {WebSocket?} - */ - this.ws = null - this.shouldReconnect = true - /** - * @param {ArrayBuffer} data - */ - this._bcSubscriber = data => { - const encoder = readMessage(this, new Uint8Array(data)) - this.mux(() => { - if (encoding.length(encoder) > 1) { - bc.publish(url, encoding.toUint8Array(encoder)) - } - }) - } - /** - * @param {Uint8Array} update - * @param {any} origin - */ - this._updateHandler = (update, origin) => { - if (origin !== this.ws) { - const encoder = encoding.createEncoder() - encoding.writeVarUint(encoder, messageSync) - syncProtocol.writeUpdate(encoder, update) - const buf = encoding.toUint8Array(encoder) - if (this.wsconnected) { - // @ts-ignore We know that wsconnected = true - this.ws.send(buf) - } - bc.publish(this.url, buf) - } - } - this.connect() - } - disconnect () { - this.shouldReconnect = false - if (this.ws !== null) { - this.ws.close() - bc.unsubscribe(this.url, this._bcSubscriber) - this.off('update', this._updateHandler) - } - } - connect () { - this.shouldReconnect = true - if (!this.wsconnected && this.ws === null) { - setupWS(this) - bc.subscribe(this.url, this._bcSubscriber) - // send sync step1 to bc - this.mux(() => { - const encoder = encoding.createEncoder() - encoding.writeVarUint(encoder, messageSync) - syncProtocol.writeSyncStep1(encoder, this.doc) - bc.publish(this.url, encoding.toUint8Array(encoder)) - }) - this.on('update', this._updateHandler) - } + // by updating the local awareness state we trigger the event handler that propagates this information to other clients. + provider.awareness.setLocalState(provider.awareness.getLocalState() || {}) } }