diff --git a/src/mcp/server/streamable_http.py b/src/mcp/server/streamable_http.py
index d316345c7..4cfc4aede 100644
--- a/src/mcp/server/streamable_http.py
+++ b/src/mcp/server/streamable_http.py
@@ -549,6 +549,20 @@ async def _handle_post_request(self, scope: Scope, request: Request, receive: Re
 
             request_id = str(message.id)
 
+            # Reject duplicate in-flight request ids: `_request_streams` is keyed by
+            # request id, so a second concurrent request with the same id would
+            # silently overwrite the first one's routing slot and cross-wire their
+            # responses (one request receives the other's payload, the other hangs).
+            # The spec requires ids to be unique within a session; ids may still be
+            # reused once the earlier request has completed. See #3060.
+            if request_id in self._request_streams:
+                response = self._create_error_response(
+                    f"Bad Request: Request id {request_id} is already in flight for this session",
+                    HTTPStatus.BAD_REQUEST,
+                )
+                await response(scope, receive, send)
+                return
+
             if self.is_json_response_enabled:
                 self._request_streams[request_id] = anyio.create_memory_object_stream[EventMessage](
                     REQUEST_STREAM_BUFFER_SIZE
diff --git a/tests/shared/test_streamable_http.py b/tests/shared/test_streamable_http.py
index cbce222ec..cb89d9e4c 100644
--- a/tests/shared/test_streamable_http.py
+++ b/tests/shared/test_streamable_http.py
@@ -94,6 +94,18 @@ def first_sse_data(response: httpx.Response) -> dict[str, Any]:
     raise ValueError("No data event in SSE response")  # pragma: no cover
 
 
+async def next_sse_data(lines: AsyncIterator[str]) -> dict[str, Any]:
+    """Return the next SSE `data:` payload from a live line iterator, parsed as JSON.
+
+    Raises:
+        ValueError: If the stream ends before another `data:` line arrives.
+    """
+    async for line in lines:
+        if line.startswith("data: "):
+            return json.loads(line.removeprefix("data: "))
+    raise ValueError("SSE stream ended before the next data event")  # pragma: no cover
+
+
 def extract_protocol_version_from_sse(response: httpx.Response) -> str:
     """Extract the negotiated protocol version from an SSE initialization response."""
     return first_sse_data(response)["result"]["protocolVersion"]
@@ -680,6 +692,130 @@ async def test_response(basic_app: Starlette) -> None:
         assert tools_response.headers.get("Content-Type") == "text/event-stream"
 
 
+@pytest.mark.anyio
+async def test_duplicate_in_flight_request_id_rejected(basic_app: Starlette) -> None:
+    """A request whose id is already in flight on the session is rejected with 400.
+
+    The per-request routing in the transport is keyed by request id, so a second
+    concurrent request with the same id would overwrite the in-flight request's
+    routing slot and cross-wire the two responses (see #3060). The duplicate is
+    rejected and the in-flight request completes unaffected.
+    """
+    async with make_client(basic_app) as client:
+        response = await client.post(
+            "/mcp",
+            headers={
+                "Accept": "application/json, text/event-stream",
+                "Content-Type": "application/json",
+            },
+            json=INIT_REQUEST,
+        )
+        assert response.status_code == 200
+        headers = {
+            "Accept": "application/json, text/event-stream",
+            "Content-Type": "application/json",
+            MCP_SESSION_ID_HEADER: response.headers[MCP_SESSION_ID_HEADER],
+            MCP_PROTOCOL_VERSION_HEADER: extract_protocol_version_from_sse(response),
+        }
+
+        # Request A blocks server-side on the lock, keeping its id in flight.
+        async with client.stream(
+            "POST",
+            "/mcp",
+            headers=headers,
+            json={
+                "jsonrpc": "2.0",
+                "id": 1,
+                "method": "tools/call",
+                "params": {"name": "wait_for_lock_with_notification", "arguments": {}},
+            },
+        ) as response_a:
+            assert response_a.status_code == 200
+            lines_a = response_a.aiter_lines()
+            # The tool's first notification confirms request A is in flight.
+            with anyio.fail_after(5):
+                notification = await next_sse_data(lines_a)
+            assert notification["params"]["data"] == "First notification before lock"
+
+            # A second request reusing id 1 while A is in flight is rejected.
+            response_b = await client.post(
+                "/mcp",
+                headers=headers,
+                json={
+                    "jsonrpc": "2.0",
+                    "id": 1,
+                    "method": "tools/call",
+                    "params": {"name": "test_tool", "arguments": {}},
+                },
+            )
+            assert response_b.status_code == 400
+            error = response_b.json()["error"]
+            assert error["code"] == INVALID_REQUEST
+            assert "already in flight" in error["message"]
+
+            # Request A is unaffected: release the lock and it completes normally.
+            release_response = await client.post(
+                "/mcp",
+                headers=headers,
+                json={
+                    "jsonrpc": "2.0",
+                    "id": 2,
+                    "method": "tools/call",
+                    "params": {"name": "release_lock", "arguments": {}},
+                },
+            )
+            assert release_response.status_code == 200
+
+            with anyio.fail_after(5):
+                notification = await next_sse_data(lines_a)
+                final = await next_sse_data(lines_a)
+            assert notification["params"]["data"] == "Second notification after lock"
+            assert final["id"] == 1
+            assert final["result"]["content"][0]["text"] == "Completed"
+
+
+@pytest.mark.anyio
+async def test_request_id_reuse_after_completion_allowed(basic_app: Starlette) -> None:
+    """A request id can be reused once the earlier request with that id has completed.
+
+    Only concurrent requests with the same id are ambiguous to route; sequential
+    reuse (which some deployed clients rely on, sending every request with id 1)
+    keeps working (see #3060).
+    """
+    async with make_client(basic_app) as client:
+        response = await client.post(
+            "/mcp",
+            headers={
+                "Accept": "application/json, text/event-stream",
+                "Content-Type": "application/json",
+            },
+            json=INIT_REQUEST,
+        )
+        assert response.status_code == 200
+        headers = {
+            "Accept": "application/json, text/event-stream",
+            "Content-Type": "application/json",
+            MCP_SESSION_ID_HEADER: response.headers[MCP_SESSION_ID_HEADER],
+            MCP_PROTOCOL_VERSION_HEADER: extract_protocol_version_from_sse(response),
+        }
+
+        for _ in range(2):
+            response = await client.post(
+                "/mcp",
+                headers=headers,
+                json={
+                    "jsonrpc": "2.0",
+                    "id": 1,
+                    "method": "tools/call",
+                    "params": {"name": "test_tool", "arguments": {}},
+                },
+            )
+            assert response.status_code == 200
+            body = first_sse_data(response)
+            assert body["id"] == 1
+            assert body["result"]["content"][0]["text"] == "Called test_tool"
+
+
 @pytest.mark.anyio
 async def test_json_response(json_app: Starlette) -> None:
     """With JSON response mode enabled, requests are answered with application/json bodies."""