Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,21 @@

## [Unreleased]

### Changed

- **`proxyctl trace` 的 `[4/4] 实际连接` 改为按链路聚合并给出明确结论。**
旧版逐条平铺活跃连接、且只取其中一条与规则预测对比,在改规则后的过渡态
(新连接走代理、旧连接仍复用原链路并存)下会自相矛盾地误报"预测出口 X,
实际出口 Y"。新版把相同链路的连接合并计数、累加流量,按"链路是否命中预测
出口"分类,并区分三种结论:全部命中(与规则一致)/ 全部未命中(规则可能
未生效或被前置规则截胡)/ 新旧并存(旧连接在建连时已定死链路,会随超时
自然断开,刷新后即全部走预测出口)。判定改用"预测组名是否出现在链路中"的
集合判定,消除旧版"出口"一词在告警行(取链路入口组)与详情行(取末端节点)
指代不一致的歧义。
- **`[4/4]` 连接采集改为 host 精确匹配优先。** 仅在拿不到 host(典型 fake-ip
模式)时才回退按 `destinationIP` 匹配,避免 Cloudflare 等共享 IP 把同 IP
其他站点的连接误算进当前域名。

## [0.5.11] — 2026-05-31

### Fixed
Expand Down
118 changes: 69 additions & 49 deletions src/proxyctl/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -506,20 +506,30 @@ def _section_connections(domain: str, resolved_ips: list,
"""[4/4] 实际连接验证 — 从 Clash API 活跃连接 + 日志历史中取路由信息。"""
print(f"\n{BOLD}[4/4] 实际连接{NC}")

domain_conns = []
host_conns = []
ip_conns = []
for _ in range(3):
data = _api_get(api, "/connections", secret)
if data and isinstance(data, dict):
for c in (data.get("connections") or []):
m = c.get("metadata", {})
h = m.get("host", "")
di = m.get("destinationIP", "")
if (h == domain or h.endswith("." + domain)
or (di and di in resolved_ips)):
domain_conns.append(c)
if domain_conns:
# sniffer/TUN 模式下 host 可能是目的 IP 或为空,真实域名落在
# sniffHost(嗅探到的 SNI / HTTP Host)。两字段独立判断、不互相
# 遮蔽:任一匹配即纳入;只有两者都缺失(纯 IP)才按目的 IP 回退
hosts = [x for x in (m.get("host", ""), m.get("sniffHost", "")) if x]
if any(x == domain or x.endswith("." + domain) for x in hosts):
host_conns.append(c)
elif not hosts and di and di in resolved_ips:
# host/sniffHost 都没有的纯 IP 连接才回退;任一字段非空但
# 指向别的域名的连接,即使同 IP 也不算进来——否则共享 IP 下
# 会把同 IP 别站的活跃连接误算成本域名
ip_conns.append(c)
if host_conns or ip_conns:
break
time.sleep(0.3)
# host 精确匹配优先;只有全是无 host 的连接时才回退到按 IP 匹配的那批
domain_conns = host_conns if host_conns else ip_conns

if not domain_conns:
# 没有活跃连接,从引擎日志中捞历史记录
Expand All @@ -536,54 +546,64 @@ def _section_connections(domain: str, resolved_ips: list,
print(f" {DIM}基于规则预测,该域名应走: {BOLD}{predicted_proxy}{NC}")
return

# 去重:按 rule+chains
seen: set = set()
deduped = []
def fmt_bytes(b: int) -> str:
if b < 1024: return f"{b}B"
if b < 1024 * 1024: return f"{b/1024:.1f}K"
return f"{b/1024/1024:.1f}M"

# 按完整链路聚合:相同链路的连接合并计数、流量累加,不再逐条平铺
groups: dict = {}
for c in domain_conns:
rule = c.get("rule", "?")
rp = c.get("rulePayload", "")
chains = c.get("chains", [])
key = f"{rule}|{rp}|{','.join(chains)}"
if key not in seen:
seen.add(key)
deduped.append(c)

for c in deduped:
rule = c.get("rule", "?")
rp = c.get("rulePayload", "")
chains = c.get("chains", [])
chain_str = " → ".join(reversed(chains)) if chains else "?"
rp_str = f"({rp})" if rp else ""
print(f" 规则: {CYAN}{rule}{rp_str}{NC}")
print(f" 链路: {chain_str}")

# 与预测对比
chain_str = " → ".join(reversed(chains)) if chains else "?"
g = groups.setdefault(chain_str, {"chains": chains, "count": 0,
"up": 0, "down": 0})
g["count"] += 1
g["up"] += c.get("upload", 0)
g["down"] += c.get("download", 0)

# 是否符合预测:预测的组名出现在链路任意一跳即算命中。
# proxy 链 chains=[节点, proxy-tuic, proxy] → 含 'proxy' → 符合
# DIRECT 链 chains=['DIRECT'] → 不含 → 不符合
# (不再纠结 chains[0] 是末端节点还是 chains[-1] 是入口组——用集合判定消除歧义)
def _matches(chains: list):
if not predicted_proxy:
return None
return any(ch.lower() == predicted_proxy.lower() for ch in chains)

total = len(domain_conns)
print(f" {DIM}(mihomo 当前活跃连接 {total} 条){NC}")
if predicted_proxy:
actual_chains = domain_conns[0].get("chains", [])
actual_outbound = actual_chains[-1] if actual_chains else "?"
if actual_outbound.lower() != predicted_proxy.lower():
print(f" {YELLOW}⚠ 预测出口 {predicted_proxy},实际出口 {actual_outbound}{NC}")
print(f" 应走(规则预测): {BOLD}{predicted_proxy}{NC}")
print(" 实走:")

items = [(_matches(g["chains"]), s, g) for s, g in groups.items()]
items.sort(key=lambda x: (x[0] is not True, -x[2]["count"])) # 符合在前,多的在前

match_n = mismatch_n = 0
for ok, chain_str, g in items:
if ok is True:
mark = f"{GREEN}✓{NC}"; match_n += g["count"]
elif ok is False:
mark = f"{YELLOW}✗{NC}"; mismatch_n += g["count"]
else:
print(f" {GREEN}✓ 与规则预测一致{NC}")

# 连接详情
print(f" {DIM}---{NC}")
domain_conns.sort(key=lambda x: x.get("start", ""), reverse=True)
for c in domain_conns[:5]:
m = c.get("metadata", {})
host = m.get("host", "?")
dport = m.get("destinationPort", "?")
up = c.get("upload", 0)
down = c.get("download", 0)
chains = c.get("chains", [])
outbound = chains[0] if chains else "?"

def fmt_bytes(b: int) -> str:
if b < 1024: return f"{b}B"
if b < 1024 * 1024: return f"{b/1024:.1f}K"
return f"{b/1024/1024:.1f}M"
mark = " "
flow = f"↑{fmt_bytes(g['up'])} ↓{fmt_bytes(g['down'])}"
print(f" {mark} {chain_str} {DIM}·{NC} {g['count']} 条 {DIM}{flow}{NC}")

print(f" {host}:{dport} {outbound} ↑{fmt_bytes(up)} ↓{fmt_bytes(down)}")
if not predicted_proxy:
return
if mismatch_n == 0:
print(f" {GREEN}✓ 结论: 全部 {total} 条都走 {predicted_proxy},与规则一致{NC}")
elif match_n == 0:
print(f" {YELLOW}✗ 结论: {total} 条都没走 {predicted_proxy};"
f"规则可能未生效,或被前置规则截胡{NC}")
else:
print(f" {YELLOW}⚠ 结论: {match_n} 条新连接已走 {predicted_proxy},"
f"另 {mismatch_n} 条仍走旧链路{NC}")
print(f" {DIM} 旧链路是改规则前建立的连接(建连时已定死,不随规则切换),"
f"会随超时自然断开;{NC}")
print(f" {DIM} 刷新页面后即全部走 {predicted_proxy}。{NC}")


def cmd_trace(raw_input: str, api: str, secret: str, config: dict = None):
Expand Down
149 changes: 149 additions & 0 deletions tests/unit/test_trace_sections.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,3 +323,152 @@ def test_grep_log_connections_parses_mihomo(_isolate_home: Path):
assert isinstance(out, list)
# 至少能找到一些
assert len(out) >= 1


# ────────────────────────────────────────────────────────────────────────────
# _section_connections: 活跃连接按链路聚合 + 与预测对比
# ────────────────────────────────────────────────────────────────────────────

def _mock_connections(monkeypatch, conns: list):
def fake_api_get(api, path, secret):
if path == "/connections":
return {"connections": conns}
return {}

monkeypatch.setattr(trace, "_api_get", fake_api_get)


def _conn(host, chains, up=1, down=1, ip="", port="443", sniff=""):
return {
"metadata": {"host": host, "sniffHost": sniff,
"destinationIP": ip, "destinationPort": port},
"chains": chains, "upload": up, "download": down,
}


# mihomo chains 顺序:[末端节点, ..., 入口组],proxy 链含组名 'proxy'
_PROXY_CHAIN = ["电信专用(直连)", "proxy-tuic", "proxy"]


def test_section_connections_all_match(monkeypatch, capsys):
"""全部走预测出口 → ✓ 一致,相同链路聚合成一组计数。"""
_mock_connections(monkeypatch, [
_conn("aicodewith.com", _PROXY_CHAIN, up=590, down=3500),
_conn("aicodewith.com", _PROXY_CHAIN, up=100, down=200),
])
trace._section_connections("aicodewith.com", [], "proxy", "http://x", "s")
out = trace._strip_ansi(capsys.readouterr().out)
assert "应走(规则预测): proxy" in out
assert "2 条" in out # 两条相同链路聚合
assert "✓ 结论: 全部 2 条都走 proxy" in out


def test_section_connections_mixed_old_and_new(monkeypatch, capsys):
"""改规则后的过渡态:新连接走 proxy、旧连接仍走 DIRECT —— 给出明确结论而非自相矛盾。"""
_mock_connections(monkeypatch, [
_conn("aicodewith.com", _PROXY_CHAIN, up=590, down=3500),
_conn("aicodewith.com", ["DIRECT"], up=73400, down=18200),
])
trace._section_connections("aicodewith.com", [], "proxy", "http://x", "s")
out = trace._strip_ansi(capsys.readouterr().out)
assert "1 条新连接已走 proxy" in out
assert "仍走旧链路" in out
assert "刷新页面后即全部走 proxy" in out


def test_section_connections_none_match(monkeypatch, capsys):
"""全部走 DIRECT、没有一条走预测出口 → 提示规则可能未生效。"""
_mock_connections(monkeypatch, [
_conn("aicodewith.com", ["DIRECT"], up=100, down=200),
])
trace._section_connections("aicodewith.com", [], "proxy", "http://x", "s")
out = trace._strip_ansi(capsys.readouterr().out)
assert "都没走 proxy" in out
assert "规则可能未生效" in out


def test_section_connections_host_match_excludes_shared_ip(monkeypatch, capsys):
"""有 host 精确匹配时,不把同 IP 的其他站点连接(Cloudflare 共享 IP)误算进来。"""
_mock_connections(monkeypatch, [
_conn("aicodewith.com", _PROXY_CHAIN, ip="104.26.4.164"),
# 同一个 Cloudflare IP 上的别站,host 不匹配 → 应被排除
_conn("other-site.com", ["DIRECT"], up=999, down=999, ip="104.26.4.164"),
])
trace._section_connections(
"aicodewith.com", ["104.26.4.164"], "proxy", "http://x", "s")
out = trace._strip_ansi(capsys.readouterr().out)
assert "活跃连接 1 条" in out
assert "✓ 结论: 全部 1 条都走 proxy" in out


def test_section_connections_falls_back_to_ip_when_no_host(monkeypatch, capsys):
"""fake-ip 模式 metadata 无 host 时,回退按 destinationIP 匹配。"""
_mock_connections(monkeypatch, [
_conn("", _PROXY_CHAIN, ip="198.18.0.5"),
])
trace._section_connections(
"aicodewith.com", ["198.18.0.5"], "proxy", "http://x", "s")
out = trace._strip_ansi(capsys.readouterr().out)
assert "活跃连接 1 条" in out


def test_section_connections_ip_fallback_excludes_hosted_other_site(
monkeypatch, capsys):
"""回归(codex P2):目标站没有 host 匹配连接时,同 IP 上 host 指向别站的
连接不能被 IP 回退误收 —— 否则共享 IP 误报在这个边角依然复现。"""
_mock_connections(monkeypatch, [
# 同一个 Cloudflare 共享 IP,但 host 非空且指向别的站点
_conn("other-site.com", ["DIRECT"], up=999, down=999, ip="104.26.4.164"),
])
# 隔离日志兜底,避免读到真实 mihomo.log 造成测试不稳定
monkeypatch.setattr(trace, "_grep_log_connections", lambda domain: [])
trace._section_connections(
"aicodewith.com", ["104.26.4.164"], "proxy", "http://x", "s")
out = trace._strip_ansi(capsys.readouterr().out)
# 别站连接被排除 → 视为无活跃连接,绝不把别站的 DIRECT 链路算成本域名
assert "无活跃连接" in out
assert "DIRECT" not in out
assert "other-site" not in out


def test_section_connections_sniffhost_match_included(monkeypatch, capsys):
"""sniffer 模式:host 为空但 sniffHost 指向目标域名 → 应识别为目标连接。"""
_mock_connections(monkeypatch, [
_conn("", _PROXY_CHAIN, ip="104.26.4.164", sniff="aicodewith.com"),
])
trace._section_connections(
"aicodewith.com", ["104.26.4.164"], "proxy", "http://x", "s")
out = trace._strip_ansi(capsys.readouterr().out)
assert "活跃连接 1 条" in out
assert "✓ 结论: 全部 1 条都走 proxy" in out


def test_section_connections_ip_fallback_excludes_sniffhost_other_site(
monkeypatch, capsys):
"""回归(codex P2#2):host 为空但 sniffHost 指向别站的同 IP 连接,
不能被 IP 回退误收 —— sniffHost 必须先折进有效 host 再决定是否回退。"""
_mock_connections(monkeypatch, [
_conn("", ["DIRECT"], up=999, down=999, ip="104.26.4.164",
sniff="other-site.com"),
])
monkeypatch.setattr(trace, "_grep_log_connections", lambda domain: [])
trace._section_connections(
"aicodewith.com", ["104.26.4.164"], "proxy", "http://x", "s")
out = trace._strip_ansi(capsys.readouterr().out)
assert "无活跃连接" in out
assert "DIRECT" not in out
assert "other-site" not in out


def test_section_connections_sniffhost_match_when_host_is_ip(monkeypatch, capsys):
"""回归(codex P2#3):host 被填成目的 IP、真实域名在 sniffHost 时,
host 非空也不能遮蔽 sniffHost —— 两字段独立判断,目标连接仍应纳入。"""
_mock_connections(monkeypatch, [
_conn("104.26.4.164", _PROXY_CHAIN, ip="104.26.4.164",
sniff="aicodewith.com"),
])
trace._section_connections(
"aicodewith.com", ["104.26.4.164"], "proxy", "http://x", "s")
out = trace._strip_ansi(capsys.readouterr().out)
assert "活跃连接 1 条" in out
assert "✓ 结论: 全部 1 条都走 proxy" in out