implement new awareness protocol

This commit is contained in:
Kevin Jahns 2019-05-19 21:06:08 +02:00
parent b82399798c
commit fb18c6ce30
5 changed files with 203 additions and 172 deletions

View File

@ -42,7 +42,7 @@ PORT=1234 YPERSISTENCE=./dbDir node ./node_modules/y-websocket/bin/server.js
These are mere suggestions how you could scale your server environment.
**Option 1:** Websocket servers communicate with each other via a PubSub server. A room is represented by a PubSub channel. The downside of this approach is that the same shared document may be handled by many servers. But the upside is that this approach is fault tolerant, does not have a single point of failure, and is perfectly fit for route balancing.
**Option 1:** Websocket servers communicate with each other via a PubSub server. A room is represented by a PubSub channel. The downside of this approach is that the same shared document may be handled by many servers. But the upside is that this approach is fault tolerant, does not have a single point of failure, and is fit for route balancing.
**Option 2:** Sharding with *consistent hashing*. Each document is handled by a unique server. This patterns requires an entity, like etcd, that performs regular health checks and manages servers. Based on the list of available servers (which is managed by etcd) a proxy calculates which server is responsible for each requested document. The disadvantage of this approach is that it is that load distribution may not be fair. Still, this approach may be the preferred solution if you want to store the shared document in a database - e.g. for indexing.

View File

