0
0
mirror of https://github.com/go-gitea/gitea.git synced 2026-05-10 11:51:33 +02:00

Merge 13422f80825bc0b8c030ff45485a6221a326e8c3 into a5d81d9ce230aaa6e1021b6236ca01cb6d2b56c3

This commit is contained in:
silverwind 2026-05-08 21:21:40 -07:00 committed by GitHub
commit ce65fa81ec
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 262 additions and 41 deletions

View File

@ -2976,6 +2976,11 @@ LEVEL = Info
;ENDLESS_TASK_TIMEOUT = 3h ;ENDLESS_TASK_TIMEOUT = 3h
;; Timeout to cancel the jobs which have waiting status, but haven't been picked by a runner for a long time ;; Timeout to cancel the jobs which have waiting status, but haven't been picked by a runner for a long time
;ABANDONED_JOB_TIMEOUT = 24h ;ABANDONED_JOB_TIMEOUT = 24h
;; Maximum time the server holds an idle FetchTask request open while waiting for new tasks.
;; When a task is queued during the wait, the request returns immediately so runners pick it up
;; without polling overhead. Set to 0 to disable long-polling. Should be less than any reverse-proxy
;; idle timeout (nginx default proxy_read_timeout is 60s).
;RUNNER_LONG_POLL_TIMEOUT = 50s
;; Strings committers can place inside a commit message or PR title to skip executing the corresponding actions workflow ;; Strings committers can place inside a commit message or PR title to skip executing the corresponding actions workflow
;SKIP_WORKFLOW_STRINGS = [skip ci],[ci skip],[no ci],[skip actions],[actions skip] ;SKIP_WORKFLOW_STRINGS = [skip ci],[ci skip],[no ci],[skip actions],[actions skip]
;; Comma-separated list of workflow directories, the first one to exist ;; Comma-separated list of workflow directories, the first one to exist

View File

@ -73,7 +73,7 @@ func increaseTasksVersionByScope(ctx context.Context, ownerID, repoID int64) err
} }
func IncreaseTaskVersion(ctx context.Context, ownerID, repoID int64) error { func IncreaseTaskVersion(ctx context.Context, ownerID, repoID int64) error {
return db.WithTx(ctx, func(ctx context.Context) error { if err := db.WithTx(ctx, func(ctx context.Context) error {
// 1. increase global // 1. increase global
if err := increaseTasksVersionByScope(ctx, 0, 0); err != nil { if err := increaseTasksVersionByScope(ctx, 0, 0); err != nil {
log.Error("IncreaseTasksVersionByScope(Global): %v", err) log.Error("IncreaseTasksVersionByScope(Global): %v", err)
@ -97,5 +97,9 @@ func IncreaseTaskVersion(ctx context.Context, ownerID, repoID int64) error {
} }
return nil return nil
}) }); err != nil {
return err
}
signalTaskVersionWake()
return nil
} }

View File

@ -0,0 +1,28 @@
// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import "sync"
// taskVersionWake lets FetchTask waiters block until any task version is
// bumped. The active channel is closed on each signal (waking everyone who
// subscribed to it) and immediately replaced by a fresh one.
var taskVersionWake = struct {
	mu sync.Mutex
	ch chan struct{}
}{ch: make(chan struct{})}

// TasksVersionWakeChannel returns a channel that the next call to
// IncreaseTaskVersion closes. Subscribe before reading the version so a
// signal arriving in between is not lost.
func TasksVersionWakeChannel() <-chan struct{} {
	taskVersionWake.mu.Lock()
	ch := taskVersionWake.ch
	taskVersionWake.mu.Unlock()
	return ch
}

// signalTaskVersionWake wakes every current subscriber by closing the active
// channel, then installs a fresh channel for future subscribers.
func signalTaskVersionWake() {
	taskVersionWake.mu.Lock()
	close(taskVersionWake.ch)
	taskVersionWake.ch = make(chan struct{})
	taskVersionWake.mu.Unlock()
}

View File

@ -0,0 +1,62 @@
// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"sync"
"testing"
"code.gitea.io/gitea/models/unittest"
"github.com/stretchr/testify/assert"
)
// assertChannelOpen fails the test with msg when a receive on ch would
// already succeed (i.e. the channel has been closed).
func assertChannelOpen(t *testing.T, ch <-chan struct{}, msg string) {
	t.Helper()
	ready := false
	select {
	case <-ch:
		ready = true
	default:
	}
	if ready {
		t.Fatal(msg)
	}
}
// assertChannelClosed fails the test with msg when a receive on ch would
// block (i.e. the channel has not been closed yet).
func assertChannelClosed(t *testing.T, ch <-chan struct{}, msg string) {
	t.Helper()
	ready := false
	select {
	case <-ch:
		ready = true
	default:
	}
	if !ready {
		t.Fatal(msg)
	}
}
// TestTasksVersionWake_SignalClosesChannel verifies the subscribe/signal
// handshake: the subscribed channel closes on signal and is replaced.
func TestTasksVersionWake_SignalClosesChannel(t *testing.T) {
	before := TasksVersionWakeChannel()
	assertChannelOpen(t, before, "channel should be open before signal")

	signalTaskVersionWake()
	assertChannelClosed(t, before, "channel should be closed after signal")

	after := TasksVersionWakeChannel()
	assertChannelOpen(t, after, "replacement channel should be open")
}
// TestIncreaseTaskVersion_SignalsWake checks the production wiring: bumping
// the task version through IncreaseTaskVersion must close the wake channel.
func TestIncreaseTaskVersion_SignalsWake(t *testing.T) {
	assert.NoError(t, unittest.PrepareTestDatabase())

	wake := TasksVersionWakeChannel()
	err := IncreaseTaskVersion(t.Context(), 0, 0)
	assert.NoError(t, err)
	assertChannelClosed(t, wake, "expected IncreaseTaskVersion to close the wake channel")
}
// TestTasksVersionWake_ConcurrentSafe runs concurrent get/signal calls;
// run with `-race` to catch regressions in the mutex.
func TestTasksVersionWake_ConcurrentSafe(t *testing.T) {
var wg sync.WaitGroup
for range 100 {
wg.Go(func() { _ = TasksVersionWakeChannel() })
wg.Go(func() { signalTaskVersionWake() })
}
wg.Wait()
assertChannelOpen(t, TasksVersionWakeChannel(), "a fresh subscription should be open until the next signal")
}

