diff --git a/models/actions/run_job.go b/models/actions/run_job.go index 2ee323c687..d48a984eec 100644 --- a/models/actions/run_job.go +++ b/models/actions/run_job.go @@ -51,7 +51,7 @@ type ActionRunJob struct { ConcurrencyGroup string `xorm:"index(repo_concurrency) NOT NULL DEFAULT ''"` // evaluated concurrency.group ConcurrencyCancel bool `xorm:"NOT NULL DEFAULT FALSE"` // evaluated concurrency.cancel-in-progress - RawStrategy string // raw strategy from job YAML's "strategy" section (stored before matrix expansion for deferred evaluation) + RawStrategy string `xorm:"TEXT"` // raw strategy from job YAML's "strategy" section (stored before matrix expansion for deferred evaluation) // IsMatrixEvaluated is only valid/needed when this job's RawStrategy is not empty and contains a matrix that depends on job outputs. // If the matrix can't be evaluated yet (e.g. job hasn't completed), this field will be false. diff --git a/models/migrations/v1_26/v326.go b/models/migrations/v1_26/v326.go index 9697c3f5ac..5e4a26c224 100644 --- a/models/migrations/v1_26/v326.go +++ b/models/migrations/v1_26/v326.go @@ -8,7 +8,12 @@ import ( ) func AddMatrixEvaluationColumnsToActionRunJob(x *xorm.Engine) error { - return x.Sync(new(ActionRunJobWithMatrixSupport)) + if _, err := x.SyncWithOptions(xorm.SyncOptions{ + IgnoreDropIndices: true, + }, new(ActionRunJobWithMatrixSupport)); err != nil { + return err + } + return nil } // ActionRunJobWithMatrixSupport is a temporary struct for migration purposes diff --git a/services/actions/job_emitter.go b/services/actions/job_emitter.go index 154829edf4..a36fdeadc4 100644 --- a/services/actions/job_emitter.go +++ b/services/actions/job_emitter.go @@ -228,7 +228,7 @@ func checkJobsOfRun(ctx context.Context, run *actions_model.ActionRun) (jobs, up } else if n != 1 { return fmt.Errorf("no affected for updating blocked job %v", job.ID) } - log.Info("Job %d (JobID: %s) status updated: %s -> %s", job.ID, job.JobID, oldStatus, status) + log.Debug("Job %d (JobID: %s) status updated: %s -> %s", job.ID, job.JobID, oldStatus, status) updatedJobs = append(updatedJobs, job) } } @@ -375,7 +375,12 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model resolveMetrics.matrixReevaluated++ log.Info("Matrix re-evaluation succeeded for job %d (JobID: %s): created %d new jobs (duration: %dms)", id, actionRunJob.JobID, len(newMatrixJobs), duration) - // The new jobs will be picked up in the next resolution iteration + + // Mark the original matrix placeholder job as skipped so it won't be run later. + actionRunJob.Status = actions_model.StatusSkipped + if _, err := db.GetEngine(ctx).ID(actionRunJob.ID).Cols("status").Update(actionRunJob); err != nil { + log.Error("Failed to mark matrix placeholder job %d (JobID: %s) as skipped after re-evaluation: %v", id, actionRunJob.JobID, err) + } continue } diff --git a/services/actions/matrix.go b/services/actions/matrix.go index 5fddd2877a..ef9577dec6 100644 --- a/services/actions/matrix.go +++ b/services/actions/matrix.go @@ -62,8 +62,8 @@ func ExtractRawStrategies(content []byte) (map[string]string, error) { return strategies, nil } -// hasMatrixWithNeeds checks if a job's strategy contains a matrix that depends on job outputs -func hasMatrixWithNeeds(rawStrategy string) bool { +// HasMatrixWithNeeds checks if a job's strategy contains a matrix that depends on job outputs +func HasMatrixWithNeeds(rawStrategy string) bool { if rawStrategy == "" { return false } @@ -95,9 +95,12 @@ func ReEvaluateMatrixForJobWithNeeds(ctx context.Context, job *actions_model.Act return nil, nil } - if !hasMatrixWithNeeds(job.RawStrategy) { - // Mark as evaluated since there's no needs-dependent matrix + if !HasMatrixWithNeeds(job.RawStrategy) { + // Mark as evaluated since there's no needs-dependent matrix and persist to DB job.IsMatrixEvaluated = true + if _, err := actions_model.UpdateRunJob(ctx, job, nil, "is_matrix_evaluated"); err != nil { + log.Error("Failed to persist is_matrix_evaluated flag for job %d: %v", job.ID, err) + } log.Debug("Matrix re-evaluation skipped for job %d: no needs-dependent matrix found", job.ID) return nil, nil } @@ -172,9 +175,16 @@ func ReEvaluateMatrixForJobWithNeeds(ctx context.Context, job *actions_model.Act // so that the jobparser can resolve needs.*.outputs.* expressions workflowYAML, err := constructWorkflowWithNeeds(job, taskNeeds) if err != nil { - log.Error("Failed to construct workflow for job %d (JobID: %s): %v", job.ID, job.JobID, err) + // If we can't construct the workflow, we can't expand the matrix + // Mark as evaluated and skip the job to prevent the placeholder from running unexpanded + job.IsMatrixEvaluated = true + job.Status = actions_model.StatusSkipped + if _, dbErr := actions_model.UpdateRunJob(ctx, job, nil, "is_matrix_evaluated", "status"); dbErr != nil { + log.Error("Failed to persist is_matrix_evaluated/status flag for job %d after workflow construction failure: %v", job.ID, dbErr) + } + log.Error("Failed to construct workflow for job %d (JobID: %s): %v, marking as skipped", job.ID, job.JobID, err) GetMatrixMetrics().RecordReevaluation(time.Since(startTime), false, 0) - return nil, nil + return nil, err } // Parse the constructed workflow with job outputs to expand the matrix @@ -191,8 +201,11 @@ func ReEvaluateMatrixForJobWithNeeds(ctx context.Context, job *actions_model.Act if err != nil { // If parsing fails, we can't expand the matrix - // Mark as evaluated and skip + // Mark as evaluated and persist to DB to avoid repeated parse attempts job.IsMatrixEvaluated = true + if _, dbErr := actions_model.UpdateRunJob(ctx, job, nil, "is_matrix_evaluated"); dbErr != nil { + log.Error("Failed to persist is_matrix_evaluated flag for job %d after parse failure: %v", job.ID, dbErr) + } errMsg := fmt.Sprintf("failed to parse workflow payload for job %d (JobID: %s) during matrix expansion. Error: %v. RawStrategy: %s", job.ID, job.JobID, err, job.RawStrategy) log.Error("Matrix parse error: %s", errMsg) @@ -201,8 +214,13 @@ func ReEvaluateMatrixForJobWithNeeds(ctx context.Context, job *actions_model.Act } if len(jobs) == 0 { + // No jobs generated - mark as evaluated and skip the placeholder job job.IsMatrixEvaluated = true - log.Debug("No jobs generated from matrix expansion for job %d (JobID: %s)", job.ID, job.JobID) + job.Status = actions_model.StatusSkipped + if _, err := actions_model.UpdateRunJob(ctx, job, nil, "is_matrix_evaluated", "status"); err != nil { + log.Error("Failed to persist is_matrix_evaluated/status flag for job %d: %v", job.ID, err) + } + log.Debug("No jobs generated from matrix expansion for job %d (JobID: %s), marking as skipped", job.ID, job.JobID) return nil, nil } @@ -250,10 +268,14 @@ func ReEvaluateMatrixForJobWithNeeds(ctx context.Context, job *actions_model.Act newJobs = append(newJobs, newJob) } - // If no new jobs were created, mark as evaluated + // If no new jobs were created, mark as evaluated, skip the placeholder job, and persist to DB if len(newJobs) == 0 { job.IsMatrixEvaluated = true - log.Warn("No valid jobs created from matrix expansion for job %d (JobID: %s). Original jobs: %d", job.ID, job.JobID, len(jobs)) + job.Status = actions_model.StatusSkipped + if _, err := actions_model.UpdateRunJob(ctx, job, nil, "is_matrix_evaluated", "status"); err != nil { + log.Error("Failed to persist is_matrix_evaluated/status flag for job %d: %v", job.ID, err) + } + log.Warn("No valid jobs created from matrix expansion for job %d (JobID: %s). Original jobs: %d, marking as skipped", job.ID, job.JobID, len(jobs)) return nil, nil } @@ -270,10 +292,11 @@ func ReEvaluateMatrixForJobWithNeeds(ctx context.Context, job *actions_model.Act insertTime := time.Since(insertStartTime) GetMatrixMetrics().RecordInsertTime(insertTime) - // Mark the original job as evaluated + // Mark the original placeholder job as evaluated and skipped so it is never run job.IsMatrixEvaluated = true - if _, err := actions_model.UpdateRunJob(ctx, job, nil, "is_matrix_evaluated"); err != nil { - log.Error("Failed to update job %d is_matrix_evaluated flag: %v", job.ID, err) + job.Status = actions_model.StatusSkipped + if _, err := actions_model.UpdateRunJob(ctx, job, nil, "is_matrix_evaluated", "status"); err != nil { + log.Error("Failed to update job %d after matrix expansion (is_matrix_evaluated/status): %v", job.ID, err) } totalTime := time.Since(startTime) diff --git a/services/actions/matrix_metrics.go b/services/actions/matrix_metrics.go index 5f7cd5f9d1..fab39f8bd7 100644 --- a/services/actions/matrix_metrics.go +++ b/services/actions/matrix_metrics.go @@ -32,22 +32,17 @@ type MatrixMetrics struct { var ( matrixMetricsInstance *MatrixMetrics - metricsMutex sync.Mutex + metricsOnce sync.Once ) - // GetMatrixMetrics returns the global matrix metrics instance func GetMatrixMetrics() *MatrixMetrics { - if matrixMetricsInstance == nil { - metricsMutex.Lock() - if matrixMetricsInstance == nil { - matrixMetricsInstance = &MatrixMetrics{ - ReevaluationTimes: make([]time.Duration, 0, 1000), - ParseTimes: make([]time.Duration, 0, 1000), - InsertTimes: make([]time.Duration, 0, 1000), - } + metricsOnce.Do(func() { + matrixMetricsInstance = &MatrixMetrics{ + ReevaluationTimes: make([]time.Duration, 0, 1000), + ParseTimes: make([]time.Duration, 0, 1000), + InsertTimes: make([]time.Duration, 0, 1000), } - metricsMutex.Unlock() - } + }) return matrixMetricsInstance } diff --git a/services/actions/matrix_metrics_prometheus.go b/services/actions/matrix_metrics_prometheus.go index f31d538739..eb27db3bb8 100644 --- a/services/actions/matrix_metrics_prometheus.go +++ b/services/actions/matrix_metrics_prometheus.go @@ -30,8 +30,8 @@ type MatrixMetricsCollector struct { } const ( - namespace = "gitea_" - subsystem = "matrix_" + namespace = "gitea" + subsystem = "matrix" ) // newMatrixGauge creates a new Prometheus Gauge with standard matrix metrics naming diff --git a/services/actions/matrix_metrics_test.go b/services/actions/matrix_metrics_test.go index 788a062665..db81000414 100644 --- a/services/actions/matrix_metrics_test.go +++ b/services/actions/matrix_metrics_test.go @@ -73,10 +73,16 @@ func BenchmarkMatrixMetricsCollectorCollect(b *testing.B) { } collector := NewMatrixMetricsCollector() - ch := make(chan prometheus.Metric, 100) b.ResetTimer() for b.Loop() { + // Use a fresh channel each iteration to avoid filling up the buffer + ch := make(chan prometheus.Metric, 100) collector.Collect(ch) + // Drain the channel to prevent blocking + close(ch) + for range ch { + // discard metrics + } } } diff --git a/services/actions/run.go b/services/actions/run.go index b24c3dddc9..e8ffa36ea6 100644 --- a/services/actions/run.go +++ b/services/actions/run.go @@ -9,6 +9,7 @@ import ( actions_model "code.gitea.io/gitea/models/actions" "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/util" notify_service "code.gitea.io/gitea/services/notify" @@ -102,7 +103,12 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobpar } // Extract raw strategies from the original workflow before parsing - rawStrategies, _ := ExtractRawStrategies(workflowContent) + rawStrategies, err := ExtractRawStrategies(workflowContent) + if err != nil { + log.Warn("Failed to extract raw strategies from workflow: %v", err) + // Continue without raw strategies - jobs will work but dynamic matrix won't be supported + rawStrategies = nil + } runJobs := make([]*actions_model.ActionRunJob, 0, len(jobs)) var hasWaitingJobs bool @@ -131,8 +137,9 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobpar Status: util.Iif(shouldBlockJob, actions_model.StatusBlocked, actions_model.StatusWaiting), } - // Store raw strategy if job has matrix that depends on job outputs - if rawStrategy, exists := rawStrategies[id]; exists && len(needs) > 0 { + // Store raw strategy only if job has matrix that actually depends on job outputs (needs.*.outputs) + // This avoids unnecessary DB storage and later re-evaluation checks for purely static matrices + if rawStrategy, exists := rawStrategies[id]; exists && len(needs) > 0 && HasMatrixWithNeeds(rawStrategy) { runJob.RawStrategy = rawStrategy runJob.IsMatrixEvaluated = false } diff --git a/tests/integration/actions_runner_test.go b/tests/integration/actions_runner_test.go index 62eb2991b6..fc7b84299e 100644 --- a/tests/integration/actions_runner_test.go +++ b/tests/integration/actions_runner_test.go @@ -12,7 +12,6 @@ import ( auth_model "code.gitea.io/gitea/models/auth" "code.gitea.io/gitea/modules/setting" - "github.com/stretchr/testify/require" pingv1 "code.gitea.io/actions-proto-go/ping/v1" "code.gitea.io/actions-proto-go/ping/v1/pingv1connect" @@ -20,6 +19,7 @@ import ( "code.gitea.io/actions-proto-go/runner/v1/runnerv1connect" "connectrpc.com/connect" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "google.golang.org/protobuf/types/known/timestamppb" )