0
0
mirror of https://github.com/go-gitea/gitea.git synced 2025-12-08 15:45:27 +01:00

Merge e68da9a037e65ed969a40e811c8d79c3023eaf57 into 98ef79d73a6a546241dd02959ae17f136369b604

This commit is contained in:
ChristopherHX 2025-12-07 23:24:07 +01:00 committed by GitHub
commit 47a5d22ac7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 84 additions and 42 deletions

View File

@ -8,6 +8,7 @@ import (
"crypto/subtle"
"errors"
"fmt"
"math/rand"
"time"
auth_model "code.gitea.io/gitea/models/auth"
@ -223,6 +224,17 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask
e := db.GetEngine(ctx)
// Create a new task record as early as possible to be able to reserve jobs
task := &ActionTask{
RunnerID: runner.ID,
Status: StatusBlocked,
}
// This is a requirement of the database schema
if err := task.GenerateToken(); err != nil {
return nil, false, err
}
// Insert the task on demand if task.ID == 0, if at least one job matches avoid unnecessary id increment
jobCond := builder.NewCond()
if runner.RepoID != 0 {
jobCond = builder.Eq{"repo_id": runner.RepoID}
@ -235,20 +247,57 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask
jobCond = builder.In("run_id", builder.Select("id").From("action_run").Where(jobCond))
}
var job *ActionRunJob
const limit = 10
// TODO store the last position to continue searching next time inside the runner record
// e.g. we would start again from zero if no job matches our known labels
// For stable paging
var lastUpdated timeutil.TimeStamp
// TODO: a more efficient way to filter labels
log.Trace("runner labels: %v", runner.AgentLabels)
backoffGen := rand.New(rand.NewSource(time.Now().UnixNano() ^ runner.ID))
for page := 0; job == nil; page++ {
var jobs []*ActionRunJob
if err := e.Where("task_id=? AND status=?", 0, StatusWaiting).And(jobCond).Asc("updated", "id").Find(&jobs); err != nil {
// Load only 10 job in a batch without all fields for memory / db load reduction
if err := e.Where("task_id=? AND status=? AND updated>?", 0, StatusWaiting, lastUpdated).Cols("id", "runs_on", "updated").And(jobCond).Asc("updated", "id").Limit(limit).Find(&jobs); err != nil {
return nil, false, err
}
// TODO: a more efficient way to filter labels
var job *ActionRunJob
log.Trace("runner labels: %v", runner.AgentLabels)
for _, v := range jobs {
if runner.CanMatchLabels(v.RunsOn) {
job = v
// Insert on demand, auto removed by aborted transaction if no job matches
if task.ID == 0 {
if _, err := e.Insert(task); err != nil {
return nil, false, err
}
}
// Reserve our job before preparing task, otherwise continue searching
v.TaskID = task.ID
if n, err := UpdateRunJob(ctx, v, builder.Eq{"task_id": 0}, "task_id"); err != nil {
return nil, false, err
} else if n == 1 {
var exist bool
// reload to get all fields
if job, exist, err = db.GetByID[ActionRunJob](ctx, v.ID); err != nil || !exist {
return nil, false, err
}
break
}
}
lastUpdated = v.Updated
}
if len(jobs) < limit {
break
}
// Randomly distribute retries over time to reduce contention
jitter := time.Duration(backoffGen.Int63n(int64(util.Iif(page < 4, page+1, 5))*20)) * time.Millisecond // random jitter
select {
case <-ctx.Done():
return nil, false, ctx.Err()
case <-time.After(jitter):
}
}
if job == nil {
return nil, false, nil
}
@ -261,32 +310,27 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask
job.Started = now
job.Status = StatusRunning
task := &ActionTask{
JobID: job.ID,
Attempt: job.Attempt,
RunnerID: runner.ID,
Started: now,
Status: StatusRunning,
RepoID: job.RepoID,
OwnerID: job.OwnerID,
CommitSHA: job.CommitSHA,
IsForkPullRequest: job.IsForkPullRequest,
}
if err := task.GenerateToken(); err != nil {
return nil, false, err
}
workflowJob, err := job.ParseJob()
if err != nil {
return nil, false, fmt.Errorf("load job %d: %w", job.ID, err)
}
if _, err := e.Insert(task); err != nil {
task.Job = job
task.JobID = job.ID
task.Attempt = job.Attempt
task.Started = now
task.Status = StatusRunning
task.RepoID = job.RepoID
task.OwnerID = job.OwnerID
task.CommitSHA = job.CommitSHA
task.IsForkPullRequest = job.IsForkPullRequest
task.LogFilename = logFileName(job.Run.Repo.FullName(), task.ID)
if err := UpdateTask(ctx, task, "job_id", "attempt", "started", "status", "repo_id", "owner_id", "commit_sha", "is_fork_pull_request", "log_filename"); err != nil {
return nil, false, err
}
task.LogFilename = logFileName(job.Run.Repo.FullName(), task.ID)
if err := UpdateTask(ctx, task, "log_filename"); err != nil {
if _, err := UpdateRunJob(ctx, job, builder.Eq{"id": job.ID}, "attempt", "started", "status"); err != nil {
return nil, false, err
}
@ -308,15 +352,6 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask
task.Steps = steps
}
job.TaskID = task.ID
if n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}); err != nil {
return nil, false, err
} else if n != 1 {
return nil, false, nil
}
task.Job = job
if err := committer.Commit(); err != nil {
return nil, false, err
}

View File

@ -45,15 +45,22 @@ var withRunner = connect.WithInterceptors(connect.UnaryInterceptorFunc(func(unar
return nil, status.Error(codes.Unauthenticated, "unregistered runner")
}
cols := []string{"last_online"}
runner.LastOnline = timeutil.TimeStampNow()
if methodName == "UpdateTask" || methodName == "UpdateLog" {
runner.LastActive = timeutil.TimeStampNow()
// Reduce db writes by only updating last active/online when needed
var cols []string
now := timeutil.TimeStampNow()
if runner.LastActive.AddDuration(actions_model.RunnerOfflineTime/2) < now {
runner.LastOnline = now
cols = append(cols, "last_online")
}
if (methodName == "UpdateTask" || methodName == "UpdateLog") && runner.LastActive.AddDuration(actions_model.RunnerIdleTime/2) < now {
runner.LastActive = now
cols = append(cols, "last_active")
}
if cols != nil {
if err := actions_model.UpdateRunner(ctx, runner, cols...); err != nil {
log.Error("can't update runner status: %v", err)
}
}
ctx = context.WithValue(ctx, runnerCtxKey{}, runner)
return unaryFunc(ctx, request)