diff --git a/src/mcp_cli/commands/resources/prompts.py b/src/mcp_cli/commands/resources/prompts.py index ea9943f0..48097866 100644 --- a/src/mcp_cli/commands/resources/prompts.py +++ b/src/mcp_cli/commands/resources/prompts.py @@ -14,6 +14,17 @@ from chuk_term.ui import output +def _prompt_message_text(content) -> str: + """Best-effort extraction of displayable text from a prompt message's content.""" + if isinstance(content, str): + return content + if isinstance(content, dict): + return content.get("text") or content.get("data") or "" + if isinstance(content, list): + return "\n".join(_prompt_message_text(c) for c in content) + return str(content) if content is not None else "" + + class PromptsCommand(UnifiedCommand): """List and manage MCP prompts.""" @@ -89,6 +100,32 @@ async def execute(self, **kwargs) -> CommandResult: error="No tool manager available. Please connect to a server first.", ) + # Get mode: fetch and render a single prompt, then stop. + get_name = kwargs.get("get") + if get_name: + result = await context.tool_manager.get_prompt(get_name) + messages = ( + result.get("messages", []) if isinstance(result, dict) else [] + ) + if not messages: + return CommandResult( + success=False, + error=f"No prompt content returned for: {get_name}", + ) + parts: list[str] = [] + description = ( + result.get("description") if isinstance(result, dict) else None + ) + if description: + parts.append(description) + for msg in messages: + role = msg.get("role", "") if isinstance(msg, dict) else "" + text = _prompt_message_text( + msg.get("content") if isinstance(msg, dict) else msg + ) + parts.append(f"[{role}] {text}" if role else text) + return CommandResult(success=True, output="\n".join(parts), data=result) + # Get prompts from tool manager prompts = await context.tool_manager.list_prompts() diff --git a/src/mcp_cli/commands/resources/resources.py b/src/mcp_cli/commands/resources/resources.py index 3c3ea0b0..4db00169 100644 --- a/src/mcp_cli/commands/resources/resources.py +++ b/src/mcp_cli/commands/resources/resources.py @@ -14,7 +14,7 @@ class ResourcesCommand(UnifiedCommand): - """List available MCP resources.""" + """List MCP resources, or read the contents of one.""" @property def name(self) -> str: @@ -26,28 +26,30 @@ def aliases(self) -> list[str]: @property def description(self) -> str: - return "List available MCP resources" + return "List MCP resources, or read one with --read " @property def help_text(self) -> str: return """ -List available MCP resources from connected servers. +List MCP resources from connected servers, or read the contents of one. Usage: - /resources [options] - List resources (chat mode) - resources [options] - List resources (interactive mode) - mcp-cli resources - List resources (CLI mode) - + /resources [options] - List resources (chat mode) + resources [options] - List resources (interactive mode) + mcp-cli resources - List resources (CLI mode) + mcp-cli resources --read - Print the contents of a single resource + Options: --server - Show resources from specific server --raw - Output as JSON --uri - Filter by URI pattern + --read - Read and print the contents of the given resource URI Examples: - /resources - List all resources - /resources --server 0 - List resources from first server - resources --raw - Output as JSON - resources --uri "file://*" - Filter file resources + /resources - List all resources + /resources --server 0 - List resources from first server + resources --raw - Output as JSON + resources --read debug://mail_log- Read a specific resource's contents """ @property @@ -72,6 +74,12 @@ def parameters(self) -> list[CommandParameter]: required=False, help="Filter by URI pattern", ), + CommandParameter( + name="read", + type=str, + required=False, + help="Read and print the contents of the given resource URI", + ), ] async def execute(self, **kwargs) -> CommandResult: @@ -88,6 +96,29 @@ async def execute(self, **kwargs) -> CommandResult: error="No tool manager available. Please connect to a server first.", ) + # Read mode: dump the contents of a single resource and stop. + read_uri = kwargs.get("read") + if read_uri: + content = await context.tool_manager.read_resource(read_uri) + contents = ( + content.get("contents", []) if isinstance(content, dict) else [] + ) + if not contents: + return CommandResult( + success=False, + error=f"No content returned for resource: {read_uri}", + ) + parts: list[str] = [] + for item in contents: + if item.get("text") is not None: + parts.append(item["text"]) + elif item.get("blob") is not None: + mime = item.get("mimeType", "application/octet-stream") + parts.append(f"<{mime}: {len(item['blob'])} base64 bytes>") + return CommandResult( + success=True, output="\n".join(parts), data=content + ) + # Get resources from tool manager resources = await context.tool_manager.list_resources() @@ -104,7 +135,12 @@ async def execute(self, **kwargs) -> CommandResult: # URI might be in extra or id field uri = resource.id or resource.extra.get("uri", "unknown") name = resource.name or "Unnamed" - type_val = resource.type or resource.extra.get("mime_type", "unknown") + type_val = ( + resource.type + or resource.extra.get("mimeType") + or resource.extra.get("mime_type") + or "unknown" + ) table_data.append( { diff --git a/src/mcp_cli/main.py b/src/mcp_cli/main.py index 619f3918..c1010f59 100644 --- a/src/mcp_cli/main.py +++ b/src/mcp_cli/main.py @@ -1201,12 +1201,17 @@ async def _servers_wrapper(**params): # Resources command -@app.command("resources", help="List available resources") +@app.command("resources", help="List resources, or read one with --read ") def resources_command( config_file: str = typer.Option( "server_config.json", help="Configuration file path" ), server: str | None = typer.Option(None, help="Server to connect to"), + read: str | None = typer.Option( + None, + "--read", + help="Read and print the contents of the given resource URI", + ), provider: str = typer.Option("openai", help="LLM provider name"), model: str | None = typer.Option(None, help="Model name"), disable_filesystem: bool = typer.Option(False, help="Disable filesystem access"), @@ -1217,7 +1222,7 @@ def resources_command( log_level: str = typer.Option("WARNING", "--log-level", help="Set log level"), theme: str = typer.Option("default", "--theme", help="UI theme"), ) -> None: - """Show all recorded resources.""" + """Show all recorded resources, or read one with --read .""" # Configure logging and theme for this command _setup_command_logging(quiet, verbose, log_level, theme) @@ -1229,7 +1234,7 @@ def resources_command( from mcp_cli.adapters.cli import cli_execute async def _resources_wrapper(**params): - return await cli_execute("resources") + return await cli_execute("resources", read=read) run_command_sync( _resources_wrapper, @@ -1243,12 +1248,17 @@ async def _resources_wrapper(**params): # Prompts command -@app.command("prompts", help="List available prompts") +@app.command("prompts", help="List prompts, or fetch one with --get ") def prompts_command( config_file: str = typer.Option( "server_config.json", help="Configuration file path" ), server: str | None = typer.Option(None, help="Server to connect to"), + get: str | None = typer.Option( + None, + "--get", + help="Fetch and print a specific prompt by name", + ), provider: str = typer.Option("openai", help="LLM provider name"), model: str | None = typer.Option(None, help="Model name"), disable_filesystem: bool = typer.Option(False, help="Disable filesystem access"), @@ -1259,7 +1269,7 @@ def prompts_command( log_level: str = typer.Option("WARNING", "--log-level", help="Set log level"), theme: str = typer.Option("default", "--theme", help="UI theme"), ) -> None: - """Show all prompt templates.""" + """Show all prompt templates, or fetch one with --get .""" # Configure logging and theme for this command _setup_command_logging(quiet, verbose, log_level, theme) @@ -1271,7 +1281,7 @@ def prompts_command( from mcp_cli.adapters.cli import cli_execute async def _prompts_wrapper(**params): - return await cli_execute("prompts") + return await cli_execute("prompts", get=get) run_command_sync( _prompts_wrapper, diff --git a/src/mcp_cli/tools/manager.py b/src/mcp_cli/tools/manager.py index 9dc2f479..8c7cbb4f 100644 --- a/src/mcp_cli/tools/manager.py +++ b/src/mcp_cli/tools/manager.py @@ -44,6 +44,8 @@ ) from mcp_cli.tools.filter import DisabledReason, ToolFilter from mcp_cli.tools.models import ( + PromptInfo, + ResourceInfo, ServerInfo, ToolCallResult, ToolDefinitionInput, @@ -1258,15 +1260,17 @@ def get_streams(self): logger.error(f"Error getting streams: {e}") return [] - def list_resources(self): - """List available resources from servers.""" + async def list_resources(self) -> list[ResourceInfo]: + """List available resources from servers as ResourceInfo objects.""" if not self.stream_manager: return [] try: - if hasattr(self.stream_manager, "list_resources"): - return self.stream_manager.list_resources() - return [] + if not hasattr(self.stream_manager, "list_resources"): + return [] + # StreamManager.list_resources() is async and returns list[dict]. + raw = await self.stream_manager.list_resources() + return [ResourceInfo.from_raw(item) for item in (raw or [])] except Exception as e: logger.error(f"Error listing resources: {e}") return [] @@ -1300,15 +1304,50 @@ async def read_resource( logger.error("Error reading resource %s from %s: %s", uri, server_name, e) return {} - def list_prompts(self): - """List available prompts from servers.""" + async def get_prompt( + self, + name: str, + arguments: dict[str, Any] | None = None, + server_name: str | None = None, + ) -> dict[str, Any]: + """Get a prompt by name. + + Args: + name: Prompt name to fetch. + arguments: Optional arguments used to render the prompt template. + server_name: Optional server name to target. + + Returns: + Prompt content dict from the server. + """ if not self.stream_manager: - return [] + logger.debug("get_prompt: no stream_manager available") + return {} try: - if hasattr(self.stream_manager, "list_prompts"): - return self.stream_manager.list_prompts() + result: dict[str, Any] = await self.stream_manager.get_prompt( + name, arguments, server_name + ) + if not result: + logger.debug( + "get_prompt returned empty for %s (server=%s)", name, server_name + ) + return result + except Exception as e: + logger.error("Error getting prompt %s from %s: %s", name, server_name, e) + return {} + + async def list_prompts(self) -> list[PromptInfo]: + """List available prompts from servers as PromptInfo objects.""" + if not self.stream_manager: return [] + + try: + if not hasattr(self.stream_manager, "list_prompts"): + return [] + # StreamManager.list_prompts() is async and returns list[dict]. + raw = await self.stream_manager.list_prompts() + return [PromptInfo.from_raw(item) for item in (raw or [])] except Exception as e: logger.error(f"Error listing prompts: {e}") return [] diff --git a/src/mcp_cli/tools/models.py b/src/mcp_cli/tools/models.py index f5eb2b5f..9f75396b 100644 --- a/src/mcp_cli/tools/models.py +++ b/src/mcp_cli/tools/models.py @@ -468,13 +468,63 @@ def from_raw(cls, raw: Any) -> "ResourceInfo": in ``extra["value"]`` so it is never lost. """ if isinstance(raw, dict): - known = {k: raw.get(k) for k in ("id", "name", "type")} - extra = {k: v for k, v in raw.items() if k not in known} + # MCP resources are keyed by ``uri`` and typed by ``mimeType``; map + # those onto the canonical id/type fields so the UI doesn't need to + # know the wire shape. The original keys are still kept in ``extra``. + known = { + "id": raw.get("id") or raw.get("uri"), + "name": raw.get("name"), + "type": raw.get("type") or raw.get("mimeType"), + } + extra = {k: v for k, v in raw.items() if k not in ("id", "name", "type")} return cls(**known, extra=extra) # primitive - wrap it return cls(extra={"value": raw}) +class PromptInfo(BaseModel): + """ + Canonical representation of *one* prompt entry as returned by + ``prompts.list``. + + Mirrors :class:`ResourceInfo`: the common fields used in the UI are + normalised and **all additional keys** are preserved inside ``extra``. + """ + + # Common attributes we frequently need in the UI + name: str | None = None + description: str | None = None + arguments: list[Any] = Field(default_factory=list) + + # Anything else goes here … + extra: dict[str, Any] = Field(default_factory=dict) + + model_config = {"frozen": False, "arbitrary_types_allowed": True} + + # ------------------------------------------------------------------ # + # Factory helpers + # ------------------------------------------------------------------ # + @classmethod + def from_raw(cls, raw: Any) -> "PromptInfo": + """ + Convert a raw list item (dict | str | …) into a PromptInfo. + + If *raw* is not a mapping we treat it as an opaque scalar and store it + in ``extra["value"]`` so it is never lost. + """ + if isinstance(raw, dict): + known = ("name", "description", "arguments") + extra = {k: v for k, v in raw.items() if k not in known} + return cls( + name=raw.get("name"), + description=raw.get("description"), + arguments=raw.get("arguments") or [], + extra=extra, + ) + # primitive - wrap it + return cls(extra={"value": raw}) + + # ────────────────────────────────────────────────────────────────────────────── # Transport and Server Configuration Models # ────────────────────────────────────────────────────────────────────────────── diff --git a/tests/commands/definitions/test_prompts_command.py b/tests/commands/definitions/test_prompts_command.py index edec0957..a922209e 100644 --- a/tests/commands/definitions/test_prompts_command.py +++ b/tests/commands/definitions/test_prompts_command.py @@ -114,3 +114,44 @@ async def test_execute_error_handling(self, command): assert result.success is False assert "Server error" in result.error + + @pytest.mark.asyncio + async def test_execute_get_renders_messages(self, command): + """--get fetches a prompt and renders its messages.""" + from unittest.mock import AsyncMock + + with patch("mcp_cli.context.get_context") as mock_get_ctx: + mock_ctx = mock_get_ctx.return_value + mock_ctx.tool_manager.get_prompt = AsyncMock( + return_value={ + "description": "Greeter", + "messages": [ + {"role": "user", "content": {"type": "text", "text": "Hello"}} + ], + } + ) + # list_prompts must NOT be consulted in get mode + mock_ctx.tool_manager.list_prompts = AsyncMock( + side_effect=AssertionError( + "list_prompts should not be called in get mode" + ) + ) + result = await command.execute(get="greet") + + assert result.success is True + assert "Greeter" in result.output + assert "[user] Hello" in result.output + mock_ctx.tool_manager.get_prompt.assert_awaited_once_with("greet") + + @pytest.mark.asyncio + async def test_execute_get_not_found(self, command): + """--get on a missing/empty prompt is a failure, not a crash.""" + from unittest.mock import AsyncMock + + with patch("mcp_cli.context.get_context") as mock_get_ctx: + mock_ctx = mock_get_ctx.return_value + mock_ctx.tool_manager.get_prompt = AsyncMock(return_value={}) + result = await command.execute(get="nope") + + assert result.success is False + assert "nope" in result.error diff --git a/tests/commands/definitions/test_resources_command.py b/tests/commands/definitions/test_resources_command.py index 2fd32f6e..be8cdd8f 100644 --- a/tests/commands/definitions/test_resources_command.py +++ b/tests/commands/definitions/test_resources_command.py @@ -24,6 +24,7 @@ def test_command_properties(self, command): assert "server" in params assert "raw" in params assert "uri" in params + assert "read" in params @pytest.mark.asyncio async def test_execute_list_all(self, command): @@ -126,3 +127,42 @@ async def test_execute_error_handling(self, command): assert result.success is False assert "Server not connected" in result.error + + @pytest.mark.asyncio + async def test_execute_read_returns_contents(self, command): + """--read prints the resource's text contents.""" + with patch("mcp_cli.context.get_context") as mock_get_ctx: + mock_ctx = mock_get_ctx.return_value + mock_ctx.tool_manager.read_resource = AsyncMock( + return_value={ + "contents": [ + { + "uri": "debug://mail_log", + "mimeType": "text/plain", + "text": "line1\nline2", + } + ] + } + ) + # list_resources must NOT be consulted in read mode + mock_ctx.tool_manager.list_resources = AsyncMock( + side_effect=AssertionError( + "list_resources should not be called in read mode" + ) + ) + result = await command.execute(read="debug://mail_log") + + assert result.success is True + assert result.output == "line1\nline2" + mock_ctx.tool_manager.read_resource.assert_awaited_once_with("debug://mail_log") + + @pytest.mark.asyncio + async def test_execute_read_not_found(self, command): + """--read on a missing/empty resource is a failure, not a crash.""" + with patch("mcp_cli.context.get_context") as mock_get_ctx: + mock_ctx = mock_get_ctx.return_value + mock_ctx.tool_manager.read_resource = AsyncMock(return_value={}) + result = await command.execute(read="debug://nope") + + assert result.success is False + assert "debug://nope" in result.error diff --git a/tests/tools/test_models.py b/tests/tools/test_models.py index 76283673..ffc598b5 100644 --- a/tests/tools/test_models.py +++ b/tests/tools/test_models.py @@ -13,6 +13,7 @@ ExperimentalCapabilities, FunctionDefinition, LLMToolDefinition, + PromptInfo, ResourceInfo, ServerCapabilities, ServerInfo, @@ -324,6 +325,26 @@ class TestResourceInfo: {"id": "i1", "name": "n1", "type": "t1", "extra": {"foo": 42}}, ), ({}, {"id": None, "name": None, "type": None, "extra": {}}), + # MCP wire shape: uri -> id, mimeType -> type (originals kept in extra) + ( + {"uri": "debug://x", "name": "x", "mimeType": "text/plain"}, + { + "id": "debug://x", + "name": "x", + "type": "text/plain", + "extra": {"uri": "debug://x", "mimeType": "text/plain"}, + }, + ), + # Explicit id/type win over uri/mimeType + ( + {"id": "i", "uri": "u", "type": "t", "mimeType": "m", "name": "n"}, + { + "id": "i", + "name": "n", + "type": "t", + "extra": {"uri": "u", "mimeType": "m"}, + }, + ), ], ) def test_resourceinfo_from_raw_dict(self, raw, expected): @@ -358,6 +379,40 @@ def test_resourceinfo_direct_creation(self): assert ri.extra["mime"] == "text/plain" +class TestPromptInfo: + """Test PromptInfo Pydantic model.""" + + def test_promptinfo_from_raw_dict(self): + """from_raw normalizes the common fields and preserves the rest in extra.""" + pi = PromptInfo.from_raw( + { + "name": "greet", + "description": "Greet someone", + "arguments": [{"name": "who", "required": True}], + "title": "Greeter", + } + ) + assert pi.name == "greet" + assert pi.description == "Greet someone" + assert pi.arguments == [{"name": "who", "required": True}] + assert pi.extra == {"title": "Greeter"} + + def test_promptinfo_from_raw_defaults_arguments(self): + """Missing arguments default to an empty list (never None).""" + pi = PromptInfo.from_raw({"name": "p"}) + assert pi.name == "p" + assert pi.description is None + assert pi.arguments == [] + + @pytest.mark.parametrize("primitive", ["a string", 7, None]) + def test_promptinfo_from_raw_primitive(self, primitive): + """Primitives are wrapped under extra['value'].""" + pi = PromptInfo.from_raw(primitive) + assert pi.name is None and pi.description is None + assert pi.arguments == [] + assert pi.extra == {"value": primitive} + + # ---------------------------------------------------------------------------- # Additional coverage tests for 90%+ # ---------------------------------------------------------------------------- diff --git a/tests/tools/test_tool_manager.py b/tests/tools/test_tool_manager.py index 2ee9e9f3..88e55bae 100644 --- a/tests/tools/test_tool_manager.py +++ b/tests/tools/test_tool_manager.py @@ -10,7 +10,7 @@ from mcp_cli.tools.filter import DisabledReason from mcp_cli.tools.manager import ToolManager -from mcp_cli.tools.models import ToolInfo +from mcp_cli.tools.models import PromptInfo, ResourceInfo, ToolInfo class DummyMeta: @@ -701,88 +701,137 @@ def test_get_streams_exception(self): assert result == [] - def test_list_resources_no_stream_manager(self): - """Test list_resources without stream manager.""" + async def test_list_resources_no_stream_manager(self): + """list_resources returns [] when there is no stream manager.""" tm = ToolManager(config_file="test.json", servers=[]) tm.stream_manager = None - result = tm.list_resources() + result = await tm.list_resources() assert result == [] - def test_list_resources_with_method(self): - """Test list_resources with stream manager that has method.""" + async def test_list_resources_with_method(self): + """list_resources awaits the stream manager and returns ResourceInfo objects.""" tm = ToolManager(config_file="test.json", servers=[]) mock_sm = MagicMock() - mock_sm.list_resources.return_value = ["resource1"] + mock_sm.list_resources = AsyncMock( + return_value=[ + {"uri": "file:///a.txt", "name": "a.txt", "mimeType": "text/plain"} + ] + ) tm.stream_manager = mock_sm - result = tm.list_resources() + result = await tm.list_resources() - assert result == ["resource1"] + assert all(isinstance(r, ResourceInfo) for r in result) + assert result[0].name == "a.txt" + # uri -> id and mimeType -> type are normalized onto the canonical fields, + # while the original keys remain available in extra. + assert result[0].id == "file:///a.txt" + assert result[0].type == "text/plain" + assert result[0].extra["uri"] == "file:///a.txt" + assert result[0].extra["mimeType"] == "text/plain" - def test_list_resources_no_method(self): - """Test list_resources without method.""" + async def test_list_resources_no_method(self): + """list_resources returns [] when the stream manager lacks the method.""" tm = ToolManager(config_file="test.json", servers=[]) - mock_sm = MagicMock(spec=[]) - tm.stream_manager = mock_sm + tm.stream_manager = MagicMock(spec=[]) - result = tm.list_resources() + result = await tm.list_resources() assert result == [] - def test_list_resources_exception(self): - """Test list_resources handles exceptions.""" + async def test_list_resources_exception(self): + """list_resources swallows stream-manager errors and returns [].""" tm = ToolManager(config_file="test.json", servers=[]) mock_sm = MagicMock() - mock_sm.list_resources.side_effect = RuntimeError("error") + mock_sm.list_resources = AsyncMock(side_effect=RuntimeError("error")) tm.stream_manager = mock_sm - result = tm.list_resources() + result = await tm.list_resources() assert result == [] - def test_list_prompts_no_stream_manager(self): - """Test list_prompts without stream manager.""" + async def test_list_prompts_no_stream_manager(self): + """list_prompts returns [] when there is no stream manager.""" tm = ToolManager(config_file="test.json", servers=[]) tm.stream_manager = None - result = tm.list_prompts() + result = await tm.list_prompts() assert result == [] - def test_list_prompts_with_method(self): - """Test list_prompts with stream manager that has method.""" + async def test_list_prompts_with_method(self): + """list_prompts awaits the stream manager and returns PromptInfo objects.""" tm = ToolManager(config_file="test.json", servers=[]) mock_sm = MagicMock() - mock_sm.list_prompts.return_value = ["prompt1"] + mock_sm.list_prompts = AsyncMock( + return_value=[ + { + "name": "greet", + "description": "Greet someone", + "arguments": [{"name": "who"}], + } + ] + ) tm.stream_manager = mock_sm - result = tm.list_prompts() + result = await tm.list_prompts() - assert result == ["prompt1"] + assert all(isinstance(p, PromptInfo) for p in result) + assert result[0].name == "greet" + assert result[0].description == "Greet someone" + assert result[0].arguments == [{"name": "who"}] - def test_list_prompts_no_method(self): - """Test list_prompts without method.""" + async def test_list_prompts_no_method(self): + """list_prompts returns [] when the stream manager lacks the method.""" tm = ToolManager(config_file="test.json", servers=[]) - mock_sm = MagicMock(spec=[]) - tm.stream_manager = mock_sm + tm.stream_manager = MagicMock(spec=[]) - result = tm.list_prompts() + result = await tm.list_prompts() assert result == [] - def test_list_prompts_exception(self): - """Test list_prompts handles exceptions.""" + async def test_list_prompts_exception(self): + """list_prompts swallows stream-manager errors and returns [].""" tm = ToolManager(config_file="test.json", servers=[]) mock_sm = MagicMock() - mock_sm.list_prompts.side_effect = RuntimeError("error") + mock_sm.list_prompts = AsyncMock(side_effect=RuntimeError("error")) tm.stream_manager = mock_sm - result = tm.list_prompts() + result = await tm.list_prompts() assert result == [] + async def test_get_prompt_no_stream_manager(self): + """get_prompt returns {} when there is no stream manager.""" + tm = ToolManager(config_file="test.json", servers=[]) + tm.stream_manager = None + + assert await tm.get_prompt("p") == {} + + async def test_get_prompt_with_method(self): + """get_prompt awaits the stream manager and returns its dict result.""" + tm = ToolManager(config_file="test.json", servers=[]) + mock_sm = MagicMock() + payload = {"messages": [{"role": "user", "content": {"text": "hi"}}]} + mock_sm.get_prompt = AsyncMock(return_value=payload) + tm.stream_manager = mock_sm + + result = await tm.get_prompt("greet", {"who": "world"}) + + assert result == payload + mock_sm.get_prompt.assert_awaited_once_with("greet", {"who": "world"}, None) + + async def test_get_prompt_exception(self): + """get_prompt swallows stream-manager errors and returns {}.""" + tm = ToolManager(config_file="test.json", servers=[]) + mock_sm = MagicMock() + mock_sm.get_prompt = AsyncMock(side_effect=RuntimeError("error")) + tm.stream_manager = mock_sm + + assert await tm.get_prompt("p") == {} + class TestToolManagerGetAllToolsErrors: """Test get_all_tools error handling."""