mirror of
				https://github.com/go-gitea/gitea.git
				synced 2025-10-26 10:40:48 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			392 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			392 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2020 The Gitea Authors. All rights reserved.
 | |
| // SPDX-License-Identifier: MIT
 | |
| 
 | |
| package archiver
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"io"
 | |
| 	"net/http"
 | |
| 	"os"
 | |
| 	"strings"
 | |
| 	"time"
 | |
| 
 | |
| 	"code.gitea.io/gitea/models/db"
 | |
| 	repo_model "code.gitea.io/gitea/models/repo"
 | |
| 	"code.gitea.io/gitea/modules/git"
 | |
| 	"code.gitea.io/gitea/modules/gitrepo"
 | |
| 	"code.gitea.io/gitea/modules/graceful"
 | |
| 	"code.gitea.io/gitea/modules/httplib"
 | |
| 	"code.gitea.io/gitea/modules/log"
 | |
| 	"code.gitea.io/gitea/modules/process"
 | |
| 	"code.gitea.io/gitea/modules/queue"
 | |
| 	"code.gitea.io/gitea/modules/setting"
 | |
| 	"code.gitea.io/gitea/modules/storage"
 | |
| 	gitea_context "code.gitea.io/gitea/services/context"
 | |
| )
 | |
| 
 | |
// ArchiveRequest defines the parameters of an archive request, which notably
// includes the specific repository being archived as well as the commit, the
// name by which it was requested, and the kind of archive being requested.
// This is entirely opaque to external entities, though, and mostly used as a
// handle elsewhere.
type ArchiveRequest struct {
	Repo     *repo_model.Repository // repository the archive is generated from
	Type     repo_model.ArchiveType // requested archive type
	CommitID string                 // commit ID the requested ref resolved to, as set by NewRequest

	archiveRefShortName string // the ref short name to download the archive, for example: "master", "v1.0.0", "commit id"
}
 | |
| 
 | |
// ErrUnknownArchiveFormat is returned when the requested archive format is
// not supported.
type ErrUnknownArchiveFormat struct {
	// RequestNameType is the requested name that carried the unknown format.
	RequestNameType string
}

// Error implements error.
func (e ErrUnknownArchiveFormat) Error() string {
	const prefix = "unknown format: "
	return prefix + e.RequestNameType
}

// Is reports whether target is an ErrUnknownArchiveFormat value, regardless of
// its RequestNameType. It is the hook consulted by errors.Is.
func (ErrUnknownArchiveFormat) Is(target error) bool {
	switch target.(type) {
	case ErrUnknownArchiveFormat:
		return true
	default:
		return false
	}
}
 | |
| 
 | |
// RepoRefNotFoundError is returned when a requested reference (commit, tag) was not found.
type RepoRefNotFoundError struct {
	// RefShortName is the reference short name that could not be resolved.
	RefShortName string
}

// Error implements error.
func (e RepoRefNotFoundError) Error() string {
	const prefix = "unrecognized repository reference: "
	return prefix + e.RefShortName
}

// Is reports whether target is a RepoRefNotFoundError value, regardless of its
// RefShortName. It is the hook consulted by errors.Is.
func (e RepoRefNotFoundError) Is(target error) bool {
	switch target.(type) {
	case RepoRefNotFoundError:
		return true
	default:
		return false
	}
}
 | |
| 
 | |
| // NewRequest creates an archival request, based on the URI.  The
 | |
| // resulting ArchiveRequest is suitable for being passed to Await()
 | |
| // if it's determined that the request still needs to be satisfied.
 | |
| func NewRequest(repo *repo_model.Repository, gitRepo *git.Repository, archiveRefExt string) (*ArchiveRequest, error) {
 | |
| 	// here the archiveRefShortName is not a clear ref, it could be a tag, branch or commit id
 | |
| 	archiveRefShortName, archiveType := repo_model.SplitArchiveNameType(archiveRefExt)
 | |
| 	if archiveType == repo_model.ArchiveUnknown {
 | |
| 		return nil, ErrUnknownArchiveFormat{archiveRefExt}
 | |
| 	}
 | |
| 
 | |
| 	// Get corresponding commit.
 | |
| 	commitID, err := gitRepo.ConvertToGitID(archiveRefShortName)
 | |
| 	if err != nil {
 | |
| 		return nil, RepoRefNotFoundError{RefShortName: archiveRefShortName}
 | |
| 	}
 | |
| 
 | |
| 	r := &ArchiveRequest{Repo: repo, archiveRefShortName: archiveRefShortName, Type: archiveType}
 | |
| 	r.CommitID = commitID.String()
 | |
| 	return r, nil
 | |
| }
 | |
