From d4e731fb4a411efa91a6c0f81404013decb14395 Mon Sep 17 00:00:00 2001 From: Kevin Jahns Date: Sun, 28 Apr 2019 17:36:20 +0200 Subject: [PATCH] server chatches send errors and checks if the connection is still alive --- bin/utils.js | 113 +++++++++++++++++++++++++++++++++++++++------------ package.json | 6 +-- 2 files changed, 89 insertions(+), 30 deletions(-) diff --git a/bin/utils.js b/bin/utils.js index 83e6eb5..5f252f3 100644 --- a/bin/utils.js +++ b/bin/utils.js @@ -7,6 +7,11 @@ const decoding = require('lib0/dist/decoding.js') const mutex = require('lib0/dist/mutex.js') const map = require('lib0/dist/map.js') +const wsReadyStateConnecting = 0 +const wsReadyStateOpen = 1 +const wsReadyStateClosing = 2 // eslint-disable-line +const wsReadyStateClosed = 3 // eslint-disable-line + // disable gc when using snapshots! const gcEnabled = process.env.GC !== 'false' && process.env.GC !== '0' const persistenceDir = process.env.YPERSISTENCE @@ -39,13 +44,17 @@ const afterTransaction = (transaction, doc) => { encoding.writeVarUint(encoder, messageSync) syncProtocol.writeUpdate(encoder, transaction.updateMessage) const message = encoding.toBuffer(encoder) - doc.conns.forEach((_, conn) => conn.send(message)) + doc.conns.forEach((_, conn) => send(doc, conn, message)) } } class WSSharedDoc extends Y.Y { - constructor () { + /** + * @param {string} name + */ + constructor (name) { super({ gc: gcEnabled }) + this.name = name this.mux = mutex.createMutex() /** * Maps from conn to set of controlled user ids. Delete all user ids from awareness when this conn is closed @@ -78,7 +87,7 @@ const messageListener = (conn, doc, message) => { encoding.writeVarUint(encoder, messageSync) syncProtocol.readSyncMessage(decoder, encoder, doc, null) if (encoding.length(encoder) > 1) { - conn.send(encoding.toBuffer(encoder)) + send(doc, conn, encoding.toBuffer(encoder)) } break case messageAwareness: { @@ -92,7 +101,7 @@ const messageListener = (conn, doc, message) => { }) const buff = encoding.toBuffer(encoder) doc.conns.forEach((_, c) => { - c.send(buff) + send(doc, c, buff) }) break } @@ -100,26 +109,11 @@ const messageListener = (conn, doc, message) => { } /** + * @param {WSSharedDoc} doc * @param {any} conn - * @param {any} req */ -exports.setupWSConnection = (conn, req) => { - conn.binaryType = 'arraybuffer' - // get doc, create if it does not exist yet - const docName = req.url.slice(1) - const doc = map.setIfUndefined(docs, docName, () => { - const doc = new WSSharedDoc() - if (persistence !== null) { - persistence.bindState(docName, doc) - } - docs.set(docName, doc) - return doc - }) - doc.conns.set(conn, new Set()) - // listen and reply to events - // @ts-ignore - conn.on('message', message => messageListener(conn, doc, message)) - conn.on('close', () => { +const closeConn = (doc, conn) => { + if (doc.conns.has(conn)) { /** * @type {Set} */ @@ -135,20 +129,85 @@ exports.setupWSConnection = (conn, req) => { return { clientID, state: null, clock } })) const buf = encoding.toBuffer(encoder) - doc.conns.forEach((_, conn) => conn.send(buf)) + doc.conns.forEach((_, conn) => { + send(doc, conn, buf) + }) if (doc.conns.size === 0 && persistence !== null) { // if persisted, we store state and destroy ydocument - persistence.writeState(docName, doc).then(() => { + persistence.writeState(doc.name, doc).then(() => { doc.destroy() }) - docs.delete(docName) + doc.conns.delete(doc.name) } + } + conn.close() +} + +/** + * @param {WSSharedDoc} doc + * @param {any} conn + * @param {ArrayBuffer} m + */ +const send = (doc, conn, m) => { + if (conn.readyState !== wsReadyStateConnecting && conn.readyState !== wsReadyStateOpen) { + closeConn(doc, conn) + } + try { + conn.send(m, /** @param {any} err */ err => { err != null && closeConn(doc, conn) }) + } catch (e) { + closeConn(doc, conn) + } +} + +const pingTimeout = 30000 + +/** + * @param {any} conn + * @param {any} req + */ +exports.setupWSConnection = (conn, req) => { + conn.binaryType = 'arraybuffer' + // get doc, create if it does not exist yet + const docName = req.url.slice(1) + const doc = map.setIfUndefined(docs, docName, () => { + const doc = new WSSharedDoc(docName) + if (persistence !== null) { + persistence.bindState(docName, doc) + } + docs.set(docName, doc) + return doc + }) + doc.conns.set(conn, new Set()) + // listen and reply to events + conn.on('message', /** @param {ArrayBuffer} message */ message => messageListener(conn, doc, message)) + conn.on('close', () => { + closeConn(doc, conn) + }) + // Check if connection is still alive + let pongReceived = true + const pingInterval = setInterval(() => { + if (!pongReceived) { + if (doc.conns.has(conn)) { + closeConn(doc, conn) + } + clearInterval(pingInterval) + } else if (doc.conns.has(conn)) { + pongReceived = false + try { + conn.ping() + } catch (e) { + closeConn(doc, conn) + } + } + }, pingTimeout) + conn.on('pong', () => { + pongReceived = true }) // send sync step 1 const encoder = encoding.createEncoder() encoding.writeVarUint(encoder, messageSync) syncProtocol.writeSyncStep1(encoder, doc.store) - conn.send(encoding.toBuffer(encoder)) + send(doc, conn, encoding.toBuffer(encoder)) if (doc.awareness.size > 0) { const encoder = encoding.createEncoder() /** @@ -160,6 +219,6 @@ exports.setupWSConnection = (conn, req) => { }) encoding.writeVarUint(encoder, messageAwareness) awarenessProtocol.writeUsersStateChange(encoder, userStates) - conn.send(encoding.toBuffer(encoder)) + send(doc, conn, encoding.toBuffer(encoder)) } } diff --git a/package.json b/package.json index 710a35b..d936a55 100644 --- a/package.json +++ b/package.json @@ -34,17 +34,17 @@ }, "homepage": "https://github.com/y-js/y-websocket#readme", "dependencies": { - "y-protocols": "0.0.2" + "y-protocols": "0.0.3" }, "devDependencies": { "rollup": "^1.1.2", "rollup-cli": "^1.0.9", "standard": "^12.0.1", - "yjs": "13.0.0-80" + "yjs": "13.0.0-81" }, "peerDependenies": { "lib0": "*", - "yjs": "13.0.0-80" + "yjs": "13.0.0-81" }, "optionalDependencies": { "ws": "^6.2.1"