# Bridge-lifecycle methods of the Hermes memory provider (the class header lies
# outside the visible hunks), re-expanded from the collapsed diff in their
# post-patch form.
# NOTE(review): the hunks touching `_open_session`, `_ensure_bridge`,
# `_start_bridge_keepalive` and `MemosBridgeClient.close` are truncated in
# SOURCE and are deliberately not reproduced here.

# Timeout for `session.open` when the daemon may only just have been spawned.
_SESSION_OPEN_SPAWN_TIMEOUT = 120.0


def _probe_bridge_daemon() -> "bool | None":
    """Report whether a bridge daemon is already serving.

    Returns:
        True when the viewer port is bound and accepts a connection, False
        when it is not, and None when the daemon_manager helpers cannot be
        imported (the caller then falls back to a plain respawn).
    """
    try:
        try:
            # Works when this module is loaded as part of its package.
            from .daemon_manager import _bridge_health_check, _is_port_bound
        except ImportError:
            # Original form: daemon_manager importable as a top-level module.
            from daemon_manager import _bridge_health_check, _is_port_bound
    except ImportError:
        return None
    return bool(_is_port_bound() and _bridge_health_check(timeout=2.0))


def is_available(self) -> bool:  # type: ignore[override]
    """Return True when the bridge prerequisites are present.

    Uses the ``ensure_bridge_running`` already in module scope (``initialize``
    calls it without a local import). Re-importing it here from a top-level
    ``daemon_manager`` would turn any import failure into a silent
    "not available", because every exception is swallowed below.
    """
    try:
        return ensure_bridge_running(probe_only=True)
    except Exception:
        return False


def _bridge_session_params(self, session_id: str = "") -> dict[str, Any]:
    """Build the ``session.open`` request payload.

    Args:
        session_id: Session to open. Falls back to the id stored on the
            provider when empty, so a reconnect never opens session "".

    Returns:
        The parameter dict sent with the ``session.open`` request.
    """
    namespace = self._runtime_namespace()  # resolved once per request
    return {
        "agent": "hermes",
        "sessionId": session_id or self._session_id or "",
        "namespace": namespace,
        "meta": {
            "hermesHome": self._hermes_home,
            "platform": self._platform,
            "agentIdentity": self._agent_identity,
            "profileId": namespace["profileId"],
            "namespace": dict(namespace),
        },
    }


def _connect_bridge_session(self, session_id: str = "", *, timeout: float = 30.0) -> "MemosBridgeClient":
    """Create a bridge client, register the LLM fallback and open the session.

    The client is closed again if any step fails, so a half-initialised
    client is never left behind.

    Args:
        session_id: Session to open (see ``_bridge_session_params``).
        timeout: Seconds to wait for the ``session.open`` reply.

    Returns:
        The connected client. The caller stores it in ``self._bridge``.

    Raises:
        Exception: whatever the client raised, after the client was closed.
    """
    bridge = MemosBridgeClient()
    try:
        # Register before opening the session so the handler is in place for
        # the first request the bridge sends back.
        bridge.register_host_handler(
            "host.llm.complete",
            self._handle_host_llm_complete,
        )
        bridge.request(
            "session.open",
            self._bridge_session_params(session_id),
            timeout=timeout,
        )
    except Exception:
        with contextlib.suppress(Exception):
            bridge.close()
        raise
    return bridge


def initialize(self, session_id: str, **kwargs: Any) -> None:  # type: ignore[override]
    """Called once at agent startup.

    Stores ``hermes_home``, ``platform`` and ``agent_identity`` from *kwargs*,
    makes sure a bridge can run, connects to it and opens the session. A
    connection failure is logged and leaves ``self._bridge`` as None; the
    tool-call hook and the keepalive thread are started either way. If
    ``ensure_bridge_running`` raises, the method returns early without them.
    """
    self._session_id = session_id or self._session_id
    self._hermes_home = str(kwargs.get("hermes_home") or "")
    self._platform = str(kwargs.get("platform") or "cli")
    self._agent_identity = str(kwargs.get("agent_identity") or "hermes")

    try:
        ensure_bridge_running()
    except Exception as err:
        logger.warning("MemOS: failed to start bridge — %s", err)
        return

    try:
        self._bridge = self._connect_bridge_session(
            session_id,
            timeout=_SESSION_OPEN_SPAWN_TIMEOUT,
        )
    except Exception as err:
        logger.warning("MemOS: bridge init failed — %s", err)
        self._bridge = None
    else:
        logger.info(
            "MemOS: bridge ready session=%s platform=%s (episode deferred)",
            self._session_id,
            # NOTE(review): this argument sits on a line between two hunks;
            # `self._platform` is inferred from the format string — confirm.
            self._platform,
        )

    self._register_tool_call_hook()
    self._start_bridge_keepalive()


