Skip to content

Commit 5c02ba3

Browse files
committed
feat(pycg): Ray-parallel execution for Jedi-planned shards
Materialise each planned file-set shard as a symlink mini-project up front (the trees must outlive their remote tasks), submit one Ray task per shard, and collect against a single wall-clock deadline (Ray workers can't use SIGALRM, so the timeout is enforced at the orchestrator, mirroring _build_sharded_ray). Symlink trees are cleaned up once the batch completes. Factor _materialize_shard_root out of the _shard_symlink_root context manager so both the sequential and Ray paths share tree construction. Under --ray the jedi strategy now parallelises instead of falling back to sequential.
1 parent 6916884 commit 5c02ba3

1 file changed

Lines changed: 136 additions & 36 deletions

File tree

codeanalyzer/semantic_analysis/pycg/pycg_analysis.py

Lines changed: 136 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -81,16 +81,15 @@ def _handler(signum: int, frame: object) -> None:
8181
from codeanalyzer.schema.py_schema import PyCallEdge, PyModule
8282
from codeanalyzer.semantic_analysis.call_graph import iter_callables_in_symbol_table
8383
from codeanalyzer.semantic_analysis.pycg.pycg_exceptions import PyCGExceptions
84-
from codeanalyzer.semantic_analysis.pycg.shard_planner import plan_shards
84+
from codeanalyzer.semantic_analysis.pycg.shard_planner import ShardPlan, plan_shards
8585
from codeanalyzer.utils import ProgressBar, logger
8686

8787

88-
@contextlib.contextmanager
89-
def _shard_symlink_root(
88+
def _materialize_shard_root(
    files: List[str],
    project_dir: Path,
) -> Tuple[Path, List[str]]:
    """Build a temporary symlink mini-project for a shard; return ``(root, eps)``.

    PyCG bounds its import-following to the ``package`` directory — only
    modules whose resolved file lives under that root are followed. Each file
    in *files* that lives under *project_dir* is therefore symlinked into a
    fresh temp directory at its project-relative path, together with the
    ``__init__.py`` chain leading down to it. Using this mirror as the package
    root confines analysis to the shard while emitting project-relative edge
    names (so ``prefix=""`` — no rename needed).

    Args:
        files: Paths of the source files that make up the shard. Files that
            do not resolve to a location under *project_dir* are skipped.
        project_dir: Root of the real project. It is resolved before use so
            that a relative or symlinked project path still matches the
            resolved file paths.

    Returns:
        ``(root, entry_points)`` — the temp directory and the symlinked
        paths inside it (as strings), in the order of *files*.

    Raises:
        OSError: If a directory or symlink cannot be created. The partially
            built tree is removed before the error propagates, so a failed
            call never leaks a temp directory.

    The caller owns the returned *root* and must ``shutil.rmtree`` it.
    """
    # ``src`` below is always resolved; resolve the project root as well so
    # ``relative_to`` compares like with like instead of skipping every file.
    project_root = Path(project_dir).resolve()
    root = Path(tempfile.mkdtemp(prefix="canpy_pycg_shard_"))
    entry_points: List[str] = []
    linked_inits: Set[Path] = set()
    try:
        for f in files:
            src = Path(f).resolve()
            try:
                rel = src.relative_to(project_root)
            except ValueError:
                continue  # defensively skip files outside the project
            dst = root / rel
            dst.parent.mkdir(parents=True, exist_ok=True)
            if not dst.exists():
                dst.symlink_to(src)
            entry_points.append(str(dst))

            # Symlink the __init__.py chain from project root down to this
            # file's package so PyCG/importlib can resolve the dotted module
            # name. These add ~0 analysis cost (usually empty) and keep
            # out-of-shard siblings unresolved → ghost nodes.
            pkg_parts = rel.parent.parts
            for depth in range(len(pkg_parts) + 1):
                pkg_rel = Path(*pkg_parts[:depth])
                real_init = project_root / pkg_rel / "__init__.py"
                link_init = root / pkg_rel / "__init__.py"
                if link_init in linked_inits or not real_init.exists():
                    continue
                link_init.parent.mkdir(parents=True, exist_ok=True)
                if not link_init.exists():
                    link_init.symlink_to(real_init.resolve())
                linked_inits.add(link_init)
    except BaseException:
        # The caller never receives ``root`` when construction fails, so it
        # cannot clean up; remove the half-built tree here and re-raise.
        shutil.rmtree(root, ignore_errors=True)
        raise
    return root, entry_points
135+
136+
137+
@contextlib.contextmanager
def _shard_symlink_root(
    files: List[str],
    project_dir: Path,
) -> Generator[Tuple[Path, List[str]], None, None]:
    """Scoped variant of :func:`_materialize_shard_root`.

    Builds the shard's temporary symlink tree, yields ``(root, entry_points)``
    to the ``with`` body, and deletes the tree when the body exits — whether
    it finishes normally or raises.
    """
    shard = _materialize_shard_root(files, project_dir)
    tmp_root = shard[0]
    try:
        yield shard
    finally:
        # Best-effort removal: a failed delete must not mask the body's error.
        shutil.rmtree(tmp_root, ignore_errors=True)
@@ -573,10 +584,7 @@ def _build_sharded_planned(
573584
)
574585

575586
if self.using_ray:
576-
logger.info(
577-
"PyCG: Ray parallelism is not yet wired for the 'jedi' shard "
578-
"strategy — running shards sequentially."
579-
)
587+
return self._build_sharded_planned_ray(plan)
580588

581589
all_edges: List[PyCallEdge] = []
582590
skipped = 0
@@ -617,6 +625,98 @@ def _build_sharded_planned(
617625
)
618626
return result
619627

628+
def _build_sharded_planned_ray(self, plan: "ShardPlan") -> List[PyCallEdge]:
    """Ray-parallel execution of Jedi-planned file-set shards.

    Each shard is materialised as a symlink mini-project up front (the
    trees must outlive their remote tasks), submitted as a Ray task, and
    collected against a single wall-clock deadline — Ray workers cannot use
    SIGALRM, so the timeout is enforced at the orchestrator level (mirroring
    :meth:`_build_sharded_ray`). All symlink trees are removed once the
    batch completes.

    Args:
        plan: Shard plan whose ``shards`` are lists of file paths. Shards
            with more than ``self.shard_ceiling`` files are skipped.

    Returns:
        The edges from all shards that completed in time, passed through
        ``self._coalesce_edges``.

    Shards that fail or miss the deadline are logged and counted as skipped
    rather than raised. If the method itself exits abnormally (for example
    a later shard cannot be materialised, or the user interrupts), every
    task that is still in flight is cancelled before the symlink trees are
    removed, so no orphaned work keeps running on the cluster.
    """
    import os
    import ray

    os.environ.setdefault("RAY_IGNORE_UNHANDLED_ERRORS", "1")
    remote_fn = ray.remote(_pycg_shard_worker)

    def _cancel(ref: Any) -> None:
        # Best-effort: one failed cancel must not stop the others or the
        # cleanup that follows.
        try:
            ray.cancel(ref, force=True)
        except Exception as exc:
            logger.debug("PyCG: could not cancel Ray task: %s", exc)

    roots: List[Path] = []
    pending: List[Any] = []  # submitted but not yet collected/cancelled
    meta: Dict[Any, tuple] = {}  # ObjectRef -> (shard_idx, n_files)
    skipped = 0
    all_edges: List[PyCallEdge] = []
    try:
        with ProgressBar(len(plan.shards), "Building call graph shards (parallel)", item_label="shards") as progress:
            for idx, files in enumerate(plan.shards):
                n = len(files)
                if n > self.shard_ceiling:
                    skipped += 1
                    progress.advance()
                    continue
                root, eps = _materialize_shard_root(files, self.project_dir)
                roots.append(root)
                fut = remote_fn.remote(eps, str(root), "")
                pending.append(fut)
                meta[fut] = (idx, n)

            # One deadline for the whole batch, started after submission.
            deadline = (
                time.perf_counter() + float(self.shard_timeout)
                if self.shard_timeout > 0 else None
            )
            while pending:
                if deadline is not None:
                    remaining = deadline - time.perf_counter()
                    if remaining <= 0:
                        break
                else:
                    remaining = None

                ready, pending = ray.wait(pending, num_returns=1, timeout=remaining)
                if not ready:
                    break  # deadline hit while waiting

                fut = ready[0]
                idx, n = meta[fut]
                try:
                    triples = ray.get(fut)
                    all_edges.extend(
                        PyCallEdge(source=s, target=t, weight=w, provenance=["pycg"])
                        for s, t, w in triples
                    )
                    logger.debug("PyCG shard %d: %d edges from %d files (Ray)", idx, len(triples), n)
                except Exception as exc:
                    logger.warning("PyCG shard %d failed — skipped: %s", idx, exc)
                    skipped += 1
                progress.advance()

            # Whatever is still pending here missed the deadline.
            for fut in pending:
                idx, _ = meta[fut]
                logger.warning(
                    "PyCG shard %d timed out after %ds — skipped",
                    idx, self.shard_timeout,
                )
                _cancel(fut)
                skipped += 1
                progress.advance()
            pending = []
    finally:
        # Non-empty only on an abnormal exit: stop the in-flight tasks
        # before their symlink trees are removed underneath them.
        for fut in pending:
            _cancel(fut)
        for root in roots:
            shutil.rmtree(root, ignore_errors=True)

    if skipped:
        logger.warning(
            "PyCG: %d/%d shard(s) skipped (ceiling, %ds timeout, or failure)",
            skipped, len(plan.shards), self.shard_timeout,
        )

    result = self._coalesce_edges(all_edges)
    logger.info(
        "PyCG: %d edges from %d/%d shard(s) (%d before dedup, Jedi-planned, Ray-parallel)",
        len(result), len(plan.shards) - skipped, len(plan.shards), len(all_edges),
    )
    return result
719+
620720
def _build_sharded(
621721
self,
622722
entry_points: List[str],

0 commit comments

Comments
 (0)