| 
 | |
| // GetArchiveName returns the name of the caller, based on the ref used by the
 | |
| // caller to create this request.
 | |
| func (aReq *ArchiveRequest) GetArchiveName() string {
 | |
| 	return strings.ReplaceAll(aReq.archiveRefShortName, "/", "-") + "." + aReq.Type.String()
 | |
| }
 | |
| 
 | |
| // Await awaits the completion of an ArchiveRequest. If the archive has
 | |
| // already been prepared the method returns immediately. Otherwise, an archiver
 | |
| // process will be started and its completion awaited. On success the returned
 | |
| // RepoArchiver may be used to download the archive. Note that even if the
 | |
| // context is cancelled/times out a started archiver will still continue to run
 | |
| // in the background.
 | |
| func (aReq *ArchiveRequest) Await(ctx context.Context) (*repo_model.RepoArchiver, error) {
 | |
| 	archiver, err := repo_model.GetRepoArchiver(ctx, aReq.Repo.ID, aReq.Type, aReq.CommitID)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("models.GetRepoArchiver: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	if archiver != nil && archiver.Status == repo_model.ArchiverReady {
 | |
| 		// Archive already generated, we're done.
 | |
| 		return archiver, nil
 | |
| 	}
 | |
| 
 | |
| 	if err := StartArchive(aReq); err != nil {
 | |
| 		return nil, fmt.Errorf("archiver.StartArchive: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	poll := time.NewTicker(time.Second * 1)
 | |
| 	defer poll.Stop()
 | |
| 
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-graceful.GetManager().HammerContext().Done():
 | |
| 			// System stopped.
 | |
| 			return nil, graceful.GetManager().HammerContext().Err()
 | |
| 		case <-ctx.Done():
 | |
| 			return nil, ctx.Err()
 | |
| 		case <-poll.C:
 | |
| 			archiver, err = repo_model.GetRepoArchiver(ctx, aReq.Repo.ID, aReq.Type, aReq.CommitID)
 | |
| 			if err != nil {
 | |
| 				return nil, fmt.Errorf("repo_model.GetRepoArchiver: %w", err)
 | |
| 			}
 | |
| 			if archiver != nil && archiver.Status == repo_model.ArchiverReady {
 | |
| 				return archiver, nil
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Stream satisfies the ArchiveRequest being passed in.  Processing
 | |
| // will occur directly in this routine.
 | |
| func (aReq *ArchiveRequest) Stream(ctx context.Context, w io.Writer) error {
 | |
| 	if aReq.Type == repo_model.ArchiveBundle {
 | |
| 		return gitrepo.CreateBundle(
 | |
| 			ctx,
 | |
| 			aReq.Repo,
 | |
| 			aReq.CommitID,
 | |
| 			w,
 | |
| 		)
 | |
| 	}
 | |
| 	return gitrepo.CreateArchive(
 | |
| 		ctx,
 | |
| 		aReq.Repo,
 | |
| 		aReq.Type.String(),
 | |
| 		w,
 | |
| 		setting.Repository.PrefixArchiveFiles,
 | |
| 		aReq.CommitID,
 | |
| 	)
 | |
| }
 | |
| 
 | |
| // doArchive satisfies the ArchiveRequest being passed in.  Processing
 | |
| // will occur in a separate goroutine, as this phase may take a while to
 | |
| // complete.  If the archive already exists, doArchive will not do
 | |
| // anything.  In all cases, the caller should be examining the *ArchiveRequest
 | |
| // being returned for completion, as it may be different than the one they passed
 | |
| // in.
 | |
| func doArchive(ctx context.Context, r *ArchiveRequest) (*repo_model.RepoArchiver, error) {
 | |
| 	ctx, _, finished := process.GetManager().AddContext(ctx, fmt.Sprintf("ArchiveRequest[%s]: %s", r.Repo.FullName(), r.GetArchiveName()))
 | |
| 	defer finished()
 | |
| 
 | |
| 	archiver, err := repo_model.GetRepoArchiver(ctx, r.Repo.ID, r.Type, r.CommitID)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	if archiver != nil {
 | |
| 		// FIXME: If another process are generating it, we think it's not ready and just return
 | |
| 		// Or we should wait until the archive generated.
 | |
| 		if archiver.Status == repo_model.ArchiverGenerating {
 | |
| 			return nil, nil
 | |
| 		}
 | |
| 	} else {
 | |
| 		archiver = &repo_model.RepoArchiver{
 | |
| 			RepoID:   r.Repo.ID,
 | |
| 			Type:     r.Type,
 | |
| 			CommitID: r.CommitID,
 | |
| 			Status:   repo_model.ArchiverGenerating,
 | |
| 		}
 | |
| 		if err := db.Insert(ctx, archiver); err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	rPath := archiver.RelativePath()
 | |
| 	_, err = storage.RepoArchives.Stat(rPath)
 | |
| 	if err == nil {
 | |
| 		if archiver.Status == repo_model.ArchiverGenerating {
 | |
| 			archiver.Status = repo_model.ArchiverReady
 | |
| 			if err = repo_model.UpdateRepoArchiverStatus(ctx, archiver); err != nil {
 | |
| 				return nil, err
 | |
| 			}
 | |
| 		}
 | |
| 		return archiver, nil
 | |
| 	}
 | |
| 
 | |
| 	if !errors.Is(err, os.ErrNotExist) {
 | |
| 		return nil, fmt.Errorf("unable to stat archive: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	rd, w := io.Pipe()
 | |
| 	defer func() {
 | |
| 		_ = w.Close()
 | |
| 		_ = rd.Close()
 | |
| 	}()
 | |
| 	done := make(chan error, 1) // Ensure that there is some capacity which will ensure that the goroutine below can always finish
 | |
| 
 | |
| 	go func(done chan error, w *io.PipeWriter, archiveReq *ArchiveRequest) {
 | |
| 		defer func() {
 | |
| 			if r := recover(); r != nil {
 | |
| 				done <- fmt.Errorf("%v", r)
 | |
| 			}
 | |
| 		}()
 | |
| 
 | |
| 		err := archiveReq.Stream(ctx, w)
 | |
| 		_ = w.CloseWithError(err)
 | |
| 		done <- err
 | |
| 	}(done, w, r)
 | |
| 
 | |
| 	// TODO: add lfs data to zip
 | |
| 	// TODO: add submodule data to zip
 | |
| 
 | |
| 	if _, err := storage.RepoArchives.Save(rPath, rd, -1); err != nil {
 | |
| 		return nil, fmt.Errorf("unable to write archive: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	err = <-done
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	if archiver.Status == repo_model.ArchiverGenerating {
 | |
| 		archiver.Status = repo_model.ArchiverReady
 | |
| 		if err = repo_model.UpdateRepoArchiverStatus(ctx, archiver); err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return archiver, nil
 | |
| }
 | |
| 
 | |
// archiverQueue holds the pending archive requests. It is created by Init and
// fed by StartArchive.
var archiverQueue *queue.WorkerPoolQueue[*ArchiveRequest]
 | |
| 
 | |
| // Init initializes archiver
 | |
| func Init(ctx context.Context) error {
 | |
| 	handler := func(items ...*ArchiveRequest) []*ArchiveRequest {
 | |
| 		for _, archiveReq := range items {
 | |
| 			log.Trace("ArchiverData Process: %#v", archiveReq)
 | |
| 			if archiver, err := doArchive(ctx, archiveReq); err != nil {
 | |
| 				log.Error("Archive %v failed: %v", archiveReq, err)
 | |
| 			} else {
 | |
| 				log.Trace("ArchiverData Success: %#v", archiver)
 | |
| 			}
 | |
| 		}
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	archiverQueue = queue.CreateUniqueQueue(graceful.GetManager().ShutdownContext(), "repo-archive", handler)
 | |
| 	if archiverQueue == nil {
 | |
| 		return errors.New("unable to create repo-archive queue")
 | |
| 	}
 | |
| 	go graceful.GetManager().RunWithCancel(archiverQueue)
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // StartArchive push the archive request to the queue
 | |
| func StartArchive(request *ArchiveRequest) error {
 | |
| 	has, err := archiverQueue.Has(request)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if has {
 | |
| 		return nil
 | |
| 	}
 | |
| 	return archiverQueue.Push(request)
 | |
| }
 | |
| 
 | |
| func deleteOldRepoArchiver(ctx context.Context, archiver *repo_model.RepoArchiver) error {
 | |
| 	if _, err := db.DeleteByID[repo_model.RepoArchiver](ctx, archiver.ID); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	p := archiver.RelativePath()
 | |
| 	if err := storage.RepoArchives.Delete(p); err != nil {
 | |
| 		log.Error("delete repo archive file failed: %v", err)
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // DeleteOldRepositoryArchives deletes old repository archives.
 | |
| func DeleteOldRepositoryArchives(ctx context.Context, olderThan time.Duration) error {
 | |
| 	log.Trace("Doing: ArchiveCleanup")
 | |
| 
 | |
| 	for {
 | |
| 		archivers, err := db.Find[repo_model.RepoArchiver](ctx, repo_model.FindRepoArchiversOption{
 | |
| 			ListOptions: db.ListOptions{
 | |
| 				PageSize: 100,
 | |
| 				Page:     1,
 | |
| 			},
 | |
| 			OlderThan: olderThan,
 | |
| 		})
 | |
| 		if err != nil {
 | |
| 			log.Trace("Error: ArchiveClean: %v", err)
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		for _, archiver := range archivers {
 | |
| 			if err := deleteOldRepoArchiver(ctx, archiver); err != nil {
 | |
| 				return err
 | |
| 			}
 | |
| 		}
 | |
| 		if len(archivers) < 100 {
 | |
| 			break
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	log.Trace("Finished: ArchiveCleanup")
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // DeleteRepositoryArchives deletes all repositories' archives.
 | |
| func DeleteRepositoryArchives(ctx context.Context) error {
 | |
| 	if err := repo_model.DeleteAllRepoArchives(ctx); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	return storage.Clean(storage.RepoArchives)
 | |
| }
 | |
| 
 | |
| func ServeRepoArchive(ctx *gitea_context.Base, archiveReq *ArchiveRequest) {
 | |
| 	// Add nix format link header so tarballs lock correctly:
 | |
| 	// https://github.com/nixos/nix/blob/56763ff918eb308db23080e560ed2ea3e00c80a7/doc/manual/src/protocols/tarball-fetcher.md
 | |
| 	ctx.Resp.Header().Add("Link", fmt.Sprintf(`<%s/archive/%s.%s?rev=%s>; rel="immutable"`,
 | |
| 		archiveReq.Repo.APIURL(),
 | |
| 		archiveReq.CommitID,
 | |
| 		archiveReq.Type.String(),
 | |
| 		archiveReq.CommitID,
 | |
| 	))
 | |
| 	downloadName := archiveReq.Repo.Name + "-" + archiveReq.GetArchiveName()
 | |
| 
 | |
| 	if setting.Repository.StreamArchives {
 | |
| 		httplib.ServeSetHeaders(ctx.Resp, &httplib.ServeHeaderOptions{Filename: downloadName})
 | |
| 		if err := archiveReq.Stream(ctx, ctx.Resp); err != nil && !ctx.Written() {
 | |
| 			log.Error("Archive %v streaming failed: %v", archiveReq, err)
 | |
| 			ctx.HTTPError(http.StatusInternalServerError)
 | |
| 		}
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	archiver, err := archiveReq.Await(ctx)
 | |
| 	if err != nil {
 | |
| 		log.Error("Archive %v await failed: %v", archiveReq, err)
 | |
| 		ctx.HTTPError(http.StatusInternalServerError)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	rPath := archiver.RelativePath()
 | |
| 	if setting.RepoArchive.Storage.ServeDirect() {
 | |
| 		// If we have a signed url (S3, object storage), redirect to this directly.
 | |
| 		u, err := storage.RepoArchives.URL(rPath, downloadName, ctx.Req.Method, nil)
 | |
| 		if u != nil && err == nil {
 | |
| 			ctx.Redirect(u.String())
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	fr, err := storage.RepoArchives.Open(rPath)
 | |
| 	if err != nil {
 | |
| 		log.Error("Archive %v open file failed: %v", archiveReq, err)
 | |
| 		ctx.HTTPError(http.StatusInternalServerError)
 | |
| 		return
 | |
| 	}
 | |
| 	defer fr.Close()
 | |
| 
 | |
| 	ctx.ServeContent(fr, &gitea_context.ServeHeaderOptions{
 | |
| 		Filename:     downloadName,
 | |
| 		LastModified: archiver.CreatedUnix.AsLocalTime(),
 | |
| 	})
 | |
| }
 |