MCP 传输机制(Transport)是 MCP 客户端与 MCP 服务器通信的一个桥梁,定义了客户端与服务器通信的细节,帮助客户端和服务器交换消息。
MCP 协议使用 JSON-RPC 来编码消息。JSON-RPC 消息必须使用 UTF-8 编码。
MCP 协议目前定义了三种传输机制用于客户端-服务器通信:
MCP 协议要求客户端应尽可能支持 stdio。
MCP 协议的传输机制是可插拔的,也就是说,客户端和服务器不局限于 MCP 协议标准定义的这几种传输机制,也可以通过自定义的传输机制来实现通信。
stdio 即 standard input & output(标准输入 / 输出)。
是 MCP 协议推荐使用的一种传输机制,主要用于本地进程通信。
在 stdio 传输中:
参考 MCP 官方的 typescript-sdk 来看 stdio 传输机制是如何实现的:
以命令行的方式,在本地启动 MCP 服务器:
npx -y mcp-server-time
MCP 服务器启动时,会创建 stdio 通信管道(pipeline),用于跟 MCP 客户端进行消息通信。在 MCP 客户端发送关闭信号,或者 MCP 服务器异常退出之前,这个通信管道会一直保持,常驻进程。
export class StdioServerTransport implements Transport {
private _readBuffer: ReadBuffer = new ReadBuffer();
private _started = false;
constructor(
private _stdin: Readable = process.stdin,
private _stdout: Writable = process.stdout
) {}
onclose?: () =>void;
onerror?: (error: Error) =>void;
onmessage?: (message: JSONRPCMessage) =>void;
}
MCP 客户端把消息发到通信管道。MCP 服务器通过标准输入 stdin 读取客户端发送的消息,以换行符:\n 作为读取完成标识。
MCP 服务器读取到的有效消息,是 JSON-RPC 编码的结构体。
readMessage(): JSONRPCMessage | null {
if (!this._buffer) {
returnnull;
}
const index = this._buffer.indexOf("\n");
if (index === -1) {
returnnull;
}
const line = this._buffer.toString("utf8", 0, index).replace(/\r$/, '');
this._buffer = this._buffer.subarray(index + 1);
return deserializeMessage(line);
}
MCP 服务器运行完内部逻辑,需要给 MCP 客户端响应消息。
MCP 服务器先用 JSON-RPC 编码消息,再把消息写入标准输出 stdout。
send(message: JSONRPCMessage): Promise<void> {
return new Promise((resolve) => {
const json = serializeMessage(message);
if (this._stdout.write(json)) {
resolve();
} else {
this._stdout.once("drain", resolve);
}
});
}
}
MCP 客户端从 MCP 服务器的标准输出 stdout 读取消息,获得 MCP 服务器的响应内容。
MCP 客户端退出,给 MCP 服务器发送关闭信号。
MCP 服务器通过 stdio 通信管道读到客户端发送的终止信号,或者内部运行错误,主动关闭 stdio 通信管道。
stdio 通信管道关闭之后,MCP 客户端与 MCP 服务器之间不能再相互发送消息,直到再次建立 stdio 通信管道。
async close(): Promise<void> {
// Remove our event listeners first
this._stdin.off("data", this._ondata);
this._stdin.off("error", this._onerror);
// Check if we were the only data listener
const remainingDataListeners = this._stdin.listenerCount('data');
if (remainingDataListeners === 0) {
// Only pause stdin if we were the only listener
// This prevents interfering with other parts of the application that might be using stdin
this._stdin.pause();
}
// Clear the buffer and notify closure
this._readBuffer.clear();
this.onclose?.();
}
stdio 传输机制主要依靠本地进程通信实现。
主要的优势是:
也有一些局限性:
stdio 传输适用于要操作的数据资源位于本地计算机,且不希望暴露外部访问的场景。
比如,你希望通过一个聊天客户端,来总结你的微信消息,微信消息文件存储在你的本地电脑,外部访问不了,也不应该访问。
这种情况,你可以实现一个 MCP 服务器来读取你电脑上的微信消息文件,通过 stdio 传输接收 MCP 客户端的访问请求。
如果你要访问的是一个远程服务器上的文件,也可以使用 stdio 传输,流程会复杂一些:
既然 stdio 传输访问远程资源这么麻烦,是不是应该有一种更适合远程资源访问的传输机制?
当然有。可以使用 SSE 传输。
MCP 协议使用 SSE(Server-Sent Events) 传输来解决远程资源访问的问题。底层是基于 HTTP 通信,通过类似 API 的方式,让 MCP 客户端直接访问远程资源,而不用通过 stdio 传输做中转。
在 SSE 传输中,服务器作为一个独立进程运行,可以处理多个客户端连接。
服务器必须提供两个端点:
当客户端连接时,服务器必须发送一个包含客户端用于发送消息的 URL 的端点事件。所有后续客户端消息必须作为 HTTP POST 请求发送到该端点。
服务器消息作为 SSE 消息事件发送,消息内容以 JSON 格式编码在事件数据中。
参考 MCP 官方的 typescript-sdk 来看 SSE 传输机制是如何实现的:
系统管理员在远程服务器(也可以是本地电脑)输入命令,启动 MCP 服务器,监听服务器端口,对外暴露 HTTP 接口。
const server = new McpServer({
name: "example-server",
version: "1.0.0",
});
const app = express();
app.get("/sse", async (_: Request, res: Response) => {
const transport = new SSEServerTransport("/messages", res);
await server.connect(transport);
});
app.post("/messages", async (req: Request, res: Response) => {
await transport.handlePostMessage(req, res);
});
app.listen(3001);
在这个示例中,使用了 express 框架,启动了一个 HTTP 服务,监听在 3001 端口,对外暴露了两个端点:
解析一个 MCP 客户端可访问的域名到 MCP 服务器。比如:abc.mcp.so
MCP 客户端请求 MCP 服务器的 URL 地址:https://abc.mcp.so:3001/sse 与 MCP 服务器建立连接,MCP 服务器需要给 MCP 客户端返回一个用于消息通信的地址:
res.writeHead(200, {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache, no-transform",
Connection: "keep-alive",
});
const messagesUrl = "https://abc.mcp.so:3001/messages?sessionId=xxx";
res.write(`event: endpoint\ndata: ${messagesUrl}\n\n`);
MCP 客户端从 MCP 服务器返回的 endpoint 事件中得到了消息通信的地址,与 MCP 服务器建立 SSE 连接成功。
MCP 客户端通过 POST 请求把消息发到这个消息通信地址,与 MCP 服务器进行消息交互。
MCP 客户端在与 MCP 服务器建立 SSE 连接之后,开始给 MCP 服务器返回的通信地址发送消息。
MCP 协议中的 SSE 传输是双通道响应机制。也就是说,MCP 服务器在接收到 MCP 客户端的消息之后,既要给当前的请求回复一个响应,也要给之前建立的 SSE 连接发送一条响应消息。(通知类型的消息,不需要给 SSE 连接发消息)
举个例子,MCP 客户端与 MCP 服务器建立 SSE 连接之后,给 MCP 服务器发送的第一条消息,用于初始化阶段做能力协商。
MCP 客户端请求示例:
curl -X POST https://abc.mcp.so/messages?sessionId=xxx \
-H "Content-Type: application/json" \
-d '{
"jsonrpc": "2.0",
"id": "1",
"method": "initialize",
"params": {
"protocolVersion": "1.0",
"capabilities": {},
"clientInfo": {
"name": "mcp-client",
"version": "1.0.0"
}
}
}'
MCP 服务器从 HTTP 请求体里面读取 MCP 客户端发送的消息:
async handlePostMessage(
req: IncomingMessage,
res: ServerResponse,
parsedBody?: unknown,
): Promise<void> {
if (!this._sseResponse) {
const message = "SSE connection not established";
res.writeHead(500).end(message);
thrownewError(message);
}
let body: string | unknown;
try {
const ct = contentType.parse(req.headers["content-type"] ?? "");
if (ct.type !== "application/json") {
thrownewError(`Unsupported content-type: ${ct}`);
}
body = parsedBody ?? await getRawBody(req, {
limit: MAXIMUM_MESSAGE_SIZE,
encoding: ct.parameters.charset ?? "utf-8",
});
} catch (error) {
res.writeHead(400).end(String(error));
this.onerror?.(error asError);
return;
}
try {
awaitthis.handleMessage(typeof body === 'string' ? JSON.parse(body) : body);
} catch {
res.writeHead(400).end(`Invalid message: ${body}`);
return;
}
res.writeHead(202).end("Accepted");
}
先给当前请求,响应一个 HTTP 202 状态码,告知 MCP 客户端,请求已收到。
然后 MCP 服务器运行内部逻辑,完成业务功能。
再给之前与 MCP 客户端建立的 SSE 连接发送一个事件消息,data 里面放 JSON-RPC 编码的消息内容:
async send(message: JSONRPCMessage): Promise<void> {
if (!this._sseResponse) {
throw new Error("Not connected");
}
this._sseResponse.write(
`event: message\ndata: ${JSON.stringify(message)}\n\n`,
);
}
MCP 客户端根据 MCP 服务器同步响应的 2** 状态码,判断 MCP 服务器已经接到请求,并开始读取 MCP 服务器发到 SSE 连接的消息内容。
MCP 客户端从与 MCP 服务器建立的 SSE 连接中读取 event: message 事件消息,获得 MCP 服务器发送的业务数据 data。
MCP 客户端与 MCP 服务器建立的 SSE 连接,应该是 1:1 的。为了防止串数据的问题,在建立 SSE 连接阶段,MCP 服务器返回的通信地址,应该为当前连接分配一个唯一标识,叫做 sessionId,给 MCP 客户端返回的通信地址带上这个标识,比如 /messages?sessionId=xxx。
在消息交互阶段,MCP 服务器根据 MCP 客户端请求地址参数里面的 sessionId,找到之前建立的 SSE 连接,并只给这个 SSE 连接发送消息。
MCP 服务器与 MCP 客户端双方都可能会主动断开 SSE 连接。
还保持连接的一方,应该加上必要的连接检测和超时关闭机制。
比如通过 SSE 连接,给对方定时发送一条心跳检测消息,如果多次无响应,可以认作对方已断开连接,此时可以主动关闭 SSE 连接,避免资源泄露。
一个用 go 实现的心跳检测和超时关闭示例:
// Setup heartbeat ticker
heartbeatInterval := 30 * time.Second
heartbeatTicker := time.NewTicker(heartbeatInterval)
defer heartbeatTicker.Stop()
// Setup idle timeout
idleTimeout := 5 * time.Minute
idleTimer := time.NewTimer(idleTimeout)
defer idleTimer.Stop()
gofunc() {
for {
select {
case <-session.Done():
return
case <-heartbeatTicker.C:
// Send heartbeat
if err := writer.SendHeartbeat(); err != nil {
session.Close()
return
}
case <-idleTimer.C:
// Close connection due to inactivity
session.Close()
return
}
}
}()
当使用 SSE 传输时,服务器一方需要实现一些必要的安全防护措施:
如果没有这些保护措施,攻击者可能会使用 DNS 重绑定从远程网站与本地 MCP 服务器交互。
SSE 传输适用于 MCP 客户端与 MCP 服务器不在同一个网络下的通信场景。
比如,你希望在本地电脑,通过对话的方式,查询你云服务器上的数据库。你就可以在你的云服务器上部署一个 MCP 服务器,去读取数据库,再跟你本地电脑上的 MCP 客户端建立连接通信。
当然,所有用 SSE 传输实现的 MCP 服务器,理论上都可以通过 stdio 传输 + API 的方式实现。
区别在于:
SSE 传输主要解决远程资源访问的问题,依靠 HTTP 协议实现底层通信。
SSE 传输的主要优势:
SSE 传输的主要劣势与问题:
正因为这些问题,MCP 协议已经引入了新的 Streamable HTTP 传输机制(2025-03-26 版本)来替代 SSE,并计划废弃 SSE 传输。新的传输机制保留了 HTTP 的基础,但支持更灵活的连接方式,更适合现代云架构和无服务器环境。
Streamable HTTP 传输是 MCP 协议在 2025-03-26 版本中引入的新传输机制,用于替代之前的 SSE 传输。
在 Streamable HTTP 传输中,服务器作为一个独立进程运行,可以处理多个客户端连接。此传输使用 HTTP POST 和 GET 请求,服务器可以选择使用服务器发送事件(SSE)来流式传输多个服务器消息。
服务器必须提供一个同时支持 POST 和 GET 方法的单个 HTTP 端点。例如:https://xyz.mcp.so/mcp。
跟 SSE 传输不同的点在于,Streamable HTTP 传输中,客户端与服务器的消息交互,基本上是“一来一回”的(单通道响应)。而这个“一来一回”的消息交互,可能会有很多种组合类型。
参考 MCP 官方的 typescript-sdk 来看 Streamable HTTP 传输机制是如何实现的:
跟 SSE 传输机制一样,Streamable HTTP 传输本质上也是基于 HTTP 协议通信,需要先启动一个 HTTP 服务:
const server = new McpServer({
name: "example-server",
version: "1.0.0",
});
const app = express();
app.all("/mcp", async (req: Request, res: Response) => {
const transport = new StreamableHTTPServerTransport();
await server.connect(transport);
await transport.handleMessage(req, res);
});
app.listen(3002);
跟 SSE 传输不一样的点在于,Streamable HTTP 传输只需要暴露一个端点(endpoint),来接收各种类型的客户端请求(GET / POST / DELETE)。
比如在这个例子,使用 express 框架,启动了一个 HTTP 服务,监听在 3002 端口,对外暴露了一个端点:
解析一个 MCP 客户端可访问的域名到 MCP 服务器。比如:xyz.mcp.so
在 MCP 服务器启动成功之后,MCP 客户端可以直接给 MCP 服务器暴露的地址发送消息。
跟 SSE 传输不同,Streamable HTTP 传输机制中,MCP 客户端给服务器发送消息,无需先跟一个端点建立 SSE 连接,再给另一个端点发消息。而是单通道模式,即客户端给服务器发消息,直接获得服务器的响应内容。
MCP 客户端可以使用 GET 或者 POST 请求给服务器发消息,每个请求必须设置请求头 Accept,传递以下两个值:
MCP 客户端请求示例:
curl -X POST https://xyz.mcp.so/mcp \
-H "Content-Type: application/json" \
-H "Accept: application/json, text/event-stream" \
-d '{
"jsonrpc": "2.0",
"id": "1",
"method": "initialize",
"params": {
"protocolVersion": "1.0",
"capabilities": {},
"clientInfo": {
"name": "mcp-client",
"version": "1.0.0"
}
}
}'
Streamable HTTP 传输机制下,MCP 客户端与服务器通信的几个要点:
Streamable HTTP 传输既支持无状态的请求:每一次请求都是独立的,无需记录状态。
也支持有状态的请求:一次新的请求,可能需要同步之前的请求 / 响应信息作为参考。这种情况叫做:会话保持。
如果需要保持会话,MCP 服务器与 MCP 客户端之间的交互应该遵守以下原则:
MCP 服务器验证会话的一个示例:
/**
* Validates session ID for non-initialization requests
* Returns true if the session is valid, false otherwise
*/
private validateSession(req: IncomingMessage, res: ServerResponse): boolean {
if (!this._initialized) {
// If the server has not been initialized yet, reject all requests
res.writeHead(400).end(JSON.stringify({
jsonrpc: "2.0",
error: {
code: -32000,
message: "Bad Request: Server not initialized"
},
id: null
}));
returnfalse;
}
if (this.sessionId === undefined) {
// If the session ID is not set, the session management is disabled
// and we don't need to validate the session ID
returntrue;
}
const sessionId = req.headers["mcp-session-id"];
if (!sessionId) {
// Non-initialization requests without a session ID should return 400 Bad Request
res.writeHead(400).end(JSON.stringify({
jsonrpc: "2.0",
error: {
code: -32000,
message: "Bad Request: Mcp-Session-Id header is required"
},
id: null
}));
returnfalse;
} elseif (Array.isArray(sessionId)) {
res.writeHead(400).end(JSON.stringify({
jsonrpc: "2.0",
error: {
code: -32000,
message: "Bad Request: Mcp-Session-Id header must be a single value"
},
id: null
}));
returnfalse;
}
elseif (sessionId !== this.sessionId) {
// Reject requests with invalid session ID with 404 Not Found
res.writeHead(404).end(JSON.stringify({
jsonrpc: "2.0",
error: {
code: -32001,
message: "Session not found"
},
id: null
}));
returnfalse;
}
returntrue;
}
Streamable HTTP 传输,如果客户端与服务器使用 SSE 连接通信,断开连接的方式跟 SSE 传输断开连接的方式一致。
可以由连接的任意一方主动断开连接。还保持着连接的一方,需要实现心跳检测和超时机制,以便能及时关闭连接,避免资源泄露。
Streamable HTTP 传输比起 SSE 传输,做了一些改进,支持恢复已中断的连接,重新发送可能丢失的消息:
支持断点重连的 Streamable HTTP 传输,在消息传输方面会比 SSE 传输更加可靠。
Streamable HTTP 传输机制结合了 SSE 传输的远程访问能力和无状态 HTTP 的灵活性,同时解决了 SSE 传输中的许多问题。
主要优势:
主要劣势:
Streamable HTTP 传输适用于:
与 SSE 传输相比,Streamable HTTP 传输是一个更全面、更灵活的解决方案,特别适合现代云原生应用和无服务器环境。
MCP 客户端和 MCP 服务器可以实现额外的自定义传输机制以满足其特定需求。MCP 协议与传输无关,可以在支持双向消息交换的任何通信通道上实现。
选择支持自定义传输的实现者必须确保他们保留由 MCP 定义的 JSON-RPC 消息格式和生命周期要求。自定义传输应记录其特定的连接建立和消息交换模式,以实现互操作性。
用一个实际的例子来说明,如何实现一个自定义传输,来满足特定的需求。
目前市面上大部分的 MCP 服务器是使用 stdio 和 SSE 传输机制实现的,用户要使用这些 MCP 服务器,需要拉代码到本地运行,使用门槛有点高。
我们希望实现一个 MCP 代理服务,在云上部署,让用户仅需要配置一个 URL 即可接入。由这个云端部署的 MCP 代理去对接第三方的 MCP 服务器。
大致的流程是:
这个 MCP 代理服务以 HTTP 的形式提供接入,需要支持并发调用。这个 MCP 代理服务作为 stdio 传输或者 SSE 传输的客户端,发送消息给后台的 MCP 服务器。
按照这个方案,MCP 代理服务跟后台部署的第三方服务器之间的交互存在着一些问题:
1.如果后台对接的 MCP 服务器是基于 stdio 传输实现的,MCP 代理每接到一个用户请求,都需要调用 MCP 服务器创建一个独立的 stdio 通信进程。服务器开销特别大(进程创建和销毁、上下文切换,内存隔离等操作,都很消耗服务器资源)。
2.如果后台对接的 MCP 服务器是基于 SSE 传输实现的,从前面对 SSE 传输的分析我们知道,MCP 代理需要跟后台部署的 MCP 服务器之间保持一个 SSE 连接。如果每个用户请求都需要维持一个 SSE 的长连接,对服务器也是不小的压力。
为了解决这些问题,对于 MCP 代理和其背后连接的 MCP 服务器,可以想到的一些优化方案:
基于以上几点分析,我们可以设计一套自定义的传输机制,来改造在分布式集群部署的第三方 MCP 服务器。
这套自定义的传输机制应该具备的几个特性:
虽然 Streamable HTTP 传输通过配置参数也能实现这些功能,但是我们还是希望能自定义一套更简单,更直接,零配置的传输机制。
我们把这个新的传输机制取名为:Restful HTTP Transport,简称 Rest Transport
因为要改造的主要是 MCP 服务器,接下来主要讲解如何实现一个服务器使用的 Rest Server Transport.
参考 MCP 官方实现的 Streamable HTTP Transport,我们来实现这个自定义的 Rest Server Transport。
/**
* Server transport for Synchronous HTTP: a stateless implementation for direct HTTP responses.
* It supports concurrent requests with no streaming, no SSE, and no persistent connections.
*/
export class RestServerTransport implements Transport {
// ...
}
/**
* Start the HTTP server
*/
async startServer(): Promise<void> {
if (this._server) {
thrownewError("Server is already running");
}
this._server = express();
this._server.post(this._endpoint, (req, res) => {
this.handleRequest(req, res, req.body);
});
returnnewPromise((resolve, reject) => {
try {
this._httpServer = this._server!.listen(this._port, () => {
console.log(
`Server is running on http://localhost:${this._port}${this._endpoint}`
);
resolve();
});
this._httpServer.on("error", (error) => {
console.error("Server error:", error);
this.onerror?.(error);
});
} catch (error) {
reject(error);
}
});
}
跟 Streamable HTTP 传输一样,自定义的传输在启动 HTTP 服务器之后,我们也只暴露一个端点来接收客户端请求,这个端点和 HTTP 服务器监听的端口,都可以在启动服务器的时候自定义。
export interface RestServerTransportOptions {
endpoint?: string;
port?: string | number;
}
跟 Streamable HTTP 传输最主要的区别,我们自定义的这个传输,仅支持客户端发起 POST 请求,客户端也只会收到 application/json 响应,不会收到 text/event-stream 响应。
客户端与服务器使用短连接通信,不会有 Connection: "keep-alive"。服务器响应完,与客户端的连接就会断开。
客户端与服务器之间的消息交互是无状态的,所以客户端无需传递 Mcp-Session-Id,服务器也不会判断客户端的会话有效性,不会维护任何 session 相关的数据。
Rest Server Transport 处理用户请求的主要实现逻辑:
/**
* Handles an incoming HTTP request
*/
async handleRequest(
req: IncomingMessage,
res: ServerResponse,
parsedBody?: unknown
): Promise<void> {
if (req.method === "POST") {
awaitthis.handlePostRequest(req, res, parsedBody);
} else {
res.writeHead(405).end(
JSON.stringify({
jsonrpc: "2.0",
error: {
code: -32000,
message: "Method not allowed",
},
id: null,
})
);
}
}
/**
* Handles POST requests containing JSON-RPC messages
*/
privateasync handlePostRequest(
req: IncomingMessage,
res: ServerResponse,
parsedBody?: unknown
): Promise<void> {
try {
// validate the Accept header
const acceptHeader = req.headers.accept;
if (
acceptHeader &&
acceptHeader !== "*/*" &&
!acceptHeader.includes("application/json")
) {
res.writeHead(406).end(
JSON.stringify({
jsonrpc: "2.0",
error: {
code: -32000,
message: "Not Acceptable: Client must accept application/json",
},
id: null,
})
);
return;
}
const ct = req.headers["content-type"];
if (!ct || !ct.includes("application/json")) {
res.writeHead(415).end(
JSON.stringify({
jsonrpc: "2.0",
error: {
code: -32000,
message:
"Unsupported Media Type: Content-Type must be application/json",
},
id: null,
})
);
return;
}
let rawMessage;
if (parsedBody !== undefined) {
rawMessage = parsedBody;
} else {
const parsedCt = contentType.parse(ct);
const body = await getRawBody(req, {
limit: MAXIMUM_MESSAGE_SIZE,
encoding: parsedCt.parameters.charset ?? "utf-8",
});
rawMessage = JSON.parse(body.toString());
}
let messages: JSONRPCMessage[];
// handle batch and single messages
if (Array.isArray(rawMessage)) {
messages = rawMessage.map((msg) => JSONRPCMessageSchema.parse(msg));
} else {
messages = [JSONRPCMessageSchema.parse(rawMessage)];
}
// check if it contains requests
const hasRequests = messages.some(
(msg) =>"method"in msg && "id"in msg
);
const hasOnlyNotifications = messages.every(
(msg) =>"method"in msg && !("id"in msg)
);
if (hasOnlyNotifications) {
// if it only contains notifications, return 202
res.writeHead(202).end();
// handle each message
for (const message of messages) {
this.onmessage?.(message);
}
} elseif (hasRequests) {
// Create a unique identifier for this request batch
const requestBatchId = randomUUID();
// Extract the request IDs that we need to collect responses for
const requestIds = messages
.filter((msg) =>"method"in msg && "id"in msg)
.map((msg) =>String(msg.id));
// Set up a promise that will be resolved with all the responses
const responsePromise = newPromise<JSONRPCMessage[]>((resolve) => {
this._pendingRequests.set(requestBatchId, {
resolve,
responseMessages: [],
requestIds,
});
});
// Processallmessages
for (const message of messages) {
this.onmessage?.(message);
}
// Waitforresponsesandsendthem
constresponses = awaitPromise.race([
responsePromise,
// 30 second timeout
new Promise<JSONRPCMessage[]>((resolve) =>
setTimeout(() => resolve([]), 30000)
),
]);
// Cleanupthependingrequest
this._pendingRequests.delete(requestBatchId);
// Setresponseheaders
constheaders: Record<string, string> = {
"Content-Type": "application/json",
};
res.writeHead(200, headers);
// FormattheresponseaccordingtoJSON-RPCspec
constresponseBody = responses.length === 1 ? responses[0] : responses;
res.end(JSON.stringify(responseBody));
}
} catch (error) {
// returnJSON-RPCformattederror
res.writeHead(400).end(
JSON.stringify({
jsonrpc: "2.0",
error: {
code: -32700,
message: "Parse error",
data: String(error),
},
id: null,
})
);
this.onerror?.(error as Error);
}
}
MCP 服务器接到客户端请求后,把请求数据发到内部实现的各个功能函数内,得到响应内容后,用 JSON-RPC 编码,响应给 MCP 客户端。
Rest Server Transport 发送消息的主要实现逻辑:
async send(message: JSONRPCMessage): Promise<void> {
// Only process response messages
if (!("id"in message) || !("result"in message || "error"in message)) {
return;
}
const messageId = String(message.id);
// Find the pending request that is waiting for this response
for (const [batchId, pendingRequest] of this._pendingRequests.entries()) {
if (pendingRequest.requestIds.includes(messageId)) {
// Add this response to the collection
pendingRequest.responseMessages.push(message);
// If we've collected all responses for this batch, resolve the promise
if (
pendingRequest.responseMessages.length ===
pendingRequest.requestIds.length
) {
pendingRequest.resolve(pendingRequest.responseMessages);
}
break;
}
}
}
Streamable HTTP / SSE 以及我们自定义的 Rest Transport,关闭服务的逻辑都是一致的。
因为服务启动时是启动了一个 HTTP 服务器,所以服务关闭时,只需要关闭这个 HTTP 服务器即可。
Rest Server Transport 实现的关闭服务的主要逻辑:
async stopServer(): Promise<void> {
if (this._httpServer) {
returnnewPromise((resolve, reject) => {
this._httpServer!.close((err) => {
if (err) {
reject(err);
return;
}
this._server = null;
this._httpServer = null;
resolve();
});
});
}
}
在实现完这个自定义的传输后,我们就可以把代码打包,发布到 npm 公共仓库。这样第三方 MCP 服务器都可以使用这个 Transport 来改造自己的实现。
比如,我用 typescript 实现的 Rest Server Transport 封装在 @chatmcp/sdk 这个 npm 包里面,第三方服务器可以这样引入:
import { RestServerTransport } from "@chatmcp/sdk/server/index.js";
为了把第三方 MCP 服务器部署到云端,支持多租户并发调用,我们需要对第三方 MCP 服务器做一定的改造。
以 Perplexity Ask Server 这个 MCP 服务器为例,来演示具体的改造流程:
npm install @chatmcp/sdk
通过 @chatmcp/sdk 提供的 getParamValue 方法,可以获得 MCP 服务器启动时候的参数,从命令行读取,或者从环境变量读取:
import { getParamValue } from "@chatmcp/sdk/utils/index.js";
const perplexityApiKey = getParamValue("perplexity_api_key") || "";
const mode = getParamValue("mode") || "stdio";
const port = getParamValue("port") || 9593;
const endpoint = getParamValue("endpoint") || "/rest";
MCP 服务器的设计初衷,是让用户运行在本地,跟私有数据打交道。
所以目前绝大部分的 MCP 服务器,都是不支持多租户的(也就是不能让多个用户共同使用)
主要的限制在于,在 MCP 服务器内部的实现逻辑里面,用到鉴权参数的地方,是在服务器启动时获取的,不是在请求时动态获取的。
如果要在云端部署 MCP 服务器,支持多租户使用,我们需要修改 MCP 服务器内部的参数获取逻辑,改成从请求参数里面动态获取。
MCP 协议允许在每个请求的 Request 对象通过 _meta 字段传递自定义的数据。
那么就可以改造 MCP 服务器,从 request.params._meta 里面获取客户端传递的 auth 参数。
在 @chatmcp/sdk 中,实现了一个 getAuthValue 方法,可以让 MCP 服务器获取 MCP 客户端传递的鉴权参数。
const auth: any = request.params?._meta?.auth;
以这个 Perplexity Ask Server 为例,我们把读取启动时参数,改成读取请求时参数:
// before: get params from env and set as global params
// after: get params from env or command line, set as global params
import { getParamValue, getAuthValue } from"@chatmcp/sdk/utils/index.js";
const perplexityApiKey = getParamValue("perplexity_api_key") || "";
const mode = getParamValue("mode") || "stdio";
const port = getParamValue("port") || 9593;
const endpoint = getParamValue("endpoint") || "/rest";
server.setRequestHandler(CallToolRequestSchema, async (request) => {
try {
// before: use global params
// after: get auth params from request, use global params if request params not set
const apiKey =
getAuthValue(request, "PERPLEXITY_API_KEY") || perplexityApiKey;
if (!apiKey) {
thrownewError("PERPLEXITY_API_KEY not set");
}
const { name, arguments: args } = request.params;
if (!args) {
thrownewError("No arguments provided");
}
switch (name) {
case"perplexity_ask": {
if (!Array.isArray(args.messages)) {
thrownewError(
"Invalid arguments for perplexity_ask: 'messages' must be an array"
);
}
const messages = args.messages;
// before: use global params in every function
// const result = await performChatCompletion(
// messages,
// "sonar-pro"
// );
// after: pass params to every function
const result = await performChatCompletion(
apiKey,
messages,
"sonar-pro"
);
return {
content: [{ type: "text", text: result }],
isError: false,
};
}
// ...
}
} catch (error) {
// Return error details in the response
return {
content: [
{
type: "text",
text: `Error: ${
error instanceof Error ? error.message : String(error)
}`,
},
],
isError: true,
};
}
});
改造完之后,云端的 MCP 代理,就可以把用户设置的鉴权参数:perplexity_api_key,通过这种方式传给 MCP 服务器了:
request.params._meta.auth = {
perplexity_api_key: "xxx",
};
修改这个 MCP 服务器,在原来仅支持 stdio 传输的基础上,新增一个 rest http 传输:
import { RestServerTransport } from"@chatmcp/sdk/server/rest.js";
import { getParamValue } from"@chatmcp/sdk/utils/index.js";
const mode = getParamValue("mode") || "stdio";
const port = getParamValue("port") || 9593;
const endpoint = getParamValue("endpoint") || "/rest";
asyncfunction runServer() {
try {
// after: MCP Server run with rest transport and stdio transport
if (mode === "rest") {
const transport = new RestServerTransport({
port,
endpoint,
});
await server.connect(transport);
await transport.startServer();
return;
}
// before: MCP Server only run with stdio transport
const transport = new StdioServerTransport();
await server.connect(transport);
console.error(
"Perplexity MCP Server running on stdio with Ask, Research, and Reason tools"
);
} catch (error) {
console.error("Fatal error running server:", error);
process.exit(1);
}
}
改造之后,这个 MCP 服务器就支持两种传输机制了。在不同的场景下,使用不同的传输机制运行:
export PERPLEXITY_API_KEY=xxx && node build/index.js
PERPLEXITY_API_KEY=xxx npx -y server-perplexity-ask
docker run -i --rm -e PERPLEXITY_API_KEY=xxx mcp/perplexity-ask
这三种方式,都会在本地启动 stdio 通信,在服务器启动的时候传递 PERPLEXITY_API_KEY 参数。
改造完的 MCP 服务器在云端部署的启动命令:
node build/index.js --mode=rest --port=8081 --endpoint=/rest
使用了自定义的 Rest 传输,启动 HTTP 服务器,监听在本地的 8081 端口。通过 K8S Service 创建一个内网访问域名:perplexity-svc
MCP 代理把客户端的请求转发到 http://perplexity-svc:8081/rest,就可以处理多个用户的并发请求了。
MCP 客户端仅需要配置一个 MCP 代理的 URL,就可以使用 MCP 服务器了:
在本节中,我们详细讲解了 MCP 协议支持的三种标准传输机制以及自定义传输的实现方式:
1.stdio 传输:
2.SSE 传输(即将废弃):
3.Streamable HTTP 传输:
4.自定义传输机制:
传输机制的选择应基于具体应用场景:
MCP 协议的可插拔传输架构使其能够灵活适应不同的部署环境和通信需求,从简单的本地工具到复杂的云服务均可支持。随着技术发展,传输机制也在不断优化,以提供更好的性能、可靠性和可扩展性。
文章来自于“艾逗笔”,作者“idoubi”。
【开源免费】MindSearch是一个模仿人类思考方式的AI搜索引擎框架,其性能可与 Perplexity和ChatGPT-Web相媲美。
项目地址:https://github.com/InternLM/MindSearch
在线使用:https://mindsearch.openxlab.org.cn/
【开源免费】Morphic是一个由AI驱动的搜索引擎。该项目开源免费,搜索结果包含文本,图片,视频等各种AI搜索所需要的必备功能。相对于其他开源AI搜索项目,测试搜索结果最好。
项目地址:https://github.com/miurla/morphic/tree/main
在线使用:https://www.morphic.sh/