diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c65beb5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,25 @@ +# Python bytecode +__pycache__/ +*.py[cod] +*.pyo + +# Distribution / packaging +*.egg-info/ +dist/ +build/ +*.egg + +# Virtual environments +.venv/ +venv/ +env/ + +# Pytest +.pytest_cache/ +.coverage +htmlcov/ + +# Editor artifacts +.idea/ +.vscode/ +*.swp diff --git a/kyb_graph_analytics/__init__.py b/kyb_graph_analytics/__init__.py new file mode 100644 index 0000000..8fce43a --- /dev/null +++ b/kyb_graph_analytics/__init__.py @@ -0,0 +1,28 @@ +""" +kyb_graph_analytics +=================== +Graph-based analytics system for detecting shell companies and hidden +ownership structures in KYB/AML investigations. + +Modules +------- +graph_builder - Build directed ownership/relationship graphs from raw data. +centrality - PageRank and Betweenness centrality measures. +community_detection - Louvain community detection. +entity_resolution - Fuzzy entity matching and deduplication. +shell_company_detector - Composite risk scoring combining all analyses. +""" + +from .graph_builder import GraphBuilder +from .centrality import CentralityAnalyzer +from .community_detection import CommunityDetector +from .entity_resolution import EntityResolver +from .shell_company_detector import ShellCompanyDetector + +__all__ = [ + "GraphBuilder", + "CentralityAnalyzer", + "CommunityDetector", + "EntityResolver", + "ShellCompanyDetector", +] diff --git a/kyb_graph_analytics/centrality.py b/kyb_graph_analytics/centrality.py new file mode 100644 index 0000000..7031f05 --- /dev/null +++ b/kyb_graph_analytics/centrality.py @@ -0,0 +1,210 @@ +""" +centrality.py +------------- +Compute centrality measures on a KYB/AML ownership graph. + +PageRank + Identifies the most *influential* entities in an ownership network. + High-PageRank nodes are likely ultimate beneficial owners (UBOs) or + pivotal holding companies. 
class CentralityAnalyzer:
    """Compute and expose centrality metrics for an ownership graph.

    Parameters
    ----------
    graph:
        A ``networkx.DiGraph`` (or ``Graph``) representing the ownership
        network produced by :class:`~kyb_graph_analytics.GraphBuilder`.
    """

    def __init__(self, graph: nx.Graph) -> None:
        self.graph = graph

    # ------------------------------------------------------------------
    # PageRank
    # ------------------------------------------------------------------

    def pagerank(
        self,
        alpha: float = 0.85,
        weight: Optional[str] = "weight",
        max_iter: int = 100,
        tol: float = 1.0e-6,
    ) -> Dict[str, float]:
        """Compute PageRank for all nodes.

        Parameters
        ----------
        alpha:
            Damping factor (default 0.85).
        weight:
            Edge attribute to use as weight.  Pass ``None`` to treat all
            edges equally.
        max_iter:
            Maximum number of iterations.
        tol:
            Convergence tolerance.

        Returns
        -------
        dict mapping node ID → PageRank score.  An empty graph yields an
        empty dict without calling into networkx.
        """
        if self.graph.number_of_nodes() == 0:
            return {}
        return nx.pagerank(
            self.graph,
            alpha=alpha,
            weight=weight,
            max_iter=max_iter,
            tol=tol,
        )

    # ------------------------------------------------------------------
    # Betweenness Centrality
    # ------------------------------------------------------------------

    def betweenness_centrality(
        self,
        normalized: bool = True,
        weight: Optional[str] = None,
    ) -> Dict[str, float]:
        """Compute betweenness centrality for all nodes.

        Parameters
        ----------
        normalized:
            Forwarded to ``networkx.betweenness_centrality``; when *True*
            (default) scores are normalised.
        weight:
            Edge attribute interpreted as *distance* (lower weight = shorter
            path).  Pass ``None`` to count hops only.

        Returns
        -------
        dict mapping node ID → betweenness score.  An empty graph yields
        an empty dict.
        """
        if self.graph.number_of_nodes() == 0:
            return {}
        return nx.betweenness_centrality(
            self.graph,
            normalized=normalized,
            weight=weight,
        )

    # ------------------------------------------------------------------
    # Degree Centrality
    # ------------------------------------------------------------------

    def in_degree_centrality(self) -> Dict[str, float]:
        """Normalised in-degree centrality.

        For an undirected graph there is no in/out distinction, so plain
        degree centrality is returned instead.

        Returns
        -------
        dict mapping node ID → normalised in-degree score.
        """
        if self.graph.number_of_nodes() == 0:
            return {}
        if isinstance(self.graph, nx.DiGraph):
            return nx.in_degree_centrality(self.graph)
        return nx.degree_centrality(self.graph)

    def out_degree_centrality(self) -> Dict[str, float]:
        """Normalised out-degree centrality.

        For an undirected graph plain degree centrality is returned.

        Returns
        -------
        dict mapping node ID → normalised out-degree score.
        """
        if self.graph.number_of_nodes() == 0:
            return {}
        if isinstance(self.graph, nx.DiGraph):
            return nx.out_degree_centrality(self.graph)
        return nx.degree_centrality(self.graph)

    # ------------------------------------------------------------------
    # Combined report
    # ------------------------------------------------------------------

    def all_centrality_scores(
        self,
        pagerank_alpha: float = 0.85,
    ) -> Dict[str, Dict[str, float]]:
        """Return a combined dict of all centrality measures per node.

        Parameters
        ----------
        pagerank_alpha:
            Damping factor forwarded to :meth:`pagerank`.

        Returns
        -------
        dict mapping node ID → ``{"pagerank": …, "betweenness": …,
        "in_degree": …, "out_degree": …}``.  A measure missing for a node
        is reported as ``0.0``.
        """
        pr = self.pagerank(alpha=pagerank_alpha)
        bc = self.betweenness_centrality()
        in_deg = self.in_degree_centrality()
        out_deg = self.out_degree_centrality()

        return {
            node: {
                "pagerank": pr.get(node, 0.0),
                "betweenness": bc.get(node, 0.0),
                "in_degree": in_deg.get(node, 0.0),
                "out_degree": out_deg.get(node, 0.0),
            }
            for node in self.graph.nodes()
        }

    def top_nodes(
        self,
        measure: str = "pagerank",
        n: int = 10,
        pagerank_alpha: float = 0.85,
    ) -> list:
        """Return the top-*n* nodes ranked by *measure*.

        Only the requested measure is computed, and *measure* is validated
        before any graph computation starts.

        Parameters
        ----------
        measure:
            One of ``"pagerank"``, ``"betweenness"``, ``"in_degree"``,
            ``"out_degree"``.
        n:
            Number of top nodes to return.
        pagerank_alpha:
            Damping factor forwarded to :meth:`pagerank`.

        Returns
        -------
        list of ``(node_id, score)`` tuples, sorted descending.

        Raises
        ------
        ValueError
            If *measure* is not one of the supported names.
        """
        # Lazy dispatch table: each entry is a zero-argument callable so that
        # the expensive measures (betweenness in particular) are evaluated
        # only when they are actually requested.
        producers = {
            "pagerank": lambda: self.pagerank(alpha=pagerank_alpha),
            "betweenness": lambda: self.betweenness_centrality(),
            "in_degree": lambda: self.in_degree_centrality(),
            "out_degree": lambda: self.out_degree_centrality(),
        }
        if measure not in producers:
            raise ValueError(
                f"Unknown measure '{measure}'. Choose from: "
                + ", ".join(producers)
            )
        scores = producers[measure]()
        return sorted(scores.items(), key=lambda item: item[1], reverse=True)[:n]
