diff --git a/.github/triage-issues-agentic.lock.yml b/.github/triage-issues-agentic.lock.yml new file mode 100644 index 00000000..4c31f520 --- /dev/null +++ b/.github/triage-issues-agentic.lock.yml @@ -0,0 +1,137 @@ +name: Issue Triage Agent (Agentic) + +on: + issues: + types: [opened, edited] + + schedule: + - cron: '0 */6 * * *' + + workflow_dispatch: + inputs: + mode: + description: 'Run mode: single or all' + required: true + default: 'all' + type: choice + options: + - all + - single + force: + description: 'Re-triage issues that already have a triage label (debug only)' + required: false + default: false + type: boolean + +permissions: + contents: read + issues: write + pull-requests: write + models: read # required for the GitHub Models inference API + +jobs: + triage: + runs-on: ubuntu-latest + timeout-minutes: 15 + + steps: + - name: Checkout repo + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '3.11' + + - name: Install dependencies + run: | + pip install -r requirements.txt || true + gh --version + + # ───────────────────────────────────────────────────────────── + # SINGLE-ISSUE PATH (triggered by issue opened/edited) + # + # We do NOT inline ${{ github.event.issue.body }} into a heredoc — + # backticks, quotes, and $-signs in the body would otherwise mangle + # the script. Instead we fetch the issue via `gh api` so it round-trips + # cleanly through JSON. + # ───────────────────────────────────────────────────────────── + - name: Fetch issue payload (single) + if: github.event_name == 'issues' + run: | + echo "📥 Fetching issue #${{ github.event.issue.number }} via gh api" + gh api "repos/${GITHUB_REPOSITORY}/issues/${{ github.event.issue.number }}" \ + --jq '{number: .number, title: .title, body: .body}' \ + > /tmp/issue.json + echo "── Issue payload (sanitized preview) ──" + jq '{number, title, body_length: (.body | length)}' /tmp/issue.json + # Wrap into a list so triage_batch.py can consume it uniformly + jq '[.]' /tmp/issue.json > /tmp/issues.json + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + + - name: Log full body (single) + if: github.event_name == 'issues' + run: | + echo "================================================================================" + echo "� FULL ISSUE BODY (as the AI will see it)" + echo "================================================================================" + jq -r '.body' /tmp/issue.json + echo "================================================================================" + + - name: Pre-label as triaged (single) + if: github.event_name == 'issues' + run: | + gh label create "triaged" --repo "$GITHUB_REPOSITORY" 2>/dev/null || true + gh issue edit ${{ github.event.issue.number }} \ + --add-label "triaged" --repo "$GITHUB_REPOSITORY" + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + continue-on-error: true + + # ───────────────────────────────────────────────────────────── + # AI-DRIVEN TRIAGE — the LLM reads the body and decides everything: + # classification, the comment to post, and (for auto_fix) the literal + # current_text / desired_text to substitute in the docs. + # triage_batch.py applies the AI's decision (label + comment + PR). + # ───────────────────────────────────────────────────────────── + - name: AI triage + (optional) auto-PR (single) + if: github.event_name == 'issues' + run: | + echo "🤖 Running AI triage via GitHub Models on issue #${{ github.event.issue.number }}" + python3 triage_batch.py /tmp/issues.json + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + LEARN_PR_TOKEN: ${{ secrets.LEARN_PR_TOKEN }} + GITHUB_REPOSITORY: ${{ github.repository }} + AI_MODEL: openai/gpt-4o-mini + # Set AI_DEBUG=1 to dump the full prompt + raw model response. + AI_DEBUG: '1' + # Single-issue (event-driven) runs always reprocess — a human + # just opened/edited the issue and wants fresh triage. + TRIAGE_MODE: single + + # ───────────────────────────────────────────────────────────── + # BATCH PATH (scheduled OR workflow_dispatch with mode=all) + # ───────────────────────────────────────────────────────────── + - name: Batch triage (scheduled / manual) + if: github.event_name == 'schedule' || (github.event_name == 'workflow_dispatch' && github.event.inputs.mode == 'all') + run: | + echo "🔄 BATCH TRIAGE starting..." + # We include `labels` in the JSON so triage_batch.py can skip + # issues we've already handled (anything carrying one of the + # TRIAGE_LABELS — triaged / auto-fix / needs-review / etc.). + gh issue list --state open \ + --json number,title,body,labels --limit 50 \ + --repo "$GITHUB_REPOSITORY" > /tmp/issues.json + echo "📋 Fetched $(jq length /tmp/issues.json) open issues" + python3 triage_batch.py /tmp/issues.json + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + LEARN_PR_TOKEN: ${{ secrets.LEARN_PR_TOKEN }} + GITHUB_REPOSITORY: ${{ github.repository }} + AI_MODEL: openai/gpt-4o-mini + TRIAGE_MODE: batch + # Only re-triage already-labeled issues when the user explicitly + # asks for it via the workflow_dispatch "force" input. + FORCE_RETRIAGE: ${{ github.event.inputs.force == 'true' && '1' || '' }} diff --git a/_smoke_test.py b/_smoke_test.py new file mode 100644 index 00000000..e5ab21a9 --- /dev/null +++ b/_smoke_test.py @@ -0,0 +1,58 @@ +"""Smoke-test the AI triage pipeline. + +Locally there is no GITHUB_TOKEN with models:read scope, so the AI call +raises and we exercise the heuristic fallback path. The point of this test +is to prove the fallback returns sensible classifications + comments. +""" +import os +import sys + +os.environ.pop("GITHUB_TOKEN", None) # force the fallback path + +from ai_triage import ai_or_fallback + +ISSUES = [ + { + "number": 7, + "title": "MS Learn Module Update Request: [REPLACE_WITH_MODULE_TITLE]", + "body": ( + "Which of the MS Learn modules from the dropdown are you submitting an update request?\n" + "None\n\nAdditional information\n\n" + "Fix a broken user experience (broken links, exercise error, etc.)\n\n" + "Update incorrect information\n\nAdd new content to the module\n\n" + "Some other request\nInformation about the requested update\nNo response" + ), + "expected": "spam", + }, + { + "number": 4, + "title": "MS Learn Module Update Request: Manage your work with GitHub Projects", + "body": ( + "Information about the requested update\n" + "https://learn.microsoft.com/en-us/training/modules/manage-work-github-projects/7-knowledge-check\n\n" + "In Module assessment/Check your knowledge\n\n" + "What Project descriptor automatically saves when you change it?\n\n" + "Project name (Correct)\n\n\nProject description\n\n\nProject README\n\n" + "first answer is marked as correct, should be: None of them." + ), + "expected": "auto_fix", + }, +] + +lines = [] +for issue in ISSUES: + r = ai_or_fallback(issue) + lines.append( + f"#{issue['number']:>3} expected={issue['expected']:<13} " + f"got={r['classification']:<13} ({r['source']}, conf={r['confidence']})" + ) + lines.append(f" comment[:140] = {r['comment'][:140]!r}") + if r.get("fix_plan"): + fp = r["fix_plan"] + lines.append(f" fix_plan = current={fp.get('current_text')!r} desired={fp.get('desired_text')!r}") + +text = "\n".join(lines) + "\n" +sys.stdout.write(text) +sys.stdout.flush() +with open("_smoke_result.txt", "w") as fh: + fh.write(text) diff --git a/ai_triage.py b/ai_triage.py new file mode 100644 index 00000000..f2694981 --- /dev/null +++ b/ai_triage.py @@ -0,0 +1,536 @@ +""" +AI-powered issue triage using GitHub Models inference API. + +This module replaces the regex/heuristic classifier with an actual LLM call +(GitHub Models — same models Copilot uses) that reads the full issue body +and reasons about what to do. + +Required env vars: + GITHUB_TOKEN — already present in every Actions run. + For private repos / org models you may need a token with + the `models:read` scope. In public repos the default + GITHUB_TOKEN works. + +Optional env vars: + AI_MODEL — model id to use. Default: "openai/gpt-4o-mini". + AI_DEBUG — if set to "1", prints the full prompt and raw response. + +The module exposes two functions: + ai_triage_issue(issue) -> dict + ai_or_fallback(issue) -> dict (falls back to issue_analyzer if AI fails) + +The returned dict always has the shape: + + { + "classification": "auto_fix" | "needs_human" | "needs_context" | "spam", + "confidence": int (0-100), + "reasoning": str, # why the AI chose this label + "comment": str, # markdown body to post on the issue + "fix_plan": { # populated only when classification == auto_fix + "module_url": str | null, + "current_text": str | null, # short literal phrase to find in the docs + "desired_text": str | null, # short literal phrase to replace it with + "explanation": str | null, + } | null, + "source": "ai" | "fallback", + } +""" + +from __future__ import annotations + +import json +import os +import sys +import urllib.error +import urllib.request + +# GitHub Models inference endpoint (OpenAI-compatible Chat Completions API) +_GITHUB_MODELS_URL = "https://models.github.ai/inference/chat/completions" +_DEFAULT_MODEL = os.environ.get("AI_MODEL", "openai/gpt-4o-mini") +_DEBUG = os.environ.get("AI_DEBUG") == "1" + + +SYSTEM_PROMPT = """You are an automated issue triage agent for the +MicrosoftDocs/learn-pr GitHub repository. You read user-submitted issues +about Microsoft Learn training modules and decide what action to take. + +Your job has THREE parts: + +1) IGNORE THE ISSUE TEMPLATE SCAFFOLDING. + Issues are submitted through a GitHub issue form whose body contains a lot + of fixed template text — section headers, checkbox option labels, and + placeholder values. You must mentally strip ALL of this out and only + reason about what the USER actually typed. Treat these as noise: + • "Which of the MS Learn modules from the dropdown are you submitting..." + • "Additional information" + • "Information about the requested update" + • "Fix a broken user experience (broken links, exercise error, etc.)" + • "Update incorrect information" + • "Add new content to the module" + • "Some other request" + • "[REPLACE_WITH_MODULE_TITLE]" + • "No response", standalone "None" + • Any line consisting solely of "- [ ]" or "- [x]" plus one of the option labels above. + +2) DECIDE A CLASSIFICATION (exactly one): + + • spam — After stripping template noise the user wrote nothing of + substance (empty, gibberish, single word, placeholder + left in, etc.). + • auto_fix — The user clearly identifies a SPECIFIC, LITERAL piece of + text in a SPECIFIC Microsoft Learn page that is wrong, + AND tells you the SHORT, LITERAL replacement text. The + change must be small enough to apply as a plain text + substitution (typo fix, wrong answer marked correct, + swapped term, broken URL, etc.). If the fix requires + rewriting a paragraph, restructuring content, or making + an editorial judgement call, this is NOT auto_fix. + • needs_human — The user reports a real, substantive problem with a + specific module but the fix requires human judgement + (rewrite, restructure, editorial decision, ambiguous + correction). + • needs_context — The user's report is too vague to act on: no module link, + no specific section, or no clear statement of what is + wrong vs. what should be there. + +3) PRODUCE A USER-FRIENDLY COMMENT for the issue thread that: + - Tells the user what you classified the issue as and WHY in 1–2 plain + sentences. + - For needs_human / needs_context: ask precisely for what is missing. + - For auto_fix: confirm you understood the requested change and say a PR + attempt is being made. + - For spam: politely explain the issue appears empty/template-only and ask + them to refile with the form filled out. + +OUTPUT FORMAT — STRICT. + +Reply with a SINGLE JSON object, no markdown fences, no prose before/after. +Schema: + +{ + "classification": "spam" | "auto_fix" | "needs_human" | "needs_context", + "confidence": , + "reasoning": "", + "comment": "", + "fix_plan": { + "module_url": "", + "current_text": "", + "desired_text": "", + "explanation": "" + } +} + +Rules: +- "fix_plan" MUST be present in every response. +- Set every field of "fix_plan" to null UNLESS classification == "auto_fix". +- When classification == "auto_fix", "current_text" and "desired_text" must + both be non-null AND must be short literal phrases (typically 1–15 words) + that could be found by exact-string search in the source markdown. +- Never invent a URL the user did not provide. +- Never include the issue template scaffolding text in any field. +""" + + +def _build_user_message(issue: dict) -> str: + """Format the issue into the user turn of the chat.""" + number = issue.get("number", "?") + title = issue.get("title") or "" + body = issue.get("body") or "" + return ( + f"Triage this GitHub issue.\n\n" + f"Issue number: #{number}\n" + f"Issue title:\n{title}\n\n" + f"Issue body (raw, includes template scaffolding — strip it mentally):\n" + f"---BODY START---\n{body}\n---BODY END---\n" + ) + + +def _call_github_models(messages: list[dict], model: str, token: str) -> dict: + """POST to the GitHub Models chat completions endpoint.""" + payload = json.dumps( + { + "model": model, + "messages": messages, + "temperature": 0.1, + "response_format": {"type": "json_object"}, + } + ).encode("utf-8") + + req = urllib.request.Request( + _GITHUB_MODELS_URL, + data=payload, + headers={ + "Authorization": f"Bearer {token}", + "Content-Type": "application/json", + "Accept": "application/json", + "X-GitHub-Api-Version": "2022-11-28", + }, + method="POST", + ) + + with urllib.request.urlopen(req, timeout=60) as resp: + return json.loads(resp.read().decode("utf-8")) + + +def _parse_ai_response(api_response: dict) -> dict: + """Pull the JSON object out of the chat-completion response.""" + try: + content = api_response["choices"][0]["message"]["content"] + except (KeyError, IndexError) as exc: + raise ValueError(f"Unexpected API response shape: {api_response}") from exc + + if _DEBUG: + print("─── AI raw content ───") + print(content) + print("──────────────────────") + + # Models occasionally wrap JSON in ```json fences despite response_format. + cleaned = content.strip() + if cleaned.startswith("```"): + cleaned = cleaned.strip("`") + if cleaned.lower().startswith("json"): + cleaned = cleaned[4:] + cleaned = cleaned.strip() + + return json.loads(cleaned) + + +_VALID_CLASSIFICATIONS = {"spam", "auto_fix", "needs_human", "needs_context"} + + +def _normalize(result: dict) -> dict: + """Make sure the AI's response matches the contract we promise callers.""" + classification = result.get("classification") + if classification not in _VALID_CLASSIFICATIONS: + raise ValueError( + f"AI returned invalid classification: {classification!r}. " + f"Expected one of {_VALID_CLASSIFICATIONS}." + ) + + confidence_raw = result.get("confidence", 50) + try: + confidence = max(0, min(100, int(confidence_raw))) + except (TypeError, ValueError): + confidence = 50 + + fix_plan = result.get("fix_plan") or { + "module_url": None, + "current_text": None, + "desired_text": None, + "explanation": None, + } + # If the AI said auto_fix but didn't actually give us a swap, downgrade. + if classification == "auto_fix": + if not (fix_plan.get("current_text") and fix_plan.get("desired_text")): + classification = "needs_human" + fix_plan = { + "module_url": fix_plan.get("module_url"), + "current_text": None, + "desired_text": None, + "explanation": ( + "AI proposed auto_fix but did not provide both current_text " + "and desired_text; downgraded to needs_human." + ), + } + + return { + "classification": classification, + "confidence": confidence, + "reasoning": (result.get("reasoning") or "").strip(), + "comment": (result.get("comment") or "").strip(), + "fix_plan": fix_plan if classification == "auto_fix" else None, + "source": "ai", + } + + +def ai_triage_issue(issue: dict, token: str | None = None, model: str | None = None) -> dict: + """Run a single issue through GitHub Models and return the structured decision. + + Raises on any error — callers that want a fallback should use + ``ai_or_fallback`` instead. + """ + token = token or os.environ.get("GITHUB_TOKEN") + if not token: + raise RuntimeError("GITHUB_TOKEN not set — cannot call GitHub Models.") + + model = model or _DEFAULT_MODEL + messages = [ + {"role": "system", "content": SYSTEM_PROMPT}, + {"role": "user", "content": _build_user_message(issue)}, + ] + + if _DEBUG: + print("─── AI prompt (user turn) ───") + print(messages[-1]["content"]) + print("─────────────────────────────") + + api_response = _call_github_models(messages, model=model, token=token) + parsed = _parse_ai_response(api_response) + return _normalize(parsed) + + +def ai_or_fallback(issue: dict) -> dict: + """Try the AI; if anything goes wrong, fall back to the heuristic analyzer.""" + try: + return ai_triage_issue(issue) + except (urllib.error.URLError, urllib.error.HTTPError, ValueError, + json.JSONDecodeError, RuntimeError, KeyError) as exc: + print(f"⚠️ AI triage failed ({type(exc).__name__}: {exc}); " + f"falling back to heuristic analyzer.") + # Local import keeps this file usable even if issue_analyzer is missing. + from issue_analyzer import intelligent_classify, generate_context_aware_comment + + classification, confidence, analysis = intelligent_classify(issue) + analysis = analysis or {} + + # The spam fast-path returns an empty analysis dict; build a minimal + # comment ourselves rather than asking the templated comment generator + # (which expects fully populated keys). + if classification == "spam" or not analysis: + comment = ( + "🗑️ This issue appears to be empty or to contain only the " + "unfilled MS Learn issue template. If this was a mistake, " + "please reopen with the form filled in." + ) + else: + try: + comment = generate_context_aware_comment(analysis, classification) + except (KeyError, TypeError) as comment_exc: + print(f"⚠️ Heuristic comment generation failed ({comment_exc}); " + f"using a generic fallback comment.") + comment = ( + f"This issue has been classified as `{classification}` by the " + "heuristic fallback (the AI triage was unavailable). A human " + "reviewer will follow up shortly." + ) + + return { + "classification": classification, + "confidence": int(confidence) if isinstance(confidence, (int, float)) else 50, + "reasoning": "Heuristic fallback (no AI response).", + "comment": comment, + "fix_plan": None, # heuristic path doesn't propose a structured fix + "source": "fallback", + } + + +# ─────────────────────────────────────────────────────────────────── +# Fix-plan refinement: ground current/desired in the actual file +# ─────────────────────────────────────────────────────────────────── +REFINEMENT_SYSTEM_PROMPT = """You are an expert at producing patches for +Microsoft Learn module source files. + +A `MicrosoftDocs/learn-pr` module directory looks like: + + learn-pr/learn-pr/// + ├── index.yml ← module metadata + unit list + ├── -knowledge-check.yml ← knowledge-check questions/answers (YAML) + ├── ... + └── includes/ + ├── -.md ← unit prose content (Markdown) + └── ... + +You will be given: + 1. A user-reported issue about something incorrect in a module. + 2. The user's high-level "current"/"desired" description (which may be + paraphrased and may NOT match the file verbatim). + 3. The file we have determined is the one to edit, including its path + and full contents. Pay attention to the FILE TYPE: + • `.yml` / `.yaml` → structured data; edit fields, not prose. + • `.md` → prose; edit the literal text. + +Your job: produce a literal text substitution that, when applied with a +plain Python `str.replace`, fixes the bug described by the user. + +Output a SINGLE JSON object — no markdown fences, no prose: + +{ + "current_text": "", + "desired_text": "", + "explanation": "" +} + +CRITICAL RULES (read carefully): + + • "current_text" MUST appear EXACTLY in the file (whitespace, + indentation, punctuation, casing — all matching). If you cannot + find such a substring, return all three fields as null instead of + guessing. + • "current_text" should be the smallest UNIQUE snippet that captures + the bug. Include enough surrounding context (1–3 lines) so the + substring appears only ONCE in the file. + • The substitution must leave the document syntactically valid + (preserve YAML indentation, Markdown link syntax, etc.). + +HOW TO HANDLE KNOWLEDGE-CHECK YAML FILES (.yml): + These files contain entries like: + + - content: "Project name" + isCorrect: true + explanation: "..." + - content: "Project description" + isCorrect: false + explanation: "..." + + To CHANGE WHICH ANSWER IS CORRECT, toggle the `isCorrect:` flags — + do not edit the `content:` strings. + + SPECIAL CASE — "NONE OF THE ABOVE": + If the user says something like "none of the options are correct", + "the correct answer is 'none of the above'", "none of them", or similar: + + 1. FIRST, check if there is ALREADY an answer choice with content + containing "none of the above", "none of them", "none", or similar. + 2. If such an option EXISTS: Set that choice's `isCorrect: true` and + ALL others to `isCorrect: false`. + 3. If NO "none" option exists: Set ALL existing `isCorrect:` values + to `false`. In your "explanation" field, include exactly this + marker: "[NEEDS_HUMAN: no 'none of the above' option exists]" + + IMPORTANT: Your substitution must leave the YAML valid. Normally at + least one answer should be `isCorrect: true`. The ONLY exception is + the "none of the above" case when no such option exists — then the + marker in explanation signals human review is required. + + Your `current_text` and `desired_text` should be MULTI-LINE blocks + that include enough YAML to be unambiguous (e.g. the `content:` + line above the `isCorrect:` line, so we know WHICH answer's flag + is being toggled). + +HOW TO HANDLE UNIT MARKDOWN FILES (.md): + Edit the literal prose. For typos, broken links, factually-wrong + sentences, etc., make the smallest change that fixes the bug. Do + NOT rewrite whole paragraphs unless the user explicitly says so. + +GENERAL: + • Never invent quotes or content that isn't in the file. + • Keep desired_text minimal — change only what is necessary. + • If the file genuinely doesn't contain anything matching the user's + description, return all three fields as null. We'd rather skip a + fix than ship a wrong one. +""" + + +def refine_fix_plan_with_file_content( + issue: dict, + fix_plan: dict, + file_path: str, + token: str | None = None, + model: str | None = None, + max_file_chars: int = 16000, +) -> dict | None: + """Ask the LLM to convert a vague current/desired pair into an + exact verbatim substitution rooted in the actual file contents. + + Returns the refined fix_plan dict (with the SAME shape as the input + plus a guaranteed-grounded current_text/desired_text), or None when + the model gives up / errors out. + """ + token = token or os.environ.get("GITHUB_TOKEN") + if not token: + return None + + try: + with open(file_path, "r", encoding="utf-8") as fh: + file_contents = fh.read() + except OSError as exc: + print(f"⚠️ Could not read {file_path} for refinement: {exc}") + return None + + # Keep the prompt under control by truncating very large files. Most + # Learn unit files are small (a few KB); 16 KB is plenty. + truncated = file_contents[:max_file_chars] + truncation_note = ( + f"\n\n[file truncated to {max_file_chars} chars of {len(file_contents)}]" + if len(file_contents) > max_file_chars else "" + ) + + file_ext = os.path.splitext(file_path)[1].lower() + file_kind = { + ".yml": "YAML (likely a knowledge-check; edit `isCorrect:` flags, not `content:` strings)", + ".yaml": "YAML (likely a knowledge-check; edit `isCorrect:` flags, not `content:` strings)", + ".md": "Markdown unit prose (edit the literal text)", + }.get(file_ext, "unknown file type") + + user_msg = ( + f"Issue title:\n{issue.get('title', '')}\n\n" + f"Issue body:\n{issue.get('body', '')}\n\n" + f"AI's first-pass interpretation of the change:\n" + f" current (paraphrased): {fix_plan.get('current_text')!r}\n" + f" desired (paraphrased): {fix_plan.get('desired_text')!r}\n" + f" explanation: {fix_plan.get('explanation')!r}\n\n" + f"File path: {file_path}\n" + f"File type: {file_kind}\n\n" + f"File contents below — your current_text MUST be a verbatim " + f"substring of this:\n" + f"---FILE START---\n{truncated}{truncation_note}\n---FILE END---" + ) + + messages = [ + {"role": "system", "content": REFINEMENT_SYSTEM_PROMPT}, + {"role": "user", "content": user_msg}, + ] + + if _DEBUG: + print("─── Refinement prompt (user turn) ───") + print(user_msg[:1200] + ("...[trimmed]" if len(user_msg) > 1200 else "")) + print("─────────────────────────────────────") + + try: + api_response = _call_github_models(messages, model=model or _DEFAULT_MODEL, token=token) + raw_content = api_response["choices"][0]["message"]["content"].strip() + except (urllib.error.URLError, urllib.error.HTTPError, KeyError, + IndexError, ValueError) as exc: + print(f"⚠️ Refinement call failed ({type(exc).__name__}: {exc})") + return None + + if _DEBUG: + print("─── Refinement raw content ───") + print(raw_content) + print("──────────────────────────────") + + # Strip stray markdown fences if any + cleaned = raw_content + if cleaned.startswith("```"): + cleaned = cleaned.strip("`") + if cleaned.lower().startswith("json"): + cleaned = cleaned[4:] + cleaned = cleaned.strip() + + try: + refined = json.loads(cleaned) + except json.JSONDecodeError as exc: + print(f"⚠️ Refinement JSON parse failed: {exc}") + return None + + current = refined.get("current_text") + desired = refined.get("desired_text") + explanation = refined.get("explanation") or fix_plan.get("explanation") + + if not current or not desired: + print("⚠️ Refinement returned null current/desired — model declined to guess.") + return None + + # Hard guard: the model is REQUIRED to ground current in the file. + # If it didn't, the substitution would silently fail. + if current not in file_contents: + print(f"⚠️ Refined current_text not found verbatim in {file_path}; rejecting.") + return None + + return { + "module_url": fix_plan.get("module_url"), + "current_text": current, + "desired_text": desired, + "explanation": explanation, + } + + +# Convenience for ad-hoc debugging: `python3 ai_triage.py ` +if __name__ == "__main__": + if len(sys.argv) < 2: + print("Usage: python3 ai_triage.py ") + sys.exit(1) + with open(sys.argv[1]) as fh: + issue_data = json.load(fh) + result = ai_or_fallback(issue_data) + print(json.dumps(result, indent=2)) diff --git a/issue_analyzer.py b/issue_analyzer.py new file mode 100644 index 00000000..222dceac --- /dev/null +++ b/issue_analyzer.py @@ -0,0 +1,516 @@ +""" +AI-Powered Issue Analysis Module + +Uses intelligent analysis to: +1. Extract intent and context from issue text +2. Identify specific problems and solutions +3. Generate context-aware PR changes +4. Determine confidence levels for automation +""" + +import re +import json + + +# ───────────────────────────────────────────────────────────────── +# Template noise stripping +# The MS Learn issue forms include literal scaffolding text (checkboxes, +# section headers, placeholders) that should NOT count toward specificity +# or be extracted as the user's "current/desired" state. +# ───────────────────────────────────────────────────────────────── +_TEMPLATE_NOISE_PATTERNS = [ + # Section headers + r"which of the ms learn modules from the dropdown.*?\?", + r"additional information", + r"information about the requested update", + # Checkbox options (filled or unfilled) + r"-?\s*\[[ x]\]\s*fix a broken user experience[^\n]*", + r"-?\s*\[[ x]\]\s*update incorrect information[^\n]*", + r"-?\s*\[[ x]\]\s*add new content to the module[^\n]*", + r"-?\s*\[[ x]\]\s*some other request[^\n]*", + # Bare option lines (when the user didn't even check a box) + r"^\s*fix a broken user experience[^\n]*$", + r"^\s*update incorrect information[^\n]*$", + r"^\s*add new content to the module[^\n]*$", + r"^\s*some other request[^\n]*$", + # Placeholders / "no response" answers + r"\[replace_with[^\]]*\]", + r"^\s*no response\s*$", + r"^\s*none\s*$", +] + + +def _strip_template_noise(text): + """Remove MS Learn issue-template scaffolding so it doesn't pollute analysis.""" + if not text: + return "" + cleaned = text + for pat in _TEMPLATE_NOISE_PATTERNS: + cleaned = re.sub(pat, "", cleaned, flags=re.IGNORECASE | re.MULTILINE) + # Collapse multiple blank lines + cleaned = re.sub(r"\n{3,}", "\n\n", cleaned) + return cleaned.strip() + + +def _is_effectively_empty(text): + """True if, after stripping template noise, there is no real user content.""" + return len(_strip_template_noise(text or "").split()) < 5 + + +class IssueAnalyzer: + """Intelligent issue analysis without external AI dependencies""" + + def __init__(self, title, body): + self.title = title + # Keep the raw body for length/reporting, but analyze on the cleaned text + self.body = body + self.cleaned_body = _strip_template_noise(body or "") + self.full_text = f"{title}\n{self.cleaned_body}".lower() + self.analysis = {} + + def analyze(self): + """Perform comprehensive issue analysis""" + # Build analysis in two passes to avoid circular references + # First pass: gather all information + self.analysis = { + "issue_type": self._detect_issue_type(), + "specificity_score": self._calculate_specificity(), + "context_level": self._extract_context(), + "problem_statement": self._extract_problem(), + "proposed_fix": self._extract_proposed_fix(), + } + + # Second pass: calculate confidence and actionability + # (these depend on the analysis from the first pass) + self.analysis["confidence"] = self._calculate_confidence() + self.analysis["actionability"] = self._assess_actionability() + + return self.analysis + + def _detect_issue_type(self): + """Detect what type of issue this is""" + types = [] + + if any(x in self.full_text for x in ["incorrect", "wrong", "wrong answer", "wrong response", "not correct", "inaccurate"]): + types.append("incorrect_content") + + if any(x in self.full_text for x in ["missing", "not there", "should have", "need to add"]): + types.append("missing_content") + + if any(x in self.full_text for x in ["confusing", "unclear", "hard to understand", "difficult", "misleading", "misrepresent"]): + types.append("clarity_issue") + + if any(x in self.full_text for x in ["typo", "spelling", "grammar", "punctuation"]): + types.append("typo_or_grammar") + + if any(x in self.full_text for x in ["outdated", "old", "deprecated", "no longer", "incorrect information"]): + types.append("outdated_content") + + if any(x in self.full_text for x in ["broken link", "404", "doesn't work", "link error"]): + types.append("broken_link") + + if any(x in self.full_text for x in ["example", "code", "sample"]): + types.append("code_related") + + return types if types else ["general_feedback"] + + def _calculate_specificity(self): + """Score how specific and actionable the issue is (0-100)""" + score = 0 + + # Has specific section mentioned (+25) + if any(x in self.full_text for x in ["unit", "module", "section", "chapter", "knowledge check", "exercise", "lab", "assessment", "check your knowledge"]): + score += 25 + + # Has exact location/number (+20) + if re.search(r'(question|q)\s*(\d+|[a-z])', self.full_text): + score += 20 + if re.search(r'(line|paragraph)\s*(\d+)', self.full_text): + score += 20 + + # Describes current vs desired state (+30) + if any(x in self.full_text for x in ["current", "should be", "instead of", "currently"]): + score += 30 + elif any(x in self.full_text for x in ["says", "shows", "displays", "marked as", "written", "described"]): + score += 20 + + # Has concrete examples or quoted text (+15) + if re.search(r'"[^"]{5,}"', self.full_text) or re.search(r"'[^']{5,}'", self.full_text): + score += 15 + # Also credit parenthetical content like (Option 1), (Correct), etc. + elif re.search(r'\([^)]{3,}\)', self.full_text): + score += 12 + + # Has URL (+15) - increased from 10 since having a link is very specific + if "http" in self.full_text: + score += 15 + + # Has explicit answer/correct marker (+10) + if any(x in self.full_text for x in ["(correct)", "(correct", "correct answer", "correct is", "should be"]): + score += 10 + + # Identifies incorrect/inconsistent information (+20) + if any(x in self.full_text for x in ["incorrect", "inaccurate", "misleading", "misrepresent", "no such thing", "not", "alignment", "inconsistent"]): + score += 20 + + return min(score, 100) + + def _extract_context(self): + """Extract available context from the issue""" + context = { + "has_url": "http" in self.full_text, + "has_specific_module": any(x in self.full_text for x in ["unit", "module", "section", "assessment", "check your knowledge"]), + "has_examples": bool(re.search(r'"[^"]{5,}"', self.full_text)) or bool(re.search(r'\([^)]{3,}\)', self.full_text)), + "has_current_state": any(x in self.full_text for x in ["current", "says", "shows", "displays", "marked as", "is", "written", "described", "suggests"]), + "has_desired_state": any(x in self.full_text for x in ["should be", "instead of", "should have", "should", "instead", "correct", "needs to be", "all code reviews", "require", "actually", "really"]), + # Use cleaned body for length so template scaffolding doesn't inflate this + "issue_length": len(self.cleaned_body.split()), + "has_code": "```" in self.cleaned_body or bool(re.search(r'`[^`]+`', self.cleaned_body)), + "has_screenshot": "screenshot" in self.full_text or "image" in self.full_text, + "has_quiz_content": any(x in self.full_text for x in ["answer", "knowledge check", "question", "multiple choice", "correct", "quiz"]), + # New: true when the body is essentially just the unfilled template + "is_template_only": _is_effectively_empty(self.body), + } + return context + + @staticmethod + def _looks_like_template_fragment(text): + """Reject matches that are obviously checkbox/template scaffolding.""" + if not text: + return True + t = text.strip().lower() + bad_substrings = [ + "add new content", + "some other request", + "update incorrect information", + "fix a broken user experience", + "[ ]", "[x]", + "replace_with", + "no response", + "information ab", # truncated "Information about..." + ] + return any(b in t for b in bad_substrings) + + def _extract_problem(self): + """Extract the core problem from the issue""" + # Try to extract "current -> desired" pattern + problem = { + "current": None, + "desired": None, + "location": None, + "summary": None, + } + + # Extract current state (what's wrong) + # Pattern 1: "currently/now/says/shows..." + current_match = re.search( + r'(?:currently|now|says|shows|displays|marked as|written|suggests)(?:\s+[a-z]+)*\s+["\']?([^"\'.!?]{10,80})["\']?', + self.full_text + ) + if current_match and not self._looks_like_template_fragment(current_match.group(1)): + problem["current"] = current_match.group(1).strip() + + # Pattern 2: Look for parenthetical marked items like "(Correct)" or "(Answer)" + if not problem["current"]: + paren_match = re.search(r'\(([^)]{5,50})\)\s*\n', self.full_text) + if paren_match and not self._looks_like_template_fragment(paren_match.group(1)): + problem["current"] = paren_match.group(1).strip() + + # Pattern 3: Look for natural language problem statements like "There is no such thing as..." + if not problem["current"]: + natural_problem = re.search(r'(There is (?:no|not)[^.!?]{15,80})', self.full_text) + if natural_problem and not self._looks_like_template_fragment(natural_problem.group(1)): + problem["current"] = natural_problem.group(1).strip() + + # Extract desired state (what should be) + # Pattern 1: "should/should be/instead/needs to be..." + desired_match = re.search( + r'(?:should be|should|instead of|correct is|needs to be|correct|instead)(?:\s+[a-z]+)*\s+["\']?([^"\'.!?:]{8,80})["\']?', + self.full_text + ) + if desired_match and not self._looks_like_template_fragment(desired_match.group(1)): + problem["desired"] = desired_match.group(1).strip() + + # Pattern 2: Look for explicit direction like "should be: None of them" + explicit_desired = re.search(r'should be:\s*([^.\n]{5,80})', self.full_text) + if explicit_desired and not self._looks_like_template_fragment(explicit_desired.group(1)): + problem["desired"] = explicit_desired.group(1).strip() + + # Pattern 3: Look for fact statements like "all code reviews consume..." + if not problem["desired"]: + fact_match = re.search(r'(all (?:code reviews|PRU|[a-z]+s?) (?:consume|require)[^.!?]{8,80})', self.full_text) + if fact_match and not self._looks_like_template_fragment(fact_match.group(1)): + problem["desired"] = fact_match.group(1).strip() + + # Extract location + location_keywords = ["in the", "on the", "under", "inside", "within", "at the", "in module", "check your", "knowledge check"] + for keyword in location_keywords: + match = re.search(rf'{keyword}\s+([^.!?,{{]{{3,60}})', self.full_text) + if match and not self._looks_like_template_fragment(match.group(1)): + problem["location"] = match.group(1).strip() + break + + # Generate summary from title + problem["summary"] = self.title.strip() + + return problem + + def _extract_proposed_fix(self): + """Extract or infer the proposed fix""" + fix = { + "explicit": None, + "inferred": None, + "confidence": 0, + } + + # Check for explicit fix suggestion + explicit_match = re.search( + r'(?:fix|change|update|replace|modify)(?:\s+(?:to|with))?\s+["\']?([^"\'.!?]{5,100})["\']?', + self.full_text + ) + if explicit_match: + fix["explicit"] = explicit_match.group(1).strip() + fix["confidence"] += 0.5 + + # Infer fix if we have current/desired pattern + if self.analysis.get("problem_statement"): + problem = self.analysis["problem_statement"] + if problem.get("current") and problem.get("desired"): + fix["inferred"] = f"Replace '{problem['current']}' with '{problem['desired']}'" + fix["confidence"] += 0.3 + + fix["confidence"] = min(fix["confidence"], 1.0) + return fix + + def _assess_actionability(self): + """Assess if this issue is actionable (0-100)""" + score = 0 + + # Has issue type identified (+20) + issue_types = self.analysis.get("issue_type", []) + if issue_types: + score += 20 + + # Has context level (+20) + context = self.analysis.get("context_level", {}) + if context.get("has_url"): + score += 15 + if context.get("has_specific_module"): + score += 15 + + # Has specific problem described (+20) + problem = self.analysis.get("problem_statement", {}) + if problem.get("current") or problem.get("desired"): + score += 20 + + # Has proposed fix (+20) + fix = self.analysis.get("proposed_fix", {}) + if fix.get("explicit") or fix.get("inferred"): + score += 20 + + return min(score, 100) + + def _calculate_confidence(self): + """Calculate confidence in auto-fix capability (0-100)""" + score = 0 + context = self.analysis.get("context_level", {}) + + # URL present (+30) - increased from 25 because having a link is very significant + if context.get("has_url"): + score += 30 + + # Specific module (+20) + if context.get("has_specific_module"): + score += 20 + + # Current AND desired states (+25) + if context.get("has_current_state") and context.get("has_desired_state"): + score += 25 + + # Quiz/knowledge check content (+15) - these are highly actionable + if context.get("has_quiz_content"): + score += 15 + + # Issue type not vague (+20) - increased from 15 to weight specific issue types more + issue_types = self.analysis.get("issue_type", []) + non_vague = [t for t in issue_types if t not in ["clarity_issue", "general_feedback"]] + if non_vague: + score += 20 + + # Has good specificity (+15) + if self.analysis.get("specificity_score", 0) > 50: + score += 15 + + return min(score, 100) + + +def intelligent_classify(issue): + """ + Use intelligent analysis to classify issues + Returns: (category, confidence_0_to_100, analysis) + + Classification Logic: + - spam: Template-only / empty / trivially short + - auto_fix: Has a clear, extractable current→desired change AND high signal + - needs_human: Has enough context but may need human review + - needs_context: Missing critical information + """ + title = issue.get("title", "") + body = issue.get("body", "") + + # Quick spam filter: trivially short + if len(f"{title}\n{body}".strip()) < 20: + return "spam", 100, {} + + # Template-only spam filter: body is just unfilled MS Learn issue template + if _is_effectively_empty(body): + # Still build an analysis so downstream comment shows what we saw + analyzer = IssueAnalyzer(title, body) + analysis = analyzer.analyze() + analysis["spam_reason"] = "template_only" + return "spam", 100, analysis + + analyzer = IssueAnalyzer(title, body) + analysis = analyzer.analyze() + + # All scores are already on a 0-100 scale + confidence = analysis["confidence"] + actionability = analysis["actionability"] + specificity = analysis["specificity_score"] + context = analysis.get("context_level", {}) + problem = analysis.get("problem_statement", {}) + has_url = context.get("has_url", False) + has_current = bool(problem.get("current")) + has_desired = bool(problem.get("desired")) + + # AUTO-FIX: only when we have a concrete current AND desired to swap. + # Without both, we cannot generate a safe patch, no matter how confident + # the heuristics feel. + # + # Additionally, refuse auto_fix when the proposed "desired" looks more + # like editorial commentary than a literal phrase to substitute: + # heuristic = too long (>8 words) AND not quoted/parenthesized. + desired_str = problem.get("desired") or "" + desired_is_short_phrase = ( + len(desired_str.split()) <= 8 + or any(q in desired_str for q in ['"', "'"]) + ) + + if (specificity >= 60 and confidence >= 70 and actionability >= 70 + and has_current and has_desired and desired_is_short_phrase): + return "auto_fix", confidence, analysis + + # NEEDS_HUMAN: has URL + good specificity = enough info for a person + if has_url and specificity >= 50: + return "needs_human", confidence, analysis + + # NEEDS_HUMAN: medium confidence/actionability but decent specificity + if (confidence >= 40 or actionability >= 40) and specificity >= 30: + return "needs_human", confidence, analysis + + # NEEDS_CONTEXT: low specificity and low confidence + if specificity < 30 and confidence < 50: + return "needs_context", confidence, analysis + + # FALLBACK: if we have ANY URL, route to human + if has_url: + return "needs_human", confidence, analysis + + return "needs_context", confidence, analysis + + +def generate_context_aware_comment(analysis, classification): + """ + Generate a helpful comment based on analysis + """ + issue_types = analysis.get("issue_type", []) + context = analysis.get("context_level", {}) + problem = analysis.get("problem_statement", {}) + fix = analysis.get("proposed_fix", {}) + + if classification == "auto_fix": + return f"""🧠 **Auto-triage Analysis:** + +**Issue Type:** {', '.join(issue_types)} +**Specificity Score:** {analysis.get('specificity_score', 0)}/100 +**Confidence:** {analysis.get('confidence', 0)}/100 + +I've identified this as an actionable issue: +- Location: {problem.get('location', 'Not specified')} +- Problem: {problem.get('current', 'Not clearly stated')} +- Should be: {problem.get('desired', 'Not clearly stated')} + +Attempting to open a PR to fix this now! 🚀 +(If no PR link appears in a follow-up comment, the auto-fix could not be applied — a human reviewer will take it from here.)""" + + elif classification == "needs_human": + missing = [] + if not context.get("has_url"): + missing.append("Direct MS Learn URL") + if not context.get("has_specific_module"): + missing.append("Specific module/unit reference") + if not problem.get("current"): + missing.append("Current state description") + if not problem.get("desired"): + missing.append("Desired state description") + + return f"""🚩 **Review Needed** + +**Issue Type:** {', '.join(issue_types) or 'Not clearly categorized'} +**Specificity Score:** {analysis.get('specificity_score', 0)}/100 + +To help me proceed, please provide: +{chr(10).join(f'- {item}' for item in missing)} + +Current info available: +- Module specific: {context.get('has_specific_module')} +- Has examples: {context.get('has_examples')} +- Issue length: {context.get('issue_length')} words""" + + else: # needs_context + return f"""❓ **More Context Needed** + +**Analysis:** +- Issue Type: {', '.join(issue_types) or 'Unclear'} +- Specificity Score: {analysis.get('specificity_score', 0)}/100 + +Please provide: +1. **Direct link** to the MS Learn module or page +2. **Specific section** affected (unit, knowledge check, section, etc.) +3. **Current state:** What is shown/said now? +4. **Desired state:** What should it be? +5. **Examples:** Concrete code or text examples + +This will help me automatically create a PR to fix it!""" + + +def generate_pr_description(issue, analysis): + """ + Generate PR description based on issue analysis + """ + problem = analysis.get("problem_statement", {}) + fix = analysis.get("proposed_fix", {}) + issue_types = analysis.get("issue_type", []) + + description = f"""## Fixes Issue #{issue['number']} + +### Issue Type +{', '.join(issue_types)} + +### Problem +{problem.get('current', 'Issue details from #' + str(issue['number']))} + +### Solution +{fix.get('explicit') or fix.get('inferred') or 'Updated content to match learning objectives'} + +### Context +- **Module:** {problem.get('location', 'See issue #' + str(issue['number']))} +- **Specificity Score:** {analysis['specificity_score']}/100 + +### Related Issue +Closes #{issue['number']} + +--- +*Auto-generated PR from intelligent issue analysis* +""" + return description diff --git a/triage.py b/triage.py new file mode 100644 index 00000000..54406b45 --- /dev/null +++ b/triage.py @@ -0,0 +1,130 @@ +import os +import sys +import json +import subprocess +import re +from issue_analyzer import intelligent_classify, generate_context_aware_comment + +GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN") + +def ensure_label(label_name): + # create label safely if it doesn't exist + run(f'gh label create "{label_name}" --repo "$GITHUB_REPOSITORY" || true') + + +def add_label(issue_number, label_name): + ensure_label(label_name) + gh(f'issue edit {issue_number} --add-label "{label_name}"') + +def run(cmd): + """Run shell commands safely""" + result = subprocess.run(cmd, shell=True, capture_output=True, text=True) + if result.returncode != 0: + print("ERROR:", result.stderr) + return result.stdout.strip() + + +def gh(cmd): + """GitHub CLI wrapper with auth""" + return run(f'gh {cmd} --repo "$GITHUB_REPOSITORY"') + + +def extract_urls(text): + return re.findall(r'https?://\S+', text) + + +def extract_module_mentions(text): + """ + Detect MS Learn module references (basic heuristic) + """ + patterns = [ + r"module", + r"learn.microsoft.com", + r"knowledge check", + r"check your knowledge", + r"github copilot" + ] + + return any(p.lower() in text.lower() for p in patterns) + + +def classify(issue): + """Use intelligent AI-powered analysis for classification""" + category, confidence, analysis = intelligent_classify(issue) + issue['_analysis'] = analysis + issue['_confidence'] = confidence + return category + + + + +def comment(issue_number, message): + gh(f'issue comment {issue_number} --body "{message}"') + + +def close(issue_number): + gh(f'issue close {issue_number}') + + +def process(issue): + issue_number = issue["number"] + result = classify(issue) + analysis = issue.get("_analysis", {}) + + print(f"Issue #{issue_number} classified as: {result}") + print(f"Specificity Score: {analysis.get('specificity_score', 0)}/100") + + # ALWAYS ensure label exists first + add_label(issue_number, "triaged") + + if result == "auto_fix": + comment_text = generate_context_aware_comment(analysis, result) + comment(issue_number, comment_text) + add_label(issue_number, "auto-fix") + + elif result == "needs_human": + comment_text = generate_context_aware_comment(analysis, result) + comment(issue_number, comment_text) + add_label(issue_number, "needs-review") + + elif result == "needs_context": + comment_text = generate_context_aware_comment(analysis, result) + comment(issue_number, comment_text) + add_label(issue_number, "needs-context") + + elif result == "spam": + comment(issue_number, + "🗑️ **Closed:** Insufficient actionable content to triage.") + add_label(issue_number, "spam") + close(issue_number) + +def main(): + if len(sys.argv) < 2: + print("Usage: triage.py '' or triage.py ''") + sys.exit(1) + + raw = sys.argv[1] + + # Try to parse as JSON first + try: + issue = json.loads(raw) + except json.JSONDecodeError: + # If not valid JSON, try to treat as issue number and fetch it + try: + issue_number = int(raw) + print(f"📝 Fetching issue #{issue_number} from GitHub...") + issue_json = run(f'gh issue view {issue_number} --json number,title,body,labels --repo "$GITHUB_REPOSITORY"') + issue = json.loads(issue_json) + except (ValueError, json.JSONDecodeError): + print(f"❌ Invalid input: '{raw}' is neither valid JSON nor a valid issue number") + sys.exit(1) + + process(issue) + + +if __name__ == "__main__": + if not GITHUB_TOKEN: + print("Missing GITHUB_TOKEN") + sys.exit(1) + + main() diff --git a/triage_batch.py b/triage_batch.py new file mode 100644 index 00000000..bac6434a --- /dev/null +++ b/triage_batch.py @@ -0,0 +1,1191 @@ +import os +import json +import subprocess +import re +import sys +from datetime import datetime +from issue_analyzer import intelligent_classify, generate_context_aware_comment, generate_pr_description +from ai_triage import ai_or_fallback + +REPO = os.environ.get("GITHUB_REPOSITORY") + +# ============================== +# UTILITIES +# ============================== +def run(cmd): + """Execute shell command safely""" + result = subprocess.run(cmd, shell=True, capture_output=True, text=True) + if result.returncode != 0: + print(f"⚠️ Command failed: {cmd}") + print(f"STDERR: {result.stderr}") + return result.stdout.strip() + + +def gh(cmd): + """GitHub CLI wrapper""" + return run(f'gh {cmd} --repo "{REPO}"') + + +# ============================== +# LABEL MANAGEMENT +# ============================== +def ensure_label(label): + """Create label if it doesn't exist""" + run(f'gh label create "{label}" --repo "{REPO}" 2>/dev/null || true') + + +def add_label(issue, label): + """Add label to issue""" + ensure_label(label) + gh(f'issue edit {issue} --add-label "{label}"') + + +# Labels we apply during triage. Used to detect "already processed" +# issues so the scheduled batch run doesn't re-comment / re-PR them. +TRIAGE_LABELS = frozenset({ + "triaged", + "auto-fix", + "auto-fix-applied", + "auto-fix-attempted", + "needs-review", + "needs-context", + "spam", +}) + + +def _issue_labels(issue): + """Return a normalized set of label names on the issue. + + `gh issue list` returns labels as `[{"name": "...", ...}, ...]`, but + raw API payloads sometimes pass them as plain strings. Handle both. + """ + labels = issue.get("labels") or [] + out = set() + for lbl in labels: + if isinstance(lbl, dict): + name = lbl.get("name") + else: + name = lbl + if name: + out.add(name.strip().lower()) + return out + + +def already_triaged(issue): + """True iff the issue already carries one of our triage labels. + + The single-issue (event-driven) workflow path always re-processes — + when a human edits an issue, they want fresh triage — but the + scheduled batch run uses this to skip issues it has already handled. + """ + return bool(_issue_labels(issue) & {l.lower() for l in TRIAGE_LABELS}) + + +# ============================== +# TEXT PARSING +# ============================== +def extract_urls(text): + """Extract URLs from text""" + return re.findall(r'https?://\S+', text) + + +def extract_module_slug(url): + """Extract MS Learn module slug from URL""" + if "training/modules/" not in url: + return None + match = re.search(r'/training/modules/([^/?]+)', url) + return match.group(1) if match else None + + +def extract_unit_slug(url): + """Extract the unit slug from a Microsoft Learn module URL. + + Handles both shapes you'll see in the wild: + • .../modules//units/ (the formal anchor) + • .../modules//- (the public training URL, + e.g. .../7-knowledge-check) + """ + if not url: + return None + + # Strip query/fragment so a trailing ? or # never trips the regex. + url = url.split("?", 1)[0].split("#", 1)[0] + + # Shape 1: explicit /units/ + m = re.search(r"/units/([^/?#]+)", url) + if m: + return m.group(1) + + # Shape 2: trailing segment after /modules// — e.g. + # /training/modules/manage-work-github-projects/7-knowledge-check + m = re.search(r"/training/modules/[^/]+/([^/?#]+)/?$", url) + if m: + slug = m.group(1) + # Only accept it if it looks like a unit slug (starts with a digit + # or contains a hyphen), to avoid grabbing query-only paths. + if re.match(r"^\d+[-/]?|\w[-\w]*$", slug): + return slug + + return None + + +# ============================== +# CLASSIFICATION (AI-powered, with heuristic fallback) +# ============================== +def classify(issue): + """ + Ask an LLM (via GitHub Models) to triage the issue. The full structured + decision — including the user-facing comment and an auto-fix plan — is + cached on the issue dict for downstream steps. The function returns the + classification string for backward compatibility with existing callers. + """ + # If we've already classified this issue in this run, reuse it. This avoids + # paying for the same LLM call twice (once during the auto_fix pre-scan, + # once during process_issue) and avoids flaky non-determinism between calls. + if "_ai_decision" in issue: + return issue["_ai_decision"]["classification"] + + decision = ai_or_fallback(issue) + issue["_ai_decision"] = decision + # Preserve the legacy keys that other code still reads. + issue["_confidence"] = decision["confidence"] + issue["_analysis"] = { + "specificity_score": decision["confidence"], # informational only + "confidence": decision["confidence"], + "actionability": decision["confidence"], + "issue_type": [], + "context_level": {}, + "problem_statement": { + "current": (decision.get("fix_plan") or {}).get("current_text"), + "desired": (decision.get("fix_plan") or {}).get("desired_text"), + "location": (decision.get("fix_plan") or {}).get("module_url"), + "summary": issue.get("title", ""), + }, + "proposed_fix": { + "explicit": (decision.get("fix_plan") or {}).get("explanation"), + "inferred": None, + "confidence": decision["confidence"] / 100.0, + }, + "_source": decision["source"], + "_reasoning": decision["reasoning"], + } + return decision["classification"] + + +# ============================== +# LEARN REPO FILE RESOLUTION +# ============================== +UPSTREAM_REPO = "MicrosoftDocs/learn-pr" +REPO_PATH = "/tmp/learn-pr" + + +def _gh_authenticated_user(token): + """Return the login of the account behind LEARN_PR_TOKEN.""" + result = subprocess.run( + ["gh", "api", "user", "--jq", ".login"], + capture_output=True, text=True, + env={**os.environ, "GH_TOKEN": token, "GITHUB_TOKEN": token}, + ) + if result.returncode != 0: + print(f"⚠️ Could not resolve token owner: {result.stderr.strip()}") + return None + return result.stdout.strip() or None + + +def _ensure_fork(token, upstream=UPSTREAM_REPO): + """Ensure the token's owner has a fork of `upstream`. + + Returns the fork in `owner/repo` form, or None on failure. + + Strategy: + 1. Resolve the token owner via `gh api user`. + 2. Probe `gh api repos//` — if it exists and is a + fork, we're done (saves a CLI call and avoids the flag-compat + issues with `gh repo fork`). + 3. Otherwise call `gh repo fork ` to create it. We don't + pass `--clone=false` / `--remote=false` because newer `gh` + versions reject those flags when a repository argument is given; + the bare command is already "fork, don't clone" by default in + non-interactive contexts. + """ + owner = _gh_authenticated_user(token) + if not owner: + return None + + repo_name = upstream.split("/", 1)[1] + fork = f"{owner}/{repo_name}" + + print(f"🔱 Ensuring fork {fork} exists ...") + gh_env = {**os.environ, "GH_TOKEN": token, "GITHUB_TOKEN": token} + + # 1. Quick existence probe via the REST API. + probe = subprocess.run( + ["gh", "api", f"repos/{fork}", "--jq", ".fork"], + capture_output=True, text=True, env=gh_env, + ) + if probe.returncode == 0 and probe.stdout.strip().lower() == "true": + print(f"✅ Fork already exists: {fork}") + return fork + + # 2. Create the fork. `gh repo fork` is idempotent: if the fork was + # just created by a parallel run it prints a notice and exits 0. + print(f"🔱 Creating fork {fork} from {upstream} ...") + result = subprocess.run( + ["gh", "repo", "fork", upstream], + capture_output=True, text=True, env=gh_env, + ) + combined = (result.stdout + result.stderr).lower() + if result.returncode != 0 and "already exists" not in combined: + print(f"❌ Could not fork {upstream}: " + f"{(result.stderr or result.stdout).strip()}") + return None + + print(f"✅ Fork ready: {fork}") + return fork + + +def _sync_fork_main(token, fork, upstream=UPSTREAM_REPO): + """Sync the fork's default branch with the upstream's default branch. + + Uses the GitHub REST endpoint POST /repos/{owner}/{repo}/merge-upstream + which is the same thing the "Sync fork" button does in the web UI. + Safe to call repeatedly; it's a no-op when the fork is already up to date. + """ + print(f"🔄 Syncing {fork}:main ⇐ {upstream}:main ...") + result = subprocess.run( + ["gh", "api", "--method", "POST", + f"repos/{fork}/merge-upstream", + "-f", "branch=main"], + capture_output=True, text=True, + env={**os.environ, "GH_TOKEN": token, "GITHUB_TOKEN": token}, + ) + if result.returncode != 0: + # Non-fatal: an out-of-date fork is still usable, but we want to log + # it loudly so we know to investigate if PRs start diverging. + msg = (result.stderr or result.stdout or "").strip() + print(f"⚠️ Could not sync fork main (continuing anyway): {msg}") + return False + print(f"✅ Fork main synced: {result.stdout.strip()}") + return True + + +def clone_learn_repo(): + """Sparse-clone the user's fork (as `origin`) after syncing it with upstream. + + The MS Learn monorepo is ~1–2 GB; a full clone exhausts the runner's + disk (`No space left on device`). Instead we do a partial + sparse + clone that downloads only the commit graph (no blobs, no working + copy). Individual module subtrees are materialized on demand via + :func:`ensure_sparse_paths` when an auto-fix issue actually needs them. + + Why we clone the fork rather than the upstream: + • `MicrosoftDocs/learn-pr` is behind SAML SSO and rejects + unauthenticated `git clone`. + • The fork lives in the LEARN_PR_TOKEN owner's own namespace, so it + is freely accessible with the token — no SAML authorization needed. + • Before cloning we hit `POST repos//merge-upstream`, which is + the same thing the "Sync fork" button does in the UI. GitHub + performs the merge server-side, so we don't need direct upstream + access to keep the fork current. + + Returns the (repo_path, fork_slug) tuple, or (None, None) on failure. + """ + token = os.environ.get("LEARN_PR_TOKEN") + if not token: + print("❌ LEARN_PR_TOKEN not set — skipping auto-PR step.") + return None, None + + # 1. Ensure a fork exists in the token-owner's namespace. + fork = _ensure_fork(token) + if not fork: + print("❌ Could not establish a fork — aborting auto-PR step.") + return None, None + + # 2. Sync the fork's `main` with upstream's `main` FIRST. We do this + # before cloning so the fork we then pull down is already current. + _sync_fork_main(token, fork) + + # 3. Sparse-clone the fork as `origin`. Flags explained: + # --filter=blob:none — fetch the commit + tree graph but no + # file blobs (blobs are pulled lazily + # when paths are checked out). + # --no-checkout — don't materialize a working copy yet. + # --sparse — enable sparse-checkout mode (we'll add + # specific paths later via + # ensure_sparse_paths()). + # Combined with `git sparse-checkout`, this means we only ever + # pay for the module subtrees we actually edit. + print(f"📥 Sparse-cloning fork {fork} as origin " + f"(blob:none, no checkout) ...") + run(f"rm -rf {REPO_PATH}") + fork_url = f"https://x-access-token:{token}@github.com/{fork}.git" + result = subprocess.run( + f"git clone --filter=blob:none --no-checkout --sparse " + f"{fork_url} {REPO_PATH}", + shell=True, capture_output=True, text=True, + ) + if result.returncode != 0 or not os.path.isdir(os.path.join(REPO_PATH, ".git")): + print(f"❌ Failed to sparse-clone fork {fork}: " + f"{result.stderr.strip()}") + return None, None + print(f"✅ Sparse-cloned fork to {REPO_PATH}") + + # 4. Initialize sparse-checkout in no-cone mode with an effectively + # empty pattern set. We use a literal path that will never match + # anything in the repo as a placeholder; subsequent + # ensure_sparse_paths() calls will replace this with real patterns. + run(f"cd {REPO_PATH} && git sparse-checkout init --no-cone") + run(f"cd {REPO_PATH} && git sparse-checkout set '/__no_match_placeholder__'") + + # 5. Add the upstream as a read-only remote called `upstream`. This + # might 401 in a SAML-enforced setup, which is fine — we never + # fetch from it; `origin/main` is already in sync thanks to the + # REST sync above. + upstream_url = f"https://github.com/{UPSTREAM_REPO}.git" + run(f"cd {REPO_PATH} && git remote remove upstream 2>/dev/null; " + f"git remote add upstream {upstream_url}") + + # 6. Make sure origin/main is the freshest view of the fork's default + # branch (it should be after the clone, but a no-op fetch costs + # nothing and guards against odd default-branch races). + run(f"cd {REPO_PATH} && git fetch origin main --quiet || true") + + # 7. Track which sparse paths we've already added in-process so + # repeated calls for the same module are no-ops. + if not hasattr(clone_learn_repo, "_materialized_paths"): + clone_learn_repo._materialized_paths = set() + clone_learn_repo._materialized_paths.clear() + + return REPO_PATH, fork + + +def _candidate_module_paths(module_slug): + """Return the sparse-checkout path patterns to try for a module slug. + + The MS Learn monorepo nests modules under several roots. The two most + common in `MicrosoftDocs/learn-pr` are: + • `learn-pr/learn-pr///` ← e.g. .../github/manage-work-... + • `learn-pr///` + • `learn-pr-mooc/...` + + Materializing all plausible roots costs only an extra index entry per + pattern, so we over-include rather than guess. + """ + safe_slug = module_slug.strip("/").strip() + if not safe_slug: + return [] + return [ + # 4-deep nesting (the common case in the modern monorepo) + f"learn-pr/learn-pr/**/{safe_slug}/**", + # 3-deep nesting + f"learn-pr/**/{safe_slug}/**", + # Legacy / direct nesting + f"learn-pr/{safe_slug}/**", + f"learn-pr-mooc/**/{safe_slug}/**", + f"{safe_slug}/**", + ] + + +def ensure_sparse_paths(repo_path, module_slugs): + """Materialize the working-copy slices needed for the given module(s). + + Call this before reading files from a module. It: + 1. Adds the candidate sparse-checkout paths for each slug. + 2. Runs `git checkout` to materialize matching files (blobs are + pulled on-demand thanks to the partial clone). + Subsequent calls for the same slug are no-ops. + """ + if not module_slugs: + return + if not hasattr(clone_learn_repo, "_materialized_paths"): + clone_learn_repo._materialized_paths = set() + cache = clone_learn_repo._materialized_paths + + new_patterns = [] + for slug in module_slugs: + if not slug or slug in cache: + continue + cache.add(slug) + patterns = _candidate_module_paths(slug) + if patterns: + print(f"🧩 Materializing sparse paths for module {slug!r}...") + for p in patterns: + print(f" + {p}") + new_patterns.extend(patterns) + + if not new_patterns: + return + + # `sparse-checkout add` is incremental — it ORs new patterns into the + # existing set, so previously materialized modules stay materialized. + quoted = " ".join(f"'{p}'" for p in new_patterns) + run(f"cd {repo_path} && git sparse-checkout add {quoted}") + # Materialize matching files on the current branch. + run(f"cd {repo_path} && git checkout --quiet") + print("✅ Sparse paths materialized") + + +def find_module_path(repo_path, module_slug): + """Find the directory whose **basename** matches module_slug exactly. + + We compare on basename, not substring, so a slug like + `code-reviews-pull-requests-github-copilot` doesn't accidentally + match a parent directory that happens to contain the substring. + """ + slug = module_slug.lower() + for root, dirs, _files in os.walk(repo_path): + # Skip the .git internals + dirs[:] = [d for d in dirs if d != ".git"] + for d in dirs: + if d.lower() == slug: + return os.path.join(root, d) + return None + + +def _file_matches_unit(filename, unit_slug): + """True when the file name corresponds to the given unit slug. + + Handles both shapes seen in practice: + • `7-knowledge-check.yml` (most modules) + • `knowledge-check.yml` (older modules) + • `7-knowledge-check.md` (rare; some units are full md) + The match is on the stem (without extension) and tolerates a leading + `-` ordering prefix on either side. + """ + if not unit_slug: + return False + stem = os.path.splitext(filename)[0].lower() + slug = unit_slug.lower() + # Strip a leading "N-" ordering prefix from BOTH sides for comparison. + stem_naked = re.sub(r"^\d+-", "", stem) + slug_naked = re.sub(r"^\d+-", "", slug) + return ( + stem == slug + or stem_naked == slug_naked + or stem.endswith("-" + slug_naked) + or slug_naked == stem + ) + + +def find_target_file(repo_path, module_slug, unit_slug): + """Locate the file that this issue is about. + + MS Learn modules in `MicrosoftDocs/learn-pr` are laid out like: + + learn-pr/learn-pr/// + ├── index.yml + ├── -knowledge-check.yml ← knowledge-check answers + ├── ... + └── includes/ + ├── -.md ← unit prose content + └── ... + + Strategy: + 1. Locate the module directory by exact basename. + 2. If we have a unit slug, look for a file whose stem matches it, + checking BOTH the module root (for .yml) and `includes/` + (for .md). Prefer .yml when the unit slug looks like a + knowledge check, since fixes for "this answer is wrong" need + to edit the YAML, not the markdown. + 3. If we don't have a unit slug, give up — we refuse to guess + which file to edit. + """ + module_path = find_module_path(repo_path, module_slug) + if not module_path: + return None + + # Search roots, in priority order. + search_roots = [module_path] + includes_path = os.path.join(module_path, "includes") + if os.path.isdir(includes_path): + search_roots.append(includes_path) + + if not unit_slug: + print("⚠️ No unit slug — refusing to guess which file to edit.") + return None + + is_kc = "knowledge" in unit_slug.lower() and "check" in unit_slug.lower() + + # Build a list of (filepath, ext) candidates whose name matches unit_slug. + candidates = [] + for root in search_roots: + try: + entries = os.listdir(root) + except OSError: + continue + for f in entries: + full = os.path.join(root, f) + if not os.path.isfile(full): + continue + ext = os.path.splitext(f)[1].lower() + if ext not in (".yml", ".yaml", ".md"): + continue + if _file_matches_unit(f, unit_slug): + candidates.append((full, ext)) + + if not candidates: + return None + + # Prefer YAML for knowledge checks, otherwise prefer markdown. + def score(item): + path, ext = item + if is_kc: + return 0 if ext in (".yml", ".yaml") else 1 + return 0 if ext == ".md" else 1 + + candidates.sort(key=score) + return candidates[0][0] + + +# ============================== +# GIT OPERATIONS +# ============================== +def _run_git(repo_path, args, check=False): + """Run `git ` in `repo_path` without going through a shell. + + Returns the completed CompletedProcess so callers can inspect both + stdout and stderr — `run()` swallows stderr, which has bitten us in + the past (silent commit failures). + """ + cmd = ["git", "-C", repo_path, *args] + result = subprocess.run(cmd, capture_output=True, text=True) + if result.returncode != 0 and check: + print(f"❌ git {' '.join(args)} failed (exit {result.returncode})") + if result.stdout: + print(f" stdout: {result.stdout.strip()}") + if result.stderr: + print(f" stderr: {result.stderr.strip()}") + return result + + +def setup_git_config(): + """Configure a global git identity for the bot. + + Note: we ALSO set repo-local user.name/email inside commit_changes() — + this global config is just a belt-and-suspenders default in case some + git operation runs outside a known repo. + """ + run("git config --global user.name 'issue-triage-bot'") + run("git config --global user.email 'bot@github.com'") + + +def reset_repo(repo_path): + """Reset repo to clean state""" + run(f"cd {repo_path} && git fetch origin main && git reset --hard origin/main && git clean -fd") + + +def create_branch(repo_path, branch_name): + """Create a fresh branch off the upstream's latest `main`. + + Defensive in two ways: + 1. Refuse to ever create a branch literally named "main" — we never + want to push directly to the default branch. + 2. Delete any pre-existing local branch with the same name and recreate + it from a fresh `origin/main`, so re-running the workflow always + produces a PR based on the freshest upstream content. + """ + if branch_name.strip().lower() in {"main", "master"}: + raise ValueError( + f"Refusing to create a working branch named {branch_name!r} — " + "fixes must live on a feature branch, never on main." + ) + + # Make sure we have origin/main up-to-date locally + run(f"cd {repo_path} && git fetch origin main --quiet") + # Wipe any pre-existing local branch with the same name + run(f"cd {repo_path} && git checkout --quiet origin/main") + run(f"cd {repo_path} && git branch -D {branch_name} 2>/dev/null || true") + # Create the branch off the latest upstream main + run(f"cd {repo_path} && git checkout -b {branch_name} origin/main") + + +def commit_changes(repo_path, issue_num, commit_msg): + """Stage all changes and commit them, returning True iff a commit was made. + + Differs from the old implementation in three important ways: + 1. Uses subprocess directly with a list of args — no shell quoting, + so commit messages with apostrophes / colons / quotes are safe. + 2. Sets repo-local user.name/user.email before committing, so a + missing or wrong global git identity doesn't silently break us. + 3. Pre-checks `git status --porcelain` so "nothing staged" is + reported as exactly that, rather than inferred from a stderr + substring after the fact. + Every git invocation's stdout AND stderr are surfaced on failure so + we never again see a "STDERR:" with empty content. + """ + # Repo-local identity — the global config may be absent in some + # runner environments (cached/restored repos, etc.). + _run_git(repo_path, ["config", "user.name", "issue-triage-bot"]) + _run_git(repo_path, ["config", "user.email", "bot@github.com"]) + + # Stage everything (additions, modifications, deletions). + add = _run_git(repo_path, ["add", "-A"], check=True) + if add.returncode != 0: + print(f"❌ Could not stage changes for issue #{issue_num} — aborting commit.") + return False + + # Did anything actually change? + status = _run_git(repo_path, ["status", "--porcelain"]) + if not status.stdout.strip(): + print(f"⚠️ No staged changes for issue #{issue_num} — nothing to commit.") + return False + + # Commit. Passing the message as a separate argv entry avoids ALL + # shell-escaping issues. + commit = _run_git(repo_path, ["commit", "-m", commit_msg]) + if commit.returncode != 0: + print(f"❌ git commit failed for issue #{issue_num} " + f"(exit {commit.returncode})") + if commit.stdout: + print(f" stdout: {commit.stdout.strip()}") + if commit.stderr: + print(f" stderr: {commit.stderr.strip()}") + return False + + # Optional but useful: surface the commit short-hash + subject. + head = _run_git(repo_path, ["log", "-1", "--pretty=%h %s"]) + print(f"✅ Committed: {head.stdout.strip()}") + return True + + +def push_branch(repo_path, branch_name, remote="origin"): + """Push branch to the named remote. + + Default is `origin`, which is wired to the user's fork by + `clone_learn_repo()` (see that function's docstring for why we clone + the fork as origin rather than the upstream). + + Hard-refuses to push to `main` / `master` — those branches are managed + purely by the upstream-sync step (`_sync_fork_main`). + """ + if branch_name.strip().lower() in {"main", "master"}: + raise ValueError( + f"Refusing to push branch {branch_name!r} — " + "the fork's default branch is kept in sync with upstream, " + "never modified by the bot." + ) + run( + f"cd {repo_path} && " + f"git push {remote} {branch_name} --force-with-lease " + f"|| git push {remote} {branch_name} --force" + ) + + +def create_pr(repo, branch_name, issue_num, title, body, head_owner=None): + """Open a PR against `repo` from the user's fork. + + `head_owner` is the GitHub login of the fork's owner; when provided we + use the cross-repo `--head :` syntax so gh knows to look + on the fork. + """ + # Write the body to a file to avoid quoting hell in the shell. + body_path = f"/tmp/pr-body-{issue_num}.md" + with open(body_path, "w") as f: + f.write(body) + + head = f"{head_owner}:{branch_name}" if head_owner else branch_name + gh_cmd = ( + f"pr create " + f"--repo {repo} " + f"--title \"{title}\" " + f"--body-file {body_path} " + f"--head {head} " + f"--base main" + ) + # We need to call gh with the LEARN_PR_TOKEN, not GITHUB_TOKEN (which is + # scoped to the triage repo and can't see the fork). + token = os.environ.get("LEARN_PR_TOKEN") + if token: + return subprocess.run( + f"gh {gh_cmd}", + shell=True, capture_output=True, text=True, + env={**os.environ, "GH_TOKEN": token, "GITHUB_TOKEN": token}, + ).stdout.strip() + return run(f"gh {gh_cmd}") + + +# ============================== +# ISSUE PROCESSING +# ============================== +def apply_intelligent_fix(file_path, analysis): + """Apply a literal text substitution and PERSIST it to disk. + + Returns True iff the file was changed AND saved. Previously this + function mutated `content` in memory and returned True without ever + writing the file back — which made every "successful" auto-fix end + with `git status` reporting no staged changes. Don't do that again. + """ + with open(file_path, "r", encoding="utf-8") as f: + original_content = f.read() + + problem = analysis.get("problem_statement", {}) + current = problem.get("current") + desired = problem.get("desired") + + if not current or not desired: + return False + + new_content = None + fix_kind = None + + # Strategy 1: exact substring match (preferred — predictable). + if current in original_content: + new_content = original_content.replace(current, desired, 1) + fix_kind = "exact" + + # Strategy 2: case-insensitive single replacement. + if new_content is None: + ci_pattern = re.compile(re.escape(current), re.IGNORECASE) + if ci_pattern.search(original_content): + new_content = ci_pattern.sub(desired, original_content, count=1) + fix_kind = "case-insensitive" + + # Strategy 3: long-phrase contextual rewrite (rarely needed; keep as a + # last-resort fallback). + if new_content is None and len(current) > 20: + key_words = current.split()[:3] + kw_pattern = r"\b" + r"\s+".join(re.escape(w) for w in key_words) + r"\b" + match = re.search(kw_pattern, original_content, re.IGNORECASE) + if match: + start, end = match.start(), match.end() + sentence_start = original_content.rfind(".", 0, start) + 1 + sentence_end = original_content.find(".", end) + 1 + if sentence_end == 0: + sentence_end = len(original_content) + new_content = ( + original_content[:sentence_start] + + desired + + original_content[sentence_end:] + ) + fix_kind = "contextual" + + if new_content is None: + print(f"⚠️ Could not find pattern to replace in {file_path}") + return False + + if new_content == original_content: + # The substitution was a no-op (e.g. current == desired). Nothing + # to write, and reporting success would mislead the caller. + print(f"⚠️ Substitution was a no-op ({fix_kind}); not writing {file_path}") + return False + + # PERSIST the change. This is the line whose absence caused issue #4 + # to "succeed" with `git status` clean. + with open(file_path, "w", encoding="utf-8") as f: + f.write(new_content) + + delta = len(new_content) - len(original_content) + print(f"✅ Applied {fix_kind} fix to {file_path} " + f"({len(original_content)} → {len(new_content)} chars, {delta:+d})") + return True + + +def validate_knowledge_check_yaml(file_path, explanation=None): + """Validate a knowledge-check YAML after edits. + + Returns a tuple of (is_valid, needs_human_review, reason). + + Checks: + 1. The file parses as valid YAML. + 2. At least one answer has `isCorrect: true` — UNLESS the + explanation contains "[NEEDS_HUMAN:" which signals the AI + intentionally set all to false for a "none of the above" + case where no such option exists. + """ + ext = os.path.splitext(file_path)[1].lower() + if ext not in (".yml", ".yaml"): + # Not a YAML file — skip validation, assume OK. + return (True, False, None) + + try: + with open(file_path, "r", encoding="utf-8") as f: + content = f.read() + except OSError as exc: + return (False, True, f"Could not read file: {exc}") + + # Check for the special marker indicating intended "all false" state. + needs_human_marker = "[NEEDS_HUMAN:" in (explanation or "") + + # Count isCorrect occurrences. + true_count = len(re.findall(r"isCorrect:\s*true", content, re.IGNORECASE)) + false_count = len(re.findall(r"isCorrect:\s*false", content, re.IGNORECASE)) + total_answers = true_count + false_count + + if total_answers == 0: + # Not a knowledge-check file, or unexpected format. + return (True, False, "No isCorrect fields found — not a knowledge-check YAML") + + if true_count == 0: + if needs_human_marker: + # The AI intentionally set all to false; flag for human review + # but don't block the PR. + return (True, True, + "All answers marked incorrect (no 'none of the above' option exists) — " + "human review required to add the missing option or correct the question.") + else: + # No correct answer and no marker — this is an error. + return (False, True, + "No correct answer (`isCorrect: true`) found in knowledge-check YAML. " + "This would break the module. Human review required.") + + if true_count > 1: + # Multiple correct answers — MS Learn knowledge checks typically + # allow only one correct answer per question. + return (True, True, + f"Multiple correct answers ({true_count}) found. " + "Verify this is intentional for this question type.") + + # Exactly one correct answer — all good. + return (True, False, None) + + +def process_auto_fix_issue(issue, repo_path, fork_slug=None): + """ + Process auto-fix issues using the AI's structured fix plan. + + Workflow: + 1. Resolve the MS Learn URL → module / unit → target markdown file + 2. Apply the literal current_text → desired_text replacement that the + AI produced (falling back to the heuristic analysis if absent). + 3. Commit, push to the user's fork, open a cross-repo PR against + MicrosoftDocs/learn-pr. + """ + issue_num = issue["number"] + body = issue.get("body", "") + title = issue.get("title", "") + + decision = issue.get("_ai_decision") or {} + fix_plan = decision.get("fix_plan") or {} + analysis = issue.get("_analysis", {}) + + # Prefer the URL the AI extracted (it ignored template noise); fall back + # to scanning the body ourselves. + primary_url = fix_plan.get("module_url") + if not primary_url: + urls = extract_urls(body) + if not urls: + print(f"⚠️ Issue #{issue_num} has no URL — cannot auto-fix") + return False + primary_url = urls[0] + + module_slug = extract_module_slug(primary_url) + unit_slug = extract_unit_slug(primary_url) + + if not module_slug: + print(f"⚠️ Issue #{issue_num} URL doesn't match a learn module pattern: {primary_url}") + return False + + print(f"🔍 Module: {module_slug}, Unit: {unit_slug}") + + # Materialize ONLY this module's files from the sparse-clone. + # The clone itself is blobless + no-checkout (see clone_learn_repo); + # this is the step that actually puts files on disk. + ensure_sparse_paths(repo_path, [module_slug]) + + # Find file to edit + target_file = find_target_file(repo_path, module_slug, unit_slug) + if not target_file: + print(f"⚠️ Could not locate file in learn-pr for module={module_slug}") + return False + + print(f"📝 Target file: {target_file}") + + # Create branch + branch_name = f"fix/issue-{issue_num}" + create_branch(repo_path, branch_name) + + # Snapshot content before mutation so we can revert if the commit fails. + with open(target_file, "r") as f: + original_content = f.read() + + # ─── Refine the AI's fix plan against the ACTUAL file ──────────── + # The first-pass classification (run on issue body alone) often + # paraphrases — e.g. it produces "first answer is marked as correct" + # which doesn't appear verbatim in the source markdown. Here we + # re-ask the LLM with the file contents in hand to ground the + # current/desired pair in real text. + from ai_triage import refine_fix_plan_with_file_content + refined = refine_fix_plan_with_file_content(issue, fix_plan, target_file) + if refined: + fix_plan = refined + # Update the cached decision so PR body / explanation reflect the + # grounded substitution. + decision["fix_plan"] = refined + + # Prefer the (now-grounded) AI literal current/desired pair. Fall + # back to the heuristic extraction stashed in `_analysis` only if + # refinement failed AND the original plan also had something usable. + current = fix_plan.get("current_text") or (analysis.get("problem_statement") or {}).get("current") + desired = fix_plan.get("desired_text") or (analysis.get("problem_statement") or {}).get("desired") + + if not (current and desired): + print(f"⚠️ No grounded current/desired pair for issue #{issue_num} — skipping PR") + return False + + # Final sanity check: even after refinement, refuse to attempt a + # substitution where current isn't actually in the file. This stops + # us from creating empty PRs. + if current not in original_content: + print(f"⚠️ current_text {current!r} not found verbatim in target file — " + "refusing to attempt fix") + return False + + fix_applied = apply_intelligent_fix( + target_file, + {"problem_statement": {"current": current, "desired": desired}}, + ) + + if fix_applied: + # ─── Validate knowledge-check YAMLs ──────────────────────────── + # For YAML files, check that we haven't left the file in a broken + # state (e.g. no correct answer). The AI may have intentionally + # set all answers to false for a "none of the above" case — the + # validation function detects this via the [NEEDS_HUMAN:] marker. + ai_explanation = fix_plan.get("explanation") + is_valid, needs_human, validation_reason = validate_knowledge_check_yaml( + target_file, explanation=ai_explanation + ) + + if not is_valid: + # The fix broke the file — revert and abort. + print(f"❌ Post-fix validation failed: {validation_reason}") + with open(target_file, "w") as f: + f.write(original_content) + return False + + # Commit and push to the fork (origin) + commit_msg = f"Fix #{issue_num}: {title[:50]}" + if commit_changes(repo_path, issue_num, commit_msg): + push_branch(repo_path, branch_name, remote="origin") + + # PR description — prefer the AI's explanation, fall back to template + pr_body = generate_pr_description(issue, { + "problem_statement": {"current": current, "desired": desired, + "location": primary_url, + "summary": title}, + "proposed_fix": {"explicit": ai_explanation, "inferred": None}, + "issue_type": [], + "specificity_score": decision.get("confidence", 0), + }) + + # Add validation warning to PR body if human review is needed. + if needs_human and validation_reason: + pr_body += ( + "\n\n---\n\n" + "⚠️ **Reviewer attention required:**\n\n" + f"{validation_reason}\n" + ) + + pr_title = f"Fix #{issue_num}: {title[:60]}" + + # Open a cross-repo PR: head = :, base = upstream main + head_owner = fork_slug.split("/", 1)[0] if fork_slug else None + pr_url = create_pr( + repo=UPSTREAM_REPO, + branch_name=branch_name, + issue_num=issue_num, + title=pr_title, + body=pr_body, + head_owner=head_owner, + ) + + print(f"✅ PR created: {pr_url or branch_name}") + + fork_link = ( + f"https://github.com/{fork_slug}/tree/{branch_name}" + if fork_slug else "(see fork)" + ) + + # Build comment with optional human review warning. + comment_body = ( + "✅ **Auto-fix PR opened**\n\n" + f"Branch: `{branch_name}` on {fork_link}\n\n" + f"{pr_url if pr_url else 'PR submitted (URL pending)'}" + ) + if needs_human and validation_reason: + comment_body += ( + "\n\n⚠️ **Note:** This fix may require human review. " + f"{validation_reason}" + ) + + comment_path = f"/tmp/pr-comment-{issue_num}.txt" + with open(comment_path, "w") as f: + f.write(comment_body) + gh(f'issue comment {issue_num} -F {comment_path}') + return True + else: + # Revert file if commit failed + with open(target_file, "w") as f: + f.write(original_content) + return False + else: + # Could not apply fix, revert + with open(target_file, "w") as f: + f.write(original_content) + print(f"⚠️ Could not apply fix to {target_file}") + return False + + +def _post_comment(issue_num, body): + """Post a comment via a temp file so multi-line / special-char bodies survive.""" + path = f"/tmp/triage-comment-{issue_num}.txt" + with open(path, "w") as f: + f.write(body) + gh(f"issue comment {issue_num} -F {path}") + + +def _comment_for(issue, classification, override=None): + """Pick the best comment text: AI-authored if present, else templated fallback.""" + if override: + return override + decision = issue.get("_ai_decision") or {} + if decision.get("comment"): + return decision["comment"] + return generate_context_aware_comment(issue.get("_analysis", {}), classification) + + +def process_issue(issue, repo_path=None, fork_slug=None): + """Main issue processing function — AI-driven, with heuristic fallback.""" + issue_num = issue["number"] + result = classify(issue) + decision = issue.get("_ai_decision") or {} + source = decision.get("source", "fallback") + confidence = decision.get("confidence", issue.get("_confidence", 0)) + + print(f"\n{'='*60}") + print(f"Issue #{issue_num} → {result} ({source}, confidence={confidence}/100)") + print(f"Title: {issue.get('title', 'N/A')[:60]}") + if decision.get("reasoning"): + print(f"AI reasoning: {decision['reasoning'][:240]}") + print(f"{'='*60}") + + # Ensure triaged label + add_label(issue_num, "triaged") + + if result == "auto_fix": + if repo_path: + # We have a real cloned repo, so we can honestly attempt a PR. + _post_comment(issue_num, _comment_for(issue, result)) + success = process_auto_fix_issue(issue, repo_path, fork_slug=fork_slug) + if success: + add_label(issue_num, "auto-fix-applied") + else: + add_label(issue_num, "auto-fix-attempted") + _post_comment( + issue_num, + "⚠️ I classified this as auto-fixable but could not generate a PR " + "(either the target file/section wasn't found in " + "`MicrosoftDocs/learn-pr`, or the proposed change couldn't be " + "applied automatically). A human reviewer will take it from here.", + ) + else: + # No repo available (token missing or clone failed). + # Down-grade to needs_human so we don't make false promises. + print("⚠️ No learn-pr clone available — downgrading auto_fix → needs_human") + _post_comment(issue_num, _comment_for(issue, "needs_human")) + add_label(issue_num, "needs-review") + + elif result == "needs_human": + _post_comment(issue_num, _comment_for(issue, result)) + add_label(issue_num, "needs-review") + + elif result == "needs_context": + _post_comment(issue_num, _comment_for(issue, result)) + add_label(issue_num, "needs-context") + + elif result == "spam": + _post_comment( + issue_num, + _comment_for( + issue, result, + override=( + "🗑️ **Closed:** This issue appears to be the unfilled MS Learn " + "issue template (or otherwise lacks actionable content). Please " + "reopen with the template filled in if this was a mistake." + ), + ), + ) + add_label(issue_num, "spam") + gh(f'issue close {issue_num}') + + print(f"✅ Processed issue #{issue_num}") + + +# ============================== +# MAIN +# ============================== +def main(): + """Main entry point for batch processing. + + Behaviour: + • In single-issue mode (TRIAGE_MODE=single, set by the workflow's + `issues` event), we ALWAYS process the issue — a human just + opened/edited it and wants fresh triage. + • In batch mode (the default), we skip issues that already carry + one of our triage labels, so the every-6-hours cron doesn't + spam comments / try to re-open PRs for the same issue. + • The user can override the skip with FORCE_RETRIAGE=1, useful + when iterating on the analyzer. + """ + if len(sys.argv) < 2: + print("Usage: triage_batch.py ") + sys.exit(1) + + issues_file = sys.argv[1] + if not os.path.exists(issues_file): + print(f"❌ File not found: {issues_file}") + sys.exit(1) + + with open(issues_file, "r") as f: + issues = json.load(f) + + mode = os.environ.get("TRIAGE_MODE", "batch").lower() + force = os.environ.get("FORCE_RETRIAGE", "").lower() in {"1", "true", "yes"} + skip_already_triaged = (mode == "batch") and not force + + print(f"🔄 Processing {len(issues)} issue(s) — mode={mode}, " + f"skip_already_triaged={skip_already_triaged}") + + # ── Filter step: drop issues we've already handled ──────────────── + if skip_already_triaged: + kept, skipped = [], [] + for issue in issues: + (skipped if already_triaged(issue) else kept).append(issue) + if skipped: + preview = ", ".join(f"#{i.get('number','?')}" for i in skipped[:10]) + more = "" if len(skipped) <= 10 else f" (+{len(skipped) - 10} more)" + print(f"⏭️ Skipping {len(skipped)} already-triaged issue(s): " + f"{preview}{more}") + print(" (set FORCE_RETRIAGE=1 to re-process them)") + issues = kept + + if not issues: + print("✅ Nothing new to process.") + return + + # Only clone repo if we have auto-fix issues to act on. + auto_fix_issues = [i for i in issues if classify(i) == "auto_fix"] + repo_path, fork_slug = None, None + + if auto_fix_issues: + print(f"\n📦 Found {len(auto_fix_issues)} auto-fix issues, cloning learn repo...") + setup_git_config() + repo_path, fork_slug = clone_learn_repo() + + # Process all (remaining) issues + for i, issue in enumerate(issues, 1): + try: + process_issue(issue, repo_path, fork_slug=fork_slug) + except Exception as e: + issue_num = issue.get("number", "unknown") + print(f"❌ Error processing issue #{issue_num}: {e}") + try: + gh(f'issue comment {issue_num} --body "❌ Error during triage: {str(e)[:100]}"') + except Exception: + pass + + print(f"\n✅ Batch processing complete!") + + +if __name__ == "__main__": + main()