Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 71 additions & 1 deletion src/mcp/server/streamable_http_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
EventStore,
StreamableHTTPServerTransport,
)
from mcp.server.transport_security import TransportSecuritySettings
from mcp.server.transport_security import TransportSecurityMiddleware, TransportSecuritySettings
from mcp.types import INVALID_REQUEST, ErrorData, JSONRPCError

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -85,6 +85,15 @@ def __init__(
self.retry_interval = retry_interval
self.session_idle_timeout = session_idle_timeout

# Pre-check middleware: the manager runs security validation *before*
# deciding whether to allocate a session. Without this, a request that
# fails security (DNS rebinding, bad Host) would either be rejected too
# late (after a session has already been created and would leak) or with
# the wrong status code (400 "Missing session ID" instead of 421
# "Invalid Host header"). Using the same middleware the transport uses
# keeps the two validation paths consistent.
self._security = TransportSecurityMiddleware(security_settings)

# Session tracking (only used if not stateless)
self._session_creation_lock = anyio.Lock()
self._server_instances: dict[str, StreamableHTTPServerTransport] = {}
Expand Down Expand Up @@ -229,6 +238,16 @@ async def _handle_stateful_request(
send: ASGI send function
"""
request = Request(scope, receive)

# Run security validation FIRST so DNS-rebinding / bad-Host requests
# are rejected with 421 (or the transport-level Content-Type 400)
# regardless of whether a session existed or not — and so bad-Host
# requests can never trigger a session allocation.
security_error = await self._security.validate_request(request, is_post=(request.method == "POST"))
if security_error is not None:
await security_error(scope, receive, send)
return

request_mcp_session_id = request.headers.get(MCP_SESSION_ID_HEADER)

user = scope.get("user")
Expand Down Expand Up @@ -262,6 +281,57 @@ async def _handle_stateful_request(
return

if request_mcp_session_id is None:
# Only POST may initialize a new session (per MCP spec: a session is
# opened by the response to the ``initialize`` JSON-RPC request,
# which is always a POST). Any non-POST request without a session-id
# is a protocol error and must be rejected here — the transport-layer
# rejection happens too late: by the time
# ``StreamableHTTPServerTransport.handle_request()`` sees the request,
# a session has already been allocated in ``_server_instances`` and
# a background ``run_server`` task has been spawned waiting on a
# stream that will never receive anything. Both leak forever unless
# ``session_idle_timeout`` is set (opt-in, off by default). Rejecting
# here also holds for PUT/PATCH/OPTIONS/HEAD — the transport would
# answer 405 to those, but only after the leaky allocation.
if request.method != "POST":
# GET/DELETE are protocol-valid methods when a session exists,
# so the correct response for a missing session is 400 — matching
# the transport's existing "Missing session ID" wording. Anything
# else is a genuinely unsupported method, so 405 is more accurate.
# Both branches mirror the shape produced by
# ``StreamableHTTPServerTransport._create_error_response`` /
# ``_handle_unsupported_request`` so clients see the same
# JSON-RPC body and headers (including the RFC 7231 ``Allow``
# advertisement for 405) whether the rejection happens here or
# in the transport layer.
if request.method in ("GET", "DELETE"):
error_body = JSONRPCError(
jsonrpc="2.0",
id="server-error",
error=ErrorData(code=INVALID_REQUEST, message="Bad Request: Missing session ID"),
)
response = Response(
content=error_body.model_dump_json(by_alias=True, exclude_none=True),
status_code=400,
media_type="application/json",
)
else:
error_body = JSONRPCError(
jsonrpc="2.0",
id="server-error",
error=ErrorData(code=INVALID_REQUEST, message="Method Not Allowed"),
)
response = Response(
content=error_body.model_dump_json(by_alias=True, exclude_none=True),
status_code=405,
headers={
"Content-Type": "application/json",
"Allow": "GET, POST, DELETE",
},
)
await response(scope, receive, send)
return

# New session case
logger.debug("Creating new transport")
async with self._session_creation_lock:
Expand Down
154 changes: 154 additions & 0 deletions tests/server/test_streamable_http_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from mcp.server.lowlevel import Server
from mcp.server.streamable_http import MCP_SESSION_ID_HEADER, StreamableHTTPServerTransport
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
from mcp.server.transport_security import TransportSecuritySettings
from mcp.types import INVALID_REQUEST


Expand Down Expand Up @@ -382,6 +383,159 @@ async def capture_send(message: Message):
assert response_start["status"] == 404


@pytest.mark.anyio
@pytest.mark.parametrize(
("method", "expected_status", "expected_message_substring", "expected_allow_header"),
[
("GET", 400, "Missing session ID", None),
("DELETE", 400, "Missing session ID", None),
("PUT", 405, "Method Not Allowed", "GET, POST, DELETE"),
("PATCH", 405, "Method Not Allowed", "GET, POST, DELETE"),
("OPTIONS", 405, "Method Not Allowed", "GET, POST, DELETE"),
("HEAD", 405, "Method Not Allowed", "GET, POST, DELETE"),
],
)
async def test_non_post_without_session_id_does_not_allocate_session(
method: str, expected_status: int, expected_message_substring: str, expected_allow_header: str | None
):
"""Regression test: no non-POST method without a session-id may allocate a
session or spawn a background task.

Before this fix, any request without a session id — including GET/DELETE
(per this bug report) but also PUT/PATCH/OPTIONS/HEAD — entered the
"new session" branch of the manager, created a transport, registered it
in ``_server_instances``, and launched a ``run_server`` task that would
wait forever for messages that never come. The transport-level validation
then returned 400/406/405, but the allocated session + task were leaked.
Under a Docker healthcheck polling /mcp every 30 seconds this accumulated
~2 sessions/min indefinitely (~1 GiB/week).

GET/DELETE are protocol-valid methods when a session exists, so they get
``400 "Missing session ID"``. Other methods are genuinely unsupported on
the MCP endpoint, so they get ``405 "Method Not Allowed"``.
"""
app = Server("test-non-post-no-session")
manager = StreamableHTTPSessionManager(app=app)

async with manager.run():
sent_messages: list[Message] = []
response_body = b""

async def mock_send(message: Message):
nonlocal response_body
sent_messages.append(message)
if message["type"] == "http.response.body":
response_body += message.get("body", b"")

scope: Scope = {
"type": "http",
"method": method,
"path": "/mcp",
"headers": [
(b"accept", b"application/json, text/event-stream"),
],
}

async def mock_receive(): # pragma: no cover
return {"type": "http.request", "body": b"", "more_body": False}

# Snapshot before
assert len(manager._server_instances) == 0

await manager.handle_request(scope, mock_receive, mock_send)

# Give any accidentally-spawned background task a chance to register
await anyio.sleep(0.05)

# No session, no task
assert len(manager._server_instances) == 0, (
f"{method} without session-id must not allocate a session — leaked {len(manager._server_instances)}"
)
assert manager._task_group is not None
# anyio TaskGroup internals: no live tasks belonging to run_server
live_tasks = len(manager._task_group._tasks) # type: ignore[attr-defined]
assert live_tasks == 0, f"{method} without session-id must not spawn a background task — leaked {live_tasks}"

# Response should be a well-formed JSON-RPC error at status 400
response_start = next(
(msg for msg in sent_messages if msg["type"] == "http.response.start"),
None,
)
assert response_start is not None, "Should have sent a response"
assert response_start["status"] == expected_status

error_data = json.loads(response_body)
assert error_data["jsonrpc"] == "2.0"
assert error_data["error"]["code"] == INVALID_REQUEST
assert expected_message_substring in error_data["error"]["message"]

# RFC 7231: a 405 response must advertise the allowed methods via the
# ``Allow`` header. The manager's 405 mirrors the transport's shape
# (``Allow: GET, POST, DELETE``) exactly so downstream clients get
# identical metadata whether the rejection happens here or one layer
# deeper. 400 responses do not carry the header (they are not about
# method mismatch), which is what ``expected_allow_header=None`` asserts.
response_headers = {name.decode().lower(): value.decode() for name, value in response_start.get("headers", [])}
assert response_headers.get("allow") == expected_allow_header, (
f"Unexpected Allow header for {method}/{expected_status}: got {response_headers.get('allow')!r}, "
f"expected {expected_allow_header!r}"
)


@pytest.mark.anyio
async def test_bad_host_header_rejected_before_session_allocation():
"""Security check runs before session allocation.

With DNS-rebinding protection enabled, a request that presents a
Host header not in the allow-list must be rejected with 421 without
allocating a session. Previously this check lived only in the
transport, so a bad-Host request would allocate a session first and
then get rejected — the allocated session and its task were leaked.
"""
app = Server("test-bad-host")
manager = StreamableHTTPSessionManager(
app=app,
security_settings=TransportSecuritySettings(enable_dns_rebinding_protection=True, allowed_hosts=["127.0.0.1"]),
)

async with manager.run():
sent_messages: list[Message] = []

async def mock_send(message: Message):
sent_messages.append(message)

scope: Scope = {
"type": "http",
"method": "POST",
"path": "/mcp",
"headers": [
(b"host", b"evil.com"),
(b"content-type", b"application/json"),
(b"accept", b"application/json, text/event-stream"),
],
}

async def mock_receive(): # pragma: no cover
return {"type": "http.request", "body": b"{}", "more_body": False}

assert len(manager._server_instances) == 0

await manager.handle_request(scope, mock_receive, mock_send)

# Session must NOT have been allocated
assert len(manager._server_instances) == 0, (
"Bad-Host request must not allocate a session (was rejected by security check)"
)

# And the response must be the 421 the middleware produced
response_start = next(
(msg for msg in sent_messages if msg["type"] == "http.response.start"),
None,
)
assert response_start is not None
assert response_start["status"] == 421


def test_session_idle_timeout_rejects_non_positive():
with pytest.raises(ValueError, match="positive number"):
StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=-1)
Expand Down
Loading