class CommunityDetector:
    """Detect ownership communities using the Louvain algorithm.

    Parameters
    ----------
    graph:
        A NetworkX graph (directed or undirected).  Directed graphs are
        automatically converted to undirected for community detection.
    random_state:
        Seed for the Louvain random-number generator.  Set to an integer
        for reproducible results.
    """

    def __init__(
        self,
        graph: nx.Graph,
        random_state: Optional[int] = 42,
    ) -> None:
        self.graph = graph
        self.random_state = random_state
        # Result of the most recent detect() call; None until it has run.
        self._partition: Optional[Dict[str, int]] = None

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _undirected(self) -> nx.Graph:
        """Return an undirected view of the ownership graph for Louvain.

        A ``DiGraph`` is converted with ``to_undirected()`` (a new graph in
        which reciprocal edges collapse into one).  A graph that is already
        undirected is returned as-is, *not* copied.
        """
        if isinstance(self.graph, nx.DiGraph):
            return self.graph.to_undirected()
        return self.graph

    # ------------------------------------------------------------------
    # Partition
    # ------------------------------------------------------------------

    def detect(self, resolution: float = 1.0) -> Dict[str, int]:
        """Run Louvain community detection and return the partition.

        The result is also stored and exposed through :attr:`partition`
        and :meth:`community_of`.

        Parameters
        ----------
        resolution:
            Forwarded to ``community_louvain.best_partition``; controls
            community granularity.  Default is ``1.0`` (standard Louvain).

        Returns
        -------
        dict mapping node ID → integer community label (empty for an
        empty graph).
        """
        undirected = self._undirected()
        if undirected.number_of_nodes() == 0:
            self._partition = {}
            return self._partition

        self._partition = community_louvain.best_partition(
            undirected,
            weight="weight",
            resolution=resolution,
            random_state=self.random_state,
        )
        return self._partition

    @property
    def partition(self) -> Optional[Dict[str, int]]:
        """The last computed partition, or *None* if :meth:`detect` has not
        been called yet."""
        return self._partition

    # ------------------------------------------------------------------
    # Community grouping
    # ------------------------------------------------------------------

    def communities(self, resolution: float = 1.0) -> Dict[int, List[str]]:
        """Return detected communities as a dict of label → member list.

        :meth:`detect` is re-run on every call (the stored partition is
        refreshed, not reused).

        Parameters
        ----------
        resolution:
            Forwarded to :meth:`detect`.

        Returns
        -------
        dict mapping community label → list of node IDs in that community.
        """
        partition = self.detect(resolution=resolution)
        groups: Dict[int, List[str]] = {}
        for node, label in partition.items():
            groups.setdefault(label, []).append(node)
        return groups

    def community_of(self, node_id: str) -> Optional[int]:
        """Return the community label for *node_id*.

        Returns ``None`` if :meth:`detect` has not been called or the node
        does not appear in the partition.
        """
        if self._partition is None:
            return None
        return self._partition.get(node_id)

    # ------------------------------------------------------------------
    # Modularity
    # ------------------------------------------------------------------

    def modularity(self, resolution: float = 1.0) -> float:
        """Compute the modularity score of a freshly detected partition.

        :meth:`detect` is re-run with *resolution* before scoring.

        Parameters
        ----------
        resolution:
            Forwarded to :meth:`detect`.

        Returns
        -------
        float modularity score.  ``0.0`` is returned for an empty graph and
        for a graph whose total edge weight is zero, where modularity is
        undefined.
        """
        partition = self.detect(resolution=resolution)
        if not partition:
            return 0.0
        undirected = self._undirected()
        # python-louvain raises ValueError for a graph "without link"
        # (total edge weight 0); report a neutral score instead of crashing
        # on graphs that contain only isolated entities.
        if undirected.size(weight="weight") == 0:
            return 0.0
        return community_louvain.modularity(partition, undirected, weight="weight")

    # ------------------------------------------------------------------
    # Suspicious community indicators
    # ------------------------------------------------------------------

    def suspicious_communities(
        self,
        min_size: int = 2,
        max_size: int = 50,
        resolution: float = 1.0,
    ) -> List[Dict]:
        """Identify communities that exhibit shell-company warning signs.

        A community is flagged when its size lies in
        [*min_size*, *max_size*], it has more than one member, and it has
        **no** ``"individual"`` member while containing at least one
        ``"company"``:

        - every member is a ``"company"`` → "only company nodes" reason;
        - companies mixed with other / unknown types → "no individual
          beneficial owners" reason.

        Communities that contain an individual are never flagged here.
        Nodes without an ``entity_type`` attribute count as ``"unknown"``.

        Parameters
        ----------
        min_size:
            Minimum community size to consider.
        max_size:
            Maximum community size to consider.
        resolution:
            Forwarded to :meth:`detect`.

        Returns
        -------
        list of dicts, each with keys:
        ``"community_id"``, ``"members"``, ``"size"``,
        ``"has_individuals"``, ``"has_companies"``, ``"reason"``.
        """
        suspicious: List[Dict] = []

        for label, members in self.communities(resolution=resolution).items():
            size = len(members)
            if size < min_size or size > max_size:
                continue

            types = [
                self.graph.nodes[member].get("entity_type", "unknown")
                for member in members
            ]
            has_individuals = "individual" in types
            has_companies = "company" in types
            all_companies = all(t == "company" for t in types)

            reasons = []
            if all_companies and size > 1:
                reasons.append(
                    "Community contains only company nodes with no traceable UBO"
                )
            elif has_companies and not has_individuals and size > 1:
                # Mixed companies/unknown-type entities but no individual UBO
                reasons.append(
                    "No individual beneficial owners in the ownership community"
                )

            if reasons:
                suspicious.append(
                    {
                        "community_id": label,
                        "members": members,
                        "size": size,
                        "has_individuals": has_individuals,
                        "has_companies": has_companies,
                        "reason": "; ".join(reasons),
                    }
                )

        return suspicious
