From ab77a46f62669f235bbb066384ef59d2bb5a7c6c Mon Sep 17 00:00:00 2001 From: Mohit25022005 Date: Thu, 7 May 2026 13:28:19 +0000 Subject: [PATCH 1/3] fix: prevent num_members drift and deadlock in LDAP team sync Signed-off-by: Mohit25022005 --- services/auth/source/source_group_sync.go | 32 ++++++---- services/org/team.go | 75 ++++++++++++++++++----- services/org/team_test.go | 42 +++++++++++++ 3 files changed, 120 insertions(+), 29 deletions(-) diff --git a/services/auth/source/source_group_sync.go b/services/auth/source/source_group_sync.go index 9cb7d4165c..c28444b9f6 100644 --- a/services/auth/source/source_group_sync.go +++ b/services/auth/source/source_group_sync.go @@ -10,6 +10,7 @@ import ( "code.gitea.io/gitea/models/organization" user_model "code.gitea.io/gitea/models/user" "code.gitea.io/gitea/modules/container" + "code.gitea.io/gitea/modules/globallock" "code.gitea.io/gitea/modules/log" org_service "code.gitea.io/gitea/services/org" ) @@ -94,21 +95,26 @@ func syncGroupsToTeamsCached(ctx context.Context, user *user_model.User, orgTeam teamCache[orgName+teamName] = team } - isMember, err := organization.IsTeamMember(ctx, org.ID, team.ID, user.ID) - if err != nil { - return err - } + if err := globallock.LockAndDo(ctx, fmt.Sprintf("group_sync_team_%d", team.ID), func(ctx context.Context) error { + isMember, err := organization.IsTeamMember(ctx, org.ID, team.ID, user.ID) + if err != nil { + return err + } - if action == syncAdd && !isMember { - if err := org_service.AddTeamMember(ctx, team, user); err != nil { - log.Error("group sync: Could not add user to team: %v", err) - return err - } - } else if action == syncRemove && isMember { - if err := org_service.RemoveTeamMember(ctx, team, user); err != nil { - log.Error("group sync: Could not remove user from team: %v", err) - return err + if action == syncAdd && !isMember { + if err := org_service.AddTeamMember(ctx, team, user); err != nil { + log.Error("group sync: Could not add user to team: %v", err) + return err + } + } else if action == syncRemove && isMember { + if err := org_service.RemoveTeamMember(ctx, team, user); err != nil { + log.Error("group sync: Could not remove user from team: %v", err) + return err + } } + return nil + }); err != nil { + return err } } } diff --git a/services/org/team.go b/services/org/team.go index 6c92ee4f44..c595f17e1f 100644 --- a/services/org/team.go +++ b/services/org/team.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "strings" + "time" "code.gitea.io/gitea/models/db" git_model "code.gitea.io/gitea/models/git" @@ -224,7 +225,7 @@ func AddTeamMember(ctx context.Context, team *organization.Team, user *user_mode return err } - err = db.WithTx(ctx, func(ctx context.Context) error { + err = withTeamMembershipTxRetry(ctx, func(ctx context.Context) error { // check in transaction isAlreadyMember, err = organization.IsTeamMember(ctx, team.OrgID, team.ID, user.ID) if err != nil || isAlreadyMember { @@ -233,13 +234,20 @@ func AddTeamMember(ctx context.Context, team *organization.Team, user *user_mode sess := db.GetEngine(ctx) - if err := db.Insert(ctx, &organization.TeamUser{ - UID: user.ID, - OrgID: team.OrgID, - TeamID: team.ID, - }); err != nil { + res, err := sess.Exec("INSERT INTO team_user (org_id, team_id, uid) SELECT ?, ?, ? WHERE NOT EXISTS (SELECT 1 FROM team_user WHERE org_id=? AND team_id=? AND uid=?)", + team.OrgID, team.ID, user.ID, team.OrgID, team.ID, user.ID) + if err != nil { return err - } else if _, err := sess.Incr("num_members").ID(team.ID).Update(new(organization.Team)); err != nil { + } + rowsAffected, err := res.RowsAffected() + if err != nil { + return err + } + if rowsAffected == 0 { + return nil + } + + if _, err := sess.Incr("num_members").ID(team.ID).Update(new(organization.Team)); err != nil { return err } @@ -285,8 +293,6 @@ func removeTeamMember(ctx context.Context, team *organization.Team, user *user_m return organization.ErrLastOrgOwner{UID: user.ID} } - team.NumMembers-- - repos, err := repo_model.GetTeamRepositories(ctx, &repo_model.SearchTeamRepoOptions{ TeamID: team.ID, }) @@ -294,18 +300,24 @@ func removeTeamMember(ctx context.Context, team *organization.Team, user *user_m return err } - if _, err := e.Delete(&organization.TeamUser{ + rowsAffected, err := e.Delete(&organization.TeamUser{ UID: user.ID, OrgID: team.OrgID, TeamID: team.ID, - }); err != nil { + }) + if err != nil { return err - } else if _, err = e. - ID(team.ID). - Cols("num_members"). - Update(team); err != nil { + } + if rowsAffected == 0 { + return nil + } + + if _, err = e.Decr("num_members").ID(team.ID).Update(new(organization.Team)); err != nil { return err } + if team.NumMembers > 0 { + team.NumMembers-- + } // Delete access to team repositories. If any user or repo is missing, we can continue. for _, repo := range repos { @@ -347,7 +359,38 @@ func removeInvalidOrgUser(ctx context.Context, orgID int64, user *user_model.Use // RemoveTeamMember removes member from given team of given organization. func RemoveTeamMember(ctx context.Context, team *organization.Team, user *user_model.User) error { - return db.WithTx(ctx, func(ctx context.Context) error { + return withTeamMembershipTxRetry(ctx, func(ctx context.Context) error { return removeTeamMember(ctx, team, user) }) } + +func withTeamMembershipTxRetry(parentCtx context.Context, f func(ctx context.Context) error) error { + const maxAttempts = 3 + var err error + for i := 0; i < maxAttempts; i++ { + err = db.WithTx(parentCtx, f) + if err == nil || !isRetriableTeamMembershipTxError(err) || i == maxAttempts-1 { + return err + } + time.Sleep(time.Duration(i+1) * 10 * time.Millisecond) + } + return err +} + +func isRetriableTeamMembershipTxError(err error) bool { + msg := strings.ToLower(err.Error()) + if strings.Contains(msg, "deadlock") || strings.Contains(msg, "serialization failure") { + return true + } + // SQLSTATE 40P01 + if strings.Contains(msg, "40p01") { + return true + } + // MySQL ER_LOCK_DEADLOCK and MSSQL deadlock victim are frequently surfaced as numeric codes in error text. + for _, code := range []string{"1213", "1205"} { + if strings.Contains(msg, " "+code+" ") || strings.Contains(msg, ":"+code) || strings.Contains(msg, "("+code+")") { + return true + } + } + return false +} diff --git a/services/org/team_test.go b/services/org/team_test.go index 5cb588b7dd..3784f9be6d 100644 --- a/services/org/team_test.go +++ b/services/org/team_test.go @@ -6,8 +6,10 @@ package org import ( "fmt" "strings" + "sync" "testing" + "code.gitea.io/gitea/models/db" issues_model "code.gitea.io/gitea/models/issues" "code.gitea.io/gitea/models/organization" "code.gitea.io/gitea/models/perm" @@ -328,3 +330,43 @@ func TestIncludesAllRepositoriesTeams(t *testing.T) { } assert.NoError(t, DeleteOrganization(t.Context(), org, false), "DeleteOrganization") } + +func TestTeamMemberConcurrentAddRemoveIdempotent(t *testing.T) { + assert.NoError(t, unittest.PrepareTestDatabase()) + + ctx := t.Context() + team := unittest.AssertExistsAndLoadBean(t, &organization.Team{ID: 2}) + user := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 4}) + + var wg sync.WaitGroup + for i := 0; i < 8; i++ { + wg.Add(1) + go func() { + defer wg.Done() + assert.NoError(t, AddTeamMember(ctx, team, user)) + }() + } + wg.Wait() + + count, err := db.GetEngine(ctx).Count(&organization.TeamUser{OrgID: team.OrgID, TeamID: team.ID, UID: user.ID}) + assert.NoError(t, err) + assert.EqualValues(t, 1, count) + + for i := 0; i < 8; i++ { + wg.Add(1) + go func() { + defer wg.Done() + assert.NoError(t, RemoveTeamMember(ctx, team, user)) + }() + } + wg.Wait() + + count, err = db.GetEngine(ctx).Count(&organization.TeamUser{OrgID: team.OrgID, TeamID: team.ID, UID: user.ID}) + assert.NoError(t, err) + assert.EqualValues(t, 0, count) + + team = unittest.AssertExistsAndLoadBean(t, &organization.Team{ID: team.ID}) + memberCount, err := db.GetEngine(ctx).Count(&organization.TeamUser{OrgID: team.OrgID, TeamID: team.ID}) + assert.NoError(t, err) + assert.EqualValues(t, team.NumMembers, memberCount) +} From 8de366211241a8f1916480965d912fc0a23b955b Mon Sep 17 00:00:00 2001 From: Mohit25022005 Date: Thu, 7 May 2026 13:29:14 +0000 Subject: [PATCH 2/3] test: add concurrent sync regression test for LDAP team sync Signed-off-by: Mohit25022005 --- .../source/oauth2/source_group_sync_test.go | 57 +++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 services/auth/source/oauth2/source_group_sync_test.go diff --git a/services/auth/source/oauth2/source_group_sync_test.go b/services/auth/source/oauth2/source_group_sync_test.go new file mode 100644 index 0000000000..a637eed373 --- /dev/null +++ b/services/auth/source/oauth2/source_group_sync_test.go @@ -0,0 +1,57 @@ +// Copyright 2026 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package oauth2 + +import ( + "sync" + "testing" + + "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/models/organization" + "code.gitea.io/gitea/models/unittest" + user_model "code.gitea.io/gitea/models/user" + "code.gitea.io/gitea/modules/container" + auth_source "code.gitea.io/gitea/services/auth/source" + + "github.com/stretchr/testify/assert" +) + +func TestSyncGroupsToTeamsConcurrentRuns(t *testing.T) { + assert.NoError(t, unittest.PrepareTestDatabase()) + + ctx := t.Context() + user := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 4}) + team := unittest.AssertExistsAndLoadBean(t, &organization.Team{ID: 2}) + org := unittest.AssertExistsAndLoadBean(t, &organization.Organization{ID: team.OrgID}) + + sourceGroupTeamMapping := map[string]map[string][]string{ + "ldap-group": { + org.Name: []string{team.Name}, + }, + } + + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(2) + go func() { + defer wg.Done() + assert.NoError(t, auth_source.SyncGroupsToTeams(ctx, user, container.SetOf("ldap-group"), sourceGroupTeamMapping, true)) + }() + go func() { + defer wg.Done() + assert.NoError(t, auth_source.SyncGroupsToTeams(ctx, user, container.Set[string]{}, sourceGroupTeamMapping, true)) + }() + } + wg.Wait() + + memberCount, err := db.GetEngine(ctx).Count(&organization.TeamUser{OrgID: team.OrgID, TeamID: team.ID, UID: user.ID}) + assert.NoError(t, err) + assert.LessOrEqual(t, memberCount, int64(1)) + + team = unittest.AssertExistsAndLoadBean(t, &organization.Team{ID: team.ID}) + totalCount, err := db.GetEngine(ctx).Count(&organization.TeamUser{OrgID: team.OrgID, TeamID: team.ID}) + assert.NoError(t, err) + assert.EqualValues(t, team.NumMembers, totalCount) + assert.GreaterOrEqual(t, team.NumMembers, 0) +} From bda7281fcb65fd12359f1234bad6200705fd0108 Mon Sep 17 00:00:00 2001 From: Mohit25022005 Date: Thu, 7 May 2026 14:03:44 +0000 Subject: [PATCH 3/3] fix: remove retry helper, modernize loops and waitgroup usage Signed-off-by: Mohit25022005 --- .../source/oauth2/source_group_sync_test.go | 13 +++---- services/org/team.go | 36 ++----------------- services/org/team_test.go | 16 ++++----- 3 files changed, 13 insertions(+), 52 deletions(-) diff --git a/services/auth/source/oauth2/source_group_sync_test.go b/services/auth/source/oauth2/source_group_sync_test.go index a637eed373..77430b53e9 100644 --- a/services/auth/source/oauth2/source_group_sync_test.go +++ b/services/auth/source/oauth2/source_group_sync_test.go @@ -32,16 +32,13 @@ func TestSyncGroupsToTeamsConcurrentRuns(t *testing.T) { } var wg sync.WaitGroup - for i := 0; i < 10; i++ { - wg.Add(2) - go func() { - defer wg.Done() + for range 10 { + wg.Go(func() { assert.NoError(t, auth_source.SyncGroupsToTeams(ctx, user, container.SetOf("ldap-group"), sourceGroupTeamMapping, true)) - }() - go func() { - defer wg.Done() + }) + wg.Go(func() { assert.NoError(t, auth_source.SyncGroupsToTeams(ctx, user, container.Set[string]{}, sourceGroupTeamMapping, true)) - }() + }) } wg.Wait() diff --git a/services/org/team.go b/services/org/team.go index c595f17e1f..3ccef4ae8e 100644 --- a/services/org/team.go +++ b/services/org/team.go @@ -8,7 +8,6 @@ import ( "errors" "fmt" "strings" - "time" "code.gitea.io/gitea/models/db" git_model "code.gitea.io/gitea/models/git" @@ -225,7 +224,7 @@ func AddTeamMember(ctx context.Context, team *organization.Team, user *user_mode return err } - err = withTeamMembershipTxRetry(ctx, func(ctx context.Context) error { + err = db.WithTx(ctx, func(ctx context.Context) error { // check in transaction isAlreadyMember, err = organization.IsTeamMember(ctx, team.OrgID, team.ID, user.ID) if err != nil || isAlreadyMember { @@ -359,38 +358,7 @@ func removeInvalidOrgUser(ctx context.Context, orgID int64, user *user_model.Use // RemoveTeamMember removes member from given team of given organization. func RemoveTeamMember(ctx context.Context, team *organization.Team, user *user_model.User) error { - return withTeamMembershipTxRetry(ctx, func(ctx context.Context) error { + return db.WithTx(ctx, func(ctx context.Context) error { return removeTeamMember(ctx, team, user) }) } - -func withTeamMembershipTxRetry(parentCtx context.Context, f func(ctx context.Context) error) error { - const maxAttempts = 3 - var err error - for i := 0; i < maxAttempts; i++ { - err = db.WithTx(parentCtx, f) - if err == nil || !isRetriableTeamMembershipTxError(err) || i == maxAttempts-1 { - return err - } - time.Sleep(time.Duration(i+1) * 10 * time.Millisecond) - } - return err -} - -func isRetriableTeamMembershipTxError(err error) bool { - msg := strings.ToLower(err.Error()) - if strings.Contains(msg, "deadlock") || strings.Contains(msg, "serialization failure") { - return true - } - // SQLSTATE 40P01 - if strings.Contains(msg, "40p01") { - return true - } - // MySQL ER_LOCK_DEADLOCK and MSSQL deadlock victim are frequently surfaced as numeric codes in error text. - for _, code := range []string{"1213", "1205"} { - if strings.Contains(msg, " "+code+" ") || strings.Contains(msg, ":"+code) || strings.Contains(msg, "("+code+")") { - return true - } - } - return false -} diff --git a/services/org/team_test.go b/services/org/team_test.go index 3784f9be6d..e03ab1ea2d 100644 --- a/services/org/team_test.go +++ b/services/org/team_test.go @@ -339,12 +339,10 @@ func TestTeamMemberConcurrentAddRemoveIdempotent(t *testing.T) { user := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 4}) var wg sync.WaitGroup - for i := 0; i < 8; i++ { - wg.Add(1) - go func() { - defer wg.Done() + for range 8 { + wg.Go(func() { assert.NoError(t, AddTeamMember(ctx, team, user)) - }() + }) } wg.Wait() @@ -352,12 +350,10 @@ func TestTeamMemberConcurrentAddRemoveIdempotent(t *testing.T) { assert.NoError(t, err) assert.EqualValues(t, 1, count) - for i := 0; i < 8; i++ { - wg.Add(1) - go func() { - defer wg.Done() + for range 8 { + wg.Go(func() { assert.NoError(t, RemoveTeamMember(ctx, team, user)) - }() + }) } wg.Wait()