mirror of
https://github.com/go-gitea/gitea.git
synced 2026-03-07 18:22:05 +01:00
Backport #36805 by @ChristopherHX * Use base64.RawURLEncoding to avoid equal sign * using the nodejs package they seem to get lost * Support uploads with unspecified length * Support uploads with a single named blockid * without requiring a blockmap Signed-off-by: wxiaoguang <wxiaoguang@gmail.com> Co-authored-by: ChristopherHX <christopher.homberger@web.de> Co-authored-by: wxiaoguang <wxiaoguang@gmail.com>
This commit is contained in:
parent
f44f7bf2d3
commit
5552eff6e7
@ -5,6 +5,7 @@ package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/url"
|
||||
@ -27,25 +28,32 @@ type LocalStorage struct {
|
||||
|
||||
// NewLocalStorage returns a local files
|
||||
func NewLocalStorage(ctx context.Context, config *setting.Storage) (ObjectStorage, error) {
|
||||
// prepare storage root path
|
||||
if !filepath.IsAbs(config.Path) {
|
||||
return nil, fmt.Errorf("LocalStorageConfig.Path should have been prepared by setting/storage.go and should be an absolute path, but not: %q", config.Path)
|
||||
}
|
||||
log.Info("Creating new Local Storage at %s", config.Path)
|
||||
if err := os.MkdirAll(config.Path, os.ModePerm); err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("LocalStorage config.Path should have been prepared by setting/storage.go and should be an absolute path, but not: %q", config.Path)
|
||||
}
|
||||
storageRoot := util.FilePathJoinAbs(config.Path)
|
||||
|
||||
if config.TemporaryPath == "" {
|
||||
config.TemporaryPath = filepath.Join(config.Path, "tmp")
|
||||
// prepare storage temporary path
|
||||
storageTmp := config.TemporaryPath
|
||||
if storageTmp == "" {
|
||||
storageTmp = filepath.Join(storageRoot, "tmp")
|
||||
}
|
||||
if !filepath.IsAbs(config.TemporaryPath) {
|
||||
return nil, fmt.Errorf("LocalStorageConfig.TemporaryPath should be an absolute path, but not: %q", config.TemporaryPath)
|
||||
if !filepath.IsAbs(storageTmp) {
|
||||
return nil, fmt.Errorf("LocalStorage config.TemporaryPath should be an absolute path, but not: %q", config.TemporaryPath)
|
||||
}
|
||||
storageTmp = util.FilePathJoinAbs(storageTmp)
|
||||
|
||||
// create the storage root if not exist
|
||||
log.Info("Creating new Local Storage at %s", storageRoot)
|
||||
if err := os.MkdirAll(storageRoot, os.ModePerm); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &LocalStorage{
|
||||
ctx: ctx,
|
||||
dir: config.Path,
|
||||
tmpdir: config.TemporaryPath,
|
||||
dir: storageRoot,
|
||||
tmpdir: storageTmp,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -108,9 +116,21 @@ func (l *LocalStorage) Stat(path string) (os.FileInfo, error) {
|
||||
return os.Stat(l.buildLocalPath(path))
|
||||
}
|
||||
|
||||
// Delete delete a file
|
||||
func (l *LocalStorage) deleteEmptyParentDirs(localFullPath string) {
|
||||
for parent := filepath.Dir(localFullPath); len(parent) > len(l.dir); parent = filepath.Dir(parent) {
|
||||
if err := os.Remove(parent); err != nil {
|
||||
// since the target file has been deleted, parent dir error is not related to the file deletion itself.
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Delete deletes the file in storage and removes the empty parent directories (if possible)
|
||||
func (l *LocalStorage) Delete(path string) error {
|
||||
return util.Remove(l.buildLocalPath(path))
|
||||
localFullPath := l.buildLocalPath(path)
|
||||
err := util.Remove(localFullPath)
|
||||
l.deleteEmptyParentDirs(localFullPath)
|
||||
return err
|
||||
}
|
||||
|
||||
// URL gets the redirect URL to a file
|
||||
@ -118,34 +138,38 @@ func (l *LocalStorage) URL(path, name, _ string, reqParams url.Values) (*url.URL
|
||||
return nil, ErrURLNotSupported
|
||||
}
|
||||
|
||||
func (l *LocalStorage) normalizeWalkError(err error) error {
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
// ignore it because the file may be deleted during the walk, and we don't care about it
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// IterateObjects iterates across the objects in the local storage
|
||||
func (l *LocalStorage) IterateObjects(dirName string, fn func(path string, obj Object) error) error {
|
||||
dir := l.buildLocalPath(dirName)
|
||||
return filepath.WalkDir(dir, func(path string, d os.DirEntry, err error) error {
|
||||
if err != nil {
|
||||
return filepath.WalkDir(dir, func(path string, d os.DirEntry, errWalk error) error {
|
||||
if err := l.ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
select {
|
||||
case <-l.ctx.Done():
|
||||
return l.ctx.Err()
|
||||
default:
|
||||
if errWalk != nil {
|
||||
return l.normalizeWalkError(errWalk)
|
||||
}
|
||||
if path == l.dir {
|
||||
return nil
|
||||
}
|
||||
if d.IsDir() {
|
||||
if path == l.dir || d.IsDir() {
|
||||
return nil
|
||||
}
|
||||
|
||||
relPath, err := filepath.Rel(l.dir, path)
|
||||
if err != nil {
|
||||
return err
|
||||
return l.normalizeWalkError(err)
|
||||
}
|
||||
obj, err := os.Open(path)
|
||||
if err != nil {
|
||||
return err
|
||||
return l.normalizeWalkError(err)
|
||||
}
|
||||
defer obj.Close()
|
||||
return fn(relPath, obj)
|
||||
return fn(filepath.ToSlash(relPath), obj)
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@ -4,11 +4,14 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"code.gitea.io/gitea/modules/setting"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestBuildLocalPath(t *testing.T) {
|
||||
@ -53,6 +56,49 @@ func TestBuildLocalPath(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestLocalStorageDelete(t *testing.T) {
|
||||
rootDir := t.TempDir()
|
||||
st, err := NewLocalStorage(t.Context(), &setting.Storage{Path: rootDir})
|
||||
require.NoError(t, err)
|
||||
|
||||
assertExists := func(t *testing.T, path string, exists bool) {
|
||||
_, err = os.Stat(rootDir + "/" + path)
|
||||
if exists {
|
||||
require.NoError(t, err)
|
||||
} else {
|
||||
require.ErrorIs(t, err, os.ErrNotExist)
|
||||
}
|
||||
}
|
||||
|
||||
_, err = st.Save("dir/sub1/1-a.txt", strings.NewReader(""), -1)
|
||||
require.NoError(t, err)
|
||||
_, err = st.Save("dir/sub1/1-b.txt", strings.NewReader(""), -1)
|
||||
require.NoError(t, err)
|
||||
_, err = st.Save("dir/sub2/2-a.txt", strings.NewReader(""), -1)
|
||||
require.NoError(t, err)
|
||||
|
||||
assertExists(t, "dir/sub1/1-a.txt", true)
|
||||
assertExists(t, "dir/sub1/1-b.txt", true)
|
||||
assertExists(t, "dir/sub2/2-a.txt", true)
|
||||
|
||||
require.NoError(t, st.Delete("dir/sub1/1-a.txt"))
|
||||
assertExists(t, "dir/sub1", true)
|
||||
assertExists(t, "dir/sub1/1-a.txt", false)
|
||||
assertExists(t, "dir/sub1/1-b.txt", true)
|
||||
assertExists(t, "dir/sub2/2-a.txt", true)
|
||||
|
||||
require.NoError(t, st.Delete("dir/sub1/1-b.txt"))
|
||||
assertExists(t, ".", true)
|
||||
assertExists(t, "dir/sub1", false)
|
||||
assertExists(t, "dir/sub1/1-a.txt", false)
|
||||
assertExists(t, "dir/sub1/1-b.txt", false)
|
||||
assertExists(t, "dir/sub2/2-a.txt", true)
|
||||
|
||||
require.NoError(t, st.Delete("dir/sub2/2-a.txt"))
|
||||
assertExists(t, ".", true)
|
||||
assertExists(t, "dir", false)
|
||||
}
|
||||
|
||||
func TestLocalStorageIterator(t *testing.T) {
|
||||
testStorageIterator(t, setting.LocalStorageType, &setting.Storage{Path: t.TempDir()})
|
||||
}
|
||||
|
||||
@ -68,7 +68,12 @@ type ObjectStorage interface {
|
||||
Stat(path string) (os.FileInfo, error)
|
||||
Delete(path string) error
|
||||
URL(path, name, method string, reqParams url.Values) (*url.URL, error)
|
||||
IterateObjects(path string, iterator func(path string, obj Object) error) error
|
||||
|
||||
// IterateObjects calls the iterator function for each object in the storage with the given path as prefix
|
||||
// The "fullPath" argument in callback is the full path in this storage.
|
||||
// * IterateObjects("", ...): iterate all objects in this storage
|
||||
// * IterateObjects("sub-path", ...): iterate all objects with "sub-path" as prefix in this storage, the "fullPath" will be like "sub-path/xxx"
|
||||
IterateObjects(basePath string, iterator func(fullPath string, obj Object) error) error
|
||||
}
|
||||
|
||||
// Copy copies a file from source ObjectStorage to dest ObjectStorage
|
||||
|
||||
@ -75,19 +75,21 @@ const filepathSeparator = string(os.PathSeparator)
|
||||
// {`/foo`, ``, `bar`} => `/foo/bar`
|
||||
// {`/foo`, `..`, `bar`} => `/foo/bar`
|
||||
func FilePathJoinAbs(base string, sub ...string) string {
|
||||
elems := make([]string, 1, len(sub)+1)
|
||||
|
||||
// POSIX filesystem can have `\` in file names. Windows: `\` and `/` are both used for path separators
|
||||
// to keep the behavior consistent, we do not allow `\` in file names, replace all `\` with `/`
|
||||
if isOSWindows() {
|
||||
elems[0] = filepath.Clean(base)
|
||||
} else {
|
||||
elems[0] = filepath.Clean(strings.ReplaceAll(base, "\\", filepathSeparator))
|
||||
if !isOSWindows() {
|
||||
base = strings.ReplaceAll(base, "\\", filepathSeparator)
|
||||
}
|
||||
if !filepath.IsAbs(elems[0]) {
|
||||
// This shouldn't happen. If there is really necessary to pass in relative path, return the full path with filepath.Abs() instead
|
||||
panic(fmt.Sprintf("FilePathJoinAbs: %q (for path %v) is not absolute, do not guess a relative path based on current working directory", elems[0], elems))
|
||||
if !filepath.IsAbs(base) {
|
||||
// This shouldn't happen. If it is really necessary to handle relative paths, use filepath.Abs() to get absolute paths first
|
||||
panic(fmt.Sprintf("FilePathJoinAbs: %q (for path %v) is not absolute, do not guess a relative path based on current working directory", base, sub))
|
||||
}
|
||||
if len(sub) == 0 {
|
||||
return filepath.Clean(base)
|
||||
}
|
||||
|
||||
elems := make([]string, 1, len(sub)+1)
|
||||
elems[0] = base
|
||||
for _, s := range sub {
|
||||
if s == "" {
|
||||
continue
|
||||
@ -98,7 +100,7 @@ func FilePathJoinAbs(base string, sub ...string) string {
|
||||
elems = append(elems, filepath.Clean(filepathSeparator+strings.ReplaceAll(s, "\\", filepathSeparator)))
|
||||
}
|
||||
}
|
||||
// the elems[0] must be an absolute path, just join them together
|
||||
// the elems[0] must be an absolute path, just join them together, and Join will also do Clean
|
||||
return filepath.Join(elems...)
|
||||
}
|
||||
|
||||
|
||||
@ -241,7 +241,7 @@ func (ar artifactRoutes) uploadArtifact(ctx *ArtifactContext) {
|
||||
}
|
||||
|
||||
// get upload file size
|
||||
fileRealTotalSize, contentLength := getUploadFileSize(ctx)
|
||||
fileRealTotalSize := getUploadFileSize(ctx)
|
||||
|
||||
// get artifact retention days
|
||||
expiredDays := setting.Actions.ArtifactRetentionDays
|
||||
@ -265,17 +265,17 @@ func (ar artifactRoutes) uploadArtifact(ctx *ArtifactContext) {
|
||||
return
|
||||
}
|
||||
|
||||
// save chunk to storage, if success, return chunk stotal size
|
||||
// save chunk to storage, if success, return chunks total size
|
||||
// if artifact is not gzip when uploading, chunksTotalSize == fileRealTotalSize
|
||||
// if artifact is gzip when uploading, chunksTotalSize < fileRealTotalSize
|
||||
chunksTotalSize, err := saveUploadChunk(ar.fs, ctx, artifact, contentLength, runID)
|
||||
chunksTotalSize, err := saveUploadChunkV3GetTotalSize(ar.fs, ctx, artifact, runID)
|
||||
if err != nil {
|
||||
log.Error("Error save upload chunk: %v", err)
|
||||
ctx.HTTPError(http.StatusInternalServerError, "Error save upload chunk")
|
||||
return
|
||||
}
|
||||
|
||||
// update artifact size if zero or not match, over write artifact size
|
||||
// update artifact size if zero or not match, overwrite artifact size
|
||||
if artifact.FileSize == 0 ||
|
||||
artifact.FileCompressedSize == 0 ||
|
||||
artifact.FileSize != fileRealTotalSize ||
|
||||
|
||||
@ -12,7 +12,7 @@ import (
|
||||
"fmt"
|
||||
"hash"
|
||||
"io"
|
||||
"path/filepath"
|
||||
"path"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
@ -20,18 +20,73 @@ import (
|
||||
"code.gitea.io/gitea/models/actions"
|
||||
"code.gitea.io/gitea/models/db"
|
||||
"code.gitea.io/gitea/modules/log"
|
||||
"code.gitea.io/gitea/modules/setting"
|
||||
"code.gitea.io/gitea/modules/storage"
|
||||
)
|
||||
|
||||
func saveUploadChunkBase(st storage.ObjectStorage, ctx *ArtifactContext,
|
||||
artifact *actions.ActionArtifact,
|
||||
contentSize, runID, start, end, length int64, checkMd5 bool,
|
||||
) (int64, error) {
|
||||
type saveUploadChunkOptions struct {
|
||||
start int64
|
||||
end *int64
|
||||
checkMd5 bool
|
||||
}
|
||||
|
||||
func makeTmpPathNameV3(runID int64) string {
|
||||
return fmt.Sprintf("tmp-upload/run-%d", runID)
|
||||
}
|
||||
|
||||
func makeTmpPathNameV4(runID int64) string {
|
||||
return fmt.Sprintf("tmp-upload/run-%d-v4", runID)
|
||||
}
|
||||
|
||||
func makeChunkFilenameV3(runID, artifactID, start int64, endPtr *int64) string {
|
||||
var end int64
|
||||
if endPtr != nil {
|
||||
end = *endPtr
|
||||
}
|
||||
return fmt.Sprintf("%d-%d-%d-%d.chunk", runID, artifactID, start, end)
|
||||
}
|
||||
|
||||
func parseChunkFileItemV3(st storage.ObjectStorage, fpath string) (*chunkFileItem, error) {
|
||||
baseName := path.Base(fpath)
|
||||
if !strings.HasSuffix(baseName, ".chunk") {
|
||||
return nil, errSkipChunkFile
|
||||
}
|
||||
|
||||
var item chunkFileItem
|
||||
var unusedRunID int64
|
||||
if _, err := fmt.Sscanf(baseName, "%d-%d-%d-%d.chunk", &unusedRunID, &item.ArtifactID, &item.Start, &item.End); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
item.Path = fpath
|
||||
if item.End == 0 {
|
||||
fi, err := st.Stat(item.Path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
item.Size = fi.Size()
|
||||
item.End = item.Start + item.Size - 1
|
||||
} else {
|
||||
item.Size = item.End - item.Start + 1
|
||||
}
|
||||
return &item, nil
|
||||
}
|
||||
|
||||
func saveUploadChunkV3(st storage.ObjectStorage, ctx *ArtifactContext, artifact *actions.ActionArtifact,
|
||||
runID int64, opts saveUploadChunkOptions,
|
||||
) (writtenSize int64, retErr error) {
|
||||
// build chunk store path
|
||||
storagePath := fmt.Sprintf("tmp%d/%d-%d-%d-%d.chunk", runID, runID, artifact.ID, start, end)
|
||||
storagePath := fmt.Sprintf("%s/%s", makeTmpPathNameV3(runID), makeChunkFilenameV3(runID, artifact.ID, opts.start, opts.end))
|
||||
|
||||
// "end" is optional, so "contentSize=-1" means read until EOF
|
||||
contentSize := int64(-1)
|
||||
if opts.end != nil {
|
||||
contentSize = *opts.end - opts.start + 1
|
||||
}
|
||||
|
||||
var r io.Reader = ctx.Req.Body
|
||||
var hasher hash.Hash
|
||||
if checkMd5 {
|
||||
if opts.checkMd5 {
|
||||
// use io.TeeReader to avoid reading all body to md5 sum.
|
||||
// it writes data to hasher after reading end
|
||||
// if hash is not matched, delete the read-end result
|
||||
@ -41,76 +96,81 @@ func saveUploadChunkBase(st storage.ObjectStorage, ctx *ArtifactContext,
|
||||
// save chunk to storage
|
||||
writtenSize, err := st.Save(storagePath, r, contentSize)
|
||||
if err != nil {
|
||||
return -1, fmt.Errorf("save chunk to storage error: %v", err)
|
||||
return 0, fmt.Errorf("save chunk to storage error: %v", err)
|
||||
}
|
||||
var checkErr error
|
||||
if checkMd5 {
|
||||
|
||||
defer func() {
|
||||
if retErr != nil {
|
||||
if err := st.Delete(storagePath); err != nil {
|
||||
log.Error("Error deleting chunk: %s, %v", storagePath, err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
if contentSize != -1 && writtenSize != contentSize {
|
||||
return writtenSize, fmt.Errorf("writtenSize %d does not match contentSize %d", writtenSize, contentSize)
|
||||
}
|
||||
if opts.checkMd5 {
|
||||
// check md5
|
||||
reqMd5String := ctx.Req.Header.Get(artifactXActionsResultsMD5Header)
|
||||
chunkMd5String := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
|
||||
log.Info("[artifact] check chunk md5, sum: %s, header: %s", chunkMd5String, reqMd5String)
|
||||
log.Debug("[artifact] check chunk md5, sum: %s, header: %s", chunkMd5String, reqMd5String)
|
||||
// if md5 not match, delete the chunk
|
||||
if reqMd5String != chunkMd5String {
|
||||
checkErr = errors.New("md5 not match")
|
||||
return writtenSize, errors.New("md5 not match")
|
||||
}
|
||||
}
|
||||
if writtenSize != contentSize {
|
||||
checkErr = errors.Join(checkErr, fmt.Errorf("writtenSize %d not match contentSize %d", writtenSize, contentSize))
|
||||
}
|
||||
if checkErr != nil {
|
||||
if err := st.Delete(storagePath); err != nil {
|
||||
log.Error("Error deleting chunk: %s, %v", storagePath, err)
|
||||
}
|
||||
return -1, checkErr
|
||||
}
|
||||
log.Info("[artifact] save chunk %s, size: %d, artifact id: %d, start: %d, end: %d",
|
||||
storagePath, contentSize, artifact.ID, start, end)
|
||||
// return chunk total size
|
||||
return length, nil
|
||||
log.Debug("[artifact] save chunk %s, size: %d, artifact id: %d, start: %d, size: %d", storagePath, writtenSize, artifact.ID, opts.start, contentSize)
|
||||
return writtenSize, nil
|
||||
}
|
||||
|
||||
func saveUploadChunk(st storage.ObjectStorage, ctx *ArtifactContext,
|
||||
artifact *actions.ActionArtifact,
|
||||
contentSize, runID int64,
|
||||
) (int64, error) {
|
||||
func saveUploadChunkV3GetTotalSize(st storage.ObjectStorage, ctx *ArtifactContext, artifact *actions.ActionArtifact, runID int64) (totalSize int64, _ error) {
|
||||
// parse content-range header, format: bytes 0-1023/146515
|
||||
contentRange := ctx.Req.Header.Get("Content-Range")
|
||||
start, end, length := int64(0), int64(0), int64(0)
|
||||
if _, err := fmt.Sscanf(contentRange, "bytes %d-%d/%d", &start, &end, &length); err != nil {
|
||||
log.Warn("parse content range error: %v, content-range: %s", err, contentRange)
|
||||
return -1, fmt.Errorf("parse content range error: %v", err)
|
||||
var start, end int64
|
||||
if _, err := fmt.Sscanf(contentRange, "bytes %d-%d/%d", &start, &end, &totalSize); err != nil {
|
||||
return 0, fmt.Errorf("parse content range error: %v", err)
|
||||
}
|
||||
return saveUploadChunkBase(st, ctx, artifact, contentSize, runID, start, end, length, true)
|
||||
_, err := saveUploadChunkV3(st, ctx, artifact, runID, saveUploadChunkOptions{start: start, end: &end, checkMd5: true})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return totalSize, nil
|
||||
}
|
||||
|
||||
func appendUploadChunk(st storage.ObjectStorage, ctx *ArtifactContext,
|
||||
artifact *actions.ActionArtifact,
|
||||
start, contentSize, runID int64,
|
||||
) (int64, error) {
|
||||
end := start + contentSize - 1
|
||||
return saveUploadChunkBase(st, ctx, artifact, contentSize, runID, start, end, contentSize, false)
|
||||
// Returns uploaded length
|
||||
func appendUploadChunkV3(st storage.ObjectStorage, ctx *ArtifactContext, artifact *actions.ActionArtifact, runID, start int64) (int64, error) {
|
||||
opts := saveUploadChunkOptions{start: start}
|
||||
if ctx.Req.ContentLength > 0 {
|
||||
end := start + ctx.Req.ContentLength - 1
|
||||
opts.end = &end
|
||||
}
|
||||
return saveUploadChunkV3(st, ctx, artifact, runID, opts)
|
||||
}
|
||||
|
||||
type chunkFileItem struct {
|
||||
RunID int64
|
||||
ArtifactID int64
|
||||
Start int64
|
||||
End int64
|
||||
Path string
|
||||
|
||||
// these offset/size related fields might be missing when parsing, they will be filled in the listing functions
|
||||
Size int64
|
||||
Start int64
|
||||
End int64 // inclusive: Size=10, Start=0, End=9
|
||||
|
||||
ChunkName string // v4 only
|
||||
}
|
||||
|
||||
func listChunksByRunID(st storage.ObjectStorage, runID int64) (map[int64][]*chunkFileItem, error) {
|
||||
storageDir := fmt.Sprintf("tmp%d", runID)
|
||||
func listV3UnorderedChunksMapByRunID(st storage.ObjectStorage, runID int64) (map[int64][]*chunkFileItem, error) {
|
||||
storageDir := makeTmpPathNameV3(runID)
|
||||
var chunks []*chunkFileItem
|
||||
if err := st.IterateObjects(storageDir, func(fpath string, obj storage.Object) error {
|
||||
baseName := filepath.Base(fpath)
|
||||
// when read chunks from storage, it only contains storage dir and basename,
|
||||
// no matter the subdirectory setting in storage config
|
||||
item := chunkFileItem{Path: storageDir + "/" + baseName}
|
||||
if _, err := fmt.Sscanf(baseName, "%d-%d-%d-%d.chunk", &item.RunID, &item.ArtifactID, &item.Start, &item.End); err != nil {
|
||||
return fmt.Errorf("parse content range error: %v", err)
|
||||
item, err := parseChunkFileItemV3(st, fpath)
|
||||
if errors.Is(err, errSkipChunkFile) {
|
||||
return nil
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("unable to parse chunk name: %v", fpath)
|
||||
}
|
||||
chunks = append(chunks, &item)
|
||||
chunks = append(chunks, item)
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
@ -123,52 +183,78 @@ func listChunksByRunID(st storage.ObjectStorage, runID int64) (map[int64][]*chun
|
||||
return chunksMap, nil
|
||||
}
|
||||
|
||||
func listChunksByRunIDV4(st storage.ObjectStorage, runID, artifactID int64, blist *BlockList) ([]*chunkFileItem, error) {
|
||||
storageDir := fmt.Sprintf("tmpv4%d", runID)
|
||||
var chunks []*chunkFileItem
|
||||
chunkMap := map[string]*chunkFileItem{}
|
||||
dummy := &chunkFileItem{}
|
||||
for _, name := range blist.Latest {
|
||||
chunkMap[name] = dummy
|
||||
func listOrderedChunksForArtifact(st storage.ObjectStorage, runID, artifactID int64, blist *BlockList) ([]*chunkFileItem, error) {
|
||||
emptyListAsError := func(chunks []*chunkFileItem) ([]*chunkFileItem, error) {
|
||||
if len(chunks) == 0 {
|
||||
return nil, fmt.Errorf("no chunk found for artifact id: %d", artifactID)
|
||||
}
|
||||
return chunks, nil
|
||||
}
|
||||
|
||||
storageDir := makeTmpPathNameV4(runID)
|
||||
var chunks []*chunkFileItem
|
||||
var chunkMapV4 map[string]*chunkFileItem
|
||||
|
||||
if blist != nil {
|
||||
// make a dummy map for quick lookup of chunk names, the values are nil now and will be filled after iterating storage objects
|
||||
chunkMapV4 = map[string]*chunkFileItem{}
|
||||
for _, name := range blist.Latest {
|
||||
chunkMapV4[name] = nil
|
||||
}
|
||||
}
|
||||
|
||||
if err := st.IterateObjects(storageDir, func(fpath string, obj storage.Object) error {
|
||||
baseName := filepath.Base(fpath)
|
||||
if !strings.HasPrefix(baseName, "block-") {
|
||||
item, err := parseChunkFileItemV4(st, artifactID, fpath)
|
||||
if errors.Is(err, errSkipChunkFile) {
|
||||
return nil
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("unable to parse chunk name: %v", fpath)
|
||||
}
|
||||
// when read chunks from storage, it only contains storage dir and basename,
|
||||
// no matter the subdirectory setting in storage config
|
||||
item := chunkFileItem{Path: storageDir + "/" + baseName, ArtifactID: artifactID}
|
||||
var size int64
|
||||
var b64chunkName string
|
||||
if _, err := fmt.Sscanf(baseName, "block-%d-%d-%s", &item.RunID, &size, &b64chunkName); err != nil {
|
||||
return fmt.Errorf("parse content range error: %v", err)
|
||||
}
|
||||
rchunkName, err := base64.URLEncoding.DecodeString(b64chunkName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to parse chunkName: %v", err)
|
||||
}
|
||||
chunkName := string(rchunkName)
|
||||
item.End = item.Start + size - 1
|
||||
if _, ok := chunkMap[chunkName]; ok {
|
||||
chunkMap[chunkName] = &item
|
||||
|
||||
// Single chunk upload with block id
|
||||
if _, ok := chunkMapV4[item.ChunkName]; ok {
|
||||
chunkMapV4[item.ChunkName] = item
|
||||
} else if chunkMapV4 == nil {
|
||||
if chunks != nil {
|
||||
return errors.New("blockmap is required for chunks > 1")
|
||||
}
|
||||
chunks = []*chunkFileItem{item}
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for i, name := range blist.Latest {
|
||||
chunk, ok := chunkMap[name]
|
||||
if !ok || chunk.Path == "" {
|
||||
return nil, fmt.Errorf("missing Chunk (%d/%d): %s", i, len(blist.Latest), name)
|
||||
|
||||
if blist == nil && chunks == nil {
|
||||
chunkUnorderedItemsMapV3, err := listV3UnorderedChunksMapByRunID(st, runID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
chunks = append(chunks, chunk)
|
||||
if i > 0 {
|
||||
chunk.Start = chunkMap[blist.Latest[i-1]].End + 1
|
||||
chunk.End += chunk.Start
|
||||
chunks = chunkUnorderedItemsMapV3[artifactID]
|
||||
sort.Slice(chunks, func(i, j int) bool {
|
||||
return chunks[i].Start < chunks[j].Start
|
||||
})
|
||||
return emptyListAsError(chunks)
|
||||
}
|
||||
|
||||
if len(chunks) == 0 && blist != nil {
|
||||
for i, name := range blist.Latest {
|
||||
chunk := chunkMapV4[name]
|
||||
if chunk == nil {
|
||||
return nil, fmt.Errorf("missing chunk (%d/%d): %s", i, len(blist.Latest), name)
|
||||
}
|
||||
chunks = append(chunks, chunk)
|
||||
}
|
||||
}
|
||||
return chunks, nil
|
||||
for i, chunk := range chunks {
|
||||
if i == 0 {
|
||||
chunk.End += chunk.Size - 1
|
||||
} else {
|
||||
chunk.Start = chunkMapV4[blist.Latest[i-1]].End + 1
|
||||
chunk.End = chunk.Start + chunk.Size - 1
|
||||
}
|
||||
}
|
||||
return emptyListAsError(chunks)
|
||||
}
|
||||
|
||||
func mergeChunksForRun(ctx *ArtifactContext, st storage.ObjectStorage, runID int64, artifactName string) error {
|
||||
@ -181,13 +267,13 @@ func mergeChunksForRun(ctx *ArtifactContext, st storage.ObjectStorage, runID int
|
||||
return err
|
||||
}
|
||||
// read all uploading chunks from storage
|
||||
chunksMap, err := listChunksByRunID(st, runID)
|
||||
unorderedChunksMap, err := listV3UnorderedChunksMapByRunID(st, runID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// range db artifacts to merge chunks
|
||||
for _, art := range artifacts {
|
||||
chunks, ok := chunksMap[art.ID]
|
||||
chunks, ok := unorderedChunksMap[art.ID]
|
||||
if !ok {
|
||||
log.Debug("artifact %d chunks not found", art.ID)
|
||||
continue
|
||||
@ -239,12 +325,14 @@ func mergeChunksForArtifact(ctx *ArtifactContext, chunks []*chunkFileItem, st st
|
||||
}
|
||||
mergedReader := io.MultiReader(readers...)
|
||||
shaPrefix := "sha256:"
|
||||
var hash hash.Hash
|
||||
var hashSha256 hash.Hash
|
||||
if strings.HasPrefix(checksum, shaPrefix) {
|
||||
hash = sha256.New()
|
||||
hashSha256 = sha256.New()
|
||||
} else if checksum != "" {
|
||||
setting.PanicInDevOrTesting("unsupported checksum format: %s, will skip the checksum verification", checksum)
|
||||
}
|
||||
if hash != nil {
|
||||
mergedReader = io.TeeReader(mergedReader, hash)
|
||||
if hashSha256 != nil {
|
||||
mergedReader = io.TeeReader(mergedReader, hashSha256)
|
||||
}
|
||||
|
||||
// if chunk is gzip, use gz as extension
|
||||
@ -274,8 +362,8 @@ func mergeChunksForArtifact(ctx *ArtifactContext, chunks []*chunkFileItem, st st
|
||||
}
|
||||
}()
|
||||
|
||||
if hash != nil {
|
||||
rawChecksum := hash.Sum(nil)
|
||||
if hashSha256 != nil {
|
||||
rawChecksum := hashSha256.Sum(nil)
|
||||
actualChecksum := hex.EncodeToString(rawChecksum)
|
||||
if !strings.HasSuffix(checksum, actualChecksum) {
|
||||
return fmt.Errorf("update artifact error checksum is invalid %v vs %v", checksum, actualChecksum)
|
||||
|
||||
@ -20,8 +20,8 @@ const (
|
||||
artifactXActionsResultsMD5Header = "x-actions-results-md5"
|
||||
)
|
||||
|
||||
// The rules are from https://github.com/actions/toolkit/blob/main/packages/artifact/src/internal/path-and-artifact-name-validation.ts#L32
|
||||
var invalidArtifactNameChars = strings.Join([]string{"\\", "/", "\"", ":", "<", ">", "|", "*", "?", "\r", "\n"}, "")
|
||||
// The rules are from https://github.com/actions/toolkit/blob/main/packages/artifact/src/internal/upload/path-and-artifact-name-validation.ts
|
||||
const invalidArtifactNameChars = "\\/\":<>|*?\r\n"
|
||||
|
||||
func validateArtifactName(ctx *ArtifactContext, artifactName string) bool {
|
||||
if strings.ContainsAny(artifactName, invalidArtifactNameChars) {
|
||||
@ -84,11 +84,10 @@ func parseArtifactItemPath(ctx *ArtifactContext) (string, string, bool) {
|
||||
|
||||
// getUploadFileSize returns the size of the file to be uploaded.
|
||||
// The raw size is the size of the file as reported by the header X-TFS-FileLength.
|
||||
func getUploadFileSize(ctx *ArtifactContext) (int64, int64) {
|
||||
contentLength := ctx.Req.ContentLength
|
||||
func getUploadFileSize(ctx *ArtifactContext) int64 {
|
||||
xTfsLength, _ := strconv.ParseInt(ctx.Req.Header.Get(artifactXTfsFileLengthHeader), 10, 64)
|
||||
if xTfsLength > 0 {
|
||||
return xTfsLength, contentLength
|
||||
return xTfsLength
|
||||
}
|
||||
return contentLength, contentLength
|
||||
return ctx.Req.ContentLength
|
||||
}
|
||||
|
||||
@ -90,10 +90,12 @@ import (
|
||||
"crypto/sha256"
|
||||
"encoding/base64"
|
||||
"encoding/xml"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
@ -109,7 +111,7 @@ import (
|
||||
"code.gitea.io/gitea/services/context"
|
||||
|
||||
"google.golang.org/protobuf/encoding/protojson"
|
||||
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
|
||||
"google.golang.org/protobuf/reflect/protoreflect"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
)
|
||||
|
||||
@ -157,33 +159,81 @@ func ArtifactsV4Routes(prefix string) *web.Router {
|
||||
return m
|
||||
}
|
||||
|
||||
func (r artifactV4Routes) buildSignature(endp, expires, artifactName string, taskID, artifactID int64) []byte {
|
||||
func (r *artifactV4Routes) buildSignature(endpoint, expires, artifactName string, taskID, artifactID int64) []byte {
|
||||
mac := hmac.New(sha256.New, setting.GetGeneralTokenSigningSecret())
|
||||
mac.Write([]byte(endp))
|
||||
mac.Write([]byte(endpoint))
|
||||
mac.Write([]byte(expires))
|
||||
mac.Write([]byte(artifactName))
|
||||
fmt.Fprint(mac, taskID)
|
||||
fmt.Fprint(mac, artifactID)
|
||||
_, _ = fmt.Fprint(mac, taskID)
|
||||
_, _ = fmt.Fprint(mac, artifactID)
|
||||
return mac.Sum(nil)
|
||||
}
|
||||
|
||||
func (r artifactV4Routes) buildArtifactURL(ctx *ArtifactContext, endp, artifactName string, taskID, artifactID int64) string {
|
||||
func (r *artifactV4Routes) buildArtifactURL(ctx *ArtifactContext, endpoint, artifactName string, taskID, artifactID int64) string {
|
||||
expires := time.Now().Add(60 * time.Minute).Format("2006-01-02 15:04:05.999999999 -0700 MST")
|
||||
uploadURL := strings.TrimSuffix(httplib.GuessCurrentAppURL(ctx), "/") + strings.TrimSuffix(r.prefix, "/") +
|
||||
"/" + endp + "?sig=" + base64.URLEncoding.EncodeToString(r.buildSignature(endp, expires, artifactName, taskID, artifactID)) + "&expires=" + url.QueryEscape(expires) + "&artifactName=" + url.QueryEscape(artifactName) + "&taskID=" + strconv.FormatInt(taskID, 10) + "&artifactID=" + strconv.FormatInt(artifactID, 10)
|
||||
"/" + endpoint +
|
||||
"?sig=" + base64.RawURLEncoding.EncodeToString(r.buildSignature(endpoint, expires, artifactName, taskID, artifactID)) +
|
||||
"&expires=" + url.QueryEscape(expires) +
|
||||
"&artifactName=" + url.QueryEscape(artifactName) +
|
||||
"&taskID=" + strconv.FormatInt(taskID, 10) +
|
||||
"&artifactID=" + strconv.FormatInt(artifactID, 10)
|
||||
return uploadURL
|
||||
}
|
||||
|
||||
func (r artifactV4Routes) verifySignature(ctx *ArtifactContext, endp string) (*actions.ActionTask, string, bool) {
|
||||
func makeBlockFilenameV4(runID, artifactID, size int64, blockID string) string {
|
||||
sizeInName := max(size, 0) // do not use "-1" in filename
|
||||
return fmt.Sprintf("block-%d-%d-%d-%s", runID, artifactID, sizeInName, base64.URLEncoding.EncodeToString([]byte(blockID)))
|
||||
}
|
||||
|
||||
var errSkipChunkFile = errors.New("skip this chunk file")
|
||||
|
||||
func parseChunkFileItemV4(st storage.ObjectStorage, artifactID int64, fpath string) (*chunkFileItem, error) {
|
||||
baseName := path.Base(fpath)
|
||||
if !strings.HasPrefix(baseName, "block-") {
|
||||
return nil, errSkipChunkFile
|
||||
}
|
||||
var item chunkFileItem
|
||||
var unusedRunID int64
|
||||
var b64chunkName string
|
||||
_, err := fmt.Sscanf(baseName, "block-%d-%d-%d-%s", &unusedRunID, &item.ArtifactID, &item.Size, &b64chunkName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if item.ArtifactID != artifactID {
|
||||
return nil, errSkipChunkFile
|
||||
}
|
||||
chunkName, err := base64.URLEncoding.DecodeString(b64chunkName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
item.ChunkName = string(chunkName)
|
||||
item.Path = fpath
|
||||
if item.Size <= 0 {
|
||||
fi, err := st.Stat(item.Path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
item.Size = fi.Size()
|
||||
}
|
||||
return &item, nil
|
||||
}
|
||||
|
||||
func (r *artifactV4Routes) verifySignature(ctx *ArtifactContext, endp string) (*actions.ActionTask, string, bool) {
|
||||
rawTaskID := ctx.Req.URL.Query().Get("taskID")
|
||||
rawArtifactID := ctx.Req.URL.Query().Get("artifactID")
|
||||
sig := ctx.Req.URL.Query().Get("sig")
|
||||
expires := ctx.Req.URL.Query().Get("expires")
|
||||
artifactName := ctx.Req.URL.Query().Get("artifactName")
|
||||
dsig, _ := base64.URLEncoding.DecodeString(sig)
|
||||
taskID, _ := strconv.ParseInt(rawTaskID, 10, 64)
|
||||
artifactID, _ := strconv.ParseInt(rawArtifactID, 10, 64)
|
||||
|
||||
dsig, errSig := base64.RawURLEncoding.DecodeString(sig)
|
||||
taskID, errTask := strconv.ParseInt(rawTaskID, 10, 64)
|
||||
artifactID, errArtifactID := strconv.ParseInt(rawArtifactID, 10, 64)
|
||||
err := errors.Join(errSig, errTask, errArtifactID)
|
||||
if err != nil {
|
||||
log.Error("Error decoding signature values: %v", err)
|
||||
ctx.HTTPError(http.StatusBadRequest, "Error decoding signature values")
|
||||
return nil, "", false
|
||||
}
|
||||
expecedsig := r.buildSignature(endp, expires, artifactName, taskID, artifactID)
|
||||
if !hmac.Equal(dsig, expecedsig) {
|
||||
log.Error("Error unauthorized")
|
||||
@ -226,7 +276,7 @@ func (r *artifactV4Routes) getArtifactByName(ctx *ArtifactContext, runID int64,
|
||||
return &art, nil
|
||||
}
|
||||
|
||||
func (r *artifactV4Routes) parseProtbufBody(ctx *ArtifactContext, req protoreflect.ProtoMessage) bool {
|
||||
func (r *artifactV4Routes) parseProtobufBody(ctx *ArtifactContext, req protoreflect.ProtoMessage) bool {
|
||||
body, err := io.ReadAll(ctx.Req.Body)
|
||||
if err != nil {
|
||||
log.Error("Error decode request body: %v", err)
|
||||
@ -242,7 +292,7 @@ func (r *artifactV4Routes) parseProtbufBody(ctx *ArtifactContext, req protorefle
|
||||
return true
|
||||
}
|
||||
|
||||
func (r *artifactV4Routes) sendProtbufBody(ctx *ArtifactContext, req protoreflect.ProtoMessage) {
|
||||
func (r *artifactV4Routes) sendProtobufBody(ctx *ArtifactContext, req protoreflect.ProtoMessage) {
|
||||
resp, err := protojson.Marshal(req)
|
||||
if err != nil {
|
||||
log.Error("Error encode response body: %v", err)
|
||||
@ -257,7 +307,7 @@ func (r *artifactV4Routes) sendProtbufBody(ctx *ArtifactContext, req protoreflec
|
||||
func (r *artifactV4Routes) createArtifact(ctx *ArtifactContext) {
|
||||
var req CreateArtifactRequest
|
||||
|
||||
if ok := r.parseProtbufBody(ctx, &req); !ok {
|
||||
if ok := r.parseProtobufBody(ctx, &req); !ok {
|
||||
return
|
||||
}
|
||||
_, _, ok := validateRunIDV4(ctx, req.WorkflowRunBackendId)
|
||||
@ -291,7 +341,7 @@ func (r *artifactV4Routes) createArtifact(ctx *ArtifactContext) {
|
||||
Ok: true,
|
||||
SignedUploadUrl: r.buildArtifactURL(ctx, "UploadArtifact", artifactName, ctx.ActionTask.ID, artifact.ID),
|
||||
}
|
||||
r.sendProtbufBody(ctx, &respData)
|
||||
r.sendProtobufBody(ctx, &respData)
|
||||
}
|
||||
|
||||
func (r *artifactV4Routes) uploadArtifact(ctx *ArtifactContext) {
|
||||
@ -303,34 +353,34 @@ func (r *artifactV4Routes) uploadArtifact(ctx *ArtifactContext) {
|
||||
comp := ctx.Req.URL.Query().Get("comp")
|
||||
switch comp {
|
||||
case "block", "appendBlock":
|
||||
blockid := ctx.Req.URL.Query().Get("blockid")
|
||||
if blockid == "" {
|
||||
// get artifact by name
|
||||
artifact, err := r.getArtifactByName(ctx, task.Job.RunID, artifactName)
|
||||
// get artifact by name
|
||||
artifact, err := r.getArtifactByName(ctx, task.Job.RunID, artifactName)
|
||||
if err != nil {
|
||||
log.Error("Error artifact not found: %v", err)
|
||||
ctx.HTTPError(http.StatusNotFound, "Error artifact not found")
|
||||
return
|
||||
}
|
||||
blockID := ctx.Req.URL.Query().Get("blockid")
|
||||
if blockID == "" {
|
||||
uploadedLength, err := appendUploadChunkV3(r.fs, ctx, artifact, artifact.RunID, artifact.FileSize)
|
||||
if err != nil {
|
||||
log.Error("Error artifact not found: %v", err)
|
||||
ctx.HTTPError(http.StatusNotFound, "Error artifact not found")
|
||||
log.Error("Error appending chunk %v", err)
|
||||
ctx.HTTPError(http.StatusInternalServerError, "Error appending Chunk")
|
||||
return
|
||||
}
|
||||
|
||||
_, err = appendUploadChunk(r.fs, ctx, artifact, artifact.FileSize, ctx.Req.ContentLength, artifact.RunID)
|
||||
if err != nil {
|
||||
log.Error("Error runner api getting task: task is not running")
|
||||
ctx.HTTPError(http.StatusInternalServerError, "Error runner api getting task: task is not running")
|
||||
return
|
||||
}
|
||||
artifact.FileCompressedSize += ctx.Req.ContentLength
|
||||
artifact.FileSize += ctx.Req.ContentLength
|
||||
artifact.FileCompressedSize += uploadedLength
|
||||
artifact.FileSize += uploadedLength
|
||||
if err := actions.UpdateArtifactByID(ctx, artifact.ID, artifact); err != nil {
|
||||
log.Error("Error UpdateArtifactByID: %v", err)
|
||||
ctx.HTTPError(http.StatusInternalServerError, "Error UpdateArtifactByID")
|
||||
return
|
||||
}
|
||||
} else {
|
||||
_, err := r.fs.Save(fmt.Sprintf("tmpv4%d/block-%d-%d-%s", task.Job.RunID, task.Job.RunID, ctx.Req.ContentLength, base64.URLEncoding.EncodeToString([]byte(blockid))), ctx.Req.Body, -1)
|
||||
blockFilename := makeBlockFilenameV4(task.Job.RunID, artifact.ID, ctx.Req.ContentLength, blockID)
|
||||
_, err := r.fs.Save(fmt.Sprintf("%s/%s", makeTmpPathNameV4(task.Job.RunID), blockFilename), ctx.Req.Body, ctx.Req.ContentLength)
|
||||
if err != nil {
|
||||
log.Error("Error runner api getting task: task is not running")
|
||||
ctx.HTTPError(http.StatusInternalServerError, "Error runner api getting task: task is not running")
|
||||
log.Error("Error uploading block blob %v", err)
|
||||
ctx.HTTPError(http.StatusInternalServerError, "Error uploading block blob")
|
||||
return
|
||||
}
|
||||
}
|
||||
@ -338,10 +388,10 @@ func (r *artifactV4Routes) uploadArtifact(ctx *ArtifactContext) {
|
||||
case "blocklist":
|
||||
rawArtifactID := ctx.Req.URL.Query().Get("artifactID")
|
||||
artifactID, _ := strconv.ParseInt(rawArtifactID, 10, 64)
|
||||
_, err := r.fs.Save(fmt.Sprintf("tmpv4%d/%d-%d-blocklist", task.Job.RunID, task.Job.RunID, artifactID), ctx.Req.Body, -1)
|
||||
_, err := r.fs.Save(fmt.Sprintf("%s/%d-%d-blocklist", makeTmpPathNameV4(task.Job.RunID), task.Job.RunID, artifactID), ctx.Req.Body, -1)
|
||||
if err != nil {
|
||||
log.Error("Error runner api getting task: task is not running")
|
||||
ctx.HTTPError(http.StatusInternalServerError, "Error runner api getting task: task is not running")
|
||||
log.Error("Error uploading blocklist %v", err)
|
||||
ctx.HTTPError(http.StatusInternalServerError, "Error uploading blocklist")
|
||||
return
|
||||
}
|
||||
ctx.JSON(http.StatusCreated, "created")
|
||||
@ -357,7 +407,7 @@ type Latest struct {
|
||||
}
|
||||
|
||||
func (r *artifactV4Routes) readBlockList(runID, artifactID int64) (*BlockList, error) {
|
||||
blockListName := fmt.Sprintf("tmpv4%d/%d-%d-blocklist", runID, runID, artifactID)
|
||||
blockListName := fmt.Sprintf("%s/%d-%d-blocklist", makeTmpPathNameV4(runID), runID, artifactID)
|
||||
s, err := r.fs.Open(blockListName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -367,17 +417,22 @@ func (r *artifactV4Routes) readBlockList(runID, artifactID int64) (*BlockList, e
|
||||
blockList := &BlockList{}
|
||||
err = xdec.Decode(blockList)
|
||||
|
||||
_ = s.Close()
|
||||
|
||||
delerr := r.fs.Delete(blockListName)
|
||||
if delerr != nil {
|
||||
log.Warn("Failed to delete blockList %s: %v", blockListName, delerr)
|
||||
}
|
||||
return blockList, err
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return blockList, nil
|
||||
}
|
||||
|
||||
func (r *artifactV4Routes) finalizeArtifact(ctx *ArtifactContext) {
|
||||
var req FinalizeArtifactRequest
|
||||
|
||||
if ok := r.parseProtbufBody(ctx, &req); !ok {
|
||||
if ok := r.parseProtobufBody(ctx, &req); !ok {
|
||||
return
|
||||
}
|
||||
_, runID, ok := validateRunIDV4(ctx, req.WorkflowRunBackendId)
|
||||
@ -394,30 +449,20 @@ func (r *artifactV4Routes) finalizeArtifact(ctx *ArtifactContext) {
|
||||
}
|
||||
|
||||
var chunks []*chunkFileItem
|
||||
blockList, err := r.readBlockList(runID, artifact.ID)
|
||||
blockList, blockListErr := r.readBlockList(runID, artifact.ID)
|
||||
chunks, err = listOrderedChunksForArtifact(r.fs, runID, artifact.ID, blockList)
|
||||
if err != nil {
|
||||
log.Warn("Failed to read BlockList, fallback to old behavior: %v", err)
|
||||
chunkMap, err := listChunksByRunID(r.fs, runID)
|
||||
if err != nil {
|
||||
log.Error("Error merge chunks: %v", err)
|
||||
ctx.HTTPError(http.StatusInternalServerError, "Error merge chunks")
|
||||
return
|
||||
}
|
||||
chunks, ok = chunkMap[artifact.ID]
|
||||
if !ok {
|
||||
log.Error("Error merge chunks")
|
||||
ctx.HTTPError(http.StatusInternalServerError, "Error merge chunks")
|
||||
return
|
||||
}
|
||||
} else {
|
||||
chunks, err = listChunksByRunIDV4(r.fs, runID, artifact.ID, blockList)
|
||||
if err != nil {
|
||||
log.Error("Error merge chunks: %v", err)
|
||||
ctx.HTTPError(http.StatusInternalServerError, "Error merge chunks")
|
||||
return
|
||||
}
|
||||
artifact.FileSize = chunks[len(chunks)-1].End + 1
|
||||
artifact.FileCompressedSize = chunks[len(chunks)-1].End + 1
|
||||
log.Error("Error list chunks: %v", errors.Join(blockListErr, err))
|
||||
ctx.HTTPError(http.StatusInternalServerError, "Error list chunks")
|
||||
return
|
||||
}
|
||||
artifact.FileSize = chunks[len(chunks)-1].End + 1
|
||||
artifact.FileCompressedSize = chunks[len(chunks)-1].End + 1
|
||||
|
||||
if req.Size != artifact.FileSize {
|
||||
log.Error("Error merge chunks size mismatch")
|
||||
ctx.HTTPError(http.StatusInternalServerError, "Error merge chunks size mismatch")
|
||||
return
|
||||
}
|
||||
|
||||
checksum := ""
|
||||
@ -434,13 +479,13 @@ func (r *artifactV4Routes) finalizeArtifact(ctx *ArtifactContext) {
|
||||
Ok: true,
|
||||
ArtifactId: artifact.ID,
|
||||
}
|
||||
r.sendProtbufBody(ctx, &respData)
|
||||
r.sendProtobufBody(ctx, &respData)
|
||||
}
|
||||
|
||||
func (r *artifactV4Routes) listArtifacts(ctx *ArtifactContext) {
|
||||
var req ListArtifactsRequest
|
||||
|
||||
if ok := r.parseProtbufBody(ctx, &req); !ok {
|
||||
if ok := r.parseProtobufBody(ctx, &req); !ok {
|
||||
return
|
||||
}
|
||||
_, runID, ok := validateRunIDV4(ctx, req.WorkflowRunBackendId)
|
||||
@ -485,13 +530,13 @@ func (r *artifactV4Routes) listArtifacts(ctx *ArtifactContext) {
|
||||
respData := ListArtifactsResponse{
|
||||
Artifacts: list,
|
||||
}
|
||||
r.sendProtbufBody(ctx, &respData)
|
||||
r.sendProtobufBody(ctx, &respData)
|
||||
}
|
||||
|
||||
func (r *artifactV4Routes) getSignedArtifactURL(ctx *ArtifactContext) {
|
||||
var req GetSignedArtifactURLRequest
|
||||
|
||||
if ok := r.parseProtbufBody(ctx, &req); !ok {
|
||||
if ok := r.parseProtobufBody(ctx, &req); !ok {
|
||||
return
|
||||
}
|
||||
_, runID, ok := validateRunIDV4(ctx, req.WorkflowRunBackendId)
|
||||
@ -525,7 +570,7 @@ func (r *artifactV4Routes) getSignedArtifactURL(ctx *ArtifactContext) {
|
||||
if respData.SignedUrl == "" {
|
||||
respData.SignedUrl = r.buildArtifactURL(ctx, "DownloadArtifact", artifactName, ctx.ActionTask.ID, artifact.ID)
|
||||
}
|
||||
r.sendProtbufBody(ctx, &respData)
|
||||
r.sendProtobufBody(ctx, &respData)
|
||||
}
|
||||
|
||||
func (r *artifactV4Routes) downloadArtifact(ctx *ArtifactContext) {
|
||||
@ -555,7 +600,7 @@ func (r *artifactV4Routes) downloadArtifact(ctx *ArtifactContext) {
|
||||
func (r *artifactV4Routes) deleteArtifact(ctx *ArtifactContext) {
|
||||
var req DeleteArtifactRequest
|
||||
|
||||
if ok := r.parseProtbufBody(ctx, &req); !ok {
|
||||
if ok := r.parseProtobufBody(ctx, &req); !ok {
|
||||
return
|
||||
}
|
||||
_, runID, ok := validateRunIDV4(ctx, req.WorkflowRunBackendId)
|
||||
@ -582,5 +627,5 @@ func (r *artifactV4Routes) deleteArtifact(ctx *ArtifactContext) {
|
||||
Ok: true,
|
||||
ArtifactId: artifact.ID,
|
||||
}
|
||||
r.sendProtbufBody(ctx, &respData)
|
||||
r.sendProtobufBody(ctx, &respData)
|
||||
}
|
||||
|
||||
@ -6,6 +6,7 @@ package integration
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/sha256"
|
||||
"encoding/base64"
|
||||
"encoding/hex"
|
||||
"encoding/xml"
|
||||
"fmt"
|
||||
@ -22,6 +23,7 @@ import (
|
||||
"code.gitea.io/gitea/modules/json"
|
||||
"code.gitea.io/gitea/modules/storage"
|
||||
api "code.gitea.io/gitea/modules/structs"
|
||||
"code.gitea.io/gitea/modules/util"
|
||||
"code.gitea.io/gitea/routers/api/actions"
|
||||
actions_service "code.gitea.io/gitea/services/actions"
|
||||
|
||||
@ -45,45 +47,135 @@ func TestActionsArtifactV4UploadSingleFile(t *testing.T) {
|
||||
token, err := actions_service.CreateAuthorizationToken(48, 792, 193)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// acquire artifact upload url
|
||||
req := NewRequestWithBody(t, "POST", "/twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact", toProtoJSON(&actions.CreateArtifactRequest{
|
||||
Version: 4,
|
||||
Name: "artifact",
|
||||
WorkflowRunBackendId: "792",
|
||||
WorkflowJobRunBackendId: "193",
|
||||
})).AddTokenAuth(token)
|
||||
resp := MakeRequest(t, req, http.StatusOK)
|
||||
var uploadResp actions.CreateArtifactResponse
|
||||
protojson.Unmarshal(resp.Body.Bytes(), &uploadResp)
|
||||
assert.True(t, uploadResp.Ok)
|
||||
assert.Contains(t, uploadResp.SignedUploadUrl, "/twirp/github.actions.results.api.v1.ArtifactService/UploadArtifact")
|
||||
table := []struct {
|
||||
name string
|
||||
version int32
|
||||
blockID bool
|
||||
noLength bool
|
||||
append int
|
||||
}{
|
||||
{
|
||||
name: "artifact",
|
||||
version: 4,
|
||||
},
|
||||
{
|
||||
name: "artifact2",
|
||||
version: 4,
|
||||
blockID: true,
|
||||
},
|
||||
{
|
||||
name: "artifact3",
|
||||
version: 4,
|
||||
noLength: true,
|
||||
},
|
||||
{
|
||||
name: "artifact4",
|
||||
version: 4,
|
||||
blockID: true,
|
||||
noLength: true,
|
||||
},
|
||||
{
|
||||
name: "artifact5",
|
||||
version: 7,
|
||||
blockID: true,
|
||||
},
|
||||
{
|
||||
name: "artifact6",
|
||||
version: 7,
|
||||
append: 2,
|
||||
noLength: true,
|
||||
},
|
||||
{
|
||||
name: "artifact7",
|
||||
version: 7,
|
||||
append: 3,
|
||||
blockID: true,
|
||||
noLength: true,
|
||||
},
|
||||
{
|
||||
name: "artifact8",
|
||||
version: 7,
|
||||
append: 4,
|
||||
blockID: true,
|
||||
},
|
||||
}
|
||||
|
||||
// get upload url
|
||||
idx := strings.Index(uploadResp.SignedUploadUrl, "/twirp/")
|
||||
url := uploadResp.SignedUploadUrl[idx:] + "&comp=block"
|
||||
for _, entry := range table {
|
||||
t.Run(entry.name, func(t *testing.T) {
|
||||
// acquire artifact upload url
|
||||
req := NewRequestWithBody(t, "POST", "/twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact", toProtoJSON(&actions.CreateArtifactRequest{
|
||||
Version: entry.version,
|
||||
Name: entry.name,
|
||||
WorkflowRunBackendId: "792",
|
||||
WorkflowJobRunBackendId: "193",
|
||||
})).AddTokenAuth(token)
|
||||
resp := MakeRequest(t, req, http.StatusOK)
|
||||
var uploadResp actions.CreateArtifactResponse
|
||||
protojson.Unmarshal(resp.Body.Bytes(), &uploadResp)
|
||||
assert.True(t, uploadResp.Ok)
|
||||
assert.Contains(t, uploadResp.SignedUploadUrl, "/twirp/github.actions.results.api.v1.ArtifactService/UploadArtifact")
|
||||
|
||||
// upload artifact chunk
|
||||
body := strings.Repeat("A", 1024)
|
||||
req = NewRequestWithBody(t, "PUT", url, strings.NewReader(body))
|
||||
MakeRequest(t, req, http.StatusCreated)
|
||||
h := sha256.New()
|
||||
|
||||
t.Logf("Create artifact confirm")
|
||||
blocks := make([]string, 0, util.Iif(entry.blockID, entry.append+1, 0))
|
||||
|
||||
sha := sha256.Sum256([]byte(body))
|
||||
// get upload url
|
||||
idx := strings.Index(uploadResp.SignedUploadUrl, "/twirp/")
|
||||
for i := range entry.append + 1 {
|
||||
url := uploadResp.SignedUploadUrl[idx:]
|
||||
// See https://learn.microsoft.com/en-us/rest/api/storageservices/append-block
|
||||
// See https://learn.microsoft.com/en-us/rest/api/storageservices/put-block
|
||||
if entry.blockID {
|
||||
blockID := base64.RawURLEncoding.EncodeToString(fmt.Append([]byte("SOME_BIG_BLOCK_ID_"), i))
|
||||
blocks = append(blocks, blockID)
|
||||
url += "&comp=block&blockid=" + blockID
|
||||
} else {
|
||||
url += "&comp=appendBlock"
|
||||
}
|
||||
|
||||
// confirm artifact upload
|
||||
req = NewRequestWithBody(t, "POST", "/twirp/github.actions.results.api.v1.ArtifactService/FinalizeArtifact", toProtoJSON(&actions.FinalizeArtifactRequest{
|
||||
Name: "artifact",
|
||||
Size: 1024,
|
||||
Hash: wrapperspb.String("sha256:" + hex.EncodeToString(sha[:])),
|
||||
WorkflowRunBackendId: "792",
|
||||
WorkflowJobRunBackendId: "193",
|
||||
})).
|
||||
AddTokenAuth(token)
|
||||
resp = MakeRequest(t, req, http.StatusOK)
|
||||
var finalizeResp actions.FinalizeArtifactResponse
|
||||
protojson.Unmarshal(resp.Body.Bytes(), &finalizeResp)
|
||||
assert.True(t, finalizeResp.Ok)
|
||||
// upload artifact chunk
|
||||
body := strings.Repeat("A", 1024)
|
||||
_, _ = h.Write([]byte(body))
|
||||
var bodyReader io.Reader = strings.NewReader(body)
|
||||
if entry.noLength {
|
||||
bodyReader = io.MultiReader(bodyReader)
|
||||
}
|
||||
req = NewRequestWithBody(t, "PUT", url, bodyReader)
|
||||
MakeRequest(t, req, http.StatusCreated)
|
||||
}
|
||||
|
||||
if entry.blockID && entry.append > 0 {
|
||||
// https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list
|
||||
blockListURL := uploadResp.SignedUploadUrl[idx:] + "&comp=blocklist"
|
||||
// upload artifact blockList
|
||||
blockList := &actions.BlockList{
|
||||
Latest: blocks,
|
||||
}
|
||||
rawBlockList, err := xml.Marshal(blockList)
|
||||
assert.NoError(t, err)
|
||||
req = NewRequestWithBody(t, "PUT", blockListURL, bytes.NewReader(rawBlockList))
|
||||
MakeRequest(t, req, http.StatusCreated)
|
||||
}
|
||||
|
||||
sha := h.Sum(nil)
|
||||
|
||||
t.Logf("Create artifact confirm")
|
||||
|
||||
// confirm artifact upload
|
||||
req = NewRequestWithBody(t, "POST", "/twirp/github.actions.results.api.v1.ArtifactService/FinalizeArtifact", toProtoJSON(&actions.FinalizeArtifactRequest{
|
||||
Name: entry.name,
|
||||
Size: int64(entry.append+1) * 1024,
|
||||
Hash: wrapperspb.String("sha256:" + hex.EncodeToString(sha)),
|
||||
WorkflowRunBackendId: "792",
|
||||
WorkflowJobRunBackendId: "193",
|
||||
})).
|
||||
AddTokenAuth(token)
|
||||
resp = MakeRequest(t, req, http.StatusOK)
|
||||
var finalizeResp actions.FinalizeArtifactResponse
|
||||
protojson.Unmarshal(resp.Body.Bytes(), &finalizeResp)
|
||||
assert.True(t, finalizeResp.Ok)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestActionsArtifactV4UploadSingleFileWrongChecksum(t *testing.T) {
|
||||
@ -312,7 +404,7 @@ func TestActionsArtifactV4DownloadSingle(t *testing.T) {
|
||||
token, err := actions_service.CreateAuthorizationToken(48, 792, 193)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// acquire artifact upload url
|
||||
// list artifacts by name
|
||||
req := NewRequestWithBody(t, "POST", "/twirp/github.actions.results.api.v1.ArtifactService/ListArtifacts", toProtoJSON(&actions.ListArtifactsRequest{
|
||||
NameFilter: wrapperspb.String("artifact-v4-download"),
|
||||
WorkflowRunBackendId: "792",
|
||||
@ -323,7 +415,7 @@ func TestActionsArtifactV4DownloadSingle(t *testing.T) {
|
||||
protojson.Unmarshal(resp.Body.Bytes(), &listResp)
|
||||
assert.Len(t, listResp.Artifacts, 1)
|
||||
|
||||
// confirm artifact upload
|
||||
// acquire artifact download url
|
||||
req = NewRequestWithBody(t, "POST", "/twirp/github.actions.results.api.v1.ArtifactService/GetSignedArtifactURL", toProtoJSON(&actions.GetSignedArtifactURLRequest{
|
||||
Name: "artifact-v4-download",
|
||||
WorkflowRunBackendId: "792",
|
||||
|
||||
@ -347,6 +347,10 @@ func MakeRequest(t testing.TB, rw *RequestWrapper, expectedStatus int) *httptest
|
||||
if req.RemoteAddr == "" {
|
||||
req.RemoteAddr = "test-mock:12345"
|
||||
}
|
||||
// Ensure unknown contentLength is seen as -1
|
||||
if req.Body != nil && req.ContentLength == 0 {
|
||||
req.ContentLength = -1
|
||||
}
|
||||
testWebRoutes.ServeHTTP(recorder, req)
|
||||
if expectedStatus != NoExpectedStatus {
|
||||
if expectedStatus != recorder.Code {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user