From 5d83d729b4205535cee3ad375ce36f82539e3e29 Mon Sep 17 00:00:00 2001 From: Pascal Zimmermann Date: Sun, 29 Mar 2026 16:27:54 +0200 Subject: [PATCH] Revert "feat: Add support for dynamic matrix strategies with job outputs" This reverts commit e6a0b07713097f71d6330ceeb51a60b63d6b1d4c. --- routers/web/web.go | 3 - services/actions/job_emitter.go | 142 +--------- services/actions/matrix.go | 268 ------------------ services/actions/matrix_metrics.go | 173 ----------- services/actions/matrix_metrics_prometheus.go | 191 ------------- services/actions/matrix_metrics_test.go | 82 ------ tests/integration/actions_job_test.go | 94 ------ 7 files changed, 7 insertions(+), 946 deletions(-) delete mode 100644 services/actions/matrix.go delete mode 100644 services/actions/matrix_metrics.go delete mode 100644 services/actions/matrix_metrics_prometheus.go delete mode 100644 services/actions/matrix_metrics_test.go diff --git a/routers/web/web.go b/routers/web/web.go index 40da3b13f4..e3dcf27cc4 100644 --- a/routers/web/web.go +++ b/routers/web/web.go @@ -41,7 +41,6 @@ import ( "code.gitea.io/gitea/routers/web/user" user_setting "code.gitea.io/gitea/routers/web/user/setting" "code.gitea.io/gitea/routers/web/user/setting/security" - actions_service "code.gitea.io/gitea/services/actions" auth_service "code.gitea.io/gitea/services/auth" "code.gitea.io/gitea/services/context" "code.gitea.io/gitea/services/forms" @@ -287,8 +286,6 @@ func Routes() *web.Router { if setting.Metrics.Enabled { prometheus.MustRegister(metrics.NewCollector()) - // Register matrix re-evaluation metrics - prometheus.MustRegister(actions_service.NewMatrixMetricsCollector()) routes.Get("/metrics", append(mid, Metrics)...) } diff --git a/services/actions/job_emitter.go b/services/actions/job_emitter.go index 8c1e5fc422..20a4f81eab 100644 --- a/services/actions/job_emitter.go +++ b/services/actions/job_emitter.go @@ -7,7 +7,6 @@ import ( "context" "errors" "fmt" - "time" actions_model "code.gitea.io/gitea/models/actions" "code.gitea.io/gitea/models/db" @@ -203,9 +202,6 @@ func checkJobsOfRun(ctx context.Context, run *actions_model.ActionRun) (jobs, up if err != nil { return nil, nil, err } - - log.Debug("Checking %d jobs for run %d (status: %s)", len(jobs), run.ID, run.Status) - vars, err := actions_model.GetVariablesOfRun(ctx, run) if err != nil { return nil, nil, err @@ -217,91 +213,22 @@ func checkJobsOfRun(ctx context.Context, run *actions_model.ActionRun) (jobs, up } updates := newJobStatusResolver(jobs, vars).Resolve(ctx) - log.Debug("Job status resolver returned %d job status updates for run %d", len(updates), run.ID) - for _, job := range jobs { if status, ok := updates[job.ID]; ok { - oldStatus := job.Status - // IMPORTANT: Even if status hasn't changed, we must reset task_id for WAITING jobs - // after a previous task has completed (max-parallel constraint is released) - if status == actions_model.StatusWaiting && job.TaskID != 0 { - // This job was running but is now ready to be reassigned - job.Status = status - job.TaskID = 0 - if _, err := actions_model.UpdateRunJob(ctx, job, nil, "status", "task_id"); err != nil { - return fmt.Errorf("reset task_id for job %d: %w", job.ID, err) - } - updatedJobs = append(updatedJobs, job) - continue - } - - if job.Status == status { - // Status hasn't changed, skip - continue - } - job.Status = status - - // For jobs transitioning to WAITING status (can happen after max-parallel constraint is released), - // we need to reset task_id to 0 so a new runner can pick them up - if status == actions_model.StatusWaiting { - job.TaskID = 0 - if _, err := actions_model.UpdateRunJob(ctx, job, nil, "status", "task_id"); err != nil { - return fmt.Errorf("reset task_id for job %d: %w", job.ID, err) - } - } else { - // For other status changes (BLOCKED, RUNNING, etc.), only update status - if _, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"status": actions_model.StatusWaiting}, "status"); err != nil { - return fmt.Errorf("update status for job %d: %w", job.ID, err) - } + if n, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"status": actions_model.StatusBlocked}, "status"); err != nil { + return err + } else if n != 1 { + return fmt.Errorf("no affected for updating blocked job %v", job.ID) } - log.Info("Job %d (JobID: %s) status updated: %s -> %s", job.ID, job.JobID, oldStatus, status) updatedJobs = append(updatedJobs, job) } } - - // CRITICAL FIX for max-parallel: Even if no jobs were updated above (because they already have task_id=0), - // we must notify runners to poll for new tasks when a job completes. - // This handles the max-parallel=1 case where jobs were never started (task_id=0) - // and won't be processed by jobStatusResolver (which only handles BLOCKED jobs). - if len(jobs) > 0 && jobs[0].OwnerID > 0 && jobs[0].RepoID > 0 { - // Check if there are any WAITING jobs with task_id=0 (ready to be picked up) - hasWaitingJobs := false - for _, job := range jobs { - if job.Status == actions_model.StatusWaiting && job.TaskID == 0 { - hasWaitingJobs = true - break - } - } - if hasWaitingJobs { - // Notify runners to poll for new tasks - if err := actions_model.IncreaseTaskVersion(ctx, jobs[0].OwnerID, jobs[0].RepoID); err != nil { - log.Error("Failed to increase task version for repo %d: %v", jobs[0].RepoID, err) - } else { - log.Debug("Increased task version for repo %d (max-parallel waiting jobs ready)", jobs[0].RepoID) - } - } - } - return nil }); err != nil { return nil, nil, err } - // Reload jobs from the database to pick up any newly created matrix jobs - oldJobCount := len(jobs) - jobs, err = db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: run.ID}) - if err != nil { - return nil, nil, err - } - - if len(jobs) > oldJobCount { - log.Info("Matrix re-evaluation created %d new jobs for run %d (was %d, now %d)", - len(jobs)-oldJobCount, run.ID, oldJobCount, len(jobs)) - } - - log.Debug("Job check completed for run %d: %d jobs updated, %d total jobs", run.ID, len(updatedJobs), len(jobs)) - return jobs, updatedJobs, nil } @@ -386,102 +313,47 @@ func (r *jobStatusResolver) resolveJobHasIfCondition(actionRunJob *actions_model func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model.Status { ret := map[int64]actions_model.Status{} - resolveMetrics := struct { - totalBlocked int - matrixReevaluated int - concurrencyUpdated int - jobsStarted int - jobsSkipped int - }{} - for id, status := range r.statuses { actionRunJob := r.jobMap[id] if status != actions_model.StatusBlocked { continue } - - resolveMetrics.totalBlocked++ - log.Debug("Resolving blocked job %d (JobID: %s, RunID: %d)", id, actionRunJob.JobID, actionRunJob.RunID) - allDone, allSucceed := r.resolveCheckNeeds(id) if !allDone { - log.Debug("Job %d: not all dependencies completed yet", id) continue } - log.Debug("Job %d: all dependencies completed (allSucceed: %v), checking matrix re-evaluation", id, allSucceed) - - // Try to re-evaluate the matrix with job outputs if it depends on them - startTime := time.Now() - newMatrixJobs, err := ReEvaluateMatrixForJobWithNeeds(ctx, actionRunJob, r.vars) - duration := time.Since(startTime).Milliseconds() - - if err != nil { - log.Error("Matrix re-evaluation error for job %d (JobID: %s): %v (duration: %dms)", id, actionRunJob.JobID, err, duration) - continue - } - - // If new matrix jobs were created, add them to the resolver and continue - if len(newMatrixJobs) > 0 { - resolveMetrics.matrixReevaluated++ - log.Info("Matrix re-evaluation succeeded for job %d (JobID: %s): created %d new jobs (duration: %dms)", - id, actionRunJob.JobID, len(newMatrixJobs), duration) - // The new jobs will be picked up in the next resolution iteration - continue - } - - log.Debug("Job %d: no matrix re-evaluation needed or result is empty", id) - // update concurrency and check whether the job can run now - err = updateConcurrencyEvaluationForJobWithNeeds(ctx, actionRunJob, r.vars) + err := updateConcurrencyEvaluationForJobWithNeeds(ctx, actionRunJob, r.vars) if err != nil { // The err can be caused by different cases: database error, or syntax error, or the needed jobs haven't completed // At the moment there is no way to distinguish them. // Actually, for most cases, the error is caused by "syntax error" / "the needed jobs haven't completed (skipped?)" // TODO: if workflow or concurrency expression has syntax error, there should be a user error message, need to show it to end users - log.Debug("Concurrency evaluation failed for job %d (JobID: %s): %v (job will stay blocked)", id, actionRunJob.JobID, err) + log.Debug("updateConcurrencyEvaluationForJobWithNeeds failed, this job will stay blocked: job: %d, err: %v", id, err) continue } - resolveMetrics.concurrencyUpdated++ - shouldStartJob := true if !allSucceed { // Not all dependent jobs completed successfully: // * if the job has "if" condition, it can be started, then the act_runner will evaluate the "if" condition. // * otherwise, the job should be skipped. shouldStartJob = r.resolveJobHasIfCondition(actionRunJob) - log.Debug("Job %d: not all dependencies succeeded. Has if-condition: %v, should start: %v", id, shouldStartJob, shouldStartJob) } newStatus := util.Iif(shouldStartJob, actions_model.StatusWaiting, actions_model.StatusSkipped) if newStatus == actions_model.StatusWaiting { newStatus, err = PrepareToStartJobWithConcurrency(ctx, actionRunJob) if err != nil { - log.Error("Concurrency check failed for job %d (JobID: %s): %v (job will stay blocked)", id, actionRunJob.JobID, err) + log.Error("ShouldBlockJobByConcurrency failed, this job will stay blocked: job: %d, err: %v", id, err) } } if newStatus != actions_model.StatusBlocked { ret[id] = newStatus - switch newStatus { - case actions_model.StatusWaiting: - resolveMetrics.jobsStarted++ - log.Info("Job %d (JobID: %s) transitioned to StatusWaiting", id, actionRunJob.JobID) - case actions_model.StatusSkipped: - resolveMetrics.jobsSkipped++ - log.Info("Job %d (JobID: %s) transitioned to StatusSkipped", id, actionRunJob.JobID) - } } } - - // Log resolution metrics summary - if resolveMetrics.totalBlocked > 0 { - log.Debug("Job resolution summary: total_blocked=%d, matrix_reevaluated=%d, concurrency_updated=%d, jobs_started=%d, jobs_skipped=%d", - resolveMetrics.totalBlocked, resolveMetrics.matrixReevaluated, resolveMetrics.concurrencyUpdated, - resolveMetrics.jobsStarted, resolveMetrics.jobsSkipped) - } - return ret } diff --git a/services/actions/matrix.go b/services/actions/matrix.go deleted file mode 100644 index 10541f214f..0000000000 --- a/services/actions/matrix.go +++ /dev/null @@ -1,268 +0,0 @@ -// Copyright 2025 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package actions - -import ( - "context" - "fmt" - "maps" - "strings" - "time" - - actions_model "code.gitea.io/gitea/models/actions" - "code.gitea.io/gitea/modules/log" - - "github.com/nektos/act/pkg/jobparser" - "gopkg.in/yaml.v3" -) - -// ExtractRawStrategies extracts strategy definitions from the raw workflow content -// Returns a map of jobID to strategy YAML for jobs that have matrix dependencies -func ExtractRawStrategies(content []byte) (map[string]string, error) { - var workflowDef struct { - Jobs map[string]struct { - Strategy any `yaml:"strategy"` - Needs any `yaml:"needs"` - } `yaml:"jobs"` - } - - if err := yaml.Unmarshal(content, &workflowDef); err != nil { - return nil, err - } - - strategies := make(map[string]string) - for jobID, jobDef := range workflowDef.Jobs { - if jobDef.Strategy == nil { - continue - } - - // Check if this job has needs (dependencies) - var needsList []string - switch needs := jobDef.Needs.(type) { - case string: - needsList = append(needsList, needs) - case []any: - for _, need := range needs { - if needStr, ok := need.(string); ok { - needsList = append(needsList, needStr) - } - } - } - - // Only store strategy for jobs with dependencies - if len(needsList) > 0 { - if strategyBytes, err := yaml.Marshal(jobDef.Strategy); err == nil { - strategies[jobID] = string(strategyBytes) - } - } - } - - return strategies, nil -} - -// hasMatrixWithNeeds checks if a job's strategy contains a matrix that depends on job outputs -func hasMatrixWithNeeds(rawStrategy string) bool { - if rawStrategy == "" { - return false - } - - var strategy map[string]any - if err := yaml.Unmarshal([]byte(rawStrategy), &strategy); err != nil { - return false - } - - matrix, ok := strategy["matrix"] - if !ok { - return false - } - - // Check if any matrix value contains "needs." reference - matrixStr := fmt.Sprintf("%v", matrix) - return strings.Contains(matrixStr, "needs.") -} - -// ReEvaluateMatrixForJobWithNeeds re-evaluates the matrix strategy of a job using outputs from dependent jobs -// If the matrix depends on job outputs and all dependent jobs are done, it will: -// 1. Evaluate the matrix with the job outputs -// 2. Create new ActionRunJobs for each matrix combination -// 3. Return the newly created jobs -func ReEvaluateMatrixForJobWithNeeds(ctx context.Context, job *actions_model.ActionRunJob, vars map[string]string) ([]*actions_model.ActionRunJob, error) { - startTime := time.Now() - - if job.IsMatrixEvaluated || job.RawStrategy == "" { - return nil, nil - } - - if !hasMatrixWithNeeds(job.RawStrategy) { - // Mark as evaluated since there's no needs-dependent matrix - job.IsMatrixEvaluated = true - log.Debug("Matrix re-evaluation skipped for job %d: no needs-dependent matrix found", job.ID) - return nil, nil - } - - log.Debug("Starting matrix re-evaluation for job %d (JobID: %s)", job.ID, job.JobID) - - // Get the outputs from dependent jobs - taskNeeds, err := FindTaskNeeds(ctx, job) - if err != nil { - errMsg := fmt.Sprintf("failed to find task needs for job %d (JobID: %s): %v", job.ID, job.JobID, err) - log.Error("Matrix re-evaluation error: %s", errMsg) - return nil, fmt.Errorf("find task needs: %w", err) - } - - log.Debug("Found %d task needs for job %d (JobID: %s)", len(taskNeeds), job.ID, job.JobID) - - // If any task needs are not done, we can't evaluate yet - pendingNeeds := []string{} - for jobID, taskNeed := range taskNeeds { - if !taskNeed.Result.IsDone() { - pendingNeeds = append(pendingNeeds, fmt.Sprintf("%s(%s)", jobID, taskNeed.Result)) - } - } - if len(pendingNeeds) > 0 { - log.Debug("Matrix re-evaluation deferred for job %d: pending needs: %v", job.ID, pendingNeeds) - GetMatrixMetrics().RecordDeferred() - return nil, nil - } - - // Merge vars with needs outputs - mergedVars := mergeNeedsIntoVars(vars, taskNeeds) - log.Debug("Merged %d variables with needs outputs for job %d", len(mergedVars), job.ID) - - // Load the original run to get workflow context - if job.Run == nil { - if err := job.LoadRun(ctx); err != nil { - errMsg := fmt.Sprintf("failed to load run for job %d (JobID: %s): %v", job.ID, job.JobID, err) - log.Error("Matrix re-evaluation error: %s", errMsg) - return nil, fmt.Errorf("load run: %w", err) - } - } - - // Create the giteaCtx for expression evaluation - giteaCtx := GenerateGiteaContext(job.Run, job) - - // Parse the job payload with merged vars to expand the matrix - // Note: job.WorkflowPayload already contains just this job's definition - parseStartTime := time.Now() - jobs, err := jobparser.Parse( - job.WorkflowPayload, - jobparser.WithVars(mergedVars), - jobparser.WithGitContext(giteaCtx.ToGitHubContext()), - ) - parseTime := time.Since(parseStartTime) - GetMatrixMetrics().RecordParseTime(parseTime) - - if err != nil { - // If parsing fails, we can't expand the matrix - // Mark as evaluated and skip - job.IsMatrixEvaluated = true - errMsg := fmt.Sprintf("failed to parse workflow payload for job %d (JobID: %s) during matrix expansion. Error: %v. RawStrategy: %s", - job.ID, job.JobID, err, job.RawStrategy) - log.Error("Matrix parse error: %s", errMsg) - GetMatrixMetrics().RecordReevaluation(time.Since(startTime), false, 0) - return nil, nil - } - - if len(jobs) == 0 { - job.IsMatrixEvaluated = true - log.Debug("No jobs generated from matrix expansion for job %d (JobID: %s)", job.ID, job.JobID) - return nil, nil - } - - log.Debug("Parsed %d matrix combinations for job %d (JobID: %s)", len(jobs), job.ID, job.JobID) - - // Create new ActionRunJobs for each parsed workflow (each matrix combination) - newJobs := make([]*actions_model.ActionRunJob, 0) - - for i, parsedSingleWorkflow := range jobs { - id, jobDef := parsedSingleWorkflow.Job() - if jobDef == nil { - log.Warn("Skipped nil jobDef at index %d for job %d (JobID: %s)", i, job.ID, job.JobID) - continue - } - - // Skip the original job ID - we only want the matrix-expanded versions - if id == job.JobID { - log.Debug("Skipped original job ID %s in matrix expansion for job %d", id, job.ID) - continue - } - - // Erase needs from the payload before storing - needs := jobDef.Needs() - if err := parsedSingleWorkflow.SetJob(id, jobDef.EraseNeeds()); err != nil { - log.Error("Failed to erase needs from job %s (matrix expansion for job %d): %v", id, job.ID, err) - continue - } - - payload, _ := parsedSingleWorkflow.Marshal() - - newJob := &actions_model.ActionRunJob{ - RunID: job.RunID, - RepoID: job.RepoID, - OwnerID: job.OwnerID, - CommitSHA: job.CommitSHA, - IsForkPullRequest: job.IsForkPullRequest, - Name: jobDef.Name, - WorkflowPayload: payload, - JobID: id, - Needs: needs, - RunsOn: jobDef.RunsOn(), - Status: actions_model.StatusBlocked, - } - - newJobs = append(newJobs, newJob) - } - - // If no new jobs were created, mark as evaluated - if len(newJobs) == 0 { - job.IsMatrixEvaluated = true - log.Warn("No valid jobs created from matrix expansion for job %d (JobID: %s). Original jobs: %d", job.ID, job.JobID, len(jobs)) - return nil, nil - } - - // Insert the new jobs into database - insertStartTime := time.Now() - if err := actions_model.InsertActionRunJobs(ctx, newJobs); err != nil { - insertTime := time.Since(insertStartTime) - GetMatrixMetrics().RecordInsertTime(insertTime) - errMsg := fmt.Sprintf("failed to insert %d new matrix jobs for job %d (JobID: %s): %v", len(newJobs), job.ID, job.JobID, err) - log.Error("Matrix insertion error: %s", errMsg) - GetMatrixMetrics().RecordReevaluation(time.Since(startTime), false, 0) - return nil, fmt.Errorf("insert new jobs: %w", err) - } - insertTime := time.Since(insertStartTime) - GetMatrixMetrics().RecordInsertTime(insertTime) - - // Mark the original job as evaluated - job.IsMatrixEvaluated = true - if _, err := actions_model.UpdateRunJob(ctx, job, nil, "is_matrix_evaluated"); err != nil { - log.Error("Failed to update job %d is_matrix_evaluated flag: %v", job.ID, err) - } - - totalTime := time.Since(startTime) - GetMatrixMetrics().RecordReevaluation(totalTime, true, int64(len(newJobs))) - - log.Info("Successfully completed matrix re-evaluation for job %d (JobID: %s): created %d new jobs from %d matrix combinations (total: %dms, parse: %dms, insert: %dms)", - job.ID, job.JobID, len(newJobs), len(jobs), totalTime.Milliseconds(), parseTime.Milliseconds(), insertTime.Milliseconds()) - - return newJobs, nil -} - -// mergeNeedsIntoVars converts task needs outputs into variables for expression evaluation -func mergeNeedsIntoVars(baseVars map[string]string, taskNeeds map[string]*TaskNeed) map[string]string { - merged := make(map[string]string) - - // Copy base vars - maps.Copy(merged, baseVars) - - // Add needs outputs as variables in format: needs..outputs. - for jobID, taskNeed := range taskNeeds { - for outputKey, outputValue := range taskNeed.Outputs { - key := fmt.Sprintf("needs.%s.outputs.%s", jobID, outputKey) - merged[key] = outputValue - } - } - - return merged -} diff --git a/services/actions/matrix_metrics.go b/services/actions/matrix_metrics.go deleted file mode 100644 index cc232c0b6c..0000000000 --- a/services/actions/matrix_metrics.go +++ /dev/null @@ -1,173 +0,0 @@ -// Copyright 2025 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package actions - -import ( - "sync" - "time" -) - -// MatrixMetrics tracks performance metrics for matrix re-evaluation operations -type MatrixMetrics struct { - mu sync.RWMutex - - // Counters - TotalReevaluations int64 - SuccessfulReevaluations int64 - FailedReevaluations int64 - JobsCreatedTotal int64 - DeferredReevaluations int64 - - // Timing - TotalReevaluationTime time.Duration - TotalParseTime time.Duration - TotalInsertTime time.Duration - - // Histograms (for detailed analysis) - ReevaluationTimes []time.Duration - ParseTimes []time.Duration - InsertTimes []time.Duration -} - -var ( - matrixMetricsInstance *MatrixMetrics - metricsMutex sync.Mutex -) - -// GetMatrixMetrics returns the global matrix metrics instance -func GetMatrixMetrics() *MatrixMetrics { - if matrixMetricsInstance == nil { - metricsMutex.Lock() - if matrixMetricsInstance == nil { - matrixMetricsInstance = &MatrixMetrics{ - ReevaluationTimes: make([]time.Duration, 0, 1000), - ParseTimes: make([]time.Duration, 0, 1000), - InsertTimes: make([]time.Duration, 0, 1000), - } - } - metricsMutex.Unlock() - } - return matrixMetricsInstance -} - -// RecordReevaluation records a matrix re-evaluation attempt -func (m *MatrixMetrics) RecordReevaluation(duration time.Duration, success bool, jobsCreated int64) { - m.mu.Lock() - defer m.mu.Unlock() - - m.TotalReevaluations++ - m.TotalReevaluationTime += duration - - if success { - m.SuccessfulReevaluations++ - m.JobsCreatedTotal += jobsCreated - } else { - m.FailedReevaluations++ - } - - // Keep a limited history for detailed analysis (keep last 1000) - if len(m.ReevaluationTimes) < 1000 { - m.ReevaluationTimes = append(m.ReevaluationTimes, duration) - } else { - // Shift and add new value - copy(m.ReevaluationTimes, m.ReevaluationTimes[1:]) - m.ReevaluationTimes[len(m.ReevaluationTimes)-1] = duration - } -} - -// RecordDeferred records a deferred matrix re-evaluation -func (m *MatrixMetrics) RecordDeferred() { - m.mu.Lock() - defer m.mu.Unlock() - m.DeferredReevaluations++ -} - -// RecordParseTime records the time taken to parse a workflow -func (m *MatrixMetrics) RecordParseTime(duration time.Duration) { - m.mu.Lock() - defer m.mu.Unlock() - - m.TotalParseTime += duration - - if len(m.ParseTimes) < 1000 { - m.ParseTimes = append(m.ParseTimes, duration) - } else { - copy(m.ParseTimes, m.ParseTimes[1:]) - m.ParseTimes[len(m.ParseTimes)-1] = duration - } -} - -// RecordInsertTime records the time taken to insert matrix jobs -func (m *MatrixMetrics) RecordInsertTime(duration time.Duration) { - m.mu.Lock() - defer m.mu.Unlock() - - m.TotalInsertTime += duration - - if len(m.InsertTimes) < 1000 { - m.InsertTimes = append(m.InsertTimes, duration) - } else { - copy(m.InsertTimes, m.InsertTimes[1:]) - m.InsertTimes[len(m.InsertTimes)-1] = duration - } -} - -// GetStats returns a snapshot of the current metrics -func (m *MatrixMetrics) GetStats() map[string]any { - m.mu.RLock() - defer m.mu.RUnlock() - - avgReevaluationTime := time.Duration(0) - if m.TotalReevaluations > 0 { - avgReevaluationTime = m.TotalReevaluationTime / time.Duration(m.TotalReevaluations) - } - - avgParseTime := time.Duration(0) - if len(m.ParseTimes) > 0 { - avgParseTime = m.TotalParseTime / time.Duration(len(m.ParseTimes)) - } - - avgInsertTime := time.Duration(0) - if len(m.InsertTimes) > 0 { - avgInsertTime = m.TotalInsertTime / time.Duration(len(m.InsertTimes)) - } - - successRate := 0.0 - if m.TotalReevaluations > 0 { - successRate = float64(m.SuccessfulReevaluations) / float64(m.TotalReevaluations) * 100 - } - - return map[string]any{ - "total_reevaluations": m.TotalReevaluations, - "successful_reevaluations": m.SuccessfulReevaluations, - "failed_reevaluations": m.FailedReevaluations, - "deferred_reevaluations": m.DeferredReevaluations, - "success_rate_percent": successRate, - "total_jobs_created": m.JobsCreatedTotal, - "total_reevaluation_time_ms": m.TotalReevaluationTime.Milliseconds(), - "avg_reevaluation_time_ms": avgReevaluationTime.Milliseconds(), - "total_parse_time_ms": m.TotalParseTime.Milliseconds(), - "avg_parse_time_ms": avgParseTime.Milliseconds(), - "total_insert_time_ms": m.TotalInsertTime.Milliseconds(), - "avg_insert_time_ms": avgInsertTime.Milliseconds(), - } -} - -// Reset clears all metrics -func (m *MatrixMetrics) Reset() { - m.mu.Lock() - defer m.mu.Unlock() - - m.TotalReevaluations = 0 - m.SuccessfulReevaluations = 0 - m.FailedReevaluations = 0 - m.JobsCreatedTotal = 0 - m.DeferredReevaluations = 0 - m.TotalReevaluationTime = 0 - m.TotalParseTime = 0 - m.TotalInsertTime = 0 - m.ReevaluationTimes = m.ReevaluationTimes[:0] - m.ParseTimes = m.ParseTimes[:0] - m.InsertTimes = m.InsertTimes[:0] -} diff --git a/services/actions/matrix_metrics_prometheus.go b/services/actions/matrix_metrics_prometheus.go deleted file mode 100644 index e780e8f13f..0000000000 --- a/services/actions/matrix_metrics_prometheus.go +++ /dev/null @@ -1,191 +0,0 @@ -// Copyright 2025 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package actions - -import ( - "github.com/prometheus/client_golang/prometheus" -) - -// MatrixMetricsCollector implements the prometheus.Collector interface -// and exposes matrix re-evaluation metrics for prometheus -type MatrixMetricsCollector struct { - // Counters - totalReevaluations prometheus.Gauge - successfulReevaluations prometheus.Gauge - failedReevaluations prometheus.Gauge - deferredReevaluations prometheus.Gauge - jobsCreatedTotal prometheus.Gauge - - // Timing (in milliseconds) - totalReevaluationTime prometheus.Gauge - avgReevaluationTime prometheus.Gauge - totalParseTime prometheus.Gauge - avgParseTime prometheus.Gauge - totalInsertTime prometheus.Gauge - avgInsertTime prometheus.Gauge - - // Rates - successRate prometheus.Gauge -} - -const ( - namespace = "gitea_" - subsystem = "matrix_" -) - -// NewMatrixMetricsCollector creates a new MatrixMetricsCollector -func NewMatrixMetricsCollector() *MatrixMetricsCollector { - return &MatrixMetricsCollector{ - totalReevaluations: prometheus.NewGauge( - prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "total_reevaluations", - Help: "Total number of matrix re-evaluation attempts", - }, - ), - successfulReevaluations: prometheus.NewGauge( - prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "successful_reevaluations", - Help: "Number of successful matrix re-evaluations", - }, - ), - failedReevaluations: prometheus.NewGauge( - prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "failed_reevaluations", - Help: "Number of failed matrix re-evaluations", - }, - ), - deferredReevaluations: prometheus.NewGauge( - prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "deferred_reevaluations", - Help: "Number of deferred matrix re-evaluations (waiting for dependencies)", - }, - ), - jobsCreatedTotal: prometheus.NewGauge( - prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "jobs_created_total", - Help: "Total number of jobs created from matrix expansion", - }, - ), - totalReevaluationTime: prometheus.NewGauge( - prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "total_reevaluation_time_ms", - Help: "Total time spent on matrix re-evaluations in milliseconds", - }, - ), - avgReevaluationTime: prometheus.NewGauge( - prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "avg_reevaluation_time_ms", - Help: "Average time per matrix re-evaluation in milliseconds", - }, - ), - totalParseTime: prometheus.NewGauge( - prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "total_parse_time_ms", - Help: "Total time spent parsing workflow payloads in milliseconds", - }, - ), - avgParseTime: prometheus.NewGauge( - prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "avg_parse_time_ms", - Help: "Average time per workflow parse in milliseconds", - }, - ), - totalInsertTime: prometheus.NewGauge( - prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "total_insert_time_ms", - Help: "Total time spent inserting jobs into database in milliseconds", - }, - ), - avgInsertTime: prometheus.NewGauge( - prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "avg_insert_time_ms", - Help: "Average time per database insert in milliseconds", - }, - ), - successRate: prometheus.NewGauge( - prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "success_rate_percent", - Help: "Success rate of matrix re-evaluations as percentage (0-100)", - }, - ), - } -} - -// Describe returns the metrics descriptions -func (c *MatrixMetricsCollector) Describe(ch chan<- *prometheus.Desc) { - c.totalReevaluations.Describe(ch) - c.successfulReevaluations.Describe(ch) - c.failedReevaluations.Describe(ch) - c.deferredReevaluations.Describe(ch) - c.jobsCreatedTotal.Describe(ch) - c.totalReevaluationTime.Describe(ch) - c.avgReevaluationTime.Describe(ch) - c.totalParseTime.Describe(ch) - c.avgParseTime.Describe(ch) - c.totalInsertTime.Describe(ch) - c.avgInsertTime.Describe(ch) - c.successRate.Describe(ch) -} - -// Collect collects the current metric values and sends them to the channel -func (c *MatrixMetricsCollector) Collect(ch chan<- prometheus.Metric) { - metrics := GetMatrixMetrics() - stats := metrics.GetStats() - - // Set counter values - c.totalReevaluations.Set(float64(stats["total_reevaluations"].(int64))) - c.successfulReevaluations.Set(float64(stats["successful_reevaluations"].(int64))) - c.failedReevaluations.Set(float64(stats["failed_reevaluations"].(int64))) - c.deferredReevaluations.Set(float64(stats["deferred_reevaluations"].(int64))) - c.jobsCreatedTotal.Set(float64(stats["total_jobs_created"].(int64))) - - // Set timing values (already in milliseconds) - c.totalReevaluationTime.Set(float64(stats["total_reevaluation_time_ms"].(int64))) - c.avgReevaluationTime.Set(float64(stats["avg_reevaluation_time_ms"].(int64))) - c.totalParseTime.Set(float64(stats["total_parse_time_ms"].(int64))) - c.avgParseTime.Set(float64(stats["avg_parse_time_ms"].(int64))) - c.totalInsertTime.Set(float64(stats["total_insert_time_ms"].(int64))) - c.avgInsertTime.Set(float64(stats["avg_insert_time_ms"].(int64))) - - // Set success rate - c.successRate.Set(stats["success_rate_percent"].(float64)) - - // Collect all metrics - c.totalReevaluations.Collect(ch) - c.successfulReevaluations.Collect(ch) - c.failedReevaluations.Collect(ch) - c.deferredReevaluations.Collect(ch) - c.jobsCreatedTotal.Collect(ch) - c.totalReevaluationTime.Collect(ch) - c.avgReevaluationTime.Collect(ch) - c.totalParseTime.Collect(ch) - c.avgParseTime.Collect(ch) - c.totalInsertTime.Collect(ch) - c.avgInsertTime.Collect(ch) - c.successRate.Collect(ch) -} diff --git a/services/actions/matrix_metrics_test.go b/services/actions/matrix_metrics_test.go deleted file mode 100644 index 54fdf3c0f3..0000000000 --- a/services/actions/matrix_metrics_test.go +++ /dev/null @@ -1,82 +0,0 @@ -// Copyright 2025 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package actions - -import ( - "testing" - "time" - - "github.com/prometheus/client_golang/prometheus" - "github.com/stretchr/testify/assert" -) - -// Essential Prometheus Collector Tests - -func TestNewMatrixMetricsCollector(t *testing.T) { - collector := NewMatrixMetricsCollector() - assert.NotNil(t, collector) - assert.NotNil(t, collector.totalReevaluations) - assert.NotNil(t, collector.successRate) -} - -func TestMatrixMetricsCollectorDescribe(t *testing.T) { - collector := NewMatrixMetricsCollector() - ch := make(chan *prometheus.Desc, 100) - collector.Describe(ch) - assert.NotEmpty(t, ch) -} - -func TestMatrixMetricsCollectorCollect(t *testing.T) { - matrixMetricsInstance = nil - metrics := GetMatrixMetrics() - metrics.RecordReevaluation(10*time.Millisecond, true, 5) - metrics.RecordParseTime(8 * time.Millisecond) - - collector := NewMatrixMetricsCollector() - ch := make(chan prometheus.Metric, 100) - collector.Collect(ch) - assert.NotEmpty(t, ch) - - matrixMetricsInstance = nil -} - -func TestMatrixMetricsGetStats(t *testing.T) { - metrics := &MatrixMetrics{ - ReevaluationTimes: make([]time.Duration, 0, 1000), - ParseTimes: make([]time.Duration, 0, 1000), - InsertTimes: make([]time.Duration, 0, 1000), - } - - metrics.RecordReevaluation(10*time.Millisecond, true, 3) - metrics.RecordReevaluation(15*time.Millisecond, true, 2) - metrics.RecordReevaluation(5*time.Millisecond, false, 0) - - stats := metrics.GetStats() - assert.Equal(t, int64(3), stats["total_reevaluations"]) - assert.Equal(t, int64(2), stats["successful_reevaluations"]) - assert.Equal(t, int64(1), stats["failed_reevaluations"]) - assert.Greater(t, stats["success_rate_percent"].(float64), 60.0) -} - -func BenchmarkMatrixMetricsCollectorCollect(b *testing.B) { - metrics := &MatrixMetrics{ - ReevaluationTimes: make([]time.Duration, 0, 1000), - ParseTimes: make([]time.Duration, 0, 1000), - InsertTimes: make([]time.Duration, 0, 1000), - } - matrixMetricsInstance = metrics - - for range 100 { - metrics.RecordReevaluation(10*time.Millisecond, true, 5) - metrics.RecordParseTime(5 * time.Millisecond) - } - - collector := NewMatrixMetricsCollector() - ch := make(chan prometheus.Metric, 100) - - b.ResetTimer() - for b.Loop() { - collector.Collect(ch) - } -} diff --git a/tests/integration/actions_job_test.go b/tests/integration/actions_job_test.go index d3294fdd11..3da290f1d3 100644 --- a/tests/integration/actions_job_test.go +++ b/tests/integration/actions_job_test.go @@ -759,97 +759,3 @@ func getTaskJobNameByTaskID(t *testing.T, authToken, ownerName, repoName string, } return "" } - -func TestDynamicMatrixFromJobOutputs(t *testing.T) { - testCases := []struct { - treePath string - fileContent string - outcomes map[string]*mockTaskOutcome - }{ - { - treePath: ".gitea/workflows/dynamic-matrix.yml", - fileContent: `name: Dynamic Matrix from Job Outputs -on: - push: - paths: - - '.gitea/workflows/dynamic-matrix.yml' -jobs: - generate: - runs-on: ubuntu-latest - outputs: - matrix: ${{ steps.gen_matrix.outputs.matrix }} - steps: - - name: Generate matrix - id: gen_matrix - run: | - echo "matrix=[1,2,3]" >> "$GITHUB_OUTPUT" - - build: - needs: [generate] - runs-on: ubuntu-latest - strategy: - matrix: - version: ${{ fromJson(needs.generate.outputs.matrix) }} - steps: - - run: echo "Building version ${{ matrix.version }}" -`, - outcomes: map[string]*mockTaskOutcome{ - "generate": { - result: runnerv1.Result_RESULT_SUCCESS, - outputs: map[string]string{ - "matrix": "[1,2,3]", - }, - }, - "build (1)": { - result: runnerv1.Result_RESULT_SUCCESS, - }, - "build (2)": { - result: runnerv1.Result_RESULT_SUCCESS, - }, - "build (3)": { - result: runnerv1.Result_RESULT_SUCCESS, - }, - }, - }, - } - onGiteaRun(t, func(t *testing.T, u *url.URL) { - user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) - session := loginUser(t, user2.Name) - token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser) - - apiRepo := createActionsTestRepo(t, token, "actions-dynamic-matrix", false) - runner := newMockRunner() - runner.registerAsRepoRunner(t, user2.Name, apiRepo.Name, "mock-runner", []string{"ubuntu-latest"}, false) - - for _, tc := range testCases { - t.Run("test "+tc.treePath, func(t *testing.T) { - opts := getWorkflowCreateFileOptions(user2, apiRepo.DefaultBranch, "create "+tc.treePath, tc.fileContent) - createWorkflowFile(t, token, user2.Name, apiRepo.Name, tc.treePath, opts) - - // Execute the generate job first - task := runner.fetchTask(t) - jobName := getTaskJobNameByTaskID(t, token, user2.Name, apiRepo.Name, task.Id) - assert.Equal(t, "generate", jobName) - outcome := tc.outcomes[jobName] - assert.NotNil(t, outcome) - runner.execTask(t, task, outcome) - - // Now the build job should be created with matrix expansion from the output - // We expect 3 tasks for build (1), build (2), build (3) - buildTasks := make([]int64, 0) - for range 3 { - buildTask := runner.fetchTask(t) - buildJobName := getTaskJobNameByTaskID(t, token, user2.Name, apiRepo.Name, buildTask.Id) - t.Logf("Fetched task: %s", buildJobName) - assert.Contains(t, []string{"build (1)", "build (2)", "build (3)"}, buildJobName, "Expected a build job with matrix index") - outcome := tc.outcomes[buildJobName] - assert.NotNil(t, outcome) - runner.execTask(t, buildTask, outcome) - buildTasks = append(buildTasks, buildTask.Id) - } - - assert.Len(t, len(buildTasks), 3, "Expected 3 build tasks from dynamic matrix") - }) - } - }) -}