跳到主要内容
模型上下文协议 (MCP) 中的传输方式为客户端与服务器之间的通信奠定了基础。传输层负责处理消息发送和接收的底层机制。

消息格式

MCP 使用 JSON-RPC 2.0 作为其传输格式。传输层负责将 MCP 协议消息转换为 JSON-RPC 格式进行传输,并将接收到的 JSON-RPC 消息转换回 MCP 协议消息。 共使用三种类型的 JSON-RPC 消息:

请求 (Requests)

{
  jsonrpc: "2.0",
  id: number | string,
  method: string,
  params?: object
}

响应 (Responses)

{
  jsonrpc: "2.0",
  id: number | string,
  result?: object,
  error?: {
    code: number,
    message: string,
    data?: unknown
  }
}

通知

{
  jsonrpc: "2.0",
  method: string,
  params?: object
}

内置传输类型

MCP 目前定义了两种标准传输机制

标准输入/输出 (stdio)

stdio 传输通过标准输入和输出流实现通信。这对于本地集成和命令行工具特别有用。 在以下情况下使用 stdio:
  • 构建命令行工具
  • 实现本地集成
  • 需要简单的进程间通信
  • 配合 Shell 脚本工作

服务器

const server = new Server(
  {
    name: "example-server",
    version: "1.0.0",
  },
  {
    capabilities: {},
  },
);

const transport = new StdioServerTransport();
await server.connect(transport);

客户端

const client = new Client(
  {
    name: "example-client",
    version: "1.0.0",
  },
  {
    capabilities: {},
  },
);

const transport = new StdioClientTransport({
  command: "./server",
  args: ["--option", "value"],
});
await client.connect(transport);

可流式 HTTP (Streamable HTTP)

Streamable HTTP 传输使用 HTTP POST 请求进行客户端到服务器的通信,并可选地使用服务器发送事件 (SSE) 流进行服务器到客户端的通信。 在以下情况下使用 Streamable HTTP:
  • 构建基于 Web 的集成
  • 需要通过 HTTP 进行客户端-服务器通信
  • 需要有状态会话
  • 支持多个并发客户端
  • 实现可恢复连接

工作原理

  1. 客户端到服务器通信:来自客户端的每个 JSON-RPC 消息都会作为新的 HTTP POST 请求发送到 MCP 端点
  2. 服务器响应:服务器可以通过以下方式响应:
    • 单个 JSON 响应 (Content-Type: application/json)
    • 用于多条消息的 SSE 流 (Content-Type: text/event-stream)
  3. 服务器到客户端通信:服务器可以通过以下方式向客户端发送请求/通知:
    • 由客户端请求发起的 SSE 流
    • 通过对 MCP 端点发起 HTTP GET 请求获得的 SSE 流

服务器

import express from "express";

const app = express();

const server = new Server(
  {
    name: "example-server",
    version: "1.0.0",
  },
  {
    capabilities: {},
  },
);

// MCP endpoint handles both POST and GET
app.post("/mcp", async (req, res) => {
  // Handle JSON-RPC request
  const response = await server.handleRequest(req.body);

  // Return single response or SSE stream
  if (needsStreaming) {
    res.setHeader("Content-Type", "text/event-stream");
    // Send SSE events...
  } else {
    res.json(response);
  }
});

app.get("/mcp", (req, res) => {
  // Optional: Support server-initiated SSE streams
  res.setHeader("Content-Type", "text/event-stream");
  // Send server notifications/requests...
});

app.listen(3000);

客户端

const client = new Client(
  {
    name: "example-client",
    version: "1.0.0",
  },
  {
    capabilities: {},
  },
);

const transport = new HttpClientTransport(new URL("https://:3000/mcp"));
await client.connect(transport);

会话管理

