0
0
mirror of https://github.com/go-gitea/gitea.git synced 2026-04-13 05:15:01 +02:00

feat(websocket): Phase 2 — migrate stopwatches/logout to WebSocket, remove SSE

- Add stopwatch_notifier.go: periodic poller publishes stopwatches via pubsub broker
- Add logout_publisher.go: PublishLogout publishes logout events via pubsub broker
- websocket.go: rewrite logout messages server-side (sessionID to here/elsewhere)
- Remove entire SSE infrastructure: eventsource module, /user/events route, events.go
- Update blockexpensive/qos to handle /-/ws instead of /user/events
- Simplify eventsource.sharedworker.ts: remove EventSource, WebSocket-only delivery
This commit is contained in:
Epid 2026-04-02 00:41:09 +03:00
parent 155ef8fb0c
commit 256aeb9dc9
19 changed files with 203 additions and 734 deletions

View File

@ -1,118 +0,0 @@
// Copyright 2020 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package eventsource
import (
"bytes"
"fmt"
"io"
"strings"
"time"
"code.gitea.io/gitea/modules/json"
)
func wrapNewlines(w io.Writer, prefix, value []byte) (sum int64, err error) {
if len(value) == 0 {
return 0, nil
}
var n int
last := 0
for j := bytes.IndexByte(value, '\n'); j > -1; j = bytes.IndexByte(value[last:], '\n') {
n, err = w.Write(prefix)
sum += int64(n)
if err != nil {
return sum, err
}
n, err = w.Write(value[last : last+j+1])
sum += int64(n)
if err != nil {
return sum, err
}
last += j + 1
}
n, err = w.Write(prefix)
sum += int64(n)
if err != nil {
return sum, err
}
n, err = w.Write(value[last:])
sum += int64(n)
if err != nil {
return sum, err
}
n, err = w.Write([]byte("\n"))
sum += int64(n)
return sum, err
}
// Event is an eventsource event, not all fields need to be set
type Event struct {
// Name represents the value of the event: tag in the stream
Name string
// Data is either JSONified []byte or any that can be JSONd
Data any
// ID represents the ID of an event
ID string
// Retry tells the receiver only to attempt to reconnect to the source after this time
Retry time.Duration
}
// WriteTo writes data to w until there's no more data to write or when an error occurs.
// The return value n is the number of bytes written. Any error encountered during the write is also returned.
func (e *Event) WriteTo(w io.Writer) (int64, error) {
sum := int64(0)
var nint int
n, err := wrapNewlines(w, []byte("event: "), []byte(e.Name))
sum += n
if err != nil {
return sum, err
}
if e.Data != nil {
var data []byte
switch v := e.Data.(type) {
case []byte:
data = v
case string:
data = []byte(v)
default:
var err error
data, err = json.Marshal(e.Data)
if err != nil {
return sum, err
}
}
n, err := wrapNewlines(w, []byte("data: "), data)
sum += n
if err != nil {
return sum, err
}
}
n, err = wrapNewlines(w, []byte("id: "), []byte(e.ID))
sum += n
if err != nil {
return sum, err
}
if e.Retry != 0 {
nint, err = fmt.Fprintf(w, "retry: %d\n", int64(e.Retry/time.Millisecond))
sum += int64(nint)
if err != nil {
return sum, err
}
}
nint, err = w.Write([]byte("\n"))
sum += int64(nint)
return sum, err
}
func (e *Event) String() string {
buf := new(strings.Builder)
_, _ = e.WriteTo(buf)
return buf.String()
}

View File

@ -1,50 +0,0 @@
// Copyright 2020 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package eventsource
import (
"bytes"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func Test_wrapNewlines(t *testing.T) {
tests := []struct {
name string
prefix string
value string
output string
}{
{
"check no new lines",
"prefix: ",
"value",
"prefix: value\n",
},
{
"check simple newline",
"prefix: ",
"value1\nvalue2",
"prefix: value1\nprefix: value2\n",
},
{
"check pathological newlines",
"p: ",
"\n1\n\n2\n3\n",
"p: \np: 1\np: \np: 2\np: 3\np: \n",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
w := &bytes.Buffer{}
gotSum, err := wrapNewlines(w, []byte(tt.prefix), []byte(tt.value))
require.NoError(t, err)
assert.EqualValues(t, len(tt.output), gotSum)
assert.Equal(t, tt.output, w.String())
})
}
}

