0
0
mirror of https://github.com/go-gitea/gitea.git synced 2025-12-08 20:01:46 +01:00

Move rerunJob logic to shared actions_service.RerunJob function for reuse across API and WEBUI

This commit is contained in:
Ross Golder 2025-10-23 22:38:20 +07:00
parent b6ba7241c2
commit 0a2fc6fec5
No known key found for this signature in database
GPG Key ID: 253A7E508D2D59CD
3 changed files with 108 additions and 129 deletions

View File

@ -141,10 +141,12 @@ func RerunWorkflowRun(ctx *context.APIContext) {
for _, job := range jobs {
// If the job has needs, it should be set to "blocked" status to wait for other jobs
shouldBlock := len(job.Needs) > 0
if err := rerunJob(ctx, job, shouldBlock); err != nil {
if err := actions_service.RerunJob(ctx, job, shouldBlock); err != nil {
ctx.APIErrorInternal(err)
return
}
actions_service.NotifyWorkflowRunStatusUpdateWithReload(ctx, job)
notify_service.WorkflowJobStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job, nil)
}
ctx.Status(200)
@ -431,10 +433,12 @@ func RerunWorkflowJob(ctx *context.APIContext) {
for _, j := range rerunJobs {
// Jobs other than the specified one should be set to "blocked" status
shouldBlock := j.JobID != job.JobID
if err := rerunJob(ctx, j, shouldBlock); err != nil {
if err := actions_service.RerunJob(ctx, j, shouldBlock); err != nil {
ctx.APIErrorInternal(err)
return
}
actions_service.NotifyWorkflowRunStatusUpdateWithReload(ctx, j)
notify_service.WorkflowJobStatusUpdate(ctx, j.Run.Repo, j.Run.TriggerUser, j, nil)
}
ctx.Status(200)
@ -487,38 +491,7 @@ func getRunJobsAndCurrent(ctx *context.APIContext, runID, jobIndex int64) (*acti
return jobs[0], jobs, nil
}
func rerunJob(ctx *context.APIContext, job *actions_model.ActionRunJob, shouldBlock bool) error {
if job.Run == nil {
if err := job.LoadRun(ctx); err != nil {
return err
}
}
status := job.Status
if !status.IsDone() || !job.Run.Status.IsDone() {
return nil
}
job.TaskID = 0
job.Status = actions_model.StatusWaiting
if shouldBlock {
job.Status = actions_model.StatusBlocked
}
job.Started = 0
job.Stopped = 0
if err := db.WithTx(ctx, func(ctx stdCtx.Context) error {
_, err := actions_model.UpdateRunJob(ctx, job, nil, "task_id", "status", "started", "stopped")
return err
}); err != nil {
return err
}
actions_service.CreateCommitStatusForRunJobs(ctx, job.Run, job)
actions_service.NotifyWorkflowRunStatusUpdateWithReload(ctx, job)
notify_service.WorkflowJobStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job, nil)
return nil
}
// LogCursor represents a log cursor position
type LogCursor struct {

View File

@ -412,12 +412,6 @@ func Rerun(ctx *context_module.Context) {
return
}
// rerun is not allowed if the run is not done
if !run.Status.IsDone() {
ctx.JSONError(ctx.Locale.Tr("actions.runs.not_done"))
return
}
// can not rerun job when workflow is disabled
cfgUnit := ctx.Repo.Repository.MustGetUnit(ctx, unit.TypeActions)
cfg := cfgUnit.ActionsConfig()
@ -426,62 +420,67 @@ func Rerun(ctx *context_module.Context) {
return
}
// reset run's start and stop time
run.PreviousDuration = run.Duration()
run.Started = 0
run.Stopped = 0
run.Status = actions_model.StatusWaiting
vars, err := actions_model.GetVariablesOfRun(ctx, run)
if err != nil {
ctx.ServerError("GetVariablesOfRun", fmt.Errorf("get run %d variables: %w", run.ID, err))
return
}
if run.RawConcurrency != "" {
var rawConcurrency model.RawConcurrency
if err := yaml.Unmarshal([]byte(run.RawConcurrency), &rawConcurrency); err != nil {
ctx.ServerError("UnmarshalRawConcurrency", fmt.Errorf("unmarshal raw concurrency: %w", err))
return
}
err = actions_service.EvaluateRunConcurrencyFillModel(ctx, run, &rawConcurrency, vars)
if err != nil {
ctx.ServerError("EvaluateRunConcurrencyFillModel", err)
return
}
run.Status, err = actions_service.PrepareToStartRunWithConcurrency(ctx, run)
if err != nil {
ctx.ServerError("PrepareToStartRunWithConcurrency", err)
return
}
}
if err := actions_model.UpdateRun(ctx, run, "started", "stopped", "previous_duration", "status", "concurrency_group", "concurrency_cancel"); err != nil {
ctx.ServerError("UpdateRun", err)
return
}
if err := run.LoadAttributes(ctx); err != nil {
ctx.ServerError("run.LoadAttributes", err)
return
}
notify_service.WorkflowRunStatusUpdate(ctx, run.Repo, run.TriggerUser, run)
// check run (workflow-level) concurrency
job, jobs := getRunJobs(ctx, runIndex, jobIndex)
if ctx.Written() {
return
}
// reset run's start and stop time when it is done
if run.Status.IsDone() {
run.PreviousDuration = run.Duration()
run.Started = 0
run.Stopped = 0
run.Status = actions_model.StatusWaiting
vars, err := actions_model.GetVariablesOfRun(ctx, run)
if err != nil {
ctx.ServerError("GetVariablesOfRun", fmt.Errorf("get run %d variables: %w", run.ID, err))
return
}
if run.RawConcurrency != "" {
var rawConcurrency model.RawConcurrency
if err := yaml.Unmarshal([]byte(run.RawConcurrency), &rawConcurrency); err != nil {
ctx.ServerError("UnmarshalRawConcurrency", fmt.Errorf("unmarshal raw concurrency: %w", err))
return
}
err = actions_service.EvaluateRunConcurrencyFillModel(ctx, run, &rawConcurrency, vars)
if err != nil {
ctx.ServerError("EvaluateRunConcurrencyFillModel", err)
return
}
run.Status, err = actions_service.PrepareToStartRunWithConcurrency(ctx, run)
if err != nil {
ctx.ServerError("PrepareToStartRunWithConcurrency", err)
return
}
}
if err := actions_model.UpdateRun(ctx, run, "started", "stopped", "previous_duration", "status", "concurrency_group", "concurrency_cancel"); err != nil {
ctx.ServerError("UpdateRun", err)
return
}
if err := run.LoadAttributes(ctx); err != nil {
ctx.ServerError("run.LoadAttributes", err)
return
}
notify_service.WorkflowRunStatusUpdate(ctx, run.Repo, run.TriggerUser, run)
}
isRunBlocked := run.Status == actions_model.StatusBlocked
if jobIndexStr == "" { // rerun all jobs
for _, j := range jobs {
// if the job has needs, it should be set to "blocked" status to wait for other jobs
shouldBlockJob := len(j.Needs) > 0 || isRunBlocked
if err := rerunJob(ctx, j, shouldBlockJob); err != nil {
if err := actions_service.RerunJob(ctx, j, shouldBlockJob); err != nil {
ctx.ServerError("RerunJob", err)
return
}
notify_service.WorkflowJobStatusUpdate(ctx, j.Run.Repo, j.Run.TriggerUser, j, nil)
}
ctx.JSONOK()
return
@ -492,63 +491,17 @@ func Rerun(ctx *context_module.Context) {
for _, j := range rerunJobs {
// jobs other than the specified one should be set to "blocked" status
shouldBlockJob := j.JobID != job.JobID || isRunBlocked
if err := rerunJob(ctx, j, shouldBlockJob); err != nil {
if err := actions_service.RerunJob(ctx, j, shouldBlockJob); err != nil {
ctx.ServerError("RerunJob", err)
return
}
notify_service.WorkflowJobStatusUpdate(ctx, j.Run.Repo, j.Run.TriggerUser, j, nil)
}
ctx.JSONOK()
}
func rerunJob(ctx *context_module.Context, job *actions_model.ActionRunJob, shouldBlock bool) error {
status := job.Status
if !status.IsDone() {
return nil
}
job.TaskID = 0
job.Status = util.Iif(shouldBlock, actions_model.StatusBlocked, actions_model.StatusWaiting)
job.Started = 0
job.Stopped = 0
job.ConcurrencyGroup = ""
job.ConcurrencyCancel = false
job.IsConcurrencyEvaluated = false
if err := job.LoadRun(ctx); err != nil {
return err
}
vars, err := actions_model.GetVariablesOfRun(ctx, job.Run)
if err != nil {
return fmt.Errorf("get run %d variables: %w", job.Run.ID, err)
}
if job.RawConcurrency != "" && !shouldBlock {
err = actions_service.EvaluateJobConcurrencyFillModel(ctx, job.Run, job, vars)
if err != nil {
return fmt.Errorf("evaluate job concurrency: %w", err)
}
job.Status, err = actions_service.PrepareToStartJobWithConcurrency(ctx, job)
if err != nil {
return err
}
}
if err := db.WithTx(ctx, func(ctx context.Context) error {
updateCols := []string{"task_id", "status", "started", "stopped", "concurrency_group", "concurrency_cancel", "is_concurrency_evaluated"}
_, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"status": status}, updateCols...)
return err
}); err != nil {
return err
}
actions_service.CreateCommitStatusForRunJobs(ctx, job.Run, job)
notify_service.WorkflowJobStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job, nil)
return nil
}
func Logs(ctx *context_module.Context) {
runIndex := getRunIndex(ctx)

View File

@ -5,9 +5,13 @@ package actions
import (
"context"
"fmt"
actions_model "code.gitea.io/gitea/models/actions"
"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/modules/container"
"code.gitea.io/gitea/modules/util"
"xorm.io/builder"
)
// ResetRunTimes resets the start and stop times for a run when it is done, for rerun
@ -21,6 +25,55 @@ func ResetRunTimes(ctx context.Context, run *actions_model.ActionRun) error {
return nil
}
// RerunJob reruns a job, handling concurrency and status updates
func RerunJob(ctx context.Context, job *actions_model.ActionRunJob, shouldBlock bool) error {
status := job.Status
if !status.IsDone() || !job.Run.Status.IsDone() {
return nil
}
job.TaskID = 0
job.Status = util.Iif(shouldBlock, actions_model.StatusBlocked, actions_model.StatusWaiting)
job.Started = 0
job.Stopped = 0
job.ConcurrencyGroup = ""
job.ConcurrencyCancel = false
job.IsConcurrencyEvaluated = false
if err := job.LoadRun(ctx); err != nil {
return err
}
vars, err := actions_model.GetVariablesOfRun(ctx, job.Run)
if err != nil {
return fmt.Errorf("get run %d variables: %w", job.Run.ID, err)
}
if job.RawConcurrency != "" && !shouldBlock {
err = EvaluateJobConcurrencyFillModel(ctx, job.Run, job, vars)
if err != nil {
return fmt.Errorf("evaluate job concurrency: %w", err)
}
job.Status, err = PrepareToStartJobWithConcurrency(ctx, job)
if err != nil {
return err
}
}
if err := db.WithTx(ctx, func(ctx context.Context) error {
updateCols := []string{"task_id", "status", "started", "stopped", "concurrency_group", "concurrency_cancel", "is_concurrency_evaluated"}
_, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"status": status}, updateCols...)
return err
}); err != nil {
return err
}
CreateCommitStatusForRunJobs(ctx, job.Run, job)
return nil
}
// GetAllRerunJobs get all jobs that need to be rerun when job should be rerun
func GetAllRerunJobs(job *actions_model.ActionRunJob, allJobs []*actions_model.ActionRunJob) []*actions_model.ActionRunJob {
rerunJobs := []*actions_model.ActionRunJob{job}