0
0
mirror of https://github.com/go-gitea/gitea.git synced 2026-06-19 11:13:45 +02:00

fix: Add Matrix cache and optimize the code

This commit is contained in:
Pascal Zimmermann 2026-05-05 09:20:39 +02:00
parent 55cb22cee1
commit c68208bfd0
5 changed files with 217 additions and 58 deletions

2
go.mod
View File

@ -295,7 +295,7 @@ ignore (
replace github.com/jaytaylor/html2text => github.com/Necoro/html2text v0.0.0-20250804200300-7bf1ce1c7347 // jaytaylor/html2text is unmaintained
replace github.com/nektos/act => gitea.com/gitea/act v0.261.10 // gitea maintains its own package
replace github.com/nektos/act => gitea.com/gitea/act v0.262.0 // gitea maintains its own package
replace github.com/urfave/cli/v3 => github.com/urfave/cli/v3 v3.4.1 // v3.6.2 breaks -c flag parsing in help commands

4
go.sum
View File

@ -16,8 +16,8 @@ dario.cat/mergo v1.0.2 h1:85+piFYR1tMbRrLcDwR18y4UKJ3aH1Tbzi24VRW1TK8=
dario.cat/mergo v1.0.2/go.mod h1:E/hbnu0NxMFBjpMIE34DRGLWqDy0g5FuKDhCb31ngxA=
filippo.io/edwards25519 v1.2.0 h1:crnVqOiS4jqYleHd9vaKZ+HKtHfllngJIiOpNpoJsjo=
filippo.io/edwards25519 v1.2.0/go.mod h1:xzAOLCNug/yB62zG1bQ8uziwrIqIuxhctzJT18Q77mc=
gitea.com/gitea/act v0.261.10 h1:ndwbtuMXXz1dpYF2iwY1/PkgKNETo4jmPXfinTZt8cs=
gitea.com/gitea/act v0.261.10/go.mod h1:oIkqQHvU0lfuIWwcpqa4FmU+t3prA89tgkuHUTsrI2c=
gitea.com/gitea/act v0.262.0 h1:R0U+VeFvhJSYACSCR9fEV8a3ZXlGlBE1X5YjaFzwv4c=
gitea.com/gitea/act v0.262.0/go.mod h1:oIkqQHvU0lfuIWwcpqa4FmU+t3prA89tgkuHUTsrI2c=
gitea.com/go-chi/binding v0.0.0-20240430071103-39a851e106ed h1:EZZBtilMLSZNWtHHcgq2mt6NSGhJSZBuduAlinMEmso=
gitea.com/go-chi/binding v0.0.0-20240430071103-39a851e106ed/go.mod h1:E3i3cgB04dDx0v3CytCgRTTn9Z/9x891aet3r456RVw=
gitea.com/go-chi/cache v0.2.1 h1:bfAPkvXlbcZxPCpcmDVCWoHgiBSBmZN/QosnZvEC0+g=

View File

@ -18,6 +18,46 @@ import (
"go.yaml.in/yaml/v4"
)
// markMatrixAsEvaluatedAndSkip marks a job's matrix as evaluated and skipped.
// Used when matrix cannot be expanded or dependency checks fail.
func markMatrixAsEvaluatedAndSkip(ctx context.Context, job *actions_model.ActionRunJob, reason string) error {
job.IsMatrixEvaluated = true
job.Status = actions_model.StatusSkipped
if _, err := actions_model.UpdateRunJob(ctx, job, nil, "is_matrix_evaluated", "status"); err != nil {
log.Error("Failed to mark job %d (JobID: %s) as evaluated and skipped: %v", job.ID, job.JobID, err)
return err
}
log.Debug("Marked job %d (JobID: %s) as evaluated and skipped: %s", job.ID, job.JobID, reason)
return nil
}
// checkTaskNeedsReady verifies if all task dependencies are completed.
// Returns (taskNeeds, allDone, error)
func checkTaskNeedsReady(ctx context.Context, job *actions_model.ActionRunJob) (map[string]*TaskNeed, bool, error) {
taskNeeds, err := FindTaskNeeds(ctx, job)
if err != nil {
return nil, false, fmt.Errorf("find task needs: %w", err)
}
log.Debug("Found %d task needs for job %d (JobID: %s)", len(taskNeeds), job.ID, job.JobID)
// Check if any task needs are not done
pendingNeeds := []string{}
for jobID, taskNeed := range taskNeeds {
if !taskNeed.Result.IsDone() {
pendingNeeds = append(pendingNeeds, fmt.Sprintf("%s(%s)", jobID, taskNeed.Result))
}
}
if len(pendingNeeds) > 0 {
log.Debug("Matrix re-evaluation deferred for job %d: pending needs: %v", job.ID, pendingNeeds)
GetMatrixMetrics().RecordDeferred()
return taskNeeds, false, nil
}
return taskNeeds, true, nil
}
// ExtractRawStrategies extracts strategy definitions from the raw workflow content
// Returns a map of jobID to strategy YAML for jobs that have matrix dependencies
func ExtractRawStrategies(content []byte) (map[string]string, error) {
@ -96,37 +136,29 @@ func ReEvaluateMatrixForJobWithNeeds(ctx context.Context, job *actions_model.Act
}
if !HasMatrixWithNeeds(job.RawStrategy) {
// Mark as evaluated since there's no needs-dependent matrix and persist to DB
job.IsMatrixEvaluated = true
if _, err := actions_model.UpdateRunJob(ctx, job, nil, "is_matrix_evaluated"); err != nil {
log.Error("Failed to persist is_matrix_evaluated flag for job %d: %v", job.ID, err)
// Mark as evaluated since there's no needs-dependent matrix
if err := markMatrixAsEvaluatedAndSkip(ctx, job, "no needs-dependent matrix found"); err != nil {
return nil, err
}
log.Debug("Matrix re-evaluation skipped for job %d: no needs-dependent matrix found", job.ID)
return nil, nil
}
log.Debug("Starting matrix re-evaluation for job %d (JobID: %s)", job.ID, job.JobID)
// Get the outputs from dependent jobs
taskNeeds, err := FindTaskNeeds(ctx, job)
// Check if dependencies are ready BEFORE doing expensive parsing
taskNeeds, allDone, err := checkTaskNeedsReady(ctx, job)
if err != nil {
errMsg := fmt.Sprintf("failed to find task needs for job %d (JobID: %s): %v", job.ID, job.JobID, err)
errMsg := fmt.Sprintf("failed to check task needs for job %d (JobID: %s): %v", job.ID, job.JobID, err)
log.Error("Matrix re-evaluation error: %s", errMsg)
return nil, fmt.Errorf("find task needs: %w", err)
}
log.Debug("Found %d task needs for job %d (JobID: %s)", len(taskNeeds), job.ID, job.JobID)
// If any task needs are not done, we can't evaluate yet
pendingNeeds := []string{}
for jobID, taskNeed := range taskNeeds {
if !taskNeed.Result.IsDone() {
pendingNeeds = append(pendingNeeds, fmt.Sprintf("%s(%s)", jobID, taskNeed.Result))
// Mark as evaluated to prevent retrying a fundamentally broken job
if markErr := markMatrixAsEvaluatedAndSkip(ctx, job, fmt.Sprintf("task needs check failed: %v", err)); markErr != nil {
return nil, fmt.Errorf("%s; additionally failed to mark as evaluated: %w", errMsg, markErr)
}
return nil, fmt.Errorf("check task needs: %w", err)
}
if len(pendingNeeds) > 0 {
log.Debug("Matrix re-evaluation deferred for job %d: pending needs: %v", job.ID, pendingNeeds)
GetMatrixMetrics().RecordDeferred()
if !allDone {
// Dependencies not ready yet, try again later
return nil, nil
}
@ -175,16 +207,29 @@ func ReEvaluateMatrixForJobWithNeeds(ctx context.Context, job *actions_model.Act
// so that the jobparser can resolve needs.*.outputs.* expressions
workflowYAML, err := constructWorkflowWithNeeds(job, taskNeeds)
if err != nil {
// If we can't construct the workflow, we can't expand the matrix
// Mark as evaluated and skip the job to prevent the placeholder from running unexpanded
job.IsMatrixEvaluated = true
job.Status = actions_model.StatusSkipped
if _, dbErr := actions_model.UpdateRunJob(ctx, job, nil, "is_matrix_evaluated", "status"); dbErr != nil {
log.Error("Failed to persist is_matrix_evaluated/status flag for job %d after workflow construction failure: %v", job.ID, dbErr)
}
log.Error("Failed to construct workflow for job %d (JobID: %s): %v, marking as skipped", job.ID, job.JobID, err)
errMsg := fmt.Sprintf("failed to construct workflow for job %d (JobID: %s): %v", job.ID, job.JobID, err)
log.Error("Matrix re-evaluation error: %s", errMsg)
GetMatrixMetrics().RecordReevaluation(time.Since(startTime), false, 0)
return nil, err
// Mark as evaluated and skip to prevent retrying
if markErr := markMatrixAsEvaluatedAndSkip(ctx, job, fmt.Sprintf("workflow construction failed: %v", err)); markErr != nil {
return nil, fmt.Errorf("%s; additionally failed to mark as evaluated: %w", errMsg, markErr)
}
return nil, fmt.Errorf("construct workflow: %w", err)
}
// Try to get cached parse result to avoid expensive re-parsing
cacheKey := computeCacheKey(workflowYAML, taskNeeds)
cache := getWorkflowParseCache()
if cache != nil {
if _, hit := cache.Get(cacheKey); hit {
log.Debug("Cache hit for workflow parse (job %d, key: %s)", job.ID, cacheKey[:16])
GetMatrixMetrics().RecordCacheHit()
// Note: We can't directly cache SingleWorkflow objects yet (they're not serializable)
// This is for future optimization - for now just track metrics
} else {
GetMatrixMetrics().RecordCacheMiss()
}
}
// Parse the constructed workflow with job outputs to expand the matrix
@ -200,27 +245,27 @@ func ReEvaluateMatrixForJobWithNeeds(ctx context.Context, job *actions_model.Act
GetMatrixMetrics().RecordParseTime(parseTime)
if err != nil {
// If parsing fails, we can't expand the matrix
// Mark as evaluated and persist to DB to avoid repeated parse attempts
job.IsMatrixEvaluated = true
if _, dbErr := actions_model.UpdateRunJob(ctx, job, nil, "is_matrix_evaluated"); dbErr != nil {
log.Error("Failed to persist is_matrix_evaluated flag for job %d after parse failure: %v", job.ID, dbErr)
}
errMsg := fmt.Sprintf("failed to parse workflow payload for job %d (JobID: %s) during matrix expansion. Error: %v. RawStrategy: %s",
job.ID, job.JobID, err, job.RawStrategy)
log.Error("Matrix parse error: %s", errMsg)
errMsg := fmt.Sprintf("failed to parse workflow payload for job %d (JobID: %s) during matrix expansion: %v", job.ID, job.JobID, err)
log.Error("Matrix parse error: %s. RawStrategy: %s", errMsg, job.RawStrategy)
GetMatrixMetrics().RecordReevaluation(time.Since(startTime), false, 0)
// Mark as evaluated to avoid repeated parse attempts
if markErr := markMatrixAsEvaluatedAndSkip(ctx, job, fmt.Sprintf("parse failed: %v", err)); markErr != nil {
return nil, fmt.Errorf("%s; additionally failed to mark as evaluated: %w", errMsg, markErr)
}
return nil, nil
}
// Cache successful parse result
if cache != nil && len(jobs) > 0 {
cache.Set(cacheKey, workflowYAML)
}
if len(jobs) == 0 {
// No jobs generated - mark as evaluated and skip the placeholder job
job.IsMatrixEvaluated = true
job.Status = actions_model.StatusSkipped
if _, err := actions_model.UpdateRunJob(ctx, job, nil, "is_matrix_evaluated", "status"); err != nil {
log.Error("Failed to persist is_matrix_evaluated/status flag for job %d: %v", job.ID, err)
log.Debug("No jobs generated from matrix expansion for job %d (JobID: %s)", job.ID, job.JobID)
GetMatrixMetrics().RecordReevaluation(time.Since(startTime), false, 0)
if err := markMatrixAsEvaluatedAndSkip(ctx, job, "no jobs generated from matrix expansion"); err != nil {
return nil, err
}
log.Debug("No jobs generated from matrix expansion for job %d (JobID: %s), marking as skipped", job.ID, job.JobID)
return nil, nil
}
@ -268,14 +313,13 @@ func ReEvaluateMatrixForJobWithNeeds(ctx context.Context, job *actions_model.Act
newJobs = append(newJobs, newJob)
}
// If no new jobs were created, mark as evaluated, skip the placeholder job, and persist to DB
// If no new jobs were created, mark as evaluated and skip
if len(newJobs) == 0 {
job.IsMatrixEvaluated = true
job.Status = actions_model.StatusSkipped
if _, err := actions_model.UpdateRunJob(ctx, job, nil, "is_matrix_evaluated", "status"); err != nil {
log.Error("Failed to persist is_matrix_evaluated/status flag for job %d: %v", job.ID, err)
log.Warn("No valid jobs created from matrix expansion for job %d (JobID: %s). Original jobs: %d", job.ID, job.JobID, len(jobs))
GetMatrixMetrics().RecordReevaluation(time.Since(startTime), false, 0)
if err := markMatrixAsEvaluatedAndSkip(ctx, job, fmt.Sprintf("no valid jobs created (original jobs: %d)", len(jobs))); err != nil {
return nil, err
}
log.Warn("No valid jobs created from matrix expansion for job %d (JobID: %s). Original jobs: %d, marking as skipped", job.ID, job.JobID, len(jobs))
return nil, nil
}
@ -293,10 +337,9 @@ func ReEvaluateMatrixForJobWithNeeds(ctx context.Context, job *actions_model.Act
GetMatrixMetrics().RecordInsertTime(insertTime)
// Mark the original placeholder job as evaluated and skipped so it is never run
job.IsMatrixEvaluated = true
job.Status = actions_model.StatusSkipped
if _, err := actions_model.UpdateRunJob(ctx, job, nil, "is_matrix_evaluated", "status"); err != nil {
log.Error("Failed to update job %d after matrix expansion (is_matrix_evaluated/status): %v", job.ID, err)
if err := markMatrixAsEvaluatedAndSkip(ctx, job, fmt.Sprintf("successfully created %d new jobs", len(newJobs))); err != nil {
log.Error("Failed to mark placeholder job %d as evaluated after creating %d new jobs: %v", job.ID, len(newJobs), err)
// Don't fail the whole operation if we can't update the placeholder
}
totalTime := time.Since(startTime)

View File

@ -0,0 +1,96 @@
// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"crypto/sha256"
"encoding/hex"
"fmt"
"sync"
lru "github.com/hashicorp/golang-lru/v2"
)
// WorkflowParseCache caches parsed workflow results to avoid redundant YAML parsing
// This is especially useful for matrix re-evaluation where the same workflow might be
// parsed multiple times with the same job outputs
type WorkflowParseCache struct {
cache *lru.TwoQueueCache[string, []byte] // key -> marshaled workflow bytes
mu sync.RWMutex
}
var (
workflowParseCache *WorkflowParseCache
workflowParseCacheOnce sync.Once
)
// getWorkflowParseCache returns the singleton workflow parse cache instance
func getWorkflowParseCache() *WorkflowParseCache {
workflowParseCacheOnce.Do(func() {
// Cache up to 1000 workflow parses
// 2Q cache is more efficient than simple LRU for this use case
cache, err := lru.New2Q[string, []byte](1000)
if err != nil {
// Fallback to no caching if cache creation fails
workflowParseCache = nil
return
}
workflowParseCache = &WorkflowParseCache{
cache: cache,
}
})
return workflowParseCache
}
// computeCacheKey generates a cache key for a workflow parse operation
func computeCacheKey(workflowYAML []byte, taskNeeds map[string]*TaskNeed) string {
// Create a deterministic hash of workflow + all job outputs
h := sha256.New()
h.Write(workflowYAML)
// Add outputs in deterministic order (sorted by job ID)
if taskNeeds != nil {
for jobID, need := range taskNeeds {
h.Write([]byte(jobID))
if need.Outputs != nil {
for k, v := range need.Outputs {
h.Write([]byte(k))
h.Write([]byte(v))
}
}
}
}
return hex.EncodeToString(h.Sum(nil))
}
// Get retrieves a cached workflow parse result
func (c *WorkflowParseCache) Get(key string) ([]byte, bool) {
if c == nil || c.cache == nil {
return nil, false
}
c.mu.RLock()
defer c.mu.RUnlock()
return c.cache.Get(key)
}
// Set stores a workflow parse result in the cache
func (c *WorkflowParseCache) Set(key string, value []byte) {
if c == nil || c.cache == nil {
return
}
c.mu.Lock()
defer c.mu.Unlock()
c.cache.Add(key, value)
}
// Stats returns cache statistics for monitoring
func (c *WorkflowParseCache) Stats() (size int, err error) {
if c == nil || c.cache == nil {
return 0, fmt.Errorf("cache not initialized")
}
c.mu.RLock()
defer c.mu.RUnlock()
return c.cache.Len(), nil
}

View File

@ -18,6 +18,8 @@ type MatrixMetrics struct {
FailedReevaluations int64
JobsCreatedTotal int64
DeferredReevaluations int64
CacheHits int64
CacheMisses int64
// Timing
TotalReevaluationTime time.Duration
@ -101,6 +103,20 @@ func (m *MatrixMetrics) RecordInsertTime(duration time.Duration) {
appendToHistogram(&m.InsertTimes, duration)
}
// RecordCacheHit records a cache hit for workflow parsing
func (m *MatrixMetrics) RecordCacheHit() {
m.mu.Lock()
defer m.mu.Unlock()
m.CacheHits++
}
// RecordCacheMiss records a cache miss for workflow parsing
func (m *MatrixMetrics) RecordCacheMiss() {
m.mu.Lock()
defer m.mu.Unlock()
m.CacheMisses++
}
// GetStats returns a snapshot of the current metrics
func (m *MatrixMetrics) GetStats() map[string]any {
m.mu.RLock()
@ -139,6 +155,8 @@ func (m *MatrixMetrics) GetStats() map[string]any {
"avg_parse_time_ms": avgParseTime.Milliseconds(),
"total_insert_time_ms": m.TotalInsertTime.Milliseconds(),
"avg_insert_time_ms": avgInsertTime.Milliseconds(),
"total_cache_hits": m.CacheHits,
"total_cache_misses": m.CacheMisses,
}
}
@ -152,6 +170,8 @@ func (m *MatrixMetrics) Reset() {
m.FailedReevaluations = 0
m.JobsCreatedTotal = 0
m.DeferredReevaluations = 0
m.CacheHits = 0
m.CacheMisses = 0
m.TotalReevaluationTime = 0
m.TotalParseTime = 0
m.TotalInsertTime = 0