0
0
mirror of https://github.com/go-gitea/gitea.git synced 2026-04-03 14:43:03 +02:00

feat: Update the job limit handling and add tests

This commit is contained in:
Pascal Zimmermann 2026-04-03 09:46:49 +02:00
parent 66ee950a42
commit d3d23ecff5
3 changed files with 180 additions and 18 deletions

View File

@ -28,10 +28,8 @@ type ActionRunJob struct {
OwnerID int64 `xorm:"index"`
CommitSHA string `xorm:"index"`
IsForkPullRequest bool
// ...existing code...
Name string `xorm:"VARCHAR(255)"`
Attempt int64
Name string `xorm:"VARCHAR(255)"`
Attempt int64
// WorkflowPayload is act/jobparser.SingleWorkflow for act/jobparser.Parse
// it should contain exactly one job with global workflow fields for this model

View File

@ -313,8 +313,15 @@ func (r *jobStatusResolver) resolveJobHasIfCondition(actionRunJob *actions_model
func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model.Status {
ret := map[int64]actions_model.Status{}
// promotedWaitingByJobID counts within-pass promotions to enforce max-parallel.
promotedWaitingByJobID := make(map[string]int)
// Pre-calculate the number of running-or-waiting jobs per JobID
runningOrWaiting := make(map[string]int)
for id, status := range r.statuses {
if status == actions_model.StatusRunning || status == actions_model.StatusWaiting {
runningOrWaiting[r.jobMap[id].JobID]++
}
}
for id, status := range r.statuses {
actionRunJob := r.jobMap[id]
if status != actions_model.StatusBlocked {
@ -352,21 +359,13 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model
}
}
// Enforce max-parallel: count occupied slots from the current snapshot
// plus within-pass promotions.
// Enforce max-parallel: if the number of running-or-waiting jobs of the same
// JobID already fills the limit, leave this job blocked.
if newStatus == actions_model.StatusWaiting && actionRunJob.MaxParallel > 0 {
occupiedSlots := 0
for otherID, otherStatus := range r.statuses {
otherJob := r.jobMap[otherID]
if otherJob.JobID == actionRunJob.JobID &&
(otherStatus == actions_model.StatusRunning || otherStatus == actions_model.StatusWaiting) {
occupiedSlots++
}
}
if occupiedSlots+promotedWaitingByJobID[actionRunJob.JobID] >= actionRunJob.MaxParallel {
if runningOrWaiting[actionRunJob.JobID] >= actionRunJob.MaxParallel {
continue // no free slot; leave blocked
}
promotedWaitingByJobID[actionRunJob.JobID]++
runningOrWaiting[actionRunJob.JobID]++
}
if newStatus != actions_model.StatusBlocked {

View File

@ -4,6 +4,7 @@
package actions
import (
"fmt"
"testing"
actions_model "code.gitea.io/gitea/models/actions"
@ -183,3 +184,167 @@ jobs:
})
}
}
func Test_maxParallelWorkflowLifecycle(t *testing.T) {
const matrixJobID = "matrix"
countStatus := func(jobs actions_model.ActionJobList, s actions_model.Status) int {
n := 0
for _, j := range jobs {
if j.Status == s {
n++
}
}
return n
}
applyUpdates := func(jobs actions_model.ActionJobList, updates map[int64]actions_model.Status) {
for _, j := range jobs {
if s, ok := updates[j.ID]; ok {
j.Status = s
}
}
}
// pickUpAll simulates every WAITING job being accepted by a runner.
pickUpAll := func(jobs actions_model.ActionJobList) {
for _, j := range jobs {
if j.Status == actions_model.StatusWaiting {
j.Status = actions_model.StatusRunning
}
}
}
// completeOne marks the first RUNNING job as SUCCESS.
completeOne := func(jobs actions_model.ActionJobList) {
for _, j := range jobs {
if j.Status == actions_model.StatusRunning {
j.Status = actions_model.StatusSuccess
return
}
}
}
makeJobs := func(n, maxParallel int) actions_model.ActionJobList {
list := make(actions_model.ActionJobList, n)
for i := range n {
list[i] = &actions_model.ActionRunJob{
ID: int64(i + 1),
JobID: matrixJobID,
Status: actions_model.StatusBlocked,
Needs: []string{},
MaxParallel: maxParallel,
}
}
return list
}
runResolve := func(t *testing.T, jobs actions_model.ActionJobList) map[int64]actions_model.Status {
t.Helper()
return newJobStatusResolver(jobs, nil).Resolve(t.Context())
}
// assertSlotInvariant verifies both slot constraints after every resolve cycle.
// It is a no-op when maxParallel=0 (unlimited).
assertSlotInvariant := func(t *testing.T, jobs actions_model.ActionJobList, maxParallel int, label string) {
t.Helper()
if maxParallel == 0 {
return
}
running := countStatus(jobs, actions_model.StatusRunning)
waiting := countStatus(jobs, actions_model.StatusWaiting)
success := countStatus(jobs, actions_model.StatusSuccess)
remaining := len(jobs) - success
active := running + waiting
assert.LessOrEqual(t, active, maxParallel,
"%s: running(%d)+waiting(%d) must not exceed max-parallel(%d)",
label, running, waiting, maxParallel)
assert.Equal(t, min(remaining, maxParallel), active,
"%s: running(%d)+waiting(%d) should equal min(remaining=%d, maxParallel=%d)",
label, running, waiting, remaining, maxParallel)
}
tests := []struct {
name string
totalJobs int
maxParallel int
wantInitialWaiting int // expected WAITING count after the very first Resolve()
}{
{
// 0 means unlimited: the max-parallel branch in resolve() is skipped
name: "max-parallel=0 (unlimited): all 5 jobs start immediately",
totalJobs: 5,
maxParallel: 0,
wantInitialWaiting: 5,
},
{
// Strictest case: one slot, so the resolver must promote exactly 1 job
name: "max-parallel=1 (strict serial): exactly 1 job at a time",
totalJobs: 5,
maxParallel: 1,
wantInitialWaiting: 1,
},
{
// Limit higher than job count: behaves like unlimited for this run.
name: "max-parallel=10 (N<limit): all 5 jobs start immediately",
totalJobs: 5,
maxParallel: 10,
wantInitialWaiting: 5,
},
{
// Limit lower than job count: first 10 start, remaining 2 stay blocked until slots open up.
name: "max-parallel=10 (N>limit): first 10 of 12 start, rest queue",
totalJobs: 12,
maxParallel: 10,
wantInitialWaiting: 10,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
jobs := makeJobs(tc.totalJobs, tc.maxParallel)
applyUpdates(jobs, runResolve(t, jobs))
assert.Equal(t, tc.wantInitialWaiting, countStatus(jobs, actions_model.StatusWaiting),
"phase 1: Resolve should promote exactly %d jobs to WAITING", tc.wantInitialWaiting)
assert.Equal(t, tc.totalJobs-tc.wantInitialWaiting, countStatus(jobs, actions_model.StatusBlocked),
"phase 1: remaining %d jobs should still be BLOCKED", tc.totalJobs-tc.wantInitialWaiting)
pickUpAll(jobs)
assertSlotInvariant(t, jobs, tc.maxParallel, "phase 2 (after initial pickup)")
for cycle := 1; cycle <= tc.totalJobs; cycle++ {
if countStatus(jobs, actions_model.StatusRunning) == 0 {
break
}
completeOne(jobs)
applyUpdates(jobs, runResolve(t, jobs))
label := fmt.Sprintf("phase 3 cycle %d", cycle)
assertSlotInvariant(t, jobs, tc.maxParallel, label)
pickUpAll(jobs)
}
for countStatus(jobs, actions_model.StatusRunning) > 0 {
completeOne(jobs)
applyUpdates(jobs, runResolve(t, jobs))
pickUpAll(jobs)
}
assert.Equal(t, tc.totalJobs, countStatus(jobs, actions_model.StatusSuccess),
"phase 5: all %d jobs must reach SUCCESS", tc.totalJobs)
assert.Equal(t, 0, countStatus(jobs, actions_model.StatusBlocked),
"phase 5: no jobs may remain BLOCKED")
assert.Equal(t, 0, countStatus(jobs, actions_model.StatusWaiting),
"phase 5: no jobs may remain WAITING")
assert.Equal(t, 0, countStatus(jobs, actions_model.StatusRunning),
"phase 5: no jobs may remain RUNNING")
})
}
}