4343import importlib .util # noqa: F401
4444import contextlib
4545import json # noqa: F401
46+ import shutil
4647import signal
48+ import tempfile
4749import time
4850
4951from collections import Counter , defaultdict
5052from pathlib import Path
51- from typing import Any , Dict , Generator , List , Optional , Set , Union
53+ from typing import Any , Dict , Generator , List , Optional , Set , Tuple , Union
5254
5355
5456@contextlib .contextmanager
@@ -79,9 +81,64 @@ def _handler(signum: int, frame: object) -> None:
7981from codeanalyzer .schema .py_schema import PyCallEdge , PyModule
8082from codeanalyzer .semantic_analysis .call_graph import iter_callables_in_symbol_table
8183from codeanalyzer .semantic_analysis .pycg .pycg_exceptions import PyCGExceptions
84+ from codeanalyzer .semantic_analysis .pycg .shard_planner import plan_shards
8285from codeanalyzer .utils import ProgressBar , logger
8386
8487
@contextlib.contextmanager
def _shard_symlink_root(
    files: List[str],
    project_dir: Path,
) -> Generator[Tuple[Path, List[str]], None, None]:
    """Materialise a shard's files as a temporary mini-project.

    PyCG bounds its import-following to the ``package`` directory — only
    modules whose resolved file lives under that root are followed; everything
    else becomes a ghost node (``ImportManager``: ``if self.mod_dir not in
    mod.__file__: return``). A coupling-derived shard is an arbitrary set of
    files that need not form a directory, so the project layout is mirrored
    into a temp dir holding symlinks to exactly the shard's files plus the
    ``__init__.py`` chain each needs for package resolution. Running PyCG with
    this mirror as the package root confines analysis to the shard while
    emitting project-relative edge names (so ``prefix=""`` — no rename needed).

    Args:
        files: Paths of the shard's source files. Each is resolved before
            being located under *project_dir*; files that do not resolve to a
            location inside the project are skipped, and a file listed more
            than once is mirrored (and reported) only once.
        project_dir: Root of the real project. It is resolved here, so a
            relative path or a path that reaches the project through a
            symlink is accepted.

    Yields:
        ``(root, entry_points)`` where *root* is the temporary mirror
        directory and *entry_points* are the symlinked paths inside it, in
        first-seen order.

    The temp tree is removed on exit, whether or not the ``with`` body raised.
    """
    # Shard files are resolved below, so the root they are compared against
    # must be resolved as well. Otherwise ``relative_to`` raises for *every*
    # file whenever ``project_dir`` is relative or symlinked, and the shard
    # would silently come out empty.
    project_root = Path(project_dir).resolve()
    root = Path(tempfile.mkdtemp(prefix="canpy_pycg_shard_"))
    entry_points: List[str] = []
    mirrored: Set[Path] = set()
    linked_inits: Set[Path] = set()
    try:
        for f in files:
            src = Path(f).resolve()
            try:
                rel = src.relative_to(project_root)
            except ValueError:
                continue  # defensively skip files outside the project
            dst = root / rel
            if dst in mirrored:
                continue  # same file listed twice: one entry point is enough
            mirrored.add(dst)
            dst.parent.mkdir(parents=True, exist_ok=True)
            # The link may already exist when this file is an ``__init__.py``
            # that was linked earlier as part of another file's package chain.
            if not dst.exists():
                dst.symlink_to(src)
            entry_points.append(str(dst))

            # Symlink the __init__.py chain from project root down to this
            # file's package so PyCG/importlib can resolve the dotted module
            # name. These add ~0 analysis cost (usually empty) and keep
            # out-of-shard siblings unresolved → ghost nodes.
            pkg_parts = rel.parent.parts
            for depth in range(len(pkg_parts) + 1):
                pkg_rel = Path(*pkg_parts[:depth])
                link_init = root / pkg_rel / "__init__.py"
                if link_init in linked_inits:
                    continue  # already handled; skip the filesystem checks
                real_init = project_root / pkg_rel / "__init__.py"
                if not real_init.exists():
                    continue
                link_init.parent.mkdir(parents=True, exist_ok=True)
                if not link_init.exists():
                    link_init.symlink_to(real_init.resolve())
                linked_inits.add(link_init)
        yield root, entry_points
    finally:
        shutil.rmtree(root, ignore_errors=True)
