From 36b28c7e04c683afbe4688ae597a80a8578ce5b0 Mon Sep 17 00:00:00 2001 From: Epid Date: Mon, 30 Mar 2026 06:39:04 +0300 Subject: [PATCH] fix(websocket): add WsSource to eventsource.sharedworker, remove websocket.sharedworker - Add WsSource class to eventsource.sharedworker.ts for WebSocket transport - Remove websocket.sharedworker.ts (functionality merged into eventsource.sharedworker.ts) --- .../js/features/eventsource.sharedworker.ts | 87 +++++++++++ web_src/js/features/websocket.sharedworker.ts | 145 ------------------ 2 files changed, 87 insertions(+), 145 deletions(-) delete mode 100644 web_src/js/features/websocket.sharedworker.ts diff --git a/web_src/js/features/eventsource.sharedworker.ts b/web_src/js/features/eventsource.sharedworker.ts index 816cd7020a..58b371e6a0 100644 --- a/web_src/js/features/eventsource.sharedworker.ts +++ b/web_src/js/features/eventsource.sharedworker.ts @@ -69,8 +69,82 @@ class Source { } } +// WsSource provides a WebSocket transport alongside EventSource. +// It delivers real-time notification-count pushes using the same client list +// as the associated Source, normalising messages to the SSE event format so +// that callers do not need to know which transport delivered the event. +class WsSource { + wsUrl: string; + ws: WebSocket | null; + source: Source; + reconnectTimer: ReturnType | null; + reconnectDelay: number; + + constructor(wsUrl: string, source: Source) { + this.wsUrl = wsUrl; + this.source = source; + this.ws = null; + this.reconnectTimer = null; + this.reconnectDelay = 50; + this.connect(); + } + + connect() { + this.ws = new WebSocket(this.wsUrl); + + this.ws.addEventListener('open', () => { + this.reconnectDelay = 50; + }); + + this.ws.addEventListener('message', (event: MessageEvent) => { + try { + const msg = JSON.parse(event.data); + if (msg.type === 'notification-count') { + // Normalise to SSE event format so the receiver is transport-agnostic. + this.source.notifyClients({ + type: 'notification-count', + data: JSON.stringify({Count: msg.count}), + }); + } + } catch { + // ignore malformed messages + } + }); + + this.ws.addEventListener('close', () => { + this.ws = null; + this.scheduleReconnect(); + }); + + this.ws.addEventListener('error', () => { + this.ws = null; + this.scheduleReconnect(); + }); + } + + scheduleReconnect() { + if (this.reconnectTimer !== null) return; + const delay = this.reconnectDelay; + this.reconnectTimer = setTimeout(() => { + this.reconnectTimer = null; + this.connect(); + }, delay); + this.reconnectDelay = Math.min(this.reconnectDelay * 2, 10000); + } + + close() { + if (this.reconnectTimer !== null) { + clearTimeout(this.reconnectTimer); + this.reconnectTimer = null; + } + this.ws?.close(); + this.ws = null; + } +} + const sourcesByUrl = new Map(); const sourcesByPort = new Map(); +const wsSourcesByUrl = new Map(); (self as unknown as SharedWorkerGlobalScope).addEventListener('connect', (e: MessageEvent) => { for (const port of e.ports) { @@ -102,6 +176,11 @@ const sourcesByPort = new Map(); if (count === 0) { source.close(); sourcesByUrl.set(source.url, null); + const ws = wsSourcesByUrl.get(source.url); + if (ws) { + ws.close(); + wsSourcesByUrl.set(source.url, null); + } } } // Create a new Source @@ -109,6 +188,9 @@ const sourcesByPort = new Map(); source.register(port); sourcesByUrl.set(url, source); sourcesByPort.set(port, source); + // Start WebSocket alongside EventSource for real-time notification pushes. + const wsUrl = url.replace(/^http/, 'ws').replace(/\/user\/events$/, '/-/ws'); + wsSourcesByUrl.set(url, new WsSource(wsUrl, source)); } else if (event.data.type === 'listen') { const source = sourcesByPort.get(port)!; source.listen(event.data.eventType); @@ -121,6 +203,11 @@ const sourcesByPort = new Map(); source.close(); sourcesByUrl.set(source.url, null); sourcesByPort.set(port, null); + const ws = wsSourcesByUrl.get(source.url); + if (ws) { + ws.close(); + wsSourcesByUrl.set(source.url, null); + } } } else if (event.data.type === 'status') { const source = sourcesByPort.get(port); diff --git a/web_src/js/features/websocket.sharedworker.ts b/web_src/js/features/websocket.sharedworker.ts deleted file mode 100644 index 539aa46037..0000000000 --- a/web_src/js/features/websocket.sharedworker.ts +++ /dev/null @@ -1,145 +0,0 @@ -// One WebSocket connection per URL, shared across all tabs via SharedWorker. -// Messages from the server are JSON objects broadcast to all connected ports. -export {}; // make this a module to avoid global scope conflicts with other sharedworker files - -const RECONNECT_DELAY_INITIAL = 50; -const RECONNECT_DELAY_MAX = 10000; - -class WsSource { - url: string; - ws: WebSocket | null; - clients: MessagePort[]; - reconnectTimer: ReturnType | null; - reconnectDelay: number; - - constructor(url: string) { - this.url = url; - this.ws = null; - this.clients = []; - this.reconnectTimer = null; - this.reconnectDelay = RECONNECT_DELAY_INITIAL; - this.connect(); - } - - connect() { - this.ws = new WebSocket(this.url); - - this.ws.addEventListener('open', () => { - this.reconnectDelay = RECONNECT_DELAY_INITIAL; - this.broadcast({type: 'status', message: `connected to ${this.url}`}); - }); - - this.ws.addEventListener('message', (event: MessageEvent) => { - try { - const msg = JSON.parse(event.data); - this.broadcast(msg); - } catch { - console.warn('websocket.sharedworker: received non-JSON message', event.data); - } - }); - - this.ws.addEventListener('close', () => { - this.ws = null; - this.scheduleReconnect(); - }); - - this.ws.addEventListener('error', () => { - this.broadcast({type: 'error', message: 'websocket error'}); - this.ws = null; - this.scheduleReconnect(); - }); - } - - scheduleReconnect() { - if (this.clients.length === 0 || this.reconnectTimer !== null) return; - const delay = this.reconnectDelay; - this.reconnectTimer = setTimeout(() => { - this.reconnectTimer = null; - this.connect(); - }, delay); - this.reconnectDelay = Math.min(this.reconnectDelay * 2, RECONNECT_DELAY_MAX); - } - - register(port: MessagePort) { - if (this.clients.includes(port)) return; - this.clients.push(port); - port.postMessage({type: 'status', message: `registered to ${this.url}`}); - } - - deregister(port: MessagePort): number { - const idx = this.clients.indexOf(port); - if (idx >= 0) this.clients.splice(idx, 1); - return this.clients.length; - } - - close() { - if (this.reconnectTimer !== null) { - clearTimeout(this.reconnectTimer); - this.reconnectTimer = null; - } - this.ws?.close(); - this.ws = null; - } - - broadcast(msg: unknown) { - for (const port of this.clients) { - port.postMessage(msg); - } - } -} - -const sourcesByUrl = new Map(); -const sourcesByPort = new Map(); - -(self as unknown as SharedWorkerGlobalScope).addEventListener('connect', (e: MessageEvent) => { - for (const port of e.ports) { - port.addEventListener('message', (event: MessageEvent) => { - if (event.data.type === 'start') { - const {url} = event.data; - let source = sourcesByUrl.get(url); - if (source) { - source.register(port); - sourcesByPort.set(port, source); - return; - } - source = sourcesByPort.get(port); - if (source) { - const count = source.deregister(port); - if (count === 0) { - source.close(); - sourcesByUrl.delete(source.url); - } - } - source = new WsSource(url); - source.register(port); - sourcesByUrl.set(url, source); - sourcesByPort.set(port, source); - } else if (event.data.type === 'close') { - const source = sourcesByPort.get(port); - if (!source) return; - const count = source.deregister(port); - sourcesByPort.delete(port); - if (count === 0) { - source.close(); - sourcesByUrl.delete(source.url); - } - } else if (event.data.type === 'status') { - const source = sourcesByPort.get(port); - if (!source) { - port.postMessage({type: 'status', message: 'not connected'}); - return; - } - port.postMessage({ - type: 'status', - message: `url: ${source.url} readyState: ${source.ws?.readyState ?? 'null'}`, - }); - } else { - port.postMessage({ - type: 'error', - message: `received but don't know how to handle: ${JSON.stringify(event.data)}`, - }); - } - }); - port.start(); - } -});