From 0e14de67992bda873f67a46de61fa05d83e5ec87 Mon Sep 17 00:00:00 2001 From: Lunny Xiao Date: Fri, 12 Jun 2026 18:14:28 -0700 Subject: [PATCH] perf(pull): reduce pull request creation latency --- services/actions/init.go | 6 + services/actions/notifier_helper.go | 8 +- services/actions/notifier_queue.go | 214 ++++++++++++++++++++++++ services/actions/notifier_queue_test.go | 47 ++++++ services/pull/review.go | 20 ++- 5 files changed, 287 insertions(+), 8 deletions(-) create mode 100644 services/actions/notifier_queue.go create mode 100644 services/actions/notifier_queue_test.go diff --git a/services/actions/init.go b/services/actions/init.go index 9261c48f29..81546aaf75 100644 --- a/services/actions/init.go +++ b/services/actions/init.go @@ -66,6 +66,12 @@ func Init(ctx context.Context) error { } go graceful.GetManager().RunWithCancel(jobEmitterQueue) + actionsNotifyQueue = queue.CreateSimpleQueue(graceful.GetManager().ShutdownContext(), "actions_notify", actionsNotifyQueueHandler) + if actionsNotifyQueue == nil { + return errors.New("unable to create actions_notify queue") + } + go graceful.GetManager().RunWithCancel(actionsNotifyQueue) + notify_service.RegisterNotifier(NewNotifier()) return initGlobalRunnerToken(ctx) } diff --git a/services/actions/notifier_helper.go b/services/actions/notifier_helper.go index e67d1028f7..93a1143fa1 100644 --- a/services/actions/notifier_helper.go +++ b/services/actions/notifier_helper.go @@ -108,10 +108,10 @@ func (input *notifyInput) WithPullRequest(pr *issues_model.PullRequest) *notifyI } func (input *notifyInput) Notify(ctx context.Context) { - log.Trace("execute %v for event %v whose doer is %v", getMethod(ctx), input.Event, input.Doer.Name) - - if err := notify(ctx, input); err != nil { - log.Error("an error occurred while executing the %s actions method: %v", getMethod(ctx), err) + if err := input.enqueue(ctx); err != nil { + log.Error("Unable to queue the %s actions method, falling back to synchronous execution: %v", getMethod(ctx), err) + } else { + log.Trace("queue %v for event %v whose doer is %v", getMethod(ctx), input.Event, input.Doer.Name) } } diff --git a/services/actions/notifier_queue.go b/services/actions/notifier_queue.go new file mode 100644 index 0000000000..2f6266fec4 --- /dev/null +++ b/services/actions/notifier_queue.go @@ -0,0 +1,214 @@ +// Copyright 2026 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + "fmt" + + issues_model "gitea.dev/models/issues" + repo_model "gitea.dev/models/repo" + user_model "gitea.dev/models/user" + "gitea.dev/modules/git" + "gitea.dev/modules/graceful" + "gitea.dev/modules/json" + "gitea.dev/modules/log" + "gitea.dev/modules/queue" + api "gitea.dev/modules/structs" + webhook_module "gitea.dev/modules/webhook" +) + +var actionsNotifyQueue *queue.WorkerPoolQueue[*actionsNotifyQueueItem] + +type actionsNotifyQueueItem struct { + Method string + RepoID int64 + DoerID int64 + Event webhook_module.HookEventType + Ref string + PullRequestID int64 + PayloadType string + PayloadJSON []byte +} + +func actionsNotifyQueueHandler(items ...*actionsNotifyQueueItem) []*actionsNotifyQueueItem { + for _, item := range items { + ctx := withMethod(graceful.GetManager().ShutdownContext(), item.Method) + input, err := item.toNotifyInput(ctx) + if err != nil { + log.Error("Unable to restore actions notification %q for repo %d: %v", item.Method, item.RepoID, err) + continue + } + if err := notify(ctx, input); err != nil { + log.Error("an error occurred while executing the %s actions method: %v", item.Method, err) + } + } + return nil +} + +func (input *notifyInput) enqueue(ctx context.Context) error { + item, err := input.toQueueItem(ctx) + if err != nil { + return err + } + return actionsNotifyQueue.Push(item) +} + +func (input *notifyInput) toQueueItem(ctx context.Context) (*actionsNotifyQueueItem, error) { + payloadType, payloadJSON, err := marshalActionsPayload(input.Payload) + if err != nil { + return nil, err + } + item := &actionsNotifyQueueItem{ + Method: getMethod(ctx), + RepoID: input.Repo.ID, + DoerID: input.Doer.ID, + Event: input.Event, + Ref: input.Ref.String(), + PayloadType: payloadType, + PayloadJSON: payloadJSON, + } + if input.PullRequest != nil { + item.PullRequestID = input.PullRequest.ID + } + return item, nil +} + +func (item *actionsNotifyQueueItem) toNotifyInput(ctx context.Context) (*notifyInput, error) { + repo, err := repo_model.GetRepositoryByID(ctx, item.RepoID) + if err != nil { + return nil, err + } + doer, err := loadActionsNotifyDoer(ctx, item.DoerID) + if err != nil { + return nil, err + } + payload, err := unmarshalActionsPayload(item.PayloadType, item.PayloadJSON) + if err != nil { + return nil, err + } + input := ¬ifyInput{ + Repo: repo, + Doer: doer, + Event: item.Event, + Ref: git.RefName(item.Ref), + Payload: payload, + } + if item.PullRequestID > 0 { + pr, err := issues_model.GetPullRequestByID(ctx, item.PullRequestID) + if err != nil { + return nil, err + } + input.PullRequest = pr + if input.Ref == "" { + input.Ref = git.RefName(pr.GetGitHeadRefName()) + } + } + return input, nil +} + +func loadActionsNotifyDoer(ctx context.Context, doerID int64) (*user_model.User, error) { + if doerID == user_model.ActionsUserID { + return user_model.NewActionsUser(), nil + } + return user_model.GetUserByID(ctx, doerID) +} + +const ( + actionsPayloadTypeIssueComment = "issue_comment" + actionsPayloadTypeIssue = "issue" + actionsPayloadTypePullRequest = "pull_request" + actionsPayloadTypeRepository = "repository" + actionsPayloadTypeFork = "fork" + actionsPayloadTypePush = "push" + actionsPayloadTypeCreate = "create" + actionsPayloadTypeDelete = "delete" + actionsPayloadTypeWiki = "wiki" + actionsPayloadTypeWorkflowRun = "workflow_run" + actionsPayloadTypeRelease = "release" + actionsPayloadTypePackage = "package" +) + +func marshalActionsPayload(payload api.Payloader) (string, []byte, error) { + if payload == nil { + return "", nil, nil + } + + var payloadType string + switch payload.(type) { + case *api.IssueCommentPayload: + payloadType = actionsPayloadTypeIssueComment + case *api.IssuePayload: + payloadType = actionsPayloadTypeIssue + case *api.PullRequestPayload: + payloadType = actionsPayloadTypePullRequest + case *api.RepositoryPayload: + payloadType = actionsPayloadTypeRepository + case *api.ForkPayload: + payloadType = actionsPayloadTypeFork + case *api.PushPayload: + payloadType = actionsPayloadTypePush + case *api.CreatePayload: + payloadType = actionsPayloadTypeCreate + case *api.DeletePayload: + payloadType = actionsPayloadTypeDelete + case *api.WikiPayload: + payloadType = actionsPayloadTypeWiki + case *api.WorkflowRunPayload: + payloadType = actionsPayloadTypeWorkflowRun + case *api.ReleasePayload: + payloadType = actionsPayloadTypeRelease + case *api.PackagePayload: + payloadType = actionsPayloadTypePackage + default: + return "", nil, fmt.Errorf("unsupported actions payload type %T", payload) + } + + payloadJSON, err := json.Marshal(payload) + if err != nil { + return "", nil, err + } + return payloadType, payloadJSON, nil +} + +func unmarshalActionsPayload(payloadType string, payloadJSON []byte) (api.Payloader, error) { + if payloadType == "" { + return nil, nil + } + + var payload api.Payloader + switch payloadType { + case actionsPayloadTypeIssueComment: + payload = new(api.IssueCommentPayload) + case actionsPayloadTypeIssue: + payload = new(api.IssuePayload) + case actionsPayloadTypePullRequest: + payload = new(api.PullRequestPayload) + case actionsPayloadTypeRepository: + payload = new(api.RepositoryPayload) + case actionsPayloadTypeFork: + payload = new(api.ForkPayload) + case actionsPayloadTypePush: + payload = new(api.PushPayload) + case actionsPayloadTypeCreate: + payload = new(api.CreatePayload) + case actionsPayloadTypeDelete: + payload = new(api.DeletePayload) + case actionsPayloadTypeWiki: + payload = new(api.WikiPayload) + case actionsPayloadTypeWorkflowRun: + payload = new(api.WorkflowRunPayload) + case actionsPayloadTypeRelease: + payload = new(api.ReleasePayload) + case actionsPayloadTypePackage: + payload = new(api.PackagePayload) + default: + return nil, fmt.Errorf("unsupported actions payload type %q", payloadType) + } + + if err := json.Unmarshal(payloadJSON, payload); err != nil { + return nil, err + } + return payload, nil +} diff --git a/services/actions/notifier_queue_test.go b/services/actions/notifier_queue_test.go new file mode 100644 index 0000000000..9ab6ec3348 --- /dev/null +++ b/services/actions/notifier_queue_test.go @@ -0,0 +1,47 @@ +// Copyright 2026 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "testing" + + api "gitea.dev/modules/structs" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestActionsPayloadRoundTrip(t *testing.T) { + payload := &api.PullRequestPayload{ + Action: api.HookIssueOpened, + Index: 42, + PullRequest: &api.PullRequest{ + HTMLURL: "https://example.com/pr/42", + }, + } + + payloadType, payloadJSON, err := marshalActionsPayload(payload) + require.NoError(t, err) + assert.Equal(t, actionsPayloadTypePullRequest, payloadType) + + decoded, err := unmarshalActionsPayload(payloadType, payloadJSON) + require.NoError(t, err) + + decodedPayload, ok := decoded.(*api.PullRequestPayload) + require.True(t, ok) + assert.Equal(t, payload.Action, decodedPayload.Action) + assert.Equal(t, payload.Index, decodedPayload.Index) + assert.Equal(t, payload.PullRequest.HTMLURL, decodedPayload.PullRequest.HTMLURL) +} + +func TestActionsPayloadRoundTripNil(t *testing.T) { + payloadType, payloadJSON, err := marshalActionsPayload(nil) + require.NoError(t, err) + assert.Empty(t, payloadType) + assert.Nil(t, payloadJSON) + + decoded, err := unmarshalActionsPayload(payloadType, payloadJSON) + require.NoError(t, err) + assert.Nil(t, decoded) +} diff --git a/services/pull/review.go b/services/pull/review.go index 18abdb04cd..07b79d0175 100644 --- a/services/pull/review.go +++ b/services/pull/review.go @@ -380,7 +380,13 @@ func DismissApprovalReviews(ctx context.Context, doer *user_model.User, pull *is return err } - return db.WithTx(ctx, func(ctx context.Context) error { + type dismissNotification struct { + review *issues_model.Review + comment *issues_model.Comment + } + notifications := make([]dismissNotification, 0, len(reviews)) + + if err := db.WithTx(ctx, func(ctx context.Context) error { for _, review := range reviews { if err := issues_model.DismissReview(ctx, review, true); err != nil { return err @@ -401,11 +407,17 @@ func DismissApprovalReviews(ctx context.Context, doer *user_model.User, pull *is comment.Review = review comment.Poster = doer comment.Issue = review.Issue - - notify_service.PullReviewDismiss(ctx, doer, review, comment) + notifications = append(notifications, dismissNotification{review: review, comment: comment}) } return nil - }) + }); err != nil { + return err + } + + for _, notification := range notifications { + notify_service.PullReviewDismiss(ctx, doer, notification.review, notification.comment) + } + return nil } // DismissReview dismissing stale review by repo admin