mirror of
				https://github.com/go-gitea/gitea.git
				synced 2025-11-04 15:04:00 +01:00 
			
		
		
		
	enable mirror, usestdlibbars and perfsprint part of: https://github.com/go-gitea/gitea/issues/34083 --------- Co-authored-by: wxiaoguang <wxiaoguang@gmail.com>
		
			
				
	
	
		
			137 lines
		
	
	
		
			3.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			137 lines
		
	
	
		
			3.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Copyright 2024 The Gitea Authors. All rights reserved.
 | 
						|
// SPDX-License-Identifier: MIT
 | 
						|
 | 
						|
package globallock
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"errors"
 | 
						|
	"sync"
 | 
						|
	"sync/atomic"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"code.gitea.io/gitea/modules/nosql"
 | 
						|
 | 
						|
	"github.com/go-redsync/redsync/v4"
 | 
						|
	"github.com/go-redsync/redsync/v4/redis/goredis/v9"
 | 
						|
)
 | 
						|
 | 
						|
const redisLockKeyPrefix = "gitea:globallock:"
 | 
						|
 | 
						|
// redisLockExpiry is the default expiry time for a lock.
 | 
						|
// Define it as a variable to make it possible to change it in tests.
 | 
						|
var redisLockExpiry = 30 * time.Second
 | 
						|
 | 
						|
type redisLocker struct {
 | 
						|
	rs *redsync.Redsync
 | 
						|
 | 
						|
	mutexM   sync.Map
 | 
						|
	closed   atomic.Bool
 | 
						|
	extendWg sync.WaitGroup
 | 
						|
}
 | 
						|
 | 
						|
var _ Locker = &redisLocker{}
 | 
						|
 | 
						|
func NewRedisLocker(connection string) Locker {
 | 
						|
	l := &redisLocker{
 | 
						|
		rs: redsync.New(
 | 
						|
			goredis.NewPool(
 | 
						|
				nosql.GetManager().GetRedisClient(connection),
 | 
						|
			),
 | 
						|
		),
 | 
						|
	}
 | 
						|
 | 
						|
	l.extendWg.Add(1)
 | 
						|
	l.startExtend()
 | 
						|
 | 
						|
	return l
 | 
						|
}
 | 
						|
 | 
						|
func (l *redisLocker) Lock(ctx context.Context, key string) (ReleaseFunc, error) {
 | 
						|
	return l.lock(ctx, key, 0)
 | 
						|
}
 | 
						|
 | 
						|
func (l *redisLocker) TryLock(ctx context.Context, key string) (bool, ReleaseFunc, error) {
 | 
						|
	f, err := l.lock(ctx, key, 1)
 | 
						|
 | 
						|
	var (
 | 
						|
		errTaken     *redsync.ErrTaken
 | 
						|
		errNodeTaken *redsync.ErrNodeTaken
 | 
						|
	)
 | 
						|
	if errors.As(err, &errTaken) || errors.As(err, &errNodeTaken) {
 | 
						|
		return false, f, nil
 | 
						|
	}
 | 
						|
	return err == nil, f, err
 | 
						|
}
 | 
						|
 | 
						|
// Close closes the locker.
 | 
						|
// It will stop extending the locks and refuse to acquire new locks.
 | 
						|
// In actual use, it is not necessary to call this function.
 | 
						|
// But it's useful in tests to release resources.
 | 
						|
// It could take some time since it waits for the extending goroutine to finish.
 | 
						|
func (l *redisLocker) Close() error {
 | 
						|
	l.closed.Store(true)
 | 
						|
	l.extendWg.Wait()
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (l *redisLocker) lock(ctx context.Context, key string, tries int) (ReleaseFunc, error) {
 | 
						|
	if l.closed.Load() {
 | 
						|
		return func() {}, errors.New("locker is closed")
 | 
						|
	}
 | 
						|
 | 
						|
	options := []redsync.Option{
 | 
						|
		redsync.WithExpiry(redisLockExpiry),
 | 
						|
	}
 | 
						|
	if tries > 0 {
 | 
						|
		options = append(options, redsync.WithTries(tries))
 | 
						|
	}
 | 
						|
	mutex := l.rs.NewMutex(redisLockKeyPrefix+key, options...)
 | 
						|
	if err := mutex.LockContext(ctx); err != nil {
 | 
						|
		return func() {}, err
 | 
						|
	}
 | 
						|
 | 
						|
	l.mutexM.Store(key, mutex)
 | 
						|
 | 
						|
	releaseOnce := sync.Once{}
 | 
						|
	return func() {
 | 
						|
		releaseOnce.Do(func() {
 | 
						|
			l.mutexM.Delete(key)
 | 
						|
 | 
						|
			// It's safe to ignore the error here,
 | 
						|
			// if it failed to unlock, it will be released automatically after the lock expires.
 | 
						|
			// Do not call mutex.UnlockContext(ctx) here, or it will fail to release when ctx has timed out.
 | 
						|
			_, _ = mutex.Unlock()
 | 
						|
		})
 | 
						|
	}, nil
 | 
						|
}
 | 
						|
 | 
						|
func (l *redisLocker) startExtend() {
 | 
						|
	if l.closed.Load() {
 | 
						|
		l.extendWg.Done()
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	toExtend := make([]*redsync.Mutex, 0)
 | 
						|
	l.mutexM.Range(func(_, value any) bool {
 | 
						|
		m := value.(*redsync.Mutex)
 | 
						|
 | 
						|
		// Extend the lock if it is not expired.
 | 
						|
		// Although the mutex will be removed from the map before it is released,
 | 
						|
		// it still can be expired because of a failed extension.
 | 
						|
		// If it happens, it does not need to be extended anymore.
 | 
						|
		if time.Now().After(m.Until()) {
 | 
						|
			return true
 | 
						|
		}
 | 
						|
 | 
						|
		toExtend = append(toExtend, m)
 | 
						|
		return true
 | 
						|
	})
 | 
						|
	for _, v := range toExtend {
 | 
						|
		// If it failed to extend, it will be released automatically after the lock expires.
 | 
						|
		_, _ = v.Extend()
 | 
						|
	}
 | 
						|
 | 
						|
	time.AfterFunc(redisLockExpiry/2, l.startExtend)
 | 
						|
}
 |