From 256aeb9dc9594e22bbc0479b6f16559dc7f79fee Mon Sep 17 00:00:00 2001 From: Epid Date: Thu, 2 Apr 2026 00:41:09 +0300 Subject: [PATCH] =?UTF-8?q?feat(websocket):=20Phase=202=20=E2=80=94=20migr?= =?UTF-8?q?ate=20stopwatches/logout=20to=20WebSocket,=20remove=20SSE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add stopwatch_notifier.go: periodic poller publishes stopwatches via pubsub broker - Add logout_publisher.go: PublishLogout publishes logout events via pubsub broker - websocket.go: rewrite logout messages server-side (sessionID to here/elsewhere) - Remove entire SSE infrastructure: eventsource module, /user/events route, events.go - Update blockexpensive/qos to handle /-/ws instead of /user/events - Simplify eventsource.sharedworker.ts: remove EventSource, WebSocket-only delivery --- modules/eventsource/event.go | 118 ----------------- modules/eventsource/event_test.go | 50 ------- modules/eventsource/manager.go | 89 ------------- modules/eventsource/manager_run.go | 122 ------------------ modules/eventsource/messenger.go | 77 ----------- routers/common/blockexpensive.go | 2 +- routers/common/blockexpensive_test.go | 2 +- routers/common/qos.go | 4 +- routers/init.go | 3 +- routers/web/auth/auth.go | 7 +- routers/web/events/events.go | 122 ------------------ routers/web/repo/issue_stopwatch.go | 7 +- routers/web/web.go | 2 - routers/web/websocket/websocket.go | 35 +++++ services/user/user.go | 6 +- services/websocket/logout_publisher.go | 30 +++++ services/websocket/stopwatch_notifier.go | 108 ++++++++++++++++ tests/integration/eventsource_test.go | 86 ------------ .../js/features/eventsource.sharedworker.ts | 67 +++------- 19 files changed, 203 insertions(+), 734 deletions(-) delete mode 100644 modules/eventsource/event.go delete mode 100644 modules/eventsource/event_test.go delete mode 100644 modules/eventsource/manager.go delete mode 100644 modules/eventsource/manager_run.go delete mode 100644 modules/eventsource/messenger.go delete mode 100644 routers/web/events/events.go create mode 100644 services/websocket/logout_publisher.go create mode 100644 services/websocket/stopwatch_notifier.go delete mode 100644 tests/integration/eventsource_test.go diff --git a/modules/eventsource/event.go b/modules/eventsource/event.go deleted file mode 100644 index ebcca50903..0000000000 --- a/modules/eventsource/event.go +++ /dev/null @@ -1,118 +0,0 @@ -// Copyright 2020 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package eventsource - -import ( - "bytes" - "fmt" - "io" - "strings" - "time" - - "code.gitea.io/gitea/modules/json" -) - -func wrapNewlines(w io.Writer, prefix, value []byte) (sum int64, err error) { - if len(value) == 0 { - return 0, nil - } - var n int - last := 0 - for j := bytes.IndexByte(value, '\n'); j > -1; j = bytes.IndexByte(value[last:], '\n') { - n, err = w.Write(prefix) - sum += int64(n) - if err != nil { - return sum, err - } - n, err = w.Write(value[last : last+j+1]) - sum += int64(n) - if err != nil { - return sum, err - } - last += j + 1 - } - n, err = w.Write(prefix) - sum += int64(n) - if err != nil { - return sum, err - } - n, err = w.Write(value[last:]) - sum += int64(n) - if err != nil { - return sum, err - } - n, err = w.Write([]byte("\n")) - sum += int64(n) - return sum, err -} - -// Event is an eventsource event, not all fields need to be set -type Event struct { - // Name represents the value of the event: tag in the stream - Name string - // Data is either JSONified []byte or any that can be JSONd - Data any - // ID represents the ID of an event - ID string - // Retry tells the receiver only to attempt to reconnect to the source after this time - Retry time.Duration -} - -// WriteTo writes data to w until there's no more data to write or when an error occurs. -// The return value n is the number of bytes written. Any error encountered during the write is also returned. -func (e *Event) WriteTo(w io.Writer) (int64, error) { - sum := int64(0) - var nint int - n, err := wrapNewlines(w, []byte("event: "), []byte(e.Name)) - sum += n - if err != nil { - return sum, err - } - - if e.Data != nil { - var data []byte - switch v := e.Data.(type) { - case []byte: - data = v - case string: - data = []byte(v) - default: - var err error - data, err = json.Marshal(e.Data) - if err != nil { - return sum, err - } - } - n, err := wrapNewlines(w, []byte("data: "), data) - sum += n - if err != nil { - return sum, err - } - } - - n, err = wrapNewlines(w, []byte("id: "), []byte(e.ID)) - sum += n - if err != nil { - return sum, err - } - - if e.Retry != 0 { - nint, err = fmt.Fprintf(w, "retry: %d\n", int64(e.Retry/time.Millisecond)) - sum += int64(nint) - if err != nil { - return sum, err - } - } - - nint, err = w.Write([]byte("\n")) - sum += int64(nint) - - return sum, err -} - -func (e *Event) String() string { - buf := new(strings.Builder) - _, _ = e.WriteTo(buf) - return buf.String() -} diff --git a/modules/eventsource/event_test.go b/modules/eventsource/event_test.go deleted file mode 100644 index a1c3e5c7a8..0000000000 --- a/modules/eventsource/event_test.go +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright 2020 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package eventsource - -import ( - "bytes" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func Test_wrapNewlines(t *testing.T) { - tests := []struct { - name string - prefix string - value string - output string - }{ - { - "check no new lines", - "prefix: ", - "value", - "prefix: value\n", - }, - { - "check simple newline", - "prefix: ", - "value1\nvalue2", - "prefix: value1\nprefix: value2\n", - }, - { - "check pathological newlines", - "p: ", - "\n1\n\n2\n3\n", - "p: \np: 1\np: \np: 2\np: 3\np: \n", - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - w := &bytes.Buffer{} - gotSum, err := wrapNewlines(w, []byte(tt.prefix), []byte(tt.value)) - require.NoError(t, err) - - assert.EqualValues(t, len(tt.output), gotSum) - assert.Equal(t, tt.output, w.String()) - }) - } -} diff --git a/modules/eventsource/manager.go b/modules/eventsource/manager.go deleted file mode 100644 index 7ed2a82903..0000000000 --- a/modules/eventsource/manager.go +++ /dev/null @@ -1,89 +0,0 @@ -// Copyright 2020 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package eventsource - -import ( - "sync" -) - -// Manager manages the eventsource Messengers -type Manager struct { - mutex sync.Mutex - - messengers map[int64]*Messenger - connection chan struct{} -} - -var manager *Manager - -func init() { - manager = &Manager{ - messengers: make(map[int64]*Messenger), - connection: make(chan struct{}, 1), - } -} - -// GetManager returns a Manager and initializes one as singleton if there's none yet -func GetManager() *Manager { - return manager -} - -// Register message channel -func (m *Manager) Register(uid int64) <-chan *Event { - m.mutex.Lock() - messenger, ok := m.messengers[uid] - if !ok { - messenger = NewMessenger(uid) - m.messengers[uid] = messenger - } - select { - case m.connection <- struct{}{}: - default: - } - m.mutex.Unlock() - return messenger.Register() -} - -// Unregister message channel -func (m *Manager) Unregister(uid int64, channel <-chan *Event) { - m.mutex.Lock() - defer m.mutex.Unlock() - messenger, ok := m.messengers[uid] - if !ok { - return - } - if messenger.Unregister(channel) { - delete(m.messengers, uid) - } -} - -// UnregisterAll message channels -func (m *Manager) UnregisterAll() { - m.mutex.Lock() - defer m.mutex.Unlock() - for _, messenger := range m.messengers { - messenger.UnregisterAll() - } - m.messengers = map[int64]*Messenger{} -} - -// SendMessage sends a message to a particular user -func (m *Manager) SendMessage(uid int64, message *Event) { - m.mutex.Lock() - messenger, ok := m.messengers[uid] - m.mutex.Unlock() - if ok { - messenger.SendMessage(message) - } -} - -// SendMessageBlocking sends a message to a particular user -func (m *Manager) SendMessageBlocking(uid int64, message *Event) { - m.mutex.Lock() - messenger, ok := m.messengers[uid] - m.mutex.Unlock() - if ok { - messenger.SendMessageBlocking(message) - } -} diff --git a/modules/eventsource/manager_run.go b/modules/eventsource/manager_run.go deleted file mode 100644 index 4a42224dda..0000000000 --- a/modules/eventsource/manager_run.go +++ /dev/null @@ -1,122 +0,0 @@ -// Copyright 2020 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package eventsource - -import ( - "context" - "time" - - activities_model "code.gitea.io/gitea/models/activities" - issues_model "code.gitea.io/gitea/models/issues" - user_model "code.gitea.io/gitea/models/user" - "code.gitea.io/gitea/modules/graceful" - "code.gitea.io/gitea/modules/json" - "code.gitea.io/gitea/modules/log" - "code.gitea.io/gitea/modules/process" - "code.gitea.io/gitea/modules/setting" - "code.gitea.io/gitea/modules/timeutil" - "code.gitea.io/gitea/services/convert" -) - -// Init starts this eventsource -func (m *Manager) Init() { - if setting.UI.Notification.EventSourceUpdateTime <= 0 { - return - } - go graceful.GetManager().RunWithShutdownContext(m.Run) -} - -// Run runs the manager within a provided context -func (m *Manager) Run(ctx context.Context) { - ctx, _, finished := process.GetManager().AddTypedContext(ctx, "Service: EventSource", process.SystemProcessType, true) - defer finished() - - then := timeutil.TimeStampNow().Add(-2) - timer := time.NewTicker(setting.UI.Notification.EventSourceUpdateTime) -loop: - for { - select { - case <-ctx.Done(): - timer.Stop() - break loop - case <-timer.C: - m.mutex.Lock() - connectionCount := len(m.messengers) - if connectionCount == 0 { - log.Trace("Event source has no listeners") - // empty the connection channel - select { - case <-m.connection: - default: - } - } - m.mutex.Unlock() - if connectionCount == 0 { - // No listeners so the source can be paused - log.Trace("Pausing the eventsource") - select { - case <-ctx.Done(): - break loop - case <-m.connection: - log.Trace("Connection detected - restarting the eventsource") - // OK we're back so lets reset the timer and start again - // We won't change the "then" time because there could be concurrency issues - select { - case <-timer.C: - default: - } - continue - } - } - - now := timeutil.TimeStampNow().Add(-2) - - uidCounts, err := activities_model.GetUIDsAndNotificationCounts(ctx, then, now) - if err != nil { - log.Error("Unable to get UIDcounts: %v", err) - } - for _, uidCount := range uidCounts { - m.SendMessage(uidCount.UserID, &Event{ - Name: "notification-count", - Data: uidCount, - }) - } - then = now - - if setting.Service.EnableTimetracking { - usersStopwatches, err := issues_model.GetUIDsAndStopwatch(ctx) - if err != nil { - log.Error("Unable to get GetUIDsAndStopwatch: %v", err) - return - } - - for _, userStopwatches := range usersStopwatches { - u, err := user_model.GetUserByID(ctx, userStopwatches.UserID) - if err != nil { - log.Error("Unable to get user %d: %v", userStopwatches.UserID, err) - continue - } - - apiSWs, err := convert.ToStopWatches(ctx, u, userStopwatches.StopWatches) - if err != nil { - if !issues_model.IsErrIssueNotExist(err) { - log.Error("Unable to APIFormat stopwatches: %v", err) - } - continue - } - dataBs, err := json.Marshal(apiSWs) - if err != nil { - log.Error("Unable to marshal stopwatches: %v", err) - continue - } - m.SendMessage(userStopwatches.UserID, &Event{ - Name: "stopwatches", - Data: string(dataBs), - }) - } - } - } - } - m.UnregisterAll() -} diff --git a/modules/eventsource/messenger.go b/modules/eventsource/messenger.go deleted file mode 100644 index 6df26716be..0000000000 --- a/modules/eventsource/messenger.go +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright 2020 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package eventsource - -import "sync" - -// Messenger is a per uid message store -type Messenger struct { - mutex sync.Mutex - uid int64 - channels []chan *Event -} - -// NewMessenger creates a messenger for a particular uid -func NewMessenger(uid int64) *Messenger { - return &Messenger{ - uid: uid, - channels: [](chan *Event){}, - } -} - -// Register returns a new chan []byte -func (m *Messenger) Register() <-chan *Event { - m.mutex.Lock() - // TODO: Limit the number of messengers per uid - channel := make(chan *Event, 1) - m.channels = append(m.channels, channel) - m.mutex.Unlock() - return channel -} - -// Unregister removes the provider chan []byte -func (m *Messenger) Unregister(channel <-chan *Event) bool { - m.mutex.Lock() - defer m.mutex.Unlock() - for i, toRemove := range m.channels { - if channel == toRemove { - m.channels = append(m.channels[:i], m.channels[i+1:]...) - close(toRemove) - break - } - } - return len(m.channels) == 0 -} - -// UnregisterAll removes all chan []byte -func (m *Messenger) UnregisterAll() { - m.mutex.Lock() - defer m.mutex.Unlock() - for _, channel := range m.channels { - close(channel) - } - m.channels = nil -} - -// SendMessage sends the message to all registered channels -func (m *Messenger) SendMessage(message *Event) { - m.mutex.Lock() - defer m.mutex.Unlock() - for i := range m.channels { - channel := m.channels[i] - select { - case channel <- message: - default: - } - } -} - -// SendMessageBlocking sends the message to all registered channels and ensures it gets sent -func (m *Messenger) SendMessageBlocking(message *Event) { - m.mutex.Lock() - defer m.mutex.Unlock() - for i := range m.channels { - m.channels[i] <- message - } -} diff --git a/routers/common/blockexpensive.go b/routers/common/blockexpensive.go index fec364351c..18a56de72d 100644 --- a/routers/common/blockexpensive.go +++ b/routers/common/blockexpensive.go @@ -72,7 +72,7 @@ func isRoutePathExpensive(routePattern string) bool { } func isRoutePathForLongPolling(routePattern string) bool { - return routePattern == "/user/events" + return routePattern == "/-/ws" } func determineRequestPriority(reqCtx reqctx.RequestContext) (ret struct { diff --git a/routers/common/blockexpensive_test.go b/routers/common/blockexpensive_test.go index db5c0db7dd..eca7d88118 100644 --- a/routers/common/blockexpensive_test.go +++ b/routers/common/blockexpensive_test.go @@ -26,5 +26,5 @@ func TestBlockExpensive(t *testing.T) { assert.Equal(t, c.expensive, isRoutePathExpensive(c.routePath), "routePath: %s", c.routePath) } - assert.True(t, isRoutePathForLongPolling("/user/events")) + assert.True(t, isRoutePathForLongPolling("/-/ws")) } diff --git a/routers/common/qos.go b/routers/common/qos.go index 96f23b64fe..8124999a16 100644 --- a/routers/common/qos.go +++ b/routers/common/qos.go @@ -79,9 +79,9 @@ func QoS() func(next http.Handler) http.Handler { return } - // Release long-polling immediately, so they don't always + // Release long-lived connections immediately, so they don't always // take up an in-flight request - if strings.Contains(req.URL.Path, "/user/events") { + if strings.Contains(req.URL.Path, "/-/ws") { c.Release() } else { defer c.Release() diff --git a/routers/init.go b/routers/init.go index f6775dd8fe..8bad3be0bf 100644 --- a/routers/init.go +++ b/routers/init.go @@ -12,7 +12,6 @@ import ( "code.gitea.io/gitea/models" authmodel "code.gitea.io/gitea/models/auth" "code.gitea.io/gitea/modules/cache" - "code.gitea.io/gitea/modules/eventsource" "code.gitea.io/gitea/modules/git" "code.gitea.io/gitea/modules/git/gitcmd" "code.gitea.io/gitea/modules/log" @@ -160,8 +159,8 @@ func InitWebInstalled(ctx context.Context) { mustInit(automerge.Init) mustInit(task.Init) mustInit(repo_migrations.Init) - eventsource.GetManager().Init() mustInit(websocket_service.Init) + mustInit(websocket_service.InitStopwatch) mustInitCtx(ctx, mailer_incoming.Init) mustInitCtx(ctx, syncAppConfForGit) diff --git a/routers/web/auth/auth.go b/routers/web/auth/auth.go index 1219690200..a85d334fce 100644 --- a/routers/web/auth/auth.go +++ b/routers/web/auth/auth.go @@ -16,7 +16,6 @@ import ( "code.gitea.io/gitea/models/db" user_model "code.gitea.io/gitea/models/user" "code.gitea.io/gitea/modules/auth/password" - "code.gitea.io/gitea/modules/eventsource" "code.gitea.io/gitea/modules/httplib" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/optional" @@ -34,6 +33,7 @@ import ( "code.gitea.io/gitea/services/forms" "code.gitea.io/gitea/services/mailer" user_service "code.gitea.io/gitea/services/user" + websocket_service "code.gitea.io/gitea/services/websocket" "github.com/markbates/goth" ) @@ -445,10 +445,7 @@ func HandleSignOut(ctx *context.Context) { // SignOut sign out from login status func SignOut(ctx *context.Context) { if ctx.Doer != nil { - eventsource.GetManager().SendMessageBlocking(ctx.Doer.ID, &eventsource.Event{ - Name: "logout", - Data: ctx.Session.ID(), - }) + websocket_service.PublishLogout(ctx.Doer.ID, ctx.Session.ID()) } // prepare the sign-out URL before destroying the session diff --git a/routers/web/events/events.go b/routers/web/events/events.go deleted file mode 100644 index 52f20e07dc..0000000000 --- a/routers/web/events/events.go +++ /dev/null @@ -1,122 +0,0 @@ -// Copyright 2020 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package events - -import ( - "net/http" - "time" - - "code.gitea.io/gitea/modules/eventsource" - "code.gitea.io/gitea/modules/graceful" - "code.gitea.io/gitea/modules/log" - "code.gitea.io/gitea/routers/web/auth" - "code.gitea.io/gitea/services/context" -) - -// Events listens for events -func Events(ctx *context.Context) { - // FIXME: Need to check if resp is actually a http.Flusher! - how though? - - // Set the headers related to event streaming. - ctx.Resp.Header().Set("Content-Type", "text/event-stream") - ctx.Resp.Header().Set("Cache-Control", "no-cache") - ctx.Resp.Header().Set("Connection", "keep-alive") - ctx.Resp.Header().Set("X-Accel-Buffering", "no") - ctx.Resp.WriteHeader(http.StatusOK) - - if !ctx.IsSigned { - // Return unauthorized status event - event := &eventsource.Event{ - Name: "close", - Data: "unauthorized", - } - _, _ = event.WriteTo(ctx) - ctx.Resp.Flush() - return - } - - // Listen to connection close and un-register messageChan - notify := ctx.Done() - ctx.Resp.Flush() - - shutdownCtx := graceful.GetManager().ShutdownContext() - - uid := ctx.Doer.ID - - messageChan := eventsource.GetManager().Register(uid) - - unregister := func() { - eventsource.GetManager().Unregister(uid, messageChan) - // ensure the messageChan is closed - for { - _, ok := <-messageChan - if !ok { - break - } - } - } - - if _, err := ctx.Resp.Write([]byte("\n")); err != nil { - log.Error("Unable to write to EventStream: %v", err) - unregister() - return - } - - timer := time.NewTicker(30 * time.Second) - -loop: - for { - select { - case <-timer.C: - event := &eventsource.Event{ - Name: "ping", - } - _, err := event.WriteTo(ctx.Resp) - if err != nil { - log.Error("Unable to write to EventStream for user %s: %v", ctx.Doer.Name, err) - go unregister() - break loop - } - ctx.Resp.Flush() - case <-notify: - go unregister() - break loop - case <-shutdownCtx.Done(): - go unregister() - break loop - case event, ok := <-messageChan: - if !ok { - break loop - } - - // Handle logout - if event.Name == "logout" { - if ctx.Session.ID() == event.Data { - _, _ = (&eventsource.Event{ - Name: "logout", - Data: "here", - }).WriteTo(ctx.Resp) - ctx.Resp.Flush() - go unregister() - auth.HandleSignOut(ctx) - break loop - } - // Replace the event - we don't want to expose the session ID to the user - event = &eventsource.Event{ - Name: "logout", - Data: "elsewhere", - } - } - - _, err := event.WriteTo(ctx.Resp) - if err != nil { - log.Error("Unable to write to EventStream for user %s: %v", ctx.Doer.Name, err) - go unregister() - break loop - } - ctx.Resp.Flush() - } - } - timer.Stop() -} diff --git a/routers/web/repo/issue_stopwatch.go b/routers/web/repo/issue_stopwatch.go index 2de3a7cfec..a228bf779a 100644 --- a/routers/web/repo/issue_stopwatch.go +++ b/routers/web/repo/issue_stopwatch.go @@ -6,8 +6,8 @@ package repo import ( "code.gitea.io/gitea/models/db" issues_model "code.gitea.io/gitea/models/issues" - "code.gitea.io/gitea/modules/eventsource" "code.gitea.io/gitea/services/context" + websocket_service "code.gitea.io/gitea/services/websocket" ) // IssueStartStopwatch creates a stopwatch for the given issue. @@ -76,10 +76,7 @@ func CancelStopwatch(c *context.Context) { return } if len(stopwatches) == 0 { - eventsource.GetManager().SendMessage(c.Doer.ID, &eventsource.Event{ - Name: "stopwatches", - Data: "{}", - }) + websocket_service.PublishEmptyStopwatches(c.Doer.ID) } c.JSONRedirect("") diff --git a/routers/web/web.go b/routers/web/web.go index 6cf209a886..bb8c6a4ef7 100644 --- a/routers/web/web.go +++ b/routers/web/web.go @@ -27,7 +27,6 @@ import ( "code.gitea.io/gitea/routers/web/admin" "code.gitea.io/gitea/routers/web/auth" "code.gitea.io/gitea/routers/web/devtest" - "code.gitea.io/gitea/routers/web/events" "code.gitea.io/gitea/routers/web/explore" "code.gitea.io/gitea/routers/web/feed" "code.gitea.io/gitea/routers/web/healthcheck" @@ -592,7 +591,6 @@ func registerWebRoutes(m *web.Router, webAuth *AuthMiddleware) { }) }, reqSignOut) - m.Any("/user/events", routing.MarkLongPolling, events.Events) m.Get("/-/ws", gitea_websocket.Serve) m.Group("/login/oauth", func() { diff --git a/routers/web/websocket/websocket.go b/routers/web/websocket/websocket.go index b4d9619f6d..6fa4b76a08 100644 --- a/routers/web/websocket/websocket.go +++ b/routers/web/websocket/websocket.go @@ -6,6 +6,7 @@ package websocket import ( "net/http" + "code.gitea.io/gitea/modules/json" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/services/context" "code.gitea.io/gitea/services/pubsub" @@ -13,6 +14,38 @@ import ( gitea_ws "github.com/coder/websocket" ) +// logoutBrokerMsg is the internal broker message published by PublishLogout. +type logoutBrokerMsg struct { + Type string `json:"type"` + SessionID string `json:"sessionID,omitempty"` +} + +// logoutClientMsg is sent to the WebSocket client so the browser can tell +// whether the logout originated from this tab ("here") or another ("elsewhere"). +type logoutClientMsg struct { + Type string `json:"type"` + Data string `json:"data"` +} + +// rewriteLogout intercepts a broker logout message and rewrites it to the +// client format using "here"/"elsewhere" instead of the raw session ID. +// If sessionID is empty the logout applies to all sessions ("here" for all). +func rewriteLogout(msg []byte, connSessionID string) []byte { + var lm logoutBrokerMsg + if err := json.Unmarshal(msg, &lm); err != nil || lm.Type != "logout" { + return msg + } + where := "elsewhere" + if lm.SessionID == "" || lm.SessionID == connSessionID { + where = "here" + } + out, err := json.Marshal(logoutClientMsg{Type: "logout", Data: where}) + if err != nil { + return msg + } + return out +} + // Serve handles WebSocket upgrade and event delivery for the signed-in user. func Serve(ctx *context.Context) { if !ctx.IsSigned { @@ -28,6 +61,7 @@ func Serve(ctx *context.Context) { } defer conn.CloseNow() //nolint:errcheck // CloseNow is best-effort; error is intentionally ignored + sessionID := ctx.Session.ID() ch, cancel := pubsub.DefaultBroker.Subscribe(pubsub.UserTopic(ctx.Doer.ID)) defer cancel() @@ -40,6 +74,7 @@ func Serve(ctx *context.Context) { if !ok { return } + msg = rewriteLogout(msg, sessionID) if err := conn.Write(wsCtx, gitea_ws.MessageText, msg); err != nil { log.Trace("websocket: write failed: %v", err) return diff --git a/services/user/user.go b/services/user/user.go index 9b8bcf83c0..b540bc5909 100644 --- a/services/user/user.go +++ b/services/user/user.go @@ -16,7 +16,6 @@ import ( repo_model "code.gitea.io/gitea/models/repo" system_model "code.gitea.io/gitea/models/system" user_model "code.gitea.io/gitea/models/user" - "code.gitea.io/gitea/modules/eventsource" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/storage" @@ -28,6 +27,7 @@ import ( "code.gitea.io/gitea/services/packages" container_service "code.gitea.io/gitea/services/packages/container" repo_service "code.gitea.io/gitea/services/repository" + websocket_service "code.gitea.io/gitea/services/websocket" ) // RenameUser renames a user @@ -147,9 +147,7 @@ func DeleteUser(ctx context.Context, u *user_model.User, purge bool) error { // Force any logged in sessions to log out // FIXME: We also need to tell the session manager to log them out too. - eventsource.GetManager().SendMessage(u.ID, &eventsource.Event{ - Name: "logout", - }) + websocket_service.PublishLogout(u.ID, "") // Delete all repos belonging to this user // Now this is not within a transaction because there are internal transactions within the DeleteRepository diff --git a/services/websocket/logout_publisher.go b/services/websocket/logout_publisher.go new file mode 100644 index 0000000000..91492fcc66 --- /dev/null +++ b/services/websocket/logout_publisher.go @@ -0,0 +1,30 @@ +// Copyright 2026 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package websocket + +import ( + "code.gitea.io/gitea/modules/json" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/services/pubsub" +) + +type logoutEvent struct { + Type string `json:"type"` + SessionID string `json:"sessionID,omitempty"` +} + +// PublishLogout publishes a logout event to all WebSocket clients connected as +// the given user. sessionID identifies which session is signing out so the +// client can distinguish "this tab" from "another tab". +func PublishLogout(userID int64, sessionID string) { + msg, err := json.Marshal(logoutEvent{ + Type: "logout", + SessionID: sessionID, + }) + if err != nil { + log.Error("websocket: marshal logout event: %v", err) + return + } + pubsub.DefaultBroker.Publish(pubsub.UserTopic(userID), msg) +} diff --git a/services/websocket/stopwatch_notifier.go b/services/websocket/stopwatch_notifier.go new file mode 100644 index 0000000000..502b9afcaf --- /dev/null +++ b/services/websocket/stopwatch_notifier.go @@ -0,0 +1,108 @@ +// Copyright 2026 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package websocket + +import ( + "context" + "time" + + issues_model "code.gitea.io/gitea/models/issues" + user_model "code.gitea.io/gitea/models/user" + "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/modules/json" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/process" + "code.gitea.io/gitea/modules/setting" + "code.gitea.io/gitea/services/convert" + "code.gitea.io/gitea/services/pubsub" +) + +type stopwatchesEvent struct { + Type string `json:"type"` + Data json.RawMessage `json:"data"` +} + +// PublishEmptyStopwatches immediately pushes an empty stopwatches list to the +// given user's WebSocket clients — used when the user's last stopwatch is cancelled. +func PublishEmptyStopwatches(userID int64) { + msg, err := json.Marshal(stopwatchesEvent{ + Type: "stopwatches", + Data: json.RawMessage(`[]`), + }) + if err != nil { + log.Error("websocket: marshal empty stopwatches: %v", err) + return + } + pubsub.DefaultBroker.Publish(pubsub.UserTopic(userID), msg) +} + +// InitStopwatch starts the background goroutine that polls active stopwatches +// and pushes updates to connected WebSocket clients. +func InitStopwatch() error { + if !setting.Service.EnableTimetracking { + return nil + } + go graceful.GetManager().RunWithShutdownContext(runStopwatch) + return nil +} + +func runStopwatch(ctx context.Context) { + ctx, _, finished := process.GetManager().AddTypedContext(ctx, "Service: WebSocket Stopwatch", process.SystemProcessType, true) + defer finished() + + if setting.UI.Notification.EventSourceUpdateTime <= 0 { + return + } + + timer := time.NewTicker(setting.UI.Notification.EventSourceUpdateTime) + defer timer.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-timer.C: + if !pubsub.DefaultBroker.HasSubscribers() { + continue + } + + userStopwatches, err := issues_model.GetUIDsAndStopwatch(ctx) + if err != nil { + log.Error("websocket: GetUIDsAndStopwatch: %v", err) + continue + } + + for _, us := range userStopwatches { + u, err := user_model.GetUserByID(ctx, us.UserID) + if err != nil { + log.Error("websocket: GetUserByID %d: %v", us.UserID, err) + continue + } + + apiSWs, err := convert.ToStopWatches(ctx, u, us.StopWatches) + if err != nil { + if !issues_model.IsErrIssueNotExist(err) { + log.Error("websocket: ToStopWatches: %v", err) + } + continue + } + + dataBs, err := json.Marshal(apiSWs) + if err != nil { + log.Error("websocket: marshal stopwatches: %v", err) + continue + } + + msg, err := json.Marshal(stopwatchesEvent{ + Type: "stopwatches", + Data: dataBs, + }) + if err != nil { + continue + } + pubsub.DefaultBroker.Publish(pubsub.UserTopic(us.UserID), msg) + } + } + } +} diff --git a/tests/integration/eventsource_test.go b/tests/integration/eventsource_test.go deleted file mode 100644 index a13a8c346a..0000000000 --- a/tests/integration/eventsource_test.go +++ /dev/null @@ -1,86 +0,0 @@ -// Copyright 2020 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package integration - -import ( - "fmt" - "net/http" - "testing" - "time" - - activities_model "code.gitea.io/gitea/models/activities" - auth_model "code.gitea.io/gitea/models/auth" - repo_model "code.gitea.io/gitea/models/repo" - "code.gitea.io/gitea/models/unittest" - user_model "code.gitea.io/gitea/models/user" - "code.gitea.io/gitea/modules/eventsource" - api "code.gitea.io/gitea/modules/structs" - "code.gitea.io/gitea/tests" - - "github.com/stretchr/testify/assert" -) - -func TestEventSourceManagerRun(t *testing.T) { - defer tests.PrepareTestEnv(t)() - manager := eventsource.GetManager() - - eventChan := manager.Register(2) - defer func() { - manager.Unregister(2, eventChan) - // ensure the eventChan is closed - for { - _, ok := <-eventChan - if !ok { - break - } - } - }() - expectNotificationCountEvent := func(count int64) func() bool { - return func() bool { - select { - case event, ok := <-eventChan: - if !ok { - return false - } - data, ok := event.Data.(activities_model.UserIDCount) - if !ok { - return false - } - return event.Name == "notification-count" && data.Count == count - default: - return false - } - } - } - - user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) - repo1 := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: 1}) - thread5 := unittest.AssertExistsAndLoadBean(t, &activities_model.Notification{ID: 5}) - assert.NoError(t, thread5.LoadAttributes(t.Context())) - session := loginUser(t, user2.Name) - token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteNotification, auth_model.AccessTokenScopeWriteRepository) - - var apiNL []api.NotificationThread - - // -- mark notifications as read -- - req := NewRequest(t, "GET", "/api/v1/notifications?status-types=unread"). - AddTokenAuth(token) - resp := session.MakeRequest(t, req, http.StatusOK) - - DecodeJSON(t, resp, &apiNL) - assert.Len(t, apiNL, 2) - - lastReadAt := "2000-01-01T00%3A50%3A01%2B00%3A00" // 946687801 <- only Notification 4 is in this filter ... - req = NewRequest(t, "PUT", fmt.Sprintf("/api/v1/repos/%s/%s/notifications?last_read_at=%s", user2.Name, repo1.Name, lastReadAt)). - AddTokenAuth(token) - session.MakeRequest(t, req, http.StatusResetContent) - - req = NewRequest(t, "GET", "/api/v1/notifications?status-types=unread"). - AddTokenAuth(token) - resp = session.MakeRequest(t, req, http.StatusOK) - DecodeJSON(t, resp, &apiNL) - assert.Len(t, apiNL, 1) - - assert.Eventually(t, expectNotificationCountEvent(1), 30*time.Second, 1*time.Second) -} diff --git a/web_src/js/features/eventsource.sharedworker.ts b/web_src/js/features/eventsource.sharedworker.ts index 58b371e6a0..370ec06533 100644 --- a/web_src/js/features/eventsource.sharedworker.ts +++ b/web_src/js/features/eventsource.sharedworker.ts @@ -1,20 +1,13 @@ +// Source manages the list of connected page ports for one logical connection. +// It no longer creates an EventSource; all real-time data is delivered by the +// accompanying WsSource over WebSocket. class Source { url: string; - eventSource: EventSource | null; - listening: Record; clients: Array; constructor(url: string) { this.url = url; - this.eventSource = new EventSource(url); - this.listening = {}; this.clients = []; - this.listen('open'); - this.listen('close'); - this.listen('logout'); - this.listen('notification-count'); - this.listen('stopwatches'); - this.listen('error'); } register(port: MessagePort) { @@ -37,24 +30,6 @@ class Source { return this.clients.length; } - close() { - if (!this.eventSource) return; - - this.eventSource.close(); - this.eventSource = null; - } - - listen(eventType: string) { - if (this.listening[eventType]) return; - this.listening[eventType] = true; - this.eventSource?.addEventListener(eventType, (event) => { - this.notifyClients({ - type: eventType, - data: event.data, - }); - }); - } - notifyClients(event: {type: string, data: any}) { for (const client of this.clients) { client.postMessage(event); @@ -64,15 +39,14 @@ class Source { status(port: MessagePort) { port.postMessage({ type: 'status', - message: `url: ${this.url} readyState: ${this.eventSource?.readyState}`, + message: `url: ${this.url}`, }); } } -// WsSource provides a WebSocket transport alongside EventSource. -// It delivers real-time notification-count pushes using the same client list -// as the associated Source, normalising messages to the SSE event format so -// that callers do not need to know which transport delivered the event. +// WsSource provides a WebSocket transport for real-time event delivery. +// It normalises messages to the SSE event format so that callers do not +// need to know which transport delivered the event. class WsSource { wsUrl: string; ws: WebSocket | null; @@ -105,6 +79,16 @@ class WsSource { type: 'notification-count', data: JSON.stringify({Count: msg.count}), }); + } else if (msg.type === 'stopwatches') { + this.source.notifyClients({ + type: 'stopwatches', + data: JSON.stringify(msg.data), + }); + } else if (msg.type === 'logout') { + this.source.notifyClients({ + type: 'logout', + data: msg.data ?? '', + }); } } catch { // ignore malformed messages @@ -149,13 +133,6 @@ const wsSourcesByUrl = new Map(); (self as unknown as SharedWorkerGlobalScope).addEventListener('connect', (e: MessageEvent) => { for (const port of e.ports) { port.addEventListener('message', (event: MessageEvent) => { - if (!self.EventSource) { - // some browsers (like PaleMoon, Firefox<53) don't support EventSource in SharedWorkerGlobalScope. - // this event handler needs EventSource when doing "new Source(url)", so just post a message back to the caller, - // in case the caller would like to use a fallback method to do its work. - port.postMessage({type: 'no-event-source'}); - return; - } if (event.data.type === 'start') { const url = event.data.url; let source = sourcesByUrl.get(url); @@ -167,14 +144,13 @@ const wsSourcesByUrl = new Map(); } source = sourcesByPort.get(port); if (source) { - if (source.eventSource && source.url === url) return; + if (source.url === url) return; // How this has happened I don't understand... // deregister from that source const count = source.deregister(port); // Clean-up if (count === 0) { - source.close(); sourcesByUrl.set(source.url, null); const ws = wsSourcesByUrl.get(source.url); if (ws) { @@ -183,24 +159,19 @@ const wsSourcesByUrl = new Map(); } } } - // Create a new Source + // Create a new Source and its WebSocket transport source = new Source(url); source.register(port); sourcesByUrl.set(url, source); sourcesByPort.set(port, source); - // Start WebSocket alongside EventSource for real-time notification pushes. const wsUrl = url.replace(/^http/, 'ws').replace(/\/user\/events$/, '/-/ws'); wsSourcesByUrl.set(url, new WsSource(wsUrl, source)); - } else if (event.data.type === 'listen') { - const source = sourcesByPort.get(port)!; - source.listen(event.data.eventType); } else if (event.data.type === 'close') { const source = sourcesByPort.get(port); if (!source) return; const count = source.deregister(port); if (count === 0) { - source.close(); sourcesByUrl.set(source.url, null); sourcesByPort.set(port, null); const ws = wsSourcesByUrl.get(source.url);