+""" + +from __future__ import annotations + +import unicodedata +import re +from typing import Dict, List, Optional, Set, Tuple + +import networkx as nx + + +# --------------------------------------------------------------------------- +# String normalisation helpers +# --------------------------------------------------------------------------- + +def _normalise(text: str) -> str: + """Lower-case, strip accents, collapse whitespace.""" + # Strip unicode accents + nfkd = unicodedata.normalize("NFKD", text) + ascii_text = nfkd.encode("ascii", "ignore").decode("ascii") + # Lower-case and collapse non-alphanumeric runs to single space + cleaned = re.sub(r"[^a-z0-9]+", " ", ascii_text.lower()).strip() + return cleaned + + +def _token_sort_ratio(a: str, b: str) -> float: + """Compute a token-sort similarity ratio between two strings. + + Tokens in both strings are sorted alphabetically and joined before + comparison, making the metric order-invariant. Returns a float in + [0.0, 1.0]. + """ + tokens_a = sorted(_normalise(a).split()) + tokens_b = sorted(_normalise(b).split()) + joined_a = " ".join(tokens_a) + joined_b = " ".join(tokens_b) + + if not joined_a and not joined_b: + return 1.0 + if not joined_a or not joined_b: + return 0.0 + + # Longest common subsequence length as similarity proxy + lcs_len = _lcs_length(joined_a, joined_b) + return 2 * lcs_len / (len(joined_a) + len(joined_b)) + + +def _lcs_length(s: str, t: str) -> int: + """Iterative LCS length computation (space-optimised).""" + m, n = len(s), len(t) + if m > n: + s, t, m, n = t, s, n, m + # Use two rows + prev = [0] * (m + 1) + curr = [0] * (m + 1) + for j in range(1, n + 1): + for i in range(1, m + 1): + if t[j - 1] == s[i - 1]: + curr[i] = prev[i - 1] + 1 + else: + curr[i] = max(curr[i - 1], prev[i]) + prev, curr = curr, [0] * (m + 1) + return prev[m] + + +# --------------------------------------------------------------------------- +# EntityResolver +# 
--------------------------------------------------------------------------- + +class EntityResolver: + """Detect and optionally merge duplicate entity nodes in a graph. + + Parameters + ---------- + graph: + A NetworkX graph whose nodes may carry a ``"name"`` attribute used + for similarity comparison. + threshold: + Minimum similarity score (0.0–1.0) to consider two entities as + potential duplicates. Default is ``0.85``. + name_attr: + Node attribute to use as the canonical name for comparison. + Defaults to ``"name"``; falls back to the node ID when absent. + """ + + def __init__( + self, + graph: nx.Graph, + threshold: float = 0.85, + name_attr: str = "name", + ) -> None: + if not 0.0 <= threshold <= 1.0: + raise ValueError("threshold must be between 0.0 and 1.0") + self.graph = graph + self.threshold = threshold + self.name_attr = name_attr + + # ------------------------------------------------------------------ + # Label extraction + # ------------------------------------------------------------------ + + def _label(self, node_id: str) -> str: + """Return the comparison label for a node.""" + return str(self.graph.nodes[node_id].get(self.name_attr, node_id)) + + # ------------------------------------------------------------------ + # Duplicate candidate detection + # ------------------------------------------------------------------ + + def find_duplicates(self) -> List[Tuple[str, str, float]]: + """Return all pairs of nodes with similarity >= threshold. + + Returns + ------- + list of ``(node_a, node_b, similarity_score)`` tuples, sorted by + descending score. 
+ """ + nodes = list(self.graph.nodes()) + candidates: List[Tuple[str, str, float]] = [] + + for i, a in enumerate(nodes): + for b in nodes[i + 1 :]: + score = _token_sort_ratio(self._label(a), self._label(b)) + if score >= self.threshold: + candidates.append((a, b, score)) + + return sorted(candidates, key=lambda x: x[2], reverse=True) + + def duplicate_groups(self) -> List[List[str]]: + """Return groups of mutually similar entities using union-find. + + Returns + ------- + list of groups, where each group is a list of node IDs that are + considered the same real-world entity. + """ + pairs = self.find_duplicates() + parent: Dict[str, str] = {n: n for n in self.graph.nodes()} + + def find(x: str) -> str: + while parent[x] != x: + parent[x] = parent[parent[x]] + x = parent[x] + return x + + def union(x: str, y: str) -> None: + parent[find(x)] = find(y) + + for a, b, _ in pairs: + union(a, b) + + groups: Dict[str, List[str]] = {} + for node in self.graph.nodes(): + root = find(node) + groups.setdefault(root, []).append(node) + + return [g for g in groups.values() if len(g) > 1] + + # ------------------------------------------------------------------ + # Graph merging + # ------------------------------------------------------------------ + + def merge_duplicates( + self, + groups: Optional[List[List[str]]] = None, + ) -> nx.Graph: + """Return a new graph where each group of duplicates is merged into + a single canonical node. + + The canonical node for each group is the one with the longest name + attribute (or the first alphabetically if lengths are equal). All + edges from/to merged nodes are redirected to the canonical node. + Self-loops introduced by merging are removed. + + Parameters + ---------- + groups: + Explicit list of duplicate groups. When *None* (default), + :meth:`duplicate_groups` is called automatically. + + Returns + ------- + A new NetworkX graph (same type as ``self.graph``) with duplicates + merged. 
+ """ + if groups is None: + groups = self.duplicate_groups() + + # Build a mapping: old_node → canonical_node + merge_map: Dict[str, str] = {} + for group in groups: + canonical = max(group, key=lambda n: len(self._label(n))) + for node in group: + merge_map[node] = canonical + + # Relabel nodes in a copy of the graph + merged = nx.relabel_nodes(self.graph, merge_map, copy=True) + # Remove self-loops introduced by merging + merged.remove_edges_from(list(nx.selfloop_edges(merged))) + return merged + + # ------------------------------------------------------------------ + # Convenience report + # ------------------------------------------------------------------ + + def resolution_report(self) -> List[Dict]: + """Return a human-readable list of detected duplicate groups. + + Returns + ------- + list of dicts with keys: + ``"canonical"``, ``"aliases"``, ``"similarity_pairs"``. + """ + groups = self.duplicate_groups() + pairs = { + frozenset((a, b)): score + for a, b, score in self.find_duplicates() + } + report = [] + for group in groups: + canonical = max(group, key=lambda n: len(self._label(n))) + aliases = [n for n in group if n != canonical] + sim_pairs = [ + {"a": a, "b": b, "score": pairs[frozenset((a, b))]} + for a in group + for b in group + if a < b and frozenset((a, b)) in pairs + ] + report.append( + { + "canonical": canonical, + "aliases": aliases, + "similarity_pairs": sim_pairs, + } + ) + return report diff --git a/kyb_graph_analytics/graph_builder.py b/kyb_graph_analytics/graph_builder.py new file mode 100644 index 0000000..a145255 --- /dev/null +++ b/kyb_graph_analytics/graph_builder.py @@ -0,0 +1,263 @@ +""" +graph_builder.py +---------------- +Build directed, weighted ownership / relationship graphs from structured +entity data for KYB/AML investigations. + +Entities represent companies, individuals, accounts, or other nodes. +Edges represent ownership, control, or transactional relationships. 
class GraphBuilder:
    """Construct and manage an ownership/relationship graph.

    Parameters
    ----------
    directed:
        When *True* (default) the graph is a ``DiGraph``; otherwise it is
        an undirected ``Graph``.  Ownership relationships are inherently
        directional, so ``directed=True`` is strongly recommended.
    """

    def __init__(self, directed: bool = True) -> None:
        self._directed = directed
        self.graph: nx.DiGraph | nx.Graph = (
            nx.DiGraph() if directed else nx.Graph()
        )

    # ------------------------------------------------------------------
    # Node management
    # ------------------------------------------------------------------

    def add_entity(
        self,
        entity_id: str,
        entity_type: str = "unknown",
        **attributes: Any,
    ) -> None:
        """Add a single entity node to the graph.

        Parameters
        ----------
        entity_id:
            Unique identifier for the entity (e.g. company registration
            number, person ID).
        entity_type:
            Semantic type label: ``"company"``, ``"individual"``,
            ``"account"``, etc.
        **attributes:
            Arbitrary extra node attributes (name, jurisdiction, …).
        """
        self.graph.add_node(
            entity_id,
            entity_type=entity_type,
            **attributes,
        )

    def add_entities(self, entities: Iterable[Dict[str, Any]]) -> None:
        """Bulk-add entities from an iterable of attribute dicts.

        Each dict must contain an ``"id"`` key; an optional
        ``"entity_type"`` key is recognised as a special attribute.  The
        input dicts are not modified.

        Parameters
        ----------
        entities:
            Iterable of dicts, each with at minimum ``{"id": ""}``.

        Raises
        ------
        KeyError
            If a dict has no ``"id"`` key.
        """
        for entity in entities:
            attrs = dict(entity)  # copy so the caller's dict is untouched
            entity_id = attrs.pop("id")
            entity_type = attrs.pop("entity_type", "unknown")
            self.add_entity(entity_id, entity_type=entity_type, **attrs)

    # ------------------------------------------------------------------
    # Edge management
    # ------------------------------------------------------------------

    def add_relationship(
        self,
        source_id: str,
        target_id: str,
        relationship_type: str = "owns",
        weight: float = 1.0,
        **attributes: Any,
    ) -> None:
        """Add a relationship edge between two entities.

        Endpoints that are not yet in the graph are created through
        :meth:`add_entity` with ``entity_type="unknown"``, so every node
        carries an ``entity_type`` attribute however it was introduced.
        Existing nodes are left untouched.

        Parameters
        ----------
        source_id:
            The entity that *owns* or *controls* the target.
        target_id:
            The entity that is owned or controlled.
        relationship_type:
            Semantic label: ``"owns"``, ``"controls"``, ``"transacts"``,
            ``"directs"``, etc.
        weight:
            Ownership stake (0.0–1.0) or transaction volume.  Defaults to
            ``1.0`` (full ownership / single connection).
        **attributes:
            Extra edge attributes stored verbatim.
        """
        for endpoint in (source_id, target_id):
            if endpoint not in self.graph:
                self.add_entity(endpoint)
        self.graph.add_edge(
            source_id,
            target_id,
            relationship_type=relationship_type,
            weight=weight,
            **attributes,
        )

    def add_relationships(
        self, relationships: Iterable[Dict[str, Any]]
    ) -> None:
        """Bulk-add relationships from an iterable of attribute dicts.

        Each dict must contain ``"source"`` and ``"target"`` keys.
        Optional keys: ``"relationship_type"``, ``"weight"``.  Any other
        key is stored as an edge attribute.  The input dicts are not
        modified.

        Parameters
        ----------
        relationships:
            Iterable of dicts describing edges.

        Raises
        ------
        KeyError
            If a dict lacks ``"source"`` or ``"target"``.
        """
        for relationship in relationships:
            attrs = dict(relationship)
            source = attrs.pop("source")
            target = attrs.pop("target")
            rel_type = attrs.pop("relationship_type", "owns")
            weight = attrs.pop("weight", 1.0)
            self.add_relationship(source, target, rel_type, weight, **attrs)

    # ------------------------------------------------------------------
    # Graph-level helpers
    # ------------------------------------------------------------------

    def from_edge_list(
        self,
        edges: Iterable[Tuple[str, str]],
        relationship_type: str = "owns",
        weight: float = 1.0,
    ) -> None:
        """Populate the graph from a bare list of (source, target) tuples.

        Nodes that do not yet exist are created automatically with
        ``entity_type="unknown"``.

        Parameters
        ----------
        edges:
            Iterable of ``(source_id, target_id)`` pairs.
        relationship_type:
            Default relationship type applied to all edges.
        weight:
            Default weight applied to all edges.
        """
        for source, target in edges:
            # add_relationship creates missing endpoints itself.
            self.add_relationship(source, target, relationship_type, weight)

    def get_subgraph(self, node_ids: List[str]) -> nx.DiGraph | nx.Graph:
        """Return a node-induced subgraph (an independent copy) for the
        given entity IDs."""
        return self.graph.subgraph(node_ids).copy()

    def ownership_chain(self, entity_id: str) -> List[str]:
        """Return all ancestors of *entity_id* in the ownership hierarchy.

        In a directed graph where edges go from owner → owned, ancestors
        are the upstream owners reachable from the node via *predecessors*.
        The result is a flat collection of owners, not an ordered chain.

        Parameters
        ----------
        entity_id:
            The entity to trace ownership for.

        Returns
        -------
        list of str
            Ancestor entity IDs, excluding *entity_id* itself.

        Raises
        ------
        ValueError
            If the builder was created with ``directed=False``.
        """
        if not self._directed:
            raise ValueError(
                "ownership_chain() is only meaningful on a directed graph."
            )
        return [
            n
            for n in nx.ancestors(self.graph, entity_id)
            if n != entity_id
        ]

    def subsidiaries(self, entity_id: str) -> List[str]:
        """Return all descendants of *entity_id* (companies it owns).

        Parameters
        ----------
        entity_id:
            The parent entity.

        Returns
        -------
        list of str
            Descendant entity IDs.

        Raises
        ------
        ValueError
            If the builder was created with ``directed=False``.
        """
        if not self._directed:
            raise ValueError(
                "subsidiaries() is only meaningful on a directed graph."
            )
        return list(nx.descendants(self.graph, entity_id))

    def detect_cycles(self) -> List[List[str]]:
        """Return all simple cycles in the graph.

        Circular ownership (company A owns B owns C owns A) is a strong
        indicator of a shell structure.  For an undirected builder no
        cycle search is performed and an empty list is returned.

        Returns
        -------
        list of list of str
            Each inner list is one cycle, represented as a sequence of
            node IDs.
        """
        if self._directed:
            return list(nx.simple_cycles(self.graph))
        return []

    # ------------------------------------------------------------------
    # Statistics
    # ------------------------------------------------------------------

    @property
    def node_count(self) -> int:
        """Total number of entity nodes."""
        return self.graph.number_of_nodes()

    @property
    def edge_count(self) -> int:
        """Total number of relationship edges."""
        return self.graph.number_of_edges()

    def summary(self) -> Dict[str, Any]:
        """Return a dict of high-level graph statistics.

        ``"is_weakly_connected"`` is ``None`` for undirected or empty
        graphs, where the check is not applied.
        """
        cycles = self.detect_cycles()
        return {
            "nodes": self.node_count,
            "edges": self.edge_count,
            "directed": self._directed,
            "is_weakly_connected": (
                nx.is_weakly_connected(self.graph)
                if self._directed and self.node_count > 0
                else None
            ),
            "cycle_count": len(cycles),
            "cycles": cycles,
        }