@ -26,7 +26,7 @@ if (typeof persistenceDir === 'string') {
}
/**
* @type {Map<number,WSSharedDoc>}
* @type {Map<string,WSSharedDoc>}
*/
const docs = new Map()
@ -61,13 +61,30 @@ class WSSharedDoc extends Y.Doc {
*/
this.conns = new Map()
/**
* @type {Map<number,Object>}
* @type {awarenessProtocol.Awareness}
*/
this.awareness = new Map()
this.awareness = new awarenessProtocol.Awareness(this)
/**
* @type {Map<number,number>}
* @param {{ added: Array<number>, updated: Array<number>, removed: Array<number> }} changes
* @param {Object | null} conn Origin is the connection that made the change
*/
this.awarenessClock = new Map()
const awarenessChangeHandler = ({ added, updated, removed }, conn) => {
const changedClients = added.concat(updated, removed)
if (conn !== null) {
const connControlledIDs = /** @type {Set<number>} */ (this.conns.get(conn))
added.forEach(clientID => { connControlledIDs.add(clientID) })
removed.forEach(clientID => { connControlledIDs.delete(clientID) })
}
// broadcast awareness update
const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, messageAwareness)
encoding.writeVarUint8Array(encoder, awarenessProtocol.encodeAwarenessUpdate(this.awareness, changedClients))
const buff = encoding.toUint8Array(encoder)
this.conns.forEach((_, c) => {
send(this, c, buff)
})
}
this.awareness.on('change', awarenessChangeHandler)
this.on('update', updateHandler)
}
}
@ -90,18 +107,7 @@ const messageListener = (conn, doc, message) => {
}
break
case messageAwareness: {
encoding.writeVarUint(encoder, messageAwareness)
const updates = awarenessProtocol.forwardAwarenessMessage(decoder, encoder)
updates.forEach(update => {
doc.awareness.set(update.clientID, update.state)
doc.awarenessClock.set(update.clientID, update.clock)
// @ts-ignore we received an update => so conn exists
doc.conns.get(conn).add(update.clientID)
})
const buff = encoding.toUint8Array(encoder)
doc.conns.forEach((_, c) => {
send(doc, c, buff)
})
awarenessProtocol.applyAwarenessUpdate(doc.awareness, decoding.readVarUint8Array(decoder), conn)
break
}
}
@ -119,24 +125,13 @@ const closeConn = (doc, conn) => {
// @ts-ignore
const controlledIds = doc.conns.get(conn)
doc.conns.delete(conn)
const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, messageAwareness)
awarenessProtocol.writeUsersStateChange(encoder, Array.from(controlledIds).map(clientID => {
const clock = (doc.awarenessClock.get(clientID) || 0) + 1
doc.awareness.delete(clientID)
doc.awarenessClock.delete(clientID)
return { clientID, state: null, clock }
}))
const buf = encoding.toUint8Array(encoder)
doc.conns.forEach((_, conn) => {
send(doc, conn, buf)
})
awarenessProtocol.removeAwarenessStates(doc.awareness, Array.from(controlledIds), null)
if (doc.conns.size === 0 && persistence !== null) {
// if persisted, we store state and destroy ydocument
persistence.writeState(doc.name, doc).then(() => {
doc.destroy()
})
doc.conns.delete(doc.name)
docs.delete(doc.name)
}
}
conn.close()
@ -167,6 +162,9 @@ const pingTimeout = 30000
exports.setupWSConnection = (conn, req) => {
conn.binaryType = 'arraybuffer'
// get doc, create if it does not exist yet
/**
* @type {string}
*/
const docName = req.url.slice(1)
const doc = map.setIfUndefined(docs, docName, () => {
const doc = new WSSharedDoc(docName)
@ -207,17 +205,11 @@ exports.setupWSConnection = (conn, req) => {
encoding.writeVarUint(encoder, messageSync)
syncProtocol.writeSyncStep1(encoder, doc)
send(doc, conn, encoding.toUint8Array(encoder))
if (doc.awareness.size > 0) {
const awarenessStates = doc.awareness.getStates()
if (awarenessStates.size > 0) {
const encoder = encoding.createEncoder()
/**
* @type {Array<Object>}
*/
const userStates = []
doc.awareness.forEach((state, clientID) => {
userStates.push({ state, clientID, clock: (doc.awarenessClock.get(clientID) || 0) })
})
encoding.writeVarUint(encoder, messageAwareness)
awarenessProtocol.writeUsersStateChange(encoder, userStates)
encoding.writeVarUint8Array(encoder, awarenessProtocol.encodeAwarenessUpdate(doc.awareness, Array.from(awarenessStates.keys())))
send(doc, conn, encoding.toUint8Array(encoder))
}
}

22
package-lock.json generated
View File

@ -968,9 +968,9 @@
}
},
"lib0": {
"version": "0.0.4",
"resolved": "https://registry.npmjs.org/lib0/-/lib0-0.0.4.tgz",
"integrity": "sha512-osSGIxFM0mUuVAclVOQAio4lq0YYk1xFfj6J+1i3u5az8rXAQKDil2skA19aiiG0sfAdasOtr8Mk+9Mrw10cfQ=="
"version": "0.0.5",
"resolved": "https://registry.npmjs.org/lib0/-/lib0-0.0.5.tgz",
"integrity": "sha512-3ElV6/t5Lv0Eczlnh/05q+Uq3RxQ/Q0zdN6LVtaUERQIDDZsP/CUXEGLsV8KZTgZwVFNCPGXNWYE+3WTOo+SHw=="
},
"load-json-file": {
"version": "2.0.0",
@ -1699,20 +1699,20 @@
"dev": true
},
"y-protocols": {
"version": "0.0.5",
"resolved": "https://registry.npmjs.org/y-protocols/-/y-protocols-0.0.5.tgz",
"integrity": "sha512-4ZKcDxM2A83ZeGi7WqqK+mS0QRFP0L7Xd8JLxjX+fZeWiUIfYin16f8/0liMaHvGOHgfpomQrQsZm4HCkObgmQ==",
"version": "0.0.6",
"resolved": "https://registry.npmjs.org/y-protocols/-/y-protocols-0.0.6.tgz",
"integrity": "sha512-XgUBKrFesfUYN3wXmVp9Exy7dOUOeX3A56gHNuI1ZNy9N7OdwoBv2TGfbvSH6+YpV1IEvEq7u5v0/je5MwXKJg==",
"requires": {
"lib0": "0.0.4"
"lib0": "0.0.5"
}
},
"yjs": {
"version": "13.0.0-82",
"resolved": "https://registry.npmjs.org/yjs/-/yjs-13.0.0-82.tgz",
"integrity": "sha512-pU3siEW0j+pKynWtnT7wKYn6797rdM/FnxsgdjFKJhUzdmeN1vYYZ+hTtPVWg/v8cjIVWCcWebMJZ1IhqFm5yQ==",
"version": "13.0.0-83",
"resolved": "https://registry.npmjs.org/yjs/-/yjs-13.0.0-83.tgz",
"integrity": "sha512-8M1X8fZ95odf2VU8BHrKcYr0PeEsx8tspV5svh4oLp8BVcIprbp0J2hzCvJDlOFOlyJVgvNf00UJ4uiyDKmk5A==",
"dev": true,
"requires": {
"lib0": "0.0.4"
"lib0": "0.0.5"
}
}
}

View File

@ -35,17 +35,17 @@
},
"homepage": "https://github.com/y-js/y-websocket#readme",
"dependencies": {
"y-protocols": "0.0.5",
"lib0": "0.0.4"
"y-protocols": "0.0.6",
"lib0": "0.0.5"
},
"devDependencies": {
"rollup": "^1.1.2",
"rollup-cli": "^1.0.9",
"standard": "^12.0.1",
"yjs": "13.0.0-82"
"yjs": "13.0.0-83"
},
"peerDependenies": {
"yjs": "13.0.0-82"
"yjs": "13.0.0-83"
},
"optionalDependencies": {
"ws": "^6.2.1"

View File

@ -19,11 +19,148 @@ import * as mutex from 'lib0/mutex.js'
import { Observable } from 'lib0/observable.js'
const messageSync = 0
const messageQueryAwareness = 3
const messageAwareness = 1
const messageAuth = 2
const reconnectTimeout = 3000
/**
* Websocket Provider for Yjs. Creates a websocket connection to sync the shared document.
* The document name is attached to the provided url. I.e. the following example
* creates a websocket connection to http://localhost:1234/my-document-name
*
* @example
* import * as Y from 'yjs'
* import { WebsocketProvider } from 'y-websocket'
* const doc = new Y.Doc()
* const provider = new WebsocketProvider('http://localhost:1234', 'my-document-name', doc)
*
* @extends {Observable<string>}
*/
export class WebsocketProvider extends Observable {
/**
* @param {string} url
* @param {string} roomname
* @param {Y.Doc} doc
*/
constructor (url, roomname, doc, awareness = new awarenessProtocol.Awareness(doc)) {
super()
window.addEventListener('beforeunload', () => {
awarenessProtocol.removeAwarenessStates(this.awareness, [this.doc.clientID], null)
})
// ensure that url is always ends with /
while (url[url.length - 1] === '/') {
url = url.slice(0, url.length - 1)
}
this.url = url + '/' + roomname
this.roomname = roomname
this.doc = doc
/**
* @type {Object<string,Object>}
*/
this._localAwarenessState = {}
this.awareness = awareness
this.wsconnected = false
this.mux = mutex.createMutex()
/**
* @type {WebSocket?}
*/
this.ws = null
this.shouldReconnect = true
/**
* @param {ArrayBuffer} data
*/
this._bcSubscriber = data => {
this.mux(() => {
const encoder = readMessage(this, new Uint8Array(data))
if (encoding.length(encoder) > 1) {
bc.publish(this.url, encoding.toUint8Array(encoder))
}
})
}
/**
* Listens to Yjs updates and sends them to remote peers (ws and broadcastchannel)
* @param {Uint8Array} update
* @param {any} origin
*/
this._updateHandler = (update, origin) => {
if (origin !== this.ws || origin === null) {
const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, messageSync)
syncProtocol.writeUpdate(encoder, update)
const buf = encoding.toUint8Array(encoder)
if (this.wsconnected) {
// @ts-ignore We know that wsconnected = true
this.ws.send(buf)
}
this.mux(() => {
bc.publish(this.url, buf)
})
}
}
/**
* @param {any} changed
* @param {any} origin
*/
this._awarenessUpdateHandler = ({ added, updated, removed }, origin) => {
// only broadcast local awareness information and when ws connected
const predicate = /** @param {number} id */ id => id === doc.clientID
if (added.some(predicate) || updated.some(predicate) || removed.some(predicate)) {
const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, messageAwareness)
encoding.writeVarUint8Array(encoder, awarenessProtocol.encodeAwarenessUpdate(awareness, [doc.clientID]))
const buf = encoding.toUint8Array(encoder)
if (this.wsconnected && this.ws !== null) {
this.ws.send(buf)
}
this.mux(() => {
bc.publish(this.url, buf)
})
}
}
awareness.on('change', this._awarenessUpdateHandler)
this.connect()
}
destroy () {
this.disconnect()
this.awareness.off('change', this._awarenessUpdateHandler)
super.destroy()
}
disconnect () {
this.shouldReconnect = false
if (this.ws !== null) {
this.awareness.setLocalState(null)
this.ws.close()
bc.unsubscribe(this.url, this._bcSubscriber)
this.doc.off('update', this._updateHandler)
}
}
connect () {
this.shouldReconnect = true
if (!this.wsconnected && this.ws === null) {
if (this.awareness.getLocalState() === null) {
this.awareness.setLocalState({})
}
setupWS(this)
bc.subscribe(this.url, this._bcSubscriber)
// send sync step1 to bc
this.mux(() => {
// write sync step 1
const encoderSync = encoding.createEncoder()
encoding.writeVarUint(encoderSync, messageSync)
syncProtocol.writeSyncStep1(encoderSync, this.doc)
bc.publish(this.url, encoding.toUint8Array(encoderSync))
// write queryAwareness
const encoderAwareness = encoding.createEncoder()
encoding.writeVarUint(encoderAwareness, messageQueryAwareness)
bc.publish(this.url, encoding.toUint8Array(encoderAwareness))
})
this.doc.on('update', this._updateHandler)
}
}
}
/**
* @param {WebsocketProvider} provider
* @param {string} reason
@ -44,13 +181,19 @@ const readMessage = (provider, buf) => {
encoding.writeVarUint(encoder, messageSync)
syncProtocol.readSyncMessage(decoder, encoder, provider.doc, provider.ws)
break
case messageQueryAwareness:
encoding.writeVarUint(encoder, messageAwareness)
encoding.writeVarUint8Array(encoder, awarenessProtocol.encodeAwarenessUpdate(provider.awareness, Array.from(provider.awareness.getStates().keys())))
break
case messageAwareness:
provider.mux(() =>
awarenessProtocol.readAwarenessMessage(decoder, provider)
)
awarenessProtocol.applyAwarenessUpdate(provider.awareness, decoding.readVarUint8Array(decoder), provider)
break
case messageAuth:
authProtocol.readAuthMessage(decoder, provider, permissionDeniedHandler)
break
default:
console.error('Unable to compute message')
return encoder
}
return encoder
}
@ -70,22 +213,14 @@ const setupWS = provider => {
}
websocket.onclose = () => {
provider.ws = null
if (provider.wsconnected) {
provider.wsconnected = false
// update awareness (all users left)
/**
* @type {Array<number>}
*/
const removed = []
provider.getAwarenessInfo().forEach((_, clientID) => {
removed.push(clientID)
})
provider.awareness = new Map()
provider.emit('awareness', [{
added: [], updated: [], removed
}])
awarenessProtocol.removeAwarenessStates(provider.awareness, Array.from(provider.awareness.getStates().keys()), provider)
provider.emit('status', [{
status: 'disconnected'
}])
}
if (provider.shouldReconnect) {
setTimeout(setupWS, reconnectTimeout, provider, provider.url)
}
@ -100,103 +235,7 @@ const setupWS = provider => {
encoding.writeVarUint(encoder, messageSync)
syncProtocol.writeSyncStep1(encoder, provider.doc)
websocket.send(encoding.toUint8Array(encoder))
// force send stored awareness info
provider.setAwarenessField(null, null)
}
}
/**
* Websocket Provider for Yjs. Creates a websocket connection to sync the shared document.
* The document name is attached to the provided url. I.e. the following example
* creates a websocket connection to http://localhost:1234/my-document-name
*
* @example
* import * as Y from 'yjs'
* import { WebsocketProvider } from 'y-websocket'
* const doc = new Y.Doc()
* const provider = new WebsocketProvider('http://localhost:1234', 'my-document-name', doc)
*
* @extends {Observable<string>}
*/
export class WebsocketProvider extends Observable {
/**
* @param {string} url
* @param {string} roomname
* @param {Y.Doc} doc
*/
constructor (url, roomname, doc) {
super()
// ensure that url is always ends with /
while (url[url.length - 1] === '/') {
url = url.slice(0, url.length - 1)
}
this.url = url + '/' + roomname
this.roomname = roomname
this.doc = doc
/**
* @type {Object<string,Object>}
*/
this._localAwarenessState = {}
this.awareness = new Map()
this.awarenessClock = new Map()
this.wsconnected = false
this.mux = mutex.createMutex()
/**
* @type {WebSocket?}
*/
this.ws = null
this.shouldReconnect = true
/**
* @param {ArrayBuffer} data
*/
this._bcSubscriber = data => {
const encoder = readMessage(this, new Uint8Array(data))
this.mux(() => {
if (encoding.length(encoder) > 1) {
bc.publish(url, encoding.toUint8Array(encoder))
}
})
}
/**
* @param {Uint8Array} update
* @param {any} origin
*/
this._updateHandler = (update, origin) => {
if (origin !== this.ws) {
const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, messageSync)
syncProtocol.writeUpdate(encoder, update)
const buf = encoding.toUint8Array(encoder)
if (this.wsconnected) {
// @ts-ignore We know that wsconnected = true
this.ws.send(buf)
}
bc.publish(this.url, buf)
}
}
this.connect()
}
disconnect () {
this.shouldReconnect = false
if (this.ws !== null) {
this.ws.close()
bc.unsubscribe(this.url, this._bcSubscriber)
this.off('update', this._updateHandler)
}
}
connect () {
this.shouldReconnect = true
if (!this.wsconnected && this.ws === null) {
setupWS(this)
bc.subscribe(this.url, this._bcSubscriber)
// send sync step1 to bc
this.mux(() => {
const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, messageSync)
syncProtocol.writeSyncStep1(encoder, this.doc)
bc.publish(this.url, encoding.toUint8Array(encoder))
})
this.on('update', this._updateHandler)
}
// by updating the local awareness state we trigger the event handler that propagates this information to other clients.
provider.awareness.setLocalState(provider.awareness.getLocalState() || {})
}
}