> ## Documentation Index
> Fetch the complete documentation index at: https://model-context-protocol.mintlify.site/llms.txt
> Use this file to discover all available pages before exploring further.

# 传输

> 了解 MCP 的通信机制

Model Context Protocol (MCP) 中的传输提供了客户端和服务器之间通信的基础。传输处理消息发送和接收的底层机制。

## 消息格式

MCP 使用 [JSON-RPC](https://www.jsonrpc.org/) 2.0 作为其线格式。传输层负责将 MCP 协议消息转换为 JSON-RPC 格式进行传输，并将接收到的 JSON-RPC 消息转换回 MCP 协议消息。

使用三种类型的 JSON-RPC 消息：

### 请求

```typescript theme={null}
{
  jsonrpc: "2.0",
  id: number | string,
  method: string,
  params?: object
}
```

### 响应

```typescript theme={null}
{
  jsonrpc: "2.0",
  id: number | string,
  result?: object,
  error?: {
    code: number,
    message: string,
    data?: unknown
  }
}
```

### 通知

```typescript theme={null}
{
  jsonrpc: "2.0",
  method: string,
  params?: object
}
```

## 内置传输类型

MCP 包含两种标准传输实现：

### 标准输入/输出 (stdio)

stdio 传输通过标准输入和输出流进行通信。这对于本地集成和命令行工具特别有用。

在以下情况下使用 stdio：

* 构建命令行工具
* 实现本地集成
* 需要简单的进程通信
* 使用 shell 脚本

<Tabs>
  <Tab title="TypeScript (Server)">
    ```typescript theme={null}
    const server = new Server({
      name: "example-server",
      version: "1.0.0"
    }, {
      capabilities: {}
    });

    const transport = new StdioServerTransport();
    await server.connect(transport);
    ```
  </Tab>

  <Tab title="TypeScript (Client)">
    ```typescript theme={null}
    const client = new Client({
      name: "example-client",
      version: "1.0.0"
    }, {
      capabilities: {}
    });

    const transport = new StdioClientTransport({
      command: "./server",
      args: ["--option", "value"]
    });
    await client.connect(transport);
    ```
  </Tab>

  <Tab title="Python (Server)">
    ```python theme={null}
    app = Server("example-server")

    async with stdio_server() as streams:
        await app.run(
            streams[0],
            streams[1],
            app.create_initialization_options()
        )
    ```
  </Tab>

  <Tab title="Python (Client)">
    ```python theme={null}
    params = StdioServerParameters(
        command="./server",
        args=["--option", "value"]
    )

    async with stdio_client(params) as streams:
        async with ClientSession(streams[0], streams[1]) as session:
            await session.initialize()
    ```
  </Tab>
</Tabs>

### 服务器发送事件 (SSE)

SSE 传输通过 HTTP POST 请求实现服务器到客户端的流式传输。

在以下情况下使用 SSE：

* 仅需要服务器到客户端的流式传输
* 在受限网络中工作
* 实现简单的更新

<Tabs>
  <Tab title="TypeScript (Server)">
    ```typescript theme={null}
    const server = new Server({
      name: "example-server",
      version: "1.0.0"
    }, {
      capabilities: {}
    });

    const transport = new SSEServerTransport("/message", response);
    await server.connect(transport);
    ```
  </Tab>

  <Tab title="TypeScript (Client)">
    ```typescript theme={null}
    const client = new Client({
      name: "example-client",
      version: "1.0.0"
    }, {
      capabilities: {}
    });

    const transport = new SSEClientTransport(
      new URL("http://localhost:3000/sse")
    );
    await client.connect(transport);
    ```
  </Tab>

  <Tab title="Python (Server)">
    ```python theme={null}
    from mcp.server.sse import SseServerTransport
    from starlette.applications import Starlette
    from starlette.routing import Route

    app = Server("example-server")
    sse = SseServerTransport("/messages")

    async def handle_sse(scope, receive, send):
        async with sse.connect_sse(scope, receive, send) as streams:
            await app.run(streams[0], streams[1], app.create_initialization_options())

    async def handle_messages(scope, receive, send):
        await sse.handle_post_message(scope, receive, send)

    starlette_app = Starlette(
        routes=[
            Route("/sse", endpoint=handle_sse),
            Route("/messages", endpoint=handle_messages, methods=["POST"]),
        ]
    )
    ```
  </Tab>

  <Tab title="Python (Client)">
    ```python theme={null}
    async with sse_client("http://localhost:8000/sse") as streams:
        async with ClientSession(streams[0], streams[1]) as session:
            await session.initialize()
    ```
  </Tab>
</Tabs>

## 自定义传输

MCP 使实现特定需求的自定义传输变得容易。任何传输实现只需符合 Transport 接口：

您可以实现自定义传输以：

* 自定义网络协议
* 专用通信通道
* 与现有系统集成
* 性能优化

<Tabs>
  <Tab title="TypeScript">
    ```typescript theme={null}
    interface Transport {
      // 开始处理消息
      start(): Promise<void>;

      // 发送 JSON-RPC 消息
      send(message: JSONRPCMessage): Promise<void>;

      // 关闭连接
      close(): Promise<void>;

      // 回调
      onclose?: () => void;
      onerror?: (error: Error) => void;
      onmessage?: (message: JSONRPCMessage) => void;
    }
    ```
  </Tab>

  <Tab title="Python">
    请注意，虽然 MCP 服务器通常使用 asyncio 实现，但我们建议使用 `anyio` 实现低级接口（如传输），以实现更广泛的兼容性。

    ```python theme={null}
    @contextmanager
    async def create_transport(
        read_stream: MemoryObjectReceiveStream[JSONRPCMessage | Exception],
        write_stream: MemoryObjectSendStream[JSONRPCMessage]
    ):
        """
        MCP 的传输接口。

        参数：
            read_stream：读取传入消息的流
            write_stream：写入传出消息的流
        """
        async with anyio.create_task_group() as tg:
            try:
                // 开始处理消息
                tg.start_soon(lambda: process_messages(read_stream))

                // 发送消息
                async with write_stream:
                    yield write_stream

            except Exception as exc:
                // 处理错误
                raise exc
            finally:
                // 清理
                tg.cancel_scope.cancel()
                await write_stream.aclose()
                await read_stream.aclose()
    ```
  </Tab>
</Tabs>

## 错误处理

传输实现应处理各种错误场景：

1. 连接错误
2. 消息解析错误
3. 协议错误
4. 网络超时
5. 资源清理

错误处理示例：

<Tabs>
  <Tab title="TypeScript">
    ```typescript theme={null}
    class ExampleTransport implements Transport {
      async start() {
        try {
          // 连接逻辑
        } catch (error) {
          this.onerror?.(new Error(`连接失败：${error}`));
          throw error;
        }
      }

      async send(message: JSONRPCMessage) {
        try {
          // 发送逻辑
        } catch (error) {
          this.onerror?.(new Error(`发送消息失败：${error}`));
          throw error;
        }
      }
    }
    ```
  </Tab>

  <Tab title="Python">
    请注意，虽然 MCP 服务器通常使用 asyncio 实现，但我们建议使用 `anyio` 实现低级接口（如传输），以实现更广泛的兼容性。

    ```python theme={null}
    @contextmanager
    async def example_transport(scope: Scope, receive: Receive, send: Send):
        try:
            // 创建双向通信的流
            read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
            write_stream, write_stream_reader = anyio.create_memory_object_stream(0)

            async def message_handler():
                try:
                    async with read_stream_writer:
                        // 消息处理逻辑
                        pass
                except Exception as exc:
                    logger.error(f"处理消息失败：{exc}")
                    raise exc

            async with anyio.create_task_group() as tg:
                tg.start_soon(message_handler)
                try:
                    // 提供通信流
                    yield read_stream, write_stream
                except Exception as exc:
                    logger.error(f"传输错误：{exc}")
                    raise exc
                finally:
                    tg.cancel_scope.cancel()
                    await write_stream.aclose()
                    await read_stream.aclose()
        except Exception as exc:
            logger.error(f"初始化传输失败：{exc}")
            raise exc
    ```
  </Tab>
</Tabs>

## 最佳实践

实现或使用 MCP 传输时：

1. 正确处理连接生命周期
2. 实现适当的错误处理
3. 在连接关闭时清理资源
4. 使用适当的超时
5. 在发送前验证消息
6. 记录传输事件以进行调试
7. 在适当时实现重新连接逻辑
8. 处理消息队列中的背压
9. 监控连接健康状况
10. 实施适当的安全措施

## 安全考虑

实现传输时：

### 身份验证和授权

* 实施适当的身份验证机制
* 验证客户端凭据
* 使用安全令牌处理
* 实施授权检查

### 数据安全

* 使用 TLS 进行网络传输
* 加密敏感数据
* 验证消息完整性
* 实施消息大小限制
* 清理输入数据

### 网络安全

* 实施速率限制
* 使用适当的超时
* 处理拒绝服务场景
* 监控异常模式
* 实施适当的防火墙规则

## 调试传输

调试传输问题的提示：

1. 启用调试日志记录
2. 监控消息流
3. 检查连接状态
4. 验证消息格式
5. 测试错误场景
6. 使用网络分析工具
7. 实施健康检查
8. 监控资源使用情况
9. 测试边缘情况
10. 使用适当的错误跟踪
