Skip to content

Commit 39c03ad

Browse files
committed
Reject duplicate in-flight request ids in the JSON-RPC dispatcher
The wire dispatcher registered each inbound request in `_in_flight` with a blind overwrite keyed by request id (the `TODO(maxisbey)` from #3046). Two concurrent requests sharing a JSON-RPC id on one session would silently displace each other: the older entry was evicted at registration, so a `notifications/cancelled` for that id always targeted the newer request and the older one became uncancellable. Reject a duplicate id that is still in flight with INVALID_REQUEST instead of overwriting, matching the guard `direct_dispatcher` already applies to caller-supplied ids. Ids remain reusable once the earlier request completes, which deployed clients that send a constant id rely on. With duplicate registration ruled out, the completion path no longer needs the identity guard on its `_in_flight` pop. Replaces the two overwrite-semantics tests with three covering rejection, cancellation-targeting of the original request, and sequential id reuse. Fixes the dispatcher-layer half of #3060; complementary to the transport-level guard in #3063.
1 parent 53117cb commit 39c03ad

2 files changed

Lines changed: 110 additions & 85 deletions

File tree

src/mcp/shared/jsonrpc_dispatcher.py

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
CONNECTION_CLOSED,
2323
INTERNAL_ERROR,
2424
INVALID_PARAMS,
25+
INVALID_REQUEST,
2526
REQUEST_TIMEOUT,
2627
ErrorData,
2728
JSONRPCError,
@@ -551,6 +552,23 @@ async def _dispatch_request(
551552
on_request: OnRequest,
552553
sender_ctx: contextvars.Context | None,
553554
) -> None:
555+
# Key coerced so a stringified `notifications/cancelled` id still correlates.
556+
key = coerce_request_id(req.id)
557+
if key in self._in_flight:
558+
# Duplicate in-flight id. The spec requires request ids to be unique
559+
# within a session while a request is outstanding; a blind overwrite
560+
# here would silently retarget `notifications/cancelled` onto the newer
561+
# request and orphan the older one (see #3060). Reject the duplicate
562+
# instead - ids may still be reused once the earlier request completes.
563+
# Mirrors `direct_dispatcher`'s guard for caller-supplied ids.
564+
logger.warning("duplicate in-flight request id %r; rejecting with INVALID_REQUEST", req.id)
565+
self._spawn(
566+
self._write_error,
567+
req.id,
568+
ErrorData(code=INVALID_REQUEST, message=f"request id {req.id!r} is already in flight"),
569+
sender_ctx=sender_ctx,
570+
)
571+
return
554572
progress_token = progress_token_from_params(req.params)
555573
try:
556574
transport_ctx = self._transport_builder(metadata)
@@ -572,10 +590,7 @@ async def _dispatch_request(
572590
_progress_token=progress_token,
573591
)
574592
scope = anyio.CancelScope()
575-
# TODO(maxisbey): duplicate ids blind-overwrite (v1/TS parity); revisit
576-
# rejecting with INVALID_REQUEST. Key coerced so a stringified
577-
# `notifications/cancelled` id still correlates.
578-
self._in_flight[coerce_request_id(req.id)] = _InFlight(scope=scope, dctx=dctx)
593+
self._in_flight[key] = _InFlight(scope=scope, dctx=dctx)
579594
if req.method in self._inline_methods:
580595
# Spawn so `sender_ctx` applies, but park the read loop until the
581596
# handler returns - that's the inline ordering guarantee.
@@ -699,12 +714,10 @@ async def _handle_request(
699714
result = await on_request(dctx, req.method, req.params)
700715
finally:
701716
# Close the back-channel and drop from `_in_flight`; no checkpoint
702-
# since handler return, so a peer cancel can't interleave.
703-
# Identity guard: don't evict a duplicate id's newer entry.
717+
# since handler return, so a peer cancel can't interleave. Duplicate
718+
# ids are rejected at registration, so this entry is always ours.
704719
dctx.close()
705-
key = coerce_request_id(req.id)
706-
if (entry := self._in_flight.get(key)) is not None and entry.dctx is dctx:
707-
del self._in_flight[key]
720+
self._in_flight.pop(coerce_request_id(req.id), None)
708721
# A write interrupted by cancellation may still have delivered
709722
# (a memory-stream send can hand its item to the receiver and
710723
# still raise), so a started answer write counts as sent below:
@@ -744,7 +757,7 @@ async def _handle_request(
744757
await self._write_error(req.id, ErrorData(code=0, message=str(e)))
745758
if self._raise_handler_exceptions:
746759
raise
747-
# No `_in_flight` pop here: the inner finally covers every path, and a late pop could evict a reused id.
760+
# No `_in_flight` pop here: the inner finally covers every path.
748761

749762
def _allocate_id(self) -> int:
750763
self._next_id += 1

tests/shared/test_jsonrpc_dispatcher.py

Lines changed: 87 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
CONNECTION_CLOSED,
1515
INTERNAL_ERROR,
1616
INVALID_PARAMS,
17+
INVALID_REQUEST,
1718
REQUEST_TIMEOUT,
1819
CallToolRequest,
1920
CallToolRequestParams,
@@ -2191,27 +2192,69 @@ async def on_notify(ctx: DCtx, method: str, params: Mapping[str, Any] | None) ->
21912192

21922193

21932194
@pytest.mark.anyio
2194-
async def test_completed_handler_does_not_evict_reused_request_id_from_in_flight():
2195-
"""A second request reusing an id while the first handler is parked in its response write
2196-
keeps its own `_in_flight` entry (a post-write pop would evict it and break peer-cancellation)."""
2195+
async def test_duplicate_in_flight_request_id_is_rejected_with_invalid_request():
2196+
"""A second inbound request that reuses an id still in flight is rejected with INVALID_REQUEST
2197+
rather than blindly overwriting the first's `_in_flight` entry (#3060). The duplicate never
2198+
reaches the handler; the original request is untouched and still completes."""
21972199
c2s_send, c2s_recv = anyio.create_memory_object_stream[SessionMessage | Exception](32)
2198-
# buffer=0: the first handler's response write parks until the test receives.
2199-
s2c_send, s2c_recv = anyio.create_memory_object_stream[SessionMessage | Exception](0)
2200+
s2c_send, s2c_recv = anyio.create_memory_object_stream[SessionMessage | Exception](32)
22002201
server: JSONRPCDispatcher[TransportContext] = JSONRPCDispatcher(c2s_recv, s2c_send)
2201-
calls = 0
2202-
second_started = anyio.Event()
2203-
second_exited = anyio.Event()
2202+
handled: list[str] = []
2203+
started = anyio.Event()
2204+
release = anyio.Event()
22042205

22052206
async def on_request(ctx: DCtx, method: str, params: Mapping[str, Any] | None) -> dict[str, Any]:
2206-
nonlocal calls
2207-
calls += 1
2208-
if calls == 1:
2209-
return {"first": True}
2210-
second_started.set()
2207+
handled.append(method)
2208+
started.set()
2209+
await release.wait()
2210+
return {"method": method}
2211+
2212+
async def on_notify(ctx: DCtx, method: str, params: Mapping[str, Any] | None) -> None:
2213+
raise NotImplementedError # no notifications are sent in this test
2214+
2215+
try:
2216+
async with anyio.create_task_group() as tg:
2217+
await tg.start(server.run, on_request, on_notify)
2218+
with anyio.fail_after(5):
2219+
await c2s_send.send(SessionMessage(message=JSONRPCRequest(jsonrpc="2.0", id=7, method="first")))
2220+
await started.wait()
2221+
# Duplicate id while the first request is still outstanding.
2222+
await c2s_send.send(SessionMessage(message=JSONRPCRequest(jsonrpc="2.0", id=7, method="second")))
2223+
rejection = await s2c_recv.receive()
2224+
assert isinstance(rejection, SessionMessage)
2225+
assert isinstance(rejection.message, JSONRPCError)
2226+
assert rejection.message.id == 7
2227+
assert rejection.message.error.code == INVALID_REQUEST
2228+
# The original request is untouched and still completes normally.
2229+
release.set()
2230+
resp = await s2c_recv.receive()
2231+
assert isinstance(resp, SessionMessage)
2232+
assert isinstance(resp.message, JSONRPCResponse)
2233+
assert resp.message.result == {"method": "first"}
2234+
tg.cancel_scope.cancel()
2235+
finally:
2236+
for s in (c2s_send, c2s_recv, s2c_send, s2c_recv):
2237+
s.close()
2238+
assert handled == ["first"] # the rejected duplicate never reached the handler
2239+
2240+
2241+
@pytest.mark.anyio
2242+
async def test_duplicate_id_rejection_leaves_original_request_cancellable():
2243+
"""Rejecting the duplicate keeps `_in_flight` pointing at the original request, so a later
2244+
`notifications/cancelled` still targets it - the duplicate can no longer steal cancellation
2245+
away from the older, still-running request (#3060)."""
2246+
c2s_send, c2s_recv = anyio.create_memory_object_stream[SessionMessage | Exception](32)
2247+
s2c_send, s2c_recv = anyio.create_memory_object_stream[SessionMessage | Exception](32)
2248+
server: JSONRPCDispatcher[TransportContext] = JSONRPCDispatcher(c2s_recv, s2c_send)
2249+
started = anyio.Event()
2250+
exited = anyio.Event()
2251+
2252+
async def on_request(ctx: DCtx, method: str, params: Mapping[str, Any] | None) -> dict[str, Any]:
2253+
started.set()
22112254
try:
22122255
await anyio.sleep_forever()
22132256
finally:
2214-
second_exited.set()
2257+
exited.set()
22152258
raise NotImplementedError
22162259

22172260
async def on_notify(ctx: DCtx, method: str, params: Mapping[str, Any] | None) -> None:
@@ -2221,93 +2264,62 @@ async def on_notify(ctx: DCtx, method: str, params: Mapping[str, Any] | None) ->
22212264
async with anyio.create_task_group() as tg:
22222265
await tg.start(server.run, on_request, on_notify)
22232266
with anyio.fail_after(5):
2224-
await c2s_send.send(SessionMessage(message=JSONRPCRequest(jsonrpc="2.0", id=7, method="a")))
2225-
# First handler is now parked in `_write_result`; reuse its id.
2226-
await c2s_send.send(SessionMessage(message=JSONRPCRequest(jsonrpc="2.0", id=7, method="b")))
2227-
await second_started.wait()
2228-
resp1 = await s2c_recv.receive()
2229-
assert isinstance(resp1, SessionMessage)
2230-
assert isinstance(resp1.message, JSONRPCResponse)
2231-
assert resp1.message.result == {"first": True}
2232-
# Let the first handler task run to completion past the write.
2233-
await anyio.wait_all_tasks_blocked()
2234-
assert 7 in server._in_flight # pyright: ignore[reportPrivateUsage]
2235-
# The surviving entry must still be cancellable.
2267+
await c2s_send.send(SessionMessage(message=JSONRPCRequest(jsonrpc="2.0", id=7, method="slow")))
2268+
await started.wait()
2269+
# Duplicate id is rejected; it must not become the cancellation target.
2270+
await c2s_send.send(SessionMessage(message=JSONRPCRequest(jsonrpc="2.0", id=7, method="dup")))
2271+
rejection = await s2c_recv.receive()
2272+
assert isinstance(rejection, SessionMessage)
2273+
assert isinstance(rejection.message, JSONRPCError)
2274+
assert rejection.message.error.code == INVALID_REQUEST
2275+
# Cancelling id 7 must reach the original, still-parked request.
22362276
await c2s_send.send(
22372277
SessionMessage(
22382278
message=JSONRPCNotification(
22392279
jsonrpc="2.0", method="notifications/cancelled", params={"requestId": 7}
22402280
)
22412281
)
22422282
)
2243-
resp2 = await s2c_recv.receive()
2244-
assert isinstance(resp2, SessionMessage)
2245-
assert isinstance(resp2.message, JSONRPCError)
2246-
assert resp2.message.error == ErrorData(code=0, message="Request cancelled")
2247-
assert second_exited.is_set()
2283+
cancelled = await s2c_recv.receive()
2284+
assert isinstance(cancelled, SessionMessage)
2285+
assert isinstance(cancelled.message, JSONRPCError)
2286+
assert cancelled.message.id == 7
2287+
assert cancelled.message.error == ErrorData(code=0, message="Request cancelled")
2288+
assert exited.is_set()
22482289
tg.cancel_scope.cancel()
22492290
finally:
22502291
for s in (c2s_send, c2s_recv, s2c_send, s2c_recv):
22512292
s.close()
22522293

22532294

22542295
@pytest.mark.anyio
2255-
async def test_duplicate_request_id_completion_of_first_handler_keeps_second_cancellable():
2256-
"""A duplicate inbound id overwrites `_in_flight` (parity with v1/TS); the identity-guarded pop
2257-
keeps the first handler's completion from evicting the second's entry and breaking its cancellation."""
2296+
async def test_request_id_is_reusable_after_the_earlier_request_completes():
2297+
"""Sequential reuse of an id after the earlier request has completed is still accepted -
2298+
deployed clients that send a constant id depend on it; only *in-flight* duplicates are rejected."""
22582299
c2s_send, c2s_recv = anyio.create_memory_object_stream[SessionMessage | Exception](32)
22592300
s2c_send, s2c_recv = anyio.create_memory_object_stream[SessionMessage | Exception](32)
22602301
server: JSONRPCDispatcher[TransportContext] = JSONRPCDispatcher(c2s_recv, s2c_send)
2261-
first_started = anyio.Event()
2262-
release_first = anyio.Event()
2263-
second_started = anyio.Event()
2264-
second_exited = anyio.Event()
2302+
calls = 0
22652303

22662304
async def on_request(ctx: DCtx, method: str, params: Mapping[str, Any] | None) -> dict[str, Any]:
2267-
if method == "first":
2268-
first_started.set()
2269-
await release_first.wait()
2270-
return {"first": True}
2271-
second_started.set()
2272-
try:
2273-
await anyio.sleep_forever()
2274-
finally:
2275-
second_exited.set()
2276-
raise NotImplementedError
2305+
nonlocal calls
2306+
calls += 1
2307+
return {"call": calls}
22772308

22782309
async def on_notify(ctx: DCtx, method: str, params: Mapping[str, Any] | None) -> None:
2279-
pass # the cancelled notification is teed here; nothing to observe
2310+
raise NotImplementedError # no notifications are sent in this test
22802311

22812312
try:
22822313
async with anyio.create_task_group() as tg:
22832314
await tg.start(server.run, on_request, on_notify)
22842315
with anyio.fail_after(5):
2285-
await c2s_send.send(SessionMessage(message=JSONRPCRequest(jsonrpc="2.0", id=7, method="first")))
2286-
await first_started.wait()
2287-
# Duplicate id: the table entry now belongs to the second request.
2288-
await c2s_send.send(SessionMessage(message=JSONRPCRequest(jsonrpc="2.0", id=7, method="second")))
2289-
await second_started.wait()
2290-
release_first.set()
2291-
resp1 = await s2c_recv.receive()
2292-
assert isinstance(resp1, SessionMessage)
2293-
assert isinstance(resp1.message, JSONRPCResponse)
2294-
assert resp1.message.result == {"first": True}
2295-
# Let the first handler task run past its pop entirely.
2296-
await anyio.wait_all_tasks_blocked()
2297-
assert 7 in server._in_flight # pyright: ignore[reportPrivateUsage]
2298-
# The surviving entry must still be cancellable by the peer.
2299-
await c2s_send.send(
2300-
SessionMessage(
2301-
message=JSONRPCNotification(
2302-
jsonrpc="2.0", method="notifications/cancelled", params={"requestId": 7}
2303-
)
2304-
)
2305-
)
2306-
resp2 = await s2c_recv.receive()
2307-
assert isinstance(resp2, SessionMessage)
2308-
assert isinstance(resp2.message, JSONRPCError)
2309-
assert resp2.message.error == ErrorData(code=0, message="Request cancelled")
2310-
assert second_exited.is_set()
2316+
for expected in (1, 2):
2317+
await c2s_send.send(SessionMessage(message=JSONRPCRequest(jsonrpc="2.0", id=7, method="t")))
2318+
resp = await s2c_recv.receive()
2319+
assert isinstance(resp, SessionMessage)
2320+
assert isinstance(resp.message, JSONRPCResponse)
2321+
assert resp.message.result == {"call": expected}
2322+
assert 7 not in server._in_flight # pyright: ignore[reportPrivateUsage]
23112323
tg.cancel_scope.cancel()
23122324
finally:
23132325
for s in (c2s_send, c2s_recv, s2c_send, s2c_recv):

0 commit comments

Comments
 (0)