View File

@ -27,6 +27,7 @@ var (
ZombieTaskTimeout time.Duration `ini:"ZOMBIE_TASK_TIMEOUT"` ZombieTaskTimeout time.Duration `ini:"ZOMBIE_TASK_TIMEOUT"`
EndlessTaskTimeout time.Duration `ini:"ENDLESS_TASK_TIMEOUT"` EndlessTaskTimeout time.Duration `ini:"ENDLESS_TASK_TIMEOUT"`
AbandonedJobTimeout time.Duration `ini:"ABANDONED_JOB_TIMEOUT"` AbandonedJobTimeout time.Duration `ini:"ABANDONED_JOB_TIMEOUT"`
RunnerLongPollTimeout time.Duration `ini:"RUNNER_LONG_POLL_TIMEOUT"`
SkipWorkflowStrings []string `ini:"SKIP_WORKFLOW_STRINGS"` SkipWorkflowStrings []string `ini:"SKIP_WORKFLOW_STRINGS"`
WorkflowDirs []string `ini:"WORKFLOW_DIRS"` WorkflowDirs []string `ini:"WORKFLOW_DIRS"`
MaxRerunAttempts int64 `ini:"MAX_RERUN_ATTEMPTS"` MaxRerunAttempts int64 `ini:"MAX_RERUN_ATTEMPTS"`
@ -121,6 +122,7 @@ func loadActionsFrom(rootCfg ConfigProvider) error {
Actions.ZombieTaskTimeout = sec.Key("ZOMBIE_TASK_TIMEOUT").MustDuration(10 * time.Minute) Actions.ZombieTaskTimeout = sec.Key("ZOMBIE_TASK_TIMEOUT").MustDuration(10 * time.Minute)
Actions.EndlessTaskTimeout = sec.Key("ENDLESS_TASK_TIMEOUT").MustDuration(3 * time.Hour) Actions.EndlessTaskTimeout = sec.Key("ENDLESS_TASK_TIMEOUT").MustDuration(3 * time.Hour)
Actions.AbandonedJobTimeout = sec.Key("ABANDONED_JOB_TIMEOUT").MustDuration(24 * time.Hour) Actions.AbandonedJobTimeout = sec.Key("ABANDONED_JOB_TIMEOUT").MustDuration(24 * time.Hour)
Actions.RunnerLongPollTimeout = sec.Key("RUNNER_LONG_POLL_TIMEOUT").MustDuration(50 * time.Second)
if Actions.MaxRerunAttempts <= 0 { if Actions.MaxRerunAttempts <= 0 {
Actions.MaxRerunAttempts = defaultMaxRerunAttempts Actions.MaxRerunAttempts = defaultMaxRerunAttempts

View File

@ -7,12 +7,14 @@ import (
"context" "context"
"errors" "errors"
"net/http" "net/http"
"time"
actions_model "code.gitea.io/gitea/models/actions" actions_model "code.gitea.io/gitea/models/actions"
repo_model "code.gitea.io/gitea/models/repo" repo_model "code.gitea.io/gitea/models/repo"
user_model "code.gitea.io/gitea/models/user" user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/actions" "code.gitea.io/gitea/modules/actions"
"code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/util" "code.gitea.io/gitea/modules/util"
actions_service "code.gitea.io/gitea/services/actions" actions_service "code.gitea.io/gitea/services/actions"
@ -130,50 +132,78 @@ func (s *Service) Declare(
}), nil }), nil
} }
// FetchTask assigns a task to the runner // FetchTask assigns a task to the runner. When no task is available, the
// request is held open until a new task version is signalled, the long-poll
// timeout elapses, or the client disconnects.
func (s *Service) FetchTask( func (s *Service) FetchTask(
ctx context.Context, ctx context.Context,
req *connect.Request[runnerv1.FetchTaskRequest], req *connect.Request[runnerv1.FetchTaskRequest],
) (*connect.Response[runnerv1.FetchTaskResponse], error) { ) (*connect.Response[runnerv1.FetchTaskResponse], error) {
runner := GetRunner(ctx) runner := GetRunner(ctx)
var task *runnerv1.Task var deadline <-chan time.Time
tasksVersion := req.Msg.TasksVersion // task version from runner if d := setting.Actions.RunnerLongPollTimeout; d > 0 {
latestVersion, err := actions_model.GetTasksVersionByScope(ctx, runner.OwnerID, runner.RepoID) deadline = time.After(d)
if err != nil {
return nil, status.Errorf(codes.Internal, "query tasks version failed: %v", err)
} else if latestVersion == 0 {
if err := actions_model.IncreaseTaskVersion(ctx, runner.OwnerID, runner.RepoID); err != nil {
return nil, status.Errorf(codes.Internal, "fail to increase task version: %v", err)
}
// if we don't increase the value of `latestVersion` here,
// the response of FetchTask will return tasksVersion as zero.
// and the runner will treat it as an old version of Gitea.
latestVersion++
} }
if tasksVersion != latestVersion { clientVersion := req.Msg.TasksVersion
// Re-load runner from DB so task assignment uses current IsDisabled state var latestVersion int64
// (avoids race where disable commits while this request still has stale runner). LongPoll:
freshRunner, err := actions_model.GetRunnerByUUID(ctx, runner.UUID) for {
wakeCh := actions_model.TasksVersionWakeChannel()
v, err := actions_model.GetTasksVersionByScope(ctx, runner.OwnerID, runner.RepoID)
if err != nil { if err != nil {
return nil, status.Errorf(codes.Internal, "get runner: %v", err) return nil, status.Errorf(codes.Internal, "query tasks version failed: %v", err)
} else if v == 0 {
if err := actions_model.IncreaseTaskVersion(ctx, runner.OwnerID, runner.RepoID); err != nil {
return nil, status.Errorf(codes.Internal, "fail to increase task version: %v", err)
}
// if we don't increase the value of `v` here,
// the response of FetchTask will return tasksVersion as zero.
// and the runner will treat it as an old version of Gitea.
v = 1
} }
// if the task version in request is not equal to the version in db, latestVersion = v
// it means there may still be some tasks that haven't been assigned.
// try to pick a task for the runner that send the request. if clientVersion != latestVersion {
if t, ok, err := actions_service.PickTask(ctx, freshRunner); err != nil { // Re-load runner from DB so task assignment uses current IsDisabled state
log.Error("pick task failed: %v", err) // (avoids race where disable commits while this request still has stale runner).
return nil, status.Errorf(codes.Internal, "pick task: %v", err) freshRunner, err := actions_model.GetRunnerByUUID(ctx, runner.UUID)
} else if ok { if err != nil {
task = t return nil, status.Errorf(codes.Internal, "get runner: %v", err)
}
// if the task version in request is not equal to the version in db,
// it means there may still be some tasks that haven't been assigned.
// try to pick a task for the runner that send the request.
if t, ok, err := actions_service.PickTask(ctx, freshRunner); err != nil {
log.Error("pick task failed: %v", err)
return nil, status.Errorf(codes.Internal, "pick task: %v", err)
} else if ok {
return connect.NewResponse(&runnerv1.FetchTaskResponse{
Task: t,
TasksVersion: latestVersion,
}), nil
}
clientVersion = latestVersion
}
if deadline == nil {
break
}
select {
case <-wakeCh:
case <-deadline:
break LongPoll
case <-ctx.Done():
break LongPoll
} }
} }
res := connect.NewResponse(&runnerv1.FetchTaskResponse{
Task: task, return connect.NewResponse(&runnerv1.FetchTaskResponse{
TasksVersion: latestVersion, TasksVersion: latestVersion,
}) }), nil
return res, nil
} }
// UpdateTask updates the task status. // UpdateTask updates the task status.

