mirror of
				https://github.com/go-gitea/gitea.git
				synced 2025-11-04 10:44:12 +01:00 
			
		
		
		
	Add a transaction to pickTask (#33543)
				
					
				
			In the old `pickTask`, when getting secrets or variables failed, the task could get stuck in the `running` status (task status is `running` but the runner did not fetch the task). To fix this issue, these steps should be in one transaction. --------- Co-authored-by: wxiaoguang <wxiaoguang@gmail.com>
This commit is contained in:
		
							parent
							
								
									245ac321c3
								
							
						
					
					
						commit
						06f1065636
					
				@ -156,7 +156,7 @@ func (s *Service) FetchTask(
 | 
			
		||||
		// if the task version in request is not equal to the version in db,
 | 
			
		||||
		// it means there may still be some tasks not be assgined.
 | 
			
		||||
		// try to pick a task for the runner that send the request.
 | 
			
		||||
		if t, ok, err := pickTask(ctx, runner); err != nil {
 | 
			
		||||
		if t, ok, err := actions_service.PickTask(ctx, runner); err != nil {
 | 
			
		||||
			log.Error("pick task failed: %v", err)
 | 
			
		||||
			return nil, status.Errorf(codes.Internal, "pick task: %v", err)
 | 
			
		||||
		} else if ok {
 | 
			
		||||
 | 
			
		||||
@ -1,95 +0,0 @@
 | 
			
		||||
// Copyright 2022 The Gitea Authors. All rights reserved.
 | 
			
		||||
// SPDX-License-Identifier: MIT
 | 
			
		||||
 | 
			
		||||
package runner
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"fmt"
 | 
			
		||||
 | 
			
		||||
	actions_model "code.gitea.io/gitea/models/actions"
 | 
			
		||||
	secret_model "code.gitea.io/gitea/models/secret"
 | 
			
		||||
	"code.gitea.io/gitea/modules/log"
 | 
			
		||||
	"code.gitea.io/gitea/services/actions"
 | 
			
		||||
 | 
			
		||||
	runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
 | 
			
		||||
	"google.golang.org/protobuf/types/known/structpb"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func pickTask(ctx context.Context, runner *actions_model.ActionRunner) (*runnerv1.Task, bool, error) {
 | 
			
		||||
	t, ok, err := actions_model.CreateTaskForRunner(ctx, runner)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, false, fmt.Errorf("CreateTaskForRunner: %w", err)
 | 
			
		||||
	}
 | 
			
		||||
	if !ok {
 | 
			
		||||
		return nil, false, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	secrets, err := secret_model.GetSecretsOfTask(ctx, t)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, false, fmt.Errorf("GetSecretsOfTask: %w", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	vars, err := actions_model.GetVariablesOfRun(ctx, t.Job.Run)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, false, fmt.Errorf("GetVariablesOfRun: %w", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	actions.CreateCommitStatus(ctx, t.Job)
 | 
			
		||||
 | 
			
		||||
	task := &runnerv1.Task{
 | 
			
		||||
		Id:              t.ID,
 | 
			
		||||
		WorkflowPayload: t.Job.WorkflowPayload,
 | 
			
		||||
		Context:         generateTaskContext(t),
 | 
			
		||||
		Secrets:         secrets,
 | 
			
		||||
		Vars:            vars,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if needs, err := findTaskNeeds(ctx, t); err != nil {
 | 
			
		||||
		log.Error("Cannot find needs for task %v: %v", t.ID, err)
 | 
			
		||||
		// Go on with empty needs.
 | 
			
		||||
		// If return error, the task will be wild, which means the runner will never get it when it has been assigned to the runner.
 | 
			
		||||
		// In contrast, missing needs is less serious.
 | 
			
		||||
		// And the task will fail and the runner will report the error in the logs.
 | 
			
		||||
	} else {
 | 
			
		||||
		task.Needs = needs
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return task, true, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func generateTaskContext(t *actions_model.ActionTask) *structpb.Struct {
 | 
			
		||||
	giteaRuntimeToken, err := actions.CreateAuthorizationToken(t.ID, t.Job.RunID, t.JobID)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Error("actions.CreateAuthorizationToken failed: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	gitCtx := actions.GenerateGiteaContext(t.Job.Run, t.Job)
 | 
			
		||||
	gitCtx["token"] = t.Token
 | 
			
		||||
	gitCtx["gitea_runtime_token"] = giteaRuntimeToken
 | 
			
		||||
 | 
			
		||||
	taskContext, err := structpb.NewStruct(gitCtx)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Error("structpb.NewStruct failed: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return taskContext
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func findTaskNeeds(ctx context.Context, task *actions_model.ActionTask) (map[string]*runnerv1.TaskNeed, error) {
 | 
			
		||||
	if err := task.LoadAttributes(ctx); err != nil {
 | 
			
		||||
		return nil, fmt.Errorf("task LoadAttributes: %w", err)
 | 
			
		||||
	}
 | 
			
		||||
	taskNeeds, err := actions.FindTaskNeeds(ctx, task.Job)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	ret := make(map[string]*runnerv1.TaskNeed, len(taskNeeds))
 | 
			
		||||
	for jobID, taskNeed := range taskNeeds {
 | 
			
		||||
		ret[jobID] = &runnerv1.TaskNeed{
 | 
			
		||||
			Outputs: taskNeed.Outputs,
 | 
			
		||||
			Result:  runnerv1.Result(taskNeed.Result),
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return ret, nil
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										107
									
								
								services/actions/task.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										107
									
								
								services/actions/task.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,107 @@
 | 
			
		||||
// Copyright 2022 The Gitea Authors. All rights reserved.
 | 
			
		||||
// SPDX-License-Identifier: MIT
 | 
			
		||||
 | 
			
		||||
package actions
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"fmt"
 | 
			
		||||
 | 
			
		||||
	actions_model "code.gitea.io/gitea/models/actions"
 | 
			
		||||
	"code.gitea.io/gitea/models/db"
 | 
			
		||||
	secret_model "code.gitea.io/gitea/models/secret"
 | 
			
		||||
 | 
			
		||||
	runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
 | 
			
		||||
	"google.golang.org/protobuf/types/known/structpb"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func PickTask(ctx context.Context, runner *actions_model.ActionRunner) (*runnerv1.Task, bool, error) {
 | 
			
		||||
	var (
 | 
			
		||||
		task *runnerv1.Task
 | 
			
		||||
		job  *actions_model.ActionRunJob
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	if err := db.WithTx(ctx, func(ctx context.Context) error {
 | 
			
		||||
		t, ok, err := actions_model.CreateTaskForRunner(ctx, runner)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return fmt.Errorf("CreateTaskForRunner: %w", err)
 | 
			
		||||
		}
 | 
			
		||||
		if !ok {
 | 
			
		||||
			return nil
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if err := t.LoadAttributes(ctx); err != nil {
 | 
			
		||||
			return fmt.Errorf("task LoadAttributes: %w", err)
 | 
			
		||||
		}
 | 
			
		||||
		job = t.Job
 | 
			
		||||
 | 
			
		||||
		secrets, err := secret_model.GetSecretsOfTask(ctx, t)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return fmt.Errorf("GetSecretsOfTask: %w", err)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		vars, err := actions_model.GetVariablesOfRun(ctx, t.Job.Run)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return fmt.Errorf("GetVariablesOfRun: %w", err)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		needs, err := findTaskNeeds(ctx, job)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return fmt.Errorf("findTaskNeeds: %w", err)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		taskContext, err := generateTaskContext(t)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return fmt.Errorf("generateTaskContext: %w", err)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		task = &runnerv1.Task{
 | 
			
		||||
			Id:              t.ID,
 | 
			
		||||
			WorkflowPayload: t.Job.WorkflowPayload,
 | 
			
		||||
			Context:         taskContext,
 | 
			
		||||
			Secrets:         secrets,
 | 
			
		||||
			Vars:            vars,
 | 
			
		||||
			Needs:           needs,
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		return nil
 | 
			
		||||
	}); err != nil {
 | 
			
		||||
		return nil, false, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if task == nil {
 | 
			
		||||
		return nil, false, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	CreateCommitStatus(ctx, job)
 | 
			
		||||
 | 
			
		||||
	return task, true, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func generateTaskContext(t *actions_model.ActionTask) (*structpb.Struct, error) {
 | 
			
		||||
	giteaRuntimeToken, err := CreateAuthorizationToken(t.ID, t.Job.RunID, t.JobID)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	gitCtx := GenerateGiteaContext(t.Job.Run, t.Job)
 | 
			
		||||
	gitCtx["token"] = t.Token
 | 
			
		||||
	gitCtx["gitea_runtime_token"] = giteaRuntimeToken
 | 
			
		||||
 | 
			
		||||
	return structpb.NewStruct(gitCtx)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func findTaskNeeds(ctx context.Context, taskJob *actions_model.ActionRunJob) (map[string]*runnerv1.TaskNeed, error) {
 | 
			
		||||
	taskNeeds, err := FindTaskNeeds(ctx, taskJob)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	ret := make(map[string]*runnerv1.TaskNeed, len(taskNeeds))
 | 
			
		||||
	for jobID, taskNeed := range taskNeeds {
 | 
			
		||||
		ret[jobID] = &runnerv1.TaskNeed{
 | 
			
		||||
			Outputs: taskNeed.Outputs,
 | 
			
		||||
			Result:  runnerv1.Result(taskNeed.Result),
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return ret, nil
 | 
			
		||||
}
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user