mirror of
https://github.com/go-gitea/gitea.git
synced 2026-06-19 11:13:45 +02:00
feat: Adjust the error handling from as feedback from the PR review
Signed-off-by: Pascal Zimmermann <pascal.zimmermann@theiotstudio.com>
This commit is contained in:
parent
9b0eedd3f1
commit
6dcf4580a1
@ -51,7 +51,7 @@ type ActionRunJob struct {
|
||||
ConcurrencyGroup string `xorm:"index(repo_concurrency) NOT NULL DEFAULT ''"` // evaluated concurrency.group
|
||||
ConcurrencyCancel bool `xorm:"NOT NULL DEFAULT FALSE"` // evaluated concurrency.cancel-in-progress
|
||||
|
||||
RawStrategy string // raw strategy from job YAML's "strategy" section (stored before matrix expansion for deferred evaluation)
|
||||
RawStrategy string `xorm:"TEXT"` // raw strategy from job YAML's "strategy" section (stored before matrix expansion for deferred evaluation)
|
||||
|
||||
// IsMatrixEvaluated is only valid/needed when this job's RawStrategy is not empty and contains a matrix that depends on job outputs.
|
||||
// If the matrix can't be evaluated yet (e.g. job hasn't completed), this field will be false.
|
||||
|
||||
@ -8,7 +8,12 @@ import (
|
||||
)
|
||||
|
||||
func AddMatrixEvaluationColumnsToActionRunJob(x *xorm.Engine) error {
|
||||
return x.Sync(new(ActionRunJobWithMatrixSupport))
|
||||
if _, err := x.SyncWithOptions(xorm.SyncOptions{
|
||||
IgnoreDropIndices: true,
|
||||
}, new(ActionRunJobWithMatrixSupport)); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ActionRunJobWithMatrixSupport is a temporary struct for migration purposes
|
||||
|
||||
@ -228,7 +228,7 @@ func checkJobsOfRun(ctx context.Context, run *actions_model.ActionRun) (jobs, up
|
||||
} else if n != 1 {
|
||||
return fmt.Errorf("no affected for updating blocked job %v", job.ID)
|
||||
}
|
||||
log.Info("Job %d (JobID: %s) status updated: %s -> %s", job.ID, job.JobID, oldStatus, status)
|
||||
log.Debug("Job %d (JobID: %s) status updated: %s -> %s", job.ID, job.JobID, oldStatus, status)
|
||||
updatedJobs = append(updatedJobs, job)
|
||||
}
|
||||
}
|
||||
@ -375,7 +375,12 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model
|
||||
resolveMetrics.matrixReevaluated++
|
||||
log.Info("Matrix re-evaluation succeeded for job %d (JobID: %s): created %d new jobs (duration: %dms)",
|
||||
id, actionRunJob.JobID, len(newMatrixJobs), duration)
|
||||
// The new jobs will be picked up in the next resolution iteration
|
||||
|
||||
// Mark the original matrix placeholder job as skipped so it won't be run later.
|
||||
actionRunJob.Status = actions_model.StatusSkipped
|
||||
if _, err := db.GetEngine(ctx).ID(actionRunJob.ID).Cols("status").Update(actionRunJob); err != nil {
|
||||
log.Error("Failed to mark matrix placeholder job %d (JobID: %s) as skipped after re-evaluation: %v", id, actionRunJob.JobID, err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
@ -62,8 +62,8 @@ func ExtractRawStrategies(content []byte) (map[string]string, error) {
|
||||
return strategies, nil
|
||||
}
|
||||
|
||||
// hasMatrixWithNeeds checks if a job's strategy contains a matrix that depends on job outputs
|
||||
func hasMatrixWithNeeds(rawStrategy string) bool {
|
||||
// HasMatrixWithNeeds checks if a job's strategy contains a matrix that depends on job outputs
|
||||
func HasMatrixWithNeeds(rawStrategy string) bool {
|
||||
if rawStrategy == "" {
|
||||
return false
|
||||
}
|
||||
@ -95,9 +95,12 @@ func ReEvaluateMatrixForJobWithNeeds(ctx context.Context, job *actions_model.Act
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if !hasMatrixWithNeeds(job.RawStrategy) {
|
||||
// Mark as evaluated since there's no needs-dependent matrix
|
||||
if !HasMatrixWithNeeds(job.RawStrategy) {
|
||||
// Mark as evaluated since there's no needs-dependent matrix and persist to DB
|
||||
job.IsMatrixEvaluated = true
|
||||
if _, err := actions_model.UpdateRunJob(ctx, job, nil, "is_matrix_evaluated"); err != nil {
|
||||
log.Error("Failed to persist is_matrix_evaluated flag for job %d: %v", job.ID, err)
|
||||
}
|
||||
log.Debug("Matrix re-evaluation skipped for job %d: no needs-dependent matrix found", job.ID)
|
||||
return nil, nil
|
||||
}
|
||||
@ -172,9 +175,16 @@ func ReEvaluateMatrixForJobWithNeeds(ctx context.Context, job *actions_model.Act
|
||||
// so that the jobparser can resolve needs.*.outputs.* expressions
|
||||
workflowYAML, err := constructWorkflowWithNeeds(job, taskNeeds)
|
||||
if err != nil {
|
||||
log.Error("Failed to construct workflow for job %d (JobID: %s): %v", job.ID, job.JobID, err)
|
||||
// If we can't construct the workflow, we can't expand the matrix
|
||||
// Mark as evaluated and skip the job to prevent the placeholder from running unexpanded
|
||||
job.IsMatrixEvaluated = true
|
||||
job.Status = actions_model.StatusSkipped
|
||||
if _, dbErr := actions_model.UpdateRunJob(ctx, job, nil, "is_matrix_evaluated", "status"); dbErr != nil {
|
||||
log.Error("Failed to persist is_matrix_evaluated/status flag for job %d after workflow construction failure: %v", job.ID, dbErr)
|
||||
}
|
||||
log.Error("Failed to construct workflow for job %d (JobID: %s): %v, marking as skipped", job.ID, job.JobID, err)
|
||||
GetMatrixMetrics().RecordReevaluation(time.Since(startTime), false, 0)
|
||||
return nil, nil
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Parse the constructed workflow with job outputs to expand the matrix
|
||||
@ -191,8 +201,11 @@ func ReEvaluateMatrixForJobWithNeeds(ctx context.Context, job *actions_model.Act
|
||||
|
||||
if err != nil {
|
||||
// If parsing fails, we can't expand the matrix
|
||||
// Mark as evaluated and skip
|
||||
// Mark as evaluated and persist to DB to avoid repeated parse attempts
|
||||
job.IsMatrixEvaluated = true
|
||||
if _, dbErr := actions_model.UpdateRunJob(ctx, job, nil, "is_matrix_evaluated"); dbErr != nil {
|
||||
log.Error("Failed to persist is_matrix_evaluated flag for job %d after parse failure: %v", job.ID, dbErr)
|
||||
}
|
||||
errMsg := fmt.Sprintf("failed to parse workflow payload for job %d (JobID: %s) during matrix expansion. Error: %v. RawStrategy: %s",
|
||||
job.ID, job.JobID, err, job.RawStrategy)
|
||||
log.Error("Matrix parse error: %s", errMsg)
|
||||
@ -201,8 +214,13 @@ func ReEvaluateMatrixForJobWithNeeds(ctx context.Context, job *actions_model.Act
|
||||
}
|
||||
|
||||
if len(jobs) == 0 {
|
||||
// No jobs generated - mark as evaluated and skip the placeholder job
|
||||
job.IsMatrixEvaluated = true
|
||||
log.Debug("No jobs generated from matrix expansion for job %d (JobID: %s)", job.ID, job.JobID)
|
||||
job.Status = actions_model.StatusSkipped
|
||||
if _, err := actions_model.UpdateRunJob(ctx, job, nil, "is_matrix_evaluated", "status"); err != nil {
|
||||
log.Error("Failed to persist is_matrix_evaluated/status flag for job %d: %v", job.ID, err)
|
||||
}
|
||||
log.Debug("No jobs generated from matrix expansion for job %d (JobID: %s), marking as skipped", job.ID, job.JobID)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
@ -250,10 +268,14 @@ func ReEvaluateMatrixForJobWithNeeds(ctx context.Context, job *actions_model.Act
|
||||
newJobs = append(newJobs, newJob)
|
||||
}
|
||||
|
||||
// If no new jobs were created, mark as evaluated
|
||||
// If no new jobs were created, mark as evaluated, skip the placeholder job, and persist to DB
|
||||
if len(newJobs) == 0 {
|
||||
job.IsMatrixEvaluated = true
|
||||
log.Warn("No valid jobs created from matrix expansion for job %d (JobID: %s). Original jobs: %d", job.ID, job.JobID, len(jobs))
|
||||
job.Status = actions_model.StatusSkipped
|
||||
if _, err := actions_model.UpdateRunJob(ctx, job, nil, "is_matrix_evaluated", "status"); err != nil {
|
||||
log.Error("Failed to persist is_matrix_evaluated/status flag for job %d: %v", job.ID, err)
|
||||
}
|
||||
log.Warn("No valid jobs created from matrix expansion for job %d (JobID: %s). Original jobs: %d, marking as skipped", job.ID, job.JobID, len(jobs))
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
@ -270,10 +292,11 @@ func ReEvaluateMatrixForJobWithNeeds(ctx context.Context, job *actions_model.Act
|
||||
insertTime := time.Since(insertStartTime)
|
||||
GetMatrixMetrics().RecordInsertTime(insertTime)
|
||||
|
||||
// Mark the original job as evaluated
|
||||
// Mark the original placeholder job as evaluated and skipped so it is never run
|
||||
job.IsMatrixEvaluated = true
|
||||
if _, err := actions_model.UpdateRunJob(ctx, job, nil, "is_matrix_evaluated"); err != nil {
|
||||
log.Error("Failed to update job %d is_matrix_evaluated flag: %v", job.ID, err)
|
||||
job.Status = actions_model.StatusSkipped
|
||||
if _, err := actions_model.UpdateRunJob(ctx, job, nil, "is_matrix_evaluated", "status"); err != nil {
|
||||
log.Error("Failed to update job %d after matrix expansion (is_matrix_evaluated/status): %v", job.ID, err)
|
||||
}
|
||||
|
||||
totalTime := time.Since(startTime)
|
||||
|
||||
@ -32,22 +32,17 @@ type MatrixMetrics struct {
|
||||
|
||||
var (
|
||||
matrixMetricsInstance *MatrixMetrics
|
||||
metricsMutex sync.Mutex
|
||||
metricsOnce sync.Once
|
||||
)
|
||||
|
||||
// GetMatrixMetrics returns the global matrix metrics instance
|
||||
func GetMatrixMetrics() *MatrixMetrics {
|
||||
if matrixMetricsInstance == nil {
|
||||
metricsMutex.Lock()
|
||||
if matrixMetricsInstance == nil {
|
||||
matrixMetricsInstance = &MatrixMetrics{
|
||||
ReevaluationTimes: make([]time.Duration, 0, 1000),
|
||||
ParseTimes: make([]time.Duration, 0, 1000),
|
||||
InsertTimes: make([]time.Duration, 0, 1000),
|
||||
}
|
||||
metricsOnce.Do(func() {
|
||||
matrixMetricsInstance = &MatrixMetrics{
|
||||
ReevaluationTimes: make([]time.Duration, 0, 1000),
|
||||
ParseTimes: make([]time.Duration, 0, 1000),
|
||||
InsertTimes: make([]time.Duration, 0, 1000),
|
||||
}
|
||||
metricsMutex.Unlock()
|
||||
}
|
||||
})
|
||||
return matrixMetricsInstance
|
||||
}
|
||||
|
||||
|
||||
@ -30,8 +30,8 @@ type MatrixMetricsCollector struct {
|
||||
}
|
||||
|
||||
const (
|
||||
namespace = "gitea_"
|
||||
subsystem = "matrix_"
|
||||
namespace = "gitea"
|
||||
subsystem = "matrix"
|
||||
)
|
||||
|
||||
// newMatrixGauge creates a new Prometheus Gauge with standard matrix metrics naming
|
||||
|
||||
@ -73,10 +73,16 @@ func BenchmarkMatrixMetricsCollectorCollect(b *testing.B) {
|
||||
}
|
||||
|
||||
collector := NewMatrixMetricsCollector()
|
||||
ch := make(chan prometheus.Metric, 100)
|
||||
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
// Use a fresh channel each iteration to avoid filling up the buffer
|
||||
ch := make(chan prometheus.Metric, 100)
|
||||
collector.Collect(ch)
|
||||
// Drain the channel to prevent blocking
|
||||
close(ch)
|
||||
for range ch {
|
||||
// discard metrics
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -9,6 +9,7 @@ import (
|
||||
|
||||
actions_model "code.gitea.io/gitea/models/actions"
|
||||
"code.gitea.io/gitea/models/db"
|
||||
"code.gitea.io/gitea/modules/log"
|
||||
"code.gitea.io/gitea/modules/util"
|
||||
notify_service "code.gitea.io/gitea/services/notify"
|
||||
|
||||
@ -102,7 +103,12 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobpar
|
||||
}
|
||||
|
||||
// Extract raw strategies from the original workflow before parsing
|
||||
rawStrategies, _ := ExtractRawStrategies(workflowContent)
|
||||
rawStrategies, err := ExtractRawStrategies(workflowContent)
|
||||
if err != nil {
|
||||
log.Warn("Failed to extract raw strategies from workflow: %v", err)
|
||||
// Continue without raw strategies - jobs will work but dynamic matrix won't be supported
|
||||
rawStrategies = nil
|
||||
}
|
||||
|
||||
runJobs := make([]*actions_model.ActionRunJob, 0, len(jobs))
|
||||
var hasWaitingJobs bool
|
||||
@ -131,8 +137,9 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobpar
|
||||
Status: util.Iif(shouldBlockJob, actions_model.StatusBlocked, actions_model.StatusWaiting),
|
||||
}
|
||||
|
||||
// Store raw strategy if job has matrix that depends on job outputs
|
||||
if rawStrategy, exists := rawStrategies[id]; exists && len(needs) > 0 {
|
||||
// Store raw strategy only if job has matrix that actually depends on job outputs (needs.*.outputs)
|
||||
// This avoids unnecessary DB storage and later re-evaluation checks for purely static matrices
|
||||
if rawStrategy, exists := rawStrategies[id]; exists && len(needs) > 0 && HasMatrixWithNeeds(rawStrategy) {
|
||||
runJob.RawStrategy = rawStrategy
|
||||
runJob.IsMatrixEvaluated = false
|
||||
}
|
||||
|
||||
@ -12,7 +12,6 @@ import (
|
||||
|
||||
auth_model "code.gitea.io/gitea/models/auth"
|
||||
"code.gitea.io/gitea/modules/setting"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
pingv1 "code.gitea.io/actions-proto-go/ping/v1"
|
||||
"code.gitea.io/actions-proto-go/ping/v1/pingv1connect"
|
||||
@ -20,6 +19,7 @@ import (
|
||||
"code.gitea.io/actions-proto-go/runner/v1/runnerv1connect"
|
||||
"connectrpc.com/connect"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
)
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user