View File

@ -643,11 +643,7 @@ jobs:
err = actions_service.CleanupEphemeralRunners(t.Context()) err = actions_service.CleanupEphemeralRunners(t.Context())
assert.NoError(t, err) assert.NoError(t, err)
resp, err := runner.client.runnerServiceClient.FetchTask(t.Context(), connect.NewRequest(&runnerv1.FetchTaskRequest{ runner.fetchNoTask(t)
TasksVersion: 0,
}))
assert.NoError(t, err)
assert.Nil(t, resp.Msg.Task)
// verify CleanupEphemeralRunners does not remove this runner // verify CleanupEphemeralRunners does not remove this runner
err = actions_service.CleanupEphemeralRunners(t.Context()) err = actions_service.CleanupEphemeralRunners(t.Context())
@ -661,7 +657,7 @@ jobs:
})) }))
assert.NoError(t, err) assert.NoError(t, err)
resp, err = runner.client.runnerServiceClient.FetchTask(t.Context(), connect.NewRequest(&runnerv1.FetchTaskRequest{ resp, err := runner.client.runnerServiceClient.FetchTask(t.Context(), connect.NewRequest(&runnerv1.FetchTaskRequest{
TasksVersion: 0, TasksVersion: 0,
})) }))
assert.Error(t, err) assert.Error(t, err)

