mirror of
https://github.com/go-gitea/gitea.git
synced 2026-05-24 21:16:30 +02:00
feat: Update the conditions of the actions to continue the tasks executions
Signed-off-by: Pascal Zimmermann <pascal.zimmermann@theiotstudio.com>
This commit is contained in:
parent
2a4558da37
commit
63be2b1103
240
models/actions/max_parallel_simple_test.go
Normal file
240
models/actions/max_parallel_simple_test.go
Normal file
@ -0,0 +1,240 @@
|
||||
// Copyright 2026 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package actions
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"code.gitea.io/gitea/models/db"
|
||||
"code.gitea.io/gitea/models/unittest"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
// TestMaxParallelOne_SimpleSequence tests the core issue: max-parallel=1 execution
|
||||
func TestMaxParallelOne_SimpleSequence(t *testing.T) {
|
||||
assert.NoError(t, unittest.PrepareTestDatabase())
|
||||
ctx := context.Background()
|
||||
|
||||
runID := int64(11000)
|
||||
jobID := "simple-sequence-job"
|
||||
maxParallel := 1
|
||||
|
||||
// Create ActionRun
|
||||
run := &ActionRun{
|
||||
ID: runID,
|
||||
RepoID: 1,
|
||||
OwnerID: 1,
|
||||
Index: 11000,
|
||||
Status: StatusRunning,
|
||||
}
|
||||
assert.NoError(t, db.Insert(ctx, run))
|
||||
|
||||
// Create jobs
|
||||
job1 := &ActionRunJob{
|
||||
RunID: runID,
|
||||
RepoID: 1,
|
||||
OwnerID: 1,
|
||||
JobID: jobID,
|
||||
Name: "Job 1",
|
||||
Status: StatusWaiting,
|
||||
MaxParallel: maxParallel,
|
||||
}
|
||||
job2 := &ActionRunJob{
|
||||
RunID: runID,
|
||||
RepoID: 1,
|
||||
OwnerID: 1,
|
||||
JobID: jobID,
|
||||
Name: "Job 2",
|
||||
Status: StatusWaiting,
|
||||
MaxParallel: maxParallel,
|
||||
}
|
||||
job3 := &ActionRunJob{
|
||||
RunID: runID,
|
||||
RepoID: 1,
|
||||
OwnerID: 1,
|
||||
JobID: jobID,
|
||||
Name: "Job 3",
|
||||
Status: StatusWaiting,
|
||||
MaxParallel: maxParallel,
|
||||
}
|
||||
|
||||
assert.NoError(t, db.Insert(ctx, job1))
|
||||
assert.NoError(t, db.Insert(ctx, job2))
|
||||
assert.NoError(t, db.Insert(ctx, job3))
|
||||
|
||||
// TEST 1: Initially, 0 running
|
||||
running, err := CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 0, running, "Should have 0 jobs running initially")
|
||||
|
||||
// TEST 2: Job1 starts
|
||||
job1.Status = StatusRunning
|
||||
_, err = UpdateRunJob(ctx, job1, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
running, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1, running, "Should have 1 job running after job1 starts")
|
||||
|
||||
// TEST 3: Job1 completes
|
||||
job1.Status = StatusSuccess
|
||||
_, err = UpdateRunJob(ctx, job1, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
running, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 0, running, "Should have 0 jobs running after job1 completes - THIS IS THE CRITICAL TEST")
|
||||
|
||||
// TEST 4: Job2 should now be able to start
|
||||
job2.Status = StatusRunning
|
||||
_, err = UpdateRunJob(ctx, job2, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
running, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1, running, "Should have 1 job running after job2 starts - IF THIS FAILS, THE BUG IS NOT FIXED")
|
||||
|
||||
// TEST 5: Job2 completes
|
||||
job2.Status = StatusSuccess
|
||||
_, err = UpdateRunJob(ctx, job2, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
running, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 0, running, "Should have 0 jobs running after job2 completes")
|
||||
|
||||
// TEST 6: Job3 starts
|
||||
job3.Status = StatusRunning
|
||||
_, err = UpdateRunJob(ctx, job3, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
running, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1, running, "Should have 1 job running after job3 starts")
|
||||
|
||||
t.Log("✅ All sequential execution tests passed!")
|
||||
}
|
||||
|
||||
// TestMaxParallelOne_FreshJobFetch tests the fresh job fetch mechanism
|
||||
func TestMaxParallelOne_FreshJobFetch(t *testing.T) {
|
||||
assert.NoError(t, unittest.PrepareTestDatabase())
|
||||
ctx := context.Background()
|
||||
|
||||
runID := int64(12000)
|
||||
jobID := "fresh-fetch-job"
|
||||
|
||||
// Create ActionRun
|
||||
run := &ActionRun{
|
||||
ID: runID,
|
||||
RepoID: 1,
|
||||
OwnerID: 1,
|
||||
Index: 12000,
|
||||
Status: StatusRunning,
|
||||
}
|
||||
assert.NoError(t, db.Insert(ctx, run))
|
||||
|
||||
// Create a job
|
||||
job := &ActionRunJob{
|
||||
RunID: runID,
|
||||
RepoID: 1,
|
||||
OwnerID: 1,
|
||||
JobID: jobID,
|
||||
Name: "Fresh Fetch Test Job",
|
||||
Status: StatusWaiting,
|
||||
}
|
||||
assert.NoError(t, db.Insert(ctx, job))
|
||||
|
||||
// Fetch fresh copy (simulating CreateTaskForRunner behavior)
|
||||
freshJob, err := GetRunJobByID(ctx, job.ID)
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, freshJob)
|
||||
assert.Equal(t, StatusWaiting, freshJob.Status, "Fresh job should have WAITING status")
|
||||
assert.Equal(t, int64(0), freshJob.TaskID, "Fresh job should have TaskID=0")
|
||||
|
||||
// Update original job to RUNNING
|
||||
job.Status = StatusRunning
|
||||
_, err = UpdateRunJob(ctx, job, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Fetch fresh copy again - should reflect the update
|
||||
freshJob2, err := GetRunJobByID(ctx, job.ID)
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, freshJob2)
|
||||
assert.Equal(t, StatusRunning, freshJob2.Status, "Fresh job should now have RUNNING status")
|
||||
|
||||
t.Log("✅ Fresh job fetch mechanism works correctly!")
|
||||
}
|
||||
|
||||
// TestCountRunningJobs tests the CountRunningJobsByWorkflowAndRun function
|
||||
func TestCountRunningJobs(t *testing.T) {
|
||||
assert.NoError(t, unittest.PrepareTestDatabase())
|
||||
ctx := context.Background()
|
||||
|
||||
runID := int64(13000)
|
||||
jobID := "count-jobs"
|
||||
|
||||
// Create ActionRun
|
||||
run := &ActionRun{
|
||||
ID: runID,
|
||||
RepoID: 1,
|
||||
OwnerID: 1,
|
||||
Index: 13000,
|
||||
Status: StatusRunning,
|
||||
}
|
||||
assert.NoError(t, db.Insert(ctx, run))
|
||||
|
||||
// Create 5 jobs
|
||||
for i := 1; i <= 5; i++ {
|
||||
job := &ActionRunJob{
|
||||
RunID: runID,
|
||||
RepoID: 1,
|
||||
OwnerID: 1,
|
||||
JobID: jobID,
|
||||
Name: "Job " + string(rune(i)),
|
||||
Status: StatusWaiting,
|
||||
}
|
||||
assert.NoError(t, db.Insert(ctx, job))
|
||||
}
|
||||
|
||||
// Get all jobs
|
||||
jobs, err := GetRunJobsByRunID(ctx, runID)
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, jobs, 5)
|
||||
|
||||
// Initially 0 running
|
||||
running, err := CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 0, running)
|
||||
|
||||
// Set job 0 to RUNNING
|
||||
jobs[0].Status = StatusRunning
|
||||
_, err = UpdateRunJob(ctx, jobs[0], nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
running, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1, running)
|
||||
|
||||
// Set job 1 to RUNNING
|
||||
jobs[1].Status = StatusRunning
|
||||
_, err = UpdateRunJob(ctx, jobs[1], nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
running, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 2, running)
|
||||
|
||||
// Set job 0 to SUCCESS (not counted as RUNNING anymore)
|
||||
jobs[0].Status = StatusSuccess
|
||||
_, err = UpdateRunJob(ctx, jobs[0], nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
running, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1, running)
|
||||
|
||||
t.Log("✅ Count running jobs works correctly!")
|
||||
}
|
||||
@ -140,4 +140,116 @@ func TestActionRunJob_MaxParallelEnforcement(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, maxParallel, runningCount)
|
||||
})
|
||||
|
||||
t.Run("MaxParallelOne_SequentialExecution", func(t *testing.T) {
|
||||
runID := int64(6000)
|
||||
jobID := "sequential-job"
|
||||
maxParallel := 1
|
||||
|
||||
// Create ActionRun first
|
||||
run := &ActionRun{
|
||||
ID: runID,
|
||||
RepoID: 1,
|
||||
OwnerID: 1,
|
||||
Index: 6000,
|
||||
Status: StatusRunning,
|
||||
}
|
||||
assert.NoError(t, db.Insert(ctx, run))
|
||||
|
||||
// Create jobs simulating sequential execution with max-parallel=1
|
||||
jobs := []*ActionRunJob{
|
||||
{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 1", Status: StatusRunning, MaxParallel: maxParallel},
|
||||
{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 2", Status: StatusWaiting, MaxParallel: maxParallel},
|
||||
{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 3", Status: StatusWaiting, MaxParallel: maxParallel},
|
||||
}
|
||||
|
||||
for _, job := range jobs {
|
||||
assert.NoError(t, db.Insert(ctx, job))
|
||||
}
|
||||
|
||||
// Verify initial running count is 1
|
||||
runningCount, err := CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1, runningCount, "Should have exactly 1 job running with max-parallel=1")
|
||||
|
||||
// Complete first job
|
||||
jobs[0].Status = StatusSuccess
|
||||
_, err = UpdateRunJob(ctx, jobs[0], nil, "status")
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Now running count should be 0
|
||||
runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 0, runningCount, "Should have 0 jobs running after first job completes")
|
||||
|
||||
// Second job can now start
|
||||
jobs[1].Status = StatusRunning
|
||||
_, err = UpdateRunJob(ctx, jobs[1], nil, "status")
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Running count should be 1 again
|
||||
runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1, runningCount, "Should have exactly 1 job running after second job starts")
|
||||
|
||||
// Complete second job
|
||||
jobs[1].Status = StatusSuccess
|
||||
_, err = UpdateRunJob(ctx, jobs[1], nil, "status")
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Third job can now start
|
||||
jobs[2].Status = StatusRunning
|
||||
_, err = UpdateRunJob(ctx, jobs[2], nil, "status")
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Running count should still be 1
|
||||
runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1, runningCount, "Should have exactly 1 job running after third job starts")
|
||||
})
|
||||
|
||||
t.Run("MaxParallelOne_WithFailure", func(t *testing.T) {
|
||||
runID := int64(7000)
|
||||
jobID := "sequential-with-failure-job"
|
||||
maxParallel := 1
|
||||
|
||||
// Create ActionRun first
|
||||
run := &ActionRun{
|
||||
ID: runID,
|
||||
RepoID: 1,
|
||||
OwnerID: 1,
|
||||
Index: 7000,
|
||||
Status: StatusRunning,
|
||||
}
|
||||
assert.NoError(t, db.Insert(ctx, run))
|
||||
|
||||
// Create jobs
|
||||
jobs := []*ActionRunJob{
|
||||
{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 1", Status: StatusRunning, MaxParallel: maxParallel},
|
||||
{RunID: runID, RepoID: 1, OwnerID: 1, JobID: jobID, Name: "Job 2", Status: StatusWaiting, MaxParallel: maxParallel},
|
||||
}
|
||||
|
||||
for _, job := range jobs {
|
||||
assert.NoError(t, db.Insert(ctx, job))
|
||||
}
|
||||
|
||||
// First job fails
|
||||
jobs[0].Status = StatusFailure
|
||||
_, err := UpdateRunJob(ctx, jobs[0], nil, "status")
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Verify no jobs are running
|
||||
runningCount, err := CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 0, runningCount, "Should have 0 jobs running after job fails")
|
||||
|
||||
// Second job can still start
|
||||
jobs[1].Status = StatusRunning
|
||||
_, err = UpdateRunJob(ctx, jobs[1], nil, "status")
|
||||
assert.NoError(t, err)
|
||||
|
||||
runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1, runningCount, "Should have exactly 1 job running after job starts")
|
||||
})
|
||||
}
|
||||
|
||||
@ -346,9 +346,12 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask
|
||||
}
|
||||
|
||||
job.TaskID = task.ID
|
||||
if n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}); err != nil {
|
||||
// Must explicitly specify which columns to update, including status and started
|
||||
if n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "task_id", "status", "started", "attempt", "updated"); err != nil {
|
||||
return nil, false, err
|
||||
} else if n != 1 {
|
||||
// Another runner may have claimed this job, skip it
|
||||
log.Debug("Job %s (run %d) was claimed by another runner, skipping", job.JobID, job.RunID)
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
|
||||
212
models/actions/task_sequential_execution_test.go
Normal file
212
models/actions/task_sequential_execution_test.go
Normal file
@ -0,0 +1,212 @@
|
||||
// Copyright 2026 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package actions
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"code.gitea.io/gitea/models/db"
|
||||
"code.gitea.io/gitea/models/unittest"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
// TestTaskCreation_MaxParallel_One tests that tasks are properly sequenced
|
||||
// when max-parallel=1, ensuring no hang after task completion
|
||||
func TestTaskCreation_MaxParallel_One(t *testing.T) {
|
||||
assert.NoError(t, unittest.PrepareTestDatabase())
|
||||
ctx := context.Background()
|
||||
|
||||
t.Run("SequentialTaskCreationMaxParallelOne", func(t *testing.T) {
|
||||
runID := int64(8000)
|
||||
jobID := "task-sequential-max-parallel-one"
|
||||
maxParallel := 1
|
||||
|
||||
// Setup: Create ActionRun
|
||||
run := &ActionRun{
|
||||
ID: runID,
|
||||
RepoID: 1,
|
||||
OwnerID: 1,
|
||||
Index: 8000,
|
||||
Status: StatusRunning,
|
||||
}
|
||||
assert.NoError(t, db.Insert(ctx, run))
|
||||
|
||||
// Create runner
|
||||
runner := &ActionRunner{
|
||||
ID: 1,
|
||||
UUID: "test-runner-1",
|
||||
Name: "Test Runner",
|
||||
OwnerID: 1,
|
||||
}
|
||||
assert.NoError(t, db.Insert(ctx, runner))
|
||||
|
||||
// Create jobs with max-parallel=1
|
||||
jobs := []*ActionRunJob{
|
||||
{
|
||||
RunID: runID,
|
||||
RepoID: 1,
|
||||
OwnerID: 1,
|
||||
JobID: jobID,
|
||||
Name: "Sequential Job 1",
|
||||
Status: StatusWaiting,
|
||||
MaxParallel: maxParallel,
|
||||
},
|
||||
{
|
||||
RunID: runID,
|
||||
RepoID: 1,
|
||||
OwnerID: 1,
|
||||
JobID: jobID,
|
||||
Name: "Sequential Job 2",
|
||||
Status: StatusWaiting,
|
||||
MaxParallel: maxParallel,
|
||||
},
|
||||
{
|
||||
RunID: runID,
|
||||
RepoID: 1,
|
||||
OwnerID: 1,
|
||||
JobID: jobID,
|
||||
Name: "Sequential Job 3",
|
||||
Status: StatusWaiting,
|
||||
MaxParallel: maxParallel,
|
||||
},
|
||||
}
|
||||
|
||||
for _, job := range jobs {
|
||||
assert.NoError(t, db.Insert(ctx, job))
|
||||
}
|
||||
|
||||
// Verify initial state: all jobs are waiting
|
||||
allJobs, err := GetRunJobsByRunID(ctx, runID)
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, allJobs, 3)
|
||||
|
||||
// Verify that only 1 job should be able to run at a time with max-parallel=1
|
||||
runningCount, err := CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 0, runningCount, "Should have 0 jobs running initially")
|
||||
|
||||
// Simulate starting first job
|
||||
allJobs[0].Status = StatusRunning
|
||||
_, err = UpdateRunJob(ctx, allJobs[0], nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1, runningCount, "Should have exactly 1 job running with max-parallel=1")
|
||||
|
||||
// Complete first job
|
||||
allJobs[0].Status = StatusSuccess
|
||||
_, err = UpdateRunJob(ctx, allJobs[0], nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 0, runningCount, "Should have 0 jobs running after first job completes")
|
||||
|
||||
// This is the critical test: the second job should be able to start
|
||||
// Previously, the system might hang here
|
||||
allJobs[1].Status = StatusRunning
|
||||
_, err = UpdateRunJob(ctx, allJobs[1], nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1, runningCount, "Should have exactly 1 job running after second job starts (critical test)")
|
||||
|
||||
// Complete the second job
|
||||
allJobs[1].Status = StatusSuccess
|
||||
_, err = UpdateRunJob(ctx, allJobs[1], nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 0, runningCount, "Should have 0 jobs running after second job completes")
|
||||
|
||||
// Third job should also be able to start
|
||||
allJobs[2].Status = StatusRunning
|
||||
_, err = UpdateRunJob(ctx, allJobs[2], nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
runningCount, err = CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1, runningCount, "Should have exactly 1 job running for third job")
|
||||
})
|
||||
|
||||
t.Run("MaxParallelConstraintAfterTaskFetch", func(t *testing.T) {
|
||||
runID := int64(9000)
|
||||
jobID := "max-parallel-fetch-job"
|
||||
maxParallel := 1
|
||||
|
||||
// Setup: Create ActionRun
|
||||
run := &ActionRun{
|
||||
ID: runID,
|
||||
RepoID: 1,
|
||||
OwnerID: 1,
|
||||
Index: 9000,
|
||||
Status: StatusRunning,
|
||||
}
|
||||
assert.NoError(t, db.Insert(ctx, run))
|
||||
|
||||
// Create jobs
|
||||
jobs := []*ActionRunJob{
|
||||
{
|
||||
RunID: runID,
|
||||
RepoID: 1,
|
||||
OwnerID: 1,
|
||||
JobID: jobID,
|
||||
Name: "Job A",
|
||||
Status: StatusWaiting,
|
||||
MaxParallel: maxParallel,
|
||||
},
|
||||
{
|
||||
RunID: runID,
|
||||
RepoID: 1,
|
||||
OwnerID: 1,
|
||||
JobID: jobID,
|
||||
Name: "Job B",
|
||||
Status: StatusWaiting,
|
||||
MaxParallel: maxParallel,
|
||||
},
|
||||
}
|
||||
|
||||
for _, job := range jobs {
|
||||
assert.NoError(t, db.Insert(ctx, job))
|
||||
}
|
||||
|
||||
// Refresh jobs to get IDs
|
||||
allJobs, err := GetRunJobsByRunID(ctx, runID)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Start first job
|
||||
allJobs[0].Status = StatusRunning
|
||||
_, err = UpdateRunJob(ctx, allJobs[0], nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Verify constraint
|
||||
runningCount, err := CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1, runningCount)
|
||||
|
||||
// Try to start second job while first is still running
|
||||
// This should not be allowed due to max-parallel=1
|
||||
freshAllJobs, err := GetRunJobsByRunID(ctx, runID)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Check if we can determine from the count that the second job should not start
|
||||
for i := 1; i < len(freshAllJobs); i++ {
|
||||
if freshAllJobs[i].Status == StatusWaiting {
|
||||
// Before starting this job, verify max-parallel constraint
|
||||
runningCount, err := CountRunningJobsByWorkflowAndRun(ctx, runID, jobID)
|
||||
assert.NoError(t, err)
|
||||
if runningCount >= maxParallel {
|
||||
// This job should wait
|
||||
assert.Equal(t, StatusWaiting, freshAllJobs[i].Status,
|
||||
"Job should remain waiting when max-parallel limit is reached")
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
@ -222,16 +222,67 @@ func checkJobsOfRun(ctx context.Context, run *actions_model.ActionRun) (jobs, up
|
||||
for _, job := range jobs {
|
||||
if status, ok := updates[job.ID]; ok {
|
||||
oldStatus := job.Status
|
||||
// IMPORTANT: Even if status hasn't changed, we must reset task_id for WAITING jobs
|
||||
// after a previous task has completed (max-parallel constraint is released)
|
||||
if status == actions_model.StatusWaiting && job.TaskID != 0 {
|
||||
// This job was running but is now ready to be reassigned
|
||||
job.Status = status
|
||||
job.TaskID = 0
|
||||
if _, err := actions_model.UpdateRunJob(ctx, job, nil, "status", "task_id"); err != nil {
|
||||
return fmt.Errorf("reset task_id for job %d: %w", job.ID, err)
|
||||
}
|
||||
updatedJobs = append(updatedJobs, job)
|
||||
continue
|
||||
}
|
||||
|
||||
if job.Status == status {
|
||||
// Status hasn't changed, skip
|
||||
continue
|
||||
}
|
||||
|
||||
job.Status = status
|
||||
if n, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"status": actions_model.StatusBlocked}, "status"); err != nil {
|
||||
return err
|
||||
} else if n != 1 {
|
||||
return fmt.Errorf("no affected for updating blocked job %v", job.ID)
|
||||
|
||||
// For jobs transitioning to WAITING status (can happen after max-parallel constraint is released),
|
||||
// we need to reset task_id to 0 so a new runner can pick them up
|
||||
if status == actions_model.StatusWaiting {
|
||||
job.TaskID = 0
|
||||
if _, err := actions_model.UpdateRunJob(ctx, job, nil, "status", "task_id"); err != nil {
|
||||
return fmt.Errorf("reset task_id for job %d: %w", job.ID, err)
|
||||
}
|
||||
} else {
|
||||
// For other status changes (BLOCKED, RUNNING, etc.), only update status
|
||||
if _, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"status": actions_model.StatusWaiting}, "status"); err != nil {
|
||||
return fmt.Errorf("update status for job %d: %w", job.ID, err)
|
||||
}
|
||||
}
|
||||
log.Info("Job %d (JobID: %s) status updated: %s -> %s", job.ID, job.JobID, oldStatus, status)
|
||||
updatedJobs = append(updatedJobs, job)
|
||||
}
|
||||
}
|
||||
|
||||
// CRITICAL FIX for max-parallel: Even if no jobs were updated above (because they already have task_id=0),
|
||||
// we must notify runners to poll for new tasks when a job completes.
|
||||
// This handles the max-parallel=1 case where jobs were never started (task_id=0)
|
||||
// and won't be processed by jobStatusResolver (which only handles BLOCKED jobs).
|
||||
if len(jobs) > 0 && jobs[0].OwnerID > 0 && jobs[0].RepoID > 0 {
|
||||
// Check if there are any WAITING jobs with task_id=0 (ready to be picked up)
|
||||
hasWaitingJobs := false
|
||||
for _, job := range jobs {
|
||||
if job.Status == actions_model.StatusWaiting && job.TaskID == 0 {
|
||||
hasWaitingJobs = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if hasWaitingJobs {
|
||||
// Notify runners to poll for new tasks
|
||||
if err := actions_model.IncreaseTaskVersion(ctx, jobs[0].OwnerID, jobs[0].RepoID); err != nil {
|
||||
log.Error("Failed to increase task version for repo %d: %v", jobs[0].RepoID, err)
|
||||
} else {
|
||||
log.Debug("Increased task version for repo %d (max-parallel waiting jobs ready)", jobs[0].RepoID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, nil, err
|
||||
|
||||
@ -184,4 +184,142 @@ func TestMaxParallelJobStatusAndCounting(t *testing.T) {
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("MaxParallelOne_TaskCreation", func(t *testing.T) {
|
||||
runID := int64(40000)
|
||||
jobID := "task-creation-sequential-job"
|
||||
maxParallel := 1
|
||||
|
||||
// Create ActionRun first
|
||||
run := &actions_model.ActionRun{
|
||||
ID: runID,
|
||||
RepoID: 1,
|
||||
OwnerID: 1,
|
||||
Index: 40000,
|
||||
Status: actions_model.StatusRunning,
|
||||
}
|
||||
assert.NoError(t, db.Insert(context.Background(), run))
|
||||
|
||||
// Create waiting jobs with max-parallel=1
|
||||
for i := range 3 {
|
||||
job := &actions_model.ActionRunJob{
|
||||
RunID: runID,
|
||||
RepoID: 1,
|
||||
OwnerID: 1,
|
||||
JobID: jobID,
|
||||
Name: "Sequential Job " + string(rune(i+1)),
|
||||
Status: actions_model.StatusWaiting,
|
||||
MaxParallel: maxParallel,
|
||||
}
|
||||
assert.NoError(t, db.Insert(context.Background(), job))
|
||||
}
|
||||
|
||||
// Simulate the first job starting
|
||||
jobs, err := actions_model.GetRunJobsByRunID(context.Background(), runID)
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, jobs, 3)
|
||||
|
||||
jobs[0].Status = actions_model.StatusRunning
|
||||
_, err = actions_model.UpdateRunJob(context.Background(), jobs[0], nil, "status")
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Verify only one is running
|
||||
runningCount, err := actions_model.CountRunningJobsByWorkflowAndRun(context.Background(), runID, jobID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1, runningCount, "Should have exactly 1 job running with max-parallel=1")
|
||||
|
||||
// Complete first job
|
||||
jobs[0].Status = actions_model.StatusSuccess
|
||||
_, err = actions_model.UpdateRunJob(context.Background(), jobs[0], nil, "status")
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Verify no jobs are running now
|
||||
runningCount, err = actions_model.CountRunningJobsByWorkflowAndRun(context.Background(), runID, jobID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 0, runningCount, "Should have 0 jobs running after completion with max-parallel=1")
|
||||
|
||||
// Second job can now start
|
||||
jobs[1].Status = actions_model.StatusRunning
|
||||
_, err = actions_model.UpdateRunJob(context.Background(), jobs[1], nil, "status")
|
||||
assert.NoError(t, err)
|
||||
|
||||
runningCount, err = actions_model.CountRunningJobsByWorkflowAndRun(context.Background(), runID, jobID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1, runningCount, "Should have exactly 1 job running for second task")
|
||||
|
||||
// Complete second job
|
||||
jobs[1].Status = actions_model.StatusSuccess
|
||||
_, err = actions_model.UpdateRunJob(context.Background(), jobs[1], nil, "status")
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Third job can now start
|
||||
jobs[2].Status = actions_model.StatusRunning
|
||||
_, err = actions_model.UpdateRunJob(context.Background(), jobs[2], nil, "status")
|
||||
assert.NoError(t, err)
|
||||
|
||||
runningCount, err = actions_model.CountRunningJobsByWorkflowAndRun(context.Background(), runID, jobID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1, runningCount, "Should have exactly 1 job running for third task")
|
||||
})
|
||||
|
||||
t.Run("MaxParallelTwo_CompletionAndStart", func(t *testing.T) {
|
||||
runID := int64(50000)
|
||||
jobID := "completion-start-job"
|
||||
maxParallel := 2
|
||||
|
||||
// Create ActionRun first
|
||||
run := &actions_model.ActionRun{
|
||||
ID: runID,
|
||||
RepoID: 1,
|
||||
OwnerID: 1,
|
||||
Index: 50000,
|
||||
Status: actions_model.StatusRunning,
|
||||
}
|
||||
assert.NoError(t, db.Insert(context.Background(), run))
|
||||
|
||||
// Create jobs with max-parallel=2
|
||||
for i := range 5 {
|
||||
job := &actions_model.ActionRunJob{
|
||||
RunID: runID,
|
||||
RepoID: 1,
|
||||
OwnerID: 1,
|
||||
JobID: jobID,
|
||||
Name: "Parallel Job " + string(rune(i+1)),
|
||||
Status: actions_model.StatusWaiting,
|
||||
MaxParallel: maxParallel,
|
||||
}
|
||||
if i < 2 {
|
||||
job.Status = actions_model.StatusRunning
|
||||
}
|
||||
assert.NoError(t, db.Insert(context.Background(), job))
|
||||
}
|
||||
|
||||
// Verify 2 jobs are running
|
||||
runningCount, err := actions_model.CountRunningJobsByWorkflowAndRun(context.Background(), runID, jobID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 2, runningCount)
|
||||
|
||||
// Complete one job
|
||||
jobs, err := actions_model.GetRunJobsByRunID(context.Background(), runID)
|
||||
assert.NoError(t, err)
|
||||
|
||||
jobs[0].Status = actions_model.StatusSuccess
|
||||
_, err = actions_model.UpdateRunJob(context.Background(), jobs[0], nil, "status")
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Should have 1 running
|
||||
runningCount, err = actions_model.CountRunningJobsByWorkflowAndRun(context.Background(), runID, jobID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1, runningCount, "Should have 1 job running after one completes")
|
||||
|
||||
// Now another job can start
|
||||
jobs[2].Status = actions_model.StatusRunning
|
||||
_, err = actions_model.UpdateRunJob(context.Background(), jobs[2], nil, "status")
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Back to 2 running
|
||||
runningCount, err = actions_model.CountRunningJobsByWorkflowAndRun(context.Background(), runID, jobID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 2, runningCount, "Should have 2 jobs running after new job starts")
|
||||
})
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user