0
0
mirror of https://github.com/go-gitea/gitea.git synced 2026-05-15 12:53:53 +02:00

feat(websocket): Phase 1 — replace SSE notification count with WebSocket

Add a thin in-memory pubsub broker and a SharedWorker-based WebSocket
client to deliver real-time notification count updates. This replaces
the SSE path for notification-count events with a persistent WebSocket
connection shared across all tabs.

New files:
- services/pubsub/broker.go: fan-out pubsub broker (DefaultBroker singleton)
- services/websocket/notifier.go: polls notification counts, publishes to broker
- routers/web/websocket/websocket.go: /-/ws endpoint, per-user topic subscription
- web_src/js/features/websocket.sharedworker.ts: SharedWorker with exponential
  backoff reconnect (50ms initial, 10s max, reconnect on close and error)

Modified files:
- routers/init.go: register websocket_service.Init()
- routers/web/web.go: add GET /-/ws route
- services/context/response.go: add Hijack() to forward http.Hijacker
  so coder/websocket can upgrade the connection
- web_src/js/features/notification.ts: port from SSE SharedWorker to WS SharedWorker
- webpack.config.ts: add websocket.sharedworker entry point

Part of RFC #36942.
This commit is contained in:
Epid 2026-03-23 23:31:17 +03:00
parent 788200de9f
commit cb33214773
9 changed files with 369 additions and 32 deletions

View File

