From 40f71bcd4cc9ad38e3a8205747901440371ffb80 Mon Sep 17 00:00:00 2001 From: Zettat123 Date: Fri, 10 Oct 2025 12:58:55 -0600 Subject: [PATCH] Support Actions `concurrency` syntax (#32751) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fix #24769 Fix #32662 Fix #33260 Depends on https://gitea.com/gitea/act/pulls/124 - https://docs.github.com/en/actions/writing-workflows/workflow-syntax-for-github-actions#concurrency ## ⚠️ BREAKING ⚠️ This PR removes the auto-cancellation feature added by #25716. Users need to manually add `concurrency` to workflows to control concurrent workflows or jobs. --------- Signed-off-by: Zettat123 Co-authored-by: Christopher Homberger Co-authored-by: wxiaoguang --- models/actions/run.go | 203 +- models/actions/run_job.go | 75 +- models/actions/run_job_list.go | 19 +- models/actions/run_list.go | 25 +- models/migrations/migrations.go | 1 + models/migrations/v1_25/v323.go | 43 + routers/api/actions/runner/runner.go | 5 +- routers/web/repo/actions/view.go | 141 +- services/actions/clear_tasks.go | 76 +- services/actions/concurrency.go | 121 ++ services/actions/job_emitter.go | 296 ++- services/actions/job_emitter_test.go | 4 +- services/actions/notifier_helper.go | 29 +- services/actions/run.go | 127 ++ services/actions/schedule_tasks.go | 26 +- services/actions/workflow.go | 27 +- tests/integration/actions_concurrency_test.go | 1709 +++++++++++++++++ 17 files changed, 2649 insertions(+), 278 deletions(-) create mode 100644 models/migrations/v1_25/v323.go create mode 100644 services/actions/concurrency.go create mode 100644 services/actions/run.go create mode 100644 tests/integration/actions_concurrency_test.go diff --git a/models/actions/run.go b/models/actions/run.go index f1d85bbcd9..4da6958e2d 100644 --- a/models/actions/run.go +++ b/models/actions/run.go @@ -16,13 +16,13 @@ import ( user_model "code.gitea.io/gitea/models/user" "code.gitea.io/gitea/modules/git" "code.gitea.io/gitea/modules/json" + "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" api "code.gitea.io/gitea/modules/structs" "code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/modules/util" webhook_module "code.gitea.io/gitea/modules/webhook" - "github.com/nektos/act/pkg/jobparser" "xorm.io/builder" ) @@ -30,7 +30,7 @@ import ( type ActionRun struct { ID int64 Title string - RepoID int64 `xorm:"index unique(repo_index)"` + RepoID int64 `xorm:"unique(repo_index) index(repo_concurrency)"` Repo *repo_model.Repository `xorm:"-"` OwnerID int64 `xorm:"index"` WorkflowID string `xorm:"index"` // the name of workflow file @@ -49,6 +49,9 @@ type ActionRun struct { TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow Status Status `xorm:"index"` Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed + RawConcurrency string // raw concurrency + ConcurrencyGroup string `xorm:"index(repo_concurrency) NOT NULL DEFAULT ''"` + ConcurrencyCancel bool `xorm:"NOT NULL DEFAULT FALSE"` // Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0 Started timeutil.TimeStamp Stopped timeutil.TimeStamp @@ -190,7 +193,7 @@ func (run *ActionRun) IsSchedule() bool { return run.ScheduleID > 0 } -func updateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) error { +func UpdateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) error { _, err := db.GetEngine(ctx).ID(repo.ID). 
NoAutoTime(). SetExpr("num_action_runs", @@ -247,116 +250,62 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin return cancelledJobs, err } - // Iterate over each job and attempt to cancel it. - for _, job := range jobs { - // Skip jobs that are already in a terminal state (completed, cancelled, etc.). - status := job.Status - if status.IsDone() { - continue - } - - // If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it. - if job.TaskID == 0 { - job.Status = StatusCancelled - job.Stopped = timeutil.TimeStampNow() - - // Update the job's status and stopped time in the database. - n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped") - if err != nil { - return cancelledJobs, err - } - - // If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again. - if n == 0 { - return cancelledJobs, errors.New("job has changed, try again") - } - - cancelledJobs = append(cancelledJobs, job) - // Continue with the next job. - continue - } - - // If the job has an associated task, try to stop the task, effectively cancelling the job. - if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil { - return cancelledJobs, err - } - cancelledJobs = append(cancelledJobs, job) + cjs, err := CancelJobs(ctx, jobs) + if err != nil { + return cancelledJobs, err } + cancelledJobs = append(cancelledJobs, cjs...) } // Return nil to indicate successful cancellation of all running and waiting jobs. return cancelledJobs, nil } -// InsertRun inserts a run -// The title will be cut off at 255 characters if it's longer than 255 characters. -func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWorkflow) error { - return db.WithTx(ctx, func(ctx context.Context) error { - index, err := db.GetNextResourceIndex(ctx, "action_run_index", run.RepoID) - if err != nil { - return err - } - run.Index = index - run.Title = util.EllipsisDisplayString(run.Title, 255) - - if err := db.Insert(ctx, run); err != nil { - return err +func CancelJobs(ctx context.Context, jobs []*ActionRunJob) ([]*ActionRunJob, error) { + cancelledJobs := make([]*ActionRunJob, 0, len(jobs)) + // Iterate over each job and attempt to cancel it. + for _, job := range jobs { + // Skip jobs that are already in a terminal state (completed, cancelled, etc.). + status := job.Status + if status.IsDone() { + continue } - if run.Repo == nil { - repo, err := repo_model.GetRepositoryByID(ctx, run.RepoID) + // If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it. + if job.TaskID == 0 { + job.Status = StatusCancelled + job.Stopped = timeutil.TimeStampNow() + + // Update the job's status and stopped time in the database. + n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped") if err != nil { - return err + return cancelledJobs, err } - run.Repo = repo + + // If the update affected 0 rows, it means the job has changed in the meantime + if n == 0 { + log.Error("Failed to cancel job %d because it has changed", job.ID) + continue + } + + cancelledJobs = append(cancelledJobs, job) + // Continue with the next job. + continue } - if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil { - return err + // If the job has an associated task, try to stop the task, effectively cancelling the job. 
+ if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil { + return cancelledJobs, err } + updatedJob, err := GetRunJobByID(ctx, job.ID) + if err != nil { + return cancelledJobs, fmt.Errorf("get job: %w", err) + } + cancelledJobs = append(cancelledJobs, updatedJob) + } - runJobs := make([]*ActionRunJob, 0, len(jobs)) - var hasWaiting bool - for _, v := range jobs { - id, job := v.Job() - needs := job.Needs() - if err := v.SetJob(id, job.EraseNeeds()); err != nil { - return err - } - payload, _ := v.Marshal() - status := StatusWaiting - if len(needs) > 0 || run.NeedApproval { - status = StatusBlocked - } else { - hasWaiting = true - } - job.Name = util.EllipsisDisplayString(job.Name, 255) - runJobs = append(runJobs, &ActionRunJob{ - RunID: run.ID, - RepoID: run.RepoID, - OwnerID: run.OwnerID, - CommitSHA: run.CommitSHA, - IsForkPullRequest: run.IsForkPullRequest, - Name: job.Name, - WorkflowPayload: payload, - JobID: id, - Needs: needs, - RunsOn: job.RunsOn(), - Status: status, - }) - } - if err := db.Insert(ctx, runJobs); err != nil { - return err - } - - // if there is a job in the waiting status, increase tasks version. - if hasWaiting { - if err := IncreaseTaskVersion(ctx, run.OwnerID, run.RepoID); err != nil { - return err - } - } - return nil - }) + // Return nil to indicate successful cancellation of all running and waiting jobs. + return cancelledJobs, nil } func GetRunByRepoAndID(ctx context.Context, repoID, runID int64) (*ActionRun, error) { @@ -441,7 +390,7 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error { if err = run.LoadRepo(ctx); err != nil { return err } - if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil { + if err := UpdateRepoRunsNumbers(ctx, run.Repo); err != nil { return err } } @@ -450,3 +399,59 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error { } type ActionRunIndex db.ResourceIndex + +func GetConcurrentRunsAndJobs(ctx context.Context, repoID int64, concurrencyGroup string, status []Status) ([]*ActionRun, []*ActionRunJob, error) { + runs, err := db.Find[ActionRun](ctx, &FindRunOptions{ + RepoID: repoID, + ConcurrencyGroup: concurrencyGroup, + Status: status, + }) + if err != nil { + return nil, nil, fmt.Errorf("find runs: %w", err) + } + + jobs, err := db.Find[ActionRunJob](ctx, &FindRunJobOptions{ + RepoID: repoID, + ConcurrencyGroup: concurrencyGroup, + Statuses: status, + }) + if err != nil { + return nil, nil, fmt.Errorf("find jobs: %w", err) + } + + return runs, jobs, nil +} + +func CancelPreviousJobsByRunConcurrency(ctx context.Context, actionRun *ActionRun) ([]*ActionRunJob, error) { + if actionRun.ConcurrencyGroup == "" { + return nil, nil + } + + var jobsToCancel []*ActionRunJob + + statusFindOption := []Status{StatusWaiting, StatusBlocked} + if actionRun.ConcurrencyCancel { + statusFindOption = append(statusFindOption, StatusRunning) + } + runs, jobs, err := GetConcurrentRunsAndJobs(ctx, actionRun.RepoID, actionRun.ConcurrencyGroup, statusFindOption) + if err != nil { + return nil, fmt.Errorf("find concurrent runs and jobs: %w", err) + } + jobsToCancel = append(jobsToCancel, jobs...) + + // cancel runs in the same concurrency group + for _, run := range runs { + if run.ID == actionRun.ID { + continue + } + jobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{ + RunID: run.ID, + }) + if err != nil { + return nil, fmt.Errorf("find run %d jobs: %w", run.ID, err) + } + jobsToCancel = append(jobsToCancel, jobs...) 
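For context, this cancellation path is only reached when the run's workflow-level `concurrency` evaluated to a non-empty group, and running jobs are swept up only when `cancel-in-progress` evaluated to true. A minimal workflow sketch (file name and `github.*` context fields are illustrative, not taken from this patch) that would cancel an in-flight run for the same branch:

```yaml
# hypothetical .gitea/workflows/ci.yml, shown only to illustrate the semantics
name: ci
on: [push]
concurrency:
  # pushes to the same branch share one group; a newer run cancels the older one
  group: ci-${{ github.ref }}
  cancel-in-progress: true
jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - run: echo building
```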
+ } + + return CancelJobs(ctx, jobsToCancel) +} diff --git a/models/actions/run_job.go b/models/actions/run_job.go index e7fa21270c..9c7c3658d7 100644 --- a/models/actions/run_job.go +++ b/models/actions/run_job.go @@ -22,23 +22,38 @@ type ActionRunJob struct { ID int64 RunID int64 `xorm:"index"` Run *ActionRun `xorm:"-"` - RepoID int64 `xorm:"index"` + RepoID int64 `xorm:"index(repo_concurrency)"` Repo *repo_model.Repository `xorm:"-"` OwnerID int64 `xorm:"index"` CommitSHA string `xorm:"index"` IsForkPullRequest bool Name string `xorm:"VARCHAR(255)"` Attempt int64 - WorkflowPayload []byte - JobID string `xorm:"VARCHAR(255)"` // job id in workflow, not job's id - Needs []string `xorm:"JSON TEXT"` - RunsOn []string `xorm:"JSON TEXT"` - TaskID int64 // the latest task of the job - Status Status `xorm:"index"` - Started timeutil.TimeStamp - Stopped timeutil.TimeStamp - Created timeutil.TimeStamp `xorm:"created"` - Updated timeutil.TimeStamp `xorm:"updated index"` + + // WorkflowPayload is act/jobparser.SingleWorkflow for act/jobparser.Parse + // it should contain exactly one job with global workflow fields for this model + WorkflowPayload []byte + + JobID string `xorm:"VARCHAR(255)"` // job id in workflow, not job's id + Needs []string `xorm:"JSON TEXT"` + RunsOn []string `xorm:"JSON TEXT"` + TaskID int64 // the latest task of the job + Status Status `xorm:"index"` + + RawConcurrency string // raw concurrency from job YAML's "concurrency" section + + // IsConcurrencyEvaluated is only valid/needed when this job's RawConcurrency is not empty. + // If RawConcurrency can't be evaluated (e.g. depend on other job's outputs or have errors), this field will be false. + // If RawConcurrency has been successfully evaluated, this field will be true, ConcurrencyGroup and ConcurrencyCancel are also set. + IsConcurrencyEvaluated bool + + ConcurrencyGroup string `xorm:"index(repo_concurrency) NOT NULL DEFAULT ''"` // evaluated concurrency.group + ConcurrencyCancel bool `xorm:"NOT NULL DEFAULT FALSE"` // evaluated concurrency.cancel-in-progress + + Started timeutil.TimeStamp + Stopped timeutil.TimeStamp + Created timeutil.TimeStamp `xorm:"created"` + Updated timeutil.TimeStamp `xorm:"updated index"` } func init() { @@ -125,7 +140,7 @@ func UpdateRunJob(ctx context.Context, job *ActionRunJob, cond builder.Cond, col return affected, nil } - if affected != 0 && slices.Contains(cols, "status") && job.Status.IsWaiting() { + if slices.Contains(cols, "status") && job.Status.IsWaiting() { // if the status of job changes to waiting again, increase tasks version. 
if err := IncreaseTaskVersion(ctx, job.OwnerID, job.RepoID); err != nil { return 0, err @@ -197,3 +212,39 @@ func AggregateJobStatus(jobs []*ActionRunJob) Status { return StatusUnknown // it shouldn't happen } } + +func CancelPreviousJobsByJobConcurrency(ctx context.Context, job *ActionRunJob) (jobsToCancel []*ActionRunJob, _ error) { + if job.RawConcurrency == "" { + return nil, nil + } + if !job.IsConcurrencyEvaluated { + return nil, nil + } + if job.ConcurrencyGroup == "" { + return nil, nil + } + + statusFindOption := []Status{StatusWaiting, StatusBlocked} + if job.ConcurrencyCancel { + statusFindOption = append(statusFindOption, StatusRunning) + } + runs, jobs, err := GetConcurrentRunsAndJobs(ctx, job.RepoID, job.ConcurrencyGroup, statusFindOption) + if err != nil { + return nil, fmt.Errorf("find concurrent runs and jobs: %w", err) + } + jobs = slices.DeleteFunc(jobs, func(j *ActionRunJob) bool { return j.ID == job.ID }) + jobsToCancel = append(jobsToCancel, jobs...) + + // cancel runs in the same concurrency group + for _, run := range runs { + jobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{ + RunID: run.ID, + }) + if err != nil { + return nil, fmt.Errorf("find run %d jobs: %w", run.ID, err) + } + jobsToCancel = append(jobsToCancel, jobs...) + } + + return CancelJobs(ctx, jobsToCancel) +} diff --git a/models/actions/run_job_list.go b/models/actions/run_job_list.go index 5f7bb62878..10f76d3641 100644 --- a/models/actions/run_job_list.go +++ b/models/actions/run_job_list.go @@ -69,12 +69,13 @@ func (jobs ActionJobList) LoadAttributes(ctx context.Context, withRepo bool) err type FindRunJobOptions struct { db.ListOptions - RunID int64 - RepoID int64 - OwnerID int64 - CommitSHA string - Statuses []Status - UpdatedBefore timeutil.TimeStamp + RunID int64 + RepoID int64 + OwnerID int64 + CommitSHA string + Statuses []Status + UpdatedBefore timeutil.TimeStamp + ConcurrencyGroup string } func (opts FindRunJobOptions) ToConds() builder.Cond { @@ -94,6 +95,12 @@ func (opts FindRunJobOptions) ToConds() builder.Cond { if opts.UpdatedBefore > 0 { cond = cond.And(builder.Lt{"`action_run_job`.updated": opts.UpdatedBefore}) } + if opts.ConcurrencyGroup != "" { + if opts.RepoID == 0 { + panic("Invalid FindRunJobOptions: repo_id is required") + } + cond = cond.And(builder.Eq{"`action_run_job`.concurrency_group": opts.ConcurrencyGroup}) + } return cond } diff --git a/models/actions/run_list.go b/models/actions/run_list.go index 12c55e538e..2628c4712f 100644 --- a/models/actions/run_list.go +++ b/models/actions/run_list.go @@ -64,15 +64,16 @@ func (runs RunList) LoadRepos(ctx context.Context) error { type FindRunOptions struct { db.ListOptions - RepoID int64 - OwnerID int64 - WorkflowID string - Ref string // the commit/tag/… that caused this workflow - TriggerUserID int64 - TriggerEvent webhook_module.HookEventType - Approved bool // not util.OptionalBool, it works only when it's true - Status []Status - CommitSHA string + RepoID int64 + OwnerID int64 + WorkflowID string + Ref string // the commit/tag/… that caused this workflow + TriggerUserID int64 + TriggerEvent webhook_module.HookEventType + Approved bool // not util.OptionalBool, it works only when it's true + Status []Status + ConcurrencyGroup string + CommitSHA string } func (opts FindRunOptions) ToConds() builder.Cond { @@ -101,6 +102,12 @@ func (opts FindRunOptions) ToConds() builder.Cond { if opts.CommitSHA != "" { cond = cond.And(builder.Eq{"`action_run`.commit_sha": opts.CommitSHA}) } + if len(opts.ConcurrencyGroup) > 0 { + if 
opts.RepoID == 0 { + panic("Invalid FindRunOptions: repo_id is required") + } + cond = cond.And(builder.Eq{"`action_run`.concurrency_group": opts.ConcurrencyGroup}) + } return cond } diff --git a/models/migrations/migrations.go b/models/migrations/migrations.go index 1b1558f39d..8fb10e84cf 100644 --- a/models/migrations/migrations.go +++ b/models/migrations/migrations.go @@ -394,6 +394,7 @@ func prepareMigrationTasks() []*migration { // Gitea 1.24.0 ends at database version 321 newMigration(321, "Use LONGTEXT for some columns and fix review_state.updated_files column", v1_25.UseLongTextInSomeColumnsAndFixBugs), newMigration(322, "Extend comment tree_path length limit", v1_25.ExtendCommentTreePathLength), + newMigration(323, "Add support for actions concurrency", v1_25.AddActionsConcurrency), } return preparedMigrations } diff --git a/models/migrations/v1_25/v323.go b/models/migrations/v1_25/v323.go new file mode 100644 index 0000000000..5f38ea8545 --- /dev/null +++ b/models/migrations/v1_25/v323.go @@ -0,0 +1,43 @@ +// Copyright 2025 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package v1_25 + +import ( + "xorm.io/xorm" +) + +func AddActionsConcurrency(x *xorm.Engine) error { + type ActionRun struct { + RepoID int64 `xorm:"index(repo_concurrency)"` + RawConcurrency string + ConcurrencyGroup string `xorm:"index(repo_concurrency) NOT NULL DEFAULT ''"` + ConcurrencyCancel bool `xorm:"NOT NULL DEFAULT FALSE"` + } + + if _, err := x.SyncWithOptions(xorm.SyncOptions{ + IgnoreDropIndices: true, + }, new(ActionRun)); err != nil { + return err + } + + if err := x.Sync(new(ActionRun)); err != nil { + return err + } + + type ActionRunJob struct { + RepoID int64 `xorm:"index(repo_concurrency)"` + RawConcurrency string + IsConcurrencyEvaluated bool + ConcurrencyGroup string `xorm:"index(repo_concurrency) NOT NULL DEFAULT ''"` + ConcurrencyCancel bool `xorm:"NOT NULL DEFAULT FALSE"` + } + + if _, err := x.SyncWithOptions(xorm.SyncOptions{ + IgnoreDropIndices: true, + }, new(ActionRunJob)); err != nil { + return err + } + + return nil +} diff --git a/routers/api/actions/runner/runner.go b/routers/api/actions/runner/runner.go index 55ba7862a9..46c5147d99 100644 --- a/routers/api/actions/runner/runner.go +++ b/routers/api/actions/runner/runner.go @@ -227,9 +227,12 @@ func (s *Service) UpdateTask( } if req.Msg.State.Result != runnerv1.Result_RESULT_UNSPECIFIED { - if err := actions_service.EmitJobsIfReady(task.Job.RunID); err != nil { + if err := actions_service.EmitJobsIfReadyByRun(task.Job.RunID); err != nil { log.Error("Emit ready jobs of run %d: %v", task.Job.RunID, err) } + if task.Job.Run.Status.IsDone() { + actions_service.NotifyWorkflowRunStatusUpdateWithReload(ctx, task.Job) + } } return connect.NewResponse(&runnerv1.UpdateTaskResponse{ diff --git a/routers/web/repo/actions/view.go b/routers/web/repo/actions/view.go index 3422128026..232c627709 100644 --- a/routers/web/repo/actions/view.go +++ b/routers/web/repo/actions/view.go @@ -27,7 +27,6 @@ import ( "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/storage" "code.gitea.io/gitea/modules/templates" - "code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/modules/util" "code.gitea.io/gitea/modules/web" "code.gitea.io/gitea/routers/common" @@ -36,6 +35,7 @@ import ( notify_service "code.gitea.io/gitea/services/notify" "github.com/nektos/act/pkg/model" + "gopkg.in/yaml.v3" "xorm.io/builder" ) @@ -420,12 +420,45 @@ func Rerun(ctx *context_module.Context) { return } + // check run (workflow-level) 
concurrency + + job, jobs := getRunJobs(ctx, runIndex, jobIndex) + if ctx.Written() { + return + } + // reset run's start and stop time when it is done if run.Status.IsDone() { run.PreviousDuration = run.Duration() run.Started = 0 run.Stopped = 0 - if err := actions_model.UpdateRun(ctx, run, "started", "stopped", "previous_duration"); err != nil { + + vars, err := actions_model.GetVariablesOfRun(ctx, run) + if err != nil { + ctx.ServerError("GetVariablesOfRun", fmt.Errorf("get run %d variables: %w", run.ID, err)) + return + } + + if run.RawConcurrency != "" { + var rawConcurrency model.RawConcurrency + if err := yaml.Unmarshal([]byte(run.RawConcurrency), &rawConcurrency); err != nil { + ctx.ServerError("UnmarshalRawConcurrency", fmt.Errorf("unmarshal raw concurrency: %w", err)) + return + } + + err = actions_service.EvaluateRunConcurrencyFillModel(ctx, run, &rawConcurrency, vars) + if err != nil { + ctx.ServerError("EvaluateRunConcurrencyFillModel", err) + return + } + + run.Status, err = actions_service.PrepareToStartRunWithConcurrency(ctx, run) + if err != nil { + ctx.ServerError("PrepareToStartRunWithConcurrency", err) + return + } + } + if err := actions_model.UpdateRun(ctx, run, "started", "stopped", "previous_duration", "status", "concurrency_group", "concurrency_cancel"); err != nil { ctx.ServerError("UpdateRun", err) return } @@ -437,16 +470,12 @@ func Rerun(ctx *context_module.Context) { notify_service.WorkflowRunStatusUpdate(ctx, run.Repo, run.TriggerUser, run) } - job, jobs := getRunJobs(ctx, runIndex, jobIndex) - if ctx.Written() { - return - } - + isRunBlocked := run.Status == actions_model.StatusBlocked if jobIndexStr == "" { // rerun all jobs for _, j := range jobs { // if the job has needs, it should be set to "blocked" status to wait for other jobs - shouldBlock := len(j.Needs) > 0 - if err := rerunJob(ctx, j, shouldBlock); err != nil { + shouldBlockJob := len(j.Needs) > 0 || isRunBlocked + if err := rerunJob(ctx, j, shouldBlockJob); err != nil { ctx.ServerError("RerunJob", err) return } @@ -459,8 +488,8 @@ func Rerun(ctx *context_module.Context) { for _, j := range rerunJobs { // jobs other than the specified one should be set to "blocked" status - shouldBlock := j.JobID != job.JobID - if err := rerunJob(ctx, j, shouldBlock); err != nil { + shouldBlockJob := j.JobID != job.JobID || isRunBlocked + if err := rerunJob(ctx, j, shouldBlockJob); err != nil { ctx.ServerError("RerunJob", err) return } @@ -476,15 +505,37 @@ func rerunJob(ctx *context_module.Context, job *actions_model.ActionRunJob, shou } job.TaskID = 0 - job.Status = actions_model.StatusWaiting - if shouldBlock { - job.Status = actions_model.StatusBlocked - } + job.Status = util.Iif(shouldBlock, actions_model.StatusBlocked, actions_model.StatusWaiting) job.Started = 0 job.Stopped = 0 + job.ConcurrencyGroup = "" + job.ConcurrencyCancel = false + job.IsConcurrencyEvaluated = false + if err := job.LoadRun(ctx); err != nil { + return err + } + + vars, err := actions_model.GetVariablesOfRun(ctx, job.Run) + if err != nil { + return fmt.Errorf("get run %d variables: %w", job.Run.ID, err) + } + + if job.RawConcurrency != "" && !shouldBlock { + err = actions_service.EvaluateJobConcurrencyFillModel(ctx, job.Run, job, vars) + if err != nil { + return fmt.Errorf("evaluate job concurrency: %w", err) + } + + job.Status, err = actions_service.PrepareToStartJobWithConcurrency(ctx, job) + if err != nil { + return err + } + } + if err := db.WithTx(ctx, func(ctx context.Context) error { - _, err := actions_model.UpdateRunJob(ctx, 
job, builder.Eq{"status": status}, "task_id", "status", "started", "stopped") + updateCols := []string{"task_id", "status", "started", "stopped", "concurrency_group", "concurrency_cancel", "is_concurrency_evaluated"} + _, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"status": status}, updateCols...) return err }); err != nil { return err @@ -523,33 +574,14 @@ func Cancel(ctx *context_module.Context) { return } - var updatedjobs []*actions_model.ActionRunJob + var updatedJobs []*actions_model.ActionRunJob if err := db.WithTx(ctx, func(ctx context.Context) error { - for _, job := range jobs { - status := job.Status - if status.IsDone() { - continue - } - if job.TaskID == 0 { - job.Status = actions_model.StatusCancelled - job.Stopped = timeutil.TimeStampNow() - n, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped") - if err != nil { - return err - } - if n == 0 { - return errors.New("job has changed, try again") - } - if n > 0 { - updatedjobs = append(updatedjobs, job) - } - continue - } - if err := actions_model.StopTask(ctx, job.TaskID, actions_model.StatusCancelled); err != nil { - return err - } + cancelledJobs, err := actions_model.CancelJobs(ctx, jobs) + if err != nil { + return fmt.Errorf("cancel jobs: %w", err) } + updatedJobs = append(updatedJobs, cancelledJobs...) return nil }); err != nil { ctx.ServerError("StopTask", err) @@ -557,13 +589,14 @@ func Cancel(ctx *context_module.Context) { } actions_service.CreateCommitStatus(ctx, jobs...) + actions_service.EmitJobsIfReadyByJobs(updatedJobs) - for _, job := range updatedjobs { + for _, job := range updatedJobs { _ = job.LoadAttributes(ctx) notify_service.WorkflowJobStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job, nil) } - if len(updatedjobs) > 0 { - job := updatedjobs[0] + if len(updatedJobs) > 0 { + job := updatedJobs[0] actions_service.NotifyWorkflowRunStatusUpdateWithReload(ctx, job) } ctx.JSONOK() @@ -579,40 +612,44 @@ func Approve(ctx *context_module.Context) { run := current.Run doer := ctx.Doer - var updatedjobs []*actions_model.ActionRunJob + var updatedJobs []*actions_model.ActionRunJob - if err := db.WithTx(ctx, func(ctx context.Context) error { + err := db.WithTx(ctx, func(ctx context.Context) (err error) { run.NeedApproval = false run.ApprovedBy = doer.ID if err := actions_model.UpdateRun(ctx, run, "need_approval", "approved_by"); err != nil { return err } for _, job := range jobs { - if len(job.Needs) == 0 && job.Status.IsBlocked() { - job.Status = actions_model.StatusWaiting + job.Status, err = actions_service.PrepareToStartJobWithConcurrency(ctx, job) + if err != nil { + return err + } + if job.Status == actions_model.StatusWaiting { n, err := actions_model.UpdateRunJob(ctx, job, nil, "status") if err != nil { return err } if n > 0 { - updatedjobs = append(updatedjobs, job) + updatedJobs = append(updatedJobs, job) } } } return nil - }); err != nil { + }) + if err != nil { ctx.ServerError("UpdateRunJob", err) return } actions_service.CreateCommitStatus(ctx, jobs...) 
- if len(updatedjobs) > 0 { - job := updatedjobs[0] + if len(updatedJobs) > 0 { + job := updatedJobs[0] actions_service.NotifyWorkflowRunStatusUpdateWithReload(ctx, job) } - for _, job := range updatedjobs { + for _, job := range updatedJobs { _ = job.LoadAttributes(ctx) notify_service.WorkflowJobStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job, nil) } diff --git a/services/actions/clear_tasks.go b/services/actions/clear_tasks.go index 3c7aa0b1a5..727a18443d 100644 --- a/services/actions/clear_tasks.go +++ b/services/actions/clear_tasks.go @@ -15,6 +15,7 @@ import ( "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/timeutil" + "code.gitea.io/gitea/modules/util" webhook_module "code.gitea.io/gitea/modules/webhook" notify_service "code.gitea.io/gitea/services/notify" ) @@ -50,15 +51,84 @@ func notifyWorkflowJobStatusUpdate(ctx context.Context, jobs []*actions_model.Ac func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID string, event webhook_module.HookEventType) error { jobs, err := actions_model.CancelPreviousJobs(ctx, repoID, ref, workflowID, event) notifyWorkflowJobStatusUpdate(ctx, jobs) + EmitJobsIfReadyByJobs(jobs) return err } func CleanRepoScheduleTasks(ctx context.Context, repo *repo_model.Repository) error { jobs, err := actions_model.CleanRepoScheduleTasks(ctx, repo) notifyWorkflowJobStatusUpdate(ctx, jobs) + EmitJobsIfReadyByJobs(jobs) return err } +func shouldBlockJobByConcurrency(ctx context.Context, job *actions_model.ActionRunJob) (bool, error) { + if job.RawConcurrency != "" && !job.IsConcurrencyEvaluated { + // when the job depends on other jobs, we cannot evaluate its concurrency, so it should be blocked and will be evaluated again when its dependencies are done + return true, nil + } + + if job.ConcurrencyGroup == "" || job.ConcurrencyCancel { + return false, nil + } + + runs, jobs, err := actions_model.GetConcurrentRunsAndJobs(ctx, job.RepoID, job.ConcurrencyGroup, []actions_model.Status{actions_model.StatusRunning}) + if err != nil { + return false, fmt.Errorf("GetConcurrentRunsAndJobs: %w", err) + } + + return len(runs) > 0 || len(jobs) > 0, nil +} + +// PrepareToStartJobWithConcurrency prepares a job to start by its evaluated concurrency group and cancelling previous jobs if necessary. +// It returns the new status of the job (either StatusBlocked or StatusWaiting) and any error encountered during the process. 
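For the blocked case mentioned above, a sketch (job and script names are made up): with `cancel-in-progress` left false, a second job entering a busy group is returned as StatusBlocked here and is started later by the job emitter once the group drains, instead of cancelling the job already running.

```yaml
jobs:
  deploy:
    runs-on: ubuntu-latest
    # only one production deployment at a time; later runs queue rather than cancel
    concurrency:
      group: production-deploy
      cancel-in-progress: false
    steps:
      - run: ./deploy.sh   # placeholder step
```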
+func PrepareToStartJobWithConcurrency(ctx context.Context, job *actions_model.ActionRunJob) (actions_model.Status, error) { + shouldBlock, err := shouldBlockJobByConcurrency(ctx, job) + if err != nil { + return actions_model.StatusBlocked, err + } + + // even if the current job is blocked, we still need to cancel previous "waiting/blocked" jobs in the same concurrency group + jobs, err := actions_model.CancelPreviousJobsByJobConcurrency(ctx, job) + if err != nil { + return actions_model.StatusBlocked, fmt.Errorf("CancelPreviousJobsByJobConcurrency: %w", err) + } + notifyWorkflowJobStatusUpdate(ctx, jobs) + + return util.Iif(shouldBlock, actions_model.StatusBlocked, actions_model.StatusWaiting), nil +} + +func shouldBlockRunByConcurrency(ctx context.Context, actionRun *actions_model.ActionRun) (bool, error) { + if actionRun.ConcurrencyGroup == "" || actionRun.ConcurrencyCancel { + return false, nil + } + + runs, jobs, err := actions_model.GetConcurrentRunsAndJobs(ctx, actionRun.RepoID, actionRun.ConcurrencyGroup, []actions_model.Status{actions_model.StatusRunning}) + if err != nil { + return false, fmt.Errorf("find concurrent runs and jobs: %w", err) + } + + return len(runs) > 0 || len(jobs) > 0, nil +} + +// PrepareToStartRunWithConcurrency prepares a run to start by its evaluated concurrency group and cancelling previous jobs if necessary. +// It returns the new status of the run (either StatusBlocked or StatusWaiting) and any error encountered during the process. +func PrepareToStartRunWithConcurrency(ctx context.Context, run *actions_model.ActionRun) (actions_model.Status, error) { + shouldBlock, err := shouldBlockRunByConcurrency(ctx, run) + if err != nil { + return actions_model.StatusBlocked, err + } + + // even if the current run is blocked, we still need to cancel previous "waiting/blocked" jobs in the same concurrency group + jobs, err := actions_model.CancelPreviousJobsByRunConcurrency(ctx, run) + if err != nil { + return actions_model.StatusBlocked, fmt.Errorf("CancelPreviousJobsByRunConcurrency: %w", err) + } + notifyWorkflowJobStatusUpdate(ctx, jobs) + + return util.Iif(shouldBlock, actions_model.StatusBlocked, actions_model.StatusWaiting), nil +} + func stopTasks(ctx context.Context, opts actions_model.FindTaskOptions) error { tasks, err := db.Find[actions_model.ActionTask](ctx, opts) if err != nil { @@ -95,6 +165,7 @@ func stopTasks(ctx context.Context, opts actions_model.FindTaskOptions) error { } notifyWorkflowJobStatusUpdate(ctx, jobs) + EmitJobsIfReadyByJobs(jobs) return nil } @@ -103,7 +174,7 @@ func stopTasks(ctx context.Context, opts actions_model.FindTaskOptions) error { func CancelAbandonedJobs(ctx context.Context) error { jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{ Statuses: []actions_model.Status{actions_model.StatusWaiting, actions_model.StatusBlocked}, - UpdatedBefore: timeutil.TimeStamp(time.Now().Add(-setting.Actions.AbandonedJobTimeout).Unix()), + UpdatedBefore: timeutil.TimeStampNow().AddDuration(-setting.Actions.AbandonedJobTimeout), }) if err != nil { log.Warn("find abandoned tasks: %v", err) @@ -114,6 +185,7 @@ func CancelAbandonedJobs(ctx context.Context) error { // Collect one job per run to send workflow run status update updatedRuns := map[int64]*actions_model.ActionRunJob{} + updatedJobs := []*actions_model.ActionRunJob{} for _, job := range jobs { job.Status = actions_model.StatusCancelled @@ -138,6 +210,7 @@ func CancelAbandonedJobs(ctx context.Context) error { } CreateCommitStatus(ctx, job) if updated { 
+ updatedJobs = append(updatedJobs, job) notify_service.WorkflowJobStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job, nil) } } @@ -145,6 +218,7 @@ func CancelAbandonedJobs(ctx context.Context) error { for _, job := range updatedRuns { notify_service.WorkflowRunStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job.Run) } + EmitJobsIfReadyByJobs(updatedJobs) return nil } diff --git a/services/actions/concurrency.go b/services/actions/concurrency.go new file mode 100644 index 0000000000..2910e47739 --- /dev/null +++ b/services/actions/concurrency.go @@ -0,0 +1,121 @@ +// Copyright 2025 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + "errors" + "fmt" + + actions_model "code.gitea.io/gitea/models/actions" + "code.gitea.io/gitea/modules/json" + api "code.gitea.io/gitea/modules/structs" + + "github.com/nektos/act/pkg/jobparser" + act_model "github.com/nektos/act/pkg/model" + "gopkg.in/yaml.v3" +) + +// EvaluateRunConcurrencyFillModel evaluates the expressions in a run-level (workflow) concurrency, +// and fills the run's model fields with `concurrency.group` and `concurrency.cancel-in-progress`. +// Workflow-level concurrency doesn't depend on the job outputs, so it can always be evaluated if there is no syntax error. +// See https://docs.github.com/en/actions/reference/workflows-and-actions/workflow-syntax#concurrency +func EvaluateRunConcurrencyFillModel(ctx context.Context, run *actions_model.ActionRun, wfRawConcurrency *act_model.RawConcurrency, vars map[string]string) error { + if err := run.LoadAttributes(ctx); err != nil { + return fmt.Errorf("run LoadAttributes: %w", err) + } + + actionsRunCtx := GenerateGiteaContext(run, nil) + jobResults := map[string]*jobparser.JobResult{"": {}} + inputs, err := getInputsFromRun(run) + if err != nil { + return fmt.Errorf("get inputs: %w", err) + } + + rawConcurrency, err := yaml.Marshal(wfRawConcurrency) + if err != nil { + return fmt.Errorf("marshal raw concurrency: %w", err) + } + run.RawConcurrency = string(rawConcurrency) + run.ConcurrencyGroup, run.ConcurrencyCancel, err = jobparser.EvaluateConcurrency(wfRawConcurrency, "", nil, actionsRunCtx, jobResults, vars, inputs) + if err != nil { + return fmt.Errorf("evaluate concurrency: %w", err) + } + return nil +} + +func findJobNeedsAndFillJobResults(ctx context.Context, job *actions_model.ActionRunJob) (map[string]*jobparser.JobResult, error) { + taskNeeds, err := FindTaskNeeds(ctx, job) + if err != nil { + return nil, fmt.Errorf("find task needs: %w", err) + } + jobResults := make(map[string]*jobparser.JobResult, len(taskNeeds)) + for jobID, taskNeed := range taskNeeds { + jobResult := &jobparser.JobResult{ + Result: taskNeed.Result.String(), + Outputs: taskNeed.Outputs, + } + jobResults[jobID] = jobResult + } + jobResults[job.JobID] = &jobparser.JobResult{ + Needs: job.Needs, + } + return jobResults, nil +} + +// EvaluateJobConcurrencyFillModel evaluates the expressions in a job-level concurrency, +// and fills the job's model fields with `concurrency.group` and `concurrency.cancel-in-progress`. +// Job-level concurrency may depend on other job's outputs (via `needs`): `concurrency.group: my-group-${{ needs.job1.outputs.out1 }}` +// If the needed jobs haven't been executed yet, this evaluation will also fail. 
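A sketch of the dependent case described above (job, step, and output names are invented): the group for `deploy` cannot be computed until `plan` has produced its output, so the job stays blocked with `IsConcurrencyEvaluated` false and is re-evaluated by the job emitter once its needs are done.

```yaml
jobs:
  plan:
    runs-on: ubuntu-latest
    outputs:
      env: ${{ steps.pick.outputs.env }}
    steps:
      - id: pick
        run: echo "env=staging" >> "$GITHUB_OUTPUT"
  deploy:
    needs: plan
    runs-on: ubuntu-latest
    concurrency:
      group: deploy-${{ needs.plan.outputs.env }}
    steps:
      - run: echo deploying
```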
+// See https://docs.github.com/en/actions/reference/workflows-and-actions/workflow-syntax#jobsjob_idconcurrency +func EvaluateJobConcurrencyFillModel(ctx context.Context, run *actions_model.ActionRun, actionRunJob *actions_model.ActionRunJob, vars map[string]string) error { + if err := actionRunJob.LoadAttributes(ctx); err != nil { + return fmt.Errorf("job LoadAttributes: %w", err) + } + + var rawConcurrency act_model.RawConcurrency + if err := yaml.Unmarshal([]byte(actionRunJob.RawConcurrency), &rawConcurrency); err != nil { + return fmt.Errorf("unmarshal raw concurrency: %w", err) + } + + actionsJobCtx := GenerateGiteaContext(run, actionRunJob) + + jobResults, err := findJobNeedsAndFillJobResults(ctx, actionRunJob) + if err != nil { + return fmt.Errorf("find job needs and fill job results: %w", err) + } + + inputs, err := getInputsFromRun(run) + if err != nil { + return fmt.Errorf("get inputs: %w", err) + } + + // singleWorkflows is created from an ActionJob, which always contains exactly a single job's YAML definition. + // Ideally it shouldn't be called "Workflow", it is just a job with global workflow fields + trigger + singleWorkflows, err := jobparser.Parse(actionRunJob.WorkflowPayload) + if err != nil { + return fmt.Errorf("parse single workflow: %w", err) + } else if len(singleWorkflows) != 1 { + return errors.New("not single workflow") + } + _, singleWorkflowJob := singleWorkflows[0].Job() + + actionRunJob.ConcurrencyGroup, actionRunJob.ConcurrencyCancel, err = jobparser.EvaluateConcurrency(&rawConcurrency, actionRunJob.JobID, singleWorkflowJob, actionsJobCtx, jobResults, vars, inputs) + if err != nil { + return fmt.Errorf("evaluate concurrency: %w", err) + } + actionRunJob.IsConcurrencyEvaluated = true + return nil +} + +func getInputsFromRun(run *actions_model.ActionRun) (map[string]any, error) { + if run.Event != "workflow_dispatch" { + return map[string]any{}, nil + } + var payload api.WorkflowDispatchPayload + if err := json.Unmarshal([]byte(run.EventPayload), &payload); err != nil { + return nil, err + } + return payload.Inputs, nil +} diff --git a/services/actions/job_emitter.go b/services/actions/job_emitter.go index 47c9f59094..c6578eae65 100644 --- a/services/actions/job_emitter.go +++ b/services/actions/job_emitter.go @@ -10,9 +10,12 @@ import ( actions_model "code.gitea.io/gitea/models/actions" "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/modules/container" "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/queue" + "code.gitea.io/gitea/modules/setting" + "code.gitea.io/gitea/modules/util" notify_service "code.gitea.io/gitea/services/notify" "github.com/nektos/act/pkg/jobparser" @@ -25,7 +28,7 @@ type jobUpdate struct { RunID int64 } -func EmitJobsIfReady(runID int64) error { +func EmitJobsIfReadyByRun(runID int64) error { err := jobEmitterQueue.Push(&jobUpdate{ RunID: runID, }) @@ -35,53 +38,77 @@ func EmitJobsIfReady(runID int64) error { return err } +func EmitJobsIfReadyByJobs(jobs []*actions_model.ActionRunJob) { + checkedRuns := make(container.Set[int64]) + for _, job := range jobs { + if !job.Status.IsDone() || checkedRuns.Contains(job.RunID) { + continue + } + if err := EmitJobsIfReadyByRun(job.RunID); err != nil { + log.Error("Check jobs of run %d: %v", job.RunID, err) + } + checkedRuns.Add(job.RunID) + } +} + func jobEmitterQueueHandler(items ...*jobUpdate) []*jobUpdate { ctx := graceful.GetManager().ShutdownContext() var ret []*jobUpdate for _, update := range items { - if err := 
checkJobsOfRun(ctx, update.RunID); err != nil { + if err := checkJobsByRunID(ctx, update.RunID); err != nil { + log.Error("check run %d: %v", update.RunID, err) ret = append(ret, update) } } return ret } -func checkJobsOfRun(ctx context.Context, runID int64) error { - jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: runID}) - if err != nil { - return err +func checkJobsByRunID(ctx context.Context, runID int64) error { + run, exist, err := db.GetByID[actions_model.ActionRun](ctx, runID) + if !exist { + return fmt.Errorf("run %d does not exist", runID) } - var updatedjobs []*actions_model.ActionRunJob + if err != nil { + return fmt.Errorf("get action run: %w", err) + } + var jobs, updatedJobs []*actions_model.ActionRunJob if err := db.WithTx(ctx, func(ctx context.Context) error { - idToJobs := make(map[string][]*actions_model.ActionRunJob, len(jobs)) - for _, job := range jobs { - idToJobs[job.JobID] = append(idToJobs[job.JobID], job) + // check jobs of the current run + if js, ujs, err := checkJobsOfRun(ctx, run); err != nil { + return err + } else { + jobs = append(jobs, js...) + updatedJobs = append(updatedJobs, ujs...) } - - updates := newJobStatusResolver(jobs).Resolve() - for _, job := range jobs { - if status, ok := updates[job.ID]; ok { - job.Status = status - if n, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"status": actions_model.StatusBlocked}, "status"); err != nil { - return err - } else if n != 1 { - return fmt.Errorf("no affected for updating blocked job %v", job.ID) - } - updatedjobs = append(updatedjobs, job) - } + if js, ujs, err := checkRunConcurrency(ctx, run); err != nil { + return err + } else { + jobs = append(jobs, js...) + updatedJobs = append(updatedJobs, ujs...) } return nil }); err != nil { return err } CreateCommitStatus(ctx, jobs...) - for _, job := range updatedjobs { + for _, job := range updatedJobs { _ = job.LoadAttributes(ctx) notify_service.WorkflowJobStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job, nil) } - if len(jobs) > 0 { + runJobs := make(map[int64][]*actions_model.ActionRunJob) + for _, job := range jobs { + runJobs[job.RunID] = append(runJobs[job.RunID], job) + } + runUpdatedJobs := make(map[int64][]*actions_model.ActionRunJob) + for _, uj := range updatedJobs { + runUpdatedJobs[uj.RunID] = append(runUpdatedJobs[uj.RunID], uj) + } + for runID, js := range runJobs { + if len(runUpdatedJobs[runID]) == 0 { + continue + } runUpdated := true - for _, job := range jobs { + for _, job := range js { if !job.Status.IsDone() { runUpdated = false break @@ -94,6 +121,118 @@ func checkJobsOfRun(ctx context.Context, runID int64) error { return nil } +// findBlockedRunByConcurrency finds the blocked concurrent run in a repo and returns `nil, nil` when there is no blocked run. 
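Since the lookup below is keyed only by repository and group string, the same literal group can be shared by different workflow files, matching the GitHub behaviour; a sketch with hypothetical file names:

```yaml
# both .gitea/workflows/docs-a.yml and .gitea/workflows/docs-b.yml may declare:
concurrency:
  group: pages-build
  cancel-in-progress: false
# whichever run starts second is held blocked until the first one finishes
```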
+func findBlockedRunByConcurrency(ctx context.Context, repoID int64, concurrencyGroup string) (*actions_model.ActionRun, error) { + if concurrencyGroup == "" { + return nil, nil + } + cRuns, cJobs, err := actions_model.GetConcurrentRunsAndJobs(ctx, repoID, concurrencyGroup, []actions_model.Status{actions_model.StatusBlocked}) + if err != nil { + return nil, fmt.Errorf("find concurrent runs and jobs: %w", err) + } + + // There can be at most one blocked run or job + var concurrentRun *actions_model.ActionRun + if len(cRuns) > 0 { + concurrentRun = cRuns[0] + } else if len(cJobs) > 0 { + jobRun, exist, err := db.GetByID[actions_model.ActionRun](ctx, cJobs[0].RunID) + if !exist { + return nil, fmt.Errorf("run %d does not exist", cJobs[0].RunID) + } + if err != nil { + return nil, fmt.Errorf("get run by job %d: %w", cJobs[0].ID, err) + } + concurrentRun = jobRun + } + + return concurrentRun, nil +} + +func checkRunConcurrency(ctx context.Context, run *actions_model.ActionRun) (jobs, updatedJobs []*actions_model.ActionRunJob, err error) { + checkedConcurrencyGroup := make(container.Set[string]) + + // check run (workflow-level) concurrency + if run.ConcurrencyGroup != "" { + concurrentRun, err := findBlockedRunByConcurrency(ctx, run.RepoID, run.ConcurrencyGroup) + if err != nil { + return nil, nil, fmt.Errorf("find blocked run by concurrency: %w", err) + } + if concurrentRun != nil && !concurrentRun.NeedApproval { + js, ujs, err := checkJobsOfRun(ctx, concurrentRun) + if err != nil { + return nil, nil, err + } + jobs = append(jobs, js...) + updatedJobs = append(updatedJobs, ujs...) + } + checkedConcurrencyGroup.Add(run.ConcurrencyGroup) + } + + // check job concurrency + runJobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: run.ID}) + if err != nil { + return nil, nil, fmt.Errorf("find run %d jobs: %w", run.ID, err) + } + for _, job := range runJobs { + if !job.Status.IsDone() { + continue + } + if job.ConcurrencyGroup == "" && checkedConcurrencyGroup.Contains(job.ConcurrencyGroup) { + continue + } + concurrentRun, err := findBlockedRunByConcurrency(ctx, job.RepoID, job.ConcurrencyGroup) + if err != nil { + return nil, nil, fmt.Errorf("find blocked run by concurrency: %w", err) + } + if concurrentRun != nil && !concurrentRun.NeedApproval { + js, ujs, err := checkJobsOfRun(ctx, concurrentRun) + if err != nil { + return nil, nil, err + } + jobs = append(jobs, js...) + updatedJobs = append(updatedJobs, ujs...) 
+ } + checkedConcurrencyGroup.Add(job.ConcurrencyGroup) + } + return jobs, updatedJobs, nil +} + +func checkJobsOfRun(ctx context.Context, run *actions_model.ActionRun) (jobs, updatedJobs []*actions_model.ActionRunJob, err error) { + jobs, err = db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: run.ID}) + if err != nil { + return nil, nil, err + } + vars, err := actions_model.GetVariablesOfRun(ctx, run) + if err != nil { + return nil, nil, err + } + + if err = db.WithTx(ctx, func(ctx context.Context) error { + for _, job := range jobs { + job.Run = run + } + + updates := newJobStatusResolver(jobs, vars).Resolve(ctx) + for _, job := range jobs { + if status, ok := updates[job.ID]; ok { + job.Status = status + if n, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"status": actions_model.StatusBlocked}, "status"); err != nil { + return err + } else if n != 1 { + return fmt.Errorf("no affected for updating blocked job %v", job.ID) + } + updatedJobs = append(updatedJobs, job) + } + } + return nil + }); err != nil { + return nil, nil, err + } + + return jobs, updatedJobs, nil +} + func NotifyWorkflowRunStatusUpdateWithReload(ctx context.Context, job *actions_model.ActionRunJob) { job.Run = nil if err := job.LoadAttributes(ctx); err != nil { @@ -107,9 +246,10 @@ type jobStatusResolver struct { statuses map[int64]actions_model.Status needs map[int64][]int64 jobMap map[int64]*actions_model.ActionRunJob + vars map[string]string } -func newJobStatusResolver(jobs actions_model.ActionJobList) *jobStatusResolver { +func newJobStatusResolver(jobs actions_model.ActionJobList, vars map[string]string) *jobStatusResolver { idToJobs := make(map[string][]*actions_model.ActionRunJob, len(jobs)) jobMap := make(map[int64]*actions_model.ActionRunJob) for _, job := range jobs { @@ -131,13 +271,14 @@ func newJobStatusResolver(jobs actions_model.ActionJobList) *jobStatusResolver { statuses: statuses, needs: needs, jobMap: jobMap, + vars: vars, } } -func (r *jobStatusResolver) Resolve() map[int64]actions_model.Status { +func (r *jobStatusResolver) Resolve(ctx context.Context) map[int64]actions_model.Status { ret := map[int64]actions_model.Status{} for i := 0; i < len(r.statuses); i++ { - updated := r.resolve() + updated := r.resolve(ctx) if len(updated) == 0 { return ret } @@ -149,43 +290,86 @@ func (r *jobStatusResolver) Resolve() map[int64]actions_model.Status { return ret } -func (r *jobStatusResolver) resolve() map[int64]actions_model.Status { +func (r *jobStatusResolver) resolveCheckNeeds(id int64) (allDone, allSucceed bool) { + allDone, allSucceed = true, true + for _, need := range r.needs[id] { + needStatus := r.statuses[need] + if !needStatus.IsDone() { + allDone = false + } + if needStatus.In(actions_model.StatusFailure, actions_model.StatusCancelled, actions_model.StatusSkipped) { + allSucceed = false + } + } + return allDone, allSucceed +} + +func (r *jobStatusResolver) resolveJobHasIfCondition(actionRunJob *actions_model.ActionRunJob) (hasIf bool) { + if wfJobs, _ := jobparser.Parse(actionRunJob.WorkflowPayload); len(wfJobs) == 1 { + _, wfJob := wfJobs[0].Job() + hasIf = len(wfJob.If.Value) > 0 + } + return hasIf +} + +func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model.Status { ret := map[int64]actions_model.Status{} for id, status := range r.statuses { + actionRunJob := r.jobMap[id] if status != actions_model.StatusBlocked { continue } - allDone, allSucceed := true, true - for _, need := range r.needs[id] { - needStatus := 
r.statuses[need] - if !needStatus.IsDone() { - allDone = false - } - if needStatus.In(actions_model.StatusFailure, actions_model.StatusCancelled, actions_model.StatusSkipped) { - allSucceed = false + allDone, allSucceed := r.resolveCheckNeeds(id) + if !allDone { + continue + } + + // update concurrency and check whether the job can run now + err := updateConcurrencyEvaluationForJobWithNeeds(ctx, actionRunJob, r.vars) + if err != nil { + // The err can be caused by different cases: database error, or syntax error, or the needed jobs haven't completed + // At the moment there is no way to distinguish them. + // Actually, for most cases, the error is caused by "syntax error" / "the needed jobs haven't completed (skipped?)" + // TODO: if workflow or concurrency expression has syntax error, there should be a user error message, need to show it to end users + log.Debug("updateConcurrencyEvaluationForJobWithNeeds failed, this job will stay blocked: job: %d, err: %v", id, err) + continue + } + + shouldStartJob := true + if !allSucceed { + // Not all dependent jobs completed successfully: + // * if the job has "if" condition, it can be started, then the act_runner will evaluate the "if" condition. + // * otherwise, the job should be skipped. + shouldStartJob = r.resolveJobHasIfCondition(actionRunJob) + } + + newStatus := util.Iif(shouldStartJob, actions_model.StatusWaiting, actions_model.StatusSkipped) + if newStatus == actions_model.StatusWaiting { + newStatus, err = PrepareToStartJobWithConcurrency(ctx, actionRunJob) + if err != nil { + log.Error("ShouldBlockJobByConcurrency failed, this job will stay blocked: job: %d, err: %v", id, err) } } - if allDone { - if allSucceed { - ret[id] = actions_model.StatusWaiting - } else { - // Check if the job has an "if" condition - hasIf := false - if wfJobs, _ := jobparser.Parse(r.jobMap[id].WorkflowPayload); len(wfJobs) == 1 { - _, wfJob := wfJobs[0].Job() - hasIf = len(wfJob.If.Value) > 0 - } - if hasIf { - // act_runner will check the "if" condition - ret[id] = actions_model.StatusWaiting - } else { - // If the "if" condition is empty and not all dependent jobs completed successfully, - // the job should be skipped. 
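As an example of the `if` branch handled above (a sketch; the expression itself is evaluated by the runner, not here): a cleanup job that must run even when its dependency fails would otherwise be resolved to StatusSkipped.

```yaml
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - run: exit 1          # fails on purpose
  cleanup:
    needs: test
    if: ${{ always() }}      # keeps the job startable so the runner can evaluate the condition
    runs-on: ubuntu-latest
    steps:
      - run: echo cleaning up
```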
- ret[id] = actions_model.StatusSkipped - } - } + if newStatus != actions_model.StatusBlocked { + ret[id] = newStatus } } return ret } + +func updateConcurrencyEvaluationForJobWithNeeds(ctx context.Context, actionRunJob *actions_model.ActionRunJob, vars map[string]string) error { + if setting.IsInTesting && actionRunJob.RepoID == 0 { + return nil // for testing purpose only, no repo, no evaluation + } + + err := EvaluateJobConcurrencyFillModel(ctx, actionRunJob.Run, actionRunJob, vars) + if err != nil { + return fmt.Errorf("evaluate job concurrency: %w", err) + } + + if _, err := actions_model.UpdateRunJob(ctx, actionRunJob, nil, "concurrency_group", "concurrency_cancel", "is_concurrency_evaluated"); err != nil { + return fmt.Errorf("update run job: %w", err) + } + return nil +} diff --git a/services/actions/job_emitter_test.go b/services/actions/job_emitter_test.go index 58c2dc3b24..a2152fb270 100644 --- a/services/actions/job_emitter_test.go +++ b/services/actions/job_emitter_test.go @@ -129,8 +129,8 @@ jobs: } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - r := newJobStatusResolver(tt.jobs) - assert.Equal(t, tt.want, r.Resolve()) + r := newJobStatusResolver(tt.jobs, nil) + assert.Equal(t, tt.want, r.Resolve(t.Context())) }) } } diff --git a/services/actions/notifier_helper.go b/services/actions/notifier_helper.go index d0d2572b0b..fc1894c5d8 100644 --- a/services/actions/notifier_helper.go +++ b/services/actions/notifier_helper.go @@ -357,6 +357,19 @@ func handleWorkflows( continue } + wfRawConcurrency, err := jobparser.ReadWorkflowRawConcurrency(dwf.Content) + if err != nil { + log.Error("ReadWorkflowRawConcurrency: %v", err) + continue + } + if wfRawConcurrency != nil { + err = EvaluateRunConcurrencyFillModel(ctx, run, wfRawConcurrency, vars) + if err != nil { + log.Error("EvaluateRunConcurrencyFillModel: %v", err) + continue + } + } + giteaCtx := GenerateGiteaContext(run, nil) jobs, err := jobparser.Parse(dwf.Content, jobparser.WithVars(vars), jobparser.WithGitContext(giteaCtx.ToGitHubContext())) @@ -369,21 +382,7 @@ func handleWorkflows( run.Title = jobs[0].RunName } - // cancel running jobs if the event is push or pull_request_sync - if run.Event == webhook_module.HookEventPush || - run.Event == webhook_module.HookEventPullRequestSync { - if err := CancelPreviousJobs( - ctx, - run.RepoID, - run.Ref, - run.WorkflowID, - run.Event, - ); err != nil { - log.Error("CancelPreviousJobs: %v", err) - } - } - - if err := actions_model.InsertRun(ctx, run, jobs); err != nil { + if err := InsertRun(ctx, run, jobs); err != nil { log.Error("InsertRun: %v", err) continue } diff --git a/services/actions/run.go b/services/actions/run.go new file mode 100644 index 0000000000..a3356d71c1 --- /dev/null +++ b/services/actions/run.go @@ -0,0 +1,127 @@ +// Copyright 2025 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + "fmt" + + actions_model "code.gitea.io/gitea/models/actions" + "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/modules/util" + + "github.com/nektos/act/pkg/jobparser" + "gopkg.in/yaml.v3" +) + +// InsertRun inserts a run +// The title will be cut off at 255 characters if it's longer than 255 characters. 
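Both concurrency levels handled by this function can appear in the same workflow file; a combined sketch (names are illustrative): the workflow-level group is already evaluated by the caller and applied when the run is inserted, while the `deploy` job's group is left for the job emitter because the job has `needs`.

```yaml
name: build-and-deploy
on: [push]
concurrency:                        # workflow level: applied when the run is inserted
  group: build-${{ github.ref }}
  cancel-in-progress: true
jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - run: echo build
  deploy:
    needs: build
    runs-on: ubuntu-latest
    concurrency:                    # job level: deferred because of `needs`
      group: deploy-production
    steps:
      - run: echo deploy
```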
+func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobparser.SingleWorkflow) error { + return db.WithTx(ctx, func(ctx context.Context) error { + index, err := db.GetNextResourceIndex(ctx, "action_run_index", run.RepoID) + if err != nil { + return err + } + run.Index = index + run.Title = util.EllipsisDisplayString(run.Title, 255) + + // check run (workflow-level) concurrency + run.Status, err = PrepareToStartRunWithConcurrency(ctx, run) + if err != nil { + return err + } + + if err := db.Insert(ctx, run); err != nil { + return err + } + + if err := run.LoadRepo(ctx); err != nil { + return err + } + + if err := actions_model.UpdateRepoRunsNumbers(ctx, run.Repo); err != nil { + return err + } + + // query vars for evaluating job concurrency groups + vars, err := actions_model.GetVariablesOfRun(ctx, run) + if err != nil { + return fmt.Errorf("get run %d variables: %w", run.ID, err) + } + + runJobs := make([]*actions_model.ActionRunJob, 0, len(jobs)) + var hasWaitingJobs bool + for _, v := range jobs { + id, job := v.Job() + needs := job.Needs() + if err := v.SetJob(id, job.EraseNeeds()); err != nil { + return err + } + payload, _ := v.Marshal() + + shouldBlockJob := len(needs) > 0 || run.NeedApproval || run.Status == actions_model.StatusBlocked + + job.Name = util.EllipsisDisplayString(job.Name, 255) + runJob := &actions_model.ActionRunJob{ + RunID: run.ID, + RepoID: run.RepoID, + OwnerID: run.OwnerID, + CommitSHA: run.CommitSHA, + IsForkPullRequest: run.IsForkPullRequest, + Name: job.Name, + WorkflowPayload: payload, + JobID: id, + Needs: needs, + RunsOn: job.RunsOn(), + Status: util.Iif(shouldBlockJob, actions_model.StatusBlocked, actions_model.StatusWaiting), + } + // check job concurrency + if job.RawConcurrency != nil { + rawConcurrency, err := yaml.Marshal(job.RawConcurrency) + if err != nil { + return fmt.Errorf("marshal raw concurrency: %w", err) + } + runJob.RawConcurrency = string(rawConcurrency) + + // do not evaluate job concurrency when it requires `needs`, the jobs with `needs` will be evaluated later by job emitter + if len(needs) == 0 { + err = EvaluateJobConcurrencyFillModel(ctx, run, runJob, vars) + if err != nil { + return fmt.Errorf("evaluate job concurrency: %w", err) + } + } + + // If a job needs other jobs ("needs" is not empty), its status is set to StatusBlocked at the entry of the loop + // No need to check job concurrency for a blocked job (it will be checked by job emitter later) + if runJob.Status == actions_model.StatusWaiting { + runJob.Status, err = PrepareToStartJobWithConcurrency(ctx, runJob) + if err != nil { + return fmt.Errorf("prepare to start job with concurrency: %w", err) + } + } + } + + hasWaitingJobs = hasWaitingJobs || runJob.Status == actions_model.StatusWaiting + if err := db.Insert(ctx, runJob); err != nil { + return err + } + + runJobs = append(runJobs, runJob) + } + + run.Status = actions_model.AggregateJobStatus(runJobs) + if err := actions_model.UpdateRun(ctx, run, "status"); err != nil { + return err + } + + // if there is a job in the waiting status, increase tasks version. 
+ if hasWaitingJobs { + if err := actions_model.IncreaseTaskVersion(ctx, run.OwnerID, run.RepoID); err != nil { + return err + } + } + + return nil + }) +} diff --git a/services/actions/schedule_tasks.go b/services/actions/schedule_tasks.go index c029c5a1a2..3b37b44ac4 100644 --- a/services/actions/schedule_tasks.go +++ b/services/actions/schedule_tasks.go @@ -53,20 +53,6 @@ func startTasks(ctx context.Context) error { // Loop through each spec and create a schedule task for it for _, row := range specs { - // cancel running jobs if the event is push - if row.Schedule.Event == webhook_module.HookEventPush { - // cancel running jobs of the same workflow - if err := CancelPreviousJobs( - ctx, - row.RepoID, - row.Schedule.Ref, - row.Schedule.WorkflowID, - webhook_module.HookEventSchedule, - ); err != nil { - log.Error("CancelPreviousJobs: %v", err) - } - } - if row.Repo.IsArchived { // Skip if the repo is archived continue @@ -144,9 +130,19 @@ func CreateScheduleTask(ctx context.Context, cron *actions_model.ActionSchedule) if err != nil { return err } + wfRawConcurrency, err := jobparser.ReadWorkflowRawConcurrency(cron.Content) + if err != nil { + return err + } + if wfRawConcurrency != nil { + err = EvaluateRunConcurrencyFillModel(ctx, run, wfRawConcurrency, vars) + if err != nil { + return fmt.Errorf("EvaluateRunConcurrencyFillModel: %w", err) + } + } // Insert the action run and its associated jobs into the database - if err := actions_model.InsertRun(ctx, run, workflows); err != nil { + if err := InsertRun(ctx, run, workflows); err != nil { return err } allJobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: run.ID}) diff --git a/services/actions/workflow.go b/services/actions/workflow.go index 40b34194e9..e3e60d4967 100644 --- a/services/actions/workflow.go +++ b/services/actions/workflow.go @@ -100,6 +100,7 @@ func DispatchActionWorkflow(ctx reqctx.RequestContext, doer *user_model.User, re // find workflow from commit var workflows []*jobparser.SingleWorkflow var entry *git.TreeEntry + var wfRawConcurrency *model.RawConcurrency run := &actions_model.ActionRun{ Title: strings.SplitN(runTargetCommit.CommitMessage, "\n", 2)[0], @@ -170,6 +171,11 @@ func DispatchActionWorkflow(ctx reqctx.RequestContext, doer *user_model.User, re ) } + wfRawConcurrency, err = jobparser.ReadWorkflowRawConcurrency(content) + if err != nil { + return err + } + // ctx.Req.PostForm -> WorkflowDispatchPayload.Inputs -> ActionRun.EventPayload -> runner: ghc.Event // https://docs.github.com/en/actions/learn-github-actions/contexts#github-context // https://docs.github.com/en/webhooks/webhook-events-and-payloads#workflow_dispatch @@ -187,19 +193,20 @@ func DispatchActionWorkflow(ctx reqctx.RequestContext, doer *user_model.User, re } run.EventPayload = string(eventPayload) - // cancel running jobs of the same workflow - if err := CancelPreviousJobs( - ctx, - run.RepoID, - run.Ref, - run.WorkflowID, - run.Event, - ); err != nil { - log.Error("CancelRunningJobs: %v", err) + // cancel running jobs of the same concurrency group + if wfRawConcurrency != nil { + vars, err := actions_model.GetVariablesOfRun(ctx, run) + if err != nil { + return fmt.Errorf("GetVariablesOfRun: %w", err) + } + err = EvaluateRunConcurrencyFillModel(ctx, run, wfRawConcurrency, vars) + if err != nil { + return fmt.Errorf("EvaluateRunConcurrencyFillModel: %w", err) + } } // Insert the action run and its associated jobs into the database - if err := actions_model.InsertRun(ctx, run, workflows); err != nil { + 
if err := InsertRun(ctx, run, workflows); err != nil { return fmt.Errorf("InsertRun: %w", err) } diff --git a/tests/integration/actions_concurrency_test.go b/tests/integration/actions_concurrency_test.go new file mode 100644 index 0000000000..cc61368260 --- /dev/null +++ b/tests/integration/actions_concurrency_test.go @@ -0,0 +1,1709 @@ +// Copyright 2025 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package integration + +import ( + "encoding/base64" + "fmt" + "net/http" + "net/url" + "testing" + "time" + + actions_model "code.gitea.io/gitea/models/actions" + auth_model "code.gitea.io/gitea/models/auth" + repo_model "code.gitea.io/gitea/models/repo" + "code.gitea.io/gitea/models/unittest" + user_model "code.gitea.io/gitea/models/user" + "code.gitea.io/gitea/modules/setting" + api "code.gitea.io/gitea/modules/structs" + "code.gitea.io/gitea/modules/timeutil" + "code.gitea.io/gitea/modules/util" + webhook_module "code.gitea.io/gitea/modules/webhook" + actions_service "code.gitea.io/gitea/services/actions" + + runnerv1 "code.gitea.io/actions-proto-go/runner/v1" + "github.com/stretchr/testify/assert" +) + +func TestWorkflowConcurrency(t *testing.T) { + onGiteaRun(t, func(t *testing.T, u *url.URL) { + user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) + session := loginUser(t, user2.Name) + token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser) + + apiRepo := createActionsTestRepo(t, token, "actions-concurrency", false) + repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiRepo.ID}) + httpContext := NewAPITestContext(t, user2.Name, repo.Name, auth_model.AccessTokenScopeWriteRepository) + defer doAPIDeleteRepository(httpContext)(t) + + runner := newMockRunner() + runner.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner", []string{"ubuntu-latest"}, false) + + // add a variable for test + req := NewRequestWithJSON(t, "POST", + fmt.Sprintf("/api/v1/repos/%s/%s/actions/variables/myvar", user2.Name, repo.Name), &api.CreateVariableOption{ + Value: "abc123", + }). 
+ AddTokenAuth(token) + MakeRequest(t, req, http.StatusCreated) + + wf1TreePath := ".gitea/workflows/concurrent-workflow-1.yml" + wf1FileContent := `name: concurrent-workflow-1 +on: + push: + paths: + - '.gitea/workflows/concurrent-workflow-1.yml' +concurrency: + group: workflow-main-abc123-user2 +jobs: + wf1-job: + runs-on: ubuntu-latest + steps: + - run: echo 'job from workflow1' +` + wf2TreePath := ".gitea/workflows/concurrent-workflow-2.yml" + wf2FileContent := `name: concurrent-workflow-2 +on: + push: + paths: + - '.gitea/workflows/concurrent-workflow-2.yml' +concurrency: + group: workflow-${{ gitea.ref_name }}-${{ vars.myvar }}-${{ gitea.event.pusher.username }} +jobs: + wf2-job: + runs-on: ubuntu-latest + steps: + - run: echo 'job from workflow2' +` + wf3TreePath := ".gitea/workflows/concurrent-workflow-3.yml" + wf3FileContent := `name: concurrent-workflow-3 +on: + push: + paths: + - '.gitea/workflows/concurrent-workflow-3.yml' +concurrency: + group: workflow-main-abc${{ 123 }}-${{ gitea.event.pusher.username }} +jobs: + wf3-job: + runs-on: ubuntu-latest + steps: + - run: echo 'job from workflow3' +` + // push workflow1 + opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf1TreePath, wf1FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf1TreePath, opts1) + // fetch and exec workflow1 + task := runner.fetchTask(t) + _, _, run := getTaskAndJobAndRunByTaskID(t, task.Id) + assert.Equal(t, "workflow-main-abc123-user2", run.ConcurrencyGroup) + assert.Equal(t, "concurrent-workflow-1.yml", run.WorkflowID) + runner.fetchNoTask(t) + runner.execTask(t, task, &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + }) + + // push workflow2 + opts2 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf2TreePath, wf2FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf2TreePath, opts2) + // fetch workflow2 + task = runner.fetchTask(t) + _, _, run = getTaskAndJobAndRunByTaskID(t, task.Id) + assert.Equal(t, "workflow-main-abc123-user2", run.ConcurrencyGroup) + assert.Equal(t, "concurrent-workflow-2.yml", run.WorkflowID) + + // push workflow3 + opts3 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf3TreePath, wf3FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf3TreePath, opts3) + runner.fetchNoTask(t) + + // exec workflow2 + runner.execTask(t, task, &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + }) + + // fetch and exec workflow3 + task = runner.fetchTask(t) + _, _, run = getTaskAndJobAndRunByTaskID(t, task.Id) + assert.Equal(t, "workflow-main-abc123-user2", run.ConcurrencyGroup) + assert.Equal(t, "concurrent-workflow-3.yml", run.WorkflowID) + runner.fetchNoTask(t) + runner.execTask(t, task, &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + }) + }) +} + +func TestWorkflowConcurrencyShort(t *testing.T) { + onGiteaRun(t, func(t *testing.T, u *url.URL) { + user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) + session := loginUser(t, user2.Name) + token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser) + + apiRepo := createActionsTestRepo(t, token, "actions-concurrency", false) + repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiRepo.ID}) + httpContext := NewAPITestContext(t, user2.Name, repo.Name, auth_model.AccessTokenScopeWriteRepository) + defer doAPIDeleteRepository(httpContext)(t) + + runner := newMockRunner() + 
runner.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner", []string{"ubuntu-latest"}, false) + + // add a variable for test + req := NewRequestWithJSON(t, "POST", + fmt.Sprintf("/api/v1/repos/%s/%s/actions/variables/myvar", user2.Name, repo.Name), &api.CreateVariableOption{ + Value: "abc123", + }). + AddTokenAuth(token) + MakeRequest(t, req, http.StatusCreated) + + wf1TreePath := ".gitea/workflows/concurrent-workflow-1.yml" + wf1FileContent := `name: concurrent-workflow-1 +on: + push: + paths: + - '.gitea/workflows/concurrent-workflow-1.yml' +concurrency: workflow-main-abc123-user2 +jobs: + wf1-job: + runs-on: ubuntu-latest + steps: + - run: echo 'job from workflow1' +` + wf2TreePath := ".gitea/workflows/concurrent-workflow-2.yml" + wf2FileContent := `name: concurrent-workflow-2 +on: + push: + paths: + - '.gitea/workflows/concurrent-workflow-2.yml' +concurrency: workflow-${{ gitea.ref_name }}-${{ vars.myvar }}-${{ gitea.event.pusher.username }} +jobs: + wf2-job: + runs-on: ubuntu-latest + steps: + - run: echo 'job from workflow2' +` + wf3TreePath := ".gitea/workflows/concurrent-workflow-3.yml" + wf3FileContent := `name: concurrent-workflow-3 +on: + push: + paths: + - '.gitea/workflows/concurrent-workflow-3.yml' +concurrency: workflow-main-abc${{ 123 }}-${{ gitea.event.pusher.username }} +jobs: + wf3-job: + runs-on: ubuntu-latest + steps: + - run: echo 'job from workflow3' +` + // push workflow1 + opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf1TreePath, wf1FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf1TreePath, opts1) + // fetch and exec workflow1 + task := runner.fetchTask(t) + _, _, run := getTaskAndJobAndRunByTaskID(t, task.Id) + assert.Equal(t, "workflow-main-abc123-user2", run.ConcurrencyGroup) + assert.Equal(t, "concurrent-workflow-1.yml", run.WorkflowID) + runner.fetchNoTask(t) + runner.execTask(t, task, &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + }) + + // push workflow2 + opts2 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf2TreePath, wf2FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf2TreePath, opts2) + // fetch workflow2 + task = runner.fetchTask(t) + _, _, run = getTaskAndJobAndRunByTaskID(t, task.Id) + assert.Equal(t, "workflow-main-abc123-user2", run.ConcurrencyGroup) + assert.Equal(t, "concurrent-workflow-2.yml", run.WorkflowID) + + // push workflow3 + opts3 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf3TreePath, wf3FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf3TreePath, opts3) + runner.fetchNoTask(t) + + // exec workflow2 + runner.execTask(t, task, &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + }) + + // fetch and exec workflow3 + task = runner.fetchTask(t) + _, _, run = getTaskAndJobAndRunByTaskID(t, task.Id) + assert.Equal(t, "workflow-main-abc123-user2", run.ConcurrencyGroup) + assert.Equal(t, "concurrent-workflow-3.yml", run.WorkflowID) + runner.fetchNoTask(t) + runner.execTask(t, task, &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + }) + }) +} + +func TestWorkflowConcurrencyShortJson(t *testing.T) { + onGiteaRun(t, func(t *testing.T, u *url.URL) { + user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) + session := loginUser(t, user2.Name) + token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser) + + apiRepo := createActionsTestRepo(t, token, "actions-concurrency", false) + repo := 
unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiRepo.ID}) + httpContext := NewAPITestContext(t, user2.Name, repo.Name, auth_model.AccessTokenScopeWriteRepository) + defer doAPIDeleteRepository(httpContext)(t) + + runner := newMockRunner() + runner.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner", []string{"ubuntu-latest"}, false) + + // add a variable for test + req := NewRequestWithJSON(t, "POST", + fmt.Sprintf("/api/v1/repos/%s/%s/actions/variables/myvar", user2.Name, repo.Name), &api.CreateVariableOption{ + Value: "abc123", + }). + AddTokenAuth(token) + MakeRequest(t, req, http.StatusCreated) + + wf1TreePath := ".gitea/workflows/concurrent-workflow-1.yml" + wf1FileContent := `name: concurrent-workflow-1 +on: + push: + paths: + - '.gitea/workflows/concurrent-workflow-1.yml' +concurrency: |- + ${{ fromjson('{ + "group": "workflow-main-abc123-user2", + "cancel-in-progress": false + }') }} +jobs: + wf1-job: + runs-on: ubuntu-latest + steps: + - run: echo 'job from workflow1' +` + wf2TreePath := ".gitea/workflows/concurrent-workflow-2.yml" + wf2FileContent := `name: concurrent-workflow-2 +on: + push: + paths: + - '.gitea/workflows/concurrent-workflow-2.yml' +concurrency: |- + ${{ fromjson('{ + "group": "workflow-main-abc123-user2", + "cancel-in-progress": false + }') }} +jobs: + wf2-job: + runs-on: ubuntu-latest + steps: + - run: echo 'job from workflow2' +` + wf3TreePath := ".gitea/workflows/concurrent-workflow-3.yml" + wf3FileContent := `name: concurrent-workflow-3 +on: + push: + paths: + - '.gitea/workflows/concurrent-workflow-3.yml' +concurrency: |- + ${{ fromjson('{ + "group": "workflow-main-abc123-user2", + "cancel-in-progress": false + }') }} +jobs: + wf3-job: + runs-on: ubuntu-latest + steps: + - run: echo 'job from workflow3' +` + // push workflow1 + opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf1TreePath, wf1FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf1TreePath, opts1) + // fetch and exec workflow1 + task := runner.fetchTask(t) + _, _, run := getTaskAndJobAndRunByTaskID(t, task.Id) + assert.Equal(t, "workflow-main-abc123-user2", run.ConcurrencyGroup) + assert.Equal(t, "concurrent-workflow-1.yml", run.WorkflowID) + runner.fetchNoTask(t) + runner.execTask(t, task, &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + }) + + // push workflow2 + opts2 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf2TreePath, wf2FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf2TreePath, opts2) + // fetch workflow2 + task = runner.fetchTask(t) + _, _, run = getTaskAndJobAndRunByTaskID(t, task.Id) + assert.Equal(t, "workflow-main-abc123-user2", run.ConcurrencyGroup) + assert.Equal(t, "concurrent-workflow-2.yml", run.WorkflowID) + + // push workflow3 + opts3 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf3TreePath, wf3FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf3TreePath, opts3) + runner.fetchNoTask(t) + + // exec workflow2 + runner.execTask(t, task, &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + }) + + // fetch and exec workflow3 + task = runner.fetchTask(t) + _, _, run = getTaskAndJobAndRunByTaskID(t, task.Id) + assert.Equal(t, "workflow-main-abc123-user2", run.ConcurrencyGroup) + assert.Equal(t, "concurrent-workflow-3.yml", run.WorkflowID) + runner.fetchNoTask(t) + runner.execTask(t, task, &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + }) + }) +} + +func 
TestPullRequestWorkflowConcurrency(t *testing.T) { + onGiteaRun(t, func(t *testing.T, u *url.URL) { + // user2 is the owner of the base repo + user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) + user2Session := loginUser(t, user2.Name) + user2Token := getTokenForLoggedInUser(t, user2Session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser) + // user4 is the owner of the forked repo + user4 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 4}) + user4Token := getTokenForLoggedInUser(t, loginUser(t, user4.Name), auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser) + + apiBaseRepo := createActionsTestRepo(t, user2Token, "actions-concurrency", false) + baseRepo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiBaseRepo.ID}) + user2APICtx := NewAPITestContext(t, baseRepo.OwnerName, baseRepo.Name, auth_model.AccessTokenScopeWriteRepository) + defer doAPIDeleteRepository(user2APICtx)(t) + + runner := newMockRunner() + runner.registerAsRepoRunner(t, baseRepo.OwnerName, baseRepo.Name, "mock-runner", []string{"ubuntu-latest"}, false) + + // init the workflow + wfTreePath := ".gitea/workflows/pull.yml" + wfFileContent := `name: Pull Request +on: pull_request +concurrency: + group: pull-request-test + cancel-in-progress: ${{ !startsWith(gitea.head_ref, 'do-not-cancel/') }} +jobs: + wf1-job: + runs-on: ubuntu-latest + steps: + - run: echo 'test the pull' +` + opts1 := getWorkflowCreateFileOptions(user2, baseRepo.DefaultBranch, "create %s"+wfTreePath, wfFileContent) + createWorkflowFile(t, user2Token, baseRepo.OwnerName, baseRepo.Name, wfTreePath, opts1) + // user2 creates a pull request + doAPICreateFile(user2APICtx, "user2-fix.txt", &api.CreateFileOptions{ + FileOptions: api.FileOptions{ + NewBranchName: "bugfix/aaa", + Message: "create user2-fix.txt", + Author: api.Identity{ + Name: user4.Name, + Email: user4.Email, + }, + Committer: api.Identity{ + Name: user4.Name, + Email: user4.Email, + }, + Dates: api.CommitDateOptions{ + Author: time.Now(), + Committer: time.Now(), + }, + }, + ContentBase64: base64.StdEncoding.EncodeToString([]byte("user2-fix")), + })(t) + doAPICreatePullRequest(user2APICtx, baseRepo.OwnerName, baseRepo.Name, baseRepo.DefaultBranch, "bugfix/aaa")(t) + pr1Task1 := runner.fetchTask(t) + _, _, pr1Run1 := getTaskAndJobAndRunByTaskID(t, pr1Task1.Id) + assert.Equal(t, "pull-request-test", pr1Run1.ConcurrencyGroup) + assert.True(t, pr1Run1.ConcurrencyCancel) + assert.Equal(t, actions_model.StatusRunning, pr1Run1.Status) + + // user4 forks the repo + req := NewRequestWithJSON(t, "POST", fmt.Sprintf("/api/v1/repos/%s/%s/forks", baseRepo.OwnerName, baseRepo.Name), + &api.CreateForkOption{ + Name: util.ToPointer("actions-concurrency-fork"), + }).AddTokenAuth(user4Token) + resp := MakeRequest(t, req, http.StatusAccepted) + var apiForkRepo api.Repository + DecodeJSON(t, resp, &apiForkRepo) + forkRepo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiForkRepo.ID}) + user4APICtx := NewAPITestContext(t, user4.Name, forkRepo.Name, auth_model.AccessTokenScopeWriteRepository) + defer doAPIDeleteRepository(user4APICtx)(t) + + // user4 creates a pull request from branch "bugfix/bbb" + doAPICreateFile(user4APICtx, "user4-fix.txt", &api.CreateFileOptions{ + FileOptions: api.FileOptions{ + NewBranchName: "bugfix/bbb", + Message: "create user4-fix.txt", + Author: api.Identity{ + Name: user4.Name, + Email: user4.Email, + }, + Committer: api.Identity{ + Name: user4.Name, + 
Email: user4.Email, + }, + Dates: api.CommitDateOptions{ + Author: time.Now(), + Committer: time.Now(), + }, + }, + ContentBase64: base64.StdEncoding.EncodeToString([]byte("user4-fix")), + })(t) + doAPICreatePullRequest(user4APICtx, baseRepo.OwnerName, baseRepo.Name, baseRepo.DefaultBranch, user4.Name+":bugfix/bbb")(t) + // cannot fetch the task because an approval is required + runner.fetchNoTask(t) + // user2 approves the run + pr2Run1 := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{RepoID: baseRepo.ID, TriggerUserID: user4.ID}) + req = NewRequestWithValues(t, "POST", + fmt.Sprintf("/%s/%s/actions/runs/%d/approve", baseRepo.OwnerName, baseRepo.Name, pr2Run1.Index), + map[string]string{ + "_csrf": GetUserCSRFToken(t, user2Session), + }) + user2Session.MakeRequest(t, req, http.StatusOK) + // fetch the task and the previous task has been cancelled + pr2Task1 := runner.fetchTask(t) + _, _, pr2Run1 = getTaskAndJobAndRunByTaskID(t, pr2Task1.Id) + assert.Equal(t, "pull-request-test", pr2Run1.ConcurrencyGroup) + assert.True(t, pr2Run1.ConcurrencyCancel) + assert.Equal(t, actions_model.StatusRunning, pr2Run1.Status) + pr1Run1 = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: pr1Run1.ID}) + assert.Equal(t, actions_model.StatusCancelled, pr1Run1.Status) + + // user4 creates another pull request from branch "do-not-cancel/ccc" + doAPICreateFile(user4APICtx, "user4-fix2.txt", &api.CreateFileOptions{ + FileOptions: api.FileOptions{ + NewBranchName: "do-not-cancel/ccc", + Message: "create user4-fix2.txt", + Author: api.Identity{ + Name: user4.Name, + Email: user4.Email, + }, + Committer: api.Identity{ + Name: user4.Name, + Email: user4.Email, + }, + Dates: api.CommitDateOptions{ + Author: time.Now(), + Committer: time.Now(), + }, + }, + ContentBase64: base64.StdEncoding.EncodeToString([]byte("user4-fix2")), + })(t) + doAPICreatePullRequest(user4APICtx, baseRepo.OwnerName, baseRepo.Name, baseRepo.DefaultBranch, user4.Name+":do-not-cancel/ccc")(t) + // cannot fetch the task because cancel-in-progress is false + runner.fetchNoTask(t) + runner.execTask(t, pr2Task1, &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + }) + pr2Run1 = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: pr2Run1.ID}) + assert.Equal(t, actions_model.StatusSuccess, pr2Run1.Status) + // fetch the task + pr3Task1 := runner.fetchTask(t) + _, _, pr3Run1 := getTaskAndJobAndRunByTaskID(t, pr3Task1.Id) + assert.Equal(t, "pull-request-test", pr3Run1.ConcurrencyGroup) + assert.False(t, pr3Run1.ConcurrencyCancel) + assert.Equal(t, actions_model.StatusRunning, pr3Run1.Status) + }) +} + +func TestJobConcurrency(t *testing.T) { + onGiteaRun(t, func(t *testing.T, u *url.URL) { + user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) + session := loginUser(t, user2.Name) + token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser) + + apiRepo := createActionsTestRepo(t, token, "actions-concurrency", false) + repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiRepo.ID}) + httpContext := NewAPITestContext(t, user2.Name, repo.Name, auth_model.AccessTokenScopeWriteRepository) + defer doAPIDeleteRepository(httpContext)(t) + + runner1 := newMockRunner() + runner1.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner-1", []string{"runner1"}, false) + runner2 := newMockRunner() + runner2.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner-2", []string{"runner2"}, false) + + // add 
a variable for test + req := NewRequestWithJSON(t, "POST", + fmt.Sprintf("/api/v1/repos/%s/%s/actions/variables/version_var", user2.Name, repo.Name), &api.CreateVariableOption{ + Value: "v1.23.0", + }). + AddTokenAuth(token) + MakeRequest(t, req, http.StatusCreated) + + wf1TreePath := ".gitea/workflows/concurrent-workflow-1.yml" + wf1FileContent := `name: concurrent-workflow-1 +on: + push: + paths: + - '.gitea/workflows/concurrent-workflow-1.yml' +jobs: + wf1-job1: + runs-on: runner1 + concurrency: + group: job-main-${{ vars.version_var }} + steps: + - run: echo 'wf1-job1' +` + wf2TreePath := ".gitea/workflows/concurrent-workflow-2.yml" + wf2FileContent := `name: concurrent-workflow-2 +on: + push: + paths: + - '.gitea/workflows/concurrent-workflow-2.yml' +jobs: + wf2-job1: + runs-on: runner2 + outputs: + version: ${{ steps.version_step.outputs.app_version }} + steps: + - id: version_step + run: echo "app_version=v1.23.0" >> "$GITHUB_OUTPUT" + wf2-job2: + runs-on: runner1 + needs: [wf2-job1] + concurrency: + group: job-main-${{ needs.wf2-job1.outputs.version }} + steps: + - run: echo 'wf2-job2' +` + wf3TreePath := ".gitea/workflows/concurrent-workflow-3.yml" + wf3FileContent := `name: concurrent-workflow-3 +on: + push: + paths: + - '.gitea/workflows/concurrent-workflow-3.yml' +jobs: + wf3-job1: + runs-on: runner1 + concurrency: + group: job-main-${{ vars.version_var }} + cancel-in-progress: ${{ vars.version_var == 'v1.23.0' }} + steps: + - run: echo 'wf3-job1' +` + + opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf1TreePath, wf1FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf1TreePath, opts1) + opts2 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf2TreePath, wf2FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf2TreePath, opts2) + + // fetch wf1-job1 + wf1Job1Task := runner1.fetchTask(t) + _, wf1Job1ActionJob, _ := getTaskAndJobAndRunByTaskID(t, wf1Job1Task.Id) + assert.Equal(t, "job-main-v1.23.0", wf1Job1ActionJob.ConcurrencyGroup) + assert.Equal(t, actions_model.StatusRunning, wf1Job1ActionJob.Status) + // fetch and exec wf2-job1 + wf2Job1Task := runner2.fetchTask(t) + runner2.execTask(t, wf2Job1Task, &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + outputs: map[string]string{ + "version": "v1.23.0", + }, + }) + // cannot fetch wf2-job2 because wf1-job1 is running + runner1.fetchNoTask(t) + // exec wf1-job1 + runner1.execTask(t, wf1Job1Task, &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + }) + // fetch wf2-job2 + wf2Job2Task := runner1.fetchTask(t) + _, wf2Job2ActionJob, wf2Run := getTaskAndJobAndRunByTaskID(t, wf2Job2Task.Id) + assert.Equal(t, "job-main-v1.23.0", wf2Job2ActionJob.ConcurrencyGroup) + assert.Equal(t, actions_model.StatusRunning, wf2Job2ActionJob.Status) + // push workflow3 to trigger wf3-job1 + opts3 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf3TreePath, wf3FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf3TreePath, opts3) + // fetch wf3-job1 + wf3Job1Task := runner1.fetchTask(t) + _, wf3Job1ActionJob, _ := getTaskAndJobAndRunByTaskID(t, wf3Job1Task.Id) + assert.Equal(t, "job-main-v1.23.0", wf3Job1ActionJob.ConcurrencyGroup) + assert.Equal(t, actions_model.StatusRunning, wf3Job1ActionJob.Status) + // wf2-job2 has been cancelled + _, wf2Job2ActionJob, _ = getTaskAndJobAndRunByTaskID(t, wf2Job2Task.Id) + assert.Equal(t, actions_model.StatusCancelled, wf2Job2ActionJob.Status) + + // rerun wf2 + req = 
NewRequestWithValues(t, "POST", fmt.Sprintf("/%s/%s/actions/runs/%d/rerun", user2.Name, repo.Name, wf2Run.Index), map[string]string{ + "_csrf": GetUserCSRFToken(t, session), + }) + _ = session.MakeRequest(t, req, http.StatusOK) + + // (rerun1) cannot fetch wf2-job2 + runner1.fetchNoTask(t) + // (rerun1) fetch and exec wf2-job1 + wf2Job1Rerun1Task := runner2.fetchTask(t) + _, _, wf2Rerun1Run := getTaskAndJobAndRunByTaskID(t, wf2Job1Rerun1Task.Id) + assert.Equal(t, wf2Rerun1Run.ID, wf2Run.ID) + runner2.execTask(t, wf2Job1Rerun1Task, &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + outputs: map[string]string{ + "version": "v1.24.0", + }, + }) + // (rerun1) fetch and exec wf2-job2 + wf2Job2Rerun1Task := runner1.fetchTask(t) + runner1.execTask(t, wf2Job2Rerun1Task, &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + }) + _, wf2Job2Rerun1Job, _ := getTaskAndJobAndRunByTaskID(t, wf2Job2Rerun1Task.Id) + assert.Equal(t, "job-main-v1.24.0", wf2Job2Rerun1Job.ConcurrencyGroup) + + // rerun wf2-job2 + req = NewRequestWithValues(t, "POST", fmt.Sprintf("/%s/%s/actions/runs/%d/jobs/%d/rerun", user2.Name, repo.Name, wf2Run.Index, 1), map[string]string{ + "_csrf": GetUserCSRFToken(t, session), + }) + _ = session.MakeRequest(t, req, http.StatusOK) + // (rerun2) fetch and exec wf2-job2 + wf2Job2Rerun2Task := runner1.fetchTask(t) + runner1.execTask(t, wf2Job2Rerun2Task, &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + }) + _, wf2Job2Rerun2Job, _ := getTaskAndJobAndRunByTaskID(t, wf2Job2Rerun2Task.Id) + assert.Equal(t, "job-main-v1.24.0", wf2Job2Rerun2Job.ConcurrencyGroup) + }) +} + +func TestMatrixConcurrency(t *testing.T) { + onGiteaRun(t, func(t *testing.T, u *url.URL) { + user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) + session := loginUser(t, user2.Name) + token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser) + + apiRepo := createActionsTestRepo(t, token, "actions-concurrency", false) + repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiRepo.ID}) + httpContext := NewAPITestContext(t, user2.Name, repo.Name, auth_model.AccessTokenScopeWriteRepository) + defer doAPIDeleteRepository(httpContext)(t) + + linuxRunner := newMockRunner() + linuxRunner.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-linux-runner", []string{"linux-runner"}, false) + windowsRunner := newMockRunner() + windowsRunner.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-windows-runner", []string{"windows-runner"}, false) + darwinRunner := newMockRunner() + darwinRunner.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-darwin-runner", []string{"darwin-runner"}, false) + + wf1TreePath := ".gitea/workflows/concurrent-workflow-1.yml" + wf1FileContent := `name: concurrent-workflow-1 +on: + push: + paths: + - '.gitea/workflows/concurrent-workflow-1.yml' +jobs: + wf1-job: + runs-on: ${{ matrix.os }}-runner + strategy: + matrix: + os: [windows, linux] + concurrency: + group: job-os-${{ matrix.os }} + steps: + - run: echo 'wf1' +` + + wf2TreePath := ".gitea/workflows/concurrent-workflow-2.yml" + wf2FileContent := `name: concurrent-workflow-2 +on: + push: + paths: + - '.gitea/workflows/concurrent-workflow-2.yml' +jobs: + wf2-job: + runs-on: ${{ matrix.os }}-runner + strategy: + matrix: + os: [darwin, windows, linux] + concurrency: + group: job-os-${{ matrix.os }} + steps: + - run: echo 'wf2' +` + + opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf1TreePath, 
wf1FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf1TreePath, opts1) + + job1WinTask := windowsRunner.fetchTask(t) + job1LinuxTask := linuxRunner.fetchTask(t) + windowsRunner.fetchNoTask(t) + linuxRunner.fetchNoTask(t) + _, job1WinJob, _ := getTaskAndJobAndRunByTaskID(t, job1WinTask.Id) + assert.Equal(t, "wf1-job (windows)", job1WinJob.Name) + assert.Equal(t, "job-os-windows", job1WinJob.ConcurrencyGroup) + _, job1LinuxJob, _ := getTaskAndJobAndRunByTaskID(t, job1LinuxTask.Id) + assert.Equal(t, "wf1-job (linux)", job1LinuxJob.Name) + assert.Equal(t, "job-os-linux", job1LinuxJob.ConcurrencyGroup) + + opts2 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf2TreePath, wf2FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf2TreePath, opts2) + job2DarwinTask := darwinRunner.fetchTask(t) + _, job2DarwinJob, _ := getTaskAndJobAndRunByTaskID(t, job2DarwinTask.Id) + assert.Equal(t, "wf2-job (darwin)", job2DarwinJob.Name) + assert.Equal(t, "job-os-darwin", job2DarwinJob.ConcurrencyGroup) + + windowsRunner.execTask(t, job1WinTask, &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + }) + linuxRunner.execTask(t, job1LinuxTask, &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + }) + + job2WinTask := windowsRunner.fetchTask(t) + job2LinuxTask := linuxRunner.fetchTask(t) + _, job2WinJob, _ := getTaskAndJobAndRunByTaskID(t, job2WinTask.Id) + assert.Equal(t, "wf2-job (windows)", job2WinJob.Name) + assert.Equal(t, "job-os-windows", job2WinJob.ConcurrencyGroup) + _, job2LinuxJob, _ := getTaskAndJobAndRunByTaskID(t, job2LinuxTask.Id) + assert.Equal(t, "wf2-job (linux)", job2LinuxJob.Name) + assert.Equal(t, "job-os-linux", job2LinuxJob.ConcurrencyGroup) + }) +} + +func TestWorkflowDispatchConcurrency(t *testing.T) { + onGiteaRun(t, func(t *testing.T, u *url.URL) { + user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) + session := loginUser(t, user2.Name) + token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser) + + apiRepo := createActionsTestRepo(t, token, "actions-concurrency", false) + repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiRepo.ID}) + httpContext := NewAPITestContext(t, user2.Name, repo.Name, auth_model.AccessTokenScopeWriteRepository) + defer doAPIDeleteRepository(httpContext)(t) + + runner := newMockRunner() + runner.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner", []string{"ubuntu-latest"}, false) + + wf1TreePath := ".gitea/workflows/workflow-dispatch-concurrency.yml" + wf1FileContent := `name: workflow-dispatch-concurrency +on: + workflow_dispatch: + inputs: + appVersion: + description: 'APP version' + required: true + default: 'v1.23' + type: choice + options: + - v1.21 + - v1.22 + - v1.23 + cancel: + description: 'Cancel running workflows' + required: false + type: boolean + default: false +concurrency: + group: workflow-dispatch-${{ inputs.appVersion }} + cancel-in-progress: ${{ inputs.cancel }} +jobs: + job: + runs-on: ubuntu-latest + steps: + - run: echo 'workflow dispatch job' +` + + opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf1TreePath, wf1FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf1TreePath, opts1) + + // run the workflow with appVersion=v1.21 and cancel=false + urlStr := fmt.Sprintf("/%s/%s/actions/run?workflow=%s", user2.Name, repo.Name, "workflow-dispatch-concurrency.yml") + req := NewRequestWithValues(t, "POST", 
urlStr, map[string]string{ + "_csrf": GetUserCSRFToken(t, session), + "ref": "refs/heads/main", + "appVersion": "v1.21", + }) + session.MakeRequest(t, req, http.StatusSeeOther) + task1 := runner.fetchTask(t) + _, _, run1 := getTaskAndJobAndRunByTaskID(t, task1.Id) + assert.Equal(t, "workflow-dispatch-v1.21", run1.ConcurrencyGroup) + + // run the workflow with appVersion=v1.22 and cancel=false + req = NewRequestWithValues(t, "POST", urlStr, map[string]string{ + "_csrf": GetUserCSRFToken(t, session), + "ref": "refs/heads/main", + "appVersion": "v1.22", + }) + session.MakeRequest(t, req, http.StatusSeeOther) + task2 := runner.fetchTask(t) + _, _, run2 := getTaskAndJobAndRunByTaskID(t, task2.Id) + assert.Equal(t, "workflow-dispatch-v1.22", run2.ConcurrencyGroup) + + // run the workflow with appVersion=v1.22 and cancel=false again + req = NewRequestWithValues(t, "POST", urlStr, map[string]string{ + "_csrf": GetUserCSRFToken(t, session), + "ref": "refs/heads/main", + "appVersion": "v1.22", + }) + session.MakeRequest(t, req, http.StatusSeeOther) + runner.fetchNoTask(t) // cannot fetch task because task2 is not completed + + // run the workflow with appVersion=v1.22 and cancel=true + req = NewRequestWithValues(t, "POST", urlStr, map[string]string{ + "_csrf": GetUserCSRFToken(t, session), + "ref": "refs/heads/main", + "appVersion": "v1.22", + "cancel": "on", + }) + session.MakeRequest(t, req, http.StatusSeeOther) + task4 := runner.fetchTask(t) + _, _, run4 := getTaskAndJobAndRunByTaskID(t, task4.Id) + assert.Equal(t, "workflow-dispatch-v1.22", run4.ConcurrencyGroup) + _, _, run2 = getTaskAndJobAndRunByTaskID(t, task2.Id) + assert.Equal(t, actions_model.StatusCancelled, run2.Status) + }) +} + +func TestWorkflowDispatchRerunAllJobsConcurrency(t *testing.T) { + onGiteaRun(t, func(t *testing.T, u *url.URL) { + user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) + session := loginUser(t, user2.Name) + token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser) + + apiRepo := createActionsTestRepo(t, token, "actions-concurrency", false) + repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiRepo.ID}) + httpContext := NewAPITestContext(t, user2.Name, repo.Name, auth_model.AccessTokenScopeWriteRepository) + defer doAPIDeleteRepository(httpContext)(t) + + runner := newMockRunner() + runner.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner", []string{"ubuntu-latest"}, false) + + wf1TreePath := ".gitea/workflows/workflow-dispatch-concurrency.yml" + wf1FileContent := `name: workflow-dispatch-concurrency +on: + workflow_dispatch: + inputs: + appVersion: + description: 'APP version' + required: true + default: 'v1.23' + type: choice + options: + - v1.21 + - v1.22 + - v1.23 + cancel: + description: 'Cancel running workflows' + required: false + type: boolean + default: false +concurrency: + group: workflow-dispatch-${{ inputs.appVersion }} + cancel-in-progress: ${{ inputs.cancel }} +jobs: + job: + runs-on: ubuntu-latest + steps: + - run: echo 'workflow dispatch job' +` + + opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf1TreePath, wf1FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf1TreePath, opts1) + + // run the workflow with appVersion=v1.21 and cancel=false + urlStr := fmt.Sprintf("/%s/%s/actions/run?workflow=%s", user2.Name, repo.Name, "workflow-dispatch-concurrency.yml") + req := NewRequestWithValues(t, "POST", urlStr, map[string]string{ + 
"_csrf": GetUserCSRFToken(t, session), + "ref": "refs/heads/main", + "appVersion": "v1.21", + }) + session.MakeRequest(t, req, http.StatusSeeOther) + task1 := runner.fetchTask(t) + _, _, run1 := getTaskAndJobAndRunByTaskID(t, task1.Id) + assert.Equal(t, "workflow-dispatch-v1.21", run1.ConcurrencyGroup) + + req = NewRequestWithValues(t, "POST", urlStr, map[string]string{ + "_csrf": GetUserCSRFToken(t, session), + "ref": "refs/heads/main", + "appVersion": "v1.22", + }) + session.MakeRequest(t, req, http.StatusSeeOther) + task2 := runner.fetchTask(t) + _, _, run2 := getTaskAndJobAndRunByTaskID(t, task2.Id) + assert.Equal(t, "workflow-dispatch-v1.22", run2.ConcurrencyGroup) + + // run the workflow with appVersion=v1.22 and cancel=false again + req = NewRequestWithValues(t, "POST", urlStr, map[string]string{ + "_csrf": GetUserCSRFToken(t, session), + "ref": "refs/heads/main", + "appVersion": "v1.22", + }) + session.MakeRequest(t, req, http.StatusSeeOther) + + runner.fetchNoTask(t) // cannot fetch task because task2 is not completed + + // run the workflow with appVersion=v1.22 and cancel=true + req = NewRequestWithValues(t, "POST", urlStr, map[string]string{ + "_csrf": GetUserCSRFToken(t, session), + "ref": "refs/heads/main", + "appVersion": "v1.22", + "cancel": "on", + }) + session.MakeRequest(t, req, http.StatusSeeOther) + task4 := runner.fetchTask(t) + _, _, run4 := getTaskAndJobAndRunByTaskID(t, task4.Id) + assert.Equal(t, actions_model.StatusRunning, run4.Status) + assert.Equal(t, "workflow-dispatch-v1.22", run4.ConcurrencyGroup) + _, _, run2 = getTaskAndJobAndRunByTaskID(t, task2.Id) + assert.Equal(t, actions_model.StatusCancelled, run2.Status) + + runner.execTask(t, task4, &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + }) + + // rerun cancel true scenario + + req = NewRequestWithValues(t, "POST", fmt.Sprintf("/%s/%s/actions/runs/%d/rerun", user2.Name, apiRepo.Name, run2.Index), map[string]string{ + "_csrf": GetUserCSRFToken(t, session), + }) + _ = session.MakeRequest(t, req, http.StatusOK) + + req = NewRequestWithValues(t, "POST", fmt.Sprintf("/%s/%s/actions/runs/%d/rerun", user2.Name, apiRepo.Name, run4.Index), map[string]string{ + "_csrf": GetUserCSRFToken(t, session), + }) + _ = session.MakeRequest(t, req, http.StatusOK) + + task5 := runner.fetchTask(t) + _, _, run4_1 := getTaskAndJobAndRunByTaskID(t, task5.Id) + assert.Equal(t, "workflow-dispatch-v1.22", run4_1.ConcurrencyGroup) + assert.Equal(t, run4.ID, run4_1.ID) + _, _, run2_1 := getTaskAndJobAndRunByTaskID(t, task2.Id) + assert.Equal(t, actions_model.StatusCancelled, run2_1.Status) + + runner.execTask(t, task5, &mockTaskOutcome{ + result: runnerv1.Result_RESULT_CANCELLED, + }) + + // rerun cancel false scenario + + req = NewRequestWithValues(t, "POST", fmt.Sprintf("/%s/%s/actions/runs/%d/rerun", user2.Name, apiRepo.Name, run2.Index), map[string]string{ + "_csrf": GetUserCSRFToken(t, session), + }) + _ = session.MakeRequest(t, req, http.StatusOK) + + run2_2 := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: run2.ID}) + assert.Equal(t, actions_model.StatusWaiting, run2_2.Status) + + req = NewRequestWithValues(t, "POST", fmt.Sprintf("/%s/%s/actions/runs/%d/rerun", user2.Name, apiRepo.Name, run2.Index+1), map[string]string{ + "_csrf": GetUserCSRFToken(t, session), + }) + _ = session.MakeRequest(t, req, http.StatusOK) + + task6 := runner.fetchTask(t) + _, _, run3 := getTaskAndJobAndRunByTaskID(t, task6.Id) + assert.Equal(t, "workflow-dispatch-v1.22", run3.ConcurrencyGroup) + + run2_2 = 
unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: run2_2.ID}) + assert.Equal(t, actions_model.StatusCancelled, run2_2.Status) // cancelled by run3 + }) +} + +func TestWorkflowDispatchRerunSingleJobConcurrency(t *testing.T) { + onGiteaRun(t, func(t *testing.T, u *url.URL) { + user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) + session := loginUser(t, user2.Name) + token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser) + + apiRepo := createActionsTestRepo(t, token, "actions-concurrency", false) + repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiRepo.ID}) + httpContext := NewAPITestContext(t, user2.Name, repo.Name, auth_model.AccessTokenScopeWriteRepository) + defer doAPIDeleteRepository(httpContext)(t) + + runner := newMockRunner() + runner.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner", []string{"ubuntu-latest"}, false) + + wf1TreePath := ".gitea/workflows/workflow-dispatch-concurrency.yml" + wf1FileContent := `name: workflow-dispatch-concurrency +on: + workflow_dispatch: + inputs: + appVersion: + description: 'APP version' + required: true + default: 'v1.23' + type: choice + options: + - v1.21 + - v1.22 + - v1.23 + cancel: + description: 'Cancel running workflows' + required: false + type: boolean + default: false +concurrency: + group: workflow-dispatch-${{ inputs.appVersion }} + cancel-in-progress: ${{ inputs.cancel }} +jobs: + job: + runs-on: ubuntu-latest + steps: + - run: echo 'workflow dispatch job' +` + + opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf1TreePath, wf1FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf1TreePath, opts1) + + // run the workflow with appVersion=v1.21 and cancel=false + urlStr := fmt.Sprintf("/%s/%s/actions/run?workflow=%s", user2.Name, repo.Name, "workflow-dispatch-concurrency.yml") + req := NewRequestWithValues(t, "POST", urlStr, map[string]string{ + "_csrf": GetUserCSRFToken(t, session), + "ref": "refs/heads/main", + "appVersion": "v1.21", + }) + session.MakeRequest(t, req, http.StatusSeeOther) + task1 := runner.fetchTask(t) + _, _, run1 := getTaskAndJobAndRunByTaskID(t, task1.Id) + assert.Equal(t, "workflow-dispatch-v1.21", run1.ConcurrencyGroup) + + req = NewRequestWithValues(t, "POST", urlStr, map[string]string{ + "_csrf": GetUserCSRFToken(t, session), + "ref": "refs/heads/main", + "appVersion": "v1.22", + }) + session.MakeRequest(t, req, http.StatusSeeOther) + task2 := runner.fetchTask(t) + _, _, run2 := getTaskAndJobAndRunByTaskID(t, task2.Id) + assert.Equal(t, "workflow-dispatch-v1.22", run2.ConcurrencyGroup) + + // run the workflow with appVersion=v1.22 and cancel=false again + req = NewRequestWithValues(t, "POST", urlStr, map[string]string{ + "_csrf": GetUserCSRFToken(t, session), + "ref": "refs/heads/main", + "appVersion": "v1.22", + }) + session.MakeRequest(t, req, http.StatusSeeOther) + + runner.fetchNoTask(t) // cannot fetch task because task2 is not completed + + // run the workflow with appVersion=v1.22 and cancel=true + req = NewRequestWithValues(t, "POST", urlStr, map[string]string{ + "_csrf": GetUserCSRFToken(t, session), + "ref": "refs/heads/main", + "appVersion": "v1.22", + "cancel": "on", + }) + session.MakeRequest(t, req, http.StatusSeeOther) + task4 := runner.fetchTask(t) + _, _, run4 := getTaskAndJobAndRunByTaskID(t, task4.Id) + assert.Equal(t, actions_model.StatusRunning, run4.Status) + assert.Equal(t, "workflow-dispatch-v1.22", 
run4.ConcurrencyGroup) + _, _, run2 = getTaskAndJobAndRunByTaskID(t, task2.Id) + assert.Equal(t, actions_model.StatusCancelled, run2.Status) + + runner.execTask(t, task4, &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + }) + + // rerun cancel true scenario + + req = NewRequestWithValues(t, "POST", fmt.Sprintf("/%s/%s/actions/runs/%d/jobs/%d/rerun", user2.Name, apiRepo.Name, run2.Index, 1), map[string]string{ + "_csrf": GetUserCSRFToken(t, session), + }) + _ = session.MakeRequest(t, req, http.StatusOK) + + req = NewRequestWithValues(t, "POST", fmt.Sprintf("/%s/%s/actions/runs/%d/jobs/%d/rerun", user2.Name, apiRepo.Name, run4.Index, 1), map[string]string{ + "_csrf": GetUserCSRFToken(t, session), + }) + _ = session.MakeRequest(t, req, http.StatusOK) + + task5 := runner.fetchTask(t) + _, _, run4_1 := getTaskAndJobAndRunByTaskID(t, task5.Id) + assert.Equal(t, "workflow-dispatch-v1.22", run4_1.ConcurrencyGroup) + assert.Equal(t, run4.ID, run4_1.ID) + _, _, run2_1 := getTaskAndJobAndRunByTaskID(t, task2.Id) + assert.Equal(t, actions_model.StatusCancelled, run2_1.Status) + + runner.execTask(t, task5, &mockTaskOutcome{ + result: runnerv1.Result_RESULT_CANCELLED, + }) + + // rerun cancel false scenario + + req = NewRequestWithValues(t, "POST", fmt.Sprintf("/%s/%s/actions/runs/%d/jobs/%d/rerun", user2.Name, apiRepo.Name, run2.Index, 1), map[string]string{ + "_csrf": GetUserCSRFToken(t, session), + }) + _ = session.MakeRequest(t, req, http.StatusOK) + + run2_2 := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: run2.ID}) + assert.Equal(t, actions_model.StatusWaiting, run2_2.Status) + + req = NewRequestWithValues(t, "POST", fmt.Sprintf("/%s/%s/actions/runs/%d/jobs/%d/rerun", user2.Name, apiRepo.Name, run2.Index+1, 1), map[string]string{ + "_csrf": GetUserCSRFToken(t, session), + }) + _ = session.MakeRequest(t, req, http.StatusOK) + + task6 := runner.fetchTask(t) + _, _, run3 := getTaskAndJobAndRunByTaskID(t, task6.Id) + assert.Equal(t, "workflow-dispatch-v1.22", run3.ConcurrencyGroup) + + run2_2 = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: run2_2.ID}) + assert.Equal(t, actions_model.StatusCancelled, run2_2.Status) // cancelled by run3 + }) +} + +func TestScheduleConcurrency(t *testing.T) { + onGiteaRun(t, func(t *testing.T, u *url.URL) { + user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) + session := loginUser(t, user2.Name) + token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser) + + apiRepo := createActionsTestRepo(t, token, "actions-concurrency", false) + repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiRepo.ID}) + httpContext := NewAPITestContext(t, user2.Name, repo.Name, auth_model.AccessTokenScopeWriteRepository) + defer doAPIDeleteRepository(httpContext)(t) + + runner := newMockRunner() + runner.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner", []string{"ubuntu-latest"}, false) + + wf1TreePath := ".gitea/workflows/schedule-concurrency.yml" + wf1FileContent := `name: schedule-concurrency +on: + push: + schedule: + - cron: '@every 1m' +concurrency: + group: schedule-concurrency + cancel-in-progress: ${{ gitea.event_name == 'push' }} +jobs: + job: + runs-on: ubuntu-latest + steps: + - run: echo 'schedule workflow' +` + + opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf1TreePath, wf1FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf1TreePath, opts1) + + // fetch the task 
triggered by push + task1 := runner.fetchTask(t) + _, _, run1 := getTaskAndJobAndRunByTaskID(t, task1.Id) + assert.Equal(t, "schedule-concurrency", run1.ConcurrencyGroup) + assert.True(t, run1.ConcurrencyCancel) + assert.Equal(t, string(webhook_module.HookEventPush), run1.TriggerEvent) + assert.Equal(t, actions_model.StatusRunning, run1.Status) + + // trigger the task by schedule + spec := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionScheduleSpec{RepoID: repo.ID}) + spec.Next = timeutil.TimeStampNow() // manually update "Next" + assert.NoError(t, actions_model.UpdateScheduleSpec(t.Context(), spec, "next")) + assert.NoError(t, actions_service.StartScheduleTasks(t.Context())) + runner.fetchNoTask(t) // cannot fetch because task1 is not completed + runner.execTask(t, task1, &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + }) + _, _, run1 = getTaskAndJobAndRunByTaskID(t, task1.Id) + assert.Equal(t, actions_model.StatusSuccess, run1.Status) + task2 := runner.fetchTask(t) + _, _, run2 := getTaskAndJobAndRunByTaskID(t, task2.Id) + assert.Equal(t, "schedule-concurrency", run2.ConcurrencyGroup) + assert.False(t, run2.ConcurrencyCancel) + assert.Equal(t, string(webhook_module.HookEventSchedule), run2.TriggerEvent) + assert.Equal(t, actions_model.StatusRunning, run2.Status) + + // trigger the task by schedule again + spec = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionScheduleSpec{RepoID: repo.ID}) + spec.Next = timeutil.TimeStampNow() // manually update "Next" + assert.NoError(t, actions_model.UpdateScheduleSpec(t.Context(), spec, "next")) + assert.NoError(t, actions_service.StartScheduleTasks(t.Context())) + runner.fetchNoTask(t) // cannot fetch because task2 is not completed + run3 := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{RepoID: repo.ID, Status: actions_model.StatusBlocked}) + assert.Equal(t, "schedule-concurrency", run3.ConcurrencyGroup) + assert.False(t, run3.ConcurrencyCancel) + assert.Equal(t, string(webhook_module.HookEventSchedule), run3.TriggerEvent) + + // trigger the task by push + doAPICreateFile(httpContext, "doc.txt", &api.CreateFileOptions{ + FileOptions: api.FileOptions{ + NewBranchName: "main", + Message: "create doc.txt", + Author: api.Identity{ + Name: user2.Name, + Email: user2.Email, + }, + Committer: api.Identity{ + Name: user2.Name, + Email: user2.Email, + }, + Dates: api.CommitDateOptions{ + Author: time.Now(), + Committer: time.Now(), + }, + }, + ContentBase64: base64.StdEncoding.EncodeToString([]byte("doc")), + })(t) + + task4 := runner.fetchTask(t) + _, _, run4 := getTaskAndJobAndRunByTaskID(t, task4.Id) + assert.Equal(t, "schedule-concurrency", run4.ConcurrencyGroup) + assert.True(t, run4.ConcurrencyCancel) + assert.Equal(t, string(webhook_module.HookEventPush), run4.TriggerEvent) + run3 = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: run3.ID}) + assert.Equal(t, actions_model.StatusCancelled, run3.Status) + }) +} + +func TestWorkflowAndJobConcurrency(t *testing.T) { + onGiteaRun(t, func(t *testing.T, u *url.URL) { + user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) + session := loginUser(t, user2.Name) + token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser) + + apiRepo := createActionsTestRepo(t, token, "actions-concurrency", false) + repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiRepo.ID}) + + httpContext := NewAPITestContext(t, user2.Name, repo.Name, 
auth_model.AccessTokenScopeWriteRepository)
+		defer doAPIDeleteRepository(httpContext)(t)
+
+		runner1 := newMockRunner()
+		runner1.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner-1", []string{"runner1"}, false)
+		runner2 := newMockRunner()
+		runner2.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner-2", []string{"runner2"}, false)
+
+		wf1TreePath := ".gitea/workflows/concurrent-workflow-1.yml"
+		wf1FileContent := `name: concurrent-workflow-1
+on:
+  push:
+    paths:
+      - '.gitea/workflows/concurrent-workflow-1.yml'
+concurrency:
+  group: workflow-group-1
+jobs:
+  wf1-job1:
+    runs-on: runner1
+    concurrency:
+      group: job-group-1
+    steps:
+      - run: echo 'wf1-job1'
+  wf1-job2:
+    runs-on: runner2
+    concurrency:
+      group: job-group-2
+    steps:
+      - run: echo 'wf1-job2'
+`
+		wf2TreePath := ".gitea/workflows/concurrent-workflow-2.yml"
+		wf2FileContent := `name: concurrent-workflow-2
+on:
+  push:
+    paths:
+      - '.gitea/workflows/concurrent-workflow-2.yml'
+concurrency:
+  group: workflow-group-1
+jobs:
+  wf2-job1:
+    runs-on: runner1
+    concurrency:
+      group: job-group-1
+    steps:
+      - run: echo 'wf2-job1'
+  wf2-job2:
+    runs-on: runner2
+    concurrency:
+      group: job-group-2
+    steps:
+      - run: echo 'wf2-job2'
+`
+		wf3TreePath := ".gitea/workflows/concurrent-workflow-3.yml"
+		wf3FileContent := `name: concurrent-workflow-3
+on:
+  push:
+    paths:
+      - '.gitea/workflows/concurrent-workflow-3.yml'
+concurrency:
+  group: workflow-group-2
+jobs:
+  wf3-job1:
+    runs-on: runner1
+    concurrency:
+      group: job-group-1
+    steps:
+      - run: echo 'wf3-job1'
+`
+
+		wf4TreePath := ".gitea/workflows/concurrent-workflow-4.yml"
+		wf4FileContent := `name: concurrent-workflow-4
+on:
+  push:
+    paths:
+      - '.gitea/workflows/concurrent-workflow-4.yml'
+concurrency:
+  group: workflow-group-2
+jobs:
+  wf4-job1:
+    runs-on: runner2
+    concurrency:
+      group: job-group-2
+      cancel-in-progress: true
+    steps:
+      - run: echo 'wf4-job1'
+`
+
+		// push workflow 1
+		opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf1TreePath, wf1FileContent)
+		createWorkflowFile(t, token, user2.Name, repo.Name, wf1TreePath, opts1)
+
+		// fetch wf1-job1 and wf1-job2
+		w1j1Task := runner1.fetchTask(t)
+		w1j2Task := runner2.fetchTask(t)
+		_, w1j1Job, w1Run := getTaskAndJobAndRunByTaskID(t, w1j1Task.Id)
+		assert.Equal(t, "job-group-1", w1j1Job.ConcurrencyGroup)
+		assert.Equal(t, "workflow-group-1", w1Run.ConcurrencyGroup)
+		assert.Equal(t, "concurrent-workflow-1.yml", w1Run.WorkflowID)
+		assert.Equal(t, actions_model.StatusRunning, w1j1Job.Status)
+		_, w1j2Job, _ := getTaskAndJobAndRunByTaskID(t, w1j2Task.Id)
+		assert.Equal(t, "job-group-2", w1j2Job.ConcurrencyGroup)
+		assert.Equal(t, actions_model.StatusRunning, w1j2Job.Status)
+
+		// push workflow 2
+		opts2 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf2TreePath, wf2FileContent)
+		createWorkflowFile(t, token, user2.Name, repo.Name, wf2TreePath, opts2)
+		// cannot fetch wf2-job1 and wf2-job2 because workflow-2 is blocked by workflow-1's concurrency group "workflow-group-1"
+		runner1.fetchNoTask(t)
+		runner2.fetchNoTask(t)
+		// query wf2-job1 from db and check its status
+		w2Run := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{RepoID: repo.ID, WorkflowID: "concurrent-workflow-2.yml"})
+		w2j1Job := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{RunID: w2Run.ID, JobID: "wf2-job1"})
+		assert.Equal(t, actions_model.StatusBlocked, w2j1Job.Status)
+
+		// push workflow 3
+		opts3 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf3TreePath, wf3FileContent)
+		createWorkflowFile(t, token, user2.Name, repo.Name, wf3TreePath, opts3)
+		// cannot fetch wf3-job1 because it is blocked by wf1-job1's concurrency group "job-group-1"
+		runner1.fetchNoTask(t)
+		// query wf3-job1 from db and check its status
+		w3Run := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{RepoID: repo.ID, WorkflowID: "concurrent-workflow-3.yml"})
+		w3j1Job := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{RunID: w3Run.ID, JobID: "wf3-job1"})
+		assert.Equal(t, actions_model.StatusBlocked, w3j1Job.Status)
+		// wf2-job1 is cancelled by wf3-job1
+		w2j1Job = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{ID: w2j1Job.ID})
+		assert.Equal(t, actions_model.StatusCancelled, w2j1Job.Status)
+
+		// exec wf1-job1
+		runner1.execTask(t, w1j1Task, &mockTaskOutcome{
+			result: runnerv1.Result_RESULT_SUCCESS,
+		})
+
+		// fetch wf3-job1
+		assert.Equal(t, actions_model.StatusBlocked, w3j1Job.Status)
+		w3j1Task := runner1.fetchTask(t)
+		_, w3j1Job, w3Run = getTaskAndJobAndRunByTaskID(t, w3j1Task.Id)
+		assert.Equal(t, "job-group-1", w3j1Job.ConcurrencyGroup)
+		assert.Equal(t, "workflow-group-2", w3Run.ConcurrencyGroup)
+		assert.Equal(t, "concurrent-workflow-3.yml", w3Run.WorkflowID)
+
+		// exec wf1-job2
+		runner2.execTask(t, w1j2Task, &mockTaskOutcome{
+			result: runnerv1.Result_RESULT_SUCCESS,
+		})
+
+		// fetch wf2-job2
+		w2j2Task := runner2.fetchTask(t)
+		_, w2j2Job, w2Run := getTaskAndJobAndRunByTaskID(t, w2j2Task.Id)
+		assert.Equal(t, "job-group-2", w2j2Job.ConcurrencyGroup)
+		assert.Equal(t, "workflow-group-1", w2Run.ConcurrencyGroup)
+		assert.Equal(t, "concurrent-workflow-2.yml", w2Run.WorkflowID)
+		assert.Equal(t, actions_model.StatusRunning, w2j2Job.Status)
+
+		// push workflow 4
+		opts4 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf4TreePath, wf4FileContent)
+		createWorkflowFile(t, token, user2.Name, repo.Name, wf4TreePath, opts4)
+		// cannot fetch wf4-job1 because it is blocked by workflow-3's concurrency group "workflow-group-2"
+		runner2.fetchNoTask(t)
+
+		// exec wf3-job1
+		runner1.execTask(t, w3j1Task, &mockTaskOutcome{
+			result: runnerv1.Result_RESULT_SUCCESS,
+		})
+
+		// fetch wf4-job1
+		w4j1Task := runner2.fetchTask(t)
+		// all tasks have been fetched
+		runner1.fetchNoTask(t)
+		runner2.fetchNoTask(t)
+
+		_, w2j2Job, w2Run = getTaskAndJobAndRunByTaskID(t, w2j2Task.Id)
+		// wf2-job2 is cancelled because wf4-job1's cancel-in-progress is true
+		assert.Equal(t, actions_model.StatusCancelled, w2j2Job.Status)
+		assert.Equal(t, actions_model.StatusCancelled, w2Run.Status)
+		_, w4j1Job, w4Run := getTaskAndJobAndRunByTaskID(t, w4j1Task.Id)
+		assert.Equal(t, "job-group-2", w4j1Job.ConcurrencyGroup)
+		assert.Equal(t, "workflow-group-2", w4Run.ConcurrencyGroup)
+		assert.Equal(t, "concurrent-workflow-4.yml", w4Run.WorkflowID)
+	})
+}
+
+func TestCancelConcurrentRun(t *testing.T) {
+	onGiteaRun(t, func(t *testing.T, u *url.URL) {
+		user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2})
+		user2Session := loginUser(t, user2.Name)
+		user2Token := getTokenForLoggedInUser(t, user2Session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser)
+
+		apiRepo := createActionsTestRepo(t, user2Token, "actions-concurrency", false)
+		repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiRepo.ID})
+		user2APICtx := NewAPITestContext(t, repo.OwnerName, repo.Name, auth_model.AccessTokenScopeWriteRepository)
+		defer doAPIDeleteRepository(user2APICtx)(t)
+
+		runner := newMockRunner()
+		runner.registerAsRepoRunner(t, repo.OwnerName, repo.Name, "mock-runner", []string{"ubuntu-latest"}, false)
+
+		// init the workflow
+		wfTreePath := ".gitea/workflows/run.yml"
+		wfFileContent := `name: Cancel Run
+on: push
+concurrency:
+  group: cancel-run-group
+  cancel-in-progress: false
+jobs:
+  wf1-job:
+    runs-on: ubuntu-latest
+    steps:
+      - run: echo 'test'
+`
+		opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wfTreePath, wfFileContent)
+		createWorkflowFile(t, user2Token, repo.OwnerName, repo.Name, wfTreePath, opts1)
+
+		// fetch and check the first task
+		task1 := runner.fetchTask(t)
+		_, _, run1 := getTaskAndJobAndRunByTaskID(t, task1.Id)
+		assert.Equal(t, "cancel-run-group", run1.ConcurrencyGroup)
+		assert.False(t, run1.ConcurrencyCancel)
+		assert.Equal(t, actions_model.StatusRunning, run1.Status)
+
+		// push another file to trigger the workflow again
+		doAPICreateFile(user2APICtx, "file1.txt", &api.CreateFileOptions{
+			FileOptions: api.FileOptions{
+				Message: "create file1.txt",
+				Author: api.Identity{
+					Name:  user2.Name,
+					Email: user2.Email,
+				},
+				Committer: api.Identity{
+					Name:  user2.Name,
+					Email: user2.Email,
+				},
+				Dates: api.CommitDateOptions{
+					Author:    time.Now(),
+					Committer: time.Now(),
+				},
+			},
+			ContentBase64: base64.StdEncoding.EncodeToString([]byte("file1")),
+		})(t)
+
+		// cannot fetch the second task because the first task is not completed
+		runner.fetchNoTask(t)
+
+		// cancel the first run
+		req := NewRequestWithValues(t, "POST", fmt.Sprintf("/%s/%s/actions/runs/%d/cancel", user2.Name, repo.Name, run1.Index), map[string]string{
+			"_csrf": GetUserCSRFToken(t, user2Session),
+		})
+		user2Session.MakeRequest(t, req, http.StatusOK)
+
+		// the first run has been cancelled
+		run1 = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: run1.ID})
+		assert.Equal(t, actions_model.StatusCancelled, run1.Status)
+
+		// fetch and check the second task
+		task2 := runner.fetchTask(t)
+		_, _, run2 := getTaskAndJobAndRunByTaskID(t, task2.Id)
+		assert.Equal(t, "cancel-run-group", run2.ConcurrencyGroup)
+		assert.False(t, run2.ConcurrencyCancel)
+		assert.Equal(t, actions_model.StatusRunning, run2.Status)
+	})
+}
+
+func TestAbandonConcurrentRun(t *testing.T) {
+	onGiteaRun(t, func(t *testing.T, u *url.URL) {
+		user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2})
+		user2Session := loginUser(t, user2.Name)
+		user2Token := getTokenForLoggedInUser(t, user2Session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser)
+
+		apiRepo := createActionsTestRepo(t, user2Token, "actions-concurrency", false)
+		repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiRepo.ID})
+		user2APICtx := NewAPITestContext(t, repo.OwnerName, repo.Name, auth_model.AccessTokenScopeWriteRepository)
+		defer doAPIDeleteRepository(user2APICtx)(t)
+
+		runner := newMockRunner()
+		runner.registerAsRepoRunner(t, repo.OwnerName, repo.Name, "mock-runner", []string{"ubuntu-latest"}, false)
+
+		wf1TreePath := ".gitea/workflows/workflow-1.yml"
+		wf1FileContent := `name: Workflow-1
+on:
+  push:
+    paths:
+      - '.gitea/workflows/workflow-1.yml'
+concurrency:
+  group: test-group
+jobs:
+  wf1-job1:
+    runs-on: ubuntu-latest
+    steps:
+      - run: echo 'wf1-job1'
+  wf1-job2:
+    runs-on: customized-runner
+    steps:
+      - run: echo 'wf1-job2'
+`
+
+		wf2TreePath := ".gitea/workflows/workflow-2.yml"
+		wf2FileContent := `name: Workflow-2
+on:
+  push:
+    paths:
+      - '.gitea/workflows/workflow-2.yml'
+concurrency:
+  group: test-group
+jobs:
+  wf2-job1:
+    runs-on: ubuntu-latest
+    steps:
+      - run: echo 'wf2-job1'
+`
+		// push workflow1
+		opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf1TreePath, wf1FileContent)
+		createWorkflowFile(t, user2Token, repo.OwnerName, repo.Name, wf1TreePath, opts1)
+
+		// fetch wf1-job1
+		w1j1Task := runner.fetchTask(t)
+		_, _, run1 := getTaskAndJobAndRunByTaskID(t, w1j1Task.Id)
+		assert.Equal(t, "test-group", run1.ConcurrencyGroup)
+		assert.Equal(t, actions_model.StatusRunning, run1.Status)
+		// query wf1-job2 from db and check its status
+		w1j2Job := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{RunID: run1.ID, JobID: "wf1-job2"})
+		// wf1-job2 is waiting but no runner will run it
+		assert.Equal(t, actions_model.StatusWaiting, w1j2Job.Status)
+
+		time.Sleep(time.Second)
+		now := time.Now()
+		time.Sleep(time.Second)
+
+		// push workflow2
+		opts2 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf2TreePath, wf2FileContent)
+		createWorkflowFile(t, user2Token, repo.OwnerName, repo.Name, wf2TreePath, opts2)
+
+		// query run2 from db and check its status
+		run2 := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{RepoID: repo.ID, WorkflowID: "workflow-2.yml"})
+		// run2 is blocked by workflow-1's concurrency group "test-group"
+		assert.Equal(t, actions_model.StatusBlocked, run2.Status)
+
+		// mock time
+		fakeNow := now.Add(setting.Actions.AbandonedJobTimeout)
+		timeutil.MockSet(fakeNow)
+		defer timeutil.MockUnset()
+
+		// call CancelAbandonedJobs manually
+		assert.NoError(t, actions_service.CancelAbandonedJobs(t.Context()))
+
+		// check the status of wf1-job2
+		w1j2Job = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{ID: w1j2Job.ID})
+		assert.Equal(t, actions_model.StatusCancelled, w1j2Job.Status)
+		// check the status of run1
+		run1 = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: run1.ID})
+		assert.Equal(t, actions_model.StatusCancelled, run1.Status)
+
+		// fetch wf2-job1 and check
+		w2j1Task := runner.fetchTask(t)
+		_, w2j1Job, run2 := getTaskAndJobAndRunByTaskID(t, w2j1Task.Id)
+		assert.Equal(t, "test-group", run2.ConcurrencyGroup)
+		assert.Equal(t, "wf2-job1", w2j1Job.JobID)
+		assert.Equal(t, actions_model.StatusRunning, run2.Status)
+		assert.Equal(t, actions_model.StatusRunning, w2j1Job.Status)
+	})
+}
+
+func TestRunAndJobWithSameConcurrencyGroup(t *testing.T) {
+	onGiteaRun(t, func(t *testing.T, u *url.URL) {
+		user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2})
+		session := loginUser(t, user2.Name)
+		token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser)
+
+		apiRepo := createActionsTestRepo(t, token, "actions-concurrency", false)
+		repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiRepo.ID})
+		httpContext := NewAPITestContext(t, user2.Name, repo.Name, auth_model.AccessTokenScopeWriteRepository)
+		defer doAPIDeleteRepository(httpContext)(t)
+
+		runner := newMockRunner()
+		runner.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner", []string{"ubuntu-latest"}, false)
+
+		wf1TreePath := ".gitea/workflows/concurrent-workflow-1.yml"
+		wf1FileContent := `name: concurrent-workflow-1
+on:
+  push:
+    paths:
+      - '.gitea/workflows/concurrent-workflow-1.yml'
+jobs:
+  wf1-job:
+    runs-on: ubuntu-latest
+    concurrency:
+      group: test-group
+    steps:
+      - run: echo 'wf1-job'
+`
+		wf2TreePath := ".gitea/workflows/concurrent-workflow-2.yml"
+		wf2FileContent := `name: concurrent-workflow-2
+on:
+  push:
+    paths:
+      - '.gitea/workflows/concurrent-workflow-2.yml'
+concurrency:
+  group: test-group
+jobs:
+  wf2-job:
+    runs-on: ubuntu-latest
+    steps:
+      - run: echo 'wf2-job'
+`
+		wf3TreePath := ".gitea/workflows/concurrent-workflow-3.yml"
+		wf3FileContent := `name: concurrent-workflow-3
+on:
+  push:
+    paths:
+      - '.gitea/workflows/concurrent-workflow-3.yml'
+jobs:
+  wf3-job:
+    runs-on: ubuntu-latest
+    concurrency:
+      group: test-group
+      cancel-in-progress: true
+    steps:
+      - run: echo 'wf3-job'
+`
+		// push workflow1
+		opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf1TreePath, wf1FileContent)
+		createWorkflowFile(t, token, user2.Name, repo.Name, wf1TreePath, opts1)
+		// fetch run1
+		task := runner.fetchTask(t)
+		_, job1, run1 := getTaskAndJobAndRunByTaskID(t, task.Id)
+		assert.Equal(t, "test-group", job1.ConcurrencyGroup)
+		assert.Equal(t, actions_model.StatusRunning, run1.Status)
+
+		// push workflow2
+		opts2 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf2TreePath, wf2FileContent)
+		createWorkflowFile(t, token, user2.Name, repo.Name, wf2TreePath, opts2)
+		// cannot fetch run2 because run1 is still running
+		runner.fetchNoTask(t)
+		run2 := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{RepoID: repo.ID, WorkflowID: "concurrent-workflow-2.yml"})
+		assert.Equal(t, "test-group", run2.ConcurrencyGroup)
+		assert.Equal(t, actions_model.StatusBlocked, run2.Status)
+
+		// exec run1
+		runner.execTask(t, task, &mockTaskOutcome{
+			result: runnerv1.Result_RESULT_SUCCESS,
+		})
+
+		// fetch run2
+		task2 := runner.fetchTask(t)
+		_, _, run2 = getTaskAndJobAndRunByTaskID(t, task2.Id)
+		assert.Equal(t, actions_model.StatusRunning, run2.Status)
+
+		// push workflow3
+		opts3 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create "+wf3TreePath, wf3FileContent)
+		createWorkflowFile(t, token, user2.Name, repo.Name, wf3TreePath, opts3)
+		// fetch run3
+		task3 := runner.fetchTask(t)
+		_, job3, run3 := getTaskAndJobAndRunByTaskID(t, task3.Id)
+		assert.Equal(t, "test-group", job3.ConcurrencyGroup)
+		assert.Equal(t, actions_model.StatusRunning, run3.Status)
+
+		// run2 should be cancelled by run3
+		run2 = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: run2.ID})
+		assert.Equal(t, actions_model.StatusCancelled, run2.Status)
+	})
+}