class ShellCompanyDetector:
    """Detect shell companies and hidden ownership in an entity graph.

    Every node of the builder's graph is checked against six independent
    risk factors (cycle membership, betweenness, PageRank, ownership depth,
    suspicious-community membership, possible duplicate).  The weights of
    the factors that fire are summed and the total is capped at 1.0.

    Parameters
    ----------
    graph_builder:
        A :class:`~kyb_graph_analytics.GraphBuilder` instance containing
        the populated ownership graph.
    pagerank_threshold_multiplier:
        Nodes with PageRank > *mean × multiplier* are flagged.
        Default ``2.0`` (twice the mean).
    betweenness_threshold:
        Absolute betweenness centrality score above which a node is
        flagged as a structural bridge.  Default ``0.1``.
    max_community_size:
        Upper bound for ``suspicious_communities()`` in community detection.
    entity_resolution_threshold:
        Similarity threshold forwarded to
        :class:`~kyb_graph_analytics.EntityResolver`.
    random_state:
        Seed for Louvain; set for reproducibility.
    """

    def __init__(
        self,
        graph_builder: "GraphBuilder",
        pagerank_threshold_multiplier: float = 2.0,
        betweenness_threshold: float = 0.1,
        max_community_size: int = 50,
        entity_resolution_threshold: float = 0.85,
        random_state: Optional[int] = 42,
    ) -> None:
        self.gb = graph_builder
        self.graph = graph_builder.graph
        self._pr_mult = pagerank_threshold_multiplier
        self._bw_thresh = betweenness_threshold
        self._max_comm_size = max_community_size
        self._er_thresh = entity_resolution_threshold
        self._random_state = random_state

        # Sub-analysers are created on first access (see the properties
        # below) so constructing a detector stays cheap.
        self._centrality: Optional[CentralityAnalyzer] = None
        self._community: Optional[CommunityDetector] = None
        self._resolver: Optional[EntityResolver] = None

    # ------------------------------------------------------------------
    # Lazy accessor properties
    # ------------------------------------------------------------------

    @property
    def centrality(self) -> "CentralityAnalyzer":
        """Centrality analyser bound to ``self.graph`` (built on first use)."""
        if self._centrality is None:
            self._centrality = CentralityAnalyzer(self.graph)
        return self._centrality

    @property
    def community_detector(self) -> "CommunityDetector":
        """Community detector seeded with ``random_state`` (built on first use)."""
        if self._community is None:
            self._community = CommunityDetector(
                self.graph, random_state=self._random_state
            )
        return self._community

    @property
    def entity_resolver(self) -> "EntityResolver":
        """Entity resolver using the configured threshold (built on first use)."""
        if self._resolver is None:
            self._resolver = EntityResolver(
                self.graph, threshold=self._er_thresh
            )
        return self._resolver

    # ------------------------------------------------------------------
    # Pre-computed sets (populated in analyse())
    # ------------------------------------------------------------------

    def _build_cycle_set(self) -> set:
        """Return the set of node IDs participating in at least one cycle."""
        return {
            node for cycle in self.gb.detect_cycles() for node in cycle
        }

    def _build_suspicious_community_set(self) -> set:
        """Return the set of node IDs in suspicious communities."""
        suspicious = self.community_detector.suspicious_communities(
            max_size=self._max_comm_size
        )
        return {member for comm in suspicious for member in comm["members"]}

    def _build_duplicate_set(self) -> set:
        """Return the set of node IDs flagged as potential duplicates."""
        return {
            node
            for group in self.entity_resolver.duplicate_groups()
            for node in group
        }

    def _ownership_depth(self, node_id: str) -> int:
        """Return the number of ownership layers above *node_id*.

        Falls back to ``0`` when the builder cannot produce a chain.
        """
        try:
            return len(self.gb.ownership_chain(node_id))
        except Exception:
            # Deliberate best-effort: a failure to compute the chain must
            # not abort scoring of the whole graph; the node simply does
            # not receive the depth factor.
            return 0

    @staticmethod
    def _risk_level(score: float) -> str:
        """Map a (capped, rounded) score to ``"high"``/``"medium"``/``"low"``."""
        if score >= HIGH_RISK_THRESHOLD:
            return "high"
        if score >= MEDIUM_RISK_THRESHOLD:
            return "medium"
        return "low"

    # ------------------------------------------------------------------
    # Core analysis
    # ------------------------------------------------------------------

    def analyse(self) -> List[Dict[str, Any]]:
        """Run full shell-company detection and return scored entity records.

        Returns
        -------
        list of dicts, one per graph node, with keys:
            ``"entity_id"``, ``"entity_type"``, ``"risk_score"``,
            ``"risk_level"``, ``"flags"``.

        Sorted by descending ``risk_score``.  An empty graph yields ``[]``.
        """
        if self.graph.number_of_nodes() == 0:
            return []

        # Graph-wide inputs are computed once, outside the per-node loop.
        cycle_nodes = self._build_cycle_set()
        susp_community_nodes = self._build_suspicious_community_set()
        duplicate_nodes = self._build_duplicate_set()

        pr_scores = self.centrality.pagerank()
        bw_scores = self.centrality.betweenness_centrality()

        pr_mean = (
            sum(pr_scores.values()) / len(pr_scores) if pr_scores else 0.0
        )
        pr_threshold = pr_mean * self._pr_mult

        results = []
        for node in self.graph.nodes():
            node_data = self.graph.nodes[node]
            flags: List[str] = []
            score = 0.0

            # 1. Circular ownership
            if node in cycle_nodes:
                flags.append("Participates in circular ownership cycle")
                score += _W_CYCLE

            # 2. High betweenness (structural bridge)
            bw = bw_scores.get(node, 0.0)
            if bw > self._bw_thresh:
                flags.append(
                    f"High betweenness centrality ({bw:.3f} > {self._bw_thresh})"
                )
                score += _W_BETWEENNESS

            # 3. Elevated PageRank.  The ``pr_threshold > 0`` guard keeps a
            #    zero mean/multiplier from flagging every node.
            pr = pr_scores.get(node, 0.0)
            if pr > pr_threshold and pr_threshold > 0:
                # Report the configured multiplier rather than a fixed
                # "2×"; ``:g`` renders 2.0 as "2" and 2.5 as "2.5".
                flags.append(
                    f"PageRank ({pr:.4f}) exceeds "
                    f"{self._pr_mult:g}× mean ({pr_mean:.4f})"
                )
                score += _W_PAGERANK

            # 4. Deep ownership chain
            depth = self._ownership_depth(node)
            if depth >= 3:
                flags.append(
                    f"Deep ownership chain ({depth} layers above this entity)"
                )
                score += _W_DEPTH

            # 5. Suspicious community membership
            if node in susp_community_nodes:
                flags.append(
                    "Member of a community with no traceable individual UBO"
                )
                score += _W_COMMUNITY

            # 6. Potential duplicate / alias
            if node in duplicate_nodes:
                flags.append(
                    "Possible duplicate or alias of another entity"
                )
                score += _W_DUPLICATE

            # Cap at 1.0 and round *before* classifying so the reported
            # level always agrees with the reported score.
            score = round(min(score, 1.0), 4)

            results.append(
                {
                    "entity_id": node,
                    "entity_type": node_data.get("entity_type", "unknown"),
                    "risk_score": score,
                    "risk_level": self._risk_level(score),
                    "flags": flags,
                }
            )

        return sorted(results, key=lambda r: r["risk_score"], reverse=True)

    # ------------------------------------------------------------------
    # Convenience summaries
    # ------------------------------------------------------------------

    def high_risk_entities(self) -> List[Dict[str, Any]]:
        """Return only the records whose ``risk_level`` is ``"high"``.

        Runs :meth:`analyse` and filters its output, so the ordering
        (descending ``risk_score``) is preserved.
        """
        return [r for r in self.analyse() if r["risk_level"] == "high"]

    def summary_report(self) -> Dict[str, Any]:
        """Return an aggregate summary of the analysis.

        Returns
        -------
        dict with keys:
            ``"total_entities"``, ``"high_risk"``, ``"medium_risk"``,
            ``"low_risk"``, ``"cycle_count"``, ``"modularity"``,
            ``"duplicate_groups"``, ``"top_risks"``.

        ``"top_risks"`` holds at most the five highest-scoring records.
        """
        results = self.analyse()
        graph_summary = self.gb.summary()

        # Single pass over the records instead of one filter per level.
        level_counts = {"high": 0, "medium": 0, "low": 0}
        for record in results:
            level_counts[record["risk_level"]] += 1

        return {
            "total_entities": len(results),
            "high_risk": level_counts["high"],
            "medium_risk": level_counts["medium"],
            "low_risk": level_counts["low"],
            "cycle_count": graph_summary["cycle_count"],
            "modularity": round(self.community_detector.modularity(), 4),
            "duplicate_groups": len(self.entity_resolver.duplicate_groups()),
            "top_risks": results[:5],
        }