View File

@ -0,0 +1,87 @@
// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package integration
import (
"fmt"
"net/http"
"net/url"
"sync"
"testing"
"time"
auth_model "code.gitea.io/gitea/models/auth"
repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/models/unittest"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/test"
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
"connectrpc.com/connect"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestActionsRunnerLongPollWake exercises the end-to-end glue from workflow
// dispatch to task delivery: a waiting FetchTask returns the queued task
// without waiting for the long-poll deadline. Wake channel mechanics are
// covered by unit tests in models/actions/tasks_version_notify_test.go.
func TestActionsRunnerLongPollWake(t *testing.T) {
	onGiteaRun(t, func(t *testing.T, _ *url.URL) {
		// Fixture user 2; the repo is created fresh so no pre-existing task
		// can satisfy the FetchTask issued below.
		user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2})
		session := loginUser(t, user2.Name)
		token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser)
		apiRepo := createActionsTestRepo(t, token, "actions-longpoll-wake", false)
		repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiRepo.ID})
		httpContext := NewAPITestContext(t, user2.Name, repo.Name, auth_model.AccessTokenScopeWriteRepository)
		defer doAPIDeleteRepository(httpContext)(t)
		runner := newMockRunner()
		runner.registerAsRepoRunner(t, user2.Name, repo.Name, "longpoll-wake-runner", []string{"linux-runner"}, false)
		// Manual-dispatch workflow: the task is queued exactly when the POST
		// below runs, while the background FetchTask is already waiting.
		const wfFile = ".gitea/workflows/longpoll-wake.yml"
		const wfContent = `name: Long-poll Wake
on: workflow_dispatch
jobs:
  build:
    runs-on: linux-runner
    steps:
      - run: echo hi
`
		opts := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wfFile, wfContent)
		createWorkflowFile(t, token, user2.Name, repo.Name, wfFile, opts)
		// 5s window: long enough that the elapsed-time check below can tell
		// a wake apart from a timeout, short enough to keep the test fast.
		defer test.MockVariableValue(&setting.Actions.RunnerLongPollTimeout, 5*time.Second)()
		var (
			wg      sync.WaitGroup
			task    *runnerv1.Task
			elapsed time.Duration
		)
		// Hold a FetchTask open in the background. wg.Wait() below provides
		// the happens-before edge that makes task/elapsed safe to read after.
		wg.Go(func() {
			start := time.Now()
			resp, err := runner.client.runnerServiceClient.FetchTask(t.Context(), connect.NewRequest(&runnerv1.FetchTaskRequest{
				TasksVersion: 0,
			}))
			elapsed = time.Since(start)
			assert.NoError(t, err)
			if resp != nil {
				task = resp.Msg.Task
			}
		})
		// Give the goroutine a moment to enter the server-side wait before
		// dispatching. NOTE(review): a fixed sleep is a mild flake risk if
		// the request has not reached the long poll yet — confirm this is
		// tolerated by CI timing.
		time.Sleep(100 * time.Millisecond)
		runURL := fmt.Sprintf("/%s/%s/actions/run?workflow=%s", user2.Name, repo.Name, "longpoll-wake.yml")
		session.MakeRequest(t, NewRequestWithValues(t, "POST", runURL, map[string]string{
			"ref": "refs/heads/" + repo.DefaultBranch,
		}), http.StatusSeeOther)
		wg.Wait()
		require.NotNil(t, task, "long-poll should return the queued task")
		// 4s is comfortably below the 5s window, so passing here proves the
		// wake (not the timeout) unblocked the request.
		require.Less(t, elapsed, 4*time.Second, "long-poll should return promptly after the wake, not wait the full timeout")
	})
}

