From 63be2b1103745b25b1730322c20d8500204249bd Mon Sep 17 00:00:00 2001 From: Pascal Zimmermann Date: Sat, 7 Feb 2026 19:40:11 +0100 Subject: [PATCH] feat: Update the conditions of the actions to continue the tasks executions Signed-off-by: Pascal Zimmermann --- models/actions/max_parallel_simple_test.go | 240 ++++++++++++++++++ models/actions/run_job_maxparallel_test.go | 112 ++++++++ models/actions/task.go | 5 +- .../actions/task_sequential_execution_test.go | 212 ++++++++++++++++ services/actions/job_emitter.go | 59 ++++- services/actions/task_assignment_test.go | 138 ++++++++++ 6 files changed, 761 insertions(+), 5 deletions(-) create mode 100644 models/actions/max_parallel_simple_test.go create mode 100644 models/actions/task_sequential_execution_test.go diff --git a/models/actions/max_parallel_simple_test.go b/models/actions/max_parallel_simple_test.go new file mode 100644 index 0000000000..ac75036b6a --- /dev/null +++ b/models/actions/max_parallel_simple_test.go @@ -0,0 +1,240 @@ +// Copyright 2026 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + "testing" + + "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/models/unittest" + + "github.com/stretchr/testify/assert" +) + +// TestMaxParallelOne_SimpleSequence tests the core issue: max-parallel=1 execution +func TestMaxParallelOne_SimpleSequence(t *testing.T) { + assert.NoError(t, unittest.PrepareTestDatabase()) + ctx := context.Background() + + runID := int64(11000) + jobID := "simple-sequence-job" + maxParallel := 1 + + // Create ActionRun + run := &ActionRun{ + ID: runID, + RepoID: 1, + OwnerID: 1, + Index: 11000, + Status: StatusRunning, + } + assert.NoError(t, db.Insert(ctx, run)) + + // Create jobs + job1 := &ActionRunJob{ + RunID: runID, + RepoID: 1, + OwnerID: 1, + JobID: jobID, + Name: "Job 1", + Status: StatusWaiting, + MaxParallel: maxParallel, + } + job2 := &ActionRunJob{ + RunID: runID, + RepoID: 1, + OwnerID: 1, + JobID: jobID, + Name: "Job 2", + Status: StatusWaiting, + MaxParallel: maxParallel, + } + job3 := &ActionRunJob{ + RunID: runID, + RepoID: 1, + OwnerID: 1, + JobID: jobID, + Name: "Job 3", + Status: StatusWaiting, + MaxParallel: maxParallel, + } + + assert.NoError(t, db.Insert(ctx, job1)) + assert.NoError(t, db.Insert(ctx, job2)) + assert.NoError(t, db.Insert(ctx, job3)) + + // TEST 1: Initially, 0 running + running, err := CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) + assert.NoError(t, err) + assert.Equal(t, 0, running, "Should have 0 jobs running initially") + + // TEST 2: Job1 starts + job1.Status = StatusRunning + _, err = UpdateRunJob(ctx, job1, nil) + assert.NoError(t, err) + + running, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) + assert.NoError(t, err) + assert.Equal(t, 1, running, "Should have 1 job running after job1 starts") + + // TEST 3: Job1 completes + job1.Status = StatusSuccess + _, err = UpdateRunJob(ctx, job1, nil) + assert.NoError(t, err) + + running, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) + assert.NoError(t, err) + assert.Equal(t, 0, running, "Should have 0 jobs running after job1 completes - THIS IS THE CRITICAL TEST") + + // TEST 4: Job2 should now be able to start + job2.Status = StatusRunning + _, err = UpdateRunJob(ctx, job2, nil) + assert.NoError(t, err) + + running, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) + assert.NoError(t, err) + assert.Equal(t, 1, running, "Should have 1 job running after job2 starts - IF THIS FAILS, THE BUG IS NOT FIXED") + + // TEST 5: Job2 completes + job2.Status = StatusSuccess + _, err = UpdateRunJob(ctx, job2, nil) + assert.NoError(t, err) + + running, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) + assert.NoError(t, err) + assert.Equal(t, 0, running, "Should have 0 jobs running after job2 completes") + + // TEST 6: Job3 starts + job3.Status = StatusRunning + _, err = UpdateRunJob(ctx, job3, nil) + assert.NoError(t, err) + + running, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) + assert.NoError(t, err) + assert.Equal(t, 1, running, "Should have 1 job running after job3 starts") + + t.Log("✅ All sequential execution tests passed!") +} + +// TestMaxParallelOne_FreshJobFetch tests the fresh job fetch mechanism +func TestMaxParallelOne_FreshJobFetch(t *testing.T) { + assert.NoError(t, unittest.PrepareTestDatabase()) + ctx := context.Background() + + runID := int64(12000) + jobID := "fresh-fetch-job" + + // Create ActionRun + run := &ActionRun{ + ID: runID, + RepoID: 1, + OwnerID: 1, + Index: 12000, + Status: StatusRunning, + } + assert.NoError(t, db.Insert(ctx, run)) + + // Create a job + job := &ActionRunJob{ + RunID: runID, + RepoID: 1, + OwnerID: 1, + JobID: jobID, + Name: "Fresh Fetch Test Job", + Status: StatusWaiting, + } + assert.NoError(t, db.Insert(ctx, job)) + + // Fetch fresh copy (simulating CreateTaskForRunner behavior) + freshJob, err := GetRunJobByID(ctx, job.ID) + assert.NoError(t, err) + assert.NotNil(t, freshJob) + assert.Equal(t, StatusWaiting, freshJob.Status, "Fresh job should have WAITING status") + assert.Equal(t, int64(0), freshJob.TaskID, "Fresh job should have TaskID=0") + + // Update original job to RUNNING + job.Status = StatusRunning + _, err = UpdateRunJob(ctx, job, nil) + assert.NoError(t, err) + + // Fetch fresh copy again - should reflect the update + freshJob2, err := GetRunJobByID(ctx, job.ID) + assert.NoError(t, err) + assert.NotNil(t, freshJob2) + assert.Equal(t, StatusRunning, freshJob2.Status, "Fresh job should now have RUNNING status") + + t.Log("✅ Fresh job fetch mechanism works correctly!") +} + +// TestCountRunningJobs tests the CountRunningJobsByWorkflowAndRun function +func TestCountRunningJobs(t *testing.T) { + assert.NoError(t, unittest.PrepareTestDatabase()) + ctx := context.Background() + + runID := int64(13000) + jobID := "count-jobs" + + // Create ActionRun + run := &ActionRun{ + ID: runID, + RepoID: 1, + OwnerID: 1, + Index: 13000, + Status: StatusRunning, + } + assert.NoError(t, db.Insert(ctx, run)) + + // Create 5 jobs + for i := 1; i <= 5; i++ { + job := &ActionRunJob{ + RunID: runID, + RepoID: 1, + OwnerID: 1, + JobID: jobID, + Name: "Job " + string(rune(i)), + Status: StatusWaiting, + } + assert.NoError(t, db.Insert(ctx, job)) + } + + // Get all jobs + jobs, err := GetRunJobsByRunID(ctx, runID) + assert.NoError(t, err) + assert.Len(t, jobs, 5) + + // Initially 0 running + running, err := CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) + assert.NoError(t, err) + assert.Equal(t, 0, running) + + // Set job 0 to RUNNING + jobs[0].Status = StatusRunning + _, err = UpdateRunJob(ctx, jobs[0], nil) + assert.NoError(t, err) + + running, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) + assert.NoError(t, err) + assert.Equal(t, 1, running) + + // Set job 1 to RUNNING + jobs[1].Status = StatusRunning + _, err = UpdateRunJob(ctx, jobs[1], nil) + assert.NoError(t, err) + + running, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) + assert.NoError(t, err) + assert.Equal(t, 2, running) + + // Set job 0 to SUCCESS (not counted as RUNNING anymore) + jobs[0].Status = StatusSuccess + _, err = UpdateRunJob(ctx, jobs[0], nil) + assert.NoError(t, err) + + running, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) + assert.NoError(t, err) + assert.Equal(t, 1, running) + + t.Log("✅ Count running jobs works correctly!") +} diff --git a/models/actions/run_job_maxparallel_test.go b/models/actions/run_job_maxparallel_test.go index 275a70fc37..badc0cce36 100644 --- a/models/actions/run_job_maxparallel_test.go +++ b/models/actions/run_job_maxparallel_test.go @@ -140,4 +140,116 @@ func TestActionRunJob_MaxParallelEnforcement(t *testing.T) { assert.NoError(t, err) assert.Equal(t, maxParallel, runningCount) }) + + t.Run("MaxParallelOne_SequentialExecution", func(t *testing.T) { + runID := int64(6000) + jobID := "sequential-job" + maxParallel := 1 + + // Create ActionRun first + run := &ActionRun{ + ID: runID, + RepoID: 1, + OwnerID: 1, + Index: 6000, + Status: StatusRunning, + } + assert.NoError(t, db.Insert(ctx, run)) + + // Create jobs simulating sequential execution with max-parallel=1 + jobs := []*ActionRunJob{ + {RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 1", Status: StatusRunning, MaxParallel: maxParallel}, + {RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 2", Status: StatusWaiting, MaxParallel: maxParallel}, + {RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 3", Status: StatusWaiting, MaxParallel: maxParallel}, + } + + for _, job := range jobs { + assert.NoError(t, db.Insert(ctx, job)) + } + + // Verify initial running count is 1 + runningCount, err := CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) + assert.NoError(t, err) + assert.Equal(t, 1, runningCount, "Should have exactly 1 job running with max-parallel=1") + + // Complete first job + jobs[0].Status = StatusSuccess + _, err = UpdateRunJob(ctx, jobs[0], nil, "status") + assert.NoError(t, err) + + // Now running count should be 0 + runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) + assert.NoError(t, err) + assert.Equal(t, 0, runningCount, "Should have 0 jobs running after first job completes") + + // Second job can now start + jobs[1].Status = StatusRunning + _, err = UpdateRunJob(ctx, jobs[1], nil, "status") + assert.NoError(t, err) + + // Running count should be 1 again + runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) + assert.NoError(t, err) + assert.Equal(t, 1, runningCount, "Should have exactly 1 job running after second job starts") + + // Complete second job + jobs[1].Status = StatusSuccess + _, err = UpdateRunJob(ctx, jobs[1], nil, "status") + assert.NoError(t, err) + + // Third job can now start + jobs[2].Status = StatusRunning + _, err = UpdateRunJob(ctx, jobs[2], nil, "status") + assert.NoError(t, err) + + // Running count should still be 1 + runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) + assert.NoError(t, err) + assert.Equal(t, 1, runningCount, "Should have exactly 1 job running after third job starts") + }) + + t.Run("MaxParallelOne_WithFailure", func(t *testing.T) { + runID := int64(7000) + jobID := "sequential-with-failure-job" + maxParallel := 1 + + // Create ActionRun first + run := &ActionRun{ + ID: runID, + RepoID: 1, + OwnerID: 1, + Index: 7000, + Status: StatusRunning, + } + assert.NoError(t, db.Insert(ctx, run)) + + // Create jobs + jobs := []*ActionRunJob{ + {RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 1", Status: StatusRunning, MaxParallel: maxParallel}, + {RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 2", Status: StatusWaiting, MaxParallel: maxParallel}, + } + + for _, job := range jobs { + assert.NoError(t, db.Insert(ctx, job)) + } + + // First job fails + jobs[0].Status = StatusFailure + _, err := UpdateRunJob(ctx, jobs[0], nil, "status") + assert.NoError(t, err) + + // Verify no jobs are running + runningCount, err := CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) + assert.NoError(t, err) + assert.Equal(t, 0, runningCount, "Should have 0 jobs running after job fails") + + // Second job can still start + jobs[1].Status = StatusRunning + _, err = UpdateRunJob(ctx, jobs[1], nil, "status") + assert.NoError(t, err) + + runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) + assert.NoError(t, err) + assert.Equal(t, 1, runningCount, "Should have exactly 1 job running after job starts") + }) } diff --git a/models/actions/task.go b/models/actions/task.go index 34c61b3ffb..49bdfaba54 100644 --- a/models/actions/task.go +++ b/models/actions/task.go @@ -346,9 +346,12 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask } job.TaskID = task.ID - if n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}); err != nil { + // Must explicitly specify which columns to update, including status and started + if n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "task_id", "status", "started", "attempt", "updated"); err != nil { return nil, false, err } else if n != 1 { + // Another runner may have claimed this job, skip it + log.Debug("Job %s (run %d) was claimed by another runner, skipping", job.JobID, job.RunID) return nil, false, nil } diff --git a/models/actions/task_sequential_execution_test.go b/models/actions/task_sequential_execution_test.go new file mode 100644 index 0000000000..017c8437ad --- /dev/null +++ b/models/actions/task_sequential_execution_test.go @@ -0,0 +1,212 @@ +// Copyright 2026 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + "testing" + + "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/models/unittest" + + "github.com/stretchr/testify/assert" +) + +// TestTaskCreation_MaxParallel_One tests that tasks are properly sequenced +// when max-parallel=1, ensuring no hang after task completion +func TestTaskCreation_MaxParallel_One(t *testing.T) { + assert.NoError(t, unittest.PrepareTestDatabase()) + ctx := context.Background() + + t.Run("SequentialTaskCreationMaxParallelOne", func(t *testing.T) { + runID := int64(8000) + jobID := "task-sequential-max-parallel-one" + maxParallel := 1 + + // Setup: Create ActionRun + run := &ActionRun{ + ID: runID, + RepoID: 1, + OwnerID: 1, + Index: 8000, + Status: StatusRunning, + } + assert.NoError(t, db.Insert(ctx, run)) + + // Create runner + runner := &ActionRunner{ + ID: 1, + UUID: "test-runner-1", + Name: "Test Runner", + OwnerID: 1, + } + assert.NoError(t, db.Insert(ctx, runner)) + + // Create jobs with max-parallel=1 + jobs := []*ActionRunJob{ + { + RunID: runID, + RepoID: 1, + OwnerID: 1, + JobID: jobID, + Name: "Sequential Job 1", + Status: StatusWaiting, + MaxParallel: maxParallel, + }, + { + RunID: runID, + RepoID: 1, + OwnerID: 1, + JobID: jobID, + Name: "Sequential Job 2", + Status: StatusWaiting, + MaxParallel: maxParallel, + }, + { + RunID: runID, + RepoID: 1, + OwnerID: 1, + JobID: jobID, + Name: "Sequential Job 3", + Status: StatusWaiting, + MaxParallel: maxParallel, + }, + } + + for _, job := range jobs { + assert.NoError(t, db.Insert(ctx, job)) + } + + // Verify initial state: all jobs are waiting + allJobs, err := GetRunJobsByRunID(ctx, runID) + assert.NoError(t, err) + assert.Len(t, allJobs, 3) + + // Verify that only 1 job should be able to run at a time with max-parallel=1 + runningCount, err := CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) + assert.NoError(t, err) + assert.Equal(t, 0, runningCount, "Should have 0 jobs running initially") + + // Simulate starting first job + allJobs[0].Status = StatusRunning + _, err = UpdateRunJob(ctx, allJobs[0], nil) + assert.NoError(t, err) + + runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) + assert.NoError(t, err) + assert.Equal(t, 1, runningCount, "Should have exactly 1 job running with max-parallel=1") + + // Complete first job + allJobs[0].Status = StatusSuccess + _, err = UpdateRunJob(ctx, allJobs[0], nil) + assert.NoError(t, err) + + runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) + assert.NoError(t, err) + assert.Equal(t, 0, runningCount, "Should have 0 jobs running after first job completes") + + // This is the critical test: the second job should be able to start + // Previously, the system might hang here + allJobs[1].Status = StatusRunning + _, err = UpdateRunJob(ctx, allJobs[1], nil) + assert.NoError(t, err) + + runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) + assert.NoError(t, err) + assert.Equal(t, 1, runningCount, "Should have exactly 1 job running after second job starts (critical test)") + + // Complete the second job + allJobs[1].Status = StatusSuccess + _, err = UpdateRunJob(ctx, allJobs[1], nil) + assert.NoError(t, err) + + runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) + assert.NoError(t, err) + assert.Equal(t, 0, runningCount, "Should have 0 jobs running after second job completes") + + // Third job should also be able to start + allJobs[2].Status = StatusRunning + _, err = UpdateRunJob(ctx, allJobs[2], nil) + assert.NoError(t, err) + + runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) + assert.NoError(t, err) + assert.Equal(t, 1, runningCount, "Should have exactly 1 job running for third job") + }) + + t.Run("MaxParallelConstraintAfterTaskFetch", func(t *testing.T) { + runID := int64(9000) + jobID := "max-parallel-fetch-job" + maxParallel := 1 + + // Setup: Create ActionRun + run := &ActionRun{ + ID: runID, + RepoID: 1, + OwnerID: 1, + Index: 9000, + Status: StatusRunning, + } + assert.NoError(t, db.Insert(ctx, run)) + + // Create jobs + jobs := []*ActionRunJob{ + { + RunID: runID, + RepoID: 1, + OwnerID: 1, + JobID: jobID, + Name: "Job A", + Status: StatusWaiting, + MaxParallel: maxParallel, + }, + { + RunID: runID, + RepoID: 1, + OwnerID: 1, + JobID: jobID, + Name: "Job B", + Status: StatusWaiting, + MaxParallel: maxParallel, + }, + } + + for _, job := range jobs { + assert.NoError(t, db.Insert(ctx, job)) + } + + // Refresh jobs to get IDs + allJobs, err := GetRunJobsByRunID(ctx, runID) + assert.NoError(t, err) + + // Start first job + allJobs[0].Status = StatusRunning + _, err = UpdateRunJob(ctx, allJobs[0], nil) + assert.NoError(t, err) + + // Verify constraint + runningCount, err := CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) + assert.NoError(t, err) + assert.Equal(t, 1, runningCount) + + // Try to start second job while first is still running + // This should not be allowed due to max-parallel=1 + freshAllJobs, err := GetRunJobsByRunID(ctx, runID) + assert.NoError(t, err) + + // Check if we can determine from the count that the second job should not start + for i := 1; i < len(freshAllJobs); i++ { + if freshAllJobs[i].Status == StatusWaiting { + // Before starting this job, verify max-parallel constraint + runningCount, err := CountRunningJobsByWorkflowAndRun(ctx, runID, jobID) + assert.NoError(t, err) + if runningCount >= maxParallel { + // This job should wait + assert.Equal(t, StatusWaiting, freshAllJobs[i].Status, + "Job should remain waiting when max-parallel limit is reached") + } + } + } + }) +} diff --git a/services/actions/job_emitter.go b/services/actions/job_emitter.go index ed6e2fe5aa..8c1e5fc422 100644 --- a/services/actions/job_emitter.go +++ b/services/actions/job_emitter.go @@ -222,16 +222,67 @@ func checkJobsOfRun(ctx context.Context, run *actions_model.ActionRun) (jobs, up for _, job := range jobs { if status, ok := updates[job.ID]; ok { oldStatus := job.Status + // IMPORTANT: Even if status hasn't changed, we must reset task_id for WAITING jobs + // after a previous task has completed (max-parallel constraint is released) + if status == actions_model.StatusWaiting && job.TaskID != 0 { + // This job was running but is now ready to be reassigned + job.Status = status + job.TaskID = 0 + if _, err := actions_model.UpdateRunJob(ctx, job, nil, "status", "task_id"); err != nil { + return fmt.Errorf("reset task_id for job %d: %w", job.ID, err) + } + updatedJobs = append(updatedJobs, job) + continue + } + + if job.Status == status { + // Status hasn't changed, skip + continue + } + job.Status = status - if n, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"status": actions_model.StatusBlocked}, "status"); err != nil { - return err - } else if n != 1 { - return fmt.Errorf("no affected for updating blocked job %v", job.ID) + + // For jobs transitioning to WAITING status (can happen after max-parallel constraint is released), + // we need to reset task_id to 0 so a new runner can pick them up + if status == actions_model.StatusWaiting { + job.TaskID = 0 + if _, err := actions_model.UpdateRunJob(ctx, job, nil, "status", "task_id"); err != nil { + return fmt.Errorf("reset task_id for job %d: %w", job.ID, err) + } + } else { + // For other status changes (BLOCKED, RUNNING, etc.), only update status + if _, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"status": actions_model.StatusWaiting}, "status"); err != nil { + return fmt.Errorf("update status for job %d: %w", job.ID, err) + } } log.Info("Job %d (JobID: %s) status updated: %s -> %s", job.ID, job.JobID, oldStatus, status) updatedJobs = append(updatedJobs, job) } } + + // CRITICAL FIX for max-parallel: Even if no jobs were updated above (because they already have task_id=0), + // we must notify runners to poll for new tasks when a job completes. + // This handles the max-parallel=1 case where jobs were never started (task_id=0) + // and won't be processed by jobStatusResolver (which only handles BLOCKED jobs). + if len(jobs) > 0 && jobs[0].OwnerID > 0 && jobs[0].RepoID > 0 { + // Check if there are any WAITING jobs with task_id=0 (ready to be picked up) + hasWaitingJobs := false + for _, job := range jobs { + if job.Status == actions_model.StatusWaiting && job.TaskID == 0 { + hasWaitingJobs = true + break + } + } + if hasWaitingJobs { + // Notify runners to poll for new tasks + if err := actions_model.IncreaseTaskVersion(ctx, jobs[0].OwnerID, jobs[0].RepoID); err != nil { + log.Error("Failed to increase task version for repo %d: %v", jobs[0].RepoID, err) + } else { + log.Debug("Increased task version for repo %d (max-parallel waiting jobs ready)", jobs[0].RepoID) + } + } + } + return nil }); err != nil { return nil, nil, err diff --git a/services/actions/task_assignment_test.go b/services/actions/task_assignment_test.go index 4ceb7d3d46..b1bb82fcd2 100644 --- a/services/actions/task_assignment_test.go +++ b/services/actions/task_assignment_test.go @@ -184,4 +184,142 @@ func TestMaxParallelJobStatusAndCounting(t *testing.T) { }) } }) + + t.Run("MaxParallelOne_TaskCreation", func(t *testing.T) { + runID := int64(40000) + jobID := "task-creation-sequential-job" + maxParallel := 1 + + // Create ActionRun first + run := &actions_model.ActionRun{ + ID: runID, + RepoID: 1, + OwnerID: 1, + Index: 40000, + Status: actions_model.StatusRunning, + } + assert.NoError(t, db.Insert(context.Background(), run)) + + // Create waiting jobs with max-parallel=1 + for i := range 3 { + job := &actions_model.ActionRunJob{ + RunID: runID, + RepoID: 1, + OwnerID: 1, + JobID: jobID, + Name: "Sequential Job " + string(rune(i+1)), + Status: actions_model.StatusWaiting, + MaxParallel: maxParallel, + } + assert.NoError(t, db.Insert(context.Background(), job)) + } + + // Simulate the first job starting + jobs, err := actions_model.GetRunJobsByRunID(context.Background(), runID) + assert.NoError(t, err) + assert.Len(t, jobs, 3) + + jobs[0].Status = actions_model.StatusRunning + _, err = actions_model.UpdateRunJob(context.Background(), jobs[0], nil, "status") + assert.NoError(t, err) + + // Verify only one is running + runningCount, err := actions_model.CountRunningJobsByWorkflowAndRun(context.Background(), runID, jobID) + assert.NoError(t, err) + assert.Equal(t, 1, runningCount, "Should have exactly 1 job running with max-parallel=1") + + // Complete first job + jobs[0].Status = actions_model.StatusSuccess + _, err = actions_model.UpdateRunJob(context.Background(), jobs[0], nil, "status") + assert.NoError(t, err) + + // Verify no jobs are running now + runningCount, err = actions_model.CountRunningJobsByWorkflowAndRun(context.Background(), runID, jobID) + assert.NoError(t, err) + assert.Equal(t, 0, runningCount, "Should have 0 jobs running after completion with max-parallel=1") + + // Second job can now start + jobs[1].Status = actions_model.StatusRunning + _, err = actions_model.UpdateRunJob(context.Background(), jobs[1], nil, "status") + assert.NoError(t, err) + + runningCount, err = actions_model.CountRunningJobsByWorkflowAndRun(context.Background(), runID, jobID) + assert.NoError(t, err) + assert.Equal(t, 1, runningCount, "Should have exactly 1 job running for second task") + + // Complete second job + jobs[1].Status = actions_model.StatusSuccess + _, err = actions_model.UpdateRunJob(context.Background(), jobs[1], nil, "status") + assert.NoError(t, err) + + // Third job can now start + jobs[2].Status = actions_model.StatusRunning + _, err = actions_model.UpdateRunJob(context.Background(), jobs[2], nil, "status") + assert.NoError(t, err) + + runningCount, err = actions_model.CountRunningJobsByWorkflowAndRun(context.Background(), runID, jobID) + assert.NoError(t, err) + assert.Equal(t, 1, runningCount, "Should have exactly 1 job running for third task") + }) + + t.Run("MaxParallelTwo_CompletionAndStart", func(t *testing.T) { + runID := int64(50000) + jobID := "completion-start-job" + maxParallel := 2 + + // Create ActionRun first + run := &actions_model.ActionRun{ + ID: runID, + RepoID: 1, + OwnerID: 1, + Index: 50000, + Status: actions_model.StatusRunning, + } + assert.NoError(t, db.Insert(context.Background(), run)) + + // Create jobs with max-parallel=2 + for i := range 5 { + job := &actions_model.ActionRunJob{ + RunID: runID, + RepoID: 1, + OwnerID: 1, + JobID: jobID, + Name: "Parallel Job " + string(rune(i+1)), + Status: actions_model.StatusWaiting, + MaxParallel: maxParallel, + } + if i < 2 { + job.Status = actions_model.StatusRunning + } + assert.NoError(t, db.Insert(context.Background(), job)) + } + + // Verify 2 jobs are running + runningCount, err := actions_model.CountRunningJobsByWorkflowAndRun(context.Background(), runID, jobID) + assert.NoError(t, err) + assert.Equal(t, 2, runningCount) + + // Complete one job + jobs, err := actions_model.GetRunJobsByRunID(context.Background(), runID) + assert.NoError(t, err) + + jobs[0].Status = actions_model.StatusSuccess + _, err = actions_model.UpdateRunJob(context.Background(), jobs[0], nil, "status") + assert.NoError(t, err) + + // Should have 1 running + runningCount, err = actions_model.CountRunningJobsByWorkflowAndRun(context.Background(), runID, jobID) + assert.NoError(t, err) + assert.Equal(t, 1, runningCount, "Should have 1 job running after one completes") + + // Now another job can start + jobs[2].Status = actions_model.StatusRunning + _, err = actions_model.UpdateRunJob(context.Background(), jobs[2], nil, "status") + assert.NoError(t, err) + + // Back to 2 running + runningCount, err = actions_model.CountRunningJobsByWorkflowAndRun(context.Background(), runID, jobID) + assert.NoError(t, err) + assert.Equal(t, 2, runningCount, "Should have 2 jobs running after new job starts") + }) }