never actually set local state to null (only user should be able to do that)

This commit is contained in:
Kevin Jahns 2019-08-01 20:20:00 +02:00
parent d2daa58db2
commit 397164d7bc

View File

@ -25,6 +25,110 @@ const messageAuth = 2
const reconnectTimeout = 3000
/**
* @param {WebsocketProvider} provider
* @param {string} reason
*/
const permissionDeniedHandler = (provider, reason) => console.warn(`Permission denied to access ${provider.url}.\n${reason}`)
/**
* @param {WebsocketProvider} provider
* @param {Uint8Array} buf
* @return {encoding.Encoder}
*/
const readMessage = (provider, buf) => {
const decoder = decoding.createDecoder(buf)
const encoder = encoding.createEncoder()
const messageType = decoding.readVarUint(decoder)
switch (messageType) {
case messageSync:
encoding.writeVarUint(encoder, messageSync)
syncProtocol.readSyncMessage(decoder, encoder, provider.doc, provider)
break
case messageQueryAwareness:
encoding.writeVarUint(encoder, messageAwareness)
encoding.writeVarUint8Array(encoder, awarenessProtocol.encodeAwarenessUpdate(provider.awareness, Array.from(provider.awareness.getStates().keys())))
break
case messageAwareness:
awarenessProtocol.applyAwarenessUpdate(provider.awareness, decoding.readVarUint8Array(decoder), provider)
break
case messageAuth:
authProtocol.readAuthMessage(decoder, provider, permissionDeniedHandler)
break
default:
console.error('Unable to compute message')
return encoder
}
return encoder
}
/**
* @param {WebsocketProvider} provider
*/
const setupWS = provider => {
if (provider.shouldConnect && provider.ws === null) {
const websocket = new WebSocket(provider.url)
websocket.binaryType = 'arraybuffer'
provider.ws = websocket
provider.wsconnecting = true
provider.wsconnected = false
websocket.onmessage = event => {
const encoder = readMessage(provider, new Uint8Array(event.data))
if (encoding.length(encoder) > 1) {
websocket.send(encoding.toUint8Array(encoder))
}
}
websocket.onclose = () => {
provider.ws = null
provider.wsconnecting = false
provider.wsconnected = false
if (provider.wsconnected) {
// update awareness (all users left)
awarenessProtocol.removeAwarenessStates(provider.awareness, Array.from(provider.awareness.getStates().keys()), provider)
provider.emit('status', [{
status: 'disconnected'
}])
}
setTimeout(setupWS, reconnectTimeout, provider)
}
websocket.onopen = () => {
provider.wsconnecting = false
provider.wsconnected = true
provider.emit('status', [{
status: 'connected'
}])
// always send sync step 1 when connected
const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, messageSync)
syncProtocol.writeSyncStep1(encoder, provider.doc)
websocket.send(encoding.toUint8Array(encoder))
// broadcast local awareness state
if (provider.awareness.getLocalState() !== null) {
const encoderAwarenessState = encoding.createEncoder()
encoding.writeVarUint(encoderAwarenessState, messageAwareness)
encoding.writeVarUint8Array(encoderAwarenessState, awarenessProtocol.encodeAwarenessUpdate(provider.awareness, [provider.doc.clientID]))
websocket.send(encoding.toUint8Array(encoderAwarenessState))
}
}
}
}
/**
* @param {WebsocketProvider} provider
* @param {ArrayBuffer} buf
*/
const broadcastMessage = (provider, buf) => {
if (provider.wsconnected) {
// @ts-ignore We know that wsconnected = true
provider.ws.send(buf)
}
if (provider.bcconnected) {
provider.mux(() => {
bc.publish(provider.url, buf)
})
}
}
/**
* Websocket Provider for Yjs. Creates a websocket connection to sync the shared document.
* The document name is attached to the provided url. I.e. the following example
@ -61,12 +165,18 @@ export class WebsocketProvider extends Observable {
this.db = db
this.awareness = awareness
this.wsconnected = false
this.wsconnecting = false
this.bcconnected = false
this.mux = mutex.createMutex()
/**
* @type {WebSocket?}
*/
this.ws = null
this.shouldReconnect = true
/**
* Whether to connect to other peers or not
* @type {boolean}
*/
this.shouldConnect = true
/**
* @param {ArrayBuffer} data
*/
@ -84,18 +194,11 @@ export class WebsocketProvider extends Observable {
* @param {any} origin
*/
this._updateHandler = (update, origin) => {
if (origin !== this.ws || origin === null) {
if (origin !== this || origin === null) {
const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, messageSync)
syncProtocol.writeUpdate(encoder, update)
const buf = encoding.toUint8Array(encoder)
if (this.wsconnected) {
// @ts-ignore We know that wsconnected = true
this.ws.send(buf)
}
this.mux(() => {
bc.publish(this.url, buf)
})
broadcastMessage(this, encoding.toUint8Array(encoder))
}
}
/**
@ -103,23 +206,17 @@ export class WebsocketProvider extends Observable {
* @param {any} origin
*/
this._awarenessUpdateHandler = ({ added, updated, removed }, origin) => {
// only broadcast local awareness information and when ws connected
const predicate = /** @param {number} id */ id => id === doc.clientID
if (added.some(predicate) || updated.some(predicate) || removed.some(predicate)) {
const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, messageAwareness)
encoding.writeVarUint8Array(encoder, awarenessProtocol.encodeAwarenessUpdate(awareness, [doc.clientID]))
const buf = encoding.toUint8Array(encoder)
if (this.wsconnected && this.ws !== null) {
this.ws.send(buf)
}
this.mux(() => {
bc.publish(this.url, buf)
})
}
const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, messageAwareness)
encoding.writeVarUint8Array(encoder, awarenessProtocol.encodeAwarenessUpdate(awareness, [doc.clientID]))
broadcastMessage(this, encoding.toUint8Array(encoder))
}
window.addEventListener('beforeunload', () => {
awarenessProtocol.removeAwarenessStates(awareness, [this.doc.clientID], null)
// broadcast message with local awareness state set to null (indicating disconnect)
const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, messageAwareness)
encoding.writeVarUint8Array(encoder, awarenessProtocol.encodeAwarenessUpdate(this.awareness, [this.doc.clientID], new Map()))
broadcastMessage(this, encoding.toUint8Array(encoder))
})
awareness.on('change', this._awarenessUpdateHandler)
this.connect()
@ -130,22 +227,29 @@ export class WebsocketProvider extends Observable {
super.destroy()
}
disconnect () {
this.shouldReconnect = false
this.shouldConnect = false
// broadcast message with local awareness state set to null (indicating disconnect)
const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, messageAwareness)
encoding.writeVarUint8Array(encoder, awarenessProtocol.encodeAwarenessUpdate(this.awareness, [this.doc.clientID], new Map()))
broadcastMessage(this, encoding.toUint8Array(encoder))
if (this.ws !== null) {
this.awareness.setLocalState(null)
this.ws.close()
bc.unsubscribe(this.url, this._bcSubscriber)
this.doc.off('update', this._updateHandler)
}
if (this.bcconnected) {
bc.unsubscribe(this.url, this._bcSubscriber)
this.bcconnected = false
}
this.doc.off('update', this._updateHandler)
}
connect () {
this.shouldReconnect = true
this.shouldConnect = true
if (!this.wsconnected && this.ws === null) {
if (this.awareness.getLocalState() === null) {
this.awareness.setLocalState({})
}
setupWS(this)
bc.subscribe(this.url, this._bcSubscriber)
if (!this.bcconnected) {
bc.subscribe(this.url, this._bcSubscriber)
this.bcconnected = true
}
// send sync step1 to bc
this.mux(() => {
// write sync step 1
@ -153,91 +257,22 @@ export class WebsocketProvider extends Observable {
encoding.writeVarUint(encoderSync, messageSync)
syncProtocol.writeSyncStep1(encoderSync, this.doc)
bc.publish(this.url, encoding.toUint8Array(encoderSync))
// broadcast local state
const encoderState = encoding.createEncoder()
encoding.writeVarUint(encoderState, messageSync)
syncProtocol.writeSyncStep2(encoderState, this.doc)
bc.publish(this.url, encoding.toUint8Array(encoderState))
// write queryAwareness
const encoderAwareness = encoding.createEncoder()
encoding.writeVarUint(encoderAwareness, messageQueryAwareness)
bc.publish(this.url, encoding.toUint8Array(encoderAwareness))
const encoderAwarenessQuery = encoding.createEncoder()
encoding.writeVarUint(encoderAwarenessQuery, messageQueryAwareness)
bc.publish(this.url, encoding.toUint8Array(encoderAwarenessQuery))
// broadcast local awareness state
const encoderAwarenessState = encoding.createEncoder()
encoding.writeVarUint(encoderAwarenessState, messageAwareness)
encoding.writeVarUint8Array(encoderAwarenessState, awarenessProtocol.encodeAwarenessUpdate(this.awareness, [this.doc.clientID]))
bc.publish(this.url, encoding.toUint8Array(encoderAwarenessState))
})
this.doc.on('update', this._updateHandler)
}
}
}
/**
* @param {WebsocketProvider} provider
* @param {string} reason
*/
const permissionDeniedHandler = (provider, reason) => console.warn(`Permission denied to access ${provider.url}.\n${reason}`)
/**
* @param {WebsocketProvider} provider
* @param {Uint8Array} buf
* @return {encoding.Encoder}
*/
const readMessage = (provider, buf) => {
const decoder = decoding.createDecoder(buf)
const encoder = encoding.createEncoder()
const messageType = decoding.readVarUint(decoder)
switch (messageType) {
case messageSync:
encoding.writeVarUint(encoder, messageSync)
syncProtocol.readSyncMessage(decoder, encoder, provider.doc, provider.ws)
break
case messageQueryAwareness:
encoding.writeVarUint(encoder, messageAwareness)
encoding.writeVarUint8Array(encoder, awarenessProtocol.encodeAwarenessUpdate(provider.awareness, Array.from(provider.awareness.getStates().keys())))
break
case messageAwareness:
awarenessProtocol.applyAwarenessUpdate(provider.awareness, decoding.readVarUint8Array(decoder), provider)
break
case messageAuth:
authProtocol.readAuthMessage(decoder, provider, permissionDeniedHandler)
break
default:
console.error('Unable to compute message')
return encoder
}
return encoder
}
/**
* @param {WebsocketProvider} provider
*/
const setupWS = provider => {
const websocket = new WebSocket(provider.url)
websocket.binaryType = 'arraybuffer'
provider.ws = websocket
websocket.onmessage = event => {
const encoder = readMessage(provider, new Uint8Array(event.data))
if (encoding.length(encoder) > 1) {
websocket.send(encoding.toUint8Array(encoder))
}
}
websocket.onclose = () => {
provider.ws = null
if (provider.wsconnected) {
provider.wsconnected = false
// update awareness (all users left)
awarenessProtocol.removeAwarenessStates(provider.awareness, Array.from(provider.awareness.getStates().keys()), provider)
provider.emit('status', [{
status: 'disconnected'
}])
}
if (provider.shouldReconnect) {
setTimeout(setupWS, reconnectTimeout, provider, provider.url)
}
}
websocket.onopen = () => {
provider.wsconnected = true
provider.emit('status', [{
status: 'connected'
}])
// always send sync step 1 when connected
const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, messageSync)
syncProtocol.writeSyncStep1(encoder, provider.doc)
websocket.send(encoding.toUint8Array(encoder))
// by updating the local awareness state we trigger the event handler that propagates this information to other clients.
provider.awareness.setLocalState(provider.awareness.getLocalState() || {})
}
}