diff --git a/routers/web/web.go b/routers/web/web.go index e3dcf27cc4..40da3b13f4 100644 --- a/routers/web/web.go +++ b/routers/web/web.go @@ -41,6 +41,7 @@ import ( "code.gitea.io/gitea/routers/web/user" user_setting "code.gitea.io/gitea/routers/web/user/setting" "code.gitea.io/gitea/routers/web/user/setting/security" + actions_service "code.gitea.io/gitea/services/actions" auth_service "code.gitea.io/gitea/services/auth" "code.gitea.io/gitea/services/context" "code.gitea.io/gitea/services/forms" @@ -286,6 +287,8 @@ func Routes() *web.Router { if setting.Metrics.Enabled { prometheus.MustRegister(metrics.NewCollector()) + // Register matrix re-evaluation metrics + prometheus.MustRegister(actions_service.NewMatrixMetricsCollector()) routes.Get("/metrics", append(mid, Metrics)...) } diff --git a/services/actions/job_emitter.go b/services/actions/job_emitter.go index 20a4f81eab..ed6e2fe5aa 100644 --- a/services/actions/job_emitter.go +++ b/services/actions/job_emitter.go @@ -7,6 +7,7 @@ import ( "context" "errors" "fmt" + "time" actions_model "code.gitea.io/gitea/models/actions" "code.gitea.io/gitea/models/db" @@ -202,6 +203,9 @@ func checkJobsOfRun(ctx context.Context, run *actions_model.ActionRun) (jobs, up if err != nil { return nil, nil, err } + + log.Debug("Checking %d jobs for run %d (status: %s)", len(jobs), run.ID, run.Status) + vars, err := actions_model.GetVariablesOfRun(ctx, run) if err != nil { return nil, nil, err @@ -213,14 +217,18 @@ func checkJobsOfRun(ctx context.Context, run *actions_model.ActionRun) (jobs, up } updates := newJobStatusResolver(jobs, vars).Resolve(ctx) + log.Debug("Job status resolver returned %d job status updates for run %d", len(updates), run.ID) + for _, job := range jobs { if status, ok := updates[job.ID]; ok { + oldStatus := job.Status job.Status = status if n, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"status": actions_model.StatusBlocked}, "status"); err != nil { return err } else if n != 1 { return fmt.Errorf("no affected for updating blocked job %v", job.ID) } + log.Info("Job %d (JobID: %s) status updated: %s -> %s", job.ID, job.JobID, oldStatus, status) updatedJobs = append(updatedJobs, job) } } @@ -229,6 +237,20 @@ func checkJobsOfRun(ctx context.Context, run *actions_model.ActionRun) (jobs, up return nil, nil, err } + // Reload jobs from the database to pick up any newly created matrix jobs + oldJobCount := len(jobs) + jobs, err = db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: run.ID}) + if err != nil { + return nil, nil, err + } + + if len(jobs) > oldJobCount { + log.Info("Matrix re-evaluation created %d new jobs for run %d (was %d, now %d)", + len(jobs)-oldJobCount, run.ID, oldJobCount, len(jobs)) + } + + log.Debug("Job check completed for run %d: %d jobs updated, %d total jobs", run.ID, len(updatedJobs), len(jobs)) + return jobs, updatedJobs, nil } @@ -313,47 +335,102 @@ func (r *jobStatusResolver) resolveJobHasIfCondition(actionRunJob *actions_model func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model.Status { ret := map[int64]actions_model.Status{} + resolveMetrics := struct { + totalBlocked int + matrixReevaluated int + concurrencyUpdated int + jobsStarted int + jobsSkipped int + }{} + for id, status := range r.statuses { actionRunJob := r.jobMap[id] if status != actions_model.StatusBlocked { continue } + + resolveMetrics.totalBlocked++ + log.Debug("Resolving blocked job %d (JobID: %s, RunID: %d)", id, actionRunJob.JobID, actionRunJob.RunID) + allDone, allSucceed := r.resolveCheckNeeds(id) if !allDone { + log.Debug("Job %d: not all dependencies completed yet", id) continue } + log.Debug("Job %d: all dependencies completed (allSucceed: %v), checking matrix re-evaluation", id, allSucceed) + + // Try to re-evaluate the matrix with job outputs if it depends on them + startTime := time.Now() + newMatrixJobs, err := ReEvaluateMatrixForJobWithNeeds(ctx, actionRunJob, r.vars) + duration := time.Since(startTime).Milliseconds() + + if err != nil { + log.Error("Matrix re-evaluation error for job %d (JobID: %s): %v (duration: %dms)", id, actionRunJob.JobID, err, duration) + continue + } + + // If new matrix jobs were created, add them to the resolver and continue + if len(newMatrixJobs) > 0 { + resolveMetrics.matrixReevaluated++ + log.Info("Matrix re-evaluation succeeded for job %d (JobID: %s): created %d new jobs (duration: %dms)", + id, actionRunJob.JobID, len(newMatrixJobs), duration) + // The new jobs will be picked up in the next resolution iteration + continue + } + + log.Debug("Job %d: no matrix re-evaluation needed or result is empty", id) + // update concurrency and check whether the job can run now - err := updateConcurrencyEvaluationForJobWithNeeds(ctx, actionRunJob, r.vars) + err = updateConcurrencyEvaluationForJobWithNeeds(ctx, actionRunJob, r.vars) if err != nil { // The err can be caused by different cases: database error, or syntax error, or the needed jobs haven't completed // At the moment there is no way to distinguish them. // Actually, for most cases, the error is caused by "syntax error" / "the needed jobs haven't completed (skipped?)" // TODO: if workflow or concurrency expression has syntax error, there should be a user error message, need to show it to end users - log.Debug("updateConcurrencyEvaluationForJobWithNeeds failed, this job will stay blocked: job: %d, err: %v", id, err) + log.Debug("Concurrency evaluation failed for job %d (JobID: %s): %v (job will stay blocked)", id, actionRunJob.JobID, err) continue } + resolveMetrics.concurrencyUpdated++ + shouldStartJob := true if !allSucceed { // Not all dependent jobs completed successfully: // * if the job has "if" condition, it can be started, then the act_runner will evaluate the "if" condition. // * otherwise, the job should be skipped. shouldStartJob = r.resolveJobHasIfCondition(actionRunJob) + log.Debug("Job %d: not all dependencies succeeded. Has if-condition: %v, should start: %v", id, shouldStartJob, shouldStartJob) } newStatus := util.Iif(shouldStartJob, actions_model.StatusWaiting, actions_model.StatusSkipped) if newStatus == actions_model.StatusWaiting { newStatus, err = PrepareToStartJobWithConcurrency(ctx, actionRunJob) if err != nil { - log.Error("ShouldBlockJobByConcurrency failed, this job will stay blocked: job: %d, err: %v", id, err) + log.Error("Concurrency check failed for job %d (JobID: %s): %v (job will stay blocked)", id, actionRunJob.JobID, err) } } if newStatus != actions_model.StatusBlocked { ret[id] = newStatus + switch newStatus { + case actions_model.StatusWaiting: + resolveMetrics.jobsStarted++ + log.Info("Job %d (JobID: %s) transitioned to StatusWaiting", id, actionRunJob.JobID) + case actions_model.StatusSkipped: + resolveMetrics.jobsSkipped++ + log.Info("Job %d (JobID: %s) transitioned to StatusSkipped", id, actionRunJob.JobID) + } } } + + // Log resolution metrics summary + if resolveMetrics.totalBlocked > 0 { + log.Debug("Job resolution summary: total_blocked=%d, matrix_reevaluated=%d, concurrency_updated=%d, jobs_started=%d, jobs_skipped=%d", + resolveMetrics.totalBlocked, resolveMetrics.matrixReevaluated, resolveMetrics.concurrencyUpdated, + resolveMetrics.jobsStarted, resolveMetrics.jobsSkipped) + } + return ret } diff --git a/services/actions/matrix.go b/services/actions/matrix.go new file mode 100644 index 0000000000..10541f214f --- /dev/null +++ b/services/actions/matrix.go @@ -0,0 +1,268 @@ +// Copyright 2025 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + "fmt" + "maps" + "strings" + "time" + + actions_model "code.gitea.io/gitea/models/actions" + "code.gitea.io/gitea/modules/log" + + "github.com/nektos/act/pkg/jobparser" + "gopkg.in/yaml.v3" +) + +// ExtractRawStrategies extracts strategy definitions from the raw workflow content +// Returns a map of jobID to strategy YAML for jobs that have matrix dependencies +func ExtractRawStrategies(content []byte) (map[string]string, error) { + var workflowDef struct { + Jobs map[string]struct { + Strategy any `yaml:"strategy"` + Needs any `yaml:"needs"` + } `yaml:"jobs"` + } + + if err := yaml.Unmarshal(content, &workflowDef); err != nil { + return nil, err + } + + strategies := make(map[string]string) + for jobID, jobDef := range workflowDef.Jobs { + if jobDef.Strategy == nil { + continue + } + + // Check if this job has needs (dependencies) + var needsList []string + switch needs := jobDef.Needs.(type) { + case string: + needsList = append(needsList, needs) + case []any: + for _, need := range needs { + if needStr, ok := need.(string); ok { + needsList = append(needsList, needStr) + } + } + } + + // Only store strategy for jobs with dependencies + if len(needsList) > 0 { + if strategyBytes, err := yaml.Marshal(jobDef.Strategy); err == nil { + strategies[jobID] = string(strategyBytes) + } + } + } + + return strategies, nil +} + +// hasMatrixWithNeeds checks if a job's strategy contains a matrix that depends on job outputs +func hasMatrixWithNeeds(rawStrategy string) bool { + if rawStrategy == "" { + return false + } + + var strategy map[string]any + if err := yaml.Unmarshal([]byte(rawStrategy), &strategy); err != nil { + return false + } + + matrix, ok := strategy["matrix"] + if !ok { + return false + } + + // Check if any matrix value contains "needs." reference + matrixStr := fmt.Sprintf("%v", matrix) + return strings.Contains(matrixStr, "needs.") +} + +// ReEvaluateMatrixForJobWithNeeds re-evaluates the matrix strategy of a job using outputs from dependent jobs +// If the matrix depends on job outputs and all dependent jobs are done, it will: +// 1. Evaluate the matrix with the job outputs +// 2. Create new ActionRunJobs for each matrix combination +// 3. Return the newly created jobs +func ReEvaluateMatrixForJobWithNeeds(ctx context.Context, job *actions_model.ActionRunJob, vars map[string]string) ([]*actions_model.ActionRunJob, error) { + startTime := time.Now() + + if job.IsMatrixEvaluated || job.RawStrategy == "" { + return nil, nil + } + + if !hasMatrixWithNeeds(job.RawStrategy) { + // Mark as evaluated since there's no needs-dependent matrix + job.IsMatrixEvaluated = true + log.Debug("Matrix re-evaluation skipped for job %d: no needs-dependent matrix found", job.ID) + return nil, nil + } + + log.Debug("Starting matrix re-evaluation for job %d (JobID: %s)", job.ID, job.JobID) + + // Get the outputs from dependent jobs + taskNeeds, err := FindTaskNeeds(ctx, job) + if err != nil { + errMsg := fmt.Sprintf("failed to find task needs for job %d (JobID: %s): %v", job.ID, job.JobID, err) + log.Error("Matrix re-evaluation error: %s", errMsg) + return nil, fmt.Errorf("find task needs: %w", err) + } + + log.Debug("Found %d task needs for job %d (JobID: %s)", len(taskNeeds), job.ID, job.JobID) + + // If any task needs are not done, we can't evaluate yet + pendingNeeds := []string{} + for jobID, taskNeed := range taskNeeds { + if !taskNeed.Result.IsDone() { + pendingNeeds = append(pendingNeeds, fmt.Sprintf("%s(%s)", jobID, taskNeed.Result)) + } + } + if len(pendingNeeds) > 0 { + log.Debug("Matrix re-evaluation deferred for job %d: pending needs: %v", job.ID, pendingNeeds) + GetMatrixMetrics().RecordDeferred() + return nil, nil + } + + // Merge vars with needs outputs + mergedVars := mergeNeedsIntoVars(vars, taskNeeds) + log.Debug("Merged %d variables with needs outputs for job %d", len(mergedVars), job.ID) + + // Load the original run to get workflow context + if job.Run == nil { + if err := job.LoadRun(ctx); err != nil { + errMsg := fmt.Sprintf("failed to load run for job %d (JobID: %s): %v", job.ID, job.JobID, err) + log.Error("Matrix re-evaluation error: %s", errMsg) + return nil, fmt.Errorf("load run: %w", err) + } + } + + // Create the giteaCtx for expression evaluation + giteaCtx := GenerateGiteaContext(job.Run, job) + + // Parse the job payload with merged vars to expand the matrix + // Note: job.WorkflowPayload already contains just this job's definition + parseStartTime := time.Now() + jobs, err := jobparser.Parse( + job.WorkflowPayload, + jobparser.WithVars(mergedVars), + jobparser.WithGitContext(giteaCtx.ToGitHubContext()), + ) + parseTime := time.Since(parseStartTime) + GetMatrixMetrics().RecordParseTime(parseTime) + + if err != nil { + // If parsing fails, we can't expand the matrix + // Mark as evaluated and skip + job.IsMatrixEvaluated = true + errMsg := fmt.Sprintf("failed to parse workflow payload for job %d (JobID: %s) during matrix expansion. Error: %v. RawStrategy: %s", + job.ID, job.JobID, err, job.RawStrategy) + log.Error("Matrix parse error: %s", errMsg) + GetMatrixMetrics().RecordReevaluation(time.Since(startTime), false, 0) + return nil, nil + } + + if len(jobs) == 0 { + job.IsMatrixEvaluated = true + log.Debug("No jobs generated from matrix expansion for job %d (JobID: %s)", job.ID, job.JobID) + return nil, nil + } + + log.Debug("Parsed %d matrix combinations for job %d (JobID: %s)", len(jobs), job.ID, job.JobID) + + // Create new ActionRunJobs for each parsed workflow (each matrix combination) + newJobs := make([]*actions_model.ActionRunJob, 0) + + for i, parsedSingleWorkflow := range jobs { + id, jobDef := parsedSingleWorkflow.Job() + if jobDef == nil { + log.Warn("Skipped nil jobDef at index %d for job %d (JobID: %s)", i, job.ID, job.JobID) + continue + } + + // Skip the original job ID - we only want the matrix-expanded versions + if id == job.JobID { + log.Debug("Skipped original job ID %s in matrix expansion for job %d", id, job.ID) + continue + } + + // Erase needs from the payload before storing + needs := jobDef.Needs() + if err := parsedSingleWorkflow.SetJob(id, jobDef.EraseNeeds()); err != nil { + log.Error("Failed to erase needs from job %s (matrix expansion for job %d): %v", id, job.ID, err) + continue + } + + payload, _ := parsedSingleWorkflow.Marshal() + + newJob := &actions_model.ActionRunJob{ + RunID: job.RunID, + RepoID: job.RepoID, + OwnerID: job.OwnerID, + CommitSHA: job.CommitSHA, + IsForkPullRequest: job.IsForkPullRequest, + Name: jobDef.Name, + WorkflowPayload: payload, + JobID: id, + Needs: needs, + RunsOn: jobDef.RunsOn(), + Status: actions_model.StatusBlocked, + } + + newJobs = append(newJobs, newJob) + } + + // If no new jobs were created, mark as evaluated + if len(newJobs) == 0 { + job.IsMatrixEvaluated = true + log.Warn("No valid jobs created from matrix expansion for job %d (JobID: %s). Original jobs: %d", job.ID, job.JobID, len(jobs)) + return nil, nil + } + + // Insert the new jobs into database + insertStartTime := time.Now() + if err := actions_model.InsertActionRunJobs(ctx, newJobs); err != nil { + insertTime := time.Since(insertStartTime) + GetMatrixMetrics().RecordInsertTime(insertTime) + errMsg := fmt.Sprintf("failed to insert %d new matrix jobs for job %d (JobID: %s): %v", len(newJobs), job.ID, job.JobID, err) + log.Error("Matrix insertion error: %s", errMsg) + GetMatrixMetrics().RecordReevaluation(time.Since(startTime), false, 0) + return nil, fmt.Errorf("insert new jobs: %w", err) + } + insertTime := time.Since(insertStartTime) + GetMatrixMetrics().RecordInsertTime(insertTime) + + // Mark the original job as evaluated + job.IsMatrixEvaluated = true + if _, err := actions_model.UpdateRunJob(ctx, job, nil, "is_matrix_evaluated"); err != nil { + log.Error("Failed to update job %d is_matrix_evaluated flag: %v", job.ID, err) + } + + totalTime := time.Since(startTime) + GetMatrixMetrics().RecordReevaluation(totalTime, true, int64(len(newJobs))) + + log.Info("Successfully completed matrix re-evaluation for job %d (JobID: %s): created %d new jobs from %d matrix combinations (total: %dms, parse: %dms, insert: %dms)", + job.ID, job.JobID, len(newJobs), len(jobs), totalTime.Milliseconds(), parseTime.Milliseconds(), insertTime.Milliseconds()) + + return newJobs, nil +} + +// mergeNeedsIntoVars converts task needs outputs into variables for expression evaluation +func mergeNeedsIntoVars(baseVars map[string]string, taskNeeds map[string]*TaskNeed) map[string]string { + merged := make(map[string]string) + + // Copy base vars + maps.Copy(merged, baseVars) + + // Add needs outputs as variables in format: needs..outputs. + for jobID, taskNeed := range taskNeeds { + for outputKey, outputValue := range taskNeed.Outputs { + key := fmt.Sprintf("needs.%s.outputs.%s", jobID, outputKey) + merged[key] = outputValue + } + } + + return merged +} diff --git a/services/actions/matrix_metrics.go b/services/actions/matrix_metrics.go new file mode 100644 index 0000000000..cc232c0b6c --- /dev/null +++ b/services/actions/matrix_metrics.go @@ -0,0 +1,173 @@ +// Copyright 2025 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "sync" + "time" +) + +// MatrixMetrics tracks performance metrics for matrix re-evaluation operations +type MatrixMetrics struct { + mu sync.RWMutex + + // Counters + TotalReevaluations int64 + SuccessfulReevaluations int64 + FailedReevaluations int64 + JobsCreatedTotal int64 + DeferredReevaluations int64 + + // Timing + TotalReevaluationTime time.Duration + TotalParseTime time.Duration + TotalInsertTime time.Duration + + // Histograms (for detailed analysis) + ReevaluationTimes []time.Duration + ParseTimes []time.Duration + InsertTimes []time.Duration +} + +var ( + matrixMetricsInstance *MatrixMetrics + metricsMutex sync.Mutex +) + +// GetMatrixMetrics returns the global matrix metrics instance +func GetMatrixMetrics() *MatrixMetrics { + if matrixMetricsInstance == nil { + metricsMutex.Lock() + if matrixMetricsInstance == nil { + matrixMetricsInstance = &MatrixMetrics{ + ReevaluationTimes: make([]time.Duration, 0, 1000), + ParseTimes: make([]time.Duration, 0, 1000), + InsertTimes: make([]time.Duration, 0, 1000), + } + } + metricsMutex.Unlock() + } + return matrixMetricsInstance +} + +// RecordReevaluation records a matrix re-evaluation attempt +func (m *MatrixMetrics) RecordReevaluation(duration time.Duration, success bool, jobsCreated int64) { + m.mu.Lock() + defer m.mu.Unlock() + + m.TotalReevaluations++ + m.TotalReevaluationTime += duration + + if success { + m.SuccessfulReevaluations++ + m.JobsCreatedTotal += jobsCreated + } else { + m.FailedReevaluations++ + } + + // Keep a limited history for detailed analysis (keep last 1000) + if len(m.ReevaluationTimes) < 1000 { + m.ReevaluationTimes = append(m.ReevaluationTimes, duration) + } else { + // Shift and add new value + copy(m.ReevaluationTimes, m.ReevaluationTimes[1:]) + m.ReevaluationTimes[len(m.ReevaluationTimes)-1] = duration + } +} + +// RecordDeferred records a deferred matrix re-evaluation +func (m *MatrixMetrics) RecordDeferred() { + m.mu.Lock() + defer m.mu.Unlock() + m.DeferredReevaluations++ +} + +// RecordParseTime records the time taken to parse a workflow +func (m *MatrixMetrics) RecordParseTime(duration time.Duration) { + m.mu.Lock() + defer m.mu.Unlock() + + m.TotalParseTime += duration + + if len(m.ParseTimes) < 1000 { + m.ParseTimes = append(m.ParseTimes, duration) + } else { + copy(m.ParseTimes, m.ParseTimes[1:]) + m.ParseTimes[len(m.ParseTimes)-1] = duration + } +} + +// RecordInsertTime records the time taken to insert matrix jobs +func (m *MatrixMetrics) RecordInsertTime(duration time.Duration) { + m.mu.Lock() + defer m.mu.Unlock() + + m.TotalInsertTime += duration + + if len(m.InsertTimes) < 1000 { + m.InsertTimes = append(m.InsertTimes, duration) + } else { + copy(m.InsertTimes, m.InsertTimes[1:]) + m.InsertTimes[len(m.InsertTimes)-1] = duration + } +} + +// GetStats returns a snapshot of the current metrics +func (m *MatrixMetrics) GetStats() map[string]any { + m.mu.RLock() + defer m.mu.RUnlock() + + avgReevaluationTime := time.Duration(0) + if m.TotalReevaluations > 0 { + avgReevaluationTime = m.TotalReevaluationTime / time.Duration(m.TotalReevaluations) + } + + avgParseTime := time.Duration(0) + if len(m.ParseTimes) > 0 { + avgParseTime = m.TotalParseTime / time.Duration(len(m.ParseTimes)) + } + + avgInsertTime := time.Duration(0) + if len(m.InsertTimes) > 0 { + avgInsertTime = m.TotalInsertTime / time.Duration(len(m.InsertTimes)) + } + + successRate := 0.0 + if m.TotalReevaluations > 0 { + successRate = float64(m.SuccessfulReevaluations) / float64(m.TotalReevaluations) * 100 + } + + return map[string]any{ + "total_reevaluations": m.TotalReevaluations, + "successful_reevaluations": m.SuccessfulReevaluations, + "failed_reevaluations": m.FailedReevaluations, + "deferred_reevaluations": m.DeferredReevaluations, + "success_rate_percent": successRate, + "total_jobs_created": m.JobsCreatedTotal, + "total_reevaluation_time_ms": m.TotalReevaluationTime.Milliseconds(), + "avg_reevaluation_time_ms": avgReevaluationTime.Milliseconds(), + "total_parse_time_ms": m.TotalParseTime.Milliseconds(), + "avg_parse_time_ms": avgParseTime.Milliseconds(), + "total_insert_time_ms": m.TotalInsertTime.Milliseconds(), + "avg_insert_time_ms": avgInsertTime.Milliseconds(), + } +} + +// Reset clears all metrics +func (m *MatrixMetrics) Reset() { + m.mu.Lock() + defer m.mu.Unlock() + + m.TotalReevaluations = 0 + m.SuccessfulReevaluations = 0 + m.FailedReevaluations = 0 + m.JobsCreatedTotal = 0 + m.DeferredReevaluations = 0 + m.TotalReevaluationTime = 0 + m.TotalParseTime = 0 + m.TotalInsertTime = 0 + m.ReevaluationTimes = m.ReevaluationTimes[:0] + m.ParseTimes = m.ParseTimes[:0] + m.InsertTimes = m.InsertTimes[:0] +} diff --git a/services/actions/matrix_metrics_prometheus.go b/services/actions/matrix_metrics_prometheus.go new file mode 100644 index 0000000000..e780e8f13f --- /dev/null +++ b/services/actions/matrix_metrics_prometheus.go @@ -0,0 +1,191 @@ +// Copyright 2025 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +// MatrixMetricsCollector implements the prometheus.Collector interface +// and exposes matrix re-evaluation metrics for prometheus +type MatrixMetricsCollector struct { + // Counters + totalReevaluations prometheus.Gauge + successfulReevaluations prometheus.Gauge + failedReevaluations prometheus.Gauge + deferredReevaluations prometheus.Gauge + jobsCreatedTotal prometheus.Gauge + + // Timing (in milliseconds) + totalReevaluationTime prometheus.Gauge + avgReevaluationTime prometheus.Gauge + totalParseTime prometheus.Gauge + avgParseTime prometheus.Gauge + totalInsertTime prometheus.Gauge + avgInsertTime prometheus.Gauge + + // Rates + successRate prometheus.Gauge +} + +const ( + namespace = "gitea_" + subsystem = "matrix_" +) + +// NewMatrixMetricsCollector creates a new MatrixMetricsCollector +func NewMatrixMetricsCollector() *MatrixMetricsCollector { + return &MatrixMetricsCollector{ + totalReevaluations: prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "total_reevaluations", + Help: "Total number of matrix re-evaluation attempts", + }, + ), + successfulReevaluations: prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "successful_reevaluations", + Help: "Number of successful matrix re-evaluations", + }, + ), + failedReevaluations: prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "failed_reevaluations", + Help: "Number of failed matrix re-evaluations", + }, + ), + deferredReevaluations: prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "deferred_reevaluations", + Help: "Number of deferred matrix re-evaluations (waiting for dependencies)", + }, + ), + jobsCreatedTotal: prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "jobs_created_total", + Help: "Total number of jobs created from matrix expansion", + }, + ), + totalReevaluationTime: prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "total_reevaluation_time_ms", + Help: "Total time spent on matrix re-evaluations in milliseconds", + }, + ), + avgReevaluationTime: prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "avg_reevaluation_time_ms", + Help: "Average time per matrix re-evaluation in milliseconds", + }, + ), + totalParseTime: prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "total_parse_time_ms", + Help: "Total time spent parsing workflow payloads in milliseconds", + }, + ), + avgParseTime: prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "avg_parse_time_ms", + Help: "Average time per workflow parse in milliseconds", + }, + ), + totalInsertTime: prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "total_insert_time_ms", + Help: "Total time spent inserting jobs into database in milliseconds", + }, + ), + avgInsertTime: prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "avg_insert_time_ms", + Help: "Average time per database insert in milliseconds", + }, + ), + successRate: prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "success_rate_percent", + Help: "Success rate of matrix re-evaluations as percentage (0-100)", + }, + ), + } +} + +// Describe returns the metrics descriptions +func (c *MatrixMetricsCollector) Describe(ch chan<- *prometheus.Desc) { + c.totalReevaluations.Describe(ch) + c.successfulReevaluations.Describe(ch) + c.failedReevaluations.Describe(ch) + c.deferredReevaluations.Describe(ch) + c.jobsCreatedTotal.Describe(ch) + c.totalReevaluationTime.Describe(ch) + c.avgReevaluationTime.Describe(ch) + c.totalParseTime.Describe(ch) + c.avgParseTime.Describe(ch) + c.totalInsertTime.Describe(ch) + c.avgInsertTime.Describe(ch) + c.successRate.Describe(ch) +} + +// Collect collects the current metric values and sends them to the channel +func (c *MatrixMetricsCollector) Collect(ch chan<- prometheus.Metric) { + metrics := GetMatrixMetrics() + stats := metrics.GetStats() + + // Set counter values + c.totalReevaluations.Set(float64(stats["total_reevaluations"].(int64))) + c.successfulReevaluations.Set(float64(stats["successful_reevaluations"].(int64))) + c.failedReevaluations.Set(float64(stats["failed_reevaluations"].(int64))) + c.deferredReevaluations.Set(float64(stats["deferred_reevaluations"].(int64))) + c.jobsCreatedTotal.Set(float64(stats["total_jobs_created"].(int64))) + + // Set timing values (already in milliseconds) + c.totalReevaluationTime.Set(float64(stats["total_reevaluation_time_ms"].(int64))) + c.avgReevaluationTime.Set(float64(stats["avg_reevaluation_time_ms"].(int64))) + c.totalParseTime.Set(float64(stats["total_parse_time_ms"].(int64))) + c.avgParseTime.Set(float64(stats["avg_parse_time_ms"].(int64))) + c.totalInsertTime.Set(float64(stats["total_insert_time_ms"].(int64))) + c.avgInsertTime.Set(float64(stats["avg_insert_time_ms"].(int64))) + + // Set success rate + c.successRate.Set(stats["success_rate_percent"].(float64)) + + // Collect all metrics + c.totalReevaluations.Collect(ch) + c.successfulReevaluations.Collect(ch) + c.failedReevaluations.Collect(ch) + c.deferredReevaluations.Collect(ch) + c.jobsCreatedTotal.Collect(ch) + c.totalReevaluationTime.Collect(ch) + c.avgReevaluationTime.Collect(ch) + c.totalParseTime.Collect(ch) + c.avgParseTime.Collect(ch) + c.totalInsertTime.Collect(ch) + c.avgInsertTime.Collect(ch) + c.successRate.Collect(ch) +} diff --git a/services/actions/matrix_metrics_test.go b/services/actions/matrix_metrics_test.go new file mode 100644 index 0000000000..54fdf3c0f3 --- /dev/null +++ b/services/actions/matrix_metrics_test.go @@ -0,0 +1,82 @@ +// Copyright 2025 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" +) + +// Essential Prometheus Collector Tests + +func TestNewMatrixMetricsCollector(t *testing.T) { + collector := NewMatrixMetricsCollector() + assert.NotNil(t, collector) + assert.NotNil(t, collector.totalReevaluations) + assert.NotNil(t, collector.successRate) +} + +func TestMatrixMetricsCollectorDescribe(t *testing.T) { + collector := NewMatrixMetricsCollector() + ch := make(chan *prometheus.Desc, 100) + collector.Describe(ch) + assert.NotEmpty(t, ch) +} + +func TestMatrixMetricsCollectorCollect(t *testing.T) { + matrixMetricsInstance = nil + metrics := GetMatrixMetrics() + metrics.RecordReevaluation(10*time.Millisecond, true, 5) + metrics.RecordParseTime(8 * time.Millisecond) + + collector := NewMatrixMetricsCollector() + ch := make(chan prometheus.Metric, 100) + collector.Collect(ch) + assert.NotEmpty(t, ch) + + matrixMetricsInstance = nil +} + +func TestMatrixMetricsGetStats(t *testing.T) { + metrics := &MatrixMetrics{ + ReevaluationTimes: make([]time.Duration, 0, 1000), + ParseTimes: make([]time.Duration, 0, 1000), + InsertTimes: make([]time.Duration, 0, 1000), + } + + metrics.RecordReevaluation(10*time.Millisecond, true, 3) + metrics.RecordReevaluation(15*time.Millisecond, true, 2) + metrics.RecordReevaluation(5*time.Millisecond, false, 0) + + stats := metrics.GetStats() + assert.Equal(t, int64(3), stats["total_reevaluations"]) + assert.Equal(t, int64(2), stats["successful_reevaluations"]) + assert.Equal(t, int64(1), stats["failed_reevaluations"]) + assert.Greater(t, stats["success_rate_percent"].(float64), 60.0) +} + +func BenchmarkMatrixMetricsCollectorCollect(b *testing.B) { + metrics := &MatrixMetrics{ + ReevaluationTimes: make([]time.Duration, 0, 1000), + ParseTimes: make([]time.Duration, 0, 1000), + InsertTimes: make([]time.Duration, 0, 1000), + } + matrixMetricsInstance = metrics + + for range 100 { + metrics.RecordReevaluation(10*time.Millisecond, true, 5) + metrics.RecordParseTime(5 * time.Millisecond) + } + + collector := NewMatrixMetricsCollector() + ch := make(chan prometheus.Metric, 100) + + b.ResetTimer() + for b.Loop() { + collector.Collect(ch) + } +} diff --git a/tests/integration/actions_job_test.go b/tests/integration/actions_job_test.go index 3da290f1d3..d3294fdd11 100644 --- a/tests/integration/actions_job_test.go +++ b/tests/integration/actions_job_test.go @@ -759,3 +759,97 @@ func getTaskJobNameByTaskID(t *testing.T, authToken, ownerName, repoName string, } return "" } + +func TestDynamicMatrixFromJobOutputs(t *testing.T) { + testCases := []struct { + treePath string + fileContent string + outcomes map[string]*mockTaskOutcome + }{ + { + treePath: ".gitea/workflows/dynamic-matrix.yml", + fileContent: `name: Dynamic Matrix from Job Outputs +on: + push: + paths: + - '.gitea/workflows/dynamic-matrix.yml' +jobs: + generate: + runs-on: ubuntu-latest + outputs: + matrix: ${{ steps.gen_matrix.outputs.matrix }} + steps: + - name: Generate matrix + id: gen_matrix + run: | + echo "matrix=[1,2,3]" >> "$GITHUB_OUTPUT" + + build: + needs: [generate] + runs-on: ubuntu-latest + strategy: + matrix: + version: ${{ fromJson(needs.generate.outputs.matrix) }} + steps: + - run: echo "Building version ${{ matrix.version }}" +`, + outcomes: map[string]*mockTaskOutcome{ + "generate": { + result: runnerv1.Result_RESULT_SUCCESS, + outputs: map[string]string{ + "matrix": "[1,2,3]", + }, + }, + "build (1)": { + result: runnerv1.Result_RESULT_SUCCESS, + }, + "build (2)": { + result: runnerv1.Result_RESULT_SUCCESS, + }, + "build (3)": { + result: runnerv1.Result_RESULT_SUCCESS, + }, + }, + }, + } + onGiteaRun(t, func(t *testing.T, u *url.URL) { + user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) + session := loginUser(t, user2.Name) + token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser) + + apiRepo := createActionsTestRepo(t, token, "actions-dynamic-matrix", false) + runner := newMockRunner() + runner.registerAsRepoRunner(t, user2.Name, apiRepo.Name, "mock-runner", []string{"ubuntu-latest"}, false) + + for _, tc := range testCases { + t.Run("test "+tc.treePath, func(t *testing.T) { + opts := getWorkflowCreateFileOptions(user2, apiRepo.DefaultBranch, "create "+tc.treePath, tc.fileContent) + createWorkflowFile(t, token, user2.Name, apiRepo.Name, tc.treePath, opts) + + // Execute the generate job first + task := runner.fetchTask(t) + jobName := getTaskJobNameByTaskID(t, token, user2.Name, apiRepo.Name, task.Id) + assert.Equal(t, "generate", jobName) + outcome := tc.outcomes[jobName] + assert.NotNil(t, outcome) + runner.execTask(t, task, outcome) + + // Now the build job should be created with matrix expansion from the output + // We expect 3 tasks for build (1), build (2), build (3) + buildTasks := make([]int64, 0) + for range 3 { + buildTask := runner.fetchTask(t) + buildJobName := getTaskJobNameByTaskID(t, token, user2.Name, apiRepo.Name, buildTask.Id) + t.Logf("Fetched task: %s", buildJobName) + assert.Contains(t, []string{"build (1)", "build (2)", "build (3)"}, buildJobName, "Expected a build job with matrix index") + outcome := tc.outcomes[buildJobName] + assert.NotNil(t, outcome) + runner.execTask(t, buildTask, outcome) + buildTasks = append(buildTasks, buildTask.Id) + } + + assert.Len(t, len(buildTasks), 3, "Expected 3 build tasks from dynamic matrix") + }) + } + }) +}