def _reconnect_bridge(self, session_id: str = "", *, timeout: float = 30.0) -> None:
    """Drop the current client and connect a new one.

    If a daemon is already serving, the new client opens its session with the
    caller's *timeout*. Otherwise ``ensure_bridge_running`` is called first
    and the longer spawn timeout is used.

    Raises:
        Exception: when the new client cannot be created or the session
            cannot be opened; ``self._bridge`` is None in that case. A
            failure against an existing daemon is re-raised, not retried.
    """
    old_bridge = self._bridge
    self._bridge = None
    if old_bridge:
        with contextlib.suppress(Exception):
            old_bridge.close()

    serving = _probe_bridge_daemon()
    if serving:
        try:
            self._bridge = self._connect_bridge_session(session_id, timeout=timeout)
        except Exception as err:
            logger.warning("MemOS: existing daemon session.open failed: %s", err)
            raise
        logger.info("MemOS: reconnected to existing bridge daemon")
        return
    if serving is False:
        logger.warning("MemOS: bridge daemon not responding, will be respawned")

    # No live daemon detected (or detection unavailable): run the lifecycle
    # guard, then connect with the spawn timeout.
    ensure_bridge_running()
    self._bridge = self._connect_bridge_session(
        session_id,
        timeout=_SESSION_OPEN_SPAWN_TIMEOUT,
    )
    logger.info("MemOS: bridge reconnected (new daemon) session=%s", self._session_id)
# daemon_manager.py: post-patch state of the definitions SOURCE shows in full,
# re-expanded from the collapsed diff.
# NOTE(review): `_node_available()` and the module docstring lie outside the
# visible hunks and are not reproduced here. `logger` is defined on a line
# between two hunks; the usual module-level logger is assumed — confirm.
from __future__ import annotations

import logging
import os
import re
import shutil
import signal
import socket
import subprocess
import threading
import time
from pathlib import Path

logger = logging.getLogger(__name__)

_lock = threading.Lock()
_bridge_ok: bool | None = None
_bridge_pid: int | None = None  # PID of the daemon this process decided to reuse

# Port the MemOS viewer binds to. Must match bridge.cts viewer config.
_MEMOS_VIEWER_PORT = 18800

# pgrep -f pattern identifying Hermes bridge processes.
_BRIDGE_PGREP_PATTERN = r"bridge\.cts.*--agent=hermes"
# A bridge younger than this may still be initialising and is given time.
_STARTUP_GRACE_S = 10.0
# How long to wait for a young bridge to bind the viewer port.
_STARTUP_WAIT_S = 5.0
_POLL_INTERVAL_S = 0.25


def _bridge_script() -> Path:
    """Return the expected location of the bridge entry script."""
    return Path.home() / ".hermes/memos-plugin/bridge.cts"


def _parse_pids(text: str) -> list[int]:
    """Return every line of *text* that is a decimal PID, in order."""
    return [int(tok) for tok in (line.strip() for line in text.splitlines()) if tok.isdigit()]


def _list_bridge_pids() -> list[int]:
    """Return the PIDs of all running Hermes bridge processes.

    Returns an empty list when nothing matches or when ``pgrep`` is missing,
    fails or times out.
    """
    try:
        out = subprocess.check_output(
            ["pgrep", "-f", _BRIDGE_PGREP_PATTERN],
            timeout=5.0,
        )
    except (subprocess.SubprocessError, OSError):
        # CalledProcessError (no match), TimeoutExpired, or pgrep missing.
        return []
    return _parse_pids(out.decode("utf-8", errors="replace"))


def _find_existing_bridge_pid() -> int | None:
    """Return PID of a running bridge process, or None.

    Searches for ``bridge.cts --agent=hermes`` in the process table and
    returns the first match.
    """
    pids = _list_bridge_pids()
    return pids[0] if pids else None


