mirror of
				https://github.com/go-gitea/gitea.git
				synced 2025-11-04 08:34:30 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			399 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			399 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package couchbase
 | 
						|
 | 
						|
import (
 | 
						|
	"log"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"fmt"
 | 
						|
	"github.com/couchbase/gomemcached"
 | 
						|
	"github.com/couchbase/gomemcached/client"
 | 
						|
	"github.com/couchbase/goutils/logging"
 | 
						|
)
 | 
						|
 | 
						|
// A UprFeed streams mutation events from a bucket.
 | 
						|
//
 | 
						|
// Events from the bucket can be read from the channel 'C'.  Remember
 | 
						|
// to call Close() on it when you're done, unless its channel has
 | 
						|
// closed itself already.
 | 
						|
type UprFeed struct {
 | 
						|
	C <-chan *memcached.UprEvent
 | 
						|
 | 
						|
	bucket          *Bucket
 | 
						|
	nodeFeeds       map[string]*FeedInfo     // The UPR feeds of the individual nodes
 | 
						|
	output          chan *memcached.UprEvent // Same as C but writeably-typed
 | 
						|
	outputClosed    bool
 | 
						|
	quit            chan bool
 | 
						|
	name            string // name of this UPR feed
 | 
						|
	sequence        uint32 // sequence number for this feed
 | 
						|
	connected       bool
 | 
						|
	killSwitch      chan bool
 | 
						|
	closing         bool
 | 
						|
	wg              sync.WaitGroup
 | 
						|
	dcp_buffer_size uint32
 | 
						|
	data_chan_size  int
 | 
						|
}
 | 
						|
 | 
						|
// UprFeed from a single connection
 | 
						|
type FeedInfo struct {
 | 
						|
	uprFeed   *memcached.UprFeed // UPR feed handle
 | 
						|
	host      string             // hostname
 | 
						|
	connected bool               // connected
 | 
						|
	quit      chan bool          // quit channel
 | 
						|
}
 | 
						|
 | 
						|
type FailoverLog map[uint16]memcached.FailoverLog
 | 
						|
 | 
						|
// GetFailoverLogs, get the failover logs for a set of vbucket ids
 | 
						|
