compatible with yjs@v13-79

This commit is contained in:
Kevin Jahns 2019-04-24 08:07:37 +02:00
parent 9208e699e2
commit 01519447ca
6 changed files with 14846 additions and 69 deletions

View File

@ -3,18 +3,29 @@ Unlike stated in the LICENSE file, it is not necessary to include the copyright
*/
const Y = require('yjs')
const syncProtocol = require('y-protocols/dist/sync.js')
const awarenessProtocol = require('y-protocols/dist/awareness.js')
/**
* @type {any}
*/
const WebSocket = require('ws')
const http = require('http')
const encoding = require('lib0/dist/encoding.js')
const decoding = require('lib0/dist/decoding.js')
const mutex = require('lib0/dist/mutex.js')
const map = require('lib0/dist/map.js')
const port = process.env.PORT || 1234
// disable gc when using snapshots!
const gcEnabled = process.env.GC !== 'false' && process.env.GC !== '0'
const persistenceDir = process.env.YPERSISTENCE
/**
* @type {{bindState: function(string,WSSharedDoc):void, writeState:function(string,WSSharedDoc):Promise}|null}
*/
let persistence = null
if (typeof persistenceDir === 'string') {
// @ts-ignore
const LevelDbPersistence = require('y-leveldb').LevelDbPersistence
persistence = new LevelDbPersistence(persistenceDir)
}
@ -26,17 +37,24 @@ const server = http.createServer((req, res) => {
const wss = new WebSocket.Server({ noServer: true })
/**
* @type {Map<number,WSSharedDoc>}
*/
const docs = new Map()
const messageSync = 0
const messageAwareness = 1
const messageAuth = 2
// const messageAuth = 2
const afterTransaction = (doc, transaction) => {
if (transaction.encodedStructsLen > 0) {
/**
* @param {Y.Transaction} transaction
* @param {WSSharedDoc} doc
*/
const afterTransaction = (transaction, doc) => {
if (transaction.updateMessage !== null) {
const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, messageSync)
Y.syncProtocol.writeUpdate(encoder, transaction.encodedStructsLen, transaction.encodedStructs)
syncProtocol.writeUpdate(encoder, transaction.updateMessage)
const message = encoding.toBuffer(encoder)
doc.conns.forEach((_, conn) => conn.send(message))
}
@ -45,18 +63,29 @@ const afterTransaction = (doc, transaction) => {
class WSSharedDoc extends Y.Y {
constructor () {
super({ gc: gcEnabled })
this.mux = Y.createMutex()
this.mux = mutex.createMutex()
/**
* Maps from conn to set of controlled user ids. Delete all user ids from awareness when this conn is closed
* @type {Map<Object, Set<number>>}
*/
this.conns = new Map()
/**
* @type {Map<number,Object>}
*/
this.awareness = new Map()
/**
* @type {Map<number,number>}
*/
this.awarenessClock = new Map()
this.on('afterTransaction', afterTransaction)
}
}
/**
* @param {any} conn
* @param {WSSharedDoc} doc
* @param {ArrayBuffer} message
*/
const messageListener = (conn, doc, message) => {
const encoder = encoding.createEncoder()
const decoder = decoding.createDecoder(message)
@ -64,18 +93,19 @@ const messageListener = (conn, doc, message) => {
switch (messageType) {
case messageSync:
encoding.writeVarUint(encoder, messageSync)
Y.syncProtocol.readSyncMessage(decoder, encoder, doc)
syncProtocol.readSyncMessage(decoder, encoder, doc)
if (encoding.length(encoder) > 1) {
conn.send(encoding.toBuffer(encoder))
}
break
case messageAwareness: {
encoding.writeVarUint(encoder, messageAwareness)
const updates = Y.awarenessProtocol.forwardAwarenessMessage(decoder, encoder)
const updates = awarenessProtocol.forwardAwarenessMessage(decoder, encoder)
updates.forEach(update => {
doc.awareness.set(update.userID, update.state)
doc.awarenessClock.set(update.userID, update.clock)
doc.conns.get(conn).add(update.userID)
doc.awareness.set(update.clientID, update.state)
doc.awarenessClock.set(update.clientID, update.clock)
// @ts-ignore we received an update => so conn exists
doc.conns.get(conn).add(update.clientID)
})
const buff = encoding.toBuffer(encoder)
doc.conns.forEach((_, c) => {
@ -86,31 +116,40 @@ const messageListener = (conn, doc, message) => {
}
}
/**
* @param {any} conn
* @param {any} req
*/
const setupConnection = (conn, req) => {
conn.binaryType = 'arraybuffer'
// get doc, create if it does not exist yet
const docName = req.url.slice(1)
let doc = docs.get(docName)
if (doc === undefined) {
doc = new WSSharedDoc()
const doc = map.setIfUndefined(docs, docName, () => {
const doc = new WSSharedDoc()
if (persistence !== null) {
persistence.bindState(docName, doc)
}
docs.set(docName, doc)
}
return doc
})
doc.conns.set(conn, new Set())
// listen and reply to events
// @ts-ignore
conn.on('message', message => messageListener(conn, doc, message))
conn.on('close', () => {
/**
* @type {Set<number>}
*/
// @ts-ignore
const controlledIds = doc.conns.get(conn)
doc.conns.delete(conn)
const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, messageAwareness)
Y.awarenessProtocol.writeUsersStateChange(encoder, Array.from(controlledIds).map(userID => {
const clock = (doc.awarenessClock.get(userID) || 0) + 1
doc.awareness.delete(userID)
doc.awarenessClock.delete(userID)
return { userID, state: null, clock }
awarenessProtocol.writeUsersStateChange(encoder, Array.from(controlledIds).map(clientID => {
const clock = (doc.awarenessClock.get(clientID) || 0) + 1
doc.awareness.delete(clientID)
doc.awarenessClock.delete(clientID)
return { clientID, state: null, clock }
}))
const buf = encoding.toBuffer(encoder)
doc.conns.forEach((_, conn) => conn.send(buf))
@ -125,16 +164,19 @@ const setupConnection = (conn, req) => {
// send sync step 1
const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, messageSync)
Y.syncProtocol.writeSyncStep1(encoder, doc)
syncProtocol.writeSyncStep1(encoder, doc.store)
conn.send(encoding.toBuffer(encoder))
if (doc.awareness.size > 0) {
const encoder = encoding.createEncoder()
/**
* @type {Array<Object>}
*/
const userStates = []
doc.awareness.forEach((state, userID) => {
userStates.push({ state, userID, clock: (doc.awarenessClock.get(userID) || 0) })
doc.awareness.forEach((state, clientID) => {
userStates.push({ state, clientID, clock: (doc.awarenessClock.get(clientID) || 0) })
})
encoding.writeVarUint(encoder, messageAwareness)
Y.awarenessProtocol.writeUsersStateChange(encoder, userStates)
awarenessProtocol.writeUsersStateChange(encoder, userStates)
conn.send(encoding.toBuffer(encoder))
}
}
@ -143,9 +185,13 @@ wss.on('connection', setupConnection)
server.on('upgrade', (request, socket, head) => {
// You may check auth of request here..
wss.handleUpgrade(request, socket, head, ws => {
/**
* @param {any} ws
*/
const handleAuth = ws => {
wss.emit('connection', ws, request)
})
}
wss.handleUpgrade(request, socket, head, handleAuth)
})
server.listen(port)

14678
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -1,21 +1,18 @@
{
"name": "y-websocket",
"version": "1.0.0",
"description": "Yjs Websocket Provider",
"main": "./dist/y-websocket.js",
"module": "./src/y-websocket.js",
"bin": {
"y-websocket": "./bin/server.js"
},
"dependencies": {},
"devDependencies": {
"funlib": "*",
"y-protocols": "*",
"rollup": "^1.1.2"
},
"version": "0.0.0",
"description": "Websockets provider for Yjs",
"main": "",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1",
"dist": "rollup -c"
"start": "node ./bin/server.js",
"dist": "rm -rf dist/* && rollup -c",
"test": "echo 'should lint here'",
"lint": "standard",
"preversion": "npm run test",
"version": "npm run dist"
},
"bin": {
"y-websocket-server": "./bin/server.js"
},
"repository": {
"type": "git",
@ -29,5 +26,23 @@
"bugs": {
"url": "https://github.com/y-js/y-websocket/issues"
},
"homepage": "https://github.com/y-js/y-websocket#readme"
"homepage": "https://github.com/y-js/y-websocket#readme",
"dependencies": {},
"devDependencies": {
"rollup": "^1.1.2",
"rollup-cli": "^1.0.9",
"standard": "^12.0.1",
"lib0": "file:/../lib0",
"y-leveldb": "file:../y-leveldb",
"y-protocols": "file:../y-protocols",
"yjs": "file:../yjs"
},
"peerDependenies": {
"lib0": "^0.0.1",
"yjs": "*"
},
"optionalDependencies": {
"ws": "^6.2.1",
"y-leveldb": "*"
}
}

View File

@ -1,15 +1,15 @@
export default {
input: './src/y-websocket.js',
external: id => /^(funlib|yjs|y-protocols)/.test(id),
external: id => /^(lib0|yjs|y-protocols)/.test(id),
output: [{
name: 'y-websocket',
file: 'dist/y-websocket.js',
format: 'cjs',
sourcemap: true,
paths: path => {
if (/^funlib\//.test(path)) {
return `lib0/dist${path.slice(6)}`
} else if (/^y\-protocols\//.test(path)) {
if (/^lib0\//.test(path)) {
return `lib0/dist${path.slice(4)}`
} else if (/^y-protocols\//.test(path)) {
return `y-protocols/dist${path.slice(11)}`
}
return path

View File

@ -54,6 +54,10 @@ const readMessage = (doc, buf) => {
return encoder
}
/**
* @param {WebsocketsSharedDocument} doc
* @param {string} url
*/
const setupWS = (doc, url) => {
const websocket = new WebSocket(url)
websocket.binaryType = 'arraybuffer'
@ -68,9 +72,12 @@ const setupWS = (doc, url) => {
doc.ws = null
doc.wsconnected = false
// update awareness (all users left)
/**
* @type {Array<number>}
*/
const removed = []
doc.getAwarenessInfo().forEach((_, userid) => {
removed.push(userid)
doc.getAwarenessInfo().forEach((_, clientID) => {
removed.push(clientID)
})
doc.awareness = new Map()
doc.emit('awareness', [{
@ -89,40 +96,61 @@ const setupWS = (doc, url) => {
// always send sync step 1 when connected
const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, messageSync)
syncProtocol.writeSyncStep1(encoder, doc)
syncProtocol.writeSyncStep1(encoder, doc.store)
websocket.send(encoding.toBuffer(encoder))
// force send stored awareness info
doc.setAwarenessField(null, null)
}
}
const broadcastUpdate = (y, transaction) => {
if (transaction.encodedStructsLen > 0) {
/**
* @param {Y.Transaction} transaction
* @param {WebsocketsSharedDocument} y
*/
const broadcastUpdate = (transaction, y) => {
if (transaction.updateMessage !== null) {
y.mux(() => {
const updateMessage = transaction.updateMessage
if (updateMessage !== null) {
const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, messageSync)
syncProtocol.writeUpdate(encoder, transaction.encodedStructsLen, transaction.encodedStructs)
syncProtocol.writeUpdate(encoder, updateMessage)
const buf = encoding.toBuffer(encoder)
if (y.wsconnected) {
// @ts-ignore We know that wsconnected = true
y.ws.send(buf)
}
bc.publish(y.url, buf)
}
})
}
}
class WebsocketsSharedDocument extends Y.Y {
/**
* @param {string} url
* @param {Object} opts
*/
constructor (url, opts) {
super(opts)
this.url = url
this.wsconnected = false
this.mux = mutex.createMutex()
this.ws = null
/**
* @type {Object<string,Object>}
*/
this._localAwarenessState = {}
this.awareness = new Map()
this.awarenessClock = new Map()
this.url = url
this.wsconnected = false
this.mux = mutex.createMutex()
/**
* @type {WebSocket?}
*/
this.ws = null
setupWS(this, url)
this.on('afterTransaction', broadcastUpdate)
/**
* @param {ArrayBuffer} data
*/
this._bcSubscriber = data => {
const encoder = readMessage(this, data) // already muxed
this.mux(() => {
@ -136,7 +164,7 @@ class WebsocketsSharedDocument extends Y.Y {
this.mux(() => {
const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, messageSync)
syncProtocol.writeSyncStep1(encoder, this)
syncProtocol.writeSyncStep1(encoder, this.store)
bc.publish(url, encoding.toBuffer(encoder))
})
}
@ -146,17 +174,22 @@ class WebsocketsSharedDocument extends Y.Y {
getAwarenessInfo () {
return this.awareness
}
/**
* @param {string?} field
* @param {Object} value
*/
setAwarenessField (field, value) {
if (field !== null) {
this._localAwarenessState[field] = value
}
if (this.wsconnected) {
const clock = (this.awarenessClock.get(this.userID) || 0) + 1
this.awarenessClock.set(this.userID, clock)
const clock = (this.awarenessClock.get(this.clientID) || 0) + 1
this.awarenessClock.set(this.clientID, clock)
const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, messageAwareness)
awarenessProtocol.writeUsersStateChange(encoder, [{ userID: this.userID, state: this._localAwarenessState, clock }])
awarenessProtocol.writeUsersStateChange(encoder, [{ clientID: this.clientID, state: this._localAwarenessState, clock }])
const buf = encoding.toBuffer(encoder)
// @ts-ignore we know that wsconnected = true
this.ws.send(buf)
}
}
@ -173,6 +206,9 @@ class WebsocketsSharedDocument extends Y.Y {
* const ydocument = provider.get('my-document-name')
*/
export class WebsocketProvider {
/**
* @param {string} url
*/
constructor (url) {
// ensure that url is always ends with /
while (url[url.length - 1] === '/') {
@ -186,6 +222,7 @@ export class WebsocketProvider {
}
/**
* @param {string} name
* @param {Object} [opts]
* @return {WebsocketsSharedDocument}
*/
get (name, opts) {

View File

@ -21,7 +21,7 @@
/* Strict Type-Checking Options */
"strict": true, /* Enable all strict type-checking options. */
"noImplicitAny": false, /* Raise error on expressions and declarations with an implied 'any' type. */
"noImplicitAny": true, /* Raise error on expressions and declarations with an implied 'any' type. */
// "strictNullChecks": true, /* Enable strict null checks. */
// "strictFunctionTypes": true, /* Enable strict checking of function types. */
// "strictPropertyInitialization": true, /* Enable strict checking of property initialization in classes. */
@ -36,8 +36,11 @@
/* Module Resolution Options */
"moduleResolution": "node", /* Specify module resolution strategy: 'node' (Node.js) or 'classic' (TypeScript pre-1.6). */
// "baseUrl": "./", /* Base directory to resolve non-absolute module names. */
// "paths": {}, /* A series of entries which re-map imports to lookup locations relative to the 'baseUrl'. */
"baseUrl": "./", /* Base directory to resolve non-absolute module names. */
"paths": {
"yjs": ["node_modules/yjs/src/index.js"],
"lib0": ["node_modules/lib0"]
},
// "rootDirs": [], /* List of root folders whose combined content represents the structure of the project at runtime. */
// "typeRoots": [], /* List of folders to include type definitions from. */
// "types": [], /* Type declaration files to be included in compilation. */