diff --git a/libs/openant-core/parsers/python/call_graph_builder.py b/libs/openant-core/parsers/python/call_graph_builder.py index a6741cc1..1895c826 100644 --- a/libs/openant-core/parsers/python/call_graph_builder.py +++ b/libs/openant-core/parsers/python/call_graph_builder.py @@ -183,21 +183,130 @@ def _extract_calls_from_code(self, code: str, caller_id: str) -> Set[str]: # Fall back to regex-based extraction return self._extract_calls_regex(code, caller_id) + # Local function-value aliases within this caller: `fn = helper` makes a + # subsequent `fn()` a call to `helper`. Resolve the alias target to a + # function ID up front so the call resolver can follow it. + aliases = self._build_alias_map(tree, caller_file) + for node in ast.walk(tree): if isinstance(node, ast.Call): - resolved = self._resolve_call_node(node, caller_file, caller_class) + resolved = self._resolve_call_node(node, caller_file, caller_class, aliases) if resolved: calls.add(resolved) return calls - def _resolve_call_node(self, node: ast.Call, caller_file: str, caller_class: Optional[str]) -> Optional[str]: + def _build_alias_map(self, tree: ast.AST, caller_file: str) -> Dict[str, str]: + """Map local names bound to a known function value -> that function's ID. + + Captures simple `name = other_name` bindings where `other_name` resolves + (same-file or via the global single-name index) to a user function. This + lets `fn = helper; fn()` recover the `helper` edge. Class/method targets + are out of scope (only ast.Name = ast.Name single-target assigns). + + SINGLE-UNCONDITIONAL-ASSIGNMENT GUARD: an alias is kept ONLY when the + name is bound exactly once AND at the function/module top level (a direct + statement of the body, not nested inside an If/For/While/Try/With or any + other block). A name bound 2+ times (last-write-wins) or bound inside a + conditional/loop/try/with is a "maybe" binding; resolving it would assert + a maybe as definite, so we drop it and let `x()` fall through to normal + resolution (no edge). Precision over recall. + """ + # Count EVERY assignment to each name anywhere in the body (any nesting, + # both Assign targets and AnnAssign), so a reassignment or a conditional + # rebinding disqualifies the name even if one binding is top-level. + assign_counts: Dict[str, int] = {} + for node in ast.walk(tree): + if isinstance(node, ast.Assign): + for target in node.targets: + for name in self._assigned_names(target): + assign_counts[name] = assign_counts.get(name, 0) + 1 + elif isinstance(node, ast.AnnAssign) and node.value is not None: + for name in self._assigned_names(node.target): + assign_counts[name] = assign_counts.get(name, 0) + 1 + + # Only consider candidate aliases declared at the body's TOP LEVEL. + top_level = self._function_body_statements(tree) + aliases: Dict[str, str] = {} + for stmt in top_level: + if not isinstance(stmt, ast.Assign): + continue + if not isinstance(stmt.value, ast.Name): + continue + if len(stmt.targets) != 1: + continue + target = stmt.targets[0] + if not isinstance(target, ast.Name): + continue + if target.id == stmt.value.id: + continue + # GUARD: bound exactly once across the whole body (single + + # unconditional, because a conditional binding would push the + # top-level count higher OR not appear at top level at all). + if assign_counts.get(target.id, 0) != 1: + continue + resolved = self._resolve_simple_call(stmt.value.id, caller_file) + if resolved: + aliases[target.id] = resolved + return aliases + + @staticmethod + def _assigned_names(target: ast.AST): + """Yield the simple Name ids bound by an assignment target (recursively + through tuple/list unpacking).""" + if isinstance(target, ast.Name): + yield target.id + elif isinstance(target, (ast.Tuple, ast.List)): + for elt in target.elts: + yield from CallGraphBuilder._assigned_names(elt) + + @staticmethod + def _function_body_statements(tree: ast.AST) -> List[ast.stmt]: + """Return the top-level statements of the analysed function body. + + The extractor's `code` includes the `def` line, so a single function + parses to Module(body=[FunctionDef]); the real body is that def's body. + Fall back to the module body for bare module-level code. + """ + body = getattr(tree, 'body', []) + if len(body) == 1 and isinstance(body[0], (ast.FunctionDef, ast.AsyncFunctionDef)): + return list(body[0].body) + return list(body) + + def _resolve_local_function(self, func_name: str, caller_file: str) -> Optional[str]: + """Resolve a bare name to a same-file, non-method user function. + + Deliberately same-file ONLY (no global cross-file fallback) so a genuine + builtin call is never linked to an unrelated same-named function in + another file. + """ + for func_id in self.functions_by_file.get(caller_file, []): + func_data = self.functions.get(func_id, {}) + if func_data.get('name') == func_name and not func_data.get('class_name'): + return func_id + return None + + def _resolve_call_node(self, node: ast.Call, caller_file: str, caller_class: Optional[str], + aliases: Optional[Dict[str, str]] = None) -> Optional[str]: """Resolve an AST Call node to a function ID.""" func = node.func # Simple function call: func_name(...) if isinstance(func, ast.Name): func_name = func.id + # Local function-value alias: `fn = helper; fn()` resolves to helper. + if aliases and func_name in aliases: + return aliases[func_name] + # A same-file user function with the same name as a stdlib + # module/builtin wins over the builtin filter: the call is + # unambiguously to the local definition. SCOPE is deliberately + # restricted to the caller's own file -- we do NOT use the global + # cross-file name index here, so a genuine builtin call (e.g. + # time()) is never linked to an unrelated same-named function in + # some other file. + local = self._resolve_local_function(func_name, caller_file) + if local: + return local if self._is_builtin(func_name): return None return self._resolve_simple_call(func_name, caller_file) @@ -262,14 +371,34 @@ def _resolve_simple_call(self, func_name: str, caller_file: str) -> Optional[str return None def _resolve_self_call(self, method_name: str, caller_file: str, caller_class: str) -> Optional[str]: - """Resolve a self.method() call within a class.""" - class_key = f"{caller_file}:{caller_class}" - class_methods = self.methods_by_class.get(class_key, []) + """Resolve a self.method() call within a class or its (same-file) bases. - for func_id in class_methods: - func_data = self.functions.get(func_id, {}) - if func_data.get('name') == method_name: - return func_id + Walks the class first, then its base classes transitively (breadth-first, + cycle-guarded), so a method inherited from a base resolves. Base lookup + is restricted to classes defined in the caller's own file -- external + base classes aren't in our index, so they're left unresolved rather than + mis-linked. + """ + seen: Set[str] = set() + queue: List[str] = [caller_class] + while queue: + class_name = queue.pop(0) + if class_name in seen: + continue + seen.add(class_name) + + class_key = f"{caller_file}:{class_name}" + for func_id in self.methods_by_class.get(class_key, []): + func_data = self.functions.get(func_id, {}) + if func_data.get('name') == method_name: + return func_id + + class_data = self.classes.get(class_key, {}) + for base in class_data.get('bases', []): + # Only same-file base names are resolvable via our index. + base_name = base.split('.')[-1] + if base_name not in seen: + queue.append(base_name) return None @@ -328,9 +457,13 @@ def _resolve_import(self, import_path: str, func_name: str, caller_file: str) -> return target_id # Strategy 2: Check if import_path itself is a function + # The matched function's NAME must equal the called function name -- + # matching on parts[-1] (the module path tail) alone spuriously links + # e.g. `import alpha; alpha.run()` to a free function named `alpha`, + # ignoring that `run` (not `alpha`) was actually called. for func_id in self.functions: func_data = self.functions[func_id] - if func_data.get('name') == parts[-1]: + if func_data.get('name') == func_name: # Check if file path matches module path file_path = func_data.get('file_path', '') module_path = file_path.replace('/', '.').replace('.py', '') @@ -349,6 +482,12 @@ def _extract_calls_regex(self, code: str, caller_id: str) -> Set[str]: pattern = r'\b([a-zA-Z_][a-zA-Z0-9_]*)\s*\(' for match in re.finditer(pattern, code): func_name = match.group(1) + # Same as the AST path: a same-file user function named like a + # stdlib module/builtin still wins over the builtin filter. + local = self._resolve_local_function(func_name, caller_file) + if local: + calls.add(local) + continue if not self._is_builtin(func_name): resolved = self._resolve_simple_call(func_name, caller_file) if resolved: diff --git a/libs/openant-core/parsers/python/function_extractor.py b/libs/openant-core/parsers/python/function_extractor.py index 8714e9dc..b23ba814 100644 --- a/libs/openant-core/parsers/python/function_extractor.py +++ b/libs/openant-core/parsers/python/function_extractor.py @@ -216,7 +216,12 @@ def classify_function(self, func_name: str, decorators: List[str], return 'constructor' if func_name.startswith('__') and func_name.endswith('__'): return 'dunder_method' - if '@property' in dec_str: + # @property getter, @.setter/.deleter, and @cached_property/ + # @functools.cached_property are all property accessors. Reuse the + # single _property_role predicate so classification can't drift from + # the qualified_name role-suffix logic (a literal '@property' match + # silently mislabels @cached_property as a plain method). + if self._property_role(decorators) is not None: return 'property' if '@staticmethod' in dec_str: return 'static_method' @@ -228,8 +233,19 @@ def classify_function(self, func_name: str, decorators: List[str], if 'middleware' in func_name.lower() or 'middleware' in path_lower: return 'middleware' - # Test functions - if func_name.startswith('test_') or 'test' in path_lower: + # Test functions. Match the file by PATH COMPONENT, not bare substring: + # 'test' in path_lower wrongly flags e.g. latest.py / contest.py / fastest.py. + # A real test file is one whose directory or filename is/starts-with 'test' + # (pytest's discovery convention: test_*.py / *_test.py / a tests/ dir). + path_parts = path_lower.replace('\\', '/').split('/') + filename = path_parts[-1] if path_parts else '' + is_test_path = ( + any(part == 'test' or part == 'tests' for part in path_parts[:-1]) + or filename.startswith('test_') + or filename.endswith('_test.py') + or filename == 'test.py' + ) + if func_name.startswith('test_') or is_test_path: return 'test' # Utility functions @@ -268,29 +284,194 @@ def extract_imports(self, tree: ast.AST, file_path: str) -> Dict[str, str]: return imports + def _property_role(self, decorators: List[str]) -> Optional[str]: + """Classify a property accessor from its decorators: getter | setter | + deleter | None. Match on the decorator's final dotted segment (TOKEN), + not a bare substring, so `@property`/`@cached_property`/ + `@functools.cached_property`/`@x.setter`/`@x.deleter` are recognized but + a method whose decorator merely CONTAINS the text (e.g. + `@some_property_validator`, `@app.property_route`) is NOT misclassified.""" + for d in decorators: + leaf = d.lstrip('@').split('(')[0].rsplit('.', 1)[-1] + if leaf == 'setter': + return 'setter' + if leaf == 'deleter': + return 'deleter' + for d in decorators: + leaf = d.lstrip('@').split('(')[0].rsplit('.', 1)[-1] + if leaf in ('property', 'cached_property'): + return 'getter' + return None + + def _store_function(self, func_id: str, func_data: Dict) -> str: + """Insert a function unit, disambiguating any residual func_id collision. + + Property accessors are already disambiguated by ROLE upstream (in + process_function, via the qualified_name), so they never collide here. + The residual cases are TRUE same-qualified-name duplicates -- two nested + defs of the same name in one scope, or a lambda sharing a name with a + def. Keying solely on qualified_name would let the second overwrite the + first (a recall loss), so disambiguate DETERMINISTICALLY by source line + (`#L`), never by emission order -- the canonical-unit choice must + be stable across edits. The earlier-in-source unit (parsed first) keeps + the clean id. + """ + if func_id not in self.functions: + self.functions[func_id] = func_data + return func_id + line = func_data.get('start_line', 0) + unique_id = f"{func_id}#L{line}" + n = 2 + while unique_id in self.functions: + unique_id = f"{func_id}#L{line}.{n}" + n += 1 + self.functions[unique_id] = func_data + return unique_id + + def _count_function(self, func_data: Dict, *, is_method: bool) -> None: + """Update statistics for a single emitted function/method unit.""" + self.stats['total_functions'] += 1 + if is_method: + self.stats['total_methods'] += 1 + else: + self.stats['standalone_functions'] += 1 + if func_data['is_async']: + self.stats['async_functions'] += 1 + unit_type = func_data['unit_type'] + self.stats['by_type'][unit_type] = self.stats['by_type'].get(unit_type, 0) + 1 + + def _process_function_tree(self, node: ast.AST, file_path: Path, content: str, + class_name: Optional[str] = None) -> None: + """Register a function and recurse into its body. + + Handles defs nested inside a function body (which the top-level child + iteration never reaches) and classes nested inside a function. Each + nested def is emitted as its own unit; nested classes are delegated to + process_class so their methods are extracted too. + """ + func_id, func_data = self.process_function(node, str(file_path), content, class_name) + self._store_function(func_id, func_data) + self._count_function(func_data, is_method=class_name is not None) + + # Recurse into the body: a def nested inside this function's body is + # never reached by the top-level / direct-method walks. + for child in node.body: + if isinstance(child, (ast.FunctionDef, ast.AsyncFunctionDef)): + # A def nested inside a function is a standalone (non-method) + # function in its own right; do not attribute it to a class. + self._process_function_tree(child, file_path, content, class_name=None) + elif isinstance(child, ast.ClassDef): + self._process_class_tree(child, file_path, content, outer_qualifier=None) + + def _process_class_tree(self, node: ast.ClassDef, file_path: Path, content: str, + outer_qualifier: Optional[str] = None) -> None: + """Register a class, its methods, and any classes nested within it. + + `outer_qualifier` is the dotted prefix of any enclosing class + (e.g. 'Outer' so an inner class method is keyed 'Outer.Inner.deep'). + """ + class_id, class_data, method_nodes = self.process_class( + node, str(file_path), content, outer_qualifier=outer_qualifier + ) + self.classes[class_id] = class_data + self.stats['total_classes'] += 1 + + qualified_class = f"{outer_qualifier}.{node.name}" if outer_qualifier else node.name + + for method_node, method_class_name in method_nodes: + # Methods may themselves contain nested defs -- recurse. + self._process_function_tree(method_node, file_path, content, class_name=method_class_name) + + # Recurse into nested classes so their methods are extracted. + for item in node.body: + if isinstance(item, ast.ClassDef): + self._process_class_tree(item, file_path, content, outer_qualifier=qualified_class) + + def extract_assigned_lambdas(self, tree: ast.AST, file_path: Path, content: str) -> None: + """Emit a function unit for each module-level `name = lambda ...`. + + Only FunctionDef/AsyncFunctionDef/ClassDef are recognised as units, so a + named lambda (a common handler / dispatch idiom) is invisible and calls + to it cannot resolve. Capture module-level single-target name bindings to + a lambda as functions. + """ + relative_path = str(file_path.relative_to(self.repo_path)) + for node in ast.iter_child_nodes(tree): + if not isinstance(node, ast.Assign): + continue + if not isinstance(node.value, ast.Lambda): + continue + for target in node.targets: + if not isinstance(target, ast.Name): + continue + name = target.id + func_id = f"{relative_path}:{name}" + params = [a.arg for a in node.value.args.args] + if node.value.args.vararg: + params.append(f"*{node.value.args.vararg.arg}") + for a in node.value.args.kwonlyargs: + params.append(a.arg) + if node.value.args.kwarg: + params.append(f"**{node.value.args.kwarg.arg}") + func_data = { + 'name': name, + 'qualified_name': name, + 'file_path': relative_path, + 'start_line': node.lineno, + 'end_line': getattr(node, 'end_lineno', node.lineno), + 'code': self.get_source_segment(content, node), + 'class_name': None, + 'decorators': [], + 'is_async': False, + 'parameters': params, + 'docstring': None, + 'unit_type': self.classify_function(name, [], None, relative_path), + 'is_lambda': True, + } + self._store_function(func_id, func_data) + self._count_function(func_data, is_method=False) + def process_function(self, node: ast.FunctionDef, file_path: str, content: str, class_name: Optional[str] = None) -> Dict: """Process a function definition and extract metadata.""" func_name = node.name - qualified_name = f"{class_name}.{func_name}" if class_name else func_name - - # Generate unique ID relative_path = str(Path(file_path).relative_to(self.repo_path)) - func_id = f"{relative_path}:{qualified_name}" # Extract metadata decorators = self.extract_decorators(node) + + # @property getter, @x.setter and @x.deleter accessors all share the + # qualified name `Class.x`, which would collide into one func_id and let + # the setter overwrite the getter. Disambiguate by ROLE in the + # qualified_name (getter stays canonical `C.x`; setter -> `C.x.setter`, + # deleter -> `C.x.deleter`). This keeps func_id == path:qualified_name -- + # the invariant call_graph_builder relies on to reconstruct call targets + # -- and is order-independent (role is intrinsic, not emission position). + property_role = self._property_role(decorators) + qualified_name = f"{class_name}.{func_name}" if class_name else func_name + if property_role in ('setter', 'deleter'): + qualified_name = f"{qualified_name}.{property_role}" + + # Generate unique ID (after any role suffix) + func_id = f"{relative_path}:{qualified_name}" parameters = self.extract_parameters(node) docstring = self.get_docstring(node) code = self.get_source_segment(content, node) is_async = isinstance(node, ast.AsyncFunctionDef) unit_type = self.classify_function(func_name, decorators, class_name, relative_path) + # The captured `code` (get_source_segment) includes any decorator lines, + # so start_line must point at the first decorator, not the `def` line. + # Off-by-one for one decorator; off-by-N for stacked decorators. + start_line = node.lineno + if getattr(node, 'decorator_list', None): + start_line = min(start_line, min(d.lineno for d in node.decorator_list)) + func_data = { 'name': func_name, 'qualified_name': qualified_name, 'file_path': relative_path, - 'start_line': node.lineno, + 'start_line': start_line, 'end_line': getattr(node, 'end_lineno', node.lineno), 'code': code, 'class_name': class_name, @@ -299,13 +480,20 @@ def process_function(self, node: ast.FunctionDef, file_path: str, 'parameters': parameters, 'docstring': docstring[:500] if docstring else None, # Truncate long docstrings 'unit_type': unit_type, + 'property_role': property_role, } return func_id, func_data - def process_class(self, node: ast.ClassDef, file_path: str, content: str) -> Tuple[str, Dict, List[Tuple]]: - """Process a class definition and extract metadata.""" - class_name = node.name + def process_class(self, node: ast.ClassDef, file_path: str, content: str, + outer_qualifier: Optional[str] = None) -> Tuple[str, Dict, List[Tuple]]: + """Process a class definition and extract metadata. + + `outer_qualifier` is the dotted name of any enclosing class, so a class + nested inside another is keyed by its full path (e.g. 'Outer.Inner') and + its methods become 'Outer.Inner.method'. + """ + class_name = f"{outer_qualifier}.{node.name}" if outer_qualifier else node.name relative_path = str(Path(file_path).relative_to(self.repo_path)) class_id = f"{relative_path}:{class_name}" @@ -493,37 +681,17 @@ def process_file(self, file_path: Path) -> None: # Extract imports self.imports[relative_path] = self.extract_imports(tree, relative_path) - # Process top-level functions and classes + # Process top-level functions and classes. The tree helpers recurse so + # defs nested in function bodies and classes nested in classes/functions + # are also extracted (not just the direct children). for node in ast.iter_child_nodes(tree): if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): - func_id, func_data = self.process_function(node, file_path, content) - self.functions[func_id] = func_data - self.stats['total_functions'] += 1 - self.stats['standalone_functions'] += 1 - if func_data['is_async']: - self.stats['async_functions'] += 1 - - # Track by type - unit_type = func_data['unit_type'] - self.stats['by_type'][unit_type] = self.stats['by_type'].get(unit_type, 0) + 1 - + self._process_function_tree(node, file_path, content, class_name=None) elif isinstance(node, ast.ClassDef): - class_id, class_data, method_nodes = self.process_class(node, file_path, content) - self.classes[class_id] = class_data - self.stats['total_classes'] += 1 - - # Process methods - for method_node, class_name in method_nodes: - func_id, func_data = self.process_function(method_node, file_path, content, class_name) - self.functions[func_id] = func_data - self.stats['total_functions'] += 1 - self.stats['total_methods'] += 1 - if func_data['is_async']: - self.stats['async_functions'] += 1 - - # Track by type - unit_type = func_data['unit_type'] - self.stats['by_type'][unit_type] = self.stats['by_type'].get(unit_type, 0) + 1 + self._process_class_tree(node, file_path, content, outer_qualifier=None) + + # Module-level lambdas bound to a name (handler = lambda ...). + self.extract_assigned_lambdas(tree, file_path, content) # Extract module-level code module_result = self.extract_module_level_code(tree, content, file_path) diff --git a/libs/openant-core/tests/parsers/python/test_call_graph_builder_u8.py b/libs/openant-core/tests/parsers/python/test_call_graph_builder_u8.py new file mode 100644 index 00000000..50528df4 --- /dev/null +++ b/libs/openant-core/tests/parsers/python/test_call_graph_builder_u8.py @@ -0,0 +1,155 @@ +"""Regression tests for four confirmed call_graph_builder bugs (u8 bundle). + +Each test drives the REAL parser pipeline: write source file(s) to a temp +repo, run FunctionExtractor, build the call graph with CallGraphBuilder, then +assert the edge that should exist (or the false edge that must NOT exist). + +Bugs covered: + [10] builtin-filter leak — user fn named like a stdlib module (`time`) loses + its call edge because _is_builtin short-circuits before same-file lookup. + [12] dataflow alias — `fn = helper; fn()` loses the edge to `helper`. + [27] import over-resolution — `import alpha; alpha.run()` spuriously resolves + to a free function named `alpha` (matched on module tail, not call name). + [46] inherited-self dispatch — `self.shared()` calling an inherited base + method isn't resolved. +""" + +import sys +import tempfile +from pathlib import Path + +_CORE_ROOT = Path(__file__).resolve().parents[3] +sys.path.insert(0, str(_CORE_ROOT)) + +from parsers.python.function_extractor import FunctionExtractor +from parsers.python.call_graph_builder import CallGraphBuilder + + +def _build(files: dict) -> CallGraphBuilder: + """Write {relpath: source} into a temp repo, run the real pipeline.""" + d = tempfile.mkdtemp() + for rel, src in files.items(): + p = Path(d) / rel + p.parent.mkdir(parents=True, exist_ok=True) + p.write_text(src) + extractor_output = FunctionExtractor(d).extract_all() + builder = CallGraphBuilder(extractor_output) + builder.build_call_graph() + return builder + + +# ---------------------------------------------------------------- [10] +def test_user_fn_named_like_stdlib_module_resolves(): + """A user fn named `time` (a stdlib module name) must still get its edge.""" + builder = _build({ + "m.py": "def time():\n return 1\n\ndef main():\n time()\n", + }) + caller = "m.py:main" + callee = "m.py:time" + assert callee in builder.call_graph[caller], ( + f"Edge {caller} -> {callee} missing (builtin filter leaked).\n" + f" Got: {builder.call_graph[caller]}" + ) + + +def test_genuine_builtin_call_not_linked_to_unrelated_samename_fn(): + """SCOPE guard: a genuine `time()` builtin call in file b.py must NOT link + to a user `def time()` in an UNRELATED file a.py (no cross-file leak).""" + builder = _build({ + "a.py": "def time():\n return 1\n", + "b.py": "def consume():\n return time()\n", + }) + caller = "b.py:consume" + bad = "a.py:time" + assert bad not in builder.call_graph.get(caller, []), ( + f"False cross-file edge {caller} -> {bad} (builtin call wrongly linked).\n" + f" Got: {builder.call_graph.get(caller, [])}" + ) + + +# ---------------------------------------------------------------- [12] +def test_function_value_alias_resolves(): + """`fn = helper; fn()` must produce main -> helper.""" + builder = _build({ + "m.py": "def helper():\n return 1\n\ndef main():\n fn = helper\n fn()\n", + }) + caller = "m.py:main" + callee = "m.py:helper" + assert callee in builder.call_graph[caller], ( + f"Edge {caller} -> {callee} missing (alias not followed).\n" + f" Got: {builder.call_graph[caller]}" + ) + + +# ---------------------------------------------------- [12] single-assign GUARD +def test_alias_reassignment_not_resolved(): + """GUARD (reassignment): `fn = a; fn = b; fn()` is last-write-wins, so the + binding is a "maybe". The guard must NOT assert a definite edge to EITHER + target — pinned behavior: no alias edge at all (fall through to no edge).""" + builder = _build({ + "m.py": ( + "def a():\n return 1\n\n" + "def b():\n return 2\n\n" + "def main():\n fn = a\n fn = b\n fn()\n" + ), + }) + caller = "m.py:main" + edges = builder.call_graph.get(caller, []) + assert "m.py:a" not in edges and "m.py:b" not in edges, ( + f"reassigned alias asserted a maybe-binding as definite: {edges}" + ) + + +def test_alias_conditional_binding_not_resolved(): + """GUARD (conditional): an alias bound inside if/else is not unconditional, + so it must NOT be resolved (no edge to either branch's target).""" + builder = _build({ + "m.py": ( + "def a():\n return 1\n\n" + "def b():\n return 2\n\n" + "def main(cond):\n" + " if cond:\n fn = a\n else:\n fn = b\n" + " fn()\n" + ), + }) + caller = "m.py:main" + edges = builder.call_graph.get(caller, []) + assert "m.py:a" not in edges and "m.py:b" not in edges, ( + f"conditional alias resolved despite non-unconditional binding: {edges}" + ) + + +# ---------------------------------------------------------------- [27] +def test_import_module_call_does_not_false_link_to_samename_free_fn(): + """`import alpha; alpha.run()` must NOT link caller -> free fn `alpha`.""" + builder = _build({ + "m.py": "import alpha\n\ndef alpha():\n return 1\n\ndef caller():\n return alpha.run()\n", + }) + caller = "m.py:caller" + bad = "m.py:alpha" + assert bad not in builder.call_graph.get(caller, []), ( + f"False edge {caller} -> {bad} (matched module tail, not call name).\n" + f" Got: {builder.call_graph.get(caller, [])}" + ) + + +# ---------------------------------------------------------------- [46] +def test_inherited_self_method_resolves(): + """self.shared() inherited from a base class must resolve to Base.shared.""" + builder = _build({ + "m.py": ( + "class Base:\n" + " def shared(self):\n" + " return 1\n" + "\n" + "class Child(Base):\n" + " def run(self):\n" + " return self.shared()\n" + ), + }) + caller = "m.py:Child.run" + callee = "m.py:Base.shared" + assert callee in builder.call_graph[caller], ( + f"Edge {caller} -> {callee} missing (inherited self-call not resolved).\n" + f" Got: {builder.call_graph[caller]}" + ) diff --git a/libs/openant-core/tests/parsers/python/test_function_extractor_u9.py b/libs/openant-core/tests/parsers/python/test_function_extractor_u9.py new file mode 100644 index 00000000..4206a050 --- /dev/null +++ b/libs/openant-core/tests/parsers/python/test_function_extractor_u9.py @@ -0,0 +1,231 @@ +"""Regression tests for FunctionExtractor (u9 blind bug batch, 2026-06-08). + +Six confirmed bugs in parsers/python/function_extractor.py: + + BUG 9 nested-def inside a function body is never extracted as a unit. + BUG 21 module-level `handler = lambda ...` is never extracted as a unit. + BUG 23 decorated function start_line points at `def` line, not the decorator + (off-by-one; off-by-N for stacked decorators) while `code` includes them. + BUG 34 `@x.setter` is classified 'method' not 'property', AND getter/setter + share a func_id so the setter silently overwrites the getter. + BUG 45 a method of a class nested inside another class is never extracted. + BUG 48 unit_type='test' for any file whose path merely CONTAINS the substring + 'test' (e.g. latest.py), instead of a path-component check. + +Each test writes a minimal source file under a tmp dir, runs the extractor, +and asserts on the returned dict (the parser operates on files, so we model +the real entry path: FunctionExtractor(repo).extract_all([rel_path])). +""" + +import sys +from pathlib import Path + +import pytest + +_CORE_ROOT = Path(__file__).resolve().parents[3] +sys.path.insert(0, str(_CORE_ROOT)) + +from parsers.python.function_extractor import FunctionExtractor + + +def _extract(tmp_path: Path, filename: str, source: str) -> dict: + """Write `source` to tmp_path/filename, extract, return the full result dict.""" + f = tmp_path / filename + f.parent.mkdir(parents=True, exist_ok=True) + f.write_text(source, encoding="utf-8") + extractor = FunctionExtractor(str(tmp_path)) + return extractor.extract_all([filename]) + + +# --- BUG 9: nested def inside a function body --------------------------------- + +def test_bug9_nested_def_is_extracted(tmp_path): + source = "def outer():\n def inner():\n return 1\n return inner\n" + result = _extract(tmp_path, "m.py", source) + func_names = {fd["name"] for fd in result["functions"].values()} + assert "inner" in func_names, ( + f"nested def 'inner' not extracted. functions={list(result['functions'])}" + ) + + +# --- BUG 21: module-level lambda assigned to a name --------------------------- + +def test_bug21_module_level_lambda_is_extracted(tmp_path): + source = "handler = lambda req: req.upper()\n" + result = _extract(tmp_path, "m.py", source) + func_names = {fd["name"] for fd in result["functions"].values()} + assert "handler" in func_names, ( + f"module-level lambda 'handler' not extracted. functions={list(result['functions'])}" + ) + + +# --- BUG 23: decorated start_line includes the decorator line ----------------- + +def test_bug23_decorated_start_line_includes_decorator(tmp_path): + source = "class Foo:\n @staticmethod\n def bar():\n pass\n" + result = _extract(tmp_path, "m.py", source) + fid = "m.py:Foo.bar" + assert fid in result["functions"], f"missing {fid}: {list(result['functions'])}" + # @staticmethod is on line 2; def bar is on line 3. start_line must be 2. + assert result["functions"][fid]["start_line"] == 2, ( + f"decorated start_line off-by-one: got {result['functions'][fid]['start_line']}, expected 2" + ) + + +def test_bug23_stacked_decorators_start_line(tmp_path): + source = ( + "def d1(f):\n return f\n\n" + "def d2(f):\n return f\n\n" + "@d1\n@d2\ndef target():\n pass\n" + ) + result = _extract(tmp_path, "m.py", source) + fid = "m.py:target" + assert fid in result["functions"], f"missing {fid}: {list(result['functions'])}" + # @d1 on line 7, @d2 on line 8, def target on line 9. start_line must be 7. + assert result["functions"][fid]["start_line"] == 7, ( + f"stacked-decorator start_line wrong: got {result['functions'][fid]['start_line']}, expected 7" + ) + + +# --- BUG 34: property getter/setter ------------------------------------------ + +def test_bug34_property_setter_classified_and_not_collapsed(tmp_path): + source = ( + "class C:\n" + " @property\n" + " def x(self):\n" + " return self._x\n" + "\n" + " @x.setter\n" + " def x(self, value):\n" + " self._x = value\n" + ) + result = _extract(tmp_path, "m.py", source) + funcs = result["functions"] + # Getter keeps the CANONICAL id; the setter is disambiguated by ROLE in the + # qualified_name (order-independent) -- so both survive AND func_id stays + # path:qualified_name (the call_graph_builder reconstruction invariant). + assert "m.py:C.x" in funcs, f"getter lost its canonical id; ids={list(funcs)}" + assert "m.py:C.x.setter" in funcs, f"setter not stored under role id; ids={list(funcs)}" + getter, setter = funcs["m.py:C.x"], funcs["m.py:C.x.setter"] + # Invariant: func_id == path:qualified_name for BOTH units. + assert getter["qualified_name"] == "C.x", getter["qualified_name"] + assert setter["qualified_name"] == "C.x.setter", setter["qualified_name"] + # Both classified 'property'; roles distinguished. + assert getter["unit_type"] == "property" and setter["unit_type"] == "property" + assert getter["property_role"] == "getter", getter["property_role"] + assert setter["property_role"] == "setter", setter["property_role"] + + +# --- BUG 45: nested class members --------------------------------------------- + +def test_bug45_nested_class_method_is_extracted(tmp_path): + source = ( + "class Outer:\n" + " class Inner:\n" + " def deep(self):\n" + " return 1\n" + " def shallow(self):\n" + " return 2\n" + ) + result = _extract(tmp_path, "m.py", source) + deep_units = [fd for fd in result["functions"].values() if fd["name"] == "deep"] + assert deep_units, ( + f"nested-class method 'deep' not extracted. functions={list(result['functions'])}" + ) + + +# --- BUG 48: test classification by substring vs path component --------------- + +def test_bug48_substring_test_not_classified_as_test(tmp_path): + source = "def compute():\n return 1\n" + result = _extract(tmp_path, "latest.py", source) + fid = "latest.py:compute" + assert fid in result["functions"], f"missing {fid}: {list(result['functions'])}" + assert result["functions"][fid]["unit_type"] == "function", ( + f"'latest.py' misclassified as test: got {result['functions'][fid]['unit_type']}, expected 'function'" + ) + + +def test_bug48_real_test_file_still_classified_test(tmp_path): + """Guard: a genuine test file path component must STILL classify as test.""" + source = "def helper():\n return 1\n" + result = _extract(tmp_path, "tests/test_thing.py", source) + fid = "tests/test_thing.py:helper" + assert fid in result["functions"], f"missing {fid}: {list(result['functions'])}" + assert result["functions"][fid]["unit_type"] == "test", ( + f"genuine test file not classified test: got {result['functions'][fid]['unit_type']}" + ) + + +# --- BUG 34 re-verification (2026-06-09): property-classification edge cases ----- +def test_bug34_cached_property_getter_classified_property(tmp_path): + source = ( + "from functools import cached_property\n" + "class C:\n" + " @cached_property\n" + " def x(self):\n" + " return self._x\n" + ) + funcs = _extract(tmp_path, "m.py", source)["functions"] + assert "m.py:C.x" in funcs, list(funcs) + assert funcs["m.py:C.x"]["property_role"] == "getter" + assert funcs["m.py:C.x"]["unit_type"] == "property", ( + f"cached_property getter misclassified: {funcs['m.py:C.x']['unit_type']}" + ) + + +def test_bug34_functools_cached_property_classified_property(tmp_path): + source = ( + "import functools\n" + "class C:\n" + " @functools.cached_property\n" + " def x(self):\n" + " return self._x\n" + ) + funcs = _extract(tmp_path, "m.py", source)["functions"] + assert funcs["m.py:C.x"]["unit_type"] == "property" + + +def test_bug34_orphan_setter_not_lost(tmp_path): + source = "class C:\n @x.setter\n def x(self, v):\n self._x = v\n" + funcs = _extract(tmp_path, "m.py", source)["functions"] + assert "m.py:C.x.setter" in funcs, list(funcs) + assert funcs["m.py:C.x.setter"]["property_role"] == "setter" + assert funcs["m.py:C.x.setter"]["unit_type"] == "property" + + +def test_bug34_isolated_deleter(tmp_path): + source = "class C:\n @x.deleter\n def x(self):\n del self._x\n" + funcs = _extract(tmp_path, "m.py", source)["functions"] + assert "m.py:C.x.deleter" in funcs, list(funcs) + assert funcs["m.py:C.x.deleter"]["property_role"] == "deleter" + + +def test_bug34_two_classes_same_property_no_collision(tmp_path): + source = ( + "class C:\n @property\n def x(self):\n return 1\n" + "class D:\n @property\n def x(self):\n return 2\n" + ) + funcs = _extract(tmp_path, "m.py", source)["functions"] + assert "m.py:C.x" in funcs and "m.py:D.x" in funcs, list(funcs) + + +def test_bug34_non_property_decorator_not_misclassified(tmp_path): + """Narrowing guard: a method whose decorator merely CONTAINS 'property' as + a substring (not the property protocol) must NOT classify as 'property'.""" + source = ( + "class C:\n" + " @app.property_route\n" + " def a(self):\n" + " return 1\n" + " @some_property_validator\n" + " def b(self):\n" + " return 2\n" + ) + funcs = _extract(tmp_path, "m.py", source)["functions"] + a = next(fd for fd in funcs.values() if fd["name"] == "a") + b = next(fd for fd in funcs.values() if fd["name"] == "b") + assert a["unit_type"] != "property", f"@app.property_route mis-classified: {a['unit_type']}" + assert b["unit_type"] != "property", f"@some_property_validator mis-classified: {b['unit_type']}" + assert a["property_role"] is None and b["property_role"] is None