diff --git a/README.md b/README.md index 09ab730..5a0f057 100644 --- a/README.md +++ b/README.md @@ -33,9 +33,9 @@ import { WebsocketProvider } from 'yjs/provider/websocket.js' const provider = new WebsocketProvider('http://localhost:1234') // open a websocket connection to http://localhost:1234/my-document-name -const sharedDocument = provider.get('my-document-name') +const Doc = provider.get('my-document-name') -sharedDocument.on('status', event => { +Doc.on('status', event => { console.log(event.status) // logs "connected" or "disconnected" }) ``` diff --git a/bin/utils.js b/bin/utils.js index 5f252f3..bb22e5c 100644 --- a/bin/utils.js +++ b/bin/utils.js @@ -35,20 +35,19 @@ const messageAwareness = 1 // const messageAuth = 2 /** - * @param {Y.Transaction} transaction + * @param {Uint8Array} update + * @param {any} origin * @param {WSSharedDoc} doc */ -const afterTransaction = (transaction, doc) => { - if (transaction.updateMessage !== null) { - const encoder = encoding.createEncoder() - encoding.writeVarUint(encoder, messageSync) - syncProtocol.writeUpdate(encoder, transaction.updateMessage) - const message = encoding.toBuffer(encoder) - doc.conns.forEach((_, conn) => send(doc, conn, message)) - } +const updateHandler = (update, origin, doc) => { + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, messageSync) + syncProtocol.writeUpdate(encoder, update) + const message = encoding.toUint8Array(encoder) + doc.conns.forEach((_, conn) => send(doc, conn, message)) } -class WSSharedDoc extends Y.Y { +class WSSharedDoc extends Y.Doc { /** * @param {string} name */ @@ -69,14 +68,14 @@ class WSSharedDoc extends Y.Y { * @type {Map} */ this.awarenessClock = new Map() - this.on('afterTransaction', afterTransaction) + this.on('update', updateHandler) } } /** * @param {any} conn * @param {WSSharedDoc} doc - * @param {ArrayBuffer} message + * @param {Uint8Array} message */ const messageListener = (conn, doc, message) => { const encoder = encoding.createEncoder() @@ -87,7 +86,7 @@ const messageListener = (conn, doc, message) => { encoding.writeVarUint(encoder, messageSync) syncProtocol.readSyncMessage(decoder, encoder, doc, null) if (encoding.length(encoder) > 1) { - send(doc, conn, encoding.toBuffer(encoder)) + send(doc, conn, encoding.toUint8Array(encoder)) } break case messageAwareness: { @@ -99,7 +98,7 @@ const messageListener = (conn, doc, message) => { // @ts-ignore we received an update => so conn exists doc.conns.get(conn).add(update.clientID) }) - const buff = encoding.toBuffer(encoder) + const buff = encoding.toUint8Array(encoder) doc.conns.forEach((_, c) => { send(doc, c, buff) }) @@ -128,7 +127,7 @@ const closeConn = (doc, conn) => { doc.awarenessClock.delete(clientID) return { clientID, state: null, clock } })) - const buf = encoding.toBuffer(encoder) + const buf = encoding.toUint8Array(encoder) doc.conns.forEach((_, conn) => { send(doc, conn, buf) }) @@ -146,7 +145,7 @@ const closeConn = (doc, conn) => { /** * @param {WSSharedDoc} doc * @param {any} conn - * @param {ArrayBuffer} m + * @param {Uint8Array} m */ const send = (doc, conn, m) => { if (conn.readyState !== wsReadyStateConnecting && conn.readyState !== wsReadyStateOpen) { @@ -179,7 +178,7 @@ exports.setupWSConnection = (conn, req) => { }) doc.conns.set(conn, new Set()) // listen and reply to events - conn.on('message', /** @param {ArrayBuffer} message */ message => messageListener(conn, doc, message)) + conn.on('message', /** @param {ArrayBuffer} message */ message => messageListener(conn, doc, new Uint8Array(message))) conn.on('close', () => { closeConn(doc, conn) }) @@ -206,8 +205,8 @@ exports.setupWSConnection = (conn, req) => { // send sync step 1 const encoder = encoding.createEncoder() encoding.writeVarUint(encoder, messageSync) - syncProtocol.writeSyncStep1(encoder, doc.store) - send(doc, conn, encoding.toBuffer(encoder)) + syncProtocol.writeSyncStep1(encoder, doc) + send(doc, conn, encoding.toUint8Array(encoder)) if (doc.awareness.size > 0) { const encoder = encoding.createEncoder() /** @@ -219,6 +218,6 @@ exports.setupWSConnection = (conn, req) => { }) encoding.writeVarUint(encoder, messageAwareness) awarenessProtocol.writeUsersStateChange(encoder, userStates) - send(doc, conn, encoding.toBuffer(encoder)) + send(doc, conn, encoding.toUint8Array(encoder)) } } diff --git a/package-lock.json b/package-lock.json index 06be84b..510eac4 100644 --- a/package-lock.json +++ b/package-lock.json @@ -968,10 +968,9 @@ } }, "lib0": { - "version": "0.0.2", - "resolved": "https://registry.npmjs.org/lib0/-/lib0-0.0.2.tgz", - "integrity": "sha512-7bJgV2emHGRO5kpj66Gdc9SQKVfhDBLx0UIS/aU5P8R0179nRFHKDTYGvLlNloWbeUUARlqk3ndFIO4JbUy7Sw==", - "dev": true + "version": "0.0.4", + "resolved": "https://registry.npmjs.org/lib0/-/lib0-0.0.4.tgz", + "integrity": "sha512-osSGIxFM0mUuVAclVOQAio4lq0YYk1xFfj6J+1i3u5az8rXAQKDil2skA19aiiG0sfAdasOtr8Mk+9Mrw10cfQ==" }, "load-json-file": { "version": "2.0.0", @@ -1700,17 +1699,20 @@ "dev": true }, "y-protocols": { - "version": "0.0.3", - "resolved": "https://registry.npmjs.org/y-protocols/-/y-protocols-0.0.3.tgz", - "integrity": "sha512-b6yUR4KuRN9ehf2b0/YyaxxO2bxk7ti5DDTg5Jm/FEtPf0Vk6s4ez5wIhkV1nA7bloz/NDmVpoCDPFX1zZORhg==" + "version": "0.0.5", + "resolved": "https://registry.npmjs.org/y-protocols/-/y-protocols-0.0.5.tgz", + "integrity": "sha512-4ZKcDxM2A83ZeGi7WqqK+mS0QRFP0L7Xd8JLxjX+fZeWiUIfYin16f8/0liMaHvGOHgfpomQrQsZm4HCkObgmQ==", + "requires": { + "lib0": "0.0.4" + } }, "yjs": { - "version": "13.0.0-81", - "resolved": "https://registry.npmjs.org/yjs/-/yjs-13.0.0-81.tgz", - "integrity": "sha512-wGTCXHsBIIGb2eswyl9HNY2TeoFy8jdqPGbGHsu9Ooo9+/LbTUz10XlEKp5/JHQ10fToMSEc1rf5wBu4g+TVNQ==", + "version": "13.0.0-82", + "resolved": "https://registry.npmjs.org/yjs/-/yjs-13.0.0-82.tgz", + "integrity": "sha512-pU3siEW0j+pKynWtnT7wKYn6797rdM/FnxsgdjFKJhUzdmeN1vYYZ+hTtPVWg/v8cjIVWCcWebMJZ1IhqFm5yQ==", "dev": true, "requires": { - "lib0": "0.0.2" + "lib0": "0.0.4" } } } diff --git a/package.json b/package.json index 8c4c13d..daeb6ed 100644 --- a/package.json +++ b/package.json @@ -7,7 +7,7 @@ "sideEffects": false, "scripts": { "start": "node ./bin/server.js", - "dist": "rm -rf dist/* && rollup -c", + "dist": "rm -rf dist && rollup -c", "test": "echo 'should lint here'", "lint": "standard", "preversion": "npm run test", @@ -18,7 +18,8 @@ }, "files": [ "dist/*", - "bin/*" + "bin/*", + "src/*" ], "repository": { "type": "git", @@ -34,17 +35,17 @@ }, "homepage": "https://github.com/y-js/y-websocket#readme", "dependencies": { - "y-protocols": "0.0.3" + "y-protocols": "0.0.5", + "lib0": "0.0.4" }, "devDependencies": { "rollup": "^1.1.2", "rollup-cli": "^1.0.9", "standard": "^12.0.1", - "yjs": "13.0.0-81" + "yjs": "13.0.0-82" }, "peerDependenies": { - "lib0": "*", - "yjs": "13.0.0-81" + "yjs": "13.0.0-82" }, "optionalDependencies": { "ws": "^6.2.1" diff --git a/src/y-websocket.js b/src/y-websocket.js index 2173615..31f2311 100644 --- a/src/y-websocket.js +++ b/src/y-websocket.js @@ -24,14 +24,14 @@ const messageAuth = 2 const reconnectTimeout = 3000 /** - * @param {WebsocketsSharedDocument} doc + * @param {WebsocketsDoc} doc * @param {string} reason */ const permissionDeniedHandler = (doc, reason) => console.warn(`Permission denied to access ${doc.url}.\n${reason}`) /** - * @param {WebsocketsSharedDocument} doc - * @param {ArrayBuffer} buf + * @param {WebsocketsDoc} doc + * @param {Uint8Array} buf * @return {encoding.Encoder} */ const readMessage = (doc, buf) => { @@ -55,7 +55,7 @@ const readMessage = (doc, buf) => { } /** - * @param {WebsocketsSharedDocument} doc + * @param {WebsocketsDoc} doc * @param {string} url */ const setupWS = (doc, url) => { @@ -63,9 +63,9 @@ const setupWS = (doc, url) => { websocket.binaryType = 'arraybuffer' doc.ws = websocket websocket.onmessage = event => { - const encoder = readMessage(doc, event.data) + const encoder = readMessage(doc, new Uint8Array(event.data)) if (encoding.length(encoder) > 1) { - websocket.send(encoding.toBuffer(encoder)) + websocket.send(encoding.toUint8Array(encoder)) } } websocket.onclose = () => { @@ -98,35 +98,33 @@ const setupWS = (doc, url) => { // always send sync step 1 when connected const encoder = encoding.createEncoder() encoding.writeVarUint(encoder, messageSync) - syncProtocol.writeSyncStep1(encoder, doc.store) - websocket.send(encoding.toBuffer(encoder)) + syncProtocol.writeSyncStep1(encoder, doc) + websocket.send(encoding.toUint8Array(encoder)) // force send stored awareness info doc.setAwarenessField(null, null) } } /** - * @param {Y.Transaction} transaction - * @param {WebsocketsSharedDocument} y + * @param {Uint8Array} update + * @param {any} origin + * @param {WebsocketsDoc} doc */ -const broadcastUpdate = (transaction, y) => { - if (transaction.origin !== y.ws && transaction.updateMessage !== null) { - const updateMessage = transaction.updateMessage - if (updateMessage !== null) { - const encoder = encoding.createEncoder() - encoding.writeVarUint(encoder, messageSync) - syncProtocol.writeUpdate(encoder, updateMessage) - const buf = encoding.toBuffer(encoder) - if (y.wsconnected) { - // @ts-ignore We know that wsconnected = true - y.ws.send(buf) - } - bc.publish(y.url, buf) +const updateHandler = (update, origin, doc) => { + if (origin !== doc.ws) { + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, messageSync) + syncProtocol.writeUpdate(encoder, update) + const buf = encoding.toUint8Array(encoder) + if (doc.wsconnected) { + // @ts-ignore We know that wsconnected = true + doc.ws.send(buf) } + bc.publish(doc.url, buf) } } -class WebsocketsSharedDocument extends Y.Y { +class WebsocketsDoc extends Y.Doc { /** * @param {string} url * @param {Object} opts @@ -151,10 +149,10 @@ class WebsocketsSharedDocument extends Y.Y { * @param {ArrayBuffer} data */ this._bcSubscriber = data => { - const encoder = readMessage(this, data) // already muxed + const encoder = readMessage(this, new Uint8Array(data)) this.mux(() => { if (encoding.length(encoder) > 1) { - bc.publish(url, encoding.toBuffer(encoder)) + bc.publish(url, encoding.toUint8Array(encoder)) } }) } @@ -165,7 +163,7 @@ class WebsocketsSharedDocument extends Y.Y { if (this.ws !== null) { this.ws.close() bc.unsubscribe(this.url, this._bcSubscriber) - this.off('afterTransaction', broadcastUpdate) + this.off('update', updateHandler) } } connect () { @@ -177,10 +175,10 @@ class WebsocketsSharedDocument extends Y.Y { this.mux(() => { const encoder = encoding.createEncoder() encoding.writeVarUint(encoder, messageSync) - syncProtocol.writeSyncStep1(encoder, this.store) - bc.publish(this.url, encoding.toBuffer(encoder)) + syncProtocol.writeSyncStep1(encoder, this) + bc.publish(this.url, encoding.toUint8Array(encoder)) }) - this.on('afterTransaction', broadcastUpdate) + this.on('update', updateHandler) } } getLocalAwarenessInfo () { @@ -203,7 +201,7 @@ class WebsocketsSharedDocument extends Y.Y { const encoder = encoding.createEncoder() encoding.writeVarUint(encoder, messageAwareness) awarenessProtocol.writeUsersStateChange(encoder, [{ clientID: this.clientID, state: this._localAwarenessState, clock }]) - const buf = encoding.toBuffer(encoder) + const buf = encoding.toUint8Array(encoder) // @ts-ignore we know that wsconnected = true this.ws.send(buf) } @@ -231,19 +229,19 @@ export class WebsocketProvider { } this.url = url + '/' /** - * @type {Map} + * @type {Map} */ this.docs = new Map() } /** * @param {string} name * @param {Object} [opts] - * @return {WebsocketsSharedDocument} + * @return {WebsocketsDoc} */ get (name, opts) { let doc = this.docs.get(name) if (doc === undefined) { - doc = new WebsocketsSharedDocument(this.url + name, opts) + doc = new WebsocketsDoc(this.url + name, opts) } return doc }