// Copyright 2026 MarketAlly. All rights reserved. // SPDX-License-Identifier: MIT package ai import ( "context" "errors" "fmt" "strconv" "strings" "time" ai_model "code.gitcaddy.com/server/v3/models/ai" issues_model "code.gitcaddy.com/server/v3/models/issues" repo_model "code.gitcaddy.com/server/v3/models/repo" "code.gitcaddy.com/server/v3/models/unit" user_model "code.gitcaddy.com/server/v3/models/user" "code.gitcaddy.com/server/v3/modules/ai" "code.gitcaddy.com/server/v3/modules/log" "code.gitcaddy.com/server/v3/modules/setting" issue_service "code.gitcaddy.com/server/v3/services/issue" ) // OperationRequest represents a queued AI operation type OperationRequest struct { RepoID int64 `json:"repo_id"` Operation string `json:"operation"` // "code-review", "issue-response", "issue-triage", "workflow-inspect", "agent-fix" Tier int `json:"tier"` // 1 or 2 TriggerEvent string `json:"trigger_event"` // e.g. "issue.created" TriggerUserID int64 `json:"trigger_user_id"` // who triggered the event TargetID int64 `json:"target_id"` // issue/PR ID TargetType string `json:"target_type"` // "issue", "pull" } // EnqueueOperation adds an AI operation to the processing queue func EnqueueOperation(req *OperationRequest) error { if aiOperationQueue == nil { return errors.New("AI operation queue not initialized") } return aiOperationQueue.Push(req) } // handleAIOperation is the queue worker that processes AI operations func handleAIOperation(items ...*OperationRequest) []*OperationRequest { for _, req := range items { if err := processOperation(context.Background(), req); err != nil { log.Error("AI operation failed [repo:%d op:%s target:%d]: %v", req.RepoID, req.Operation, req.TargetID, err) } } return nil } func processOperation(ctx context.Context, req *OperationRequest) error { // Load repo repo, err := repo_model.GetRepositoryByID(ctx, req.RepoID) if err != nil { return fmt.Errorf("failed to load repo %d: %w", req.RepoID, err) } if err := repo.LoadOwner(ctx); err != nil { return fmt.Errorf("failed to load repo owner: %w", err) } // Rate limit check count, err := ai_model.CountRecentOperations(ctx, req.RepoID) if err != nil { return fmt.Errorf("failed to count recent operations: %w", err) } maxOps := setting.AI.MaxOperationsPerHour if repo.Owner.IsOrganization() { if orgSettings, err := ai_model.GetOrgAISettings(ctx, repo.OwnerID); err == nil && orgSettings != nil && orgSettings.MaxOpsPerHour > 0 { maxOps = orgSettings.MaxOpsPerHour } } if count >= int64(maxOps) { log.Warn("AI rate limit exceeded for repo %d (%d/%d ops/hour)", req.RepoID, count, maxOps) return nil } // Create operation log entry opLog := &ai_model.OperationLog{ RepoID: req.RepoID, Operation: req.Operation, Tier: req.Tier, TriggerEvent: req.TriggerEvent, TriggerUserID: req.TriggerUserID, TargetID: req.TargetID, TargetType: req.TargetType, Status: ai_model.OperationStatusPending, } // Resolve provider config from cascade var orgID int64 if repo.Owner.IsOrganization() { orgID = repo.OwnerID } aiUnit, err := repo.GetUnit(ctx, unit.TypeAI) var aiCfg *repo_model.AIConfig if err == nil { aiCfg = aiUnit.AIConfig() } else { aiCfg = &repo_model.AIConfig{} } opLog.Provider = ai_model.ResolveProvider(ctx, orgID, aiCfg.PreferredProvider) opLog.Model = ai_model.ResolveModel(ctx, orgID, aiCfg.PreferredModel) // Build per-request provider config from the cascade providerCfg := &ai.ProviderConfig{ Provider: opLog.Provider, Model: opLog.Model, APIKey: ai_model.ResolveAPIKey(ctx, orgID, opLog.Provider), } if err := ai_model.InsertOperationLog(ctx, opLog); err != nil { return fmt.Errorf("failed to insert operation log: %w", err) } start := time.Now() // Dispatch based on operation type var opErr error switch req.Operation { case "issue-response": opErr = handleIssueResponse(ctx, repo, aiCfg, opLog, providerCfg) case "issue-triage": opErr = handleIssueTriage(ctx, repo, opLog, providerCfg) case "code-review": opErr = handleCodeReview(ctx, repo, opLog, providerCfg) case "workflow-inspect": opErr = handleWorkflowInspect(ctx, repo, opLog, providerCfg) case "agent-fix": opErr = handleAgentFix(ctx, repo, aiCfg, opLog) default: opErr = fmt.Errorf("unknown operation: %s", req.Operation) } opLog.DurationMs = time.Since(start).Milliseconds() if opErr != nil { opLog.Status = ai_model.OperationStatusFailed opLog.ErrorMessage = opErr.Error() log.Error("AI operation %s failed for repo %d: %v", req.Operation, req.RepoID, opErr) // Escalate on failure if configured if aiCfg.EscalateToStaff { escalateToStaff(ctx, repo, aiCfg, opLog) } } else { opLog.Status = ai_model.OperationStatusSuccess } if err := ai_model.UpdateOperationLog(ctx, opLog); err != nil { log.Error("Failed to update operation log: %v", err) } return opErr } func handleIssueResponse(ctx context.Context, repo *repo_model.Repository, aiCfg *repo_model.AIConfig, opLog *ai_model.OperationLog, providerCfg *ai.ProviderConfig) error { issue, err := issues_model.GetIssueByID(ctx, opLog.TargetID) if err != nil { return fmt.Errorf("failed to load issue %d: %w", opLog.TargetID, err) } issue.Repo = repo client := ai.GetClient() resp, err := client.GenerateIssueResponse(ctx, &ai.GenerateIssueResponseRequest{ ProviderConfig: providerCfg, RepoID: repo.ID, IssueID: issue.ID, Title: issue.Title, Body: issue.Content, CustomInstructions: aiCfg.IssueInstructions, }) if err != nil { return fmt.Errorf("AI GenerateIssueResponse failed: %w", err) } opLog.InputTokens = resp.InputTokens opLog.OutputTokens = resp.OutputTokens // Post the response as a comment from the AI bot user botUser := user_model.NewAIUser() comment, err := issue_service.CreateIssueComment(ctx, botUser, repo, issue, resp.Response, nil) if err != nil { return fmt.Errorf("failed to create comment: %w", err) } opLog.ResultCommentID = comment.ID return nil } func handleIssueTriage(ctx context.Context, repo *repo_model.Repository, opLog *ai_model.OperationLog, providerCfg *ai.ProviderConfig) error { issue, err := issues_model.GetIssueByID(ctx, opLog.TargetID) if err != nil { return fmt.Errorf("failed to load issue %d: %w", opLog.TargetID, err) } issue.Repo = repo triageResp, err := TriageIssue(ctx, issue, providerCfg) if err != nil { return fmt.Errorf("AI TriageIssue failed: %w", err) } // Apply suggested labels botUser := user_model.NewAIUser() for _, labelName := range triageResp.SuggestedLabels { label, err := issues_model.GetLabelInRepoByName(ctx, repo.ID, labelName) if err != nil { log.Warn("AI suggested label %q not found in repo %d", labelName, repo.ID) continue } if err := issue_service.AddLabel(ctx, issue, botUser, label); err != nil { log.Error("Failed to add label %q to issue %d: %v", labelName, issue.Index, err) } } return nil } func handleCodeReview(ctx context.Context, repo *repo_model.Repository, opLog *ai_model.OperationLog, providerCfg *ai.ProviderConfig) error { issue, err := issues_model.GetIssueByID(ctx, opLog.TargetID) if err != nil { return fmt.Errorf("failed to load issue %d: %w", opLog.TargetID, err) } if err := issue.LoadPullRequest(ctx); err != nil { return fmt.Errorf("failed to load pull request: %w", err) } issue.Repo = repo reviewResp, err := ReviewPullRequest(ctx, issue.PullRequest, providerCfg) if err != nil { return fmt.Errorf("AI ReviewPullRequest failed: %w", err) } // Post the review summary as a comment botUser := user_model.NewAIUser() comment, err := issue_service.CreateIssueComment(ctx, botUser, repo, issue, reviewResp.Summary, nil) if err != nil { return fmt.Errorf("failed to create review comment: %w", err) } opLog.ResultCommentID = comment.ID return nil } func handleAgentFix(ctx context.Context, repo *repo_model.Repository, aiCfg *repo_model.AIConfig, opLog *ai_model.OperationLog) error { // Tier 2: Trigger an Actions workflow for Claude Code headless runID, err := triggerAgentWorkflow(ctx, repo, aiCfg, opLog) if err != nil { return fmt.Errorf("failed to trigger agent workflow: %w", err) } opLog.ActionRunID = runID return nil } func handleWorkflowInspect(ctx context.Context, repo *repo_model.Repository, opLog *ai_model.OperationLog, providerCfg *ai.ProviderConfig) error { // TargetID is used to store context; for workflow inspect the file path is stored in ErrorMessage temporarily // We use the operation's ErrorMessage field pre-populated with the file path before dispatch filePath := opLog.ErrorMessage opLog.ErrorMessage = "" // Clear it before actual use resp, err := InspectWorkflow(ctx, repo, filePath, "", providerCfg) if err != nil { return fmt.Errorf("AI InspectWorkflow failed: %w", err) } opLog.InputTokens = resp.InputTokens opLog.OutputTokens = resp.OutputTokens // If there are issues, post a summary comment (for push-triggered inspections) if len(resp.Issues) > 0 || len(resp.Suggestions) > 0 { var body strings.Builder body.WriteString("## Workflow Inspection Results\n\n") body.WriteString("**File:** `" + filePath + "`\n\n") if !resp.Valid { body.WriteString("**Status:** Issues found\n\n") } for _, issue := range resp.Issues { icon := "ℹ️" switch issue.Severity { case "error": icon = "❌" case "warning": icon = "⚠️" } body.WriteString(icon + " ") if issue.Line > 0 { body.WriteString("**Line " + strconv.Itoa(issue.Line) + ":** ") } body.WriteString(issue.Message + "\n") if issue.Fix != "" { body.WriteString(" - **Fix:** " + issue.Fix + "\n") } } if len(resp.Suggestions) > 0 { body.WriteString("\n### Suggestions\n") for _, s := range resp.Suggestions { body.WriteString("- " + s + "\n") } } log.Info("Workflow inspection for %s in repo %d: %d issues, %d suggestions", filePath, repo.ID, len(resp.Issues), len(resp.Suggestions)) // Note: for push-triggered inspections, the comment would be posted as a repo event // or as part of the commit status. The body is logged for now. _ = body.String() } return nil }