View File

@ -1,89 +0,0 @@
// Copyright 2020 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package eventsource
import (
"sync"
)
// Manager manages the eventsource Messengers
type Manager struct {
mutex sync.Mutex
messengers map[int64]*Messenger
connection chan struct{}
}
var manager *Manager
func init() {
manager = &Manager{
messengers: make(map[int64]*Messenger),
connection: make(chan struct{}, 1),
}
}
// GetManager returns a Manager and initializes one as singleton if there's none yet
func GetManager() *Manager {
return manager
}
// Register message channel
func (m *Manager) Register(uid int64) <-chan *Event {
m.mutex.Lock()
messenger, ok := m.messengers[uid]
if !ok {
messenger = NewMessenger(uid)
m.messengers[uid] = messenger
}
select {
case m.connection <- struct{}{}:
default:
}
m.mutex.Unlock()
return messenger.Register()
}
// Unregister message channel
func (m *Manager) Unregister(uid int64, channel <-chan *Event) {
m.mutex.Lock()
defer m.mutex.Unlock()
messenger, ok := m.messengers[uid]
if !ok {
return
}
if messenger.Unregister(channel) {
delete(m.messengers, uid)
}
}
// UnregisterAll message channels
func (m *Manager) UnregisterAll() {
m.mutex.Lock()
defer m.mutex.Unlock()
for _, messenger := range m.messengers {
messenger.UnregisterAll()
}
m.messengers = map[int64]*Messenger{}
}
// SendMessage sends a message to a particular user
func (m *Manager) SendMessage(uid int64, message *Event) {
m.mutex.Lock()
messenger, ok := m.messengers[uid]
m.mutex.Unlock()
if ok {
messenger.SendMessage(message)
}
}
// SendMessageBlocking sends a message to a particular user
func (m *Manager) SendMessageBlocking(uid int64, message *Event) {
m.mutex.Lock()
messenger, ok := m.messengers[uid]
m.mutex.Unlock()
if ok {
messenger.SendMessageBlocking(message)
}
}

View File

@ -1,122 +0,0 @@
// Copyright 2020 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package eventsource
import (
"context"
"time"
activities_model "code.gitea.io/gitea/models/activities"
issues_model "code.gitea.io/gitea/models/issues"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/process"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/services/convert"
)
// Init starts this eventsource
func (m *Manager) Init() {
if setting.UI.Notification.EventSourceUpdateTime <= 0 {
return
}
go graceful.GetManager().RunWithShutdownContext(m.Run)
}
// Run runs the manager within a provided context
func (m *Manager) Run(ctx context.Context) {
ctx, _, finished := process.GetManager().AddTypedContext(ctx, "Service: EventSource", process.SystemProcessType, true)
defer finished()
then := timeutil.TimeStampNow().Add(-2)
timer := time.NewTicker(setting.UI.Notification.EventSourceUpdateTime)
loop:
for {
select {
case <-ctx.Done():
timer.Stop()
break loop
case <-timer.C:
m.mutex.Lock()
connectionCount := len(m.messengers)
if connectionCount == 0 {
log.Trace("Event source has no listeners")
// empty the connection channel
select {
case <-m.connection:
default:
}
}
m.mutex.Unlock()
if connectionCount == 0 {
// No listeners so the source can be paused
log.Trace("Pausing the eventsource")
select {
case <-ctx.Done():
break loop
case <-m.connection:
log.Trace("Connection detected - restarting the eventsource")
// OK we're back so lets reset the timer and start again
// We won't change the "then" time because there could be concurrency issues
select {
case <-timer.C:
default:
}
continue
}
}
now := timeutil.TimeStampNow().Add(-2)
uidCounts, err := activities_model.GetUIDsAndNotificationCounts(ctx, then, now)
if err != nil {
log.Error("Unable to get UIDcounts: %v", err)
}
for _, uidCount := range uidCounts {
m.SendMessage(uidCount.UserID, &Event{
Name: "notification-count",
Data: uidCount,
})
}
then = now
if setting.Service.EnableTimetracking {
usersStopwatches, err := issues_model.GetUIDsAndStopwatch(ctx)
if err != nil {
log.Error("Unable to get GetUIDsAndStopwatch: %v", err)
return
}
for _, userStopwatches := range usersStopwatches {
u, err := user_model.GetUserByID(ctx, userStopwatches.UserID)
if err != nil {
log.Error("Unable to get user %d: %v", userStopwatches.UserID, err)
continue
}
apiSWs, err := convert.ToStopWatches(ctx, u, userStopwatches.StopWatches)
if err != nil {
if !issues_model.IsErrIssueNotExist(err) {
log.Error("Unable to APIFormat stopwatches: %v", err)
}
continue
}
dataBs, err := json.Marshal(apiSWs)
if err != nil {
log.Error("Unable to marshal stopwatches: %v", err)
continue
}
m.SendMessage(userStopwatches.UserID, &Event{
Name: "stopwatches",
Data: string(dataBs),
})
}
}
}
}
m.UnregisterAll()
}

