reconnect when no message received in a long time
This commit is contained in:
@@ -10,6 +10,7 @@ Unlike stated in the LICENSE file, it is not necessary to include the copyright
|
||||
|
||||
import * as Y from 'yjs' // eslint-disable-line
|
||||
import * as bc from 'lib0/broadcastchannel.js'
|
||||
import * as time from 'lib0/time.js'
|
||||
import * as encoding from 'lib0/encoding.js'
|
||||
import * as decoding from 'lib0/decoding.js'
|
||||
import * as syncProtocol from 'y-protocols/sync.js'
|
||||
@@ -17,13 +18,17 @@ import * as authProtocol from 'y-protocols/auth.js'
|
||||
import * as awarenessProtocol from 'y-protocols/awareness.js'
|
||||
import * as mutex from 'lib0/mutex.js'
|
||||
import { Observable } from 'lib0/observable.js'
|
||||
import * as math from 'lib0/math.js'
|
||||
|
||||
const messageSync = 0
|
||||
const messageQueryAwareness = 3
|
||||
const messageAwareness = 1
|
||||
const messageAuth = 2
|
||||
|
||||
const reconnectTimeout = 3000
|
||||
const reconnectTimeoutBase = 1200
|
||||
const maxReconnectTimeout = 2500
|
||||
// @todo - this should depend on awareness.outdatedTime
|
||||
const messageReconnectTimeout = 30000
|
||||
|
||||
/**
|
||||
* @param {WebsocketProvider} provider
|
||||
@@ -73,6 +78,7 @@ const setupWS = provider => {
|
||||
provider.wsconnecting = true
|
||||
provider.wsconnected = false
|
||||
websocket.onmessage = event => {
|
||||
provider.wsLastMessageReceived = time.getUnixTime()
|
||||
const encoder = readMessage(provider, new Uint8Array(event.data))
|
||||
if (encoding.length(encoder) > 1) {
|
||||
websocket.send(encoding.toUint8Array(encoder))
|
||||
@@ -81,19 +87,27 @@ const setupWS = provider => {
|
||||
websocket.onclose = () => {
|
||||
provider.ws = null
|
||||
provider.wsconnecting = false
|
||||
provider.wsconnected = false
|
||||
if (provider.wsconnected) {
|
||||
provider.wsconnected = false
|
||||
// update awareness (all users left)
|
||||
awarenessProtocol.removeAwarenessStates(provider.awareness, Array.from(provider.awareness.getStates().keys()), provider)
|
||||
provider.emit('status', [{
|
||||
status: 'disconnected'
|
||||
}])
|
||||
} else {
|
||||
provider.wsUnsuccessfulReconnects++
|
||||
}
|
||||
setTimeout(setupWS, reconnectTimeout, provider)
|
||||
// Start with no reconnect timeout and increase timeout by
|
||||
// log10(wsUnsuccessfulReconnects).
|
||||
// The idea is to increase reconnect timeout slowly and have no reconnect
|
||||
// timeout at the beginning (log(1) = 0)
|
||||
setTimeout(setupWS, math.min(Math.log10(provider.wsUnsuccessfulReconnects + 1) * reconnectTimeoutBase, maxReconnectTimeout), provider)
|
||||
}
|
||||
websocket.onopen = () => {
|
||||
provider.wsLastMessageReceived = time.getUnixTime()
|
||||
provider.wsconnecting = false
|
||||
provider.wsconnected = true
|
||||
provider.wsUnsuccessfulReconnects = 0
|
||||
provider.emit('status', [{
|
||||
status: 'connected'
|
||||
}])
|
||||
@@ -167,11 +181,13 @@ export class WebsocketProvider extends Observable {
|
||||
this.wsconnected = false
|
||||
this.wsconnecting = false
|
||||
this.bcconnected = false
|
||||
this.wsUnsuccessfulReconnects = 0
|
||||
this.mux = mutex.createMutex()
|
||||
/**
|
||||
* @type {WebSocket?}
|
||||
*/
|
||||
this.ws = null
|
||||
this.wsLastMessageReceived = 0
|
||||
/**
|
||||
* Whether to connect to other peers or not
|
||||
* @type {boolean}
|
||||
@@ -219,9 +235,17 @@ export class WebsocketProvider extends Observable {
|
||||
broadcastMessage(this, encoding.toUint8Array(encoder))
|
||||
})
|
||||
awareness.on('change', this._awarenessUpdateHandler)
|
||||
this._checkInterval = setInterval(() => {
|
||||
if (this.wsconnected && messageReconnectTimeout < time.getUnixTime() - this.wsLastMessageReceived) {
|
||||
// no message received in a long time - not even your own awareness
|
||||
// updates (which are updated every 15 seconds)
|
||||
/** @type {WebSocket} */ (this.ws).close()
|
||||
}
|
||||
})
|
||||
this.connect()
|
||||
}
|
||||
destroy () {
|
||||
clearInterval(this._checkInterval)
|
||||
this.disconnect()
|
||||
this.awareness.off('change', this._awarenessUpdateHandler)
|
||||
super.destroy()
|
||||
|
||||
Reference in New Issue
Block a user