diff --git a/src/hyperping/mcp_server/__init__.py b/src/hyperping/mcp_server/__init__.py index 5d05ac0..474f006 100644 --- a/src/hyperping/mcp_server/__init__.py +++ b/src/hyperping/mcp_server/__init__.py @@ -19,14 +19,16 @@ if TYPE_CHECKING: from mcp.server.fastmcp import FastMCP + from hyperping._async_client import AsyncHyperpingClient + from hyperping._async_mcp_client import AsyncHyperpingMcpClient from hyperping.client import HyperpingClient from hyperping.mcp_client import HyperpingMcpClient def create_mcp_server( api_key: str | None = None, - client: HyperpingClient | None = None, - mcp_client: HyperpingMcpClient | None = None, + client: HyperpingClient | AsyncHyperpingClient | None = None, + mcp_client: HyperpingMcpClient | AsyncHyperpingMcpClient | None = None, tools: list[str] | None = None, name: str = "hyperping", ) -> FastMCP: @@ -35,12 +37,19 @@ def create_mcp_server( Args: api_key: Hyperping API key. Used to create internal REST and MCP clients when *client* and *mcp_client* are not supplied. - client: Pre-configured :class:`~hyperping.client.HyperpingClient`. - Takes precedence over *api_key* for REST operations. - mcp_client: Pre-configured :class:`~hyperping.mcp_client.HyperpingMcpClient`. + When only *api_key* is provided, sync clients are created. + client: Pre-configured :class:`~hyperping.client.HyperpingClient` or + :class:`~hyperping._async_client.AsyncHyperpingClient`. + Takes precedence over *api_key* for REST operations. When an + async client is passed, all REST tools are registered as + coroutine functions so FastMCP can run them on the event loop. + mcp_client: Pre-configured :class:`~hyperping.mcp_client.HyperpingMcpClient` + or :class:`~hyperping._async_mcp_client.AsyncHyperpingMcpClient`. Required for the observability tool group. When ``None`` and - *api_key* is provided, one is created internally. When ``None`` - and only *client* is provided, the observability group is skipped. + *api_key* is provided, a sync client is created internally. When + ``None`` and only *client* is provided, the observability group + is skipped. When an async client is passed, observability tools + are registered as coroutines. tools: List of tool group names to register. ``None`` (default) registers all groups. Valid group names: ``monitors``, ``incidents``, ``maintenance``, ``outages``, ``statuspages``, diff --git a/src/hyperping/mcp_server/_registry.py b/src/hyperping/mcp_server/_registry.py index 4b19863..5e966c7 100644 --- a/src/hyperping/mcp_server/_registry.py +++ b/src/hyperping/mcp_server/_registry.py @@ -12,6 +12,8 @@ if TYPE_CHECKING: from mcp.server.fastmcp import FastMCP + from hyperping._async_client import AsyncHyperpingClient + from hyperping._async_mcp_client import AsyncHyperpingMcpClient from hyperping.client import HyperpingClient from hyperping.mcp_client import HyperpingMcpClient @@ -30,8 +32,8 @@ def register_tools( mcp: FastMCP, - client: HyperpingClient, - mcp_client: HyperpingMcpClient | None, + client: HyperpingClient | AsyncHyperpingClient, + mcp_client: HyperpingMcpClient | AsyncHyperpingMcpClient | None, groups: list[str] | None, ) -> None: """Register tool groups on *mcp*. diff --git a/src/hyperping/mcp_server/_tools_healthchecks.py b/src/hyperping/mcp_server/_tools_healthchecks.py index 701c047..498e070 100644 --- a/src/hyperping/mcp_server/_tools_healthchecks.py +++ b/src/hyperping/mcp_server/_tools_healthchecks.py @@ -11,74 +11,153 @@ if TYPE_CHECKING: from mcp.server.fastmcp import FastMCP + from hyperping._async_client import AsyncHyperpingClient from hyperping.client import HyperpingClient -def register_healthcheck_tools(mcp: FastMCP, client: HyperpingClient) -> None: +def register_healthcheck_tools( + mcp: FastMCP, + client: HyperpingClient | AsyncHyperpingClient, +) -> None: """Register healthcheck tools on *mcp*.""" - - @mcp.tool() - def list_healthchecks() -> list[dict[str, Any]]: - """List all healthchecks (push-based cron/heartbeat monitors).""" - return [h.model_dump() for h in client.list_healthchecks()] - - @mcp.tool() - def get_healthcheck(healthcheck_id: str) -> dict[str, Any]: - """Get a single healthcheck by UUID.""" - return client.get_healthcheck(healthcheck_id).model_dump() - - @mcp.tool() - def create_healthcheck( - name: str, - period: int, - grace: int, - escalation_policy: str | None = None, - project_uuid: str | None = None, - ) -> dict[str, Any]: - """This will create a new healthcheck. period and grace are in seconds.""" - from hyperping.models import HealthcheckCreate - - fields: dict[str, Any] = {"name": name, "period": period, "grace": grace} - if escalation_policy is not None: - fields["escalation_policy"] = escalation_policy - if project_uuid is not None: - fields["project_uuid"] = project_uuid - return client.create_healthcheck(HealthcheckCreate(**fields)).model_dump() - - @mcp.tool() - def update_healthcheck( - healthcheck_id: str, - name: str | None = None, - period: int | None = None, - grace: int | None = None, - escalation_policy: str | None = None, - ) -> dict[str, Any]: - """Update an existing healthcheck. Only supplied fields are changed.""" - from hyperping.models import HealthcheckUpdate - - fields: dict[str, Any] = {} - if name is not None: - fields["name"] = name - if period is not None: - fields["period"] = period - if grace is not None: - fields["grace"] = grace - if escalation_policy is not None: - fields["escalation_policy"] = escalation_policy - return client.update_healthcheck(healthcheck_id, HealthcheckUpdate(**fields)).model_dump() - - @mcp.tool() - def delete_healthcheck(healthcheck_id: str) -> dict[str, Any]: - """This will permanently delete a healthcheck.""" - client.delete_healthcheck(healthcheck_id) - return {"success": True} - - @mcp.tool() - def pause_healthcheck(healthcheck_id: str) -> dict[str, Any]: - """Pause a healthcheck so it stops alerting on missed pings.""" - return client.pause_healthcheck(healthcheck_id).model_dump() - - @mcp.tool() - def resume_healthcheck(healthcheck_id: str) -> dict[str, Any]: - """Resume a paused healthcheck.""" - return client.resume_healthcheck(healthcheck_id).model_dump() + from hyperping._async_client import AsyncHyperpingClient + + if isinstance(client, AsyncHyperpingClient): + + @mcp.tool() + async def list_healthchecks() -> list[dict[str, Any]]: + """List all healthchecks (push-based cron/heartbeat monitors).""" + return [h.model_dump() for h in await client.list_healthchecks()] + + @mcp.tool() + async def get_healthcheck(healthcheck_id: str) -> dict[str, Any]: + """Get a single healthcheck by UUID.""" + return (await client.get_healthcheck(healthcheck_id)).model_dump() + + @mcp.tool() + async def create_healthcheck( + name: str, + period: int, + grace: int, + escalation_policy: str | None = None, + project_uuid: str | None = None, + ) -> dict[str, Any]: + """This will create a new healthcheck. period and grace are in seconds.""" + from hyperping.models import HealthcheckCreate + + fields: dict[str, Any] = {"name": name, "period": period, "grace": grace} + if escalation_policy is not None: + fields["escalation_policy"] = escalation_policy + if project_uuid is not None: + fields["project_uuid"] = project_uuid + return (await client.create_healthcheck(HealthcheckCreate(**fields))).model_dump() + + @mcp.tool() + async def update_healthcheck( + healthcheck_id: str, + name: str | None = None, + period: int | None = None, + grace: int | None = None, + escalation_policy: str | None = None, + ) -> dict[str, Any]: + """Update an existing healthcheck. Only supplied fields are changed.""" + from hyperping.models import HealthcheckUpdate + + fields: dict[str, Any] = {} + if name is not None: + fields["name"] = name + if period is not None: + fields["period"] = period + if grace is not None: + fields["grace"] = grace + if escalation_policy is not None: + fields["escalation_policy"] = escalation_policy + return ( + await client.update_healthcheck(healthcheck_id, HealthcheckUpdate(**fields)) + ).model_dump() + + @mcp.tool() + async def delete_healthcheck(healthcheck_id: str) -> dict[str, Any]: + """This will permanently delete a healthcheck.""" + await client.delete_healthcheck(healthcheck_id) + return {"success": True} + + @mcp.tool() + async def pause_healthcheck(healthcheck_id: str) -> dict[str, Any]: + """Pause a healthcheck so it stops alerting on missed pings.""" + return (await client.pause_healthcheck(healthcheck_id)).model_dump() + + @mcp.tool() + async def resume_healthcheck(healthcheck_id: str) -> dict[str, Any]: + """Resume a paused healthcheck.""" + return (await client.resume_healthcheck(healthcheck_id)).model_dump() + + else: + + @mcp.tool() + def list_healthchecks() -> list[dict[str, Any]]: + """List all healthchecks (push-based cron/heartbeat monitors).""" + return [h.model_dump() for h in client.list_healthchecks()] + + @mcp.tool() + def get_healthcheck(healthcheck_id: str) -> dict[str, Any]: + """Get a single healthcheck by UUID.""" + return client.get_healthcheck(healthcheck_id).model_dump() + + @mcp.tool() + def create_healthcheck( + name: str, + period: int, + grace: int, + escalation_policy: str | None = None, + project_uuid: str | None = None, + ) -> dict[str, Any]: + """This will create a new healthcheck. period and grace are in seconds.""" + from hyperping.models import HealthcheckCreate + + fields: dict[str, Any] = {"name": name, "period": period, "grace": grace} + if escalation_policy is not None: + fields["escalation_policy"] = escalation_policy + if project_uuid is not None: + fields["project_uuid"] = project_uuid + return client.create_healthcheck(HealthcheckCreate(**fields)).model_dump() + + @mcp.tool() + def update_healthcheck( + healthcheck_id: str, + name: str | None = None, + period: int | None = None, + grace: int | None = None, + escalation_policy: str | None = None, + ) -> dict[str, Any]: + """Update an existing healthcheck. Only supplied fields are changed.""" + from hyperping.models import HealthcheckUpdate + + fields: dict[str, Any] = {} + if name is not None: + fields["name"] = name + if period is not None: + fields["period"] = period + if grace is not None: + fields["grace"] = grace + if escalation_policy is not None: + fields["escalation_policy"] = escalation_policy + return client.update_healthcheck( + healthcheck_id, HealthcheckUpdate(**fields) + ).model_dump() + + @mcp.tool() + def delete_healthcheck(healthcheck_id: str) -> dict[str, Any]: + """This will permanently delete a healthcheck.""" + client.delete_healthcheck(healthcheck_id) + return {"success": True} + + @mcp.tool() + def pause_healthcheck(healthcheck_id: str) -> dict[str, Any]: + """Pause a healthcheck so it stops alerting on missed pings.""" + return client.pause_healthcheck(healthcheck_id).model_dump() + + @mcp.tool() + def resume_healthcheck(healthcheck_id: str) -> dict[str, Any]: + """Resume a paused healthcheck.""" + return client.resume_healthcheck(healthcheck_id).model_dump() diff --git a/src/hyperping/mcp_server/_tools_incidents.py b/src/hyperping/mcp_server/_tools_incidents.py index 05211b4..3cfcaba 100644 --- a/src/hyperping/mcp_server/_tools_incidents.py +++ b/src/hyperping/mcp_server/_tools_incidents.py @@ -11,94 +11,191 @@ if TYPE_CHECKING: from mcp.server.fastmcp import FastMCP + from hyperping._async_client import AsyncHyperpingClient from hyperping.client import HyperpingClient -def register_incident_tools(mcp: FastMCP, client: HyperpingClient) -> None: +def register_incident_tools( + mcp: FastMCP, + client: HyperpingClient | AsyncHyperpingClient, +) -> None: """Register incident tools on *mcp*.""" - - @mcp.tool() - def list_incidents(status: str | None = None) -> list[dict[str, Any]]: - """List incidents. Filter by status: investigating, identified, monitoring, resolved.""" - return [i.model_dump() for i in client.list_incidents(status=status)] - - @mcp.tool() - def get_incident(incident_id: str) -> dict[str, Any]: - """Get a single incident by UUID.""" - return client.get_incident(incident_id).model_dump() - - @mcp.tool() - def create_incident( - title: dict[str, Any], - text: dict[str, Any], - statuspages: list[str], - type: str | None = None, - affected_components: list[str] | None = None, - date: str | None = None, - ) -> dict[str, Any]: - """This will create a new incident. title/text are LocalizedText dicts with an 'en' key.""" - from hyperping.models import IncidentCreate, LocalizedText - - fields: dict[str, Any] = { - "title": LocalizedText(**title), - "text": LocalizedText(**text), - "statuspages": statuspages, - } - if type is not None: - fields["type"] = type - if affected_components is not None: - fields["affected_components"] = affected_components - if date is not None: - fields["date"] = date - return client.create_incident(IncidentCreate(**fields)).model_dump() - - @mcp.tool() - def update_incident( - incident_id: str, - title: dict[str, Any] | None = None, - type: str | None = None, - affected_components: list[str] | None = None, - statuspages: list[str] | None = None, - ) -> dict[str, Any]: - """Update an incident. title is a LocalizedText dict with an 'en' key.""" - from hyperping.models import IncidentUpdateRequest, LocalizedText - - fields: dict[str, Any] = {} - if title is not None: - fields["title"] = LocalizedText(**title) - if type is not None: - fields["type"] = type - if affected_components is not None: - fields["affected_components"] = affected_components - if statuspages is not None: - fields["statuspages"] = statuspages - return client.update_incident(incident_id, IncidentUpdateRequest(**fields)).model_dump() - - @mcp.tool() - def add_incident_update( - incident_id: str, - text: dict[str, Any], - type: str, - date: str, - ) -> dict[str, Any]: - """Add an update to an incident. text is a LocalizedText dict with 'en' key. - type: investigating, identified, monitoring, resolved.""" - from hyperping.models import AddIncidentUpdateRequest, LocalizedText - - update = AddIncidentUpdateRequest( - text=LocalizedText(**text), - type=type, - date=date, - ) - return client.add_incident_update(incident_id, update).model_dump() - - @mcp.tool() - def resolve_incident(incident_id: str, message: str | None = None) -> dict[str, Any]: - """This will resolve an incident and post a resolution update.""" - return client.resolve_incident(incident_id, message=message).model_dump() - - @mcp.tool() - def delete_incident(incident_id: str) -> dict[str, Any]: - """This will permanently delete an incident.""" - client.delete_incident(incident_id) - return {"success": True} + from hyperping._async_client import AsyncHyperpingClient + + if isinstance(client, AsyncHyperpingClient): + + @mcp.tool() + async def list_incidents(status: str | None = None) -> list[dict[str, Any]]: + """List incidents. Filter by status: investigating, identified, monitoring, resolved.""" + return [i.model_dump() for i in await client.list_incidents(status=status)] + + @mcp.tool() + async def get_incident(incident_id: str) -> dict[str, Any]: + """Get a single incident by UUID.""" + return (await client.get_incident(incident_id)).model_dump() + + @mcp.tool() + async def create_incident( + title: dict[str, Any], + text: dict[str, Any], + statuspages: list[str], + type: str | None = None, + affected_components: list[str] | None = None, + date: str | None = None, + ) -> dict[str, Any]: + """This will create a new incident. title/text are dicts with an 'en' key.""" + from hyperping.models import IncidentCreate, LocalizedText + + fields: dict[str, Any] = { + "title": LocalizedText(**title), + "text": LocalizedText(**text), + "statuspages": statuspages, + } + if type is not None: + fields["type"] = type + if affected_components is not None: + fields["affected_components"] = affected_components + if date is not None: + fields["date"] = date + return (await client.create_incident(IncidentCreate(**fields))).model_dump() + + @mcp.tool() + async def update_incident( + incident_id: str, + title: dict[str, Any] | None = None, + type: str | None = None, + affected_components: list[str] | None = None, + statuspages: list[str] | None = None, + ) -> dict[str, Any]: + """Update an incident. title is a LocalizedText dict with an 'en' key.""" + from hyperping.models import IncidentUpdateRequest, LocalizedText + + fields: dict[str, Any] = {} + if title is not None: + fields["title"] = LocalizedText(**title) + if type is not None: + fields["type"] = type + if affected_components is not None: + fields["affected_components"] = affected_components + if statuspages is not None: + fields["statuspages"] = statuspages + return ( + await client.update_incident(incident_id, IncidentUpdateRequest(**fields)) + ).model_dump() + + @mcp.tool() + async def add_incident_update( + incident_id: str, + text: dict[str, Any], + type: str, + date: str, + ) -> dict[str, Any]: + """Add an update to an incident. text is a LocalizedText dict with 'en' key. + type: investigating, identified, monitoring, resolved.""" + from hyperping.models import AddIncidentUpdateRequest, LocalizedText + + update = AddIncidentUpdateRequest( + text=LocalizedText(**text), + type=type, + date=date, + ) + return (await client.add_incident_update(incident_id, update)).model_dump() + + @mcp.tool() + async def resolve_incident(incident_id: str, message: str | None = None) -> dict[str, Any]: + """This will resolve an incident and post a resolution update.""" + return (await client.resolve_incident(incident_id, message=message)).model_dump() + + @mcp.tool() + async def delete_incident(incident_id: str) -> dict[str, Any]: + """This will permanently delete an incident.""" + await client.delete_incident(incident_id) + return {"success": True} + + else: + + @mcp.tool() + def list_incidents(status: str | None = None) -> list[dict[str, Any]]: + """List incidents. Filter by status: investigating, identified, monitoring, resolved.""" + return [i.model_dump() for i in client.list_incidents(status=status)] + + @mcp.tool() + def get_incident(incident_id: str) -> dict[str, Any]: + """Get a single incident by UUID.""" + return client.get_incident(incident_id).model_dump() + + @mcp.tool() + def create_incident( + title: dict[str, Any], + text: dict[str, Any], + statuspages: list[str], + type: str | None = None, + affected_components: list[str] | None = None, + date: str | None = None, + ) -> dict[str, Any]: + """This will create a new incident. title/text are dicts with an 'en' key.""" + from hyperping.models import IncidentCreate, LocalizedText + + fields: dict[str, Any] = { + "title": LocalizedText(**title), + "text": LocalizedText(**text), + "statuspages": statuspages, + } + if type is not None: + fields["type"] = type + if affected_components is not None: + fields["affected_components"] = affected_components + if date is not None: + fields["date"] = date + return client.create_incident(IncidentCreate(**fields)).model_dump() + + @mcp.tool() + def update_incident( + incident_id: str, + title: dict[str, Any] | None = None, + type: str | None = None, + affected_components: list[str] | None = None, + statuspages: list[str] | None = None, + ) -> dict[str, Any]: + """Update an incident. title is a LocalizedText dict with an 'en' key.""" + from hyperping.models import IncidentUpdateRequest, LocalizedText + + fields: dict[str, Any] = {} + if title is not None: + fields["title"] = LocalizedText(**title) + if type is not None: + fields["type"] = type + if affected_components is not None: + fields["affected_components"] = affected_components + if statuspages is not None: + fields["statuspages"] = statuspages + return client.update_incident(incident_id, IncidentUpdateRequest(**fields)).model_dump() + + @mcp.tool() + def add_incident_update( + incident_id: str, + text: dict[str, Any], + type: str, + date: str, + ) -> dict[str, Any]: + """Add an update to an incident. text is a LocalizedText dict with 'en' key. + type: investigating, identified, monitoring, resolved.""" + from hyperping.models import AddIncidentUpdateRequest, LocalizedText + + update = AddIncidentUpdateRequest( + text=LocalizedText(**text), + type=type, + date=date, + ) + return client.add_incident_update(incident_id, update).model_dump() + + @mcp.tool() + def resolve_incident(incident_id: str, message: str | None = None) -> dict[str, Any]: + """This will resolve an incident and post a resolution update.""" + return client.resolve_incident(incident_id, message=message).model_dump() + + @mcp.tool() + def delete_incident(incident_id: str) -> dict[str, Any]: + """This will permanently delete an incident.""" + client.delete_incident(incident_id) + return {"success": True} diff --git a/src/hyperping/mcp_server/_tools_maintenance.py b/src/hyperping/mcp_server/_tools_maintenance.py index 795a3dc..7807477 100644 --- a/src/hyperping/mcp_server/_tools_maintenance.py +++ b/src/hyperping/mcp_server/_tools_maintenance.py @@ -12,90 +12,185 @@ if TYPE_CHECKING: from mcp.server.fastmcp import FastMCP + from hyperping._async_client import AsyncHyperpingClient from hyperping.client import HyperpingClient -def register_maintenance_tools(mcp: FastMCP, client: HyperpingClient) -> None: +def register_maintenance_tools( + mcp: FastMCP, + client: HyperpingClient | AsyncHyperpingClient, +) -> None: """Register maintenance window tools on *mcp*.""" - - @mcp.tool() - def list_maintenance(status: str | None = None) -> list[dict[str, Any]]: - """List maintenance windows. Filter by status: scheduled, in_progress, completed.""" - return [m.model_dump() for m in client.list_maintenance(status=status)] - - @mcp.tool() - def get_maintenance(maintenance_id: str) -> dict[str, Any]: - """Get a single maintenance window by UUID.""" - return client.get_maintenance(maintenance_id).model_dump() - - @mcp.tool() - def create_maintenance( - name: str, - start_date: str, - end_date: str, - monitors: list[str], - statuspages: list[str] | None = None, - title: dict[str, Any] | None = None, - text: dict[str, Any] | None = None, - notification_option: str | None = None, - notification_minutes: int | None = None, - ) -> dict[str, Any]: - """This will create a new maintenance window. title/text are LocalizedText dicts.""" - from hyperping.models import LocalizedText, MaintenanceCreate - - fields: dict[str, Any] = { - "name": name, - "start_date": start_date, - "end_date": end_date, - "monitors": monitors, - } - if statuspages is not None: - fields["statuspages"] = statuspages - if title is not None: - fields["title"] = LocalizedText(**title) - if text is not None: - fields["text"] = LocalizedText(**text) - if notification_option is not None: - fields["notification_option"] = notification_option - if notification_minutes is not None: - fields["notification_minutes"] = notification_minutes - return client.create_maintenance(MaintenanceCreate(**fields)).model_dump() - - @mcp.tool() - def update_maintenance( - maintenance_id: str, - name: str | None = None, - start_date: str | None = None, - end_date: str | None = None, - monitors: list[str] | None = None, - ) -> dict[str, Any]: - """Update an existing maintenance window. Only supplied fields are changed.""" - from hyperping.models import MaintenanceUpdate - - fields: dict[str, Any] = {} - if name is not None: - fields["name"] = name - if start_date is not None: - fields["start_date"] = start_date - if end_date is not None: - fields["end_date"] = end_date - if monitors is not None: - fields["monitors"] = monitors - return client.update_maintenance(maintenance_id, MaintenanceUpdate(**fields)).model_dump() - - @mcp.tool() - def delete_maintenance(maintenance_id: str) -> dict[str, Any]: - """This will permanently delete a maintenance window.""" - client.delete_maintenance(maintenance_id) - return {"success": True} - - @mcp.tool() - def get_active_maintenance() -> list[dict[str, Any]]: - """List currently active maintenance windows.""" - return [m.model_dump() for m in client.get_active_maintenance()] - - @mcp.tool() - def is_monitor_in_maintenance(monitor_uuid: str) -> dict[str, Any]: - """Check whether a monitor is currently inside an active maintenance window.""" - result = client.is_monitor_in_maintenance(monitor_uuid) - return {"in_maintenance": result} + from hyperping._async_client import AsyncHyperpingClient + + if isinstance(client, AsyncHyperpingClient): + + @mcp.tool() + async def list_maintenance(status: str | None = None) -> list[dict[str, Any]]: + """List maintenance windows. Filter by status: scheduled, in_progress, completed.""" + return [m.model_dump() for m in await client.list_maintenance(status=status)] + + @mcp.tool() + async def get_maintenance(maintenance_id: str) -> dict[str, Any]: + """Get a single maintenance window by UUID.""" + return (await client.get_maintenance(maintenance_id)).model_dump() + + @mcp.tool() + async def create_maintenance( + name: str, + start_date: str, + end_date: str, + monitors: list[str], + statuspages: list[str] | None = None, + title: dict[str, Any] | None = None, + text: dict[str, Any] | None = None, + notification_option: str | None = None, + notification_minutes: int | None = None, + ) -> dict[str, Any]: + """This will create a new maintenance window. title/text are LocalizedText dicts.""" + from hyperping.models import LocalizedText, MaintenanceCreate + + fields: dict[str, Any] = { + "name": name, + "start_date": start_date, + "end_date": end_date, + "monitors": monitors, + } + if statuspages is not None: + fields["statuspages"] = statuspages + if title is not None: + fields["title"] = LocalizedText(**title) + if text is not None: + fields["text"] = LocalizedText(**text) + if notification_option is not None: + fields["notification_option"] = notification_option + if notification_minutes is not None: + fields["notification_minutes"] = notification_minutes + return (await client.create_maintenance(MaintenanceCreate(**fields))).model_dump() + + @mcp.tool() + async def update_maintenance( + maintenance_id: str, + name: str | None = None, + start_date: str | None = None, + end_date: str | None = None, + monitors: list[str] | None = None, + ) -> dict[str, Any]: + """Update an existing maintenance window. Only supplied fields are changed.""" + from hyperping.models import MaintenanceUpdate + + fields: dict[str, Any] = {} + if name is not None: + fields["name"] = name + if start_date is not None: + fields["start_date"] = start_date + if end_date is not None: + fields["end_date"] = end_date + if monitors is not None: + fields["monitors"] = monitors + return ( + await client.update_maintenance(maintenance_id, MaintenanceUpdate(**fields)) + ).model_dump() + + @mcp.tool() + async def delete_maintenance(maintenance_id: str) -> dict[str, Any]: + """This will permanently delete a maintenance window.""" + await client.delete_maintenance(maintenance_id) + return {"success": True} + + @mcp.tool() + async def get_active_maintenance() -> list[dict[str, Any]]: + """List currently active maintenance windows.""" + return [m.model_dump() for m in await client.get_active_maintenance()] + + @mcp.tool() + async def is_monitor_in_maintenance(monitor_uuid: str) -> dict[str, Any]: + """Check whether a monitor is currently inside an active maintenance window.""" + result = await client.is_monitor_in_maintenance(monitor_uuid) + return {"in_maintenance": result} + + else: + + @mcp.tool() + def list_maintenance(status: str | None = None) -> list[dict[str, Any]]: + """List maintenance windows. Filter by status: scheduled, in_progress, completed.""" + return [m.model_dump() for m in client.list_maintenance(status=status)] + + @mcp.tool() + def get_maintenance(maintenance_id: str) -> dict[str, Any]: + """Get a single maintenance window by UUID.""" + return client.get_maintenance(maintenance_id).model_dump() + + @mcp.tool() + def create_maintenance( + name: str, + start_date: str, + end_date: str, + monitors: list[str], + statuspages: list[str] | None = None, + title: dict[str, Any] | None = None, + text: dict[str, Any] | None = None, + notification_option: str | None = None, + notification_minutes: int | None = None, + ) -> dict[str, Any]: + """This will create a new maintenance window. title/text are LocalizedText dicts.""" + from hyperping.models import LocalizedText, MaintenanceCreate + + fields: dict[str, Any] = { + "name": name, + "start_date": start_date, + "end_date": end_date, + "monitors": monitors, + } + if statuspages is not None: + fields["statuspages"] = statuspages + if title is not None: + fields["title"] = LocalizedText(**title) + if text is not None: + fields["text"] = LocalizedText(**text) + if notification_option is not None: + fields["notification_option"] = notification_option + if notification_minutes is not None: + fields["notification_minutes"] = notification_minutes + return client.create_maintenance(MaintenanceCreate(**fields)).model_dump() + + @mcp.tool() + def update_maintenance( + maintenance_id: str, + name: str | None = None, + start_date: str | None = None, + end_date: str | None = None, + monitors: list[str] | None = None, + ) -> dict[str, Any]: + """Update an existing maintenance window. Only supplied fields are changed.""" + from hyperping.models import MaintenanceUpdate + + fields: dict[str, Any] = {} + if name is not None: + fields["name"] = name + if start_date is not None: + fields["start_date"] = start_date + if end_date is not None: + fields["end_date"] = end_date + if monitors is not None: + fields["monitors"] = monitors + return client.update_maintenance( + maintenance_id, MaintenanceUpdate(**fields) + ).model_dump() + + @mcp.tool() + def delete_maintenance(maintenance_id: str) -> dict[str, Any]: + """This will permanently delete a maintenance window.""" + client.delete_maintenance(maintenance_id) + return {"success": True} + + @mcp.tool() + def get_active_maintenance() -> list[dict[str, Any]]: + """List currently active maintenance windows.""" + return [m.model_dump() for m in client.get_active_maintenance()] + + @mcp.tool() + def is_monitor_in_maintenance(monitor_uuid: str) -> dict[str, Any]: + """Check whether a monitor is currently inside an active maintenance window.""" + result = client.is_monitor_in_maintenance(monitor_uuid) + return {"in_maintenance": result} diff --git a/src/hyperping/mcp_server/_tools_monitors.py b/src/hyperping/mcp_server/_tools_monitors.py index 6b0c00d..73e551d 100644 --- a/src/hyperping/mcp_server/_tools_monitors.py +++ b/src/hyperping/mcp_server/_tools_monitors.py @@ -12,155 +12,305 @@ if TYPE_CHECKING: from mcp.server.fastmcp import FastMCP + from hyperping._async_client import AsyncHyperpingClient + from hyperping._async_mcp_client import AsyncHyperpingMcpClient from hyperping.client import HyperpingClient from hyperping.mcp_client import HyperpingMcpClient def register_monitor_tools( mcp: FastMCP, - client: HyperpingClient, - mcp_client: HyperpingMcpClient | None, + client: HyperpingClient | AsyncHyperpingClient, + mcp_client: HyperpingMcpClient | AsyncHyperpingMcpClient | None, ) -> None: """Register monitor tools on *mcp*.""" + from hyperping._async_client import AsyncHyperpingClient - @mcp.tool() - def list_monitors() -> list[dict[str, Any]]: - """List all monitors in the account.""" - return [m.model_dump() for m in client.list_monitors()] - - @mcp.tool() - def get_monitor(monitor_id: str) -> dict[str, Any]: - """Get a single monitor by UUID.""" - return client.get_monitor(monitor_id).model_dump() - - @mcp.tool() - def create_monitor( - name: str, - url: str, - protocol: str | None = None, - http_method: str | None = None, - check_frequency: int | None = None, - regions: list[str] | None = None, - request_body: str | None = None, - follow_redirects: bool | None = None, - expected_status_code: str | None = None, - required_keyword: str | None = None, - paused: bool | None = None, - port: int | None = None, - alerts_wait: int | None = None, - escalation_policy: str | None = None, - dns_record_type: str | None = None, - dns_nameserver: str | None = None, - dns_expected_answer: str | None = None, - ) -> dict[str, Any]: - """This will create a new monitor.""" - from hyperping.models import MonitorCreate - - fields: dict[str, Any] = { - "name": name, - "url": url, - } - optionals = { - "protocol": protocol, - "http_method": http_method, - "check_frequency": check_frequency, - "regions": regions, - "request_body": request_body, - "follow_redirects": follow_redirects, - "expected_status_code": expected_status_code, - "required_keyword": required_keyword, - "paused": paused, - "port": port, - "alerts_wait": alerts_wait, - "escalation_policy": escalation_policy, - "dns_record_type": dns_record_type, - "dns_nameserver": dns_nameserver, - "dns_expected_answer": dns_expected_answer, - } - fields.update({k: v for k, v in optionals.items() if v is not None}) - return client.create_monitor(MonitorCreate(**fields)).model_dump() - - @mcp.tool() - def update_monitor( - monitor_id: str, - name: str | None = None, - url: str | None = None, - protocol: str | None = None, - http_method: str | None = None, - check_frequency: int | None = None, - regions: list[str] | None = None, - request_body: str | None = None, - follow_redirects: bool | None = None, - expected_status_code: str | None = None, - required_keyword: str | None = None, - paused: bool | None = None, - port: int | None = None, - alerts_wait: int | None = None, - escalation_policy: str | None = None, - dns_record_type: str | None = None, - dns_nameserver: str | None = None, - dns_expected_answer: str | None = None, - ) -> dict[str, Any]: - """Update an existing monitor. Only supplied fields are changed.""" - from hyperping.models import MonitorUpdate - - fields: dict[str, Any] = {} - candidates = { - "name": name, - "url": url, - "protocol": protocol, - "http_method": http_method, - "check_frequency": check_frequency, - "regions": regions, - "request_body": request_body, - "follow_redirects": follow_redirects, - "expected_status_code": expected_status_code, - "required_keyword": required_keyword, - "paused": paused, - "port": port, - "alerts_wait": alerts_wait, - "escalation_policy": escalation_policy, - "dns_record_type": dns_record_type, - "dns_nameserver": dns_nameserver, - "dns_expected_answer": dns_expected_answer, - } - fields.update({k: v for k, v in candidates.items() if v is not None}) - return client.update_monitor(monitor_id, MonitorUpdate(**fields)).model_dump() - - @mcp.tool() - def delete_monitor(monitor_id: str) -> dict[str, Any]: - """This will permanently delete a monitor and all its historical data.""" - client.delete_monitor(monitor_id) - return {"success": True} - - @mcp.tool() - def pause_monitor(monitor_id: str) -> dict[str, Any]: - """Pause a monitor so it stops sending checks.""" - return client.pause_monitor(monitor_id).model_dump() - - @mcp.tool() - def resume_monitor(monitor_id: str) -> dict[str, Any]: - """Resume a paused monitor.""" - return client.resume_monitor(monitor_id).model_dump() - - @mcp.tool() - def get_all_reports(period: str = "30d") -> list[dict[str, Any]]: - """Get uptime reports for all monitors. period: 1h, 24h, 7d, 30d, 90d.""" - _period = cast(Literal["1h", "24h", "7d", "30d", "90d"], period) - return [r.model_dump() for r in client.get_all_reports(period=_period)] - - @mcp.tool() - def get_monitor_report(monitor_id: str, period: str = "30d") -> dict[str, Any]: - """Get uptime report for a single monitor. period: 1h, 24h, 7d, 30d, 90d.""" - _period = cast(Literal["1h", "24h", "7d", "30d", "90d"], period) - return client.get_monitor_report(monitor_id, period=_period).model_dump() - - @mcp.tool() - def search_monitors_by_name(query: str) -> list[dict[str, Any]]: - """Search monitors by name substring.""" - if mcp_client is None: - raise RuntimeError( - "search_monitors_by_name requires an mcp_client. " - "Pass mcp_client= to create_mcp_server or supply api_key." - ) - return [m.model_dump() for m in mcp_client.search_monitors_by_name(query)] + if isinstance(client, AsyncHyperpingClient): + + @mcp.tool() + async def list_monitors() -> list[dict[str, Any]]: + """List all monitors in the account.""" + return [m.model_dump() for m in await client.list_monitors()] + + @mcp.tool() + async def get_monitor(monitor_id: str) -> dict[str, Any]: + """Get a single monitor by UUID.""" + return (await client.get_monitor(monitor_id)).model_dump() + + @mcp.tool() + async def create_monitor( + name: str, + url: str, + protocol: str | None = None, + http_method: str | None = None, + check_frequency: int | None = None, + regions: list[str] | None = None, + request_body: str | None = None, + follow_redirects: bool | None = None, + expected_status_code: str | None = None, + required_keyword: str | None = None, + paused: bool | None = None, + port: int | None = None, + alerts_wait: int | None = None, + escalation_policy: str | None = None, + dns_record_type: str | None = None, + dns_nameserver: str | None = None, + dns_expected_answer: str | None = None, + ) -> dict[str, Any]: + """This will create a new monitor.""" + from hyperping.models import MonitorCreate + + fields: dict[str, Any] = { + "name": name, + "url": url, + } + optionals = { + "protocol": protocol, + "http_method": http_method, + "check_frequency": check_frequency, + "regions": regions, + "request_body": request_body, + "follow_redirects": follow_redirects, + "expected_status_code": expected_status_code, + "required_keyword": required_keyword, + "paused": paused, + "port": port, + "alerts_wait": alerts_wait, + "escalation_policy": escalation_policy, + "dns_record_type": dns_record_type, + "dns_nameserver": dns_nameserver, + "dns_expected_answer": dns_expected_answer, + } + fields.update({k: v for k, v in optionals.items() if v is not None}) + return (await client.create_monitor(MonitorCreate(**fields))).model_dump() + + @mcp.tool() + async def update_monitor( + monitor_id: str, + name: str | None = None, + url: str | None = None, + protocol: str | None = None, + http_method: str | None = None, + check_frequency: int | None = None, + regions: list[str] | None = None, + request_body: str | None = None, + follow_redirects: bool | None = None, + expected_status_code: str | None = None, + required_keyword: str | None = None, + paused: bool | None = None, + port: int | None = None, + alerts_wait: int | None = None, + escalation_policy: str | None = None, + dns_record_type: str | None = None, + dns_nameserver: str | None = None, + dns_expected_answer: str | None = None, + ) -> dict[str, Any]: + """Update an existing monitor. Only supplied fields are changed.""" + from hyperping.models import MonitorUpdate + + fields: dict[str, Any] = {} + candidates = { + "name": name, + "url": url, + "protocol": protocol, + "http_method": http_method, + "check_frequency": check_frequency, + "regions": regions, + "request_body": request_body, + "follow_redirects": follow_redirects, + "expected_status_code": expected_status_code, + "required_keyword": required_keyword, + "paused": paused, + "port": port, + "alerts_wait": alerts_wait, + "escalation_policy": escalation_policy, + "dns_record_type": dns_record_type, + "dns_nameserver": dns_nameserver, + "dns_expected_answer": dns_expected_answer, + } + fields.update({k: v for k, v in candidates.items() if v is not None}) + return (await client.update_monitor(monitor_id, MonitorUpdate(**fields))).model_dump() + + @mcp.tool() + async def delete_monitor(monitor_id: str) -> dict[str, Any]: + """This will permanently delete a monitor and all its historical data.""" + await client.delete_monitor(monitor_id) + return {"success": True} + + @mcp.tool() + async def pause_monitor(monitor_id: str) -> dict[str, Any]: + """Pause a monitor so it stops sending checks.""" + return (await client.pause_monitor(monitor_id)).model_dump() + + @mcp.tool() + async def resume_monitor(monitor_id: str) -> dict[str, Any]: + """Resume a paused monitor.""" + return (await client.resume_monitor(monitor_id)).model_dump() + + @mcp.tool() + async def get_all_reports(period: str = "30d") -> list[dict[str, Any]]: + """Get uptime reports for all monitors. period: 1h, 24h, 7d, 30d, 90d.""" + _period = cast(Literal["1h", "24h", "7d", "30d", "90d"], period) + return [r.model_dump() for r in await client.get_all_reports(period=_period)] + + @mcp.tool() + async def get_monitor_report(monitor_id: str, period: str = "30d") -> dict[str, Any]: + """Get uptime report for a single monitor. period: 1h, 24h, 7d, 30d, 90d.""" + _period = cast(Literal["1h", "24h", "7d", "30d", "90d"], period) + return (await client.get_monitor_report(monitor_id, period=_period)).model_dump() + + else: + + @mcp.tool() + def list_monitors() -> list[dict[str, Any]]: + """List all monitors in the account.""" + return [m.model_dump() for m in client.list_monitors()] + + @mcp.tool() + def get_monitor(monitor_id: str) -> dict[str, Any]: + """Get a single monitor by UUID.""" + return client.get_monitor(monitor_id).model_dump() + + @mcp.tool() + def create_monitor( + name: str, + url: str, + protocol: str | None = None, + http_method: str | None = None, + check_frequency: int | None = None, + regions: list[str] | None = None, + request_body: str | None = None, + follow_redirects: bool | None = None, + expected_status_code: str | None = None, + required_keyword: str | None = None, + paused: bool | None = None, + port: int | None = None, + alerts_wait: int | None = None, + escalation_policy: str | None = None, + dns_record_type: str | None = None, + dns_nameserver: str | None = None, + dns_expected_answer: str | None = None, + ) -> dict[str, Any]: + """This will create a new monitor.""" + from hyperping.models import MonitorCreate + + fields: dict[str, Any] = { + "name": name, + "url": url, + } + optionals = { + "protocol": protocol, + "http_method": http_method, + "check_frequency": check_frequency, + "regions": regions, + "request_body": request_body, + "follow_redirects": follow_redirects, + "expected_status_code": expected_status_code, + "required_keyword": required_keyword, + "paused": paused, + "port": port, + "alerts_wait": alerts_wait, + "escalation_policy": escalation_policy, + "dns_record_type": dns_record_type, + "dns_nameserver": dns_nameserver, + "dns_expected_answer": dns_expected_answer, + } + fields.update({k: v for k, v in optionals.items() if v is not None}) + return client.create_monitor(MonitorCreate(**fields)).model_dump() + + @mcp.tool() + def update_monitor( + monitor_id: str, + name: str | None = None, + url: str | None = None, + protocol: str | None = None, + http_method: str | None = None, + check_frequency: int | None = None, + regions: list[str] | None = None, + request_body: str | None = None, + follow_redirects: bool | None = None, + expected_status_code: str | None = None, + required_keyword: str | None = None, + paused: bool | None = None, + port: int | None = None, + alerts_wait: int | None = None, + escalation_policy: str | None = None, + dns_record_type: str | None = None, + dns_nameserver: str | None = None, + dns_expected_answer: str | None = None, + ) -> dict[str, Any]: + """Update an existing monitor. Only supplied fields are changed.""" + from hyperping.models import MonitorUpdate + + fields: dict[str, Any] = {} + candidates = { + "name": name, + "url": url, + "protocol": protocol, + "http_method": http_method, + "check_frequency": check_frequency, + "regions": regions, + "request_body": request_body, + "follow_redirects": follow_redirects, + "expected_status_code": expected_status_code, + "required_keyword": required_keyword, + "paused": paused, + "port": port, + "alerts_wait": alerts_wait, + "escalation_policy": escalation_policy, + "dns_record_type": dns_record_type, + "dns_nameserver": dns_nameserver, + "dns_expected_answer": dns_expected_answer, + } + fields.update({k: v for k, v in candidates.items() if v is not None}) + return client.update_monitor(monitor_id, MonitorUpdate(**fields)).model_dump() + + @mcp.tool() + def delete_monitor(monitor_id: str) -> dict[str, Any]: + """This will permanently delete a monitor and all its historical data.""" + client.delete_monitor(monitor_id) + return {"success": True} + + @mcp.tool() + def pause_monitor(monitor_id: str) -> dict[str, Any]: + """Pause a monitor so it stops sending checks.""" + return client.pause_monitor(monitor_id).model_dump() + + @mcp.tool() + def resume_monitor(monitor_id: str) -> dict[str, Any]: + """Resume a paused monitor.""" + return client.resume_monitor(monitor_id).model_dump() + + @mcp.tool() + def get_all_reports(period: str = "30d") -> list[dict[str, Any]]: + """Get uptime reports for all monitors. period: 1h, 24h, 7d, 30d, 90d.""" + _period = cast(Literal["1h", "24h", "7d", "30d", "90d"], period) + return [r.model_dump() for r in client.get_all_reports(period=_period)] + + @mcp.tool() + def get_monitor_report(monitor_id: str, period: str = "30d") -> dict[str, Any]: + """Get uptime report for a single monitor. period: 1h, 24h, 7d, 30d, 90d.""" + _period = cast(Literal["1h", "24h", "7d", "30d", "90d"], period) + return client.get_monitor_report(monitor_id, period=_period).model_dump() + + from hyperping._async_mcp_client import AsyncHyperpingMcpClient + + if isinstance(mcp_client, AsyncHyperpingMcpClient): + + @mcp.tool() + async def search_monitors_by_name(query: str) -> list[dict[str, Any]]: + """Search monitors by name substring.""" + return [m.model_dump() for m in await mcp_client.search_monitors_by_name(query)] + + else: + + @mcp.tool() + def search_monitors_by_name(query: str) -> list[dict[str, Any]]: + """Search monitors by name substring.""" + if mcp_client is None: + raise RuntimeError( + "search_monitors_by_name requires an mcp_client. " + "Pass mcp_client= to create_mcp_server or supply api_key." + ) + return [m.model_dump() for m in mcp_client.search_monitors_by_name(query)] diff --git a/src/hyperping/mcp_server/_tools_observability.py b/src/hyperping/mcp_server/_tools_observability.py index 5b143d7..0f0394d 100644 --- a/src/hyperping/mcp_server/_tools_observability.py +++ b/src/hyperping/mcp_server/_tools_observability.py @@ -14,83 +14,167 @@ if TYPE_CHECKING: from mcp.server.fastmcp import FastMCP + from hyperping._async_mcp_client import AsyncHyperpingMcpClient from hyperping.mcp_client import HyperpingMcpClient -def register_observability_tools(mcp: FastMCP, mcp_client: HyperpingMcpClient) -> None: +def register_observability_tools( + mcp: FastMCP, + mcp_client: HyperpingMcpClient | AsyncHyperpingMcpClient, +) -> None: """Register observability tools on *mcp*.""" - - @mcp.tool() - def get_status_summary() -> dict[str, Any]: - """Get aggregate monitor status counts (up, down, paused, total).""" - return mcp_client.get_status_summary().model_dump() - - @mcp.tool() - def get_monitor_response_time(monitor_uuid: str) -> dict[str, Any]: - """Get response time metrics for a monitor.""" - return mcp_client.get_monitor_response_time(monitor_uuid).model_dump() - - @mcp.tool() - def get_monitor_mtta(monitor_uuid: str | None = None) -> dict[str, Any]: - """Get mean time to acknowledge metrics. Optionally scoped to a monitor UUID.""" - return mcp_client.get_monitor_mtta(monitor_uuid=monitor_uuid).model_dump() - - @mcp.tool() - def get_monitor_mttr(monitor_uuid: str | None = None) -> dict[str, Any]: - """Get mean time to resolve metrics. Optionally scoped to a monitor UUID.""" - return mcp_client.get_monitor_mttr(monitor_uuid=monitor_uuid).model_dump() - - @mcp.tool() - def get_monitor_anomalies(monitor_uuid: str) -> list[dict[str, Any]]: - """Get anomalies detected for a monitor.""" - return [a.model_dump() for a in mcp_client.get_monitor_anomalies(monitor_uuid)] - - @mcp.tool() - def get_monitor_http_logs(monitor_uuid: str) -> dict[str, Any]: - """Get HTTP probe logs for a monitor.""" - return mcp_client.get_monitor_http_logs(monitor_uuid).model_dump() - - @mcp.tool() - def list_recent_alerts() -> dict[str, Any]: - """List recent alert notifications.""" - return mcp_client.list_recent_alerts().model_dump() - - @mcp.tool() - def list_on_call_schedules() -> list[dict[str, Any]]: - """List all on-call schedules.""" - return [s.model_dump() for s in mcp_client.list_on_call_schedules()] - - @mcp.tool() - def get_on_call_schedule(uuid: str) -> dict[str, Any]: - """Get a single on-call schedule by UUID.""" - return mcp_client.get_on_call_schedule(uuid).model_dump() - - @mcp.tool() - def list_escalation_policies() -> list[dict[str, Any]]: - """List all escalation policies.""" - return [p.model_dump() for p in mcp_client.list_escalation_policies()] - - @mcp.tool() - def get_escalation_policy(uuid: str) -> dict[str, Any]: - """Get a single escalation policy by UUID.""" - return mcp_client.get_escalation_policy(uuid).model_dump() - - @mcp.tool() - def list_team_members() -> list[dict[str, Any]]: - """List all team members.""" - return [m.model_dump() for m in mcp_client.list_team_members()] - - @mcp.tool() - def list_integrations() -> list[dict[str, Any]]: - """List all notification channel integrations.""" - return [i.model_dump() for i in mcp_client.list_integrations()] - - @mcp.tool() - def get_integration(uuid: str) -> dict[str, Any]: - """Get a single integration by UUID.""" - return mcp_client.get_integration(uuid).model_dump() - - @mcp.tool() - def get_outage_timeline(outage_uuid: str) -> dict[str, Any]: - """Get the lifecycle timeline for an outage.""" - return mcp_client.get_outage_timeline(outage_uuid).model_dump() + from hyperping._async_mcp_client import AsyncHyperpingMcpClient + + if isinstance(mcp_client, AsyncHyperpingMcpClient): + + @mcp.tool() + async def get_status_summary() -> dict[str, Any]: + """Get aggregate monitor status counts (up, down, paused, total).""" + return (await mcp_client.get_status_summary()).model_dump() + + @mcp.tool() + async def get_monitor_response_time(monitor_uuid: str) -> dict[str, Any]: + """Get response time metrics for a monitor.""" + return (await mcp_client.get_monitor_response_time(monitor_uuid)).model_dump() + + @mcp.tool() + async def get_monitor_mtta(monitor_uuid: str | None = None) -> dict[str, Any]: + """Get mean time to acknowledge metrics. Optionally scoped to a monitor UUID.""" + return (await mcp_client.get_monitor_mtta(monitor_uuid=monitor_uuid)).model_dump() + + @mcp.tool() + async def get_monitor_mttr(monitor_uuid: str | None = None) -> dict[str, Any]: + """Get mean time to resolve metrics. Optionally scoped to a monitor UUID.""" + return (await mcp_client.get_monitor_mttr(monitor_uuid=monitor_uuid)).model_dump() + + @mcp.tool() + async def get_monitor_anomalies(monitor_uuid: str) -> list[dict[str, Any]]: + """Get anomalies detected for a monitor.""" + return [a.model_dump() for a in await mcp_client.get_monitor_anomalies(monitor_uuid)] + + @mcp.tool() + async def get_monitor_http_logs(monitor_uuid: str) -> dict[str, Any]: + """Get HTTP probe logs for a monitor.""" + return (await mcp_client.get_monitor_http_logs(monitor_uuid)).model_dump() + + @mcp.tool() + async def list_recent_alerts() -> dict[str, Any]: + """List recent alert notifications.""" + return (await mcp_client.list_recent_alerts()).model_dump() + + @mcp.tool() + async def list_on_call_schedules() -> list[dict[str, Any]]: + """List all on-call schedules.""" + return [s.model_dump() for s in await mcp_client.list_on_call_schedules()] + + @mcp.tool() + async def get_on_call_schedule(uuid: str) -> dict[str, Any]: + """Get a single on-call schedule by UUID.""" + return (await mcp_client.get_on_call_schedule(uuid)).model_dump() + + @mcp.tool() + async def list_escalation_policies() -> list[dict[str, Any]]: + """List all escalation policies.""" + return [p.model_dump() for p in await mcp_client.list_escalation_policies()] + + @mcp.tool() + async def get_escalation_policy(uuid: str) -> dict[str, Any]: + """Get a single escalation policy by UUID.""" + return (await mcp_client.get_escalation_policy(uuid)).model_dump() + + @mcp.tool() + async def list_team_members() -> list[dict[str, Any]]: + """List all team members.""" + return [m.model_dump() for m in await mcp_client.list_team_members()] + + @mcp.tool() + async def list_integrations() -> list[dict[str, Any]]: + """List all notification channel integrations.""" + return [i.model_dump() for i in await mcp_client.list_integrations()] + + @mcp.tool() + async def get_integration(uuid: str) -> dict[str, Any]: + """Get a single integration by UUID.""" + return (await mcp_client.get_integration(uuid)).model_dump() + + @mcp.tool() + async def get_outage_timeline(outage_uuid: str) -> dict[str, Any]: + """Get the lifecycle timeline for an outage.""" + return (await mcp_client.get_outage_timeline(outage_uuid)).model_dump() + + else: + + @mcp.tool() + def get_status_summary() -> dict[str, Any]: + """Get aggregate monitor status counts (up, down, paused, total).""" + return mcp_client.get_status_summary().model_dump() + + @mcp.tool() + def get_monitor_response_time(monitor_uuid: str) -> dict[str, Any]: + """Get response time metrics for a monitor.""" + return mcp_client.get_monitor_response_time(monitor_uuid).model_dump() + + @mcp.tool() + def get_monitor_mtta(monitor_uuid: str | None = None) -> dict[str, Any]: + """Get mean time to acknowledge metrics. Optionally scoped to a monitor UUID.""" + return mcp_client.get_monitor_mtta(monitor_uuid=monitor_uuid).model_dump() + + @mcp.tool() + def get_monitor_mttr(monitor_uuid: str | None = None) -> dict[str, Any]: + """Get mean time to resolve metrics. Optionally scoped to a monitor UUID.""" + return mcp_client.get_monitor_mttr(monitor_uuid=monitor_uuid).model_dump() + + @mcp.tool() + def get_monitor_anomalies(monitor_uuid: str) -> list[dict[str, Any]]: + """Get anomalies detected for a monitor.""" + return [a.model_dump() for a in mcp_client.get_monitor_anomalies(monitor_uuid)] + + @mcp.tool() + def get_monitor_http_logs(monitor_uuid: str) -> dict[str, Any]: + """Get HTTP probe logs for a monitor.""" + return mcp_client.get_monitor_http_logs(monitor_uuid).model_dump() + + @mcp.tool() + def list_recent_alerts() -> dict[str, Any]: + """List recent alert notifications.""" + return mcp_client.list_recent_alerts().model_dump() + + @mcp.tool() + def list_on_call_schedules() -> list[dict[str, Any]]: + """List all on-call schedules.""" + return [s.model_dump() for s in mcp_client.list_on_call_schedules()] + + @mcp.tool() + def get_on_call_schedule(uuid: str) -> dict[str, Any]: + """Get a single on-call schedule by UUID.""" + return mcp_client.get_on_call_schedule(uuid).model_dump() + + @mcp.tool() + def list_escalation_policies() -> list[dict[str, Any]]: + """List all escalation policies.""" + return [p.model_dump() for p in mcp_client.list_escalation_policies()] + + @mcp.tool() + def get_escalation_policy(uuid: str) -> dict[str, Any]: + """Get a single escalation policy by UUID.""" + return mcp_client.get_escalation_policy(uuid).model_dump() + + @mcp.tool() + def list_team_members() -> list[dict[str, Any]]: + """List all team members.""" + return [m.model_dump() for m in mcp_client.list_team_members()] + + @mcp.tool() + def list_integrations() -> list[dict[str, Any]]: + """List all notification channel integrations.""" + return [i.model_dump() for i in mcp_client.list_integrations()] + + @mcp.tool() + def get_integration(uuid: str) -> dict[str, Any]: + """Get a single integration by UUID.""" + return mcp_client.get_integration(uuid).model_dump() + + @mcp.tool() + def get_outage_timeline(outage_uuid: str) -> dict[str, Any]: + """Get the lifecycle timeline for an outage.""" + return mcp_client.get_outage_timeline(outage_uuid).model_dump() diff --git a/src/hyperping/mcp_server/_tools_outages.py b/src/hyperping/mcp_server/_tools_outages.py index 3c39e88..3dab6bf 100644 --- a/src/hyperping/mcp_server/_tools_outages.py +++ b/src/hyperping/mcp_server/_tools_outages.py @@ -12,52 +12,110 @@ if TYPE_CHECKING: from mcp.server.fastmcp import FastMCP + from hyperping._async_client import AsyncHyperpingClient from hyperping.client import HyperpingClient -def register_outage_tools(mcp: FastMCP, client: HyperpingClient) -> None: +def register_outage_tools( + mcp: FastMCP, + client: HyperpingClient | AsyncHyperpingClient, +) -> None: """Register outage tools on *mcp*.""" + from hyperping._async_client import AsyncHyperpingClient - @mcp.tool() - def list_outages( - status: str = "all", - outage_type: str = "all", - ) -> list[dict[str, Any]]: - """List outages. status: all, ongoing, resolved. outage_type: all, manual, monitor.""" - return [o.model_dump() for o in client.list_outages(status=status, outage_type=outage_type)] - - @mcp.tool() - def get_outage(outage_id: str) -> dict[str, Any]: - """Get a single outage by UUID.""" - return client.get_outage(outage_id).model_dump() - - @mcp.tool() - def create_outage(monitor_uuid: str) -> dict[str, Any]: - """This will create a manual outage for a monitor.""" - return client.create_outage(monitor_uuid).model_dump() - - @mcp.tool() - def acknowledge_outage(outage_id: str, message: str | None = None) -> dict[str, Any]: - """Acknowledge an outage with an optional message.""" - return client.acknowledge_outage(outage_id, message=message).model_dump() - - @mcp.tool() - def resolve_outage(outage_id: str, message: str | None = None) -> dict[str, Any]: - """This will resolve an outage with an optional message.""" - return client.resolve_outage(outage_id, message=message).model_dump() - - @mcp.tool() - def escalate_outage(outage_id: str) -> dict[str, Any]: - """Escalate an outage to the next on-call tier.""" - return client.escalate_outage(outage_id).model_dump() - - @mcp.tool() - def unacknowledge_outage(outage_id: str) -> dict[str, Any]: - """Unacknowledge an outage.""" - return client.unacknowledge_outage(outage_id).model_dump() - - @mcp.tool() - def delete_outage(outage_id: str) -> dict[str, Any]: - """This will permanently delete an outage record.""" - client.delete_outage(outage_id) - return {"success": True} + if isinstance(client, AsyncHyperpingClient): + + @mcp.tool() + async def list_outages( + status: str = "all", + outage_type: str = "all", + ) -> list[dict[str, Any]]: + """List outages. status: all, ongoing, resolved. outage_type: all, manual, monitor.""" + return [ + o.model_dump() + for o in await client.list_outages(status=status, outage_type=outage_type) + ] + + @mcp.tool() + async def get_outage(outage_id: str) -> dict[str, Any]: + """Get a single outage by UUID.""" + return (await client.get_outage(outage_id)).model_dump() + + @mcp.tool() + async def create_outage(monitor_uuid: str) -> dict[str, Any]: + """This will create a manual outage for a monitor.""" + return (await client.create_outage(monitor_uuid)).model_dump() + + @mcp.tool() + async def acknowledge_outage(outage_id: str, message: str | None = None) -> dict[str, Any]: + """Acknowledge an outage with an optional message.""" + return (await client.acknowledge_outage(outage_id, message=message)).model_dump() + + @mcp.tool() + async def resolve_outage(outage_id: str, message: str | None = None) -> dict[str, Any]: + """This will resolve an outage with an optional message.""" + return (await client.resolve_outage(outage_id, message=message)).model_dump() + + @mcp.tool() + async def escalate_outage(outage_id: str) -> dict[str, Any]: + """Escalate an outage to the next on-call tier.""" + return (await client.escalate_outage(outage_id)).model_dump() + + @mcp.tool() + async def unacknowledge_outage(outage_id: str) -> dict[str, Any]: + """Unacknowledge an outage.""" + return (await client.unacknowledge_outage(outage_id)).model_dump() + + @mcp.tool() + async def delete_outage(outage_id: str) -> dict[str, Any]: + """This will permanently delete an outage record.""" + await client.delete_outage(outage_id) + return {"success": True} + + else: + + @mcp.tool() + def list_outages( + status: str = "all", + outage_type: str = "all", + ) -> list[dict[str, Any]]: + """List outages. status: all, ongoing, resolved. outage_type: all, manual, monitor.""" + return [ + o.model_dump() for o in client.list_outages(status=status, outage_type=outage_type) + ] + + @mcp.tool() + def get_outage(outage_id: str) -> dict[str, Any]: + """Get a single outage by UUID.""" + return client.get_outage(outage_id).model_dump() + + @mcp.tool() + def create_outage(monitor_uuid: str) -> dict[str, Any]: + """This will create a manual outage for a monitor.""" + return client.create_outage(monitor_uuid).model_dump() + + @mcp.tool() + def acknowledge_outage(outage_id: str, message: str | None = None) -> dict[str, Any]: + """Acknowledge an outage with an optional message.""" + return client.acknowledge_outage(outage_id, message=message).model_dump() + + @mcp.tool() + def resolve_outage(outage_id: str, message: str | None = None) -> dict[str, Any]: + """This will resolve an outage with an optional message.""" + return client.resolve_outage(outage_id, message=message).model_dump() + + @mcp.tool() + def escalate_outage(outage_id: str) -> dict[str, Any]: + """Escalate an outage to the next on-call tier.""" + return client.escalate_outage(outage_id).model_dump() + + @mcp.tool() + def unacknowledge_outage(outage_id: str) -> dict[str, Any]: + """Unacknowledge an outage.""" + return client.unacknowledge_outage(outage_id).model_dump() + + @mcp.tool() + def delete_outage(outage_id: str) -> dict[str, Any]: + """This will permanently delete an outage record.""" + client.delete_outage(outage_id) + return {"success": True} diff --git a/src/hyperping/mcp_server/_tools_statuspages.py b/src/hyperping/mcp_server/_tools_statuspages.py index 86751d9..87cb2eb 100644 --- a/src/hyperping/mcp_server/_tools_statuspages.py +++ b/src/hyperping/mcp_server/_tools_statuspages.py @@ -12,93 +12,189 @@ if TYPE_CHECKING: from mcp.server.fastmcp import FastMCP + from hyperping._async_client import AsyncHyperpingClient from hyperping.client import HyperpingClient -def register_statuspage_tools(mcp: FastMCP, client: HyperpingClient) -> None: +def register_statuspage_tools( + mcp: FastMCP, + client: HyperpingClient | AsyncHyperpingClient, +) -> None: """Register status page tools on *mcp*.""" - - @mcp.tool() - def list_status_pages(search: str | None = None) -> list[dict[str, Any]]: - """List all status pages. Optionally filter by name or subdomain.""" - return [p.model_dump() for p in client.list_status_pages(search=search)] - - @mcp.tool() - def get_status_page(status_page_id: str) -> dict[str, Any]: - """Get a single status page by UUID.""" - return client.get_status_page(status_page_id).model_dump() - - @mcp.tool() - def create_status_page( - name: str, - subdomain: str, - custom_domain: str | None = None, - public: bool | None = None, - monitors: list[str] | None = None, - ) -> dict[str, Any]: - """This will create a new status page.""" - from hyperping.models import StatusPageCreate - - fields: dict[str, Any] = {"name": name, "subdomain": subdomain} - if custom_domain is not None: - fields["custom_domain"] = custom_domain - if public is not None: - fields["public"] = public - if monitors is not None: - fields["monitors"] = monitors - return client.create_status_page(StatusPageCreate(**fields)).model_dump() - - @mcp.tool() - def update_status_page( - status_page_id: str, - name: str | None = None, - subdomain: str | None = None, - custom_domain: str | None = None, - public: bool | None = None, - monitors: list[str] | None = None, - ) -> dict[str, Any]: - """Update an existing status page. Only supplied fields are changed.""" - from hyperping.models import StatusPageUpdate - - fields: dict[str, Any] = {} - if name is not None: - fields["name"] = name - if subdomain is not None: - fields["subdomain"] = subdomain - if custom_domain is not None: - fields["custom_domain"] = custom_domain - if public is not None: - fields["public"] = public - if monitors is not None: - fields["monitors"] = monitors - return client.update_status_page(status_page_id, StatusPageUpdate(**fields)).model_dump() - - @mcp.tool() - def delete_status_page(status_page_id: str) -> dict[str, Any]: - """This will permanently delete a status page.""" - client.delete_status_page(status_page_id) - return {"success": True} - - @mcp.tool() - def list_subscribers( - status_page_id: str, - subscriber_type: str = "all", - ) -> list[dict[str, Any]]: - """List subscribers for a status page. subscriber_type: all, email, sms, slack, teams.""" - return [ - s.model_dump() - for s in client.list_subscribers( - status_page_id, subscriber_type=subscriber_type - ) - ] - - @mcp.tool() - def add_subscriber(status_page_id: str, email: str) -> dict[str, Any]: - """This will add an email subscriber to a status page.""" - return client.add_subscriber(status_page_id, email).model_dump() - - @mcp.tool() - def remove_subscriber(status_page_id: str, subscriber_id: str) -> dict[str, Any]: - """This will remove a subscriber from a status page.""" - client.remove_subscriber(status_page_id, subscriber_id) - return {"success": True} + from hyperping._async_client import AsyncHyperpingClient + + if isinstance(client, AsyncHyperpingClient): + + @mcp.tool() + async def list_status_pages(search: str | None = None) -> list[dict[str, Any]]: + """List all status pages. Optionally filter by name or subdomain.""" + return [p.model_dump() for p in await client.list_status_pages(search=search)] + + @mcp.tool() + async def get_status_page(status_page_id: str) -> dict[str, Any]: + """Get a single status page by UUID.""" + return (await client.get_status_page(status_page_id)).model_dump() + + @mcp.tool() + async def create_status_page( + name: str, + subdomain: str, + custom_domain: str | None = None, + public: bool | None = None, + monitors: list[str] | None = None, + ) -> dict[str, Any]: + """This will create a new status page.""" + from hyperping.models import StatusPageCreate + + fields: dict[str, Any] = {"name": name, "subdomain": subdomain} + if custom_domain is not None: + fields["custom_domain"] = custom_domain + if public is not None: + fields["public"] = public + if monitors is not None: + fields["monitors"] = monitors + return (await client.create_status_page(StatusPageCreate(**fields))).model_dump() + + @mcp.tool() + async def update_status_page( + status_page_id: str, + name: str | None = None, + subdomain: str | None = None, + custom_domain: str | None = None, + public: bool | None = None, + monitors: list[str] | None = None, + ) -> dict[str, Any]: + """Update an existing status page. Only supplied fields are changed.""" + from hyperping.models import StatusPageUpdate + + fields: dict[str, Any] = {} + if name is not None: + fields["name"] = name + if subdomain is not None: + fields["subdomain"] = subdomain + if custom_domain is not None: + fields["custom_domain"] = custom_domain + if public is not None: + fields["public"] = public + if monitors is not None: + fields["monitors"] = monitors + return ( + await client.update_status_page(status_page_id, StatusPageUpdate(**fields)) + ).model_dump() + + @mcp.tool() + async def delete_status_page(status_page_id: str) -> dict[str, Any]: + """This will permanently delete a status page.""" + await client.delete_status_page(status_page_id) + return {"success": True} + + @mcp.tool() + async def list_subscribers( + status_page_id: str, + subscriber_type: str = "all", + ) -> list[dict[str, Any]]: + """List subscribers for a status page. subscriber_type: all, email, sms, slack.""" + return [ + s.model_dump() + for s in await client.list_subscribers( + status_page_id, subscriber_type=subscriber_type + ) + ] + + @mcp.tool() + async def add_subscriber(status_page_id: str, email: str) -> dict[str, Any]: + """This will add an email subscriber to a status page.""" + return (await client.add_subscriber(status_page_id, email)).model_dump() + + @mcp.tool() + async def remove_subscriber(status_page_id: str, subscriber_id: str) -> dict[str, Any]: + """This will remove a subscriber from a status page.""" + await client.remove_subscriber(status_page_id, subscriber_id) + return {"success": True} + + else: + + @mcp.tool() + def list_status_pages(search: str | None = None) -> list[dict[str, Any]]: + """List all status pages. Optionally filter by name or subdomain.""" + return [p.model_dump() for p in client.list_status_pages(search=search)] + + @mcp.tool() + def get_status_page(status_page_id: str) -> dict[str, Any]: + """Get a single status page by UUID.""" + return client.get_status_page(status_page_id).model_dump() + + @mcp.tool() + def create_status_page( + name: str, + subdomain: str, + custom_domain: str | None = None, + public: bool | None = None, + monitors: list[str] | None = None, + ) -> dict[str, Any]: + """This will create a new status page.""" + from hyperping.models import StatusPageCreate + + fields: dict[str, Any] = {"name": name, "subdomain": subdomain} + if custom_domain is not None: + fields["custom_domain"] = custom_domain + if public is not None: + fields["public"] = public + if monitors is not None: + fields["monitors"] = monitors + return client.create_status_page(StatusPageCreate(**fields)).model_dump() + + @mcp.tool() + def update_status_page( + status_page_id: str, + name: str | None = None, + subdomain: str | None = None, + custom_domain: str | None = None, + public: bool | None = None, + monitors: list[str] | None = None, + ) -> dict[str, Any]: + """Update an existing status page. Only supplied fields are changed.""" + from hyperping.models import StatusPageUpdate + + fields: dict[str, Any] = {} + if name is not None: + fields["name"] = name + if subdomain is not None: + fields["subdomain"] = subdomain + if custom_domain is not None: + fields["custom_domain"] = custom_domain + if public is not None: + fields["public"] = public + if monitors is not None: + fields["monitors"] = monitors + return client.update_status_page( + status_page_id, StatusPageUpdate(**fields) + ).model_dump() + + @mcp.tool() + def delete_status_page(status_page_id: str) -> dict[str, Any]: + """This will permanently delete a status page.""" + client.delete_status_page(status_page_id) + return {"success": True} + + @mcp.tool() + def list_subscribers( + status_page_id: str, + subscriber_type: str = "all", + ) -> list[dict[str, Any]]: + """List subscribers for a status page. subscriber_type: all, email, sms, slack.""" + return [ + s.model_dump() + for s in client.list_subscribers(status_page_id, subscriber_type=subscriber_type) + ] + + @mcp.tool() + def add_subscriber(status_page_id: str, email: str) -> dict[str, Any]: + """This will add an email subscriber to a status page.""" + return client.add_subscriber(status_page_id, email).model_dump() + + @mcp.tool() + def remove_subscriber(status_page_id: str, subscriber_id: str) -> dict[str, Any]: + """This will remove a subscriber from a status page.""" + client.remove_subscriber(status_page_id, subscriber_id) + return {"success": True} diff --git a/tests/unit/test_mcp_server/test_factory.py b/tests/unit/test_mcp_server/test_factory.py index c3bbb80..176a77b 100644 --- a/tests/unit/test_mcp_server/test_factory.py +++ b/tests/unit/test_mcp_server/test_factory.py @@ -111,3 +111,86 @@ def test_total_tool_count(self, mock_client, mock_mcp_client): server = create_mcp_server(client=mock_client, mcp_client=mock_mcp_client) assert len(server._tool_manager.list_tools()) == 62 + + +@pytest.fixture() +def mock_async_client(): + from hyperping._async_client import AsyncHyperpingClient + + return MagicMock(spec=AsyncHyperpingClient) + + +@pytest.fixture() +def mock_async_mcp_client(): + from hyperping._async_mcp_client import AsyncHyperpingMcpClient + + return MagicMock(spec=AsyncHyperpingMcpClient) + + +class TestCreateServerWithAsyncClient: + def test_create_server_with_async_client(self, mock_async_client, mock_async_mcp_client): + """create_mcp_server accepts AsyncHyperpingClient.""" + from mcp.server.fastmcp import FastMCP + + from hyperping.mcp_server import create_mcp_server + + server = create_mcp_server(client=mock_async_client, mcp_client=mock_async_mcp_client) + assert isinstance(server, FastMCP) + + def test_create_server_with_async_client_no_mcp_client(self, mock_async_client): + """create_mcp_server with only async REST client skips observability.""" + from hyperping.mcp_server import create_mcp_server + + server = create_mcp_server(client=mock_async_client) + tool_names = {t.name for t in server._tool_manager.list_tools()} + assert "list_monitors" in tool_names + assert "get_status_summary" not in tool_names + + def test_async_client_registers_coroutine_tools(self, mock_async_client, mock_async_mcp_client): + """Tools registered with async client are coroutine functions.""" + import asyncio + + from hyperping.mcp_server import create_mcp_server + + server = create_mcp_server( + client=mock_async_client, mcp_client=mock_async_mcp_client, tools=["monitors"] + ) + tool = next(t for t in server._tool_manager.list_tools() if t.name == "list_monitors") + assert asyncio.iscoroutinefunction(tool.fn) + + def test_sync_client_registers_sync_tools(self, mock_client, mock_mcp_client): + """Tools registered with sync client are plain functions.""" + import asyncio + + from hyperping.mcp_server import create_mcp_server + + server = create_mcp_server( + client=mock_client, mcp_client=mock_mcp_client, tools=["monitors"] + ) + tool = next(t for t in server._tool_manager.list_tools() if t.name == "list_monitors") + assert not asyncio.iscoroutinefunction(tool.fn) + + def test_total_tool_count_with_async_client(self, mock_async_client, mock_async_mcp_client): + """Async client registers the same 62 tools as sync client.""" + from hyperping.mcp_server import create_mcp_server + + server = create_mcp_server(client=mock_async_client, mcp_client=mock_async_mcp_client) + assert len(server._tool_manager.list_tools()) == 62 + + def test_mixed_async_client_sync_mcp_client(self, mock_async_client, mock_mcp_client): + """Async REST client with sync MCP client: REST tools are async, observability sync.""" + import asyncio + + from hyperping.mcp_server import create_mcp_server + + server = create_mcp_server( + client=mock_async_client, + mcp_client=mock_mcp_client, + tools=["monitors", "observability"], + ) + list_m = next(t for t in server._tool_manager.list_tools() if t.name == "list_monitors") + status = next( + t for t in server._tool_manager.list_tools() if t.name == "get_status_summary" + ) + assert asyncio.iscoroutinefunction(list_m.fn) + assert not asyncio.iscoroutinefunction(status.fn) diff --git a/tests/unit/test_mcp_server/test_tools_healthchecks.py b/tests/unit/test_mcp_server/test_tools_healthchecks.py index 262a92e..dd6622b 100644 --- a/tests/unit/test_mcp_server/test_tools_healthchecks.py +++ b/tests/unit/test_mcp_server/test_tools_healthchecks.py @@ -43,3 +43,32 @@ def test_pause_healthcheck_delegates(self, server, mock_client): _call(server, "pause_healthcheck", healthcheck_id="hc1") mock_client.pause_healthcheck.assert_called_once_with("hc1") + + +async def _call_async(server, tool_name, **kwargs): + tool = next(t for t in server._tool_manager.list_tools() if t.name == tool_name) + return await tool.fn(**kwargs) + + +@pytest.fixture() +def mock_async_client(): + from hyperping._async_client import AsyncHyperpingClient + + return MagicMock(spec=AsyncHyperpingClient) + + +@pytest.fixture() +def async_server(mock_async_client): + from hyperping.mcp_server import create_mcp_server + + return create_mcp_server(client=mock_async_client, tools=["healthchecks"]) + + +class TestHealthcheckToolsAsync: + async def test_list_healthchecks_async_delegates(self, async_server, mock_async_client): + from unittest.mock import AsyncMock + + mock_async_client.list_healthchecks = AsyncMock(return_value=[]) + + await _call_async(async_server, "list_healthchecks") + mock_async_client.list_healthchecks.assert_called_once() diff --git a/tests/unit/test_mcp_server/test_tools_incidents.py b/tests/unit/test_mcp_server/test_tools_incidents.py index 906f349..6059528 100644 --- a/tests/unit/test_mcp_server/test_tools_incidents.py +++ b/tests/unit/test_mcp_server/test_tools_incidents.py @@ -58,3 +58,32 @@ def test_delete_incident_description_warns(self, server): assert tool.description is not None lower = tool.description.lower() assert "delete" in lower or "this will" in lower + + +async def _call_async(server, tool_name, **kwargs): + tool = next(t for t in server._tool_manager.list_tools() if t.name == tool_name) + return await tool.fn(**kwargs) + + +@pytest.fixture() +def mock_async_client(): + from hyperping._async_client import AsyncHyperpingClient + + return MagicMock(spec=AsyncHyperpingClient) + + +@pytest.fixture() +def async_server(mock_async_client): + from hyperping.mcp_server import create_mcp_server + + return create_mcp_server(client=mock_async_client, tools=["incidents"]) + + +class TestIncidentToolsAsync: + async def test_list_incidents_async_delegates(self, async_server, mock_async_client): + from unittest.mock import AsyncMock + + mock_async_client.list_incidents = AsyncMock(return_value=[]) + + await _call_async(async_server, "list_incidents") + mock_async_client.list_incidents.assert_called_once() diff --git a/tests/unit/test_mcp_server/test_tools_maintenance.py b/tests/unit/test_mcp_server/test_tools_maintenance.py index 9292632..5425cdc 100644 --- a/tests/unit/test_mcp_server/test_tools_maintenance.py +++ b/tests/unit/test_mcp_server/test_tools_maintenance.py @@ -41,3 +41,32 @@ def test_is_monitor_in_maintenance_delegates(self, server, mock_client): result = _call(server, "is_monitor_in_maintenance", monitor_uuid="uuid-1") mock_client.is_monitor_in_maintenance.assert_called_once_with("uuid-1") assert result["in_maintenance"] is True + + +async def _call_async(server, tool_name, **kwargs): + tool = next(t for t in server._tool_manager.list_tools() if t.name == tool_name) + return await tool.fn(**kwargs) + + +@pytest.fixture() +def mock_async_client(): + from hyperping._async_client import AsyncHyperpingClient + + return MagicMock(spec=AsyncHyperpingClient) + + +@pytest.fixture() +def async_server(mock_async_client): + from hyperping.mcp_server import create_mcp_server + + return create_mcp_server(client=mock_async_client, tools=["maintenance"]) + + +class TestMaintenanceToolsAsync: + async def test_list_maintenance_async_delegates(self, async_server, mock_async_client): + from unittest.mock import AsyncMock + + mock_async_client.list_maintenance = AsyncMock(return_value=[]) + + await _call_async(async_server, "list_maintenance") + mock_async_client.list_maintenance.assert_called_once() diff --git a/tests/unit/test_mcp_server/test_tools_monitors.py b/tests/unit/test_mcp_server/test_tools_monitors.py index 677d366..c3adfd3 100644 --- a/tests/unit/test_mcp_server/test_tools_monitors.py +++ b/tests/unit/test_mcp_server/test_tools_monitors.py @@ -31,9 +31,7 @@ def mock_mcp_client(): def server(mock_client, mock_mcp_client): from hyperping.mcp_server import create_mcp_server - return create_mcp_server( - client=mock_client, mcp_client=mock_mcp_client, tools=["monitors"] - ) + return create_mcp_server(client=mock_client, mcp_client=mock_mcp_client, tools=["monitors"]) class TestMonitorTools: @@ -88,3 +86,85 @@ def test_search_monitors_delegates_to_mcp_client(self, server, mock_mcp_client): _call(server, "search_monitors_by_name", query="my") mock_mcp_client.search_monitors_by_name.assert_called_once_with("my") + + +async def _call_async(server, tool_name, **kwargs): + tool = next(t for t in server._tool_manager.list_tools() if t.name == tool_name) + return await tool.fn(**kwargs) + + +@pytest.fixture() +def mock_async_client(): + from hyperping._async_client import AsyncHyperpingClient + + return MagicMock(spec=AsyncHyperpingClient) + + +@pytest.fixture() +def mock_async_mcp_client(): + from hyperping._async_mcp_client import AsyncHyperpingMcpClient + + return MagicMock(spec=AsyncHyperpingMcpClient) + + +@pytest.fixture() +def async_server(mock_async_client, mock_async_mcp_client): + from hyperping.mcp_server import create_mcp_server + + return create_mcp_server( + client=mock_async_client, mcp_client=mock_async_mcp_client, tools=["monitors"] + ) + + +class TestMonitorToolsAsync: + async def test_list_monitors_async_delegates(self, async_server, mock_async_client): + from unittest.mock import AsyncMock + + monitor = MagicMock() + monitor.model_dump.return_value = {"uuid": "abc", "name": "test"} + mock_async_client.list_monitors = AsyncMock(return_value=[monitor]) + + result = await _call_async(async_server, "list_monitors") + mock_async_client.list_monitors.assert_called_once() + assert result[0]["uuid"] == "abc" + + async def test_get_monitor_async_delegates(self, async_server, mock_async_client): + from unittest.mock import AsyncMock + + monitor = MagicMock() + monitor.model_dump.return_value = {"uuid": "abc"} + mock_async_client.get_monitor = AsyncMock(return_value=monitor) + + await _call_async(async_server, "get_monitor", monitor_id="abc") + mock_async_client.get_monitor.assert_called_once_with("abc") + + async def test_create_monitor_async_delegates(self, async_server, mock_async_client): + from unittest.mock import AsyncMock + + monitor = MagicMock() + monitor.model_dump.return_value = {"uuid": "new", "name": "m1"} + mock_async_client.create_monitor = AsyncMock(return_value=monitor) + + await _call_async(async_server, "create_monitor", name="m1", url="https://example.com") + mock_async_client.create_monitor.assert_called_once() + + async def test_delete_monitor_async_delegates(self, async_server, mock_async_client): + from unittest.mock import AsyncMock + + mock_async_client.delete_monitor = AsyncMock(return_value=None) + + result = await _call_async(async_server, "delete_monitor", monitor_id="abc") + mock_async_client.delete_monitor.assert_called_once_with("abc") + assert result.get("success") is True + + async def test_search_monitors_by_name_async_delegates( + self, async_server, mock_async_mcp_client + ): + from unittest.mock import AsyncMock + + m = MagicMock() + m.model_dump.return_value = {"uuid": "abc", "name": "my monitor"} + mock_async_mcp_client.search_monitors_by_name = AsyncMock(return_value=[m]) + + await _call_async(async_server, "search_monitors_by_name", query="my") + mock_async_mcp_client.search_monitors_by_name.assert_called_once_with("my") diff --git a/tests/unit/test_mcp_server/test_tools_observability.py b/tests/unit/test_mcp_server/test_tools_observability.py index 8722a58..7e6d9b8 100644 --- a/tests/unit/test_mcp_server/test_tools_observability.py +++ b/tests/unit/test_mcp_server/test_tools_observability.py @@ -63,3 +63,38 @@ def test_skipped_when_no_mcp_client(self): mock_client = MagicMock(spec=HyperpingClient) server = create_mcp_server(client=mock_client, tools=["observability"]) assert len(server._tool_manager.list_tools()) == 0 + + +async def _call_async(server, tool_name, **kwargs): + tool = next(t for t in server._tool_manager.list_tools() if t.name == tool_name) + return await tool.fn(**kwargs) + + +@pytest.fixture() +def mock_async_mcp_client(): + from hyperping._async_mcp_client import AsyncHyperpingMcpClient + + return MagicMock(spec=AsyncHyperpingMcpClient) + + +@pytest.fixture() +def async_server(mock_async_mcp_client): + from hyperping.client import HyperpingClient + from hyperping.mcp_server import create_mcp_server + + mock_client = MagicMock(spec=HyperpingClient) + return create_mcp_server( + client=mock_client, mcp_client=mock_async_mcp_client, tools=["observability"] + ) + + +class TestObservabilityToolsAsync: + async def test_get_status_summary_async_delegates(self, async_server, mock_async_mcp_client): + from unittest.mock import AsyncMock + + summary = MagicMock() + summary.model_dump.return_value = {"total": 5, "up": 4, "down": 1} + mock_async_mcp_client.get_status_summary = AsyncMock(return_value=summary) + + await _call_async(async_server, "get_status_summary") + mock_async_mcp_client.get_status_summary.assert_called_once() diff --git a/tests/unit/test_mcp_server/test_tools_outages.py b/tests/unit/test_mcp_server/test_tools_outages.py index 03362b5..27984a4 100644 --- a/tests/unit/test_mcp_server/test_tools_outages.py +++ b/tests/unit/test_mcp_server/test_tools_outages.py @@ -46,3 +46,32 @@ def test_escalate_outage_delegates(self, server, mock_client): _call(server, "escalate_outage", outage_id="out1") mock_client.escalate_outage.assert_called_once_with("out1") + + +async def _call_async(server, tool_name, **kwargs): + tool = next(t for t in server._tool_manager.list_tools() if t.name == tool_name) + return await tool.fn(**kwargs) + + +@pytest.fixture() +def mock_async_client(): + from hyperping._async_client import AsyncHyperpingClient + + return MagicMock(spec=AsyncHyperpingClient) + + +@pytest.fixture() +def async_server(mock_async_client): + from hyperping.mcp_server import create_mcp_server + + return create_mcp_server(client=mock_async_client, tools=["outages"]) + + +class TestOutageToolsAsync: + async def test_list_outages_async_delegates(self, async_server, mock_async_client): + from unittest.mock import AsyncMock + + mock_async_client.list_outages = AsyncMock(return_value=[]) + + await _call_async(async_server, "list_outages") + mock_async_client.list_outages.assert_called_once() diff --git a/tests/unit/test_mcp_server/test_tools_statuspages.py b/tests/unit/test_mcp_server/test_tools_statuspages.py index 1e75dde..8486386 100644 --- a/tests/unit/test_mcp_server/test_tools_statuspages.py +++ b/tests/unit/test_mcp_server/test_tools_statuspages.py @@ -43,3 +43,32 @@ def test_add_subscriber_delegates(self, server, mock_client): _call(server, "add_subscriber", status_page_id="sp1", email="a@b.com") mock_client.add_subscriber.assert_called_once_with("sp1", "a@b.com") + + +async def _call_async(server, tool_name, **kwargs): + tool = next(t for t in server._tool_manager.list_tools() if t.name == tool_name) + return await tool.fn(**kwargs) + + +@pytest.fixture() +def mock_async_client(): + from hyperping._async_client import AsyncHyperpingClient + + return MagicMock(spec=AsyncHyperpingClient) + + +@pytest.fixture() +def async_server(mock_async_client): + from hyperping.mcp_server import create_mcp_server + + return create_mcp_server(client=mock_async_client, tools=["statuspages"]) + + +class TestStatusPageToolsAsync: + async def test_list_status_pages_async_delegates(self, async_server, mock_async_client): + from unittest.mock import AsyncMock + + mock_async_client.list_status_pages = AsyncMock(return_value=[]) + + await _call_async(async_server, "list_status_pages") + mock_async_client.list_status_pages.assert_called_once()