0
0
mirror of https://github.com/go-gitea/gitea.git synced 2026-04-12 01:18:59 +02:00

Merge 0af095a41915851a1ae6997afdfd650d9c62fadf into 4fa319b9dca46d3d553d4d4e8f74ca0e009693c6

This commit is contained in:
mohammad rahimi 2026-04-03 09:31:39 +08:00 committed by GitHub
commit 58b40fb213
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
30 changed files with 618 additions and 780 deletions

View File

@ -389,6 +389,11 @@
"path": "github.com/cloudflare/circl/LICENSE",
"licenseText": "Copyright (c) 2019 Cloudflare. All rights reserved.\n\nRedistribution and use in source and binary forms, with or without\nmodification, are permitted provided that the following conditions are\nmet:\n\n * Redistributions of source code must retain the above copyright\nnotice, this list of conditions and the following disclaimer.\n * Redistributions in binary form must reproduce the above\ncopyright notice, this list of conditions and the following disclaimer\nin the documentation and/or other materials provided with the\ndistribution.\n * Neither the name of Cloudflare nor the names of its\ncontributors may be used to endorse or promote products derived from\nthis software without specific prior written permission.\n\nTHIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\n\"AS IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT\nLIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR\nA PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT\nOWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\nSPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\nLIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\nDATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\nTHEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\nOF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n\n========================================================================\n\nCopyright (c) 2009 The Go Authors. All rights reserved.\n\nRedistribution and use in source and binary forms, with or without\nmodification, are permitted provided that the following conditions are\nmet:\n\n * Redistributions of source code must retain the above copyright\nnotice, this list of conditions and the following disclaimer.\n * Redistributions in binary form must reproduce the above\ncopyright notice, this list of conditions and the following disclaimer\nin the documentation and/or other materials provided with the\ndistribution.\n * Neither the name of Google Inc. nor the names of its\ncontributors may be used to endorse or promote products derived from\nthis software without specific prior written permission.\n\nTHIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\n\"AS IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT\nLIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR\nA PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT\nOWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\nSPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\nLIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\nDATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\nTHEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\nOF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
},
{
"name": "github.com/coder/websocket",
"path": "github.com/coder/websocket/LICENSE.txt",
"licenseText": "Copyright (c) 2025 Coder\n\nPermission to use, copy, modify, and distribute this software for any\npurpose with or without fee is hereby granted, provided that the above\ncopyright notice and this permission notice appear in all copies.\n\nTHE SOFTWARE IS PROVIDED \"AS IS\" AND THE AUTHOR DISCLAIMS ALL WARRANTIES\nWITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF\nMERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR\nANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES\nWHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN\nACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF\nOR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.\n"
},
{
"name": "github.com/couchbase/go-couchbase",
"path": "github.com/couchbase/go-couchbase/LICENSE",

1
go.mod
View File

@ -36,6 +36,7 @@ require (
github.com/caddyserver/certmagic v0.25.2
github.com/charmbracelet/git-lfs-transfer v0.1.1-0.20251013092601-6327009efd21
github.com/chi-middleware/proxy v1.1.1
github.com/coder/websocket v1.8.14
github.com/dimiro1/reply v0.0.0-20200315094148-d0136a4c9e21
github.com/dlclark/regexp2 v1.11.5
github.com/dsnet/compress v0.0.2-0.20230904184137-39efe44ab707

2
go.sum
View File

@ -211,6 +211,8 @@ github.com/clipperhouse/uax29/v2 v2.7.0 h1:+gs4oBZ2gPfVrKPthwbMzWZDaAFPGYK72F0NJ
github.com/clipperhouse/uax29/v2 v2.7.0/go.mod h1:EFJ2TJMRUaplDxHKj1qAEhCtQPW2tJSwu5BF98AuoVM=
github.com/cloudflare/circl v1.6.3 h1:9GPOhQGF9MCYUeXyMYlqTR6a5gTrgR/fBLXvUgtVcg8=
github.com/cloudflare/circl v1.6.3/go.mod h1:2eXP6Qfat4O/Yhh8BznvKnJ+uzEoTQ6jVKJRn81BiS4=
github.com/coder/websocket v1.8.14 h1:9L0p0iKiNOibykf283eHkKUHHrpG7f65OE3BhhO7v9g=
github.com/coder/websocket v1.8.14/go.mod h1:NX3SzP+inril6yawo5CQXx8+fk145lPDC6pumgx0mVg=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=

View File

@ -1,118 +0,0 @@
// Copyright 2020 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package eventsource
import (
"bytes"
"fmt"
"io"
"strings"
"time"
"code.gitea.io/gitea/modules/json"
)
func wrapNewlines(w io.Writer, prefix, value []byte) (sum int64, err error) {
if len(value) == 0 {
return 0, nil
}
var n int
last := 0
for j := bytes.IndexByte(value, '\n'); j > -1; j = bytes.IndexByte(value[last:], '\n') {
n, err = w.Write(prefix)
sum += int64(n)
if err != nil {
return sum, err
}
n, err = w.Write(value[last : last+j+1])
sum += int64(n)
if err != nil {
return sum, err
}
last += j + 1
}
n, err = w.Write(prefix)
sum += int64(n)
if err != nil {
return sum, err
}
n, err = w.Write(value[last:])
sum += int64(n)
if err != nil {
return sum, err
}
n, err = w.Write([]byte("\n"))
sum += int64(n)
return sum, err
}
// Event is an eventsource event, not all fields need to be set
type Event struct {
// Name represents the value of the event: tag in the stream
Name string
// Data is either JSONified []byte or any that can be JSONd
Data any
// ID represents the ID of an event
ID string
// Retry tells the receiver only to attempt to reconnect to the source after this time
Retry time.Duration
}
// WriteTo writes data to w until there's no more data to write or when an error occurs.
// The return value n is the number of bytes written. Any error encountered during the write is also returned.
func (e *Event) WriteTo(w io.Writer) (int64, error) {
sum := int64(0)
var nint int
n, err := wrapNewlines(w, []byte("event: "), []byte(e.Name))
sum += n
if err != nil {
return sum, err
}
if e.Data != nil {
var data []byte
switch v := e.Data.(type) {
case []byte:
data = v
case string:
data = []byte(v)
default:
var err error
data, err = json.Marshal(e.Data)
if err != nil {
return sum, err
}
}
n, err := wrapNewlines(w, []byte("data: "), data)
sum += n
if err != nil {
return sum, err
}
}
n, err = wrapNewlines(w, []byte("id: "), []byte(e.ID))
sum += n
if err != nil {
return sum, err
}
if e.Retry != 0 {
nint, err = fmt.Fprintf(w, "retry: %d\n", int64(e.Retry/time.Millisecond))
sum += int64(nint)
if err != nil {
return sum, err
}
}
nint, err = w.Write([]byte("\n"))
sum += int64(nint)
return sum, err
}
func (e *Event) String() string {
buf := new(strings.Builder)
_, _ = e.WriteTo(buf)
return buf.String()
}

View File

@ -1,50 +0,0 @@
// Copyright 2020 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package eventsource
import (
"bytes"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func Test_wrapNewlines(t *testing.T) {
tests := []struct {
name string
prefix string
value string
output string
}{
{
"check no new lines",
"prefix: ",
"value",
"prefix: value\n",
},
{
"check simple newline",
"prefix: ",
"value1\nvalue2",
"prefix: value1\nprefix: value2\n",
},
{
"check pathological newlines",
"p: ",
"\n1\n\n2\n3\n",
"p: \np: 1\np: \np: 2\np: 3\np: \n",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
w := &bytes.Buffer{}
gotSum, err := wrapNewlines(w, []byte(tt.prefix), []byte(tt.value))
require.NoError(t, err)
assert.EqualValues(t, len(tt.output), gotSum)
assert.Equal(t, tt.output, w.String())
})
}
}

View File

@ -1,89 +0,0 @@
// Copyright 2020 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package eventsource
import (
"sync"
)
// Manager manages the eventsource Messengers
type Manager struct {
mutex sync.Mutex
messengers map[int64]*Messenger
connection chan struct{}
}
var manager *Manager
func init() {
manager = &Manager{
messengers: make(map[int64]*Messenger),
connection: make(chan struct{}, 1),
}
}
// GetManager returns a Manager and initializes one as singleton if there's none yet
func GetManager() *Manager {
return manager
}
// Register message channel
func (m *Manager) Register(uid int64) <-chan *Event {
m.mutex.Lock()
messenger, ok := m.messengers[uid]
if !ok {
messenger = NewMessenger(uid)
m.messengers[uid] = messenger
}
select {
case m.connection <- struct{}{}:
default:
}
m.mutex.Unlock()
return messenger.Register()
}
// Unregister message channel
func (m *Manager) Unregister(uid int64, channel <-chan *Event) {
m.mutex.Lock()
defer m.mutex.Unlock()
messenger, ok := m.messengers[uid]
if !ok {
return
}
if messenger.Unregister(channel) {
delete(m.messengers, uid)
}
}
// UnregisterAll message channels
func (m *Manager) UnregisterAll() {
m.mutex.Lock()
defer m.mutex.Unlock()
for _, messenger := range m.messengers {
messenger.UnregisterAll()
}
m.messengers = map[int64]*Messenger{}
}
// SendMessage sends a message to a particular user
func (m *Manager) SendMessage(uid int64, message *Event) {
m.mutex.Lock()
messenger, ok := m.messengers[uid]
m.mutex.Unlock()
if ok {
messenger.SendMessage(message)
}
}
// SendMessageBlocking sends a message to a particular user
func (m *Manager) SendMessageBlocking(uid int64, message *Event) {
m.mutex.Lock()
messenger, ok := m.messengers[uid]
m.mutex.Unlock()
if ok {
messenger.SendMessageBlocking(message)
}
}

View File

@ -1,122 +0,0 @@
// Copyright 2020 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package eventsource
import (
"context"
"time"
activities_model "code.gitea.io/gitea/models/activities"
issues_model "code.gitea.io/gitea/models/issues"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/process"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/services/convert"
)
// Init starts this eventsource
func (m *Manager) Init() {
if setting.UI.Notification.EventSourceUpdateTime <= 0 {
return
}
go graceful.GetManager().RunWithShutdownContext(m.Run)
}
// Run runs the manager within a provided context
func (m *Manager) Run(ctx context.Context) {
ctx, _, finished := process.GetManager().AddTypedContext(ctx, "Service: EventSource", process.SystemProcessType, true)
defer finished()
then := timeutil.TimeStampNow().Add(-2)
timer := time.NewTicker(setting.UI.Notification.EventSourceUpdateTime)
loop:
for {
select {
case <-ctx.Done():
timer.Stop()
break loop
case <-timer.C:
m.mutex.Lock()
connectionCount := len(m.messengers)
if connectionCount == 0 {
log.Trace("Event source has no listeners")
// empty the connection channel
select {
case <-m.connection:
default:
}
}
m.mutex.Unlock()
if connectionCount == 0 {
// No listeners so the source can be paused
log.Trace("Pausing the eventsource")
select {
case <-ctx.Done():
break loop
case <-m.connection:
log.Trace("Connection detected - restarting the eventsource")
// OK we're back so lets reset the timer and start again
// We won't change the "then" time because there could be concurrency issues
select {
case <-timer.C:
default:
}
continue
}
}
now := timeutil.TimeStampNow().Add(-2)
uidCounts, err := activities_model.GetUIDsAndNotificationCounts(ctx, then, now)
if err != nil {
log.Error("Unable to get UIDcounts: %v", err)
}
for _, uidCount := range uidCounts {
m.SendMessage(uidCount.UserID, &Event{
Name: "notification-count",
Data: uidCount,
})
}
then = now
if setting.Service.EnableTimetracking {
usersStopwatches, err := issues_model.GetUIDsAndStopwatch(ctx)
if err != nil {
log.Error("Unable to get GetUIDsAndStopwatch: %v", err)
return
}
for _, userStopwatches := range usersStopwatches {
u, err := user_model.GetUserByID(ctx, userStopwatches.UserID)
if err != nil {
log.Error("Unable to get user %d: %v", userStopwatches.UserID, err)
continue
}
apiSWs, err := convert.ToStopWatches(ctx, u, userStopwatches.StopWatches)
if err != nil {
if !issues_model.IsErrIssueNotExist(err) {
log.Error("Unable to APIFormat stopwatches: %v", err)
}
continue
}
dataBs, err := json.Marshal(apiSWs)
if err != nil {
log.Error("Unable to marshal stopwatches: %v", err)
continue
}
m.SendMessage(userStopwatches.UserID, &Event{
Name: "stopwatches",
Data: string(dataBs),
})
}
}
}
}
m.UnregisterAll()
}

View File

@ -1,77 +0,0 @@
// Copyright 2020 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package eventsource
import "sync"
// Messenger is a per uid message store
type Messenger struct {
mutex sync.Mutex
uid int64
channels []chan *Event
}
// NewMessenger creates a messenger for a particular uid
func NewMessenger(uid int64) *Messenger {
return &Messenger{
uid: uid,
channels: [](chan *Event){},
}
}
// Register returns a new chan []byte
func (m *Messenger) Register() <-chan *Event {
m.mutex.Lock()
// TODO: Limit the number of messengers per uid
channel := make(chan *Event, 1)
m.channels = append(m.channels, channel)
m.mutex.Unlock()
return channel
}
// Unregister removes the provider chan []byte
func (m *Messenger) Unregister(channel <-chan *Event) bool {
m.mutex.Lock()
defer m.mutex.Unlock()
for i, toRemove := range m.channels {
if channel == toRemove {
m.channels = append(m.channels[:i], m.channels[i+1:]...)
close(toRemove)
break
}
}
return len(m.channels) == 0
}
// UnregisterAll removes all chan []byte
func (m *Messenger) UnregisterAll() {
m.mutex.Lock()
defer m.mutex.Unlock()
for _, channel := range m.channels {
close(channel)
}
m.channels = nil
}
// SendMessage sends the message to all registered channels
func (m *Messenger) SendMessage(message *Event) {
m.mutex.Lock()
defer m.mutex.Unlock()
for i := range m.channels {
channel := m.channels[i]
select {
case channel <- message:
default:
}
}
}
// SendMessageBlocking sends the message to all registered channels and ensures it gets sent
func (m *Messenger) SendMessageBlocking(message *Event) {
m.mutex.Lock()
defer m.mutex.Unlock()
for i := range m.channels {
m.channels[i] <- message
}
}

View File

@ -72,7 +72,7 @@ func isRoutePathExpensive(routePattern string) bool {
}
func isRoutePathForLongPolling(routePattern string) bool {
return routePattern == "/user/events"
return routePattern == "/-/ws"
}
func determineRequestPriority(reqCtx reqctx.RequestContext) (ret struct {

View File

@ -26,5 +26,5 @@ func TestBlockExpensive(t *testing.T) {
assert.Equal(t, c.expensive, isRoutePathExpensive(c.routePath), "routePath: %s", c.routePath)
}
assert.True(t, isRoutePathForLongPolling("/user/events"))
assert.True(t, isRoutePathForLongPolling("/-/ws"))
}

View File

@ -79,9 +79,9 @@ func QoS() func(next http.Handler) http.Handler {
return
}
// Release long-polling immediately, so they don't always
// Release long-lived connections immediately, so they don't always
// take up an in-flight request
if strings.Contains(req.URL.Path, "/user/events") {
if strings.Contains(req.URL.Path, "/-/ws") {
c.Release()
} else {
defer c.Release()

View File

@ -12,7 +12,6 @@ import (
"code.gitea.io/gitea/models"
authmodel "code.gitea.io/gitea/models/auth"
"code.gitea.io/gitea/modules/cache"
"code.gitea.io/gitea/modules/eventsource"
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/git/gitcmd"
"code.gitea.io/gitea/modules/log"
@ -54,6 +53,7 @@ import (
"code.gitea.io/gitea/services/task"
"code.gitea.io/gitea/services/uinotification"
"code.gitea.io/gitea/services/webhook"
websocket_service "code.gitea.io/gitea/services/websocket"
)
func mustInit(fn func() error) {
@ -159,7 +159,8 @@ func InitWebInstalled(ctx context.Context) {
mustInit(automerge.Init)
mustInit(task.Init)
mustInit(repo_migrations.Init)
eventsource.GetManager().Init()
mustInit(websocket_service.Init)
mustInit(websocket_service.InitStopwatch)
mustInitCtx(ctx, mailer_incoming.Init)
mustInitCtx(ctx, syncAppConfForGit)

View File

@ -16,7 +16,6 @@ import (
"code.gitea.io/gitea/models/db"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/auth/password"
"code.gitea.io/gitea/modules/eventsource"
"code.gitea.io/gitea/modules/httplib"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/optional"
@ -34,6 +33,7 @@ import (
"code.gitea.io/gitea/services/forms"
"code.gitea.io/gitea/services/mailer"
user_service "code.gitea.io/gitea/services/user"
websocket_service "code.gitea.io/gitea/services/websocket"
"github.com/markbates/goth"
)
@ -480,10 +480,7 @@ func HandleSignOut(ctx *context.Context) {
// SignOut sign out from login status
func SignOut(ctx *context.Context) {
if ctx.Doer != nil {
eventsource.GetManager().SendMessageBlocking(ctx.Doer.ID, &eventsource.Event{
Name: "logout",
Data: ctx.Session.ID(),
})
websocket_service.PublishLogout(ctx.Doer.ID, ctx.Session.ID())
}
// prepare the sign-out URL before destroying the session

View File

@ -1,122 +0,0 @@
// Copyright 2020 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package events
import (
"net/http"
"time"
"code.gitea.io/gitea/modules/eventsource"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/routers/web/auth"
"code.gitea.io/gitea/services/context"
)
// Events listens for events
func Events(ctx *context.Context) {
// FIXME: Need to check if resp is actually a http.Flusher! - how though?
// Set the headers related to event streaming.
ctx.Resp.Header().Set("Content-Type", "text/event-stream")
ctx.Resp.Header().Set("Cache-Control", "no-cache")
ctx.Resp.Header().Set("Connection", "keep-alive")
ctx.Resp.Header().Set("X-Accel-Buffering", "no")
ctx.Resp.WriteHeader(http.StatusOK)
if !ctx.IsSigned {
// Return unauthorized status event
event := &eventsource.Event{
Name: "close",
Data: "unauthorized",
}
_, _ = event.WriteTo(ctx)
ctx.Resp.Flush()
return
}
// Listen to connection close and un-register messageChan
notify := ctx.Done()
ctx.Resp.Flush()
shutdownCtx := graceful.GetManager().ShutdownContext()
uid := ctx.Doer.ID
messageChan := eventsource.GetManager().Register(uid)
unregister := func() {
eventsource.GetManager().Unregister(uid, messageChan)
// ensure the messageChan is closed
for {
_, ok := <-messageChan
if !ok {
break
}
}
}
if _, err := ctx.Resp.Write([]byte("\n")); err != nil {
log.Error("Unable to write to EventStream: %v", err)
unregister()
return
}
timer := time.NewTicker(30 * time.Second)
loop:
for {
select {
case <-timer.C:
event := &eventsource.Event{
Name: "ping",
}
_, err := event.WriteTo(ctx.Resp)
if err != nil {
log.Error("Unable to write to EventStream for user %s: %v", ctx.Doer.Name, err)
go unregister()
break loop
}
ctx.Resp.Flush()
case <-notify:
go unregister()
break loop
case <-shutdownCtx.Done():
go unregister()
break loop
case event, ok := <-messageChan:
if !ok {
break loop
}
// Handle logout
if event.Name == "logout" {
if ctx.Session.ID() == event.Data {
_, _ = (&eventsource.Event{
Name: "logout",
Data: "here",
}).WriteTo(ctx.Resp)
ctx.Resp.Flush()
go unregister()
auth.HandleSignOut(ctx)
break loop
}
// Replace the event - we don't want to expose the session ID to the user
event = &eventsource.Event{
Name: "logout",
Data: "elsewhere",
}
}
_, err := event.WriteTo(ctx.Resp)
if err != nil {
log.Error("Unable to write to EventStream for user %s: %v", ctx.Doer.Name, err)
go unregister()
break loop
}
ctx.Resp.Flush()
}
}
timer.Stop()
}

View File

@ -6,8 +6,8 @@ package repo
import (
"code.gitea.io/gitea/models/db"
issues_model "code.gitea.io/gitea/models/issues"
"code.gitea.io/gitea/modules/eventsource"
"code.gitea.io/gitea/services/context"
websocket_service "code.gitea.io/gitea/services/websocket"
)
// IssueStartStopwatch creates a stopwatch for the given issue.
@ -51,6 +51,16 @@ func IssueStopStopwatch(c *context.Context) {
} else if !ok {
c.Flash.Warning(c.Tr("repo.issues.stopwatch_already_stopped"))
}
stopwatches, err := issues_model.GetUserStopwatches(c, c.Doer.ID, db.ListOptions{})
if err != nil {
c.ServerError("GetUserStopwatches", err)
return
}
if len(stopwatches) == 0 {
websocket_service.PublishEmptyStopwatches(c.Doer.ID)
}
c.JSONRedirect("")
}
@ -76,10 +86,7 @@ func CancelStopwatch(c *context.Context) {
return
}
if len(stopwatches) == 0 {
eventsource.GetManager().SendMessage(c.Doer.ID, &eventsource.Event{
Name: "stopwatches",
Data: "{}",
})
websocket_service.PublishEmptyStopwatches(c.Doer.ID)
}
c.JSONRedirect("")

View File

@ -27,7 +27,6 @@ import (
"code.gitea.io/gitea/routers/web/admin"
"code.gitea.io/gitea/routers/web/auth"
"code.gitea.io/gitea/routers/web/devtest"
"code.gitea.io/gitea/routers/web/events"
"code.gitea.io/gitea/routers/web/explore"
"code.gitea.io/gitea/routers/web/feed"
"code.gitea.io/gitea/routers/web/healthcheck"
@ -41,6 +40,7 @@ import (
"code.gitea.io/gitea/routers/web/user"
user_setting "code.gitea.io/gitea/routers/web/user/setting"
"code.gitea.io/gitea/routers/web/user/setting/security"
gitea_websocket "code.gitea.io/gitea/routers/web/websocket"
auth_service "code.gitea.io/gitea/services/auth"
"code.gitea.io/gitea/services/context"
"code.gitea.io/gitea/services/forms"
@ -587,7 +587,7 @@ func registerWebRoutes(m *web.Router, webAuth *AuthMiddleware) {
})
}, reqSignOut)
m.Any("/user/events", routing.MarkLongPolling, events.Events)
m.Get("/-/ws", gitea_websocket.Serve)
m.Group("/login/oauth", func() {
m.Group("", func() {

View File

@ -0,0 +1,84 @@
// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package websocket
import (
"net/http"
"code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/services/context"
"code.gitea.io/gitea/services/pubsub"
gitea_ws "github.com/coder/websocket"
)
// logoutBrokerMsg is the internal broker message published by PublishLogout.
type logoutBrokerMsg struct {
Type string `json:"type"`
SessionID string `json:"sessionID,omitempty"`
}
// logoutClientMsg is sent to the WebSocket client so the browser can tell
// whether the logout originated from this tab ("here") or another ("elsewhere").
type logoutClientMsg struct {
Type string `json:"type"`
Data string `json:"data"`
}
// rewriteLogout intercepts a broker logout message and rewrites it to the
// client format using "here"/"elsewhere" instead of the raw session ID.
// If sessionID is empty the logout applies to all sessions ("here" for all).
func rewriteLogout(msg []byte, connSessionID string) []byte {
var lm logoutBrokerMsg
if err := json.Unmarshal(msg, &lm); err != nil || lm.Type != "logout" {
return msg
}
where := "elsewhere"
if lm.SessionID == "" || lm.SessionID == connSessionID {
where = "here"
}
out, err := json.Marshal(logoutClientMsg{Type: "logout", Data: where})
if err != nil {
return msg
}
return out
}
// Serve handles WebSocket upgrade and event delivery for the signed-in user.
func Serve(ctx *context.Context) {
if !ctx.IsSigned {
ctx.Status(http.StatusUnauthorized)
return
}
conn, err := gitea_ws.Accept(ctx.Resp, ctx.Req, &gitea_ws.AcceptOptions{
InsecureSkipVerify: false,
})
if err != nil {
log.Error("websocket: accept failed: %v", err)
return
}
defer conn.CloseNow() //nolint:errcheck // CloseNow is best-effort; error is intentionally ignored
sessionID := ctx.Session.ID()
ch, cancel := pubsub.DefaultBroker.Subscribe(pubsub.UserTopic(ctx.Doer.ID))
defer cancel()
wsCtx := ctx.Req.Context()
for {
select {
case <-wsCtx.Done():
return
case msg, ok := <-ch:
if !ok {
return
}
msg = rewriteLogout(msg, sessionID)
if err := conn.Write(wsCtx, gitea_ws.MessageText, msg); err != nil {
log.Trace("websocket: write failed: %v", err)
return
}
}
}
}

85
services/pubsub/broker.go Normal file
View File

@ -0,0 +1,85 @@
// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package pubsub
import (
"fmt"
"sync"
)
// Broker is a simple in-memory pub/sub broker.
// It supports fan-out: one Publish call delivers the message to all active subscribers.
type Broker struct {
mu sync.RWMutex
subs map[string][]chan []byte
}
// DefaultBroker is the global singleton used by both routers and notifiers.
var DefaultBroker = NewBroker()
// NewBroker creates a new in-memory Broker.
func NewBroker() *Broker {
return &Broker{
subs: make(map[string][]chan []byte),
}
}
// Subscribe returns a channel that receives messages published to topic.
// Call the returned cancel function to unsubscribe.
func (b *Broker) Subscribe(topic string) (<-chan []byte, func()) {
ch := make(chan []byte, 8)
b.mu.Lock()
b.subs[topic] = append(b.subs[topic], ch)
b.mu.Unlock()
cancel := func() {
b.mu.Lock()
defer b.mu.Unlock()
subs := b.subs[topic]
for i, sub := range subs {
if sub == ch {
b.subs[topic] = append(subs[:i], subs[i+1:]...)
break
}
}
close(ch)
}
return ch, cancel
}
// UserTopic returns the pub/sub topic name for a given user ID.
// Centralised here so the notifier and the WebSocket handler always agree on the format.
func UserTopic(userID int64) string {
return fmt.Sprintf("user-%d", userID)
}
// HasSubscribers reports whether the broker has at least one active subscriber across all topics.
func (b *Broker) HasSubscribers() bool {
b.mu.RLock()
defer b.mu.RUnlock()
for _, subs := range b.subs {
if len(subs) > 0 {
return true
}
}
return false
}
// Publish sends msg to all subscribers of topic.
// Non-blocking: slow subscribers are skipped.
// The RLock is held for the entire fan-out to prevent a race where cancel()
// closes a channel between the slice read and the send.
func (b *Broker) Publish(topic string, msg []byte) {
b.mu.RLock()
defer b.mu.RUnlock()
for _, ch := range b.subs[topic] {
select {
case ch <- msg:
default:
// subscriber too slow — skip
}
}
}

View File

@ -16,7 +16,6 @@ import (
repo_model "code.gitea.io/gitea/models/repo"
system_model "code.gitea.io/gitea/models/system"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/eventsource"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/storage"
@ -28,6 +27,7 @@ import (
"code.gitea.io/gitea/services/packages"
container_service "code.gitea.io/gitea/services/packages/container"
repo_service "code.gitea.io/gitea/services/repository"
websocket_service "code.gitea.io/gitea/services/websocket"
)
// RenameUser renames a user
@ -147,9 +147,7 @@ func DeleteUser(ctx context.Context, u *user_model.User, purge bool) error {
// Force any logged in sessions to log out
// FIXME: We also need to tell the session manager to log them out too.
eventsource.GetManager().SendMessage(u.ID, &eventsource.Event{
Name: "logout",
})
websocket_service.PublishLogout(u.ID, "")
// Delete all repos belonging to this user
// Now this is not within a transaction because there are internal transactions within the DeleteRepository

View File

@ -0,0 +1,30 @@
// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package websocket
import (
"code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/services/pubsub"
)
type logoutEvent struct {
Type string `json:"type"`
SessionID string `json:"sessionID,omitempty"`
}
// PublishLogout publishes a logout event to all WebSocket clients connected as
// the given user. sessionID identifies which session is signing out so the
// client can distinguish "this tab" from "another tab".
func PublishLogout(userID int64, sessionID string) {
msg, err := json.Marshal(logoutEvent{
Type: "logout",
SessionID: sessionID,
})
if err != nil {
log.Error("websocket: marshal logout event: %v", err)
return
}
pubsub.DefaultBroker.Publish(pubsub.UserTopic(userID), msg)
}

View File

@ -0,0 +1,82 @@
// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package websocket
import (
"context"
"time"
activities_model "code.gitea.io/gitea/models/activities"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/process"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/services/pubsub"
)
// nowTS returns the current time as a TimeStamp using the real wall clock,
// avoiding data races with timeutil.MockUnset during tests.
func nowTS() timeutil.TimeStamp {
return timeutil.TimeStamp(time.Now().Unix())
}
type notificationCountEvent struct {
Type string `json:"type"`
Count int64 `json:"count"`
}
// Init starts the background goroutine that polls notification counts
// and pushes updates to connected WebSocket clients.
func Init() error {
go graceful.GetManager().RunWithShutdownContext(run)
return nil
}
func run(ctx context.Context) {
ctx, _, finished := process.GetManager().AddTypedContext(ctx, "Service: WebSocket", process.SystemProcessType, true)
defer finished()
if setting.UI.Notification.EventSourceUpdateTime <= 0 {
return
}
then := nowTS().Add(-2)
timer := time.NewTicker(setting.UI.Notification.EventSourceUpdateTime)
defer timer.Stop()
for {
select {
case <-ctx.Done():
return
case <-timer.C:
if !pubsub.DefaultBroker.HasSubscribers() {
then = nowTS().Add(-2)
continue
}
now := nowTS().Add(-2)
uidCounts, err := activities_model.GetUIDsAndNotificationCounts(ctx, then, now)
if err != nil {
log.Error("websocket: GetUIDsAndNotificationCounts: %v", err)
continue
}
for _, uidCount := range uidCounts {
msg, err := json.Marshal(notificationCountEvent{
Type: "notification-count",
Count: uidCount.Count,
})
if err != nil {
continue
}
pubsub.DefaultBroker.Publish(pubsub.UserTopic(uidCount.UserID), msg)
}
then = now
}
}
}

View File

@ -0,0 +1,102 @@
// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package websocket
import (
"context"
"time"
issues_model "code.gitea.io/gitea/models/issues"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/process"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/services/convert"
"code.gitea.io/gitea/services/pubsub"
)
type stopwatchesEvent struct {
Type string `json:"type"`
Data any `json:"data"`
}
// PublishEmptyStopwatches immediately pushes an empty stopwatches list to the
// given user's WebSocket clients — used when the user's last stopwatch is cancelled.
func PublishEmptyStopwatches(userID int64) {
msg, err := json.Marshal(stopwatchesEvent{
Type: "stopwatches",
Data: []any{},
})
if err != nil {
log.Error("websocket: marshal empty stopwatches: %v", err)
return
}
pubsub.DefaultBroker.Publish(pubsub.UserTopic(userID), msg)
}
// InitStopwatch starts the background goroutine that polls active stopwatches
// and pushes updates to connected WebSocket clients.
func InitStopwatch() error {
if !setting.Service.EnableTimetracking {
return nil
}
go graceful.GetManager().RunWithShutdownContext(runStopwatch)
return nil
}
func runStopwatch(ctx context.Context) {
ctx, _, finished := process.GetManager().AddTypedContext(ctx, "Service: WebSocket Stopwatch", process.SystemProcessType, true)
defer finished()
if setting.UI.Notification.EventSourceUpdateTime <= 0 {
return
}
timer := time.NewTicker(setting.UI.Notification.EventSourceUpdateTime)
defer timer.Stop()
for {
select {
case <-ctx.Done():
return
case <-timer.C:
if !pubsub.DefaultBroker.HasSubscribers() {
continue
}
userStopwatches, err := issues_model.GetUIDsAndStopwatch(ctx)
if err != nil {
log.Error("websocket: GetUIDsAndStopwatch: %v", err)
continue
}
for _, us := range userStopwatches {
u, err := user_model.GetUserByID(ctx, us.UserID)
if err != nil {
log.Error("websocket: GetUserByID %d: %v", us.UserID, err)
continue
}
apiSWs, err := convert.ToStopWatches(ctx, u, us.StopWatches)
if err != nil {
if !issues_model.IsErrIssueNotExist(err) {
log.Error("websocket: ToStopWatches: %v", err)
}
continue
}
msg, err := json.Marshal(stopwatchesEvent{
Type: "stopwatches",
Data: apiSWs,
})
if err != nil {
continue
}
pubsub.DefaultBroker.Publish(pubsub.UserTopic(us.UserID), msg)
}
}
}
}

View File

@ -152,31 +152,29 @@
</div><!-- end full right menu -->
{{$activeStopwatch := and .PageGlobalData (call .PageGlobalData.GetActiveStopwatch)}}
{{if $activeStopwatch}}
<div class="active-stopwatch-popup tippy-target">
<div class="flex-text-block tw-p-3">
<a class="stopwatch-link flex-text-block muted" href="{{$activeStopwatch.IssueLink}}">
{{svg "octicon-issue-opened" 16}}
<span class="stopwatch-issue">{{$activeStopwatch.RepoSlug}}#{{$activeStopwatch.IssueIndex}}</span>
</a>
<div class="tw-flex tw-gap-1">
<form class="stopwatch-commit form-fetch-action" method="post" action="{{$activeStopwatch.IssueLink}}/times/stopwatch/stop">
<button
type="submit"
class="ui button mini compact basic icon tw-mr-0"
data-tooltip-content="{{ctx.Locale.Tr "repo.issues.stop_tracking"}}"
>{{svg "octicon-square-fill"}}</button>
</form>
<form class="stopwatch-cancel form-fetch-action" method="post" action="{{$activeStopwatch.IssueLink}}/times/stopwatch/cancel">
<button
type="submit"
class="ui button mini compact basic icon tw-mr-0"
data-tooltip-content="{{ctx.Locale.Tr "repo.issues.cancel_tracking"}}"
>{{svg "octicon-trash"}}</button>
</form>
</div>
<div class="active-stopwatch-popup tippy-target">
<div class="flex-text-block tw-p-3">
<a class="stopwatch-link flex-text-block muted" href="{{if $activeStopwatch}}{{$activeStopwatch.IssueLink}}{{end}}">
{{svg "octicon-issue-opened" 16}}
<span class="stopwatch-issue">{{if $activeStopwatch}}{{$activeStopwatch.RepoSlug}}#{{$activeStopwatch.IssueIndex}}{{end}}</span>
</a>
<div class="tw-flex tw-gap-1">
<form class="stopwatch-commit" method="post" action="{{if $activeStopwatch}}{{$activeStopwatch.IssueLink}}/times/stopwatch/stop{{end}}">
<button
type="submit"
class="ui button mini compact basic icon tw-mr-0"
data-tooltip-content="{{ctx.Locale.Tr "repo.issues.stop_tracking"}}"
>{{svg "octicon-square-fill"}}</button>
</form>
<form class="stopwatch-cancel" method="post" action="{{if $activeStopwatch}}{{$activeStopwatch.IssueLink}}/times/stopwatch/cancel{{end}}">
<button
type="submit"
class="ui button mini compact basic icon tw-mr-0"
data-tooltip-content="{{ctx.Locale.Tr "repo.issues.cancel_tracking"}}"
>{{svg "octicon-trash"}}</button>
</form>
</div>
</div>
{{end}}
</div>
</nav>
{{template "base/head_banner"}}

View File

@ -3,14 +3,12 @@
{{if and $data $data.IsSigned}}{{/* data may not exist, for example: rendering 503 page before the PageGlobalData middleware */}}
{{- $activeStopwatch := call $data.GetActiveStopwatch -}}
{{- $notificationUnreadCount := call $data.GetNotificationUnreadCount -}}
{{if $activeStopwatch}}
<a class="item active-stopwatch {{$itemExtraClass}}" href="{{$activeStopwatch.IssueLink}}" title="{{ctx.Locale.Tr "active_stopwatch"}}" data-seconds="{{$activeStopwatch.Seconds}}">
<a class="item active-stopwatch{{if not $activeStopwatch}} tw-hidden{{end}} {{$itemExtraClass}}" {{if $activeStopwatch}}href="{{$activeStopwatch.IssueLink}}" data-seconds="{{$activeStopwatch.Seconds}}"{{end}} title="{{ctx.Locale.Tr "active_stopwatch"}}">
<div class="tw-relative flex-text-block">
{{svg "octicon-stopwatch"}}
<span class="header-stopwatch-dot"></span>
</div>
</a>
{{end}}
<a class="item {{$itemExtraClass}}" href="{{AppSubUrl}}/notifications" data-tooltip-content="{{ctx.Locale.Tr "notifications"}}">
<div class="tw-relative flex-text-block">
{{svg "octicon-bell"}}

View File

@ -8,27 +8,24 @@
{{svg "octicon-pencil"}}
</button>
</div>
<div class="ui buttons tw-mt-2 tw-w-full">
{{if $.IsStopwatchRunning}}
<button class="ui button tw-flex-1 issue-stop-time link-action" data-url="{{.Issue.Link}}/times/stopwatch/stop">
<div class="ui buttons tw-mt-2 tw-w-full issue-stop-cancel-buttons{{if not $.IsStopwatchRunning}} tw-hidden{{end}}">
<button class="ui button tw-flex-1 issue-stop-time" data-url="{{.Issue.Link}}/times/stopwatch/stop">
{{svg "octicon-stopwatch"}} {{ctx.Locale.Tr "repo.issues.timetracker_timer_stop"}}
</button>
<button class="ui icon button issue-cancel-time link-action" data-url="{{.Issue.Link}}/times/stopwatch/cancel" data-tooltip-content="{{ctx.Locale.Tr "repo.issues.timetracker_timer_discard"}}">
<button class="ui icon button issue-cancel-time" data-url="{{.Issue.Link}}/times/stopwatch/cancel" data-tooltip-content="{{ctx.Locale.Tr "repo.issues.timetracker_timer_discard"}}">
{{svg "octicon-trash"}}
</button>
{{else}}
<button class="ui button tw-flex-1 issue-start-time link-action" data-url="{{.Issue.Link}}/times/stopwatch/start">
</div>
<div class="ui buttons tw-mt-2 tw-w-full issue-start-buttons{{if $.IsStopwatchRunning}} tw-hidden{{end}}">
<button class="ui button tw-flex-1 issue-start-time" data-url="{{.Issue.Link}}/times/stopwatch/start">
{{svg "octicon-stopwatch"}} {{ctx.Locale.Tr "repo.issues.timetracker_timer_start"}}
</button>
<button class="ui icon button issue-add-time show-modal" data-modal="#issue-time-manually-add-modal" data-tooltip-content="{{ctx.Locale.Tr "repo.issues.timetracker_timer_manually_add"}}">
{{svg "octicon-plus"}}
</button>
{{end}}
</div>
{{if and (not $.IsStopwatchRunning) .HasUserStopwatch}}
<div class="ui warning message">{{ctx.Locale.Tr "repo.issues.tracking_already_started" .OtherStopwatchURL}}</div>
{{end}}
<div class="ui warning message issue-tracking-already-started{{if or $.IsStopwatchRunning (not .HasUserStopwatch)}} tw-hidden{{end}}">{{ctx.Locale.Tr "repo.issues.tracking_already_started" .OtherStopwatchURL}}</div>
{{if .Issue.TimeEstimate}}
<div class="tw-my-2">{{ctx.Locale.Tr "repo.issues.time_estimate_display" (TimeEstimateString .Issue.TimeEstimate)}}</div>

View File

@ -29,7 +29,7 @@ test.describe('events', () => {
await Promise.all([apiDeleteUser(request, commenter), apiDeleteUser(request, owner)]);
});
test('stopwatch', async ({page, request}) => {
test('stopwatch visible at page load', async ({page, request}) => {
const name = `ev-sw-${Date.now()}-${Math.random().toString(36).slice(2, 6)}`;
const headers = apiUserHeaders(name);
@ -51,6 +51,28 @@ test.describe('events', () => {
await apiDeleteUser(request, name);
});
test('stopwatch appears via real-time push', async ({page, request}) => {
const name = `ev-sw-push-${Date.now()}-${Math.random().toString(36).slice(2, 6)}`;
const headers = apiUserHeaders(name);
await apiCreateUser(request, name);
await apiCreateRepo(request, {name, headers});
await apiCreateIssue(request, name, name, {title: 'events stopwatch push test', headers});
// Login before starting stopwatch — page loads without active stopwatch
await loginUser(page, name);
const stopwatch = page.locator('.active-stopwatch.not-mobile');
await expect(stopwatch).toBeHidden();
// Start stopwatch after page is loaded — icon should appear via WebSocket push
await apiStartStopwatch(request, name, name, 1, {headers});
await expect(stopwatch).toBeVisible({timeout: 15000});
// Cleanup
await apiDeleteUser(request, name);
});
test('logout propagation', async ({browser, request}) => {
const name = `ev-logout-${Date.now()}-${Math.random().toString(36).slice(2, 6)}`;

View File

@ -1,86 +0,0 @@
// Copyright 2020 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package integration
import (
"fmt"
"net/http"
"testing"
"time"
activities_model "code.gitea.io/gitea/models/activities"
auth_model "code.gitea.io/gitea/models/auth"
repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/models/unittest"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/eventsource"
api "code.gitea.io/gitea/modules/structs"
"code.gitea.io/gitea/tests"
"github.com/stretchr/testify/assert"
)
func TestEventSourceManagerRun(t *testing.T) {
defer tests.PrepareTestEnv(t)()
manager := eventsource.GetManager()
eventChan := manager.Register(2)
defer func() {
manager.Unregister(2, eventChan)
// ensure the eventChan is closed
for {
_, ok := <-eventChan
if !ok {
break
}
}
}()
expectNotificationCountEvent := func(count int64) func() bool {
return func() bool {
select {
case event, ok := <-eventChan:
if !ok {
return false
}
data, ok := event.Data.(activities_model.UserIDCount)
if !ok {
return false
}
return event.Name == "notification-count" && data.Count == count
default:
return false
}
}
}
user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2})
repo1 := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: 1})
thread5 := unittest.AssertExistsAndLoadBean(t, &activities_model.Notification{ID: 5})
assert.NoError(t, thread5.LoadAttributes(t.Context()))
session := loginUser(t, user2.Name)
token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteNotification, auth_model.AccessTokenScopeWriteRepository)
var apiNL []api.NotificationThread
// -- mark notifications as read --
req := NewRequest(t, "GET", "/api/v1/notifications?status-types=unread").
AddTokenAuth(token)
resp := session.MakeRequest(t, req, http.StatusOK)
DecodeJSON(t, resp, &apiNL)
assert.Len(t, apiNL, 2)
lastReadAt := "2000-01-01T00%3A50%3A01%2B00%3A00" // 946687801 <- only Notification 4 is in this filter ...
req = NewRequest(t, "PUT", fmt.Sprintf("/api/v1/repos/%s/%s/notifications?last_read_at=%s", user2.Name, repo1.Name, lastReadAt)).
AddTokenAuth(token)
session.MakeRequest(t, req, http.StatusResetContent)
req = NewRequest(t, "GET", "/api/v1/notifications?status-types=unread").
AddTokenAuth(token)
resp = session.MakeRequest(t, req, http.StatusOK)
DecodeJSON(t, resp, &apiNL)
assert.Len(t, apiNL, 1)
assert.Eventually(t, expectNotificationCountEvent(1), 30*time.Second, 1*time.Second)
}

View File

@ -1,20 +1,13 @@
// Source manages the list of connected page ports for one logical connection.
// It no longer creates an EventSource; all real-time data is delivered by the
// accompanying WsSource over WebSocket.
class Source {
url: string;
eventSource: EventSource | null;
listening: Record<string, boolean>;
clients: Array<MessagePort>;
constructor(url: string) {
this.url = url;
this.eventSource = new EventSource(url);
this.listening = {};
this.clients = [];
this.listen('open');
this.listen('close');
this.listen('logout');
this.listen('notification-count');
this.listen('stopwatches');
this.listen('error');
}
register(port: MessagePort) {
@ -37,24 +30,6 @@ class Source {
return this.clients.length;
}
close() {
if (!this.eventSource) return;
this.eventSource.close();
this.eventSource = null;
}
listen(eventType: string) {
if (this.listening[eventType]) return;
this.listening[eventType] = true;
this.eventSource?.addEventListener(eventType, (event) => {
this.notifyClients({
type: eventType,
data: event.data,
});
});
}
notifyClients(event: {type: string, data: any}) {
for (const client of this.clients) {
client.postMessage(event);
@ -64,24 +39,100 @@ class Source {
status(port: MessagePort) {
port.postMessage({
type: 'status',
message: `url: ${this.url} readyState: ${this.eventSource?.readyState}`,
message: `url: ${this.url}`,
});
}
}
// WsSource provides a WebSocket transport for real-time event delivery.
// It normalises messages to the SSE event format so that callers do not
// need to know which transport delivered the event.
class WsSource {
wsUrl: string;
ws: WebSocket | null;
source: Source;
reconnectTimer: ReturnType<typeof setTimeout> | null;
reconnectDelay: number;
constructor(wsUrl: string, source: Source) {
this.wsUrl = wsUrl;
this.source = source;
this.ws = null;
this.reconnectTimer = null;
this.reconnectDelay = 50;
this.connect();
}
connect() {
this.ws = new WebSocket(this.wsUrl);
this.ws.addEventListener('open', () => {
this.reconnectDelay = 50;
});
this.ws.addEventListener('message', (event: MessageEvent<string>) => {
try {
const msg = JSON.parse(event.data);
if (msg.type === 'notification-count') {
// Normalise to SSE event format so the receiver is transport-agnostic.
this.source.notifyClients({
type: 'notification-count',
data: JSON.stringify({Count: msg.count}),
});
} else if (msg.type === 'stopwatches') {
this.source.notifyClients({
type: 'stopwatches',
data: JSON.stringify(msg.data),
});
} else if (msg.type === 'logout') {
this.source.notifyClients({
type: 'logout',
data: msg.data ?? '',
});
}
} catch {
// ignore malformed messages
}
});
this.ws.addEventListener('close', () => {
this.ws = null;
this.scheduleReconnect();
});
this.ws.addEventListener('error', () => {
this.ws = null;
this.scheduleReconnect();
});
}
scheduleReconnect() {
if (this.reconnectTimer !== null) return;
const delay = this.reconnectDelay;
this.reconnectTimer = setTimeout(() => {
this.reconnectTimer = null;
this.connect();
}, delay);
this.reconnectDelay = Math.min(this.reconnectDelay * 2, 10000);
}
close() {
if (this.reconnectTimer !== null) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
this.ws?.close();
this.ws = null;
}
}
const sourcesByUrl = new Map<string, Source | null>();
const sourcesByPort = new Map<MessagePort, Source | null>();
const wsSourcesByUrl = new Map<string, WsSource | null>();
(self as unknown as SharedWorkerGlobalScope).addEventListener('connect', (e: MessageEvent) => {
for (const port of e.ports) {
port.addEventListener('message', (event: MessageEvent) => {
if (!self.EventSource) {
// some browsers (like PaleMoon, Firefox<53) don't support EventSource in SharedWorkerGlobalScope.
// this event handler needs EventSource when doing "new Source(url)", so just post a message back to the caller,
// in case the caller would like to use a fallback method to do its work.
port.postMessage({type: 'no-event-source'});
return;
}
if (event.data.type === 'start') {
const url = event.data.url;
let source = sourcesByUrl.get(url);
@ -93,34 +144,41 @@ const sourcesByPort = new Map<MessagePort, Source | null>();
}
source = sourcesByPort.get(port);
if (source) {
if (source.eventSource && source.url === url) return;
if (source.url === url) return;
// How this has happened I don't understand...
// deregister from that source
const count = source.deregister(port);
// Clean-up
if (count === 0) {
source.close();
sourcesByUrl.set(source.url, null);
const ws = wsSourcesByUrl.get(source.url);
if (ws) {
ws.close();
wsSourcesByUrl.set(source.url, null);
}
}
}
// Create a new Source
// Create a new Source and its WebSocket transport
source = new Source(url);
source.register(port);
sourcesByUrl.set(url, source);
sourcesByPort.set(port, source);
} else if (event.data.type === 'listen') {
const source = sourcesByPort.get(port)!;
source.listen(event.data.eventType);
const wsUrl = url.replace(/^http/, 'ws').replace(/\/user\/events$/, '/-/ws');
wsSourcesByUrl.set(url, new WsSource(wsUrl, source));
} else if (event.data.type === 'close') {
const source = sourcesByPort.get(port);
if (!source) return;
const count = source.deregister(port);
if (count === 0) {
source.close();
sourcesByUrl.set(source.url, null);
sourcesByPort.set(port, null);
const ws = wsSourcesByUrl.get(source.url);
if (ws) {
ws.close();
wsSourcesByUrl.set(source.url, null);
}
}
} else if (event.data.type === 'status') {
const source = sourcesByPort.get(port);

View File

@ -21,10 +21,8 @@ async function receiveUpdateCount(event: MessageEvent<{type: string, data: strin
export function initNotificationCount() {
if (!document.querySelector('.notification_count')) return;
let usingPeriodicPoller = false;
const startPeriodicPoller = (timeout: number, lastCount?: number) => {
if (timeout <= 0 || !Number.isFinite(timeout)) return;
usingPeriodicPoller = true;
lastCount = lastCount ?? getCurrentCount();
setTimeout(async () => {
await updateNotificationCountWithCallback(startPeriodicPoller, timeout, lastCount);
@ -35,9 +33,7 @@ export function initNotificationCount() {
// Try to connect to the event source via the shared worker first
const worker = new UserEventsSharedWorker('notification-worker');
worker.addMessageEventListener((event: MessageEvent) => {
if (event.data.type === 'no-event-source') {
if (!usingPeriodicPoller) startPeriodicPoller(notificationSettings.MinTimeout);
} else if (event.data.type === 'notification-count') {
if (event.data.type === 'notification-count') {
receiveUpdateCount(event); // no await
}
});

View File

@ -1,6 +1,6 @@
import {createTippy} from '../modules/tippy.ts';
import {GET} from '../modules/fetch.ts';
import {hideElem, queryElems, showElem} from '../utils/dom.ts';
import {GET, POST} from '../modules/fetch.ts';
import {addDelegatedEventListener, hideElem, queryElems, showElem} from '../utils/dom.ts';
import {UserEventsSharedWorker} from '../modules/worker.ts';
const {appSubUrl, notificationSettings, enableTimeTracking} = window.config;
@ -34,13 +34,55 @@ export function initStopwatch() {
interactive: true,
hideOnClick: true,
theme: 'default',
onShow(instance) {
// Re-clone so the tooltip always reflects the latest stopwatch state,
// even when the icon became visible via a real-time WebSocket push.
instance.setContent(stopwatchPopup.cloneNode(true) as Element);
},
});
}
let usingPeriodicPoller = false;
// Handle stop/cancel from the navbar popup without triggering a page reload.
// These forms are not form-fetch-action so they won't navigate; the WebSocket
// push (or periodic poller) updates the icon after the action completes.
addDelegatedEventListener(document, 'submit', '.stopwatch-commit,.stopwatch-cancel', async (form: HTMLFormElement, e: SubmitEvent) => {
e.preventDefault();
const action = form.getAttribute('action');
if (!action) return;
await POST(action, {data: new FormData(form)});
});
// Handle start/stop/cancel from the issue sidebar without a page reload.
// Buttons toggle between the two groups (.issue-start-buttons / .issue-stop-cancel-buttons)
// immediately; the navbar icon is updated by the WebSocket push or periodic poller.
addDelegatedEventListener(document, 'click', '.issue-start-time,.issue-stop-time,.issue-cancel-time', async (btn: HTMLElement, e: MouseEvent) => {
e.preventDefault();
const url = btn.getAttribute('data-url');
if (!url) return;
const startGroup = document.querySelector('.issue-start-buttons')!;
const stopGroup = document.querySelector('.issue-stop-cancel-buttons')!;
const isStart = btn.classList.contains('issue-start-time');
btn.classList.add('is-loading');
try {
const resp = await POST(url);
if (!resp.ok) return;
// Toggle sidebar button groups immediately, no reload needed.
if (isStart) {
hideElem(startGroup);
showElem(stopGroup);
} else {
hideElem(stopGroup);
showElem(startGroup);
}
} finally {
btn.classList.remove('is-loading');
}
});
const startPeriodicPoller = (timeout: number) => {
if (timeout <= 0 || !Number.isFinite(timeout)) return;
usingPeriodicPoller = true;
setTimeout(() => updateStopwatchWithCallback(startPeriodicPoller, timeout), timeout);
};
@ -49,10 +91,7 @@ export function initStopwatch() {
// Try to connect to the event source via the shared worker first
const worker = new UserEventsSharedWorker('stopwatch-worker');
worker.addMessageEventListener((event) => {
if (event.data.type === 'no-event-source') {
// browser doesn't support EventSource, falling back to periodic poller
if (!usingPeriodicPoller) startPeriodicPoller(notificationSettings.MinTimeout);
} else if (event.data.type === 'stopwatches') {
if (event.data.type === 'stopwatches') {
updateStopwatchData(JSON.parse(event.data.data));
}
});