class TestPageRank:
    """PageRank behaviour of ``CentralityAnalyzer`` on small fixture graphs."""

    def test_returns_all_nodes(self, star_graph):
        """Every graph node receives a PageRank entry."""
        scores = CentralityAnalyzer(star_graph.graph).pagerank()
        assert set(scores.keys()) == set(star_graph.graph.nodes())

    def test_scores_sum_to_one(self, star_graph):
        """PageRank is a probability distribution over the nodes."""
        scores = CentralityAnalyzer(star_graph.graph).pagerank()
        total = sum(scores.values())
        assert abs(total - 1.0) < 1e-4

    def test_spokes_have_higher_pr_than_hub_in_directed_star(self, star_graph):
        """Spokes out-rank the hub when all edges point hub -> spoke."""
        # In a directed star (hub -> s1..s4), spokes *receive* inbound links
        # so they accumulate more PageRank than the hub, which has none.
        scores = CentralityAnalyzer(star_graph.graph).pagerank()
        spoke_total = sum(scores[f"s{i}"] for i in range(1, 5))
        assert spoke_total / 4 > scores["hub"]

    def test_empty_graph_returns_empty(self, empty_graph):
        """An empty graph yields an empty mapping rather than an error."""
        analyzer = CentralityAnalyzer(empty_graph.graph)
        assert analyzer.pagerank() == {}
