server chatches send errors and checks if the connection is still alive

This commit is contained in:
Kevin Jahns 2019-04-28 17:36:20 +02:00
parent e819493fe1
commit d4e731fb4a
2 changed files with 89 additions and 30 deletions

View File

@ -7,6 +7,11 @@ const decoding = require('lib0/dist/decoding.js')
const mutex = require('lib0/dist/mutex.js') const mutex = require('lib0/dist/mutex.js')
const map = require('lib0/dist/map.js') const map = require('lib0/dist/map.js')
const wsReadyStateConnecting = 0
const wsReadyStateOpen = 1
const wsReadyStateClosing = 2 // eslint-disable-line
const wsReadyStateClosed = 3 // eslint-disable-line
// disable gc when using snapshots! // disable gc when using snapshots!
const gcEnabled = process.env.GC !== 'false' && process.env.GC !== '0' const gcEnabled = process.env.GC !== 'false' && process.env.GC !== '0'
const persistenceDir = process.env.YPERSISTENCE const persistenceDir = process.env.YPERSISTENCE
@ -39,13 +44,17 @@ const afterTransaction = (transaction, doc) => {
encoding.writeVarUint(encoder, messageSync) encoding.writeVarUint(encoder, messageSync)
syncProtocol.writeUpdate(encoder, transaction.updateMessage) syncProtocol.writeUpdate(encoder, transaction.updateMessage)
const message = encoding.toBuffer(encoder) const message = encoding.toBuffer(encoder)
doc.conns.forEach((_, conn) => conn.send(message)) doc.conns.forEach((_, conn) => send(doc, conn, message))
} }
} }
class WSSharedDoc extends Y.Y { class WSSharedDoc extends Y.Y {
constructor () { /**
* @param {string} name
*/
constructor (name) {
super({ gc: gcEnabled }) super({ gc: gcEnabled })
this.name = name
this.mux = mutex.createMutex() this.mux = mutex.createMutex()
/** /**
* Maps from conn to set of controlled user ids. Delete all user ids from awareness when this conn is closed * Maps from conn to set of controlled user ids. Delete all user ids from awareness when this conn is closed
@ -78,7 +87,7 @@ const messageListener = (conn, doc, message) => {
encoding.writeVarUint(encoder, messageSync) encoding.writeVarUint(encoder, messageSync)
syncProtocol.readSyncMessage(decoder, encoder, doc, null) syncProtocol.readSyncMessage(decoder, encoder, doc, null)
if (encoding.length(encoder) > 1) { if (encoding.length(encoder) > 1) {
conn.send(encoding.toBuffer(encoder)) send(doc, conn, encoding.toBuffer(encoder))
} }
break break
case messageAwareness: { case messageAwareness: {
@ -92,7 +101,7 @@ const messageListener = (conn, doc, message) => {
}) })
const buff = encoding.toBuffer(encoder) const buff = encoding.toBuffer(encoder)
doc.conns.forEach((_, c) => { doc.conns.forEach((_, c) => {
c.send(buff) send(doc, c, buff)
}) })
break break
} }
@ -100,26 +109,11 @@ const messageListener = (conn, doc, message) => {
} }
/** /**
* @param {WSSharedDoc} doc
* @param {any} conn * @param {any} conn
* @param {any} req
*/ */
exports.setupWSConnection = (conn, req) => { const closeConn = (doc, conn) => {
conn.binaryType = 'arraybuffer' if (doc.conns.has(conn)) {
// get doc, create if it does not exist yet
const docName = req.url.slice(1)
const doc = map.setIfUndefined(docs, docName, () => {
const doc = new WSSharedDoc()
if (persistence !== null) {
persistence.bindState(docName, doc)
}
docs.set(docName, doc)
return doc
})
doc.conns.set(conn, new Set())
// listen and reply to events
// @ts-ignore
conn.on('message', message => messageListener(conn, doc, message))
conn.on('close', () => {
/** /**
* @type {Set<number>} * @type {Set<number>}
*/ */
@ -135,20 +129,85 @@ exports.setupWSConnection = (conn, req) => {
return { clientID, state: null, clock } return { clientID, state: null, clock }
})) }))
const buf = encoding.toBuffer(encoder) const buf = encoding.toBuffer(encoder)
doc.conns.forEach((_, conn) => conn.send(buf)) doc.conns.forEach((_, conn) => {
send(doc, conn, buf)
})
if (doc.conns.size === 0 && persistence !== null) { if (doc.conns.size === 0 && persistence !== null) {
// if persisted, we store state and destroy ydocument // if persisted, we store state and destroy ydocument
persistence.writeState(docName, doc).then(() => { persistence.writeState(doc.name, doc).then(() => {
doc.destroy() doc.destroy()
}) })
docs.delete(docName) doc.conns.delete(doc.name)
} }
}
conn.close()
}
/**
* @param {WSSharedDoc} doc
* @param {any} conn
* @param {ArrayBuffer} m
*/
const send = (doc, conn, m) => {
if (conn.readyState !== wsReadyStateConnecting && conn.readyState !== wsReadyStateOpen) {
closeConn(doc, conn)
}
try {
conn.send(m, /** @param {any} err */ err => { err != null && closeConn(doc, conn) })
} catch (e) {
closeConn(doc, conn)
}
}
const pingTimeout = 30000
/**
* @param {any} conn
* @param {any} req
*/
exports.setupWSConnection = (conn, req) => {
conn.binaryType = 'arraybuffer'
// get doc, create if it does not exist yet
const docName = req.url.slice(1)
const doc = map.setIfUndefined(docs, docName, () => {
const doc = new WSSharedDoc(docName)
if (persistence !== null) {
persistence.bindState(docName, doc)
}
docs.set(docName, doc)
return doc
})
doc.conns.set(conn, new Set())
// listen and reply to events
conn.on('message', /** @param {ArrayBuffer} message */ message => messageListener(conn, doc, message))
conn.on('close', () => {
closeConn(doc, conn)
})
// Check if connection is still alive
let pongReceived = true
const pingInterval = setInterval(() => {
if (!pongReceived) {
if (doc.conns.has(conn)) {
closeConn(doc, conn)
}
clearInterval(pingInterval)
} else if (doc.conns.has(conn)) {
pongReceived = false
try {
conn.ping()
} catch (e) {
closeConn(doc, conn)
}
}
}, pingTimeout)
conn.on('pong', () => {
pongReceived = true
}) })
// send sync step 1 // send sync step 1
const encoder = encoding.createEncoder() const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, messageSync) encoding.writeVarUint(encoder, messageSync)
syncProtocol.writeSyncStep1(encoder, doc.store) syncProtocol.writeSyncStep1(encoder, doc.store)
conn.send(encoding.toBuffer(encoder)) send(doc, conn, encoding.toBuffer(encoder))
if (doc.awareness.size > 0) { if (doc.awareness.size > 0) {
const encoder = encoding.createEncoder() const encoder = encoding.createEncoder()
/** /**
@ -160,6 +219,6 @@ exports.setupWSConnection = (conn, req) => {
}) })
encoding.writeVarUint(encoder, messageAwareness) encoding.writeVarUint(encoder, messageAwareness)
awarenessProtocol.writeUsersStateChange(encoder, userStates) awarenessProtocol.writeUsersStateChange(encoder, userStates)
conn.send(encoding.toBuffer(encoder)) send(doc, conn, encoding.toBuffer(encoder))
} }
} }

View File

@ -34,17 +34,17 @@
}, },
"homepage": "https://github.com/y-js/y-websocket#readme", "homepage": "https://github.com/y-js/y-websocket#readme",
"dependencies": { "dependencies": {
"y-protocols": "0.0.2" "y-protocols": "0.0.3"
}, },
"devDependencies": { "devDependencies": {
"rollup": "^1.1.2", "rollup": "^1.1.2",
"rollup-cli": "^1.0.9", "rollup-cli": "^1.0.9",
"standard": "^12.0.1", "standard": "^12.0.1",
"yjs": "13.0.0-80" "yjs": "13.0.0-81"
}, },
"peerDependenies": { "peerDependenies": {
"lib0": "*", "lib0": "*",
"yjs": "13.0.0-80" "yjs": "13.0.0-81"
}, },
"optionalDependencies": { "optionalDependencies": {
"ws": "^6.2.1" "ws": "^6.2.1"