From 13422f80825bc0b8c030ff45485a6221a326e8c3 Mon Sep 17 00:00:00 2001 From: silverwind Date: Wed, 29 Apr 2026 12:04:26 +0200 Subject: [PATCH] Long-poll FetchTask for instant runner task pickup Hold idle FetchTask requests open until a new task version is signalled, the long-poll deadline elapses, or the client disconnects. New setting RUNNER_LONG_POLL_TIMEOUT (default 50s, 0 disables) caps the held window. The wake channel uses a close-and-replace pattern signalled from IncreaseTaskVersion after the transaction commits. Existing runners benefit without any code changes; full instant pickup additionally requires runner config tuning (raise fetch_timeout, lower or disable fetch_interval_max). Co-Authored-By: Claude (Opus 4.7) --- custom/conf/app.example.ini | 5 + models/actions/tasks_version.go | 8 +- models/actions/tasks_version_notify.go | 28 ++++++ models/actions/tasks_version_notify_test.go | 62 +++++++++++++ modules/setting/actions.go | 2 + routers/api/actions/runner/runner.go | 92 ++++++++++++------- tests/integration/actions_job_test.go | 8 +- .../actions_runner_longpoll_test.go | 87 ++++++++++++++++++ tests/integration/actions_runner_test.go | 11 ++- 9 files changed, 262 insertions(+), 41 deletions(-) create mode 100644 models/actions/tasks_version_notify.go create mode 100644 models/actions/tasks_version_notify_test.go create mode 100644 tests/integration/actions_runner_longpoll_test.go diff --git a/custom/conf/app.example.ini b/custom/conf/app.example.ini index 4245957191..b63a42afe8 100644 --- a/custom/conf/app.example.ini +++ b/custom/conf/app.example.ini @@ -2976,6 +2976,11 @@ LEVEL = Info ;ENDLESS_TASK_TIMEOUT = 3h ;; Timeout to cancel the jobs which have waiting status, but haven't been picked by a runner for a long time ;ABANDONED_JOB_TIMEOUT = 24h +;; Maximum time the server holds an idle FetchTask request open while waiting for new tasks. +;; When a task is queued during the wait, the request returns immediately so runners pick it up +;; without polling overhead. Set to 0 to disable long-polling. Should be less than any reverse-proxy +;; idle timeout (nginx default proxy_read_timeout is 60s). +;RUNNER_LONG_POLL_TIMEOUT = 50s ;; Strings committers can place inside a commit message or PR title to skip executing the corresponding actions workflow ;SKIP_WORKFLOW_STRINGS = [skip ci],[ci skip],[no ci],[skip actions],[actions skip] ;; Comma-separated list of workflow directories, the first one to exist diff --git a/models/actions/tasks_version.go b/models/actions/tasks_version.go index b686ce2443..a7d2ab5f3e 100644 --- a/models/actions/tasks_version.go +++ b/models/actions/tasks_version.go @@ -73,7 +73,7 @@ func increaseTasksVersionByScope(ctx context.Context, ownerID, repoID int64) err } func IncreaseTaskVersion(ctx context.Context, ownerID, repoID int64) error { - return db.WithTx(ctx, func(ctx context.Context) error { + if err := db.WithTx(ctx, func(ctx context.Context) error { // 1. increase global if err := increaseTasksVersionByScope(ctx, 0, 0); err != nil { log.Error("IncreaseTasksVersionByScope(Global): %v", err) @@ -97,5 +97,9 @@ func IncreaseTaskVersion(ctx context.Context, ownerID, repoID int64) error { } return nil - }) + }); err != nil { + return err + } + signalTaskVersionWake() + return nil } diff --git a/models/actions/tasks_version_notify.go b/models/actions/tasks_version_notify.go new file mode 100644 index 0000000000..1de20d257c --- /dev/null +++ b/models/actions/tasks_version_notify.go @@ -0,0 +1,28 @@ +// Copyright 2026 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import "sync" + +// taskVersionWake broadcasts to FetchTask waiters when any task version is bumped. +var taskVersionWake = struct { + mu sync.Mutex + ch chan struct{} +}{ch: make(chan struct{})} + +// TasksVersionWakeChannel returns a channel that is closed by the next +// IncreaseTaskVersion. Subscribe before reading the version so a signal +// arriving in between is not lost. +func TasksVersionWakeChannel() <-chan struct{} { + taskVersionWake.mu.Lock() + defer taskVersionWake.mu.Unlock() + return taskVersionWake.ch +} + +func signalTaskVersionWake() { + taskVersionWake.mu.Lock() + defer taskVersionWake.mu.Unlock() + close(taskVersionWake.ch) + taskVersionWake.ch = make(chan struct{}) +} diff --git a/models/actions/tasks_version_notify_test.go b/models/actions/tasks_version_notify_test.go new file mode 100644 index 0000000000..a3a9654013 --- /dev/null +++ b/models/actions/tasks_version_notify_test.go @@ -0,0 +1,62 @@ +// Copyright 2026 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "sync" + "testing" + + "code.gitea.io/gitea/models/unittest" + + "github.com/stretchr/testify/assert" +) + +func assertChannelOpen(t *testing.T, ch <-chan struct{}, msg string) { + t.Helper() + select { + case <-ch: + t.Fatal(msg) + default: + } +} + +func assertChannelClosed(t *testing.T, ch <-chan struct{}, msg string) { + t.Helper() + select { + case <-ch: + default: + t.Fatal(msg) + } +} + +func TestTasksVersionWake_SignalClosesChannel(t *testing.T) { + ch := TasksVersionWakeChannel() + assertChannelOpen(t, ch, "channel should be open before signal") + + signalTaskVersionWake() + assertChannelClosed(t, ch, "channel should be closed after signal") + + assertChannelOpen(t, TasksVersionWakeChannel(), "replacement channel should be open") +} + +func TestIncreaseTaskVersion_SignalsWake(t *testing.T) { + assert.NoError(t, unittest.PrepareTestDatabase()) + + ch := TasksVersionWakeChannel() + assert.NoError(t, IncreaseTaskVersion(t.Context(), 0, 0)) + assertChannelClosed(t, ch, "expected IncreaseTaskVersion to close the wake channel") +} + +// TestTasksVersionWake_ConcurrentSafe runs concurrent get/signal calls; +// run with `-race` to catch regressions in the mutex. +func TestTasksVersionWake_ConcurrentSafe(t *testing.T) { + var wg sync.WaitGroup + for range 100 { + wg.Go(func() { _ = TasksVersionWakeChannel() }) + wg.Go(func() { signalTaskVersionWake() }) + } + wg.Wait() + + assertChannelOpen(t, TasksVersionWakeChannel(), "a fresh subscription should be open until the next signal") +} diff --git a/modules/setting/actions.go b/modules/setting/actions.go index 0d1bdadc8e..f5509d2539 100644 --- a/modules/setting/actions.go +++ b/modules/setting/actions.go @@ -27,6 +27,7 @@ var ( ZombieTaskTimeout time.Duration `ini:"ZOMBIE_TASK_TIMEOUT"` EndlessTaskTimeout time.Duration `ini:"ENDLESS_TASK_TIMEOUT"` AbandonedJobTimeout time.Duration `ini:"ABANDONED_JOB_TIMEOUT"` + RunnerLongPollTimeout time.Duration `ini:"RUNNER_LONG_POLL_TIMEOUT"` SkipWorkflowStrings []string `ini:"SKIP_WORKFLOW_STRINGS"` WorkflowDirs []string `ini:"WORKFLOW_DIRS"` MaxRerunAttempts int64 `ini:"MAX_RERUN_ATTEMPTS"` @@ -121,6 +122,7 @@ func loadActionsFrom(rootCfg ConfigProvider) error { Actions.ZombieTaskTimeout = sec.Key("ZOMBIE_TASK_TIMEOUT").MustDuration(10 * time.Minute) Actions.EndlessTaskTimeout = sec.Key("ENDLESS_TASK_TIMEOUT").MustDuration(3 * time.Hour) Actions.AbandonedJobTimeout = sec.Key("ABANDONED_JOB_TIMEOUT").MustDuration(24 * time.Hour) + Actions.RunnerLongPollTimeout = sec.Key("RUNNER_LONG_POLL_TIMEOUT").MustDuration(50 * time.Second) if Actions.MaxRerunAttempts <= 0 { Actions.MaxRerunAttempts = defaultMaxRerunAttempts diff --git a/routers/api/actions/runner/runner.go b/routers/api/actions/runner/runner.go index eee39760ed..3ac8ad6ad5 100644 --- a/routers/api/actions/runner/runner.go +++ b/routers/api/actions/runner/runner.go @@ -7,12 +7,14 @@ import ( "context" "errors" "net/http" + "time" actions_model "code.gitea.io/gitea/models/actions" repo_model "code.gitea.io/gitea/models/repo" user_model "code.gitea.io/gitea/models/user" "code.gitea.io/gitea/modules/actions" "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/util" actions_service "code.gitea.io/gitea/services/actions" @@ -130,50 +132,78 @@ func (s *Service) Declare( }), nil } -// FetchTask assigns a task to the runner +// FetchTask assigns a task to the runner. When no task is available, the +// request is held open until a new task version is signalled, the long-poll +// timeout elapses, or the client disconnects. func (s *Service) FetchTask( ctx context.Context, req *connect.Request[runnerv1.FetchTaskRequest], ) (*connect.Response[runnerv1.FetchTaskResponse], error) { runner := GetRunner(ctx) - var task *runnerv1.Task - tasksVersion := req.Msg.TasksVersion // task version from runner - latestVersion, err := actions_model.GetTasksVersionByScope(ctx, runner.OwnerID, runner.RepoID) - if err != nil { - return nil, status.Errorf(codes.Internal, "query tasks version failed: %v", err) - } else if latestVersion == 0 { - if err := actions_model.IncreaseTaskVersion(ctx, runner.OwnerID, runner.RepoID); err != nil { - return nil, status.Errorf(codes.Internal, "fail to increase task version: %v", err) - } - // if we don't increase the value of `latestVersion` here, - // the response of FetchTask will return tasksVersion as zero. - // and the runner will treat it as an old version of Gitea. - latestVersion++ + var deadline <-chan time.Time + if d := setting.Actions.RunnerLongPollTimeout; d > 0 { + deadline = time.After(d) } - if tasksVersion != latestVersion { - // Re-load runner from DB so task assignment uses current IsDisabled state - // (avoids race where disable commits while this request still has stale runner). - freshRunner, err := actions_model.GetRunnerByUUID(ctx, runner.UUID) + clientVersion := req.Msg.TasksVersion + var latestVersion int64 +LongPoll: + for { + wakeCh := actions_model.TasksVersionWakeChannel() + + v, err := actions_model.GetTasksVersionByScope(ctx, runner.OwnerID, runner.RepoID) if err != nil { - return nil, status.Errorf(codes.Internal, "get runner: %v", err) + return nil, status.Errorf(codes.Internal, "query tasks version failed: %v", err) + } else if v == 0 { + if err := actions_model.IncreaseTaskVersion(ctx, runner.OwnerID, runner.RepoID); err != nil { + return nil, status.Errorf(codes.Internal, "fail to increase task version: %v", err) + } + // if we don't increase the value of `v` here, + // the response of FetchTask will return tasksVersion as zero. + // and the runner will treat it as an old version of Gitea. + v = 1 } - // if the task version in request is not equal to the version in db, - // it means there may still be some tasks that haven't been assigned. - // try to pick a task for the runner that send the request. - if t, ok, err := actions_service.PickTask(ctx, freshRunner); err != nil { - log.Error("pick task failed: %v", err) - return nil, status.Errorf(codes.Internal, "pick task: %v", err) - } else if ok { - task = t + latestVersion = v + + if clientVersion != latestVersion { + // Re-load runner from DB so task assignment uses current IsDisabled state + // (avoids race where disable commits while this request still has stale runner). + freshRunner, err := actions_model.GetRunnerByUUID(ctx, runner.UUID) + if err != nil { + return nil, status.Errorf(codes.Internal, "get runner: %v", err) + } + // if the task version in request is not equal to the version in db, + // it means there may still be some tasks that haven't been assigned. + // try to pick a task for the runner that send the request. + if t, ok, err := actions_service.PickTask(ctx, freshRunner); err != nil { + log.Error("pick task failed: %v", err) + return nil, status.Errorf(codes.Internal, "pick task: %v", err) + } else if ok { + return connect.NewResponse(&runnerv1.FetchTaskResponse{ + Task: t, + TasksVersion: latestVersion, + }), nil + } + clientVersion = latestVersion + } + + if deadline == nil { + break + } + + select { + case <-wakeCh: + case <-deadline: + break LongPoll + case <-ctx.Done(): + break LongPoll } } - res := connect.NewResponse(&runnerv1.FetchTaskResponse{ - Task: task, + + return connect.NewResponse(&runnerv1.FetchTaskResponse{ TasksVersion: latestVersion, - }) - return res, nil + }), nil } // UpdateTask updates the task status. diff --git a/tests/integration/actions_job_test.go b/tests/integration/actions_job_test.go index 3e1fd50eb5..e4d5297f2d 100644 --- a/tests/integration/actions_job_test.go +++ b/tests/integration/actions_job_test.go @@ -643,11 +643,7 @@ jobs: err = actions_service.CleanupEphemeralRunners(t.Context()) assert.NoError(t, err) - resp, err := runner.client.runnerServiceClient.FetchTask(t.Context(), connect.NewRequest(&runnerv1.FetchTaskRequest{ - TasksVersion: 0, - })) - assert.NoError(t, err) - assert.Nil(t, resp.Msg.Task) + runner.fetchNoTask(t) // verify CleanupEphemeralRunners does not remove this runner err = actions_service.CleanupEphemeralRunners(t.Context()) @@ -661,7 +657,7 @@ jobs: })) assert.NoError(t, err) - resp, err = runner.client.runnerServiceClient.FetchTask(t.Context(), connect.NewRequest(&runnerv1.FetchTaskRequest{ + resp, err := runner.client.runnerServiceClient.FetchTask(t.Context(), connect.NewRequest(&runnerv1.FetchTaskRequest{ TasksVersion: 0, })) assert.Error(t, err) diff --git a/tests/integration/actions_runner_longpoll_test.go b/tests/integration/actions_runner_longpoll_test.go new file mode 100644 index 0000000000..312d85dd1b --- /dev/null +++ b/tests/integration/actions_runner_longpoll_test.go @@ -0,0 +1,87 @@ +// Copyright 2026 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package integration + +import ( + "fmt" + "net/http" + "net/url" + "sync" + "testing" + "time" + + auth_model "code.gitea.io/gitea/models/auth" + repo_model "code.gitea.io/gitea/models/repo" + "code.gitea.io/gitea/models/unittest" + user_model "code.gitea.io/gitea/models/user" + "code.gitea.io/gitea/modules/setting" + "code.gitea.io/gitea/modules/test" + + runnerv1 "code.gitea.io/actions-proto-go/runner/v1" + "connectrpc.com/connect" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestActionsRunnerLongPollWake exercises the end-to-end glue from workflow +// dispatch to task delivery: a waiting FetchTask returns the queued task +// without waiting for the long-poll deadline. Wake channel mechanics are +// covered by unit tests in models/actions/tasks_version_notify_test.go. +func TestActionsRunnerLongPollWake(t *testing.T) { + onGiteaRun(t, func(t *testing.T, _ *url.URL) { + user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) + session := loginUser(t, user2.Name) + token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser) + + apiRepo := createActionsTestRepo(t, token, "actions-longpoll-wake", false) + repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiRepo.ID}) + httpContext := NewAPITestContext(t, user2.Name, repo.Name, auth_model.AccessTokenScopeWriteRepository) + defer doAPIDeleteRepository(httpContext)(t) + + runner := newMockRunner() + runner.registerAsRepoRunner(t, user2.Name, repo.Name, "longpoll-wake-runner", []string{"linux-runner"}, false) + + const wfFile = ".gitea/workflows/longpoll-wake.yml" + const wfContent = `name: Long-poll Wake +on: workflow_dispatch +jobs: + build: + runs-on: linux-runner + steps: + - run: echo hi +` + opts := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wfFile, wfContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wfFile, opts) + + defer test.MockVariableValue(&setting.Actions.RunnerLongPollTimeout, 5*time.Second)() + + var ( + wg sync.WaitGroup + task *runnerv1.Task + elapsed time.Duration + ) + wg.Go(func() { + start := time.Now() + resp, err := runner.client.runnerServiceClient.FetchTask(t.Context(), connect.NewRequest(&runnerv1.FetchTaskRequest{ + TasksVersion: 0, + })) + elapsed = time.Since(start) + assert.NoError(t, err) + if resp != nil { + task = resp.Msg.Task + } + }) + + time.Sleep(100 * time.Millisecond) + runURL := fmt.Sprintf("/%s/%s/actions/run?workflow=%s", user2.Name, repo.Name, "longpoll-wake.yml") + session.MakeRequest(t, NewRequestWithValues(t, "POST", runURL, map[string]string{ + "ref": "refs/heads/" + repo.DefaultBranch, + }), http.StatusSeeOther) + + wg.Wait() + + require.NotNil(t, task, "long-poll should return the queued task") + require.Less(t, elapsed, 4*time.Second, "long-poll should return promptly after the wake, not wait the full timeout") + }) +} diff --git a/tests/integration/actions_runner_test.go b/tests/integration/actions_runner_test.go index e9a0a96ca8..8b25783da0 100644 --- a/tests/integration/actions_runner_test.go +++ b/tests/integration/actions_runner_test.go @@ -5,6 +5,7 @@ package integration import ( "context" + "errors" "fmt" "net/http" "testing" @@ -125,11 +126,17 @@ func (r *mockRunner) tryFetchTask(t *testing.T, timeout ...time.Duration) *runne // fetchTaskOnce performs a single FetchTask request with the given TasksVersion // and returns the task (if any) and the TasksVersion from the response. -// Used to verify the production path where the runner sends the current version. +// The request is bounded by a short context timeout so server-side long-poll +// (RUNNER_LONG_POLL_TIMEOUT) does not slow tests that repeatedly poll for state. func (r *mockRunner) fetchTaskOnce(t *testing.T, tasksVersion int64) (*runnerv1.Task, int64) { - resp, err := r.client.runnerServiceClient.FetchTask(t.Context(), connect.NewRequest(&runnerv1.FetchTaskRequest{ + ctx, cancel := context.WithTimeout(t.Context(), 200*time.Millisecond) + defer cancel() + resp, err := r.client.runnerServiceClient.FetchTask(ctx, connect.NewRequest(&runnerv1.FetchTaskRequest{ TasksVersion: tasksVersion, })) + if errors.Is(err, context.DeadlineExceeded) { + return nil, 0 + } require.NoError(t, err) return resp.Msg.Task, resp.Msg.TasksVersion }