0
0
mirror of https://github.com/go-gitea/gitea.git synced 2026-04-17 06:25:21 +02:00
gitea/services/websocket/notifier.go
Epid b9a94c688e feat(websocket): Phase 1 — replace SSE notification count with WebSocket
Add a thin in-memory pubsub broker and a SharedWorker-based WebSocket
client to deliver real-time notification count updates. This replaces
the SSE path for notification-count events with a persistent WebSocket
connection shared across all tabs.

New files:
- services/pubsub/broker.go: fan-out pubsub broker (DefaultBroker singleton)
- services/websocket/notifier.go: polls notification counts, publishes to broker
- routers/web/websocket/websocket.go: /-/ws endpoint, per-user topic subscription
- web_src/js/features/websocket.sharedworker.ts: SharedWorker with exponential
  backoff reconnect (50ms initial, 10s max, reconnect on close and error)

Modified files:
- routers/init.go: register websocket_service.Init()
- routers/web/web.go: add GET /-/ws route
- services/context/response.go: add Hijack() to forward http.Hijacker
  so coder/websocket can upgrade the connection
- web_src/js/features/notification.ts: port from SSE SharedWorker to WS SharedWorker
- webpack.config.ts: add websocket.sharedworker entry point

Part of RFC #36942.
2026-03-30 06:37:53 +03:00

77 lines
1.8 KiB
Go

// Copyright 2024 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package websocket
import (
"context"
"fmt"
"time"
activities_model "code.gitea.io/gitea/models/activities"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/process"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/services/pubsub"
)
type notificationCountEvent struct {
Type string `json:"type"`
Count int64 `json:"count"`
}
func userTopic(userID int64) string {
return fmt.Sprintf("user-%d", userID)
}
// Init starts the background goroutine that polls notification counts
// and pushes updates to connected WebSocket clients.
func Init() error {
go graceful.GetManager().RunWithShutdownContext(run)
return nil
}
func run(ctx context.Context) {
ctx, _, finished := process.GetManager().AddTypedContext(ctx, "Service: WebSocket", process.SystemProcessType, true)
defer finished()
if setting.UI.Notification.EventSourceUpdateTime <= 0 {
return
}
then := timeutil.TimeStampNow().Add(-2)
timer := time.NewTicker(setting.UI.Notification.EventSourceUpdateTime)
defer timer.Stop()
for {
select {
case <-ctx.Done():
return
case <-timer.C:
now := timeutil.TimeStampNow().Add(-2)
uidCounts, err := activities_model.GetUIDsAndNotificationCounts(ctx, then, now)
if err != nil {
log.Error("websocket: GetUIDsAndNotificationCounts: %v", err)
continue
}
for _, uidCount := range uidCounts {
msg, err := json.Marshal(notificationCountEvent{
Type: "notification-count",
Count: uidCount.Count,
})
if err != nil {
continue
}
pubsub.DefaultBroker.Publish(userTopic(uidCount.UserID), msg)
}
then = now
}
}
}