|
| 1 | +#!/usr/bin/env python3 |
| 2 | +"""Repository-local static Lua dependency graph helpers. |
| 3 | +
|
| 4 | +Mechanism only: |
| 5 | +- Parse `require('opencode.*')` edges from `lua/opencode/**/*.lua` |
| 6 | +- Build snapshot graph from worktree or git ref |
| 7 | +- Provide SCC / back-edge utilities |
| 8 | +""" |
| 9 | + |
| 10 | +from __future__ import annotations |
| 11 | + |
| 12 | +from collections import Counter, defaultdict |
| 13 | +from dataclasses import dataclass |
| 14 | +from pathlib import Path |
| 15 | +import re |
| 16 | +import subprocess |
| 17 | +from typing import Dict, Iterable, List, Optional, Sequence, Set, Tuple |
| 18 | + |
| 19 | + |
# Patterns that capture the module name of a static `require` of `opencode`
# or any `opencode.*` submodule. Lua permits three call spellings for a single
# string-literal argument, and all of them are covered here:
#   require('opencode.x')   -- parenthesised call
#   require 'opencode.x'    -- paren-less call with whitespace
#   require'opencode.x'     -- paren-less call without whitespace
REQUIRE_PATTERNS = [
    re.compile(r"require\s*\(\s*['\"](opencode(?:\.[^'\"]+)?)['\"]\s*\)"),
    # `\s*` (not `\s+`): the whitespace before the string literal is optional.
    re.compile(r"require\s*['\"](opencode(?:\.[^'\"]+)?)['\"]"),
]
| 24 | + |
| 25 | + |
@dataclass
class SnapshotGraph:
    """Module dependency graph extracted from one snapshot of the repository."""

    # Name of the snapshot the graph was built from ("worktree" or a git ref).
    snapshot: str
    # Number of source files that were read for this snapshot.
    files: int
    # Maps a dotted module name to the repo-relative path of its file.
    nodes: Dict[str, str]
    # Directed (source module, required module) pairs.
    edges: Set[Tuple[str, str]]
| 32 | + |
| 33 | + |
def module_from_relpath(relpath: str) -> Optional[str]:
    """Translate a repo-relative Lua path into its dotted module name.

    Returns None for any path that is not a `.lua` file under `lua/opencode/`.
    A trailing `init` path component is dropped, so `lua/opencode/ui/init.lua`
    maps to `opencode.ui`.
    """
    root, extension = "lua/", ".lua"
    inside_package = relpath.startswith("lua/opencode/") and relpath.endswith(extension)
    if not inside_package:
        return None

    # Strip the `lua/` root and the `.lua` extension, then work on components.
    components = relpath[len(root) : -len(extension)].split("/")
    if components[-1] == "init":
        components.pop()
    return ".".join(components)
| 41 | + |
| 42 | + |
def _worktree_files(repo: Path) -> List[Tuple[str, str]]:
    """Read every `*.lua` file under `<repo>/lua/opencode` from the worktree.

    Args:
        repo: Repository root directory.

    Returns:
        A list of `(repo-relative POSIX path, file text)` pairs, sorted by
        path. Text is decoded as UTF-8 with undecodable bytes dropped.
    """
    base = repo / "lua" / "opencode"
    out: List[Tuple[str, str]] = []
    # `rglob` yields entries in filesystem order; sort so the result (and any
    # last-wins mapping built from it) is reproducible across machines.
    for fp in sorted(base.rglob("*.lua"), key=lambda p: p.as_posix()):
        # A directory may itself be named `something.lua`; only files are read.
        if not fp.is_file():
            continue
        rel = fp.relative_to(repo).as_posix()
        out.append((rel, fp.read_text(encoding="utf-8", errors="ignore")))
    return out
