diff --git a/go.mod b/go.mod index 5c3be61c69..0b8d4f977a 100644 --- a/go.mod +++ b/go.mod @@ -105,7 +105,7 @@ require ( go.yaml.in/yaml/v4 v4.0.0-rc.3 golang.org/x/crypto v0.52.0 golang.org/x/image v0.40.0 - golang.org/x/net v0.54.0 + golang.org/x/net v0.55.0 golang.org/x/oauth2 v0.36.0 golang.org/x/sync v0.20.0 golang.org/x/sys v0.45.0 diff --git a/go.sum b/go.sum index c611f238ab..3038c46e01 100644 --- a/go.sum +++ b/go.sum @@ -821,8 +821,8 @@ golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= -golang.org/x/net v0.54.0 h1:2zJIZAxAHV/OHCDTCOHAYehQzLfSXuf/5SoL/Dv6w/w= -golang.org/x/net v0.54.0/go.mod h1:Sj4oj8jK6XmHpBZU/zWHw3BV3abl4Kvi+Ut7cQcY+cQ= +golang.org/x/net v0.55.0 h1:bcvxaJn3e1U6InsFWt1JUq1aSjnRxLzT2rtD2KfkDF8= +golang.org/x/net v0.55.0/go.mod h1:L5U2KuzuOe1lY7Z+aWVIKK6qEeJXnXV9yzGA+WCHJww= golang.org/x/oauth2 v0.36.0 h1:peZ/1z27fi9hUOFCAZaHyrpWG5lwe0RJEEEeH0ThlIs= golang.org/x/oauth2 v0.36.0/go.mod h1:YDBUJMTkDnJS+A4BP4eZBjCqtokkg1hODuPjwiGPO7Q= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/models/actions/run_job.go b/models/actions/run_job.go index 682d9f5050..bd6c9a86eb 100644 --- a/models/actions/run_job.go +++ b/models/actions/run_job.go @@ -398,3 +398,15 @@ func InsertActionRunJobs(ctx context.Context, jobs []*ActionRunJob) error { } return db.Insert(ctx, jobs) } + +// GetMaxAttemptJobID returns the highest AttemptJobID among a run attempt's jobs, or 0 if none. +// Used to assign ids to jobs created after initial planning (e.g. matrix expansion). +func GetMaxAttemptJobID(ctx context.Context, runID, runAttemptID int64) (int64, error) { + var job ActionRunJob + if _, err := db.GetEngine(ctx). + Where(builder.Eq{"run_id": runID, "run_attempt_id": runAttemptID}). + Desc("attempt_job_id").Get(&job); err != nil { + return 0, err + } + return job.AttemptJobID, nil +} diff --git a/models/migrations/v1_27/v335.go b/models/migrations/v1_27/v335.go index 0b6f9e57b2..b1026545ba 100644 --- a/models/migrations/v1_27/v335.go +++ b/models/migrations/v1_27/v335.go @@ -18,6 +18,7 @@ func AddMatrixEvaluationColumnsToActionRunJob(x db.EngineMigration) error { } _, err := x.SyncWithOptions(xorm.SyncOptions{ IgnoreDropIndices: true, + IgnoreConstrains: true, }, new(ActionRunJob)) return err } diff --git a/modules/actions/jobparser/jobparser.go b/modules/actions/jobparser/jobparser.go index 643a4d2c90..88b2f09b10 100644 --- a/modules/actions/jobparser/jobparser.go +++ b/modules/actions/jobparser/jobparser.go @@ -6,6 +6,7 @@ package jobparser import ( "bytes" "fmt" + "slices" "sort" "strings" @@ -33,6 +34,18 @@ func deepCopyYamlNode(node *yaml.Node) *yaml.Node { return &nodeCopy } +// rawMatrixHasExpression reports whether any scalar in the matrix node contains a +// ${{ }} expression, i.e. the matrix must be evaluated rather than used verbatim. +func rawMatrixHasExpression(node *yaml.Node) bool { + if node == nil { + return false + } + if node.Kind == yaml.ScalarNode { + return strings.Contains(node.Value, "${{") + } + return slices.ContainsFunc(node.Content, rawMatrixHasExpression) +} + func Parse(content []byte, options ...ParseOption) ([]*SingleWorkflow, error) { origin, err := model.ReadWorkflow(bytes.NewReader(content)) if err != nil { @@ -78,29 +91,22 @@ func Parse(content []byte, options ...ParseOption) ([]*SingleWorkflow, error) { return nil, fmt.Errorf("job %s not found in origin workflow", id) } - // Clone the origin job to avoid modifying the shared object - evaluatedJob := *originJob - if originJob.Strategy != nil { + // Clone + pre-evaluate only when the matrix has a ${{ }} expression; static matrices use + // the origin job as-is. Unresolved expressions defer to ReEvaluateMatrixForJobWithNeeds. + evaluatedJob := originJob + if originJob.Strategy != nil && rawMatrixHasExpression(&originJob.Strategy.RawMatrix) { + jobCopy := *originJob stratCopy := *originJob.Strategy - // Deep copy the RawMatrix yaml.Node to prevent mutations from affecting the original stratCopy.RawMatrix = *deepCopyYamlNode(&originJob.Strategy.RawMatrix) - evaluatedJob.Strategy = &stratCopy - } - - // Create an evaluator with access to needs/outputs for matrix evaluation - matrixEvaluator := NewExpressionEvaluator(NewInterpeter(id, &evaluatedJob, nil, pc.gitContext, results, pc.vars, pc.inputs)) - - // Evaluate the matrix before expanding it. - // If evaluation fails (e.g. expression references unresolved job outputs), - // continue with the unevaluated matrix — the job will be created as a - // placeholder and re-evaluated by ReEvaluateMatrixForJobWithNeeds later. - if evaluatedJob.Strategy != nil && evaluatedJob.Strategy.RawMatrix.Kind != 0 { + jobCopy.Strategy = &stratCopy + evaluatedJob = &jobCopy + matrixEvaluator := NewExpressionEvaluator(NewInterpeter(id, evaluatedJob, nil, pc.gitContext, results, pc.vars, pc.inputs)) if err := matrixEvaluator.EvaluateYamlNode(&evaluatedJob.Strategy.RawMatrix); err != nil { log.Debug("matrix evaluation deferred for job %s (unresolved expression): %v", id, err) } } - matricxes, err := getMatrixes(&evaluatedJob) + matricxes, err := getMatrixes(evaluatedJob) if err != nil { return nil, fmt.Errorf("getMatrixes: %w", err) } @@ -110,7 +116,7 @@ func Parse(content []byte, options ...ParseOption) ([]*SingleWorkflow, error) { job.Name = id } job.Strategy.RawMatrix = encodeMatrix(matrix) - evaluator := NewExpressionEvaluator(NewInterpeter(id, &evaluatedJob, matrix, pc.gitContext, results, pc.vars, pc.inputs)) + evaluator := NewExpressionEvaluator(NewInterpeter(id, evaluatedJob, matrix, pc.gitContext, results, pc.vars, pc.inputs)) job.Name = nameWithMatrix(job.Name, matrix, evaluator) runsOn := evaluatedJob.RunsOn() for i, v := range runsOn { diff --git a/services/actions/job_emitter.go b/services/actions/job_emitter.go index 057510e0f1..9042f47be8 100644 --- a/services/actions/job_emitter.go +++ b/services/actions/job_emitter.go @@ -7,7 +7,6 @@ import ( "context" "errors" "fmt" - "time" actions_model "gitea.dev/models/actions" "gitea.dev/models/db" @@ -248,8 +247,6 @@ func checkJobsOfCurrentRunAttempt(ctx context.Context, run *actions_model.Action } } - log.Debug("Checking %d jobs for run %d (status: %s)", len(jobs), run.ID, run.Status) - vars, err := actions_model.GetVariablesOfRun(ctx, run) if err != nil { return nil, nil, nil, err @@ -262,18 +259,14 @@ func checkJobsOfCurrentRunAttempt(ctx context.Context, run *actions_model.Action } updates := resolver.Resolve(ctx) - log.Debug("Job status resolver returned %d job status updates for run %d", len(updates), run.ID) - for _, job := range jobs { if status, ok := updates[job.ID]; ok { - oldStatus := job.Status job.Status = status if n, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"status": actions_model.StatusBlocked}, "status"); err != nil { return err } else if n != 1 { return fmt.Errorf("no affected for updating blocked job %v", job.ID) } - log.Debug("Job %d (JobID: %s) status updated: %s -> %s", job.ID, job.JobID, oldStatus, status) updatedJobs = append(updatedJobs, job) } } @@ -282,29 +275,24 @@ func checkJobsOfCurrentRunAttempt(ctx context.Context, run *actions_model.Action return nil, nil, nil, err } - // Reload jobs from the database to pick up any newly created matrix jobs - oldJobCount := len(jobs) - jobs, err = db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: run.ID}) - if err != nil { - return nil, nil, nil, err + // Matrix re-evaluation can insert new jobs in this attempt; reload so callers see them. + if resolver.matrixExpanded { + jobs, err = actions_model.GetRunJobsByRunAndAttemptID(ctx, run.ID, run.LatestAttemptID) + if err != nil { + return nil, nil, nil, err + } } - if len(jobs) > oldJobCount { - log.Debug("Matrix re-evaluation created %d new jobs for run %d (was %d, now %d)", - len(jobs)-oldJobCount, run.ID, oldJobCount, len(jobs)) - } - - log.Debug("Job check completed for run %d: %d jobs updated, %d total jobs", run.ID, len(updatedJobs), len(jobs)) - return jobs, updatedJobs, resolver.cancelledJobs, nil } type jobStatusResolver struct { - statuses map[int64]actions_model.Status - needs map[int64][]int64 - jobMap map[int64]*actions_model.ActionRunJob - vars map[string]string - cancelledJobs []*actions_model.ActionRunJob + statuses map[int64]actions_model.Status + needs map[int64][]int64 + jobMap map[int64]*actions_model.ActionRunJob + vars map[string]string + cancelledJobs []*actions_model.ActionRunJob + matrixExpanded bool // set when matrix re-evaluation inserted new jobs, so callers reload } func newJobStatusResolver(jobs actions_model.ActionJobList, vars map[string]string) *jobStatusResolver { @@ -372,53 +360,30 @@ func (r *jobStatusResolver) resolveJobHasIfCondition(actionRunJob *actions_model func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model.Status { ret := map[int64]actions_model.Status{} - resolveMetrics := struct { - totalBlocked int - matrixReevaluated int - concurrencyUpdated int - jobsStarted int - jobsSkipped int - }{} - for id, status := range r.statuses { actionRunJob := r.jobMap[id] if status != actions_model.StatusBlocked { continue } - - resolveMetrics.totalBlocked++ - log.Debug("Resolving blocked job %d (JobID: %s, RunID: %d)", id, actionRunJob.JobID, actionRunJob.RunID) - allDone, allSucceed := r.resolveCheckNeeds(id) if !allDone { - log.Debug("Job %d: not all dependencies completed yet", id) continue } - log.Debug("Job %d: all dependencies completed (allSucceed: %v), checking matrix re-evaluation", id, allSucceed) - - // Try to re-evaluate the matrix with job outputs if it depends on them - startTime := time.Now() - newMatrixJobs, err := ReEvaluateMatrixForJobWithNeeds(ctx, actionRunJob, r.vars) - duration := time.Since(startTime).Milliseconds() - + // Expand a needs-dependent matrix now that needs are done; on success the job is no + // longer Blocked (placeholder becomes the first combination, siblings inserted). + children, err := ReEvaluateMatrixForJobWithNeeds(ctx, actionRunJob, r.vars) if err != nil { - log.Error("Matrix re-evaluation error for job %d (JobID: %s): %v (duration: %dms)", id, actionRunJob.JobID, err, duration) + log.Error("ReEvaluateMatrixForJobWithNeeds failed, this job will stay blocked: job: %d, err: %v", id, err) continue } - - // If new matrix jobs were created, the placeholder was already claimed and - // skipped inside ReEvaluateMatrixForJobWithNeeds (transactionally). Nothing - // more to do here — just move to the next job. - if len(newMatrixJobs) > 0 { - resolveMetrics.matrixReevaluated++ - log.Debug("Matrix re-evaluation succeeded for job %d (JobID: %s): created %d new jobs (duration: %dms)", - id, actionRunJob.JobID, len(newMatrixJobs), duration) + if len(children) > 0 { + r.matrixExpanded = true + } + if actionRunJob.Status != actions_model.StatusBlocked { continue } - log.Debug("Job %d: no matrix re-evaluation needed or result is empty", id) - // update concurrency and check whether the job can run now err = updateConcurrencyEvaluationForJobWithNeeds(ctx, actionRunJob, r.vars) if err != nil { @@ -426,19 +391,16 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model // At the moment there is no way to distinguish them. // Actually, for most cases, the error is caused by "syntax error" / "the needed jobs haven't completed (skipped?)" // TODO: if workflow or concurrency expression has syntax error, there should be a user error message, need to show it to end users - log.Debug("Concurrency evaluation failed for job %d (JobID: %s): %v (job will stay blocked)", id, actionRunJob.JobID, err) + log.Debug("updateConcurrencyEvaluationForJobWithNeeds failed, this job will stay blocked: job: %d, err: %v", id, err) continue } - resolveMetrics.concurrencyUpdated++ - shouldStartJob := true if !allSucceed { // Not all dependent jobs completed successfully: // * if the job has "if" condition, it can be started, then the act_runner will evaluate the "if" condition. // * otherwise, the job should be skipped. shouldStartJob = r.resolveJobHasIfCondition(actionRunJob) - log.Debug("Job %d: not all dependencies succeeded. Has if-condition: %v, should start: %v", id, shouldStartJob, shouldStartJob) } newStatus := util.Iif(shouldStartJob, actions_model.StatusWaiting, actions_model.StatusSkipped) @@ -446,7 +408,7 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model var cancelledJobs []*actions_model.ActionRunJob newStatus, cancelledJobs, err = PrepareToStartJobWithConcurrency(ctx, actionRunJob) if err != nil { - log.Error("Concurrency check failed for job %d (JobID: %s): %v (job will stay blocked)", id, actionRunJob.JobID, err) + log.Error("ShouldBlockJobByConcurrency failed, this job will stay blocked: job: %d, err: %v", id, err) } else { r.cancelledJobs = append(r.cancelledJobs, cancelledJobs...) } @@ -454,24 +416,8 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model if newStatus != actions_model.StatusBlocked { ret[id] = newStatus - switch newStatus { - case actions_model.StatusWaiting: - resolveMetrics.jobsStarted++ - log.Debug("Job %d (JobID: %s) transitioned to StatusWaiting", id, actionRunJob.JobID) - case actions_model.StatusSkipped: - resolveMetrics.jobsSkipped++ - log.Debug("Job %d (JobID: %s) transitioned to StatusSkipped", id, actionRunJob.JobID) - } } } - - // Log resolution metrics summary - if resolveMetrics.totalBlocked > 0 { - log.Debug("Job resolution summary: total_blocked=%d, matrix_reevaluated=%d, concurrency_updated=%d, jobs_started=%d, jobs_skipped=%d", - resolveMetrics.totalBlocked, resolveMetrics.matrixReevaluated, resolveMetrics.concurrencyUpdated, - resolveMetrics.jobsStarted, resolveMetrics.jobsSkipped) - } - return ret } diff --git a/services/actions/matrix.go b/services/actions/matrix.go index 079f71eec3..6df1bba131 100644 --- a/services/actions/matrix.go +++ b/services/actions/matrix.go @@ -35,23 +35,6 @@ func markMatrixAsEvaluatedAndSkip(ctx context.Context, job *actions_model.Action return nil } -// claimMatrixExpansion atomically marks the placeholder job as evaluated+skipped -// only when it is still in its original state (is_matrix_evaluated=false AND -// status=Blocked). Returns (true, nil) when this caller wins the claim, or -// (false, nil) when another concurrent process already claimed it. -// This is the concurrency guard that prevents double-expansion. -func claimMatrixExpansion(ctx context.Context, job *actions_model.ActionRunJob) (bool, error) { - job.IsMatrixEvaluated = true - job.Status = actions_model.StatusSkipped - n, err := actions_model.UpdateRunJob(ctx, job, - builder.Eq{"is_matrix_evaluated": false, "status": actions_model.StatusBlocked}, - "is_matrix_evaluated", "status") - if err != nil { - return false, err - } - return n == 1, nil -} - // checkTaskNeedsReady verifies if all task dependencies are completed. // Returns (taskNeeds, allDone, error) func checkTaskNeedsReady(ctx context.Context, job *actions_model.ActionRunJob) (map[string]*TaskNeed, bool, error) { @@ -169,8 +152,8 @@ func yamlNodeContainsNeedsOutputsExpr(node *yaml.Node) bool { return slices.ContainsFunc(node.Content, yamlNodeContainsNeedsOutputsExpr) } -// containsNeedsOutputsExpr returns true when s contains a GitHub Actions -// expression (${{ ... }}) that references needs..outputs.. +// containsNeedsOutputsExpr returns true when s contains an Actions expression +// (${{ ... }}) that references needs..outputs.. // A bare "needs." substring outside an expression block is not a match. func containsNeedsOutputsExpr(s string) bool { if !strings.Contains(s, "${{") { @@ -205,10 +188,6 @@ func ReEvaluateMatrixForJobWithNeeds(ctx context.Context, job *actions_model.Act return nil, nil } - if !HasMatrixWithNeeds(job.RawStrategy) { - return nil, markMatrixAsEvaluatedAndSkip(ctx, job, "no needs-dependent matrix found") - } - log.Debug("Starting matrix re-evaluation for job %d (JobID: %s)", job.ID, job.JobID) // skipWithError marks the job as evaluated+skipped and wraps any secondary error. @@ -228,8 +207,6 @@ func ReEvaluateMatrixForJobWithNeeds(ctx context.Context, job *actions_model.Act return nil, nil } - mergedVars := mergeNeedsIntoVars(vars, taskNeeds) - if job.Run == nil { if err := job.LoadRun(ctx); err != nil { return nil, fmt.Errorf("load run: %w", err) @@ -253,110 +230,98 @@ func ReEvaluateMatrixForJobWithNeeds(ctx context.Context, job *actions_model.Act workflowYAML, err := constructWorkflowWithNeeds(job, taskNeeds) if err != nil { - log.Error("Matrix re-evaluation error for job %d: construct workflow: %v", job.ID, err) return skipWithError(fmt.Sprintf("workflow construction failed: %v", err), fmt.Errorf("construct workflow: %w", err)) } parsedJobs, err := jobparser.Parse( workflowYAML, - jobparser.WithVars(mergedVars), + jobparser.WithVars(vars), jobparser.WithGitContext(giteaCtx.ToGitHubContext()), jobparser.WithJobOutputs(jobOutputs), jobparser.WithJobResults(jobResults), ) if err != nil { - log.Error("Matrix parse error for job %d (RawStrategy: %s): %v", job.ID, job.RawStrategy, err) - if markErr := markMatrixAsEvaluatedAndSkip(ctx, job, fmt.Sprintf("parse failed: %v", err)); markErr != nil { - return nil, fmt.Errorf("parse workflow: %w; additionally failed to mark as evaluated: %v", err, markErr) - } - return nil, nil + return nil, markMatrixAsEvaluatedAndSkip(ctx, job, fmt.Sprintf("parse failed: %v", err)) } - if len(parsedJobs) == 0 { - log.Debug("No jobs generated from matrix expansion for job %d (JobID: %s)", job.ID, job.JobID) - return nil, markMatrixAsEvaluatedAndSkip(ctx, job, "no jobs generated from matrix expansion") + // One parsed workflow per combination; needs are kept on the model and erased from the + // payload, as at initial planning time. + type matrixCombo struct { + name string + payload []byte + runsOn []string + needs []string } - - log.Debug("Parsed %d matrix combinations for job %d (JobID: %s)", len(parsedJobs), job.ID, job.JobID) - - var newJobs []*actions_model.ActionRunJob - for i, sw := range parsedJobs { + var combos []matrixCombo + for _, sw := range parsedJobs { id, jobDef := sw.Job() - if jobDef == nil { - log.Warn("Skipped nil jobDef at index %d for job %d (JobID: %s)", i, job.ID, job.JobID) + if jobDef == nil || id != job.JobID { continue } - if id != job.JobID { - continue - } - needs := jobDef.Needs() + combo := matrixCombo{name: jobDef.Name, runsOn: jobDef.RunsOn(), needs: jobDef.Needs()} if err := sw.SetJob(id, jobDef.EraseNeeds()); err != nil { - log.Error("Failed to erase needs from job %s (matrix expansion for job %d): %v", id, job.ID, err) - continue + return nil, fmt.Errorf("erase needs for job %s: %w", id, err) } - payload, _ := sw.Marshal() - newJobs = append(newJobs, &actions_model.ActionRunJob{ - RunID: job.RunID, - RepoID: job.RepoID, - OwnerID: job.OwnerID, - CommitSHA: job.CommitSHA, - IsForkPullRequest: job.IsForkPullRequest, - Name: jobDef.Name, - WorkflowPayload: payload, - JobID: id, - Needs: needs, - RunsOn: jobDef.RunsOn(), - Status: actions_model.StatusWaiting, - }) + if combo.payload, err = sw.Marshal(); err != nil { + return nil, fmt.Errorf("marshal expanded job %s: %w", id, err) + } + combos = append(combos, combo) } - if len(newJobs) == 0 { - log.Warn("No valid jobs created from matrix expansion for job %d (JobID: %s), total parsed: %d", job.ID, job.JobID, len(parsedJobs)) - return nil, markMatrixAsEvaluatedAndSkip(ctx, job, fmt.Sprintf("no valid jobs created (parsed: %d)", len(parsedJobs))) + if len(combos) == 0 { + return nil, markMatrixAsEvaluatedAndSkip(ctx, job, "matrix expanded to no combinations") } - // Atomically claim the placeholder and insert the new jobs in one transaction. - // The conditional WHERE in claimMatrixExpansion (is_matrix_evaluated=false AND - // status=Blocked) ensures only one concurrent caller can win. The second caller - // sees n==0 and rolls back without inserting duplicate jobs. - var claimed bool + // Reuse the placeholder as the first combination and insert the rest as siblings: no phantom + // skipped job is left to poison downstream needs, and siblings inherit attempt + permissions. + var children []*actions_model.ActionRunJob if err := db.WithTx(ctx, func(txCtx context.Context) error { - var txErr error - claimed, txErr = claimMatrixExpansion(txCtx, job) - if txErr != nil { - return txErr + maxAttemptJobID, err := actions_model.GetMaxAttemptJobID(txCtx, job.RunID, job.RunAttemptID) + if err != nil { + return err } - if !claimed { + for i := 1; i < len(combos); i++ { + children = append(children, &actions_model.ActionRunJob{ + RunID: job.RunID, + RunAttemptID: job.RunAttemptID, + RepoID: job.RepoID, + OwnerID: job.OwnerID, + CommitSHA: job.CommitSHA, + IsForkPullRequest: job.IsForkPullRequest, + Name: combos[i].name, + Attempt: job.Attempt, + WorkflowPayload: combos[i].payload, + JobID: job.JobID, + AttemptJobID: maxAttemptJobID + int64(i), + Needs: combos[i].needs, + RunsOn: combos[i].runsOn, + TokenPermissions: job.TokenPermissions, + Status: actions_model.StatusWaiting, + }) + } + + // Atomic claim: only one concurrent caller flips the placeholder out of Blocked. + job.Name = combos[0].name + job.WorkflowPayload = combos[0].payload + job.RunsOn = combos[0].runsOn + job.IsMatrixEvaluated = true + job.Status = actions_model.StatusWaiting + affected, err := actions_model.UpdateRunJob(txCtx, job, + builder.Eq{"is_matrix_evaluated": false, "status": actions_model.StatusBlocked}, + "name", "workflow_payload", "runs_on", "is_matrix_evaluated", "status") + if err != nil { + return err + } + if affected != 1 { + children = nil return nil } - return actions_model.InsertActionRunJobs(txCtx, newJobs) + return actions_model.InsertActionRunJobs(txCtx, children) }); err != nil { - log.Error("Matrix expansion transaction failed for job %d (JobID: %s): %v", job.ID, job.JobID, err) - return nil, fmt.Errorf("matrix expansion transaction: %w", err) - } - if !claimed { - log.Warn("Matrix placeholder job %d (JobID: %s) was already claimed by a concurrent process; skipping", - job.ID, job.JobID) - return nil, nil + return nil, fmt.Errorf("expand matrix for job %d: %w", job.ID, err) } - log.Debug("Matrix re-evaluation complete for job %d (JobID: %s): created %d new jobs", - job.ID, job.JobID, len(newJobs)) - - return newJobs, nil -} - -// mergeNeedsIntoVars converts task needs outputs into variables for expression evaluation. -func mergeNeedsIntoVars(baseVars map[string]string, taskNeeds map[string]*TaskNeed) map[string]string { - merged := make(map[string]string) - maps.Copy(merged, baseVars) - for jobID, taskNeed := range taskNeeds { - for outputKey, outputValue := range taskNeed.Outputs { - key := fmt.Sprintf("needs.%s.outputs.%s", jobID, outputKey) - merged[key] = outputValue - } - } - return merged + return children, nil } // constructWorkflowWithNeeds creates a workflow YAML that includes the target job diff --git a/services/actions/matrix_test.go b/services/actions/matrix_test.go index a75ec97ae3..45fba1a80c 100644 --- a/services/actions/matrix_test.go +++ b/services/actions/matrix_test.go @@ -89,23 +89,6 @@ matrix: } } -func TestMergeNeedsIntoVars(t *testing.T) { - base := map[string]string{"MY_VAR": "hello"} - needs := map[string]*TaskNeed{ - "setup": { - Result: actions_model.StatusSuccess, - Outputs: map[string]string{"versions": `["1","2"]`, "extra": "val"}, - }, - } - merged := mergeNeedsIntoVars(base, needs) - - assert.Equal(t, "hello", merged["MY_VAR"]) - assert.Equal(t, `["1","2"]`, merged["needs.setup.outputs.versions"]) - assert.Equal(t, "val", merged["needs.setup.outputs.extra"]) - // base must not be mutated - assert.NotContains(t, base, "needs.setup.outputs.versions") -} - func TestConstructWorkflowWithNeeds(t *testing.T) { // Minimal WorkflowPayload with a strategy referencing a needs output. payload, err := yaml.Marshal(map[string]any{