0
0
mirror of https://github.com/go-gitea/gitea.git synced 2026-06-28 12:28:18 +02:00

fix(actions): ensure all waiting jobs get runners in large workflows (#38200)

## Summary

Fixes two related bugs that cause jobs in large workflows (50+ parallel
jobs) to never get a runner assigned even though runners are free.

### Bug 1 — Concurrent runner race

When N runners all poll `FetchTask` with a stale `tasksVersion`
simultaneously, they all query the same waiting job list sorted by
`(updated, id)` and all pick **job #1**. Only one wins the `UPDATE WHERE
task_id=0` optimistic lock; the rest return empty-handed but still
receive `latestVersion` in the response. They then consider themselves
"up to date" and skip `PickTask` on every subsequent poll, leaving jobs
#2–50 permanently unassigned.

**Fix:** `CreateTaskForRunner` now iterates through all matching waiting
jobs. When the optimistic lock fails on job #1, it immediately tries job
#2, then #3, etc., each in its own independent transaction so a failed
attempt rolls back cleanly before the next candidate is tried.
`PickTask` no longer wraps this call in an outer `db.WithTx` (which
caused `halfCommitter` entanglement that prevented per-attempt
rollbacks).

### Bug 2 — Idle runner doesn't re-check after finishing a task

`tasks_version` only bumps when a job transitions **to** waiting (new
workflow triggered, blocked→unblocked). After a runner finishes its
current task it polls `FetchTask` with `tasksVersion == latestVersion`,
so the server skips `PickTask` entirely — the remaining 45 waiting jobs
are invisible to the now-idle runner.

**Fix:** Also call `IncreaseTaskVersion` in `UpdateRunJob` when a
(non-reusable-caller) job transitions to a **done** state. Idle runners
then see a version mismatch on their next poll and attempt `PickTask`,
picking up the remaining jobs.
This commit is contained in:
bircni 2026-06-27 19:56:12 +02:00 committed by GitHub
parent cbe1b703dc
commit 0f5102427e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 414 additions and 128 deletions

View File

@ -361,6 +361,14 @@ func CollectAllDescendantJobs(parent *ActionRunJob, allJobs []*ActionRunJob) []*
return out
}
// hasWaitingJobsToPick reports whether any waiting, unclaimed, non-reusable job
// remains in the repo, i.e. work that an idle runner could still pick up.
func hasWaitingJobsToPick(ctx context.Context, repoID int64) (bool, error) {
return db.GetEngine(ctx).
Where("repo_id = ? AND task_id = ? AND status = ? AND is_reusable_caller = ?", repoID, 0, StatusWaiting, false).
Exist(&ActionRunJob{})
}
func UpdateRunJob(ctx context.Context, job *ActionRunJob, cond builder.Cond, cols ...string) (int64, error) {
e := db.GetEngine(ctx)
@ -385,14 +393,6 @@ func UpdateRunJob(ctx context.Context, job *ActionRunJob, cond builder.Cond, col
return affected, nil
}
// Reusable workflow caller jobs are never picked up by runners, so they don't need a task-version bump.
if statusUpdated && job.Status.IsWaiting() && !job.IsReusableCaller {
// if the status of job changes to waiting again, increase tasks version.
if err := IncreaseTaskVersion(ctx, job.OwnerID, job.RepoID); err != nil {
return 0, err
}
}
if job.RunID == 0 {
var err error
if job, err = GetRunJobByRepoAndID(ctx, job.RepoID, job.ID); err != nil {
@ -400,6 +400,37 @@ func UpdateRunJob(ctx context.Context, job *ActionRunJob, cond builder.Cond, col
}
}
// Reusable workflow caller jobs are never picked up by runners, so they don't need a task-version bump.
if statusUpdated && !job.IsReusableCaller {
switch {
case job.Status.IsWaiting():
// A job returning to the waiting queue is work a runner can pick up, so bump the
// version to wake idle runners whose tasksVersion already equals latestVersion.
if err := IncreaseTaskVersion(ctx, job.OwnerID, job.RepoID); err != nil {
return 0, err
}
case job.Status.IsDone():
// When a job finishes, bump the version so that idle runners — whose
// tasksVersion already equals the current latestVersion — learn that
// remaining waiting jobs are still available and attempt PickTask again.
// Without this bump, runners that completed their tasks would see
// tasksVersion==latestVersion and skip PickTask, leaving the other jobs
// permanently unassigned until the version changes for another reason.
// Only bump when waiting work actually remains for this repo, otherwise
// every job completion would needlessly bump the global version and wake
// every idle runner instance-wide for nothing.
hasWaiting, err := hasWaitingJobsToPick(ctx, job.RepoID)
if err != nil {
return 0, err
}
if hasWaiting {
if err := IncreaseTaskVersion(ctx, job.OwnerID, job.RepoID); err != nil {
return 0, err
}
}
}
}
if statusUpdated && job.ParentJobID > 0 {
// Reusable workflow caller's children cascade their status changes upward to the parent caller.
parent, err := GetRunJobByRunAndID(ctx, job.RunID, job.ParentJobID)

View File

@ -227,13 +227,18 @@ func makeTaskStepDisplayName(step *jobparser.Step, limit int) (name string) {
return util.EllipsisDisplayString(name, limit) // database column has a length limit
}
func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask, bool, error) {
ctx, committer, err := db.TxContext(ctx)
if err != nil {
return nil, false, err
}
defer committer.Close()
// errJobAlreadyClaimed is a sentinel used inside claimJobForRunner to signal that
// another runner won the optimistic-lock race; it is never returned to callers.
var errJobAlreadyClaimed = errors.New("job already claimed by another runner")
// CreateTaskForRunner finds a waiting job that matches the runner's labels and
// atomically claims it. It iterates through all matching jobs so that a
// concurrent claim by another runner (which would lose the optimistic lock on
// job #1) does not leave the remaining jobs permanently unassigned.
func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask, bool, error) {
if db.InTransaction(ctx) {
return nil, false, errors.New("CreateTaskForRunner must not be called within a database transaction")
}
e := db.GetEngine(ctx)
jobCond := builder.NewCond()
@ -254,83 +259,144 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask
}
// TODO: a more efficient way to filter labels
var job *ActionRunJob
log.Trace("runner labels: %v", runner.AgentLabels)
for _, v := range jobs {
if runner.CanMatchLabels(v.RunsOn) {
job = v
break
if !runner.CanMatchLabels(v.RunsOn) {
continue
}
}
if job == nil {
return nil, false, nil
}
if err := job.LoadAttributes(ctx); err != nil {
return nil, false, err
}
now := timeutil.TimeStampNow()
job.Started = now
job.Status = StatusRunning
task := &ActionTask{
JobID: job.ID,
Attempt: job.Attempt,
RunnerID: runner.ID,
Started: now,
Status: StatusRunning,
RepoID: job.RepoID,
OwnerID: job.OwnerID,
CommitSHA: job.CommitSHA,
IsForkPullRequest: job.IsForkPullRequest,
}
task.GenerateAndFillToken()
workflowJob, err := job.ParseJob()
if err != nil {
return nil, false, fmt.Errorf("load job %d: %w", job.ID, err)
}
if _, err := e.Insert(task); err != nil {
return nil, false, err
}
task.LogFilename = logFileName(job.Run.Repo.FullName(), task.ID)
if err := UpdateTask(ctx, task, "log_filename"); err != nil {
return nil, false, err
}
if len(workflowJob.Steps) > 0 {
steps := make([]*ActionTaskStep, len(workflowJob.Steps))
for i, v := range workflowJob.Steps {
steps[i] = &ActionTaskStep{
Name: makeTaskStepDisplayName(v, 255),
TaskID: task.ID,
Index: int64(i),
RepoID: task.RepoID,
Status: StatusWaiting,
}
}
if _, err := e.Insert(steps); err != nil {
task, ok, err := claimJobForRunner(ctx, runner, v)
if err != nil {
return nil, false, err
}
task.Steps = steps
if ok {
return task, true, nil
}
// Another runner claimed this job concurrently; try the next one.
}
return nil, false, nil
}
job.TaskID = task.ID
if n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}); err != nil {
return nil, false, err
} else if n != 1 {
// claimJobForRunner attempts to atomically claim job for runner inside its own
// transaction. Returns (task, true, nil) on success, or (nil, false, nil) when
// another runner wins the optimistic-lock race (the caller should try the next
// candidate job).
func claimJobForRunner(ctx context.Context, runner *ActionRunner, job *ActionRunJob) (*ActionTask, bool, error) {
var resultTask *ActionTask
err := db.WithTx(ctx, func(ctx context.Context) error {
e := db.GetEngine(ctx)
if err := job.LoadAttributes(ctx); err != nil {
return err
}
now := timeutil.TimeStampNow()
job.Started = now
job.Status = StatusRunning
task := &ActionTask{
JobID: job.ID,
Attempt: job.Attempt,
RunnerID: runner.ID,
Started: now,
Status: StatusRunning,
RepoID: job.RepoID,
OwnerID: job.OwnerID,
CommitSHA: job.CommitSHA,
IsForkPullRequest: job.IsForkPullRequest,
}
task.GenerateAndFillToken()
workflowJob, err := job.ParseJob()
if err != nil {
return fmt.Errorf("load job %d: %w", job.ID, err)
}
if _, err := e.Insert(task); err != nil {
return err
}
task.LogFilename = logFileName(job.Run.Repo.FullName(), task.ID)
if err := UpdateTask(ctx, task, "log_filename"); err != nil {
return err
}
if len(workflowJob.Steps) > 0 {
steps := make([]*ActionTaskStep, len(workflowJob.Steps))
for i, v := range workflowJob.Steps {
steps[i] = &ActionTaskStep{
Name: makeTaskStepDisplayName(v, 255),
TaskID: task.ID,
Index: int64(i),
RepoID: task.RepoID,
Status: StatusWaiting,
}
}
if _, err := e.Insert(steps); err != nil {
return err
}
task.Steps = steps
}
job.TaskID = task.ID
n, err := UpdateRunJob(ctx, job, builder.And(builder.Eq{"task_id": 0}, builder.Eq{"status": StatusWaiting}))
if err != nil {
return err
}
if n != 1 {
// Another runner claimed this job between our scan and this update;
// signal the outer loop to move on without treating this as an error.
return errJobAlreadyClaimed
}
task.Job = job
resultTask = task
return nil
})
if errors.Is(err, errJobAlreadyClaimed) {
return nil, false, nil
}
task.Job = job
if err := committer.Commit(); err != nil {
if err != nil {
return nil, false, err
}
return resultTask, true, nil
}
return task, true, nil
// ReleaseTaskForRunner reverts a freshly-claimed but undelivered task: it deletes
// the task together with its steps and returns the job to the waiting queue. It is
// used when assembling the runner response fails after the job was already claimed,
// so the job is not stranded in running state with no runner ever executing it.
func ReleaseTaskForRunner(ctx context.Context, task *ActionTask) error {
return db.WithTx(ctx, func(ctx context.Context) error {
e := db.GetEngine(ctx)
job, err := GetRunJobByRepoAndID(ctx, task.RepoID, task.JobID)
if err != nil {
return err
}
job.Status = StatusWaiting
job.Started = 0
job.TaskID = 0
// Guard on task_id and status so we only release while the job still
// references this task and has not progressed past running.
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": task.ID, "status": StatusRunning}, "status", "started", "task_id")
if err != nil {
return err
}
if n != 1 {
return fmt.Errorf("release task %d: job %d no longer references it", task.ID, task.JobID)
}
if _, err := e.Delete(&ActionTaskStep{TaskID: task.ID}); err != nil {
return err
}
if _, err := e.ID(task.ID).Delete(&ActionTask{}); err != nil {
return err
}
return nil
})
}
func UpdateTask(ctx context.Context, task *ActionTask, cols ...string) error {

View File

@ -306,3 +306,68 @@ func TestStopTaskCancellingFallsBackForMissingRunner(t *testing.T) {
assert.Equal(t, StatusCancelled, jobAfterStop.Status)
assert.NotZero(t, jobAfterStop.Stopped)
}
// TestReleaseTaskForRunner verifies that releasing a freshly-claimed task returns
// its job to the waiting queue and deletes the task and its steps, so a failure
// while assembling the runner response cannot strand the job in running state.
func TestReleaseTaskForRunner(t *testing.T) {
require.NoError(t, unittest.PrepareTestDatabase())
run := &ActionRun{
Title: "release-task-test-run",
RepoID: 1,
OwnerID: 2,
WorkflowID: "test.yaml",
Index: 9902,
TriggerUserID: 2,
Ref: "refs/heads/main",
CommitSHA: "c2d72f548424103f01ee1dc02889c1e2bff816b0",
Event: "push",
TriggerEvent: "push",
Status: StatusWaiting,
}
require.NoError(t, db.Insert(t.Context(), run))
job := &ActionRunJob{
RunID: run.ID,
RepoID: run.RepoID,
OwnerID: run.OwnerID,
CommitSHA: run.CommitSHA,
Name: "release-job",
Attempt: 1,
JobID: "release-job",
Status: StatusWaiting,
RunsOn: []string{"ubuntu-latest"},
WorkflowPayload: []byte("on: push\njobs:\n release-job:\n runs-on: ubuntu-latest\n steps:\n - run: echo hi\n"),
}
require.NoError(t, db.Insert(t.Context(), job))
runner := &ActionRunner{
UUID: "release-runner-uuid",
Name: "release-runner",
AgentLabels: []string{"ubuntu-latest"},
}
runner.GenerateAndFillToken()
require.NoError(t, db.Insert(t.Context(), runner))
task, ok, err := CreateTaskForRunner(t.Context(), runner)
require.NoError(t, err)
require.True(t, ok)
require.NotNil(t, task)
claimed := unittest.AssertExistsAndLoadBean(t, &ActionRunJob{ID: job.ID})
require.Equal(t, StatusRunning, claimed.Status)
require.Equal(t, task.ID, claimed.TaskID)
require.NoError(t, ReleaseTaskForRunner(t.Context(), task))
// Job is back in the waiting queue with no task assigned.
released := unittest.AssertExistsAndLoadBean(t, &ActionRunJob{ID: job.ID})
assert.Equal(t, StatusWaiting, released.Status)
assert.Zero(t, released.TaskID)
assert.Zero(t, released.Started)
// The task and its steps are gone.
unittest.AssertNotExistsBean(t, &ActionTask{ID: task.ID})
unittest.AssertNotExistsBean(t, &ActionTaskStep{TaskID: task.ID})
}

View File

@ -12,6 +12,7 @@ import (
actions_model "gitea.dev/models/actions"
"gitea.dev/models/db"
secret_model "gitea.dev/models/secret"
"gitea.dev/modules/log"
"google.golang.org/protobuf/types/known/structpb"
)
@ -47,59 +48,26 @@ func PickTask(ctx context.Context, runner *actions_model.ActionRunner) (*runnerv
}
}
if err := db.WithTx(ctx, func(ctx context.Context) error {
t, ok, err := actions_model.CreateTaskForRunner(ctx, runner)
if err != nil {
return fmt.Errorf("CreateTaskForRunner: %w", err)
}
if !ok {
return nil
}
if err := t.LoadAttributes(ctx); err != nil {
return fmt.Errorf("task LoadAttributes: %w", err)
}
job = t.Job
actionTask = t
secrets, err := secret_model.GetSecretsOfTask(ctx, t)
if err != nil {
return fmt.Errorf("GetSecretsOfTask: %w", err)
}
vars, err := actions_model.GetVariablesOfRun(ctx, t.Job.Run)
if err != nil {
return fmt.Errorf("GetVariablesOfRun: %w", err)
}
needs, err := findTaskNeeds(ctx, job)
if err != nil {
return fmt.Errorf("findTaskNeeds: %w", err)
}
taskContext, err := generateTaskContext(ctx, t)
if err != nil {
return fmt.Errorf("generateTaskContext: %w", err)
}
task = &runnerv1.Task{
Id: t.ID,
WorkflowPayload: t.Job.WorkflowPayload,
Context: taskContext,
Secrets: secrets,
Vars: vars,
Needs: needs,
}
return nil
}); err != nil {
return nil, false, err
t, ok, err := actions_model.CreateTaskForRunner(ctx, runner)
if err != nil {
return nil, false, fmt.Errorf("CreateTaskForRunner: %w", err)
}
if task == nil {
if !ok {
return nil, false, nil
}
task, job, err = buildRunnerTask(ctx, t)
if err != nil {
// The job was already claimed but assembling its payload failed; release the
// claim so the job returns to the waiting queue instead of being stranded in
// running state with no runner ever executing it.
if relErr := actions_model.ReleaseTaskForRunner(ctx, t); relErr != nil {
log.Error("ReleaseTaskForRunner [task_id: %d]: %v", t.ID, relErr)
}
return nil, false, err
}
actionTask = t
CreateCommitStatusForRunJobs(ctx, job.Run, job)
NotifyWorkflowJobStatusUpdateWithTask(ctx, job, actionTask)
// job.Run is loaded inside the transaction before UpdateRunJob sets run.Started,
@ -111,6 +79,44 @@ func PickTask(ctx context.Context, runner *actions_model.ActionRunner) (*runnerv
return task, true, nil
}
// buildRunnerTask assembles the runner-facing task payload for an already-claimed
// task. All operations are read-only; on error the caller releases the claim.
func buildRunnerTask(ctx context.Context, t *actions_model.ActionTask) (*runnerv1.Task, *actions_model.ActionRunJob, error) {
if err := t.LoadAttributes(ctx); err != nil {
return nil, nil, fmt.Errorf("task LoadAttributes: %w", err)
}
job := t.Job
secrets, err := secret_model.GetSecretsOfTask(ctx, t)
if err != nil {
return nil, nil, fmt.Errorf("GetSecretsOfTask: %w", err)
}
vars, err := actions_model.GetVariablesOfRun(ctx, t.Job.Run)
if err != nil {
return nil, nil, fmt.Errorf("GetVariablesOfRun: %w", err)
}
needs, err := findTaskNeeds(ctx, job)
if err != nil {
return nil, nil, fmt.Errorf("findTaskNeeds: %w", err)
}
taskContext, err := generateTaskContext(ctx, t)
if err != nil {
return nil, nil, fmt.Errorf("generateTaskContext: %w", err)
}
return &runnerv1.Task{
Id: t.ID,
WorkflowPayload: t.Job.WorkflowPayload,
Context: taskContext,
Secrets: secrets,
Vars: vars,
Needs: needs,
}, job, nil
}
func generateTaskContext(ctx context.Context, t *actions_model.ActionTask) (*structpb.Struct, error) {
giteaRuntimeToken, err := CreateAuthorizationToken(t.ID, t.Job.RunID, t.JobID)
if err != nil {

View File

@ -0,0 +1,118 @@
// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package integration
import (
"sync"
"testing"
actions_model "gitea.dev/models/actions"
"gitea.dev/models/db"
"gitea.dev/models/unittest"
"gitea.dev/tests"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// minimalWorkflowPayload returns the minimal YAML for a single-job workflow with no steps.
func minimalConcurrentWorkflowPayload(jobID string) []byte {
return []byte("on: push\njobs:\n " + jobID + ":\n runs-on: ubuntu-latest\n")
}
// TestCreateTaskForRunnerConcurrentClaim verifies that when multiple runners
// poll simultaneously and all initially see the same first waiting job,
// each runner claims a distinct job rather than all but one being left
// empty-handed. This is the regression test for the race condition where
// runners losing the optimistic-lock on job #1 would receive latestVersion
// and never retry the remaining 49+ jobs.
//
// It lives in tests/integration rather than a unit test because SQLite
// serializes write transactions, so the contended optimistic-lock path this
// guards only runs concurrently against MySQL/PostgreSQL in CI.
func TestCreateTaskForRunnerConcurrentClaim(t *testing.T) {
defer tests.PrepareTestEnv(t)()
const numJobs = 3
run := &actions_model.ActionRun{
Title: "concurrent-claim-test-run",
RepoID: 1,
OwnerID: 2,
WorkflowID: "test.yaml",
Index: 9901,
TriggerUserID: 2,
Ref: "refs/heads/main",
CommitSHA: "c2d72f548424103f01ee1dc02889c1e2bff816b0",
Event: "push",
TriggerEvent: "push",
Status: actions_model.StatusWaiting,
}
require.NoError(t, db.Insert(t.Context(), run))
jobs := make([]*actions_model.ActionRunJob, numJobs)
for i := range numJobs {
jobID := "concurrent-job-" + string(rune('a'+i))
jobs[i] = &actions_model.ActionRunJob{
RunID: run.ID,
RepoID: run.RepoID,
OwnerID: run.OwnerID,
CommitSHA: run.CommitSHA,
Name: jobID,
Attempt: 1,
JobID: jobID,
Status: actions_model.StatusWaiting,
RunsOn: []string{"ubuntu-latest"},
WorkflowPayload: minimalConcurrentWorkflowPayload(jobID),
}
require.NoError(t, db.Insert(t.Context(), jobs[i]))
}
runners := make([]*actions_model.ActionRunner, numJobs)
for i := range numJobs {
r := &actions_model.ActionRunner{
UUID: "concurrent-runner-uuid-" + string(rune('a'+i)),
Name: "concurrent-runner-" + string(rune('a'+i)),
AgentLabels: []string{"ubuntu-latest"},
}
r.GenerateAndFillToken()
runners[i] = r
require.NoError(t, db.Insert(t.Context(), runners[i]))
}
// Simulate the burst: all runners call CreateTaskForRunner concurrently,
// as happens when all see the same stale tasksVersion simultaneously.
type result struct {
task *actions_model.ActionTask
ok bool
err error
}
results := make([]result, numJobs)
var wg sync.WaitGroup
for i := range numJobs {
wg.Go(func() {
task, ok, err := actions_model.CreateTaskForRunner(t.Context(), runners[i])
results[i] = result{task, ok, err}
})
}
wg.Wait()
// Every runner must have received a task without error.
claimedJobIDs := make(map[int64]bool)
for i, r := range results {
require.NoError(t, r.err, "runner %d got an unexpected error", i)
require.True(t, r.ok, "runner %d did not get a task even though free jobs exist", i)
require.NotNil(t, r.task)
assert.False(t, claimedJobIDs[r.task.JobID], "job %d was claimed by more than one runner", r.task.JobID)
claimedJobIDs[r.task.JobID] = true
}
assert.Len(t, claimedJobIDs, numJobs, "expected %d distinct jobs to be claimed", numJobs)
// All jobs must now be running with a task assigned.
for _, j := range jobs {
updated := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{ID: j.ID})
assert.Equal(t, actions_model.StatusRunning, updated.Status)
assert.NotZero(t, updated.TaskID)
}
}