diff --git a/models/actions/run_job.go b/models/actions/run_job.go index 616e298dc9..de16741a82 100644 --- a/models/actions/run_job.go +++ b/models/actions/run_job.go @@ -55,6 +55,8 @@ type ActionRunJob struct { // Org/repo clamps are enforced when the token is used at runtime. // It is JSON-encoded repo_model.ActionsTokenPermissions and may be empty if not specified. TokenPermissions *repo_model.ActionsTokenPermissions `xorm:"JSON TEXT"` + // Matrix job support + MaxParallel int // Max parallel jobs from strategy.max-parallel (0 = unlimited) Started timeutil.TimeStamp Stopped timeutil.TimeStamp diff --git a/models/actions/run_job_maxparallel_test.go b/models/actions/run_job_maxparallel_test.go index 423907ce73..275a70fc37 100644 --- a/models/actions/run_job_maxparallel_test.go +++ b/models/actions/run_job_maxparallel_test.go @@ -51,25 +51,6 @@ func TestActionRunJob_MaxParallel(t *testing.T) { assert.Equal(t, 3, retrieved.MaxParallel) }) - t.Run("MatrixID", func(t *testing.T) { - job := &ActionRunJob{ - RunID: 1, - RepoID: 1, - OwnerID: 1, - JobID: "test-job-3", - Name: "Matrix Job with ID", - Status: StatusWaiting, - MaxParallel: 2, - MatrixID: "os:ubuntu,node:16", - } - assert.NoError(t, db.Insert(ctx, job)) - - retrieved, err := GetRunJobByID(ctx, job.ID) - assert.NoError(t, err) - assert.Equal(t, 2, retrieved.MaxParallel) - assert.Equal(t, "os:ubuntu,node:16", retrieved.MatrixID) - }) - t.Run("UpdateMaxParallel", func(t *testing.T) { // Create ActionRun first run := &ActionRun{ @@ -124,10 +105,10 @@ func TestActionRunJob_MaxParallelEnforcement(t *testing.T) { // Create jobs simulating matrix execution jobs := []*ActionRunJob{ - {RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 1", Status: StatusRunning, MaxParallel: maxParallel, MatrixID: "version:1"}, - {RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 2", Status: StatusRunning, MaxParallel: maxParallel, MatrixID: "version:2"}, - {RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 3", Status: StatusWaiting, MaxParallel: maxParallel, MatrixID: "version:3"}, - {RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 4", Status: StatusWaiting, MaxParallel: maxParallel, MatrixID: "version:4"}, + {RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 1", Status: StatusRunning, MaxParallel: maxParallel}, + {RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 2", Status: StatusRunning, MaxParallel: maxParallel}, + {RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 3", Status: StatusWaiting, MaxParallel: maxParallel}, + {RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 4", Status: StatusWaiting, MaxParallel: maxParallel}, } for _, job := range jobs { diff --git a/models/actions/task.go b/models/actions/task.go index e42de06e16..87bbc76ef8 100644 --- a/models/actions/task.go +++ b/models/actions/task.go @@ -539,15 +539,6 @@ func getTaskIDFromCache(token string) int64 { return t } -// CountRunningTasksByRunner counts the number of running tasks assigned to a specific runner -func CountRunningTasksByRunner(ctx context.Context, runnerID int64) (int, error) { - count, err := db.GetEngine(ctx). - Where("runner_id = ?", runnerID). - And("status = ?", StatusRunning). - Count(new(ActionTask)) - return int(count), err -} - // CountRunningJobsByWorkflowAndRun counts running jobs for a specific workflow/run combo // Used to enforce max-parallel limits on matrix jobs func CountRunningJobsByWorkflowAndRun(ctx context.Context, runID int64, jobID string) (int, error) { diff --git a/models/actions/task_count_test.go b/models/actions/task_count_test.go index 2edcf1b950..12117a87e5 100644 --- a/models/actions/task_count_test.go +++ b/models/actions/task_count_test.go @@ -13,89 +13,6 @@ import ( "github.com/stretchr/testify/assert" ) -func TestCountRunningTasksByRunner(t *testing.T) { - assert.NoError(t, unittest.PrepareTestDatabase()) - ctx := context.Background() - - t.Run("NoRunningTasks", func(t *testing.T) { - count, err := CountRunningTasksByRunner(ctx, 999999) - assert.NoError(t, err) - assert.Equal(t, 0, count) - }) - - t.Run("WithRunningTasks", func(t *testing.T) { - // Create a runner - runner := &ActionRunner{ - UUID: "test-runner-tasks", - Name: "Test Runner", - OwnerID: 0, - RepoID: 0, - TokenHash: "test_hash_tasks", - Token: "test_token_tasks", - } - assert.NoError(t, db.Insert(ctx, runner)) - - // Create running tasks - task1 := &ActionTask{ - JobID: 1, - RunnerID: runner.ID, - Status: StatusRunning, - RepoID: 1, - OwnerID: 1, - TokenHash: "task1_hash", - Token: "task1_token", - } - assert.NoError(t, db.Insert(ctx, task1)) - - task2 := &ActionTask{ - JobID: 2, - RunnerID: runner.ID, - Status: StatusRunning, - RepoID: 1, - OwnerID: 1, - TokenHash: "task2_hash", - Token: "task2_token", - } - assert.NoError(t, db.Insert(ctx, task2)) - - // Count should be 2 - count, err := CountRunningTasksByRunner(ctx, runner.ID) - assert.NoError(t, err) - assert.Equal(t, 2, count) - }) - - t.Run("MixedStatusTasks", func(t *testing.T) { - runner := &ActionRunner{ - UUID: "test-runner-mixed", - Name: "Mixed Status Runner", - Capacity: 5, - TokenHash: "mixed_runner_hash", - Token: "mixed_runner_token", - } - assert.NoError(t, db.Insert(ctx, runner)) - - // Create tasks with different statuses - statuses := []Status{StatusRunning, StatusSuccess, StatusRunning, StatusFailure, StatusWaiting} - for i, status := range statuses { - task := &ActionTask{ - JobID: int64(100 + i), - RunnerID: runner.ID, - Status: status, - RepoID: 1, - OwnerID: 1, - TokenHash: "mixed_task_hash_" + string(rune('a'+i)), - Token: "mixed_task_token_" + string(rune('a'+i)), - } - assert.NoError(t, db.Insert(ctx, task)) - } - - // Only 2 running tasks - count, err := CountRunningTasksByRunner(ctx, runner.ID) - assert.NoError(t, err) - assert.Equal(t, 2, count) - }) -} - func TestCountRunningJobsByWorkflowAndRun(t *testing.T) { assert.NoError(t, unittest.PrepareTestDatabase()) ctx := context.Background() diff --git a/services/actions/task_assignment_test.go b/services/actions/task_assignment_test.go index 2ff1d77c7d..d9af3777f3 100644 --- a/services/actions/task_assignment_test.go +++ b/services/actions/task_assignment_test.go @@ -14,106 +14,6 @@ import ( "github.com/stretchr/testify/assert" ) -func TestCreateTaskForRunner_CapacityEnforcement(t *testing.T) { - assert.NoError(t, unittest.PrepareTestDatabase()) - - t.Run("RunnerAtCapacity", func(t *testing.T) { - // Create runner with capacity 2 - runner := &actions_model.ActionRunner{ - UUID: "capacity-test-1", - Name: "Capacity Test Runner", - Capacity: 2, - TokenHash: "capacity_test_hash_1", - Token: "capacity_test_token_1", - } - assert.NoError(t, db.Insert(context.Background(), runner)) - - // Create 2 running tasks - for i := range 2 { - task := &actions_model.ActionTask{ - JobID: int64(1000 + i), - RunnerID: runner.ID, - Status: actions_model.StatusRunning, - RepoID: 1, - OwnerID: 1, - TokenHash: "task_hash_" + string(rune('1'+i)), - Token: "task_token_" + string(rune('1'+i)), - } - assert.NoError(t, db.Insert(context.Background(), task)) - } - - // Verify runner is at capacity - count, err := actions_model.CountRunningTasksByRunner(context.Background(), runner.ID) - assert.NoError(t, err) - assert.Equal(t, 2, count) - - // Try to create another task - should fail due to capacity - // Note: This would be tested in actual CreateTaskForRunner which checks capacity - // For now, verify the count - assert.Equal(t, runner.Capacity, count, "Runner should be at capacity") - }) - - t.Run("RunnerBelowCapacity", func(t *testing.T) { - runner := &actions_model.ActionRunner{ - UUID: "capacity-test-2", - Name: "Below Capacity Runner", - Capacity: 5, - TokenHash: "capacity_test_hash_2", - Token: "capacity_test_token_2", - } - assert.NoError(t, db.Insert(context.Background(), runner)) - - // Create 2 running tasks - for i := range 2 { - task := &actions_model.ActionTask{ - JobID: int64(2000 + i), - RunnerID: runner.ID, - Status: actions_model.StatusRunning, - RepoID: 1, - OwnerID: 1, - TokenHash: "task_hash_2_" + string(rune('a'+i)), - Token: "task_token_2_" + string(rune('a'+i)), - } - assert.NoError(t, db.Insert(context.Background(), task)) - } - - count, err := actions_model.CountRunningTasksByRunner(context.Background(), runner.ID) - assert.NoError(t, err) - assert.Equal(t, 2, count) - assert.Less(t, count, runner.Capacity, "Runner should be below capacity") - }) - - t.Run("UnlimitedCapacity", func(t *testing.T) { - runner := &actions_model.ActionRunner{ - UUID: "capacity-test-3", - Name: "Unlimited Runner", - Capacity: 0, // 0 = unlimited - TokenHash: "capacity_test_hash_3", - Token: "capacity_test_token_3", - } - assert.NoError(t, db.Insert(context.Background(), runner)) - - // Create many running tasks - for i := range 10 { - task := &actions_model.ActionTask{ - JobID: int64(3000 + i), - RunnerID: runner.ID, - Status: actions_model.StatusRunning, - RepoID: 1, - OwnerID: 1, - TokenHash: "task_hash_3_" + string(rune('a'+i)), - Token: "task_token_3_" + string(rune('a'+i)), - } - assert.NoError(t, db.Insert(context.Background(), task)) - } - - count, err := actions_model.CountRunningTasksByRunner(context.Background(), runner.ID) - assert.NoError(t, err) - assert.Equal(t, 10, count) - // With capacity 0, there's no limit - }) -} - func TestCreateTaskForRunner_MaxParallelEnforcement(t *testing.T) { assert.NoError(t, unittest.PrepareTestDatabase()) @@ -218,79 +118,3 @@ func TestCreateTaskForRunner_MaxParallelEnforcement(t *testing.T) { assert.Equal(t, 5, runningCount, "All jobs should be able to run without limit") }) } - -func TestCreateTaskForRunner_CombinedEnforcement(t *testing.T) { - assert.NoError(t, unittest.PrepareTestDatabase()) - - t.Run("BothRunnerCapacityAndMaxParallel", func(t *testing.T) { - // Create runner with capacity 3 - runner := &actions_model.ActionRunner{ - UUID: "combined-test", - Name: "Combined Test Runner", - Capacity: 3, - TokenHash: "combined_test_hash", - Token: "combined_test_token", - } - assert.NoError(t, db.Insert(context.Background(), runner)) - - runID := int64(30000) - jobID := "combined-job" - - // Create ActionRun first - run := &actions_model.ActionRun{ - ID: runID, - RepoID: 1, - OwnerID: 1, - Index: 30000, - Status: actions_model.StatusRunning, - } - assert.NoError(t, db.Insert(context.Background(), run)) - - // Create jobs with max-parallel 2 - for range 5 { - job := &actions_model.ActionRunJob{ - RunID: runID, - RepoID: 1, - OwnerID: 1, - JobID: jobID, - Name: "Combined Job", - Status: actions_model.StatusWaiting, - MaxParallel: 2, - } - assert.NoError(t, db.Insert(context.Background(), job)) - } - - // The most restrictive limit should apply - // In this case: max-parallel = 2 (more restrictive than runner capacity = 3) - jobs, err := actions_model.GetRunJobsByRunID(context.Background(), runID) - assert.NoError(t, err) - - // Simulate starting jobs - for i, job := range jobs[:2] { - job.Status = actions_model.StatusRunning - _, err := actions_model.UpdateRunJob(context.Background(), job, nil, "status") - assert.NoError(t, err) - - task := &actions_model.ActionTask{ - JobID: job.ID, - RunnerID: runner.ID, - Status: actions_model.StatusRunning, - RepoID: 1, - OwnerID: 1, - TokenHash: "combined_task_hash_" + string(rune('a'+i)), - Token: "combined_task_token_" + string(rune('a'+i)), - } - assert.NoError(t, db.Insert(context.Background(), task)) - } - - // Verify both limits - runningTasks, err := actions_model.CountRunningTasksByRunner(context.Background(), runner.ID) - assert.NoError(t, err) - assert.Equal(t, 2, runningTasks) - assert.Less(t, runningTasks, runner.Capacity, "Should be under runner capacity") - - runningJobs, err := actions_model.CountRunningJobsByWorkflowAndRun(context.Background(), runID, jobID) - assert.NoError(t, err) - assert.Equal(t, 2, runningJobs, "Should respect max-parallel") - }) -}