This commit is contained in:
Kevin Jahns 2022-08-31 22:20:43 +02:00
parent 8c04e0a0c5
commit 8e85c7fb16

View File

@ -1,7 +1,3 @@
/*
Unlike stated in the LICENSE file, it is not necessary to include the copyright notice and permission notice when you copy code from this file.
*/
/** /**
* @module provider/websocket * @module provider/websocket
*/ */
@ -31,25 +27,71 @@ export const messageAuth = 2
*/ */
const messageHandlers = [] const messageHandlers = []
messageHandlers[messageSync] = (encoder, decoder, provider, emitSynced, messageType) => { messageHandlers[messageSync] = (
encoder,
decoder,
provider,
emitSynced,
_messageType
) => {
encoding.writeVarUint(encoder, messageSync) encoding.writeVarUint(encoder, messageSync)
const syncMessageType = syncProtocol.readSyncMessage(decoder, encoder, provider.doc, provider) const syncMessageType = syncProtocol.readSyncMessage(
if (emitSynced && syncMessageType === syncProtocol.messageYjsSyncStep2 && !provider.synced) { decoder,
encoder,
provider.doc,
provider
)
if (
emitSynced && syncMessageType === syncProtocol.messageYjsSyncStep2 &&
!provider.synced
) {
provider.synced = true provider.synced = true
} }
} }
messageHandlers[messageQueryAwareness] = (encoder, decoder, provider, emitSynced, messageType) => { messageHandlers[messageQueryAwareness] = (
encoder,
_decoder,
provider,
_emitSynced,
_messageType
) => {
encoding.writeVarUint(encoder, messageAwareness) encoding.writeVarUint(encoder, messageAwareness)
encoding.writeVarUint8Array(encoder, awarenessProtocol.encodeAwarenessUpdate(provider.awareness, Array.from(provider.awareness.getStates().keys()))) encoding.writeVarUint8Array(
encoder,
awarenessProtocol.encodeAwarenessUpdate(
provider.awareness,
Array.from(provider.awareness.getStates().keys())
)
)
} }
messageHandlers[messageAwareness] = (encoder, decoder, provider, emitSynced, messageType) => { messageHandlers[messageAwareness] = (
awarenessProtocol.applyAwarenessUpdate(provider.awareness, decoding.readVarUint8Array(decoder), provider) _encoder,
decoder,
provider,
_emitSynced,
_messageType
) => {
awarenessProtocol.applyAwarenessUpdate(
provider.awareness,
decoding.readVarUint8Array(decoder),
provider
)
} }
messageHandlers[messageAuth] = (encoder, decoder, provider, emitSynced, messageType) => { messageHandlers[messageAuth] = (
authProtocol.readAuthMessage(decoder, provider.doc, (_ydoc, reason) => permissionDeniedHandler(provider, reason)) _encoder,
decoder,
provider,
_emitSynced,
_messageType
) => {
authProtocol.readAuthMessage(
decoder,
provider.doc,
(_ydoc, reason) => permissionDeniedHandler(provider, reason)
)
} }
// @todo - this should depend on awareness.outdatedTime // @todo - this should depend on awareness.outdatedTime
@ -59,7 +101,8 @@ const messageReconnectTimeout = 30000
* @param {WebsocketProvider} provider * @param {WebsocketProvider} provider
* @param {string} reason * @param {string} reason
*/ */
const permissionDeniedHandler = (provider, reason) => console.warn(`Permission denied to access ${provider.url}.\n${reason}`) const permissionDeniedHandler = (provider, reason) =>
console.warn(`Permission denied to access ${provider.url}.\n${reason}`)
/** /**
* @param {WebsocketProvider} provider * @param {WebsocketProvider} provider
@ -83,7 +126,7 @@ const readMessage = (provider, buf, emitSynced) => {
/** /**
* @param {WebsocketProvider} provider * @param {WebsocketProvider} provider
*/ */
const setupWS = provider => { const setupWS = (provider) => {
if (provider.shouldConnect && provider.ws === null) { if (provider.shouldConnect && provider.ws === null) {
const websocket = new provider._WS(provider.url) const websocket = new provider._WS(provider.url)
websocket.binaryType = 'arraybuffer' websocket.binaryType = 'arraybuffer'
@ -92,17 +135,17 @@ const setupWS = provider => {
provider.wsconnected = false provider.wsconnected = false
provider.synced = false provider.synced = false
websocket.onmessage = event => { websocket.onmessage = (event) => {
provider.wsLastMessageReceived = time.getUnixTime() provider.wsLastMessageReceived = time.getUnixTime()
const encoder = readMessage(provider, new Uint8Array(event.data), true) const encoder = readMessage(provider, new Uint8Array(event.data), true)
if (encoding.length(encoder) > 1) { if (encoding.length(encoder) > 1) {
websocket.send(encoding.toUint8Array(encoder)) websocket.send(encoding.toUint8Array(encoder))
} }
} }
websocket.onerror = event => { websocket.onerror = (event) => {
provider.emit('connection-error', [event, provider]) provider.emit('connection-error', [event, provider])
} }
websocket.onclose = event => { websocket.onclose = (event) => {
provider.emit('connection-close', [event, provider]) provider.emit('connection-close', [event, provider])
provider.ws = null provider.ws = null
provider.wsconnecting = false provider.wsconnecting = false
@ -110,7 +153,13 @@ const setupWS = provider => {
provider.wsconnected = false provider.wsconnected = false
provider.synced = false provider.synced = false
// update awareness (all users except local left) // update awareness (all users except local left)
awarenessProtocol.removeAwarenessStates(provider.awareness, Array.from(provider.awareness.getStates().keys()).filter(client => client !== provider.doc.clientID), provider) awarenessProtocol.removeAwarenessStates(
provider.awareness,
Array.from(provider.awareness.getStates().keys()).filter((client) =>
client !== provider.doc.clientID
),
provider
)
provider.emit('status', [{ provider.emit('status', [{
status: 'disconnected' status: 'disconnected'
}]) }])
@ -119,7 +168,14 @@ const setupWS = provider => {
} }
// Start with no reconnect timeout and increase timeout by // Start with no reconnect timeout and increase timeout by
// using exponential backoff starting with 100ms // using exponential backoff starting with 100ms
setTimeout(setupWS, math.min(math.pow(2, provider.wsUnsuccessfulReconnects) * 100, provider.maxBackoffTime), provider) setTimeout(
setupWS,
math.min(
math.pow(2, provider.wsUnsuccessfulReconnects) * 100,
provider.maxBackoffTime
),
provider
)
} }
websocket.onopen = () => { websocket.onopen = () => {
provider.wsLastMessageReceived = time.getUnixTime() provider.wsLastMessageReceived = time.getUnixTime()
@ -138,7 +194,12 @@ const setupWS = provider => {
if (provider.awareness.getLocalState() !== null) { if (provider.awareness.getLocalState() !== null) {
const encoderAwarenessState = encoding.createEncoder() const encoderAwarenessState = encoding.createEncoder()
encoding.writeVarUint(encoderAwarenessState, messageAwareness) encoding.writeVarUint(encoderAwarenessState, messageAwareness)
encoding.writeVarUint8Array(encoderAwarenessState, awarenessProtocol.encodeAwarenessUpdate(provider.awareness, [provider.doc.clientID])) encoding.writeVarUint8Array(
encoderAwarenessState,
awarenessProtocol.encodeAwarenessUpdate(provider.awareness, [
provider.doc.clientID
])
)
websocket.send(encoding.toUint8Array(encoderAwarenessState)) websocket.send(encoding.toUint8Array(encoderAwarenessState))
} }
} }
@ -206,7 +267,8 @@ export class WebsocketProvider extends Observable {
const encodedParams = url.encodeQueryParams(params) const encodedParams = url.encodeQueryParams(params)
this.maxBackoffTime = maxBackoffTime this.maxBackoffTime = maxBackoffTime
this.bcChannel = serverUrl + '/' + roomname this.bcChannel = serverUrl + '/' + roomname
this.url = serverUrl + '/' + roomname + (encodedParams.length === 0 ? '' : '?' + encodedParams) this.url = serverUrl + '/' + roomname +
(encodedParams.length === 0 ? '' : '?' + encodedParams)
this.roomname = roomname this.roomname = roomname
this.doc = doc this.doc = doc
this._WS = WebSocketPolyfill this._WS = WebSocketPolyfill
@ -276,17 +338,24 @@ export class WebsocketProvider extends Observable {
this.doc.on('update', this._updateHandler) this.doc.on('update', this._updateHandler)
/** /**
* @param {any} changed * @param {any} changed
* @param {any} origin * @param {any} _origin
*/ */
this._awarenessUpdateHandler = ({ added, updated, removed }, origin) => { this._awarenessUpdateHandler = ({ added, updated, removed }, _origin) => {
const changedClients = added.concat(updated).concat(removed) const changedClients = added.concat(updated).concat(removed)
const encoder = encoding.createEncoder() const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, messageAwareness) encoding.writeVarUint(encoder, messageAwareness)
encoding.writeVarUint8Array(encoder, awarenessProtocol.encodeAwarenessUpdate(awareness, changedClients)) encoding.writeVarUint8Array(
encoder,
awarenessProtocol.encodeAwarenessUpdate(awareness, changedClients)
)
broadcastMessage(this, encoding.toUint8Array(encoder)) broadcastMessage(this, encoding.toUint8Array(encoder))
} }
this._beforeUnloadHandler = () => { this._beforeUnloadHandler = () => {
awarenessProtocol.removeAwarenessStates(this.awareness, [doc.clientID], 'window unload') awarenessProtocol.removeAwarenessStates(
this.awareness,
[doc.clientID],
'window unload'
)
} }
if (typeof window !== 'undefined') { if (typeof window !== 'undefined') {
window.addEventListener('beforeunload', this._beforeUnloadHandler) window.addEventListener('beforeunload', this._beforeUnloadHandler)
@ -295,7 +364,11 @@ export class WebsocketProvider extends Observable {
} }
awareness.on('update', this._awarenessUpdateHandler) awareness.on('update', this._awarenessUpdateHandler)
this._checkInterval = /** @type {any} */ (setInterval(() => { this._checkInterval = /** @type {any} */ (setInterval(() => {
if (this.wsconnected && messageReconnectTimeout < time.getUnixTime() - this.wsLastMessageReceived) { if (
this.wsconnected &&
messageReconnectTimeout <
time.getUnixTime() - this.wsLastMessageReceived
) {
// no message received in a long time - not even your own awareness // no message received in a long time - not even your own awareness
// updates (which are updated every 15 seconds) // updates (which are updated every 15 seconds)
/** @type {WebSocket} */ (this.ws).close() /** @type {WebSocket} */ (this.ws).close()
@ -359,19 +432,37 @@ export class WebsocketProvider extends Observable {
// write queryAwareness // write queryAwareness
const encoderAwarenessQuery = encoding.createEncoder() const encoderAwarenessQuery = encoding.createEncoder()
encoding.writeVarUint(encoderAwarenessQuery, messageQueryAwareness) encoding.writeVarUint(encoderAwarenessQuery, messageQueryAwareness)
bc.publish(this.bcChannel, encoding.toUint8Array(encoderAwarenessQuery), this) bc.publish(
this.bcChannel,
encoding.toUint8Array(encoderAwarenessQuery),
this
)
// broadcast local awareness state // broadcast local awareness state
const encoderAwarenessState = encoding.createEncoder() const encoderAwarenessState = encoding.createEncoder()
encoding.writeVarUint(encoderAwarenessState, messageAwareness) encoding.writeVarUint(encoderAwarenessState, messageAwareness)
encoding.writeVarUint8Array(encoderAwarenessState, awarenessProtocol.encodeAwarenessUpdate(this.awareness, [this.doc.clientID])) encoding.writeVarUint8Array(
bc.publish(this.bcChannel, encoding.toUint8Array(encoderAwarenessState), this) encoderAwarenessState,
awarenessProtocol.encodeAwarenessUpdate(this.awareness, [
this.doc.clientID
])
)
bc.publish(
this.bcChannel,
encoding.toUint8Array(encoderAwarenessState),
this
)
} }
disconnectBc () { disconnectBc () {
// broadcast message with local awareness state set to null (indicating disconnect) // broadcast message with local awareness state set to null (indicating disconnect)
const encoder = encoding.createEncoder() const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, messageAwareness) encoding.writeVarUint(encoder, messageAwareness)
encoding.writeVarUint8Array(encoder, awarenessProtocol.encodeAwarenessUpdate(this.awareness, [this.doc.clientID], new Map())) encoding.writeVarUint8Array(
encoder,
awarenessProtocol.encodeAwarenessUpdate(this.awareness, [
this.doc.clientID
], new Map())
)
broadcastMessage(this, encoding.toUint8Array(encoder)) broadcastMessage(this, encoding.toUint8Array(encoder))
if (this.bcconnected) { if (this.bcconnected) {
bc.unsubscribe(this.bcChannel, this._bcSubscriber) bc.unsubscribe(this.bcChannel, this._bcSubscriber)