def _listener_for_port(ss_output: str, port: int) -> tuple[bool, int | None]:
    """Find the listener on *port* in ``ss -tlnp`` output.

    The port must be followed by whitespace or end of line, so ``:18800``
    does not match a socket listening on ``:188001``.

    Returns:
        ``(listening, owner_pid)``; ``owner_pid`` is None when the line
        carries no ``pid=`` entry.
    """
    local_port = re.compile(rf":{port}(?=\s|$)")
    for line in ss_output.splitlines():
        if not local_port.search(line):
            continue
        owner = re.search(r"pid=(\d+)", line)
        return True, int(owner.group(1)) if owner else None
    return False, None


def _can_connect(port: int, timeout: float) -> bool:
    """Return True when a TCP connection to 127.0.0.1:*port* succeeds."""
    try:
        with socket.create_connection(("127.0.0.1", port), timeout=timeout):
            return True
    except OSError:
        return False


def _listener_state(port: int = _MEMOS_VIEWER_PORT) -> tuple[bool, int | None]:
    """Return ``(listening, owner_pid)`` for *port*.

    Uses ``ss -tlnp``. When ``ss`` is missing, fails or times out, falls back
    to a plain connect probe, in which case the owner is unknown (None).
    """
    try:
        out = subprocess.check_output(["ss", "-tlnp"], timeout=5.0)
    except (subprocess.SubprocessError, OSError):
        return _can_connect(port, timeout=1.0), None
    return _listener_for_port(out.decode("utf-8", errors="replace"), port)


def _is_port_bound(port: int = _MEMOS_VIEWER_PORT) -> bool:
    """Check if the MemOS viewer port is already bound (suggests daemon is alive)."""
    return _listener_state(port)[0]


def _bridge_health_check(timeout: float = 2.0) -> bool:
    """Lightweight connectivity check to viewer port."""
    return _can_connect(_MEMOS_VIEWER_PORT, timeout)


def _pid_exists(pid: int) -> bool:
    """Return True while *pid* still names a process (signal 0 probe)."""
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except OSError:
        return True  # e.g. permission denied: the process exists
    return True


def _wait_for_exit(pid: int, timeout: float) -> bool:
    """Poll until *pid* is gone or *timeout* seconds elapse; True if gone."""
    deadline = time.monotonic() + timeout
    while _pid_exists(pid):
        if time.monotonic() >= deadline:
            return False
        time.sleep(0.05)
    return True


def _kill_stale_bridge(pid: int) -> bool:
    """Kill a stale bridge process by PID.

    Sends SIGTERM, waits up to 0.5 s, then escalates to SIGKILL.

    Returns:
        True when the signal could be delivered. False when the PID is not a
        killable target (non-positive, or this process) or the first signal
        failed (already dead or permission denied).
    """
    # pid 0 / negative pids address whole process groups; never signal those.
    if pid <= 0 or pid == os.getpid():
        return False
    try:
        os.kill(pid, signal.SIGTERM)
    except OSError:
        return False
    if _wait_for_exit(pid, 0.5):
        return True
    try:
        os.kill(pid, getattr(signal, "SIGKILL", signal.SIGTERM))
    except OSError:
        return True  # exited between the check and the second signal
    _wait_for_exit(pid, 0.5)
    return True


def _starttime_ticks(stat_text: str) -> int:
    """Extract ``starttime`` (field 22) from ``/proc/<pid>/stat`` content.

    The split starts after the closing parenthesis of the command name,
    because that name may itself contain spaces. Field 3 is then index 0,
    which puts field 22 at index 19.
    """
    after_comm = stat_text.rpartition(")")[2].split()
    return int(after_comm[19])


def _process_age_seconds(pid: int) -> float:
    """Return how long *pid* has been running, in seconds.

    ``starttime`` is in clock ticks since boot, so it is compared against
    ``/proc/uptime`` rather than wall-clock time.

    Raises:
        OSError, ValueError, IndexError, AttributeError: when the process is
            gone or the platform offers no ``/proc`` / ``os.sysconf``.
    """
    stat_text = Path(f"/proc/{pid}/stat").read_text()
    uptime_s = float(Path("/proc/uptime").read_text().split()[0])
    ticks_per_second = os.sysconf("SC_CLK_TCK")
    return uptime_s - _starttime_ticks(stat_text) / ticks_per_second


def _wait_for_viewer(timeout: float) -> bool:
    """Poll until the viewer port is bound and connectable, or *timeout*."""
    deadline = time.monotonic() + timeout
    while True:
        if _is_port_bound() and _bridge_health_check():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(_POLL_INTERVAL_S)


