diff --git a/src/mcp/server/streamable_http_manager.py b/src/mcp/server/streamable_http_manager.py
index 1a1a85721..c88a89860 100644
--- a/src/mcp/server/streamable_http_manager.py
+++ b/src/mcp/server/streamable_http_manager.py
@@ -21,7 +21,7 @@
     EventStore,
     StreamableHTTPServerTransport,
 )
-from mcp.server.transport_security import TransportSecuritySettings
+from mcp.server.transport_security import TransportSecurityMiddleware, TransportSecuritySettings
 from mcp.types import INVALID_REQUEST, ErrorData, JSONRPCError
 
 logger = logging.getLogger(__name__)
@@ -85,6 +85,15 @@ def __init__(
         self.retry_interval = retry_interval
         self.session_idle_timeout = session_idle_timeout
 
+        # Pre-check middleware: the manager runs security validation *before*
+        # deciding whether to allocate a session. Without this, a request that
+        # fails security (DNS rebinding, bad Host) would either be rejected too
+        # late (after a session has already been created and would leak) or with
+        # the wrong status code (400 "Missing session ID" instead of 421
+        # "Invalid Host header"). Using the same middleware the transport uses
+        # keeps the two validation paths consistent.
+        self._security = TransportSecurityMiddleware(security_settings)
+
         # Session tracking (only used if not stateless)
         self._session_creation_lock = anyio.Lock()
         self._server_instances: dict[str, StreamableHTTPServerTransport] = {}
@@ -229,6 +238,16 @@ async def _handle_stateful_request(
             send: ASGI send function
         """
         request = Request(scope, receive)
+
+        # Run security validation FIRST so DNS-rebinding / bad-Host requests
+        # are rejected with 421 (or the transport-level Content-Type 400)
+        # regardless of whether a session existed or not — and so bad-Host
+        # requests can never trigger a session allocation.
+        security_error = await self._security.validate_request(request, is_post=(request.method == "POST"))
+        if security_error is not None:
+            await security_error(scope, receive, send)
+            return
+
         request_mcp_session_id = request.headers.get(MCP_SESSION_ID_HEADER)
         user = scope.get("user")
 
@@ -262,6 +281,57 @@ async def _handle_stateful_request(
             return
 
         if request_mcp_session_id is None:
+            # Only POST may initialize a new session (per MCP spec: a session is
+            # opened by the response to the ``initialize`` JSON-RPC request,
+            # which is always a POST). Any non-POST request without a session-id
+            # is a protocol error and must be rejected here — the transport-layer
+            # rejection happens too late: by the time
+            # ``StreamableHTTPServerTransport.handle_request()`` sees the request,
+            # a session has already been allocated in ``_server_instances`` and
+            # a background ``run_server`` task has been spawned waiting on a
+            # stream that will never receive anything. Both leak forever unless
+            # ``session_idle_timeout`` is set (opt-in, off by default). Rejecting
+            # here also holds for PUT/PATCH/OPTIONS/HEAD — the transport would
+            # answer 405 to those, but only after the leaky allocation.
+            if request.method != "POST":
+                # GET/DELETE are protocol-valid methods when a session exists,
+                # so the correct response for a missing session is 400 — matching
+                # the transport's existing "Missing session ID" wording. Anything
+                # else is a genuinely unsupported method, so 405 is more accurate.
+                # Both branches mirror the shape produced by
+                # ``StreamableHTTPServerTransport._create_error_response`` /
+                # ``_handle_unsupported_request`` so clients see the same
+                # JSON-RPC body and headers (including the RFC 7231 ``Allow``
+                # advertisement for 405) whether the rejection happens here or
+                # in the transport layer.
+                if request.method in ("GET", "DELETE"):
+                    error_body = JSONRPCError(
+                        jsonrpc="2.0",
+                        id="server-error",
+                        error=ErrorData(code=INVALID_REQUEST, message="Bad Request: Missing session ID"),
+                    )
+                    response = Response(
+                        content=error_body.model_dump_json(by_alias=True, exclude_none=True),
+                        status_code=400,
+                        media_type="application/json",
+                    )
+                else:
+                    error_body = JSONRPCError(
+                        jsonrpc="2.0",
+                        id="server-error",
+                        error=ErrorData(code=INVALID_REQUEST, message="Method Not Allowed"),
+                    )
+                    response = Response(
+                        content=error_body.model_dump_json(by_alias=True, exclude_none=True),
+                        status_code=405,
+                        headers={
+                            "Content-Type": "application/json",
+                            "Allow": "GET, POST, DELETE",
+                        },
+                    )
+                await response(scope, receive, send)
+                return
+
             # New session case
             logger.debug("Creating new transport")
             async with self._session_creation_lock:
diff --git a/tests/server/test_streamable_http_manager.py b/tests/server/test_streamable_http_manager.py
index 0ae07c43a..035785cc9 100644
--- a/tests/server/test_streamable_http_manager.py
+++ b/tests/server/test_streamable_http_manager.py
@@ -14,6 +14,7 @@
 from mcp.server.lowlevel import Server
 from mcp.server.streamable_http import MCP_SESSION_ID_HEADER, StreamableHTTPServerTransport
 from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
+from mcp.server.transport_security import TransportSecuritySettings
 from mcp.types import INVALID_REQUEST
 
 
@@ -382,6 +383,159 @@ async def capture_send(message: Message):
         assert response_start["status"] == 404
 
 
+@pytest.mark.anyio
+@pytest.mark.parametrize(
+    ("method", "expected_status", "expected_message_substring", "expected_allow_header"),
+    [
+        ("GET", 400, "Missing session ID", None),
+        ("DELETE", 400, "Missing session ID", None),
+        ("PUT", 405, "Method Not Allowed", "GET, POST, DELETE"),
+        ("PATCH", 405, "Method Not Allowed", "GET, POST, DELETE"),
+        ("OPTIONS", 405, "Method Not Allowed", "GET, POST, DELETE"),
+        ("HEAD", 405, "Method Not Allowed", "GET, POST, DELETE"),
+    ],
+)
+async def test_non_post_without_session_id_does_not_allocate_session(
+    method: str, expected_status: int, expected_message_substring: str, expected_allow_header: str | None
+):
+    """Regression test: no non-POST method without a session-id may allocate a
+    session or spawn a background task.
+
+    Before this fix, any request without a session id — including GET/DELETE
+    (the originally reported case) but also PUT/PATCH/OPTIONS/HEAD — entered the
+    "new session" branch of the manager, created a transport, registered it
+    in ``_server_instances``, and launched a ``run_server`` task that would
+    wait forever for messages that never come. The transport-level validation
+    then returned 400/406/405, but the allocated session + task were leaked.
+    Under a Docker healthcheck polling /mcp every 30 seconds this accumulated
+    ~2 sessions/min indefinitely (~1 GiB/week).
+
+    GET/DELETE are protocol-valid methods when a session exists, so they get
+    ``400 "Missing session ID"``. Other methods are genuinely unsupported on
+    the MCP endpoint, so they get ``405 "Method Not Allowed"``.
+    """
+    app = Server("test-non-post-no-session")
+    manager = StreamableHTTPSessionManager(app=app)
+
+    async with manager.run():
+        sent_messages: list[Message] = []
+        response_body = b""
+
+        async def mock_send(message: Message):
+            nonlocal response_body
+            sent_messages.append(message)
+            if message["type"] == "http.response.body":
+                response_body += message.get("body", b"")
+
+        scope: Scope = {
+            "type": "http",
+            "method": method,
+            "path": "/mcp",
+            "headers": [
+                (b"accept", b"application/json, text/event-stream"),
+            ],
+        }
+
+        async def mock_receive():  # pragma: no cover
+            return {"type": "http.request", "body": b"", "more_body": False}
+
+        # Snapshot before
+        assert len(manager._server_instances) == 0
+
+        await manager.handle_request(scope, mock_receive, mock_send)
+
+        # Give any accidentally-spawned background task a chance to register
+        await anyio.sleep(0.05)
+
+        # No session, no task
+        assert len(manager._server_instances) == 0, (
+            f"{method} without session-id must not allocate a session — leaked {len(manager._server_instances)}"
+        )
+        assert manager._task_group is not None
+        # anyio TaskGroup internals: no live run_server tasks. NOTE(review): private ``_tasks`` — confirm backend support
+        live_tasks = len(manager._task_group._tasks)  # type: ignore[attr-defined]
+        assert live_tasks == 0, f"{method} without session-id must not spawn a background task — leaked {live_tasks}"
+
+        # Response should be a well-formed JSON-RPC error with the parametrized status (400 or 405)
+        response_start = next(
+            (msg for msg in sent_messages if msg["type"] == "http.response.start"),
+            None,
+        )
+        assert response_start is not None, "Should have sent a response"
+        assert response_start["status"] == expected_status
+
+        error_data = json.loads(response_body)
+        assert error_data["jsonrpc"] == "2.0"
+        assert error_data["error"]["code"] == INVALID_REQUEST
+        assert expected_message_substring in error_data["error"]["message"]
+
+        # RFC 7231: a 405 response must advertise the allowed methods via the
+        # ``Allow`` header. The manager's 405 mirrors the transport's shape
+        # (``Allow: GET, POST, DELETE``) exactly so downstream clients get
+        # identical metadata whether the rejection happens here or one layer
+        # deeper. 400 responses do not carry the header (they are not about
+        # method mismatch), which is what ``expected_allow_header=None`` asserts.
+        response_headers = {name.decode().lower(): value.decode() for name, value in response_start.get("headers", [])}
+        assert response_headers.get("allow") == expected_allow_header, (
+            f"Unexpected Allow header for {method}/{expected_status}: got {response_headers.get('allow')!r}, "
+            f"expected {expected_allow_header!r}"
+        )
+
+
+@pytest.mark.anyio
+async def test_bad_host_header_rejected_before_session_allocation():
+    """Security check runs before session allocation.
+
+    With DNS-rebinding protection enabled, a request that presents a
+    Host header not in the allow-list must be rejected with 421 without
+    allocating a session. Previously this check lived only in the
+    transport, so a bad-Host request would allocate a session first and
+    then get rejected — the allocated session and its task were leaked.
+    """
+    app = Server("test-bad-host")
+    manager = StreamableHTTPSessionManager(
+        app=app,
+        security_settings=TransportSecuritySettings(enable_dns_rebinding_protection=True, allowed_hosts=["127.0.0.1"]),
+    )
+
+    async with manager.run():
+        sent_messages: list[Message] = []
+
+        async def mock_send(message: Message):
+            sent_messages.append(message)
+
+        scope: Scope = {
+            "type": "http",
+            "method": "POST",
+            "path": "/mcp",
+            "headers": [
+                (b"host", b"evil.com"),
+                (b"content-type", b"application/json"),
+                (b"accept", b"application/json, text/event-stream"),
+            ],
+        }
+
+        async def mock_receive():  # pragma: no cover
+            return {"type": "http.request", "body": b"{}", "more_body": False}
+
+        assert len(manager._server_instances) == 0
+
+        await manager.handle_request(scope, mock_receive, mock_send)
+
+        # Session must NOT have been allocated
+        assert len(manager._server_instances) == 0, (
+            "Bad-Host request must not allocate a session (was rejected by security check)"
+        )
+
+        # And the response must be the 421 the middleware produced
+        response_start = next(
+            (msg for msg in sent_messages if msg["type"] == "http.response.start"),
+            None,
+        )
+        assert response_start is not None
+        assert response_start["status"] == 421
+
+
 def test_session_idle_timeout_rejects_non_positive():
     with pytest.raises(ValueError, match="positive number"):
         StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=-1)