| 51 | + |
| 52 | + |
def _git_files(repo: Path, ref: str) -> List[Tuple[str, str]]:
    """Read every `*.lua` file under `lua/opencode` as stored at git `ref`.

    Args:
        repo: Repository root directory; used as the working directory for git.
        ref: Tree-ish (branch, tag, commit) to read from.

    Returns:
        A list of `(repo-relative path, file text)` pairs in `git ls-tree`
        order. Files whose `git show` fails are skipped.

    Raises:
        subprocess.CalledProcessError: If `git ls-tree` itself fails.
    """
    # `-z` gives NUL-separated, unquoted paths; without it git C-quotes paths
    # containing unusual characters and they would no longer resolve below.
    ls_cmd = ["git", "ls-tree", "-r", "-z", "--name-only", ref, "lua/opencode"]
    # Decode explicitly instead of relying on the locale; surrogateescape lets
    # a non-UTF-8 path name round-trip back into the `git show` argument.
    listing = subprocess.check_output(
        ls_cmd, cwd=repo, encoding="utf-8", errors="surrogateescape"
    )

    out: List[Tuple[str, str]] = []
    for rel in listing.split("\0"):
        if not rel.endswith(".lua"):
            continue
        show_cmd = ["git", "show", f"{ref}:{rel}"]
        try:
            # Same decoding policy as the worktree reader: UTF-8, bad bytes dropped.
            text = subprocess.check_output(
                show_cmd,
                cwd=repo,
                encoding="utf-8",
                errors="ignore",
                stderr=subprocess.DEVNULL,
            )
        except subprocess.CalledProcessError:
            # Best effort: an entry that cannot be shown is left out.
            continue
        out.append((rel, text))
    return out
| 68 | + |
| 69 | + |
def load_snapshot_graph(repo: Path, snapshot: str) -> SnapshotGraph:
    """Build the module dependency graph for one snapshot of the repository.

    `snapshot` is either the literal "worktree" (read files from disk) or a
    git ref (read files from that ref). Only requires that resolve to a module
    present in the same snapshot become edges.
    """
    if snapshot == "worktree":
        files = _worktree_files(repo)
    else:
        files = _git_files(repo, snapshot)

    # Resolve each path to its module name once and reuse it for both passes.
    resolved = [(module_from_relpath(rel), rel, content) for rel, content in files]

    # First pass: every known module and the file that defines it.
    nodes: Dict[str, str] = {module: rel for module, rel, _ in resolved if module}

    # Second pass: one edge per distinct required module that is a known node.
    edges: Set[Tuple[str, str]] = set()
    for module, _, content in resolved:
        if not module:
            continue
        required = {
            match.group(1)
            for pattern in REQUIRE_PATTERNS
            for match in pattern.finditer(content)
        }
        edges.update((module, dep) for dep in required if dep in nodes)

    return SnapshotGraph(snapshot=snapshot, files=len(files), nodes=nodes, edges=edges)
| 94 | + |
| 95 | + |
def tarjan_scc(nodes: Iterable[str], edges: Iterable[Tuple[str, str]]) -> List[List[str]]:
    """Compute strongly connected components with Tarjan's algorithm.

    The traversal is iterative (explicit work stack), so long dependency
    chains do not hit Python's recursion limit. Roots and successors are
    visited in sorted order, which makes the output independent of the
    iteration order of `edges`.

    Args:
        nodes: Nodes to use as DFS roots.
        edges: Directed `(source, target)` pairs. Endpoints that are not in
            `nodes` are still visited when reachable from a root.

    Returns:
        One list of members per component, appended as each component is
        completed (a component is emitted before any component that reaches it).
    """
    adjacency: Dict[str, Set[str]] = defaultdict(set)
    for src, dst in edges:
        adjacency[src].add(dst)
    successors: Dict[str, List[str]] = {n: sorted(t) for n, t in adjacency.items()}

    counter = 0
    indices: Dict[str, int] = {}
    lowlink: Dict[str, int] = {}
    stack: List[str] = []
    on_stack: Set[str] = set()
    result: List[List[str]] = []

    for root in sorted(set(nodes)):
        if root in indices:
            continue

        indices[root] = lowlink[root] = counter
        counter += 1
        stack.append(root)
        on_stack.add(root)
        # Each frame is (node, iterator over its not-yet-examined successors).
        work = [(root, iter(successors.get(root, ())))]

        while work:
            v, pending = work[-1]
            descended = False
            for w in pending:
                if w not in indices:
                    # Tree edge: open a frame for w and resume v afterwards.
                    indices[w] = lowlink[w] = counter
                    counter += 1
                    stack.append(w)
                    on_stack.add(w)
                    work.append((w, iter(successors.get(w, ()))))
                    descended = True
                    break
                if w in on_stack:
                    lowlink[v] = min(lowlink[v], indices[w])
            if descended:
                continue

            # All successors of v examined: close its frame.
            work.pop()
            if lowlink[v] == indices[v]:
                # v is the root of a component: pop its members off the stack.
                comp: List[str] = []
                while True:
                    w = stack.pop()
                    on_stack.remove(w)
                    comp.append(w)
                    if w == v:
                        break
                result.append(comp)
            if work:
                # Propagate the child's lowlink to its DFS parent.
                parent = work[-1][0]
                lowlink[parent] = min(lowlink[parent], lowlink[v])

    return result
