implement sync event

This commit is contained in:
Kevin Jahns 2019-09-18 17:15:44 +02:00
parent 38e74a764f
commit f4a61ef474

View File

@ -39,16 +39,20 @@ const permissionDeniedHandler = (provider, reason) => console.warn(`Permission d
/** /**
* @param {WebsocketProvider} provider * @param {WebsocketProvider} provider
* @param {Uint8Array} buf * @param {Uint8Array} buf
* @param {boolean} emitSynced
* @return {encoding.Encoder} * @return {encoding.Encoder}
*/ */
const readMessage = (provider, buf) => { const readMessage = (provider, buf, emitSynced) => {
const decoder = decoding.createDecoder(buf) const decoder = decoding.createDecoder(buf)
const encoder = encoding.createEncoder() const encoder = encoding.createEncoder()
const messageType = decoding.readVarUint(decoder) const messageType = decoding.readVarUint(decoder)
switch (messageType) { switch (messageType) {
case messageSync: case messageSync:
encoding.writeVarUint(encoder, messageSync) encoding.writeVarUint(encoder, messageSync)
syncProtocol.readSyncMessage(decoder, encoder, provider.doc, provider) const messageType = syncProtocol.readSyncMessage(decoder, encoder, provider.doc, provider)
if (emitSynced && messageType === syncProtocol.messageYjsSyncStep2 && !provider.synced) {
provider.synced = true
}
break break
case messageQueryAwareness: case messageQueryAwareness:
encoding.writeVarUint(encoder, messageAwareness) encoding.writeVarUint(encoder, messageAwareness)
@ -77,9 +81,10 @@ const setupWS = provider => {
provider.ws = websocket provider.ws = websocket
provider.wsconnecting = true provider.wsconnecting = true
provider.wsconnected = false provider.wsconnected = false
provider.synced = false
websocket.onmessage = event => { websocket.onmessage = event => {
provider.wsLastMessageReceived = time.getUnixTime() provider.wsLastMessageReceived = time.getUnixTime()
const encoder = readMessage(provider, new Uint8Array(event.data)) const encoder = readMessage(provider, new Uint8Array(event.data), true)
if (encoding.length(encoder) > 1) { if (encoding.length(encoder) > 1) {
websocket.send(encoding.toUint8Array(encoder)) websocket.send(encoding.toUint8Array(encoder))
} }
@ -89,6 +94,7 @@ const setupWS = provider => {
provider.wsconnecting = false provider.wsconnecting = false
if (provider.wsconnected) { if (provider.wsconnected) {
provider.wsconnected = false provider.wsconnected = false
provider.synced = false
// update awareness (all users left) // update awareness (all users left)
awarenessProtocol.removeAwarenessStates(provider.awareness, Array.from(provider.awareness.getStates().keys()), provider) awarenessProtocol.removeAwarenessStates(provider.awareness, Array.from(provider.awareness.getStates().keys()), provider)
provider.emit('status', [{ provider.emit('status', [{
@ -183,6 +189,10 @@ export class WebsocketProvider extends Observable {
this.bcconnected = false this.bcconnected = false
this.wsUnsuccessfulReconnects = 0 this.wsUnsuccessfulReconnects = 0
this.mux = mutex.createMutex() this.mux = mutex.createMutex()
/**
* @type {boolean}
*/
this._synced = false
/** /**
* @type {WebSocket?} * @type {WebSocket?}
*/ */
@ -198,7 +208,7 @@ export class WebsocketProvider extends Observable {
*/ */
this._bcSubscriber = data => { this._bcSubscriber = data => {
this.mux(() => { this.mux(() => {
const encoder = readMessage(this, new Uint8Array(data)) const encoder = readMessage(this, new Uint8Array(data), false)
if (encoding.length(encoder) > 1) { if (encoding.length(encoder) > 1) {
bc.publish(this.url, encoding.toUint8Array(encoder)) bc.publish(this.url, encoding.toUint8Array(encoder))
} }
@ -244,6 +254,18 @@ export class WebsocketProvider extends Observable {
}) })
this.connect() this.connect()
} }
/**
* @type {boolean}
*/
get synced () {
return this._synced
}
set synced (state) {
if (this._synced !== state) {
this._synced = state
this.emit('sync', [state])
}
}
destroy () { destroy () {
clearInterval(this._checkInterval) clearInterval(this._checkInterval)
this.disconnect() this.disconnect()