mirror of
https://github.com/go-gitea/gitea.git
synced 2026-06-18 22:19:50 +02:00
fix(actions): correct dynamic matrix job fields and expansion
- expanded matrix jobs inherit RunAttemptID, Attempt, AttemptJobID and
TokenPermissions, so they stay in the attempt and downstream needs/status
aggregation account for them
- reuse the placeholder as the first combination instead of leaving a skipped
phantom job that poisoned downstream `needs`
- jobparser only clones + pre-evaluates a matrix when it contains a `${{ }}`
expression; static matrices keep the original path
- handle the expanded-job marshal error, drop the unused mergeNeedsIntoVars,
gate and attempt-scope the post-resolve job reload, trim resolver logging
- migration: add IgnoreConstrains for mssql; revert the unrelated go.mod
golang.org/x/net downgrade
Co-Authored-By: Claude (Opus 4.8) <noreply@anthropic.com>
This commit is contained in:
parent
d8b9b0a7f6
commit
2a33c56911
2
go.mod
2
go.mod
@ -105,7 +105,7 @@ require (
|
||||
go.yaml.in/yaml/v4 v4.0.0-rc.3
|
||||
golang.org/x/crypto v0.52.0
|
||||
golang.org/x/image v0.40.0
|
||||
golang.org/x/net v0.54.0
|
||||
golang.org/x/net v0.55.0
|
||||
golang.org/x/oauth2 v0.36.0
|
||||
golang.org/x/sync v0.20.0
|
||||
golang.org/x/sys v0.45.0
|
||||
|
||||
4
go.sum
4
go.sum
@ -821,8 +821,8 @@ golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
|
||||
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
|
||||
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
|
||||
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
|
||||
golang.org/x/net v0.54.0 h1:2zJIZAxAHV/OHCDTCOHAYehQzLfSXuf/5SoL/Dv6w/w=
|
||||
golang.org/x/net v0.54.0/go.mod h1:Sj4oj8jK6XmHpBZU/zWHw3BV3abl4Kvi+Ut7cQcY+cQ=
|
||||
golang.org/x/net v0.55.0 h1:bcvxaJn3e1U6InsFWt1JUq1aSjnRxLzT2rtD2KfkDF8=
|
||||
golang.org/x/net v0.55.0/go.mod h1:L5U2KuzuOe1lY7Z+aWVIKK6qEeJXnXV9yzGA+WCHJww=
|
||||
golang.org/x/oauth2 v0.36.0 h1:peZ/1z27fi9hUOFCAZaHyrpWG5lwe0RJEEEeH0ThlIs=
|
||||
golang.org/x/oauth2 v0.36.0/go.mod h1:YDBUJMTkDnJS+A4BP4eZBjCqtokkg1hODuPjwiGPO7Q=
|
||||
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
|
||||
@ -398,3 +398,15 @@ func InsertActionRunJobs(ctx context.Context, jobs []*ActionRunJob) error {
|
||||
}
|
||||
return db.Insert(ctx, jobs)
|
||||
}
|
||||
|
||||
// GetMaxAttemptJobID returns the highest AttemptJobID among a run attempt's jobs, or 0 if none.
|
||||
// Used to assign ids to jobs created after initial planning (e.g. matrix expansion).
|
||||
func GetMaxAttemptJobID(ctx context.Context, runID, runAttemptID int64) (int64, error) {
|
||||
var job ActionRunJob
|
||||
if _, err := db.GetEngine(ctx).
|
||||
Where(builder.Eq{"run_id": runID, "run_attempt_id": runAttemptID}).
|
||||
Desc("attempt_job_id").Get(&job); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return job.AttemptJobID, nil
|
||||
}
|
||||
|
||||
@ -18,6 +18,7 @@ func AddMatrixEvaluationColumnsToActionRunJob(x db.EngineMigration) error {
|
||||
}
|
||||
_, err := x.SyncWithOptions(xorm.SyncOptions{
|
||||
IgnoreDropIndices: true,
|
||||
IgnoreConstrains: true,
|
||||
}, new(ActionRunJob))
|
||||
return err
|
||||
}
|
||||
|
||||
@ -6,6 +6,7 @@ package jobparser
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"slices"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
@ -33,6 +34,18 @@ func deepCopyYamlNode(node *yaml.Node) *yaml.Node {
|
||||
return &nodeCopy
|
||||
}
|
||||
|
||||
// rawMatrixHasExpression reports whether any scalar in the matrix node contains a
|
||||
// ${{ }} expression, i.e. the matrix must be evaluated rather than used verbatim.
|
||||
func rawMatrixHasExpression(node *yaml.Node) bool {
|
||||
if node == nil {
|
||||
return false
|
||||
}
|
||||
if node.Kind == yaml.ScalarNode {
|
||||
return strings.Contains(node.Value, "${{")
|
||||
}
|
||||
return slices.ContainsFunc(node.Content, rawMatrixHasExpression)
|
||||
}
|
||||
|
||||
func Parse(content []byte, options ...ParseOption) ([]*SingleWorkflow, error) {
|
||||
origin, err := model.ReadWorkflow(bytes.NewReader(content))
|
||||
if err != nil {
|
||||
@ -78,29 +91,22 @@ func Parse(content []byte, options ...ParseOption) ([]*SingleWorkflow, error) {
|
||||
return nil, fmt.Errorf("job %s not found in origin workflow", id)
|
||||
}
|
||||
|
||||
// Clone the origin job to avoid modifying the shared object
|
||||
evaluatedJob := *originJob
|
||||
if originJob.Strategy != nil {
|
||||
// Clone + pre-evaluate only when the matrix has a ${{ }} expression; static matrices use
|
||||
// the origin job as-is. Unresolved expressions defer to ReEvaluateMatrixForJobWithNeeds.
|
||||
evaluatedJob := originJob
|
||||
if originJob.Strategy != nil && rawMatrixHasExpression(&originJob.Strategy.RawMatrix) {
|
||||
jobCopy := *originJob
|
||||
stratCopy := *originJob.Strategy
|
||||
// Deep copy the RawMatrix yaml.Node to prevent mutations from affecting the original
|
||||
stratCopy.RawMatrix = *deepCopyYamlNode(&originJob.Strategy.RawMatrix)
|
||||
evaluatedJob.Strategy = &stratCopy
|
||||
}
|
||||
|
||||
// Create an evaluator with access to needs/outputs for matrix evaluation
|
||||
matrixEvaluator := NewExpressionEvaluator(NewInterpeter(id, &evaluatedJob, nil, pc.gitContext, results, pc.vars, pc.inputs))
|
||||
|
||||
// Evaluate the matrix before expanding it.
|
||||
// If evaluation fails (e.g. expression references unresolved job outputs),
|
||||
// continue with the unevaluated matrix — the job will be created as a
|
||||
// placeholder and re-evaluated by ReEvaluateMatrixForJobWithNeeds later.
|
||||
if evaluatedJob.Strategy != nil && evaluatedJob.Strategy.RawMatrix.Kind != 0 {
|
||||
jobCopy.Strategy = &stratCopy
|
||||
evaluatedJob = &jobCopy
|
||||
matrixEvaluator := NewExpressionEvaluator(NewInterpeter(id, evaluatedJob, nil, pc.gitContext, results, pc.vars, pc.inputs))
|
||||
if err := matrixEvaluator.EvaluateYamlNode(&evaluatedJob.Strategy.RawMatrix); err != nil {
|
||||
log.Debug("matrix evaluation deferred for job %s (unresolved expression): %v", id, err)
|
||||
}
|
||||
}
|
||||
|
||||
matricxes, err := getMatrixes(&evaluatedJob)
|
||||
matricxes, err := getMatrixes(evaluatedJob)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("getMatrixes: %w", err)
|
||||
}
|
||||
@ -110,7 +116,7 @@ func Parse(content []byte, options ...ParseOption) ([]*SingleWorkflow, error) {
|
||||
job.Name = id
|
||||
}
|
||||
job.Strategy.RawMatrix = encodeMatrix(matrix)
|
||||
evaluator := NewExpressionEvaluator(NewInterpeter(id, &evaluatedJob, matrix, pc.gitContext, results, pc.vars, pc.inputs))
|
||||
evaluator := NewExpressionEvaluator(NewInterpeter(id, evaluatedJob, matrix, pc.gitContext, results, pc.vars, pc.inputs))
|
||||
job.Name = nameWithMatrix(job.Name, matrix, evaluator)
|
||||
runsOn := evaluatedJob.RunsOn()
|
||||
for i, v := range runsOn {
|
||||
|
||||
@ -7,7 +7,6 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
actions_model "gitea.dev/models/actions"
|
||||
"gitea.dev/models/db"
|
||||
@ -248,8 +247,6 @@ func checkJobsOfCurrentRunAttempt(ctx context.Context, run *actions_model.Action
|
||||
}
|
||||
}
|
||||
|
||||
log.Debug("Checking %d jobs for run %d (status: %s)", len(jobs), run.ID, run.Status)
|
||||
|
||||
vars, err := actions_model.GetVariablesOfRun(ctx, run)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
@ -262,18 +259,14 @@ func checkJobsOfCurrentRunAttempt(ctx context.Context, run *actions_model.Action
|
||||
}
|
||||
|
||||
updates := resolver.Resolve(ctx)
|
||||
log.Debug("Job status resolver returned %d job status updates for run %d", len(updates), run.ID)
|
||||
|
||||
for _, job := range jobs {
|
||||
if status, ok := updates[job.ID]; ok {
|
||||
oldStatus := job.Status
|
||||
job.Status = status
|
||||
if n, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"status": actions_model.StatusBlocked}, "status"); err != nil {
|
||||
return err
|
||||
} else if n != 1 {
|
||||
return fmt.Errorf("no affected for updating blocked job %v", job.ID)
|
||||
}
|
||||
log.Debug("Job %d (JobID: %s) status updated: %s -> %s", job.ID, job.JobID, oldStatus, status)
|
||||
updatedJobs = append(updatedJobs, job)
|
||||
}
|
||||
}
|
||||
@ -282,29 +275,24 @@ func checkJobsOfCurrentRunAttempt(ctx context.Context, run *actions_model.Action
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
// Reload jobs from the database to pick up any newly created matrix jobs
|
||||
oldJobCount := len(jobs)
|
||||
jobs, err = db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: run.ID})
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
// Matrix re-evaluation can insert new jobs in this attempt; reload so callers see them.
|
||||
if resolver.matrixExpanded {
|
||||
jobs, err = actions_model.GetRunJobsByRunAndAttemptID(ctx, run.ID, run.LatestAttemptID)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if len(jobs) > oldJobCount {
|
||||
log.Debug("Matrix re-evaluation created %d new jobs for run %d (was %d, now %d)",
|
||||
len(jobs)-oldJobCount, run.ID, oldJobCount, len(jobs))
|
||||
}
|
||||
|
||||
log.Debug("Job check completed for run %d: %d jobs updated, %d total jobs", run.ID, len(updatedJobs), len(jobs))
|
||||
|
||||
return jobs, updatedJobs, resolver.cancelledJobs, nil
|
||||
}
|
||||
|
||||
type jobStatusResolver struct {
|
||||
statuses map[int64]actions_model.Status
|
||||
needs map[int64][]int64
|
||||
jobMap map[int64]*actions_model.ActionRunJob
|
||||
vars map[string]string
|
||||
cancelledJobs []*actions_model.ActionRunJob
|
||||
statuses map[int64]actions_model.Status
|
||||
needs map[int64][]int64
|
||||
jobMap map[int64]*actions_model.ActionRunJob
|
||||
vars map[string]string
|
||||
cancelledJobs []*actions_model.ActionRunJob
|
||||
matrixExpanded bool // set when matrix re-evaluation inserted new jobs, so callers reload
|
||||
}
|
||||
|
||||
func newJobStatusResolver(jobs actions_model.ActionJobList, vars map[string]string) *jobStatusResolver {
|
||||
@ -372,53 +360,30 @@ func (r *jobStatusResolver) resolveJobHasIfCondition(actionRunJob *actions_model
|
||||
|
||||
func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model.Status {
|
||||
ret := map[int64]actions_model.Status{}
|
||||
resolveMetrics := struct {
|
||||
totalBlocked int
|
||||
matrixReevaluated int
|
||||
concurrencyUpdated int
|
||||
jobsStarted int
|
||||
jobsSkipped int
|
||||
}{}
|
||||
|
||||
for id, status := range r.statuses {
|
||||
actionRunJob := r.jobMap[id]
|
||||
if status != actions_model.StatusBlocked {
|
||||
continue
|
||||
}
|
||||
|
||||
resolveMetrics.totalBlocked++
|
||||
log.Debug("Resolving blocked job %d (JobID: %s, RunID: %d)", id, actionRunJob.JobID, actionRunJob.RunID)
|
||||
|
||||
allDone, allSucceed := r.resolveCheckNeeds(id)
|
||||
if !allDone {
|
||||
log.Debug("Job %d: not all dependencies completed yet", id)
|
||||
continue
|
||||
}
|
||||
|
||||
log.Debug("Job %d: all dependencies completed (allSucceed: %v), checking matrix re-evaluation", id, allSucceed)
|
||||
|
||||
// Try to re-evaluate the matrix with job outputs if it depends on them
|
||||
startTime := time.Now()
|
||||
newMatrixJobs, err := ReEvaluateMatrixForJobWithNeeds(ctx, actionRunJob, r.vars)
|
||||
duration := time.Since(startTime).Milliseconds()
|
||||
|
||||
// Expand a needs-dependent matrix now that needs are done; on success the job is no
|
||||
// longer Blocked (placeholder becomes the first combination, siblings inserted).
|
||||
children, err := ReEvaluateMatrixForJobWithNeeds(ctx, actionRunJob, r.vars)
|
||||
if err != nil {
|
||||
log.Error("Matrix re-evaluation error for job %d (JobID: %s): %v (duration: %dms)", id, actionRunJob.JobID, err, duration)
|
||||
log.Error("ReEvaluateMatrixForJobWithNeeds failed, this job will stay blocked: job: %d, err: %v", id, err)
|
||||
continue
|
||||
}
|
||||
|
||||
// If new matrix jobs were created, the placeholder was already claimed and
|
||||
// skipped inside ReEvaluateMatrixForJobWithNeeds (transactionally). Nothing
|
||||
// more to do here — just move to the next job.
|
||||
if len(newMatrixJobs) > 0 {
|
||||
resolveMetrics.matrixReevaluated++
|
||||
log.Debug("Matrix re-evaluation succeeded for job %d (JobID: %s): created %d new jobs (duration: %dms)",
|
||||
id, actionRunJob.JobID, len(newMatrixJobs), duration)
|
||||
if len(children) > 0 {
|
||||
r.matrixExpanded = true
|
||||
}
|
||||
if actionRunJob.Status != actions_model.StatusBlocked {
|
||||
continue
|
||||
}
|
||||
|
||||
log.Debug("Job %d: no matrix re-evaluation needed or result is empty", id)
|
||||
|
||||
// update concurrency and check whether the job can run now
|
||||
err = updateConcurrencyEvaluationForJobWithNeeds(ctx, actionRunJob, r.vars)
|
||||
if err != nil {
|
||||
@ -426,19 +391,16 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model
|
||||
// At the moment there is no way to distinguish them.
|
||||
// Actually, for most cases, the error is caused by "syntax error" / "the needed jobs haven't completed (skipped?)"
|
||||
// TODO: if workflow or concurrency expression has syntax error, there should be a user error message, need to show it to end users
|
||||
log.Debug("Concurrency evaluation failed for job %d (JobID: %s): %v (job will stay blocked)", id, actionRunJob.JobID, err)
|
||||
log.Debug("updateConcurrencyEvaluationForJobWithNeeds failed, this job will stay blocked: job: %d, err: %v", id, err)
|
||||
continue
|
||||
}
|
||||
|
||||
resolveMetrics.concurrencyUpdated++
|
||||
|
||||
shouldStartJob := true
|
||||
if !allSucceed {
|
||||
// Not all dependent jobs completed successfully:
|
||||
// * if the job has "if" condition, it can be started, then the act_runner will evaluate the "if" condition.
|
||||
// * otherwise, the job should be skipped.
|
||||
shouldStartJob = r.resolveJobHasIfCondition(actionRunJob)
|
||||
log.Debug("Job %d: not all dependencies succeeded. Has if-condition: %v, should start: %v", id, shouldStartJob, shouldStartJob)
|
||||
}
|
||||
|
||||
newStatus := util.Iif(shouldStartJob, actions_model.StatusWaiting, actions_model.StatusSkipped)
|
||||
@ -446,7 +408,7 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model
|
||||
var cancelledJobs []*actions_model.ActionRunJob
|
||||
newStatus, cancelledJobs, err = PrepareToStartJobWithConcurrency(ctx, actionRunJob)
|
||||
if err != nil {
|
||||
log.Error("Concurrency check failed for job %d (JobID: %s): %v (job will stay blocked)", id, actionRunJob.JobID, err)
|
||||
log.Error("ShouldBlockJobByConcurrency failed, this job will stay blocked: job: %d, err: %v", id, err)
|
||||
} else {
|
||||
r.cancelledJobs = append(r.cancelledJobs, cancelledJobs...)
|
||||
}
|
||||
@ -454,24 +416,8 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model
|
||||
|
||||
if newStatus != actions_model.StatusBlocked {
|
||||
ret[id] = newStatus
|
||||
switch newStatus {
|
||||
case actions_model.StatusWaiting:
|
||||
resolveMetrics.jobsStarted++
|
||||
log.Debug("Job %d (JobID: %s) transitioned to StatusWaiting", id, actionRunJob.JobID)
|
||||
case actions_model.StatusSkipped:
|
||||
resolveMetrics.jobsSkipped++
|
||||
log.Debug("Job %d (JobID: %s) transitioned to StatusSkipped", id, actionRunJob.JobID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Log resolution metrics summary
|
||||
if resolveMetrics.totalBlocked > 0 {
|
||||
log.Debug("Job resolution summary: total_blocked=%d, matrix_reevaluated=%d, concurrency_updated=%d, jobs_started=%d, jobs_skipped=%d",
|
||||
resolveMetrics.totalBlocked, resolveMetrics.matrixReevaluated, resolveMetrics.concurrencyUpdated,
|
||||
resolveMetrics.jobsStarted, resolveMetrics.jobsSkipped)
|
||||
}
|
||||
|
||||
return ret
|
||||
}
|
||||
|
||||
|
||||
@ -35,23 +35,6 @@ func markMatrixAsEvaluatedAndSkip(ctx context.Context, job *actions_model.Action
|
||||
return nil
|
||||
}
|
||||
|
||||
// claimMatrixExpansion atomically marks the placeholder job as evaluated+skipped
|
||||
// only when it is still in its original state (is_matrix_evaluated=false AND
|
||||
// status=Blocked). Returns (true, nil) when this caller wins the claim, or
|
||||
// (false, nil) when another concurrent process already claimed it.
|
||||
// This is the concurrency guard that prevents double-expansion.
|
||||
func claimMatrixExpansion(ctx context.Context, job *actions_model.ActionRunJob) (bool, error) {
|
||||
job.IsMatrixEvaluated = true
|
||||
job.Status = actions_model.StatusSkipped
|
||||
n, err := actions_model.UpdateRunJob(ctx, job,
|
||||
builder.Eq{"is_matrix_evaluated": false, "status": actions_model.StatusBlocked},
|
||||
"is_matrix_evaluated", "status")
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return n == 1, nil
|
||||
}
|
||||
|
||||
// checkTaskNeedsReady verifies if all task dependencies are completed.
|
||||
// Returns (taskNeeds, allDone, error)
|
||||
func checkTaskNeedsReady(ctx context.Context, job *actions_model.ActionRunJob) (map[string]*TaskNeed, bool, error) {
|
||||
@ -169,8 +152,8 @@ func yamlNodeContainsNeedsOutputsExpr(node *yaml.Node) bool {
|
||||
return slices.ContainsFunc(node.Content, yamlNodeContainsNeedsOutputsExpr)
|
||||
}
|
||||
|
||||
// containsNeedsOutputsExpr returns true when s contains a GitHub Actions
|
||||
// expression (${{ ... }}) that references needs.<id>.outputs.<key>.
|
||||
// containsNeedsOutputsExpr returns true when s contains an Actions expression
|
||||
// (${{ ... }}) that references needs.<id>.outputs.<key>.
|
||||
// A bare "needs." substring outside an expression block is not a match.
|
||||
func containsNeedsOutputsExpr(s string) bool {
|
||||
if !strings.Contains(s, "${{") {
|
||||
@ -205,10 +188,6 @@ func ReEvaluateMatrixForJobWithNeeds(ctx context.Context, job *actions_model.Act
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if !HasMatrixWithNeeds(job.RawStrategy) {
|
||||
return nil, markMatrixAsEvaluatedAndSkip(ctx, job, "no needs-dependent matrix found")
|
||||
}
|
||||
|
||||
log.Debug("Starting matrix re-evaluation for job %d (JobID: %s)", job.ID, job.JobID)
|
||||
|
||||
// skipWithError marks the job as evaluated+skipped and wraps any secondary error.
|
||||
@ -228,8 +207,6 @@ func ReEvaluateMatrixForJobWithNeeds(ctx context.Context, job *actions_model.Act
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
mergedVars := mergeNeedsIntoVars(vars, taskNeeds)
|
||||
|
||||
if job.Run == nil {
|
||||
if err := job.LoadRun(ctx); err != nil {
|
||||
return nil, fmt.Errorf("load run: %w", err)
|
||||
@ -253,110 +230,98 @@ func ReEvaluateMatrixForJobWithNeeds(ctx context.Context, job *actions_model.Act
|
||||
|
||||
workflowYAML, err := constructWorkflowWithNeeds(job, taskNeeds)
|
||||
if err != nil {
|
||||
log.Error("Matrix re-evaluation error for job %d: construct workflow: %v", job.ID, err)
|
||||
return skipWithError(fmt.Sprintf("workflow construction failed: %v", err), fmt.Errorf("construct workflow: %w", err))
|
||||
}
|
||||
|
||||
parsedJobs, err := jobparser.Parse(
|
||||
workflowYAML,
|
||||
jobparser.WithVars(mergedVars),
|
||||
jobparser.WithVars(vars),
|
||||
jobparser.WithGitContext(giteaCtx.ToGitHubContext()),
|
||||
jobparser.WithJobOutputs(jobOutputs),
|
||||
jobparser.WithJobResults(jobResults),
|
||||
)
|
||||
if err != nil {
|
||||
log.Error("Matrix parse error for job %d (RawStrategy: %s): %v", job.ID, job.RawStrategy, err)
|
||||
if markErr := markMatrixAsEvaluatedAndSkip(ctx, job, fmt.Sprintf("parse failed: %v", err)); markErr != nil {
|
||||
return nil, fmt.Errorf("parse workflow: %w; additionally failed to mark as evaluated: %v", err, markErr)
|
||||
}
|
||||
return nil, nil
|
||||
return nil, markMatrixAsEvaluatedAndSkip(ctx, job, fmt.Sprintf("parse failed: %v", err))
|
||||
}
|
||||
|
||||
if len(parsedJobs) == 0 {
|
||||
log.Debug("No jobs generated from matrix expansion for job %d (JobID: %s)", job.ID, job.JobID)
|
||||
return nil, markMatrixAsEvaluatedAndSkip(ctx, job, "no jobs generated from matrix expansion")
|
||||
// One parsed workflow per combination; needs are kept on the model and erased from the
|
||||
// payload, as at initial planning time.
|
||||
type matrixCombo struct {
|
||||
name string
|
||||
payload []byte
|
||||
runsOn []string
|
||||
needs []string
|
||||
}
|
||||
|
||||
log.Debug("Parsed %d matrix combinations for job %d (JobID: %s)", len(parsedJobs), job.ID, job.JobID)
|
||||
|
||||
var newJobs []*actions_model.ActionRunJob
|
||||
for i, sw := range parsedJobs {
|
||||
var combos []matrixCombo
|
||||
for _, sw := range parsedJobs {
|
||||
id, jobDef := sw.Job()
|
||||
if jobDef == nil {
|
||||
log.Warn("Skipped nil jobDef at index %d for job %d (JobID: %s)", i, job.ID, job.JobID)
|
||||
if jobDef == nil || id != job.JobID {
|
||||
continue
|
||||
}
|
||||
if id != job.JobID {
|
||||
continue
|
||||
}
|
||||
needs := jobDef.Needs()
|
||||
combo := matrixCombo{name: jobDef.Name, runsOn: jobDef.RunsOn(), needs: jobDef.Needs()}
|
||||
if err := sw.SetJob(id, jobDef.EraseNeeds()); err != nil {
|
||||
log.Error("Failed to erase needs from job %s (matrix expansion for job %d): %v", id, job.ID, err)
|
||||
continue
|
||||
return nil, fmt.Errorf("erase needs for job %s: %w", id, err)
|
||||
}
|
||||
payload, _ := sw.Marshal()
|
||||
newJobs = append(newJobs, &actions_model.ActionRunJob{
|
||||
RunID: job.RunID,
|
||||
RepoID: job.RepoID,
|
||||
OwnerID: job.OwnerID,
|
||||
CommitSHA: job.CommitSHA,
|
||||
IsForkPullRequest: job.IsForkPullRequest,
|
||||
Name: jobDef.Name,
|
||||
WorkflowPayload: payload,
|
||||
JobID: id,
|
||||
Needs: needs,
|
||||
RunsOn: jobDef.RunsOn(),
|
||||
Status: actions_model.StatusWaiting,
|
||||
})
|
||||
if combo.payload, err = sw.Marshal(); err != nil {
|
||||
return nil, fmt.Errorf("marshal expanded job %s: %w", id, err)
|
||||
}
|
||||
combos = append(combos, combo)
|
||||
}
|
||||
|
||||
if len(newJobs) == 0 {
|
||||
log.Warn("No valid jobs created from matrix expansion for job %d (JobID: %s), total parsed: %d", job.ID, job.JobID, len(parsedJobs))
|
||||
return nil, markMatrixAsEvaluatedAndSkip(ctx, job, fmt.Sprintf("no valid jobs created (parsed: %d)", len(parsedJobs)))
|
||||
if len(combos) == 0 {
|
||||
return nil, markMatrixAsEvaluatedAndSkip(ctx, job, "matrix expanded to no combinations")
|
||||
}
|
||||
|
||||
// Atomically claim the placeholder and insert the new jobs in one transaction.
|
||||
// The conditional WHERE in claimMatrixExpansion (is_matrix_evaluated=false AND
|
||||
// status=Blocked) ensures only one concurrent caller can win. The second caller
|
||||
// sees n==0 and rolls back without inserting duplicate jobs.
|
||||
var claimed bool
|
||||
// Reuse the placeholder as the first combination and insert the rest as siblings: no phantom
|
||||
// skipped job is left to poison downstream needs, and siblings inherit attempt + permissions.
|
||||
var children []*actions_model.ActionRunJob
|
||||
if err := db.WithTx(ctx, func(txCtx context.Context) error {
|
||||
var txErr error
|
||||
claimed, txErr = claimMatrixExpansion(txCtx, job)
|
||||
if txErr != nil {
|
||||
return txErr
|
||||
maxAttemptJobID, err := actions_model.GetMaxAttemptJobID(txCtx, job.RunID, job.RunAttemptID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !claimed {
|
||||
for i := 1; i < len(combos); i++ {
|
||||
children = append(children, &actions_model.ActionRunJob{
|
||||
RunID: job.RunID,
|
||||
RunAttemptID: job.RunAttemptID,
|
||||
RepoID: job.RepoID,
|
||||
OwnerID: job.OwnerID,
|
||||
CommitSHA: job.CommitSHA,
|
||||
IsForkPullRequest: job.IsForkPullRequest,
|
||||
Name: combos[i].name,
|
||||
Attempt: job.Attempt,
|
||||
WorkflowPayload: combos[i].payload,
|
||||
JobID: job.JobID,
|
||||
AttemptJobID: maxAttemptJobID + int64(i),
|
||||
Needs: combos[i].needs,
|
||||
RunsOn: combos[i].runsOn,
|
||||
TokenPermissions: job.TokenPermissions,
|
||||
Status: actions_model.StatusWaiting,
|
||||
})
|
||||
}
|
||||
|
||||
// Atomic claim: only one concurrent caller flips the placeholder out of Blocked.
|
||||
job.Name = combos[0].name
|
||||
job.WorkflowPayload = combos[0].payload
|
||||
job.RunsOn = combos[0].runsOn
|
||||
job.IsMatrixEvaluated = true
|
||||
job.Status = actions_model.StatusWaiting
|
||||
affected, err := actions_model.UpdateRunJob(txCtx, job,
|
||||
builder.Eq{"is_matrix_evaluated": false, "status": actions_model.StatusBlocked},
|
||||
"name", "workflow_payload", "runs_on", "is_matrix_evaluated", "status")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if affected != 1 {
|
||||
children = nil
|
||||
return nil
|
||||
}
|
||||
return actions_model.InsertActionRunJobs(txCtx, newJobs)
|
||||
return actions_model.InsertActionRunJobs(txCtx, children)
|
||||
}); err != nil {
|
||||
log.Error("Matrix expansion transaction failed for job %d (JobID: %s): %v", job.ID, job.JobID, err)
|
||||
return nil, fmt.Errorf("matrix expansion transaction: %w", err)
|
||||
}
|
||||
if !claimed {
|
||||
log.Warn("Matrix placeholder job %d (JobID: %s) was already claimed by a concurrent process; skipping",
|
||||
job.ID, job.JobID)
|
||||
return nil, nil
|
||||
return nil, fmt.Errorf("expand matrix for job %d: %w", job.ID, err)
|
||||
}
|
||||
|
||||
log.Debug("Matrix re-evaluation complete for job %d (JobID: %s): created %d new jobs",
|
||||
job.ID, job.JobID, len(newJobs))
|
||||
|
||||
return newJobs, nil
|
||||
}
|
||||
|
||||
// mergeNeedsIntoVars converts task needs outputs into variables for expression evaluation.
|
||||
func mergeNeedsIntoVars(baseVars map[string]string, taskNeeds map[string]*TaskNeed) map[string]string {
|
||||
merged := make(map[string]string)
|
||||
maps.Copy(merged, baseVars)
|
||||
for jobID, taskNeed := range taskNeeds {
|
||||
for outputKey, outputValue := range taskNeed.Outputs {
|
||||
key := fmt.Sprintf("needs.%s.outputs.%s", jobID, outputKey)
|
||||
merged[key] = outputValue
|
||||
}
|
||||
}
|
||||
return merged
|
||||
return children, nil
|
||||
}
|
||||
|
||||
// constructWorkflowWithNeeds creates a workflow YAML that includes the target job
|
||||
|
||||
@ -89,23 +89,6 @@ matrix:
|
||||
}
|
||||
}
|
||||
|
||||
func TestMergeNeedsIntoVars(t *testing.T) {
|
||||
base := map[string]string{"MY_VAR": "hello"}
|
||||
needs := map[string]*TaskNeed{
|
||||
"setup": {
|
||||
Result: actions_model.StatusSuccess,
|
||||
Outputs: map[string]string{"versions": `["1","2"]`, "extra": "val"},
|
||||
},
|
||||
}
|
||||
merged := mergeNeedsIntoVars(base, needs)
|
||||
|
||||
assert.Equal(t, "hello", merged["MY_VAR"])
|
||||
assert.Equal(t, `["1","2"]`, merged["needs.setup.outputs.versions"])
|
||||
assert.Equal(t, "val", merged["needs.setup.outputs.extra"])
|
||||
// base must not be mutated
|
||||
assert.NotContains(t, base, "needs.setup.outputs.versions")
|
||||
}
|
||||
|
||||
func TestConstructWorkflowWithNeeds(t *testing.T) {
|
||||
// Minimal WorkflowPayload with a strategy referencing a needs output.
|
||||
payload, err := yaml.Marshal(map[string]any{
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user