From 397164d7bcc1ed9b1f7dfc3aa7107507e40f85d4 Mon Sep 17 00:00:00 2001 From: Kevin Jahns Date: Thu, 1 Aug 2019 20:20:00 +0200 Subject: [PATCH] never actually set local state to null (only user should be able to do that) --- src/y-websocket.js | 267 +++++++++++++++++++++++++-------------------- 1 file changed, 151 insertions(+), 116 deletions(-) diff --git a/src/y-websocket.js b/src/y-websocket.js index d068154..2d2bfa9 100644 --- a/src/y-websocket.js +++ b/src/y-websocket.js @@ -25,6 +25,110 @@ const messageAuth = 2 const reconnectTimeout = 3000 +/** + * @param {WebsocketProvider} provider + * @param {string} reason + */ +const permissionDeniedHandler = (provider, reason) => console.warn(`Permission denied to access ${provider.url}.\n${reason}`) + +/** + * @param {WebsocketProvider} provider + * @param {Uint8Array} buf + * @return {encoding.Encoder} + */ +const readMessage = (provider, buf) => { + const decoder = decoding.createDecoder(buf) + const encoder = encoding.createEncoder() + const messageType = decoding.readVarUint(decoder) + switch (messageType) { + case messageSync: + encoding.writeVarUint(encoder, messageSync) + syncProtocol.readSyncMessage(decoder, encoder, provider.doc, provider) + break + case messageQueryAwareness: + encoding.writeVarUint(encoder, messageAwareness) + encoding.writeVarUint8Array(encoder, awarenessProtocol.encodeAwarenessUpdate(provider.awareness, Array.from(provider.awareness.getStates().keys()))) + break + case messageAwareness: + awarenessProtocol.applyAwarenessUpdate(provider.awareness, decoding.readVarUint8Array(decoder), provider) + break + case messageAuth: + authProtocol.readAuthMessage(decoder, provider, permissionDeniedHandler) + break + default: + console.error('Unable to compute message') + return encoder + } + return encoder +} + +/** + * @param {WebsocketProvider} provider + */ +const setupWS = provider => { + if (provider.shouldConnect && provider.ws === null) { + const websocket = new WebSocket(provider.url) + websocket.binaryType = 'arraybuffer' + provider.ws = websocket + provider.wsconnecting = true + provider.wsconnected = false + websocket.onmessage = event => { + const encoder = readMessage(provider, new Uint8Array(event.data)) + if (encoding.length(encoder) > 1) { + websocket.send(encoding.toUint8Array(encoder)) + } + } + websocket.onclose = () => { + provider.ws = null + provider.wsconnecting = false + provider.wsconnected = false + if (provider.wsconnected) { + // update awareness (all users left) + awarenessProtocol.removeAwarenessStates(provider.awareness, Array.from(provider.awareness.getStates().keys()), provider) + provider.emit('status', [{ + status: 'disconnected' + }]) + } + setTimeout(setupWS, reconnectTimeout, provider) + } + websocket.onopen = () => { + provider.wsconnecting = false + provider.wsconnected = true + provider.emit('status', [{ + status: 'connected' + }]) + // always send sync step 1 when connected + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, messageSync) + syncProtocol.writeSyncStep1(encoder, provider.doc) + websocket.send(encoding.toUint8Array(encoder)) + // broadcast local awareness state + if (provider.awareness.getLocalState() !== null) { + const encoderAwarenessState = encoding.createEncoder() + encoding.writeVarUint(encoderAwarenessState, messageAwareness) + encoding.writeVarUint8Array(encoderAwarenessState, awarenessProtocol.encodeAwarenessUpdate(provider.awareness, [provider.doc.clientID])) + websocket.send(encoding.toUint8Array(encoderAwarenessState)) + } + } + } +} + +/** + * @param {WebsocketProvider} provider + * @param {ArrayBuffer} buf + */ +const broadcastMessage = (provider, buf) => { + if (provider.wsconnected) { + // @ts-ignore We know that wsconnected = true + provider.ws.send(buf) + } + if (provider.bcconnected) { + provider.mux(() => { + bc.publish(provider.url, buf) + }) + } +} + /** * Websocket Provider for Yjs. Creates a websocket connection to sync the shared document. * The document name is attached to the provided url. I.e. the following example @@ -61,12 +165,18 @@ export class WebsocketProvider extends Observable { this.db = db this.awareness = awareness this.wsconnected = false + this.wsconnecting = false + this.bcconnected = false this.mux = mutex.createMutex() /** * @type {WebSocket?} */ this.ws = null - this.shouldReconnect = true + /** + * Whether to connect to other peers or not + * @type {boolean} + */ + this.shouldConnect = true /** * @param {ArrayBuffer} data */ @@ -84,18 +194,11 @@ export class WebsocketProvider extends Observable { * @param {any} origin */ this._updateHandler = (update, origin) => { - if (origin !== this.ws || origin === null) { + if (origin !== this || origin === null) { const encoder = encoding.createEncoder() encoding.writeVarUint(encoder, messageSync) syncProtocol.writeUpdate(encoder, update) - const buf = encoding.toUint8Array(encoder) - if (this.wsconnected) { - // @ts-ignore We know that wsconnected = true - this.ws.send(buf) - } - this.mux(() => { - bc.publish(this.url, buf) - }) + broadcastMessage(this, encoding.toUint8Array(encoder)) } } /** @@ -103,23 +206,17 @@ export class WebsocketProvider extends Observable { * @param {any} origin */ this._awarenessUpdateHandler = ({ added, updated, removed }, origin) => { - // only broadcast local awareness information and when ws connected - const predicate = /** @param {number} id */ id => id === doc.clientID - if (added.some(predicate) || updated.some(predicate) || removed.some(predicate)) { - const encoder = encoding.createEncoder() - encoding.writeVarUint(encoder, messageAwareness) - encoding.writeVarUint8Array(encoder, awarenessProtocol.encodeAwarenessUpdate(awareness, [doc.clientID])) - const buf = encoding.toUint8Array(encoder) - if (this.wsconnected && this.ws !== null) { - this.ws.send(buf) - } - this.mux(() => { - bc.publish(this.url, buf) - }) - } + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, messageAwareness) + encoding.writeVarUint8Array(encoder, awarenessProtocol.encodeAwarenessUpdate(awareness, [doc.clientID])) + broadcastMessage(this, encoding.toUint8Array(encoder)) } window.addEventListener('beforeunload', () => { - awarenessProtocol.removeAwarenessStates(awareness, [this.doc.clientID], null) + // broadcast message with local awareness state set to null (indicating disconnect) + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, messageAwareness) + encoding.writeVarUint8Array(encoder, awarenessProtocol.encodeAwarenessUpdate(this.awareness, [this.doc.clientID], new Map())) + broadcastMessage(this, encoding.toUint8Array(encoder)) }) awareness.on('change', this._awarenessUpdateHandler) this.connect() @@ -130,22 +227,29 @@ export class WebsocketProvider extends Observable { super.destroy() } disconnect () { - this.shouldReconnect = false + this.shouldConnect = false + // broadcast message with local awareness state set to null (indicating disconnect) + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, messageAwareness) + encoding.writeVarUint8Array(encoder, awarenessProtocol.encodeAwarenessUpdate(this.awareness, [this.doc.clientID], new Map())) + broadcastMessage(this, encoding.toUint8Array(encoder)) if (this.ws !== null) { - this.awareness.setLocalState(null) this.ws.close() - bc.unsubscribe(this.url, this._bcSubscriber) - this.doc.off('update', this._updateHandler) } + if (this.bcconnected) { + bc.unsubscribe(this.url, this._bcSubscriber) + this.bcconnected = false + } + this.doc.off('update', this._updateHandler) } connect () { - this.shouldReconnect = true + this.shouldConnect = true if (!this.wsconnected && this.ws === null) { - if (this.awareness.getLocalState() === null) { - this.awareness.setLocalState({}) - } setupWS(this) - bc.subscribe(this.url, this._bcSubscriber) + if (!this.bcconnected) { + bc.subscribe(this.url, this._bcSubscriber) + this.bcconnected = true + } // send sync step1 to bc this.mux(() => { // write sync step 1 @@ -153,91 +257,22 @@ export class WebsocketProvider extends Observable { encoding.writeVarUint(encoderSync, messageSync) syncProtocol.writeSyncStep1(encoderSync, this.doc) bc.publish(this.url, encoding.toUint8Array(encoderSync)) + // broadcast local state + const encoderState = encoding.createEncoder() + encoding.writeVarUint(encoderState, messageSync) + syncProtocol.writeSyncStep2(encoderState, this.doc) + bc.publish(this.url, encoding.toUint8Array(encoderState)) // write queryAwareness - const encoderAwareness = encoding.createEncoder() - encoding.writeVarUint(encoderAwareness, messageQueryAwareness) - bc.publish(this.url, encoding.toUint8Array(encoderAwareness)) + const encoderAwarenessQuery = encoding.createEncoder() + encoding.writeVarUint(encoderAwarenessQuery, messageQueryAwareness) + bc.publish(this.url, encoding.toUint8Array(encoderAwarenessQuery)) + // broadcast local awareness state + const encoderAwarenessState = encoding.createEncoder() + encoding.writeVarUint(encoderAwarenessState, messageAwareness) + encoding.writeVarUint8Array(encoderAwarenessState, awarenessProtocol.encodeAwarenessUpdate(this.awareness, [this.doc.clientID])) + bc.publish(this.url, encoding.toUint8Array(encoderAwarenessState)) }) this.doc.on('update', this._updateHandler) } } } - -/** - * @param {WebsocketProvider} provider - * @param {string} reason - */ -const permissionDeniedHandler = (provider, reason) => console.warn(`Permission denied to access ${provider.url}.\n${reason}`) - -/** - * @param {WebsocketProvider} provider - * @param {Uint8Array} buf - * @return {encoding.Encoder} - */ -const readMessage = (provider, buf) => { - const decoder = decoding.createDecoder(buf) - const encoder = encoding.createEncoder() - const messageType = decoding.readVarUint(decoder) - switch (messageType) { - case messageSync: - encoding.writeVarUint(encoder, messageSync) - syncProtocol.readSyncMessage(decoder, encoder, provider.doc, provider.ws) - break - case messageQueryAwareness: - encoding.writeVarUint(encoder, messageAwareness) - encoding.writeVarUint8Array(encoder, awarenessProtocol.encodeAwarenessUpdate(provider.awareness, Array.from(provider.awareness.getStates().keys()))) - break - case messageAwareness: - awarenessProtocol.applyAwarenessUpdate(provider.awareness, decoding.readVarUint8Array(decoder), provider) - break - case messageAuth: - authProtocol.readAuthMessage(decoder, provider, permissionDeniedHandler) - break - default: - console.error('Unable to compute message') - return encoder - } - return encoder -} - -/** - * @param {WebsocketProvider} provider - */ -const setupWS = provider => { - const websocket = new WebSocket(provider.url) - websocket.binaryType = 'arraybuffer' - provider.ws = websocket - websocket.onmessage = event => { - const encoder = readMessage(provider, new Uint8Array(event.data)) - if (encoding.length(encoder) > 1) { - websocket.send(encoding.toUint8Array(encoder)) - } - } - websocket.onclose = () => { - provider.ws = null - if (provider.wsconnected) { - provider.wsconnected = false - // update awareness (all users left) - awarenessProtocol.removeAwarenessStates(provider.awareness, Array.from(provider.awareness.getStates().keys()), provider) - provider.emit('status', [{ - status: 'disconnected' - }]) - } - if (provider.shouldReconnect) { - setTimeout(setupWS, reconnectTimeout, provider, provider.url) - } - } - websocket.onopen = () => { - provider.wsconnected = true - provider.emit('status', [{ - status: 'connected' - }]) - // always send sync step 1 when connected - const encoder = encoding.createEncoder() - encoding.writeVarUint(encoder, messageSync) - syncProtocol.writeSyncStep1(encoder, provider.doc) - websocket.send(encoding.toUint8Array(encoder)) - // by updating the local awareness state we trigger the event handler that propagates this information to other clients. - provider.awareness.setLocalState(provider.awareness.getLocalState() || {}) - } -}