From 220d24018c3618be7e17e75b1ec15fa3d0f5ee43 Mon Sep 17 00:00:00 2001 From: ysyneu Date: Mon, 8 Jun 2026 17:58:30 +0800 Subject: [PATCH] fix(mcp): support "streamable-http" transport for remote MCP servers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit createTransport only handled "stdio" and "sse", so a server configured with transport "streamable-http" was rejected up front with "unsupported transport type 'streamable-http'" and the agent silently proceeded without the tool. The "sse" branch was already returning the SDK's StreamableClientTransport (the modern Streamable HTTP client), not the legacy HTTP+SSE transport — so "sse" and "streamable-http" are the same wire protocol, only the label differed. Route both labels to that transport and rename the misleading NewSSETransport -> NewStreamableHTTPTransport so the code matches reality. "sse" stays as a back-compat alias; existing deployments are unaffected. Adds an end-to-end test that stands up an in-process Streamable HTTP MCP server and asserts both labels connect and list tools. --- mcp/client.go | 8 ++- mcp/transport.go | 12 ++-- mcp/transport_streamable_test.go | 106 +++++++++++++++++++++++++++++++ protocol/messages.go | 2 +- 4 files changed, 121 insertions(+), 7 deletions(-) create mode 100644 mcp/transport_streamable_test.go diff --git a/mcp/client.go b/mcp/client.go index d35b6f4..73b4b2d 100644 --- a/mcp/client.go +++ b/mcp/client.go @@ -261,8 +261,12 @@ func createTransport(server *protocol.MCPServerConfig) (sdk_mcp.Transport, error switch server.Transport { case "stdio": return NewStdioTransport(server.Command, server.Args, server.Env), nil - case "sse": - return NewSSETransport(server.URL, server.Headers, server.DynamicHeaders), nil + case "streamable-http", "sse": + // Both labels map to the SDK's Streamable HTTP client. "sse" is a legacy + // alias kept for back-compat: existing configs use it, but the runner has + // always spoken Streamable HTTP for URL-based servers, never the old + // HTTP+SSE protocol. + return NewStreamableHTTPTransport(server.URL, server.Headers, server.DynamicHeaders), nil default: return nil, fmt.Errorf("unsupported transport type '%s'", server.Transport) } diff --git a/mcp/transport.go b/mcp/transport.go index 68a1fb7..2a3632f 100644 --- a/mcp/transport.go +++ b/mcp/transport.go @@ -36,9 +36,13 @@ func isValidEnvVar(name string) bool { return !strings.Contains(name, "=") && !strings.Contains(name, "\x00") } -// NewSSETransport creates a new SSE transport for MCP. -func NewSSETransport(endpoint string, headers map[string]string, dynamicHeaders map[string]string) sdk_mcp.Transport { - slog.Info("mcp creating SSE transport", +// NewStreamableHTTPTransport creates a Streamable HTTP transport for MCP. +// +// Despite the historical name, this returns the SDK's StreamableClientTransport +// (the modern "Streamable HTTP" transport), not the legacy HTTP+SSE client. Both +// the "streamable-http" and legacy "sse" config labels route here. +func NewStreamableHTTPTransport(endpoint string, headers map[string]string, dynamicHeaders map[string]string) sdk_mcp.Transport { + slog.Info("mcp creating Streamable HTTP transport", "endpoint", endpoint, "headers_count", len(headers), "dynamic_headers_count", len(dynamicHeaders), @@ -65,7 +69,7 @@ type headerTransport struct { } // newHeaderTransport creates a headerTransport with the given static headers -// and http.DefaultTransport as the base. Used by tests and NewSSETransport. +// and http.DefaultTransport as the base. Used by tests and NewStreamableHTTPTransport. func newHeaderTransport(headers map[string]string) *headerTransport { return &headerTransport{ headers: headers, diff --git a/mcp/transport_streamable_test.go b/mcp/transport_streamable_test.go new file mode 100644 index 0000000..4c1b255 --- /dev/null +++ b/mcp/transport_streamable_test.go @@ -0,0 +1,106 @@ +package mcp + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/flashcatcloud/flashduty-runner/protocol" + + sdk_mcp "github.com/modelcontextprotocol/go-sdk/mcp" +) + +// newTestStreamableServer starts an in-process MCP server speaking the +// Streamable HTTP transport, exposing a single "ping" tool. +func newTestStreamableServer(t *testing.T) *httptest.Server { + t.Helper() + srv := sdk_mcp.NewServer(&sdk_mcp.Implementation{Name: "test-server", Version: "1.0.0"}, nil) + srv.AddTool( + &sdk_mcp.Tool{ + Name: "ping", + Description: "returns pong", + InputSchema: json.RawMessage(`{"type":"object"}`), + }, + func(_ context.Context, _ *sdk_mcp.CallToolRequest) (*sdk_mcp.CallToolResult, error) { + return &sdk_mcp.CallToolResult{ + Content: []sdk_mcp.Content{&sdk_mcp.TextContent{Text: "pong"}}, + }, nil + }, + ) + handler := sdk_mcp.NewStreamableHTTPHandler(func(*http.Request) *sdk_mcp.Server { return srv }, nil) + ts := httptest.NewServer(handler) + t.Cleanup(ts.Close) + return ts +} + +// TestListToolsConnectsOverStreamableHTTP reproduces the production failure where +// a server configured with transport "streamable-http" was rejected up front with +// "unsupported transport type 'streamable-http'". The runner has always spoken the +// Streamable HTTP wire protocol for URL-based servers; both the modern +// "streamable-http" label and the legacy "sse" alias must connect to the same +// Streamable HTTP server. +func TestListToolsConnectsOverStreamableHTTP(t *testing.T) { + ts := newTestStreamableServer(t) + + // The server is shared across subtests (it is stateless); each subtest uses + // a fresh ClientManager and a distinct server name, so sessions never collide. + for _, transport := range []string{"streamable-http", "sse"} { + t.Run(transport, func(t *testing.T) { + m := NewClientManager() + t.Cleanup(m.Close) + + tools, err := m.ListTools(context.Background(), &protocol.MCPServerConfig{ + Name: "test-" + transport, + Transport: transport, + URL: ts.URL, + }, nil) + if err != nil { + t.Fatalf("ListTools over %q transport: %v", transport, err) + } + if len(tools) != 1 || tools[0].Name != "ping" { + t.Fatalf("expected [ping], got %+v", tools) + } + }) + } +} + +// TestCreateTransport pins the transport-string dispatch: "stdio" maps to the +// command transport, both "sse" and "streamable-http" map to the SDK Streamable +// HTTP client (the same wire protocol — only the label differed, which is what +// broke production), and unknown types error. +func TestCreateTransport(t *testing.T) { + errCases := []string{"", "websocket"} + for _, transport := range errCases { + t.Run("err/"+transport, func(t *testing.T) { + if _, err := createTransport(&protocol.MCPServerConfig{Transport: transport}); err == nil { + t.Fatalf("expected error for transport %q", transport) + } + }) + } + + t.Run("stdio", func(t *testing.T) { + tr, err := createTransport(&protocol.MCPServerConfig{Transport: "stdio", Command: "true"}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if _, ok := tr.(*sdk_mcp.CommandTransport); !ok { + t.Fatalf("got %T, want *CommandTransport", tr) + } + }) + + // "sse" and "streamable-http" must produce the same concrete transport type: + // the SDK Streamable HTTP client. + for _, transport := range []string{"sse", "streamable-http"} { + t.Run("http/"+transport, func(t *testing.T) { + tr, err := createTransport(&protocol.MCPServerConfig{Transport: transport, URL: "http://example.test/mcp"}) + if err != nil { + t.Fatalf("createTransport(%q): %v", transport, err) + } + if _, ok := tr.(*sdk_mcp.StreamableClientTransport); !ok { + t.Fatalf("transport %q: got %T, want *StreamableClientTransport", transport, tr) + } + }) + } +} diff --git a/protocol/messages.go b/protocol/messages.go index 4716536..cd35dd0 100644 --- a/protocol/messages.go +++ b/protocol/messages.go @@ -380,7 +380,7 @@ type TaskCancelPayload struct { // MCPServerConfig is the MCP server configuration passed from cloud. type MCPServerConfig struct { Name string `json:"name"` - Transport string `json:"transport"` // stdio, sse + Transport string `json:"transport"` // stdio, streamable-http, sse (legacy alias for streamable-http) Command string `json:"command,omitempty"` Args []string `json:"args,omitempty"` Env map[string]string `json:"env,omitempty"`