Model Context Protocol (MCP) 中的传输提供了客户端和服务器之间通信的基础。传输处理消息发送和接收的底层机制。
消息格式
MCP 使用 JSON-RPC 2.0 作为其线格式。传输层负责将 MCP 协议消息转换为 JSON-RPC 格式进行传输,并将接收到的 JSON-RPC 消息转换回 MCP 协议消息。
使用三种类型的 JSON-RPC 消息:
{
jsonrpc : "2.0" ,
id : number | string ,
method : string ,
params ?: object
}
{
jsonrpc : "2.0" ,
id : number | string ,
result ?: object ,
error ?: {
code: number ,
message: string ,
data? : unknown
}
}
{
jsonrpc : "2.0" ,
method : string ,
params ?: object
}
内置传输类型
MCP 包含两种标准传输实现:
标准输入/输出 (stdio)
stdio 传输通过标准输入和输出流进行通信。这对于本地集成和命令行工具特别有用。
在以下情况下使用 stdio:
构建命令行工具
实现本地集成
需要简单的进程通信
使用 shell 脚本
TypeScript (Server) TypeScript (Client) Python (Server) Python (Client) const server = new Server ({
name: "example-server" ,
version: "1.0.0"
}, {
capabilities: {}
});
const transport = new StdioServerTransport ();
await server . connect ( transport );
const server = new Server ({
name: "example-server" ,
version: "1.0.0"
}, {
capabilities: {}
});
const transport = new StdioServerTransport ();
await server . connect ( transport );
const client = new Client ({
name: "example-client" ,
version: "1.0.0"
}, {
capabilities: {}
});
const transport = new StdioClientTransport ({
command: "./server" ,
args: [ "--option" , "value" ]
});
await client . connect ( transport );
app = Server( "example-server" )
async with stdio_server() as streams:
await app.run(
streams[ 0 ],
streams[ 1 ],
app.create_initialization_options()
)
params = StdioServerParameters(
command = "./server" ,
args = [ "--option" , "value" ]
)
async with stdio_client(params) as streams:
async with ClientSession(streams[ 0 ], streams[ 1 ]) as session:
await session.initialize()
服务器发送事件 (SSE)
SSE 传输通过 HTTP POST 请求实现服务器到客户端的流式传输。
在以下情况下使用 SSE:
仅需要服务器到客户端的流式传输
在受限网络中工作
实现简单的更新
TypeScript (Server) TypeScript (Client) Python (Server) Python (Client) const server = new Server ({
name: "example-server" ,
version: "1.0.0"
}, {
capabilities: {}
});
const transport = new SSEServerTransport ( "/message" , response );
await server . connect ( transport );
const server = new Server ({
name: "example-server" ,
version: "1.0.0"
}, {
capabilities: {}
});
const transport = new SSEServerTransport ( "/message" , response );
await server . connect ( transport );
const client = new Client ({
name: "example-client" ,
version: "1.0.0"
}, {
capabilities: {}
});
const transport = new SSEClientTransport (
new URL ( "http://localhost:3000/sse" )
);
await client . connect ( transport );
from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.routing import Route
app = Server( "example-server" )
sse = SseServerTransport( "/messages" )
async def handle_sse ( scope , receive , send ):
async with sse.connect_sse(scope, receive, send) as streams:
await app.run(streams[ 0 ], streams[ 1 ], app.create_initialization_options())
async def handle_messages ( scope , receive , send ):
await sse.handle_post_message(scope, receive, send)
starlette_app = Starlette(
routes = [
Route( "/sse" , endpoint = handle_sse),
Route( "/messages" , endpoint = handle_messages, methods = [ "POST" ]),
]
)
async with sse_client( "http://localhost:8000/sse" ) as streams:
async with ClientSession(streams[ 0 ], streams[ 1 ]) as session:
await session.initialize()
自定义传输
MCP 使实现特定需求的自定义传输变得容易。任何传输实现只需符合 Transport 接口:
您可以实现自定义传输以:
自定义网络协议
专用通信通道
与现有系统集成
性能优化
interface Transport {
// 开始处理消息
start () : Promise < void >;
// 发送 JSON-RPC 消息
send ( message : JSONRPCMessage ) : Promise < void >;
// 关闭连接
close () : Promise < void >;
// 回调
onclose ?: () => void ;
onerror ?: ( error : Error ) => void ;
onmessage ?: ( message : JSONRPCMessage ) => void ;
}
interface Transport {
// 开始处理消息
start () : Promise < void >;
// 发送 JSON-RPC 消息
send ( message : JSONRPCMessage ) : Promise < void >;
// 关闭连接
close () : Promise < void >;
// 回调
onclose ?: () => void ;
onerror ?: ( error : Error ) => void ;
onmessage ?: ( message : JSONRPCMessage ) => void ;
}
请注意,虽然 MCP 服务器通常使用 asyncio 实现,但我们建议使用 anyio
实现低级接口(如传输),以实现更广泛的兼容性。
@contextmanager
async def create_transport (
read_stream : MemoryObjectReceiveStream[JSONRPCMessage | Exception ],
write_stream : MemoryObjectSendStream[JSONRPCMessage]
):
"""
MCP 的传输接口。
参数:
read_stream:读取传入消息的流
write_stream:写入传出消息的流
"""
async with anyio.create_task_group() as tg:
try :
// 开始处理消息
tg.start_soon( lambda : process_messages(read_stream))
// 发送消息
async with write_stream:
yield write_stream
except Exception as exc:
// 处理错误
raise exc
finally :
// 清理
tg.cancel_scope.cancel()
await write_stream.aclose()
await read_stream.aclose()
错误处理
传输实现应处理各种错误场景:
连接错误
消息解析错误
协议错误
网络超时
资源清理
错误处理示例:
class ExampleTransport implements Transport {
async start () {
try {
// 连接逻辑
} catch ( error ) {
this . onerror ?.( new Error ( `连接失败: ${ error } ` ));
throw error ;
}
}
async send ( message : JSONRPCMessage ) {
try {
// 发送逻辑
} catch ( error ) {
this . onerror ?.( new Error ( `发送消息失败: ${ error } ` ));
throw error ;
}
}
}
class ExampleTransport implements Transport {
async start () {
try {
// 连接逻辑
} catch ( error ) {
this . onerror ?.( new Error ( `连接失败: ${ error } ` ));
throw error ;
}
}
async send ( message : JSONRPCMessage ) {
try {
// 发送逻辑
} catch ( error ) {
this . onerror ?.( new Error ( `发送消息失败: ${ error } ` ));
throw error ;
}
}
}
请注意,虽然 MCP 服务器通常使用 asyncio 实现,但我们建议使用 anyio
实现低级接口(如传输),以实现更广泛的兼容性。
@contextmanager
async def example_transport ( scope : Scope, receive : Receive, send : Send):
try :
// 创建双向通信的流
read_stream_writer, read_stream = anyio.create_memory_object_stream( 0 )
write_stream, write_stream_reader = anyio.create_memory_object_stream( 0 )
async def message_handler ():
try :
async with read_stream_writer:
// 消息处理逻辑
pass
except Exception as exc:
logger.error( f "处理消息失败: { exc } " )
raise exc
async with anyio.create_task_group() as tg:
tg.start_soon(message_handler)
try :
// 提供通信流
yield read_stream, write_stream
except Exception as exc:
logger.error( f "传输错误: { exc } " )
raise exc
finally :
tg.cancel_scope.cancel()
await write_stream.aclose()
await read_stream.aclose()
except Exception as exc:
logger.error( f "初始化传输失败: { exc } " )
raise exc
最佳实践
实现或使用 MCP 传输时:
正确处理连接生命周期
实现适当的错误处理
在连接关闭时清理资源
使用适当的超时
在发送前验证消息
记录传输事件以进行调试
在适当时实现重新连接逻辑
处理消息队列中的背压
监控连接健康状况
实施适当的安全措施
安全考虑
实现传输时:
身份验证和授权
实施适当的身份验证机制
验证客户端凭据
使用安全令牌处理
实施授权检查
数据安全
使用 TLS 进行网络传输
加密敏感数据
验证消息完整性
实施消息大小限制
清理输入数据
网络安全
实施速率限制
使用适当的超时
处理拒绝服务场景
监控异常模式
实施适当的防火墙规则
调试传输
调试传输问题的提示:
启用调试日志记录
监控消息流
检查连接状态
验证消息格式
测试错误场景
使用网络分析工具
实施健康检查
监控资源使用情况
测试边缘情况
使用适当的错误跟踪