0
0
mirror of https://github.com/go-gitea/gitea.git synced 2026-01-21 19:52:18 +01:00

reusable workflow logs

This commit is contained in:
Zettat123 2026-01-15 22:41:49 -07:00
parent 95ea2df00a
commit 37c73bd12b
9 changed files with 349 additions and 4 deletions

View File

@ -52,6 +52,12 @@ type ActionRun struct {
RawConcurrency string // raw concurrency
ConcurrencyGroup string `xorm:"index(repo_concurrency) NOT NULL DEFAULT ''"`
ConcurrencyCancel bool `xorm:"NOT NULL DEFAULT FALSE"`
// ParentJobID is the ID of the parent job if the run is a reusable workflow.
// 0 - normal run, no parent job;
// >0 - the parent job ID.
ParentJobID int64 `xorm:"index"`
// Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0
Started timeutil.TimeStamp
Stopped timeutil.TimeStamp

View File

@ -51,6 +51,12 @@ type ActionRunJob struct {
ConcurrencyGroup string `xorm:"index(repo_concurrency) NOT NULL DEFAULT ''"` // evaluated concurrency.group
ConcurrencyCancel bool `xorm:"NOT NULL DEFAULT FALSE"` // evaluated concurrency.cancel-in-progress
// ChildRunID is the ID of a reusable workflow run.
// 0 - normal job, no child run;
// -1 - child run is being created;
// >0 - the child run ID.
ChildRunID int64 `xorm:"index"`
Started timeutil.TimeStamp
Stopped timeutil.TimeStamp
Created timeutil.TimeStamp `xorm:"created"`

View File

@ -236,7 +236,7 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask
}
var jobs []*ActionRunJob
if err := e.Where("task_id=? AND status=?", 0, StatusWaiting).And(jobCond).Asc("updated", "id").Find(&jobs); err != nil {
if err := e.Where("task_id=? AND status=? AND child_run_id=?", 0, StatusWaiting, 0).And(jobCond).Asc("updated", "id").Find(&jobs); err != nil {
return nil, false, err
}

View File

@ -399,6 +399,7 @@ func prepareMigrationTasks() []*migration {
newMigration(323, "Add support for actions concurrency", v1_26.AddActionsConcurrency),
newMigration(324, "Fix closed milestone completeness for milestones with no issues", v1_26.FixClosedMilestoneCompleteness),
newMigration(325, "Add parent job id to ActionRun and child run id to ActionRunJob", v1_26.AddParentRunAndJobToActionRun),
}
return preparedMigrations
}

View File

@ -0,0 +1,30 @@
// Copyright 2025 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package v1_26
import (
"xorm.io/xorm"
)
func AddParentRunAndJobToActionRun(x *xorm.Engine) error {
type ActionRun struct {
ParentJobID int64 `xorm:"index"`
}
type ActionRunJob struct {
ChildRunID int64 `xorm:"index"`
}
if _, err := x.SyncWithOptions(xorm.SyncOptions{
IgnoreIndices: true,
IgnoreConstrains: true,
}, &ActionRun{}); err != nil {
return err
}
_, err := x.SyncWithOptions(xorm.SyncOptions{
IgnoreIndices: true,
IgnoreConstrains: true,
}, &ActionRunJob{})
return err
}

View File

@ -302,6 +302,41 @@ func GetActionsUserRepoPermission(ctx context.Context, repo *repo_model.Reposito
return perm, nil
}
// GetActionsUserRepoPermissionByRepoID returns the actions user permissions to the repository
func GetActionsUserRepoPermissionByRepoID(ctx context.Context, repo *repo_model.Repository, actionsUser *user_model.User, sourceRepoID int64, isForkPullRequest bool) (perm Permission, err error) {
if actionsUser.ID != user_model.ActionsUserID {
return perm, errors.New("api GetActionsUserRepoPermissionByRepoID can only be called by the actions user")
}
var accessMode perm_model.AccessMode
if sourceRepoID != repo.ID {
taskRepo, exist, err := db.GetByID[repo_model.Repository](ctx, sourceRepoID)
if err != nil || !exist {
return perm, err
}
actionsCfg := repo.MustGetUnit(ctx, unit.TypeActions).ActionsConfig()
if !actionsCfg.IsCollaborativeOwner(taskRepo.OwnerID) || !taskRepo.IsPrivate {
perm, err = GetUserRepoPermission(ctx, repo, user_model.NewActionsUser())
if err != nil {
return perm, err
}
perm.AccessMode = min(perm.AccessMode, perm_model.AccessModeRead)
return perm, nil
}
accessMode = perm_model.AccessModeRead
} else if isForkPullRequest {
accessMode = perm_model.AccessModeRead
} else {
accessMode = perm_model.AccessModeWrite
}
if err := repo.LoadUnits(ctx); err != nil {
return perm, err
}
perm.SetUnitsWithDefaultAccessMode(repo.Units, accessMode)
return perm, nil
}
// GetUserRepoPermission returns the user permissions to the repository
func GetUserRepoPermission(ctx context.Context, repo *repo_model.Repository, user *user_model.User) (perm Permission, err error) {
defer func() {

View File

@ -93,6 +93,12 @@ func checkJobsByRunID(ctx context.Context, runID int64) error {
for _, job := range updatedJobs {
_ = job.LoadAttributes(ctx)
notify_service.WorkflowJobStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job, nil)
if job.ChildRunID == -1 && job.Status == actions_model.StatusWaiting {
if err := expandReusableWorkflow(ctx, job); err != nil {
log.Error("ensure reusable workflow child run for job %d: %v", job.ID, err)
}
}
}
runJobs := make(map[int64][]*actions_model.ActionRunJob)
for _, job := range jobs {
@ -239,6 +245,41 @@ func NotifyWorkflowRunStatusUpdateWithReload(ctx context.Context, job *actions_m
return
}
notify_service.WorkflowRunStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job.Run)
updateParentJobFromChildRun(ctx, job.Run)
}
func updateParentJobFromChildRun(ctx context.Context, run *actions_model.ActionRun) {
if run.ParentJobID == 0 || !run.Status.IsDone() {
return
}
parentJob, exist, err := db.GetByID[actions_model.ActionRunJob](ctx, run.ParentJobID)
if err != nil {
log.Error("Get parent job %d: %v", run.ParentJobID, err)
return
}
if !exist {
return
}
parentJob.Status = run.Status
parentJob.Stopped = run.Stopped
if _, err := actions_model.UpdateRunJob(ctx, parentJob, nil, "status", "stopped"); err != nil {
log.Error("Update parent job %d: %v", parentJob.ID, err)
return
}
parentRun, exist, err := db.GetByID[actions_model.ActionRun](ctx, parentJob.RunID)
if err != nil {
log.Error("Get parent run %d: %v", parentJob.RunID, err)
return
}
if exist {
CreateCommitStatusForRunJobs(ctx, parentRun, parentJob)
if err := EmitJobsIfReadyByRun(parentJob.RunID); err != nil {
log.Error("EmitJobsIfReadyByRun for parent run %d: %v", parentJob.RunID, err)
}
}
}
type jobStatusResolver struct {

View File

@ -0,0 +1,207 @@
// Copyright 2025 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"context"
"errors"
"fmt"
"net/url"
"strings"
actions_model "code.gitea.io/gitea/models/actions"
access_model "code.gitea.io/gitea/models/perm/access"
repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/models/unit"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/gitrepo"
"code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/setting"
"github.com/nektos/act/pkg/jobparser"
actmodel "github.com/nektos/act/pkg/model"
act_runner_pkg "github.com/nektos/act/pkg/runner"
"gopkg.in/yaml.v3"
"xorm.io/builder"
)
func expandReusableWorkflows(ctx context.Context, run *actions_model.ActionRun, jobs []*actions_model.ActionRunJob, vars map[string]string) error {
for _, job := range jobs {
if job.ChildRunID != -1 {
// should not happen
continue
}
workflowJob, err := job.ParseJob()
if err != nil {
return err
}
ref, err := act_runner_pkg.ParseReusableWorkflowRef(workflowJob.Uses)
if err != nil {
return err
}
if err := createChildRunFromReusableWorkflow(ctx, run, job, workflowJob, ref, vars); err != nil {
return err
}
}
return nil
}
func expandReusableWorkflow(ctx context.Context, parentJob *actions_model.ActionRunJob) error {
if parentJob.ChildRunID != -1 {
// should not happen
return fmt.Errorf("no need to expand")
}
workflowJob, err := parentJob.ParseJob()
if err != nil {
return err
}
ref, err := act_runner_pkg.ParseReusableWorkflowRef(workflowJob.Uses)
if err != nil {
return err
}
if err := parentJob.LoadAttributes(ctx); err != nil {
return err
}
vars, err := actions_model.GetVariablesOfRun(ctx, parentJob.Run)
if err != nil {
return err
}
if err := createChildRunFromReusableWorkflow(ctx, parentJob.Run, parentJob, workflowJob, ref, vars); err != nil {
return err
}
return nil
}
func createChildRunFromReusableWorkflow(ctx context.Context, run *actions_model.ActionRun, parentJob *actions_model.ActionRunJob, workflowJob *jobparser.Job, ref *act_runner_pkg.ReusableWorkflowRef, vars map[string]string) error {
content, err := loadReusableWorkflowContent(ctx, run, ref)
if err != nil {
return err
}
inputsWithDefaults, err := buildWorkflowCallInputs(ctx, run, parentJob, workflowJob, content, vars)
if err != nil {
return err
}
eventPayload, err := json.Marshal(map[string]any{
"inputs": inputsWithDefaults,
})
if err != nil {
return err
}
childRun := &actions_model.ActionRun{
Title: run.Title,
RepoID: run.RepoID,
Repo: run.Repo,
OwnerID: run.OwnerID,
ParentJobID: parentJob.ID,
WorkflowID: ref.WorkflowPath,
TriggerUserID: run.TriggerUserID,
TriggerUser: run.TriggerUser,
Ref: run.Ref,
CommitSHA: run.CommitSHA,
IsForkPullRequest: run.IsForkPullRequest,
Event: "workflow_call",
TriggerEvent: "workflow_call",
EventPayload: string(eventPayload),
Status: actions_model.StatusWaiting,
NeedApproval: run.NeedApproval,
}
if err := PrepareRunAndInsert(ctx, content, childRun, inputsWithDefaults); err != nil {
return err
}
parentJob.ChildRunID = childRun.ID
if _, err := actions_model.UpdateRunJob(ctx, parentJob, builder.Eq{"child_run_id": -1}, "child_run_id"); err != nil {
return err
}
return nil
}
func buildWorkflowCallInputs(ctx context.Context, run *actions_model.ActionRun, parentJob *actions_model.ActionRunJob, workflowJob *jobparser.Job, content []byte, vars map[string]string) (map[string]any, error) {
singleWorkflow := &jobparser.SingleWorkflow{}
if err := yaml.Unmarshal(content, singleWorkflow); err != nil {
return nil, fmt.Errorf("unmarshal workflow: %w", err)
}
workflow := &actmodel.Workflow{
RawOn: singleWorkflow.RawOn,
}
giteaCtx := GenerateGiteaContext(run, parentJob)
inputs, err := getInputsFromRun(run)
if err != nil {
return nil, fmt.Errorf("get inputs: %w", err)
}
results, err := findJobNeedsAndFillJobResults(ctx, parentJob)
if err != nil {
return nil, fmt.Errorf("get job results: %w", err)
}
return jobparser.EvaluateWorkflowCallInputs(workflow, parentJob.JobID, workflowJob, giteaCtx, results, vars, inputs)
}
func loadReusableWorkflowContent(ctx context.Context, run *actions_model.ActionRun, ref *act_runner_pkg.ReusableWorkflowRef) ([]byte, error) {
if ref.Kind == act_runner_pkg.ReusableWorkflowKindLocal {
if err := run.LoadRepo(ctx); err != nil {
return nil, err
}
return readWorkflowContentFromRepo(ctx, run.Repo, run.Ref, ref.WorkflowPath)
}
if ref.Kind == act_runner_pkg.ReusableWorkflowKindOtherRepo || isSameInstanceHost(ref.Host) {
repo, err := repo_model.GetRepositoryByOwnerAndName(ctx, ref.Owner, ref.Repo)
if err != nil {
return nil, err
}
if repo.IsPrivate {
perm, err := access_model.GetActionsUserRepoPermissionByRepoID(ctx, repo, user_model.NewActionsUser(), run.RepoID, run.IsForkPullRequest)
if err != nil {
return nil, err
}
if !perm.CanRead(unit.TypeCode) {
return nil, errors.New("actions user has no access to reusable workflow repo")
}
}
return readWorkflowContentFromRepo(ctx, repo, ref.Ref, ref.WorkflowPath)
}
content, err := act_runner_pkg.FetchReusableWorkflowContent(ctx, ref)
if err != nil {
return nil, err
}
return content, nil
}
func readWorkflowContentFromRepo(ctx context.Context, repo *repo_model.Repository, ref, workflowPath string) ([]byte, error) {
gitRepo, err := gitrepo.OpenRepository(ctx, repo)
if err != nil {
return nil, err
}
defer gitRepo.Close()
commit, err := gitRepo.GetCommit(ref)
if err != nil {
return nil, err
}
content, err := commit.GetFileContent(workflowPath, 1024*1024)
if err != nil {
return nil, err
}
return []byte(content), nil
}
func isSameInstanceHost(host string) bool {
appURL, err := url.Parse(setting.AppURL)
if err != nil {
return false
}
h, err := url.Parse(host)
if err != nil {
return false
}
return strings.EqualFold(h.Host, appURL.Host)
}

View File

@ -9,6 +9,7 @@ import (
actions_model "code.gitea.io/gitea/models/actions"
"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/util"
notify_service "code.gitea.io/gitea/services/notify"
@ -75,7 +76,9 @@ func PrepareRunAndInsert(ctx context.Context, content []byte, run *actions_model
// InsertRun inserts a run
// The title will be cut off at 255 characters if it's longer than 255 characters.
func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobparser.SingleWorkflow, vars map[string]string) error {
return db.WithTx(ctx, func(ctx context.Context) error {
var readyUsesJobs []*actions_model.ActionRunJob // jobs that have "uses" and are ready to run
if err := db.WithTx(ctx, func(ctx context.Context) error {
index, err := db.GetNextResourceIndex(ctx, "action_run_index", run.RepoID)
if err != nil {
return err
@ -153,7 +156,14 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobpar
}
}
hasWaitingJobs = hasWaitingJobs || runJob.Status == actions_model.StatusWaiting
if job.Uses != "" {
runJob.ChildRunID = -1
if runJob.Status == actions_model.StatusWaiting {
readyUsesJobs = append(readyUsesJobs, runJob)
}
} else {
hasWaitingJobs = hasWaitingJobs || runJob.Status == actions_model.StatusWaiting
}
if err := db.Insert(ctx, runJob); err != nil {
return err
}
@ -174,5 +184,14 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobpar
}
return nil
})
}); err != nil {
return err
}
if err := expandReusableWorkflows(ctx, run, readyUsesJobs, vars); err != nil {
// TODO: need to rollback and show an error message to the user
log.Error("expandReusableWorkflows for run %d: %v", run.ID, err)
}
return nil
}