0
0
mirror of https://github.com/go-gitea/gitea.git synced 2026-04-04 18:56:28 +02:00

Merge d3d23ecff511cbf3e8acede0589bc64d27d5c2ac into 30c07c20e94551141cc1873ab14bdd4c104bba94

This commit is contained in:
Pascal Zimmermann 2026-04-03 09:46:55 +02:00 committed by GitHub
commit b28d817df8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 459 additions and 9 deletions

View File

@ -26,21 +26,21 @@ const MaxJobNumPerRun = 256
// ActionRunJob represents a job of a run
type ActionRunJob struct {
ID int64
RunID int64 `xorm:"index"`
RunID int64 `xorm:"index index(idx_run_id_job_id)"`
Run *ActionRun `xorm:"-"`
RepoID int64 `xorm:"index(repo_concurrency)"`
Repo *repo_model.Repository `xorm:"-"`
OwnerID int64 `xorm:"index"`
CommitSHA string `xorm:"index"`
IsForkPullRequest bool
Name string `xorm:"VARCHAR(255)"`
Name string `xorm:"VARCHAR(255)"`
Attempt int64
// WorkflowPayload is act/jobparser.SingleWorkflow for act/jobparser.Parse
// it should contain exactly one job with global workflow fields for this model
WorkflowPayload []byte
JobID string `xorm:"VARCHAR(255)"` // job id in workflow, not job's id
JobID string `xorm:"VARCHAR(255) index(idx_run_id_job_id)"` // job id in workflow, not job's id
Needs []string `xorm:"JSON TEXT"`
RunsOn []string `xorm:"JSON TEXT"`
TaskID int64 // the latest task of the job
@ -60,6 +60,9 @@ type ActionRunJob struct {
// Org/repo clamps are enforced when the token is used at runtime.
// It is JSON-encoded repo_model.ActionsTokenPermissions and may be empty if not specified.
TokenPermissions *repo_model.ActionsTokenPermissions `xorm:"JSON TEXT"`
// MaxParallel is the max-parallel value from strategy.max-parallel (0 = unlimited).
// All matrix jobs sharing the same RunID+JobID share this value.
MaxParallel int `xorm:"NOT NULL DEFAULT 0"`
Started timeutil.TimeStamp
Stopped timeutil.TimeStamp

View File

@ -0,0 +1,56 @@
// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"context"
"testing"
"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/models/unittest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func getRunJobByID(ctx context.Context, t *testing.T, id int64) *ActionRunJob {
t.Helper()
got, exist, err := db.GetByID[ActionRunJob](ctx, id)
require.NoError(t, err)
require.True(t, exist)
return got
}
// TestMaxParallel_FieldPersistence verifies that MaxParallel is stored and retrieved correctly.
func TestMaxParallel_FieldPersistence(t *testing.T) {
require.NoError(t, unittest.PrepareTestDatabase())
ctx := context.Background()
run := &ActionRun{ID: 100, RepoID: 1, OwnerID: 1, Index: 100, Status: StatusRunning}
require.NoError(t, db.Insert(ctx, run))
t.Run("zero value means unlimited", func(t *testing.T) {
job := &ActionRunJob{RunID: 100, RepoID: 1, OwnerID: 1, JobID: "no-limit", Name: "No Limit", Status: StatusWaiting, MaxParallel: 0}
require.NoError(t, db.Insert(ctx, job))
got := getRunJobByID(ctx, t, job.ID)
assert.Equal(t, 0, got.MaxParallel)
})
t.Run("positive value is persisted", func(t *testing.T) {
job := &ActionRunJob{RunID: 100, RepoID: 1, OwnerID: 1, JobID: "with-limit", Name: "With Limit", Status: StatusWaiting, MaxParallel: 3}
require.NoError(t, db.Insert(ctx, job))
got := getRunJobByID(ctx, t, job.ID)
assert.Equal(t, 3, got.MaxParallel)
})
t.Run("can be updated via UpdateRunJob", func(t *testing.T) {
job := &ActionRunJob{RunID: 100, RepoID: 1, OwnerID: 1, JobID: "updatable", Name: "Updatable", Status: StatusWaiting, MaxParallel: 5}
require.NoError(t, db.Insert(ctx, job))
job.MaxParallel = 10
_, err := UpdateRunJob(ctx, job, nil, "max_parallel")
require.NoError(t, err)
got := getRunJobByID(ctx, t, job.ID)
assert.Equal(t, 10, got.MaxParallel)
})
}

View File

@ -260,10 +260,15 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask
var job *ActionRunJob
log.Trace("runner labels: %v", runner.AgentLabels)
for _, v := range jobs {
if runner.CanMatchLabels(v.RunsOn) {
job = v
break
if !runner.CanMatchLabels(v.RunsOn) {
continue
}
// max-parallel is enforced at insertion time (InsertRun) and by
// jobStatusResolver, so a Waiting job is guaranteed a free slot.
job = v
break
}
if job == nil {
return nil, false, nil
@ -324,9 +329,12 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask
}
job.TaskID = task.ID
if n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}); err != nil {
// Must explicitly specify which columns to update, including status and started
if n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "task_id", "status", "started", "attempt", "updated"); err != nil {
return nil, false, err
} else if n != 1 {
// Another runner may have claimed this job, skip it
log.Debug("Job %s (run %d) was claimed by another runner, skipping", job.JobID, job.RunID)
return nil, false, nil
}

