Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 106 additions & 38 deletions apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,57 +280,63 @@ def name(self) -> str: # type: ignore[override]

def is_available(self) -> bool: # type: ignore[override]
try:
from daemon_manager import ensure_bridge_running
return ensure_bridge_running(probe_only=True)
except Exception:
return False

# ─── Lifecycle ────────────────────────────────────────────────────────

def initialize(self, session_id: str, **kwargs: Any) -> None: # type: ignore[override]
"""Called once at agent startup.

kwargs always include ``hermes_home`` and ``platform``. We stash
them so the bridge can resolve the right `~/.hermes/memos-plugin/`
and log the originating channel.

We open the session here but NOT the episode — episode creation
is deferred to ``_ensure_episode()`` (called from the first
``on_turn_start``), so the actual user message can be passed as
the episode's initial text instead of a generic placeholder.
"""
"""Called once at agent startup."""
self._session_id = session_id or self._session_id
self._hermes_home = str(kwargs.get("hermes_home") or "")
self._platform = str(kwargs.get("platform") or "cli")
self._agent_identity = str(kwargs.get("agent_identity") or "hermes")

# ── Bridge lifecycle: detect existing daemon, spawn if needed ──
# daemon_manager ensures exactly one bridge daemon is running (port 18800 bound).
# We create a MemosBridgeClient that connects via stdio (the daemon stays alive as a daemon
# even after the client disconnects).
try:
ensure_bridge_running()
except Exception as err:
logger.warning("MemOS: failed to start bridge — %s", err)
return

