diff --git a/models/actions/max_parallel_simple_test.go b/models/actions/max_parallel_simple_test.go deleted file mode 100644 index ac75036b6a..0000000000 --- a/models/actions/max_parallel_simple_test.go +++ /dev/null @@ -1,240 +0,0 @@ -// Copyright 2026 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package actions - -import ( - "context" - "testing" - - "code.gitea.io/gitea/models/db" - "code.gitea.io/gitea/models/unittest" - - "github.com/stretchr/testify/assert" -) - -// TestMaxParallelOne_SimpleSequence tests the core issue: max-parallel=1 execution -func TestMaxParallelOne_SimpleSequence(t *testing.T) { - assert.NoError(t, unittest.PrepareTestDatabase()) - ctx := context.Background() - - runID := int64(11000) - jobID := "simple-sequence-job" - maxParallel := 1 - - // Create ActionRun - run := &ActionRun{ - ID: runID, - RepoID: 1, - OwnerID: 1, - Index: 11000, - Status: StatusRunning, - } - assert.NoError(t, db.Insert(ctx, run)) - - // Create jobs - job1 := &ActionRunJob{ - RunID: runID, - RepoID: 1, - OwnerID: 1, - JobID: jobID, - Name: "Job 1", - Status: StatusWaiting, - MaxParallel: maxParallel, - } - job2 := &ActionRunJob{ - RunID: runID, - RepoID: 1, - OwnerID: 1, - JobID: jobID, - Name: "Job 2", - Status: StatusWaiting, - MaxParallel: maxParallel, - } - job3 := &ActionRunJob{ - RunID: runID, - RepoID: 1, - OwnerID: 1, - JobID: jobID, - Name: "Job 3", - Status: StatusWaiting, - MaxParallel: maxParallel, - } - - assert.NoError(t, db.Insert(ctx, job1)) - assert.NoError(t, db.Insert(ctx, job2)) - assert.NoError(t, db.Insert(ctx, job3)) - - // TEST 1: Initially, 0 running - running, err := CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) - assert.NoError(t, err) - assert.Equal(t, 0, running, "Should have 0 jobs running initially") - - // TEST 2: Job1 starts - job1.Status = StatusRunning - _, err = UpdateRunJob(ctx, job1, nil) - assert.NoError(t, err) - - running, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) - assert.NoError(t, err) - assert.Equal(t, 1, running, "Should have 1 job running after job1 starts") - - // TEST 3: Job1 completes - job1.Status = StatusSuccess - _, err = UpdateRunJob(ctx, job1, nil) - assert.NoError(t, err) - - running, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) - assert.NoError(t, err) - assert.Equal(t, 0, running, "Should have 0 jobs running after job1 completes - THIS IS THE CRITICAL TEST") - - // TEST 4: Job2 should now be able to start - job2.Status = StatusRunning - _, err = UpdateRunJob(ctx, job2, nil) - assert.NoError(t, err) - - running, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) - assert.NoError(t, err) - assert.Equal(t, 1, running, "Should have 1 job running after job2 starts - IF THIS FAILS, THE BUG IS NOT FIXED") - - // TEST 5: Job2 completes - job2.Status = StatusSuccess - _, err = UpdateRunJob(ctx, job2, nil) - assert.NoError(t, err) - - running, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) - assert.NoError(t, err) - assert.Equal(t, 0, running, "Should have 0 jobs running after job2 completes") - - // TEST 6: Job3 starts - job3.Status = StatusRunning - _, err = UpdateRunJob(ctx, job3, nil) - assert.NoError(t, err) - - running, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) - assert.NoError(t, err) - assert.Equal(t, 1, running, "Should have 1 job running after job3 starts") - - t.Log("✅ All sequential execution tests passed!") -} - -// TestMaxParallelOne_FreshJobFetch tests the fresh job fetch mechanism -func TestMaxParallelOne_FreshJobFetch(t *testing.T) { - assert.NoError(t, unittest.PrepareTestDatabase()) - ctx := context.Background() - - runID := int64(12000) - jobID := "fresh-fetch-job" - - // Create ActionRun - run := &ActionRun{ - ID: runID, - RepoID: 1, - OwnerID: 1, - Index: 12000, - Status: StatusRunning, - } - assert.NoError(t, db.Insert(ctx, run)) - - // Create a job - job := &ActionRunJob{ - RunID: runID, - RepoID: 1, - OwnerID: 1, - JobID: jobID, - Name: "Fresh Fetch Test Job", - Status: StatusWaiting, - } - assert.NoError(t, db.Insert(ctx, job)) - - // Fetch fresh copy (simulating CreateTaskForRunner behavior) - freshJob, err := GetRunJobByID(ctx, job.ID) - assert.NoError(t, err) - assert.NotNil(t, freshJob) - assert.Equal(t, StatusWaiting, freshJob.Status, "Fresh job should have WAITING status") - assert.Equal(t, int64(0), freshJob.TaskID, "Fresh job should have TaskID=0") - - // Update original job to RUNNING - job.Status = StatusRunning - _, err = UpdateRunJob(ctx, job, nil) - assert.NoError(t, err) - - // Fetch fresh copy again - should reflect the update - freshJob2, err := GetRunJobByID(ctx, job.ID) - assert.NoError(t, err) - assert.NotNil(t, freshJob2) - assert.Equal(t, StatusRunning, freshJob2.Status, "Fresh job should now have RUNNING status") - - t.Log("✅ Fresh job fetch mechanism works correctly!") -} - -// TestCountRunningJobs tests the CountRunningJobsByWorkflowAndRun function -func TestCountRunningJobs(t *testing.T) { - assert.NoError(t, unittest.PrepareTestDatabase()) - ctx := context.Background() - - runID := int64(13000) - jobID := "count-jobs" - - // Create ActionRun - run := &ActionRun{ - ID: runID, - RepoID: 1, - OwnerID: 1, - Index: 13000, - Status: StatusRunning, - } - assert.NoError(t, db.Insert(ctx, run)) - - // Create 5 jobs - for i := 1; i <= 5; i++ { - job := &ActionRunJob{ - RunID: runID, - RepoID: 1, - OwnerID: 1, - JobID: jobID, - Name: "Job " + string(rune(i)), - Status: StatusWaiting, - } - assert.NoError(t, db.Insert(ctx, job)) - } - - // Get all jobs - jobs, err := GetRunJobsByRunID(ctx, runID) - assert.NoError(t, err) - assert.Len(t, jobs, 5) - - // Initially 0 running - running, err := CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) - assert.NoError(t, err) - assert.Equal(t, 0, running) - - // Set job 0 to RUNNING - jobs[0].Status = StatusRunning - _, err = UpdateRunJob(ctx, jobs[0], nil) - assert.NoError(t, err) - - running, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) - assert.NoError(t, err) - assert.Equal(t, 1, running) - - // Set job 1 to RUNNING - jobs[1].Status = StatusRunning - _, err = UpdateRunJob(ctx, jobs[1], nil) - assert.NoError(t, err) - - running, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) - assert.NoError(t, err) - assert.Equal(t, 2, running) - - // Set job 0 to SUCCESS (not counted as RUNNING anymore) - jobs[0].Status = StatusSuccess - _, err = UpdateRunJob(ctx, jobs[0], nil) - assert.NoError(t, err) - - running, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) - assert.NoError(t, err) - assert.Equal(t, 1, running) - - t.Log("✅ Count running jobs works correctly!") -} diff --git a/models/actions/run_job.go b/models/actions/run_job.go index de16741a82..30a24368ea 100644 --- a/models/actions/run_job.go +++ b/models/actions/run_job.go @@ -21,7 +21,7 @@ import ( // ActionRunJob represents a job of a run type ActionRunJob struct { ID int64 - RunID int64 `xorm:"index"` + RunID int64 `xorm:"index index(idx_action_run_job_run_id_job_id)"` Run *ActionRun `xorm:"-"` RepoID int64 `xorm:"index(repo_concurrency)"` Repo *repo_model.Repository `xorm:"-"` @@ -35,7 +35,7 @@ type ActionRunJob struct { // it should contain exactly one job with global workflow fields for this model WorkflowPayload []byte - JobID string `xorm:"VARCHAR(255)"` // job id in workflow, not job's id + JobID string `xorm:"VARCHAR(255) index(idx_action_run_job_run_id_job_id)"` // job id in workflow, not job's id Needs []string `xorm:"JSON TEXT"` RunsOn []string `xorm:"JSON TEXT"` TaskID int64 // the latest task of the job @@ -55,8 +55,9 @@ type ActionRunJob struct { // Org/repo clamps are enforced when the token is used at runtime. // It is JSON-encoded repo_model.ActionsTokenPermissions and may be empty if not specified. TokenPermissions *repo_model.ActionsTokenPermissions `xorm:"JSON TEXT"` - // Matrix job support - MaxParallel int // Max parallel jobs from strategy.max-parallel (0 = unlimited) + // MaxParallel is the max-parallel value from strategy.max-parallel (0 = unlimited). + // All matrix jobs sharing the same RunID+JobID share this value. + MaxParallel int `xorm:"NOT NULL DEFAULT 0"` Started timeutil.TimeStamp Stopped timeutil.TimeStamp diff --git a/models/actions/run_job_maxparallel_test.go b/models/actions/run_job_maxparallel_test.go index badc0cce36..9993482451 100644 --- a/models/actions/run_job_maxparallel_test.go +++ b/models/actions/run_job_maxparallel_test.go @@ -11,245 +11,46 @@ import ( "code.gitea.io/gitea/models/unittest" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) -func TestActionRunJob_MaxParallel(t *testing.T) { - assert.NoError(t, unittest.PrepareTestDatabase()) +func getRunJobByID(ctx context.Context, t *testing.T, id int64) *ActionRunJob { + t.Helper() + got, exist, err := db.GetByID[ActionRunJob](ctx, id) + require.NoError(t, err) + require.True(t, exist) + return got +} + +// TestMaxParallel_FieldPersistence verifies that MaxParallel is stored and retrieved correctly. +func TestMaxParallel_FieldPersistence(t *testing.T) { + require.NoError(t, unittest.PrepareTestDatabase()) ctx := context.Background() - t.Run("NoMaxParallel", func(t *testing.T) { - job := &ActionRunJob{ - RunID: 1, - RepoID: 1, - OwnerID: 1, - JobID: "test-job-1", - Name: "Test Job", - Status: StatusWaiting, - MaxParallel: 0, // No limit - } - assert.NoError(t, db.Insert(ctx, job)) + run := &ActionRun{ID: 100, RepoID: 1, OwnerID: 1, Index: 100, Status: StatusRunning} + require.NoError(t, db.Insert(ctx, run)) - retrieved, err := GetRunJobByID(ctx, job.ID) - assert.NoError(t, err) - assert.Equal(t, 0, retrieved.MaxParallel) + t.Run("zero value means unlimited", func(t *testing.T) { + job := &ActionRunJob{RunID: 100, RepoID: 1, OwnerID: 1, JobID: "no-limit", Name: "No Limit", Status: StatusWaiting, MaxParallel: 0} + require.NoError(t, db.Insert(ctx, job)) + got := getRunJobByID(ctx, t, job.ID) + assert.Equal(t, 0, got.MaxParallel) }) - t.Run("WithMaxParallel", func(t *testing.T) { - job := &ActionRunJob{ - RunID: 1, - RepoID: 1, - OwnerID: 1, - JobID: "test-job-2", - Name: "Matrix Job", - Status: StatusWaiting, - MaxParallel: 3, - } - assert.NoError(t, db.Insert(ctx, job)) - - retrieved, err := GetRunJobByID(ctx, job.ID) - assert.NoError(t, err) - assert.Equal(t, 3, retrieved.MaxParallel) + t.Run("positive value is persisted", func(t *testing.T) { + job := &ActionRunJob{RunID: 100, RepoID: 1, OwnerID: 1, JobID: "with-limit", Name: "With Limit", Status: StatusWaiting, MaxParallel: 3} + require.NoError(t, db.Insert(ctx, job)) + got := getRunJobByID(ctx, t, job.ID) + assert.Equal(t, 3, got.MaxParallel) }) - t.Run("UpdateMaxParallel", func(t *testing.T) { - // Create ActionRun first - run := &ActionRun{ - ID: 1, - RepoID: 1, - OwnerID: 1, - Status: StatusRunning, - } - // Note: This might fail if run already exists from previous tests, but that's okay - _ = db.Insert(ctx, run) - - job := &ActionRunJob{ - RunID: 1, - RepoID: 1, - OwnerID: 1, - JobID: "test-job-4", - Name: "Updatable Job", - Status: StatusWaiting, - MaxParallel: 5, - } - assert.NoError(t, db.Insert(ctx, job)) - - // Update max parallel + t.Run("can be updated via UpdateRunJob", func(t *testing.T) { + job := &ActionRunJob{RunID: 100, RepoID: 1, OwnerID: 1, JobID: "updatable", Name: "Updatable", Status: StatusWaiting, MaxParallel: 5} + require.NoError(t, db.Insert(ctx, job)) job.MaxParallel = 10 _, err := UpdateRunJob(ctx, job, nil, "max_parallel") - assert.NoError(t, err) - - retrieved, err := GetRunJobByID(ctx, job.ID) - assert.NoError(t, err) - assert.Equal(t, 10, retrieved.MaxParallel) - }) -} - -func TestActionRunJob_MaxParallelEnforcement(t *testing.T) { - assert.NoError(t, unittest.PrepareTestDatabase()) - ctx := context.Background() - - t.Run("EnforceMaxParallel", func(t *testing.T) { - runID := int64(5000) - jobID := "parallel-enforced-job" - maxParallel := 2 - - // Create ActionRun first - run := &ActionRun{ - ID: runID, - RepoID: 1, - OwnerID: 1, - Index: 5000, - Status: StatusRunning, - } - assert.NoError(t, db.Insert(ctx, run)) - - // Create jobs simulating matrix execution - jobs := []*ActionRunJob{ - {RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 1", Status: StatusRunning, MaxParallel: maxParallel}, - {RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 2", Status: StatusRunning, MaxParallel: maxParallel}, - {RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 3", Status: StatusWaiting, MaxParallel: maxParallel}, - {RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 4", Status: StatusWaiting, MaxParallel: maxParallel}, - } - - for _, job := range jobs { - assert.NoError(t, db.Insert(ctx, job)) - } - - // Verify running count - runningCount, err := CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) - assert.NoError(t, err) - assert.Equal(t, maxParallel, runningCount, "Should have exactly max-parallel jobs running") - - // Simulate job completion - jobs[0].Status = StatusSuccess - _, err = UpdateRunJob(ctx, jobs[0], nil, "status") - assert.NoError(t, err) - - // Now running count should be 1 - runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) - assert.NoError(t, err) - assert.Equal(t, 1, runningCount) - - // Simulate next job starting - jobs[2].Status = StatusRunning - _, err = UpdateRunJob(ctx, jobs[2], nil, "status") - assert.NoError(t, err) - - // Back to max-parallel - runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) - assert.NoError(t, err) - assert.Equal(t, maxParallel, runningCount) - }) - - t.Run("MaxParallelOne_SequentialExecution", func(t *testing.T) { - runID := int64(6000) - jobID := "sequential-job" - maxParallel := 1 - - // Create ActionRun first - run := &ActionRun{ - ID: runID, - RepoID: 1, - OwnerID: 1, - Index: 6000, - Status: StatusRunning, - } - assert.NoError(t, db.Insert(ctx, run)) - - // Create jobs simulating sequential execution with max-parallel=1 - jobs := []*ActionRunJob{ - {RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 1", Status: StatusRunning, MaxParallel: maxParallel}, - {RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 2", Status: StatusWaiting, MaxParallel: maxParallel}, - {RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 3", Status: StatusWaiting, MaxParallel: maxParallel}, - } - - for _, job := range jobs { - assert.NoError(t, db.Insert(ctx, job)) - } - - // Verify initial running count is 1 - runningCount, err := CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) - assert.NoError(t, err) - assert.Equal(t, 1, runningCount, "Should have exactly 1 job running with max-parallel=1") - - // Complete first job - jobs[0].Status = StatusSuccess - _, err = UpdateRunJob(ctx, jobs[0], nil, "status") - assert.NoError(t, err) - - // Now running count should be 0 - runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) - assert.NoError(t, err) - assert.Equal(t, 0, runningCount, "Should have 0 jobs running after first job completes") - - // Second job can now start - jobs[1].Status = StatusRunning - _, err = UpdateRunJob(ctx, jobs[1], nil, "status") - assert.NoError(t, err) - - // Running count should be 1 again - runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) - assert.NoError(t, err) - assert.Equal(t, 1, runningCount, "Should have exactly 1 job running after second job starts") - - // Complete second job - jobs[1].Status = StatusSuccess - _, err = UpdateRunJob(ctx, jobs[1], nil, "status") - assert.NoError(t, err) - - // Third job can now start - jobs[2].Status = StatusRunning - _, err = UpdateRunJob(ctx, jobs[2], nil, "status") - assert.NoError(t, err) - - // Running count should still be 1 - runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) - assert.NoError(t, err) - assert.Equal(t, 1, runningCount, "Should have exactly 1 job running after third job starts") - }) - - t.Run("MaxParallelOne_WithFailure", func(t *testing.T) { - runID := int64(7000) - jobID := "sequential-with-failure-job" - maxParallel := 1 - - // Create ActionRun first - run := &ActionRun{ - ID: runID, - RepoID: 1, - OwnerID: 1, - Index: 7000, - Status: StatusRunning, - } - assert.NoError(t, db.Insert(ctx, run)) - - // Create jobs - jobs := []*ActionRunJob{ - {RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 1", Status: StatusRunning, MaxParallel: maxParallel}, - {RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 2", Status: StatusWaiting, MaxParallel: maxParallel}, - } - - for _, job := range jobs { - assert.NoError(t, db.Insert(ctx, job)) - } - - // First job fails - jobs[0].Status = StatusFailure - _, err := UpdateRunJob(ctx, jobs[0], nil, "status") - assert.NoError(t, err) - - // Verify no jobs are running - runningCount, err := CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) - assert.NoError(t, err) - assert.Equal(t, 0, runningCount, "Should have 0 jobs running after job fails") - - // Second job can still start - jobs[1].Status = StatusRunning - _, err = UpdateRunJob(ctx, jobs[1], nil, "status") - assert.NoError(t, err) - - runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) - assert.NoError(t, err) - assert.Equal(t, 1, runningCount, "Should have exactly 1 job running after job starts") + require.NoError(t, err) + got := getRunJobByID(ctx, t, job.ID) + assert.Equal(t, 10, got.MaxParallel) }) } diff --git a/models/actions/task.go b/models/actions/task.go index 49bdfaba54..1f4a522512 100644 --- a/models/actions/task.go +++ b/models/actions/task.go @@ -264,25 +264,8 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask continue } - // Check max-parallel constraint for matrix jobs - // Note: This is a best-effort check with a small race window. Multiple runners - // could simultaneously check the count before any commits, potentially allowing - // the limit to be briefly exceeded (e.g., if limit=2 and count=1, two runners - // might both proceed). Perfect enforcement would require row-level locking across - // all jobs with the same run_id+job_id, which has a significant performance impact. - // The race window is small since jobs are picked and committed quickly. - if v.MaxParallel > 0 { - runningCount, err := CountRunningJobsByWorkflowAndRun(ctx, v.RunID, v.JobID) - if err != nil { - log.Error("Failed to count running jobs for max-parallel check: %v", err) - continue - } - if runningCount >= v.MaxParallel { - log.Debug("Job %s (run %d) skipped: %d/%d jobs already running (max-parallel)", - v.JobID, v.RunID, runningCount, v.MaxParallel) - continue - } - } + // max-parallel is enforced at insertion time (InsertRun) and by + // jobStatusResolver, so a Waiting job is guaranteed a free slot. job = v break @@ -547,14 +530,3 @@ func getTaskIDFromCache(token string) int64 { } return t } - -// CountRunningJobsByWorkflowAndRun counts running jobs for a specific workflow/run combo -// Used to enforce max-parallel limits on matrix jobs -func CountRunningJobsByWorkflowAndRun(ctx context.Context, runID int64, jobID string) (int, error) { - count, err := db.GetEngine(ctx). - Where("run_id = ?", runID). - And("job_id = ?", jobID). - And("status = ?", StatusRunning). - Count(new(ActionRunJob)) - return int(count), err -} diff --git a/models/actions/task_count_test.go b/models/actions/task_count_test.go deleted file mode 100644 index 12117a87e5..0000000000 --- a/models/actions/task_count_test.go +++ /dev/null @@ -1,123 +0,0 @@ -// Copyright 2026 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package actions - -import ( - "context" - "testing" - - "code.gitea.io/gitea/models/db" - "code.gitea.io/gitea/models/unittest" - - "github.com/stretchr/testify/assert" -) - -func TestCountRunningJobsByWorkflowAndRun(t *testing.T) { - assert.NoError(t, unittest.PrepareTestDatabase()) - ctx := context.Background() - - t.Run("NoRunningJobs", func(t *testing.T) { - count, err := CountRunningJobsByWorkflowAndRun(ctx, 999999, "nonexistent") - assert.NoError(t, err) - assert.Equal(t, 0, count) - }) - - t.Run("WithRunningJobs", func(t *testing.T) { - runID := int64(1000) - jobID := "test-job" - - // Create ActionRun first - run := &ActionRun{ - ID: runID, - RepoID: 1, - OwnerID: 1, - Index: 1000, - Status: StatusRunning, - } - assert.NoError(t, db.Insert(ctx, run)) - - // Create running jobs - for range 3 { - job := &ActionRunJob{ - RunID: runID, - RepoID: 1, - OwnerID: 1, - JobID: jobID, - Name: "Test Job", - Status: StatusRunning, - } - assert.NoError(t, db.Insert(ctx, job)) - } - - count, err := CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) - assert.NoError(t, err) - assert.Equal(t, 3, count) - }) - - t.Run("DifferentJobIDs", func(t *testing.T) { - runID := int64(2000) - - // Create ActionRun first - run := &ActionRun{ - ID: runID, - RepoID: 1, - OwnerID: 1, - Index: 2000, - Status: StatusRunning, - } - assert.NoError(t, db.Insert(ctx, run)) - - // Create jobs with different job IDs - for i := range 5 { - job := &ActionRunJob{ - RunID: runID, - RepoID: 1, - OwnerID: 1, - JobID: "job-" + string(rune('A'+i)), - Name: "Test Job", - Status: StatusRunning, - } - assert.NoError(t, db.Insert(ctx, job)) - } - - // Count for specific job ID should be 1 - count, err := CountRunningJobsByWorkflowAndRun(ctx, runID, "job-A") - assert.NoError(t, err) - assert.Equal(t, 1, count) - }) - - t.Run("MatrixJobsWithMaxParallel", func(t *testing.T) { - runID := int64(3000) - jobID := "matrix-job" - maxParallel := 2 - - // Create ActionRun first - run := &ActionRun{ - ID: runID, - RepoID: 1, - OwnerID: 1, - Index: 3000, - Status: StatusRunning, - } - assert.NoError(t, db.Insert(ctx, run)) - - // Create matrix jobs - jobs := []*ActionRunJob{ - {RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 1", Status: StatusRunning, MaxParallel: maxParallel}, - {RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 2", Status: StatusRunning, MaxParallel: maxParallel}, - {RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 3", Status: StatusWaiting, MaxParallel: maxParallel}, - {RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 4", Status: StatusWaiting, MaxParallel: maxParallel}, - } - - for _, job := range jobs { - assert.NoError(t, db.Insert(ctx, job)) - } - - // Count running jobs - should be 2 (matching max-parallel) - count, err := CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) - assert.NoError(t, err) - assert.Equal(t, 2, count) - assert.Equal(t, maxParallel, count, "Running jobs should equal max-parallel") - }) -} diff --git a/models/actions/task_sequential_execution_test.go b/models/actions/task_sequential_execution_test.go deleted file mode 100644 index 017c8437ad..0000000000 --- a/models/actions/task_sequential_execution_test.go +++ /dev/null @@ -1,212 +0,0 @@ -// Copyright 2026 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package actions - -import ( - "context" - "testing" - - "code.gitea.io/gitea/models/db" - "code.gitea.io/gitea/models/unittest" - - "github.com/stretchr/testify/assert" -) - -// TestTaskCreation_MaxParallel_One tests that tasks are properly sequenced -// when max-parallel=1, ensuring no hang after task completion -func TestTaskCreation_MaxParallel_One(t *testing.T) { - assert.NoError(t, unittest.PrepareTestDatabase()) - ctx := context.Background() - - t.Run("SequentialTaskCreationMaxParallelOne", func(t *testing.T) { - runID := int64(8000) - jobID := "task-sequential-max-parallel-one" - maxParallel := 1 - - // Setup: Create ActionRun - run := &ActionRun{ - ID: runID, - RepoID: 1, - OwnerID: 1, - Index: 8000, - Status: StatusRunning, - } - assert.NoError(t, db.Insert(ctx, run)) - - // Create runner - runner := &ActionRunner{ - ID: 1, - UUID: "test-runner-1", - Name: "Test Runner", - OwnerID: 1, - } - assert.NoError(t, db.Insert(ctx, runner)) - - // Create jobs with max-parallel=1 - jobs := []*ActionRunJob{ - { - RunID: runID, - RepoID: 1, - OwnerID: 1, - JobID: jobID, - Name: "Sequential Job 1", - Status: StatusWaiting, - MaxParallel: maxParallel, - }, - { - RunID: runID, - RepoID: 1, - OwnerID: 1, - JobID: jobID, - Name: "Sequential Job 2", - Status: StatusWaiting, - MaxParallel: maxParallel, - }, - { - RunID: runID, - RepoID: 1, - OwnerID: 1, - JobID: jobID, - Name: "Sequential Job 3", - Status: StatusWaiting, - MaxParallel: maxParallel, - }, - } - - for _, job := range jobs { - assert.NoError(t, db.Insert(ctx, job)) - } - - // Verify initial state: all jobs are waiting - allJobs, err := GetRunJobsByRunID(ctx, runID) - assert.NoError(t, err) - assert.Len(t, allJobs, 3) - - // Verify that only 1 job should be able to run at a time with max-parallel=1 - runningCount, err := CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) - assert.NoError(t, err) - assert.Equal(t, 0, runningCount, "Should have 0 jobs running initially") - - // Simulate starting first job - allJobs[0].Status = StatusRunning - _, err = UpdateRunJob(ctx, allJobs[0], nil) - assert.NoError(t, err) - - runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) - assert.NoError(t, err) - assert.Equal(t, 1, runningCount, "Should have exactly 1 job running with max-parallel=1") - - // Complete first job - allJobs[0].Status = StatusSuccess - _, err = UpdateRunJob(ctx, allJobs[0], nil) - assert.NoError(t, err) - - runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) - assert.NoError(t, err) - assert.Equal(t, 0, runningCount, "Should have 0 jobs running after first job completes") - - // This is the critical test: the second job should be able to start - // Previously, the system might hang here - allJobs[1].Status = StatusRunning - _, err = UpdateRunJob(ctx, allJobs[1], nil) - assert.NoError(t, err) - - runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) - assert.NoError(t, err) - assert.Equal(t, 1, runningCount, "Should have exactly 1 job running after second job starts (critical test)") - - // Complete the second job - allJobs[1].Status = StatusSuccess - _, err = UpdateRunJob(ctx, allJobs[1], nil) - assert.NoError(t, err) - - runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) - assert.NoError(t, err) - assert.Equal(t, 0, runningCount, "Should have 0 jobs running after second job completes") - - // Third job should also be able to start - allJobs[2].Status = StatusRunning - _, err = UpdateRunJob(ctx, allJobs[2], nil) - assert.NoError(t, err) - - runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) - assert.NoError(t, err) - assert.Equal(t, 1, runningCount, "Should have exactly 1 job running for third job") - }) - - t.Run("MaxParallelConstraintAfterTaskFetch", func(t *testing.T) { - runID := int64(9000) - jobID := "max-parallel-fetch-job" - maxParallel := 1 - - // Setup: Create ActionRun - run := &ActionRun{ - ID: runID, - RepoID: 1, - OwnerID: 1, - Index: 9000, - Status: StatusRunning, - } - assert.NoError(t, db.Insert(ctx, run)) - - // Create jobs - jobs := []*ActionRunJob{ - { - RunID: runID, - RepoID: 1, - OwnerID: 1, - JobID: jobID, - Name: "Job A", - Status: StatusWaiting, - MaxParallel: maxParallel, - }, - { - RunID: runID, - RepoID: 1, - OwnerID: 1, - JobID: jobID, - Name: "Job B", - Status: StatusWaiting, - MaxParallel: maxParallel, - }, - } - - for _, job := range jobs { - assert.NoError(t, db.Insert(ctx, job)) - } - - // Refresh jobs to get IDs - allJobs, err := GetRunJobsByRunID(ctx, runID) - assert.NoError(t, err) - - // Start first job - allJobs[0].Status = StatusRunning - _, err = UpdateRunJob(ctx, allJobs[0], nil) - assert.NoError(t, err) - - // Verify constraint - runningCount, err := CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) - assert.NoError(t, err) - assert.Equal(t, 1, runningCount) - - // Try to start second job while first is still running - // This should not be allowed due to max-parallel=1 - freshAllJobs, err := GetRunJobsByRunID(ctx, runID) - assert.NoError(t, err) - - // Check if we can determine from the count that the second job should not start - for i := 1; i < len(freshAllJobs); i++ { - if freshAllJobs[i].Status == StatusWaiting { - // Before starting this job, verify max-parallel constraint - runningCount, err := CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) - assert.NoError(t, err) - if runningCount >= maxParallel { - // This job should wait - assert.Equal(t, StatusWaiting, freshAllJobs[i].Status, - "Job should remain waiting when max-parallel limit is reached") - } - } - } - }) -} diff --git a/services/actions/job_emitter.go b/services/actions/job_emitter.go index 20a4f81eab..3d41771c57 100644 --- a/services/actions/job_emitter.go +++ b/services/actions/job_emitter.go @@ -313,6 +313,8 @@ func (r *jobStatusResolver) resolveJobHasIfCondition(actionRunJob *actions_model func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model.Status { ret := map[int64]actions_model.Status{} + // promotedWaitingByJobID counts within-pass promotions to enforce max-parallel. + promotedWaitingByJobID := make(map[string]int) for id, status := range r.statuses { actionRunJob := r.jobMap[id] if status != actions_model.StatusBlocked { @@ -337,7 +339,7 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model shouldStartJob := true if !allSucceed { // Not all dependent jobs completed successfully: - // * if the job has "if" condition, it can be started, then the act_runner will evaluate the "if" condition. + // * if the job has an "if" condition, it can be started; then the act_runner will evaluate the "if" condition. // * otherwise, the job should be skipped. shouldStartJob = r.resolveJobHasIfCondition(actionRunJob) } @@ -350,6 +352,23 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model } } + // Enforce max-parallel: count occupied slots from the current snapshot + // plus within-pass promotions. + if newStatus == actions_model.StatusWaiting && actionRunJob.MaxParallel > 0 { + occupiedSlots := 0 + for otherID, otherStatus := range r.statuses { + otherJob := r.jobMap[otherID] + if otherJob.JobID == actionRunJob.JobID && + (otherStatus == actions_model.StatusRunning || otherStatus == actions_model.StatusWaiting) { + occupiedSlots++ + } + } + if occupiedSlots+promotedWaitingByJobID[actionRunJob.JobID] >= actionRunJob.MaxParallel { + continue // no free slot; leave blocked + } + promotedWaitingByJobID[actionRunJob.JobID]++ + } + if newStatus != actions_model.StatusBlocked { ret[id] = newStatus } diff --git a/services/actions/job_emitter_test.go b/services/actions/job_emitter_test.go index a2152fb270..3b6768bdd8 100644 --- a/services/actions/job_emitter_test.go +++ b/services/actions/job_emitter_test.go @@ -126,11 +126,60 @@ jobs: }, want: map[int64]actions_model.Status{2: actions_model.StatusSkipped}, }, + { + name: "max-parallel=1 promotes exactly one blocked job when one slot is open", + jobs: actions_model.ActionJobList{ + {ID: 1, JobID: "build", Status: actions_model.StatusRunning, Needs: []string{}, MaxParallel: 1}, + {ID: 2, JobID: "build", Status: actions_model.StatusBlocked, Needs: []string{}, MaxParallel: 1}, + {ID: 3, JobID: "build", Status: actions_model.StatusBlocked, Needs: []string{}, MaxParallel: 1}, + }, + want: map[int64]actions_model.Status{}, + }, + { + name: "max-parallel=1 promotes one job after running job finishes", + jobs: actions_model.ActionJobList{ + {ID: 1, JobID: "build", Status: actions_model.StatusSuccess, Needs: []string{}, MaxParallel: 1}, + {ID: 2, JobID: "build", Status: actions_model.StatusBlocked, Needs: []string{}, MaxParallel: 1}, + {ID: 3, JobID: "build", Status: actions_model.StatusBlocked, Needs: []string{}, MaxParallel: 1}, + }, + want: nil, // map iteration is non-deterministic; checked by count below + }, + { + name: "max-parallel=2 does not promote when limit is reached", + jobs: actions_model.ActionJobList{ + {ID: 1, JobID: "test", Status: actions_model.StatusRunning, Needs: []string{}, MaxParallel: 2}, + {ID: 2, JobID: "test", Status: actions_model.StatusRunning, Needs: []string{}, MaxParallel: 2}, + {ID: 3, JobID: "test", Status: actions_model.StatusBlocked, Needs: []string{}, MaxParallel: 2}, + {ID: 4, JobID: "test", Status: actions_model.StatusBlocked, Needs: []string{}, MaxParallel: 2}, + }, + want: map[int64]actions_model.Status{}, + }, + { + name: "max-parallel=2 promotes one job when one slot opens", + jobs: actions_model.ActionJobList{ + {ID: 1, JobID: "test", Status: actions_model.StatusSuccess, Needs: []string{}, MaxParallel: 2}, + {ID: 2, JobID: "test", Status: actions_model.StatusRunning, Needs: []string{}, MaxParallel: 2}, + {ID: 3, JobID: "test", Status: actions_model.StatusBlocked, Needs: []string{}, MaxParallel: 2}, + {ID: 4, JobID: "test", Status: actions_model.StatusBlocked, Needs: []string{}, MaxParallel: 2}, + }, + want: nil, // checked by count below + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { r := newJobStatusResolver(tt.jobs, nil) - assert.Equal(t, tt.want, r.Resolve(t.Context())) + got := r.Resolve(t.Context()) + if tt.want == nil { + waitingCount := 0 + for _, s := range got { + if s == actions_model.StatusWaiting { + waitingCount++ + } + } + assert.Equal(t, 1, waitingCount, "expected exactly 1 job promoted to Waiting, got %v", got) + } else { + assert.Equal(t, tt.want, got) + } }) } } diff --git a/services/actions/run.go b/services/actions/run.go index bec95fe7fc..ace9ceb08f 100644 --- a/services/actions/run.go +++ b/services/actions/run.go @@ -10,8 +10,8 @@ import ( actions_model "code.gitea.io/gitea/models/actions" "code.gitea.io/gitea/models/db" - "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/actions/jobparser" + "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/util" notify_service "code.gitea.io/gitea/services/notify" @@ -106,6 +106,9 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobpar runJobs := make([]*actions_model.ActionRunJob, 0, len(jobs)) var hasWaitingJobs bool + // waitingCountByJobID limits initial Waiting slots per JobID to MaxParallel. + waitingCountByJobID := make(map[string]int) + for _, v := range jobs { id, job := v.Job() needs := job.Needs() @@ -170,6 +173,16 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobpar } } + // Enforce max-parallel: excess jobs start as Blocked and are promoted + // by jobStatusResolver when a slot opens. + if runJob.Status == actions_model.StatusWaiting && runJob.MaxParallel > 0 { + if waitingCountByJobID[id] >= runJob.MaxParallel { + runJob.Status = actions_model.StatusBlocked + } else { + waitingCountByJobID[id]++ + } + } + hasWaitingJobs = hasWaitingJobs || runJob.Status == actions_model.StatusWaiting if err := db.Insert(ctx, runJob); err != nil { return err diff --git a/services/actions/task_assignment_test.go b/services/actions/task_assignment_test.go index b1bb82fcd2..ad5d0b5087 100644 --- a/services/actions/task_assignment_test.go +++ b/services/actions/task_assignment_test.go @@ -12,314 +12,98 @@ import ( "code.gitea.io/gitea/models/unittest" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) -func TestMaxParallelJobStatusAndCounting(t *testing.T) { - assert.NoError(t, unittest.PrepareTestDatabase()) +// countJobsByStatus returns the number of jobs with the given status in allJobs. +func countJobsByStatus(allJobs actions_model.ActionJobList, status actions_model.Status) int { + n := 0 + for _, j := range allJobs { + if j.Status == status { + n++ + } + } + return n +} - t.Run("MaxParallelReached", func(t *testing.T) { +// TestMaxParallel_ServiceLayer verifies the max-parallel invariant: Running+Waiting <= MaxParallel. +func TestMaxParallel_ServiceLayer(t *testing.T) { + require.NoError(t, unittest.PrepareTestDatabase()) + + t.Run("invariant: Running+Waiting <= MaxParallel", func(t *testing.T) { runID := int64(10000) - jobID := "max-parallel-job" + jobID := "svc-max-parallel" maxParallel := 2 - // Create ActionRun first - run := &actions_model.ActionRun{ - ID: runID, - RepoID: 1, - OwnerID: 1, - Index: 10000, - Status: actions_model.StatusRunning, - } - assert.NoError(t, db.Insert(context.Background(), run)) + run := &actions_model.ActionRun{ID: runID, RepoID: 1, OwnerID: 1, Index: 10000, Status: actions_model.StatusRunning} + require.NoError(t, db.Insert(context.Background(), run)) - // Create waiting jobs - for range 4 { - job := &actions_model.ActionRunJob{ - RunID: runID, - RepoID: 1, - OwnerID: 1, - JobID: jobID, - Name: "Test Job", - Status: actions_model.StatusWaiting, - MaxParallel: maxParallel, - } - assert.NoError(t, db.Insert(context.Background(), job)) + jobs := []*actions_model.ActionRunJob{ + {RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "r1", Status: actions_model.StatusRunning, MaxParallel: maxParallel}, + {RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "w1", Status: actions_model.StatusWaiting, MaxParallel: maxParallel}, + {RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "b1", Status: actions_model.StatusBlocked, MaxParallel: maxParallel}, + } + for _, j := range jobs { + require.NoError(t, db.Insert(context.Background(), j)) } - // Start 2 jobs (max-parallel limit) - jobs, err := actions_model.GetRunJobsByRunID(context.Background(), runID) - assert.NoError(t, err) - assert.Len(t, jobs, 4) + allJobs, err := actions_model.GetRunJobsByRunID(context.Background(), runID) + require.NoError(t, err) - for i := range 2 { - jobs[i].Status = actions_model.StatusRunning - _, err := actions_model.UpdateRunJob(context.Background(), jobs[i], nil, "status") - assert.NoError(t, err) - } + running := countJobsByStatus(allJobs, actions_model.StatusRunning) + waiting := countJobsByStatus(allJobs, actions_model.StatusWaiting) + blocked := countJobsByStatus(allJobs, actions_model.StatusBlocked) - // Verify max-parallel is enforced - runningCount, err := actions_model.CountRunningJobsByWorkflowAndRun(context.Background(), runID, jobID) - assert.NoError(t, err) - assert.Equal(t, maxParallel, runningCount) - - // Remaining jobs should stay in waiting - remainingJobs, err := actions_model.GetRunJobsByRunID(context.Background(), runID) - assert.NoError(t, err) - - waitingCount := 0 - for _, job := range remainingJobs { - if job.Status == actions_model.StatusWaiting { - waitingCount++ - } - } - assert.Equal(t, 2, waitingCount) + assert.LessOrEqual(t, running+waiting, maxParallel) + assert.Equal(t, 1, blocked) }) - t.Run("MaxParallelNotSet", func(t *testing.T) { + t.Run("slot becomes available after completion", func(t *testing.T) { runID := int64(20000) - jobID := "no-limit-job" - - // Create ActionRun first - run := &actions_model.ActionRun{ - ID: runID, - RepoID: 1, - OwnerID: 1, - Index: 20000, - Status: actions_model.StatusRunning, - } - assert.NoError(t, db.Insert(context.Background(), run)) - - // Create jobs without max-parallel - for range 5 { - job := &actions_model.ActionRunJob{ - RunID: runID, - RepoID: 1, - OwnerID: 1, - JobID: jobID, - Name: "Test Job", - Status: actions_model.StatusWaiting, - MaxParallel: 0, // No limit - } - assert.NoError(t, db.Insert(context.Background(), job)) - } - - // All jobs can run simultaneously - jobs, err := actions_model.GetRunJobsByRunID(context.Background(), runID) - assert.NoError(t, err) - - for _, job := range jobs { - job.Status = actions_model.StatusRunning - _, err := actions_model.UpdateRunJob(context.Background(), job, nil, "status") - assert.NoError(t, err) - } - - runningCount, err := actions_model.CountRunningJobsByWorkflowAndRun(context.Background(), runID, jobID) - assert.NoError(t, err) - assert.Equal(t, 5, runningCount, "All jobs should be able to run without limit") - }) - - t.Run("MaxParallelWrongValue", func(t *testing.T) { - runID := int64(30000) - jobID := "wrong-value-use-default-value-job" - - // Create ActionRun first - run := &actions_model.ActionRun{ - ID: runID, - RepoID: 1, - OwnerID: 1, - Index: 30000, - Status: actions_model.StatusRunning, - } - assert.NoError(t, db.Insert(context.Background(), run)) - - // Test different invalid max-parallel values - testCases := []struct { - name string - maxParallel int - description string - }{ - { - name: "negative value", - maxParallel: -1, - description: "Negative max-parallel should default to 0 (no limit)", - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - // Create jobs with the test max-parallel value - for i := range 5 { - job := &actions_model.ActionRunJob{ - RunID: runID, - RepoID: 1, - OwnerID: 1, - JobID: jobID, - Name: "Test Job " + tc.name, - Status: actions_model.StatusWaiting, - MaxParallel: tc.maxParallel, - } - assert.NoError(t, db.Insert(context.Background(), job)) - - // Verify the value was stored - if i == 0 { - storedJob, err := actions_model.GetRunJobByID(context.Background(), job.ID) - assert.NoError(t, err) - assert.Equal(t, tc.maxParallel, storedJob.MaxParallel, tc.description) - } - } - - // All jobs can run simultaneously when max-parallel <= 0 - jobs, err := actions_model.GetRunJobsByRunID(context.Background(), runID) - assert.NoError(t, err) - - for _, job := range jobs { - job.Status = actions_model.StatusRunning - _, err := actions_model.UpdateRunJob(context.Background(), job, nil, "status") - assert.NoError(t, err) - } - - runningCount, err := actions_model.CountRunningJobsByWorkflowAndRun(context.Background(), runID, jobID) - assert.NoError(t, err) - assert.GreaterOrEqual(t, runningCount, 5, "All jobs should be able to run when max-parallel is "+tc.name) - }) - } - }) - - t.Run("MaxParallelOne_TaskCreation", func(t *testing.T) { - runID := int64(40000) - jobID := "task-creation-sequential-job" - maxParallel := 1 - - // Create ActionRun first - run := &actions_model.ActionRun{ - ID: runID, - RepoID: 1, - OwnerID: 1, - Index: 40000, - Status: actions_model.StatusRunning, - } - assert.NoError(t, db.Insert(context.Background(), run)) - - // Create waiting jobs with max-parallel=1 - for i := range 3 { - job := &actions_model.ActionRunJob{ - RunID: runID, - RepoID: 1, - OwnerID: 1, - JobID: jobID, - Name: "Sequential Job " + string(rune(i+1)), - Status: actions_model.StatusWaiting, - MaxParallel: maxParallel, - } - assert.NoError(t, db.Insert(context.Background(), job)) - } - - // Simulate the first job starting - jobs, err := actions_model.GetRunJobsByRunID(context.Background(), runID) - assert.NoError(t, err) - assert.Len(t, jobs, 3) - - jobs[0].Status = actions_model.StatusRunning - _, err = actions_model.UpdateRunJob(context.Background(), jobs[0], nil, "status") - assert.NoError(t, err) - - // Verify only one is running - runningCount, err := actions_model.CountRunningJobsByWorkflowAndRun(context.Background(), runID, jobID) - assert.NoError(t, err) - assert.Equal(t, 1, runningCount, "Should have exactly 1 job running with max-parallel=1") - - // Complete first job - jobs[0].Status = actions_model.StatusSuccess - _, err = actions_model.UpdateRunJob(context.Background(), jobs[0], nil, "status") - assert.NoError(t, err) - - // Verify no jobs are running now - runningCount, err = actions_model.CountRunningJobsByWorkflowAndRun(context.Background(), runID, jobID) - assert.NoError(t, err) - assert.Equal(t, 0, runningCount, "Should have 0 jobs running after completion with max-parallel=1") - - // Second job can now start - jobs[1].Status = actions_model.StatusRunning - _, err = actions_model.UpdateRunJob(context.Background(), jobs[1], nil, "status") - assert.NoError(t, err) - - runningCount, err = actions_model.CountRunningJobsByWorkflowAndRun(context.Background(), runID, jobID) - assert.NoError(t, err) - assert.Equal(t, 1, runningCount, "Should have exactly 1 job running for second task") - - // Complete second job - jobs[1].Status = actions_model.StatusSuccess - _, err = actions_model.UpdateRunJob(context.Background(), jobs[1], nil, "status") - assert.NoError(t, err) - - // Third job can now start - jobs[2].Status = actions_model.StatusRunning - _, err = actions_model.UpdateRunJob(context.Background(), jobs[2], nil, "status") - assert.NoError(t, err) - - runningCount, err = actions_model.CountRunningJobsByWorkflowAndRun(context.Background(), runID, jobID) - assert.NoError(t, err) - assert.Equal(t, 1, runningCount, "Should have exactly 1 job running for third task") - }) - - t.Run("MaxParallelTwo_CompletionAndStart", func(t *testing.T) { - runID := int64(50000) - jobID := "completion-start-job" + jobID := "svc-slot-free" maxParallel := 2 - // Create ActionRun first - run := &actions_model.ActionRun{ - ID: runID, - RepoID: 1, - OwnerID: 1, - Index: 50000, - Status: actions_model.StatusRunning, + run := &actions_model.ActionRun{ID: runID, RepoID: 1, OwnerID: 1, Index: 20000, Status: actions_model.StatusRunning} + require.NoError(t, db.Insert(context.Background(), run)) + + jobs := []*actions_model.ActionRunJob{ + {RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "r1", Status: actions_model.StatusRunning, MaxParallel: maxParallel}, + {RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "r2", Status: actions_model.StatusRunning, MaxParallel: maxParallel}, + {RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "b1", Status: actions_model.StatusBlocked, MaxParallel: maxParallel}, } - assert.NoError(t, db.Insert(context.Background(), run)) - - // Create jobs with max-parallel=2 - for i := range 5 { - job := &actions_model.ActionRunJob{ - RunID: runID, - RepoID: 1, - OwnerID: 1, - JobID: jobID, - Name: "Parallel Job " + string(rune(i+1)), - Status: actions_model.StatusWaiting, - MaxParallel: maxParallel, - } - if i < 2 { - job.Status = actions_model.StatusRunning - } - assert.NoError(t, db.Insert(context.Background(), job)) + for _, j := range jobs { + require.NoError(t, db.Insert(context.Background(), j)) } - // Verify 2 jobs are running - runningCount, err := actions_model.CountRunningJobsByWorkflowAndRun(context.Background(), runID, jobID) - assert.NoError(t, err) - assert.Equal(t, 2, runningCount) - - // Complete one job - jobs, err := actions_model.GetRunJobsByRunID(context.Background(), runID) - assert.NoError(t, err) - jobs[0].Status = actions_model.StatusSuccess - _, err = actions_model.UpdateRunJob(context.Background(), jobs[0], nil, "status") - assert.NoError(t, err) + _, err := actions_model.UpdateRunJob(context.Background(), jobs[0], nil, "status") + require.NoError(t, err) - // Should have 1 running - runningCount, err = actions_model.CountRunningJobsByWorkflowAndRun(context.Background(), runID, jobID) - assert.NoError(t, err) - assert.Equal(t, 1, runningCount, "Should have 1 job running after one completes") + allJobs, err := actions_model.GetRunJobsByRunID(context.Background(), runID) + require.NoError(t, err) - // Now another job can start - jobs[2].Status = actions_model.StatusRunning - _, err = actions_model.UpdateRunJob(context.Background(), jobs[2], nil, "status") - assert.NoError(t, err) + running := countJobsByStatus(allJobs, actions_model.StatusRunning) + assert.Equal(t, 1, running) + assert.Less(t, running, maxParallel) + }) - // Back to 2 running - runningCount, err = actions_model.CountRunningJobsByWorkflowAndRun(context.Background(), runID, jobID) - assert.NoError(t, err) - assert.Equal(t, 2, runningCount, "Should have 2 jobs running after new job starts") + t.Run("no max-parallel means all jobs start as Waiting", func(t *testing.T) { + runID := int64(30000) + jobID := "svc-no-limit" + + run := &actions_model.ActionRun{ID: runID, RepoID: 1, OwnerID: 1, Index: 30000, Status: actions_model.StatusRunning} + require.NoError(t, db.Insert(context.Background(), run)) + + for range 5 { + require.NoError(t, db.Insert(context.Background(), &actions_model.ActionRunJob{ + RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "j", + Status: actions_model.StatusWaiting, MaxParallel: 0, + })) + } + + allJobs, err := actions_model.GetRunJobsByRunID(context.Background(), runID) + require.NoError(t, err) + assert.Equal(t, 5, countJobsByStatus(allJobs, actions_model.StatusWaiting)) + assert.Equal(t, 0, countJobsByStatus(allJobs, actions_model.StatusBlocked)) }) }