class Session(BaseSession[RequestT, NotificationT, ResultT]):
async def send_request(
self,
request: RequestT,
result_type: type[Result]
) -> Result:
"""
发送请求并等待响应。如果响应包含错误,则引发 McpError。
"""
# 请求处理实现
async def send_notification(
self,
notification: NotificationT
) -> None:
"""发送不期望响应的单向通知。"""
# 通知处理实现
async def _received_request(
self,
responder: RequestResponder[ReceiveRequestT, ResultT]
) -> None:
"""处理来自另一方的传入请求。"""
# 请求处理实现
async def _received_notification(
self,
notification: ReceiveNotificationT
) -> None:
"""处理来自另一方的传入通知。"""
# 通知处理实现