diff --git a/models/actions/run.go b/models/actions/run.go index be332d6857..85402698b2 100644 --- a/models/actions/run.go +++ b/models/actions/run.go @@ -52,6 +52,12 @@ type ActionRun struct { RawConcurrency string // raw concurrency ConcurrencyGroup string `xorm:"index(repo_concurrency) NOT NULL DEFAULT ''"` ConcurrencyCancel bool `xorm:"NOT NULL DEFAULT FALSE"` + + // ParentJobID is the ID of the parent job if the run is a reusable workflow. + // 0 - normal run, no parent job; + // >0 - the parent job ID. + ParentJobID int64 `xorm:"index"` + // Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0 Started timeutil.TimeStamp Stopped timeutil.TimeStamp diff --git a/models/actions/run_job.go b/models/actions/run_job.go index f72a7040e3..c16b7f6f90 100644 --- a/models/actions/run_job.go +++ b/models/actions/run_job.go @@ -51,6 +51,12 @@ type ActionRunJob struct { ConcurrencyGroup string `xorm:"index(repo_concurrency) NOT NULL DEFAULT ''"` // evaluated concurrency.group ConcurrencyCancel bool `xorm:"NOT NULL DEFAULT FALSE"` // evaluated concurrency.cancel-in-progress + // ChildRunID is the ID of a reusable workflow run. + // 0 - normal job, no child run; + // -1 - child run is being created; + // >0 - the child run ID. + ChildRunID int64 `xorm:"index"` + Started timeutil.TimeStamp Stopped timeutil.TimeStamp Created timeutil.TimeStamp `xorm:"created"` diff --git a/models/actions/task.go b/models/actions/task.go index 8b4ecf28f7..3086ecd0df 100644 --- a/models/actions/task.go +++ b/models/actions/task.go @@ -236,7 +236,7 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask } var jobs []*ActionRunJob - if err := e.Where("task_id=? AND status=?", 0, StatusWaiting).And(jobCond).Asc("updated", "id").Find(&jobs); err != nil { + if err := e.Where("task_id=? AND status=? AND child_run_id=?", 0, StatusWaiting, 0).And(jobCond).Asc("updated", "id").Find(&jobs); err != nil { return nil, false, err } diff --git a/models/migrations/migrations.go b/models/migrations/migrations.go index fa11acaee2..a07de0781f 100644 --- a/models/migrations/migrations.go +++ b/models/migrations/migrations.go @@ -399,6 +399,7 @@ func prepareMigrationTasks() []*migration { newMigration(323, "Add support for actions concurrency", v1_26.AddActionsConcurrency), newMigration(324, "Fix closed milestone completeness for milestones with no issues", v1_26.FixClosedMilestoneCompleteness), + newMigration(325, "Add parent job id to ActionRun and child run id to ActionRunJob", v1_26.AddParentRunAndJobToActionRun), } return preparedMigrations } diff --git a/models/migrations/v1_26/v325.go b/models/migrations/v1_26/v325.go new file mode 100644 index 0000000000..031455eed6 --- /dev/null +++ b/models/migrations/v1_26/v325.go @@ -0,0 +1,30 @@ +// Copyright 2025 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package v1_26 + +import ( + "xorm.io/xorm" +) + +func AddParentRunAndJobToActionRun(x *xorm.Engine) error { + type ActionRun struct { + ParentJobID int64 `xorm:"index"` + } + type ActionRunJob struct { + ChildRunID int64 `xorm:"index"` + } + + if _, err := x.SyncWithOptions(xorm.SyncOptions{ + IgnoreIndices: true, + IgnoreConstrains: true, + }, &ActionRun{}); err != nil { + return err + } + + _, err := x.SyncWithOptions(xorm.SyncOptions{ + IgnoreIndices: true, + IgnoreConstrains: true, + }, &ActionRunJob{}) + return err +} diff --git a/models/perm/access/repo_permission.go b/models/perm/access/repo_permission.go index 3235d83203..605e3ab163 100644 --- a/models/perm/access/repo_permission.go +++ b/models/perm/access/repo_permission.go @@ -302,6 +302,41 @@ func GetActionsUserRepoPermission(ctx context.Context, repo *repo_model.Reposito return perm, nil } +// GetActionsUserRepoPermissionByRepoID returns the actions user permissions to the repository +func GetActionsUserRepoPermissionByRepoID(ctx context.Context, repo *repo_model.Repository, actionsUser *user_model.User, sourceRepoID int64, isForkPullRequest bool) (perm Permission, err error) { + if actionsUser.ID != user_model.ActionsUserID { + return perm, errors.New("api GetActionsUserRepoPermissionByRepoID can only be called by the actions user") + } + + var accessMode perm_model.AccessMode + if sourceRepoID != repo.ID { + taskRepo, exist, err := db.GetByID[repo_model.Repository](ctx, sourceRepoID) + if err != nil || !exist { + return perm, err + } + actionsCfg := repo.MustGetUnit(ctx, unit.TypeActions).ActionsConfig() + if !actionsCfg.IsCollaborativeOwner(taskRepo.OwnerID) || !taskRepo.IsPrivate { + perm, err = GetUserRepoPermission(ctx, repo, user_model.NewActionsUser()) + if err != nil { + return perm, err + } + perm.AccessMode = min(perm.AccessMode, perm_model.AccessModeRead) + return perm, nil + } + accessMode = perm_model.AccessModeRead + } else if isForkPullRequest { + accessMode = perm_model.AccessModeRead + } else { + accessMode = perm_model.AccessModeWrite + } + + if err := repo.LoadUnits(ctx); err != nil { + return perm, err + } + perm.SetUnitsWithDefaultAccessMode(repo.Units, accessMode) + return perm, nil +} + // GetUserRepoPermission returns the user permissions to the repository func GetUserRepoPermission(ctx context.Context, repo *repo_model.Repository, user *user_model.User) (perm Permission, err error) { defer func() { diff --git a/services/actions/job_emitter.go b/services/actions/job_emitter.go index 74a8a127ef..c02f216032 100644 --- a/services/actions/job_emitter.go +++ b/services/actions/job_emitter.go @@ -93,6 +93,12 @@ func checkJobsByRunID(ctx context.Context, runID int64) error { for _, job := range updatedJobs { _ = job.LoadAttributes(ctx) notify_service.WorkflowJobStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job, nil) + + if job.ChildRunID == -1 && job.Status == actions_model.StatusWaiting { + if err := expandReusableWorkflow(ctx, job); err != nil { + log.Error("ensure reusable workflow child run for job %d: %v", job.ID, err) + } + } } runJobs := make(map[int64][]*actions_model.ActionRunJob) for _, job := range jobs { @@ -239,6 +245,41 @@ func NotifyWorkflowRunStatusUpdateWithReload(ctx context.Context, job *actions_m return } notify_service.WorkflowRunStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job.Run) + updateParentJobFromChildRun(ctx, job.Run) +} + +func updateParentJobFromChildRun(ctx context.Context, run *actions_model.ActionRun) { + if run.ParentJobID == 0 || !run.Status.IsDone() { + return + } + + parentJob, exist, err := db.GetByID[actions_model.ActionRunJob](ctx, run.ParentJobID) + if err != nil { + log.Error("Get parent job %d: %v", run.ParentJobID, err) + return + } + if !exist { + return + } + + parentJob.Status = run.Status + parentJob.Stopped = run.Stopped + if _, err := actions_model.UpdateRunJob(ctx, parentJob, nil, "status", "stopped"); err != nil { + log.Error("Update parent job %d: %v", parentJob.ID, err) + return + } + + parentRun, exist, err := db.GetByID[actions_model.ActionRun](ctx, parentJob.RunID) + if err != nil { + log.Error("Get parent run %d: %v", parentJob.RunID, err) + return + } + if exist { + CreateCommitStatusForRunJobs(ctx, parentRun, parentJob) + if err := EmitJobsIfReadyByRun(parentJob.RunID); err != nil { + log.Error("EmitJobsIfReadyByRun for parent run %d: %v", parentJob.RunID, err) + } + } } type jobStatusResolver struct { diff --git a/services/actions/reusable_workflow.go b/services/actions/reusable_workflow.go new file mode 100644 index 0000000000..b5a99aaf84 --- /dev/null +++ b/services/actions/reusable_workflow.go @@ -0,0 +1,207 @@ +// Copyright 2025 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + "errors" + "fmt" + "net/url" + "strings" + + actions_model "code.gitea.io/gitea/models/actions" + access_model "code.gitea.io/gitea/models/perm/access" + repo_model "code.gitea.io/gitea/models/repo" + "code.gitea.io/gitea/models/unit" + user_model "code.gitea.io/gitea/models/user" + "code.gitea.io/gitea/modules/gitrepo" + "code.gitea.io/gitea/modules/json" + "code.gitea.io/gitea/modules/setting" + + "github.com/nektos/act/pkg/jobparser" + actmodel "github.com/nektos/act/pkg/model" + act_runner_pkg "github.com/nektos/act/pkg/runner" + "gopkg.in/yaml.v3" + "xorm.io/builder" +) + +func expandReusableWorkflows(ctx context.Context, run *actions_model.ActionRun, jobs []*actions_model.ActionRunJob, vars map[string]string) error { + for _, job := range jobs { + if job.ChildRunID != -1 { + // should not happen + continue + } + workflowJob, err := job.ParseJob() + if err != nil { + return err + } + ref, err := act_runner_pkg.ParseReusableWorkflowRef(workflowJob.Uses) + if err != nil { + return err + } + if err := createChildRunFromReusableWorkflow(ctx, run, job, workflowJob, ref, vars); err != nil { + return err + } + } + return nil +} + +func expandReusableWorkflow(ctx context.Context, parentJob *actions_model.ActionRunJob) error { + if parentJob.ChildRunID != -1 { + // should not happen + return fmt.Errorf("no need to expand") + } + workflowJob, err := parentJob.ParseJob() + if err != nil { + return err + } + ref, err := act_runner_pkg.ParseReusableWorkflowRef(workflowJob.Uses) + if err != nil { + return err + } + if err := parentJob.LoadAttributes(ctx); err != nil { + return err + } + vars, err := actions_model.GetVariablesOfRun(ctx, parentJob.Run) + if err != nil { + return err + } + if err := createChildRunFromReusableWorkflow(ctx, parentJob.Run, parentJob, workflowJob, ref, vars); err != nil { + return err + } + return nil +} + +func createChildRunFromReusableWorkflow(ctx context.Context, run *actions_model.ActionRun, parentJob *actions_model.ActionRunJob, workflowJob *jobparser.Job, ref *act_runner_pkg.ReusableWorkflowRef, vars map[string]string) error { + content, err := loadReusableWorkflowContent(ctx, run, ref) + if err != nil { + return err + } + + inputsWithDefaults, err := buildWorkflowCallInputs(ctx, run, parentJob, workflowJob, content, vars) + if err != nil { + return err + } + + eventPayload, err := json.Marshal(map[string]any{ + "inputs": inputsWithDefaults, + }) + if err != nil { + return err + } + + childRun := &actions_model.ActionRun{ + Title: run.Title, + RepoID: run.RepoID, + Repo: run.Repo, + OwnerID: run.OwnerID, + ParentJobID: parentJob.ID, + WorkflowID: ref.WorkflowPath, + TriggerUserID: run.TriggerUserID, + TriggerUser: run.TriggerUser, + Ref: run.Ref, + CommitSHA: run.CommitSHA, + IsForkPullRequest: run.IsForkPullRequest, + Event: "workflow_call", + TriggerEvent: "workflow_call", + EventPayload: string(eventPayload), + Status: actions_model.StatusWaiting, + NeedApproval: run.NeedApproval, + } + + if err := PrepareRunAndInsert(ctx, content, childRun, inputsWithDefaults); err != nil { + return err + } + parentJob.ChildRunID = childRun.ID + if _, err := actions_model.UpdateRunJob(ctx, parentJob, builder.Eq{"child_run_id": -1}, "child_run_id"); err != nil { + return err + } + return nil +} + +func buildWorkflowCallInputs(ctx context.Context, run *actions_model.ActionRun, parentJob *actions_model.ActionRunJob, workflowJob *jobparser.Job, content []byte, vars map[string]string) (map[string]any, error) { + singleWorkflow := &jobparser.SingleWorkflow{} + if err := yaml.Unmarshal(content, singleWorkflow); err != nil { + return nil, fmt.Errorf("unmarshal workflow: %w", err) + } + + workflow := &actmodel.Workflow{ + RawOn: singleWorkflow.RawOn, + } + + giteaCtx := GenerateGiteaContext(run, parentJob) + inputs, err := getInputsFromRun(run) + if err != nil { + return nil, fmt.Errorf("get inputs: %w", err) + } + + results, err := findJobNeedsAndFillJobResults(ctx, parentJob) + if err != nil { + return nil, fmt.Errorf("get job results: %w", err) + } + + return jobparser.EvaluateWorkflowCallInputs(workflow, parentJob.JobID, workflowJob, giteaCtx, results, vars, inputs) +} + +func loadReusableWorkflowContent(ctx context.Context, run *actions_model.ActionRun, ref *act_runner_pkg.ReusableWorkflowRef) ([]byte, error) { + if ref.Kind == act_runner_pkg.ReusableWorkflowKindLocal { + if err := run.LoadRepo(ctx); err != nil { + return nil, err + } + return readWorkflowContentFromRepo(ctx, run.Repo, run.Ref, ref.WorkflowPath) + } + + if ref.Kind == act_runner_pkg.ReusableWorkflowKindOtherRepo || isSameInstanceHost(ref.Host) { + repo, err := repo_model.GetRepositoryByOwnerAndName(ctx, ref.Owner, ref.Repo) + if err != nil { + return nil, err + } + if repo.IsPrivate { + perm, err := access_model.GetActionsUserRepoPermissionByRepoID(ctx, repo, user_model.NewActionsUser(), run.RepoID, run.IsForkPullRequest) + if err != nil { + return nil, err + } + if !perm.CanRead(unit.TypeCode) { + return nil, errors.New("actions user has no access to reusable workflow repo") + } + } + return readWorkflowContentFromRepo(ctx, repo, ref.Ref, ref.WorkflowPath) + } + + content, err := act_runner_pkg.FetchReusableWorkflowContent(ctx, ref) + if err != nil { + return nil, err + } + return content, nil +} + +func readWorkflowContentFromRepo(ctx context.Context, repo *repo_model.Repository, ref, workflowPath string) ([]byte, error) { + gitRepo, err := gitrepo.OpenRepository(ctx, repo) + if err != nil { + return nil, err + } + defer gitRepo.Close() + + commit, err := gitRepo.GetCommit(ref) + if err != nil { + return nil, err + } + content, err := commit.GetFileContent(workflowPath, 1024*1024) + if err != nil { + return nil, err + } + return []byte(content), nil +} + +func isSameInstanceHost(host string) bool { + appURL, err := url.Parse(setting.AppURL) + if err != nil { + return false + } + h, err := url.Parse(host) + if err != nil { + return false + } + return strings.EqualFold(h.Host, appURL.Host) +} diff --git a/services/actions/run.go b/services/actions/run.go index 90413e9bc2..f13b8a2bc9 100644 --- a/services/actions/run.go +++ b/services/actions/run.go @@ -9,6 +9,7 @@ import ( actions_model "code.gitea.io/gitea/models/actions" "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/util" notify_service "code.gitea.io/gitea/services/notify" @@ -75,7 +76,9 @@ func PrepareRunAndInsert(ctx context.Context, content []byte, run *actions_model // InsertRun inserts a run // The title will be cut off at 255 characters if it's longer than 255 characters. func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobparser.SingleWorkflow, vars map[string]string) error { - return db.WithTx(ctx, func(ctx context.Context) error { + var readyUsesJobs []*actions_model.ActionRunJob // jobs that have "uses" and are ready to run + + if err := db.WithTx(ctx, func(ctx context.Context) error { index, err := db.GetNextResourceIndex(ctx, "action_run_index", run.RepoID) if err != nil { return err @@ -153,7 +156,14 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobpar } } - hasWaitingJobs = hasWaitingJobs || runJob.Status == actions_model.StatusWaiting + if job.Uses != "" { + runJob.ChildRunID = -1 + if runJob.Status == actions_model.StatusWaiting { + readyUsesJobs = append(readyUsesJobs, runJob) + } + } else { + hasWaitingJobs = hasWaitingJobs || runJob.Status == actions_model.StatusWaiting + } if err := db.Insert(ctx, runJob); err != nil { return err } @@ -174,5 +184,14 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobpar } return nil - }) + }); err != nil { + return err + } + + if err := expandReusableWorkflows(ctx, run, readyUsesJobs, vars); err != nil { + // TODO: need to rollback and show an error message to the user + log.Error("expandReusableWorkflows for run %d: %v", run.ID, err) + } + + return nil }