diff --git a/README.md b/README.md index 1fdeb6a..30f565b 100644 --- a/README.md +++ b/README.md @@ -3,11 +3,25 @@ The websocket provider implements a classical client server model. Clients connect to a single endpoint over websocket. The server distributes awareness information and document updates among clients. -The Websocket Provider is a solid choice if you want a central source that handles authentication and authorization. Websockets also send header information and cookies, so you can use existing authentication mechanisms with this server. I recommend that you slightly adapt the server in `./provider/websocket/server.js` to your needs. +The Websocket Provider is a solid choice if you want a central source that handles authentication and authorization. Websockets also send header information and cookies, so you can use existing authentication mechanisms with this server. * Supports cross-tab communication. When you open the same document in the same browser, changes on the document are exchanged via cross-tab communication ([Broadcast Channel](https://developer.mozilla.org/en-US/docs/Web/API/Broadcast_Channel_API) and [localStorage](https://developer.mozilla.org/en-US/docs/Web/API/Window/localStorage) as fallback). * Supports exange of awareness information (e.g. cursors) +##### Client Code: + +```js +import * as Y from 'yjs' +import { WebsocketProvider } from 'yjs/provider/websocket.js' + +const doc = new Y.Doc() +const wsProvider = new WebsocketProvider('http://localhost:1234', 'my-roomname', doc) + +provider.on('status', event => { + console.log(event.status) // logs "connected" or "disconnected" +}) +``` + ##### Start a Websocket Server: ```sh @@ -21,26 +35,10 @@ Persist document updates in a LevelDB database. See [LevelDB Persistence](#LevelDB Persistence) for more info. ```sh -PORT=1234 YPERSISTENCE=./dbDir node ./node_modules/yjs/provider/websocket/server.js +PORT=1234 YPERSISTENCE=./dbDir node ./node_modules/y-websocket/bin/server.js ``` -##### Client Code: - -```js -import * as Y from 'yjs' -import { WebsocketProvider } from 'yjs/provider/websocket.js' - -const provider = new WebsocketProvider('http://localhost:1234') - -// open a websocket connection to http://localhost:1234/my-document-name -const Doc = provider.get('my-document-name') - -Doc.on('status', event => { - console.log(event.status) // logs "connected" or "disconnected" -}) -``` - -#### Scaling +### Scaling These are mere suggestions how you could scale your server environment. diff --git a/src/y-websocket.js b/src/y-websocket.js index 31f2311..c8354a1 100644 --- a/src/y-websocket.js +++ b/src/y-websocket.js @@ -8,7 +8,7 @@ Unlike stated in the LICENSE file, it is not necessary to include the copyright /* eslint-env browser */ -import * as Y from 'yjs' +import * as Y from 'yjs' // eslint-disable-line import * as bc from 'lib0/broadcastchannel.js' import * as encoding from 'lib0/encoding.js' import * as decoding from 'lib0/decoding.js' @@ -16,6 +16,7 @@ import * as syncProtocol from 'y-protocols/sync.js' import * as authProtocol from 'y-protocols/auth.js' import * as awarenessProtocol from 'y-protocols/awareness.js' import * as mutex from 'lib0/mutex.js' +import { Observable } from 'lib0/observable.js' const messageSync = 0 const messageAwareness = 1 @@ -24,120 +25,120 @@ const messageAuth = 2 const reconnectTimeout = 3000 /** - * @param {WebsocketsDoc} doc + * @param {WebsocketProvider} provider * @param {string} reason */ -const permissionDeniedHandler = (doc, reason) => console.warn(`Permission denied to access ${doc.url}.\n${reason}`) +const permissionDeniedHandler = (provider, reason) => console.warn(`Permission denied to access ${provider.url}.\n${reason}`) /** - * @param {WebsocketsDoc} doc + * @param {WebsocketProvider} provider * @param {Uint8Array} buf * @return {encoding.Encoder} */ -const readMessage = (doc, buf) => { +const readMessage = (provider, buf) => { const decoder = decoding.createDecoder(buf) const encoder = encoding.createEncoder() const messageType = decoding.readVarUint(decoder) switch (messageType) { case messageSync: encoding.writeVarUint(encoder, messageSync) - syncProtocol.readSyncMessage(decoder, encoder, doc, doc.ws) + syncProtocol.readSyncMessage(decoder, encoder, provider.doc, provider.ws) break case messageAwareness: - doc.mux(() => - awarenessProtocol.readAwarenessMessage(decoder, doc) + provider.mux(() => + awarenessProtocol.readAwarenessMessage(decoder, provider) ) break case messageAuth: - authProtocol.readAuthMessage(decoder, doc, permissionDeniedHandler) + authProtocol.readAuthMessage(decoder, provider, permissionDeniedHandler) } return encoder } /** - * @param {WebsocketsDoc} doc - * @param {string} url + * @param {WebsocketProvider} provider */ -const setupWS = (doc, url) => { - const websocket = new WebSocket(url) +const setupWS = provider => { + const websocket = new WebSocket(provider.url) websocket.binaryType = 'arraybuffer' - doc.ws = websocket + provider.ws = websocket websocket.onmessage = event => { - const encoder = readMessage(doc, new Uint8Array(event.data)) + const encoder = readMessage(provider, new Uint8Array(event.data)) if (encoding.length(encoder) > 1) { websocket.send(encoding.toUint8Array(encoder)) } } websocket.onclose = () => { - doc.ws = null - doc.wsconnected = false + provider.ws = null + provider.wsconnected = false // update awareness (all users left) /** * @type {Array} */ const removed = [] - doc.getAwarenessInfo().forEach((_, clientID) => { + provider.getAwarenessInfo().forEach((_, clientID) => { removed.push(clientID) }) - doc.awareness = new Map() - doc.emit('awareness', [{ + provider.awareness = new Map() + provider.emit('awareness', [{ added: [], updated: [], removed }]) - doc.emit('status', [{ + provider.emit('status', [{ status: 'disconnected' }]) - if (doc.shouldReconnect) { - setTimeout(setupWS, reconnectTimeout, doc, url) + if (provider.shouldReconnect) { + setTimeout(setupWS, reconnectTimeout, provider, provider.url) } } websocket.onopen = () => { - doc.wsconnected = true - doc.emit('status', [{ + provider.wsconnected = true + provider.emit('status', [{ status: 'connected' }]) // always send sync step 1 when connected const encoder = encoding.createEncoder() encoding.writeVarUint(encoder, messageSync) - syncProtocol.writeSyncStep1(encoder, doc) + syncProtocol.writeSyncStep1(encoder, provider.doc) websocket.send(encoding.toUint8Array(encoder)) // force send stored awareness info - doc.setAwarenessField(null, null) + provider.setAwarenessField(null, null) } } /** - * @param {Uint8Array} update - * @param {any} origin - * @param {WebsocketsDoc} doc + * Websocket Provider for Yjs. Creates a websocket connection to sync the shared document. + * The document name is attached to the provided url. I.e. the following example + * creates a websocket connection to http://localhost:1234/my-document-name + * + * @example + * import * as Y from 'yjs' + * import { WebsocketProvider } from 'y-websocket' + * const doc = new Y.Doc() + * const provider = new WebsocketProvider('http://localhost:1234', 'my-document-name', doc) + * + * @extends {Observable} */ -const updateHandler = (update, origin, doc) => { - if (origin !== doc.ws) { - const encoder = encoding.createEncoder() - encoding.writeVarUint(encoder, messageSync) - syncProtocol.writeUpdate(encoder, update) - const buf = encoding.toUint8Array(encoder) - if (doc.wsconnected) { - // @ts-ignore We know that wsconnected = true - doc.ws.send(buf) - } - bc.publish(doc.url, buf) - } -} - -class WebsocketsDoc extends Y.Doc { +export class WebsocketProvider extends Observable { /** * @param {string} url - * @param {Object} opts + * @param {string} roomname + * @param {Y.Doc} doc */ - constructor (url, opts) { - super(opts) + constructor (url, roomname, doc) { + super() + // ensure that url is always ends with / + while (url[url.length - 1] === '/') { + url = url.slice(0, url.length - 1) + } + this.url = url + '/' + roomname + this.roomname = roomname + this.doc = doc /** * @type {Object} */ this._localAwarenessState = {} this.awareness = new Map() this.awarenessClock = new Map() - this.url = url this.wsconnected = false this.mux = mutex.createMutex() /** @@ -156,6 +157,23 @@ class WebsocketsDoc extends Y.Doc { } }) } + /** + * @param {Uint8Array} update + * @param {any} origin + */ + this._updateHandler = (update, origin) => { + if (origin !== this.ws) { + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, messageSync) + syncProtocol.writeUpdate(encoder, update) + const buf = encoding.toUint8Array(encoder) + if (this.wsconnected) { + // @ts-ignore We know that wsconnected = true + this.ws.send(buf) + } + bc.publish(this.url, buf) + } + } this.connect() } disconnect () { @@ -163,86 +181,22 @@ class WebsocketsDoc extends Y.Doc { if (this.ws !== null) { this.ws.close() bc.unsubscribe(this.url, this._bcSubscriber) - this.off('update', updateHandler) + this.off('update', this._updateHandler) } } connect () { this.shouldReconnect = true if (!this.wsconnected && this.ws === null) { - setupWS(this, this.url) + setupWS(this) bc.subscribe(this.url, this._bcSubscriber) // send sync step1 to bc this.mux(() => { const encoder = encoding.createEncoder() encoding.writeVarUint(encoder, messageSync) - syncProtocol.writeSyncStep1(encoder, this) + syncProtocol.writeSyncStep1(encoder, this.doc) bc.publish(this.url, encoding.toUint8Array(encoder)) }) - this.on('update', updateHandler) - } - } - getLocalAwarenessInfo () { - return this._localAwarenessState - } - getAwarenessInfo () { - return this.awareness - } - /** - * @param {string?} field - * @param {Object} value - */ - setAwarenessField (field, value) { - if (field !== null) { - this._localAwarenessState[field] = value - } - if (this.wsconnected) { - const clock = (this.awarenessClock.get(this.clientID) || 0) + 1 - this.awarenessClock.set(this.clientID, clock) - const encoder = encoding.createEncoder() - encoding.writeVarUint(encoder, messageAwareness) - awarenessProtocol.writeUsersStateChange(encoder, [{ clientID: this.clientID, state: this._localAwarenessState, clock }]) - const buf = encoding.toUint8Array(encoder) - // @ts-ignore we know that wsconnected = true - this.ws.send(buf) + this.on('update', this._updateHandler) } } } - -/** - * Websocket Provider for Yjs. Creates a single websocket connection to each document. - * The document name is attached to the provided url. I.e. the following example - * creates a websocket connection to http://localhost:1234/my-document-name - * - * @example - * import { WebsocketProvider } from 'yjs/provider/websocket/client.js' - * const provider = new WebsocketProvider('http://localhost:1234') - * const ydocument = provider.get('my-document-name') - */ -export class WebsocketProvider { - /** - * @param {string} url - */ - constructor (url) { - // ensure that url is always ends with / - while (url[url.length - 1] === '/') { - url = url.slice(0, url.length - 1) - } - this.url = url + '/' - /** - * @type {Map} - */ - this.docs = new Map() - } - /** - * @param {string} name - * @param {Object} [opts] - * @return {WebsocketsDoc} - */ - get (name, opts) { - let doc = this.docs.get(name) - if (doc === undefined) { - doc = new WebsocketsDoc(this.url + name, opts) - } - return doc - } -}