class TestAllCentralityScores:
    """Combined-score and ranking helpers of ``CentralityAnalyzer``."""

    def test_combined_keys(self, chain_graph):
        """Each node maps to exactly the four supported measures."""
        expected_measures = {
            "pagerank", "betweenness", "in_degree", "out_degree"
        }
        combined = CentralityAnalyzer(chain_graph.graph).all_centrality_scores()
        for node in chain_graph.graph.nodes():
            assert node in combined
            assert set(combined[node].keys()) == expected_measures

    def test_top_nodes(self, chain_graph):
        """``top_nodes`` returns ``n`` entries ordered by descending score."""
        analyzer = CentralityAnalyzer(chain_graph.graph)
        ranked = analyzer.top_nodes(measure="betweenness", n=3)
        assert len(ranked) == 3
        # Results should be sorted descending
        scores = [entry[1] for entry in ranked]
        assert scores[0] >= scores[1] >= scores[2]

    def test_top_nodes_invalid_measure(self, chain_graph):
        """An unsupported measure name is rejected with ``ValueError``."""
        analyzer = CentralityAnalyzer(chain_graph.graph)
        with pytest.raises(ValueError, match="Unknown measure"):
            analyzer.top_nodes(measure="invalid")

    def test_top_nodes_capped_at_n(self, star_graph):
        """No more than ``n`` entries are returned."""
        ranked = CentralityAnalyzer(star_graph.graph).top_nodes(
            measure="pagerank", n=2
        )
        assert len(ranked) == 2