140+
141+
85142def _pycg_shard_worker (
86143 entry_points : List [str ],
87144 package_dir : str ,
@@ -313,6 +370,7 @@ def __init__(
313370 shard : bool = False ,
314371 shard_ceiling : Optional [int ] = None ,
315372 shard_timeout : Optional [int ] = None ,
373+ shard_strategy : str = "jedi" ,
316374 using_ray : bool = False ,
317375 ) -> None :
318376 self .project_dir = Path (project_dir ).resolve ()
@@ -324,9 +382,31 @@ def __init__(
324382 self .shard_timeout = (
325383 shard_timeout if shard_timeout is not None else self ._PYCG_SHARD_TIMEOUT
326384 )
385+ # "jedi": partition the Jedi module graph (SCC + Louvain) so coupled
386+ # modules co-compute and few edges are severed (see shard_planner).
387+ # "package": legacy one-shard-per-package-directory grouping.
388+ self .shard_strategy = shard_strategy
327389 self .using_ray = using_ray
328390 self ._CallGraphGenerator : Optional [Any ] = None
329391
@staticmethod
def _coalesce_edges(edges: List[PyCallEdge]) -> List[PyCallEdge]:
    """Collapse repeated ``(source, target)`` pairs into one edge each.

    Edges are visited in order. The first edge seen for a pair is kept as
    is; every later edge for the same pair replaces the stored one with a
    new ``PyCallEdge`` whose weight is the running sum and whose source,
    target and provenance are taken from the stored edge. The result lists
    one edge per pair, in order of first appearance.
    """
    by_pair: Dict[tuple, PyCallEdge] = {}
    for current in edges:
        pair = (current.source, current.target)
        previous = by_pair.get(pair)
        if previous is None:
            # First sighting of this pair: store the edge untouched.
            by_pair[pair] = current
            continue
        by_pair[pair] = PyCallEdge(
            source=previous.source,
            target=previous.target,
            weight=previous.weight + current.weight,
            provenance=previous.provenance,
        )
    return list(by_pair.values())
409+
330410 # ------------------------------------------------------------------
331411 # Entry-point collection
332412 # ------------------------------------------------------------------
@@ -455,6 +535,88 @@ def _run_pycg_batch(
455535 # Sharded analysis
456536 # ------------------------------------------------------------------
457537
def _build_sharded_planned(
    self,
    jedi_edges: List[PyCallEdge],
    symbol_table: Dict[str, PyModule],
    resolver: "_PyCGCallableResolver",
) -> List[PyCallEdge]:
    """Run PyCG shard by shard, with shards planned from the Jedi graph.

    Where :meth:`_build_sharded` uses one shard per package directory, this
    method asks :func:`shard_planner.plan_shards` for a partition intended
    to *minimise the call edges severed between shards* (per the planner's
    description: SCC condensation so import cycles never split, then
    Louvain clustering so tightly-coupled modules land together). Every
    shard is an arbitrary set of files, so each one is analysed through a
    symlinked mini-project (:func:`_shard_symlink_root`) that bounds PyCG
    to exactly those files.

    Shards are processed sequentially. A shard is skipped — and counted as
    such — when it has more files than ``self.shard_ceiling``, when it
    raises ``TimeoutError``, or when PyCG raises
    ``PyCGExceptions.PyCGAnalysisError``. Edges from the remaining shards
    are merged with :meth:`_coalesce_edges` before being returned.

    The logged ``cut_ratio`` is the fraction of Jedi edge weight crossing
    shard boundaries — an upper bound on the PyCG edges lost to sharding.
    """
    plan = plan_shards(
        symbol_table, jedi_edges, budget=self.shard_ceiling, merge_small=True
    )
    metrics = plan.metrics
    logger.info(
        "PyCG: planned %d shard(s) from Jedi module graph "
        "(cut_ratio=%.3f, max_shard=%d files, %d modules)",
        int(metrics["num_shards"]),
        metrics["cut_ratio"],
        int(metrics["max_shard_files"]),
        int(metrics["modules"]),
    )
    if metrics["oversized_shards"]:
        logger.warning(
            "PyCG: %d shard(s) exceed the %d-file ceiling — skipped "
            "(atomic import cycles larger than the budget)",
            int(metrics["oversized_shards"]),
            self.shard_ceiling,
        )

    if self.using_ray:
        logger.info(
            "PyCG: Ray parallelism is not yet wired for the 'jedi' shard "
            "strategy — running shards sequentially."
        )

    collected: List[PyCallEdge] = []
    n_skipped = 0
    with ProgressBar(
        len(plan.shards), "Building call graph shards", item_label="shards"
    ) as progress:
        for shard_no, shard_files in enumerate(plan.shards):
            file_count = len(shard_files)
            if file_count > self.shard_ceiling:
                # Oversized shard: never handed to PyCG.
                n_skipped += 1
            else:
                try:
                    with _shard_symlink_root(shard_files, self.project_dir) as (
                        mirror,
                        shard_entry_points,
                    ):
                        with _shard_timeout(self.shard_timeout):
                            shard_edges = self._run_pycg_batch(
                                shard_entry_points, mirror, resolver, prefix=""
                            )
                    collected.extend(shard_edges)
                    logger.debug(
                        "PyCG shard %d: %d edges from %d files",
                        shard_no,
                        len(shard_edges),
                        file_count,
                    )
                except TimeoutError:
                    logger.warning(
                        "PyCG shard %d timed out after %ds — skipped",
                        shard_no,
                        self.shard_timeout,
                    )
                    n_skipped += 1
                except PyCGExceptions.PyCGAnalysisError as exc:
                    logger.warning(
                        "PyCG shard %d failed — skipped: %s", shard_no, exc
                    )
                    n_skipped += 1
            # One tick per shard, whether it ran, was skipped, or failed.
            progress.advance()

    shard_total = len(plan.shards)
    if n_skipped:
        logger.warning(
            "PyCG: %d/%d shard(s) skipped (ceiling, %ds timeout, or failure)",
            n_skipped,
            shard_total,
            self.shard_timeout,
        )

    result = self._coalesce_edges(collected)
    logger.info(
        "PyCG: %d edges from %d/%d shard(s) (%d before dedup, Jedi-planned)",
        len(result),
        shard_total - n_skipped,
        shard_total,
        len(collected),
    )
    return result
619+
458620 def _build_sharded (
459621 self ,
460622 entry_points : List [str ],
@@ -668,7 +830,9 @@ def _build_sharded_ray(self, shards: Dict[Path, List[str]]) -> List[PyCallEdge]:
668830 # ------------------------------------------------------------------
669831
670832 def build_call_graph_edges (
671- self , symbol_table : Dict [str , PyModule ]
833+ self ,
834+ symbol_table : Dict [str , PyModule ],
835+ jedi_edges : Optional [List [PyCallEdge ]] = None ,
672836 ) -> List [PyCallEdge ]:
673837 """Run PyCG and return ``PyCallEdge`` entries with ``provenance=["pycg"]``.
674838
@@ -701,12 +865,21 @@ def build_call_graph_edges(
701865
702866 if n_files > self ._PYCG_FILE_CEILING :
703867 if self .shard :
704- mode = "Ray-parallel" if self .using_ray else "sequential"
705- logger .info (
706- "PyCG: starting sharded call graph analysis (%d files, %s)" ,
707- n_files , mode ,
708- )
709- edges = self ._build_sharded (entry_points , resolver )
868+ if self .shard_strategy == "jedi" and jedi_edges is not None :
869+ logger .info (
870+ "PyCG: starting Jedi-planned sharded analysis (%d files)" ,
871+ n_files ,
872+ )
873+ edges = self ._build_sharded_planned (
874+ jedi_edges , symbol_table , resolver
875+ )
876+ else :
877+ mode = "Ray-parallel" if self .using_ray else "sequential"
878+ logger .info (
879+ "PyCG: starting per-package sharded analysis (%d files, %s)" ,
880+ n_files , mode ,
881+ )
882+ edges = self ._build_sharded (entry_points , resolver )
710883 else :
711884 logger .warning (
712885 "PyCG: %d entry points exceeds ceiling of %d — "
0 commit comments