Skip to content

Commit f72ef00

Browse files
committed
feat(pycg): iterative decomposition of runaway shards
A uniform shard ceiling forces a global choice: small shards everywhere (high cut, low recall) just to tame the few that diverge. Instead, start coarse and re-shard only the shards that time out. Algorithm: plan shards with SCC + Louvain at the ceiling, run each through PyCG, and treat any timed-out shard as a runaway. Re-partition that runaway's files alone at half the budget and re-run. Repeat down to a floor (10 files). Files that still diverge at the floor, or form an atomic cycle that will not split, fall back to Jedi-only coverage. Refactor the planned executor into a reusable primitive that returns (edges, runaways), used by both the sequential and Ray paths, and drive it from an adaptive loop. Odoo benchmark (1028 modules, level 2, Ray): 22210 PyCG edges, up from 17149 for the best uniform ceiling, with only 20 of 1028 files irreducible. Cost is wall time (about 12.7 min) since rounds run in sequence. Add a unit test driving the adaptive loop with a stubbed runner.
1 parent 81fd409 commit f72ef00

2 files changed

Lines changed: 177 additions & 95 deletions

File tree

codeanalyzer/semantic_analysis/pycg/pycg_analysis.py

Lines changed: 127 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ def _handler(signum: int, frame: object) -> None:
8181
from codeanalyzer.schema.py_schema import PyCallEdge, PyModule
8282
from codeanalyzer.semantic_analysis.call_graph import iter_callables_in_symbol_table
8383
from codeanalyzer.semantic_analysis.pycg.pycg_exceptions import PyCGExceptions
84-
from codeanalyzer.semantic_analysis.pycg.shard_planner import ShardPlan, plan_shards
84+
from codeanalyzer.semantic_analysis.pycg.shard_planner import plan_shards
8585
from codeanalyzer.utils import ProgressBar, logger
8686

8787

@@ -379,6 +379,13 @@ class PyCG:
379379
# -1 restores PyCG's unbounded run-to-convergence behaviour.
380380
_PYCG_MAX_ITER: int = 50
381381

