From 992dced9918998b00ecf026149c6c846f3f87e04 Mon Sep 17 00:00:00 2001 From: Sammy Dabbas Date: Sat, 4 Jul 2026 01:20:57 -0400 Subject: [PATCH] fix(server): reject duplicate in-flight request ids in streamable HTTP The transport registered every request via _requestToStreamMapping.set(message.id, streamId) with no duplicate check, so a concurrent POST reusing an in-flight id overwrote the entry, cross-wired the first request's response onto the second POST's stream, and left the first POST hanging. Reject a POST containing a request whose id is already in flight, or duplicated within the same batch, with HTTP 400 and JSON-RPC -32600. Sequential reuse after completion stays allowed since deployed clients send a constant id for every request. Cancelled requests never produce a response, so notifications/cancelled now retires the transport bookkeeping for its target id; without that, a cancelled id would stay in flight forever and lock out cancel-then-reuse clients. Adds the missing isCancelledNotification guard to core-internal alongside the existing notification guards. Fixes #2433. --- packages/core-internal/src/types/guards.ts | 5 + packages/server/src/server/streamableHttp.ts | 33 ++++ .../server/test/server/streamableHttp.test.ts | 178 ++++++++++++++++++ 3 files changed, 216 insertions(+) diff --git a/packages/core-internal/src/types/guards.ts b/packages/core-internal/src/types/guards.ts index a0d575054e..eeb2153d56 100644 --- a/packages/core-internal/src/types/guards.ts +++ b/packages/core-internal/src/types/guards.ts @@ -1,5 +1,6 @@ import { CallToolResultSchema, + CancelledNotificationSchema, InitializedNotificationSchema, InitializeRequestSchema, JSONRPCErrorResponseSchema, @@ -12,6 +13,7 @@ import { } from './schemas'; import type { CallToolResult, + CancelledNotification, CompleteRequest, CompleteRequestPrompt, CompleteRequestResourceTemplate, @@ -123,6 +125,9 @@ export const isInitializeRequest = (value: unknown): value is InitializeRequest export const isInitializedNotification = (value: unknown): value is InitializedNotification => InitializedNotificationSchema.safeParse(value).success; +export const isCancelledNotification = (value: unknown): value is CancelledNotification => + CancelledNotificationSchema.safeParse(value).success; + export function assertCompleteRequestPrompt(request: CompleteRequest): asserts request is CompleteRequestPrompt { if (request.params.ref.type !== 'ref/prompt') { throw new TypeError(`Expected CompleteRequestPrompt, but got ${request.params.ref.type}`); diff --git a/packages/server/src/server/streamableHttp.ts b/packages/server/src/server/streamableHttp.ts index 1f2cf94b00..890e130404 100644 --- a/packages/server/src/server/streamableHttp.ts +++ b/packages/server/src/server/streamableHttp.ts @@ -10,6 +10,7 @@ import type { AuthInfo, JSONRPCMessage, MessageExtraInfo, RequestId, Transport } from '@modelcontextprotocol/core-internal'; import { DEFAULT_NEGOTIATED_PROTOCOL_VERSION, + isCancelledNotification, isInitializeRequest, isJSONRPCErrorResponse, isJSONRPCRequest, @@ -763,6 +764,38 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { } } + // A cancelled request never produces a response (the protocol layer + // suppresses responses for aborted handlers), so retire its transport + // bookkeeping here. Otherwise its _requestToStreamMapping entry would + // pin the request ID as in-flight forever and the duplicate-ID guard + // below would reject any later reuse of that ID. + for (const message of messages) { + if (isCancelledNotification(message) && message.params.requestId !== undefined) { + this._requestToStreamMapping.delete(message.params.requestId); + this._requestResponseMap.delete(message.params.requestId); + } + } + + // Request IDs MUST be unique within a session. Registering a duplicate + // would overwrite the in-flight entry in _requestToStreamMapping and + // cross-wire the original request's response onto this POST's stream, + // leaving the original POST hanging - so reject the duplicate while the + // first request is still in flight. Sequential reuse stays allowed: + // entries are retired when the response is delivered or the request is + // cancelled. + const incomingRequestIds = new Set(); + for (const message of messages) { + if (!isJSONRPCRequest(message)) { + continue; + } + if (this._requestToStreamMapping.has(message.id) || incomingRequestIds.has(message.id)) { + const error = `Invalid Request: Request ID ${String(message.id)} is already in flight`; + this.onerror?.(new Error(error)); + return this.createJsonErrorResponse(400, -32_600, error); + } + incomingRequestIds.add(message.id); + } + // check if it contains requests const hasRequests = messages.some(element => isJSONRPCRequest(element)); diff --git a/packages/server/test/server/streamableHttp.test.ts b/packages/server/test/server/streamableHttp.test.ts index beca451113..1f44dd466a 100644 --- a/packages/server/test/server/streamableHttp.test.ts +++ b/packages/server/test/server/streamableHttp.test.ts @@ -561,6 +561,184 @@ describe('Zod v4', () => { }); }); + describe('HTTPServerTransport - Duplicate request IDs', () => { + let transport: WebStandardStreamableHTTPServerTransport; + let mcpServer: McpServer; + let sessionId: string; + let slowToolStarted: Promise; + let releaseSlowTool: () => void; + + function setupServer(options?: { enableJsonResponse?: boolean }): void { + mcpServer = new McpServer({ name: 'test-server', version: '1.0.0' }, { capabilities: { logging: {} } }); + + mcpServer.registerTool( + 'greet', + { description: 'Greeting tool', inputSchema: z.object({ name: z.string() }) }, + async ({ name }): Promise => { + return { content: [{ type: 'text', text: `Hello, ${name}!` }] }; + } + ); + + let started!: () => void; + slowToolStarted = new Promise(resolve => { + started = resolve; + }); + const gate = new Promise(resolve => { + releaseSlowTool = resolve; + }); + mcpServer.registerTool( + 'slow', + { description: 'A tool that blocks until released by the test', inputSchema: z.object({}) }, + async (): Promise => { + started(); + await gate; + return { content: [{ type: 'text', text: 'slow-done' }] }; + } + ); + + transport = new WebStandardStreamableHTTPServerTransport({ + sessionIdGenerator: () => randomUUID(), + enableJsonResponse: options?.enableJsonResponse ?? false + }); + } + + beforeEach(async () => { + setupServer(); + await mcpServer.connect(transport); + }); + + afterEach(async () => { + releaseSlowTool(); + await transport.close(); + }); + + async function initializeServer(): Promise { + const request = createRequest('POST', TEST_MESSAGES.initialize); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(200); + const newSessionId = response.headers.get('mcp-session-id'); + expect(newSessionId).toBeDefined(); + return newSessionId as string; + } + + function callTool(name: string, id: string): JSONRPCMessage { + return { + jsonrpc: '2.0', + method: 'tools/call', + params: { name, arguments: name === 'greet' ? { name: 'Test' } : {} }, + id + } as JSONRPCMessage; + } + + it('should reject a request whose ID collides with one still in flight', async () => { + sessionId = await initializeServer(); + + // First request occupies the ID and blocks inside the tool handler + const firstResponse = await transport.handleRequest(createRequest('POST', callTool('slow', 'dup-1'), { sessionId })); + expect(firstResponse.status).toBe(200); + await slowToolStarted; + + // Concurrent request reusing the same ID must be rejected, not cross-wired + const secondResponse = await transport.handleRequest(createRequest('POST', callTool('greet', 'dup-1'), { sessionId })); + expect(secondResponse.status).toBe(400); + const errorData = await secondResponse.json(); + expectErrorResponse(errorData, -32_600, /already in flight/); + + // The original request must still receive its own response + releaseSlowTool(); + const text = await readSSEEvent(firstResponse); + const eventData = parseSSEData(text); + expect(eventData).toMatchObject({ + jsonrpc: '2.0', + result: { content: [{ type: 'text', text: 'slow-done' }] }, + id: 'dup-1' + }); + }); + + it('should reject duplicate request IDs within a single batch', async () => { + sessionId = await initializeServer(); + + const batch: JSONRPCMessage[] = [callTool('greet', 'batch-dup'), callTool('greet', 'batch-dup')]; + const response = await transport.handleRequest(createRequest('POST', batch, { sessionId })); + + expect(response.status).toBe(400); + const errorData = await response.json(); + expectErrorResponse(errorData, -32_600, /already in flight/); + }); + + it('should allow sequential reuse of a request ID after the response is delivered', async () => { + sessionId = await initializeServer(); + + for (let attempt = 0; attempt < 2; attempt++) { + const response = await transport.handleRequest(createRequest('POST', callTool('greet', 'reuse-1'), { sessionId })); + expect(response.status).toBe(200); + + const eventData = parseSSEData(await readSSEEvent(response)); + expect(eventData).toMatchObject({ + jsonrpc: '2.0', + result: { content: [{ type: 'text', text: 'Hello, Test!' }] }, + id: 'reuse-1' + }); + } + }); + + it('should allow reuse of a request ID after the original request was cancelled', async () => { + sessionId = await initializeServer(); + + const firstResponse = await transport.handleRequest(createRequest('POST', callTool('slow', 'cancel-1'), { sessionId })); + expect(firstResponse.status).toBe(200); + await slowToolStarted; + + const cancelNotification: JSONRPCMessage = { + jsonrpc: '2.0', + method: 'notifications/cancelled', + params: { requestId: 'cancel-1', reason: 'test cancellation' } + }; + const cancelResponse = await transport.handleRequest(createRequest('POST', cancelNotification, { sessionId })); + expect(cancelResponse.status).toBe(202); + + // The cancelled request will never produce a response, so its ID must be reusable + const secondResponse = await transport.handleRequest(createRequest('POST', callTool('greet', 'cancel-1'), { sessionId })); + expect(secondResponse.status).toBe(200); + + const eventData = parseSSEData(await readSSEEvent(secondResponse)); + expect(eventData).toMatchObject({ + jsonrpc: '2.0', + result: { content: [{ type: 'text', text: 'Hello, Test!' }] }, + id: 'cancel-1' + }); + }); + + it('should reject duplicate in-flight request IDs in JSON response mode', async () => { + await transport.close(); + setupServer({ enableJsonResponse: true }); + await mcpServer.connect(transport); + + sessionId = await initializeServer(); + + // In JSON mode handleRequest resolves only when the response is ready - do not await + const firstResponsePromise = transport.handleRequest(createRequest('POST', callTool('slow', 'dup-json'), { sessionId })); + await slowToolStarted; + + const secondResponse = await transport.handleRequest(createRequest('POST', callTool('greet', 'dup-json'), { sessionId })); + expect(secondResponse.status).toBe(400); + const errorData = await secondResponse.json(); + expectErrorResponse(errorData, -32_600, /already in flight/); + + // The original request must still resolve with its own result + releaseSlowTool(); + const firstResponse = await firstResponsePromise; + expect(firstResponse.status).toBe(200); + const data = await firstResponse.json(); + expect(data).toMatchObject({ + jsonrpc: '2.0', + result: { content: [{ type: 'text', text: 'slow-done' }] }, + id: 'dup-json' + }); + }); + }); + describe('HTTPServerTransport - Session Callbacks', () => { it('should call onsessioninitialized callback', async () => { const onInitialized = vi.fn();