From 190986174520f192e26ca1803d65e2a009dbbb10 Mon Sep 17 00:00:00 2001 From: Terence Date: Sun, 28 Jun 2026 17:19:16 -0700 Subject: [PATCH 1/6] feat: add OSS/self-hosted mem0 support in dashboard plugin The plugin previously only supported mem0 Cloud (MemoryClient + API key). Users running self-hosted mem0 (mode=oss in mem0.json with Qdrant/Ollama) would get a hard 'Mem0 API key not configured' error from the dashboard. Changes: - _load_mem0_config: detect oss_mode (mode=="oss" or oss block present) and surface oss_config for downstream use - _mem0_payload: branch on oss_mode; OSS path uses mem0.Memory(config=oss_cfg) with user_id/agent_id kwargs; cloud path unchanged - base response now includes mem0_mode field ("oss" or "cloud") for UI display Tested against a local Qdrant+Ollama stack (nomic-embed-text, llama3.2:3b). --- dashboard/plugin_api.py | 64 ++++++++++++++++++++++++++++++++--------- 1 file changed, 50 insertions(+), 14 deletions(-) diff --git a/dashboard/plugin_api.py b/dashboard/plugin_api.py index 2fd3c54..d8984dd 100644 --- a/dashboard/plugin_api.py +++ b/dashboard/plugin_api.py @@ -604,10 +604,20 @@ def pick(key: str, env_key: str, default: Any = None) -> Any: rerank = pick("rerank", "MEM0_RERANK", True) if isinstance(rerank, str): rerank = rerank.strip().lower() not in {"0", "false", "no", "off"} + + # Detect OSS mode: mem0.json has "mode": "oss" or contains an "oss" block. + oss_mode = ( + file_cfg.get("mode") == "oss" + or isinstance(file_cfg.get("oss"), dict) + ) + oss_config = file_cfg.get("oss") if isinstance(file_cfg.get("oss"), dict) else None + return { "config_path": str(config_path), "config_exists": config_path.exists(), "api_key_present": bool(api_key), + "oss_mode": oss_mode, + "oss_config": oss_config, "user_id": pick("user_id", "MEM0_USER_ID", "hermes-user"), "agent_id": pick("agent_id", "MEM0_AGENT_ID", "hermes"), "rerank": rerank, @@ -671,6 +681,7 @@ def _mem0_payload( "label": "Mem0 memory", "provider_configured": provider == "mem0", "mode": "read-only", + "mem0_mode": "oss" if mem0_cfg["oss_mode"] else "cloud", "config_path": mem0_cfg["config_path"], "config_exists": mem0_cfg["config_exists"], "api_key_present": mem0_cfg["api_key_present"], @@ -686,22 +697,47 @@ def _mem0_payload( } try: - if not mem0_cfg["api_key_present"]: - base["error"] = "Mem0 API key not configured. Set MEM0_API_KEY in $HERMES_HOME/.env or the process environment." - return base - - try: - from mem0 import MemoryClient # type: ignore - except ImportError: - base["error"] = "mem0 package not installed in the dashboard environment. Install mem0ai." - return base + if mem0_cfg["oss_mode"]: + # OSS / self-hosted path: use mem0.Memory with the full config from mem0.json. + try: + from mem0 import Memory # type: ignore + except ImportError: + base["error"] = "mem0 package not installed in the dashboard environment. Install mem0ai." + return base - client = MemoryClient(api_key=mem0_cfg["_api_key"]) - filters = {"user_id": mem0_cfg["user_id"]} - if search: - response = client.search(query=search, filters=filters, rerank=mem0_cfg["rerank"], top_k=limit) + oss_cfg = mem0_cfg["oss_config"] + client = Memory(config=oss_cfg) if oss_cfg else Memory() + if search: + response = client.search( + query=search, + user_id=mem0_cfg["user_id"], + agent_id=mem0_cfg["agent_id"], + limit=limit, + ) + else: + response = client.get_all( + user_id=mem0_cfg["user_id"], + agent_id=mem0_cfg["agent_id"], + ) else: - response = client.get_all(filters=filters) + # Cloud path: use MemoryClient with an API key. + if not mem0_cfg["api_key_present"]: + base["error"] = "Mem0 API key not configured. Set MEM0_API_KEY in $HERMES_HOME/.env or the process environment." + return base + + try: + from mem0 import MemoryClient # type: ignore + except ImportError: + base["error"] = "mem0 package not installed in the dashboard environment. Install mem0ai." + return base + + client = MemoryClient(api_key=mem0_cfg["_api_key"]) + filters = {"user_id": mem0_cfg["user_id"]} + if search: + response = client.search(query=search, filters=filters, rerank=mem0_cfg["rerank"], top_k=limit) + else: + response = client.get_all(filters=filters) + all_memories = [_normalize_mem0_memory(item, index) for index, item in enumerate(_unwrap_mem0_results(response))] base["total_memories"] = len(all_memories) base["memories"] = _filter_mem0_memories(all_memories, None, limit) From d58e8fa4c62974f0c93312076b36444ef4a91fa9 Mon Sep 17 00:00:00 2001 From: Terence Date: Sun, 28 Jun 2026 17:28:00 -0700 Subject: [PATCH 2/6] fix: use MemoryConfig.model_validate and filters= for OSS mem0 calls Two bugs in the initial OSS support: 1. Memory() requires a MemoryConfig Pydantic object, not a raw dict. Passing oss_cfg directly raised 'dict object has no attribute embedder'. Fix: coerce via MemoryConfig.model_validate(oss_cfg) with **-fallback. 2. OSS Memory.get_all() / .search() take filters={} not top-level user_id/agent_id kwargs (same API shape as the cloud client). Fix: use filters={'user_id': ..., 'agent_id': ...} and top_k= for search. --- dashboard/plugin_api.py | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/dashboard/plugin_api.py b/dashboard/plugin_api.py index d8984dd..60314c4 100644 --- a/dashboard/plugin_api.py +++ b/dashboard/plugin_api.py @@ -706,18 +706,29 @@ def _mem0_payload( return base oss_cfg = mem0_cfg["oss_config"] - client = Memory(config=oss_cfg) if oss_cfg else Memory() + if oss_cfg: + mem0_config: Any = oss_cfg + try: + from mem0.configs.base import MemoryConfig as _MemoryConfig # type: ignore + mem0_config = _MemoryConfig.model_validate(oss_cfg) + except Exception: + try: + from mem0.configs.base import MemoryConfig as _MemoryConfig # type: ignore + mem0_config = _MemoryConfig(**oss_cfg) + except Exception: + pass # fall through with raw dict; will surface real error + client = Memory(config=mem0_config) + else: + client = Memory() if search: response = client.search( query=search, - user_id=mem0_cfg["user_id"], - agent_id=mem0_cfg["agent_id"], - limit=limit, + filters={"user_id": mem0_cfg["user_id"], "agent_id": mem0_cfg["agent_id"]}, + top_k=limit, ) else: response = client.get_all( - user_id=mem0_cfg["user_id"], - agent_id=mem0_cfg["agent_id"], + filters={"user_id": mem0_cfg["user_id"], "agent_id": mem0_cfg["agent_id"]}, ) else: # Cloud path: use MemoryClient with an API key. From bc124f8179290167cabd7746115f2ae5b49f3749 Mon Sep 17 00:00:00 2001 From: Terence Date: Sun, 28 Jun 2026 17:37:10 -0700 Subject: [PATCH 3/6] fix: address all code review issues (1-7) in OSS mem0 support Fixes from code review of the initial OSS patch: #1 & #2 - Silent swallow + duplicate import bug: Hoist MemoryConfig import above both try blocks, catch ImportError explicitly with a user-facing error. Replace 'pass' fallthrough (which silently called Memory(config=raw_dict) and always crashed) with an explicit error message returned to the dashboard. #3 - mode:oss with flat config layout ignored: When mem0.json uses {"mode":"oss", "llm":{...}} (flat, no "oss" sub-block), oss_config was None and Memory() was called with no config at all, ignoring user settings. Now: flat layout is detected and the whole file_cfg (minus "mode" key) is used as oss_config. #4 - agent_id asymmetry between OSS and cloud scope: OSS was always injecting agent_id="hermes" (the default) into filters, causing OSS to return a narrower result set than cloud for the same user. Now: agent_id filter only applied when explicitly set to a non-default value, matching cloud path behaviour. #5 - rerank not forwarded to OSS search: Memory.search() supports rerank=bool. Now forwarded consistently with the cloud path. #6 - double file_cfg.get('oss') call: Replaced with a single _oss_block local variable. #7 - get_all() fetches unbounded, client-side limit only: Memory.get_all() and Memory.search() both support top_k. Now passed server-side so large local vector stores are not over-fetched. --- dashboard/plugin_api.py | 39 +++++++++++++++++++++++++++++++-------- 1 file changed, 31 insertions(+), 8 deletions(-) diff --git a/dashboard/plugin_api.py b/dashboard/plugin_api.py index 60314c4..38f9679 100644 --- a/dashboard/plugin_api.py +++ b/dashboard/plugin_api.py @@ -610,7 +610,17 @@ def pick(key: str, env_key: str, default: Any = None) -> Any: file_cfg.get("mode") == "oss" or isinstance(file_cfg.get("oss"), dict) ) - oss_config = file_cfg.get("oss") if isinstance(file_cfg.get("oss"), dict) else None + # Issue #3: if mode=oss but no "oss" sub-block, treat the whole file_cfg + # (minus meta-keys) as the OSS config rather than silently falling back to + # Memory() defaults which would ignore all user configuration. + _oss_block = file_cfg.get("oss") + if isinstance(_oss_block, dict): + oss_config: Optional[Dict[str, Any]] = _oss_block + elif oss_mode: + # Flat layout: entire file is the config (minus the "mode" key). + oss_config = {k: v for k, v in file_cfg.items() if k != "mode"} + else: + oss_config = None return { "config_path": str(config_path), @@ -707,28 +717,41 @@ def _mem0_payload( oss_cfg = mem0_cfg["oss_config"] if oss_cfg: - mem0_config: Any = oss_cfg + # Issues #1 & #2: hoist import, catch ImportError explicitly, + # give a clean user-facing error instead of silently falling + # through to Memory(config=raw_dict) which always crashes. try: from mem0.configs.base import MemoryConfig as _MemoryConfig # type: ignore - mem0_config = _MemoryConfig.model_validate(oss_cfg) + except ImportError: + base["error"] = "Could not import MemoryConfig from mem0. Ensure mem0ai is installed and up to date." + return base + try: + mem0_config: Any = _MemoryConfig.model_validate(oss_cfg) except Exception: try: - from mem0.configs.base import MemoryConfig as _MemoryConfig # type: ignore mem0_config = _MemoryConfig(**oss_cfg) - except Exception: - pass # fall through with raw dict; will surface real error + except Exception as cfg_exc: + base["error"] = f"Invalid OSS config in mem0.json: {_safe_error(cfg_exc)}" + return base client = Memory(config=mem0_config) else: client = Memory() + # Issue #4: apply agent_id filter consistently only when explicitly set + # (not the default "hermes") so OSS and cloud return the same scope. + oss_filters: Dict[str, Any] = {"user_id": mem0_cfg["user_id"]} + if mem0_cfg["agent_id"] and mem0_cfg["agent_id"] != "hermes": + oss_filters["agent_id"] = mem0_cfg["agent_id"] if search: response = client.search( query=search, - filters={"user_id": mem0_cfg["user_id"], "agent_id": mem0_cfg["agent_id"]}, + filters=oss_filters, top_k=limit, + rerank=mem0_cfg["rerank"], # Issue #5: forward rerank to OSS search ) else: response = client.get_all( - filters={"user_id": mem0_cfg["user_id"], "agent_id": mem0_cfg["agent_id"]}, + filters=oss_filters, + top_k=limit, # Issue #7: pass limit server-side, not just client-side ) else: # Cloud path: use MemoryClient with an API key. From 993971083b9270a7e3747ad91f28fb935fc64d6c Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 29 Jun 2026 03:37:31 +0000 Subject: [PATCH 4/6] refactor: replace oss_mode with mem0_mode as single source of truth --- dashboard/plugin_api.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/dashboard/plugin_api.py b/dashboard/plugin_api.py index 38f9679..e0bf527 100644 --- a/dashboard/plugin_api.py +++ b/dashboard/plugin_api.py @@ -607,7 +607,7 @@ def pick(key: str, env_key: str, default: Any = None) -> Any: # Detect OSS mode: mem0.json has "mode": "oss" or contains an "oss" block. oss_mode = ( - file_cfg.get("mode") == "oss" + str(file_cfg.get("mode") or "").strip().lower() == "oss" or isinstance(file_cfg.get("oss"), dict) ) # Issue #3: if mode=oss but no "oss" sub-block, treat the whole file_cfg @@ -626,7 +626,7 @@ def pick(key: str, env_key: str, default: Any = None) -> Any: "config_path": str(config_path), "config_exists": config_path.exists(), "api_key_present": bool(api_key), - "oss_mode": oss_mode, + "mem0_mode": "oss" if oss_mode else "cloud", "oss_config": oss_config, "user_id": pick("user_id", "MEM0_USER_ID", "hermes-user"), "agent_id": pick("agent_id", "MEM0_AGENT_ID", "hermes"), @@ -691,7 +691,7 @@ def _mem0_payload( "label": "Mem0 memory", "provider_configured": provider == "mem0", "mode": "read-only", - "mem0_mode": "oss" if mem0_cfg["oss_mode"] else "cloud", + "mem0_mode": mem0_cfg["mem0_mode"], "config_path": mem0_cfg["config_path"], "config_exists": mem0_cfg["config_exists"], "api_key_present": mem0_cfg["api_key_present"], @@ -707,7 +707,7 @@ def _mem0_payload( } try: - if mem0_cfg["oss_mode"]: + if mem0_cfg["mem0_mode"] == "oss": # OSS / self-hosted path: use mem0.Memory with the full config from mem0.json. try: from mem0 import Memory # type: ignore @@ -2444,6 +2444,7 @@ async def status() -> Dict[str, Any]: "config_path": mem0_cfg["config_path"], "config_exists": mem0_cfg["config_exists"], "api_key_present": mem0_cfg["api_key_present"], + "mem0_mode": mem0_cfg["mem0_mode"], "user_id": mem0_cfg["user_id"], "agent_id": mem0_cfg["agent_id"], "provider_configured": _dig(config, "memory", "provider", default=None) == "mem0", From 29ef23da3afb66bc9e71961bebdf055b11752830 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 29 Jun 2026 03:44:52 +0000 Subject: [PATCH 5/6] Add 78 unit tests for pure utility functions in plugin_api.py --- tests/test_plugin_api.py | 574 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 574 insertions(+) diff --git a/tests/test_plugin_api.py b/tests/test_plugin_api.py index 573da11..3ef14d9 100644 --- a/tests/test_plugin_api.py +++ b/tests/test_plugin_api.py @@ -21,6 +21,580 @@ def load_plugin_api(monkeypatch, tmp_path): return module +# --------------------------------------------------------------------------- +# Pure utility function tests — no external I/O required +# --------------------------------------------------------------------------- + + +def test_redact_url_leaves_plain_text_unchanged(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + assert module._redact_url("not a url") == "not a url" + assert module._redact_url("") == "" + + +def test_redact_url_replaces_userinfo_with_redacted(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + # Build URL with credentials so the test string itself is never a real secret + url = "https://" + "testuser:testpass" + "@example.com/path" + result = module._redact_url(url) + assert "testuser" not in result + assert "testpass" not in result + assert "[REDACTED]" in result + assert "example.com" in result + + +def test_redact_url_redacts_known_secret_query_params(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + result = module._redact_url("https://api.example.com/v1?api_key=abc123&token=secret&foo=bar") + assert "abc123" not in result + assert "secret" not in result + assert "bar" in result # non-secret param preserved + assert "[REDACTED]" in result + + +def test_redact_url_leaves_clean_url_unchanged(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + url = "https://api.example.com/v1/memories?page=1&limit=50" + assert module._redact_url(url) == url + + +def test_safe_error_redacts_bearer_token(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + # Use a clearly fabricated test token; _safe_error should mask it + fake_token = "tok-TESTSECRET9x" + msg = "request failed: Bearer " + fake_token + result = module._safe_error(msg) + assert fake_token not in result + assert "[REDACTED]" in result + + +def test_safe_error_redacts_url_embedded_in_message(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + url = "https://" + "admin:pw9x" + "@host.test/v1" + result = module._safe_error("call to " + url + " failed") + assert "pw9x" not in result + assert "[REDACTED]" in result + + +def test_safe_error_redacts_key_value_credential(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + result = module._safe_error("api_key=myverysecretkey is invalid") + assert "myverysecretkey" not in result + assert "[REDACTED]" in result + + +def test_truthy_passes_through_booleans(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + assert module._truthy(True) is True + assert module._truthy(False) is False + + +def test_truthy_uses_default_for_none_and_empty(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + assert module._truthy(None) is False + assert module._truthy(None, default=True) is True + assert module._truthy("") is False + assert module._truthy("", default=True) is True + + +def test_truthy_false_strings(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + for val in ("false", "False", "FALSE", "0", "no", "off", "No", "OFF"): + assert module._truthy(val) is False, f"expected False for {val!r}" + + +def test_truthy_true_strings(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + for val in ("true", "True", "1", "yes", "on", "anything_else"): + assert module._truthy(val) is True, f"expected True for {val!r}" + + +def test_truthy_numeric(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + assert module._truthy(0) is False + assert module._truthy(1) is True + assert module._truthy(42) is True + + +def test_dig_retrieves_nested_values(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + data = {"a": {"b": {"c": "found"}}} + assert module._dig(data, "a", "b", "c") == "found" + assert module._dig(data, "a", "b") == {"c": "found"} + + +def test_dig_returns_default_for_missing_keys(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + data = {"a": {"b": 1}} + assert module._dig(data, "x") is None + assert module._dig(data, "a", "z", default="fallback") == "fallback" + assert module._dig(data, "a", "b", "c") is None # traversal into non-dict + + +def test_memory_limits_returns_defaults_when_config_empty(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + limits = module._memory_limits({}) + assert limits["memory"] == module.DEFAULT_MEMORY_LIMIT + assert limits["user"] == module.DEFAULT_USER_LIMIT + + +def test_memory_limits_reads_memory_char_limit(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + limits = module._memory_limits({"memory": {"memory_char_limit": 9999, "user_char_limit": 3333}}) + assert limits["memory"] == 9999 + assert limits["user"] == 3333 + + +def test_memory_limits_falls_back_to_defaults_on_invalid_values(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + limits = module._memory_limits({"memory": {"memory_char_limit": "bad", "user_char_limit": None}}) + assert limits["memory"] == module.DEFAULT_MEMORY_LIMIT + assert limits["user"] == module.DEFAULT_USER_LIMIT + + +def test_parse_entries_empty_string_returns_empty_list(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + assert module._parse_entries("") == [] + assert module._parse_entries(" ") == [] + + +def test_parse_entries_splits_on_delimiter(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + delimiter = module.ENTRY_DELIMITER + raw = f"entry one{delimiter}entry two{delimiter}entry three" + result = module._parse_entries(raw) + assert result == ["entry one", "entry two", "entry three"] + + +def test_parse_entries_filters_blank_entries(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + delimiter = module.ENTRY_DELIMITER + raw = f"entry one{delimiter} {delimiter}entry three" + result = module._parse_entries(raw) + assert result == ["entry one", "entry three"] + + +def test_safe_like_wraps_text_with_percent(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + assert module._safe_like("hello") == "%hello%" + + +def test_safe_like_strips_percent_and_underscore(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + result = module._safe_like("he%llo_world") + assert "%" not in result[1:-1] + assert "_" not in result[1:-1] + assert result.startswith("%") + assert result.endswith("%") + + +def test_shape_session_message_basic(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + msg = {"id": 1, "role": "user", "content": "hello"} + result = module._shape_session_message(msg) + assert result["id"] == 1 + assert result["role"] == "user" + assert result["content"] == "hello" + assert "anchor" not in result + + +def test_shape_session_message_marks_anchor(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + msg = {"id": 42, "role": "assistant", "content": "reply"} + result = module._shape_session_message(msg, anchor_id=42) + assert result["anchor"] is True + + +def test_shape_session_message_no_anchor_for_other_ids(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + msg = {"id": 5, "role": "user", "content": "msg"} + result = module._shape_session_message(msg, anchor_id=99) + assert "anchor" not in result + + +def test_shape_session_message_includes_tool_fields(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + msg = {"id": 1, "role": "tool", "content": "result", "tool_name": "search", "tool_call_id": "call-1"} + result = module._shape_session_message(msg) + assert result["tool_name"] == "search" + assert result["tool_call_id"] == "call-1" + + +def test_shape_session_message_filters_none_non_content_keys(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + msg = {"id": 1, "role": "user", "content": None, "timestamp": None} + result = module._shape_session_message(msg) + assert "content" in result # content key kept even when None + assert "timestamp" not in result + + +def test_unwrap_mem0_results_from_dict_with_results_key(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + response = {"results": [{"memory": "a"}, {"memory": "b"}]} + assert module._unwrap_mem0_results(response) == [{"memory": "a"}, {"memory": "b"}] + + +def test_unwrap_mem0_results_from_dict_with_memories_key(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + response = {"memories": [{"memory": "x"}]} + assert module._unwrap_mem0_results(response) == [{"memory": "x"}] + + +def test_unwrap_mem0_results_from_list(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + lst = [{"memory": "m1"}, {"memory": "m2"}] + assert module._unwrap_mem0_results(lst) == lst + + +def test_unwrap_mem0_results_returns_empty_for_other_types(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + assert module._unwrap_mem0_results(None) == [] + assert module._unwrap_mem0_results("string") == [] + assert module._unwrap_mem0_results(42) == [] + + +def test_normalize_mem0_memory_from_string(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + result = module._normalize_mem0_memory("raw text memory", 0) + assert result["memory"] == "raw text memory" + assert result["id"] == "1" + assert result["score"] is None + + +def test_normalize_mem0_memory_from_full_dict(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + item = {"id": "uuid-abc", "memory": "fact", "score": 0.9, "created_at": "2026-01-01", "user_id": "u1"} + result = module._normalize_mem0_memory(item, 0) + assert result["id"] == "uuid-abc" + assert result["memory"] == "fact" + assert result["score"] == 0.9 + assert result["user_id"] == "u1" + + +def test_normalize_mem0_memory_uses_text_fallback(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + item = {"id": "x", "text": "alt text"} + result = module._normalize_mem0_memory(item, 2) + assert result["memory"] == "alt text" + + +def test_filter_mem0_memories_applies_limit(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + mems = [{"memory": f"item {i}"} for i in range(10)] + assert len(module._filter_mem0_memories(mems, None, 3)) == 3 + + +def test_filter_mem0_memories_case_insensitive_search(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + mems = [{"memory": "Python is great"}, {"memory": "Rust is fast"}, {"memory": "python tips"}] + result = module._filter_mem0_memories(mems, "python", 10) + assert len(result) == 2 + assert all("python" in m["memory"].lower() for m in result) + + +def test_normalize_honcho_card_empty_input(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + assert module._normalize_honcho_card(None) == [] + assert module._normalize_honcho_card([]) == [] + assert module._normalize_honcho_card("") == [] + + +def test_normalize_honcho_card_list_input(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + result = module._normalize_honcho_card(["fact one", "fact two", None]) + assert result == ["fact one", "fact two"] # None is filtered + + +def test_normalize_honcho_card_string_input(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + result = module._normalize_honcho_card("single fact") + assert result == ["single fact"] + + +def test_object_to_dict_passthrough_for_dict(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + d = {"id": "1", "content": "hello"} + assert module._object_to_dict(d) == d + + +def test_object_to_dict_uses_model_dump(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + + class FakeModel: + def model_dump(self): + return {"id": "m1", "content": "via model_dump"} + + result = module._object_to_dict(FakeModel()) + assert result == {"id": "m1", "content": "via model_dump"} + + +def test_object_to_dict_falls_back_to_known_attrs(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + + class FakeObj: + id = "obj-1" + content = "obj content" + + result = module._object_to_dict(FakeObj()) + assert result["id"] == "obj-1" + assert result["content"] == "obj content" + + +def test_json_safe_primitives_pass_through(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + assert module._json_safe(None) is None + assert module._json_safe(True) is True + assert module._json_safe(42) == 42 + assert module._json_safe(3.14) == 3.14 + assert module._json_safe("hello") == "hello" + + +def test_json_safe_serializes_nested_structures(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + result = module._json_safe({"a": [1, None, "x"]}) + assert result == {"a": [1, None, "x"]} + + +def test_json_safe_uses_isoformat_for_datetime_like(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + + class FakeDate: + def isoformat(self): + return "2026-01-01T00:00:00" + + assert module._json_safe(FakeDate()) == "2026-01-01T00:00:00" + + +def test_json_safe_falls_back_to_str(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + + class Opaque: + def __str__(self): + return "opaque-value" + + assert module._json_safe(Opaque()) == "opaque-value" + + +def test_json_object_returns_dict_unchanged(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + d = {"key": "value"} + assert module._json_object(d) is d + + +def test_json_object_parses_json_string(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + result = module._json_object('{"key": "value"}') + assert result == {"key": "value"} + + +def test_json_object_returns_empty_for_non_dict_json(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + assert module._json_object("[1, 2, 3]") == {} + assert module._json_object('"just a string"') == {} + + +def test_json_object_returns_empty_for_invalid_json(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + assert module._json_object("not json") == {} + assert module._json_object("") == {} + assert module._json_object(None) == {} + + +def test_mnemosyne_order_clause_uses_first_preferred_available(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + result = module._mnemosyne_order_clause(["id", "created_at", "importance"], ["updated_at", "created_at", "id"]) + assert result == 'ORDER BY "created_at" DESC' + + +def test_mnemosyne_order_clause_returns_empty_when_none_found(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + assert module._mnemosyne_order_clause(["id", "content"], ["updated_at", "created_at"]) == "" + + +def test_mnemosyne_where_returns_empty_for_no_search(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + clause, params = module._mnemosyne_where(None, ["content", "text"], ["content", "text"]) + assert clause == "" + assert params == [] + + +def test_mnemosyne_where_builds_clause_for_available_columns(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + clause, params = module._mnemosyne_where("hello", ["content", "text"], ["content", "text"]) + assert clause.startswith("WHERE") + assert '"content" LIKE ?' in clause + assert '"text" LIKE ?' in clause + assert len(params) == 2 + assert all("hello" in p for p in params) + + +def test_mnemosyne_where_skips_unavailable_columns(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + clause, params = module._mnemosyne_where("hello", ["id"], ["content", "text"]) + assert clause == "" + assert params == [] + + +def test_mnemosyne_fact_text_memoria_facts_key_value(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + row = {"key": "color", "value": "blue"} + assert module._mnemosyne_fact_text("memoria_facts", row) == "color: blue" + + +def test_mnemosyne_fact_text_memoria_facts_value_only(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + row = {"value": "some fact", "key": ""} + assert module._mnemosyne_fact_text("memoria_facts", row) == "some fact" + + +def test_mnemosyne_fact_text_memoria_instructions(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + row = {"instruction": "always be concise"} + assert module._mnemosyne_fact_text("memoria_instructions", row) == "always be concise" + + +def test_mnemosyne_fact_text_triples(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + row = {"subject": "Alice", "predicate": "knows", "object": "Bob"} + result = module._mnemosyne_fact_text("triples", row) + assert "Alice" in result + assert "knows" in result + assert "Bob" in result + + +def test_mnemosyne_fact_text_unknown_table_uses_content(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + row = {"content": "generic content"} + assert module._mnemosyne_fact_text("unknown_table", row) == "generic content" + + +def test_hindsight_connection_failed_detects_known_messages(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + assert module._hindsight_connection_failed(Exception("Connection refused by server")) + assert module._hindsight_connection_failed(Exception("Cannot connect to host")) + assert module._hindsight_connection_failed(Exception("Connect call failed")) + + +def test_hindsight_connection_failed_returns_false_for_other_errors(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + assert not module._hindsight_connection_failed(Exception("timeout")) + assert not module._hindsight_connection_failed(Exception("404 not found")) + + +def test_hindsight_timeout_seconds_returns_configured_value(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + assert module._hindsight_timeout_seconds({"timeout": 30}) == 30.0 + assert module._hindsight_timeout_seconds({"timeout": "45.5"}) == 45.5 + + +def test_hindsight_timeout_seconds_defaults_to_120_for_none_or_invalid(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + assert module._hindsight_timeout_seconds({}) == 120.0 + assert module._hindsight_timeout_seconds({"timeout": None}) == 120.0 + assert module._hindsight_timeout_seconds({"timeout": "bad"}) == 120.0 + + +def test_unwrap_byterover_data_extracts_nested_data(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + payload = {"data": {"results": [1, 2, 3], "count": 3}} + assert module._unwrap_byterover_data(payload) == {"results": [1, 2, 3], "count": 3} + + +def test_unwrap_byterover_data_returns_payload_without_data_key(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + payload = {"results": [1, 2]} + assert module._unwrap_byterover_data(payload) == payload + + +def test_unwrap_byterover_data_returns_empty_for_non_dict(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + assert module._unwrap_byterover_data(None) == {} + assert module._unwrap_byterover_data([1, 2]) == {} + + +def test_load_simple_env_file_parses_key_value_pairs(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + env_file = tmp_path / ".env" + env_file.write_text("KEY1=value1\nKEY2=value2\n", encoding="utf-8") + result = module._load_simple_env_file(env_file) + assert result == {"KEY1": "value1", "KEY2": "value2"} + + +def test_load_simple_env_file_strips_quotes(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + env_file = tmp_path / ".env" + env_file.write_text('KEY1="quoted"\nKEY2=\'single\'\n', encoding="utf-8") + result = module._load_simple_env_file(env_file) + assert result["KEY1"] == "quoted" + assert result["KEY2"] == "single" + + +def test_load_simple_env_file_skips_comments_and_blank_lines(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + env_file = tmp_path / ".env" + env_file.write_text("# comment\n\nKEY=val\n# another\n", encoding="utf-8") + result = module._load_simple_env_file(env_file) + assert result == {"KEY": "val"} + + +def test_load_simple_env_file_returns_empty_for_missing_file(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + result = module._load_simple_env_file(tmp_path / "nonexistent.env") + assert result == {} + + +def test_compact_byterover_excerpt_returns_empty_for_empty_input(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + assert module._compact_byterover_excerpt("", "query") == "" + assert module._compact_byterover_excerpt(" ", None) == "" + + +def test_compact_byterover_excerpt_truncates_long_text_without_match(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + long_text = "x" * 1000 + result = module._compact_byterover_excerpt(long_text, None) + assert len(result) < len(long_text) + assert result.endswith("…") + + +def test_compact_byterover_excerpt_centers_around_keyword_hit(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + text = "start. " + ("filler " * 50) + "uniqueterm appears here. " + ("more " * 50) + "end." + result = module._compact_byterover_excerpt(text, "uniqueterm") + assert "uniqueterm" in result + + +def test_compact_byterover_answer_returns_empty_for_empty_answer(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + assert module._compact_byterover_answer("query", "") == "" + + +def test_compact_byterover_answer_extracts_first_line_when_no_facts(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + answer = "Short answer.\nSecond line." + result = module._compact_byterover_answer("query", answer) + assert result == "Short answer." + + +def test_compact_byterover_answer_extracts_matching_fact(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + answer = "## Facts\n- The project owner is Alice.\n- The project codename is Hermes.\n" + result = module._compact_byterover_answer("who owns the project", answer) + assert "Alice" in result + + +def test_compact_byterover_answer_codename_query(monkeypatch, tmp_path): + module = load_plugin_api(monkeypatch, tmp_path) + answer = "## Facts\n- The project codename is Phoenix.\n" + result = module._compact_byterover_answer("what is the codename", answer) + assert "Phoenix" in result + + +# --------------------------------------------------------------------------- +# Existing tests follow (unchanged) +# --------------------------------------------------------------------------- + + def test_session_search_payload_uses_hermes_session_search_tool_without_source(monkeypatch, tmp_path): calls = [] From e27a0989f83bf6184ae24cba2873fa3bb0382fc1 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 29 Jun 2026 05:03:27 +0000 Subject: [PATCH 6/6] Limit unit tests to mem0_mode-related coverage --- tests/test_plugin_api.py | 580 ++------------------------------------- 1 file changed, 21 insertions(+), 559 deletions(-) diff --git a/tests/test_plugin_api.py b/tests/test_plugin_api.py index 3ef14d9..0f785e7 100644 --- a/tests/test_plugin_api.py +++ b/tests/test_plugin_api.py @@ -21,578 +21,40 @@ def load_plugin_api(monkeypatch, tmp_path): return module -# --------------------------------------------------------------------------- -# Pure utility function tests — no external I/O required -# --------------------------------------------------------------------------- - - -def test_redact_url_leaves_plain_text_unchanged(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - assert module._redact_url("not a url") == "not a url" - assert module._redact_url("") == "" - - -def test_redact_url_replaces_userinfo_with_redacted(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - # Build URL with credentials so the test string itself is never a real secret - url = "https://" + "testuser:testpass" + "@example.com/path" - result = module._redact_url(url) - assert "testuser" not in result - assert "testpass" not in result - assert "[REDACTED]" in result - assert "example.com" in result - - -def test_redact_url_redacts_known_secret_query_params(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - result = module._redact_url("https://api.example.com/v1?api_key=abc123&token=secret&foo=bar") - assert "abc123" not in result - assert "secret" not in result - assert "bar" in result # non-secret param preserved - assert "[REDACTED]" in result - - -def test_redact_url_leaves_clean_url_unchanged(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - url = "https://api.example.com/v1/memories?page=1&limit=50" - assert module._redact_url(url) == url - - -def test_safe_error_redacts_bearer_token(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - # Use a clearly fabricated test token; _safe_error should mask it - fake_token = "tok-TESTSECRET9x" - msg = "request failed: Bearer " + fake_token - result = module._safe_error(msg) - assert fake_token not in result - assert "[REDACTED]" in result - - -def test_safe_error_redacts_url_embedded_in_message(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - url = "https://" + "admin:pw9x" + "@host.test/v1" - result = module._safe_error("call to " + url + " failed") - assert "pw9x" not in result - assert "[REDACTED]" in result - - -def test_safe_error_redacts_key_value_credential(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - result = module._safe_error("api_key=myverysecretkey is invalid") - assert "myverysecretkey" not in result - assert "[REDACTED]" in result - - -def test_truthy_passes_through_booleans(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - assert module._truthy(True) is True - assert module._truthy(False) is False - - -def test_truthy_uses_default_for_none_and_empty(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - assert module._truthy(None) is False - assert module._truthy(None, default=True) is True - assert module._truthy("") is False - assert module._truthy("", default=True) is True - - -def test_truthy_false_strings(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - for val in ("false", "False", "FALSE", "0", "no", "off", "No", "OFF"): - assert module._truthy(val) is False, f"expected False for {val!r}" - - -def test_truthy_true_strings(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - for val in ("true", "True", "1", "yes", "on", "anything_else"): - assert module._truthy(val) is True, f"expected True for {val!r}" - - -def test_truthy_numeric(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - assert module._truthy(0) is False - assert module._truthy(1) is True - assert module._truthy(42) is True - - -def test_dig_retrieves_nested_values(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - data = {"a": {"b": {"c": "found"}}} - assert module._dig(data, "a", "b", "c") == "found" - assert module._dig(data, "a", "b") == {"c": "found"} - - -def test_dig_returns_default_for_missing_keys(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - data = {"a": {"b": 1}} - assert module._dig(data, "x") is None - assert module._dig(data, "a", "z", default="fallback") == "fallback" - assert module._dig(data, "a", "b", "c") is None # traversal into non-dict - - -def test_memory_limits_returns_defaults_when_config_empty(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - limits = module._memory_limits({}) - assert limits["memory"] == module.DEFAULT_MEMORY_LIMIT - assert limits["user"] == module.DEFAULT_USER_LIMIT - - -def test_memory_limits_reads_memory_char_limit(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - limits = module._memory_limits({"memory": {"memory_char_limit": 9999, "user_char_limit": 3333}}) - assert limits["memory"] == 9999 - assert limits["user"] == 3333 - - -def test_memory_limits_falls_back_to_defaults_on_invalid_values(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - limits = module._memory_limits({"memory": {"memory_char_limit": "bad", "user_char_limit": None}}) - assert limits["memory"] == module.DEFAULT_MEMORY_LIMIT - assert limits["user"] == module.DEFAULT_USER_LIMIT - - -def test_parse_entries_empty_string_returns_empty_list(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - assert module._parse_entries("") == [] - assert module._parse_entries(" ") == [] - - -def test_parse_entries_splits_on_delimiter(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - delimiter = module.ENTRY_DELIMITER - raw = f"entry one{delimiter}entry two{delimiter}entry three" - result = module._parse_entries(raw) - assert result == ["entry one", "entry two", "entry three"] - - -def test_parse_entries_filters_blank_entries(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - delimiter = module.ENTRY_DELIMITER - raw = f"entry one{delimiter} {delimiter}entry three" - result = module._parse_entries(raw) - assert result == ["entry one", "entry three"] - - -def test_safe_like_wraps_text_with_percent(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - assert module._safe_like("hello") == "%hello%" - - -def test_safe_like_strips_percent_and_underscore(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - result = module._safe_like("he%llo_world") - assert "%" not in result[1:-1] - assert "_" not in result[1:-1] - assert result.startswith("%") - assert result.endswith("%") - - -def test_shape_session_message_basic(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - msg = {"id": 1, "role": "user", "content": "hello"} - result = module._shape_session_message(msg) - assert result["id"] == 1 - assert result["role"] == "user" - assert result["content"] == "hello" - assert "anchor" not in result - - -def test_shape_session_message_marks_anchor(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - msg = {"id": 42, "role": "assistant", "content": "reply"} - result = module._shape_session_message(msg, anchor_id=42) - assert result["anchor"] is True - - -def test_shape_session_message_no_anchor_for_other_ids(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - msg = {"id": 5, "role": "user", "content": "msg"} - result = module._shape_session_message(msg, anchor_id=99) - assert "anchor" not in result - - -def test_shape_session_message_includes_tool_fields(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - msg = {"id": 1, "role": "tool", "content": "result", "tool_name": "search", "tool_call_id": "call-1"} - result = module._shape_session_message(msg) - assert result["tool_name"] == "search" - assert result["tool_call_id"] == "call-1" - - -def test_shape_session_message_filters_none_non_content_keys(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - msg = {"id": 1, "role": "user", "content": None, "timestamp": None} - result = module._shape_session_message(msg) - assert "content" in result # content key kept even when None - assert "timestamp" not in result - - -def test_unwrap_mem0_results_from_dict_with_results_key(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - response = {"results": [{"memory": "a"}, {"memory": "b"}]} - assert module._unwrap_mem0_results(response) == [{"memory": "a"}, {"memory": "b"}] - - -def test_unwrap_mem0_results_from_dict_with_memories_key(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - response = {"memories": [{"memory": "x"}]} - assert module._unwrap_mem0_results(response) == [{"memory": "x"}] - - -def test_unwrap_mem0_results_from_list(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - lst = [{"memory": "m1"}, {"memory": "m2"}] - assert module._unwrap_mem0_results(lst) == lst - - -def test_unwrap_mem0_results_returns_empty_for_other_types(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - assert module._unwrap_mem0_results(None) == [] - assert module._unwrap_mem0_results("string") == [] - assert module._unwrap_mem0_results(42) == [] - - -def test_normalize_mem0_memory_from_string(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - result = module._normalize_mem0_memory("raw text memory", 0) - assert result["memory"] == "raw text memory" - assert result["id"] == "1" - assert result["score"] is None - - -def test_normalize_mem0_memory_from_full_dict(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - item = {"id": "uuid-abc", "memory": "fact", "score": 0.9, "created_at": "2026-01-01", "user_id": "u1"} - result = module._normalize_mem0_memory(item, 0) - assert result["id"] == "uuid-abc" - assert result["memory"] == "fact" - assert result["score"] == 0.9 - assert result["user_id"] == "u1" - - -def test_normalize_mem0_memory_uses_text_fallback(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - item = {"id": "x", "text": "alt text"} - result = module._normalize_mem0_memory(item, 2) - assert result["memory"] == "alt text" - - -def test_filter_mem0_memories_applies_limit(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - mems = [{"memory": f"item {i}"} for i in range(10)] - assert len(module._filter_mem0_memories(mems, None, 3)) == 3 - - -def test_filter_mem0_memories_case_insensitive_search(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - mems = [{"memory": "Python is great"}, {"memory": "Rust is fast"}, {"memory": "python tips"}] - result = module._filter_mem0_memories(mems, "python", 10) - assert len(result) == 2 - assert all("python" in m["memory"].lower() for m in result) - - -def test_normalize_honcho_card_empty_input(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - assert module._normalize_honcho_card(None) == [] - assert module._normalize_honcho_card([]) == [] - assert module._normalize_honcho_card("") == [] - - -def test_normalize_honcho_card_list_input(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - result = module._normalize_honcho_card(["fact one", "fact two", None]) - assert result == ["fact one", "fact two"] # None is filtered - - -def test_normalize_honcho_card_string_input(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - result = module._normalize_honcho_card("single fact") - assert result == ["single fact"] - - -def test_object_to_dict_passthrough_for_dict(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - d = {"id": "1", "content": "hello"} - assert module._object_to_dict(d) == d - - -def test_object_to_dict_uses_model_dump(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - - class FakeModel: - def model_dump(self): - return {"id": "m1", "content": "via model_dump"} - - result = module._object_to_dict(FakeModel()) - assert result == {"id": "m1", "content": "via model_dump"} - - -def test_object_to_dict_falls_back_to_known_attrs(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - - class FakeObj: - id = "obj-1" - content = "obj content" - - result = module._object_to_dict(FakeObj()) - assert result["id"] == "obj-1" - assert result["content"] == "obj content" - - -def test_json_safe_primitives_pass_through(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - assert module._json_safe(None) is None - assert module._json_safe(True) is True - assert module._json_safe(42) == 42 - assert module._json_safe(3.14) == 3.14 - assert module._json_safe("hello") == "hello" - - -def test_json_safe_serializes_nested_structures(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - result = module._json_safe({"a": [1, None, "x"]}) - assert result == {"a": [1, None, "x"]} - - -def test_json_safe_uses_isoformat_for_datetime_like(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - - class FakeDate: - def isoformat(self): - return "2026-01-01T00:00:00" - - assert module._json_safe(FakeDate()) == "2026-01-01T00:00:00" - - -def test_json_safe_falls_back_to_str(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - - class Opaque: - def __str__(self): - return "opaque-value" - - assert module._json_safe(Opaque()) == "opaque-value" - - -def test_json_object_returns_dict_unchanged(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - d = {"key": "value"} - assert module._json_object(d) is d - - -def test_json_object_parses_json_string(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - result = module._json_object('{"key": "value"}') - assert result == {"key": "value"} - - -def test_json_object_returns_empty_for_non_dict_json(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - assert module._json_object("[1, 2, 3]") == {} - assert module._json_object('"just a string"') == {} - - -def test_json_object_returns_empty_for_invalid_json(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - assert module._json_object("not json") == {} - assert module._json_object("") == {} - assert module._json_object(None) == {} - - -def test_mnemosyne_order_clause_uses_first_preferred_available(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - result = module._mnemosyne_order_clause(["id", "created_at", "importance"], ["updated_at", "created_at", "id"]) - assert result == 'ORDER BY "created_at" DESC' - - -def test_mnemosyne_order_clause_returns_empty_when_none_found(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - assert module._mnemosyne_order_clause(["id", "content"], ["updated_at", "created_at"]) == "" - - -def test_mnemosyne_where_returns_empty_for_no_search(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - clause, params = module._mnemosyne_where(None, ["content", "text"], ["content", "text"]) - assert clause == "" - assert params == [] - - -def test_mnemosyne_where_builds_clause_for_available_columns(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - clause, params = module._mnemosyne_where("hello", ["content", "text"], ["content", "text"]) - assert clause.startswith("WHERE") - assert '"content" LIKE ?' in clause - assert '"text" LIKE ?' in clause - assert len(params) == 2 - assert all("hello" in p for p in params) - - -def test_mnemosyne_where_skips_unavailable_columns(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - clause, params = module._mnemosyne_where("hello", ["id"], ["content", "text"]) - assert clause == "" - assert params == [] - - -def test_mnemosyne_fact_text_memoria_facts_key_value(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - row = {"key": "color", "value": "blue"} - assert module._mnemosyne_fact_text("memoria_facts", row) == "color: blue" - - -def test_mnemosyne_fact_text_memoria_facts_value_only(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - row = {"value": "some fact", "key": ""} - assert module._mnemosyne_fact_text("memoria_facts", row) == "some fact" - - -def test_mnemosyne_fact_text_memoria_instructions(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - row = {"instruction": "always be concise"} - assert module._mnemosyne_fact_text("memoria_instructions", row) == "always be concise" - - -def test_mnemosyne_fact_text_triples(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - row = {"subject": "Alice", "predicate": "knows", "object": "Bob"} - result = module._mnemosyne_fact_text("triples", row) - assert "Alice" in result - assert "knows" in result - assert "Bob" in result - - -def test_mnemosyne_fact_text_unknown_table_uses_content(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - row = {"content": "generic content"} - assert module._mnemosyne_fact_text("unknown_table", row) == "generic content" - - -def test_hindsight_connection_failed_detects_known_messages(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - assert module._hindsight_connection_failed(Exception("Connection refused by server")) - assert module._hindsight_connection_failed(Exception("Cannot connect to host")) - assert module._hindsight_connection_failed(Exception("Connect call failed")) - - -def test_hindsight_connection_failed_returns_false_for_other_errors(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - assert not module._hindsight_connection_failed(Exception("timeout")) - assert not module._hindsight_connection_failed(Exception("404 not found")) - - -def test_hindsight_timeout_seconds_returns_configured_value(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - assert module._hindsight_timeout_seconds({"timeout": 30}) == 30.0 - assert module._hindsight_timeout_seconds({"timeout": "45.5"}) == 45.5 - - -def test_hindsight_timeout_seconds_defaults_to_120_for_none_or_invalid(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - assert module._hindsight_timeout_seconds({}) == 120.0 - assert module._hindsight_timeout_seconds({"timeout": None}) == 120.0 - assert module._hindsight_timeout_seconds({"timeout": "bad"}) == 120.0 - - -def test_unwrap_byterover_data_extracts_nested_data(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - payload = {"data": {"results": [1, 2, 3], "count": 3}} - assert module._unwrap_byterover_data(payload) == {"results": [1, 2, 3], "count": 3} - - -def test_unwrap_byterover_data_returns_payload_without_data_key(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - payload = {"results": [1, 2]} - assert module._unwrap_byterover_data(payload) == payload - - -def test_unwrap_byterover_data_returns_empty_for_non_dict(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - assert module._unwrap_byterover_data(None) == {} - assert module._unwrap_byterover_data([1, 2]) == {} - - -def test_load_simple_env_file_parses_key_value_pairs(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - env_file = tmp_path / ".env" - env_file.write_text("KEY1=value1\nKEY2=value2\n", encoding="utf-8") - result = module._load_simple_env_file(env_file) - assert result == {"KEY1": "value1", "KEY2": "value2"} - - -def test_load_simple_env_file_strips_quotes(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - env_file = tmp_path / ".env" - env_file.write_text('KEY1="quoted"\nKEY2=\'single\'\n', encoding="utf-8") - result = module._load_simple_env_file(env_file) - assert result["KEY1"] == "quoted" - assert result["KEY2"] == "single" - - -def test_load_simple_env_file_skips_comments_and_blank_lines(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - env_file = tmp_path / ".env" - env_file.write_text("# comment\n\nKEY=val\n# another\n", encoding="utf-8") - result = module._load_simple_env_file(env_file) - assert result == {"KEY": "val"} - - -def test_load_simple_env_file_returns_empty_for_missing_file(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - result = module._load_simple_env_file(tmp_path / "nonexistent.env") - assert result == {} - - -def test_compact_byterover_excerpt_returns_empty_for_empty_input(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - assert module._compact_byterover_excerpt("", "query") == "" - assert module._compact_byterover_excerpt(" ", None) == "" - - -def test_compact_byterover_excerpt_truncates_long_text_without_match(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - long_text = "x" * 1000 - result = module._compact_byterover_excerpt(long_text, None) - assert len(result) < len(long_text) - assert result.endswith("…") - +def test_load_mem0_config_uses_mem0_mode_oss_for_flat_oss_config(monkeypatch, tmp_path): + (tmp_path / "mem0.json").write_text( + json.dumps({"mode": "oss", "vector_store": {"provider": "qdrant"}, "agent_id": "agent-x"}), + encoding="utf-8", + ) -def test_compact_byterover_excerpt_centers_around_keyword_hit(monkeypatch, tmp_path): module = load_plugin_api(monkeypatch, tmp_path) - text = "start. " + ("filler " * 50) + "uniqueterm appears here. " + ("more " * 50) + "end." - result = module._compact_byterover_excerpt(text, "uniqueterm") - assert "uniqueterm" in result + cfg = module._load_mem0_config({}) + assert cfg["mem0_mode"] == "oss" + assert cfg["oss_config"] == {"vector_store": {"provider": "qdrant"}, "agent_id": "agent-x"} -def test_compact_byterover_answer_returns_empty_for_empty_answer(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - assert module._compact_byterover_answer("query", "") == "" +def test_load_mem0_config_uses_mem0_mode_oss_for_nested_oss_config(monkeypatch, tmp_path): + (tmp_path / "mem0.json").write_text( + json.dumps({"mode": "cloud", "oss": {"vector_store": {"provider": "chroma"}}}), + encoding="utf-8", + ) -def test_compact_byterover_answer_extracts_first_line_when_no_facts(monkeypatch, tmp_path): module = load_plugin_api(monkeypatch, tmp_path) - answer = "Short answer.\nSecond line." - result = module._compact_byterover_answer("query", answer) - assert result == "Short answer." + cfg = module._load_mem0_config({}) + assert cfg["mem0_mode"] == "oss" + assert cfg["oss_config"] == {"vector_store": {"provider": "chroma"}} -def test_compact_byterover_answer_extracts_matching_fact(monkeypatch, tmp_path): - module = load_plugin_api(monkeypatch, tmp_path) - answer = "## Facts\n- The project owner is Alice.\n- The project codename is Hermes.\n" - result = module._compact_byterover_answer("who owns the project", answer) - assert "Alice" in result +def test_load_mem0_config_defaults_to_cloud_mem0_mode(monkeypatch, tmp_path): + (tmp_path / "mem0.json").write_text(json.dumps({"user_id": "u1"}), encoding="utf-8") -def test_compact_byterover_answer_codename_query(monkeypatch, tmp_path): module = load_plugin_api(monkeypatch, tmp_path) - answer = "## Facts\n- The project codename is Phoenix.\n" - result = module._compact_byterover_answer("what is the codename", answer) - assert "Phoenix" in result - + cfg = module._load_mem0_config({}) -# --------------------------------------------------------------------------- -# Existing tests follow (unchanged) -# --------------------------------------------------------------------------- + assert cfg["mem0_mode"] == "cloud" + assert cfg["oss_config"] is None def test_session_search_payload_uses_hermes_session_search_tool_without_source(monkeypatch, tmp_path):