View File

@ -405,6 +405,7 @@ func prepareMigrationTasks() []*migration {
newMigration(328, "Add TokenPermissions column to ActionRunJob", v1_26.AddTokenPermissionsToActionRunJob),
newMigration(329, "Add unique constraint for user badge", v1_26.AddUniqueIndexForUserBadge),
newMigration(330, "Add name column to webhook", v1_26.AddNameToWebhook),
newMigration(331, "Add job max-parallel support", v1_26.AddJobMaxParallel),
}
return preparedMigrations
}

View File

@ -0,0 +1,17 @@
// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package v1_26
import "xorm.io/xorm"
// AddJobMaxParallel adds max_parallel to action_run_job with a composite index on (run_id, job_id).
func AddJobMaxParallel(x *xorm.Engine) error {
type ActionRunJob struct {
RunID int64 `xorm:"index index(idx_run_id_job_id)"`
JobID string `xorm:"VARCHAR(255) index(idx_run_id_job_id)"`
MaxParallel int `xorm:"NOT NULL DEFAULT 0"`
}
return x.Sync(new(ActionRunJob))
}

View File

@ -313,6 +313,15 @@ func (r *jobStatusResolver) resolveJobHasIfCondition(actionRunJob *actions_model
func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model.Status {
ret := map[int64]actions_model.Status{}
// Pre-calculate the number of running-or-waiting jobs per JobID
runningOrWaiting := make(map[string]int)
for id, status := range r.statuses {
if status == actions_model.StatusRunning || status == actions_model.StatusWaiting {
runningOrWaiting[r.jobMap[id].JobID]++
}
}
for id, status := range r.statuses {
actionRunJob := r.jobMap[id]
if status != actions_model.StatusBlocked {
@ -337,7 +346,7 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model
shouldStartJob := true
if !allSucceed {
// Not all dependent jobs completed successfully:
// * if the job has "if" condition, it can be started, then the act_runner will evaluate the "if" condition.
// * if the job has an "if" condition, it can be started; then the act_runner will evaluate the "if" condition.
// * otherwise, the job should be skipped.
shouldStartJob = r.resolveJobHasIfCondition(actionRunJob)
}
@ -350,6 +359,15 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model
}
}
// Enforce max-parallel: if the number of running-or-waiting jobs of the same
// JobID already fills the limit, leave this job blocked.
if newStatus == actions_model.StatusWaiting && actionRunJob.MaxParallel > 0 {
if runningOrWaiting[actionRunJob.JobID] >= actionRunJob.MaxParallel {
continue // no free slot; leave blocked
}
runningOrWaiting[actionRunJob.JobID]++
}
if newStatus != actions_model.StatusBlocked {
ret[id] = newStatus
}

View File

@ -4,6 +4,7 @@
package actions
import (
"fmt"
"testing"
actions_model "code.gitea.io/gitea/models/actions"
@ -126,11 +127,224 @@ jobs:
},
want: map[int64]actions_model.Status{2: actions_model.StatusSkipped},
},
{
name: "max-parallel=1 promotes exactly one blocked job when one slot is open",
jobs: actions_model.ActionJobList{
{ID: 1, JobID: "build", Status: actions_model.StatusRunning, Needs: []string{}, MaxParallel: 1},
{ID: 2, JobID: "build", Status: actions_model.StatusBlocked, Needs: []string{}, MaxParallel: 1},
{ID: 3, JobID: "build", Status: actions_model.StatusBlocked, Needs: []string{}, MaxParallel: 1},
},
want: map[int64]actions_model.Status{},
},
{
name: "max-parallel=1 promotes one job after running job finishes",
jobs: actions_model.ActionJobList{
{ID: 1, JobID: "build", Status: actions_model.StatusSuccess, Needs: []string{}, MaxParallel: 1},
{ID: 2, JobID: "build", Status: actions_model.StatusBlocked, Needs: []string{}, MaxParallel: 1},
{ID: 3, JobID: "build", Status: actions_model.StatusBlocked, Needs: []string{}, MaxParallel: 1},
},
want: nil, // map iteration is non-deterministic; checked by count below
},
{
name: "max-parallel=2 does not promote when limit is reached",
jobs: actions_model.ActionJobList{
{ID: 1, JobID: "test", Status: actions_model.StatusRunning, Needs: []string{}, MaxParallel: 2},
{ID: 2, JobID: "test", Status: actions_model.StatusRunning, Needs: []string{}, MaxParallel: 2},
{ID: 3, JobID: "test", Status: actions_model.StatusBlocked, Needs: []string{}, MaxParallel: 2},
{ID: 4, JobID: "test", Status: actions_model.StatusBlocked, Needs: []string{}, MaxParallel: 2},
},
want: map[int64]actions_model.Status{},
},
{
name: "max-parallel=2 promotes one job when one slot opens",
jobs: actions_model.ActionJobList{
{ID: 1, JobID: "test", Status: actions_model.StatusSuccess, Needs: []string{}, MaxParallel: 2},
{ID: 2, JobID: "test", Status: actions_model.StatusRunning, Needs: []string{}, MaxParallel: 2},
{ID: 3, JobID: "test", Status: actions_model.StatusBlocked, Needs: []string{}, MaxParallel: 2},
{ID: 4, JobID: "test", Status: actions_model.StatusBlocked, Needs: []string{}, MaxParallel: 2},
},
want: nil, // checked by count below
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := newJobStatusResolver(tt.jobs, nil)
assert.Equal(t, tt.want, r.Resolve(t.Context()))
got := r.Resolve(t.Context())
if tt.want == nil {
waitingCount := 0
for _, s := range got {
if s == actions_model.StatusWaiting {
waitingCount++
}
}
assert.Equal(t, 1, waitingCount, "expected exactly 1 job promoted to Waiting, got %v", got)
} else {
assert.Equal(t, tt.want, got)
}
})
}
}
func Test_maxParallelWorkflowLifecycle(t *testing.T) {
const matrixJobID = "matrix"
countStatus := func(jobs actions_model.ActionJobList, s actions_model.Status) int {
n := 0
for _, j := range jobs {
if j.Status == s {
n++
}
}
return n
}
applyUpdates := func(jobs actions_model.ActionJobList, updates map[int64]actions_model.Status) {
for _, j := range jobs {
if s, ok := updates[j.ID]; ok {
j.Status = s
}
}
}
// pickUpAll simulates every WAITING job being accepted by a runner.
pickUpAll := func(jobs actions_model.ActionJobList) {
for _, j := range jobs {
if j.Status == actions_model.StatusWaiting {
j.Status = actions_model.StatusRunning
}
}
}
// completeOne marks the first RUNNING job as SUCCESS.
completeOne := func(jobs actions_model.ActionJobList) {
for _, j := range jobs {
if j.Status == actions_model.StatusRunning {
j.Status = actions_model.StatusSuccess
return
}
}
}
makeJobs := func(n, maxParallel int) actions_model.ActionJobList {
list := make(actions_model.ActionJobList, n)
for i := range n {
list[i] = &actions_model.ActionRunJob{
ID: int64(i + 1),
JobID: matrixJobID,
Status: actions_model.StatusBlocked,
Needs: []string{},
MaxParallel: maxParallel,
}
}
return list
}
runResolve := func(t *testing.T, jobs actions_model.ActionJobList) map[int64]actions_model.Status {
t.Helper()
return newJobStatusResolver(jobs, nil).Resolve(t.Context())
}
// assertSlotInvariant verifies both slot constraints after every resolve cycle.
// It is a no-op when maxParallel=0 (unlimited).
assertSlotInvariant := func(t *testing.T, jobs actions_model.ActionJobList, maxParallel int, label string) {
t.Helper()
if maxParallel == 0 {
return
}
running := countStatus(jobs, actions_model.StatusRunning)
waiting := countStatus(jobs, actions_model.StatusWaiting)
success := countStatus(jobs, actions_model.StatusSuccess)
remaining := len(jobs) - success
active := running + waiting
assert.LessOrEqual(t, active, maxParallel,
"%s: running(%d)+waiting(%d) must not exceed max-parallel(%d)",
label, running, waiting, maxParallel)
assert.Equal(t, min(remaining, maxParallel), active,
"%s: running(%d)+waiting(%d) should equal min(remaining=%d, maxParallel=%d)",
label, running, waiting, remaining, maxParallel)
}
tests := []struct {
name string
totalJobs int
maxParallel int
wantInitialWaiting int // expected WAITING count after the very first Resolve()
}{
{
// 0 means unlimited: the max-parallel branch in resolve() is skipped
name: "max-parallel=0 (unlimited): all 5 jobs start immediately",
totalJobs: 5,
maxParallel: 0,
wantInitialWaiting: 5,
},
{
// Strictest case: one slot, so the resolver must promote exactly 1 job
name: "max-parallel=1 (strict serial): exactly 1 job at a time",
totalJobs: 5,
maxParallel: 1,
wantInitialWaiting: 1,
},
{
// Limit higher than job count: behaves like unlimited for this run.
name: "max-parallel=10 (N<limit): all 5 jobs start immediately",
totalJobs: 5,
maxParallel: 10,
wantInitialWaiting: 5,
},
{
// Limit lower than job count: first 10 start, remaining 2 stay blocked until slots open up.
name: "max-parallel=10 (N>limit): first 10 of 12 start, rest queue",
totalJobs: 12,
maxParallel: 10,
wantInitialWaiting: 10,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
jobs := makeJobs(tc.totalJobs, tc.maxParallel)
applyUpdates(jobs, runResolve(t, jobs))
assert.Equal(t, tc.wantInitialWaiting, countStatus(jobs, actions_model.StatusWaiting),
"phase 1: Resolve should promote exactly %d jobs to WAITING", tc.wantInitialWaiting)
assert.Equal(t, tc.totalJobs-tc.wantInitialWaiting, countStatus(jobs, actions_model.StatusBlocked),
"phase 1: remaining %d jobs should still be BLOCKED", tc.totalJobs-tc.wantInitialWaiting)
pickUpAll(jobs)
assertSlotInvariant(t, jobs, tc.maxParallel, "phase 2 (after initial pickup)")
for cycle := 1; cycle <= tc.totalJobs; cycle++ {
if countStatus(jobs, actions_model.StatusRunning) == 0 {
break
}
completeOne(jobs)
applyUpdates(jobs, runResolve(t, jobs))
label := fmt.Sprintf("phase 3 cycle %d", cycle)
assertSlotInvariant(t, jobs, tc.maxParallel, label)
pickUpAll(jobs)
}
for countStatus(jobs, actions_model.StatusRunning) > 0 {
completeOne(jobs)
applyUpdates(jobs, runResolve(t, jobs))
pickUpAll(jobs)
}
assert.Equal(t, tc.totalJobs, countStatus(jobs, actions_model.StatusSuccess),
"phase 5: all %d jobs must reach SUCCESS", tc.totalJobs)
assert.Equal(t, 0, countStatus(jobs, actions_model.StatusBlocked),
"phase 5: no jobs may remain BLOCKED")
assert.Equal(t, 0, countStatus(jobs, actions_model.StatusWaiting),
"phase 5: no jobs may remain WAITING")
assert.Equal(t, 0, countStatus(jobs, actions_model.StatusRunning),
"phase 5: no jobs may remain RUNNING")
})
}
}

