mirror of
https://github.com/go-gitea/gitea.git
synced 2026-06-17 20:27:10 +02:00
Merge 0e14de67992bda873f67a46de61fa05d83e5ec87 into c68925152b1b6c8f92806cdbda9c4672dcc1608f
This commit is contained in:
commit
b9dd9ff828
@ -66,6 +66,12 @@ func Init(ctx context.Context) error {
|
||||
}
|
||||
go graceful.GetManager().RunWithCancel(jobEmitterQueue)
|
||||
|
||||
actionsNotifyQueue = queue.CreateSimpleQueue(graceful.GetManager().ShutdownContext(), "actions_notify", actionsNotifyQueueHandler)
|
||||
if actionsNotifyQueue == nil {
|
||||
return errors.New("unable to create actions_notify queue")
|
||||
}
|
||||
go graceful.GetManager().RunWithCancel(actionsNotifyQueue)
|
||||
|
||||
notify_service.RegisterNotifier(NewNotifier())
|
||||
return initGlobalRunnerToken(ctx)
|
||||
}
|
||||
|
||||
@ -108,10 +108,10 @@ func (input *notifyInput) WithPullRequest(pr *issues_model.PullRequest) *notifyI
|
||||
}
|
||||
|
||||
func (input *notifyInput) Notify(ctx context.Context) {
|
||||
log.Trace("execute %v for event %v whose doer is %v", getMethod(ctx), input.Event, input.Doer.Name)
|
||||
|
||||
if err := notify(ctx, input); err != nil {
|
||||
log.Error("an error occurred while executing the %s actions method: %v", getMethod(ctx), err)
|
||||
if err := input.enqueue(ctx); err != nil {
|
||||
log.Error("Unable to queue the %s actions method, falling back to synchronous execution: %v", getMethod(ctx), err)
|
||||
} else {
|
||||
log.Trace("queue %v for event %v whose doer is %v", getMethod(ctx), input.Event, input.Doer.Name)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
214
services/actions/notifier_queue.go
Normal file
214
services/actions/notifier_queue.go
Normal file
@ -0,0 +1,214 @@
|
||||
// Copyright 2026 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package actions
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
issues_model "gitea.dev/models/issues"
|
||||
repo_model "gitea.dev/models/repo"
|
||||
user_model "gitea.dev/models/user"
|
||||
"gitea.dev/modules/git"
|
||||
"gitea.dev/modules/graceful"
|
||||
"gitea.dev/modules/json"
|
||||
"gitea.dev/modules/log"
|
||||
"gitea.dev/modules/queue"
|
||||
api "gitea.dev/modules/structs"
|
||||
webhook_module "gitea.dev/modules/webhook"
|
||||
)
|
||||
|
||||
var actionsNotifyQueue *queue.WorkerPoolQueue[*actionsNotifyQueueItem]
|
||||
|
||||
type actionsNotifyQueueItem struct {
|
||||
Method string
|
||||
RepoID int64
|
||||
DoerID int64
|
||||
Event webhook_module.HookEventType
|
||||
Ref string
|
||||
PullRequestID int64
|
||||
PayloadType string
|
||||
PayloadJSON []byte
|
||||
}
|
||||
|
||||
func actionsNotifyQueueHandler(items ...*actionsNotifyQueueItem) []*actionsNotifyQueueItem {
|
||||
for _, item := range items {
|
||||
ctx := withMethod(graceful.GetManager().ShutdownContext(), item.Method)
|
||||
input, err := item.toNotifyInput(ctx)
|
||||
if err != nil {
|
||||
log.Error("Unable to restore actions notification %q for repo %d: %v", item.Method, item.RepoID, err)
|
||||
continue
|
||||
}
|
||||
if err := notify(ctx, input); err != nil {
|
||||
log.Error("an error occurred while executing the %s actions method: %v", item.Method, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (input *notifyInput) enqueue(ctx context.Context) error {
|
||||
item, err := input.toQueueItem(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return actionsNotifyQueue.Push(item)
|
||||
}
|
||||
|
||||
func (input *notifyInput) toQueueItem(ctx context.Context) (*actionsNotifyQueueItem, error) {
|
||||
payloadType, payloadJSON, err := marshalActionsPayload(input.Payload)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
item := &actionsNotifyQueueItem{
|
||||
Method: getMethod(ctx),
|
||||
RepoID: input.Repo.ID,
|
||||
DoerID: input.Doer.ID,
|
||||
Event: input.Event,
|
||||
Ref: input.Ref.String(),
|
||||
PayloadType: payloadType,
|
||||
PayloadJSON: payloadJSON,
|
||||
}
|
||||
if input.PullRequest != nil {
|
||||
item.PullRequestID = input.PullRequest.ID
|
||||
}
|
||||
return item, nil
|
||||
}
|
||||
|
||||
func (item *actionsNotifyQueueItem) toNotifyInput(ctx context.Context) (*notifyInput, error) {
|
||||
repo, err := repo_model.GetRepositoryByID(ctx, item.RepoID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
doer, err := loadActionsNotifyDoer(ctx, item.DoerID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
payload, err := unmarshalActionsPayload(item.PayloadType, item.PayloadJSON)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
input := ¬ifyInput{
|
||||
Repo: repo,
|
||||
Doer: doer,
|
||||
Event: item.Event,
|
||||
Ref: git.RefName(item.Ref),
|
||||
Payload: payload,
|
||||
}
|
||||
if item.PullRequestID > 0 {
|
||||
pr, err := issues_model.GetPullRequestByID(ctx, item.PullRequestID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
input.PullRequest = pr
|
||||
if input.Ref == "" {
|
||||
input.Ref = git.RefName(pr.GetGitHeadRefName())
|
||||
}
|
||||
}
|
||||
return input, nil
|
||||
}
|
||||
|
||||
func loadActionsNotifyDoer(ctx context.Context, doerID int64) (*user_model.User, error) {
|
||||
if doerID == user_model.ActionsUserID {
|
||||
return user_model.NewActionsUser(), nil
|
||||
}
|
||||
return user_model.GetUserByID(ctx, doerID)
|
||||
}
|
||||
|
||||
const (
|
||||
actionsPayloadTypeIssueComment = "issue_comment"
|
||||
actionsPayloadTypeIssue = "issue"
|
||||
actionsPayloadTypePullRequest = "pull_request"
|
||||
actionsPayloadTypeRepository = "repository"
|
||||
actionsPayloadTypeFork = "fork"
|
||||
actionsPayloadTypePush = "push"
|
||||
actionsPayloadTypeCreate = "create"
|
||||
actionsPayloadTypeDelete = "delete"
|
||||
actionsPayloadTypeWiki = "wiki"
|
||||
actionsPayloadTypeWorkflowRun = "workflow_run"
|
||||
actionsPayloadTypeRelease = "release"
|
||||
actionsPayloadTypePackage = "package"
|
||||
)
|
||||
|
||||
func marshalActionsPayload(payload api.Payloader) (string, []byte, error) {
|
||||
if payload == nil {
|
||||
return "", nil, nil
|
||||
}
|
||||
|
||||
var payloadType string
|
||||
switch payload.(type) {
|
||||
case *api.IssueCommentPayload:
|
||||
payloadType = actionsPayloadTypeIssueComment
|
||||
case *api.IssuePayload:
|
||||
payloadType = actionsPayloadTypeIssue
|
||||
case *api.PullRequestPayload:
|
||||
payloadType = actionsPayloadTypePullRequest
|
||||
case *api.RepositoryPayload:
|
||||
payloadType = actionsPayloadTypeRepository
|
||||
case *api.ForkPayload:
|
||||
payloadType = actionsPayloadTypeFork
|
||||
case *api.PushPayload:
|
||||
payloadType = actionsPayloadTypePush
|
||||
case *api.CreatePayload:
|
||||
payloadType = actionsPayloadTypeCreate
|
||||
case *api.DeletePayload:
|
||||
payloadType = actionsPayloadTypeDelete
|
||||
case *api.WikiPayload:
|
||||
payloadType = actionsPayloadTypeWiki
|
||||
case *api.WorkflowRunPayload:
|
||||
payloadType = actionsPayloadTypeWorkflowRun
|
||||
case *api.ReleasePayload:
|
||||
payloadType = actionsPayloadTypeRelease
|
||||
case *api.PackagePayload:
|
||||
payloadType = actionsPayloadTypePackage
|
||||
default:
|
||||
return "", nil, fmt.Errorf("unsupported actions payload type %T", payload)
|
||||
}
|
||||
|
||||
payloadJSON, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
return payloadType, payloadJSON, nil
|
||||
}
|
||||
|
||||
func unmarshalActionsPayload(payloadType string, payloadJSON []byte) (api.Payloader, error) {
|
||||
if payloadType == "" {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var payload api.Payloader
|
||||
switch payloadType {
|
||||
case actionsPayloadTypeIssueComment:
|
||||
payload = new(api.IssueCommentPayload)
|
||||
case actionsPayloadTypeIssue:
|
||||
payload = new(api.IssuePayload)
|
||||
case actionsPayloadTypePullRequest:
|
||||
payload = new(api.PullRequestPayload)
|
||||
case actionsPayloadTypeRepository:
|
||||
payload = new(api.RepositoryPayload)
|
||||
case actionsPayloadTypeFork:
|
||||
payload = new(api.ForkPayload)
|
||||
case actionsPayloadTypePush:
|
||||
payload = new(api.PushPayload)
|
||||
case actionsPayloadTypeCreate:
|
||||
payload = new(api.CreatePayload)
|
||||
case actionsPayloadTypeDelete:
|
||||
payload = new(api.DeletePayload)
|
||||
case actionsPayloadTypeWiki:
|
||||
payload = new(api.WikiPayload)
|
||||
case actionsPayloadTypeWorkflowRun:
|
||||
payload = new(api.WorkflowRunPayload)
|
||||
case actionsPayloadTypeRelease:
|
||||
payload = new(api.ReleasePayload)
|
||||
case actionsPayloadTypePackage:
|
||||
payload = new(api.PackagePayload)
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported actions payload type %q", payloadType)
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(payloadJSON, payload); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return payload, nil
|
||||
}
|
||||
47
services/actions/notifier_queue_test.go
Normal file
47
services/actions/notifier_queue_test.go
Normal file
@ -0,0 +1,47 @@
|
||||
// Copyright 2026 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package actions
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
api "gitea.dev/modules/structs"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestActionsPayloadRoundTrip(t *testing.T) {
|
||||
payload := &api.PullRequestPayload{
|
||||
Action: api.HookIssueOpened,
|
||||
Index: 42,
|
||||
PullRequest: &api.PullRequest{
|
||||
HTMLURL: "https://example.com/pr/42",
|
||||
},
|
||||
}
|
||||
|
||||
payloadType, payloadJSON, err := marshalActionsPayload(payload)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, actionsPayloadTypePullRequest, payloadType)
|
||||
|
||||
decoded, err := unmarshalActionsPayload(payloadType, payloadJSON)
|
||||
require.NoError(t, err)
|
||||
|
||||
decodedPayload, ok := decoded.(*api.PullRequestPayload)
|
||||
require.True(t, ok)
|
||||
assert.Equal(t, payload.Action, decodedPayload.Action)
|
||||
assert.Equal(t, payload.Index, decodedPayload.Index)
|
||||
assert.Equal(t, payload.PullRequest.HTMLURL, decodedPayload.PullRequest.HTMLURL)
|
||||
}
|
||||
|
||||
func TestActionsPayloadRoundTripNil(t *testing.T) {
|
||||
payloadType, payloadJSON, err := marshalActionsPayload(nil)
|
||||
require.NoError(t, err)
|
||||
assert.Empty(t, payloadType)
|
||||
assert.Nil(t, payloadJSON)
|
||||
|
||||
decoded, err := unmarshalActionsPayload(payloadType, payloadJSON)
|
||||
require.NoError(t, err)
|
||||
assert.Nil(t, decoded)
|
||||
}
|
||||
@ -380,7 +380,13 @@ func DismissApprovalReviews(ctx context.Context, doer *user_model.User, pull *is
|
||||
return err
|
||||
}
|
||||
|
||||
return db.WithTx(ctx, func(ctx context.Context) error {
|
||||
type dismissNotification struct {
|
||||
review *issues_model.Review
|
||||
comment *issues_model.Comment
|
||||
}
|
||||
notifications := make([]dismissNotification, 0, len(reviews))
|
||||
|
||||
if err := db.WithTx(ctx, func(ctx context.Context) error {
|
||||
for _, review := range reviews {
|
||||
if err := issues_model.DismissReview(ctx, review, true); err != nil {
|
||||
return err
|
||||
@ -401,11 +407,17 @@ func DismissApprovalReviews(ctx context.Context, doer *user_model.User, pull *is
|
||||
comment.Review = review
|
||||
comment.Poster = doer
|
||||
comment.Issue = review.Issue
|
||||
|
||||
notify_service.PullReviewDismiss(ctx, doer, review, comment)
|
||||
notifications = append(notifications, dismissNotification{review: review, comment: comment})
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, notification := range notifications {
|
||||
notify_service.PullReviewDismiss(ctx, doer, notification.review, notification.comment)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// DismissReview dismissing stale review by repo admin
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user