@pytest.fixture
def mixed_cluster_graph():
    """Graph with one mixed (individual + company) pair and one
    company-only pair; only the latter should look suspicious."""
    # Each cluster: the entities to register, then the single edge
    # linking them.  Insertion order matches the hand-written version.
    clusters = (
        (
            (
                ("alice", "individual", "Alice"),
                ("alpha_llc", "company", "Alpha LLC"),
            ),
            ("alice", "alpha_llc"),
        ),
        (
            (
                ("shell1", "company", "Shell One"),
                ("shell2", "company", "Shell Two"),
            ),
            ("shell1", "shell2"),
        ),
    )
    builder = GraphBuilder()
    for entities, (owner, owned) in clusters:
        for node_id, kind, label in entities:
            builder.add_entity(node_id, entity_type=kind, name=label)
        builder.add_relationship(owner, owned)
    return builder.graph
class TestCommunities:
    """Grouping and lookup helpers of ``CommunityDetector``."""

    def test_returns_dict_of_lists(self, two_cluster_graph):
        """``communities()`` partitions exactly the graph's node set."""
        grouped = CommunityDetector(two_cluster_graph).communities()
        assert isinstance(grouped, dict)
        # All members are graph nodes
        covered = set()
        for members in grouped.values():
            covered.update(members)
        assert covered == set(two_cluster_graph.nodes())

    def test_community_of_known_node(self, two_cluster_graph):
        """After detection a known node has an integer community label."""
        detector = CommunityDetector(two_cluster_graph)
        detector.detect()
        assert isinstance(detector.community_of("c1a"), int)

    def test_community_of_before_detect_returns_none(self, two_cluster_graph):
        """Lookups before ``detect()`` return ``None``."""
        detector = CommunityDetector(two_cluster_graph)
        assert detector.community_of("c1a") is None

    def test_community_of_unknown_node_returns_none(self, two_cluster_graph):
        """Lookups for a node outside the graph return ``None``."""
        detector = CommunityDetector(two_cluster_graph)
        detector.detect()
        assert detector.community_of("nonexistent") is None
class TestSuspiciousCommunities:
    """Flagging of communities via ``suspicious_communities()``."""

    def test_flags_company_only_communities(self, mixed_cluster_graph):
        """The company-only shell pair appears among the flagged members."""
        detector = CommunityDetector(mixed_cluster_graph)
        flagged = set()
        for community in detector.suspicious_communities():
            flagged.update(community["members"])
        # The shell1/shell2 community should be flagged
        assert bool(flagged & {"shell1", "shell2"})

    def test_result_has_expected_keys(self, mixed_cluster_graph):
        """Each flagged record carries the documented keys."""
        detector = CommunityDetector(mixed_cluster_graph)
        required = ("community_id", "members", "size", "reason")
        for record in detector.suspicious_communities():
            for key in required:
                assert key in record

    def test_min_size_filter(self, two_cluster_graph):
        """A ``min_size`` larger than the graph filters everything out."""
        detector = CommunityDetector(two_cluster_graph)
        # With min_size larger than total nodes, nothing is flagged
        assert detector.suspicious_communities(min_size=100) == []
class TestTokenSortRatio:
    """Behaviour of the ``_token_sort_ratio`` similarity helper."""

    def test_identical_strings(self):
        """Identical inputs score a perfect 1.0."""
        name = "Alpha Corp"
        assert _token_sort_ratio(name, name) == 1.0

    def test_order_invariant(self):
        """Swapping the arguments does not change the score."""
        forward = _token_sort_ratio("Corp Alpha", "Alpha Corp")
        backward = _token_sort_ratio("Alpha Corp", "Corp Alpha")
        assert forward == backward

    def test_similar_strings(self):
        """Near-identical company names score above 0.8."""
        assert (
            _token_sort_ratio("Alpha Holdings Ltd", "Alpha Holdings Limited")
            > 0.8
        )

    def test_completely_different_strings(self):
        """Unrelated names score below 0.5."""
        assert _token_sort_ratio("Alpha Corp", "XYZ Ventures") < 0.5

    def test_both_empty(self):
        """Two empty strings are treated as identical."""
        assert _token_sort_ratio("", "") == 1.0

    def test_one_empty(self):
        """An empty string shares nothing with a non-empty one."""
        assert _token_sort_ratio("Alpha", "") == 0.0
