0
0
mirror of https://github.com/go-gitea/gitea.git synced 2026-05-23 08:35:34 +02:00

feat: Use the Gitea queue functionality instead of the DB

This commit is contained in:
Pascal Zimmermann 2026-03-31 22:35:37 +02:00
parent 5d83d729b4
commit 5096d6c942
10 changed files with 191 additions and 1127 deletions

View File

@ -1,240 +0,0 @@
// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"context"
"testing"
"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/models/unittest"
"github.com/stretchr/testify/assert"
)
// TestMaxParallelOne_SimpleSequence tests the core issue: max-parallel=1 execution
func TestMaxParallelOne_SimpleSequence(t *testing.T) {
assert.NoError(t, unittest.PrepareTestDatabase())
ctx := context.Background()
runID := int64(11000)
jobID := "simple-sequence-job"
maxParallel := 1
// Create ActionRun
run := &ActionRun{
ID: runID,
RepoID: 1,
OwnerID: 1,
Index: 11000,
Status: StatusRunning,
}
assert.NoError(t, db.Insert(ctx, run))
// Create jobs
job1 := &ActionRunJob{
RunID: runID,
RepoID: 1,
OwnerID: 1,
JobID: jobID,
Name: "Job 1",
Status: StatusWaiting,
MaxParallel: maxParallel,
}
job2 := &ActionRunJob{
RunID: runID,
RepoID: 1,
OwnerID: 1,
JobID: jobID,
Name: "Job 2",
Status: StatusWaiting,
MaxParallel: maxParallel,
}
job3 := &ActionRunJob{
RunID: runID,
RepoID: 1,
OwnerID: 1,
JobID: jobID,
Name: "Job 3",
Status: StatusWaiting,
MaxParallel: maxParallel,
}
assert.NoError(t, db.Insert(ctx, job1))
assert.NoError(t, db.Insert(ctx, job2))
assert.NoError(t, db.Insert(ctx, job3))
// TEST 1: Initially, 0 running
running, err := CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
assert.NoError(t, err)
assert.Equal(t, 0, running, "Should have 0 jobs running initially")
// TEST 2: Job1 starts
job1.Status = StatusRunning
_, err = UpdateRunJob(ctx, job1, nil)
assert.NoError(t, err)
running, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
assert.NoError(t, err)
assert.Equal(t, 1, running, "Should have 1 job running after job1 starts")
// TEST 3: Job1 completes
job1.Status = StatusSuccess
_, err = UpdateRunJob(ctx, job1, nil)
assert.NoError(t, err)
running, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
assert.NoError(t, err)
assert.Equal(t, 0, running, "Should have 0 jobs running after job1 completes - THIS IS THE CRITICAL TEST")
// TEST 4: Job2 should now be able to start
job2.Status = StatusRunning
_, err = UpdateRunJob(ctx, job2, nil)
assert.NoError(t, err)
running, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
assert.NoError(t, err)
assert.Equal(t, 1, running, "Should have 1 job running after job2 starts - IF THIS FAILS, THE BUG IS NOT FIXED")
// TEST 5: Job2 completes
job2.Status = StatusSuccess
_, err = UpdateRunJob(ctx, job2, nil)
assert.NoError(t, err)
running, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
assert.NoError(t, err)
assert.Equal(t, 0, running, "Should have 0 jobs running after job2 completes")
// TEST 6: Job3 starts
job3.Status = StatusRunning
_, err = UpdateRunJob(ctx, job3, nil)
assert.NoError(t, err)
running, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
assert.NoError(t, err)
assert.Equal(t, 1, running, "Should have 1 job running after job3 starts")
t.Log("✅ All sequential execution tests passed!")
}
// TestMaxParallelOne_FreshJobFetch tests the fresh job fetch mechanism
func TestMaxParallelOne_FreshJobFetch(t *testing.T) {
assert.NoError(t, unittest.PrepareTestDatabase())
ctx := context.Background()
runID := int64(12000)
jobID := "fresh-fetch-job"
// Create ActionRun
run := &ActionRun{
ID: runID,
RepoID: 1,
OwnerID: 1,
Index: 12000,
Status: StatusRunning,
}
assert.NoError(t, db.Insert(ctx, run))
// Create a job
job := &ActionRunJob{
RunID: runID,
RepoID: 1,
OwnerID: 1,
JobID: jobID,
Name: "Fresh Fetch Test Job",
Status: StatusWaiting,
}
assert.NoError(t, db.Insert(ctx, job))
// Fetch fresh copy (simulating CreateTaskForRunner behavior)
freshJob, err := GetRunJobByID(ctx, job.ID)
assert.NoError(t, err)
assert.NotNil(t, freshJob)
assert.Equal(t, StatusWaiting, freshJob.Status, "Fresh job should have WAITING status")
assert.Equal(t, int64(0), freshJob.TaskID, "Fresh job should have TaskID=0")
// Update original job to RUNNING
job.Status = StatusRunning
_, err = UpdateRunJob(ctx, job, nil)
assert.NoError(t, err)
// Fetch fresh copy again - should reflect the update
freshJob2, err := GetRunJobByID(ctx, job.ID)
assert.NoError(t, err)
assert.NotNil(t, freshJob2)
assert.Equal(t, StatusRunning, freshJob2.Status, "Fresh job should now have RUNNING status")
t.Log("✅ Fresh job fetch mechanism works correctly!")
}
// TestCountRunningJobs tests the CountRunningJobsByWorkflowAndRun function
func TestCountRunningJobs(t *testing.T) {
assert.NoError(t, unittest.PrepareTestDatabase())
ctx := context.Background()
runID := int64(13000)
jobID := "count-jobs"
// Create ActionRun
run := &ActionRun{
ID: runID,
RepoID: 1,
OwnerID: 1,
Index: 13000,
Status: StatusRunning,
}
assert.NoError(t, db.Insert(ctx, run))
// Create 5 jobs
for i := 1; i <= 5; i++ {
job := &ActionRunJob{
RunID: runID,
RepoID: 1,
OwnerID: 1,
JobID: jobID,
Name: "Job " + string(rune(i)),
Status: StatusWaiting,
}
assert.NoError(t, db.Insert(ctx, job))
}
// Get all jobs
jobs, err := GetRunJobsByRunID(ctx, runID)
assert.NoError(t, err)
assert.Len(t, jobs, 5)
// Initially 0 running
running, err := CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
assert.NoError(t, err)
assert.Equal(t, 0, running)
// Set job 0 to RUNNING
jobs[0].Status = StatusRunning
_, err = UpdateRunJob(ctx, jobs[0], nil)
assert.NoError(t, err)
running, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
assert.NoError(t, err)
assert.Equal(t, 1, running)
// Set job 1 to RUNNING
jobs[1].Status = StatusRunning
_, err = UpdateRunJob(ctx, jobs[1], nil)
assert.NoError(t, err)
running, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
assert.NoError(t, err)
assert.Equal(t, 2, running)
// Set job 0 to SUCCESS (not counted as RUNNING anymore)
jobs[0].Status = StatusSuccess
_, err = UpdateRunJob(ctx, jobs[0], nil)
assert.NoError(t, err)
running, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
assert.NoError(t, err)
assert.Equal(t, 1, running)
t.Log("✅ Count running jobs works correctly!")
}

