Skip to content

Commit e50fb5b

Browse files
authored
Serve the 2026-07-28 era over stdio and other stream-pair transports (#3038)
1 parent dcf8a6a commit e50fb5b

6 files changed

Lines changed: 757 additions & 44 deletions

File tree

src/mcp/server/connection.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,30 @@ async def notify(self, method: str, params: Mapping[str, Any] | None, opts: Call
100100
_NO_CHANNEL = _NoChannelOutbound()
101101

102102

103+
class NotifyOnlyOutbound:
104+
"""Connection-scoped `Outbound` that forwards notifications and refuses requests.
105+
106+
Installed by `serve_dual_era_loop` for modern (2026-07-28+) connections
107+
over duplex stream transports: the pipe is real, so server notifications
108+
ride it, but the modern protocol forbids server-initiated JSON-RPC
109+
requests, so `send_raw_request` refuses by construction.
110+
"""
111+
112+
def __init__(self, outbound: Outbound) -> None:
113+
self._outbound = outbound
114+
115+
async def send_raw_request(
116+
self,
117+
method: str,
118+
params: Mapping[str, Any] | None,
119+
opts: CallOptions | None = None,
120+
) -> dict[str, Any]:
121+
raise NoBackChannelError(method)
122+
123+
async def notify(self, method: str, params: Mapping[str, Any] | None, opts: CallOptions | None = None) -> None:
124+
await self._outbound.notify(method, params, opts)
125+
126+
103127
class Connection:
104128
"""Per-client connection state and standalone-stream `Outbound`.
105129
@@ -167,7 +191,8 @@ def from_envelope(
167191
both supplied) are recorded as `client_params` so capability checks
168192
work. `outbound` defaults to the no-channel sentinel for the
169193
single-exchange HTTP path; duplex modern transports (e.g. stdio) pass
170-
the dispatcher so server-initiated messages have a back-channel.
194+
a notify-only wrapper around the dispatcher so server notifications
195+
ride the pipe while server-initiated requests stay refused.
171196
"""
172197
client_params = None
173198
if client_info is not None and client_capabilities is not None:

src/mcp/server/lowlevel/server.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ async def main():
6262
from mcp.server.caching import CacheableMethod, CacheHint, validate_cache_hints
6363
from mcp.server.context import HandlerResult, ServerMiddleware, ServerRequestContext
6464
from mcp.server.models import InitializationOptions
65-
from mcp.server.runner import serve_loop
65+
from mcp.server.runner import serve_dual_era_loop
6666
from mcp.server.streamable_http import EventStore
6767
from mcp.server.streamable_http_manager import StreamableHTTPASGIApp, StreamableHTTPSessionManager
6868
from mcp.server.transport_security import TransportSecuritySettings
@@ -689,12 +689,14 @@ async def run(
689689
) -> None:
690690
"""Serve a single connection over the given streams until the read side closes.
691691
692-
Thin wrapper over `serve_loop`: enters the server lifespan,
693-
then drives the loop. Transports with their own lifespan owner
694-
(the streamable-HTTP manager) call `serve_loop` directly instead.
692+
Thin wrapper over `serve_dual_era_loop`: enters the server lifespan,
693+
then drives the loop, serving the legacy handshake era and the modern
694+
per-request-envelope era (the first era-distinctive message locks the
695+
connection). Transports with their own lifespan owner (the
696+
streamable-HTTP manager) call `serve_loop` directly instead.
695697
"""
696698
async with self.lifespan(self) as lifespan_context:
697-
await serve_loop(
699+
await serve_dual_era_loop(
698700
self,
699701
read_stream,
700702
write_stream,

src/mcp/server/runner.py

Lines changed: 219 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
pure kernel: it holds a pre-populated `Connection` and reads
66
`connection.protocol_version` / `connection.outbound` as facts. Driving a
77
dispatcher loop and tearing down the connection live in the free-function
8-
drivers (`serve_connection`, `serve_loop`, `serve_one`); the entry constructs
9-
the `Connection`, the driver tears it down.
8+
drivers (`serve_connection`, `serve_loop`, `serve_dual_era_loop`, `serve_one`);
9+
the entry constructs the `Connection`, the driver tears it down.
1010
1111
`ServerRunner` holds a `Server` directly - `Server` is the registry.
1212
"""
@@ -17,7 +17,7 @@
1717
from collections.abc import Awaitable, Mapping
1818
from dataclasses import KW_ONLY, dataclass
1919
from functools import cached_property, partial
20-
from typing import TYPE_CHECKING, Any, Generic, cast
20+
from typing import TYPE_CHECKING, Any, Generic, Literal, cast
2121

2222
import anyio
2323
import anyio.abc
@@ -26,31 +26,41 @@
2626
CLIENT_INFO_META_KEY,
2727
INTERNAL_ERROR,
2828
INVALID_PARAMS,
29+
INVALID_REQUEST,
2930
METHOD_NOT_FOUND,
3031
PROTOCOL_VERSION_META_KEY,
32+
UNSUPPORTED_PROTOCOL_VERSION,
3133
CacheableResult,
3234
ErrorData,
3335
Implementation,
3436
InitializeRequestParams,
3537
InitializeResult,
38+
RequestId,
3639
RequestParams,
3740
RequestParamsMeta,
41+
UnsupportedProtocolVersionErrorData,
3842
)
3943
from mcp_types import methods as _methods
40-
from mcp_types.version import HANDSHAKE_PROTOCOL_VERSIONS, LATEST_HANDSHAKE_VERSION, LATEST_MODERN_VERSION
44+
from mcp_types.version import (
45+
HANDSHAKE_PROTOCOL_VERSIONS,
46+
LATEST_HANDSHAKE_VERSION,
47+
LATEST_MODERN_VERSION,
48+
MODERN_PROTOCOL_VERSIONS,
49+
)
4150
from pydantic import BaseModel, ValidationError
4251
from typing_extensions import TypeVar
4352

4453
from mcp.server.caching import apply_cache_hint
45-
from mcp.server.connection import Connection
54+
from mcp.server.connection import Connection, NotifyOnlyOutbound
4655
from mcp.server.context import CallNext, HandlerResult, ServerMiddleware, ServerRequestContext
4756
from mcp.server.models import InitializationOptions
4857
from mcp.server.session import ServerSession
4958
from mcp.shared._stream_protocols import ReadStream, WriteStream
50-
from mcp.shared.dispatcher import DispatchContext, Dispatcher, OnNotify, OnRequest
51-
from mcp.shared.exceptions import MCPError
59+
from mcp.shared.dispatcher import CallOptions, DispatchContext, Dispatcher, OnNotify, OnRequest
60+
from mcp.shared.exceptions import MCPError, NoBackChannelError
61+
from mcp.shared.inbound import InboundLadderRejection, classify_inbound_request
5262
from mcp.shared.jsonrpc_dispatcher import JSONRPCDispatcher
53-
from mcp.shared.message import ServerMessageMetadata, SessionMessage
63+
from mcp.shared.message import MessageMetadata, ServerMessageMetadata, SessionMessage
5464
from mcp.shared.transport_context import TransportContext
5565

5666
if TYPE_CHECKING:
@@ -63,6 +73,7 @@
6373
"aclose_shielded",
6474
"modern_on_request",
6575
"serve_connection",
76+
"serve_dual_era_loop",
6677
"serve_loop",
6778
"serve_one",
6879
]
@@ -427,6 +438,206 @@ async def serve_loop(
427438
)
428439

429440

441+
_MODERN_ENVELOPE_KEYS = (PROTOCOL_VERSION_META_KEY, CLIENT_INFO_META_KEY, CLIENT_CAPABILITIES_META_KEY)
442+
443+
444+
def _has_modern_envelope(params: Mapping[str, Any] | None) -> bool:
445+
"""Whether `params._meta` carries every reserved modern-envelope key.
446+
447+
Era evidence is the FULL key triple - bare `_meta` is not (legacy traffic
448+
carries `progressToken` there).
449+
"""
450+
if not params:
451+
return False
452+
meta = params.get("_meta")
453+
return isinstance(meta, Mapping) and all(key in meta for key in _MODERN_ENVELOPE_KEYS)
454+
455+
456+
def _initialize_after_modern_data(params: Mapping[str, Any] | None) -> dict[str, Any]:
457+
"""Error data for an `initialize` arriving on a modern-locked connection.
458+
459+
The typed -32022 payload when the client's proposed version is parseable;
460+
otherwise just the supported list (the point is naming what we serve).
461+
"""
462+
requested = (params or {}).get("protocolVersion")
463+
if isinstance(requested, str):
464+
return UnsupportedProtocolVersionErrorData(
465+
supported=list(MODERN_PROTOCOL_VERSIONS), requested=requested
466+
).model_dump(mode="json")
467+
return {"supported": list(MODERN_PROTOCOL_VERSIONS)}
468+
469+
470+
@dataclass
471+
class _NoServerRequestsDispatchContext:
472+
"""Delegating `DispatchContext` that refuses server-initiated requests.
473+
474+
Wraps the loop dispatcher's per-message context for modern-era dispatch:
475+
the modern protocol forbids server-initiated JSON-RPC requests, so
476+
`send_raw_request` refuses while notifications and progress still ride
477+
the duplex pipe.
478+
"""
479+
480+
_inner: DispatchContext[TransportContext]
481+
482+
@property
483+
def transport(self) -> TransportContext:
484+
return self._inner.transport
485+
486+
@property
487+
def can_send_request(self) -> bool:
488+
return False
489+
490+
@property
491+
def request_id(self) -> RequestId | None:
492+
return self._inner.request_id
493+
494+
@property
495+
def message_metadata(self) -> MessageMetadata:
496+
return self._inner.message_metadata
497+
498+
@property
499+
def cancel_requested(self) -> anyio.Event:
500+
return self._inner.cancel_requested
501+
502+
async def send_raw_request(
503+
self,
504+
method: str,
505+
params: Mapping[str, Any] | None,
506+
opts: CallOptions | None = None,
507+
) -> dict[str, Any]:
508+
raise NoBackChannelError(method)
509+
510+
async def notify(self, method: str, params: Mapping[str, Any] | None, opts: CallOptions | None = None) -> None:
511+
await self._inner.notify(method, params, opts)
512+
513+
async def progress(self, progress: float, total: float | None = None, message: str | None = None) -> None:
514+
await self._inner.progress(progress, total, message)
515+
516+
517+
async def serve_dual_era_loop(
518+
server: Server[LifespanT],
519+
read_stream: ReadStream[SessionMessage | Exception],
520+
write_stream: WriteStream[SessionMessage],
521+
*,
522+
lifespan_state: LifespanT,
523+
session_id: str | None = None,
524+
init_options: InitializationOptions | None = None,
525+
raise_exceptions: bool = False,
526+
) -> None:
527+
"""Drive `server` over a duplex stream pair, serving both protocol eras.
528+
529+
The stream-pair counterpart of the modern HTTP entry's era router. Era is
530+
a property of the connection, decided by how the client opens it, and
531+
mid-stream switching is undefined - so the first era-distinctive message
532+
locks the connection (matching the typescript-sdk):
533+
534+
- `initialize` locks legacy: the connection behaves exactly like
535+
`serve_loop` for its lifetime, and modern envelope traffic is rejected
536+
with INVALID_REQUEST.
537+
- A request carrying the modern `_meta` envelope triple - or
538+
`server/discover`, a modern-only method - locks modern: every request is
539+
classified (`classify_inbound_request`) and served single-exchange via
540+
`serve_one` with a born-ready per-request `Connection`, the same
541+
dispatch model as the modern HTTP entry. A later `initialize` is
542+
rejected with UNSUPPORTED_PROTOCOL_VERSION naming the modern versions.
543+
544+
Modern connections push notifications over the duplex pipe but refuse
545+
server-initiated requests on both channels (the modern protocol forbids
546+
them). A rejected classification (malformed envelope, unsupported version)
547+
never locks the era, so a failed probe leaves the legacy handshake
548+
available - released auto-negotiating clients fall back on any error code
549+
except -32022.
550+
"""
551+
dispatcher: JSONRPCDispatcher[TransportContext] = JSONRPCDispatcher(
552+
read_stream,
553+
write_stream,
554+
raise_handler_exceptions=raise_exceptions,
555+
# `initialize` inline for the same pipelining reason as `serve_loop`;
556+
# `server/discover` inline so the modern era lock commits before the
557+
# next pipelined message is read.
558+
inline_methods=frozenset({"initialize", "server/discover"}),
559+
)
560+
loop_connection = Connection.for_loop(dispatcher, session_id=session_id)
561+
loop_runner = ServerRunner(server, loop_connection, lifespan_state, init_options=init_options)
562+
standalone_outbound = NotifyOnlyOutbound(dispatcher)
563+
era: Literal["unlocked", "legacy", "modern"] = "unlocked"
564+
modern_version = LATEST_MODERN_VERSION
565+
566+
async def serve_modern(
567+
dctx: DispatchContext[TransportContext], method: str, params: Mapping[str, Any] | None
568+
) -> dict[str, Any]:
569+
nonlocal era, modern_version
570+
route = classify_inbound_request({"method": method, "params": params})
571+
if isinstance(route, InboundLadderRejection):
572+
raise MCPError(code=route.code, message=route.message, data=route.data)
573+
if era != "modern":
574+
era, modern_version = "modern", route.protocol_version
575+
if method == "subscriptions/listen":
576+
# The registered listen handler assumes the HTTP entry's stream
577+
# semantics; served over a stream pair it would wedge. Reject until
578+
# this transport grows its own listen design.
579+
raise MCPError(
580+
code=METHOD_NOT_FOUND, message="subscriptions/listen is not served over this transport", data=method
581+
)
582+
connection = Connection.from_envelope(
583+
route.protocol_version,
584+
route.client_info,
585+
route.client_capabilities,
586+
outbound=standalone_outbound,
587+
)
588+
return await serve_one(
589+
server,
590+
_NoServerRequestsDispatchContext(dctx),
591+
method,
592+
params,
593+
connection=connection,
594+
lifespan_state=lifespan_state,
595+
)
596+
597+
async def on_request(
598+
dctx: DispatchContext[TransportContext], method: str, params: Mapping[str, Any] | None
599+
) -> dict[str, Any]:
600+
nonlocal era
601+
if era == "legacy":
602+
if method == "server/discover" or _has_modern_envelope(params):
603+
raise MCPError(
604+
code=INVALID_REQUEST,
605+
message="connection is locked to the legacy handshake era; "
606+
"modern envelope requests are not accepted",
607+
)
608+
return await loop_runner.on_request(dctx, method, params)
609+
if era == "modern" and method == "initialize":
610+
raise MCPError(
611+
code=UNSUPPORTED_PROTOCOL_VERSION,
612+
message="connection already negotiated a modern protocol version",
613+
data=_initialize_after_modern_data(params),
614+
)
615+
if era == "modern" or method == "server/discover" or _has_modern_envelope(params):
616+
return await serve_modern(dctx, method, params)
617+
result = await loop_runner.on_request(dctx, method, params)
618+
if method == "initialize":
619+
# Lock only on success: a failed handshake leaves both eras open.
620+
era = "legacy"
621+
return result
622+
623+
async def on_notify(dctx: DispatchContext[TransportContext], method: str, params: Mapping[str, Any] | None) -> None:
624+
if era != "modern":
625+
return await loop_runner.on_notify(dctx, method, params)
626+
# The envelope is request-only, so notifications inherit the
627+
# connection's locked version.
628+
connection = Connection.from_envelope(modern_version, None, None, outbound=standalone_outbound)
629+
notify_runner = ServerRunner(server, connection, lifespan_state)
630+
try:
631+
await notify_runner.on_notify(_NoServerRequestsDispatchContext(dctx), method, params)
632+
finally:
633+
await aclose_shielded(connection)
634+
635+
try:
636+
await dispatcher.run(on_request, on_notify)
637+
finally:
638+
await aclose_shielded(loop_connection)
639+
640+
430641
async def serve_one(
431642
server: Server[LifespanT],
432643
dctx: DispatchContext[TransportContext],

0 commit comments

Comments
 (0)