From f4a61ef474081806d97ae0388bb8fba56b08c756 Mon Sep 17 00:00:00 2001 From: Kevin Jahns Date: Wed, 18 Sep 2019 17:15:44 +0200 Subject: [PATCH] implement sync event --- src/y-websocket.js | 30 ++++++++++++++++++++++++++---- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/src/y-websocket.js b/src/y-websocket.js index 9ca8a52..83c5e5d 100644 --- a/src/y-websocket.js +++ b/src/y-websocket.js @@ -39,16 +39,20 @@ const permissionDeniedHandler = (provider, reason) => console.warn(`Permission d /** * @param {WebsocketProvider} provider * @param {Uint8Array} buf + * @param {boolean} emitSynced * @return {encoding.Encoder} */ -const readMessage = (provider, buf) => { +const readMessage = (provider, buf, emitSynced) => { const decoder = decoding.createDecoder(buf) const encoder = encoding.createEncoder() const messageType = decoding.readVarUint(decoder) switch (messageType) { case messageSync: encoding.writeVarUint(encoder, messageSync) - syncProtocol.readSyncMessage(decoder, encoder, provider.doc, provider) + const messageType = syncProtocol.readSyncMessage(decoder, encoder, provider.doc, provider) + if (emitSynced && messageType === syncProtocol.messageYjsSyncStep2 && !provider.synced) { + provider.synced = true + } break case messageQueryAwareness: encoding.writeVarUint(encoder, messageAwareness) @@ -77,9 +81,10 @@ const setupWS = provider => { provider.ws = websocket provider.wsconnecting = true provider.wsconnected = false + provider.synced = false websocket.onmessage = event => { provider.wsLastMessageReceived = time.getUnixTime() - const encoder = readMessage(provider, new Uint8Array(event.data)) + const encoder = readMessage(provider, new Uint8Array(event.data), true) if (encoding.length(encoder) > 1) { websocket.send(encoding.toUint8Array(encoder)) } @@ -89,6 +94,7 @@ const setupWS = provider => { provider.wsconnecting = false if (provider.wsconnected) { provider.wsconnected = false + provider.synced = false // update awareness (all users left) awarenessProtocol.removeAwarenessStates(provider.awareness, Array.from(provider.awareness.getStates().keys()), provider) provider.emit('status', [{ @@ -183,6 +189,10 @@ export class WebsocketProvider extends Observable { this.bcconnected = false this.wsUnsuccessfulReconnects = 0 this.mux = mutex.createMutex() + /** + * @type {boolean} + */ + this._synced = false /** * @type {WebSocket?} */ @@ -198,7 +208,7 @@ export class WebsocketProvider extends Observable { */ this._bcSubscriber = data => { this.mux(() => { - const encoder = readMessage(this, new Uint8Array(data)) + const encoder = readMessage(this, new Uint8Array(data), false) if (encoding.length(encoder) > 1) { bc.publish(this.url, encoding.toUint8Array(encoder)) } @@ -244,6 +254,18 @@ export class WebsocketProvider extends Observable { }) this.connect() } + /** + * @type {boolean} + */ + get synced () { + return this._synced + } + set synced (state) { + if (this._synced !== state) { + this._synced = state + this.emit('sync', [state]) + } + } destroy () { clearInterval(this._checkInterval) this.disconnect()