View File

@ -6,10 +6,12 @@ package actions
import (
"context"
"fmt"
"strconv"
actions_model "code.gitea.io/gitea/models/actions"
"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/modules/actions/jobparser"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/util"
notify_service "code.gitea.io/gitea/services/notify"
@ -104,6 +106,9 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobpar
runJobs := make([]*actions_model.ActionRunJob, 0, len(jobs))
var hasWaitingJobs bool
// waitingCountByJobID limits initial Waiting slots per JobID to MaxParallel.
waitingCountByJobID := make(map[string]int)
for _, v := range jobs {
id, job := v.Job()
needs := job.Needs()
@ -133,6 +138,15 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobpar
runJob.TokenPermissions = perms
}
// Extract max-parallel from strategy if present
if job.Strategy.MaxParallelString != "" {
if maxParallel, err := strconv.Atoi(job.Strategy.MaxParallelString); err == nil && maxParallel > 0 {
runJob.MaxParallel = maxParallel
} else {
log.Debug("failed to process max-parallel for job %s: invalid value %v: %v", id, job.Strategy.MaxParallelString, err)
}
}
// check job concurrency
if job.RawConcurrency != nil {
rawConcurrency, err := yaml.Marshal(job.RawConcurrency)
@ -159,6 +173,16 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobpar
}
}
// Enforce max-parallel: excess jobs start as Blocked and are promoted
// by jobStatusResolver when a slot opens.
if runJob.Status == actions_model.StatusWaiting && runJob.MaxParallel > 0 {
if waitingCountByJobID[id] >= runJob.MaxParallel {
runJob.Status = actions_model.StatusBlocked
} else {
waitingCountByJobID[id]++
}
}
hasWaitingJobs = hasWaitingJobs || runJob.Status == actions_model.StatusWaiting
if err := db.Insert(ctx, runJob); err != nil {
return err

View File

@ -0,0 +1,109 @@
// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"context"
"testing"
actions_model "code.gitea.io/gitea/models/actions"
"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/models/unittest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// countJobsByStatus returns the number of jobs with the given status in allJobs.
func countJobsByStatus(allJobs actions_model.ActionJobList, status actions_model.Status) int {
n := 0
for _, j := range allJobs {
if j.Status == status {
n++
}
}
return n
}
// TestMaxParallel_ServiceLayer verifies the max-parallel invariant: Running+Waiting <= MaxParallel.
func TestMaxParallel_ServiceLayer(t *testing.T) {
require.NoError(t, unittest.PrepareTestDatabase())
t.Run("invariant: Running+Waiting <= MaxParallel", func(t *testing.T) {
runID := int64(10000)
jobID := "svc-max-parallel"
maxParallel := 2
run := &actions_model.ActionRun{ID: runID, RepoID: 1, OwnerID: 1, Index: 10000, Status: actions_model.StatusRunning}
require.NoError(t, db.Insert(context.Background(), run))
jobs := []*actions_model.ActionRunJob{
{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "r1", Status: actions_model.StatusRunning, MaxParallel: maxParallel},
{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "w1", Status: actions_model.StatusWaiting, MaxParallel: maxParallel},
{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "b1", Status: actions_model.StatusBlocked, MaxParallel: maxParallel},
}
for _, j := range jobs {
require.NoError(t, db.Insert(context.Background(), j))
}
allJobs, err := actions_model.GetRunJobsByRunID(context.Background(), runID)
require.NoError(t, err)
running := countJobsByStatus(allJobs, actions_model.StatusRunning)
waiting := countJobsByStatus(allJobs, actions_model.StatusWaiting)
blocked := countJobsByStatus(allJobs, actions_model.StatusBlocked)
assert.LessOrEqual(t, running+waiting, maxParallel)
assert.Equal(t, 1, blocked)
})
t.Run("slot becomes available after completion", func(t *testing.T) {
runID := int64(20000)
jobID := "svc-slot-free"
maxParallel := 2
run := &actions_model.ActionRun{ID: runID, RepoID: 1, OwnerID: 1, Index: 20000, Status: actions_model.StatusRunning}
require.NoError(t, db.Insert(context.Background(), run))
jobs := []*actions_model.ActionRunJob{
{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "r1", Status: actions_model.StatusRunning, MaxParallel: maxParallel},
{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "r2", Status: actions_model.StatusRunning, MaxParallel: maxParallel},
{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "b1", Status: actions_model.StatusBlocked, MaxParallel: maxParallel},
}
for _, j := range jobs {
require.NoError(t, db.Insert(context.Background(), j))
}
jobs[0].Status = actions_model.StatusSuccess
_, err := actions_model.UpdateRunJob(context.Background(), jobs[0], nil, "status")
require.NoError(t, err)
allJobs, err := actions_model.GetRunJobsByRunID(context.Background(), runID)
require.NoError(t, err)
running := countJobsByStatus(allJobs, actions_model.StatusRunning)
assert.Equal(t, 1, running)
assert.Less(t, running, maxParallel)
})
t.Run("no max-parallel means all jobs start as Waiting", func(t *testing.T) {
runID := int64(30000)
jobID := "svc-no-limit"
run := &actions_model.ActionRun{ID: runID, RepoID: 1, OwnerID: 1, Index: 30000, Status: actions_model.StatusRunning}
require.NoError(t, db.Insert(context.Background(), run))
for range 5 {
require.NoError(t, db.Insert(context.Background(), &actions_model.ActionRunJob{
RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "j",
Status: actions_model.StatusWaiting, MaxParallel: 0,
}))
}
allJobs, err := actions_model.GetRunJobsByRunID(context.Background(), runID)
require.NoError(t, err)
assert.Equal(t, 5, countJobsByStatus(allJobs, actions_model.StatusWaiting))
assert.Equal(t, 0, countJobsByStatus(allJobs, actions_model.StatusBlocked))
})
}