@ -54,6 +54,7 @@ import (
"code.gitea.io/gitea/services/task" "code.gitea.io/gitea/services/task"
"code.gitea.io/gitea/services/uinotification" "code.gitea.io/gitea/services/uinotification"
"code.gitea.io/gitea/services/webhook" "code.gitea.io/gitea/services/webhook"
websocket_service "code.gitea.io/gitea/services/websocket"
) )
func mustInit(fn func() error) { func mustInit(fn func() error) {
@ -160,6 +161,7 @@ func InitWebInstalled(ctx context.Context) {
mustInit(task.Init) mustInit(task.Init)
mustInit(repo_migrations.Init) mustInit(repo_migrations.Init)
eventsource.GetManager().Init() eventsource.GetManager().Init()
mustInit(websocket_service.Init)
mustInitCtx(ctx, mailer_incoming.Init) mustInitCtx(ctx, mailer_incoming.Init)
mustInitCtx(ctx, syncAppConfForGit) mustInitCtx(ctx, syncAppConfForGit)

View File

@ -41,6 +41,7 @@ import (
"code.gitea.io/gitea/routers/web/user" "code.gitea.io/gitea/routers/web/user"
user_setting "code.gitea.io/gitea/routers/web/user/setting" user_setting "code.gitea.io/gitea/routers/web/user/setting"
"code.gitea.io/gitea/routers/web/user/setting/security" "code.gitea.io/gitea/routers/web/user/setting/security"
gitea_websocket "code.gitea.io/gitea/routers/web/websocket"
auth_service "code.gitea.io/gitea/services/auth" auth_service "code.gitea.io/gitea/services/auth"
"code.gitea.io/gitea/services/context" "code.gitea.io/gitea/services/context"
"code.gitea.io/gitea/services/forms" "code.gitea.io/gitea/services/forms"
@ -588,6 +589,7 @@ func registerWebRoutes(m *web.Router, webAuth *AuthMiddleware) {
}, reqSignOut) }, reqSignOut)
m.Any("/user/events", routing.MarkLongPolling, events.Events) m.Any("/user/events", routing.MarkLongPolling, events.Events)
m.Get("/-/ws", gitea_websocket.Serve)
m.Group("/login/oauth", func() { m.Group("/login/oauth", func() {
m.Group("", func() { m.Group("", func() {

View File

@ -0,0 +1,53 @@
// Copyright 2024 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package websocket
import (
"encoding/json"
"fmt"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/services/context"
"code.gitea.io/gitea/services/pubsub"
gitea_ws "github.com/coder/websocket"
"github.com/coder/websocket/wsjson"
)
// Serve handles WebSocket upgrade and event delivery for the signed-in user.
func Serve(ctx *context.Context) {
if !ctx.IsSigned {
ctx.Status(401)
return
}
conn, err := gitea_ws.Accept(ctx.Resp, ctx.Req, &gitea_ws.AcceptOptions{
InsecureSkipVerify: false,
})
if err != nil {
log.Error("websocket: accept failed: %v", err)
return
}
defer conn.CloseNow() //nolint:errcheck
topic := fmt.Sprintf("user-%d", ctx.Doer.ID)
ch, cancel := pubsub.DefaultBroker.Subscribe(topic)
defer cancel()
wsCtx := ctx.Req.Context()
for {
select {
case <-wsCtx.Done():
return
case msg, ok := <-ch:
if !ok {
return
}
if err := wsjson.Write(wsCtx, conn, json.RawMessage(msg)); err != nil {
log.Trace("websocket: write failed: %v", err)
return
}
}
}
}

View File

@ -4,6 +4,8 @@
package context package context
import ( import (
"bufio"
"net"
"net/http" "net/http"
web_types "code.gitea.io/gitea/modules/web/types" web_types "code.gitea.io/gitea/modules/web/types"
@ -67,6 +69,15 @@ func (r *Response) WriteHeader(statusCode int) {
} }
} }
// Hijack implements http.Hijacker by forwarding to the underlying ResponseWriter.
// This is required for WebSocket upgrades.
func (r *Response) Hijack() (net.Conn, *bufio.ReadWriter, error) {
if h, ok := r.ResponseWriter.(http.Hijacker); ok {
return h.Hijack()
}
return nil, nil, http.ErrNotSupported
}
// Flush flushes cached data // Flush flushes cached data
func (r *Response) Flush() { func (r *Response) Flush() {
if f, ok := r.ResponseWriter.(http.Flusher); ok { if f, ok := r.ResponseWriter.(http.Flusher); ok {

65
services/pubsub/broker.go Normal file
View File

@ -0,0 +1,65 @@
// Copyright 2024 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package pubsub
import (
"sync"
)
// Broker is a simple in-memory pub/sub broker.
// It supports fan-out: one Publish call delivers the message to all active subscribers.
type Broker struct {
mu sync.RWMutex
subs map[string][]chan []byte
}
// DefaultBroker is the global singleton used by both routers and notifiers.
var DefaultBroker = NewBroker()
// NewBroker creates a new in-memory Broker.
func NewBroker() *Broker {
return &Broker{
subs: make(map[string][]chan []byte),
}
}
// Subscribe returns a channel that receives messages published to topic.
// Call the returned cancel function to unsubscribe.
func (b *Broker) Subscribe(topic string) (<-chan []byte, func()) {
ch := make(chan []byte, 8)
b.mu.Lock()
b.subs[topic] = append(b.subs[topic], ch)
b.mu.Unlock()
cancel := func() {
b.mu.Lock()
defer b.mu.Unlock()
subs := b.subs[topic]
for i, sub := range subs {
if sub == ch {
b.subs[topic] = append(subs[:i], subs[i+1:]...)
break
}
}
close(ch)
}
return ch, cancel
}
// Publish sends msg to all subscribers of topic.
// Non-blocking: slow subscribers are skipped.
func (b *Broker) Publish(topic string, msg []byte) {
b.mu.RLock()
subs := b.subs[topic]
b.mu.RUnlock()
for _, ch := range subs {
select {
case ch <- msg:
default:
// subscriber too slow — skip
}
}
}

View File

@ -0,0 +1,76 @@
// Copyright 2024 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package websocket
import (
"context"
"fmt"
"time"
activities_model "code.gitea.io/gitea/models/activities"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/process"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/services/pubsub"
)
type notificationCountEvent struct {
Type string `json:"type"`
Count int64 `json:"count"`
}
func userTopic(userID int64) string {
return fmt.Sprintf("user-%d", userID)
}
// Init starts the background goroutine that polls notification counts
// and pushes updates to connected WebSocket clients.
func Init() error {
go graceful.GetManager().RunWithShutdownContext(run)
return nil
}
func run(ctx context.Context) {
ctx, _, finished := process.GetManager().AddTypedContext(ctx, "Service: WebSocket", process.SystemProcessType, true)
defer finished()
if setting.UI.Notification.EventSourceUpdateTime <= 0 {
return
}
then := timeutil.TimeStampNow().Add(-2)
timer := time.NewTicker(setting.UI.Notification.EventSourceUpdateTime)
defer timer.Stop()
for {
select {
case <-ctx.Done():
return
case <-timer.C:
now := timeutil.TimeStampNow().Add(-2)
uidCounts, err := activities_model.GetUIDsAndNotificationCounts(ctx, then, now)
if err != nil {
log.Error("websocket: GetUIDsAndNotificationCounts: %v", err)
continue
}
for _, uidCount := range uidCounts {
msg, err := json.Marshal(notificationCountEvent{
Type: "notification-count",
Count: uidCount.Count,
})
if err != nil {
continue
}
pubsub.DefaultBroker.Publish(userTopic(uidCount.UserID), msg)
}
then = now
}
}
}

View File

@ -5,12 +5,12 @@ import {logoutFromWorker} from '../modules/worker.ts';
const {appSubUrl, notificationSettings, assetVersionEncoded} = window.config; const {appSubUrl, notificationSettings, assetVersionEncoded} = window.config;
let notificationSequenceNumber = 0; let notificationSequenceNumber = 0;
async function receiveUpdateCount(event: MessageEvent<{type: string, data: string}>) { async function receiveUpdateCount(event: MessageEvent<{type: string, count: number}>) {
try { try {
const data = JSON.parse(event.data.data); const {count} = event.data;
for (const count of document.querySelectorAll('.notification_count')) { for (const el of document.querySelectorAll('.notification_count')) {
count.classList.toggle('tw-hidden', data.Count === 0); el.classList.toggle('tw-hidden', count === 0);
count.textContent = `${data.Count}`; el.textContent = `${count}`;
} }
await updateNotificationTable(); await updateNotificationTable();
} catch (error) { } catch (error) {
@ -21,55 +21,38 @@ async function receiveUpdateCount(event: MessageEvent<{type: string, data: strin
export function initNotificationCount() { export function initNotificationCount() {
if (!document.querySelector('.notification_count')) return; if (!document.querySelector('.notification_count')) return;
let usingPeriodicPoller = false;
const startPeriodicPoller = (timeout: number, lastCount?: number) => { const startPeriodicPoller = (timeout: number, lastCount?: number) => {
if (timeout <= 0 || !Number.isFinite(timeout)) return; if (timeout <= 0 || !Number.isFinite(timeout)) return;
usingPeriodicPoller = true;
lastCount = lastCount ?? getCurrentCount(); lastCount = lastCount ?? getCurrentCount();
setTimeout(async () => { setTimeout(async () => {
await updateNotificationCountWithCallback(startPeriodicPoller, timeout, lastCount); await updateNotificationCountWithCallback(startPeriodicPoller, timeout, lastCount);
}, timeout); }, timeout);
}; };
if (notificationSettings.EventSourceUpdateTime > 0 && window.EventSource && window.SharedWorker) { if (notificationSettings.EventSourceUpdateTime > 0 && window.SharedWorker) {
// Try to connect to the event source via the shared worker first // Connect via WebSocket SharedWorker (one connection shared across all tabs)
const worker = new SharedWorker(`${window.__webpack_public_path__}js/eventsource.sharedworker.js?v=${assetVersionEncoded}`, 'notification-worker'); const wsUrl = `${window.location.origin}${appSubUrl}/-/ws`.replace(/^http/, 'ws');
const worker = new SharedWorker(`${window.__webpack_public_path__}js/websocket.sharedworker.js?v=${assetVersionEncoded}`, 'notification-worker');
worker.addEventListener('error', (event) => { worker.addEventListener('error', (event) => {
console.error('worker error', event); console.error('worker error', event);
}); });
worker.port.addEventListener('messageerror', () => { worker.port.addEventListener('messageerror', () => {
console.error('unable to deserialize message'); console.error('unable to deserialize message');
}); });
worker.port.postMessage({ worker.port.postMessage({type: 'start', url: wsUrl});
type: 'start', worker.port.addEventListener('message', (event: MessageEvent<{type: string, count: number, message?: string}>) => {
url: `${window.location.origin}${appSubUrl}/user/events`,
});
worker.port.addEventListener('message', (event: MessageEvent<{type: string, data: string}>) => {
if (!event.data || !event.data.type) { if (!event.data || !event.data.type) {
console.error('unknown worker message event', event); console.error('unknown worker message event', event);
return; return;
} }
if (event.data.type === 'notification-count') { if (event.data.type === 'notification-count') {
receiveUpdateCount(event); // no await receiveUpdateCount(event); // no await
} else if (event.data.type === 'no-event-source') {
// browser doesn't support EventSource, falling back to periodic poller
if (!usingPeriodicPoller) startPeriodicPoller(notificationSettings.MinTimeout);
} else if (event.data.type === 'error') { } else if (event.data.type === 'error') {
console.error('worker port event error', event.data); console.error('worker port event error', event.data);
} else if (event.data.type === 'logout') { } else if (event.data.type === 'logout') {
if (event.data.data !== 'here') { worker.port.postMessage({type: 'close'});
return;
}
worker.port.postMessage({
type: 'close',
});
worker.port.close(); worker.port.close();
logoutFromWorker(); logoutFromWorker();
} else if (event.data.type === 'close') {
worker.port.postMessage({
type: 'close',
});
worker.port.close();
} }
}); });
worker.port.addEventListener('error', (e) => { worker.port.addEventListener('error', (e) => {
@ -77,9 +60,7 @@ export function initNotificationCount() {
}); });
worker.port.start(); worker.port.start();
window.addEventListener('beforeunload', () => { window.addEventListener('beforeunload', () => {
worker.port.postMessage({ worker.port.postMessage({type: 'close'});
type: 'close',
});
worker.port.close(); worker.port.close();
}); });

View File

@ -0,0 +1,144 @@
// One WebSocket connection per URL, shared across all tabs via SharedWorker.
// Messages from the server are JSON objects broadcast to all connected ports.
export {}; // make this a module to avoid global scope conflicts with other sharedworker files
const RECONNECT_DELAY_INITIAL = 50;
const RECONNECT_DELAY_MAX = 10000;
class WsSource {
url: string;
ws: WebSocket | null;
clients: MessagePort[];
reconnectTimer: ReturnType<typeof setTimeout> | null;
reconnectDelay: number;
constructor(url: string) {
this.url = url;
this.ws = null;
this.clients = [];
this.reconnectTimer = null;
this.reconnectDelay = RECONNECT_DELAY_INITIAL;
this.connect();
}
connect() {
this.ws = new WebSocket(this.url);
this.ws.addEventListener('open', () => {
this.reconnectDelay = RECONNECT_DELAY_INITIAL;
this.broadcast({type: 'status', message: `connected to ${this.url}`});
});
this.ws.addEventListener('message', (event: MessageEvent<string>) => {
try {
const msg = JSON.parse(event.data);
this.broadcast(msg);
} catch {
// ignore malformed JSON
}
});
this.ws.addEventListener('close', () => {
this.ws = null;
this.scheduleReconnect();
});
this.ws.addEventListener('error', () => {
this.broadcast({type: 'error', message: 'websocket error'});
this.ws = null;
this.scheduleReconnect();
});
}
scheduleReconnect() {
if (this.clients.length === 0 || this.reconnectTimer !== null) return;
this.reconnectDelay = Math.min(this.reconnectDelay * 2, RECONNECT_DELAY_MAX);
this.reconnectTimer = setTimeout(() => {
this.reconnectTimer = null;
this.connect();
}, this.reconnectDelay);
}
register(port: MessagePort) {
if (this.clients.includes(port)) return;
this.clients.push(port);
port.postMessage({type: 'status', message: `registered to ${this.url}`});
}
deregister(port: MessagePort): number {
const idx = this.clients.indexOf(port);
if (idx >= 0) this.clients.splice(idx, 1);
return this.clients.length;
}
close() {
if (this.reconnectTimer !== null) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
this.ws?.close();
this.ws = null;
}
broadcast(msg: unknown) {
for (const port of this.clients) {
port.postMessage(msg);
}
}
}
const sourcesByUrl = new Map<string, WsSource | null>();
const sourcesByPort = new Map<MessagePort, WsSource | null>();
(self as unknown as SharedWorkerGlobalScope).addEventListener('connect', (e: MessageEvent) => {
for (const port of e.ports) {
port.addEventListener('message', (event: MessageEvent) => {
if (event.data.type === 'start') {
const {url} = event.data;
let source = sourcesByUrl.get(url);
if (source) {
source.register(port);
sourcesByPort.set(port, source);
return;
}
source = sourcesByPort.get(port);
if (source) {
const count = source.deregister(port);
if (count === 0) {
source.close();
sourcesByUrl.set(source.url, null);
}
}
source = new WsSource(url);
source.register(port);
sourcesByUrl.set(url, source);
sourcesByPort.set(port, source);
} else if (event.data.type === 'close') {
const source = sourcesByPort.get(port);
if (!source) return;
const count = source.deregister(port);
if (count === 0) {
source.close();
sourcesByUrl.set(source.url, null);
sourcesByPort.set(port, null);
}
} else if (event.data.type === 'status') {
const source = sourcesByPort.get(port);
if (!source) {
port.postMessage({type: 'status', message: 'not connected'});
return;
}
port.postMessage({
type: 'status',
message: `url: ${source.url} readyState: ${source.ws?.readyState ?? 'null'}`,
});
} else {
port.postMessage({
type: 'error',
message: `received but don't know how to handle: ${JSON.stringify(event.data)}`,
});
}
});
port.start();
}
});

View File

@ -79,6 +79,9 @@ export default {
'eventsource.sharedworker': [ 'eventsource.sharedworker': [
fileURLToPath(new URL('web_src/js/features/eventsource.sharedworker.ts', import.meta.url)), fileURLToPath(new URL('web_src/js/features/eventsource.sharedworker.ts', import.meta.url)),
], ],
'websocket.sharedworker': [
fileURLToPath(new URL('web_src/js/features/websocket.sharedworker.ts', import.meta.url)),
],
...(!isProduction && { ...(!isProduction && {
devtest: [ devtest: [
fileURLToPath(new URL('web_src/js/standalone/devtest.ts', import.meta.url)), fileURLToPath(new URL('web_src/js/standalone/devtest.ts', import.meta.url)),