# ── Connect to the bridge and open session ──
bridge = None
try:
self._bridge = MemosBridgeClient()
# Register the fallback LLM handler BEFORE we open the
# session so it is available the very first time the
# plugin's facade asks for help (e.g. on the first
# `turn.start` retrieval call).
self._bridge.register_host_handler(
bridge = MemosBridgeClient()
bridge.register_host_handler(
"host.llm.complete",
self._handle_host_llm_complete,
)
self._open_session(session_id)
bridge.request("session.open", {
"agent": "hermes",
"sessionId": session_id,
"namespace": self._runtime_namespace(),
"meta": {
"hermesHome": self._hermes_home,
"platform": self._platform,
"agentIdentity": self._agent_identity,
"profileId": self._runtime_namespace()["profileId"],
"namespace": self._runtime_namespace(),
},
}, timeout=120.0)
self._bridge = bridge
logger.info(
"MemOS: bridge ready session=%s platform=%s (episode deferred)",
self._session_id,
self._platform,
)
except Exception as err:
logger.warning("MemOS: bridge init failed — %s", err)
if bridge is not None:
with contextlib.suppress(Exception):
bridge.close()
self._bridge = None
# Register a Hermes plugin hook to capture tool calls as they
# happen. The `post_tool_call` hook fires after every tool
# dispatch (write_file, terminal, search_files, etc.) with the
# tool name, arguments, and result. We accumulate them and
# flush in `sync_turn`.

self._register_tool_call_hook()
self._start_bridge_keepalive()

Expand Down Expand Up @@ -1560,7 +1566,7 @@ def _host_runtime_context(self) -> dict[str, str]:
self._last_host_runtime = dict(out)
return out

def _open_session(self, session_id: str = "", *, timeout: float = 30.0) -> None:
def _open_session(self, session_id: str = "", *, timeout: float = 60.0) -> None:
assert self._bridge is not None
requested_session = session_id or self._session_id or ""
host_runtime = self._host_runtime_context()
Expand Down Expand Up @@ -1590,17 +1596,79 @@ def _is_transport_closed(self, err: Exception) -> bool:
return "broken pipe" in msg or "bridge closed" in msg or "transport_closed" in msg

def _reconnect_bridge(self, session_id: str = "", *, timeout: float = 30.0) -> None:
"""Reconnect bridge — detect existing daemon first, reuse if alive."""
old_bridge = self._bridge
if old_bridge:
with contextlib.suppress(Exception):
old_bridge.close()
self._bridge = None

# Check if a daemon bridge is already running (port 18800 bound)
try:
from daemon_manager import _is_port_bound, _bridge_health_check
if _is_port_bound() and _bridge_health_check(timeout=2.0):
# Daemon is alive — create a client that connects via TCP
# or stdio (daemon keeps serving even if old client disconnects)
bridge = MemosBridgeClient()
bridge.register_host_handler(
"host.llm.complete",
self._handle_host_llm_complete,
)
# Try to open session on existing daemon
try:
bridge.request("session.open", {
"agent": "hermes",
"sessionId": session_id,
"namespace": self._runtime_namespace(),
"meta": {
"hermesHome": self._hermes_home,
"platform": self._platform,
"agentIdentity": self._agent_identity,
"profileId": self._runtime_namespace()["profileId"],
"namespace": self._runtime_namespace(),
},
}, timeout=timeout)
self._bridge = bridge
logger.info("MemOS: reconnected to existing bridge daemon")
return
except Exception as e:
# Daemon exists but session.open failed — likely stale
# or session conflict. Close this client, try fresh spawn.
logger.warning("MemOS: existing daemon session.open failed: %s", e)
with contextlib.suppress(Exception):
bridge.close()
raise
else:
logger.warning("MemOS: bridge daemon not responding, will be respawned")
except ImportError:
pass

# No alive daemon — daemon_manager will spawn a new one
ensure_bridge_running()
self._bridge = MemosBridgeClient()
self._bridge.register_host_handler(
bridge = MemosBridgeClient()
bridge.register_host_handler(
"host.llm.complete",
self._handle_host_llm_complete,
)
self._open_session(session_id, timeout=timeout)
try:
bridge.request("session.open", {
"agent": "hermes",
"sessionId": session_id,
"namespace": self._runtime_namespace(),
"meta": {
"hermesHome": self._hermes_home,
"platform": self._platform,
"agentIdentity": self._agent_identity,
"profileId": self._runtime_namespace()["profileId"],
"namespace": self._runtime_namespace(),
},
}, timeout=120.0)
self._bridge = bridge
logger.info("MemOS: bridge reconnected (new daemon) session=%s", self._session_id)
except Exception:
with contextlib.suppress(Exception):
bridge.close()
raise

def _ensure_bridge(self, session_id: str = "", *, timeout: float = 30.0) -> bool:
if self._bridge:
Expand All @@ -1624,19 +1692,19 @@ def _start_bridge_keepalive(self) -> None:
self._bridge_keepalive_stop.clear()

def _run() -> None:
# Only monitor the daemon port. If the daemon is alive,
# do nothing. If the daemon dies, let daemon_manager
# handle the respawn — the next request will auto-reconnect.
# This avoids the old pattern of: detect death → kill-all → spawn → close → repeat.
while not self._bridge_keepalive_stop.wait(5.0):
if not self._ensure_bridge(self._session_id, timeout=10.0):
continue
try:
assert self._bridge is not None
self._bridge.request("core.health", {}, timeout=10.0)
except Exception as err:
if self._is_transport_closed(err):
logger.info("MemOS: bridge keepalive reconnecting after transport close")
with contextlib.suppress(Exception):
self._reconnect_bridge(self._session_id, timeout=10.0)
else:
logger.debug("MemOS: bridge keepalive failed — %s", err)
from daemon_manager import _is_port_bound
if not _is_port_bound():
logger.warning("MemOS: bridge keepalive — port 18800 not bound, daemon may be dead")
# Port is bound — daemon is alive. No-op.
# The daemon_manager will handle respawn on next request.
except ImportError:
pass

self._bridge_keepalive_thread = threading.Thread(
target=_run,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,19 @@ def close(self) -> None:
if self._closed:
return
self._closed = True
# Close all pipe fds so the bridge process exits cleanly.
with contextlib.suppress(Exception):
self._proc.stdin.close()
# DON'T wait() or kill() the bridge process. If it has an
# active viewer (HTTP server), it will stay alive as a daemon
# so the memory panel remains accessible between `hermes chat`
# sessions. If it's headless (viewer port was taken), it will
# notice stdin EOF and exit on its own.
if self._proc.stdout is not None:
self._proc.stdout.close()
if self._proc.stderr is not None:
self._proc.stderr.close()
# Wait for graceful exit (up to 2s)
try:
self._proc.wait(timeout=2)
except subprocess.TimeoutExpired:
self._proc.terminate()
self._proc.wait(timeout=1)
# unblock any pending waiters
with self._lock:
for entry in list(self._pending.values()):
Expand Down
Loading