382+
# Iterative decomposition of runaway (timed-out) shards: a shard that the
383+
# wall-clock timeout kills is re-partitioned at half the budget and re-run,
384+
# down to this file-count floor. Below the floor — or for an atomic import
385+
# cycle that won't split — the residue falls back to Jedi-only coverage.
386+
_PYCG_DECOMP_FLOOR: int = 10
387+
_PYCG_MAX_DECOMP_ROUNDS: int = 6
388+
382389
# Directory names that should never be fed to PyCG as entry points, nor
383390
# followed into during import resolution (an in-tree .codeanalyzer venv /
384391
# site-packages lives under project_dir and would otherwise be pulled into
@@ -417,6 +424,7 @@ def __init__(
417424
self.shard_strategy = shard_strategy
418425
self.using_ray = using_ray
419426
self._CallGraphGenerator: Optional[Any] = None
427+
self._resolver: Optional["_PyCGCallableResolver"] = None
420428

421429
@staticmethod
422430
def _coalesce_edges(edges: List[PyCallEdge]) -> List[PyCallEdge]:
@@ -570,20 +578,26 @@ def _build_sharded_planned(
570578
symbol_table: Dict[str, PyModule],
571579
resolver: "_PyCGCallableResolver",
572580
) -> List[PyCallEdge]:
573-
"""Coupling-aware sharding driven by the Jedi module graph.
574-
575-
Unlike :meth:`_build_sharded` (one shard per package directory), the
576-
shards here are chosen to *minimise the call edges severed between
577-
shards*: :func:`shard_planner.plan_shards` condenses the Jedi call
578-
graph by strongly-connected component (so import cycles never split)
579-
and clusters it with Louvain so tightly-coupled modules land together.
580-
Each shard — an arbitrary set of files — is run through PyCG via a
581-
symlinked mini-project (:func:`_shard_symlink_root`) that bounds PyCG
582-
to exactly those files.
583-
584-
Reported ``cut_ratio`` is the fraction of Jedi edge weight crossing
585-
shard boundaries — an upper bound on the PyCG edges lost to sharding.
581+
"""Coupling-aware sharding with iterative decomposition of runaways.
582+
583+
Shards are chosen to *minimise the call edges severed between shards*:
584+
:func:`shard_planner.plan_shards` condenses the Jedi call graph by
585+
strongly-connected component (so import cycles never split) and clusters
586+
it with Louvain so tightly-coupled modules land together. Each shard is
587+
run through PyCG via a symlinked mini-project that bounds analysis to its
588+
files.
589+
590+
PyCG's fixpoint diverges on heavy metaclass/mixin clusters, and a uniform
591+
ceiling would force *every* shard small (severing many edges) just to tame
592+
the few that run away. Instead we start coarse (low cut, high recall on
593+
healthy code) and **only re-decompose the shards that time out**: each
594+
runaway's files are re-partitioned at half the budget and re-run, down to
595+
a floor. A runaway shard contributes zero edges, so splitting it recovers
596+
almost all of them while paying cut on its internal seams alone. The
597+
residue that still diverges at the floor (or is an atomic cycle that won't
598+
split) falls back to Jedi-only coverage.
586599
"""
600+
self._resolver = resolver
587601
plan = plan_shards(
588602
symbol_table, jedi_edges, budget=self.shard_ceiling, merge_small=True
589603
)
@@ -594,64 +608,107 @@ def _build_sharded_planned(
594608
int(m["num_shards"]), m["cut_ratio"],
595609
int(m["max_shard_files"]), int(m["modules"]),
596610
)
597-
if m["oversized_shards"]:
598-
logger.warning(
599-
"PyCG: %d shard(s) exceed the %d-file ceiling — skipped "
600-
"(atomic import cycles larger than the budget)",
601-
int(m["oversized_shards"]), self.shard_ceiling,
611+
612+
runner = (
613+
self._run_fileset_shards_ray if self.using_ray
614+
else self._run_fileset_shards_seq
615+
)
616+
all_edges: List[PyCallEdge] = []
617+
shards = plan.shards
618+
budget = self.shard_ceiling
619+
converged_total = 0
620+
irreducible_files = 0
621+
round_no = 0
622+
623+
while shards:
624+
label = "decomposition round %d (budget %d, %d shard(s))" % (
625+
round_no, budget, len(shards),
602626
)
627+
logger.info("PyCG: %s", label)
628+
edges, runaways = runner(shards)
629+
all_edges.extend(edges)
630+
converged_total += len(shards) - len(runaways)
631+
if not runaways:
632+
break
603633

604-
if self.using_ray:
605-
return self._build_sharded_planned_ray(plan)
634+
next_budget = max(self._PYCG_DECOMP_FLOOR, budget // 2)
635+
stop_decomposing = (
636+
round_no >= self._PYCG_MAX_DECOMP_ROUNDS or next_budget >= budget
637+
)
606638

607-
all_edges: List[PyCallEdge] = []
608-
skipped = 0
609-
with ProgressBar(len(plan.shards), "Building call graph shards", item_label="shards") as progress:
610-
for idx, files in enumerate(plan.shards):
611-
n = len(files)
612-
if n > self.shard_ceiling:
613-
skipped += 1
614-
progress.advance()
639+
next_shards: List[List[str]] = []
640+
for rf in runaways:
641+
# Re-partition this runaway's files alone, at a tighter budget.
642+
# An atomic cycle (or a lone file) that won't shrink is
643+
# irreducible — accept Jedi-only rather than loop forever.
644+
sub_st = {f: symbol_table[f] for f in rf if f in symbol_table}
645+
if stop_decomposing or len(rf) <= 1:
646+
irreducible_files += len(rf)
615647
continue
616-
try:
617-
with _shard_symlink_root(files, self.project_dir) as (root, eps):
618-
with _shard_timeout(self.shard_timeout):
619-
edges = self._run_pycg_batch(eps, root, resolver, prefix="")
620-
all_edges.extend(edges)
621-
logger.debug("PyCG shard %d: %d edges from %d files", idx, len(edges), n)
622-
except TimeoutError:
623-
logger.warning(
624-
"PyCG shard %d timed out after %ds — skipped",
625-
idx, self.shard_timeout,
626-
)
627-
skipped += 1
628-
except PyCGExceptions.PyCGAnalysisError as exc:
629-
logger.warning("PyCG shard %d failed — skipped: %s", idx, exc)
630-
skipped += 1
631-
progress.advance()
648+
sub_plan = plan_shards(sub_st, jedi_edges, budget=next_budget)
649+
if len(sub_plan.shards) <= 1:
650+
# did not actually split (one atomic SCC) — give up on it
651+
irreducible_files += len(rf)
652+
continue
653+
next_shards.extend(sub_plan.shards)
632654

633-
if skipped:
655+
if not next_shards:
656+
break
657+
logger.info(
658+
"PyCG: %d shard(s) ran away — decomposing into %d sub-shard(s) "
659+
"at budget %d", len(runaways), len(next_shards), next_budget,
660+
)
661+
shards, budget = next_shards, next_budget
662+
round_no += 1
663+
664+
if irreducible_files:
634665
logger.warning(
635-
"PyCG: %d/%d shard(s) skipped (ceiling, %ds timeout, or failure)",
636-
skipped, len(plan.shards), self.shard_timeout,
666+
"PyCG: %d file(s) in irreducibly-divergent shards fall back to "
667+
"Jedi-only coverage", irreducible_files,
637668
)
638669

639670
result = self._coalesce_edges(all_edges)
640671
logger.info(
641-
"PyCG: %d edges from %d/%d shard(s) (%d before dedup, Jedi-planned)",
642-
len(result), len(plan.shards) - skipped, len(plan.shards), len(all_edges),
672+
"PyCG: %d edges from %d converged shard(s) over %d round(s) "
673+
"(%d before dedup, Jedi-planned%s)",
674+
len(result), converged_total, round_no + 1, len(all_edges),
675+
", Ray-parallel" if self.using_ray else "",
643676
)
644677
return result
645678

646-
def _build_sharded_planned_ray(self, plan: "ShardPlan") -> List[PyCallEdge]:
647-
"""Ray-parallel execution of Jedi-planned file-set shards.
679+
def _run_fileset_shards_seq(
680+
self, shards: List[List[str]],
681+
) -> Tuple[List[PyCallEdge], List[List[str]]]:
682+
"""Run each file-set shard sequentially; return ``(edges, runaways)``.
648683
649-
Each shard is materialised as a symlink mini-project up front (the
650-
trees must outlive their remote tasks), submitted as a Ray task, and
651-
collected against a single wall-clock deadline — Ray workers cannot use
652-
SIGALRM, so the timeout is enforced at the orchestrator level (mirroring
653-
:meth:`_build_sharded_ray`). All symlink trees are removed once the
654-
batch completes.
684+
A shard that times out or fails with a PyCG analysis error is returned in *runaways* (its file
685+
list) for the caller to re-decompose; it contributes no edges.
686+
"""
687+
resolver = self._resolver
688+
edges_all: List[PyCallEdge] = []
689+
runaways: List[List[str]] = []
690+
with ProgressBar(len(shards), "Building call graph shards", item_label="shards") as progress:
691+
for files in shards:
692+
try:
693+
with _shard_symlink_root(files, self.project_dir) as (root, eps):
694+
with _shard_timeout(self.shard_timeout):
695+
edges = self._run_pycg_batch(eps, root, resolver, prefix="")
696+
edges_all.extend(edges)
697+
except (TimeoutError, PyCGExceptions.PyCGAnalysisError):
698+
runaways.append(files)
699+
progress.advance()
700+
return edges_all, runaways
701+
702+
def _run_fileset_shards_ray(
703+
self, shards: List[List[str]],
704+
) -> Tuple[List[PyCallEdge], List[List[str]]]:
705+
"""Ray-parallel variant of :meth:`_run_fileset_shards_seq`.
706+
707+
Each shard is materialised as a symlink mini-project up front (the trees
708+
must outlive their remote tasks), submitted as a Ray task, and collected
709+
against one wall-clock deadline — Ray workers cannot use SIGALRM, so the
710+
timeout is enforced orchestrator-side. Timed-out/failed shards become
711+
runaways; symlink trees are removed once the batch completes.
655712
"""
656713
import os
657714
import ray
@@ -661,22 +718,17 @@ def _build_sharded_planned_ray(self, plan: "ShardPlan") -> List[PyCallEdge]:
661718

662719
roots: List[Path] = []
663720
futures: List[Any] = []
664-
meta: Dict[Any, tuple] = {} # ObjectRef -> (shard_idx, n_files)
665-
skipped = 0
666-
all_edges: List[PyCallEdge] = []
721+
meta: Dict[Any, List[str]] = {} # ObjectRef -> shard file list
722+
edges_all: List[PyCallEdge] = []
723+
runaways: List[List[str]] = []
667724
try:
668-
with ProgressBar(len(plan.shards), "Building call graph shards (parallel)", item_label="shards") as progress:
669-
for idx, files in enumerate(plan.shards):
670-
n = len(files)
671-
if n > self.shard_ceiling:
672-
skipped += 1
673-
progress.advance()
674-
continue
725+
with ProgressBar(len(shards), "Building call graph shards (parallel)", item_label="shards") as progress:
726+
for files in shards:
675727
root, eps = _materialize_shard_root(files, self.project_dir)
676728
roots.append(root)
677729
fut = remote_fn.remote(eps, str(root), "", self.max_iter)
678730
futures.append(fut)
679-
meta[fut] = (idx, n)
731+
meta[fut] = files
680732

681733
deadline = (
682734
time.perf_counter() + float(self.shard_timeout)
@@ -696,44 +748,24 @@ def _build_sharded_planned_ray(self, plan: "ShardPlan") -> List[PyCallEdge]:
696748
break
697749

698750
fut = ready[0]
699-
idx, n = meta[fut]
700751
try:
701752
triples = ray.get(fut)
702-
all_edges.extend(
753+
edges_all.extend(
703754
PyCallEdge(source=s, target=t, weight=w, provenance=["pycg"])
704755
for s, t, w in triples
705756
)
706-
logger.debug("PyCG shard %d: %d edges from %d files (Ray)", idx, len(triples), n)
707-
except Exception as exc:
708-
logger.warning("PyCG shard %d failed — skipped: %s", idx, exc)
709-
skipped += 1
757+
except Exception:
758+
runaways.append(meta[fut])
710759
progress.advance()
711760

712-
for fut in pending:
713-
idx, _ = meta[fut]
714-
logger.warning(
715-
"PyCG shard %d timed out after %ds — skipped",
716-
idx, self.shard_timeout,
717-
)
761+
for fut in pending: # exceeded the deadline
718762
ray.cancel(fut, force=True)
719-
skipped += 1
763+
runaways.append(meta[fut])
720764
progress.advance()
721765
finally:
722766
for root in roots:
723767
shutil.rmtree(root, ignore_errors=True)
724-
725-
if skipped:
726-
logger.warning(
727-
"PyCG: %d/%d shard(s) skipped (ceiling, %ds timeout, or failure)",
728-
skipped, len(plan.shards), self.shard_timeout,
729-
)
730-
731-
result = self._coalesce_edges(all_edges)
732-
logger.info(
733-
"PyCG: %d edges from %d/%d shard(s) (%d before dedup, Jedi-planned, Ray-parallel)",
734-
len(result), len(plan.shards) - skipped, len(plan.shards), len(all_edges),
735-
)
736-
return result
768+
return edges_all, runaways
737769

738770
def _build_sharded(
739771
self,

test/test_pycg_sharding.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,56 @@ def test_max_iter_default_and_override(tmp_path):
2323
assert PyCG(tmp_path, max_iter=-1).max_iter == -1
2424

2525

26+
def test_adaptive_decomposition_splits_runaways(tmp_path, monkeypatch):
27+
"""A shard that 'runs away' is re-decomposed until its pieces converge.
28+
29+
Drives the real adaptive loop + planner, but stubs the PyCG runner with a
30+
size threshold: shards larger than the threshold time out (runaway); smaller
31+
ones converge and yield one synthetic edge per file. A 16-file coupled
32+
cluster must therefore be split across rounds until every piece is small
33+
enough, with no files lost.
34+
"""
35+
from codeanalyzer.schema.py_schema import PyCallEdge, PyCallable, PyModule
36+
from codeanalyzer.semantic_analysis.pycg.pycg_analysis import _PyCGCallableResolver
37+
38+
# A loosely-coupled chain of 40 modules: splittable (not one atomic cycle).
39+
st, jedi = {}, []
40+
for i in range(40):
41+
path = f"/proj/m{i}.py"
42+
st[path] = PyModule(
43+
file_path=path, module_name=f"m{i}",
44+
functions={"f": PyCallable(signature=f"m{i}.f", name="f", path=path)},
45+
)
46+
if i:
47+
jedi.append(PyCallEdge(source=f"m{i-1}.f", target=f"m{i}.f", weight=1,
48+
provenance=["jedi"]))
49+
50+
# threshold >= the decomposition floor (10) so pieces can shrink enough to converge.
51+
pycg = PyCG(tmp_path, shard_ceiling=40)
52+
threshold = 12 # shards with > 12 files "diverge"
53+
rounds_seen = []
54+
55+
def fake_runner(shards):
56+
rounds_seen.append([len(s) for s in shards])
57+
edges, runaways = [], []
58+
for files in shards:
59+
if len(files) > threshold:
60+
runaways.append(files)
61+
else:
62+
edges += [PyCallEdge(source=f, target="x", weight=1, provenance=["pycg"])
63+
for f in files]
64+
return edges, runaways
65+
66+
monkeypatch.setattr(pycg, "_run_fileset_shards_seq", fake_runner)
67+
edges = pycg._build_sharded_planned(jedi, st, _PyCGCallableResolver(set()))
68+
69+
assert len(rounds_seen) >= 2, "runaway shard was never decomposed"
70+
# Every shard that was finally accepted is within the convergence threshold.
71+
assert all(sz <= threshold for sz in rounds_seen[-1])
72+
# No files lost: one pycg edge per file across all 40 modules.
73+
assert len({e.source for e in edges}) == 40
74+
75+
2676
def test_pycg_does_not_follow_into_in_tree_dependency(tmp_path):
2777
"""An in-tree ``.codeanalyzer`` venv under project_dir must stay a ghost.
2878

0 commit comments

Comments
 (0)