diff --git a/models/actions/run_job.go b/models/actions/run_job.go index 17a056efda..f3dbd364dd 100644 --- a/models/actions/run_job.go +++ b/models/actions/run_job.go @@ -28,10 +28,8 @@ type ActionRunJob struct { OwnerID int64 `xorm:"index"` CommitSHA string `xorm:"index"` IsForkPullRequest bool - - // ...existing code... - Name string `xorm:"VARCHAR(255)"` - Attempt int64 + Name string `xorm:"VARCHAR(255)"` + Attempt int64 // WorkflowPayload is act/jobparser.SingleWorkflow for act/jobparser.Parse // it should contain exactly one job with global workflow fields for this model diff --git a/services/actions/job_emitter.go b/services/actions/job_emitter.go index b3cdc6d3ad..bb4096d600 100644 --- a/services/actions/job_emitter.go +++ b/services/actions/job_emitter.go @@ -313,8 +313,15 @@ func (r *jobStatusResolver) resolveJobHasIfCondition(actionRunJob *actions_model func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model.Status { ret := map[int64]actions_model.Status{} - // promotedWaitingByJobID counts within-pass promotions to enforce max-parallel. - promotedWaitingByJobID := make(map[string]int) + + // Pre-calculate the number of running-or-waiting jobs per JobID + runningOrWaiting := make(map[string]int) + for id, status := range r.statuses { + if status == actions_model.StatusRunning || status == actions_model.StatusWaiting { + runningOrWaiting[r.jobMap[id].JobID]++ + } + } + for id, status := range r.statuses { actionRunJob := r.jobMap[id] if status != actions_model.StatusBlocked { @@ -352,21 +359,13 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model } } - // Enforce max-parallel: count occupied slots from the current snapshot - // plus within-pass promotions. + // Enforce max-parallel: if the number of running-or-waiting jobs of the same + // JobID already fills the limit, leave this job blocked. if newStatus == actions_model.StatusWaiting && actionRunJob.MaxParallel > 0 { - occupiedSlots := 0 - for otherID, otherStatus := range r.statuses { - otherJob := r.jobMap[otherID] - if otherJob.JobID == actionRunJob.JobID && - (otherStatus == actions_model.StatusRunning || otherStatus == actions_model.StatusWaiting) { - occupiedSlots++ - } - } - if occupiedSlots+promotedWaitingByJobID[actionRunJob.JobID] >= actionRunJob.MaxParallel { + if runningOrWaiting[actionRunJob.JobID] >= actionRunJob.MaxParallel { continue // no free slot; leave blocked } - promotedWaitingByJobID[actionRunJob.JobID]++ + runningOrWaiting[actionRunJob.JobID]++ } if newStatus != actions_model.StatusBlocked { diff --git a/services/actions/job_emitter_test.go b/services/actions/job_emitter_test.go index 3b6768bdd8..9bd1fd1913 100644 --- a/services/actions/job_emitter_test.go +++ b/services/actions/job_emitter_test.go @@ -4,6 +4,7 @@ package actions import ( + "fmt" "testing" actions_model "code.gitea.io/gitea/models/actions" @@ -183,3 +184,167 @@ jobs: }) } } + +func Test_maxParallelWorkflowLifecycle(t *testing.T) { + const matrixJobID = "matrix" + + countStatus := func(jobs actions_model.ActionJobList, s actions_model.Status) int { + n := 0 + for _, j := range jobs { + if j.Status == s { + n++ + } + } + return n + } + + applyUpdates := func(jobs actions_model.ActionJobList, updates map[int64]actions_model.Status) { + for _, j := range jobs { + if s, ok := updates[j.ID]; ok { + j.Status = s + } + } + } + + // pickUpAll simulates every WAITING job being accepted by a runner. + pickUpAll := func(jobs actions_model.ActionJobList) { + for _, j := range jobs { + if j.Status == actions_model.StatusWaiting { + j.Status = actions_model.StatusRunning + } + } + } + + // completeOne marks the first RUNNING job as SUCCESS. + completeOne := func(jobs actions_model.ActionJobList) { + for _, j := range jobs { + if j.Status == actions_model.StatusRunning { + j.Status = actions_model.StatusSuccess + return + } + } + } + + makeJobs := func(n, maxParallel int) actions_model.ActionJobList { + list := make(actions_model.ActionJobList, n) + for i := range n { + list[i] = &actions_model.ActionRunJob{ + ID: int64(i + 1), + JobID: matrixJobID, + Status: actions_model.StatusBlocked, + Needs: []string{}, + MaxParallel: maxParallel, + } + } + return list + } + + runResolve := func(t *testing.T, jobs actions_model.ActionJobList) map[int64]actions_model.Status { + t.Helper() + return newJobStatusResolver(jobs, nil).Resolve(t.Context()) + } + + // assertSlotInvariant verifies both slot constraints after every resolve cycle. + // It is a no-op when maxParallel=0 (unlimited). + assertSlotInvariant := func(t *testing.T, jobs actions_model.ActionJobList, maxParallel int, label string) { + t.Helper() + if maxParallel == 0 { + return + } + running := countStatus(jobs, actions_model.StatusRunning) + waiting := countStatus(jobs, actions_model.StatusWaiting) + success := countStatus(jobs, actions_model.StatusSuccess) + remaining := len(jobs) - success + active := running + waiting + + assert.LessOrEqual(t, active, maxParallel, + "%s: running(%d)+waiting(%d) must not exceed max-parallel(%d)", + label, running, waiting, maxParallel) + + assert.Equal(t, min(remaining, maxParallel), active, + "%s: running(%d)+waiting(%d) should equal min(remaining=%d, maxParallel=%d)", + label, running, waiting, remaining, maxParallel) + } + + tests := []struct { + name string + totalJobs int + maxParallel int + wantInitialWaiting int // expected WAITING count after the very first Resolve() + }{ + { + // 0 means unlimited: the max-parallel branch in resolve() is skipped + name: "max-parallel=0 (unlimited): all 5 jobs start immediately", + totalJobs: 5, + maxParallel: 0, + wantInitialWaiting: 5, + }, + { + // Strictest case: one slot, so the resolver must promote exactly 1 job + name: "max-parallel=1 (strict serial): exactly 1 job at a time", + totalJobs: 5, + maxParallel: 1, + wantInitialWaiting: 1, + }, + { + // Limit higher than job count: behaves like unlimited for this run. + name: "max-parallel=10 (Nlimit): first 10 of 12 start, rest queue", + totalJobs: 12, + maxParallel: 10, + wantInitialWaiting: 10, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + jobs := makeJobs(tc.totalJobs, tc.maxParallel) + + applyUpdates(jobs, runResolve(t, jobs)) + + assert.Equal(t, tc.wantInitialWaiting, countStatus(jobs, actions_model.StatusWaiting), + "phase 1: Resolve should promote exactly %d jobs to WAITING", tc.wantInitialWaiting) + assert.Equal(t, tc.totalJobs-tc.wantInitialWaiting, countStatus(jobs, actions_model.StatusBlocked), + "phase 1: remaining %d jobs should still be BLOCKED", tc.totalJobs-tc.wantInitialWaiting) + + pickUpAll(jobs) + assertSlotInvariant(t, jobs, tc.maxParallel, "phase 2 (after initial pickup)") + + for cycle := 1; cycle <= tc.totalJobs; cycle++ { + if countStatus(jobs, actions_model.StatusRunning) == 0 { + break + } + + completeOne(jobs) + applyUpdates(jobs, runResolve(t, jobs)) + + label := fmt.Sprintf("phase 3 cycle %d", cycle) + assertSlotInvariant(t, jobs, tc.maxParallel, label) + + pickUpAll(jobs) + } + + for countStatus(jobs, actions_model.StatusRunning) > 0 { + completeOne(jobs) + applyUpdates(jobs, runResolve(t, jobs)) + pickUpAll(jobs) + } + + assert.Equal(t, tc.totalJobs, countStatus(jobs, actions_model.StatusSuccess), + "phase 5: all %d jobs must reach SUCCESS", tc.totalJobs) + assert.Equal(t, 0, countStatus(jobs, actions_model.StatusBlocked), + "phase 5: no jobs may remain BLOCKED") + assert.Equal(t, 0, countStatus(jobs, actions_model.StatusWaiting), + "phase 5: no jobs may remain WAITING") + assert.Equal(t, 0, countStatus(jobs, actions_model.StatusRunning), + "phase 5: no jobs may remain RUNNING") + }) + } +} +