View File

@ -21,7 +21,7 @@ import (
// ActionRunJob represents a job of a run
type ActionRunJob struct {
ID int64
RunID int64 `xorm:"index"`
RunID int64 `xorm:"index index(idx_action_run_job_run_id_job_id)"`
Run *ActionRun `xorm:"-"`
RepoID int64 `xorm:"index(repo_concurrency)"`
Repo *repo_model.Repository `xorm:"-"`
@ -35,7 +35,7 @@ type ActionRunJob struct {
// it should contain exactly one job with global workflow fields for this model
WorkflowPayload []byte
JobID string `xorm:"VARCHAR(255)"` // job id in workflow, not job's id
JobID string `xorm:"VARCHAR(255) index(idx_action_run_job_run_id_job_id)"` // job id in workflow, not job's id
Needs []string `xorm:"JSON TEXT"`
RunsOn []string `xorm:"JSON TEXT"`
TaskID int64 // the latest task of the job
@ -55,8 +55,9 @@ type ActionRunJob struct {
// Org/repo clamps are enforced when the token is used at runtime.
// It is JSON-encoded repo_model.ActionsTokenPermissions and may be empty if not specified.
TokenPermissions *repo_model.ActionsTokenPermissions `xorm:"JSON TEXT"`
// Matrix job support
MaxParallel int // Max parallel jobs from strategy.max-parallel (0 = unlimited)
// MaxParallel is the max-parallel value from strategy.max-parallel (0 = unlimited).
// All matrix jobs sharing the same RunID+JobID share this value.
MaxParallel int `xorm:"NOT NULL DEFAULT 0"`
Started timeutil.TimeStamp
Stopped timeutil.TimeStamp

View File

@ -11,245 +11,46 @@ import (
"code.gitea.io/gitea/models/unittest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestActionRunJob_MaxParallel(t *testing.T) {
assert.NoError(t, unittest.PrepareTestDatabase())
func getRunJobByID(ctx context.Context, t *testing.T, id int64) *ActionRunJob {
t.Helper()
got, exist, err := db.GetByID[ActionRunJob](ctx, id)
require.NoError(t, err)
require.True(t, exist)
return got
}
// TestMaxParallel_FieldPersistence verifies that MaxParallel is stored and retrieved correctly.
func TestMaxParallel_FieldPersistence(t *testing.T) {
require.NoError(t, unittest.PrepareTestDatabase())
ctx := context.Background()
t.Run("NoMaxParallel", func(t *testing.T) {
job := &ActionRunJob{
RunID: 1,
RepoID: 1,
OwnerID: 1,
JobID: "test-job-1",
Name: "Test Job",
Status: StatusWaiting,
MaxParallel: 0, // No limit
}
assert.NoError(t, db.Insert(ctx, job))
run := &ActionRun{ID: 100, RepoID: 1, OwnerID: 1, Index: 100, Status: StatusRunning}
require.NoError(t, db.Insert(ctx, run))
retrieved, err := GetRunJobByID(ctx, job.ID)
assert.NoError(t, err)
assert.Equal(t, 0, retrieved.MaxParallel)
t.Run("zero value means unlimited", func(t *testing.T) {
job := &ActionRunJob{RunID: 100, RepoID: 1, OwnerID: 1, JobID: "no-limit", Name: "No Limit", Status: StatusWaiting, MaxParallel: 0}
require.NoError(t, db.Insert(ctx, job))
got := getRunJobByID(ctx, t, job.ID)
assert.Equal(t, 0, got.MaxParallel)
})
t.Run("WithMaxParallel", func(t *testing.T) {
job := &ActionRunJob{
RunID: 1,
RepoID: 1,
OwnerID: 1,
JobID: "test-job-2",
Name: "Matrix Job",
Status: StatusWaiting,
MaxParallel: 3,
}
assert.NoError(t, db.Insert(ctx, job))
retrieved, err := GetRunJobByID(ctx, job.ID)
assert.NoError(t, err)
assert.Equal(t, 3, retrieved.MaxParallel)
t.Run("positive value is persisted", func(t *testing.T) {
job := &ActionRunJob{RunID: 100, RepoID: 1, OwnerID: 1, JobID: "with-limit", Name: "With Limit", Status: StatusWaiting, MaxParallel: 3}
require.NoError(t, db.Insert(ctx, job))
got := getRunJobByID(ctx, t, job.ID)
assert.Equal(t, 3, got.MaxParallel)
})
t.Run("UpdateMaxParallel", func(t *testing.T) {
// Create ActionRun first
run := &ActionRun{
ID: 1,
RepoID: 1,
OwnerID: 1,
Status: StatusRunning,
}
// Note: This might fail if run already exists from previous tests, but that's okay
_ = db.Insert(ctx, run)
job := &ActionRunJob{
RunID: 1,
RepoID: 1,
OwnerID: 1,
JobID: "test-job-4",
Name: "Updatable Job",
Status: StatusWaiting,
MaxParallel: 5,
}
assert.NoError(t, db.Insert(ctx, job))
// Update max parallel
t.Run("can be updated via UpdateRunJob", func(t *testing.T) {
job := &ActionRunJob{RunID: 100, RepoID: 1, OwnerID: 1, JobID: "updatable", Name: "Updatable", Status: StatusWaiting, MaxParallel: 5}
require.NoError(t, db.Insert(ctx, job))
job.MaxParallel = 10
_, err := UpdateRunJob(ctx, job, nil, "max_parallel")
assert.NoError(t, err)
retrieved, err := GetRunJobByID(ctx, job.ID)
assert.NoError(t, err)
assert.Equal(t, 10, retrieved.MaxParallel)
})
}
func TestActionRunJob_MaxParallelEnforcement(t *testing.T) {
assert.NoError(t, unittest.PrepareTestDatabase())
ctx := context.Background()
t.Run("EnforceMaxParallel", func(t *testing.T) {
runID := int64(5000)
jobID := "parallel-enforced-job"
maxParallel := 2
// Create ActionRun first
run := &ActionRun{
ID: runID,
RepoID: 1,
OwnerID: 1,
Index: 5000,
Status: StatusRunning,
}
assert.NoError(t, db.Insert(ctx, run))
// Create jobs simulating matrix execution
jobs := []*ActionRunJob{
{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 1", Status: StatusRunning, MaxParallel: maxParallel},
{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 2", Status: StatusRunning, MaxParallel: maxParallel},
{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 3", Status: StatusWaiting, MaxParallel: maxParallel},
{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 4", Status: StatusWaiting, MaxParallel: maxParallel},
}
for _, job := range jobs {
assert.NoError(t, db.Insert(ctx, job))
}
// Verify running count
runningCount, err := CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
assert.NoError(t, err)
assert.Equal(t, maxParallel, runningCount, "Should have exactly max-parallel jobs running")
// Simulate job completion
jobs[0].Status = StatusSuccess
_, err = UpdateRunJob(ctx, jobs[0], nil, "status")
assert.NoError(t, err)
// Now running count should be 1
runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
assert.NoError(t, err)
assert.Equal(t, 1, runningCount)
// Simulate next job starting
jobs[2].Status = StatusRunning
_, err = UpdateRunJob(ctx, jobs[2], nil, "status")
assert.NoError(t, err)
// Back to max-parallel
runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
assert.NoError(t, err)
assert.Equal(t, maxParallel, runningCount)
})
t.Run("MaxParallelOne_SequentialExecution", func(t *testing.T) {
runID := int64(6000)
jobID := "sequential-job"
maxParallel := 1
// Create ActionRun first
run := &ActionRun{
ID: runID,
RepoID: 1,
OwnerID: 1,
Index: 6000,
Status: StatusRunning,
}
assert.NoError(t, db.Insert(ctx, run))
// Create jobs simulating sequential execution with max-parallel=1
jobs := []*ActionRunJob{
{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 1", Status: StatusRunning, MaxParallel: maxParallel},
{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 2", Status: StatusWaiting, MaxParallel: maxParallel},
{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 3", Status: StatusWaiting, MaxParallel: maxParallel},
}
for _, job := range jobs {
assert.NoError(t, db.Insert(ctx, job))
}
// Verify initial running count is 1
runningCount, err := CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
assert.NoError(t, err)
assert.Equal(t, 1, runningCount, "Should have exactly 1 job running with max-parallel=1")
// Complete first job
jobs[0].Status = StatusSuccess
_, err = UpdateRunJob(ctx, jobs[0], nil, "status")
assert.NoError(t, err)
// Now running count should be 0
runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
assert.NoError(t, err)
assert.Equal(t, 0, runningCount, "Should have 0 jobs running after first job completes")
// Second job can now start
jobs[1].Status = StatusRunning
_, err = UpdateRunJob(ctx, jobs[1], nil, "status")
assert.NoError(t, err)
// Running count should be 1 again
runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
assert.NoError(t, err)
assert.Equal(t, 1, runningCount, "Should have exactly 1 job running after second job starts")
// Complete second job
jobs[1].Status = StatusSuccess
_, err = UpdateRunJob(ctx, jobs[1], nil, "status")
assert.NoError(t, err)
// Third job can now start
jobs[2].Status = StatusRunning
_, err = UpdateRunJob(ctx, jobs[2], nil, "status")
assert.NoError(t, err)
// Running count should still be 1
runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
assert.NoError(t, err)
assert.Equal(t, 1, runningCount, "Should have exactly 1 job running after third job starts")
})
t.Run("MaxParallelOne_WithFailure", func(t *testing.T) {
runID := int64(7000)
jobID := "sequential-with-failure-job"
maxParallel := 1
// Create ActionRun first
run := &ActionRun{
ID: runID,
RepoID: 1,
OwnerID: 1,
Index: 7000,
Status: StatusRunning,
}
assert.NoError(t, db.Insert(ctx, run))
// Create jobs
jobs := []*ActionRunJob{
{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 1", Status: StatusRunning, MaxParallel: maxParallel},
{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 2", Status: StatusWaiting, MaxParallel: maxParallel},
}
for _, job := range jobs {
assert.NoError(t, db.Insert(ctx, job))
}
// First job fails
jobs[0].Status = StatusFailure
_, err := UpdateRunJob(ctx, jobs[0], nil, "status")
assert.NoError(t, err)
// Verify no jobs are running
runningCount, err := CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
assert.NoError(t, err)
assert.Equal(t, 0, runningCount, "Should have 0 jobs running after job fails")
// Second job can still start
jobs[1].Status = StatusRunning
_, err = UpdateRunJob(ctx, jobs[1], nil, "status")
assert.NoError(t, err)
runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
assert.NoError(t, err)
assert.Equal(t, 1, runningCount, "Should have exactly 1 job running after job starts")
require.NoError(t, err)
got := getRunJobByID(ctx, t, job.ID)
assert.Equal(t, 10, got.MaxParallel)
})
}

View File

@ -264,25 +264,8 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask
continue
}
// Check max-parallel constraint for matrix jobs
// Note: This is a best-effort check with a small race window. Multiple runners
// could simultaneously check the count before any commits, potentially allowing
// the limit to be briefly exceeded (e.g., if limit=2 and count=1, two runners
// might both proceed). Perfect enforcement would require row-level locking across
// all jobs with the same run_id+job_id, which has a significant performance impact.
// The race window is small since jobs are picked and committed quickly.
if v.MaxParallel > 0 {
runningCount, err := CountRunningJobsByWorkflowAndRun(ctx, v.RunID, v.JobID)
if err != nil {
log.Error("Failed to count running jobs for max-parallel check: %v", err)
continue
}
if runningCount >= v.MaxParallel {
log.Debug("Job %s (run %d) skipped: %d/%d jobs already running (max-parallel)",
v.JobID, v.RunID, runningCount, v.MaxParallel)
continue
}
}
// max-parallel is enforced at insertion time (InsertRun) and by
// jobStatusResolver, so a Waiting job is guaranteed a free slot.
job = v
break
@ -547,14 +530,3 @@ func getTaskIDFromCache(token string) int64 {
}
return t
}
// CountRunningJobsByWorkflowAndRun counts running jobs for a specific workflow/run combo
// Used to enforce max-parallel limits on matrix jobs
func CountRunningJobsByWorkflowAndRun(ctx context.Context, runID int64, jobID string) (int, error) {
count, err := db.GetEngine(ctx).
Where("run_id = ?", runID).
And("job_id = ?", jobID).
And("status = ?", StatusRunning).
Count(new(ActionRunJob))
return int(count), err
}

View File

@ -1,123 +0,0 @@
// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"context"
"testing"
"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/models/unittest"
"github.com/stretchr/testify/assert"
)
func TestCountRunningJobsByWorkflowAndRun(t *testing.T) {
assert.NoError(t, unittest.PrepareTestDatabase())
ctx := context.Background()
t.Run("NoRunningJobs", func(t *testing.T) {
count, err := CountRunningJobsByWorkflowAndRun(ctx, 999999, "nonexistent")
assert.NoError(t, err)
assert.Equal(t, 0, count)
})
t.Run("WithRunningJobs", func(t *testing.T) {
runID := int64(1000)
jobID := "test-job"
// Create ActionRun first
run := &ActionRun{
ID: runID,
RepoID: 1,
OwnerID: 1,
Index: 1000,
Status: StatusRunning,
}
assert.NoError(t, db.Insert(ctx, run))
// Create running jobs
for range 3 {
job := &ActionRunJob{
RunID: runID,
RepoID: 1,
OwnerID: 1,
JobID: jobID,
Name: "Test Job",
Status: StatusRunning,
}
assert.NoError(t, db.Insert(ctx, job))
}
count, err := CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
assert.NoError(t, err)
assert.Equal(t, 3, count)
})
t.Run("DifferentJobIDs", func(t *testing.T) {
runID := int64(2000)
// Create ActionRun first
run := &ActionRun{
ID: runID,
RepoID: 1,
OwnerID: 1,
Index: 2000,
Status: StatusRunning,
}
assert.NoError(t, db.Insert(ctx, run))
// Create jobs with different job IDs
for i := range 5 {
job := &ActionRunJob{
RunID: runID,
RepoID: 1,
OwnerID: 1,
JobID: "job-" + string(rune('A'+i)),
Name: "Test Job",
Status: StatusRunning,
}
assert.NoError(t, db.Insert(ctx, job))
}
// Count for specific job ID should be 1
count, err := CountRunningJobsByWorkflowAndRun(ctx, runID, "job-A")
assert.NoError(t, err)
assert.Equal(t, 1, count)
})
t.Run("MatrixJobsWithMaxParallel", func(t *testing.T) {
runID := int64(3000)
jobID := "matrix-job"
maxParallel := 2
// Create ActionRun first
run := &ActionRun{
ID: runID,
RepoID: 1,
OwnerID: 1,
Index: 3000,
Status: StatusRunning,
}
assert.NoError(t, db.Insert(ctx, run))
// Create matrix jobs
jobs := []*ActionRunJob{
{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 1", Status: StatusRunning, MaxParallel: maxParallel},
{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 2", Status: StatusRunning, MaxParallel: maxParallel},
{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 3", Status: StatusWaiting, MaxParallel: maxParallel},
{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 4", Status: StatusWaiting, MaxParallel: maxParallel},
}
for _, job := range jobs {
assert.NoError(t, db.Insert(ctx, job))
}
// Count running jobs - should be 2 (matching max-parallel)
count, err := CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
assert.NoError(t, err)
assert.Equal(t, 2, count)
assert.Equal(t, maxParallel, count, "Running jobs should equal max-parallel")
})
}

View File

@ -1,212 +0,0 @@
// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"context"
"testing"
"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/models/unittest"
"github.com/stretchr/testify/assert"
)
// TestTaskCreation_MaxParallel_One tests that tasks are properly sequenced
// when max-parallel=1, ensuring no hang after task completion
func TestTaskCreation_MaxParallel_One(t *testing.T) {
assert.NoError(t, unittest.PrepareTestDatabase())
ctx := context.Background()
t.Run("SequentialTaskCreationMaxParallelOne", func(t *testing.T) {
runID := int64(8000)
jobID := "task-sequential-max-parallel-one"
maxParallel := 1
// Setup: Create ActionRun
run := &ActionRun{
ID: runID,
RepoID: 1,
OwnerID: 1,
Index: 8000,
Status: StatusRunning,
}
assert.NoError(t, db.Insert(ctx, run))
// Create runner
runner := &ActionRunner{
ID: 1,
UUID: "test-runner-1",
Name: "Test Runner",
OwnerID: 1,
}
assert.NoError(t, db.Insert(ctx, runner))
// Create jobs with max-parallel=1
jobs := []*ActionRunJob{
{
RunID: runID,
RepoID: 1,
OwnerID: 1,
JobID: jobID,
Name: "Sequential Job 1",
Status: StatusWaiting,
MaxParallel: maxParallel,
},
{
RunID: runID,
RepoID: 1,
OwnerID: 1,
JobID: jobID,
Name: "Sequential Job 2",
Status: StatusWaiting,
MaxParallel: maxParallel,
},
{
RunID: runID,
RepoID: 1,
OwnerID: 1,
JobID: jobID,
Name: "Sequential Job 3",
Status: StatusWaiting,
MaxParallel: maxParallel,
},
}
for _, job := range jobs {
assert.NoError(t, db.Insert(ctx, job))
}
// Verify initial state: all jobs are waiting
allJobs, err := GetRunJobsByRunID(ctx, runID)
assert.NoError(t, err)
assert.Len(t, allJobs, 3)
// Verify that only 1 job should be able to run at a time with max-parallel=1
runningCount, err := CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
assert.NoError(t, err)
assert.Equal(t, 0, runningCount, "Should have 0 jobs running initially")
// Simulate starting first job
allJobs[0].Status = StatusRunning
_, err = UpdateRunJob(ctx, allJobs[0], nil)
assert.NoError(t, err)
runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
assert.NoError(t, err)
assert.Equal(t, 1, runningCount, "Should have exactly 1 job running with max-parallel=1")
// Complete first job
allJobs[0].Status = StatusSuccess
_, err = UpdateRunJob(ctx, allJobs[0], nil)
assert.NoError(t, err)
runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
assert.NoError(t, err)
assert.Equal(t, 0, runningCount, "Should have 0 jobs running after first job completes")
// This is the critical test: the second job should be able to start
// Previously, the system might hang here
allJobs[1].Status = StatusRunning
_, err = UpdateRunJob(ctx, allJobs[1], nil)
assert.NoError(t, err)
runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
assert.NoError(t, err)
assert.Equal(t, 1, runningCount, "Should have exactly 1 job running after second job starts (critical test)")
// Complete the second job
allJobs[1].Status = StatusSuccess
_, err = UpdateRunJob(ctx, allJobs[1], nil)
assert.NoError(t, err)
runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
assert.NoError(t, err)
assert.Equal(t, 0, runningCount, "Should have 0 jobs running after second job completes")
// Third job should also be able to start
allJobs[2].Status = StatusRunning
_, err = UpdateRunJob(ctx, allJobs[2], nil)
assert.NoError(t, err)
runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
assert.NoError(t, err)
assert.Equal(t, 1, runningCount, "Should have exactly 1 job running for third job")
})
t.Run("MaxParallelConstraintAfterTaskFetch", func(t *testing.T) {
runID := int64(9000)
jobID := "max-parallel-fetch-job"
maxParallel := 1
// Setup: Create ActionRun
run := &ActionRun{
ID: runID,
RepoID: 1,
OwnerID: 1,
Index: 9000,
Status: StatusRunning,
}
assert.NoError(t, db.Insert(ctx, run))
// Create jobs
jobs := []*ActionRunJob{
{
RunID: runID,
RepoID: 1,
OwnerID: 1,
JobID: jobID,
Name: "Job A",
Status: StatusWaiting,
MaxParallel: maxParallel,
},
{
RunID: runID,
RepoID: 1,
OwnerID: 1,
JobID: jobID,
Name: "Job B",
Status: StatusWaiting,
MaxParallel: maxParallel,
},
}
for _, job := range jobs {
assert.NoError(t, db.Insert(ctx, job))
}
// Refresh jobs to get IDs
allJobs, err := GetRunJobsByRunID(ctx, runID)
assert.NoError(t, err)
// Start first job
allJobs[0].Status = StatusRunning
_, err = UpdateRunJob(ctx, allJobs[0], nil)
assert.NoError(t, err)
// Verify constraint
runningCount, err := CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
assert.NoError(t, err)
assert.Equal(t, 1, runningCount)
// Try to start second job while first is still running
// This should not be allowed due to max-parallel=1
freshAllJobs, err := GetRunJobsByRunID(ctx, runID)
assert.NoError(t, err)
// Check if we can determine from the count that the second job should not start
for i := 1; i < len(freshAllJobs); i++ {
if freshAllJobs[i].Status == StatusWaiting {
// Before starting this job, verify max-parallel constraint
runningCount, err := CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
assert.NoError(t, err)
if runningCount >= maxParallel {
// This job should wait
assert.Equal(t, StatusWaiting, freshAllJobs[i].Status,
"Job should remain waiting when max-parallel limit is reached")
}
}
}
})
}

View File

@ -313,6 +313,8 @@ func (r *jobStatusResolver) resolveJobHasIfCondition(actionRunJob *actions_model
func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model.Status {
ret := map[int64]actions_model.Status{}
// promotedWaitingByJobID counts within-pass promotions to enforce max-parallel.
promotedWaitingByJobID := make(map[string]int)
for id, status := range r.statuses {
actionRunJob := r.jobMap[id]
if status != actions_model.StatusBlocked {
@ -337,7 +339,7 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model
shouldStartJob := true
if !allSucceed {
// Not all dependent jobs completed successfully:
// * if the job has "if" condition, it can be started, then the act_runner will evaluate the "if" condition.
// * if the job has an "if" condition, it can be started; then the act_runner will evaluate the "if" condition.
// * otherwise, the job should be skipped.
shouldStartJob = r.resolveJobHasIfCondition(actionRunJob)
}
@ -350,6 +352,23 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model
}
}
// Enforce max-parallel: count occupied slots from the current snapshot
// plus within-pass promotions.
if newStatus == actions_model.StatusWaiting && actionRunJob.MaxParallel > 0 {
occupiedSlots := 0
for otherID, otherStatus := range r.statuses {
otherJob := r.jobMap[otherID]
if otherJob.JobID == actionRunJob.JobID &&
(otherStatus == actions_model.StatusRunning || otherStatus == actions_model.StatusWaiting) {
occupiedSlots++
}
}
if occupiedSlots+promotedWaitingByJobID[actionRunJob.JobID] >= actionRunJob.MaxParallel {
continue // no free slot; leave blocked
}
promotedWaitingByJobID[actionRunJob.JobID]++
}
if newStatus != actions_model.StatusBlocked {
ret[id] = newStatus
}

View File

@ -126,11 +126,60 @@ jobs:
},
want: map[int64]actions_model.Status{2: actions_model.StatusSkipped},
},
{
name: "max-parallel=1 promotes exactly one blocked job when one slot is open",
jobs: actions_model.ActionJobList{
{ID: 1, JobID: "build", Status: actions_model.StatusRunning, Needs: []string{}, MaxParallel: 1},
{ID: 2, JobID: "build", Status: actions_model.StatusBlocked, Needs: []string{}, MaxParallel: 1},
{ID: 3, JobID: "build", Status: actions_model.StatusBlocked, Needs: []string{}, MaxParallel: 1},
},
want: map[int64]actions_model.Status{},
},
{
name: "max-parallel=1 promotes one job after running job finishes",
jobs: actions_model.ActionJobList{
{ID: 1, JobID: "build", Status: actions_model.StatusSuccess, Needs: []string{}, MaxParallel: 1},
{ID: 2, JobID: "build", Status: actions_model.StatusBlocked, Needs: []string{}, MaxParallel: 1},
{ID: 3, JobID: "build", Status: actions_model.StatusBlocked, Needs: []string{}, MaxParallel: 1},
},
want: nil, // map iteration is non-deterministic; checked by count below
},
{
name: "max-parallel=2 does not promote when limit is reached",
jobs: actions_model.ActionJobList{
{ID: 1, JobID: "test", Status: actions_model.StatusRunning, Needs: []string{}, MaxParallel: 2},
{ID: 2, JobID: "test", Status: actions_model.StatusRunning, Needs: []string{}, MaxParallel: 2},
{ID: 3, JobID: "test", Status: actions_model.StatusBlocked, Needs: []string{}, MaxParallel: 2},
{ID: 4, JobID: "test", Status: actions_model.StatusBlocked, Needs: []string{}, MaxParallel: 2},
},
want: map[int64]actions_model.Status{},
},
{
name: "max-parallel=2 promotes one job when one slot opens",
jobs: actions_model.ActionJobList{
{ID: 1, JobID: "test", Status: actions_model.StatusSuccess, Needs: []string{}, MaxParallel: 2},
{ID: 2, JobID: "test", Status: actions_model.StatusRunning, Needs: []string{}, MaxParallel: 2},
{ID: 3, JobID: "test", Status: actions_model.StatusBlocked, Needs: []string{}, MaxParallel: 2},
{ID: 4, JobID: "test", Status: actions_model.StatusBlocked, Needs: []string{}, MaxParallel: 2},
},
want: nil, // checked by count below
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := newJobStatusResolver(tt.jobs, nil)
assert.Equal(t, tt.want, r.Resolve(t.Context()))
got := r.Resolve(t.Context())
if tt.want == nil {
waitingCount := 0
for _, s := range got {
if s == actions_model.StatusWaiting {
waitingCount++
}
}
assert.Equal(t, 1, waitingCount, "expected exactly 1 job promoted to Waiting, got %v", got)
} else {
assert.Equal(t, tt.want, got)
}
})
}
}

View File

@ -10,8 +10,8 @@ import (
actions_model "code.gitea.io/gitea/models/actions"
"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/actions/jobparser"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/util"
notify_service "code.gitea.io/gitea/services/notify"
@ -106,6 +106,9 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobpar
runJobs := make([]*actions_model.ActionRunJob, 0, len(jobs))
var hasWaitingJobs bool
// waitingCountByJobID limits initial Waiting slots per JobID to MaxParallel.
waitingCountByJobID := make(map[string]int)
for _, v := range jobs {
id, job := v.Job()
needs := job.Needs()
@ -170,6 +173,16 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobpar
}
}
// Enforce max-parallel: excess jobs start as Blocked and are promoted
// by jobStatusResolver when a slot opens.
if runJob.Status == actions_model.StatusWaiting && runJob.MaxParallel > 0 {
if waitingCountByJobID[id] >= runJob.MaxParallel {
runJob.Status = actions_model.StatusBlocked
} else {
waitingCountByJobID[id]++
}
}
hasWaitingJobs = hasWaitingJobs || runJob.Status == actions_model.StatusWaiting
if err := db.Insert(ctx, runJob); err != nil {
return err

View File

@ -12,314 +12,98 @@ import (
"code.gitea.io/gitea/models/unittest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestMaxParallelJobStatusAndCounting(t *testing.T) {
assert.NoError(t, unittest.PrepareTestDatabase())
// countJobsByStatus returns the number of jobs with the given status in allJobs.
func countJobsByStatus(allJobs actions_model.ActionJobList, status actions_model.Status) int {
n := 0
for _, j := range allJobs {
if j.Status == status {
n++
}
}
return n
}
t.Run("MaxParallelReached", func(t *testing.T) {
// TestMaxParallel_ServiceLayer verifies the max-parallel invariant: Running+Waiting <= MaxParallel.
func TestMaxParallel_ServiceLayer(t *testing.T) {
require.NoError(t, unittest.PrepareTestDatabase())
t.Run("invariant: Running+Waiting <= MaxParallel", func(t *testing.T) {
runID := int64(10000)
jobID := "max-parallel-job"
jobID := "svc-max-parallel"
maxParallel := 2
// Create ActionRun first
run := &actions_model.ActionRun{
ID: runID,
RepoID: 1,
OwnerID: 1,
Index: 10000,
Status: actions_model.StatusRunning,
}
assert.NoError(t, db.Insert(context.Background(), run))
run := &actions_model.ActionRun{ID: runID, RepoID: 1, OwnerID: 1, Index: 10000, Status: actions_model.StatusRunning}
require.NoError(t, db.Insert(context.Background(), run))
// Create waiting jobs
for range 4 {
job := &actions_model.ActionRunJob{
RunID: runID,
RepoID: 1,
OwnerID: 1,
JobID: jobID,
Name: "Test Job",
Status: actions_model.StatusWaiting,
MaxParallel: maxParallel,
}
assert.NoError(t, db.Insert(context.Background(), job))
jobs := []*actions_model.ActionRunJob{
{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "r1", Status: actions_model.StatusRunning, MaxParallel: maxParallel},
{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "w1", Status: actions_model.StatusWaiting, MaxParallel: maxParallel},
{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "b1", Status: actions_model.StatusBlocked, MaxParallel: maxParallel},
}
for _, j := range jobs {
require.NoError(t, db.Insert(context.Background(), j))
}
// Start 2 jobs (max-parallel limit)
jobs, err := actions_model.GetRunJobsByRunID(context.Background(), runID)
assert.NoError(t, err)
assert.Len(t, jobs, 4)
allJobs, err := actions_model.GetRunJobsByRunID(context.Background(), runID)
require.NoError(t, err)
for i := range 2 {
jobs[i].Status = actions_model.StatusRunning
_, err := actions_model.UpdateRunJob(context.Background(), jobs[i], nil, "status")
assert.NoError(t, err)
}
running := countJobsByStatus(allJobs, actions_model.StatusRunning)
waiting := countJobsByStatus(allJobs, actions_model.StatusWaiting)
blocked := countJobsByStatus(allJobs, actions_model.StatusBlocked)
// Verify max-parallel is enforced
runningCount, err := actions_model.CountRunningJobsByWorkflowAndRun(context.Background(), runID, jobID)
assert.NoError(t, err)
assert.Equal(t, maxParallel, runningCount)
// Remaining jobs should stay in waiting
remainingJobs, err := actions_model.GetRunJobsByRunID(context.Background(), runID)
assert.NoError(t, err)
waitingCount := 0
for _, job := range remainingJobs {
if job.Status == actions_model.StatusWaiting {
waitingCount++
}
}
assert.Equal(t, 2, waitingCount)
assert.LessOrEqual(t, running+waiting, maxParallel)
assert.Equal(t, 1, blocked)
})
t.Run("MaxParallelNotSet", func(t *testing.T) {
t.Run("slot becomes available after completion", func(t *testing.T) {
runID := int64(20000)
jobID := "no-limit-job"
// Create ActionRun first
run := &actions_model.ActionRun{
ID: runID,
RepoID: 1,
OwnerID: 1,
Index: 20000,
Status: actions_model.StatusRunning,
}
assert.NoError(t, db.Insert(context.Background(), run))
// Create jobs without max-parallel
for range 5 {
job := &actions_model.ActionRunJob{
RunID: runID,
RepoID: 1,
OwnerID: 1,
JobID: jobID,
Name: "Test Job",
Status: actions_model.StatusWaiting,
MaxParallel: 0, // No limit
}
assert.NoError(t, db.Insert(context.Background(), job))
}
// All jobs can run simultaneously
jobs, err := actions_model.GetRunJobsByRunID(context.Background(), runID)
assert.NoError(t, err)
for _, job := range jobs {
job.Status = actions_model.StatusRunning
_, err := actions_model.UpdateRunJob(context.Background(), job, nil, "status")
assert.NoError(t, err)
}
runningCount, err := actions_model.CountRunningJobsByWorkflowAndRun(context.Background(), runID, jobID)
assert.NoError(t, err)
assert.Equal(t, 5, runningCount, "All jobs should be able to run without limit")
})
t.Run("MaxParallelWrongValue", func(t *testing.T) {
runID := int64(30000)
jobID := "wrong-value-use-default-value-job"
// Create ActionRun first
run := &actions_model.ActionRun{
ID: runID,
RepoID: 1,
OwnerID: 1,
Index: 30000,
Status: actions_model.StatusRunning,
}
assert.NoError(t, db.Insert(context.Background(), run))
// Test different invalid max-parallel values
testCases := []struct {
name string
maxParallel int
description string
}{
{
name: "negative value",
maxParallel: -1,
description: "Negative max-parallel should default to 0 (no limit)",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Create jobs with the test max-parallel value
for i := range 5 {
job := &actions_model.ActionRunJob{
RunID: runID,
RepoID: 1,
OwnerID: 1,
JobID: jobID,
Name: "Test Job " + tc.name,
Status: actions_model.StatusWaiting,
MaxParallel: tc.maxParallel,
}
assert.NoError(t, db.Insert(context.Background(), job))
// Verify the value was stored
if i == 0 {
storedJob, err := actions_model.GetRunJobByID(context.Background(), job.ID)
assert.NoError(t, err)
assert.Equal(t, tc.maxParallel, storedJob.MaxParallel, tc.description)
}
}
// All jobs can run simultaneously when max-parallel <= 0
jobs, err := actions_model.GetRunJobsByRunID(context.Background(), runID)
assert.NoError(t, err)
for _, job := range jobs {
job.Status = actions_model.StatusRunning
_, err := actions_model.UpdateRunJob(context.Background(), job, nil, "status")
assert.NoError(t, err)
}
runningCount, err := actions_model.CountRunningJobsByWorkflowAndRun(context.Background(), runID, jobID)
assert.NoError(t, err)
assert.GreaterOrEqual(t, runningCount, 5, "All jobs should be able to run when max-parallel is "+tc.name)
})
}
})
t.Run("MaxParallelOne_TaskCreation", func(t *testing.T) {
runID := int64(40000)
jobID := "task-creation-sequential-job"
maxParallel := 1
// Create ActionRun first
run := &actions_model.ActionRun{
ID: runID,
RepoID: 1,
OwnerID: 1,
Index: 40000,
Status: actions_model.StatusRunning,
}
assert.NoError(t, db.Insert(context.Background(), run))
// Create waiting jobs with max-parallel=1
for i := range 3 {
job := &actions_model.ActionRunJob{
RunID: runID,
RepoID: 1,
OwnerID: 1,
JobID: jobID,
Name: "Sequential Job " + string(rune(i+1)),
Status: actions_model.StatusWaiting,
MaxParallel: maxParallel,
}
assert.NoError(t, db.Insert(context.Background(), job))
}
// Simulate the first job starting
jobs, err := actions_model.GetRunJobsByRunID(context.Background(), runID)
assert.NoError(t, err)
assert.Len(t, jobs, 3)
jobs[0].Status = actions_model.StatusRunning
_, err = actions_model.UpdateRunJob(context.Background(), jobs[0], nil, "status")
assert.NoError(t, err)
// Verify only one is running
runningCount, err := actions_model.CountRunningJobsByWorkflowAndRun(context.Background(), runID, jobID)
assert.NoError(t, err)
assert.Equal(t, 1, runningCount, "Should have exactly 1 job running with max-parallel=1")
// Complete first job
jobs[0].Status = actions_model.StatusSuccess
_, err = actions_model.UpdateRunJob(context.Background(), jobs[0], nil, "status")
assert.NoError(t, err)
// Verify no jobs are running now
runningCount, err = actions_model.CountRunningJobsByWorkflowAndRun(context.Background(), runID, jobID)
assert.NoError(t, err)
assert.Equal(t, 0, runningCount, "Should have 0 jobs running after completion with max-parallel=1")
// Second job can now start
jobs[1].Status = actions_model.StatusRunning
_, err = actions_model.UpdateRunJob(context.Background(), jobs[1], nil, "status")
assert.NoError(t, err)
runningCount, err = actions_model.CountRunningJobsByWorkflowAndRun(context.Background(), runID, jobID)
assert.NoError(t, err)
assert.Equal(t, 1, runningCount, "Should have exactly 1 job running for second task")
// Complete second job
jobs[1].Status = actions_model.StatusSuccess
_, err = actions_model.UpdateRunJob(context.Background(), jobs[1], nil, "status")
assert.NoError(t, err)
// Third job can now start
jobs[2].Status = actions_model.StatusRunning
_, err = actions_model.UpdateRunJob(context.Background(), jobs[2], nil, "status")
assert.NoError(t, err)
runningCount, err = actions_model.CountRunningJobsByWorkflowAndRun(context.Background(), runID, jobID)
assert.NoError(t, err)
assert.Equal(t, 1, runningCount, "Should have exactly 1 job running for third task")
})
t.Run("MaxParallelTwo_CompletionAndStart", func(t *testing.T) {
runID := int64(50000)
jobID := "completion-start-job"
jobID := "svc-slot-free"
maxParallel := 2
// Create ActionRun first
run := &actions_model.ActionRun{
ID: runID,
RepoID: 1,
OwnerID: 1,
Index: 50000,
Status: actions_model.StatusRunning,
run := &actions_model.ActionRun{ID: runID, RepoID: 1, OwnerID: 1, Index: 20000, Status: actions_model.StatusRunning}
require.NoError(t, db.Insert(context.Background(), run))
jobs := []*actions_model.ActionRunJob{
{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "r1", Status: actions_model.StatusRunning, MaxParallel: maxParallel},
{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "r2", Status: actions_model.StatusRunning, MaxParallel: maxParallel},
{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "b1", Status: actions_model.StatusBlocked, MaxParallel: maxParallel},
}
assert.NoError(t, db.Insert(context.Background(), run))
// Create jobs with max-parallel=2
for i := range 5 {
job := &actions_model.ActionRunJob{
RunID: runID,
RepoID: 1,
OwnerID: 1,
JobID: jobID,
Name: "Parallel Job " + string(rune(i+1)),
Status: actions_model.StatusWaiting,
MaxParallel: maxParallel,
}
if i < 2 {
job.Status = actions_model.StatusRunning
}
assert.NoError(t, db.Insert(context.Background(), job))
for _, j := range jobs {
require.NoError(t, db.Insert(context.Background(), j))
}
// Verify 2 jobs are running
runningCount, err := actions_model.CountRunningJobsByWorkflowAndRun(context.Background(), runID, jobID)
assert.NoError(t, err)
assert.Equal(t, 2, runningCount)
// Complete one job
jobs, err := actions_model.GetRunJobsByRunID(context.Background(), runID)
assert.NoError(t, err)
jobs[0].Status = actions_model.StatusSuccess
_, err = actions_model.UpdateRunJob(context.Background(), jobs[0], nil, "status")
assert.NoError(t, err)
_, err := actions_model.UpdateRunJob(context.Background(), jobs[0], nil, "status")
require.NoError(t, err)
// Should have 1 running
runningCount, err = actions_model.CountRunningJobsByWorkflowAndRun(context.Background(), runID, jobID)
assert.NoError(t, err)
assert.Equal(t, 1, runningCount, "Should have 1 job running after one completes")
allJobs, err := actions_model.GetRunJobsByRunID(context.Background(), runID)
require.NoError(t, err)
// Now another job can start
jobs[2].Status = actions_model.StatusRunning
_, err = actions_model.UpdateRunJob(context.Background(), jobs[2], nil, "status")
assert.NoError(t, err)
running := countJobsByStatus(allJobs, actions_model.StatusRunning)
assert.Equal(t, 1, running)
assert.Less(t, running, maxParallel)
})
// Back to 2 running
runningCount, err = actions_model.CountRunningJobsByWorkflowAndRun(context.Background(), runID, jobID)
assert.NoError(t, err)
assert.Equal(t, 2, runningCount, "Should have 2 jobs running after new job starts")
t.Run("no max-parallel means all jobs start as Waiting", func(t *testing.T) {
runID := int64(30000)
jobID := "svc-no-limit"
run := &actions_model.ActionRun{ID: runID, RepoID: 1, OwnerID: 1, Index: 30000, Status: actions_model.StatusRunning}
require.NoError(t, db.Insert(context.Background(), run))
for range 5 {
require.NoError(t, db.Insert(context.Background(), &actions_model.ActionRunJob{
RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "j",
Status: actions_model.StatusWaiting, MaxParallel: 0,
}))
}
allJobs, err := actions_model.GetRunJobsByRunID(context.Background(), runID)
require.NoError(t, err)
assert.Equal(t, 5, countJobsByStatus(allJobs, actions_model.StatusWaiting))
assert.Equal(t, 0, countJobsByStatus(allJobs, actions_model.StatusBlocked))
})
}