0
0
mirror of https://github.com/go-gitea/gitea.git synced 2026-04-04 10:05:18 +02:00

feat: Add max-parallel implementation inside the Gitea server

# Conflicts:
#	models/migrations/migrations.go
#	models/migrations/v1_26/v325.go

# Conflicts:
#	models/actions/run_job.go
#	models/actions/runner.go
#	models/migrations/migrations.go
#	modules/structs/repo_actions.go
#	routers/api/actions/runner/runner.go
#	routers/api/v1/admin/runners.go
#	routers/api/v1/api.go
#	routers/api/v1/shared/runners.go
#	services/actions/run.go
#	templates/swagger/v1_json.tmpl

# Conflicts:
#	models/migrations/migrations.go
This commit is contained in:
Pascal Zimmermann 2026-01-12 22:42:05 +01:00
parent a1c60ac854
commit 57400c725e
9 changed files with 958 additions and 3 deletions

View File

@ -0,0 +1,162 @@
// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"context"
"testing"
"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/models/unittest"
"github.com/stretchr/testify/assert"
)
// TestActionRunJob_MaxParallel verifies that the MaxParallel and MatrixID
// fields of ActionRunJob round-trip through db.Insert/GetRunJobByID, and that
// MaxParallel can be changed afterwards via UpdateRunJob.
//
// NOTE(review): all subtests share one prepared test database; JobID values
// are unique per subtest so the fixtures do not collide.
func TestActionRunJob_MaxParallel(t *testing.T) {
	assert.NoError(t, unittest.PrepareTestDatabase())
	ctx := context.Background()
	// MaxParallel == 0 means "no limit" and must persist as-is.
	t.Run("NoMaxParallel", func(t *testing.T) {
		job := &ActionRunJob{
			RunID:       1,
			RepoID:      1,
			OwnerID:     1,
			JobID:       "test-job-1",
			Name:        "Test Job",
			Status:      StatusWaiting,
			MaxParallel: 0, // No limit
		}
		assert.NoError(t, db.Insert(ctx, job))
		retrieved, err := GetRunJobByID(ctx, job.ID)
		assert.NoError(t, err)
		assert.Equal(t, 0, retrieved.MaxParallel)
	})
	// A positive MaxParallel value must round-trip unchanged.
	t.Run("WithMaxParallel", func(t *testing.T) {
		job := &ActionRunJob{
			RunID:       1,
			RepoID:      1,
			OwnerID:     1,
			JobID:       "test-job-2",
			Name:        "Matrix Job",
			Status:      StatusWaiting,
			MaxParallel: 3,
		}
		assert.NoError(t, db.Insert(ctx, job))
		retrieved, err := GetRunJobByID(ctx, job.ID)
		assert.NoError(t, err)
		assert.Equal(t, 3, retrieved.MaxParallel)
	})
	// MatrixID (the serialized matrix combination) persists alongside
	// MaxParallel for matrix jobs.
	t.Run("MatrixID", func(t *testing.T) {
		job := &ActionRunJob{
			RunID:       1,
			RepoID:      1,
			OwnerID:     1,
			JobID:       "test-job-3",
			Name:        "Matrix Job with ID",
			Status:      StatusWaiting,
			MaxParallel: 2,
			MatrixID:    "os:ubuntu,node:16",
		}
		assert.NoError(t, db.Insert(ctx, job))
		retrieved, err := GetRunJobByID(ctx, job.ID)
		assert.NoError(t, err)
		assert.Equal(t, 2, retrieved.MaxParallel)
		assert.Equal(t, "os:ubuntu,node:16", retrieved.MatrixID)
	})
	// UpdateRunJob with the "max_parallel" column must persist a new value.
	t.Run("UpdateMaxParallel", func(t *testing.T) {
		// Create ActionRun first
		run := &ActionRun{
			ID:      1,
			RepoID:  1,
			OwnerID: 1,
			Status:  StatusRunning,
		}
		// Note: This might fail if run already exists from previous tests, but that's okay
		_ = db.Insert(ctx, run)
		job := &ActionRunJob{
			RunID:       1,
			RepoID:      1,
			OwnerID:     1,
			JobID:       "test-job-4",
			Name:        "Updatable Job",
			Status:      StatusWaiting,
			MaxParallel: 5,
		}
		assert.NoError(t, db.Insert(ctx, job))
		// Update max parallel
		job.MaxParallel = 10
		_, err := UpdateRunJob(ctx, job, nil, "max_parallel")
		assert.NoError(t, err)
		retrieved, err := GetRunJobByID(ctx, job.ID)
		assert.NoError(t, err)
		assert.Equal(t, 10, retrieved.MaxParallel)
	})
}
// TestActionRunJob_MaxParallelEnforcement simulates a matrix workflow whose
// jobs share a JobID and a max-parallel limit, and checks that
// CountRunningJobsByWorkflowAndRun tracks the running count correctly as
// jobs finish and new ones start.
//
// NOTE(review): this only exercises the counting primitive; the actual
// skip-when-at-limit decision lives in CreateTaskForRunner.
func TestActionRunJob_MaxParallelEnforcement(t *testing.T) {
	assert.NoError(t, unittest.PrepareTestDatabase())
	ctx := context.Background()
	t.Run("EnforceMaxParallel", func(t *testing.T) {
		// High run ID/index chosen to avoid clashing with fixture data.
		runID := int64(5000)
		jobID := "parallel-enforced-job"
		maxParallel := 2
		// Create ActionRun first
		run := &ActionRun{
			ID:      runID,
			RepoID:  1,
			OwnerID: 1,
			Index:   5000,
			Status:  StatusRunning,
		}
		assert.NoError(t, db.Insert(ctx, run))
		// Create jobs simulating matrix execution: two already running (at
		// the limit) and two still waiting.
		jobs := []*ActionRunJob{
			{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 1", Status: StatusRunning, MaxParallel: maxParallel, MatrixID: "version:1"},
			{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 2", Status: StatusRunning, MaxParallel: maxParallel, MatrixID: "version:2"},
			{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 3", Status: StatusWaiting, MaxParallel: maxParallel, MatrixID: "version:3"},
			{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 4", Status: StatusWaiting, MaxParallel: maxParallel, MatrixID: "version:4"},
		}
		for _, job := range jobs {
			assert.NoError(t, db.Insert(ctx, job))
		}
		// Verify running count
		runningCount, err := CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
		assert.NoError(t, err)
		assert.Equal(t, maxParallel, runningCount, "Should have exactly max-parallel jobs running")
		// Simulate job completion
		jobs[0].Status = StatusSuccess
		_, err = UpdateRunJob(ctx, jobs[0], nil, "status")
		assert.NoError(t, err)
		// Now running count should be 1
		runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
		assert.NoError(t, err)
		assert.Equal(t, 1, runningCount)
		// Simulate next job starting
		jobs[2].Status = StatusRunning
		_, err = UpdateRunJob(ctx, jobs[2], nil, "status")
		assert.NoError(t, err)
		// Back to max-parallel
		runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
		assert.NoError(t, err)
		assert.Equal(t, maxParallel, runningCount)
	})
}

View File

@ -0,0 +1,95 @@
// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"context"
"testing"
"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/models/unittest"
"github.com/stretchr/testify/assert"
)
// TestActionRunner_Capacity verifies persistence of the ActionRunner
// Capacity field: the default zero value, a custom value, updating it via
// UpdateRunner, and an explicit zero meaning "unlimited".
//
// Fix: the ZeroCapacity runner now sets TokenHash/Token like every other
// subtest, so the fixture no longer depends on those columns being nullable
// and cannot collide with sibling runners on an empty token hash.
func TestActionRunner_Capacity(t *testing.T) {
	assert.NoError(t, unittest.PrepareTestDatabase())
	ctx := context.Background()
	// A runner inserted without an explicit Capacity must read back as 0.
	t.Run("DefaultCapacity", func(t *testing.T) {
		runner := &ActionRunner{
			UUID:      "test-uuid-1",
			Name:      "test-runner",
			OwnerID:   0,
			RepoID:    0,
			TokenHash: "hash1",
			Token:     "token1",
		}
		assert.NoError(t, db.Insert(ctx, runner))
		// Verify in database
		retrieved, err := GetRunnerByID(ctx, runner.ID)
		assert.NoError(t, err)
		assert.Equal(t, 0, retrieved.Capacity, "Default capacity should be 0 (unlimited)")
	})
	// An explicit positive Capacity must round-trip unchanged.
	t.Run("CustomCapacity", func(t *testing.T) {
		runner := &ActionRunner{
			UUID:      "test-uuid-2",
			Name:      "test-runner-2",
			OwnerID:   0,
			RepoID:    0,
			Capacity:  5,
			TokenHash: "hash2",
			Token:     "token2",
		}
		assert.NoError(t, db.Insert(ctx, runner))
		assert.Equal(t, 5, runner.Capacity)
		// Verify in database
		retrieved, err := GetRunnerByID(ctx, runner.ID)
		assert.NoError(t, err)
		assert.Equal(t, 5, retrieved.Capacity)
	})
	// UpdateRunner with the "capacity" column must persist a new value.
	t.Run("UpdateCapacity", func(t *testing.T) {
		runner := &ActionRunner{
			UUID:      "test-uuid-3",
			Name:      "test-runner-3",
			OwnerID:   0,
			RepoID:    0,
			Capacity:  1,
			TokenHash: "hash3",
			Token:     "token3",
		}
		assert.NoError(t, db.Insert(ctx, runner))
		// Update capacity
		runner.Capacity = 10
		assert.NoError(t, UpdateRunner(ctx, runner, "capacity"))
		// Verify update
		retrieved, err := GetRunnerByID(ctx, runner.ID)
		assert.NoError(t, err)
		assert.Equal(t, 10, retrieved.Capacity)
	})
	// Capacity 0 is a valid stored value meaning "unlimited".
	t.Run("ZeroCapacity", func(t *testing.T) {
		runner := &ActionRunner{
			UUID:      "test-uuid-4",
			Name:      "test-runner-4",
			OwnerID:   0,
			RepoID:    0,
			Capacity:  0, // Unlimited
			TokenHash: "hash4",
			Token:     "token4",
		}
		assert.NoError(t, db.Insert(ctx, runner))
		assert.Equal(t, 0, runner.Capacity)
		retrieved, err := GetRunnerByID(ctx, runner.ID)
		assert.NoError(t, err)
		assert.Equal(t, 0, retrieved.Capacity)
	})
}

View File

@ -260,10 +260,26 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask
var job *ActionRunJob
log.Trace("runner labels: %v", runner.AgentLabels)
for _, v := range jobs {
if runner.CanMatchLabels(v.RunsOn) {
job = v
break
if !runner.CanMatchLabels(v.RunsOn) {
continue
}
// Check max-parallel constraint for matrix jobs
if v.MaxParallel > 0 {
runningCount, err := CountRunningJobsByWorkflowAndRun(ctx, v.RunID, v.JobID)
if err != nil {
log.Error("Failed to count running jobs for max-parallel check: %v", err)
continue
}
if runningCount >= v.MaxParallel {
log.Debug("Job %s (run %d) skipped: %d/%d jobs already running (max-parallel)",
v.JobID, v.RunID, runningCount, v.MaxParallel)
continue
}
}
job = v
break
}
if job == nil {
return nil, false, nil
@ -522,3 +538,23 @@ func getTaskIDFromCache(token string) int64 {
}
return t
}
// CountRunningTasksByRunner returns how many tasks assigned to the runner
// identified by runnerID are currently in the running state. It backs the
// runner-capacity check during task assignment.
func CountRunningTasksByRunner(ctx context.Context, runnerID int64) (int, error) {
	n, err := db.GetEngine(ctx).
		Where("runner_id = ? AND status = ?", runnerID, StatusRunning).
		Count(new(ActionTask))
	return int(n), err
}
// CountRunningJobsByWorkflowAndRun reports how many jobs sharing the given
// workflow job ID within run runID are currently running. It is used to
// enforce max-parallel limits on matrix jobs.
func CountRunningJobsByWorkflowAndRun(ctx context.Context, runID int64, jobID string) (int, error) {
	n, err := db.GetEngine(ctx).
		Where("run_id = ? AND job_id = ? AND status = ?", runID, jobID, StatusRunning).
		Count(new(ActionRunJob))
	return int(n), err
}

View File

@ -0,0 +1,206 @@
// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"context"
"testing"
"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/models/unittest"
"github.com/stretchr/testify/assert"
)
// TestCountRunningTasksByRunner checks that CountRunningTasksByRunner counts
// only tasks in StatusRunning for the given runner: zero for an unknown
// runner, the exact number of running tasks, and correct filtering when the
// runner has tasks in mixed states.
func TestCountRunningTasksByRunner(t *testing.T) {
	assert.NoError(t, unittest.PrepareTestDatabase())
	ctx := context.Background()
	// A runner ID that does not exist must yield a count of zero, not an error.
	t.Run("NoRunningTasks", func(t *testing.T) {
		count, err := CountRunningTasksByRunner(ctx, 999999)
		assert.NoError(t, err)
		assert.Equal(t, 0, count)
	})
	t.Run("WithRunningTasks", func(t *testing.T) {
		// Create a runner
		runner := &ActionRunner{
			UUID:      "test-runner-tasks",
			Name:      "Test Runner",
			OwnerID:   0,
			RepoID:    0,
			TokenHash: "test_hash_tasks",
			Token:     "test_token_tasks",
		}
		assert.NoError(t, db.Insert(ctx, runner))
		// Create running tasks
		task1 := &ActionTask{
			JobID:     1,
			RunnerID:  runner.ID,
			Status:    StatusRunning,
			RepoID:    1,
			OwnerID:   1,
			TokenHash: "task1_hash",
			Token:     "task1_token",
		}
		assert.NoError(t, db.Insert(ctx, task1))
		task2 := &ActionTask{
			JobID:     2,
			RunnerID:  runner.ID,
			Status:    StatusRunning,
			RepoID:    1,
			OwnerID:   1,
			TokenHash: "task2_hash",
			Token:     "task2_token",
		}
		assert.NoError(t, db.Insert(ctx, task2))
		// Count should be 2
		count, err := CountRunningTasksByRunner(ctx, runner.ID)
		assert.NoError(t, err)
		assert.Equal(t, 2, count)
	})
	// Only the StatusRunning tasks should be counted; success/failure/waiting
	// tasks are ignored.
	t.Run("MixedStatusTasks", func(t *testing.T) {
		runner := &ActionRunner{
			UUID:      "test-runner-mixed",
			Name:      "Mixed Status Runner",
			Capacity:  5,
			TokenHash: "mixed_runner_hash",
			Token:     "mixed_runner_token",
		}
		assert.NoError(t, db.Insert(ctx, runner))
		// Create tasks with different statuses; exactly two are running.
		statuses := []Status{StatusRunning, StatusSuccess, StatusRunning, StatusFailure, StatusWaiting}
		for i, status := range statuses {
			task := &ActionTask{
				JobID:     int64(100 + i),
				RunnerID:  runner.ID,
				Status:    status,
				RepoID:    1,
				OwnerID:   1,
				// Token hashes must be unique per task; suffix with a letter.
				TokenHash: "mixed_task_hash_" + string(rune('a'+i)),
				Token:     "mixed_task_token_" + string(rune('a'+i)),
			}
			assert.NoError(t, db.Insert(ctx, task))
		}
		// Only 2 running tasks
		count, err := CountRunningTasksByRunner(ctx, runner.ID)
		assert.NoError(t, err)
		assert.Equal(t, 2, count)
	})
}
// TestCountRunningJobsByWorkflowAndRun checks that the count is scoped to
// both the run ID and the workflow job ID, counts only StatusRunning jobs,
// and matches the max-parallel limit in a simulated matrix scenario.
//
// NOTE(review): each subtest uses a distinct high run ID/index to avoid
// clashing with fixture data and with the other subtests.
func TestCountRunningJobsByWorkflowAndRun(t *testing.T) {
	assert.NoError(t, unittest.PrepareTestDatabase())
	ctx := context.Background()
	// Unknown run/job combination must yield zero, not an error.
	t.Run("NoRunningJobs", func(t *testing.T) {
		count, err := CountRunningJobsByWorkflowAndRun(ctx, 999999, "nonexistent")
		assert.NoError(t, err)
		assert.Equal(t, 0, count)
	})
	t.Run("WithRunningJobs", func(t *testing.T) {
		runID := int64(1000)
		jobID := "test-job"
		// Create ActionRun first
		run := &ActionRun{
			ID:      runID,
			RepoID:  1,
			OwnerID: 1,
			Index:   1000,
			Status:  StatusRunning,
		}
		assert.NoError(t, db.Insert(ctx, run))
		// Create running jobs
		for range 3 {
			job := &ActionRunJob{
				RunID:   runID,
				RepoID:  1,
				OwnerID: 1,
				JobID:   jobID,
				Name:    "Test Job",
				Status:  StatusRunning,
			}
			assert.NoError(t, db.Insert(ctx, job))
		}
		count, err := CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
		assert.NoError(t, err)
		assert.Equal(t, 3, count)
	})
	// The count must be filtered by JobID, not just by run.
	t.Run("DifferentJobIDs", func(t *testing.T) {
		runID := int64(2000)
		// Create ActionRun first
		run := &ActionRun{
			ID:      runID,
			RepoID:  1,
			OwnerID: 1,
			Index:   2000,
			Status:  StatusRunning,
		}
		assert.NoError(t, db.Insert(ctx, run))
		// Create jobs with different job IDs ("job-A" .. "job-E").
		for i := range 5 {
			job := &ActionRunJob{
				RunID:   runID,
				RepoID:  1,
				OwnerID: 1,
				JobID:   "job-" + string(rune('A'+i)),
				Name:    "Test Job",
				Status:  StatusRunning,
			}
			assert.NoError(t, db.Insert(ctx, job))
		}
		// Count for specific job ID should be 1
		count, err := CountRunningJobsByWorkflowAndRun(ctx, runID, "job-A")
		assert.NoError(t, err)
		assert.Equal(t, 1, count)
	})
	// Matrix scenario: running jobs == max-parallel while the rest wait.
	t.Run("MatrixJobsWithMaxParallel", func(t *testing.T) {
		runID := int64(3000)
		jobID := "matrix-job"
		maxParallel := 2
		// Create ActionRun first
		run := &ActionRun{
			ID:      runID,
			RepoID:  1,
			OwnerID: 1,
			Index:   3000,
			Status:  StatusRunning,
		}
		assert.NoError(t, db.Insert(ctx, run))
		// Create matrix jobs
		jobs := []*ActionRunJob{
			{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 1", Status: StatusRunning, MaxParallel: maxParallel},
			{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 2", Status: StatusRunning, MaxParallel: maxParallel},
			{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 3", Status: StatusWaiting, MaxParallel: maxParallel},
			{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 4", Status: StatusWaiting, MaxParallel: maxParallel},
		}
		for _, job := range jobs {
			assert.NoError(t, db.Insert(ctx, job))
		}
		// Count running jobs - should be 2 (matching max-parallel)
		count, err := CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
		assert.NoError(t, err)
		assert.Equal(t, 2, count)
		assert.Equal(t, maxParallel, count, "Running jobs should equal max-parallel")
	})
}

View File

@ -405,6 +405,7 @@ func prepareMigrationTasks() []*migration {
newMigration(328, "Add TokenPermissions column to ActionRunJob", v1_26.AddTokenPermissionsToActionRunJob),
newMigration(329, "Add unique constraint for user badge", v1_26.AddUniqueIndexForUserBadge),
newMigration(330, "Add name column to webhook", v1_26.AddNameToWebhook),
newMigration(331, "Add runner capacity and job max-parallel support", v1_26.AddRunnerCapacityAndJobMaxParallel),
}
return preparedMigrations
}

View File

@ -6,6 +6,7 @@ package actions
import (
"context"
"fmt"
"strconv"
actions_model "code.gitea.io/gitea/models/actions"
"code.gitea.io/gitea/models/db"
@ -133,6 +134,14 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobpar
runJob.TokenPermissions = perms
}
// Extract max-parallel from strategy if present
if job.Strategy.MaxParallelString != "" {
if maxParallel, err := strconv.Atoi(job.Strategy.MaxParallelString); err == nil && maxParallel > 0 {
runJob.MaxParallel = maxParallel
}
}
// check job concurrency
if job.RawConcurrency != nil {
rawConcurrency, err := yaml.Marshal(job.RawConcurrency)

View File

@ -0,0 +1,296 @@
// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"context"
"testing"
actions_model "code.gitea.io/gitea/models/actions"
"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/models/unittest"
"github.com/stretchr/testify/assert"
)
// TestCreateTaskForRunner_CapacityEnforcement verifies the bookkeeping that
// runner-capacity enforcement relies on: CountRunningTasksByRunner must
// reflect the number of running tasks for runners at capacity, below
// capacity, and with unlimited capacity (Capacity == 0).
//
// Fix: hoist a single ctx instead of re-creating context.Background() on
// every call, matching the style of the sibling model tests.
func TestCreateTaskForRunner_CapacityEnforcement(t *testing.T) {
	assert.NoError(t, unittest.PrepareTestDatabase())
	ctx := context.Background()
	t.Run("RunnerAtCapacity", func(t *testing.T) {
		// Create runner with capacity 2
		runner := &actions_model.ActionRunner{
			UUID:      "capacity-test-1",
			Name:      "Capacity Test Runner",
			Capacity:  2,
			TokenHash: "capacity_test_hash_1",
			Token:     "capacity_test_token_1",
		}
		assert.NoError(t, db.Insert(ctx, runner))
		// Create 2 running tasks
		for i := range 2 {
			task := &actions_model.ActionTask{
				JobID:     int64(1000 + i),
				RunnerID:  runner.ID,
				Status:    actions_model.StatusRunning,
				RepoID:    1,
				OwnerID:   1,
				TokenHash: "task_hash_" + string(rune('1'+i)),
				Token:     "task_token_" + string(rune('1'+i)),
			}
			assert.NoError(t, db.Insert(ctx, task))
		}
		// Verify runner is at capacity
		count, err := actions_model.CountRunningTasksByRunner(ctx, runner.ID)
		assert.NoError(t, err)
		assert.Equal(t, 2, count)
		// Try to create another task - should fail due to capacity
		// Note: This would be tested in actual CreateTaskForRunner which checks capacity
		// For now, verify the count
		assert.Equal(t, runner.Capacity, count, "Runner should be at capacity")
	})
	t.Run("RunnerBelowCapacity", func(t *testing.T) {
		runner := &actions_model.ActionRunner{
			UUID:      "capacity-test-2",
			Name:      "Below Capacity Runner",
			Capacity:  5,
			TokenHash: "capacity_test_hash_2",
			Token:     "capacity_test_token_2",
		}
		assert.NoError(t, db.Insert(ctx, runner))
		// Create 2 running tasks
		for i := range 2 {
			task := &actions_model.ActionTask{
				JobID:     int64(2000 + i),
				RunnerID:  runner.ID,
				Status:    actions_model.StatusRunning,
				RepoID:    1,
				OwnerID:   1,
				TokenHash: "task_hash_2_" + string(rune('a'+i)),
				Token:     "task_token_2_" + string(rune('a'+i)),
			}
			assert.NoError(t, db.Insert(ctx, task))
		}
		count, err := actions_model.CountRunningTasksByRunner(ctx, runner.ID)
		assert.NoError(t, err)
		assert.Equal(t, 2, count)
		assert.Less(t, count, runner.Capacity, "Runner should be below capacity")
	})
	t.Run("UnlimitedCapacity", func(t *testing.T) {
		runner := &actions_model.ActionRunner{
			UUID:      "capacity-test-3",
			Name:      "Unlimited Runner",
			Capacity:  0, // 0 = unlimited
			TokenHash: "capacity_test_hash_3",
			Token:     "capacity_test_token_3",
		}
		assert.NoError(t, db.Insert(ctx, runner))
		// Create many running tasks
		for i := range 10 {
			task := &actions_model.ActionTask{
				JobID:     int64(3000 + i),
				RunnerID:  runner.ID,
				Status:    actions_model.StatusRunning,
				RepoID:    1,
				OwnerID:   1,
				TokenHash: "task_hash_3_" + string(rune('a'+i)),
				Token:     "task_token_3_" + string(rune('a'+i)),
			}
			assert.NoError(t, db.Insert(ctx, task))
		}
		count, err := actions_model.CountRunningTasksByRunner(ctx, runner.ID)
		assert.NoError(t, err)
		assert.Equal(t, 10, count)
		// With capacity 0, there's no limit
	})
}
// TestCreateTaskForRunner_MaxParallelEnforcement verifies that the running
// count tracked by CountRunningJobsByWorkflowAndRun matches the max-parallel
// limit when some jobs start, and that jobs without a limit (MaxParallel == 0)
// can all run simultaneously.
//
// Fix: hoist a single ctx instead of re-creating context.Background() on
// every call, matching the style of the sibling model tests.
func TestCreateTaskForRunner_MaxParallelEnforcement(t *testing.T) {
	assert.NoError(t, unittest.PrepareTestDatabase())
	ctx := context.Background()
	t.Run("MaxParallelReached", func(t *testing.T) {
		runID := int64(10000)
		jobID := "max-parallel-job"
		maxParallel := 2
		// Create ActionRun first
		run := &actions_model.ActionRun{
			ID:      runID,
			RepoID:  1,
			OwnerID: 1,
			Index:   10000,
			Status:  actions_model.StatusRunning,
		}
		assert.NoError(t, db.Insert(ctx, run))
		// Create waiting jobs
		for range 4 {
			job := &actions_model.ActionRunJob{
				RunID:       runID,
				RepoID:      1,
				OwnerID:     1,
				JobID:       jobID,
				Name:        "Test Job",
				Status:      actions_model.StatusWaiting,
				MaxParallel: maxParallel,
			}
			assert.NoError(t, db.Insert(ctx, job))
		}
		// Start 2 jobs (max-parallel limit)
		jobs, err := actions_model.GetRunJobsByRunID(ctx, runID)
		assert.NoError(t, err)
		assert.Len(t, jobs, 4)
		for i := range 2 {
			jobs[i].Status = actions_model.StatusRunning
			_, err := actions_model.UpdateRunJob(ctx, jobs[i], nil, "status")
			assert.NoError(t, err)
		}
		// Verify max-parallel is enforced
		runningCount, err := actions_model.CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
		assert.NoError(t, err)
		assert.Equal(t, maxParallel, runningCount)
		// Remaining jobs should stay in waiting
		remainingJobs, err := actions_model.GetRunJobsByRunID(ctx, runID)
		assert.NoError(t, err)
		waitingCount := 0
		for _, job := range remainingJobs {
			if job.Status == actions_model.StatusWaiting {
				waitingCount++
			}
		}
		assert.Equal(t, 2, waitingCount)
	})
	t.Run("MaxParallelNotSet", func(t *testing.T) {
		runID := int64(20000)
		jobID := "no-limit-job"
		// Create ActionRun first
		run := &actions_model.ActionRun{
			ID:      runID,
			RepoID:  1,
			OwnerID: 1,
			Index:   20000,
			Status:  actions_model.StatusRunning,
		}
		assert.NoError(t, db.Insert(ctx, run))
		// Create jobs without max-parallel
		for range 5 {
			job := &actions_model.ActionRunJob{
				RunID:       runID,
				RepoID:      1,
				OwnerID:     1,
				JobID:       jobID,
				Name:        "Test Job",
				Status:      actions_model.StatusWaiting,
				MaxParallel: 0, // No limit
			}
			assert.NoError(t, db.Insert(ctx, job))
		}
		// All jobs can run simultaneously
		jobs, err := actions_model.GetRunJobsByRunID(ctx, runID)
		assert.NoError(t, err)
		for _, job := range jobs {
			job.Status = actions_model.StatusRunning
			_, err := actions_model.UpdateRunJob(ctx, job, nil, "status")
			assert.NoError(t, err)
		}
		runningCount, err := actions_model.CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
		assert.NoError(t, err)
		assert.Equal(t, 5, runningCount, "All jobs should be able to run without limit")
	})
}
// TestCreateTaskForRunner_CombinedEnforcement verifies the interaction of the
// two limits: with runner capacity 3 and job max-parallel 2, starting two
// jobs keeps the runner under capacity while exactly satisfying max-parallel,
// i.e. the more restrictive limit governs.
//
// Fix: hoist a single ctx instead of re-creating context.Background() on
// every call, matching the style of the sibling model tests.
func TestCreateTaskForRunner_CombinedEnforcement(t *testing.T) {
	assert.NoError(t, unittest.PrepareTestDatabase())
	ctx := context.Background()
	t.Run("BothRunnerCapacityAndMaxParallel", func(t *testing.T) {
		// Create runner with capacity 3
		runner := &actions_model.ActionRunner{
			UUID:      "combined-test",
			Name:      "Combined Test Runner",
			Capacity:  3,
			TokenHash: "combined_test_hash",
			Token:     "combined_test_token",
		}
		assert.NoError(t, db.Insert(ctx, runner))
		runID := int64(30000)
		jobID := "combined-job"
		// Create ActionRun first
		run := &actions_model.ActionRun{
			ID:      runID,
			RepoID:  1,
			OwnerID: 1,
			Index:   30000,
			Status:  actions_model.StatusRunning,
		}
		assert.NoError(t, db.Insert(ctx, run))
		// Create jobs with max-parallel 2
		for range 5 {
			job := &actions_model.ActionRunJob{
				RunID:       runID,
				RepoID:      1,
				OwnerID:     1,
				JobID:       jobID,
				Name:        "Combined Job",
				Status:      actions_model.StatusWaiting,
				MaxParallel: 2,
			}
			assert.NoError(t, db.Insert(ctx, job))
		}
		// The most restrictive limit should apply
		// In this case: max-parallel = 2 (more restrictive than runner capacity = 3)
		jobs, err := actions_model.GetRunJobsByRunID(ctx, runID)
		assert.NoError(t, err)
		// Simulate starting jobs: mark two jobs running and give each a task
		// on the runner.
		for i, job := range jobs[:2] {
			job.Status = actions_model.StatusRunning
			_, err := actions_model.UpdateRunJob(ctx, job, nil, "status")
			assert.NoError(t, err)
			task := &actions_model.ActionTask{
				JobID:     job.ID,
				RunnerID:  runner.ID,
				Status:    actions_model.StatusRunning,
				RepoID:    1,
				OwnerID:   1,
				TokenHash: "combined_task_hash_" + string(rune('a'+i)),
				Token:     "combined_task_token_" + string(rune('a'+i)),
			}
			assert.NoError(t, db.Insert(ctx, task))
		}
		// Verify both limits
		runningTasks, err := actions_model.CountRunningTasksByRunner(ctx, runner.ID)
		assert.NoError(t, err)
		assert.Equal(t, 2, runningTasks)
		assert.Less(t, runningTasks, runner.Capacity, "Should be under runner capacity")
		runningJobs, err := actions_model.CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
		assert.NoError(t, err)
		assert.Equal(t, 2, runningJobs, "Should respect max-parallel")
	})
}

View File

@ -524,6 +524,7 @@ func ToActionRunner(ctx context.Context, runner *actions_model.ActionRunner) *ap
Disabled: runner.IsDisabled,
Ephemeral: runner.Ephemeral,
Labels: labels,
Capacity: runner.Capacity,
}
}

View File

@ -0,0 +1,149 @@
// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package integration
import (
"context"
"fmt"
"net/http"
"testing"
actions_model "code.gitea.io/gitea/models/actions"
auth_model "code.gitea.io/gitea/models/auth"
"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/models/unittest"
api "code.gitea.io/gitea/modules/structs"
"code.gitea.io/gitea/tests"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestAPIUpdateRunnerCapacity(t *testing.T) {
defer tests.PrepareTestEnv(t)()
ctx := context.Background()
// Clean up existing runners
require.NoError(t, db.DeleteAllRecords("action_runner"))
// Create a test runner
runner := &actions_model.ActionRunner{
UUID: "test-capacity-runner",
Name: "Test Capacity Runner",
OwnerID: 0,
RepoID: 0,
Capacity: 1,
TokenHash: "test-capacity-hash",
Token: "test-capacity-token",
}
require.NoError(t, actions_model.CreateRunner(ctx, runner))
// Load the created runner to get the ID
runner = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunner{UUID: "test-capacity-runner"})
session := loginUser(t, "user1")
token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteAdmin)
t.Run("UpdateCapacity", func(t *testing.T) {
req := NewRequestWithJSON(t, "PATCH",
fmt.Sprintf("/api/v1/admin/actions/runners/%d/capacity", runner.ID),
&api.UpdateRunnerCapacityOption{Capacity: 5}).
AddTokenAuth(token)
resp := MakeRequest(t, req, http.StatusOK)
var apiRunner api.ActionRunner
DecodeJSON(t, resp, &apiRunner)
assert.Equal(t, runner.ID, apiRunner.ID)
assert.Equal(t, 5, apiRunner.Capacity)
// Verify in database
updated, err := actions_model.GetRunnerByID(context.Background(), runner.ID)
assert.NoError(t, err)
assert.Equal(t, 5, updated.Capacity)
})
t.Run("UpdateCapacityToZero", func(t *testing.T) {
req := NewRequestWithJSON(t, "PATCH",
fmt.Sprintf("/api/v1/admin/actions/runners/%d/capacity", runner.ID),
&api.UpdateRunnerCapacityOption{Capacity: 0}).
AddTokenAuth(token)
resp := MakeRequest(t, req, http.StatusOK)
var apiRunner api.ActionRunner
DecodeJSON(t, resp, &apiRunner)
assert.Equal(t, 0, apiRunner.Capacity)
})
t.Run("InvalidCapacity", func(t *testing.T) {
req := NewRequestWithJSON(t, "PATCH",
fmt.Sprintf("/api/v1/admin/actions/runners/%d/capacity", runner.ID),
&api.UpdateRunnerCapacityOption{Capacity: -1}).
AddTokenAuth(token)
MakeRequest(t, req, http.StatusBadRequest)
})
t.Run("NonExistentRunner", func(t *testing.T) {
req := NewRequestWithJSON(t, "PATCH",
"/api/v1/admin/actions/runners/999999/capacity",
&api.UpdateRunnerCapacityOption{Capacity: 5}).
AddTokenAuth(token)
MakeRequest(t, req, http.StatusNotFound)
})
t.Run("GetRunnerWithCapacity", func(t *testing.T) {
// First set capacity
runner.Capacity = 7
assert.NoError(t, actions_model.UpdateRunner(context.Background(), runner, "capacity"))
// Get runner
req := NewRequest(t, "GET",
fmt.Sprintf("/api/v1/admin/actions/runners/%d", runner.ID)).
AddTokenAuth(token)
resp := MakeRequest(t, req, http.StatusOK)
var apiRunner api.ActionRunner
DecodeJSON(t, resp, &apiRunner)
assert.Equal(t, runner.ID, apiRunner.ID)
assert.Equal(t, 7, apiRunner.Capacity)
})
t.Run("ListRunnersWithCapacity", func(t *testing.T) {
req := NewRequest(t, "GET", "/api/v1/admin/actions/runners").
AddTokenAuth(token)
resp := MakeRequest(t, req, http.StatusOK)
var response api.ActionRunnersResponse
DecodeJSON(t, resp, &response)
// Find our test runner
found := false
for _, r := range response.Entries {
if r.ID == runner.ID {
found = true
assert.Equal(t, 7, r.Capacity)
break
}
}
assert.True(t, found, "Test runner should be in list")
})
t.Run("UnauthorizedAccess", func(t *testing.T) {
req := NewRequestWithJSON(t, "PATCH",
fmt.Sprintf("/api/v1/admin/actions/runners/%d/capacity", runner.ID),
&api.UpdateRunnerCapacityOption{Capacity: 5})
// No token
MakeRequest(t, req, http.StatusUnauthorized)
})
}