prelim new Provider approach
This commit is contained in:
parent
4084573369
commit
b82399798c
36
README.md
36
README.md
@ -3,11 +3,25 @@
|
|||||||
|
|
||||||
The websocket provider implements a classical client server model. Clients connect to a single endpoint over websocket. The server distributes awareness information and document updates among clients.
|
The websocket provider implements a classical client server model. Clients connect to a single endpoint over websocket. The server distributes awareness information and document updates among clients.
|
||||||
|
|
||||||
The Websocket Provider is a solid choice if you want a central source that handles authentication and authorization. Websockets also send header information and cookies, so you can use existing authentication mechanisms with this server. I recommend that you slightly adapt the server in `./provider/websocket/server.js` to your needs.
|
The Websocket Provider is a solid choice if you want a central source that handles authentication and authorization. Websockets also send header information and cookies, so you can use existing authentication mechanisms with this server.
|
||||||
|
|
||||||
* Supports cross-tab communication. When you open the same document in the same browser, changes on the document are exchanged via cross-tab communication ([Broadcast Channel](https://developer.mozilla.org/en-US/docs/Web/API/Broadcast_Channel_API) and [localStorage](https://developer.mozilla.org/en-US/docs/Web/API/Window/localStorage) as fallback).
|
* Supports cross-tab communication. When you open the same document in the same browser, changes on the document are exchanged via cross-tab communication ([Broadcast Channel](https://developer.mozilla.org/en-US/docs/Web/API/Broadcast_Channel_API) and [localStorage](https://developer.mozilla.org/en-US/docs/Web/API/Window/localStorage) as fallback).
|
||||||
* Supports exange of awareness information (e.g. cursors)
|
* Supports exange of awareness information (e.g. cursors)
|
||||||
|
|
||||||
|
##### Client Code:
|
||||||
|
|
||||||
|
```js
|
||||||
|
import * as Y from 'yjs'
|
||||||
|
import { WebsocketProvider } from 'yjs/provider/websocket.js'
|
||||||
|
|
||||||
|
const doc = new Y.Doc()
|
||||||
|
const wsProvider = new WebsocketProvider('http://localhost:1234', 'my-roomname', doc)
|
||||||
|
|
||||||
|
provider.on('status', event => {
|
||||||
|
console.log(event.status) // logs "connected" or "disconnected"
|
||||||
|
})
|
||||||
|
```
|
||||||
|
|
||||||
##### Start a Websocket Server:
|
##### Start a Websocket Server:
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
@ -21,26 +35,10 @@ Persist document updates in a LevelDB database.
|
|||||||
See [LevelDB Persistence](#LevelDB Persistence) for more info.
|
See [LevelDB Persistence](#LevelDB Persistence) for more info.
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
PORT=1234 YPERSISTENCE=./dbDir node ./node_modules/yjs/provider/websocket/server.js
|
PORT=1234 YPERSISTENCE=./dbDir node ./node_modules/y-websocket/bin/server.js
|
||||||
```
|
```
|
||||||
|
|
||||||
##### Client Code:
|
### Scaling
|
||||||
|
|
||||||
```js
|
|
||||||
import * as Y from 'yjs'
|
|
||||||
import { WebsocketProvider } from 'yjs/provider/websocket.js'
|
|
||||||
|
|
||||||
const provider = new WebsocketProvider('http://localhost:1234')
|
|
||||||
|
|
||||||
// open a websocket connection to http://localhost:1234/my-document-name
|
|
||||||
const Doc = provider.get('my-document-name')
|
|
||||||
|
|
||||||
Doc.on('status', event => {
|
|
||||||
console.log(event.status) // logs "connected" or "disconnected"
|
|
||||||
})
|
|
||||||
```
|
|
||||||
|
|
||||||
#### Scaling
|
|
||||||
|
|
||||||
These are mere suggestions how you could scale your server environment.
|
These are mere suggestions how you could scale your server environment.
|
||||||
|
|
||||||
|
|||||||
@ -8,7 +8,7 @@ Unlike stated in the LICENSE file, it is not necessary to include the copyright
|
|||||||
|
|
||||||
/* eslint-env browser */
|
/* eslint-env browser */
|
||||||
|
|
||||||
import * as Y from 'yjs'
|
import * as Y from 'yjs' // eslint-disable-line
|
||||||
import * as bc from 'lib0/broadcastchannel.js'
|
import * as bc from 'lib0/broadcastchannel.js'
|
||||||
import * as encoding from 'lib0/encoding.js'
|
import * as encoding from 'lib0/encoding.js'
|
||||||
import * as decoding from 'lib0/decoding.js'
|
import * as decoding from 'lib0/decoding.js'
|
||||||
@ -16,6 +16,7 @@ import * as syncProtocol from 'y-protocols/sync.js'
|
|||||||
import * as authProtocol from 'y-protocols/auth.js'
|
import * as authProtocol from 'y-protocols/auth.js'
|
||||||
import * as awarenessProtocol from 'y-protocols/awareness.js'
|
import * as awarenessProtocol from 'y-protocols/awareness.js'
|
||||||
import * as mutex from 'lib0/mutex.js'
|
import * as mutex from 'lib0/mutex.js'
|
||||||
|
import { Observable } from 'lib0/observable.js'
|
||||||
|
|
||||||
const messageSync = 0
|
const messageSync = 0
|
||||||
const messageAwareness = 1
|
const messageAwareness = 1
|
||||||
@ -24,120 +25,120 @@ const messageAuth = 2
|
|||||||
const reconnectTimeout = 3000
|
const reconnectTimeout = 3000
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {WebsocketsDoc} doc
|
* @param {WebsocketProvider} provider
|
||||||
* @param {string} reason
|
* @param {string} reason
|
||||||
*/
|
*/
|
||||||
const permissionDeniedHandler = (doc, reason) => console.warn(`Permission denied to access ${doc.url}.\n${reason}`)
|
const permissionDeniedHandler = (provider, reason) => console.warn(`Permission denied to access ${provider.url}.\n${reason}`)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {WebsocketsDoc} doc
|
* @param {WebsocketProvider} provider
|
||||||
* @param {Uint8Array} buf
|
* @param {Uint8Array} buf
|
||||||
* @return {encoding.Encoder}
|
* @return {encoding.Encoder}
|
||||||
*/
|
*/
|
||||||
const readMessage = (doc, buf) => {
|
const readMessage = (provider, buf) => {
|
||||||
const decoder = decoding.createDecoder(buf)
|
const decoder = decoding.createDecoder(buf)
|
||||||
const encoder = encoding.createEncoder()
|
const encoder = encoding.createEncoder()
|
||||||
const messageType = decoding.readVarUint(decoder)
|
const messageType = decoding.readVarUint(decoder)
|
||||||
switch (messageType) {
|
switch (messageType) {
|
||||||
case messageSync:
|
case messageSync:
|
||||||
encoding.writeVarUint(encoder, messageSync)
|
encoding.writeVarUint(encoder, messageSync)
|
||||||
syncProtocol.readSyncMessage(decoder, encoder, doc, doc.ws)
|
syncProtocol.readSyncMessage(decoder, encoder, provider.doc, provider.ws)
|
||||||
break
|
break
|
||||||
case messageAwareness:
|
case messageAwareness:
|
||||||
doc.mux(() =>
|
provider.mux(() =>
|
||||||
awarenessProtocol.readAwarenessMessage(decoder, doc)
|
awarenessProtocol.readAwarenessMessage(decoder, provider)
|
||||||
)
|
)
|
||||||
break
|
break
|
||||||
case messageAuth:
|
case messageAuth:
|
||||||
authProtocol.readAuthMessage(decoder, doc, permissionDeniedHandler)
|
authProtocol.readAuthMessage(decoder, provider, permissionDeniedHandler)
|
||||||
}
|
}
|
||||||
return encoder
|
return encoder
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {WebsocketsDoc} doc
|
* @param {WebsocketProvider} provider
|
||||||
* @param {string} url
|
|
||||||
*/
|
*/
|
||||||
const setupWS = (doc, url) => {
|
const setupWS = provider => {
|
||||||
const websocket = new WebSocket(url)
|
const websocket = new WebSocket(provider.url)
|
||||||
websocket.binaryType = 'arraybuffer'
|
websocket.binaryType = 'arraybuffer'
|
||||||
doc.ws = websocket
|
provider.ws = websocket
|
||||||
websocket.onmessage = event => {
|
websocket.onmessage = event => {
|
||||||
const encoder = readMessage(doc, new Uint8Array(event.data))
|
const encoder = readMessage(provider, new Uint8Array(event.data))
|
||||||
if (encoding.length(encoder) > 1) {
|
if (encoding.length(encoder) > 1) {
|
||||||
websocket.send(encoding.toUint8Array(encoder))
|
websocket.send(encoding.toUint8Array(encoder))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
websocket.onclose = () => {
|
websocket.onclose = () => {
|
||||||
doc.ws = null
|
provider.ws = null
|
||||||
doc.wsconnected = false
|
provider.wsconnected = false
|
||||||
// update awareness (all users left)
|
// update awareness (all users left)
|
||||||
/**
|
/**
|
||||||
* @type {Array<number>}
|
* @type {Array<number>}
|
||||||
*/
|
*/
|
||||||
const removed = []
|
const removed = []
|
||||||
doc.getAwarenessInfo().forEach((_, clientID) => {
|
provider.getAwarenessInfo().forEach((_, clientID) => {
|
||||||
removed.push(clientID)
|
removed.push(clientID)
|
||||||
})
|
})
|
||||||
doc.awareness = new Map()
|
provider.awareness = new Map()
|
||||||
doc.emit('awareness', [{
|
provider.emit('awareness', [{
|
||||||
added: [], updated: [], removed
|
added: [], updated: [], removed
|
||||||
}])
|
}])
|
||||||
doc.emit('status', [{
|
provider.emit('status', [{
|
||||||
status: 'disconnected'
|
status: 'disconnected'
|
||||||
}])
|
}])
|
||||||
if (doc.shouldReconnect) {
|
if (provider.shouldReconnect) {
|
||||||
setTimeout(setupWS, reconnectTimeout, doc, url)
|
setTimeout(setupWS, reconnectTimeout, provider, provider.url)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
websocket.onopen = () => {
|
websocket.onopen = () => {
|
||||||
doc.wsconnected = true
|
provider.wsconnected = true
|
||||||
doc.emit('status', [{
|
provider.emit('status', [{
|
||||||
status: 'connected'
|
status: 'connected'
|
||||||
}])
|
}])
|
||||||
// always send sync step 1 when connected
|
// always send sync step 1 when connected
|
||||||
const encoder = encoding.createEncoder()
|
const encoder = encoding.createEncoder()
|
||||||
encoding.writeVarUint(encoder, messageSync)
|
encoding.writeVarUint(encoder, messageSync)
|
||||||
syncProtocol.writeSyncStep1(encoder, doc)
|
syncProtocol.writeSyncStep1(encoder, provider.doc)
|
||||||
websocket.send(encoding.toUint8Array(encoder))
|
websocket.send(encoding.toUint8Array(encoder))
|
||||||
// force send stored awareness info
|
// force send stored awareness info
|
||||||
doc.setAwarenessField(null, null)
|
provider.setAwarenessField(null, null)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {Uint8Array} update
|
* Websocket Provider for Yjs. Creates a websocket connection to sync the shared document.
|
||||||
* @param {any} origin
|
* The document name is attached to the provided url. I.e. the following example
|
||||||
* @param {WebsocketsDoc} doc
|
* creates a websocket connection to http://localhost:1234/my-document-name
|
||||||
|
*
|
||||||
|
* @example
|
||||||
|
* import * as Y from 'yjs'
|
||||||
|
* import { WebsocketProvider } from 'y-websocket'
|
||||||
|
* const doc = new Y.Doc()
|
||||||
|
* const provider = new WebsocketProvider('http://localhost:1234', 'my-document-name', doc)
|
||||||
|
*
|
||||||
|
* @extends {Observable<string>}
|
||||||
*/
|
*/
|
||||||
const updateHandler = (update, origin, doc) => {
|
export class WebsocketProvider extends Observable {
|
||||||
if (origin !== doc.ws) {
|
|
||||||
const encoder = encoding.createEncoder()
|
|
||||||
encoding.writeVarUint(encoder, messageSync)
|
|
||||||
syncProtocol.writeUpdate(encoder, update)
|
|
||||||
const buf = encoding.toUint8Array(encoder)
|
|
||||||
if (doc.wsconnected) {
|
|
||||||
// @ts-ignore We know that wsconnected = true
|
|
||||||
doc.ws.send(buf)
|
|
||||||
}
|
|
||||||
bc.publish(doc.url, buf)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
class WebsocketsDoc extends Y.Doc {
|
|
||||||
/**
|
/**
|
||||||
* @param {string} url
|
* @param {string} url
|
||||||
* @param {Object} opts
|
* @param {string} roomname
|
||||||
|
* @param {Y.Doc} doc
|
||||||
*/
|
*/
|
||||||
constructor (url, opts) {
|
constructor (url, roomname, doc) {
|
||||||
super(opts)
|
super()
|
||||||
|
// ensure that url is always ends with /
|
||||||
|
while (url[url.length - 1] === '/') {
|
||||||
|
url = url.slice(0, url.length - 1)
|
||||||
|
}
|
||||||
|
this.url = url + '/' + roomname
|
||||||
|
this.roomname = roomname
|
||||||
|
this.doc = doc
|
||||||
/**
|
/**
|
||||||
* @type {Object<string,Object>}
|
* @type {Object<string,Object>}
|
||||||
*/
|
*/
|
||||||
this._localAwarenessState = {}
|
this._localAwarenessState = {}
|
||||||
this.awareness = new Map()
|
this.awareness = new Map()
|
||||||
this.awarenessClock = new Map()
|
this.awarenessClock = new Map()
|
||||||
this.url = url
|
|
||||||
this.wsconnected = false
|
this.wsconnected = false
|
||||||
this.mux = mutex.createMutex()
|
this.mux = mutex.createMutex()
|
||||||
/**
|
/**
|
||||||
@ -156,6 +157,23 @@ class WebsocketsDoc extends Y.Doc {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
/**
|
||||||
|
* @param {Uint8Array} update
|
||||||
|
* @param {any} origin
|
||||||
|
*/
|
||||||
|
this._updateHandler = (update, origin) => {
|
||||||
|
if (origin !== this.ws) {
|
||||||
|
const encoder = encoding.createEncoder()
|
||||||
|
encoding.writeVarUint(encoder, messageSync)
|
||||||
|
syncProtocol.writeUpdate(encoder, update)
|
||||||
|
const buf = encoding.toUint8Array(encoder)
|
||||||
|
if (this.wsconnected) {
|
||||||
|
// @ts-ignore We know that wsconnected = true
|
||||||
|
this.ws.send(buf)
|
||||||
|
}
|
||||||
|
bc.publish(this.url, buf)
|
||||||
|
}
|
||||||
|
}
|
||||||
this.connect()
|
this.connect()
|
||||||
}
|
}
|
||||||
disconnect () {
|
disconnect () {
|
||||||
@ -163,86 +181,22 @@ class WebsocketsDoc extends Y.Doc {
|
|||||||
if (this.ws !== null) {
|
if (this.ws !== null) {
|
||||||
this.ws.close()
|
this.ws.close()
|
||||||
bc.unsubscribe(this.url, this._bcSubscriber)
|
bc.unsubscribe(this.url, this._bcSubscriber)
|
||||||
this.off('update', updateHandler)
|
this.off('update', this._updateHandler)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
connect () {
|
connect () {
|
||||||
this.shouldReconnect = true
|
this.shouldReconnect = true
|
||||||
if (!this.wsconnected && this.ws === null) {
|
if (!this.wsconnected && this.ws === null) {
|
||||||
setupWS(this, this.url)
|
setupWS(this)
|
||||||
bc.subscribe(this.url, this._bcSubscriber)
|
bc.subscribe(this.url, this._bcSubscriber)
|
||||||
// send sync step1 to bc
|
// send sync step1 to bc
|
||||||
this.mux(() => {
|
this.mux(() => {
|
||||||
const encoder = encoding.createEncoder()
|
const encoder = encoding.createEncoder()
|
||||||
encoding.writeVarUint(encoder, messageSync)
|
encoding.writeVarUint(encoder, messageSync)
|
||||||
syncProtocol.writeSyncStep1(encoder, this)
|
syncProtocol.writeSyncStep1(encoder, this.doc)
|
||||||
bc.publish(this.url, encoding.toUint8Array(encoder))
|
bc.publish(this.url, encoding.toUint8Array(encoder))
|
||||||
})
|
})
|
||||||
this.on('update', updateHandler)
|
this.on('update', this._updateHandler)
|
||||||
}
|
|
||||||
}
|
|
||||||
getLocalAwarenessInfo () {
|
|
||||||
return this._localAwarenessState
|
|
||||||
}
|
|
||||||
getAwarenessInfo () {
|
|
||||||
return this.awareness
|
|
||||||
}
|
|
||||||
/**
|
|
||||||
* @param {string?} field
|
|
||||||
* @param {Object} value
|
|
||||||
*/
|
|
||||||
setAwarenessField (field, value) {
|
|
||||||
if (field !== null) {
|
|
||||||
this._localAwarenessState[field] = value
|
|
||||||
}
|
|
||||||
if (this.wsconnected) {
|
|
||||||
const clock = (this.awarenessClock.get(this.clientID) || 0) + 1
|
|
||||||
this.awarenessClock.set(this.clientID, clock)
|
|
||||||
const encoder = encoding.createEncoder()
|
|
||||||
encoding.writeVarUint(encoder, messageAwareness)
|
|
||||||
awarenessProtocol.writeUsersStateChange(encoder, [{ clientID: this.clientID, state: this._localAwarenessState, clock }])
|
|
||||||
const buf = encoding.toUint8Array(encoder)
|
|
||||||
// @ts-ignore we know that wsconnected = true
|
|
||||||
this.ws.send(buf)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Websocket Provider for Yjs. Creates a single websocket connection to each document.
|
|
||||||
* The document name is attached to the provided url. I.e. the following example
|
|
||||||
* creates a websocket connection to http://localhost:1234/my-document-name
|
|
||||||
*
|
|
||||||
* @example
|
|
||||||
* import { WebsocketProvider } from 'yjs/provider/websocket/client.js'
|
|
||||||
* const provider = new WebsocketProvider('http://localhost:1234')
|
|
||||||
* const ydocument = provider.get('my-document-name')
|
|
||||||
*/
|
|
||||||
export class WebsocketProvider {
|
|
||||||
/**
|
|
||||||
* @param {string} url
|
|
||||||
*/
|
|
||||||
constructor (url) {
|
|
||||||
// ensure that url is always ends with /
|
|
||||||
while (url[url.length - 1] === '/') {
|
|
||||||
url = url.slice(0, url.length - 1)
|
|
||||||
}
|
|
||||||
this.url = url + '/'
|
|
||||||
/**
|
|
||||||
* @type {Map<string, WebsocketsDoc>}
|
|
||||||
*/
|
|
||||||
this.docs = new Map()
|
|
||||||
}
|
|
||||||
/**
|
|
||||||
* @param {string} name
|
|
||||||
* @param {Object} [opts]
|
|
||||||
* @return {WebsocketsDoc}
|
|
||||||
*/
|
|
||||||
get (name, opts) {
|
|
||||||
let doc = this.docs.get(name)
|
|
||||||
if (doc === undefined) {
|
|
||||||
doc = new WebsocketsDoc(this.url + name, opts)
|
|
||||||
}
|
|
||||||
return doc
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user