Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ dependencies = [

[project.optional-dependencies]
cli = ["typer[all]>=0.12,<1.0"]
otel = [
"opentelemetry-api>=1.20",
]
dev = [
"pytest>=9.0.3",
"pytest-cov",
Expand All @@ -41,6 +44,8 @@ dev = [
"pydantic",
"pip-audit>=2.7",
"typer[all]>=0.12,<1.0",
"opentelemetry-api>=1.20",
"opentelemetry-sdk>=1.20",
]

[project.scripts]
Expand Down
135 changes: 74 additions & 61 deletions src/hyperping/_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
sanitize_for_log,
validate_base_url,
)
from hyperping._otel import get_tracer, record_error, start_request_span
from hyperping.client import _ENDPOINT_BREAKERS_MAX, DEFAULT_RETRY_CONFIG, RetryConfig
from hyperping.endpoints import API_BASE, Endpoint
from hyperping.exceptions import (
Expand Down Expand Up @@ -138,6 +139,7 @@ def __init__(
},
timeout=self.timeout,
)
self._tracer = get_tracer()

def __repr__(self) -> str:
return f"AsyncHyperpingClient(base_url={self.base_url!r})"
Expand Down Expand Up @@ -388,68 +390,79 @@ async def _request(
Raises:
HyperpingAPIError: On API errors after retries exhausted
"""
breaker = self._breaker_for(path)
if not breaker.call_allowed():
raise HyperpingAPIError(self._circuit_open_message(breaker, path))

last_exception: Exception | None = None
delay = self.retry_config.initial_delay
max_attempts = self.retry_config.max_retries + 1

for attempt in range(max_attempts):
try:
result = await self._execute_single_attempt(method, path, json, params)

if not isinstance(result, httpx.Response):
return result

response = result
if self._should_retry(response.status_code, attempt):
sleep_time = self._compute_sleep_time(response, delay)
logger.warning(
"Retrying after %.2fs due to %d (attempt %d/%d)",
sleep_time,
response.status_code,
attempt + 1,
max_attempts,
)
await asyncio.sleep(sleep_time)
delay = min(
delay * self.retry_config.backoff_factor,
self.retry_config.max_delay,
)
continue

if response.status_code >= 500:
with start_request_span(self._tracer, method, path, self.base_url) as span:
breaker = self._breaker_for(path)
if not breaker.call_allowed():
raise HyperpingAPIError(self._circuit_open_message(breaker, path))

last_exception: Exception | None = None
delay = self.retry_config.initial_delay
max_attempts = self.retry_config.max_retries + 1

for attempt in range(max_attempts):
try:
result = await self._execute_single_attempt(method, path, json, params)

if not isinstance(result, httpx.Response):
return result

response = result
if self._should_retry(response.status_code, attempt):
sleep_time = self._compute_sleep_time(response, delay)
logger.warning(
"Retrying after %.2fs due to %d (attempt %d/%d)",
sleep_time,
response.status_code,
attempt + 1,
max_attempts,
)
await asyncio.sleep(sleep_time)
delay = min(
delay * self.retry_config.backoff_factor,
self.retry_config.max_delay,
)
continue

if response.status_code >= 500:
breaker.record_failure()
try:
self._handle_response_error(response)
except HyperpingAPIError as exc:
record_error(span, exc)
raise

except (httpx.TimeoutException, httpx.RequestError) as e:
last_exception = e
if attempt < self.retry_config.max_retries:
label = "timeout" if isinstance(e, httpx.TimeoutException) else str(e)
sleep_time = delay + random.uniform(0, delay * 0.25)
logger.warning(
"Request %s, retrying after %.2fs (attempt %d/%d)",
label,
sleep_time,
attempt + 1,
max_attempts,
)
await asyncio.sleep(sleep_time)
delay = min(
delay * self.retry_config.backoff_factor,
self.retry_config.max_delay,
)
continue
breaker.record_failure()
self._handle_response_error(response)

except (httpx.TimeoutException, httpx.RequestError) as e:
last_exception = e
if attempt < self.retry_config.max_retries:
label = "timeout" if isinstance(e, httpx.TimeoutException) else str(e)
sleep_time = delay + random.uniform(0, delay * 0.25)
logger.warning(
"Request %s, retrying after %.2fs (attempt %d/%d)",
label,
sleep_time,
attempt + 1,
max_attempts,
)
await asyncio.sleep(sleep_time)
delay = min(
delay * self.retry_config.backoff_factor,
self.retry_config.max_delay,
)
continue
breaker.record_failure()
if isinstance(e, httpx.TimeoutException):
raise HyperpingAPIError(f"Request timeout after {max_attempts} attempts") from e
raise HyperpingAPIError(f"Request failed: {e}") from e

raise HyperpingAPIError( # pragma: no cover
"Request failed after all retries"
) from last_exception
if isinstance(e, httpx.TimeoutException):
api_exc = HyperpingAPIError(
f"Request timeout after {max_attempts} attempts"
)
record_error(span, api_exc)
raise api_exc from e
api_exc = HyperpingAPIError(f"Request failed: {e}")
record_error(span, api_exc)
raise api_exc from e

raise HyperpingAPIError( # pragma: no cover
"Request failed after all retries"
) from last_exception

# ==================== Health Check ====================

Expand Down
77 changes: 42 additions & 35 deletions src/hyperping/_async_mcp_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from pydantic import SecretStr

from hyperping._internals import validate_base_url
from hyperping._otel import get_tracer, record_error, start_rpc_span
from hyperping._version import __version__
from hyperping.endpoints import MCP_URL
from hyperping.exceptions import (
Expand Down Expand Up @@ -84,6 +85,7 @@ def __init__(
self._init_blocked_status_code: int = 200
self._init_result: dict[str, Any] = {}
self._max_retries = max_retries
self._tracer = get_tracer()

async def _next_id(self) -> int:
async with self._lock:
Expand Down Expand Up @@ -273,44 +275,49 @@ async def call_tool(
"""
await self.initialize()

last_exc: Exception | None = None
for attempt in range(self._max_retries + 1):
try:
result = await self._send_rpc(
"tools/call",
{"name": tool_name, "arguments": arguments or {}},
)
break
except HyperpingAPIError as exc:
if exc.status_code and exc.status_code in (500, 502, 503, 504):
last_exc = exc
if attempt < self._max_retries:
await asyncio.sleep(min(2**attempt, 10))
continue
raise
else:
raise last_exc # type: ignore[misc]
if result is None:
return None
with start_rpc_span(self._tracer, "tools/call", self._url) as span:
last_exc: Exception | None = None
for attempt in range(self._max_retries + 1):
try:
result = await self._send_rpc(
"tools/call",
{"name": tool_name, "arguments": arguments or {}},
)
break
except HyperpingAPIError as exc:
if exc.status_code and exc.status_code in (500, 502, 503, 504):
last_exc = exc
if attempt < self._max_retries:
await asyncio.sleep(min(2**attempt, 10))
continue
record_error(span, exc)
raise
else:
if last_exc is not None:
record_error(span, last_exc)
raise last_exc # type: ignore[misc]

content = result.get("result", {}).get("content", [])
if not content:
return None
if result is None:
return None

text = content[0].get("text", "")
if not text:
return None
content = result.get("result", {}).get("content", [])
if not content:
return None

try:
return json.loads(text)
except json.JSONDecodeError as exc:
# Server-controlled ``text`` may carry PII; drop it instead of
# embedding the first 500 bytes into the exception.
raise HyperpingAPIError(
f"Failed to parse MCP tool response: {exc}",
status_code=200,
response_body=None,
) from exc
text = content[0].get("text", "")
if not text:
return None

try:
return json.loads(text)
except json.JSONDecodeError as exc:
# Server-controlled ``text`` may carry PII; drop it instead of
# embedding the first 500 bytes into the exception.
raise HyperpingAPIError(
f"Failed to parse MCP tool response: {exc}",
status_code=200,
response_body=None,
) from exc

async def close(self) -> None:
await self._client.aclose()
Expand Down
78 changes: 42 additions & 36 deletions src/hyperping/_mcp_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from pydantic import SecretStr

from hyperping._internals import validate_base_url
from hyperping._otel import get_tracer, record_error, start_rpc_span
from hyperping._version import __version__
from hyperping.endpoints import MCP_URL
from hyperping.exceptions import (
Expand Down Expand Up @@ -84,6 +85,7 @@ def __init__(
self._init_blocked_status_code: int = 200
self._init_result: dict[str, Any] = {}
self._max_retries = max_retries
self._tracer = get_tracer()

def _next_id(self) -> int:
with self._lock:
Expand Down Expand Up @@ -274,45 +276,49 @@ def call_tool(
"""
self.initialize()

last_exc: Exception | None = None
for attempt in range(self._max_retries + 1):
try:
result = self._send_rpc(
"tools/call",
{"name": tool_name, "arguments": arguments or {}},
)
break
except HyperpingAPIError as exc:
if exc.status_code and exc.status_code in (500, 502, 503, 504):
last_exc = exc
if attempt < self._max_retries:
time.sleep(min(2**attempt, 10))
continue
raise
else:
raise last_exc # type: ignore[misc]

if result is None:
return None
with start_rpc_span(self._tracer, "tools/call", self._url) as span:
last_exc: Exception | None = None
for attempt in range(self._max_retries + 1):
try:
result = self._send_rpc(
"tools/call",
{"name": tool_name, "arguments": arguments or {}},
)
break
except HyperpingAPIError as exc:
if exc.status_code and exc.status_code in (500, 502, 503, 504):
last_exc = exc
if attempt < self._max_retries:
time.sleep(min(2**attempt, 10))
continue
record_error(span, exc)
raise
else:
if last_exc is not None:
record_error(span, last_exc)
raise last_exc # type: ignore[misc]

content = result.get("result", {}).get("content", [])
if not content:
return None
if result is None:
return None

text = content[0].get("text", "")
if not text:
return None
content = result.get("result", {}).get("content", [])
if not content:
return None

try:
return json.loads(text)
except json.JSONDecodeError as exc:
# Server-controlled ``text`` may carry PII; drop it instead of
# embedding the first 500 bytes into the exception.
raise HyperpingAPIError(
f"Failed to parse MCP tool response: {exc}",
status_code=200,
response_body=None,
) from exc
text = content[0].get("text", "")
if not text:
return None

try:
return json.loads(text)
except json.JSONDecodeError as exc:
# Server-controlled ``text`` may carry PII; drop it instead of
# embedding the first 500 bytes into the exception.
raise HyperpingAPIError(
f"Failed to parse MCP tool response: {exc}",
status_code=200,
response_body=None,
) from exc

def close(self) -> None:
self._client.close()
Expand Down
Loading