func (b *Bucket) GetFailoverLogs(vBuckets []uint16) (FailoverLog, error) {
 | 
						|
 | 
						|
	// map vbids to their corresponding hosts
 | 
						|
	vbHostList := make(map[string][]uint16)
 | 
						|
	vbm := b.VBServerMap()
 | 
						|
	if len(vbm.VBucketMap) < len(vBuckets) {
 | 
						|
		return nil, fmt.Errorf("vbmap smaller than vbucket list: %v vs. %v",
 | 
						|
			vbm.VBucketMap, vBuckets)
 | 
						|
	}
 | 
						|
 | 
						|
	for _, vb := range vBuckets {
 | 
						|
		masterID := vbm.VBucketMap[vb][0]
 | 
						|
		master := b.getMasterNode(masterID)
 | 
						|
		if master == "" {
 | 
						|
			return nil, fmt.Errorf("No master found for vb %d", vb)
 | 
						|
		}
 | 
						|
 | 
						|
		vbList := vbHostList[master]
 | 
						|
		if vbList == nil {
 | 
						|
			vbList = make([]uint16, 0)
 | 
						|
		}
 | 
						|
		vbList = append(vbList, vb)
 | 
						|
		vbHostList[master] = vbList
 | 
						|
	}
 | 
						|
 | 
						|
	failoverLogMap := make(FailoverLog)
 | 
						|
	for _, serverConn := range b.getConnPools(false /* not already locked */) {
 | 
						|
 | 
						|
		vbList := vbHostList[serverConn.host]
 | 
						|
		if vbList == nil {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		mc, err := serverConn.Get()
 | 
						|
		if err != nil {
 | 
						|
			logging.Infof("No Free connections for vblist %v", vbList)
 | 
						|
			return nil, fmt.Errorf("No Free connections for host %s",
 | 
						|
				serverConn.host)
 | 
						|
 | 
						|
		}
 | 
						|
		// close the connection so that it doesn't get reused for upr data
 | 
						|
		// connection
 | 
						|
		defer mc.Close()
 | 
						|
		failoverlogs, err := mc.UprGetFailoverLog(vbList)
 | 
						|
		if err != nil {
 | 
						|
			return nil, fmt.Errorf("Error getting failover log %s host %s",
 | 
						|
				err.Error(), serverConn.host)
 | 
						|
 | 
						|
		}
 | 
						|
 | 
						|
		for vb, log := range failoverlogs {
 | 
						|
			failoverLogMap[vb] = *log
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return failoverLogMap, nil
 | 
						|
}
 | 
						|
 | 
						|
func (b *Bucket) StartUprFeed(name string, sequence uint32) (*UprFeed, error) {
 | 
						|
	return b.StartUprFeedWithConfig(name, sequence, 10, DEFAULT_WINDOW_SIZE)
 | 
						|
}
 | 
						|
 | 
						|
// StartUprFeed creates and starts a new Upr feed
 | 
						|
// No data will be sent on the channel unless vbuckets streams are requested
 | 
						|
func (b *Bucket) StartUprFeedWithConfig(name string, sequence uint32, data_chan_size int, dcp_buffer_size uint32) (*UprFeed, error) {
 | 
						|
 | 
						|
	feed := &UprFeed{
 | 
						|
		bucket:          b,
 | 
						|
		output:          make(chan *memcached.UprEvent, data_chan_size),
 | 
						|
		quit:            make(chan bool),
 | 
						|
		nodeFeeds:       make(map[string]*FeedInfo, 0),
 | 
						|
		name:            name,
 | 
						|
		sequence:        sequence,
 | 
						|
		killSwitch:      make(chan bool),
 | 
						|
		dcp_buffer_size: dcp_buffer_size,
 | 
						|
		data_chan_size:  data_chan_size,
 | 
						|
	}
 | 
						|
 | 
						|
	err := feed.connectToNodes()
 | 
						|
	if err != nil {
 | 
						|
		return nil, fmt.Errorf("Cannot connect to bucket %s", err.Error())
 | 
						|
	}
 | 
						|
	feed.connected = true
 | 
						|
	go feed.run()
 | 
						|
 | 
						|
	feed.C = feed.output
 | 
						|
	return feed, nil
 | 
						|
}
 | 
						|
 | 
						|
// UprRequestStream starts a stream for a vb on a feed
 | 
						|
func (feed *UprFeed) UprRequestStream(vb uint16, opaque uint16, flags uint32,
 | 
						|
	vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error {
 | 
						|
 | 
						|
	defer func() {
 | 
						|
		if r := recover(); r != nil {
 | 
						|
			log.Panicf("Panic in UprRequestStream. Feed %v Bucket %v", feed, feed.bucket)
 | 
						|
		}
 | 
						|
	}()
 | 
						|
 | 
						|
	vbm := feed.bucket.VBServerMap()
 | 
						|
	if len(vbm.VBucketMap) < int(vb) {
 | 
						|
		return fmt.Errorf("vbmap smaller than vbucket list: %v vs. %v",
 | 
						|
			vb, vbm.VBucketMap)
 | 
						|
	}
 | 
						|
 | 
						|
	if int(vb) >= len(vbm.VBucketMap) {
 | 
						|
		return fmt.Errorf("Invalid vbucket id %d", vb)
 | 
						|
	}
 | 
						|
 | 
						|
	masterID := vbm.VBucketMap[vb][0]
 | 
						|
	master := feed.bucket.getMasterNode(masterID)
 | 
						|
	if master == "" {
 | 
						|
		return fmt.Errorf("Master node not found for vbucket %d", vb)
 | 
						|
	}
 | 
						|
	singleFeed := feed.nodeFeeds[master]
 | 
						|
	if singleFeed == nil {
 | 
						|
		return fmt.Errorf("UprFeed for this host not found")
 | 
						|
	}
 | 
						|
 | 
						|
	if err := singleFeed.uprFeed.UprRequestStream(vb, opaque, flags,
 | 
						|
		vuuid, startSequence, endSequence, snapStart, snapEnd); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// UprCloseStream ends a vbucket stream.
 | 
						|
func (feed *UprFeed) UprCloseStream(vb, opaqueMSB uint16) error {
 | 
						|
 | 
						|
	defer func() {
 | 
						|
		if r := recover(); r != nil {
 | 
						|
			log.Panicf("Panic in UprCloseStream. Feed %v Bucket %v ", feed, feed.bucket)
 | 
						|
		}
 | 
						|
	}()
 | 
						|
 | 
						|
	vbm := feed.bucket.VBServerMap()
 | 
						|
	if len(vbm.VBucketMap) < int(vb) {
 | 
						|
		return fmt.Errorf("vbmap smaller than vbucket list: %v vs. %v",
 | 
						|
			vb, vbm.VBucketMap)
 | 
						|
	}
 | 
						|
 | 
						|
	if int(vb) >= len(vbm.VBucketMap) {
 | 
						|
		return fmt.Errorf("Invalid vbucket id %d", vb)
 | 
						|
	}
 | 
						|
 | 
						|
	masterID := vbm.VBucketMap[vb][0]
 | 
						|
	master := feed.bucket.getMasterNode(masterID)
 | 
						|
	if master == "" {
 | 
						|
		return fmt.Errorf("Master node not found for vbucket %d", vb)
 | 
						|
	}
 | 
						|
	singleFeed := feed.nodeFeeds[master]
 | 
						|
	if singleFeed == nil {
 | 
						|
		return fmt.Errorf("UprFeed for this host not found")
 | 
						|
	}
 | 
						|
 | 
						|
	if err := singleFeed.uprFeed.CloseStream(vb, opaqueMSB); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// Goroutine that runs the feed
 | 
						|
func (feed *UprFeed) run() {
 | 
						|
	retryInterval := initialRetryInterval
 | 
						|
	bucketOK := true
 | 
						|
	for {
 | 
						|
		// Connect to the UPR feed of each server node:
 | 
						|
		if bucketOK {
 | 
						|
			// Run until one of the sub-feeds fails:
 | 
						|
			select {
 | 
						|
			case <-feed.killSwitch:
 | 
						|
			case <-feed.quit:
 | 
						|
				return
 | 
						|
			}
 | 
						|
			//feed.closeNodeFeeds()
 | 
						|
			retryInterval = initialRetryInterval
 | 
						|
		}
 | 
						|
 | 
						|
		if feed.closing == true {
 | 
						|
			// we have been asked to shut down
 | 
						|
			return
 | 
						|
		}
 | 
						|
 | 
						|
		// On error, try to refresh the bucket in case the list of nodes changed:
 | 
						|
		logging.Infof("go-couchbase: UPR connection lost; reconnecting to bucket %q in %v",
 | 
						|
			feed.bucket.Name, retryInterval)
 | 
						|
 | 
						|
		if err := feed.bucket.Refresh(); err != nil {
 | 
						|
			// if we fail to refresh the bucket, exit the feed
 | 
						|
			// MB-14917
 | 
						|
			logging.Infof("Unable to refresh bucket %s ", err.Error())
 | 
						|
			close(feed.output)
 | 
						|
			feed.outputClosed = true
 | 
						|
			feed.closeNodeFeeds()
 | 
						|
			return
 | 
						|
		}
 | 
						|
 | 
						|
		// this will only connect to nodes that are not connected or changed
 | 
						|
		// user will have to reconnect the stream
 | 
						|
		err := feed.connectToNodes()
 | 
						|
		if err != nil {
 | 
						|
			logging.Infof("Unable to connect to nodes..exit ")
 | 
						|
			close(feed.output)
 | 
						|
			feed.outputClosed = true
 | 
						|
			feed.closeNodeFeeds()
 | 
						|
			return
 | 
						|
		}
 | 
						|
		bucketOK = err == nil
 | 
						|
 | 
						|
		select {
 | 
						|
		case <-time.After(retryInterval):
 | 
						|
		case <-feed.quit:
 | 
						|
			return
 | 
						|
		}
 | 
						|
		if retryInterval *= 2; retryInterval > maximumRetryInterval {
 | 
						|
			retryInterval = maximumRetryInterval
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (feed *UprFeed) connectToNodes() (err error) {
 | 
						|
	nodeCount := 0
 | 
						|
	for _, serverConn := range feed.bucket.getConnPools(false /* not already locked */) {
 | 
						|
 | 
						|
		// this maybe a reconnection, so check if the connection to the node
 | 
						|
		// already exists. Connect only if the node is not found in the list
 | 
						|
		// or connected == false
 | 
						|
		nodeFeed := feed.nodeFeeds[serverConn.host]
 | 
						|
 | 
						|
		if nodeFeed != nil && nodeFeed.connected == true {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		var singleFeed *memcached.UprFeed
 | 
						|
		var name string
 | 
						|
		if feed.name == "" {
 | 
						|
			name = "DefaultUprClient"
 | 
						|
		} else {
 | 
						|
			name = feed.name
 | 
						|
		}
 | 
						|
		singleFeed, err = serverConn.StartUprFeed(name, feed.sequence, feed.dcp_buffer_size, feed.data_chan_size)
 | 
						|
		if err != nil {
 | 
						|
			logging.Errorf("go-couchbase: Error connecting to upr feed of %s: %v", serverConn.host, err)
 | 
						|
			feed.closeNodeFeeds()
 | 
						|
			return
 | 
						|
		}
 | 
						|
		// add the node to the connection map
 | 
						|
		feedInfo := &FeedInfo{
 | 
						|
			uprFeed:   singleFeed,
 | 
						|
			connected: true,
 | 
						|
			host:      serverConn.host,
 | 
						|
			quit:      make(chan bool),
 | 
						|
		}
 | 
						|
		feed.nodeFeeds[serverConn.host] = feedInfo
 | 
						|
		go feed.forwardUprEvents(feedInfo, feed.killSwitch, serverConn.host)
 | 
						|
		feed.wg.Add(1)
 | 
						|
		nodeCount++
 | 
						|
	}
 | 
						|
	if nodeCount == 0 {
 | 
						|
		return fmt.Errorf("No connection to bucket")
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// Goroutine that forwards Upr events from a single node's feed to the aggregate feed.
 | 
						|
func (feed *UprFeed) forwardUprEvents(nodeFeed *FeedInfo, killSwitch chan bool, host string) {
 | 
						|
	singleFeed := nodeFeed.uprFeed
 | 
						|
 | 
						|
	defer func() {
 | 
						|
		feed.wg.Done()
 | 
						|
		if r := recover(); r != nil {
 | 
						|
			//if feed is not closing, re-throw the panic
 | 
						|
			if feed.outputClosed != true && feed.closing != true {
 | 
						|
				panic(r)
 | 
						|
			} else {
 | 
						|
				logging.Errorf("Panic is recovered. Since feed is closed, exit gracefully")
 | 
						|
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}()
 | 
						|
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		case <-nodeFeed.quit:
 | 
						|
			nodeFeed.connected = false
 | 
						|
			return
 | 
						|
 | 
						|
		case event, ok := <-singleFeed.C:
 | 
						|
			if !ok {
 | 
						|
				if singleFeed.Error != nil {
 | 
						|
					logging.Errorf("go-couchbase: Upr feed from %s failed: %v", host, singleFeed.Error)
 | 
						|
				}
 | 
						|
				killSwitch <- true
 | 
						|
				return
 | 
						|
			}
 | 
						|
			if feed.outputClosed == true {
 | 
						|
				// someone closed the node feed
 | 
						|
				logging.Infof("Node need closed, returning from forwardUprEvent")
 | 
						|
				return
 | 
						|
			}
 | 
						|
			feed.output <- event
 | 
						|
			if event.Status == gomemcached.NOT_MY_VBUCKET {
 | 
						|
				logging.Infof(" Got a not my vbucket error !! ")
 | 
						|
				if err := feed.bucket.Refresh(); err != nil {
 | 
						|
					logging.Errorf("Unable to refresh bucket %s ", err.Error())
 | 
						|
					feed.closeNodeFeeds()
 | 
						|
					return
 | 
						|
				}
 | 
						|
				// this will only connect to nodes that are not connected or changed
 | 
						|
				// user will have to reconnect the stream
 | 
						|
				if err := feed.connectToNodes(); err != nil {
 | 
						|
					logging.Errorf("Unable to connect to nodes %s", err.Error())
 | 
						|
					return
 | 
						|
				}
 | 
						|
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (feed *UprFeed) closeNodeFeeds() {
 | 
						|
	for _, f := range feed.nodeFeeds {
 | 
						|
		logging.Infof(" Sending close to forwardUprEvent ")
 | 
						|
		close(f.quit)
 | 
						|
		f.uprFeed.Close()
 | 
						|
	}
 | 
						|
	feed.nodeFeeds = nil
 | 
						|
}
 | 
						|
 | 
						|
// Close a Upr feed.
 | 
						|
func (feed *UprFeed) Close() error {
 | 
						|
	select {
 | 
						|
	case <-feed.quit:
 | 
						|
		return nil
 | 
						|
	default:
 | 
						|
	}
 | 
						|
 | 
						|
	feed.closing = true
 | 
						|
	feed.closeNodeFeeds()
 | 
						|
	close(feed.quit)
 | 
						|
 | 
						|
	feed.wg.Wait()
 | 
						|
	if feed.outputClosed == false {
 | 
						|
		feed.outputClosed = true
 | 
						|
		close(feed.output)
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 |