Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions mcp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,12 @@ func createTransport(server *protocol.MCPServerConfig) (sdk_mcp.Transport, error
switch server.Transport {
case "stdio":
return NewStdioTransport(server.Command, server.Args, server.Env), nil
case "sse":
return NewSSETransport(server.URL, server.Headers, server.DynamicHeaders), nil
case "streamable-http", "sse":
// Both labels map to the SDK's Streamable HTTP client. "sse" is a legacy
// alias kept for back-compat: existing configs use it, but the runner has
// always spoken Streamable HTTP for URL-based servers, never the old
// HTTP+SSE protocol.
return NewStreamableHTTPTransport(server.URL, server.Headers, server.DynamicHeaders), nil
default:
return nil, fmt.Errorf("unsupported transport type '%s'", server.Transport)
}
Expand Down
12 changes: 8 additions & 4 deletions mcp/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,13 @@ func isValidEnvVar(name string) bool {
return !strings.Contains(name, "=") && !strings.Contains(name, "\x00")
}

// NewSSETransport creates a new SSE transport for MCP.
func NewSSETransport(endpoint string, headers map[string]string, dynamicHeaders map[string]string) sdk_mcp.Transport {
slog.Info("mcp creating SSE transport",
// NewStreamableHTTPTransport creates a Streamable HTTP transport for MCP.
//
// Despite the historical name, this returns the SDK's StreamableClientTransport
// (the modern "Streamable HTTP" transport), not the legacy HTTP+SSE client. Both
// the "streamable-http" and legacy "sse" config labels route here.
func NewStreamableHTTPTransport(endpoint string, headers map[string]string, dynamicHeaders map[string]string) sdk_mcp.Transport {
slog.Info("mcp creating Streamable HTTP transport",
"endpoint", endpoint,
"headers_count", len(headers),
"dynamic_headers_count", len(dynamicHeaders),
Expand All @@ -65,7 +69,7 @@ type headerTransport struct {
}

// newHeaderTransport creates a headerTransport with the given static headers
// and http.DefaultTransport as the base. Used by tests and NewSSETransport.
// and http.DefaultTransport as the base. Used by tests and NewStreamableHTTPTransport.
func newHeaderTransport(headers map[string]string) *headerTransport {
return &headerTransport{
headers: headers,
Expand Down
106 changes: 106 additions & 0 deletions mcp/transport_streamable_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package mcp

import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"

"github.com/flashcatcloud/flashduty-runner/protocol"

sdk_mcp "github.com/modelcontextprotocol/go-sdk/mcp"
)

// newTestStreamableServer starts an in-process MCP server speaking the
// Streamable HTTP transport, exposing a single "ping" tool.
func newTestStreamableServer(t *testing.T) *httptest.Server {
t.Helper()
srv := sdk_mcp.NewServer(&sdk_mcp.Implementation{Name: "test-server", Version: "1.0.0"}, nil)
srv.AddTool(
&sdk_mcp.Tool{
Name: "ping",
Description: "returns pong",
InputSchema: json.RawMessage(`{"type":"object"}`),
},
func(_ context.Context, _ *sdk_mcp.CallToolRequest) (*sdk_mcp.CallToolResult, error) {
return &sdk_mcp.CallToolResult{
Content: []sdk_mcp.Content{&sdk_mcp.TextContent{Text: "pong"}},
}, nil
},
)
handler := sdk_mcp.NewStreamableHTTPHandler(func(*http.Request) *sdk_mcp.Server { return srv }, nil)
ts := httptest.NewServer(handler)
t.Cleanup(ts.Close)
return ts
}

// TestListToolsConnectsOverStreamableHTTP reproduces the production failure where
// a server configured with transport "streamable-http" was rejected up front with
// "unsupported transport type 'streamable-http'". The runner has always spoken the
// Streamable HTTP wire protocol for URL-based servers; both the modern
// "streamable-http" label and the legacy "sse" alias must connect to the same
// Streamable HTTP server.
func TestListToolsConnectsOverStreamableHTTP(t *testing.T) {
ts := newTestStreamableServer(t)

// The server is shared across subtests (it is stateless); each subtest uses
// a fresh ClientManager and a distinct server name, so sessions never collide.
for _, transport := range []string{"streamable-http", "sse"} {
t.Run(transport, func(t *testing.T) {
m := NewClientManager()
t.Cleanup(m.Close)

tools, err := m.ListTools(context.Background(), &protocol.MCPServerConfig{
Name: "test-" + transport,
Transport: transport,
URL: ts.URL,
}, nil)
if err != nil {
t.Fatalf("ListTools over %q transport: %v", transport, err)
}
if len(tools) != 1 || tools[0].Name != "ping" {
t.Fatalf("expected [ping], got %+v", tools)
}
})
}
}

// TestCreateTransport pins the transport-string dispatch: "stdio" maps to the
// command transport, both "sse" and "streamable-http" map to the SDK Streamable
// HTTP client (the same wire protocol — only the label differed, which is what
// broke production), and unknown types error.
func TestCreateTransport(t *testing.T) {
errCases := []string{"", "websocket"}
for _, transport := range errCases {
t.Run("err/"+transport, func(t *testing.T) {
if _, err := createTransport(&protocol.MCPServerConfig{Transport: transport}); err == nil {
t.Fatalf("expected error for transport %q", transport)
}
})
}

t.Run("stdio", func(t *testing.T) {
tr, err := createTransport(&protocol.MCPServerConfig{Transport: "stdio", Command: "true"})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if _, ok := tr.(*sdk_mcp.CommandTransport); !ok {
t.Fatalf("got %T, want *CommandTransport", tr)
}
})

// "sse" and "streamable-http" must produce the same concrete transport type:
// the SDK Streamable HTTP client.
for _, transport := range []string{"sse", "streamable-http"} {
t.Run("http/"+transport, func(t *testing.T) {
tr, err := createTransport(&protocol.MCPServerConfig{Transport: transport, URL: "http://example.test/mcp"})
if err != nil {
t.Fatalf("createTransport(%q): %v", transport, err)
}
if _, ok := tr.(*sdk_mcp.StreamableClientTransport); !ok {
t.Fatalf("transport %q: got %T, want *StreamableClientTransport", transport, tr)
}
})
}
}
2 changes: 1 addition & 1 deletion protocol/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ type TaskCancelPayload struct {
// MCPServerConfig is the MCP server configuration passed from cloud.
type MCPServerConfig struct {
Name string `json:"name"`
Transport string `json:"transport"` // stdio, sse
Transport string `json:"transport"` // stdio, streamable-http, sse (legacy alias for streamable-http)
Command string `json:"command,omitempty"`
Args []string `json:"args,omitempty"`
Env map[string]string `json:"env,omitempty"`
Expand Down
Loading