View File

@ -1,77 +0,0 @@
// Copyright 2020 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package eventsource
import "sync"
// Messenger is a per uid message store
type Messenger struct {
mutex sync.Mutex
uid int64
channels []chan *Event
}
// NewMessenger creates a messenger for a particular uid
func NewMessenger(uid int64) *Messenger {
return &Messenger{
uid: uid,
channels: [](chan *Event){},
}
}
// Register returns a new chan []byte
func (m *Messenger) Register() <-chan *Event {
m.mutex.Lock()
// TODO: Limit the number of messengers per uid
channel := make(chan *Event, 1)
m.channels = append(m.channels, channel)
m.mutex.Unlock()
return channel
}
// Unregister removes the provider chan []byte
func (m *Messenger) Unregister(channel <-chan *Event) bool {
m.mutex.Lock()
defer m.mutex.Unlock()
for i, toRemove := range m.channels {
if channel == toRemove {
m.channels = append(m.channels[:i], m.channels[i+1:]...)
close(toRemove)
break
}
}
return len(m.channels) == 0
}
// UnregisterAll removes all chan []byte
func (m *Messenger) UnregisterAll() {
m.mutex.Lock()
defer m.mutex.Unlock()
for _, channel := range m.channels {
close(channel)
}
m.channels = nil
}
// SendMessage sends the message to all registered channels
func (m *Messenger) SendMessage(message *Event) {
m.mutex.Lock()
defer m.mutex.Unlock()
for i := range m.channels {
channel := m.channels[i]
select {
case channel <- message:
default:
}
}
}
// SendMessageBlocking sends the message to all registered channels and ensures it gets sent
func (m *Messenger) SendMessageBlocking(message *Event) {
m.mutex.Lock()
defer m.mutex.Unlock()
for i := range m.channels {
m.channels[i] <- message
}
}

View File

