From 35b46701bb4f716a5488efa427a74b62bdc60eef Mon Sep 17 00:00:00 2001 From: Koichi ITO Date: Tue, 21 Apr 2026 01:15:42 +0900 Subject: [PATCH] Support `notifications/cancelled` per MCP specification ## Motivation and Context The MCP specification defines `notifications/cancelled` so either party can stop a previously-issued in-flight request. The Ruby SDK declared the method constant but had no receive logic, no in-flight tracking, and no way for handlers to participate in cancellation; cancellation notifications fell through to the unsupported-method path. This PR implements the **server-side** half of the spec. The server stops processing the targeted request cooperatively and suppresses its JSON-RPC response, matching the Python SDK's anyio `CancelScope` and the TypeScript SDK's `RequestHandlerExtra.signal`. Cancellation is observable by every user-overridable request handler: - `Tool.call` (tools/call) - prompt templates (prompts/get) - blocks registered via `resources_read_handler` / `completion_handler` / `resources_subscribe_handler` / `resources_unsubscribe_handler` / `define_custom_method` Each handler opts in by declaring a `server_context:` keyword and polls `server_context.cancelled?` or calls `server_context.raise_if_cancelled!` (raising `MCP::CancelledError`) inside long-running work. Handlers that keep their existing `|params|` (or `|args|`) signature continue to work unchanged. On `StreamableHTTPTransport`, cancelling a parent `tools/call` automatically cancels n ested server-to-client requests (`sampling/createMessage`, `elicitation/create`); the nested `send_request` raises `MCP::CancelledError` and a cancel notification is routed to the peer on the parent's POST response stream. `StdioTransport` is single-threaded and blocks on `$stdin.gets`, so it deliberately does not propagate nested cancellation. Tools running on stdio still observe cancellation between calls via `server_context.cancelled?`. The design is cooperative-only (no `Thread#raise`) because preemptive cancellation is unsafe across Rack adapters and arbitrary handler code. The `initialize` request is never cancellable, satisfying the spec rule that it MUST NOT be cancelled. Unknown / completed / duplicate cancel notifications are silently ignored per the spec. `MCP::Client#cancel` (an equivalent that aborts the calling thread's synchronous wait) is deferred to a follow-up PR. Ref: https://modelcontextprotocol.io/specification/2025-11-25/basic/utilities/cancellation ## How Has This Been Tested? `test/mcp/cancellation_test.rb` covers the `MCP::Cancellation` token contract. `test/mcp/server_cancellation_test.rb` covers end-to-end cancellation: - A handler spins on `cancelled?` in a background thread; the test sends `notifications/cancelled` and asserts the handler observed cancellation and the JSON-RPC response was suppressed. Each user-overridable handler type (tool, prompt template, resources/read, completion, custom method) has its own regression test. - `initialize` is not cancellable; unknown / duplicate / late-after-completion cancels are silently ignored. - The `reason` propagates into the `cancellation_reason` instrumentation field. - Custom transports implementing only the abstract `(method, params = nil)` contract keep working; the new kwargs (`session_id:`, `related_request_id:`, `parent_cancellation:`, `server_session:`) are silently dropped when the transport's signature does not declare them. `StreamableHTTPTransport` tests cover nested cancellation, hook deregistration after normal completion (so a late parent cancel does not emit a stray `notifications/cancelled`), and the first-writer-wins race when a real response and a cancel arrive concurrently. ## Breaking Change None. `MCP::ServerContext#initialize` gains an optional `cancellation:` keyword (defaults to `nil`), and `StreamableHTTPTransport#send_request` / `#send_notification` gain optional cancellation kwargs. The abstract `Transport` base class and `StdioTransport` keep their existing `(method, params = nil)` signatures, and custom transports following those signatures continue to work. `StreamableHTTPTransport` now dispatches client-originated notifications through `ServerSession#handle_json` before returning 202; previously these were accepted without dispatch, but the existing handlers for `notifications/initialized` and `notifications/progress` were already no-ops, so no user-visible effect is expected. Adding `server_context:` to a handler block is strictly opt-in. --- README.md | 129 ++++ lib/json_rpc_handler.rb | 6 + lib/mcp.rb | 2 + lib/mcp/cancellation.rb | 72 ++ lib/mcp/cancelled_error.rb | 13 + lib/mcp/server.rb | 161 ++++- lib/mcp/server/transports/stdio_transport.rb | 7 + .../transports/streamable_http_transport.rb | 58 +- lib/mcp/server_context.rb | 13 +- lib/mcp/server_session.rb | 125 +++- test/mcp/cancellation_test.rb | 66 ++ .../streamable_http_transport_test.rb | 158 +++++ test/mcp/server_cancellation_test.rb | 627 ++++++++++++++++++ test/mcp/server_elicitation_test.rb | 4 +- test/mcp/server_sampling_test.rb | 4 +- 15 files changed, 1404 insertions(+), 41 deletions(-) create mode 100644 lib/mcp/cancellation.rb create mode 100644 lib/mcp/cancelled_error.rb create mode 100644 test/mcp/cancellation_test.rb create mode 100644 test/mcp/server_cancellation_test.rb diff --git a/README.md b/README.md index ef701139..d180dccb 100644 --- a/README.md +++ b/README.md @@ -41,6 +41,7 @@ It implements the Model Context Protocol specification, handling model context r - Supports roots (server-to-client filesystem boundary queries) - Supports sampling (server-to-client LLM completion requests) - Supports cursor-based pagination for list operations +- Supports server-side cancellation of in-flight requests (notifications/cancelled) ### Supported Methods @@ -1096,9 +1097,137 @@ Notifications follow the JSON-RPC 2.0 specification and use these method names: - `notifications/tools/list_changed` - `notifications/prompts/list_changed` - `notifications/resources/list_changed` +- `notifications/cancelled` - `notifications/progress` - `notifications/message` +### Cancellation + +The MCP Ruby SDK supports server-side handling of the +[MCP `notifications/cancelled` utility](https://modelcontextprotocol.io/specification/2025-11-25/basic/utilities/cancellation). +When a client sends `notifications/cancelled` for an in-flight request, the server stops +processing cooperatively and suppresses the JSON-RPC response for that request. + +Cancellation is cooperative: the SDK does not forcibly terminate tool code. Instead, +a `MCP::Cancellation` token is threaded through `server_context`, and long-running tools +poll it to exit early. When a tool returns after cancellation has been observed, +the server suppresses the JSON-RPC response, matching the spec. The `initialize` request +is never cancellable per the spec. + +> [!NOTE] +> Client-initiated cancellation (`Client#cancel` equivalent that would also abort +> the calling thread's wait) is not yet implemented. Sending `notifications/cancelled` +> from the client side can be done by constructing the notification payload and writing it +> directly through the transport, but the calling thread does not yet unwind automatically. +> This is tracked as a follow-up. + +#### Server-Side: Handlers that Check for Cancellation + +Any handler that opts in to `server_context:` - tools (`Tool.call`), prompt templates, +`resources_read_handler`, `completion_handler`, `resources_subscribe_handler`, +`resources_unsubscribe_handler`, and `define_custom_method` blocks - receives +an `MCP::ServerContext` wired to the in-flight request's cancellation token. +Handlers check `cancelled?` in their work loop, or call `raise_if_cancelled!` to raise +`MCP::CancelledError` at a safe point: + +```ruby +class LongRunningTool < MCP::Tool + description "A tool that supports cancellation" + input_schema(properties: { count: { type: "integer" } }, required: ["count"]) + + def self.call(count:, server_context:) + count.times do |i| + # Exit early if the client has sent `notifications/cancelled`. + break if server_context.cancelled? + + do_work(i) + end + + MCP::Tool::Response.new([{ type: "text", text: "Done" }]) + end +end +``` + +Alternatively, raise at the next safe point with `raise_if_cancelled!`: + +```ruby +def self.call(count:, server_context:) + count.times do |i| + server_context.raise_if_cancelled! + + do_work(i) + end + + MCP::Tool::Response.new([{ type: "text", text: "Done" }]) +end +``` + +When a handler observes cancellation (either by returning early with `cancelled?` or +by raising `MCP::CancelledError` via `raise_if_cancelled!`), the server drops the response and +no JSON-RPC result is sent to the client. + +The same pattern works for other handler types: + +```ruby +# resources/read +server.resources_read_handler do |params, server_context:| + server_context.raise_if_cancelled! + # read the resource +end + +# completion/complete +server.completion_handler do |params, server_context:| + server_context.raise_if_cancelled! + # compute completions +end + +# custom method +server.define_custom_method(method_name: "custom/slow") do |params, server_context:| + server_context.raise_if_cancelled! + # do work +end + +# prompts (via Prompt subclass) +class SlowPrompt < MCP::Prompt + prompt_name "slow_prompt" + + def self.template(args, server_context:) + server_context.raise_if_cancelled! + MCP::Prompt::Result.new(messages: []) + end +end +``` + +Handlers that do not declare a `server_context:` keyword continue to work unchanged - +the opt-in detection only wraps the context when the block signature asks for it. + +#### Nested Server-to-Client Requests Are Cancelled Automatically + +When a tool handler is waiting on a nested server-to-client request +(`server_context.create_sampling_message`, `create_form_elicitation`, or +`create_url_elicitation`), cancelling the parent tool call automatically raises +`MCP::CancelledError` from the nested call, so the tool does not need to wrap it +in its own `cancelled?` checks: + +```ruby +def self.call(server_context:) + result = server_context.create_sampling_message(messages: messages, max_tokens: 100) + # If the parent tools/call is cancelled while waiting above, MCP::CancelledError + # is raised here and the tool can let it propagate or clean up as needed. + MCP::Tool::Response.new([{ type: "text", text: result[:content][:text] }]) +rescue MCP::CancelledError + # Optional: run cleanup. Re-raising (or letting it propagate) is fine; the server + # will still suppress the JSON-RPC response per the MCP spec. + raise +end +``` + +Nested cancellation propagation is supported on `StreamableHTTPTransport` only. +`StdioTransport` is single-threaded and blocks on `$stdin.gets`, so a nested +`server_context.create_sampling_message` inside a tool runs to completion even if +the parent `tools/call` is cancelled. The parent tool itself still observes cancellation +via `server_context.cancelled?` between nested calls. + ### Ping The MCP Ruby SDK supports the diff --git a/lib/json_rpc_handler.rb b/lib/json_rpc_handler.rb index d564c78d..977afa39 100644 --- a/lib/json_rpc_handler.rb +++ b/lib/json_rpc_handler.rb @@ -18,6 +18,11 @@ class ErrorCode DEFAULT_ALLOWED_ID_CHARACTERS = /\A[a-zA-Z0-9_-]+\z/ + # Sentinel return value from a handler. When a handler returns this, + # `process_request` emits no JSON-RPC response for the request, + # matching the notification-style semantics (id is ignored). + NO_RESPONSE = Object.new.freeze + extend self def handle(request, id_validation_pattern: DEFAULT_ALLOWED_ID_CHARACTERS, &method_finder) @@ -103,6 +108,7 @@ def process_request(request, id_validation_pattern:, &method_finder) end result = method.call(params) + return if result.equal?(NO_RESPONSE) success_response(id: id, result: result) rescue MCP::Server::RequestHandlerError => e diff --git a/lib/mcp.rb b/lib/mcp.rb index 79082c46..32b7eedd 100644 --- a/lib/mcp.rb +++ b/lib/mcp.rb @@ -8,6 +8,8 @@ module MCP autoload :Annotations, "mcp/annotations" + autoload :Cancellation, "mcp/cancellation" + autoload :CancelledError, "mcp/cancelled_error" autoload :Client, "mcp/client" autoload :Content, "mcp/content" autoload :Icon, "mcp/icon" diff --git a/lib/mcp/cancellation.rb b/lib/mcp/cancellation.rb new file mode 100644 index 00000000..b702ae26 --- /dev/null +++ b/lib/mcp/cancellation.rb @@ -0,0 +1,72 @@ +# frozen_string_literal: true + +require_relative "cancelled_error" + +module MCP + class Cancellation + attr_reader :reason, :request_id + + def initialize(request_id: nil) + @request_id = request_id + @reason = nil + @cancelled = false + @callbacks = [] + @mutex = Mutex.new + end + + def cancelled? + @mutex.synchronize { @cancelled } + end + + def cancel(reason: nil) + callbacks = @mutex.synchronize do + return false if @cancelled + + @cancelled = true + @reason = reason + @callbacks.tap { @callbacks = [] } + end + + callbacks.each do |callback| + callback.call(reason) + rescue StandardError => e + MCP.configuration.exception_reporter.call(e, { error: "Cancellation callback failed" }) + end + + true + end + + # Registers a callback invoked synchronously on the first `cancel` call. + # If already cancelled, fires immediately. + # + # Returns the block itself as a handle that can be passed to `off_cancel` + # to deregister it (e.g. when a nested request completes normally and the + # hook should not fire on a later parent cancellation). + def on_cancel(&block) + fire_now = false + @mutex.synchronize do + if @cancelled + fire_now = true + else + @callbacks << block + end + end + + block.call(@reason) if fire_now + block + end + + # Removes a previously-registered `on_cancel` callback. Returns `true` + # if the callback was still pending (i.e. had not yet fired), `false` + # otherwise. Safe to call with `nil`. + def off_cancel(handle) + return false unless handle + + @mutex.synchronize { !@callbacks.delete(handle).nil? } + end + + def raise_if_cancelled! + raise CancelledError.new(request_id: @request_id, reason: @reason) if cancelled? + end + end +end diff --git a/lib/mcp/cancelled_error.rb b/lib/mcp/cancelled_error.rb new file mode 100644 index 00000000..767e6f92 --- /dev/null +++ b/lib/mcp/cancelled_error.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +module MCP + class CancelledError < StandardError + attr_reader :request_id, :reason + + def initialize(message = "Request was cancelled", request_id: nil, reason: nil) + super(message) + @request_id = request_id + @reason = reason + end + end +end diff --git a/lib/mcp/server.rb b/lib/mcp/server.rb index b687fc5d..b28334af 100644 --- a/lib/mcp/server.rb +++ b/lib/mcp/server.rb @@ -1,6 +1,8 @@ # frozen_string_literal: true require_relative "../json_rpc_handler" +require_relative "cancellation" +require_relative "cancelled_error" require_relative "instrumentation" require_relative "methods" require_relative "logging_message_notification" @@ -384,6 +386,13 @@ def schema_contains_ref?(schema) end def handle_request(request, method, session: nil, related_request_id: nil) + # `notifications/cancelled` is dispatched directly: it is a notification (no JSON-RPC id) + # and intentionally bypasses the `@handlers` lookup, capability check, in-flight registry, + # and rescue blocks below. + if method == Methods::NOTIFICATIONS_CANCELLED + return ->(params) { handle_cancelled_notification(params, session: session) } + end + handler = @handlers[method] unless handler instrument_call("unsupported_method", server_context: { request: request }) do @@ -395,6 +404,12 @@ def handle_request(request, method, session: nil, related_request_id: nil) Methods.ensure_capability!(method, capabilities) + # `initialize` MUST NOT be cancelled (MCP spec 2025-11-25, cancellation item 2), + # so do not track it in the in-flight registry. + cancellation = if related_request_id && method != Methods::INITIALIZE + session&.register_in_flight(related_request_id) + end + ->(params) { reported_exception = nil instrument_call( @@ -406,23 +421,33 @@ def handle_request(request, method, session: nil, related_request_id: nil) when Methods::INITIALIZE init(params, session: session) when Methods::RESOURCES_READ - { contents: @handlers[Methods::RESOURCES_READ].call(params) } + { contents: read_resource_contents(params, session: session, related_request_id: related_request_id, cancellation: cancellation) } when Methods::RESOURCES_SUBSCRIBE, Methods::RESOURCES_UNSUBSCRIBE - @handlers[method].call(params) + dispatch_optional_context_handler(@handlers[method], params, session: session, related_request_id: related_request_id, cancellation: cancellation) {} when Methods::TOOLS_CALL - call_tool(params, session: session, related_request_id: related_request_id) + call_tool(params, session: session, related_request_id: related_request_id, cancellation: cancellation) + when Methods::PROMPTS_GET + get_prompt(params, session: session, related_request_id: related_request_id, cancellation: cancellation) when Methods::COMPLETION_COMPLETE - complete(params) + complete(params, session: session, related_request_id: related_request_id, cancellation: cancellation) when Methods::LOGGING_SET_LEVEL configure_logging_level(params, session: session) else - @handlers[method].call(params) + dispatch_optional_context_handler(@handlers[method], params, session: session, related_request_id: related_request_id, cancellation: cancellation) end client = session&.client || @client add_instrumentation_data(client: client) if client + if cancellation&.cancelled? + add_instrumentation_data(cancelled: true, cancellation_reason: cancellation.reason) + next JsonRpcHandler::NO_RESPONSE + end + result + rescue CancelledError => e + add_instrumentation_data(cancelled: true, cancellation_reason: e.reason) + next JsonRpcHandler::NO_RESPONSE rescue RequestHandlerError => e report_exception(e.original_error || e, { request: request }) add_instrumentation_data(error: e.error_type) @@ -434,10 +459,23 @@ def handle_request(request, method, session: nil, related_request_id: nil) wrapped = RequestHandlerError.new("Internal error handling #{method} request", request, original_error: e) reported_exception = wrapped raise wrapped + ensure + session&.unregister_in_flight(related_request_id) if related_request_id end } end + def handle_cancelled_notification(params, session: nil) + return unless session + return unless params.is_a?(Hash) + + request_id = params[:requestId] || params["requestId"] + return if request_id.nil? + + reason = params[:reason] || params["reason"] + session.cancel_incoming(request_id: request_id, reason: reason) + end + def default_capabilities { tools: { listChanged: true }, @@ -516,7 +554,7 @@ def list_tools(request) { tools: page[:items], nextCursor: page[:next_cursor] }.compact end - def call_tool(request, session: nil, related_request_id: nil) + def call_tool(request, session: nil, related_request_id: nil, cancellation: nil) tool_name = request[:name] tool = tools[tool_name] @@ -548,8 +586,12 @@ def call_tool(request, session: nil, related_request_id: nil) progress_token = request.dig(:_meta, :progressToken) - call_tool_with_args(tool, arguments, server_context_with_meta(request), progress_token: progress_token, session: session, related_request_id: related_request_id) - rescue RequestHandlerError + call_tool_with_args( + tool, arguments, server_context_with_meta(request), progress_token: progress_token, session: session, related_request_id: related_request_id, cancellation: cancellation + ) + rescue RequestHandlerError, CancelledError + # CancelledError is intentionally not wrapped so `handle_request` can turn it into + # `JsonRpcHandler::NO_RESPONSE` per the MCP cancellation spec. raise rescue => e raise RequestHandlerError.new( @@ -566,7 +608,7 @@ def list_prompts(request) { prompts: page[:items], nextCursor: page[:next_cursor] }.compact end - def get_prompt(request) + def get_prompt(request, session: nil, related_request_id: nil, cancellation: nil) prompt_name = request[:name] prompt = @prompts[prompt_name] unless prompt @@ -579,7 +621,14 @@ def get_prompt(request) prompt_args = request[:arguments] prompt.validate_arguments!(prompt_args) - call_prompt_template_with_args(prompt, prompt_args, server_context_with_meta(request)) + server_context = build_server_context( + request: request, + session: session, + related_request_id: related_request_id, + cancellation: cancellation, + ) + + call_prompt_template_with_args(prompt, prompt_args, server_context) end def list_resources(request) @@ -600,14 +649,82 @@ def list_resource_templates(request) { resourceTemplates: page[:items], nextCursor: page[:next_cursor] }.compact end - def complete(params) + def complete(params, session: nil, related_request_id: nil, cancellation: nil) validate_completion_params!(params) - result = @handlers[Methods::COMPLETION_COMPLETE].call(params) + result = dispatch_optional_context_handler( + @handlers[Methods::COMPLETION_COMPLETE], + params, + session: session, + related_request_id: related_request_id, + cancellation: cancellation, + ) normalize_completion_result(result) end + # Invokes `resources/read` via the registered handler. If the handler block opts in to `server_context:`, + # pass an `MCP::ServerContext` so the handler can observe cancellation via `server_context.cancelled?` or + # `server_context.raise_if_cancelled!`. + def read_resource_contents(request, session: nil, related_request_id: nil, cancellation: nil) + dispatch_optional_context_handler( + @handlers[Methods::RESOURCES_READ], + request, + session: session, + related_request_id: related_request_id, + cancellation: cancellation, + ) + end + + # Opt-in `server_context:` dispatch for block-based handlers registered via `resources_read_handler`, + # `completion_handler`, `resources_subscribe_handler`, `resources_unsubscribe_handler`, or `define_custom_method`. + # Existing handlers that only accept `params` are called unchanged; handlers that declare a `server_context:` + # keyword receive an `MCP::ServerContext` wrapping the raw server context with cancellation plumbing. + def dispatch_optional_context_handler(handler, params, session: nil, related_request_id: nil, cancellation: nil) + return handler.call(params) unless handler_declares_server_context?(handler) + + server_context = build_server_context( + request: params, + session: session, + related_request_id: related_request_id, + cancellation: cancellation, + ) + handler.call(params, server_context: server_context) + end + + # Stricter than `accepts_server_context?`: requires `server_context` to appear as a named keyword parameter + # (`:key` optional, `:keyreq` required). Positional parameters named `server_context` (`:req` / `:opt`) are NOT + # treated as opt-in - otherwise `handler.call(params, server_context: ctx)` would pass the `{server_context: ctx}` + # Hash as the handler's second positional argument, which is never what the user meant. + # + # `**kwargs`-only signatures (`:keyrest` without a named `server_context`) are also not opt-in here, + # because the dispatch site passes a positional `params`, and a `**kwargs`-only block cannot accept + # that positional argument (lambdas/methods raise `ArgumentError`; non-lambda procs silently drop `params`). + # Tool handlers intentionally allow `**kwargs` opt-in via `accepts_server_context?` because they are invoked + # via `tool.call(**args, server_context: …)` without a positional argument. + def handler_declares_server_context?(handler) + return false unless handler.respond_to?(:parameters) + + handler.parameters.any? do |type, name| + name == :server_context && (type == :key || type == :keyreq) + end + end + + # Builds an `MCP::ServerContext` used to give a handler access to session-scoped helpers + # (progress, cancellation, nested server-to-client requests). + def build_server_context(request:, session:, related_request_id:, cancellation:) + meta_source = request.is_a?(Hash) ? request : {} + progress_token = meta_source.dig(:_meta, :progressToken) + progress = Progress.new(notification_target: session, progress_token: progress_token, related_request_id: related_request_id) + ServerContext.new( + server_context_with_meta(meta_source), + progress: progress, + notification_target: session, + related_request_id: related_request_id, + cancellation: cancellation, + ) + end + def report_exception(exception, server_context = {}) configuration.exception_reporter.call(exception, server_context) end @@ -628,18 +745,32 @@ def error_tool_response(text) ).to_h end + # Whether a tool/prompt handler opts in to receiving an `MCP::ServerContext`. + # Recognizes `:keyrest` (`**kwargs`) because tools are invoked without a positional argument + # (`tool.call(**args, server_context:)`), soa `**kwargs`-only signature safely captures `server_context:`. + # Named keyword `server_context` must be `:key` or `:keyreq` - positional parameters (`:req` / `:opt`) that + # happen to be named `server_context` are excluded because the call site passes `server_context:` as a keyword, + # and a positional slot would receive the `{server_context: ctx}` Hash instead. def accepts_server_context?(method_object) parameters = method_object.parameters - parameters.any? { |type, name| type == :keyrest || name == :server_context } + parameters.any? do |type, name| + type == :keyrest || (name == :server_context && (type == :key || type == :keyreq)) + end end - def call_tool_with_args(tool, arguments, context, progress_token: nil, session: nil, related_request_id: nil) + def call_tool_with_args(tool, arguments, context, progress_token: nil, session: nil, related_request_id: nil, cancellation: nil) args = arguments&.transform_keys(&:to_sym) || {} if accepts_server_context?(tool.method(:call)) progress = Progress.new(notification_target: session, progress_token: progress_token, related_request_id: related_request_id) - server_context = ServerContext.new(context, progress: progress, notification_target: session, related_request_id: related_request_id) + server_context = ServerContext.new( + context, + progress: progress, + notification_target: session, + related_request_id: related_request_id, + cancellation: cancellation, + ) tool.call(**args, server_context: server_context).to_h else tool.call(**args).to_h diff --git a/lib/mcp/server/transports/stdio_transport.rb b/lib/mcp/server/transports/stdio_transport.rb index 3d6a4ff1..c96f5269 100644 --- a/lib/mcp/server/transports/stdio_transport.rb +++ b/lib/mcp/server/transports/stdio_transport.rb @@ -54,6 +54,13 @@ def send_notification(method, params = nil) false end + # NOTE: This signature deliberately matches the abstract `Transport#send_request` contract + # (`method, params = nil`) without the cancellation kwargs that `StreamableHTTPTransport#send_request` accepts. + # On Ruby 2.7 the project's supported minimum a method that mixes a positional `params` Hash with + # explicit keyword arguments cannot be called as `send_request(method, { ... })` - the trailing Hash would be + # auto-promoted to keyword arguments. Stdio is single-threaded and blocks on `$stdin.gets`, so nested-request + # cancellation has very limited value here regardless; servers that need cancellation propagation for nested + # server-to-client requests should use `StreamableHTTPTransport`. def send_request(method, params = nil) request_id = generate_request_id request = { jsonrpc: "2.0", id: request_id, method: method } diff --git a/lib/mcp/server/transports/streamable_http_transport.rb b/lib/mcp/server/transports/streamable_http_transport.rb index 284e7584..f7f7e245 100644 --- a/lib/mcp/server/transports/streamable_http_transport.rb +++ b/lib/mcp/server/transports/streamable_http_transport.rb @@ -175,7 +175,7 @@ def send_notification(method, params = nil, session_id: nil, related_request_id: # sends the request via SSE stream, then blocks on `queue.pop`. # When the client POSTs a response, `handle_response` matches it by `request_id` # and pushes the result onto the queue, unblocking this thread. - def send_request(method, params = nil, session_id: nil, related_request_id: nil) + def send_request(method, params = nil, session_id: nil, related_request_id: nil, parent_cancellation: nil, server_session: nil) if @stateless raise "Stateless mode does not support server-to-client requests." end @@ -190,6 +190,7 @@ def send_request(method, params = nil, session_id: nil, related_request_id: nil) request_id = generate_request_id queue = Queue.new + cancel_hook = nil request = { jsonrpc: "2.0", id: request_id, method: method } request[:params] = params if params @@ -229,6 +230,16 @@ def send_request(method, params = nil, session_id: nil, related_request_id: nil) raise "No active stream for #{method} request." end + if parent_cancellation && server_session + cancel_hook = parent_cancellation.on_cancel do |reason| + server_session.send_peer_cancellation( + nested_request_id: request_id, + related_request_id: related_request_id, + reason: reason, + ) + end + end + response = queue.pop if response.is_a?(Hash) && response.key?(:error) @@ -239,8 +250,18 @@ def send_request(method, params = nil, session_id: nil, related_request_id: nil) raise "SSE session closed while waiting for #{method} response." end + if response == :cancelled + reason = @mutex.synchronize { @pending_responses.dig(request_id, :cancel_reason) } + raise MCP::CancelledError.new( + "#{method} request was cancelled", + request_id: request_id, + reason: reason, + ) + end + response ensure + parent_cancellation.off_cancel(cancel_hook) if cancel_hook if request_id @mutex.synchronize do @pending_responses.delete(request_id) @@ -248,6 +269,24 @@ def send_request(method, params = nil, session_id: nil, related_request_id: nil) end end + # Unblocks a `send_request` awaiting a response when the peer is being cancelled. + # The waiting thread will see `:cancelled` on its queue and raise `MCP::CancelledError`. + # + # Race note: this is first-writer-wins on the pending-response queue. If a real response + # has already been pushed (client responded before the cancel hook fired), that response + # wins and `:cancelled` is enqueued behind it but never read - `send_request` returns + # the real response and deletes the pending entry in its `ensure` block. Conversely, + # if `:cancelled` arrives first, any later client response is silently dropped in `handle_response` + # because the pending entry has been removed. + def cancel_pending_request(request_id, reason: nil) + @mutex.synchronize do + if (pending = @pending_responses[request_id]) + pending[:cancel_reason] = reason + pending[:queue].push(:cancelled) + end + end + end + private def start_reaper_thread @@ -309,6 +348,7 @@ def handle_post(request) return missing_session_id_response if !@stateless && !session_id if notification?(body) + dispatch_notification(body_string, session_id) handle_accepted elsif response?(body) return session_not_found_response if !@stateless && !session_exists?(session_id) @@ -459,6 +499,22 @@ def notification?(body) !body[:id] && !!body[:method] end + # Dispatches a client-originated notification (e.g. `notifications/cancelled`, + # `notifications/initialized`) through the server so it can update session state. + def dispatch_notification(body_string, session_id) + server_session = nil + if session_id && !@stateless + @mutex.synchronize do + session = @sessions[session_id] + server_session = session[:server_session] if session + end + end + + dispatch_handle_json(body_string, server_session) + rescue => e + MCP.configuration.exception_reporter.call(e, { error: "Failed to dispatch notification" }) + end + def response?(body) !!body[:id] && !body[:method] end diff --git a/lib/mcp/server_context.rb b/lib/mcp/server_context.rb index 7c7b87ba..3962869d 100644 --- a/lib/mcp/server_context.rb +++ b/lib/mcp/server_context.rb @@ -2,11 +2,22 @@ module MCP class ServerContext - def initialize(context, progress:, notification_target:, related_request_id: nil) + attr_reader :cancellation + + def initialize(context, progress:, notification_target:, related_request_id: nil, cancellation: nil) @context = context @progress = progress @notification_target = notification_target @related_request_id = related_request_id + @cancellation = cancellation + end + + def cancelled? + !!@cancellation&.cancelled? + end + + def raise_if_cancelled! + @cancellation&.raise_if_cancelled! end # Reports progress for the current tool operation. diff --git a/lib/mcp/server_session.rb b/lib/mcp/server_session.rb index de5c2d05..7e99f28f 100644 --- a/lib/mcp/server_session.rb +++ b/lib/mcp/server_session.rb @@ -1,5 +1,6 @@ # frozen_string_literal: true +require_relative "cancellation" require_relative "methods" module MCP @@ -15,6 +16,48 @@ def initialize(server:, transport:, session_id: nil) @client = nil @client_capabilities = nil @logging_message_notification = nil + @in_flight = {} + @in_flight_mutex = Mutex.new + end + + # Registers a `Cancellation` token for an in-flight request. + def register_in_flight(request_id) + return if request_id.nil? + + cancellation = Cancellation.new(request_id: request_id) + @in_flight_mutex.synchronize { @in_flight[request_id] = cancellation } + cancellation + end + + def unregister_in_flight(request_id) + return if request_id.nil? + + @in_flight_mutex.synchronize { @in_flight.delete(request_id) } + end + + def lookup_in_flight(request_id) + @in_flight_mutex.synchronize { @in_flight[request_id] } + end + + # Flips the `Cancellation` for a matching in-flight request received from the peer. + # Silently ignores unknown IDs per MCP spec (cancellation utilities, item 5). + def cancel_incoming(request_id:, reason: nil) + cancellation = lookup_in_flight(request_id) + cancellation&.cancel(reason: reason) + end + + # Sends `notifications/cancelled` to the peer for a previously-issued request. + # Also unblocks any transport-level `send_request` waiting on a response for `request_id`. + def cancel_request(request_id:, reason: nil) + params = { requestId: request_id } + params[:reason] = reason if reason + send_to_transport(Methods::NOTIFICATIONS_CANCELLED, params) + + if @transport.respond_to?(:cancel_pending_request) + @transport.cancel_pending_request(request_id, reason: reason) + end + rescue => e + MCP.configuration.exception_reporter.call(e, { notification: "cancelled", request_id: request_id }) end def handle(request) @@ -78,6 +121,23 @@ def create_url_elicitation(message:, url:, elicitation_id:, related_request_id: send_to_transport_request(Methods::ELICITATION_CREATE, params, related_request_id: related_request_id) end + # Sends `notifications/cancelled` to the peer for a nested server-to-client request + # that was started inside a now-cancelled parent request. `related_request_id` + # is the parent request id so the notification is routed to the same stream + # (e.g. the parent's POST response stream on `StreamableHTTPTransport`) rather than + # the GET SSE stream. + def send_peer_cancellation(nested_request_id:, related_request_id: nil, reason: nil) + params = { requestId: nested_request_id } + params[:reason] = reason if reason + send_to_transport(Methods::NOTIFICATIONS_CANCELLED, params, related_request_id: related_request_id) + + if @transport.respond_to?(:cancel_pending_request) + @transport.cancel_pending_request(nested_request_id, reason: reason) + end + rescue => e + MCP.configuration.exception_reporter.call(e, { notification: "cancelled", request_id: nested_request_id }) + end + # Sends an elicitation complete notification scoped to this session. def notify_elicitation_complete(elicitation_id:) send_to_transport(Methods::NOTIFICATIONS_ELICITATION_COMPLETE, { elicitationId: elicitation_id }) @@ -121,32 +181,57 @@ def notify_log_message(data:, level:, logger: nil, related_request_id: nil) private - # Branches on `@session_id` because `StdioTransport` creates a `ServerSession` without - # a `session_id` (`session_id: nil`), while `StreamableHTTPTransport` always provides one. - # - # TODO: When Ruby 2.7 support is dropped, replace with a direct call: - # `@transport.send_notification(method, params, session_id: @session_id)` and - # add `**` to `Transport#send_notification` and `StdioTransport#send_notification`. + # Forwards `send_notification` to the transport with only the kwargs the transport's method signature + # actually accepts. Custom transports that implement the abstract `send_notification(method, params = nil)` + # contract continue to work unchanged; bundled transports that declare `session_id:` / `related_request_id:` + # receive the session-scoped routing information. def send_to_transport(method, params, related_request_id: nil) - if @session_id - @transport.send_notification(method, params, session_id: @session_id, related_request_id: related_request_id) - else - @transport.send_notification(method, params) - end + kwargs = { + session_id: @session_id, + related_request_id: related_request_id, + }.compact + + forward_to_transport(@transport.method(:send_notification), method, params, kwargs) end - # Branches on `@session_id` because `StdioTransport` creates a `ServerSession` without - # a `session_id` (`session_id: nil`), while `StreamableHTTPTransport` always provides one. - # - # TODO: When Ruby 2.7 support is dropped, replace with a direct call: - # `@transport.send_request(method, params, session_id: @session_id)` and - # add `**` to `Transport#send_request` and `StdioTransport#send_request`. + # Forwards `send_request` to the transport with only the kwargs the transport's method signature + # actually accepts. Custom transports that implement the abstract `send_request(method, params = nil)` + # contract continue to work; bundled transports that declare `session_id:` / `related_request_id:` / + # `parent_cancellation:` / `server_session:` receive the nested-cancellation plumbing. + # When `related_request_id` names an in-flight request, its `Cancellation` token is looked up + # so that cancelling the parent also cancels this nested server-to-client request. def send_to_transport_request(method, params, related_request_id: nil) - if @session_id - @transport.send_request(method, params, session_id: @session_id, related_request_id: related_request_id) + parent_cancellation = related_request_id ? lookup_in_flight(related_request_id) : nil + + kwargs = { + session_id: @session_id, + related_request_id: related_request_id, + parent_cancellation: parent_cancellation, + server_session: self, + }.compact + + forward_to_transport(@transport.method(:send_request), method, params, kwargs) + end + + # Calls `transport_method(method, params, **supported)` where `supported` contains only the keys + # the transport's method signature accepts. This keeps bundled transports (which declare the new kwargs) + # working while preserving compatibility with custom transports that implement only the abstract + # `(method, params = nil)` contract. + def forward_to_transport(transport_method, method, params, kwargs) + parameters = transport_method.parameters + accepts_keyrest = parameters.any? { |type, _| type == :keyrest } + supported = if accepts_keyrest + kwargs else - @transport.send_request(method, params) + allowed = parameters.filter_map { |type, name| name if type == :key || type == :keyreq } + kwargs.slice(*allowed) end + + # Always splat `**supported` even when empty: on Ruby 2.7 the bare `transport_method.call(method, params)` + # form would let the trailing `params` Hash be auto-promoted to keyword arguments when the receiver + # accepts `**kwargs`, breaking handlers that rely on `params` arriving as a positional Hash. + # The explicit splat suppresses that conversion and is a no-op when `supported` is empty. + transport_method.call(method, params, **supported) end end end diff --git a/test/mcp/cancellation_test.rb b/test/mcp/cancellation_test.rb new file mode 100644 index 00000000..41449eb1 --- /dev/null +++ b/test/mcp/cancellation_test.rb @@ -0,0 +1,66 @@ +# frozen_string_literal: true + +require "test_helper" + +module MCP + class CancellationTest < ActiveSupport::TestCase + test "starts not cancelled" do + cancellation = Cancellation.new + + refute_predicate cancellation, :cancelled? + assert_nil cancellation.reason + end + + test "#cancel flips state and stores reason" do + cancellation = Cancellation.new + + assert cancellation.cancel(reason: "user requested") + assert_predicate cancellation, :cancelled? + assert_equal "user requested", cancellation.reason + end + + test "#cancel is idempotent" do + cancellation = Cancellation.new + + assert cancellation.cancel(reason: "first") + refute cancellation.cancel(reason: "second") + assert_equal "first", cancellation.reason + end + + test "#on_cancel fires once on cancellation" do + cancellation = Cancellation.new + fired = [] + + cancellation.on_cancel { |reason| fired << reason } + cancellation.cancel(reason: "stop") + cancellation.cancel(reason: "again") + + assert_equal ["stop"], fired + end + + test "#on_cancel fires immediately when already cancelled" do + cancellation = Cancellation.new + cancellation.cancel(reason: "done") + fired = [] + + cancellation.on_cancel { |reason| fired << reason } + + assert_equal ["done"], fired + end + + test "#raise_if_cancelled! raises CancelledError when cancelled" do + cancellation = Cancellation.new(request_id: "req-1") + cancellation.cancel(reason: "abort") + + error = assert_raises(CancelledError) { cancellation.raise_if_cancelled! } + assert_equal "req-1", error.request_id + assert_equal "abort", error.reason + end + + test "#raise_if_cancelled! is a no-op when not cancelled" do + cancellation = Cancellation.new + + assert_nil cancellation.raise_if_cancelled! + end + end +end diff --git a/test/mcp/server/transports/streamable_http_transport_test.rb b/test/mcp/server/transports/streamable_http_transport_test.rb index 6c1983d5..758a3d4f 100644 --- a/test/mcp/server/transports/streamable_http_transport_test.rb +++ b/test/mcp/server/transports/streamable_http_transport_test.rb @@ -2168,6 +2168,164 @@ def string assert_equal "correct", result[:content][:text] end + test "send_request with parent_cancellation unblocks with MCP::CancelledError when cancelled" do + init_request = create_rack_request( + "POST", + "/", + { "CONTENT_TYPE" => "application/json" }, + { jsonrpc: "2.0", method: "initialize", id: "init" }.to_json, + ) + init_response = @transport.handle_request(init_request) + session_id = init_response[1]["Mcp-Session-Id"] + + io = StringIO.new + get_request = create_rack_request("GET", "/", { "HTTP_MCP_SESSION_ID" => session_id }) + response = @transport.handle_request(get_request) + response[2].call(io) if response[2].is_a?(Proc) + + sleep(0.1) + + cancellation = MCP::Cancellation.new(request_id: "parent-1") + server_session = @transport.instance_variable_get(:@sessions)[session_id][:server_session] + + result_queue = Queue.new + Thread.new do + @transport.send_request( + "sampling/createMessage", + { messages: [] }, + session_id: session_id, + parent_cancellation: cancellation, + server_session: server_session, + ) + rescue => e + result_queue.push(e) + end + + sleep(0.1) + cancellation.cancel(reason: "parent cancelled") + + error = result_queue.pop + assert_kind_of MCP::CancelledError, error + end + + test "send_request deregisters on_cancel hook after completion so late parent cancel is a no-op" do + init_request = create_rack_request( + "POST", + "/", + { "CONTENT_TYPE" => "application/json" }, + { jsonrpc: "2.0", method: "initialize", id: "init" }.to_json, + ) + init_response = @transport.handle_request(init_request) + session_id = init_response[1]["Mcp-Session-Id"] + + io = StringIO.new + get_request = create_rack_request("GET", "/", { "HTTP_MCP_SESSION_ID" => session_id }) + response = @transport.handle_request(get_request) + response[2].call(io) if response[2].is_a?(Proc) + + sleep(0.1) + + cancellation = MCP::Cancellation.new(request_id: "parent-x") + server_session = @transport.instance_variable_get(:@sessions)[session_id][:server_session] + + result_queue = Queue.new + Thread.new do + result = @transport.send_request( + "sampling/createMessage", + { messages: [] }, + session_id: session_id, + parent_cancellation: cancellation, + server_session: server_session, + ) + result_queue.push(result) + end + + sleep(0.1) + + io.rewind + data_lines = io.read.lines.select { |line| line.start_with?("data: ") } + request_data = JSON.parse(data_lines.first.sub("data: ", "")) + nested_request_id = request_data["id"] + + # Client responds successfully; send_request completes normally. + client_response = create_rack_request( + "POST", + "/", + { "CONTENT_TYPE" => "application/json", "HTTP_MCP_SESSION_ID" => session_id }, + { jsonrpc: "2.0", id: nested_request_id, result: { role: "assistant", content: { type: "text", text: "ok" } } }.to_json, + ) + @transport.handle_request(client_response) + + result = result_queue.pop + assert_equal "ok", result[:content][:text] + + # Snapshot what was written to the SSE stream before late cancel. + io.rewind + before_cancel = io.read + + # Parent is cancelled after the nested request already completed. The hook must + # have been deregistered, so no `notifications/cancelled` should go to the peer + # for the already-completed nested request. + cancellation.cancel(reason: "late") + + sleep(0.05) # Give any stray callback time to fire (none should). + + io.rewind + after_cancel = io.read + + refute_includes after_cancel.sub(before_cancel, ""), "notifications/cancelled" + end + + test "cancel_pending_request is no-op when a real response is already queued (race)" do + init_request = create_rack_request( + "POST", + "/", + { "CONTENT_TYPE" => "application/json" }, + { jsonrpc: "2.0", method: "initialize", id: "init" }.to_json, + ) + init_response = @transport.handle_request(init_request) + session_id = init_response[1]["Mcp-Session-Id"] + + io = StringIO.new + get_request = create_rack_request("GET", "/", { "HTTP_MCP_SESSION_ID" => session_id }) + response = @transport.handle_request(get_request) + response[2].call(io) if response[2].is_a?(Proc) + + sleep(0.1) + + result_queue = Queue.new + Thread.new do + result = @transport.send_request( + "sampling/createMessage", + { messages: [] }, + session_id: session_id, + ) + result_queue.push(result) + end + + sleep(0.1) + + io.rewind + data_lines = io.read.lines.select { |line| line.start_with?("data: ") } + request_data = JSON.parse(data_lines.first.sub("data: ", "")) + request_id = request_data["id"] + + # Client responded first. + client_response = create_rack_request( + "POST", + "/", + { "CONTENT_TYPE" => "application/json", "HTTP_MCP_SESSION_ID" => session_id }, + { jsonrpc: "2.0", id: request_id, result: { role: "assistant", content: { type: "text", text: "ok" } } }.to_json, + ) + @transport.handle_request(client_response) + + # Cancel arrives after the response was already enqueued. Must not clobber the result. + @transport.cancel_pending_request(request_id, reason: "late cancel") + + result = result_queue.pop + assert_equal "ok", result[:content][:text] + end + test "send_request raises on error response from client" do # Create session. init_request = create_rack_request( diff --git a/test/mcp/server_cancellation_test.rb b/test/mcp/server_cancellation_test.rb new file mode 100644 index 00000000..b0615081 --- /dev/null +++ b/test/mcp/server_cancellation_test.rb @@ -0,0 +1,627 @@ +# frozen_string_literal: true + +require "test_helper" + +module MCP + class ServerCancellationTest < ActiveSupport::TestCase + include InstrumentationTestHelper + + class MockTransport < Transport + attr_reader :requests, :notifications, :cancelled_request_ids + + def initialize(server) + super + @requests = [] + @notifications = [] + @cancelled_request_ids = [] + end + + def send_request(method, params = nil, **_kwargs) + @requests << { method: method, params: params } + {} + end + + def send_notification(method, params = nil, **_kwargs) + @notifications << { method: method, params: params } + true + end + + def cancel_pending_request(request_id, reason: nil) + @cancelled_request_ids << request_id + end + + def send_response(response); end + def open; end + def close; end + def handle_request(request); end + end + + setup do + configuration = MCP::Configuration.new + configuration.instrumentation_callback = instrumentation_helper.callback + + @server = Server.new(name: "test_server", version: "1.0.0", configuration: configuration) + @mock_transport = MockTransport.new(@server) + @server.transport = @mock_transport + @session = ServerSession.new(server: @server, transport: @mock_transport, session_id: "sess-1") + end + + test "cooperative cancellation via server_context suppresses response" do + observed_cancelled = false + + @server.define_tool(name: "slow") do |server_context:| + 20.times do + break if server_context.cancelled? + + sleep(0.01) + end + observed_cancelled = server_context.cancelled? + Tool::Response.new([{ type: "text", text: "ok" }]) + end + + request_id = 42 + + call_thread = Thread.new do + @session.handle( + jsonrpc: "2.0", + id: request_id, + method: Methods::TOOLS_CALL, + params: { name: "slow", arguments: {} }, + ) + end + + sleep(0.02) until @session.lookup_in_flight(request_id) + + @session.handle( + jsonrpc: "2.0", + method: Methods::NOTIFICATIONS_CANCELLED, + params: { requestId: request_id, reason: "user aborted" }, + ) + + response = call_thread.value + + assert observed_cancelled, "tool handler should observe cancellation" + assert_nil response, "cancelled request must not emit a JSON-RPC response" + end + + test "tool raising CancelledError via raise_if_cancelled! suppresses the JSON-RPC response" do + @server.define_tool(name: "raising") do |server_context:| + 50.times do + server_context.raise_if_cancelled! + sleep(0.01) + end + Tool::Response.new([{ type: "text", text: "ok" }]) + end + + request_id = 99 + + call_thread = Thread.new do + @session.handle( + jsonrpc: "2.0", + id: request_id, + method: Methods::TOOLS_CALL, + params: { name: "raising", arguments: {} }, + ) + end + + sleep(0.02) until @session.lookup_in_flight(request_id) + + @session.handle( + jsonrpc: "2.0", + method: Methods::NOTIFICATIONS_CANCELLED, + params: { requestId: request_id, reason: "raised" }, + ) + + response = call_thread.value + + assert_nil response, "raise_if_cancelled! must result in no JSON-RPC response" + end + + test "CancelledError raised from nested server-to-client call suppresses response" do + @server.define_tool(name: "nesting") do |server_context:| # rubocop:disable Lint/UnusedBlockArgument + # Simulate a transport-level CancelledError coming back from a nested + # send_request (e.g. sampling/createMessage after parent was cancelled). + raise MCP::CancelledError.new(request_id: "nested-xyz", reason: "parent aborted") + end + + response = @session.handle( + jsonrpc: "2.0", + id: 123, + method: Methods::TOOLS_CALL, + params: { name: "nesting", arguments: {} }, + ) + + assert_nil response, "CancelledError propagating from a nested call must suppress the JSON-RPC response" + end + + test "cancellation with unknown request id is silently ignored" do + response = @session.handle( + jsonrpc: "2.0", + method: Methods::NOTIFICATIONS_CANCELLED, + params: { requestId: "does-not-exist" }, + ) + + assert_nil response + end + + test "duplicate cancellation for the same in-flight request is idempotent" do + @server.define_tool(name: "slow_dup") do |server_context:| + 20.times do + break if server_context.cancelled? + + sleep(0.01) + end + Tool::Response.new([{ type: "text", text: "ok" }]) + end + + request_id = 51 + + call_thread = Thread.new do + @session.handle( + jsonrpc: "2.0", + id: request_id, + method: Methods::TOOLS_CALL, + params: { name: "slow_dup", arguments: {} }, + ) + end + + sleep(0.02) until @session.lookup_in_flight(request_id) + + first = @session.handle( + jsonrpc: "2.0", + method: Methods::NOTIFICATIONS_CANCELLED, + params: { requestId: request_id, reason: "first" }, + ) + + second = @session.handle( + jsonrpc: "2.0", + method: Methods::NOTIFICATIONS_CANCELLED, + params: { requestId: request_id, reason: "second" }, + ) + + response = call_thread.value + + assert_nil first + assert_nil second, "duplicate cancel must not emit a response" + assert_nil response, "cancelled request must not emit a response" + end + + test "cancellation arriving after the request already completed is silently ignored" do + @server.define_tool(name: "quick") do + Tool::Response.new([{ type: "text", text: "done" }]) + end + + request_id = 61 + + response = @session.handle( + jsonrpc: "2.0", + id: request_id, + method: Methods::TOOLS_CALL, + params: { name: "quick", arguments: {} }, + ) + + assert response, "the tool should have completed successfully before the cancel arrives" + assert_nil @session.lookup_in_flight(request_id), "completed request must be unregistered" + + late = @session.handle( + jsonrpc: "2.0", + method: Methods::NOTIFICATIONS_CANCELLED, + params: { requestId: request_id, reason: "late" }, + ) + + assert_nil late, "late cancel for a completed request must be silently ignored" + end + + test "initialize request cannot be cancelled" do + init_params = { + protocolVersion: Configuration::LATEST_STABLE_PROTOCOL_VERSION, + clientInfo: { name: "test", version: "1.0" }, + capabilities: {}, + } + + response = @session.handle( + jsonrpc: "2.0", + id: "init-1", + method: Methods::INITIALIZE, + params: init_params, + ) + + assert response, "initialize returns a response" + assert response[:result][:protocolVersion] + # The initialize request is not registered in the in-flight map. + assert_nil @session.lookup_in_flight("init-1") + end + + test "ServerSession#cancel_request sends notification and cancels pending" do + @session.cancel_request(request_id: "req-9", reason: "timeout") + + notification = @mock_transport.notifications.last + assert_equal Methods::NOTIFICATIONS_CANCELLED, notification[:method] + assert_equal "req-9", notification[:params][:requestId] + assert_equal "timeout", notification[:params][:reason] + + assert_includes @mock_transport.cancelled_request_ids, "req-9" + end + + test "parent cancellation propagates to nested server-to-client requests" do + @session.instance_variable_set(:@client_capabilities, { elicitation: {} }) + + parent_request_id = 101 + cancellation = @session.register_in_flight(parent_request_id) + + # Simulate a nested sampling/elicitation send via ServerSession. The transport + # is expected to receive `parent_cancellation:` and `server_session:` kwargs so + # it can install an `on_cancel` hook. Here we assert those kwargs are forwarded. + recorded = {} + (class << @mock_transport; self; end).define_method(:send_request) do |method, _params = nil, **kwargs| + recorded[:method] = method + recorded[:kwargs] = kwargs + {} + end + + @session.create_form_elicitation( + message: "ignored", + requested_schema: { type: "object", properties: {} }, + related_request_id: parent_request_id, + ) + + assert_equal Methods::ELICITATION_CREATE, recorded[:method] + assert_equal parent_request_id, recorded[:kwargs][:related_request_id] + assert_same cancellation, recorded[:kwargs][:parent_cancellation] + assert_same @session, recorded[:kwargs][:server_session] + end + + test "cancellation reason is recorded in instrumentation data" do + recorded_data = nil + @server.configuration.instrumentation_callback = ->(data) { recorded_data = data } + + @server.define_tool(name: "slow_with_reason") do |server_context:| + 50.times do + break if server_context.cancelled? + + sleep(0.01) + end + Tool::Response.new([{ type: "text", text: "ok" }]) + end + + request_id = 77 + + call_thread = Thread.new do + @session.handle( + jsonrpc: "2.0", + id: request_id, + method: Methods::TOOLS_CALL, + params: { name: "slow_with_reason", arguments: {} }, + ) + end + + sleep(0.02) until @session.lookup_in_flight(request_id) + + @session.handle( + jsonrpc: "2.0", + method: Methods::NOTIFICATIONS_CANCELLED, + params: { requestId: request_id, reason: "explicit reason string" }, + ) + + call_thread.value + + assert recorded_data + assert recorded_data[:cancelled] + assert_equal "explicit reason string", recorded_data[:cancellation_reason] + end + + # Helper for the non-tools cancellation regression tests below. Spawns a background + # request, waits for the in-flight entry, sends `notifications/cancelled`, and + # returns [response, thread_result_flag_yielded_by_block]. + def drive_cancellation(request_id:, method:, params:) + result_flag = { observed: false } + call_thread = Thread.new do + @session.handle(jsonrpc: "2.0", id: request_id, method: method, params: params) + end + + sleep(0.02) until @session.lookup_in_flight(request_id) + + yield(result_flag) if block_given? + + @session.handle( + jsonrpc: "2.0", + method: Methods::NOTIFICATIONS_CANCELLED, + params: { requestId: request_id, reason: "caller aborted" }, + ) + + [call_thread.value, result_flag] + end + + test "resources/read handler that opts in to server_context observes cancellation" do + observed = false + @server.resources_read_handler do |_params, server_context:| + 50.times do + break if server_context.cancelled? + + sleep(0.01) + end + observed = server_context.cancelled? + [{ uri: "test://resource", text: "done" }] + end + + response, = drive_cancellation( + request_id: 201, + method: Methods::RESOURCES_READ, + params: { uri: "test://resource" }, + ) + + assert observed, "resources/read handler should observe cancellation via server_context" + assert_nil response, "cancelled resources/read must not emit a JSON-RPC response" + end + + test "completion/complete handler that opts in to server_context observes cancellation" do + observed = false + @server.capabilities[:completions] = {} + @server.completion_handler do |_params, server_context:| + 50.times do + break if server_context.cancelled? + + sleep(0.01) + end + observed = server_context.cancelled? + { completion: { values: ["v"], hasMore: false } } + end + + # completion/complete requires a known ref; register a dummy prompt. + @server.define_prompt(name: "dummy") { MCP::Prompt::Result.new(messages: []) } + + response, = drive_cancellation( + request_id: 202, + method: Methods::COMPLETION_COMPLETE, + params: { ref: { type: "ref/prompt", name: "dummy" }, argument: { name: "arg", value: "" } }, + ) + + assert observed, "completion/complete handler should observe cancellation via server_context" + assert_nil response, "cancelled completion/complete must not emit a JSON-RPC response" + end + + test "prompts/get template that opts in to server_context observes cancellation" do + observed = false + prompt_class = Class.new(MCP::Prompt) do + prompt_name "slow_prompt" + + define_singleton_method(:template) do |_args, server_context:| + 50.times do + break if server_context.cancelled? + + sleep(0.01) + end + observed = server_context.cancelled? + MCP::Prompt::Result.new(messages: []) + end + end + @server.prompts[prompt_class.name_value] = prompt_class + # Share `observed` between the closure above and the test scope. + @server.singleton_class.define_method(:_test_observed) { observed } + + response, = drive_cancellation( + request_id: 203, + method: Methods::PROMPTS_GET, + params: { name: "slow_prompt" }, + ) + + assert observed, "prompt template should observe cancellation via server_context" + assert_nil response, "cancelled prompts/get must not emit a JSON-RPC response" + end + + test "send_to_transport (notifications) works with a custom transport that only implements the abstract signature" do + # Regression: `ServerSession#send_to_transport` must not assume the + # transport's `send_notification` accepts the new kwargs + # (`session_id:` / `related_request_id:`). Custom transports that + # implement the abstract `Transport#send_notification(method, params = nil)` + # contract must keep working - this is exercised via `cancel_request` / + # `send_peer_cancellation` which the cancellation feature relies on. + minimal_transport = Class.new(Transport) do + attr_reader :recorded + + def initialize(server) + super + @recorded = [] + end + + def send_notification(method, params = nil) + @recorded << [method, params] + true + end + + def send_request(method, params = nil) + @recorded << [method, params] + {} + end + + def send_response(_); end + def open; end + def close; end + end.new(@server) + + session = ServerSession.new(server: @server, transport: minimal_transport) + + assert_nothing_raised do + session.cancel_request(request_id: "req-1", reason: "timeout") + end + + recorded = minimal_transport.recorded.last + assert_equal Methods::NOTIFICATIONS_CANCELLED, recorded[0] + assert_equal "req-1", recorded[1][:requestId] + assert_equal "timeout", recorded[1][:reason] + end + + test "send_to_transport_request works with a custom transport that only implements the abstract signature" do + # Regression: `ServerSession#send_to_transport_request` must not assume + # the transport's `send_request` accepts the new kwargs + # (`session_id:` / `related_request_id:` / `parent_cancellation:` / + # `server_session:`). Custom transports that implement the abstract + # `Transport#send_request(method, params = nil)` contract must keep working. + minimal_transport = Class.new(Transport) do + attr_reader :recorded + + def initialize(server) + super + @recorded = [] + end + + def send_request(method, params = nil) + @recorded << [method, params] + {} # fake response + end + + def send_notification(method, params = nil) + @recorded << [method, params] + true + end + + def send_response(_); end + def open; end + def close; end + end.new(@server) + + session = ServerSession.new(server: @server, transport: minimal_transport) + session.instance_variable_set(:@client_capabilities, { sampling: {} }) + + assert_nothing_raised do + session.create_sampling_message( + messages: [{ role: "user", content: { type: "text", text: "hi" } }], + max_tokens: 10, + ) + end + + recorded = minimal_transport.recorded.last + assert_equal Methods::SAMPLING_CREATE_MESSAGE, recorded[0] + refute_nil recorded[1] + end + + test "tool with positional `server_context` parameter is not auto-opted-in" do + # Regression: `accepts_server_context?` (used by tool/prompt dispatch) must + # require `server_context` as a *keyword* parameter. A tool whose `call` + # signature is `def self.call(arg, server_context)` (positional name collision) + # would previously have been opt-in, and `tool.call(**args, server_context: ctx)` + # would have blown up or passed the wrapped Hash to the wrong slot. + tool = Class.new(MCP::Tool) do + tool_name "positional_collision" + input_schema(properties: { a: { type: "string" } }, required: ["a"]) + + class << self + def call(a, server_context) + [a, server_context] + end + end + end + @server.tools[tool.name_value] = tool + + # Should dispatch without `server_context:` kwarg - the positional arg collision + # must NOT trigger opt-in. The tool receives only its declared positional (a). + response = @session.handle( + jsonrpc: "2.0", + id: 303, + method: Methods::TOOLS_CALL, + params: { name: "positional_collision", arguments: { a: "hello" } }, + ) + + # Because the tool signature isn't opt-in, `tool.call(**args)` is called + # (without `server_context:`). The method has `server_context` as a required + # positional, so missing it raises ArgumentError - caught as an internal error. + # The key assertion is that we do NOT pass `server_context:` kwarg that would + # flow into the positional slot as a Hash. + assert response.dig(:error), "dispatch should not silently inject server_context kwarg" + end + + test "handler with positional `server_context` parameter is not auto-opted-in" do + # Regression: `handler_declares_server_context?` must require `server_context` + # as a *keyword* parameter. A positional parameter that happens to be named + # `server_context` (rare but possible) would previously have been treated as + # opt-in, and the dispatch site would call `handler.call(params, server_context: ctx)` + # - the second positional argument would become the Hash `{server_context: ctx}`, + # which is never what the user intended. + received_args = nil + @server.define_custom_method(method_name: "custom/positional_name") do |params, server_context| + received_args = [params, server_context] + { ok: true } + end + + response = @session.handle( + jsonrpc: "2.0", + id: 302, + method: "custom/positional_name", + params: { hello: "world" }, + ) + + assert_equal [{ hello: "world" }, nil], + received_args, + "positional `server_context` must receive the single `params` argument, not an auto-wrapped context" + assert_equal({ ok: true }, response[:result]) + end + + test "handler with **kwargs-only signature is not auto-opted-in to server_context" do + # Regression: a block like `|**opts|` would have triggered opt-in under the + # looser `accepts_server_context?` check used by tools, and the dispatch site + # would then call `handler.call(params, server_context:)` - but a proc that + # only declares `**opts` cannot accept the positional `params` (lambdas raise + # ArgumentError, non-lambda procs silently drop it). The stricter + # `handler_declares_server_context?` check requires `server_context` as a + # named keyword to opt in, so a kwargs-only handler receives only `params`. + received_positional = nil + @server.define_custom_method(method_name: "custom/kwargs_only") do |params, **_opts| + received_positional = params + { ok: true } + end + + response = @session.handle( + jsonrpc: "2.0", + id: 301, + method: "custom/kwargs_only", + params: { hello: "world" }, + ) + + assert_equal({ hello: "world" }, received_positional) + assert_equal({ ok: true }, response[:result]) + end + + test "custom method handler that opts in to server_context observes cancellation" do + observed = false + @server.define_custom_method(method_name: "custom/slow") do |_params, server_context:| + 50.times do + break if server_context.cancelled? + + sleep(0.01) + end + observed = server_context.cancelled? + { ok: true } + end + + response, = drive_cancellation( + request_id: 204, + method: "custom/slow", + params: {}, + ) + + assert observed, "custom method handler should observe cancellation via server_context" + assert_nil response, "cancelled custom method must not emit a JSON-RPC response" + end + + test "send_peer_cancellation routes notification on parent stream via related_request_id" do + recorded_notifications = [] + (class << @mock_transport; self; end).define_method(:send_notification) do |method, params = nil, **kwargs| + recorded_notifications << { method: method, params: params, kwargs: kwargs } + true + end + + @session.send_peer_cancellation( + nested_request_id: "nested-1", + related_request_id: 42, + reason: "parent aborted", + ) + + notif = recorded_notifications.last + assert_equal Methods::NOTIFICATIONS_CANCELLED, notif[:method] + assert_equal "nested-1", notif[:params][:requestId] + assert_equal "parent aborted", notif[:params][:reason] + # Crucially, the cancel notification targets the parent's stream, not the GET SSE stream. + assert_equal 42, notif[:kwargs][:related_request_id] + end + end +end diff --git a/test/mcp/server_elicitation_test.rb b/test/mcp/server_elicitation_test.rb index 82dd414c..66a70f42 100644 --- a/test/mcp/server_elicitation_test.rb +++ b/test/mcp/server_elicitation_test.rb @@ -15,14 +15,14 @@ def initialize(server) @notifications = [] end - def send_request(method, params = nil) + def send_request(method, params = nil, **_kwargs) @requests << { method: method, params: params } { action: "accept" } end def send_response(response); end - def send_notification(method, params = nil) + def send_notification(method, params = nil, **_kwargs) @notifications << { method: method, params: params } end diff --git a/test/mcp/server_sampling_test.rb b/test/mcp/server_sampling_test.rb index ffb3539c..dd123564 100644 --- a/test/mcp/server_sampling_test.rb +++ b/test/mcp/server_sampling_test.rb @@ -14,7 +14,7 @@ def initialize(server) @requests = [] end - def send_request(method, params = nil) + def send_request(method, params = nil, **_kwargs) @requests << { method: method, params: params } { role: "assistant", @@ -25,7 +25,7 @@ def send_request(method, params = nil) end def send_response(response); end - def send_notification(method, params = nil); end + def send_notification(method, params = nil, **_kwargs); end def open; end def close; end end