Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions packages/core-internal/src/types/guards.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {
CallToolResultSchema,
CancelledNotificationSchema,
InitializedNotificationSchema,
InitializeRequestSchema,
JSONRPCErrorResponseSchema,
Expand All @@ -12,6 +13,7 @@ import {
} from './schemas';
import type {
CallToolResult,
CancelledNotification,
CompleteRequest,
CompleteRequestPrompt,
CompleteRequestResourceTemplate,
Expand Down Expand Up @@ -123,6 +125,9 @@ export const isInitializeRequest = (value: unknown): value is InitializeRequest
export const isInitializedNotification = (value: unknown): value is InitializedNotification =>
InitializedNotificationSchema.safeParse(value).success;

export const isCancelledNotification = (value: unknown): value is CancelledNotification =>
CancelledNotificationSchema.safeParse(value).success;

export function assertCompleteRequestPrompt(request: CompleteRequest): asserts request is CompleteRequestPrompt {
if (request.params.ref.type !== 'ref/prompt') {
throw new TypeError(`Expected CompleteRequestPrompt, but got ${request.params.ref.type}`);
Expand Down
33 changes: 33 additions & 0 deletions packages/server/src/server/streamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import type { AuthInfo, JSONRPCMessage, MessageExtraInfo, RequestId, Transport } from '@modelcontextprotocol/core-internal';
import {
DEFAULT_NEGOTIATED_PROTOCOL_VERSION,
isCancelledNotification,
isInitializeRequest,
isJSONRPCErrorResponse,
isJSONRPCRequest,
Expand Down Expand Up @@ -763,6 +764,38 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
}
}

// A cancelled request never produces a response (the protocol layer
// suppresses responses for aborted handlers), so retire its transport
// bookkeeping here. Otherwise its _requestToStreamMapping entry would
// pin the request ID as in-flight forever and the duplicate-ID guard
// below would reject any later reuse of that ID.
for (const message of messages) {
if (isCancelledNotification(message) && message.params.requestId !== undefined) {
this._requestToStreamMapping.delete(message.params.requestId);
this._requestResponseMap.delete(message.params.requestId);
}
}

// Request IDs MUST be unique within a session. Registering a duplicate
// would overwrite the in-flight entry in _requestToStreamMapping and
// cross-wire the original request's response onto this POST's stream,
// leaving the original POST hanging - so reject the duplicate while the
// first request is still in flight. Sequential reuse stays allowed:
// entries are retired when the response is delivered or the request is
// cancelled.
const incomingRequestIds = new Set<RequestId>();
for (const message of messages) {
if (!isJSONRPCRequest(message)) {
continue;
}
if (this._requestToStreamMapping.has(message.id) || incomingRequestIds.has(message.id)) {
const error = `Invalid Request: Request ID ${String(message.id)} is already in flight`;
this.onerror?.(new Error(error));
return this.createJsonErrorResponse(400, -32_600, error);
}
incomingRequestIds.add(message.id);
}

// check if it contains requests
const hasRequests = messages.some(element => isJSONRPCRequest(element));

Expand Down
178 changes: 178 additions & 0 deletions packages/server/test/server/streamableHttp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,184 @@ describe('Zod v4', () => {
});
});

describe('HTTPServerTransport - Duplicate request IDs', () => {
let transport: WebStandardStreamableHTTPServerTransport;
let mcpServer: McpServer;
let sessionId: string;
let slowToolStarted: Promise<void>;
let releaseSlowTool: () => void;

function setupServer(options?: { enableJsonResponse?: boolean }): void {
mcpServer = new McpServer({ name: 'test-server', version: '1.0.0' }, { capabilities: { logging: {} } });

mcpServer.registerTool(
'greet',
{ description: 'Greeting tool', inputSchema: z.object({ name: z.string() }) },
async ({ name }): Promise<CallToolResult> => {
return { content: [{ type: 'text', text: `Hello, ${name}!` }] };
}
);

let started!: () => void;
slowToolStarted = new Promise<void>(resolve => {
started = resolve;
});
const gate = new Promise<void>(resolve => {
releaseSlowTool = resolve;
});
mcpServer.registerTool(
'slow',
{ description: 'A tool that blocks until released by the test', inputSchema: z.object({}) },
async (): Promise<CallToolResult> => {
started();
await gate;
return { content: [{ type: 'text', text: 'slow-done' }] };
}
);

transport = new WebStandardStreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
enableJsonResponse: options?.enableJsonResponse ?? false
});
}

beforeEach(async () => {
setupServer();
await mcpServer.connect(transport);
});

afterEach(async () => {
releaseSlowTool();
await transport.close();
});

async function initializeServer(): Promise<string> {
const request = createRequest('POST', TEST_MESSAGES.initialize);
const response = await transport.handleRequest(request);

expect(response.status).toBe(200);
const newSessionId = response.headers.get('mcp-session-id');
expect(newSessionId).toBeDefined();
return newSessionId as string;
}

function callTool(name: string, id: string): JSONRPCMessage {
return {
jsonrpc: '2.0',
method: 'tools/call',
params: { name, arguments: name === 'greet' ? { name: 'Test' } : {} },
id
} as JSONRPCMessage;
}

it('should reject a request whose ID collides with one still in flight', async () => {
sessionId = await initializeServer();

// First request occupies the ID and blocks inside the tool handler
const firstResponse = await transport.handleRequest(createRequest('POST', callTool('slow', 'dup-1'), { sessionId }));
expect(firstResponse.status).toBe(200);
await slowToolStarted;

// Concurrent request reusing the same ID must be rejected, not cross-wired
const secondResponse = await transport.handleRequest(createRequest('POST', callTool('greet', 'dup-1'), { sessionId }));
expect(secondResponse.status).toBe(400);
const errorData = await secondResponse.json();
expectErrorResponse(errorData, -32_600, /already in flight/);

// The original request must still receive its own response
releaseSlowTool();
const text = await readSSEEvent(firstResponse);
const eventData = parseSSEData(text);
expect(eventData).toMatchObject({
jsonrpc: '2.0',
result: { content: [{ type: 'text', text: 'slow-done' }] },
id: 'dup-1'
});
});

it('should reject duplicate request IDs within a single batch', async () => {
sessionId = await initializeServer();

const batch: JSONRPCMessage[] = [callTool('greet', 'batch-dup'), callTool('greet', 'batch-dup')];
const response = await transport.handleRequest(createRequest('POST', batch, { sessionId }));

expect(response.status).toBe(400);
const errorData = await response.json();
expectErrorResponse(errorData, -32_600, /already in flight/);
});

it('should allow sequential reuse of a request ID after the response is delivered', async () => {
sessionId = await initializeServer();

for (let attempt = 0; attempt < 2; attempt++) {
const response = await transport.handleRequest(createRequest('POST', callTool('greet', 'reuse-1'), { sessionId }));
expect(response.status).toBe(200);

const eventData = parseSSEData(await readSSEEvent(response));
expect(eventData).toMatchObject({
jsonrpc: '2.0',
result: { content: [{ type: 'text', text: 'Hello, Test!' }] },
id: 'reuse-1'
});
}
});

it('should allow reuse of a request ID after the original request was cancelled', async () => {
sessionId = await initializeServer();

const firstResponse = await transport.handleRequest(createRequest('POST', callTool('slow', 'cancel-1'), { sessionId }));
expect(firstResponse.status).toBe(200);
await slowToolStarted;

const cancelNotification: JSONRPCMessage = {
jsonrpc: '2.0',
method: 'notifications/cancelled',
params: { requestId: 'cancel-1', reason: 'test cancellation' }
};
const cancelResponse = await transport.handleRequest(createRequest('POST', cancelNotification, { sessionId }));
expect(cancelResponse.status).toBe(202);

// The cancelled request will never produce a response, so its ID must be reusable
const secondResponse = await transport.handleRequest(createRequest('POST', callTool('greet', 'cancel-1'), { sessionId }));
expect(secondResponse.status).toBe(200);

const eventData = parseSSEData(await readSSEEvent(secondResponse));
expect(eventData).toMatchObject({
jsonrpc: '2.0',
result: { content: [{ type: 'text', text: 'Hello, Test!' }] },
id: 'cancel-1'
});
});

it('should reject duplicate in-flight request IDs in JSON response mode', async () => {
await transport.close();
setupServer({ enableJsonResponse: true });
await mcpServer.connect(transport);

sessionId = await initializeServer();

// In JSON mode handleRequest resolves only when the response is ready - do not await
const firstResponsePromise = transport.handleRequest(createRequest('POST', callTool('slow', 'dup-json'), { sessionId }));
await slowToolStarted;

const secondResponse = await transport.handleRequest(createRequest('POST', callTool('greet', 'dup-json'), { sessionId }));
expect(secondResponse.status).toBe(400);
const errorData = await secondResponse.json();
expectErrorResponse(errorData, -32_600, /already in flight/);

// The original request must still resolve with its own result
releaseSlowTool();
const firstResponse = await firstResponsePromise;
expect(firstResponse.status).toBe(200);
const data = await firstResponse.json();
expect(data).toMatchObject({
jsonrpc: '2.0',
result: { content: [{ type: 'text', text: 'slow-done' }] },
id: 'dup-json'
});
});
});

describe('HTTPServerTransport - Session Callbacks', () => {
it('should call onsessioninitialized callback', async () => {
const onInitialized = vi.fn();
Expand Down
Loading