def _reuse_or_reap(pid: int, *, port_bound: bool) -> bool:
    """Decide whether bridge *pid* can be reused; kill it when it is stale.

    Returns:
        True when the daemon is serving and should be reused, False when it
        was killed or has vanished.
    """
    if port_bound:
        if _bridge_health_check():
            logger.info(
                "MemOS: bridge daemon already running (PID %d), reusing",
                pid,
            )
            return True
        logger.warning(
            "MemOS: bridge port bound but health check failed (PID %d), "
            "killing and respawning",
            pid,
        )
        _kill_stale_bridge(pid)
        return False

    # Process exists but the port is not bound: it may still be starting up.
    try:
        age_s = _process_age_seconds(pid)
    except (OSError, ValueError, IndexError, AttributeError):
        # Process vanished or stat unreadable — treat as gone.
        return False

    if age_s < _STARTUP_GRACE_S:
        logger.info(
            "MemOS: bridge process (PID %d) initializing (%.1fs), "
            "waiting for port binding",
            pid, age_s,
        )
        if _wait_for_viewer(_STARTUP_WAIT_S):
            return True
        logger.warning(
            "MemOS: bridge (PID %d) still not ready after wait, killing",
            pid,
        )
    else:
        logger.warning(
            "MemOS: stale bridge process (PID %d) without port binding, "
            "killing (uptime %.0fs)",
            pid, age_s,
        )
    _kill_stale_bridge(pid)
    return False


def ensure_bridge_running(*, probe_only: bool = False) -> bool:
    """Return True when the bridge is (or can be) operational.

    ``probe_only=True`` performs a lightweight availability check without
    launching a long-lived subprocess. This is what
    ``MemTensorProvider.is_available`` calls during Hermes startup. A probe
    only checks the prerequisites: it never kills a process or waits.

    **Bridge lifecycle guard** (non-probe calls): if a bridge daemon is
    already serving the viewer port, it is recorded for reuse. A bridge
    process that is not serving is killed, unless it is still within its
    startup grace period and binds the port in time.

    Returns:
        False when the bridge script or Node.js is missing, True otherwise.
    """
    global _bridge_ok, _bridge_pid
    with _lock:
        if _bridge_ok is not None and probe_only:
            return _bridge_ok

        script = _bridge_script()
        if not script.exists():
            logger.warning("MemOS: bridge script missing at %s", script)
            _bridge_ok = False
            return False

        if not _node_available():
            logger.warning("MemOS: Node.js not found on PATH")
            _bridge_ok = False
            return False

        if probe_only:
            _bridge_ok = True
            return True

        bridge_pids = _list_bridge_pids()
        if bridge_pids:
            listening, owner = _listener_state()
            # Prefer the process that actually owns the viewer port, so a
            # leaked sibling is never mistaken for the serving daemon.
            candidate = owner if owner in bridge_pids else bridge_pids[0]
            if _reuse_or_reap(candidate, port_bound=listening):
                _bridge_pid = candidate
                _bridge_ok = True
                return True
            _bridge_pid = None

        # Also kill any other zombie bridge processes (pre-existing leak).
        _cleanup_zombie_bridges()

        _bridge_ok = True
        return True


def _cleanup_zombie_bridges() -> None:
    """Kill bridge processes that don't own the viewer port.

    Handles the case where previous sessions leaked bridges (several
    processes, of which at most one has the viewer port). Nothing is killed
    when at most one bridge exists. Nothing is killed either when the port is
    bound but its owner cannot be identified, because the serving daemon
    could then not be told apart from the leaked ones.
    """
    pids = _list_bridge_pids()
    if len(pids) <= 1:
        return

    listening, port_owner = _listener_state()
    if listening and port_owner is None:
        logger.warning(
            "MemOS: viewer port %d is bound but its owner is unknown, "
            "skipping zombie bridge cleanup",
            _MEMOS_VIEWER_PORT,
        )
        return

    killed = 0
    for pid in pids:
        if pid == port_owner:
            continue  # Keep the one that actually has the port
        logger.info("MemOS: killing zombie bridge PID %d", pid)
        _kill_stale_bridge(pid)
        killed += 1

    if killed:
        logger.info("MemOS: cleaned up %d zombie bridge processes", killed)


def get_bridge_pid() -> int | None:
    """Return the tracked bridge PID, or None."""
    return _bridge_pid


def shutdown_bridge() -> None:
    """Best-effort cleanup: kill the tracked daemon and reset cached state."""
    global _bridge_ok, _bridge_pid
    with _lock:
        if _bridge_pid is not None:
            _kill_stale_bridge(_bridge_pid)
            _bridge_pid = None
        _bridge_ok = None