From 0f5102427e1d6c9e1adebe07bb0faabe6ae5f680 Mon Sep 17 00:00:00 2001 From: bircni Date: Sat, 27 Jun 2026 19:56:12 +0200 Subject: [PATCH] fix(actions): ensure all waiting jobs get runners in large workflows (#38200) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary Fixes two related bugs that cause jobs in large workflows (50+ parallel jobs) to never get a runner assigned even though runners are free. ### Bug 1 — Concurrent runner race When N runners all poll `FetchTask` with a stale `tasksVersion` simultaneously, they all query the same waiting job list sorted by `(updated, id)` and all pick **job #1**. Only one wins the `UPDATE WHERE task_id=0` optimistic lock; the rest return empty-handed but still receive `latestVersion` in the response. They then consider themselves "up to date" and skip `PickTask` on every subsequent poll, leaving jobs #2–50 permanently unassigned. **Fix:** `CreateTaskForRunner` now iterates through all matching waiting jobs. When the optimistic lock fails on job #1, it immediately tries job #2, then #3, etc., each in its own independent transaction so a failed attempt rolls back cleanly before the next candidate is tried. `PickTask` no longer wraps this call in an outer `db.WithTx` (which caused `halfCommitter` entanglement that prevented per-attempt rollbacks). Because the claim is now committed before the runner payload is built, `PickTask` calls the new `ReleaseTaskForRunner` when building the payload fails; it deletes the task and its steps and returns the job to the waiting queue. ### Bug 2 — Idle runner doesn't re-check after finishing a task `tasks_version` is only bumped when a job transitions **to** waiting (new workflow triggered, blocked→unblocked). After a runner finishes its current task it polls `FetchTask` with `tasksVersion == latestVersion`, so the server skips `PickTask` entirely — the remaining waiting jobs (e.g. 45 of 50 after five runners each claimed one) are invisible to the now-idle runner. **Fix:** Also call `IncreaseTaskVersion` in `UpdateRunJob` when a (non-reusable-caller) job transitions to a **done** state and waiting, unclaimed jobs remain in the same repository (checked by the new `hasWaitingJobsToPick`), so a completion with nothing left to pick up does not wake every idle runner. Idle runners then see a version mismatch on their next poll and attempt `PickTask`, picking up the remaining jobs. 
--- models/actions/run_job.go | 47 +++- models/actions/task.go | 208 ++++++++++++------ models/actions/task_test.go | 65 ++++++ services/actions/task.go | 104 ++++----- .../actions_concurrent_claim_test.go | 118 ++++++++++ 5 files changed, 414 insertions(+), 128 deletions(-) create mode 100644 tests/integration/actions_concurrent_claim_test.go diff --git a/models/actions/run_job.go b/models/actions/run_job.go index 02877e0e2c..33696f7bee 100644 --- a/models/actions/run_job.go +++ b/models/actions/run_job.go @@ -361,6 +361,14 @@ func CollectAllDescendantJobs(parent *ActionRunJob, allJobs []*ActionRunJob) []* return out } +// hasWaitingJobsToPick reports whether any waiting, unclaimed, non-reusable job +// remains in the repo, i.e. work that an idle runner could still pick up. +func hasWaitingJobsToPick(ctx context.Context, repoID int64) (bool, error) { + return db.GetEngine(ctx). + Where("repo_id = ? AND task_id = ? AND status = ? AND is_reusable_caller = ?", repoID, 0, StatusWaiting, false). + Exist(&ActionRunJob{}) +} + func UpdateRunJob(ctx context.Context, job *ActionRunJob, cond builder.Cond, cols ...string) (int64, error) { e := db.GetEngine(ctx) @@ -385,14 +393,6 @@ func UpdateRunJob(ctx context.Context, job *ActionRunJob, cond builder.Cond, col return affected, nil } - // Reusable workflow caller jobs are never picked up by runners, so they don't need a task-version bump. - if statusUpdated && job.Status.IsWaiting() && !job.IsReusableCaller { - // if the status of job changes to waiting again, increase tasks version. - if err := IncreaseTaskVersion(ctx, job.OwnerID, job.RepoID); err != nil { - return 0, err - } - } - if job.RunID == 0 { var err error if job, err = GetRunJobByRepoAndID(ctx, job.RepoID, job.ID); err != nil { @@ -400,6 +400,37 @@ func UpdateRunJob(ctx context.Context, job *ActionRunJob, cond builder.Cond, col } } + // Reusable workflow caller jobs are never picked up by runners, so they don't need a task-version bump. 
+ if statusUpdated && !job.IsReusableCaller { + switch { + case job.Status.IsWaiting(): + // A job returning to the waiting queue is work a runner can pick up, so bump the + // version to wake idle runners whose tasksVersion already equals latestVersion. + if err := IncreaseTaskVersion(ctx, job.OwnerID, job.RepoID); err != nil { + return 0, err + } + case job.Status.IsDone(): + // When a job finishes, bump the version so that idle runners — whose + // tasksVersion already equals the current latestVersion — learn that + // remaining waiting jobs are still available and attempt PickTask again. + // Without this bump, runners that completed their tasks would see + // tasksVersion==latestVersion and skip PickTask, leaving the other jobs + // permanently unassigned until the version changes for another reason. + // Only bump when waiting work actually remains for this repo, otherwise + // every job completion would needlessly bump the global version and wake + // every idle runner instance-wide for nothing. + hasWaiting, err := hasWaitingJobsToPick(ctx, job.RepoID) + if err != nil { + return 0, err + } + if hasWaiting { + if err := IncreaseTaskVersion(ctx, job.OwnerID, job.RepoID); err != nil { + return 0, err + } + } + } + } + if statusUpdated && job.ParentJobID > 0 { // Reusable workflow caller's children cascade their status changes upward to the parent caller. 
 parent, err := GetRunJobByRunAndID(ctx, job.RunID, job.ParentJobID) diff --git a/models/actions/task.go b/models/actions/task.go index 36318c878a..27b7efe124 100644 --- a/models/actions/task.go +++ b/models/actions/task.go @@ -227,13 +227,18 @@ func makeTaskStepDisplayName(step *jobparser.Step, limit int) (name string) { return util.EllipsisDisplayString(name, limit) // database column has a length limit } -func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask, bool, error) { - ctx, committer, err := db.TxContext(ctx) - if err != nil { - return nil, false, err - } - defer committer.Close() +// errJobAlreadyClaimed is a sentinel used inside claimJobForRunner to signal that +// another runner won the optimistic-lock race; it is never returned to callers. +var errJobAlreadyClaimed = errors.New("job already claimed by another runner") +// CreateTaskForRunner finds a waiting job that matches the runner's labels and +// atomically claims it. It iterates through all matching jobs so that losing +// the optimistic lock on one candidate to a concurrently claiming runner moves +// on to the next candidate instead of returning empty-handed. 
+func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask, bool, error) { + if db.InTransaction(ctx) { + return nil, false, errors.New("CreateTaskForRunner must not be called within a database transaction") + } e := db.GetEngine(ctx) jobCond := builder.NewCond() @@ -254,83 +259,144 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask } // TODO: a more efficient way to filter labels - var job *ActionRunJob log.Trace("runner labels: %v", runner.AgentLabels) for _, v := range jobs { - if runner.CanMatchLabels(v.RunsOn) { - job = v - break + if !runner.CanMatchLabels(v.RunsOn) { + continue } - } - if job == nil { - return nil, false, nil - } - if err := job.LoadAttributes(ctx); err != nil { - return nil, false, err - } - - now := timeutil.TimeStampNow() - job.Started = now - job.Status = StatusRunning - - task := &ActionTask{ - JobID: job.ID, - Attempt: job.Attempt, - RunnerID: runner.ID, - Started: now, - Status: StatusRunning, - RepoID: job.RepoID, - OwnerID: job.OwnerID, - CommitSHA: job.CommitSHA, - IsForkPullRequest: job.IsForkPullRequest, - } - task.GenerateAndFillToken() - - workflowJob, err := job.ParseJob() - if err != nil { - return nil, false, fmt.Errorf("load job %d: %w", job.ID, err) - } - - if _, err := e.Insert(task); err != nil { - return nil, false, err - } - - task.LogFilename = logFileName(job.Run.Repo.FullName(), task.ID) - if err := UpdateTask(ctx, task, "log_filename"); err != nil { - return nil, false, err - } - - if len(workflowJob.Steps) > 0 { - steps := make([]*ActionTaskStep, len(workflowJob.Steps)) - for i, v := range workflowJob.Steps { - steps[i] = &ActionTaskStep{ - Name: makeTaskStepDisplayName(v, 255), - TaskID: task.ID, - Index: int64(i), - RepoID: task.RepoID, - Status: StatusWaiting, - } - } - if _, err := e.Insert(steps); err != nil { + task, ok, err := claimJobForRunner(ctx, runner, v) + if err != nil { return nil, false, err } - task.Steps = steps + if ok { + return task, 
true, nil + } + // Another runner claimed this job concurrently; try the next one. } + return nil, false, nil +} - job.TaskID = task.ID - if n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}); err != nil { - return nil, false, err - } else if n != 1 { +// claimJobForRunner attempts to atomically claim job for runner inside its own +// transaction. Returns (task, true, nil) on success, or (nil, false, nil) when +// another runner wins the optimistic-lock race (the caller should try the next +// candidate job). +func claimJobForRunner(ctx context.Context, runner *ActionRunner, job *ActionRunJob) (*ActionTask, bool, error) { + var resultTask *ActionTask + + err := db.WithTx(ctx, func(ctx context.Context) error { + e := db.GetEngine(ctx) + + if err := job.LoadAttributes(ctx); err != nil { + return err + } + + now := timeutil.TimeStampNow() + job.Started = now + job.Status = StatusRunning + + task := &ActionTask{ + JobID: job.ID, + Attempt: job.Attempt, + RunnerID: runner.ID, + Started: now, + Status: StatusRunning, + RepoID: job.RepoID, + OwnerID: job.OwnerID, + CommitSHA: job.CommitSHA, + IsForkPullRequest: job.IsForkPullRequest, + } + task.GenerateAndFillToken() + + workflowJob, err := job.ParseJob() + if err != nil { + return fmt.Errorf("load job %d: %w", job.ID, err) + } + + if _, err := e.Insert(task); err != nil { + return err + } + + task.LogFilename = logFileName(job.Run.Repo.FullName(), task.ID) + if err := UpdateTask(ctx, task, "log_filename"); err != nil { + return err + } + + if len(workflowJob.Steps) > 0 { + steps := make([]*ActionTaskStep, len(workflowJob.Steps)) + for i, v := range workflowJob.Steps { + steps[i] = &ActionTaskStep{ + Name: makeTaskStepDisplayName(v, 255), + TaskID: task.ID, + Index: int64(i), + RepoID: task.RepoID, + Status: StatusWaiting, + } + } + if _, err := e.Insert(steps); err != nil { + return err + } + task.Steps = steps + } + + job.TaskID = task.ID + n, err := UpdateRunJob(ctx, job, builder.And(builder.Eq{"task_id": 0}, 
builder.Eq{"status": StatusWaiting})) + if err != nil { + return err + } + if n != 1 { + // Another runner claimed this job between our scan and this update; + // signal the outer loop to move on without treating this as an error. + return errJobAlreadyClaimed + } + + task.Job = job + resultTask = task + return nil + }) + + if errors.Is(err, errJobAlreadyClaimed) { return nil, false, nil } - - task.Job = job - - if err := committer.Commit(); err != nil { + if err != nil { return nil, false, err } + return resultTask, true, nil +} - return task, true, nil +// ReleaseTaskForRunner reverts a freshly-claimed but undelivered task: it deletes +// the task together with its steps and returns the job to the waiting queue. It is +// used when assembling the runner response fails after the job was already claimed, +// so the job is not stranded in running state with no runner ever executing it. +func ReleaseTaskForRunner(ctx context.Context, task *ActionTask) error { + return db.WithTx(ctx, func(ctx context.Context) error { + e := db.GetEngine(ctx) + + job, err := GetRunJobByRepoAndID(ctx, task.RepoID, task.JobID) + if err != nil { + return err + } + + job.Status = StatusWaiting + job.Started = 0 + job.TaskID = 0 + // Guard on task_id and status so we only release while the job still + // references this task and has not progressed past running. 
+ n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": task.ID, "status": StatusRunning}, "status", "started", "task_id") + if err != nil { + return err + } + if n != 1 { + return fmt.Errorf("release task %d: job %d no longer references it", task.ID, task.JobID) + } + + if _, err := e.Delete(&ActionTaskStep{TaskID: task.ID}); err != nil { + return err + } + if _, err := e.ID(task.ID).Delete(&ActionTask{}); err != nil { + return err + } + return nil + }) } func UpdateTask(ctx context.Context, task *ActionTask, cols ...string) error { diff --git a/models/actions/task_test.go b/models/actions/task_test.go index fe4436ec63..e028ad5106 100644 --- a/models/actions/task_test.go +++ b/models/actions/task_test.go @@ -306,3 +306,68 @@ func TestStopTaskCancellingFallsBackForMissingRunner(t *testing.T) { assert.Equal(t, StatusCancelled, jobAfterStop.Status) assert.NotZero(t, jobAfterStop.Stopped) } + +// TestReleaseTaskForRunner verifies that releasing a freshly-claimed task returns +// its job to the waiting queue and deletes the task and its steps, so a failure +// while assembling the runner response cannot strand the job in running state. 
+func TestReleaseTaskForRunner(t *testing.T) { + require.NoError(t, unittest.PrepareTestDatabase()) + + run := &ActionRun{ + Title: "release-task-test-run", + RepoID: 1, + OwnerID: 2, + WorkflowID: "test.yaml", + Index: 9902, + TriggerUserID: 2, + Ref: "refs/heads/main", + CommitSHA: "c2d72f548424103f01ee1dc02889c1e2bff816b0", + Event: "push", + TriggerEvent: "push", + Status: StatusWaiting, + } + require.NoError(t, db.Insert(t.Context(), run)) + + job := &ActionRunJob{ + RunID: run.ID, + RepoID: run.RepoID, + OwnerID: run.OwnerID, + CommitSHA: run.CommitSHA, + Name: "release-job", + Attempt: 1, + JobID: "release-job", + Status: StatusWaiting, + RunsOn: []string{"ubuntu-latest"}, + WorkflowPayload: []byte("on: push\njobs:\n release-job:\n runs-on: ubuntu-latest\n steps:\n - run: echo hi\n"), + } + require.NoError(t, db.Insert(t.Context(), job)) + + runner := &ActionRunner{ + UUID: "release-runner-uuid", + Name: "release-runner", + AgentLabels: []string{"ubuntu-latest"}, + } + runner.GenerateAndFillToken() + require.NoError(t, db.Insert(t.Context(), runner)) + + task, ok, err := CreateTaskForRunner(t.Context(), runner) + require.NoError(t, err) + require.True(t, ok) + require.NotNil(t, task) + + claimed := unittest.AssertExistsAndLoadBean(t, &ActionRunJob{ID: job.ID}) + require.Equal(t, StatusRunning, claimed.Status) + require.Equal(t, task.ID, claimed.TaskID) + + require.NoError(t, ReleaseTaskForRunner(t.Context(), task)) + + // Job is back in the waiting queue with no task assigned. + released := unittest.AssertExistsAndLoadBean(t, &ActionRunJob{ID: job.ID}) + assert.Equal(t, StatusWaiting, released.Status) + assert.Zero(t, released.TaskID) + assert.Zero(t, released.Started) + + // The task and its steps are gone. 
+ unittest.AssertNotExistsBean(t, &ActionTask{ID: task.ID}) + unittest.AssertNotExistsBean(t, &ActionTaskStep{TaskID: task.ID}) +} diff --git a/services/actions/task.go b/services/actions/task.go index ff54281cba..a76a981b50 100644 --- a/services/actions/task.go +++ b/services/actions/task.go @@ -12,6 +12,7 @@ import ( actions_model "gitea.dev/models/actions" "gitea.dev/models/db" secret_model "gitea.dev/models/secret" + "gitea.dev/modules/log" "google.golang.org/protobuf/types/known/structpb" ) @@ -47,59 +48,26 @@ func PickTask(ctx context.Context, runner *actions_model.ActionRunner) (*runnerv } } - if err := db.WithTx(ctx, func(ctx context.Context) error { - t, ok, err := actions_model.CreateTaskForRunner(ctx, runner) - if err != nil { - return fmt.Errorf("CreateTaskForRunner: %w", err) - } - if !ok { - return nil - } - - if err := t.LoadAttributes(ctx); err != nil { - return fmt.Errorf("task LoadAttributes: %w", err) - } - job = t.Job - actionTask = t - - secrets, err := secret_model.GetSecretsOfTask(ctx, t) - if err != nil { - return fmt.Errorf("GetSecretsOfTask: %w", err) - } - - vars, err := actions_model.GetVariablesOfRun(ctx, t.Job.Run) - if err != nil { - return fmt.Errorf("GetVariablesOfRun: %w", err) - } - - needs, err := findTaskNeeds(ctx, job) - if err != nil { - return fmt.Errorf("findTaskNeeds: %w", err) - } - - taskContext, err := generateTaskContext(ctx, t) - if err != nil { - return fmt.Errorf("generateTaskContext: %w", err) - } - - task = &runnerv1.Task{ - Id: t.ID, - WorkflowPayload: t.Job.WorkflowPayload, - Context: taskContext, - Secrets: secrets, - Vars: vars, - Needs: needs, - } - - return nil - }); err != nil { - return nil, false, err + t, ok, err := actions_model.CreateTaskForRunner(ctx, runner) + if err != nil { + return nil, false, fmt.Errorf("CreateTaskForRunner: %w", err) } - - if task == nil { + if !ok { return nil, false, nil } + task, job, err = buildRunnerTask(ctx, t) + if err != nil { + // The job was already claimed but 
assembling its payload failed; release the + // claim so the job returns to the waiting queue instead of being stranded in + // running state with no runner ever executing it. + if relErr := actions_model.ReleaseTaskForRunner(ctx, t); relErr != nil { + log.Error("ReleaseTaskForRunner [task_id: %d]: %v", t.ID, relErr) + } + return nil, false, err + } + actionTask = t + CreateCommitStatusForRunJobs(ctx, job.Run, job) NotifyWorkflowJobStatusUpdateWithTask(ctx, job, actionTask) // job.Run is loaded inside the transaction before UpdateRunJob sets run.Started, @@ -111,6 +79,44 @@ func PickTask(ctx context.Context, runner *actions_model.ActionRunner) (*runnerv return task, true, nil } +// buildRunnerTask assembles the runner-facing task payload for an already-claimed +// task. All operations are read-only; on error the caller releases the claim. +func buildRunnerTask(ctx context.Context, t *actions_model.ActionTask) (*runnerv1.Task, *actions_model.ActionRunJob, error) { + if err := t.LoadAttributes(ctx); err != nil { + return nil, nil, fmt.Errorf("task LoadAttributes: %w", err) + } + job := t.Job + + secrets, err := secret_model.GetSecretsOfTask(ctx, t) + if err != nil { + return nil, nil, fmt.Errorf("GetSecretsOfTask: %w", err) + } + + vars, err := actions_model.GetVariablesOfRun(ctx, t.Job.Run) + if err != nil { + return nil, nil, fmt.Errorf("GetVariablesOfRun: %w", err) + } + + needs, err := findTaskNeeds(ctx, job) + if err != nil { + return nil, nil, fmt.Errorf("findTaskNeeds: %w", err) + } + + taskContext, err := generateTaskContext(ctx, t) + if err != nil { + return nil, nil, fmt.Errorf("generateTaskContext: %w", err) + } + + return &runnerv1.Task{ + Id: t.ID, + WorkflowPayload: t.Job.WorkflowPayload, + Context: taskContext, + Secrets: secrets, + Vars: vars, + Needs: needs, + }, job, nil +} + func generateTaskContext(ctx context.Context, t *actions_model.ActionTask) (*structpb.Struct, error) { giteaRuntimeToken, err := CreateAuthorizationToken(t.ID, t.Job.RunID, 
 t.JobID) if err != nil { diff --git a/tests/integration/actions_concurrent_claim_test.go b/tests/integration/actions_concurrent_claim_test.go new file mode 100644 index 0000000000..c33955352e --- /dev/null +++ b/tests/integration/actions_concurrent_claim_test.go @@ -0,0 +1,118 @@ +// Copyright 2026 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package integration + +import ( + "sync" + "testing" + + actions_model "gitea.dev/models/actions" + "gitea.dev/models/db" + "gitea.dev/models/unittest" + "gitea.dev/tests" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// minimalConcurrentWorkflowPayload returns the minimal YAML for a single-job workflow with no steps. +func minimalConcurrentWorkflowPayload(jobID string) []byte { + return []byte("on: push\njobs:\n " + jobID + ":\n runs-on: ubuntu-latest\n") +} + +// TestCreateTaskForRunnerConcurrentClaim verifies that when multiple runners +// poll simultaneously and all initially see the same first waiting job, +// each runner claims a distinct job rather than all but one being left +// empty-handed. This is the regression test for the race condition where +// runners losing the optimistic-lock on job #1 would receive latestVersion +// and never retry the remaining 49+ jobs. +// +// It lives in tests/integration rather than a unit test because SQLite +// serializes write transactions, so the contended optimistic-lock path this +// guards only runs concurrently against MySQL/PostgreSQL in CI. 
+func TestCreateTaskForRunnerConcurrentClaim(t *testing.T) { + defer tests.PrepareTestEnv(t)() + + const numJobs = 3 + + run := &actions_model.ActionRun{ + Title: "concurrent-claim-test-run", + RepoID: 1, + OwnerID: 2, + WorkflowID: "test.yaml", + Index: 9901, + TriggerUserID: 2, + Ref: "refs/heads/main", + CommitSHA: "c2d72f548424103f01ee1dc02889c1e2bff816b0", + Event: "push", + TriggerEvent: "push", + Status: actions_model.StatusWaiting, + } + require.NoError(t, db.Insert(t.Context(), run)) + + jobs := make([]*actions_model.ActionRunJob, numJobs) + for i := range numJobs { + jobID := "concurrent-job-" + string(rune('a'+i)) + jobs[i] = &actions_model.ActionRunJob{ + RunID: run.ID, + RepoID: run.RepoID, + OwnerID: run.OwnerID, + CommitSHA: run.CommitSHA, + Name: jobID, + Attempt: 1, + JobID: jobID, + Status: actions_model.StatusWaiting, + RunsOn: []string{"ubuntu-latest"}, + WorkflowPayload: minimalConcurrentWorkflowPayload(jobID), + } + require.NoError(t, db.Insert(t.Context(), jobs[i])) + } + + runners := make([]*actions_model.ActionRunner, numJobs) + for i := range numJobs { + r := &actions_model.ActionRunner{ + UUID: "concurrent-runner-uuid-" + string(rune('a'+i)), + Name: "concurrent-runner-" + string(rune('a'+i)), + AgentLabels: []string{"ubuntu-latest"}, + } + r.GenerateAndFillToken() + runners[i] = r + require.NoError(t, db.Insert(t.Context(), runners[i])) + } + + // Simulate the burst: all runners call CreateTaskForRunner concurrently, + // as happens when all see the same stale tasksVersion simultaneously. + type result struct { + task *actions_model.ActionTask + ok bool + err error + } + results := make([]result, numJobs) + var wg sync.WaitGroup + for i := range numJobs { + wg.Go(func() { + task, ok, err := actions_model.CreateTaskForRunner(t.Context(), runners[i]) + results[i] = result{task, ok, err} + }) + } + wg.Wait() + + // Every runner must have received a task without error. 
+ claimedJobIDs := make(map[int64]bool) + for i, r := range results { + require.NoError(t, r.err, "runner %d got an unexpected error", i) + require.True(t, r.ok, "runner %d did not get a task even though free jobs exist", i) + require.NotNil(t, r.task) + assert.False(t, claimedJobIDs[r.task.JobID], "job %d was claimed by more than one runner", r.task.JobID) + claimedJobIDs[r.task.JobID] = true + } + assert.Len(t, claimedJobIDs, numJobs, "expected %d distinct jobs to be claimed", numJobs) + + // All jobs must now be running with a task assigned. + for _, j := range jobs { + updated := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{ID: j.ID}) + assert.Equal(t, actions_model.StatusRunning, updated.Status) + assert.NotZero(t, updated.TaskID) + } +}