From d6df8ad11f1f87921c27513c3cc65a370651ba55 Mon Sep 17 00:00:00 2001 From: Kevin Jahns Date: Sun, 17 Apr 2022 16:17:27 +0200 Subject: [PATCH] remove usage of lib0/mutex - closes #99 --- package-lock.json | 13 ++++++----- src/y-websocket.js | 55 +++++++++++++++++++++------------------------- 2 files changed, 32 insertions(+), 36 deletions(-) diff --git a/package-lock.json b/package-lock.json index c28c600..bdd34b3 100644 --- a/package-lock.json +++ b/package-lock.json @@ -14,6 +14,7 @@ "y-protocols": "^1.0.5" }, "bin": { + "y-websocket": "bin/server.js", "y-websocket-server": "bin/server.js" }, "devDependencies": { @@ -1747,9 +1748,9 @@ } }, "node_modules/lib0": { - "version": "0.2.47", - "resolved": "https://registry.npmjs.org/lib0/-/lib0-0.2.47.tgz", - "integrity": "sha512-RXprIyaflw7OmFNMpb8HmvDhuRVUFXYCXrmynQN8OGbGevgMx9u6tjQG/yB0dOoDcuB1XXgqFn8Oy3RlKF/Qhg==", + "version": "0.2.51", + "resolved": "https://registry.npmjs.org/lib0/-/lib0-0.2.51.tgz", + "integrity": "sha512-05Erb3465CxJa38LQlMz4EbetNvRna1S3BzqEjC0/pmp5cQuQSfNNmeS0722Wev1dRlMUp2Cql0gQ55krSXf2Q==", "dependencies": { "isomorphic.js": "^0.2.4" }, @@ -4225,9 +4226,9 @@ } }, "lib0": { - "version": "0.2.47", - "resolved": "https://registry.npmjs.org/lib0/-/lib0-0.2.47.tgz", - "integrity": "sha512-RXprIyaflw7OmFNMpb8HmvDhuRVUFXYCXrmynQN8OGbGevgMx9u6tjQG/yB0dOoDcuB1XXgqFn8Oy3RlKF/Qhg==", + "version": "0.2.51", + "resolved": "https://registry.npmjs.org/lib0/-/lib0-0.2.51.tgz", + "integrity": "sha512-05Erb3465CxJa38LQlMz4EbetNvRna1S3BzqEjC0/pmp5cQuQSfNNmeS0722Wev1dRlMUp2Cql0gQ55krSXf2Q==", "requires": { "isomorphic.js": "^0.2.4" } diff --git a/src/y-websocket.js b/src/y-websocket.js index 9ed9fba..3034cfa 100644 --- a/src/y-websocket.js +++ b/src/y-websocket.js @@ -16,7 +16,6 @@ import * as decoding from 'lib0/decoding' import * as syncProtocol from 'y-protocols/sync' import * as authProtocol from 'y-protocols/auth' import * as awarenessProtocol from 'y-protocols/awareness' -import * as mutex from 'lib0/mutex' import { Observable } from 'lib0/observable' import * as math from 'lib0/math' import * as url from 'lib0/url' @@ -159,9 +158,7 @@ const broadcastMessage = (provider, buf) => { /** @type {WebSocket} */ (provider.ws).send(buf) } if (provider.bcconnected) { - provider.mux(() => { - bc.publish(provider.bcChannel, buf) - }) + bc.publish(provider.bcChannel, buf, provider) } } @@ -210,7 +207,6 @@ export class WebsocketProvider extends Observable { this.bcconnected = false this.wsUnsuccessfulReconnects = 0 this.messageHandlers = messageHandlers.slice() - this.mux = mutex.createMutex() /** * @type {boolean} */ @@ -244,14 +240,15 @@ export class WebsocketProvider extends Observable { /** * @param {ArrayBuffer} data + * @param {any} origin */ - this._bcSubscriber = data => { - this.mux(() => { + this._bcSubscriber = (data, origin) => { + if (origin !== this) { const encoder = readMessage(this, new Uint8Array(data), false) if (encoding.length(encoder) > 1) { - bc.publish(this.bcChannel, encoding.toUint8Array(encoder)) + bc.publish(this.bcChannel, encoding.toUint8Array(encoder), this) } - }) + } } /** * Listens to Yjs updates and sends them to remote peers (ws and broadcastchannel) @@ -336,27 +333,25 @@ export class WebsocketProvider extends Observable { this.bcconnected = true } // send sync step1 to bc - this.mux(() => { - // write sync step 1 - const encoderSync = encoding.createEncoder() - encoding.writeVarUint(encoderSync, messageSync) - syncProtocol.writeSyncStep1(encoderSync, this.doc) - bc.publish(this.bcChannel, encoding.toUint8Array(encoderSync)) - // broadcast local state - const encoderState = encoding.createEncoder() - encoding.writeVarUint(encoderState, messageSync) - syncProtocol.writeSyncStep2(encoderState, this.doc) - bc.publish(this.bcChannel, encoding.toUint8Array(encoderState)) - // write queryAwareness - const encoderAwarenessQuery = encoding.createEncoder() - encoding.writeVarUint(encoderAwarenessQuery, messageQueryAwareness) - bc.publish(this.bcChannel, encoding.toUint8Array(encoderAwarenessQuery)) - // broadcast local awareness state - const encoderAwarenessState = encoding.createEncoder() - encoding.writeVarUint(encoderAwarenessState, messageAwareness) - encoding.writeVarUint8Array(encoderAwarenessState, awarenessProtocol.encodeAwarenessUpdate(this.awareness, [this.doc.clientID])) - bc.publish(this.bcChannel, encoding.toUint8Array(encoderAwarenessState)) - }) + // write sync step 1 + const encoderSync = encoding.createEncoder() + encoding.writeVarUint(encoderSync, messageSync) + syncProtocol.writeSyncStep1(encoderSync, this.doc) + bc.publish(this.bcChannel, encoding.toUint8Array(encoderSync), this) + // broadcast local state + const encoderState = encoding.createEncoder() + encoding.writeVarUint(encoderState, messageSync) + syncProtocol.writeSyncStep2(encoderState, this.doc) + bc.publish(this.bcChannel, encoding.toUint8Array(encoderState), this) + // write queryAwareness + const encoderAwarenessQuery = encoding.createEncoder() + encoding.writeVarUint(encoderAwarenessQuery, messageQueryAwareness) + bc.publish(this.bcChannel, encoding.toUint8Array(encoderAwarenessQuery), this) + // broadcast local awareness state + const encoderAwarenessState = encoding.createEncoder() + encoding.writeVarUint(encoderAwarenessState, messageAwareness) + encoding.writeVarUint8Array(encoderAwarenessState, awarenessProtocol.encodeAwarenessUpdate(this.awareness, [this.doc.clientID])) + bc.publish(this.bcChannel, encoding.toUint8Array(encoderAwarenessState), this) } disconnectBc () {