| 138 | + |
| 139 | + |
def back_edges(nodes: Iterable[str], edges: Iterable[Tuple[str, str]]) -> Set[Tuple[str, str]]:
    """Return the edges that close a cycle in a deterministic DFS.

    Roots and successors are visited in sorted order. An edge `(v, w)` is
    reported when `w` is still on the current DFS path at the time the edge is
    examined. The traversal is iterative, so long dependency chains do not hit
    Python's recursion limit.

    Args:
        nodes: Nodes to use as DFS roots.
        edges: Directed `(source, target)` pairs. Targets that are not in
            `nodes` are treated as unvisited and explored when reached.

    Returns:
        The set of `(source, target)` back edges found.
    """
    adjacency: Dict[str, Set[str]] = defaultdict(set)
    for src, dst in edges:
        adjacency[src].add(dst)
    successors: Dict[str, List[str]] = {n: sorted(t) for n, t in adjacency.items()}

    # white = unvisited, gray = on the current DFS path, black = finished.
    white, gray, black = 0, 1, 2
    color: Dict[str, int] = {n: white for n in set(nodes)}
    backs: Set[Tuple[str, str]] = set()

    # `sorted` snapshots the roots; nodes discovered later only via edges are
    # added to `color` during the walk without affecting this loop.
    for root in sorted(color):
        if color[root] != white:
            continue

        color[root] = gray
        # Each frame is (node, iterator over its not-yet-examined successors).
        work = [(root, iter(successors.get(root, ())))]
        while work:
            v, pending = work[-1]
            for w in pending:
                state = color.get(w, white)
                if state == white:
                    color[w] = gray
                    work.append((w, iter(successors.get(w, ()))))
                    break
                if state == gray:
                    backs.add((v, w))
            else:
                # Successors exhausted: v leaves the DFS path.
                color[v] = black
                work.pop()

    return backs
| 166 | + |
| 167 | + |
def degree(edges: Iterable[Tuple[str, str]]) -> Tuple[Counter, Counter]:
    """Count incoming and outgoing edges per node.

    Returns `(indeg, outdeg)`: the first counter is keyed by edge target, the
    second by edge source. Duplicate edges in `edges` are counted each time.
    """
    # Materialise once so a one-shot iterable can feed both counters.
    pairs = list(edges)
    outgoing: Counter = Counter(source for source, _ in pairs)
    incoming: Counter = Counter(target for _, target in pairs)
    return incoming, outgoing
| 175 | + |
| 176 | + |
def find_cycle_in_scc(members: List[str], edges: Iterable[Tuple[str, str]]) -> List[str]:
    """Return one concrete cycle path within an SCC, e.g. [a, b, c, a].

    Runs a DFS from the smallest member, restricted to edges whose endpoints
    are both in `members`, visiting successors in sorted order. The first edge
    that leads back onto the current path closes the cycle. The traversal is
    iterative, so large components do not hit Python's recursion limit.

    Args:
        members: Nodes of the component.
        edges: Directed `(source, target)` pairs; edges that leave `members`
            are ignored.

    Returns:
        The cycle as a path whose first and last entries are the same node, or
        [] if `members` is empty or no cycle is reachable from the start node.
    """
    member_set = set(members)
    if not member_set:
        # Nothing to search; avoids indexing into an empty list.
        return []

    adjacency: Dict[str, Set[str]] = defaultdict(set)
    for src, dst in edges:
        if src in member_set and dst in member_set:
            adjacency[src].add(dst)
    successors: Dict[str, List[str]] = {n: sorted(t) for n, t in adjacency.items()}

    start = min(member_set)
    path: List[str] = [start]
    on_path: Dict[str, int] = {start: 0}  # node -> index in path
    visited: Set[str] = {start}
    # One successor iterator per node on `path`, kept in lockstep with it.
    work = [iter(successors.get(start, ()))]

    while work:
        for w in work[-1]:
            if w in on_path:
                # Found cycle: extract from w's position to end, close it.
                return path[on_path[w]:] + [w]
            if w not in visited:
                visited.add(w)
                on_path[w] = len(path)
                path.append(w)
                work.append(iter(successors.get(w, ())))
                break
        else:
            # Dead end: backtrack one step.
            work.pop()
            del on_path[path.pop()]

    return []
| 214 | + |
| 215 | + |
def largest_scc_size(comps: Sequence[Sequence[str]]) -> int:
    """Return the size of the biggest component that has more than one member.

    Single-node components are ignored; the result is 0 when none qualify.
    """
    sizes = (len(component) for component in comps)
    return max((size for size in sizes if size > 1), default=0)
0 commit comments