Skip to content

Commit ca10dad

Browse files
authored
Serve subscriptions/listen with a pluggable event bus (SEP-2575) (#3035)
1 parent 48ef569 commit ca10dad

32 files changed

Lines changed: 1722 additions & 66 deletions

.github/actions/conformance/expected-failures.2026-07-28.yml

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,4 @@
2222

2323
client: []
2424

25-
server:
26-
# SEP-2575 subscriptions/listen is not implemented yet; see the matching
27-
# entry in expected-failures.yml for the full rationale.
28-
- server-stateless
25+
server: []

.github/actions/conformance/expected-failures.yml

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,6 @@
1313
client: []
1414

1515
server:
16-
# SEP-2575 subscriptions/listen is not implemented yet. The everything-
17-
# server's legacy resources/subscribe handlers make it advertise
18-
# `resources.subscribe` in server/discover, and as of conformance #372 a
19-
# server that advertises a subscription capability but answers
20-
# subscriptions/listen with -32601 fails the three listen MUST checks
21-
# ("Not testable") instead of skipping them. Remove this entry when the
22-
# listen runtime lands. NOTE: while listed, this entry also masks new
23-
# failures in the scenario's other 25 (currently passing) checks — the
24-
# baseline is per-scenario, not per-check.
25-
- server-stateless
2616
# SEP-2663 (io.modelcontextprotocol/tasks): the SDK does not implement the
2717
# tasks extension yet. These extension-tagged scenarios are selected only by
2818
# the bare `--suite all` leg — extension scenarios never match a

docs/advanced/low-level-server.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ Each of these is one idea you now have the vocabulary for; each has its own chap
183183

184184
* `on_call_tool`, `on_get_prompt`, and `on_read_resource` may return an `InputRequiredResult` instead of their normal result to pause the call and ask the client for input; see **[Multi-round-trip requests](multi-round-trip.md)**. True to this tier, nothing is installed for you: where `MCPServer` seals `requestState` by default, here the `request_state` you set crosses the wire exactly as written until you opt in with `server.middleware.append(RequestStateBoundary(RequestStateSecurity(keys=[...]), default_audience=server.name))`: one line (both names import from `mcp.server.request_state`) for the identical sealing and verification `MCPServer` performs (**[Protecting `requestState`](multi-round-trip.md#protecting-requeststate)**).
185185
* `on_list_resources`, `on_read_resource`, `on_list_prompts`, `on_get_prompt`, `on_completion` are the same `(ctx, params) -> result` shape for the other primitives.
186+
* `on_subscriptions_listen` serves the 2026-07-28 `subscriptions/listen` stream. Pass a `ListenHandler` built over a `SubscriptionBus` and publish events to the bus from your other handlers; see **[Subscriptions](subscriptions.md)** for the full composition.
186187
* `server.streamable_http_app()` returns the same Starlette app `MCPServer`'s does; deploy it the way **[Running your server](../run/index.md)** deploys any other ASGI app. There is no `server.run(transport=...)` down here: `server.run(read_stream, write_stream, server.create_initialization_options())` drives one connection over a pair of streams, and that one line is the whole story.
187188

188189
## Recap

docs/advanced/subscriptions.md

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
# Subscriptions
2+
3+
A server's catalog is not fixed. Tools get registered at runtime, resources change behind their URIs. The client side of that story is a subscription: on the 2026-07-28 protocol, a client that wants to hear about changes sends one `subscriptions/listen` request, and the response to that request *is* the stream — it stays open, carrying exactly the notification kinds the client asked for.
4+
5+
Your side of it is one line: publish the change.
6+
7+
```python title="server.py" hl_lines="16 27"
8+
--8<-- "docs_src/subscriptions/tutorial001.py"
9+
```
10+
11+
* `await ctx.notify_resource_updated("note://todo")` delivers `notifications/resources/updated` to every open listen stream that subscribed to that URI. Not to anyone else.
12+
* `await ctx.notify_tools_changed()` delivers `notifications/tools/list_changed` to every stream that asked for tool-list changes. A client that receives it calls `tools/list` again — and now sees `search`.
13+
* The siblings are `notify_prompts_changed()` and `notify_resources_changed()`, for the other two list-changed kinds.
14+
* No subscribers, no work: publishing to an idle server is a no-op. You don't check whether anyone is listening; you state what changed.
15+
16+
The SDK serves `subscriptions/listen` for you — `MCPServer` registers the handler at construction, and the wire obligations (the acknowledgment as the first frame, the per-stream filtering, the subscription id tagged onto every frame) are its job, not yours.
17+
18+
!!! check
19+
On the wire, a stream whose filter named `note://todo` looks like this after `edit_note` runs:
20+
21+
```json
22+
{"method": "notifications/subscriptions/acknowledged",
23+
"params": {"notifications": {"resourceSubscriptions": ["note://todo"]}, "_meta": {"io.modelcontextprotocol/subscriptionId": 7}}}
24+
25+
{"method": "notifications/resources/updated",
26+
"params": {"uri": "note://todo", "_meta": {"io.modelcontextprotocol/subscriptionId": 7}}}
27+
```
28+
29+
The acknowledgment echoes the filter the server agreed to honor, and every frame carries the
30+
listen request's JSON-RPC id under `_meta` — that id *is* the subscription id.
31+
32+
## Only what was asked for
33+
34+
The filter is a contract. A stream that requested tool-list changes and one resource URI receives those two kinds and nothing else — publish a prompt change and that stream stays silent. Resource URIs are matched as exact strings: `note://todo` does not cover `note://todo/draft`.
35+
36+
!!! warning
37+
Filters are honored without per-client authorization: any client may name any URI —
38+
including one it cannot read — and will receive update notifications for it (resource
39+
existence and change timing, never content). On a multi-tenant server, don't publish
40+
sensitive per-user URIs through `notify_resource_updated`, or serve the method with
41+
your own handler on the low-level `Server` and narrow the filter there before acking —
42+
the honored subset exists in the protocol precisely so servers can do this.
43+
44+
Two more things the stream is *not*:
45+
46+
* **It is not a replay log.** A dropped stream is gone; events published while nobody was connected are not queued. The client's contract is to re-listen and re-fetch what it cares about.
47+
* **It is not the 2025 path.** Clients on earlier protocol versions that called `resources/subscribe` are served by `ctx.session.send_resource_updated(uri)` — the `notify_*` methods reach `subscriptions/listen` streams only.
48+
49+
## One process is the default. More takes a bus
50+
51+
Publishes travel from your handler to the open streams over a `SubscriptionBus`. The default is in-memory: one process, every stream in it. That is the right answer until you run replicas behind a load balancer — then a client's stream is pinned to one replica, and a publish on another replica has to reach it.
52+
53+
That seam is yours to implement: two methods over your pub/sub backend.
54+
55+
```python
56+
class RedisSubscriptionBus:
57+
async def publish(self, event: ServerEvent) -> None:
58+
await self.redis.publish("mcp-events", encode(event)) # to every replica
59+
60+
def subscribe(self, listener: Callable[[ServerEvent], None]) -> Callable[[], None]:
61+
... # register the local listener; a reader task calls it for arriving events
62+
```
63+
64+
```python
65+
mcp = MCPServer("Notebook", subscriptions=RedisSubscriptionBus(...))
66+
```
67+
68+
The bus carries typed `ServerEvent` values — four small dataclasses — never JSON-RPC. Stamping, filtering, and stream lifecycles stay in the SDK, so a bus implementation cannot break the protocol; it can only move events between processes. To publish from outside a request, keep a reference to the bus you constructed and `await bus.publish(ToolsListChanged())` — the server holds the same instance.
69+
70+
## The low-level composition
71+
72+
Down on the low-level `Server` there is no pre-wired anything — and the same parts assemble in three lines:
73+
74+
```python title="server.py" hl_lines="9 31 39"
75+
--8<-- "docs_src/subscriptions/tutorial002.py"
76+
```
77+
78+
* You own the bus, so you publish to it directly: `await bus.publish(ResourceUpdated(uri=...))`. Put it wherever your handlers can reach it — module scope here, the lifespan in a bigger app.
79+
* `ListenHandler(bus)` is the same handler `MCPServer` registers; `on_subscriptions_listen=` is an ordinary handler slot. Don't want the SDK's semantics? Write your own handler for the slot — the spec obligations come with it.
80+
* `ListenHandler.close()` gracefully ends every open stream: each one receives the listen request's result as its final frame, the spec's signal that the server ended the subscription deliberately — a clean end, as opposed to the abrupt drop a client may treat as a cue to reconnect. Without it, streams end when the client disconnects.
81+
82+
## Recap
83+
84+
* A client opts in with one `subscriptions/listen` request; the response is the stream. There is nothing to configure server-side — serving it is built in.
85+
* You publish: `await ctx.notify_resource_updated(uri)`, `notify_tools_changed()`, `notify_prompts_changed()`, `notify_resources_changed()`. Idle servers make these free.
86+
* Streams receive only what their filter requested; URIs match exactly; nothing is replayed.
87+
* Scaling out means implementing `SubscriptionBus` — two methods — over your own pub/sub, and passing it as `MCPServer(subscriptions=...)`.
88+
* The low-level spelling is the same machinery held in your hands: a bus, `ListenHandler(bus)`, one constructor argument.

docs/client/index.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ The resource verbs come in pairs: two ways to list, one way to read.
145145

146146
`read_resource` returns `contents`, a list of `TextResourceContents` or `BlobResourceContents`. Same idea as tool content: narrow with `isinstance`, then read `.text` (or `.blob`).
147147

148-
A client can also **subscribe** to a resource and be told when it changes: `subscribe_resource(uri)` and `unsubscribe_resource(uri)`, same shape as everything else here. `MCPServer` doesn't implement that half. It says so up front (`server_capabilities.resources.subscribe` is `False`) and answers the request with an `MCPError`: `-32601`, *Method not found*. A server that does support subscriptions is built on the low-level `Server` (**[The low-level Server](../advanced/low-level-server.md)**).
148+
A client can also be told when a resource changes. On 2025-era connections that is `subscribe_resource(uri)` / `unsubscribe_resource(uri)` - a method pair `MCPServer` doesn't implement, so on the 2026-07-28 wire (where those verbs no longer exist) the request answers `-32601`, *Method not found*. The 2026 replacement is a `subscriptions/listen` stream, which `MCPServer` *does* serve - `server_capabilities.resources.subscribe` is `True` there, and the server side of the story is **[Subscriptions](../advanced/subscriptions.md)**.
149149

150150
## Prompts
151151

docs/tutorial/context.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,8 @@ What a server offers is not fixed at import time. Register a tool at runtime, th
104104

105105
The siblings are `send_resource_list_changed()`, `send_prompt_list_changed()`, and `send_resource_updated(uri)` for a change to one specific resource.
106106

107+
On a 2026-07-28 connection, clients receive change notifications only on a `subscriptions/listen` stream they opened — the `send_*` methods above do not reach those streams. The `Context` publish methods — `await ctx.notify_tools_changed()`, `await ctx.notify_prompts_changed()`, `await ctx.notify_resources_changed()`, and `await ctx.notify_resource_updated(uri)` — deliver to every subscribed stream at once. The whole story, including scaling out across replicas, is in **[Subscriptions](../advanced/subscriptions.md)**.
108+
107109
!!! check
108110
Before anyone runs `enable_recommendations`, the tool you are promising does not exist. Call it
109111
anyway and the result is an error the model can read:

docs/tutorial/first-steps.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ asyncio.run(main())
9797
```
9898

9999
```text
100-
{'prompts': {'list_changed': False}, 'resources': {'subscribe': False, 'list_changed': False}, 'tools': {'list_changed': False}}
100+
{'prompts': {'list_changed': True}, 'resources': {'subscribe': True, 'list_changed': True}, 'tools': {'list_changed': True}}
101101
```
102102

103103
That dictionary is the server's half of the handshake:

docs_src/subscriptions/__init__.py

Whitespace-only changes.
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
from mcp.server.mcpserver import Context, MCPServer
2+
3+
mcp = MCPServer("Notebook")
4+
5+
NOTES = {"todo": "buy milk", "journal": "day one"}
6+
7+
8+
@mcp.resource("note://{name}")
9+
def note(name: str) -> str:
10+
return NOTES[name]
11+
12+
13+
@mcp.tool()
14+
async def edit_note(name: str, text: str, ctx: Context) -> str:
15+
NOTES[name] = text
16+
await ctx.notify_resource_updated(f"note://{name}")
17+
return "saved"
18+
19+
20+
def search(query: str) -> list[str]:
21+
return [name for name, text in NOTES.items() if query in text]
22+
23+
24+
@mcp.tool()
25+
async def enable_search(ctx: Context) -> str:
26+
mcp.add_tool(search)
27+
await ctx.notify_tools_changed()
28+
return "search is live"
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
from typing import Any
2+
3+
import mcp_types as types
4+
5+
from mcp.server.context import ServerRequestContext
6+
from mcp.server.lowlevel import Server
7+
from mcp.server.subscriptions import InMemorySubscriptionBus, ListenHandler, ResourceUpdated
8+
9+
bus = InMemorySubscriptionBus()
10+
11+
NOTES = {"todo": "buy milk"}
12+
13+
EDIT_NOTE_SCHEMA: dict[str, Any] = {
14+
"type": "object",
15+
"properties": {"name": {"type": "string"}, "text": {"type": "string"}},
16+
"required": ["name", "text"],
17+
}
18+
19+
20+
async def list_tools(
21+
ctx: ServerRequestContext[Any], params: types.PaginatedRequestParams | None
22+
) -> types.ListToolsResult:
23+
return types.ListToolsResult(
24+
tools=[types.Tool(name="edit_note", description="Replace a note's text.", input_schema=EDIT_NOTE_SCHEMA)]
25+
)
26+
27+
28+
async def call_tool(ctx: ServerRequestContext[Any], params: types.CallToolRequestParams) -> types.CallToolResult:
29+
args = params.arguments or {}
30+
NOTES[args["name"]] = args["text"]
31+
await bus.publish(ResourceUpdated(uri=f"note://{args['name']}"))
32+
return types.CallToolResult(content=[types.TextContent(type="text", text="saved")])
33+
34+
35+
server = Server(
36+
"notebook",
37+
on_list_tools=list_tools,
38+
on_call_tool=call_tool,
39+
on_subscriptions_listen=ListenHandler(bus),
40+
)

0 commit comments

Comments
 (0)