@ -72,7 +72,7 @@ func isRoutePathExpensive(routePattern string) bool {
}
func isRoutePathForLongPolling(routePattern string) bool {
return routePattern == "/user/events"
return routePattern == "/-/ws"
}
func determineRequestPriority(reqCtx reqctx.RequestContext) (ret struct {

View File

@ -26,5 +26,5 @@ func TestBlockExpensive(t *testing.T) {
assert.Equal(t, c.expensive, isRoutePathExpensive(c.routePath), "routePath: %s", c.routePath)
}
assert.True(t, isRoutePathForLongPolling("/user/events"))
assert.True(t, isRoutePathForLongPolling("/-/ws"))
}

View File

@ -79,9 +79,9 @@ func QoS() func(next http.Handler) http.Handler {
return
}
// Release long-polling immediately, so they don't always
// Release long-lived connections immediately, so they don't always
// take up an in-flight request
if strings.Contains(req.URL.Path, "/user/events") {
if strings.Contains(req.URL.Path, "/-/ws") {
c.Release()
} else {
defer c.Release()

View File

@ -12,7 +12,6 @@ import (
"code.gitea.io/gitea/models"
authmodel "code.gitea.io/gitea/models/auth"
"code.gitea.io/gitea/modules/cache"
"code.gitea.io/gitea/modules/eventsource"
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/git/gitcmd"
"code.gitea.io/gitea/modules/log"
@ -160,8 +159,8 @@ func InitWebInstalled(ctx context.Context) {
mustInit(automerge.Init)
mustInit(task.Init)
mustInit(repo_migrations.Init)
eventsource.GetManager().Init()
mustInit(websocket_service.Init)
mustInit(websocket_service.InitStopwatch)
mustInitCtx(ctx, mailer_incoming.Init)
mustInitCtx(ctx, syncAppConfForGit)

View File

@ -16,7 +16,6 @@ import (
"code.gitea.io/gitea/models/db"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/auth/password"
"code.gitea.io/gitea/modules/eventsource"
"code.gitea.io/gitea/modules/httplib"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/optional"
@ -34,6 +33,7 @@ import (
"code.gitea.io/gitea/services/forms"
"code.gitea.io/gitea/services/mailer"
user_service "code.gitea.io/gitea/services/user"
websocket_service "code.gitea.io/gitea/services/websocket"
"github.com/markbates/goth"
)
@ -445,10 +445,7 @@ func HandleSignOut(ctx *context.Context) {
// SignOut sign out from login status
func SignOut(ctx *context.Context) {
if ctx.Doer != nil {
eventsource.GetManager().SendMessageBlocking(ctx.Doer.ID, &eventsource.Event{
Name: "logout",
Data: ctx.Session.ID(),
})
websocket_service.PublishLogout(ctx.Doer.ID, ctx.Session.ID())
}
// prepare the sign-out URL before destroying the session

View File

@ -1,122 +0,0 @@
// Copyright 2020 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package events
import (
"net/http"
"time"
"code.gitea.io/gitea/modules/eventsource"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/routers/web/auth"
"code.gitea.io/gitea/services/context"
)
// Events listens for events
func Events(ctx *context.Context) {
// FIXME: Need to check if resp is actually a http.Flusher! - how though?
// Set the headers related to event streaming.
ctx.Resp.Header().Set("Content-Type", "text/event-stream")
ctx.Resp.Header().Set("Cache-Control", "no-cache")
ctx.Resp.Header().Set("Connection", "keep-alive")
ctx.Resp.Header().Set("X-Accel-Buffering", "no")
ctx.Resp.WriteHeader(http.StatusOK)
if !ctx.IsSigned {
// Return unauthorized status event
event := &eventsource.Event{
Name: "close",
Data: "unauthorized",
}
_, _ = event.WriteTo(ctx)
ctx.Resp.Flush()
return
}
// Listen to connection close and un-register messageChan
notify := ctx.Done()
ctx.Resp.Flush()
shutdownCtx := graceful.GetManager().ShutdownContext()
uid := ctx.Doer.ID
messageChan := eventsource.GetManager().Register(uid)
unregister := func() {
eventsource.GetManager().Unregister(uid, messageChan)
// ensure the messageChan is closed
for {
_, ok := <-messageChan
if !ok {
break
}
}
}
if _, err := ctx.Resp.Write([]byte("\n")); err != nil {
log.Error("Unable to write to EventStream: %v", err)
unregister()
return
}
timer := time.NewTicker(30 * time.Second)
loop:
for {
select {
case <-timer.C:
event := &eventsource.Event{
Name: "ping",
}
_, err := event.WriteTo(ctx.Resp)
if err != nil {
log.Error("Unable to write to EventStream for user %s: %v", ctx.Doer.Name, err)
go unregister()
break loop
}
ctx.Resp.Flush()
case <-notify:
go unregister()
break loop
case <-shutdownCtx.Done():
go unregister()
break loop
case event, ok := <-messageChan:
if !ok {
break loop
}
// Handle logout
if event.Name == "logout" {
if ctx.Session.ID() == event.Data {
_, _ = (&eventsource.Event{
Name: "logout",
Data: "here",
}).WriteTo(ctx.Resp)
ctx.Resp.Flush()
go unregister()
auth.HandleSignOut(ctx)
break loop
}
// Replace the event - we don't want to expose the session ID to the user
event = &eventsource.Event{
Name: "logout",
Data: "elsewhere",
}
}
_, err := event.WriteTo(ctx.Resp)
if err != nil {
log.Error("Unable to write to EventStream for user %s: %v", ctx.Doer.Name, err)
go unregister()
break loop
}
ctx.Resp.Flush()
}
}
timer.Stop()
}

View File

@ -6,8 +6,8 @@ package repo
import (
"code.gitea.io/gitea/models/db"
issues_model "code.gitea.io/gitea/models/issues"
"code.gitea.io/gitea/modules/eventsource"
"code.gitea.io/gitea/services/context"
websocket_service "code.gitea.io/gitea/services/websocket"
)
// IssueStartStopwatch creates a stopwatch for the given issue.
@ -76,10 +76,7 @@ func CancelStopwatch(c *context.Context) {
return
}
if len(stopwatches) == 0 {
eventsource.GetManager().SendMessage(c.Doer.ID, &eventsource.Event{
Name: "stopwatches",
Data: "{}",
})
websocket_service.PublishEmptyStopwatches(c.Doer.ID)
}
c.JSONRedirect("")

View File

@ -27,7 +27,6 @@ import (
"code.gitea.io/gitea/routers/web/admin"
"code.gitea.io/gitea/routers/web/auth"
"code.gitea.io/gitea/routers/web/devtest"
"code.gitea.io/gitea/routers/web/events"
"code.gitea.io/gitea/routers/web/explore"
"code.gitea.io/gitea/routers/web/feed"
"code.gitea.io/gitea/routers/web/healthcheck"
@ -592,7 +591,6 @@ func registerWebRoutes(m *web.Router, webAuth *AuthMiddleware) {
})
}, reqSignOut)
m.Any("/user/events", routing.MarkLongPolling, events.Events)
m.Get("/-/ws", gitea_websocket.Serve)
m.Group("/login/oauth", func() {

View File

@ -6,6 +6,7 @@ package websocket
import (
"net/http"
"code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/services/context"
"code.gitea.io/gitea/services/pubsub"
@ -13,6 +14,38 @@ import (
gitea_ws "github.com/coder/websocket"
)
// logoutBrokerMsg is the internal broker message published by PublishLogout.
type logoutBrokerMsg struct {
Type string `json:"type"`
SessionID string `json:"sessionID,omitempty"`
}
// logoutClientMsg is sent to the WebSocket client so the browser can tell
// whether the logout originated from this tab ("here") or another ("elsewhere").
type logoutClientMsg struct {
Type string `json:"type"`
Data string `json:"data"`
}
// rewriteLogout intercepts a broker logout message and rewrites it to the
// client format using "here"/"elsewhere" instead of the raw session ID.
// If sessionID is empty the logout applies to all sessions ("here" for all).
func rewriteLogout(msg []byte, connSessionID string) []byte {
var lm logoutBrokerMsg
if err := json.Unmarshal(msg, &lm); err != nil || lm.Type != "logout" {
return msg
}
where := "elsewhere"
if lm.SessionID == "" || lm.SessionID == connSessionID {
where = "here"
}
out, err := json.Marshal(logoutClientMsg{Type: "logout", Data: where})
if err != nil {
return msg
}
return out
}
// Serve handles WebSocket upgrade and event delivery for the signed-in user.
func Serve(ctx *context.Context) {
if !ctx.IsSigned {
@ -28,6 +61,7 @@ func Serve(ctx *context.Context) {
}
defer conn.CloseNow() //nolint:errcheck // CloseNow is best-effort; error is intentionally ignored
sessionID := ctx.Session.ID()
ch, cancel := pubsub.DefaultBroker.Subscribe(pubsub.UserTopic(ctx.Doer.ID))
defer cancel()
@ -40,6 +74,7 @@ func Serve(ctx *context.Context) {
if !ok {
return
}
msg = rewriteLogout(msg, sessionID)
if err := conn.Write(wsCtx, gitea_ws.MessageText, msg); err != nil {
log.Trace("websocket: write failed: %v", err)
return

View File

@ -16,7 +16,6 @@ import (
repo_model "code.gitea.io/gitea/models/repo"
system_model "code.gitea.io/gitea/models/system"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/eventsource"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/storage"
@ -28,6 +27,7 @@ import (
"code.gitea.io/gitea/services/packages"
container_service "code.gitea.io/gitea/services/packages/container"
repo_service "code.gitea.io/gitea/services/repository"
websocket_service "code.gitea.io/gitea/services/websocket"
)
// RenameUser renames a user
@ -147,9 +147,7 @@ func DeleteUser(ctx context.Context, u *user_model.User, purge bool) error {
// Force any logged in sessions to log out
// FIXME: We also need to tell the session manager to log them out too.
eventsource.GetManager().SendMessage(u.ID, &eventsource.Event{
Name: "logout",
})
websocket_service.PublishLogout(u.ID, "")
// Delete all repos belonging to this user
// Now this is not within a transaction because there are internal transactions within the DeleteRepository

View File

@ -0,0 +1,30 @@
// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package websocket
import (
"code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/services/pubsub"
)
type logoutEvent struct {
Type string `json:"type"`
SessionID string `json:"sessionID,omitempty"`
}
// PublishLogout publishes a logout event to all WebSocket clients connected as
// the given user. sessionID identifies which session is signing out so the
// client can distinguish "this tab" from "another tab".
func PublishLogout(userID int64, sessionID string) {
msg, err := json.Marshal(logoutEvent{
Type: "logout",
SessionID: sessionID,
})
if err != nil {
log.Error("websocket: marshal logout event: %v", err)
return
}
pubsub.DefaultBroker.Publish(pubsub.UserTopic(userID), msg)
}

View File

@ -0,0 +1,108 @@
// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package websocket
import (
"context"
"time"
issues_model "code.gitea.io/gitea/models/issues"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/process"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/services/convert"
"code.gitea.io/gitea/services/pubsub"
)
type stopwatchesEvent struct {
Type string `json:"type"`
Data json.RawMessage `json:"data"`
}
// PublishEmptyStopwatches immediately pushes an empty stopwatches list to the
// given user's WebSocket clients — used when the user's last stopwatch is cancelled.
func PublishEmptyStopwatches(userID int64) {
msg, err := json.Marshal(stopwatchesEvent{
Type: "stopwatches",
Data: json.RawMessage(`[]`),
})
if err != nil {
log.Error("websocket: marshal empty stopwatches: %v", err)
return
}
pubsub.DefaultBroker.Publish(pubsub.UserTopic(userID), msg)
}
// InitStopwatch starts the background goroutine that polls active stopwatches
// and pushes updates to connected WebSocket clients.
func InitStopwatch() error {
if !setting.Service.EnableTimetracking {
return nil
}
go graceful.GetManager().RunWithShutdownContext(runStopwatch)
return nil
}
func runStopwatch(ctx context.Context) {
ctx, _, finished := process.GetManager().AddTypedContext(ctx, "Service: WebSocket Stopwatch", process.SystemProcessType, true)
defer finished()
if setting.UI.Notification.EventSourceUpdateTime <= 0 {
return
}
timer := time.NewTicker(setting.UI.Notification.EventSourceUpdateTime)
defer timer.Stop()
for {
select {
case <-ctx.Done():
return
case <-timer.C:
if !pubsub.DefaultBroker.HasSubscribers() {
continue
}
userStopwatches, err := issues_model.GetUIDsAndStopwatch(ctx)
if err != nil {
log.Error("websocket: GetUIDsAndStopwatch: %v", err)
continue
}
for _, us := range userStopwatches {
u, err := user_model.GetUserByID(ctx, us.UserID)
if err != nil {
log.Error("websocket: GetUserByID %d: %v", us.UserID, err)
continue
}
apiSWs, err := convert.ToStopWatches(ctx, u, us.StopWatches)
if err != nil {
if !issues_model.IsErrIssueNotExist(err) {
log.Error("websocket: ToStopWatches: %v", err)
}
continue
}
dataBs, err := json.Marshal(apiSWs)
if err != nil {
log.Error("websocket: marshal stopwatches: %v", err)
continue
}
msg, err := json.Marshal(stopwatchesEvent{
Type: "stopwatches",
Data: dataBs,
})
if err != nil {
continue
}
pubsub.DefaultBroker.Publish(pubsub.UserTopic(us.UserID), msg)
}
}
}
}

View File

@ -1,86 +0,0 @@
// Copyright 2020 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package integration
import (
"fmt"
"net/http"
"testing"
"time"
activities_model "code.gitea.io/gitea/models/activities"
auth_model "code.gitea.io/gitea/models/auth"
repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/models/unittest"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/eventsource"
api "code.gitea.io/gitea/modules/structs"
"code.gitea.io/gitea/tests"
"github.com/stretchr/testify/assert"
)
func TestEventSourceManagerRun(t *testing.T) {
defer tests.PrepareTestEnv(t)()
manager := eventsource.GetManager()
eventChan := manager.Register(2)
defer func() {
manager.Unregister(2, eventChan)
// ensure the eventChan is closed
for {
_, ok := <-eventChan
if !ok {
break
}
}
}()
expectNotificationCountEvent := func(count int64) func() bool {
return func() bool {
select {
case event, ok := <-eventChan:
if !ok {
return false
}
data, ok := event.Data.(activities_model.UserIDCount)
if !ok {
return false
}
return event.Name == "notification-count" && data.Count == count
default:
return false
}
}
}
user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2})
repo1 := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: 1})
thread5 := unittest.AssertExistsAndLoadBean(t, &activities_model.Notification{ID: 5})
assert.NoError(t, thread5.LoadAttributes(t.Context()))
session := loginUser(t, user2.Name)
token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteNotification, auth_model.AccessTokenScopeWriteRepository)
var apiNL []api.NotificationThread
// -- mark notifications as read --
req := NewRequest(t, "GET", "/api/v1/notifications?status-types=unread").
AddTokenAuth(token)
resp := session.MakeRequest(t, req, http.StatusOK)
DecodeJSON(t, resp, &apiNL)
assert.Len(t, apiNL, 2)
lastReadAt := "2000-01-01T00%3A50%3A01%2B00%3A00" // 946687801 <- only Notification 4 is in this filter ...
req = NewRequest(t, "PUT", fmt.Sprintf("/api/v1/repos/%s/%s/notifications?last_read_at=%s", user2.Name, repo1.Name, lastReadAt)).
AddTokenAuth(token)
session.MakeRequest(t, req, http.StatusResetContent)
req = NewRequest(t, "GET", "/api/v1/notifications?status-types=unread").
AddTokenAuth(token)
resp = session.MakeRequest(t, req, http.StatusOK)
DecodeJSON(t, resp, &apiNL)
assert.Len(t, apiNL, 1)
assert.Eventually(t, expectNotificationCountEvent(1), 30*time.Second, 1*time.Second)
}

View File

@ -1,20 +1,13 @@
// Source manages the list of connected page ports for one logical connection.
// It no longer creates an EventSource; all real-time data is delivered by the
// accompanying WsSource over WebSocket.
class Source {
url: string;
eventSource: EventSource | null;
listening: Record<string, boolean>;
clients: Array<MessagePort>;
constructor(url: string) {
this.url = url;
this.eventSource = new EventSource(url);
this.listening = {};
this.clients = [];
this.listen('open');
this.listen('close');
this.listen('logout');
this.listen('notification-count');
this.listen('stopwatches');
this.listen('error');
}
register(port: MessagePort) {
@ -37,24 +30,6 @@ class Source {
return this.clients.length;
}
close() {
if (!this.eventSource) return;
this.eventSource.close();
this.eventSource = null;
}
listen(eventType: string) {
if (this.listening[eventType]) return;
this.listening[eventType] = true;
this.eventSource?.addEventListener(eventType, (event) => {
this.notifyClients({
type: eventType,
data: event.data,
});
});
}
notifyClients(event: {type: string, data: any}) {
for (const client of this.clients) {
client.postMessage(event);
@ -64,15 +39,14 @@ class Source {
status(port: MessagePort) {
port.postMessage({
type: 'status',
message: `url: ${this.url} readyState: ${this.eventSource?.readyState}`,
message: `url: ${this.url}`,
});
}
}
// WsSource provides a WebSocket transport alongside EventSource.
// It delivers real-time notification-count pushes using the same client list
// as the associated Source, normalising messages to the SSE event format so
// that callers do not need to know which transport delivered the event.
// WsSource provides a WebSocket transport for real-time event delivery.
// It normalises messages to the SSE event format so that callers do not
// need to know which transport delivered the event.
class WsSource {
wsUrl: string;
ws: WebSocket | null;
@ -105,6 +79,16 @@ class WsSource {
type: 'notification-count',
data: JSON.stringify({Count: msg.count}),
});
} else if (msg.type === 'stopwatches') {
this.source.notifyClients({
type: 'stopwatches',
data: JSON.stringify(msg.data),
});
} else if (msg.type === 'logout') {
this.source.notifyClients({
type: 'logout',
data: msg.data ?? '',
});
}
} catch {
// ignore malformed messages
@ -149,13 +133,6 @@ const wsSourcesByUrl = new Map<string, WsSource | null>();
(self as unknown as SharedWorkerGlobalScope).addEventListener('connect', (e: MessageEvent) => {
for (const port of e.ports) {
port.addEventListener('message', (event: MessageEvent) => {
if (!self.EventSource) {
// some browsers (like PaleMoon, Firefox<53) don't support EventSource in SharedWorkerGlobalScope.
// this event handler needs EventSource when doing "new Source(url)", so just post a message back to the caller,
// in case the caller would like to use a fallback method to do its work.
port.postMessage({type: 'no-event-source'});
return;
}
if (event.data.type === 'start') {
const url = event.data.url;
let source = sourcesByUrl.get(url);
@ -167,14 +144,13 @@ const wsSourcesByUrl = new Map<string, WsSource | null>();
}
source = sourcesByPort.get(port);
if (source) {
if (source.eventSource && source.url === url) return;
if (source.url === url) return;
// How this has happened I don't understand...
// deregister from that source
const count = source.deregister(port);
// Clean-up
if (count === 0) {
source.close();
sourcesByUrl.set(source.url, null);
const ws = wsSourcesByUrl.get(source.url);
if (ws) {
@ -183,24 +159,19 @@ const wsSourcesByUrl = new Map<string, WsSource | null>();
}
}
}
// Create a new Source
// Create a new Source and its WebSocket transport
source = new Source(url);
source.register(port);
sourcesByUrl.set(url, source);
sourcesByPort.set(port, source);
// Start WebSocket alongside EventSource for real-time notification pushes.
const wsUrl = url.replace(/^http/, 'ws').replace(/\/user\/events$/, '/-/ws');
wsSourcesByUrl.set(url, new WsSource(wsUrl, source));
} else if (event.data.type === 'listen') {
const source = sourcesByPort.get(port)!;
source.listen(event.data.eventType);
} else if (event.data.type === 'close') {
const source = sourcesByPort.get(port);
if (!source) return;
const count = source.deregister(port);
if (count === 0) {
source.close();
sourcesByUrl.set(source.url, null);
sourcesByPort.set(port, null);
const ws = wsSourcesByUrl.get(source.url);