View File

@ -5,6 +5,7 @@ package integration
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"net/http" "net/http"
"testing" "testing"
@ -125,11 +126,17 @@ func (r *mockRunner) tryFetchTask(t *testing.T, timeout ...time.Duration) *runne
// fetchTaskOnce performs a single FetchTask request with the given TasksVersion // fetchTaskOnce performs a single FetchTask request with the given TasksVersion
// and returns the task (if any) and the TasksVersion from the response. // and returns the task (if any) and the TasksVersion from the response.
// Used to verify the production path where the runner sends the current version. // The request is bounded by a short context timeout so server-side long-poll
// (RUNNER_LONG_POLL_TIMEOUT) does not slow tests that repeatedly poll for state.
func (r *mockRunner) fetchTaskOnce(t *testing.T, tasksVersion int64) (*runnerv1.Task, int64) { func (r *mockRunner) fetchTaskOnce(t *testing.T, tasksVersion int64) (*runnerv1.Task, int64) {
resp, err := r.client.runnerServiceClient.FetchTask(t.Context(), connect.NewRequest(&runnerv1.FetchTaskRequest{ ctx, cancel := context.WithTimeout(t.Context(), 200*time.Millisecond)
defer cancel()
resp, err := r.client.runnerServiceClient.FetchTask(ctx, connect.NewRequest(&runnerv1.FetchTaskRequest{
TasksVersion: tasksVersion, TasksVersion: tasksVersion,
})) }))
if errors.Is(err, context.DeadlineExceeded) {
return nil, 0
}
require.NoError(t, err) require.NoError(t, err)
return resp.Msg.Task, resp.Msg.TasksVersion return resp.Msg.Task, resp.Msg.TasksVersion
} }