远程通信方式
通信方式:
- Stdio: 推荐,高效、简洁、本地
- Streamable HTTP: 远程
前置知识
SSE 全称 Server-Sent Events,中文是“服务器发送事件”。是一种基于 HTTP 的单向通信协议,由浏览器发起连接,服务器可以持续不断地向客户端推送数据。
你可以把它想象成:“浏览器打开一个通道,然后服务器不断地往里面发消息。”
SSE 特点
- 协议:基于 HTTP(长连接)
- 方向:单向:服务器 -> 客户端
- 格式:文本流,内容类型为
text/event-stream
- 浏览器支持:所有现代浏览器都支持(IE 除外)
- 应用场景:单方面需要推送的时候。实时通知、消息流、状态更新、股票/天气数据等
消息格式
SSE 协议规定,服务器以 text/event-stream
格式不断推送消息,每条消息格式如下:
event: 事件名 # 可选,默认是 message 事件
id: 唯一ID # 可选
retry: 3000 # 客户端断线重连间隔,单位毫秒,可选
data: 内容 # 必需,可以多行
每条消息用空行 \n\n
作为结尾。
事件类型
如果服务器发送的数据中没有指定事件类型,浏览器端会将其作为默认事件类型 message
来处理:
data: 这是默认消息(data 代表要发送的消息)
客户端监听方式:
eventSource.addEventListener("message", (e) => {console.log("默认事件:", e.data);
});
可以自定义事件名:使用 event:
字段
event: update(事件名)
data: 新的更新内容
客户端监听方式:
eventSource.addEventListener("update", (e) => {console.log("收到 update 事件:", e.data);
});
课堂练习
SSE 服务器推送信息示例
StreamableHTTP
Streamable HTTP 是 MCP 中 用于 Web 环境 的通信方式。
客户端基于 HTTP POST 发送 JSON-RPC 请求,例如:
initialize
callTool
listResources
sequenceDiagramparticipant Clientparticipant ServerClient->>Server: POST /mcp(initialize / callTool)Server-->>Client: JSON一次性响应 或者 SSE 流响应
服务端可返回:
- 普通 JSON 响应(application/json)
- 流式 SSE 响应(text/event-stream)
另外,客户端还可以和服务端建立 持久 SSE 连接,用于监听以下事件:
notifications/resources/list_changed
notifications/tools/list_changed
sequenceDiagramparticipant Clientparticipant ServerClient->>Server: GET /mcp(建立 SSE)Server-->>Client: 推送 notifications(如 list_changed)
StreamableHTTP 使用场景
- 通过 HTTP 接收 远程 请求(如前端网页、API 网关)
- 需要支持 多客户端 并发访问
- 浏览器与 MCP Server 通信
- 流式响应(如 SSE 推送)需求
官方接口
在官方 SDK 里面,提供了相应的接口:StreamableHTTPServerTransport
使用方式:
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { randomUUID } from "crypto";const transport = new StreamableHTTPServerTransport({sessionIdGenerator: () => randomUUID(), // 为每个连接生成唯一会话ID
});
该接口内部提供了一系列的方法,其中需要了解的,是 handleRequest 方法。
handleRequest(req: IncomingMessage,res: ServerResponse,parsedBody: unknown // 解析后的请求体
): Promise<void>;
内部处理流程:
- 解析 body,识别 JSON-RPC 方法(如 "initialize", "callTool")
- 将请求路由给 MCP Server 的对应处理函数
- 根据返回结果的 Content-Type 自动来决定是普通 JSON 响应还是流式响应
http-server.js
import express from "express";
import { setCommonHeaders } from "./utils.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { randomUUID } from "crypto";
import { createMCPServer } from "./mcp-server.js";const app = express(); // 创建 express 服务器// 存储所有的连接
// 键:sessionId
// 值:sessionId 对应的连接
const transports = {};// 添加JSON解析中间件
app.use(express.json());app.get("/mcp", (req, res) => {setCommonHeaders(res);// 代表当前的 GET 方法请求不被允许// 服务器支持的方法是 POST 方法,而不是当前请求的方法res.status(405).set("Allow", "POST").send("当前服务器不支持GET方法,仅支持POST方法");
});app.post("/mcp", async (req, res) => {setCommonHeaders(res);// 处理当前的这一次请求try {// 先提取一些信息const body = req.body; // 拿到请求体const method = body?.method; // 拿到这一次请求体里面的方法 initialize、tools/call、tools/listconst sessionId = req.headers["mcp-session-id"]; // 当前这一次连接的 sessionIdconst transport = sessionId && transports[sessionId]; // 拿到 sessionId 对应的连接if (!transport && method === "initialize") {// 进入此分支,说明当前这一次请求是一个初始化请求// 创建一个新的 StreamableHTTP 类型的连接const newTransport = new StreamableHTTPServerTransport({sessionIdGenerator: () => randomUUID(), // 为每个连接生成唯一会话ID});// 为当前的连接绑定一个 close 事件// 会在客户端关闭连接的时候触发newTransport.onclose = () => {// 需要做清理工作if (newTransport.sessionId) {delete transports[newTransport.sessionId];}};// 为当前的这个新连接,创建 MCP Serverconst mcpServer = createMCPServer();await mcpServer.connect(newTransport);// 根据具体的方法,连接 MCP Server 去做处理// 如果是初始化请求,还会生成 sessionIdawait newTransport.handleRequest(req, res, body); // 调用该方法处理这一次的请求if (newTransport.sessionId) {// 进行一个存储transports[newTransport.sessionId] = newTransport;}return;}// 没有进入上面的分支,说明是非初始化请求if (transport) {// 说明连接是存在await transport.handleRequest(req, res, body);return;}// 没有 transport(没有连接),又不是做初始化res.status(400).json({error: "非法的请求",message: "非法的 SessionId 或者非初始化操作",});} catch (err) {console.error(`出错了,对应信息${err.message}`);}
});const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {console.log(`MCP Server 运行在 http://localhost:${PORT}/mcp`);
});
mcp-server.js
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { z } from "zod";export const createMCPServer = () => {const server = new McpServer({name: "http-mcp-server",version: "0.1.0",});server.registerTool("两数之和",{title: "数字加法计算器",description: "计算两个数字的和",inputSchema: {num1: z.number().describe("第一个数字"),num2: z.number().describe("第二个数字"),},},async ({ num1, num2 }) => {return {content: [{type: "text",text: `计算结果: ${num1} + ${num2} = ${num1 + num2}!!!`,},],};});return server;
};
utils.js
// 设置响应头的方法
export function setCommonHeaders(res) {// 允许哪些域(Origin)可以访问该服务res.setHeader("Access-Control-Allow-Origin", "*");// 允许客户端在跨域请求中使用哪些 HTTP 方法res.setHeader("Access-Control-Allow-Methods", "POST, GET, DELETE, OPTIONS");// 指定客户端请求时允许携带的自定义请求头res.setHeader("Access-Control-Allow-Headers", "Content-Type, mcp-session-id");// 允许前端 JS 访问响应中的哪些自定义头res.setHeader("Access-Control-Expose-Headers", "mcp-session-id");
}