mirror of
https://github.com/go-gitea/gitea.git
synced 2026-05-10 01:01:28 +02:00
fix(websocket): add WsSource to eventsource.sharedworker, remove websocket.sharedworker
- Add WsSource class to eventsource.sharedworker.ts for WebSocket transport - Remove websocket.sharedworker.ts (functionality merged into eventsource.sharedworker.ts)
This commit is contained in:
parent
42f5342c50
commit
36b28c7e04
@ -69,8 +69,82 @@ class Source {
|
||||
}
|
||||
}
|
||||
|
||||
// WsSource provides a WebSocket transport alongside EventSource.
|
||||
// It delivers real-time notification-count pushes using the same client list
|
||||
// as the associated Source, normalising messages to the SSE event format so
|
||||
// that callers do not need to know which transport delivered the event.
|
||||
class WsSource {
|
||||
wsUrl: string;
|
||||
ws: WebSocket | null;
|
||||
source: Source;
|
||||
reconnectTimer: ReturnType<typeof setTimeout> | null;
|
||||
reconnectDelay: number;
|
||||
|
||||
constructor(wsUrl: string, source: Source) {
|
||||
this.wsUrl = wsUrl;
|
||||
this.source = source;
|
||||
this.ws = null;
|
||||
this.reconnectTimer = null;
|
||||
this.reconnectDelay = 50;
|
||||
this.connect();
|
||||
}
|
||||
|
||||
connect() {
|
||||
this.ws = new WebSocket(this.wsUrl);
|
||||
|
||||
this.ws.addEventListener('open', () => {
|
||||
this.reconnectDelay = 50;
|
||||
});
|
||||
|
||||
this.ws.addEventListener('message', (event: MessageEvent<string>) => {
|
||||
try {
|
||||
const msg = JSON.parse(event.data);
|
||||
if (msg.type === 'notification-count') {
|
||||
// Normalise to SSE event format so the receiver is transport-agnostic.
|
||||
this.source.notifyClients({
|
||||
type: 'notification-count',
|
||||
data: JSON.stringify({Count: msg.count}),
|
||||
});
|
||||
}
|
||||
} catch {
|
||||
// ignore malformed messages
|
||||
}
|
||||
});
|
||||
|
||||
this.ws.addEventListener('close', () => {
|
||||
this.ws = null;
|
||||
this.scheduleReconnect();
|
||||
});
|
||||
|
||||
this.ws.addEventListener('error', () => {
|
||||
this.ws = null;
|
||||
this.scheduleReconnect();
|
||||
});
|
||||
}
|
||||
|
||||
scheduleReconnect() {
|
||||
if (this.reconnectTimer !== null) return;
|
||||
const delay = this.reconnectDelay;
|
||||
this.reconnectTimer = setTimeout(() => {
|
||||
this.reconnectTimer = null;
|
||||
this.connect();
|
||||
}, delay);
|
||||
this.reconnectDelay = Math.min(this.reconnectDelay * 2, 10000);
|
||||
}
|
||||
|
||||
close() {
|
||||
if (this.reconnectTimer !== null) {
|
||||
clearTimeout(this.reconnectTimer);
|
||||
this.reconnectTimer = null;
|
||||
}
|
||||
this.ws?.close();
|
||||
this.ws = null;
|
||||
}
|
||||
}
|
||||
|
||||
const sourcesByUrl = new Map<string, Source | null>();
|
||||
const sourcesByPort = new Map<MessagePort, Source | null>();
|
||||
const wsSourcesByUrl = new Map<string, WsSource | null>();
|
||||
|
||||
(self as unknown as SharedWorkerGlobalScope).addEventListener('connect', (e: MessageEvent) => {
|
||||
for (const port of e.ports) {
|
||||
@ -102,6 +176,11 @@ const sourcesByPort = new Map<MessagePort, Source | null>();
|
||||
if (count === 0) {
|
||||
source.close();
|
||||
sourcesByUrl.set(source.url, null);
|
||||
const ws = wsSourcesByUrl.get(source.url);
|
||||
if (ws) {
|
||||
ws.close();
|
||||
wsSourcesByUrl.set(source.url, null);
|
||||
}
|
||||
}
|
||||
}
|
||||
// Create a new Source
|
||||
@ -109,6 +188,9 @@ const sourcesByPort = new Map<MessagePort, Source | null>();
|
||||
source.register(port);
|
||||
sourcesByUrl.set(url, source);
|
||||
sourcesByPort.set(port, source);
|
||||
// Start WebSocket alongside EventSource for real-time notification pushes.
|
||||
const wsUrl = url.replace(/^http/, 'ws').replace(/\/user\/events$/, '/-/ws');
|
||||
wsSourcesByUrl.set(url, new WsSource(wsUrl, source));
|
||||
} else if (event.data.type === 'listen') {
|
||||
const source = sourcesByPort.get(port)!;
|
||||
source.listen(event.data.eventType);
|
||||
@ -121,6 +203,11 @@ const sourcesByPort = new Map<MessagePort, Source | null>();
|
||||
source.close();
|
||||
sourcesByUrl.set(source.url, null);
|
||||
sourcesByPort.set(port, null);
|
||||
const ws = wsSourcesByUrl.get(source.url);
|
||||
if (ws) {
|
||||
ws.close();
|
||||
wsSourcesByUrl.set(source.url, null);
|
||||
}
|
||||
}
|
||||
} else if (event.data.type === 'status') {
|
||||
const source = sourcesByPort.get(port);
|
||||
|
||||
@ -1,145 +0,0 @@
|
||||
// One WebSocket connection per URL, shared across all tabs via SharedWorker.
|
||||
// Messages from the server are JSON objects broadcast to all connected ports.
|
||||
export {}; // make this a module to avoid global scope conflicts with other sharedworker files
|
||||
|
||||
const RECONNECT_DELAY_INITIAL = 50;
|
||||
const RECONNECT_DELAY_MAX = 10000;
|
||||
|
||||
class WsSource {
|
||||
url: string;
|
||||
ws: WebSocket | null;
|
||||
clients: MessagePort[];
|
||||
reconnectTimer: ReturnType<typeof setTimeout> | null;
|
||||
reconnectDelay: number;
|
||||
|
||||
constructor(url: string) {
|
||||
this.url = url;
|
||||
this.ws = null;
|
||||
this.clients = [];
|
||||
this.reconnectTimer = null;
|
||||
this.reconnectDelay = RECONNECT_DELAY_INITIAL;
|
||||
this.connect();
|
||||
}
|
||||
|
||||
connect() {
|
||||
this.ws = new WebSocket(this.url);
|
||||
|
||||
this.ws.addEventListener('open', () => {
|
||||
this.reconnectDelay = RECONNECT_DELAY_INITIAL;
|
||||
this.broadcast({type: 'status', message: `connected to ${this.url}`});
|
||||
});
|
||||
|
||||
this.ws.addEventListener('message', (event: MessageEvent<string>) => {
|
||||
try {
|
||||
const msg = JSON.parse(event.data);
|
||||
this.broadcast(msg);
|
||||
} catch {
|
||||
console.warn('websocket.sharedworker: received non-JSON message', event.data);
|
||||
}
|
||||
});
|
||||
|
||||
this.ws.addEventListener('close', () => {
|
||||
this.ws = null;
|
||||
this.scheduleReconnect();
|
||||
});
|
||||
|
||||
this.ws.addEventListener('error', () => {
|
||||
this.broadcast({type: 'error', message: 'websocket error'});
|
||||
this.ws = null;
|
||||
this.scheduleReconnect();
|
||||
});
|
||||
}
|
||||
|
||||
scheduleReconnect() {
|
||||
if (this.clients.length === 0 || this.reconnectTimer !== null) return;
|
||||
const delay = this.reconnectDelay;
|
||||
this.reconnectTimer = setTimeout(() => {
|
||||
this.reconnectTimer = null;
|
||||
this.connect();
|
||||
}, delay);
|
||||
this.reconnectDelay = Math.min(this.reconnectDelay * 2, RECONNECT_DELAY_MAX);
|
||||
}
|
||||
|
||||
register(port: MessagePort) {
|
||||
if (this.clients.includes(port)) return;
|
||||
this.clients.push(port);
|
||||
port.postMessage({type: 'status', message: `registered to ${this.url}`});
|
||||
}
|
||||
|
||||
deregister(port: MessagePort): number {
|
||||
const idx = this.clients.indexOf(port);
|
||||
if (idx >= 0) this.clients.splice(idx, 1);
|
||||
return this.clients.length;
|
||||
}
|
||||
|
||||
close() {
|
||||
if (this.reconnectTimer !== null) {
|
||||
clearTimeout(this.reconnectTimer);
|
||||
this.reconnectTimer = null;
|
||||
}
|
||||
this.ws?.close();
|
||||
this.ws = null;
|
||||
}
|
||||
|
||||
broadcast(msg: unknown) {
|
||||
for (const port of this.clients) {
|
||||
port.postMessage(msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const sourcesByUrl = new Map<string, WsSource>();
|
||||
const sourcesByPort = new Map<MessagePort, WsSource>();
|
||||
|
||||
(self as unknown as SharedWorkerGlobalScope).addEventListener('connect', (e: MessageEvent) => {
|
||||
for (const port of e.ports) {
|
||||
port.addEventListener('message', (event: MessageEvent) => {
|
||||
if (event.data.type === 'start') {
|
||||
const {url} = event.data;
|
||||
let source = sourcesByUrl.get(url);
|
||||
if (source) {
|
||||
source.register(port);
|
||||
sourcesByPort.set(port, source);
|
||||
return;
|
||||
}
|
||||
source = sourcesByPort.get(port);
|
||||
if (source) {
|
||||
const count = source.deregister(port);
|
||||
if (count === 0) {
|
||||
source.close();
|
||||
sourcesByUrl.delete(source.url);
|
||||
}
|
||||
}
|
||||
source = new WsSource(url);
|
||||
source.register(port);
|
||||
sourcesByUrl.set(url, source);
|
||||
sourcesByPort.set(port, source);
|
||||
} else if (event.data.type === 'close') {
|
||||
const source = sourcesByPort.get(port);
|
||||
if (!source) return;
|
||||
const count = source.deregister(port);
|
||||
sourcesByPort.delete(port);
|
||||
if (count === 0) {
|
||||
source.close();
|
||||
sourcesByUrl.delete(source.url);
|
||||
}
|
||||
} else if (event.data.type === 'status') {
|
||||
const source = sourcesByPort.get(port);
|
||||
if (!source) {
|
||||
port.postMessage({type: 'status', message: 'not connected'});
|
||||
return;
|
||||
}
|
||||
port.postMessage({
|
||||
type: 'status',
|
||||
message: `url: ${source.url} readyState: ${source.ws?.readyState ?? 'null'}`,
|
||||
});
|
||||
} else {
|
||||
port.postMessage({
|
||||
type: 'error',
|
||||
message: `received but don't know how to handle: ${JSON.stringify(event.data)}`,
|
||||
});
|
||||
}
|
||||
});
|
||||
port.start();
|
||||
}
|
||||
});
|
||||
Loading…
x
Reference in New Issue
Block a user