diff --git a/go.mod b/go.mod index 72d85dcb02..d1d0ac44dd 100644 --- a/go.mod +++ b/go.mod @@ -295,7 +295,7 @@ ignore ( replace github.com/jaytaylor/html2text => github.com/Necoro/html2text v0.0.0-20250804200300-7bf1ce1c7347 // jaytaylor/html2text is unmaintained -replace github.com/nektos/act => gitea.com/gitea/act v0.261.10 // gitea maintains its own package +replace github.com/nektos/act => gitea.com/gitea/act v0.262.0 // gitea maintains its own package replace github.com/urfave/cli/v3 => github.com/urfave/cli/v3 v3.4.1 // v3.6.2 breaks -c flag parsing in help commands diff --git a/go.sum b/go.sum index f8a3c5562c..09671bbdad 100644 --- a/go.sum +++ b/go.sum @@ -16,8 +16,8 @@ dario.cat/mergo v1.0.2 h1:85+piFYR1tMbRrLcDwR18y4UKJ3aH1Tbzi24VRW1TK8= dario.cat/mergo v1.0.2/go.mod h1:E/hbnu0NxMFBjpMIE34DRGLWqDy0g5FuKDhCb31ngxA= filippo.io/edwards25519 v1.2.0 h1:crnVqOiS4jqYleHd9vaKZ+HKtHfllngJIiOpNpoJsjo= filippo.io/edwards25519 v1.2.0/go.mod h1:xzAOLCNug/yB62zG1bQ8uziwrIqIuxhctzJT18Q77mc= -gitea.com/gitea/act v0.261.10 h1:ndwbtuMXXz1dpYF2iwY1/PkgKNETo4jmPXfinTZt8cs= -gitea.com/gitea/act v0.261.10/go.mod h1:oIkqQHvU0lfuIWwcpqa4FmU+t3prA89tgkuHUTsrI2c= +gitea.com/gitea/act v0.262.0 h1:R0U+VeFvhJSYACSCR9fEV8a3ZXlGlBE1X5YjaFzwv4c= +gitea.com/gitea/act v0.262.0/go.mod h1:oIkqQHvU0lfuIWwcpqa4FmU+t3prA89tgkuHUTsrI2c= gitea.com/go-chi/binding v0.0.0-20240430071103-39a851e106ed h1:EZZBtilMLSZNWtHHcgq2mt6NSGhJSZBuduAlinMEmso= gitea.com/go-chi/binding v0.0.0-20240430071103-39a851e106ed/go.mod h1:E3i3cgB04dDx0v3CytCgRTTn9Z/9x891aet3r456RVw= gitea.com/go-chi/cache v0.2.1 h1:bfAPkvXlbcZxPCpcmDVCWoHgiBSBmZN/QosnZvEC0+g= diff --git a/services/actions/matrix.go b/services/actions/matrix.go index b05cd5686e..01e4e20025 100644 --- a/services/actions/matrix.go +++ b/services/actions/matrix.go @@ -18,6 +18,46 @@ import ( "go.yaml.in/yaml/v4" ) +// markMatrixAsEvaluatedAndSkip marks a job's matrix as evaluated and skipped. +// Used when matrix cannot be expanded or dependency checks fail. 
+func markMatrixAsEvaluatedAndSkip(ctx context.Context, job *actions_model.ActionRunJob, reason string) error {
+	job.IsMatrixEvaluated = true
+	job.Status = actions_model.StatusSkipped // persisted below: never call this helper for a job that must still run
+	if _, err := actions_model.UpdateRunJob(ctx, job, nil, "is_matrix_evaluated", "status"); err != nil {
+		log.Error("Failed to mark job %d (JobID: %s) as evaluated and skipped: %v", job.ID, job.JobID, err)
+		return err
+	}
+	log.Debug("Marked job %d (JobID: %s) as evaluated and skipped: %s", job.ID, job.JobID, reason)
+	return nil
+}
+
+// checkTaskNeedsReady looks up the job's task needs and reports whether every one of them is done.
+// Returns (taskNeeds, allDone, error); when any need is unfinished it records a deferral metric and returns allDone=false.
+func checkTaskNeedsReady(ctx context.Context, job *actions_model.ActionRunJob) (map[string]*TaskNeed, bool, error) {
+	taskNeeds, err := FindTaskNeeds(ctx, job)
+	if err != nil {
+		return nil, false, fmt.Errorf("find task needs: %w", err)
+	}
+
+	log.Debug("Found %d task needs for job %d (JobID: %s)", len(taskNeeds), job.ID, job.JobID)
+
+	// Collect every need that is not done yet (used only for the debug log below)
+	pendingNeeds := []string{}
+	for jobID, taskNeed := range taskNeeds {
+		if !taskNeed.Result.IsDone() {
+			pendingNeeds = append(pendingNeeds, fmt.Sprintf("%s(%s)", jobID, taskNeed.Result))
+		}
+	}
+
+	if len(pendingNeeds) > 0 {
+		log.Debug("Matrix re-evaluation deferred for job %d: pending needs: %v", job.ID, pendingNeeds)
+		GetMatrixMetrics().RecordDeferred()
+		return taskNeeds, false, nil
+	}
+
+	return taskNeeds, true, nil
+}
+
 // ExtractRawStrategies extracts strategy definitions from the raw workflow content
 // Returns a map of jobID to strategy YAML for jobs that have matrix dependencies
 func ExtractRawStrategies(content []byte) (map[string]string, error) {
@@ -96,37 +136,29 @@ func ReEvaluateMatrixForJobWithNeeds(ctx context.Context, job *actions_model.Act
 	}
 
 	if !HasMatrixWithNeeds(job.RawStrategy) {
-		// Mark as evaluated since there's no needs-dependent matrix and persist to DB
-		job.IsMatrixEvaluated = true
+		job.IsMatrixEvaluated = true // evaluated only: status is left untouched so this regular job still runs
 		if _, err := actions_model.UpdateRunJob(ctx, job, nil, "is_matrix_evaluated"); err != nil {
-			log.Error("Failed to persist is_matrix_evaluated flag for job %d: %v", job.ID, err)
+			return nil, fmt.Errorf("persist is_matrix_evaluated for job %d: %w", job.ID, err)
 		}
-		log.Debug("Matrix re-evaluation skipped for job %d: no needs-dependent matrix found", job.ID)
 		return nil, nil
 	}
 
 	log.Debug("Starting matrix re-evaluation for job %d (JobID: %s)", job.ID, job.JobID)
 
-	// Get the outputs from dependent jobs
-	taskNeeds, err := FindTaskNeeds(ctx, job)
+	// Check if dependencies are ready BEFORE doing expensive parsing
+	taskNeeds, allDone, err := checkTaskNeedsReady(ctx, job)
 	if err != nil {
-		errMsg := fmt.Sprintf("failed to find task needs for job %d (JobID: %s): %v", job.ID, job.JobID, err)
+		errMsg := fmt.Sprintf("failed to check task needs for job %d (JobID: %s): %v", job.ID, job.JobID, err)
 		log.Error("Matrix re-evaluation error: %s", errMsg)
-		return nil, fmt.Errorf("find task needs: %w", err)
-	}
-
-	log.Debug("Found %d task needs for job %d (JobID: %s)", len(taskNeeds), job.ID, job.JobID)
-
-	// If any task needs are not done, we can't evaluate yet
-	pendingNeeds := []string{}
-	for jobID, taskNeed := range taskNeeds {
-		if !taskNeed.Result.IsDone() {
-			pendingNeeds = append(pendingNeeds, fmt.Sprintf("%s(%s)", jobID, taskNeed.Result))
+		// Mark as evaluated to prevent retrying a fundamentally broken job
+		if markErr := markMatrixAsEvaluatedAndSkip(ctx, job, fmt.Sprintf("task needs check failed: %v", err)); markErr != nil {
+			return nil, fmt.Errorf("%s; additionally failed to mark as evaluated: %w", errMsg, markErr)
 		}
+		return nil, fmt.Errorf("check task needs: %w", err)
 	}
-	if len(pendingNeeds) > 0 {
-		log.Debug("Matrix re-evaluation deferred for job %d: pending needs: %v", job.ID, pendingNeeds)
-		GetMatrixMetrics().RecordDeferred()
+
+	if !allDone {
+		// Dependencies not ready yet, try again later
 		return nil, nil
 	}
 
@@ -175,16 +207,29 @@ func ReEvaluateMatrixForJobWithNeeds(ctx context.Context, job *actions_model.Act // so that the jobparser can resolve needs.*.outputs.* expressions workflowYAML, err := constructWorkflowWithNeeds(job, taskNeeds) if err != nil { - // If we can't construct the workflow, we can't expand the matrix - // Mark as evaluated and skip the job to prevent the placeholder from running unexpanded - job.IsMatrixEvaluated = true - job.Status = actions_model.StatusSkipped - if _, dbErr := actions_model.UpdateRunJob(ctx, job, nil, "is_matrix_evaluated", "status"); dbErr != nil { - log.Error("Failed to persist is_matrix_evaluated/status flag for job %d after workflow construction failure: %v", job.ID, dbErr) - } - log.Error("Failed to construct workflow for job %d (JobID: %s): %v, marking as skipped", job.ID, job.JobID, err) + errMsg := fmt.Sprintf("failed to construct workflow for job %d (JobID: %s): %v", job.ID, job.JobID, err) + log.Error("Matrix re-evaluation error: %s", errMsg) GetMatrixMetrics().RecordReevaluation(time.Since(startTime), false, 0) - return nil, err + // Mark as evaluated and skip to prevent retrying + if markErr := markMatrixAsEvaluatedAndSkip(ctx, job, fmt.Sprintf("workflow construction failed: %v", err)); markErr != nil { + return nil, fmt.Errorf("%s; additionally failed to mark as evaluated: %w", errMsg, markErr) + } + return nil, fmt.Errorf("construct workflow: %w", err) + } + + // Try to get cached parse result to avoid expensive re-parsing + cacheKey := computeCacheKey(workflowYAML, taskNeeds) + cache := getWorkflowParseCache() + + if cache != nil { + if _, hit := cache.Get(cacheKey); hit { + log.Debug("Cache hit for workflow parse (job %d, key: %s)", job.ID, cacheKey[:16]) + GetMatrixMetrics().RecordCacheHit() + // Note: We can't directly cache SingleWorkflow objects yet (they're not serializable) + // This is for future optimization - for now just track metrics + } else { + GetMatrixMetrics().RecordCacheMiss() + } } // Parse the 
constructed workflow with job outputs to expand the matrix @@ -200,27 +245,27 @@ func ReEvaluateMatrixForJobWithNeeds(ctx context.Context, job *actions_model.Act GetMatrixMetrics().RecordParseTime(parseTime) if err != nil { - // If parsing fails, we can't expand the matrix - // Mark as evaluated and persist to DB to avoid repeated parse attempts - job.IsMatrixEvaluated = true - if _, dbErr := actions_model.UpdateRunJob(ctx, job, nil, "is_matrix_evaluated"); dbErr != nil { - log.Error("Failed to persist is_matrix_evaluated flag for job %d after parse failure: %v", job.ID, dbErr) - } - errMsg := fmt.Sprintf("failed to parse workflow payload for job %d (JobID: %s) during matrix expansion. Error: %v. RawStrategy: %s", - job.ID, job.JobID, err, job.RawStrategy) - log.Error("Matrix parse error: %s", errMsg) + errMsg := fmt.Sprintf("failed to parse workflow payload for job %d (JobID: %s) during matrix expansion: %v", job.ID, job.JobID, err) + log.Error("Matrix parse error: %s. RawStrategy: %s", errMsg, job.RawStrategy) GetMatrixMetrics().RecordReevaluation(time.Since(startTime), false, 0) + // Mark as evaluated to avoid repeated parse attempts + if markErr := markMatrixAsEvaluatedAndSkip(ctx, job, fmt.Sprintf("parse failed: %v", err)); markErr != nil { + return nil, fmt.Errorf("%s; additionally failed to mark as evaluated: %w", errMsg, markErr) + } return nil, nil } + // Cache successful parse result + if cache != nil && len(jobs) > 0 { + cache.Set(cacheKey, workflowYAML) + } + if len(jobs) == 0 { - // No jobs generated - mark as evaluated and skip the placeholder job - job.IsMatrixEvaluated = true - job.Status = actions_model.StatusSkipped - if _, err := actions_model.UpdateRunJob(ctx, job, nil, "is_matrix_evaluated", "status"); err != nil { - log.Error("Failed to persist is_matrix_evaluated/status flag for job %d: %v", job.ID, err) + log.Debug("No jobs generated from matrix expansion for job %d (JobID: %s)", job.ID, job.JobID) + 
GetMatrixMetrics().RecordReevaluation(time.Since(startTime), false, 0) + if err := markMatrixAsEvaluatedAndSkip(ctx, job, "no jobs generated from matrix expansion"); err != nil { + return nil, err } - log.Debug("No jobs generated from matrix expansion for job %d (JobID: %s), marking as skipped", job.ID, job.JobID) return nil, nil } @@ -268,14 +313,13 @@ func ReEvaluateMatrixForJobWithNeeds(ctx context.Context, job *actions_model.Act newJobs = append(newJobs, newJob) } - // If no new jobs were created, mark as evaluated, skip the placeholder job, and persist to DB + // If no new jobs were created, mark as evaluated and skip if len(newJobs) == 0 { - job.IsMatrixEvaluated = true - job.Status = actions_model.StatusSkipped - if _, err := actions_model.UpdateRunJob(ctx, job, nil, "is_matrix_evaluated", "status"); err != nil { - log.Error("Failed to persist is_matrix_evaluated/status flag for job %d: %v", job.ID, err) + log.Warn("No valid jobs created from matrix expansion for job %d (JobID: %s). Original jobs: %d", job.ID, job.JobID, len(jobs)) + GetMatrixMetrics().RecordReevaluation(time.Since(startTime), false, 0) + if err := markMatrixAsEvaluatedAndSkip(ctx, job, fmt.Sprintf("no valid jobs created (original jobs: %d)", len(jobs))); err != nil { + return nil, err } - log.Warn("No valid jobs created from matrix expansion for job %d (JobID: %s). 
Original jobs: %d, marking as skipped", job.ID, job.JobID, len(jobs)) return nil, nil } @@ -293,10 +337,9 @@ func ReEvaluateMatrixForJobWithNeeds(ctx context.Context, job *actions_model.Act GetMatrixMetrics().RecordInsertTime(insertTime) // Mark the original placeholder job as evaluated and skipped so it is never run - job.IsMatrixEvaluated = true - job.Status = actions_model.StatusSkipped - if _, err := actions_model.UpdateRunJob(ctx, job, nil, "is_matrix_evaluated", "status"); err != nil { - log.Error("Failed to update job %d after matrix expansion (is_matrix_evaluated/status): %v", job.ID, err) + if err := markMatrixAsEvaluatedAndSkip(ctx, job, fmt.Sprintf("successfully created %d new jobs", len(newJobs))); err != nil { + log.Error("Failed to mark placeholder job %d as evaluated after creating %d new jobs: %v", job.ID, len(newJobs), err) + // Don't fail the whole operation if we can't update the placeholder } totalTime := time.Since(startTime) diff --git a/services/actions/matrix_cache.go b/services/actions/matrix_cache.go new file mode 100644 index 0000000000..68f854f867 --- /dev/null +++ b/services/actions/matrix_cache.go @@ -0,0 +1,96 @@ +// Copyright 2026 The Gitea Authors. All rights reserved. 
+// SPDX-License-Identifier: MIT
+
+package actions
+
+import (
+	"crypto/sha256"
+	"encoding/hex"
+	"fmt"
+	"maps"
+	"slices"
+	"sync"
+
+	lru "github.com/hashicorp/golang-lru/v2"
+)
+
+// WorkflowParseCache caches parsed workflow results to avoid redundant YAML parsing
+// This is especially useful for matrix re-evaluation where the same workflow might be
+// parsed multiple times with the same job outputs
+type WorkflowParseCache struct {
+	cache *lru.TwoQueueCache[string, []byte] // key -> marshaled workflow bytes
+	mu    sync.RWMutex
+}
+
+var (
+	workflowParseCache     *WorkflowParseCache
+	workflowParseCacheOnce sync.Once
+)
+
+// getWorkflowParseCache returns the singleton workflow parse cache instance (nil if creation failed)
+func getWorkflowParseCache() *WorkflowParseCache {
+	workflowParseCacheOnce.Do(func() {
+		// Cache up to 1000 workflow parses
+		// 2Q cache is more efficient than simple LRU for this use case
+		cache, err := lru.New2Q[string, []byte](1000)
+		if err != nil {
+			// Fallback to no caching if cache creation fails
+			workflowParseCache = nil
+			return
+		}
+		workflowParseCache = &WorkflowParseCache{
+			cache: cache,
+		}
+	})
+	return workflowParseCache
+}
+
+// computeCacheKey generates a cache key for a workflow parse operation.
+// The key is the hex SHA-256 of the workflow YAML followed by every need's job ID
+// and outputs. Go randomizes map iteration order, so both maps are walked in sorted
+// key order; otherwise identical inputs would hash to different keys and never hit.
+func computeCacheKey(workflowYAML []byte, taskNeeds map[string]*TaskNeed) string {
+	h := sha256.New()
+	h.Write(workflowYAML)
+
+	for _, jobID := range slices.Sorted(maps.Keys(taskNeeds)) {
+		// %q quoting keeps field boundaries unambiguous ("ab"+"c" vs "a"+"bc")
+		fmt.Fprintf(h, "%q:", jobID)
+		outputs := taskNeeds[jobID].Outputs
+		for _, k := range slices.Sorted(maps.Keys(outputs)) {
+			fmt.Fprintf(h, "%q=%q;", k, outputs[k])
+		}
+	}
+
+	return hex.EncodeToString(h.Sum(nil))
+}
+
+// Get returns the bytes stored under key and whether the key was present; a nil or uninitialized cache always misses
+func (c *WorkflowParseCache) Get(key string) ([]byte, bool) {
+	if c == nil || c.cache == nil {
+		return nil, false
+	}
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+	return c.cache.Get(key)
+}
+
+// Set stores value under key; it is a no-op when the cache is nil or uninitialized
+func (c *WorkflowParseCache) Set(key string, value []byte) {
+	if c == nil || c.cache == nil {
+		return
+	}
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.cache.Add(key, value)
+}
+
+// Stats returns the number of entries currently cached, or an error when the cache is nil or uninitialized
+func (c *WorkflowParseCache) Stats() (size int, err error) {
+	if c == nil || c.cache == nil {
+		return 0, fmt.Errorf("cache not initialized")
+	}
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+	return c.cache.Len(), nil
+}
diff --git a/services/actions/matrix_metrics.go b/services/actions/matrix_metrics.go
index 7c9c120e56..03586f4881 100644
--- a/services/actions/matrix_metrics.go
+++ b/services/actions/matrix_metrics.go
@@ -18,6 +18,8 @@ type MatrixMetrics struct {
 	FailedReevaluations   int64
 	JobsCreatedTotal      int64
 	DeferredReevaluations int64
+	CacheHits             int64
+	CacheMisses           int64
 
 	// Timing
 	TotalReevaluationTime time.Duration
@@ -101,6 +103,20 @@ func (m *MatrixMetrics) RecordInsertTime(duration time.Duration) {
 	appendToHistogram(&m.InsertTimes, duration)
 }
 
+// RecordCacheHit increments the workflow-parse cache hit counter under the metrics lock
+func (m *MatrixMetrics) RecordCacheHit() {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.CacheHits++
+}
+
+// RecordCacheMiss increments the workflow-parse cache miss counter under the metrics lock
+func (m *MatrixMetrics) RecordCacheMiss() {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.CacheMisses++
+}
+
 // GetStats returns a snapshot of the current metrics
 func (m *MatrixMetrics) GetStats() map[string]any {
 	m.mu.RLock()
@@ -139,6 +155,8 @@ func (m *MatrixMetrics) GetStats() map[string]any {
 		"avg_parse_time_ms":    avgParseTime.Milliseconds(),
 		"total_insert_time_ms": m.TotalInsertTime.Milliseconds(),
 		"avg_insert_time_ms":   avgInsertTime.Milliseconds(),
+		"total_cache_hits":     m.CacheHits,
+		"total_cache_misses":   m.CacheMisses,
 	}
 }
 
@@ -152,6 +170,8 @@ func (m *MatrixMetrics) Reset() {
 	m.FailedReevaluations = 0
 	m.JobsCreatedTotal = 0
 	m.DeferredReevaluations = 0
+	m.CacheHits = 0
+	m.CacheMisses = 0
 	m.TotalReevaluationTime = 0
 	m.TotalParseTime = 0
 	m.TotalInsertTime = 0