0
0
mirror of https://github.com/go-gitea/gitea.git synced 2026-06-22 22:58:50 +02:00
bircni bc2fbe77b1
refactor(actions): read runner capabilities from proto field (#38068)
[actions-proto-go v0.6.0](https://gitea.com/gitea/actions-proto-go) adds
a
`capabilities` field to `RegisterRequest` and `DeclareRequest`. This
lets a
runner advertise the transitional `cancelling` capability directly in
the proto
message instead of through the out-of-band mechanism we used while the
proto
bump was pending.

This PR:

- Bumps `gitea.dev/actions-proto-go` to `v0.6.0`.
- Drops the forward-compat `capabilityGetter` type-assertion shim and
the
`runnerRequestHasCancellingCapability` helper, reading
`GetCapabilities()`
  directly (now part of the `declareRequest` interface).
- Removes the "capability state unknown → preserve existing value"
branch.

## Why the behaviour change is correct

The shim and the `(hasSupport, known)` two-value return only existed
because the
old proto had no `capabilities` field, so we couldn't tell "runner
doesn't
support it" from "we can't see the field." With v0.6.0 the field is
always
present. Since proto3 repeated fields have no presence, "no capabilities
sent"
now unambiguously means the runner does not advertise the capability, so
a
runner that omits `cancelling` is correctly recorded as
`HasCancellingSupport =
false`.

There is no regression: prior to this bump Gitea was on `v0.5.0`, where
the
type assertion always failed and `HasCancellingSupport` was therefore
never set
from requests — so no runner relied on the preserved-unknown path.

## Compatibility

The change is wire-compatible in both directions of version skew,
because the
new field uses a previously unused field number (8 on `RegisterRequest`,
3 on
`DeclareRequest`) and the transport uses the binary protobuf codec:

- **Old runner → new Gitea:** the runner omits the field; it decodes to
an empty
capability list. Registration/declaration succeed; the runner simply
doesn't
  get the cancelling feature.
- **New runner → old Gitea:** the runner sends the field; the old
server's
  generated code doesn't know the field number and silently ignores it.
  Registration/declaration succeed.

The feature only activates once both server and runner are on `v0.6.0`.
2026-06-11 09:18:31 +00:00

352 lines
11 KiB
Go

// Copyright 2022 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package runner
import (
"context"
"errors"
"net/http"
"slices"
runnerv1 "gitea.dev/actions-proto-go/runner/v1"
"gitea.dev/actions-proto-go/runner/v1/runnerv1connect"
actions_model "gitea.dev/models/actions"
repo_model "gitea.dev/models/repo"
user_model "gitea.dev/models/user"
"gitea.dev/modules/actions"
"gitea.dev/modules/log"
"gitea.dev/modules/util"
actions_service "gitea.dev/services/actions"
"connectrpc.com/connect"
gouuid "github.com/google/uuid"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)
func NewRunnerServiceHandler() (string, http.Handler) {
return runnerv1connect.NewRunnerServiceHandler(
&Service{},
connect.WithCompressMinBytes(1024),
withRunner,
)
}
var _ runnerv1connect.RunnerServiceClient = (*Service)(nil)
type Service struct{}
// Register for new runner.
func (s *Service) Register(
ctx context.Context,
req *connect.Request[runnerv1.RegisterRequest],
) (*connect.Response[runnerv1.RegisterResponse], error) {
if req.Msg.Token == "" || req.Msg.Name == "" {
return nil, errors.New("missing runner token, name")
}
runnerToken, err := actions_model.GetRunnerToken(ctx, req.Msg.Token)
if err != nil {
return nil, errors.New("runner registration token not found")
}
if !runnerToken.IsActive {
return nil, errors.New("runner registration token has been invalidated, please use the latest one")
}
if runnerToken.OwnerID > 0 {
if _, err := user_model.GetUserByID(ctx, runnerToken.OwnerID); err != nil {
return nil, errors.New("owner of the token not found")
}
}
if runnerToken.RepoID > 0 {
if _, err := repo_model.GetRepositoryByID(ctx, runnerToken.RepoID); err != nil {
return nil, errors.New("repository of the token not found")
}
}
labels := req.Msg.Labels
hasCancellingSupport := slices.Contains(req.Msg.GetCapabilities(), runnerCapabilityCancelling)
// create new runner
name := util.EllipsisDisplayString(req.Msg.Name, 255)
runner := &actions_model.ActionRunner{
UUID: gouuid.New().String(),
Name: name,
OwnerID: runnerToken.OwnerID,
RepoID: runnerToken.RepoID,
Version: req.Msg.Version,
AgentLabels: labels,
Ephemeral: req.Msg.Ephemeral,
HasCancellingSupport: hasCancellingSupport,
}
runner.GenerateAndFillToken()
// create new runner
if err := actions_model.CreateRunner(ctx, runner); err != nil {
return nil, errors.New("can't create new runner")
}
// update token status
runnerToken.IsActive = true
if err := actions_model.UpdateRunnerToken(ctx, runnerToken, "is_active"); err != nil {
return nil, errors.New("can't update runner token status")
}
res := connect.NewResponse(&runnerv1.RegisterResponse{
Runner: &runnerv1.Runner{
Id: runner.ID,
Uuid: runner.UUID,
Token: runner.Token,
Name: runner.Name,
Version: runner.Version,
Labels: runner.AgentLabels,
Ephemeral: runner.Ephemeral,
},
})
return res, nil
}
// runnerCapabilityCancelling is the wire string the runner advertises in its
// capabilities list to indicate it understands the transitional cancelling
// state and will run post-step cleanup before finalizing the task.
const runnerCapabilityCancelling = "cancelling"
type declareRequest interface {
proto.Message
GetVersion() string
GetLabels() []string
GetCapabilities() []string
}
func applyDeclareRequestToRunner(runner *actions_model.ActionRunner, req declareRequest) []string {
runner.AgentLabels = req.GetLabels()
runner.Version = req.GetVersion()
cols := []string{"agent_labels", "version"}
hasCancellingSupport := slices.Contains(req.GetCapabilities(), runnerCapabilityCancelling)
if runner.HasCancellingSupport != hasCancellingSupport {
runner.HasCancellingSupport = hasCancellingSupport
cols = append(cols, "has_cancelling_support")
}
return cols
}
func (s *Service) Declare(
ctx context.Context,
req *connect.Request[runnerv1.DeclareRequest],
) (*connect.Response[runnerv1.DeclareResponse], error) {
runner := GetRunner(ctx)
if err := actions_model.UpdateRunner(ctx, runner, applyDeclareRequestToRunner(runner, req.Msg)...); err != nil {
return nil, status.Errorf(codes.Internal, "update runner: %v", err)
}
resp := connect.NewResponse(&runnerv1.DeclareResponse{
Runner: &runnerv1.Runner{
Id: runner.ID,
Uuid: runner.UUID,
Token: runner.Token,
Name: runner.Name,
Version: runner.Version,
Labels: runner.AgentLabels,
},
})
// Capabilities are communicated via headers to avoid a hard dependency on a proto bump.
// Older runners ignore unknown headers; newer runners can use this for feature negotiation.
resp.Header().Set("X-Gitea-Actions-Capabilities", actions_model.RunnerCapabilities())
return resp, nil
}
// FetchTask assigns a task to the runner
func (s *Service) FetchTask(
ctx context.Context,
req *connect.Request[runnerv1.FetchTaskRequest],
) (*connect.Response[runnerv1.FetchTaskResponse], error) {
runner := GetRunner(ctx)
var task *runnerv1.Task
tasksVersion := req.Msg.TasksVersion // task version from runner
latestVersion, err := actions_model.GetTasksVersionByScope(ctx, runner.OwnerID, runner.RepoID)
if err != nil {
return nil, status.Errorf(codes.Internal, "query tasks version failed: %v", err)
} else if latestVersion == 0 {
if err := actions_model.IncreaseTaskVersion(ctx, runner.OwnerID, runner.RepoID); err != nil {
return nil, status.Errorf(codes.Internal, "fail to increase task version: %v", err)
}
// if we don't increase the value of `latestVersion` here,
// the response of FetchTask will return tasksVersion as zero.
// and the runner will treat it as an old version of Gitea.
latestVersion++
}
if tasksVersion != latestVersion {
// Re-load runner from DB so task assignment uses current IsDisabled state
// (avoids race where disable commits while this request still has stale runner).
freshRunner, err := actions_model.GetRunnerByUUID(ctx, runner.UUID)
if err != nil {
return nil, status.Errorf(codes.Internal, "get runner: %v", err)
}
// if the task version in request is not equal to the version in db,
// it means there may still be some tasks that haven't been assigned.
// try to pick a task for the runner that send the request.
if t, ok, err := actions_service.PickTask(ctx, freshRunner); err != nil {
log.Error("pick task failed: %v", err)
return nil, status.Errorf(codes.Internal, "pick task: %v", err)
} else if ok {
task = t
}
}
res := connect.NewResponse(&runnerv1.FetchTaskResponse{
Task: task,
TasksVersion: latestVersion,
})
return res, nil
}
// UpdateTask updates the task status.
func (s *Service) UpdateTask(
ctx context.Context,
req *connect.Request[runnerv1.UpdateTaskRequest],
) (*connect.Response[runnerv1.UpdateTaskResponse], error) {
runner := GetRunner(ctx)
task, err := actions_model.UpdateTaskByState(ctx, runner.ID, req.Msg.State)
if err != nil {
return nil, status.Errorf(codes.Internal, "update task: %v", err)
}
for k, v := range req.Msg.Outputs {
if len(k) > 255 {
log.Warn("Ignore the output of task %d because the key is too long: %q", task.ID, k)
continue
}
// The value can be a maximum of 1 MB
if l := len(v); l > 1024*1024 {
log.Warn("Ignore the output %q of task %d because the value is too long: %v", k, task.ID, l)
continue
}
// There's another limitation on GitHub that the total of all outputs in a workflow run can be a maximum of 50 MB.
// We don't check the total size here because it's not easy to do, and it doesn't really worth it.
// See https://docs.github.com/en/actions/using-jobs/defining-outputs-for-jobs
if err := actions_model.InsertTaskOutputIfNotExist(ctx, task.ID, k, v); err != nil {
log.Warn("Failed to insert the output %q of task %d: %v", k, task.ID, err)
// It's ok not to return errors, the runner will resend the outputs.
}
}
sentOutputs, err := actions_model.FindTaskOutputKeyByTaskID(ctx, task.ID)
if err != nil {
log.Warn("Failed to find the sent outputs of task %d: %v", task.ID, err)
// It's not to return errors, it can be handled when the runner resends sent outputs.
}
if err := task.LoadJob(ctx); err != nil {
return nil, status.Errorf(codes.Internal, "load job: %v", err)
}
if err := task.Job.LoadAttributes(ctx); err != nil {
return nil, status.Errorf(codes.Internal, "load run: %v", err)
}
actions_service.CreateCommitStatusForRunJobs(ctx, task.Job.Run, task.Job)
if task.Status.IsDone() {
actions_service.NotifyWorkflowJobStatusUpdateWithTask(ctx, task.Job, task)
}
if req.Msg.State.Result != runnerv1.Result_RESULT_UNSPECIFIED {
if err := actions_service.EmitJobsIfReadyByRun(task.Job.RunID); err != nil {
log.Error("Emit ready jobs of run %d: %v", task.Job.RunID, err)
}
if task.Job.Run.Status.IsDone() {
actions_service.NotifyWorkflowRunStatusUpdateWithReload(ctx, task.Job.RepoID, task.Job.RunID)
}
}
return connect.NewResponse(&runnerv1.UpdateTaskResponse{
State: &runnerv1.TaskState{
Id: req.Msg.State.Id,
Result: task.Status.AsResult(),
},
SentOutputs: sentOutputs,
}), nil
}
// UpdateLog uploads log of the task.
func (s *Service) UpdateLog(
ctx context.Context,
req *connect.Request[runnerv1.UpdateLogRequest],
) (*connect.Response[runnerv1.UpdateLogResponse], error) {
runner := GetRunner(ctx)
res := connect.NewResponse(&runnerv1.UpdateLogResponse{})
task, err := actions_model.GetTaskByID(ctx, req.Msg.TaskId)
if err != nil {
return nil, status.Errorf(codes.Internal, "get task: %v", err)
} else if runner.ID != task.RunnerID {
return nil, status.Errorf(codes.Internal, "invalid runner for task")
}
ack := task.LogLength
// Trim rows the runner already had acked.
var rows []*runnerv1.LogRow
if req.Msg.Index <= ack && int64(len(req.Msg.Rows))+req.Msg.Index > ack {
rows = req.Msg.Rows[ack-req.Msg.Index:]
}
// Ack a re-sent finalize idempotently. Appending new rows past the seal errors.
if task.LogInStorage {
if len(rows) > 0 {
return nil, status.Errorf(codes.AlreadyExists, "log file has been archived")
}
res.Msg.AckIndex = ack
return res, nil
}
// Bail unless we have new rows or a NoMore to finalize. Even with
// NoMore, bail when the runner has outrun the server — archiving a
// log with a gap is worse than asking it to retry.
if len(rows) == 0 && (!req.Msg.NoMore || req.Msg.Index > ack) {
res.Msg.AckIndex = ack
return res, nil
}
// WriteLogs is called even with no rows: with offset==0 it bootstraps
// an empty DBFS file so TransferLogs below has something to read when
// the runner finalizes a task that produced no log output.
ns, err := actions.WriteLogs(ctx, task.LogFilename, task.LogSize, rows)
if err != nil {
return nil, status.Errorf(codes.Internal, "unable to append logs to dbfs file: %v", err)
}
task.LogLength += int64(len(rows))
for _, n := range ns {
task.LogIndexes = append(task.LogIndexes, task.LogSize)
task.LogSize += int64(n)
}
res.Msg.AckIndex = task.LogLength
var remove func()
if req.Msg.NoMore {
task.LogInStorage = true
remove, err = actions.TransferLogs(ctx, task.LogFilename)
if err != nil {
return nil, status.Errorf(codes.Internal, "transfer logs: %v", err)
}
}
if err := actions_model.UpdateTask(ctx, task, "log_indexes", "log_length", "log_size", "log_in_storage"); err != nil {
return nil, status.Errorf(codes.Internal, "update task: %v", err)
}
if remove != nil {
remove()
}
return res, nil
}