class TestFindDuplicates:
    """Pairwise duplicate detection via ``EntityResolver.find_duplicates``."""

    def test_detects_near_identical_names(self, graph_with_duplicates):
        """The Ltd / Limited pair is reported in either orientation."""
        resolver = EntityResolver(graph_with_duplicates, threshold=0.80)
        found = {(left, right) for left, right, _ in resolver.find_duplicates()}
        assert ("e1", "e2") in found or ("e2", "e1") in found

    def test_no_false_positives_on_distinct_entities(self, graph_no_duplicates):
        """Clearly different names produce no candidate pairs."""
        resolver = EntityResolver(graph_no_duplicates, threshold=0.85)
        assert resolver.find_duplicates() == []

    def test_scores_sorted_descending(self, graph_with_duplicates):
        """Candidate pairs come back ordered by descending similarity."""
        resolver = EntityResolver(graph_with_duplicates, threshold=0.70)
        similarities = [pair[2] for pair in resolver.find_duplicates()]
        # Compare each score with its successor; vacuous for < 2 results.
        for current, following in zip(similarities, similarities[1:]):
            assert current >= following

    def test_invalid_threshold_raises(self):
        """A threshold outside the valid range is rejected."""
        empty = nx.DiGraph()
        with pytest.raises(ValueError, match="threshold"):
            EntityResolver(empty, threshold=1.5)
class TestResolutionReport:
    """Shape of ``EntityResolver.resolution_report`` output."""

    def test_report_has_expected_keys(self, graph_with_duplicates):
        """Every report entry exposes the three documented keys."""
        resolver = EntityResolver(graph_with_duplicates, threshold=0.80)
        required = ("canonical", "aliases", "similarity_pairs")
        for entry in resolver.resolution_report():
            for key in required:
                assert key in entry

    def test_report_empty_on_no_duplicates(self, graph_no_duplicates):
        """With no duplicates the report is an empty list."""
        resolver = EntityResolver(graph_no_duplicates, threshold=0.85)
        report = resolver.resolution_report()
        assert report == []
class TestAddRelationship:
    """Edge creation through ``GraphBuilder``."""

    def test_edge_exists(self, simple_graph):
        """Both ownership edges of the fixture are present."""
        graph = simple_graph.graph
        for owner, owned in (("alice", "holdco"), ("holdco", "targetco")):
            assert graph.has_edge(owner, owned)

    def test_edge_attributes(self, simple_graph):
        """Relationship type and weight are stored on the edge."""
        attrs = simple_graph.graph["alice"]["holdco"]
        assert attrs["relationship_type"] == "owns"
        assert attrs["weight"] == 1.0

    def test_bulk_add_relationships(self):
        """``add_relationships`` adds one edge per record."""
        builder = GraphBuilder()
        for node_id in ("a", "b", "c"):
            builder.add_entity(node_id)
        records = [
            {"source": "a", "target": "b", "weight": 0.5},
            {"source": "b", "target": "c", "weight": 0.3},
        ]
        builder.add_relationships(records)
        assert builder.edge_count == 2

    def test_from_edge_list(self):
        """``from_edge_list`` creates both the nodes and the edges."""
        builder = GraphBuilder()
        builder.from_edge_list([("p1", "p2"), ("p2", "p3")])
        assert builder.node_count == 3
        assert builder.edge_count == 2
class TestSummary:
    """``GraphBuilder.summary`` and ``get_subgraph``."""

    def test_summary_keys(self, simple_graph):
        """The summary exposes all documented keys."""
        report = simple_graph.summary()
        expected = ("nodes", "edges", "directed", "cycle_count", "cycles")
        for key in expected:
            assert key in report

    def test_summary_values(self, simple_graph):
        """Counts reflect the three-node, two-edge acyclic fixture."""
        report = simple_graph.summary()
        assert report["nodes"] == 3
        assert report["edges"] == 2
        assert report["cycle_count"] == 0
        assert report["directed"] is True

    def test_summary_cyclic(self, cyclic_graph):
        """A cyclic graph reports at least one cycle."""
        assert cyclic_graph.summary()["cycle_count"] >= 1

    def test_subgraph(self, simple_graph):
        """A subgraph keeps the selected nodes and the edge between them."""
        selected = simple_graph.get_subgraph(["alice", "holdco"])
        assert selected.number_of_nodes() == 2
        assert selected.has_edge("alice", "holdco")
@pytest.fixture
def empty_graph():
    """Provide a ``GraphBuilder`` to which nothing has been added."""
    builder = GraphBuilder()
    return builder
class TestHighRiskEntities:
    """Filtering behaviour of ``ShellCompanyDetector.high_risk_entities``."""

    def test_all_high_risk(self, shell_graph):
        """Every returned record meets the high-risk threshold."""
        det = ShellCompanyDetector(shell_graph)
        high = det.high_risk_entities()
        for r in high:
            assert r["risk_score"] >= HIGH_RISK_THRESHOLD

    def test_clean_graph_no_high_risk(self, clean_graph):
        """A transparent two-node structure yields no high-risk records."""
        det = ShellCompanyDetector(clean_graph)
        high = det.high_risk_entities()
        # The list is already filtered to high-risk records, so checking
        # their scores against the threshold could never fail.  The real
        # expectation is that nothing is returned at all.
        assert high == []
"low_risk", + "cycle_count", + "modularity", + "duplicate_groups", + "top_risks", + ): + assert key in summary + + def test_counts_sum_to_total(self, shell_graph): + det = ShellCompanyDetector(shell_graph) + summary = det.summary_report() + assert ( + summary["high_risk"] + summary["medium_risk"] + summary["low_risk"] + == summary["total_entities"] + ) + + def test_cycle_count_in_shell_graph(self, shell_graph): + det = ShellCompanyDetector(shell_graph) + summary = det.summary_report() + assert summary["cycle_count"] >= 1 + + def test_top_risks_length(self, shell_graph): + det = ShellCompanyDetector(shell_graph) + summary = det.summary_report() + # top_risks contains at most 5 entries + assert len(summary["top_risks"]) <= 5 + + def test_modularity_is_numeric(self, shell_graph): + det = ShellCompanyDetector(shell_graph) + summary = det.summary_report() + assert isinstance(summary["modularity"], float)