mirror of
https://github.com/go-gitea/gitea.git
synced 2026-04-05 01:21:17 +02:00
feat: Add support for dynamic matrix strategies with job outputs
Signed-off-by: Pascal Zimmermann <pascal.zimmermann@theiotstudio.com> # Conflicts: # models/actions/run_job.go # models/migrations/migrations.go # models/migrations/v1_26/v326.go # services/actions/run.go
This commit is contained in:
parent
47a0d88056
commit
a1c60ac854
@ -41,6 +41,7 @@ import (
|
||||
"code.gitea.io/gitea/routers/web/user"
|
||||
user_setting "code.gitea.io/gitea/routers/web/user/setting"
|
||||
"code.gitea.io/gitea/routers/web/user/setting/security"
|
||||
actions_service "code.gitea.io/gitea/services/actions"
|
||||
auth_service "code.gitea.io/gitea/services/auth"
|
||||
"code.gitea.io/gitea/services/context"
|
||||
"code.gitea.io/gitea/services/forms"
|
||||
@ -286,6 +287,8 @@ func Routes() *web.Router {
|
||||
|
||||
if setting.Metrics.Enabled {
|
||||
prometheus.MustRegister(metrics.NewCollector())
|
||||
// Register matrix re-evaluation metrics
|
||||
prometheus.MustRegister(actions_service.NewMatrixMetricsCollector())
|
||||
routes.Get("/metrics", append(mid, Metrics)...)
|
||||
}
|
||||
|
||||
|
||||
@ -7,6 +7,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
actions_model "code.gitea.io/gitea/models/actions"
|
||||
"code.gitea.io/gitea/models/db"
|
||||
@ -202,6 +203,9 @@ func checkJobsOfRun(ctx context.Context, run *actions_model.ActionRun) (jobs, up
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
log.Debug("Checking %d jobs for run %d (status: %s)", len(jobs), run.ID, run.Status)
|
||||
|
||||
vars, err := actions_model.GetVariablesOfRun(ctx, run)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
@ -213,14 +217,18 @@ func checkJobsOfRun(ctx context.Context, run *actions_model.ActionRun) (jobs, up
|
||||
}
|
||||
|
||||
updates := newJobStatusResolver(jobs, vars).Resolve(ctx)
|
||||
log.Debug("Job status resolver returned %d job status updates for run %d", len(updates), run.ID)
|
||||
|
||||
for _, job := range jobs {
|
||||
if status, ok := updates[job.ID]; ok {
|
||||
oldStatus := job.Status
|
||||
job.Status = status
|
||||
if n, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"status": actions_model.StatusBlocked}, "status"); err != nil {
|
||||
return err
|
||||
} else if n != 1 {
|
||||
return fmt.Errorf("no affected for updating blocked job %v", job.ID)
|
||||
}
|
||||
log.Info("Job %d (JobID: %s) status updated: %s -> %s", job.ID, job.JobID, oldStatus, status)
|
||||
updatedJobs = append(updatedJobs, job)
|
||||
}
|
||||
}
|
||||
@ -229,6 +237,20 @@ func checkJobsOfRun(ctx context.Context, run *actions_model.ActionRun) (jobs, up
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Reload jobs from the database to pick up any newly created matrix jobs
|
||||
oldJobCount := len(jobs)
|
||||
jobs, err = db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: run.ID})
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if len(jobs) > oldJobCount {
|
||||
log.Info("Matrix re-evaluation created %d new jobs for run %d (was %d, now %d)",
|
||||
len(jobs)-oldJobCount, run.ID, oldJobCount, len(jobs))
|
||||
}
|
||||
|
||||
log.Debug("Job check completed for run %d: %d jobs updated, %d total jobs", run.ID, len(updatedJobs), len(jobs))
|
||||
|
||||
return jobs, updatedJobs, nil
|
||||
}
|
||||
|
||||
@ -313,47 +335,102 @@ func (r *jobStatusResolver) resolveJobHasIfCondition(actionRunJob *actions_model
|
||||
|
||||
func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model.Status {
|
||||
ret := map[int64]actions_model.Status{}
|
||||
resolveMetrics := struct {
|
||||
totalBlocked int
|
||||
matrixReevaluated int
|
||||
concurrencyUpdated int
|
||||
jobsStarted int
|
||||
jobsSkipped int
|
||||
}{}
|
||||
|
||||
for id, status := range r.statuses {
|
||||
actionRunJob := r.jobMap[id]
|
||||
if status != actions_model.StatusBlocked {
|
||||
continue
|
||||
}
|
||||
|
||||
resolveMetrics.totalBlocked++
|
||||
log.Debug("Resolving blocked job %d (JobID: %s, RunID: %d)", id, actionRunJob.JobID, actionRunJob.RunID)
|
||||
|
||||
allDone, allSucceed := r.resolveCheckNeeds(id)
|
||||
if !allDone {
|
||||
log.Debug("Job %d: not all dependencies completed yet", id)
|
||||
continue
|
||||
}
|
||||
|
||||
log.Debug("Job %d: all dependencies completed (allSucceed: %v), checking matrix re-evaluation", id, allSucceed)
|
||||
|
||||
// Try to re-evaluate the matrix with job outputs if it depends on them
|
||||
startTime := time.Now()
|
||||
newMatrixJobs, err := ReEvaluateMatrixForJobWithNeeds(ctx, actionRunJob, r.vars)
|
||||
duration := time.Since(startTime).Milliseconds()
|
||||
|
||||
if err != nil {
|
||||
log.Error("Matrix re-evaluation error for job %d (JobID: %s): %v (duration: %dms)", id, actionRunJob.JobID, err, duration)
|
||||
continue
|
||||
}
|
||||
|
||||
// If new matrix jobs were created, add them to the resolver and continue
|
||||
if len(newMatrixJobs) > 0 {
|
||||
resolveMetrics.matrixReevaluated++
|
||||
log.Info("Matrix re-evaluation succeeded for job %d (JobID: %s): created %d new jobs (duration: %dms)",
|
||||
id, actionRunJob.JobID, len(newMatrixJobs), duration)
|
||||
// The new jobs will be picked up in the next resolution iteration
|
||||
continue
|
||||
}
|
||||
|
||||
log.Debug("Job %d: no matrix re-evaluation needed or result is empty", id)
|
||||
|
||||
// update concurrency and check whether the job can run now
|
||||
err := updateConcurrencyEvaluationForJobWithNeeds(ctx, actionRunJob, r.vars)
|
||||
err = updateConcurrencyEvaluationForJobWithNeeds(ctx, actionRunJob, r.vars)
|
||||
if err != nil {
|
||||
// The err can be caused by different cases: database error, or syntax error, or the needed jobs haven't completed
|
||||
// At the moment there is no way to distinguish them.
|
||||
// Actually, for most cases, the error is caused by "syntax error" / "the needed jobs haven't completed (skipped?)"
|
||||
// TODO: if workflow or concurrency expression has syntax error, there should be a user error message, need to show it to end users
|
||||
log.Debug("updateConcurrencyEvaluationForJobWithNeeds failed, this job will stay blocked: job: %d, err: %v", id, err)
|
||||
log.Debug("Concurrency evaluation failed for job %d (JobID: %s): %v (job will stay blocked)", id, actionRunJob.JobID, err)
|
||||
continue
|
||||
}
|
||||
|
||||
resolveMetrics.concurrencyUpdated++
|
||||
|
||||
shouldStartJob := true
|
||||
if !allSucceed {
|
||||
// Not all dependent jobs completed successfully:
|
||||
// * if the job has "if" condition, it can be started, then the act_runner will evaluate the "if" condition.
|
||||
// * otherwise, the job should be skipped.
|
||||
shouldStartJob = r.resolveJobHasIfCondition(actionRunJob)
|
||||
log.Debug("Job %d: not all dependencies succeeded. Has if-condition: %v, should start: %v", id, shouldStartJob, shouldStartJob)
|
||||
}
|
||||
|
||||
newStatus := util.Iif(shouldStartJob, actions_model.StatusWaiting, actions_model.StatusSkipped)
|
||||
if newStatus == actions_model.StatusWaiting {
|
||||
newStatus, err = PrepareToStartJobWithConcurrency(ctx, actionRunJob)
|
||||
if err != nil {
|
||||
log.Error("ShouldBlockJobByConcurrency failed, this job will stay blocked: job: %d, err: %v", id, err)
|
||||
log.Error("Concurrency check failed for job %d (JobID: %s): %v (job will stay blocked)", id, actionRunJob.JobID, err)
|
||||
}
|
||||
}
|
||||
|
||||
if newStatus != actions_model.StatusBlocked {
|
||||
ret[id] = newStatus
|
||||
switch newStatus {
|
||||
case actions_model.StatusWaiting:
|
||||
resolveMetrics.jobsStarted++
|
||||
log.Info("Job %d (JobID: %s) transitioned to StatusWaiting", id, actionRunJob.JobID)
|
||||
case actions_model.StatusSkipped:
|
||||
resolveMetrics.jobsSkipped++
|
||||
log.Info("Job %d (JobID: %s) transitioned to StatusSkipped", id, actionRunJob.JobID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Log resolution metrics summary
|
||||
if resolveMetrics.totalBlocked > 0 {
|
||||
log.Debug("Job resolution summary: total_blocked=%d, matrix_reevaluated=%d, concurrency_updated=%d, jobs_started=%d, jobs_skipped=%d",
|
||||
resolveMetrics.totalBlocked, resolveMetrics.matrixReevaluated, resolveMetrics.concurrencyUpdated,
|
||||
resolveMetrics.jobsStarted, resolveMetrics.jobsSkipped)
|
||||
}
|
||||
|
||||
return ret
|
||||
}
|
||||
|
||||
|
||||
268
services/actions/matrix.go
Normal file
268
services/actions/matrix.go
Normal file
@ -0,0 +1,268 @@
|
||||
// Copyright 2025 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package actions
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"maps"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
actions_model "code.gitea.io/gitea/models/actions"
|
||||
"code.gitea.io/gitea/modules/log"
|
||||
|
||||
"github.com/nektos/act/pkg/jobparser"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
// ExtractRawStrategies extracts strategy definitions from the raw workflow content
|
||||
// Returns a map of jobID to strategy YAML for jobs that have matrix dependencies
|
||||
func ExtractRawStrategies(content []byte) (map[string]string, error) {
|
||||
var workflowDef struct {
|
||||
Jobs map[string]struct {
|
||||
Strategy any `yaml:"strategy"`
|
||||
Needs any `yaml:"needs"`
|
||||
} `yaml:"jobs"`
|
||||
}
|
||||
|
||||
if err := yaml.Unmarshal(content, &workflowDef); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
strategies := make(map[string]string)
|
||||
for jobID, jobDef := range workflowDef.Jobs {
|
||||
if jobDef.Strategy == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if this job has needs (dependencies)
|
||||
var needsList []string
|
||||
switch needs := jobDef.Needs.(type) {
|
||||
case string:
|
||||
needsList = append(needsList, needs)
|
||||
case []any:
|
||||
for _, need := range needs {
|
||||
if needStr, ok := need.(string); ok {
|
||||
needsList = append(needsList, needStr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Only store strategy for jobs with dependencies
|
||||
if len(needsList) > 0 {
|
||||
if strategyBytes, err := yaml.Marshal(jobDef.Strategy); err == nil {
|
||||
strategies[jobID] = string(strategyBytes)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return strategies, nil
|
||||
}
|
||||
|
||||
// hasMatrixWithNeeds checks if a job's strategy contains a matrix that depends on job outputs
|
||||
func hasMatrixWithNeeds(rawStrategy string) bool {
|
||||
if rawStrategy == "" {
|
||||
return false
|
||||
}
|
||||
|
||||
var strategy map[string]any
|
||||
if err := yaml.Unmarshal([]byte(rawStrategy), &strategy); err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
matrix, ok := strategy["matrix"]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
// Check if any matrix value contains "needs." reference
|
||||
matrixStr := fmt.Sprintf("%v", matrix)
|
||||
return strings.Contains(matrixStr, "needs.")
|
||||
}
|
||||
|
||||
// ReEvaluateMatrixForJobWithNeeds re-evaluates the matrix strategy of a job using outputs from dependent jobs
|
||||
// If the matrix depends on job outputs and all dependent jobs are done, it will:
|
||||
// 1. Evaluate the matrix with the job outputs
|
||||
// 2. Create new ActionRunJobs for each matrix combination
|
||||
// 3. Return the newly created jobs
|
||||
func ReEvaluateMatrixForJobWithNeeds(ctx context.Context, job *actions_model.ActionRunJob, vars map[string]string) ([]*actions_model.ActionRunJob, error) {
|
||||
startTime := time.Now()
|
||||
|
||||
if job.IsMatrixEvaluated || job.RawStrategy == "" {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if !hasMatrixWithNeeds(job.RawStrategy) {
|
||||
// Mark as evaluated since there's no needs-dependent matrix
|
||||
job.IsMatrixEvaluated = true
|
||||
log.Debug("Matrix re-evaluation skipped for job %d: no needs-dependent matrix found", job.ID)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
log.Debug("Starting matrix re-evaluation for job %d (JobID: %s)", job.ID, job.JobID)
|
||||
|
||||
// Get the outputs from dependent jobs
|
||||
taskNeeds, err := FindTaskNeeds(ctx, job)
|
||||
if err != nil {
|
||||
errMsg := fmt.Sprintf("failed to find task needs for job %d (JobID: %s): %v", job.ID, job.JobID, err)
|
||||
log.Error("Matrix re-evaluation error: %s", errMsg)
|
||||
return nil, fmt.Errorf("find task needs: %w", err)
|
||||
}
|
||||
|
||||
log.Debug("Found %d task needs for job %d (JobID: %s)", len(taskNeeds), job.ID, job.JobID)
|
||||
|
||||
// If any task needs are not done, we can't evaluate yet
|
||||
pendingNeeds := []string{}
|
||||
for jobID, taskNeed := range taskNeeds {
|
||||
if !taskNeed.Result.IsDone() {
|
||||
pendingNeeds = append(pendingNeeds, fmt.Sprintf("%s(%s)", jobID, taskNeed.Result))
|
||||
}
|
||||
}
|
||||
if len(pendingNeeds) > 0 {
|
||||
log.Debug("Matrix re-evaluation deferred for job %d: pending needs: %v", job.ID, pendingNeeds)
|
||||
GetMatrixMetrics().RecordDeferred()
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Merge vars with needs outputs
|
||||
mergedVars := mergeNeedsIntoVars(vars, taskNeeds)
|
||||
log.Debug("Merged %d variables with needs outputs for job %d", len(mergedVars), job.ID)
|
||||
|
||||
// Load the original run to get workflow context
|
||||
if job.Run == nil {
|
||||
if err := job.LoadRun(ctx); err != nil {
|
||||
errMsg := fmt.Sprintf("failed to load run for job %d (JobID: %s): %v", job.ID, job.JobID, err)
|
||||
log.Error("Matrix re-evaluation error: %s", errMsg)
|
||||
return nil, fmt.Errorf("load run: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Create the giteaCtx for expression evaluation
|
||||
giteaCtx := GenerateGiteaContext(job.Run, job)
|
||||
|
||||
// Parse the job payload with merged vars to expand the matrix
|
||||
// Note: job.WorkflowPayload already contains just this job's definition
|
||||
parseStartTime := time.Now()
|
||||
jobs, err := jobparser.Parse(
|
||||
job.WorkflowPayload,
|
||||
jobparser.WithVars(mergedVars),
|
||||
jobparser.WithGitContext(giteaCtx.ToGitHubContext()),
|
||||
)
|
||||
parseTime := time.Since(parseStartTime)
|
||||
GetMatrixMetrics().RecordParseTime(parseTime)
|
||||
|
||||
if err != nil {
|
||||
// If parsing fails, we can't expand the matrix
|
||||
// Mark as evaluated and skip
|
||||
job.IsMatrixEvaluated = true
|
||||
errMsg := fmt.Sprintf("failed to parse workflow payload for job %d (JobID: %s) during matrix expansion. Error: %v. RawStrategy: %s",
|
||||
job.ID, job.JobID, err, job.RawStrategy)
|
||||
log.Error("Matrix parse error: %s", errMsg)
|
||||
GetMatrixMetrics().RecordReevaluation(time.Since(startTime), false, 0)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if len(jobs) == 0 {
|
||||
job.IsMatrixEvaluated = true
|
||||
log.Debug("No jobs generated from matrix expansion for job %d (JobID: %s)", job.ID, job.JobID)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
log.Debug("Parsed %d matrix combinations for job %d (JobID: %s)", len(jobs), job.ID, job.JobID)
|
||||
|
||||
// Create new ActionRunJobs for each parsed workflow (each matrix combination)
|
||||
newJobs := make([]*actions_model.ActionRunJob, 0)
|
||||
|
||||
for i, parsedSingleWorkflow := range jobs {
|
||||
id, jobDef := parsedSingleWorkflow.Job()
|
||||
if jobDef == nil {
|
||||
log.Warn("Skipped nil jobDef at index %d for job %d (JobID: %s)", i, job.ID, job.JobID)
|
||||
continue
|
||||
}
|
||||
|
||||
// Skip the original job ID - we only want the matrix-expanded versions
|
||||
if id == job.JobID {
|
||||
log.Debug("Skipped original job ID %s in matrix expansion for job %d", id, job.ID)
|
||||
continue
|
||||
}
|
||||
|
||||
// Erase needs from the payload before storing
|
||||
needs := jobDef.Needs()
|
||||
if err := parsedSingleWorkflow.SetJob(id, jobDef.EraseNeeds()); err != nil {
|
||||
log.Error("Failed to erase needs from job %s (matrix expansion for job %d): %v", id, job.ID, err)
|
||||
continue
|
||||
}
|
||||
|
||||
payload, _ := parsedSingleWorkflow.Marshal()
|
||||
|
||||
newJob := &actions_model.ActionRunJob{
|
||||
RunID: job.RunID,
|
||||
RepoID: job.RepoID,
|
||||
OwnerID: job.OwnerID,
|
||||
CommitSHA: job.CommitSHA,
|
||||
IsForkPullRequest: job.IsForkPullRequest,
|
||||
Name: jobDef.Name,
|
||||
WorkflowPayload: payload,
|
||||
JobID: id,
|
||||
Needs: needs,
|
||||
RunsOn: jobDef.RunsOn(),
|
||||
Status: actions_model.StatusBlocked,
|
||||
}
|
||||
|
||||
newJobs = append(newJobs, newJob)
|
||||
}
|
||||
|
||||
// If no new jobs were created, mark as evaluated
|
||||
if len(newJobs) == 0 {
|
||||
job.IsMatrixEvaluated = true
|
||||
log.Warn("No valid jobs created from matrix expansion for job %d (JobID: %s). Original jobs: %d", job.ID, job.JobID, len(jobs))
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Insert the new jobs into database
|
||||
insertStartTime := time.Now()
|
||||
if err := actions_model.InsertActionRunJobs(ctx, newJobs); err != nil {
|
||||
insertTime := time.Since(insertStartTime)
|
||||
GetMatrixMetrics().RecordInsertTime(insertTime)
|
||||
errMsg := fmt.Sprintf("failed to insert %d new matrix jobs for job %d (JobID: %s): %v", len(newJobs), job.ID, job.JobID, err)
|
||||
log.Error("Matrix insertion error: %s", errMsg)
|
||||
GetMatrixMetrics().RecordReevaluation(time.Since(startTime), false, 0)
|
||||
return nil, fmt.Errorf("insert new jobs: %w", err)
|
||||
}
|
||||
insertTime := time.Since(insertStartTime)
|
||||
GetMatrixMetrics().RecordInsertTime(insertTime)
|
||||
|
||||
// Mark the original job as evaluated
|
||||
job.IsMatrixEvaluated = true
|
||||
if _, err := actions_model.UpdateRunJob(ctx, job, nil, "is_matrix_evaluated"); err != nil {
|
||||
log.Error("Failed to update job %d is_matrix_evaluated flag: %v", job.ID, err)
|
||||
}
|
||||
|
||||
totalTime := time.Since(startTime)
|
||||
GetMatrixMetrics().RecordReevaluation(totalTime, true, int64(len(newJobs)))
|
||||
|
||||
log.Info("Successfully completed matrix re-evaluation for job %d (JobID: %s): created %d new jobs from %d matrix combinations (total: %dms, parse: %dms, insert: %dms)",
|
||||
job.ID, job.JobID, len(newJobs), len(jobs), totalTime.Milliseconds(), parseTime.Milliseconds(), insertTime.Milliseconds())
|
||||
|
||||
return newJobs, nil
|
||||
}
|
||||
|
||||
// mergeNeedsIntoVars converts task needs outputs into variables for expression evaluation
|
||||
func mergeNeedsIntoVars(baseVars map[string]string, taskNeeds map[string]*TaskNeed) map[string]string {
|
||||
merged := make(map[string]string)
|
||||
|
||||
// Copy base vars
|
||||
maps.Copy(merged, baseVars)
|
||||
|
||||
// Add needs outputs as variables in format: needs.<job_id>.outputs.<output_name>
|
||||
for jobID, taskNeed := range taskNeeds {
|
||||
for outputKey, outputValue := range taskNeed.Outputs {
|
||||
key := fmt.Sprintf("needs.%s.outputs.%s", jobID, outputKey)
|
||||
merged[key] = outputValue
|
||||
}
|
||||
}
|
||||
|
||||
return merged
|
||||
}
|
||||
173
services/actions/matrix_metrics.go
Normal file
173
services/actions/matrix_metrics.go
Normal file
@ -0,0 +1,173 @@
|
||||
// Copyright 2025 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package actions
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// MatrixMetrics tracks performance metrics for matrix re-evaluation
// operations. All fields are guarded by mu; use the Record* methods rather
// than mutating fields directly.
type MatrixMetrics struct {
	mu sync.RWMutex

	// Counters.
	TotalReevaluations      int64 // all attempts, successful or not
	SuccessfulReevaluations int64
	FailedReevaluations     int64
	JobsCreatedTotal        int64 // jobs produced by successful expansions
	DeferredReevaluations   int64 // attempts postponed because needs were pending

	// Cumulative timing totals.
	TotalReevaluationTime time.Duration
	TotalParseTime        time.Duration
	TotalInsertTime       time.Duration

	// Bounded sample histories (most recent ~1000) for detailed analysis.
	ReevaluationTimes []time.Duration
	ParseTimes        []time.Duration
	InsertTimes       []time.Duration
}
|
||||
|
||||
var (
|
||||
matrixMetricsInstance *MatrixMetrics
|
||||
metricsMutex sync.Mutex
|
||||
)
|
||||
|
||||
// GetMatrixMetrics returns the global matrix metrics instance
|
||||
func GetMatrixMetrics() *MatrixMetrics {
|
||||
if matrixMetricsInstance == nil {
|
||||
metricsMutex.Lock()
|
||||
if matrixMetricsInstance == nil {
|
||||
matrixMetricsInstance = &MatrixMetrics{
|
||||
ReevaluationTimes: make([]time.Duration, 0, 1000),
|
||||
ParseTimes: make([]time.Duration, 0, 1000),
|
||||
InsertTimes: make([]time.Duration, 0, 1000),
|
||||
}
|
||||
}
|
||||
metricsMutex.Unlock()
|
||||
}
|
||||
return matrixMetricsInstance
|
||||
}
|
||||
|
||||
// RecordReevaluation records a matrix re-evaluation attempt
|
||||
func (m *MatrixMetrics) RecordReevaluation(duration time.Duration, success bool, jobsCreated int64) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
m.TotalReevaluations++
|
||||
m.TotalReevaluationTime += duration
|
||||
|
||||
if success {
|
||||
m.SuccessfulReevaluations++
|
||||
m.JobsCreatedTotal += jobsCreated
|
||||
} else {
|
||||
m.FailedReevaluations++
|
||||
}
|
||||
|
||||
// Keep a limited history for detailed analysis (keep last 1000)
|
||||
if len(m.ReevaluationTimes) < 1000 {
|
||||
m.ReevaluationTimes = append(m.ReevaluationTimes, duration)
|
||||
} else {
|
||||
// Shift and add new value
|
||||
copy(m.ReevaluationTimes, m.ReevaluationTimes[1:])
|
||||
m.ReevaluationTimes[len(m.ReevaluationTimes)-1] = duration
|
||||
}
|
||||
}
|
||||
|
||||
// RecordDeferred records a deferred matrix re-evaluation
|
||||
func (m *MatrixMetrics) RecordDeferred() {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.DeferredReevaluations++
|
||||
}
|
||||
|
||||
// RecordParseTime records the time taken to parse a workflow
|
||||
func (m *MatrixMetrics) RecordParseTime(duration time.Duration) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
m.TotalParseTime += duration
|
||||
|
||||
if len(m.ParseTimes) < 1000 {
|
||||
m.ParseTimes = append(m.ParseTimes, duration)
|
||||
} else {
|
||||
copy(m.ParseTimes, m.ParseTimes[1:])
|
||||
m.ParseTimes[len(m.ParseTimes)-1] = duration
|
||||
}
|
||||
}
|
||||
|
||||
// RecordInsertTime records the time taken to insert matrix jobs
|
||||
func (m *MatrixMetrics) RecordInsertTime(duration time.Duration) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
m.TotalInsertTime += duration
|
||||
|
||||
if len(m.InsertTimes) < 1000 {
|
||||
m.InsertTimes = append(m.InsertTimes, duration)
|
||||
} else {
|
||||
copy(m.InsertTimes, m.InsertTimes[1:])
|
||||
m.InsertTimes[len(m.InsertTimes)-1] = duration
|
||||
}
|
||||
}
|
||||
|
||||
// GetStats returns a snapshot of the current metrics
|
||||
func (m *MatrixMetrics) GetStats() map[string]any {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
avgReevaluationTime := time.Duration(0)
|
||||
if m.TotalReevaluations > 0 {
|
||||
avgReevaluationTime = m.TotalReevaluationTime / time.Duration(m.TotalReevaluations)
|
||||
}
|
||||
|
||||
avgParseTime := time.Duration(0)
|
||||
if len(m.ParseTimes) > 0 {
|
||||
avgParseTime = m.TotalParseTime / time.Duration(len(m.ParseTimes))
|
||||
}
|
||||
|
||||
avgInsertTime := time.Duration(0)
|
||||
if len(m.InsertTimes) > 0 {
|
||||
avgInsertTime = m.TotalInsertTime / time.Duration(len(m.InsertTimes))
|
||||
}
|
||||
|
||||
successRate := 0.0
|
||||
if m.TotalReevaluations > 0 {
|
||||
successRate = float64(m.SuccessfulReevaluations) / float64(m.TotalReevaluations) * 100
|
||||
}
|
||||
|
||||
return map[string]any{
|
||||
"total_reevaluations": m.TotalReevaluations,
|
||||
"successful_reevaluations": m.SuccessfulReevaluations,
|
||||
"failed_reevaluations": m.FailedReevaluations,
|
||||
"deferred_reevaluations": m.DeferredReevaluations,
|
||||
"success_rate_percent": successRate,
|
||||
"total_jobs_created": m.JobsCreatedTotal,
|
||||
"total_reevaluation_time_ms": m.TotalReevaluationTime.Milliseconds(),
|
||||
"avg_reevaluation_time_ms": avgReevaluationTime.Milliseconds(),
|
||||
"total_parse_time_ms": m.TotalParseTime.Milliseconds(),
|
||||
"avg_parse_time_ms": avgParseTime.Milliseconds(),
|
||||
"total_insert_time_ms": m.TotalInsertTime.Milliseconds(),
|
||||
"avg_insert_time_ms": avgInsertTime.Milliseconds(),
|
||||
}
|
||||
}
|
||||
|
||||
// Reset clears all metrics
|
||||
func (m *MatrixMetrics) Reset() {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
m.TotalReevaluations = 0
|
||||
m.SuccessfulReevaluations = 0
|
||||
m.FailedReevaluations = 0
|
||||
m.JobsCreatedTotal = 0
|
||||
m.DeferredReevaluations = 0
|
||||
m.TotalReevaluationTime = 0
|
||||
m.TotalParseTime = 0
|
||||
m.TotalInsertTime = 0
|
||||
m.ReevaluationTimes = m.ReevaluationTimes[:0]
|
||||
m.ParseTimes = m.ParseTimes[:0]
|
||||
m.InsertTimes = m.InsertTimes[:0]
|
||||
}
|
||||
191
services/actions/matrix_metrics_prometheus.go
Normal file
191
services/actions/matrix_metrics_prometheus.go
Normal file
@ -0,0 +1,191 @@
|
||||
// Copyright 2025 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package actions
|
||||
|
||||
import (
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
// MatrixMetricsCollector implements the prometheus.Collector interface
|
||||
// and exposes matrix re-evaluation metrics for prometheus
|
||||
type MatrixMetricsCollector struct {
|
||||
// Counters
|
||||
totalReevaluations prometheus.Gauge
|
||||
successfulReevaluations prometheus.Gauge
|
||||
failedReevaluations prometheus.Gauge
|
||||
deferredReevaluations prometheus.Gauge
|
||||
jobsCreatedTotal prometheus.Gauge
|
||||
|
||||
// Timing (in milliseconds)
|
||||
totalReevaluationTime prometheus.Gauge
|
||||
avgReevaluationTime prometheus.Gauge
|
||||
totalParseTime prometheus.Gauge
|
||||
avgParseTime prometheus.Gauge
|
||||
totalInsertTime prometheus.Gauge
|
||||
avgInsertTime prometheus.Gauge
|
||||
|
||||
// Rates
|
||||
successRate prometheus.Gauge
|
||||
}
|
||||
|
||||
// Prometheus metric name components. prometheus.BuildFQName joins Namespace,
// Subsystem, and Name with "_" itself, so the components must not carry a
// trailing underscore — the previous values ("gitea_", "matrix_") produced
// double-underscored metric names like "gitea__matrix__total_reevaluations".
const (
	namespace = "gitea"
	subsystem = "matrix"
)
|
||||
|
||||
// NewMatrixMetricsCollector creates a new MatrixMetricsCollector
|
||||
func NewMatrixMetricsCollector() *MatrixMetricsCollector {
|
||||
return &MatrixMetricsCollector{
|
||||
totalReevaluations: prometheus.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "total_reevaluations",
|
||||
Help: "Total number of matrix re-evaluation attempts",
|
||||
},
|
||||
),
|
||||
successfulReevaluations: prometheus.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "successful_reevaluations",
|
||||
Help: "Number of successful matrix re-evaluations",
|
||||
},
|
||||
),
|
||||
failedReevaluations: prometheus.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "failed_reevaluations",
|
||||
Help: "Number of failed matrix re-evaluations",
|
||||
},
|
||||
),
|
||||
deferredReevaluations: prometheus.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "deferred_reevaluations",
|
||||
Help: "Number of deferred matrix re-evaluations (waiting for dependencies)",
|
||||
},
|
||||
),
|
||||
jobsCreatedTotal: prometheus.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "jobs_created_total",
|
||||
Help: "Total number of jobs created from matrix expansion",
|
||||
},
|
||||
),
|
||||
totalReevaluationTime: prometheus.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "total_reevaluation_time_ms",
|
||||
Help: "Total time spent on matrix re-evaluations in milliseconds",
|
||||
},
|
||||
),
|
||||
avgReevaluationTime: prometheus.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "avg_reevaluation_time_ms",
|
||||
Help: "Average time per matrix re-evaluation in milliseconds",
|
||||
},
|
||||
),
|
||||
totalParseTime: prometheus.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "total_parse_time_ms",
|
||||
Help: "Total time spent parsing workflow payloads in milliseconds",
|
||||
},
|
||||
),
|
||||
avgParseTime: prometheus.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "avg_parse_time_ms",
|
||||
Help: "Average time per workflow parse in milliseconds",
|
||||
},
|
||||
),
|
||||
totalInsertTime: prometheus.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "total_insert_time_ms",
|
||||
Help: "Total time spent inserting jobs into database in milliseconds",
|
||||
},
|
||||
),
|
||||
avgInsertTime: prometheus.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "avg_insert_time_ms",
|
||||
Help: "Average time per database insert in milliseconds",
|
||||
},
|
||||
),
|
||||
successRate: prometheus.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "success_rate_percent",
|
||||
Help: "Success rate of matrix re-evaluations as percentage (0-100)",
|
||||
},
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
// Describe returns the metrics descriptions
|
||||
func (c *MatrixMetricsCollector) Describe(ch chan<- *prometheus.Desc) {
|
||||
c.totalReevaluations.Describe(ch)
|
||||
c.successfulReevaluations.Describe(ch)
|
||||
c.failedReevaluations.Describe(ch)
|
||||
c.deferredReevaluations.Describe(ch)
|
||||
c.jobsCreatedTotal.Describe(ch)
|
||||
c.totalReevaluationTime.Describe(ch)
|
||||
c.avgReevaluationTime.Describe(ch)
|
||||
c.totalParseTime.Describe(ch)
|
||||
c.avgParseTime.Describe(ch)
|
||||
c.totalInsertTime.Describe(ch)
|
||||
c.avgInsertTime.Describe(ch)
|
||||
c.successRate.Describe(ch)
|
||||
}
|
||||
|
||||
// Collect collects the current metric values and sends them to the channel
|
||||
func (c *MatrixMetricsCollector) Collect(ch chan<- prometheus.Metric) {
|
||||
metrics := GetMatrixMetrics()
|
||||
stats := metrics.GetStats()
|
||||
|
||||
// Set counter values
|
||||
c.totalReevaluations.Set(float64(stats["total_reevaluations"].(int64)))
|
||||
c.successfulReevaluations.Set(float64(stats["successful_reevaluations"].(int64)))
|
||||
c.failedReevaluations.Set(float64(stats["failed_reevaluations"].(int64)))
|
||||
c.deferredReevaluations.Set(float64(stats["deferred_reevaluations"].(int64)))
|
||||
c.jobsCreatedTotal.Set(float64(stats["total_jobs_created"].(int64)))
|
||||
|
||||
// Set timing values (already in milliseconds)
|
||||
c.totalReevaluationTime.Set(float64(stats["total_reevaluation_time_ms"].(int64)))
|
||||
c.avgReevaluationTime.Set(float64(stats["avg_reevaluation_time_ms"].(int64)))
|
||||
c.totalParseTime.Set(float64(stats["total_parse_time_ms"].(int64)))
|
||||
c.avgParseTime.Set(float64(stats["avg_parse_time_ms"].(int64)))
|
||||
c.totalInsertTime.Set(float64(stats["total_insert_time_ms"].(int64)))
|
||||
c.avgInsertTime.Set(float64(stats["avg_insert_time_ms"].(int64)))
|
||||
|
||||
// Set success rate
|
||||
c.successRate.Set(stats["success_rate_percent"].(float64))
|
||||
|
||||
// Collect all metrics
|
||||
c.totalReevaluations.Collect(ch)
|
||||
c.successfulReevaluations.Collect(ch)
|
||||
c.failedReevaluations.Collect(ch)
|
||||
c.deferredReevaluations.Collect(ch)
|
||||
c.jobsCreatedTotal.Collect(ch)
|
||||
c.totalReevaluationTime.Collect(ch)
|
||||
c.avgReevaluationTime.Collect(ch)
|
||||
c.totalParseTime.Collect(ch)
|
||||
c.avgParseTime.Collect(ch)
|
||||
c.totalInsertTime.Collect(ch)
|
||||
c.avgInsertTime.Collect(ch)
|
||||
c.successRate.Collect(ch)
|
||||
}
|
||||
82
services/actions/matrix_metrics_test.go
Normal file
82
services/actions/matrix_metrics_test.go
Normal file
@ -0,0 +1,82 @@
|
||||
// Copyright 2025 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package actions
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
// Essential Prometheus Collector Tests
|
||||
|
||||
func TestNewMatrixMetricsCollector(t *testing.T) {
|
||||
collector := NewMatrixMetricsCollector()
|
||||
assert.NotNil(t, collector)
|
||||
assert.NotNil(t, collector.totalReevaluations)
|
||||
assert.NotNil(t, collector.successRate)
|
||||
}
|
||||
|
||||
func TestMatrixMetricsCollectorDescribe(t *testing.T) {
|
||||
collector := NewMatrixMetricsCollector()
|
||||
ch := make(chan *prometheus.Desc, 100)
|
||||
collector.Describe(ch)
|
||||
assert.NotEmpty(t, ch)
|
||||
}
|
||||
|
||||
func TestMatrixMetricsCollectorCollect(t *testing.T) {
|
||||
matrixMetricsInstance = nil
|
||||
metrics := GetMatrixMetrics()
|
||||
metrics.RecordReevaluation(10*time.Millisecond, true, 5)
|
||||
metrics.RecordParseTime(8 * time.Millisecond)
|
||||
|
||||
collector := NewMatrixMetricsCollector()
|
||||
ch := make(chan prometheus.Metric, 100)
|
||||
collector.Collect(ch)
|
||||
assert.NotEmpty(t, ch)
|
||||
|
||||
matrixMetricsInstance = nil
|
||||
}
|
||||
|
||||
func TestMatrixMetricsGetStats(t *testing.T) {
|
||||
metrics := &MatrixMetrics{
|
||||
ReevaluationTimes: make([]time.Duration, 0, 1000),
|
||||
ParseTimes: make([]time.Duration, 0, 1000),
|
||||
InsertTimes: make([]time.Duration, 0, 1000),
|
||||
}
|
||||
|
||||
metrics.RecordReevaluation(10*time.Millisecond, true, 3)
|
||||
metrics.RecordReevaluation(15*time.Millisecond, true, 2)
|
||||
metrics.RecordReevaluation(5*time.Millisecond, false, 0)
|
||||
|
||||
stats := metrics.GetStats()
|
||||
assert.Equal(t, int64(3), stats["total_reevaluations"])
|
||||
assert.Equal(t, int64(2), stats["successful_reevaluations"])
|
||||
assert.Equal(t, int64(1), stats["failed_reevaluations"])
|
||||
assert.Greater(t, stats["success_rate_percent"].(float64), 60.0)
|
||||
}
|
||||
|
||||
func BenchmarkMatrixMetricsCollectorCollect(b *testing.B) {
|
||||
metrics := &MatrixMetrics{
|
||||
ReevaluationTimes: make([]time.Duration, 0, 1000),
|
||||
ParseTimes: make([]time.Duration, 0, 1000),
|
||||
InsertTimes: make([]time.Duration, 0, 1000),
|
||||
}
|
||||
matrixMetricsInstance = metrics
|
||||
|
||||
for range 100 {
|
||||
metrics.RecordReevaluation(10*time.Millisecond, true, 5)
|
||||
metrics.RecordParseTime(5 * time.Millisecond)
|
||||
}
|
||||
|
||||
collector := NewMatrixMetricsCollector()
|
||||
ch := make(chan prometheus.Metric, 100)
|
||||
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
collector.Collect(ch)
|
||||
}
|
||||
}
|
||||
@ -759,3 +759,97 @@ func getTaskJobNameByTaskID(t *testing.T, authToken, ownerName, repoName string,
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func TestDynamicMatrixFromJobOutputs(t *testing.T) {
|
||||
testCases := []struct {
|
||||
treePath string
|
||||
fileContent string
|
||||
outcomes map[string]*mockTaskOutcome
|
||||
}{
|
||||
{
|
||||
treePath: ".gitea/workflows/dynamic-matrix.yml",
|
||||
fileContent: `name: Dynamic Matrix from Job Outputs
|
||||
on:
|
||||
push:
|
||||
paths:
|
||||
- '.gitea/workflows/dynamic-matrix.yml'
|
||||
jobs:
|
||||
generate:
|
||||
runs-on: ubuntu-latest
|
||||
outputs:
|
||||
matrix: ${{ steps.gen_matrix.outputs.matrix }}
|
||||
steps:
|
||||
- name: Generate matrix
|
||||
id: gen_matrix
|
||||
run: |
|
||||
echo "matrix=[1,2,3]" >> "$GITHUB_OUTPUT"
|
||||
|
||||
build:
|
||||
needs: [generate]
|
||||
runs-on: ubuntu-latest
|
||||
strategy:
|
||||
matrix:
|
||||
version: ${{ fromJson(needs.generate.outputs.matrix) }}
|
||||
steps:
|
||||
- run: echo "Building version ${{ matrix.version }}"
|
||||
`,
|
||||
outcomes: map[string]*mockTaskOutcome{
|
||||
"generate": {
|
||||
result: runnerv1.Result_RESULT_SUCCESS,
|
||||
outputs: map[string]string{
|
||||
"matrix": "[1,2,3]",
|
||||
},
|
||||
},
|
||||
"build (1)": {
|
||||
result: runnerv1.Result_RESULT_SUCCESS,
|
||||
},
|
||||
"build (2)": {
|
||||
result: runnerv1.Result_RESULT_SUCCESS,
|
||||
},
|
||||
"build (3)": {
|
||||
result: runnerv1.Result_RESULT_SUCCESS,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
onGiteaRun(t, func(t *testing.T, u *url.URL) {
|
||||
user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2})
|
||||
session := loginUser(t, user2.Name)
|
||||
token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser)
|
||||
|
||||
apiRepo := createActionsTestRepo(t, token, "actions-dynamic-matrix", false)
|
||||
runner := newMockRunner()
|
||||
runner.registerAsRepoRunner(t, user2.Name, apiRepo.Name, "mock-runner", []string{"ubuntu-latest"}, false)
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run("test "+tc.treePath, func(t *testing.T) {
|
||||
opts := getWorkflowCreateFileOptions(user2, apiRepo.DefaultBranch, "create "+tc.treePath, tc.fileContent)
|
||||
createWorkflowFile(t, token, user2.Name, apiRepo.Name, tc.treePath, opts)
|
||||
|
||||
// Execute the generate job first
|
||||
task := runner.fetchTask(t)
|
||||
jobName := getTaskJobNameByTaskID(t, token, user2.Name, apiRepo.Name, task.Id)
|
||||
assert.Equal(t, "generate", jobName)
|
||||
outcome := tc.outcomes[jobName]
|
||||
assert.NotNil(t, outcome)
|
||||
runner.execTask(t, task, outcome)
|
||||
|
||||
// Now the build job should be created with matrix expansion from the output
|
||||
// We expect 3 tasks for build (1), build (2), build (3)
|
||||
buildTasks := make([]int64, 0)
|
||||
for range 3 {
|
||||
buildTask := runner.fetchTask(t)
|
||||
buildJobName := getTaskJobNameByTaskID(t, token, user2.Name, apiRepo.Name, buildTask.Id)
|
||||
t.Logf("Fetched task: %s", buildJobName)
|
||||
assert.Contains(t, []string{"build (1)", "build (2)", "build (3)"}, buildJobName, "Expected a build job with matrix index")
|
||||
outcome := tc.outcomes[buildJobName]
|
||||
assert.NotNil(t, outcome)
|
||||
runner.execTask(t, buildTask, outcome)
|
||||
buildTasks = append(buildTasks, buildTask.Id)
|
||||
}
|
||||
|
||||
assert.Len(t, len(buildTasks), 3, "Expected 3 build tasks from dynamic matrix")
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user