Streamable HTTP 支持有状态会话,以便在多个请求之间保持上下文
  1. 会话初始化:服务器可以在初始化期间通过在 Mcp-Session-Id 标头中包含会话 ID 来分配它
  2. 会话持久化:客户端必须在随后所有的请求中使用 Mcp-Session-Id 标头包含该会话 ID
  3. 会话终止:可以通过发送带有会话 ID 的 HTTP DELETE 请求来显式终止会话
会话流程示例
// Server assigns session ID during initialization
app.post("/mcp", (req, res) => {
  if (req.body.method === "initialize") {
    const sessionId = generateSecureId();
    res.setHeader("Mcp-Session-Id", sessionId);
    // Store session state...
  }
  // Handle request...
});

// Client includes session ID in subsequent requests
fetch("/mcp", {
  method: "POST",
  headers: {
    "Content-Type": "application/json",
    "Mcp-Session-Id": sessionId,
  },
  body: JSON.stringify(request),
});

可恢复性与重新交付

为了支持恢复中断的连接,Streamable HTTP 提供了:
  1. 事件 ID:服务器可以为 SSE 事件附加唯一 ID 以进行追踪
  2. 从最后一个事件恢复:客户端可以通过发送 Last-Event-ID 标头来恢复
  3. 消息重播:服务器可以从断开连接点开始重播错过的消息
这确保了即使在网络连接不稳定的情况下也能实现可靠的消息交付。

安全注意事项

在实现 Streamable HTTP 传输时,请遵循以下安全最佳实践
  1. 验证 Origin 标头:始终验证所有传入连接的 Origin 标头,以防止 DNS 重新绑定攻击
  2. 绑定到 Localhost:在本地运行时,仅绑定到 localhost (127.0.0.1),而不是所有网络接口 (0.0.0.0)
  3. 实现身份验证:为所有连接使用适当的身份验证
  4. 使用 HTTPS:在生产部署中始终使用 TLS/HTTPS
  5. 验证会话 ID:确保会话 ID 在加密上是安全的且经过正确验证
如果没有这些保护措施,攻击者可能会利用 DNS 重新绑定,从远程网站与本地 MCP 服务器进行交互。

服务器发送事件 (SSE) - 已弃用

自 2024-11-05 协议版本起,作为独立传输方式的 SSE 已被弃用。它已被 Streamable HTTP 取代,后者将 SSE 作为可选的流式机制。有关向后兼容性的信息,请参阅下文的向后兼容性部分。
旧版 SSE 传输支持服务器到客户端的流式传输,并使用 HTTP POST 请求进行客户端到服务器的通信。 以前用于:
  • 仅需要服务器到客户端流式传输的情况
  • 在受限网络中工作
  • 实现简单的更新

旧版安全性考量

已弃用的 SSE 传输具有与 Streamable HTTP 类似的安全性考量,特别是关于 DNS 重新绑定攻击。在 Streamable HTTP 传输中使用 SSE 流时,应应用同样的保护措施。

服务器

import express from "express";

const app = express();

const server = new Server(
  {
    name: "example-server",
    version: "1.0.0",
  },
  {
    capabilities: {},
  },
);

let transport: SSEServerTransport | null = null;

app.get("/sse", (req, res) => {
  transport = new SSEServerTransport("/messages", res);
  server.connect(transport);
});

app.post("/messages", (req, res) => {
  if (transport) {
    transport.handlePostMessage(req, res);
  }
});

app.listen(3000);

客户端

const client = new Client(
  {
    name: "example-client",
    version: "1.0.0",
  },
  {
    capabilities: {},
  },
);

const transport = new SSEClientTransport(new URL("https://:3000/sse"));
await client.connect(transport);

自定义传输方式

MCP 使得为特定需求实现自定义传输方式变得容易。任何传输实现只需符合 Transport 接口即可: 您可以为以下场景实现自定义传输:
  • 自定义网络协议
  • 专门的通信渠道
  • 与现有系统集成
  • 性能优化
interface Transport {
  // Start processing messages
  start(): Promise<void>;

  // Send a JSON-RPC message
  send(message: JSONRPCMessage): Promise<void>;

  // Close the connection
  close(): Promise<void>;

