remove usage of lib0/mutex - closes #99

This commit is contained in:
Kevin Jahns 2022-04-17 16:17:27 +02:00
parent 6fcabc57e0
commit d6df8ad11f
2 changed files with 32 additions and 36 deletions

13
package-lock.json generated
View File

@ -14,6 +14,7 @@
"y-protocols": "^1.0.5"
},
"bin": {
"y-websocket": "bin/server.js",
"y-websocket-server": "bin/server.js"
},
"devDependencies": {
@ -1747,9 +1748,9 @@
}
},
"node_modules/lib0": {
"version": "0.2.47",
"resolved": "https://registry.npmjs.org/lib0/-/lib0-0.2.47.tgz",
"integrity": "sha512-RXprIyaflw7OmFNMpb8HmvDhuRVUFXYCXrmynQN8OGbGevgMx9u6tjQG/yB0dOoDcuB1XXgqFn8Oy3RlKF/Qhg==",
"version": "0.2.51",
"resolved": "https://registry.npmjs.org/lib0/-/lib0-0.2.51.tgz",
"integrity": "sha512-05Erb3465CxJa38LQlMz4EbetNvRna1S3BzqEjC0/pmp5cQuQSfNNmeS0722Wev1dRlMUp2Cql0gQ55krSXf2Q==",
"dependencies": {
"isomorphic.js": "^0.2.4"
},
@ -4225,9 +4226,9 @@
}
},
"lib0": {
"version": "0.2.47",
"resolved": "https://registry.npmjs.org/lib0/-/lib0-0.2.47.tgz",
"integrity": "sha512-RXprIyaflw7OmFNMpb8HmvDhuRVUFXYCXrmynQN8OGbGevgMx9u6tjQG/yB0dOoDcuB1XXgqFn8Oy3RlKF/Qhg==",
"version": "0.2.51",
"resolved": "https://registry.npmjs.org/lib0/-/lib0-0.2.51.tgz",
"integrity": "sha512-05Erb3465CxJa38LQlMz4EbetNvRna1S3BzqEjC0/pmp5cQuQSfNNmeS0722Wev1dRlMUp2Cql0gQ55krSXf2Q==",
"requires": {
"isomorphic.js": "^0.2.4"
}

View File

@ -16,7 +16,6 @@ import * as decoding from 'lib0/decoding'
import * as syncProtocol from 'y-protocols/sync'
import * as authProtocol from 'y-protocols/auth'
import * as awarenessProtocol from 'y-protocols/awareness'
import * as mutex from 'lib0/mutex'
import { Observable } from 'lib0/observable'
import * as math from 'lib0/math'
import * as url from 'lib0/url'
@ -159,9 +158,7 @@ const broadcastMessage = (provider, buf) => {
/** @type {WebSocket} */ (provider.ws).send(buf)
}
if (provider.bcconnected) {
provider.mux(() => {
bc.publish(provider.bcChannel, buf)
})
bc.publish(provider.bcChannel, buf, provider)
}
}
@ -210,7 +207,6 @@ export class WebsocketProvider extends Observable {
this.bcconnected = false
this.wsUnsuccessfulReconnects = 0
this.messageHandlers = messageHandlers.slice()
this.mux = mutex.createMutex()
/**
* @type {boolean}
*/
@ -244,14 +240,15 @@ export class WebsocketProvider extends Observable {
/**
* @param {ArrayBuffer} data
* @param {any} origin
*/
this._bcSubscriber = data => {
this.mux(() => {
this._bcSubscriber = (data, origin) => {
if (origin !== this) {
const encoder = readMessage(this, new Uint8Array(data), false)
if (encoding.length(encoder) > 1) {
bc.publish(this.bcChannel, encoding.toUint8Array(encoder))
bc.publish(this.bcChannel, encoding.toUint8Array(encoder), this)
}
})
}
}
/**
* Listens to Yjs updates and sends them to remote peers (ws and broadcastchannel)
@ -336,27 +333,25 @@ export class WebsocketProvider extends Observable {
this.bcconnected = true
}
// send sync step1 to bc
this.mux(() => {
// write sync step 1
const encoderSync = encoding.createEncoder()
encoding.writeVarUint(encoderSync, messageSync)
syncProtocol.writeSyncStep1(encoderSync, this.doc)
bc.publish(this.bcChannel, encoding.toUint8Array(encoderSync))
// broadcast local state
const encoderState = encoding.createEncoder()
encoding.writeVarUint(encoderState, messageSync)
syncProtocol.writeSyncStep2(encoderState, this.doc)
bc.publish(this.bcChannel, encoding.toUint8Array(encoderState))
// write queryAwareness
const encoderAwarenessQuery = encoding.createEncoder()
encoding.writeVarUint(encoderAwarenessQuery, messageQueryAwareness)
bc.publish(this.bcChannel, encoding.toUint8Array(encoderAwarenessQuery))
// broadcast local awareness state
const encoderAwarenessState = encoding.createEncoder()
encoding.writeVarUint(encoderAwarenessState, messageAwareness)
encoding.writeVarUint8Array(encoderAwarenessState, awarenessProtocol.encodeAwarenessUpdate(this.awareness, [this.doc.clientID]))
bc.publish(this.bcChannel, encoding.toUint8Array(encoderAwarenessState))
})
// write sync step 1
const encoderSync = encoding.createEncoder()
encoding.writeVarUint(encoderSync, messageSync)
syncProtocol.writeSyncStep1(encoderSync, this.doc)
bc.publish(this.bcChannel, encoding.toUint8Array(encoderSync), this)
// broadcast local state
const encoderState = encoding.createEncoder()
encoding.writeVarUint(encoderState, messageSync)
syncProtocol.writeSyncStep2(encoderState, this.doc)
bc.publish(this.bcChannel, encoding.toUint8Array(encoderState), this)
// write queryAwareness
const encoderAwarenessQuery = encoding.createEncoder()
encoding.writeVarUint(encoderAwarenessQuery, messageQueryAwareness)
bc.publish(this.bcChannel, encoding.toUint8Array(encoderAwarenessQuery), this)
// broadcast local awareness state
const encoderAwarenessState = encoding.createEncoder()
encoding.writeVarUint(encoderAwarenessState, messageAwareness)
encoding.writeVarUint8Array(encoderAwarenessState, awarenessProtocol.encodeAwarenessUpdate(this.awareness, [this.doc.clientID]))
bc.publish(this.bcChannel, encoding.toUint8Array(encoderAwarenessState), this)
}
disconnectBc () {