  // Callbacks
  onclose?: () => void;
  onerror?: (error: Error) => void;
  onmessage?: (message: JSONRPCMessage) => void;
}

错误处理

传输实现应处理各种错误场景
  1. 连接错误
  2. 消息解析错误
  3. 协议错误
  4. 网络超时
  5. 资源清理
错误处理示例
class ExampleTransport implements Transport {
  async start() {
    try {
      // Connection logic
    } catch (error) {
      this.onerror?.(new Error(`Failed to connect: ${error}`));
      throw error;
    }
  }

  async send(message: JSONRPCMessage) {
    try {
      // Sending logic
    } catch (error) {
      this.onerror?.(new Error(`Failed to send message: ${error}`));
      throw error;
    }
  }
}

最佳实践

在实现或使用 MCP 传输时
  1. 正确处理连接生命周期
  2. 实现适当的错误处理
  3. 在连接关闭时清理资源
  4. 使用适当的超时时间
  5. 在发送前验证消息
  6. 记录传输事件以便调试
  7. 在适当时实现重连逻辑
  8. 处理消息队列中的背压
  9. 监控连接健康状况
  10. 实施适当的安全措施

安全注意事项

在实现传输时

身份验证与授权

  • 实现适当的身份验证机制
  • 验证客户端凭据
  • 使用安全的令牌处理
  • 实施授权检查

数据安全

  • 为网络传输使用 TLS
  • 加密敏感数据
  • 验证消息完整性
  • 实施消息大小限制
  • 清理输入数据

网络安全

  • 实施速率限制
  • 使用适当的超时时间
  • 处理拒绝服务 (DoS) 场景
  • 监控异常模式
  • 实施适当的防火墙规则
  • 对于基于 HTTP 的传输(包括 Streamable HTTP),验证 Origin 标头以防止 DNS 重新绑定攻击
  • 对于本地服务器,仅绑定到 localhost (127.0.0.1) 而不是所有接口 (0.0.0.0)

调试传输层

传输问题调试技巧
  1. 启用调试日志
  2. 监控消息流
  3. 检查连接状态
  4. 验证消息格式
  5. 测试错误场景
  6. 使用网络分析工具
  7. 实现健康检查
  8. 监控资源使用情况
  9. 测试边缘情况
  10. 使用适当的错误追踪

向后兼容性

为了保持不同协议版本之间的兼容性

针对支持旧版客户端的服务器

希望支持使用已弃用 HTTP+SSE 传输的客户端的服务器应:
  1. 在新的 MCP 端点旁同时托管旧的 SSE 和 POST 端点
  2. 在两个端点上处理初始化请求
  3. 为每种传输类型维护独立的处理逻辑

针对支持旧版服务器的客户端

希望支持使用已弃用传输方式的服务器的客户端应:
  1. 接受可能使用任一传输方式的服务器 URL
  2. 尝试发送带有正确 Accept 标头的 InitializeRequest POST 请求
    • 如果成功,则使用 Streamable HTTP 传输
    • 如果失败并返回 4xx 状态码,则回退到旧版 SSE 传输
  3. 针对旧版服务器,发起一个期望得到包含 endpoint 事件的 SSE 流的 GET 请求
兼容性检测示例
async function detectTransport(serverUrl: string): Promise<TransportType> {
  try {
    // Try Streamable HTTP first
    const response = await fetch(serverUrl, {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        Accept: "application/json, text/event-stream",
      },
      body: JSON.stringify({
        jsonrpc: "2.0",
        method: "initialize",
        params: {
          /* ... */
        },
      }),
    });

    if (response.ok) {
      return "streamable-http";
    }
  } catch (error) {
    // Fall back to legacy SSE
    const sseResponse = await fetch(serverUrl, {
      method: "GET",
      headers: { Accept: "text/event-stream" },
    });

    if (sseResponse.ok) {
      return "legacy-sse";
    }
  }

  throw new Error("Unsupported transport");
}