前言
截至目前(2025年9月19日),除了基础的Prompt、Resource和Tool概念,FastMCP还提供了以下功能:Sampling、Elicitation、Logging、Progress、Proxy、Middleware、Composition和Authentication功能
- Sampling,采样,在server端调用client的llm,实现功能解耦
- Elicition,征询,实现人工介入
- Logging,将Server日志发送给Client
- Progress,Server端将进度发送给Client
- Proxy,代理其它MCP Server
- Middleware,拦截MCP通信中的请求和响应
- Composition,Server端将多个servers组合成一个server对外提供
- Authentication,Client和Server之间安全认证
其中Sampling和Elicitation在我的实际开发中用到的比较多,所以我在前面章节中单独拎出来介绍了。FastMCP官方文档也说了Authentication还在迅速迭代中,虽然已经有了相关文档,但本文暂时就不涉及了,等这个功能稳定了再具体细说。剩下的功能会在本文中一次性全部介绍完,篇幅较长,可以根据章节名跳转到自己需要关注的内容。本文大部分参考自官方文档。
Logging
Logging,从服务器向 MCP 客户端发送消息。FastMCP提供了一个logger(fastmcp.utilities.logging.get_logger()
),也可以用python标准库的logging
。
服务器日志功能允许 MCP 工具向客户端发送调试(debug)、信息(info)、警告(warning)和错误(error)级别的消息。这有助于用户了解函数执行过程,在开发和运行阶段辅助调试。一般用于以下场景:
- 调试:发送详细的执行信息,帮助诊断问题
- 进度可见性:让用户了解工具当前正在执行的操作
- 错误报告:向客户端传达问题及其上下文
- 审计追踪:为合规或分析目的生成工具执行记录
与标准 Python 日志不同,MCP 服务器 Logging 会直接将消息发送至客户端,使其在客户端界面或日志中可见。
Server 示例
在任意tool函数中使用Context提供的日志方法:
from fastmcp import FastMCP, Contextmcp = FastMCP("custom")@mcp.tool
async def analyze_data(data: list[float], ctx: Context) -> dict:"""通过全面日志记录分析数值数据。"""await ctx.debug("开始分析数值数据")await ctx.info(f"正在分析 {len(data)} 个数据点")try:if not data:await ctx.warning("提供了空数据列表")return {"error": "空数据列表"}result = sum(data) / len(data)await ctx.info(f"分析完成,平均值为:{result}")return {"average": result, "count": len(data)}except Exception as e:await ctx.error(f"分析失败:{str(e)}")raiseif __name__ == "__main__":mcp.run(transport="stdio", show_banner=False)
所有日志方法(debug
、info
、warning
、error
、log
)现在均支持 extra
参数,该参数接受一个字典,用于传递任意结构化数据。这使得客户端可接收结构化日志,便于创建丰富且可查询的日志记录。
@mcp.tool
async def process_transaction(transaction_id: str, amount: float, ctx: Context):await ctx.info(f"正在处理交易 {transaction_id}",extra={"transaction_id": transaction_id,"amount": amount,"currency": "USD"})# ... 处理逻辑 ...
Client 示例
import asyncio
from pathlib import Path
from fastmcp.client import Client, StdioTransport
from fastmcp.client.logging import LogMessage
import logging
import syslogger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(sys.stdout))
# This mapping is useful for converting MCP level strings to Python's levels
LOGGING_LEVEL_MAP = logging.getLevelNamesMapping()class MCPClient:def __init__(self):self.mcp_client = Client(StdioTransport(command = str(Path(__file__).parent.parent / ".venv" / "bin" / "python"),args = ["demo09-server.py"],cwd = str(Path(__file__).parent)),log_handler=self.logging_handler,)async def logging_handler(self, message: LogMessage):"""Handles incoming logs from the MCP server and forwards themto the standard Python logging system."""msg = message.data.get('msg')extra = message.data.get('extra')# Convert the MCP log level to a Python log levellevel = LOGGING_LEVEL_MAP.get(message.level.upper(), logging.INFO)logger.log(level, msg, extra=extra)async def generate(self):async with self.mcp_client:await self.mcp_client.ping()rst = await self.mcp_client.call_tool("analyze_data", arguments={"data": [1.0, 2.0, 3.0, 4.0, 5.0]})print(rst)async def main():client = MCPClient()await client.generate()if __name__ == "__main__":asyncio.run(main())
client运行输出
开始分析数值数据
正在分析 5 个数据点
分析完成,平均值为:3.0
CallToolResult(content=[TextContent(type='text', text='{"average":3.0,"count":5}', annotations=None, meta=None)], structured_content={'average': 3.0, 'count': 5}, data={'average': 3.0, 'count': 5}, is_error=False)
Progress
Progress 功能允许 MCP tool 向 Client 通知长时间运行操作的当前进度。这使得Client能够显示进度指示器,从而在执行耗时任务时提供更佳的用户体验。Progress 在以下方面具有重要价值:
- 用户体验:让用户了解长时间运行操作的当前状态
- 进度指示器:使客户端能够显示进度条或百分比
- 防止超时:表明操作正在持续进行中,避免被误判为无响应
- 调试用途:追踪执行进度,便于性能分析
Server 示例
from fastmcp import FastMCP, Context
import asynciomcp = FastMCP("custom")@mcp.tool
async def process_items(items: list[str], ctx: Context) -> dict:"""处理项目列表,并发送进度更新。"""total = len(items)results = []for i, item in enumerate(items):# 每处理一个项目,报告当前进度await ctx.report_progress(progress=i, total=total)# 模拟处理耗时await asyncio.sleep(0.1)results.append(item.upper())# 报告 100% 完成await ctx.report_progress(progress=total, total=total)return {"processed": len(results), "results": results}if __name__ == "__main__":mcp.run(transport="stdio", show_banner=False)
Client 示例
import asyncio
from pathlib import Path
from fastmcp.client import Client, StdioTransportclass MCPClient:def __init__(self):self.mcp_client = Client(StdioTransport(command = str(Path(__file__).parent.parent / ".venv" / "bin" / "python"),args = ["demo09-server.py"],cwd = str(Path(__file__).parent)),progress_handler=self.progress_handler,)async def progress_handler(self, progress: float, total: float | None, message: str | None) -> None:if total is not None:percentage = (progress / total) * 100print(f"Progress: {percentage:.1f}% - {message or ''}")else:print(f"Progress: {progress} - {message or ''}")async def generate(self):async with self.mcp_client:await self.mcp_client.ping()rst = await self.mcp_client.call_tool("process_items", arguments={"items": ["item1", "item2", "item3", "item4", "item5", "item6", "item7", "item8", "item9", "item10", "item11", "item12", "item13", "item14", "item15"]})print(rst)async def main():client = MCPClient()await client.generate()if __name__ == "__main__":asyncio.run(main())
client运行输出
Progress: 0.0% -
Progress: 6.7% -
Progress: 13.3% -
Progress: 20.0% -
Progress: 26.7% -
Progress: 33.3% -
Progress: 40.0% -
Progress: 46.7% -
Progress: 53.3% -
Progress: 60.0% -
Progress: 66.7% -
Progress: 73.3% -
Progress: 80.0% -
Progress: 86.7% -
Progress: 93.3% -
Progress: 100.0% -
CallToolResult(content=[TextContent(type='text', text='{"processed":15,"results":["ITEM1","ITEM2","ITEM3","ITEM4","ITEM5","ITEM6","ITEM7","ITEM8","ITEM9","ITEM10","ITEM11","ITEM12","ITEM13","ITEM14","ITEM15"]}', annotations=None, meta=None)], structured_content={'processed': 15, 'results': ['ITEM1', 'ITEM2', 'ITEM3', 'ITEM4', 'ITEM5', 'ITEM6', 'ITEM7', 'ITEM8', 'ITEM9', 'ITEM10', 'ITEM11', 'ITEM12', 'ITEM13', 'ITEM14', 'ITEM15']}, data={'processed': 15, 'results': ['ITEM1', 'ITEM2', 'ITEM3', 'ITEM4', 'ITEM5', 'ITEM6', 'ITEM7', 'ITEM8', 'ITEM9', 'ITEM10', 'ITEM11', 'ITEM12', 'ITEM13', 'ITEM14', 'ITEM15']}, is_error=False)
Proxy
FastMCP 的 Proxy 允许一个 FastMCP 服务器实例作为前端,代理另一个 MCP 服务器(该服务器可能是远程的、运行在不同传输协议上的,甚至是另一个 FastMCP 实例)。此功能通过 FastMCP.as_proxy()
类方法实现。作为代理服务器,它本身不直接实现工具或资源。当它接收到请求(如 tools/call
或 resources/read
)时,会将该请求转发至一个_后端_ MCP 服务器,接收其响应,再将响应原样返回给原始客户端。
核心优势
- 会话隔离:每个请求拥有独立隔离的会话,确保并发操作安全
- 传输协议桥接:通过一种传输协议暴露运行在另一种传输协议上的服务器
- 高级 MCP 功能支持:自动转发采样(sampling)、引导(elicitation)、日志和进度报告
- 安全性:作为后端服务器的受控网关
- 简化架构:即使后端位置或传输协议变更,前端仍保持单一接入点
使用代理服务器时,特别是连接到基于 HTTP 的后端服务器时,需注意延迟可能显著增加。例如,
list_tools()
操作可能耗时数百毫秒,而本地工具仅需 1–2 毫秒。挂载代理服务器时,此延迟会影响父服务器的所有操作,而不仅仅是与被代理工具的交互。
如果您的使用场景对低延迟有严格要求,建议使用import_server()
方法在启动时复制工具,而非在运行时进行代理。
快速入门
推荐使用 ProxyClient
创建代理,它提供完整的 MCP 功能支持,并自动实现会话隔离:
from fastmcp import FastMCP
from fastmcp.server.proxy import ProxyClient# 创建支持完整 MCP 功能的代理
proxy = FastMCP.as_proxy(ProxyClient("backend_server.py"),name="MyProxy"
)# 运行代理(例如,通过 stdio 供 Claude Desktop 使用)
if __name__ == "__main__":proxy.run()
此单一设置即可提供:
- 安全的并发请求处理
- 自动转发高级 MCP 功能(采样、引导等)
- 会话隔离,防止上下文混淆
- 与所有 MCP 客户端完全兼容
高级MCP功能支持
ProxyClient
会自动在后端服务器与连接到代理的客户端之间转发高级 MCP 协议功能,确保完整的 MCP 兼容性。支持的功能:
- Roots:将文件系统根目录访问请求转发给客户端
- Sampling:将后端发起的 LLM 补全请求转发给客户端
- Elicitation:将用户输入请求转发给客户端
- Logging:将后端日志消息转发至客户端
- Progress:在长时间操作中转发进度通知
也可以自定义功能支持,比如设置为None来选择性禁用转发
# 禁用采样,但保留其他功能
backend = ProxyClient("backend_server.py",sampling_handler=None, # 禁用 LLM 采样转发log_handler=None # 禁用日志转发
)
基于配置的代理
你可以直接从符合 MCPConfig 模式的配置字典创建代理。这对于快速设置指向远程服务器的代理非常有用,无需手动配置每个连接细节。
from fastmcp import FastMCP# 直接从配置字典创建代理
config = {"mcpServers": {"default": { # 对于单服务器配置,通常使用 'default'"url": "https://example.com/mcp ","transport": "http"}}
}# 创建指向配置服务器的代理(自动创建 ProxyClient)
proxy = FastMCP.as_proxy(config, name="Config-Based Proxy")# 通过 stdio 传输协议本地运行
if __name__ == "__main__":proxy.run()
多服务器的设置
你可以通过在配置中指定多个条目来创建指向多个服务器的代理。系统会自动以配置名称作为前缀挂载它们:
# 多服务器配置
config = {"mcpServers": {"weather": {"url": "https://weather-api.example.com/mcp ","transport": "http"},"calendar": {"url": "https://calendar-api.example.com/mcp ","transport": "http"}}
}# 创建统一的多服务器代理
composite_proxy = FastMCP.as_proxy(config, name="Composite Proxy")# 工具和资源可通过前缀访问:
# - weather_get_forecast, calendar_add_event
# - weather://weather/icons/sunny, calendar://calendar/events/today
显式会话管理
在内部,FastMCP.as_proxy()
使用 FastMCPProxy
类。您通常无需直接与此类交互,但在高级场景下它可供使用。FastMCPProxy
要求显式会话管理——不会执行任何自动检测。您必须选择您的会话策略:
# 在所有请求间共享会话(并发时需谨慎)
shared_client = ProxyClient("backend_server.py")
def shared_session_factory():return shared_clientproxy = FastMCPProxy(client_factory=shared_session_factory)# 为每个请求创建新会话(推荐)
def fresh_session_factory():return ProxyClient("backend_server.py")proxy = FastMCPProxy(client_factory=fresh_session_factory)
如需自动选择会话策略,请使用便捷方法 FastMCP.as_proxy()
。
# 带有特定配置的自定义工厂
def custom_client_factory():client = ProxyClient("backend_server.py")# 在此处添加任何自定义配置return clientproxy = FastMCPProxy(client_factory=custom_client_factory)
Middleware
MCP 中间件允许您在请求和响应流经服务器时对其进行拦截和修改。可以将其视为一条管道,每个中间件均可检查当前操作、进行修改,然后将控制权传递给链中的下一个中间件。与传统的 Web 中间件不同,MCP 中间件专为 Model Context Protocol 设计,为各类 MCP 操作(如工具调用、资源读取和提示请求)提供专用钩子。
MCP 中间件是一个全新概念,未来版本中可能发生破坏性变更。
MCP 中间件的常见应用场景包括:
- 身份验证与授权:在执行操作前验证客户端权限
- 日志与监控:追踪使用模式与性能指标
- 速率限制:按客户端或操作类型控制请求频率
- 请求/响应转换:在数据到达工具前或离开后对其进行修改
- 缓存:存储频繁请求的数据以提升性能
- 错误处理:为服务器提供一致的错误响应
中间件工作原理
FastMCP 中间件基于管道模型运行。当请求进入时,它会按添加到服务器的顺序依次流经各个中间件。每个中间件均可:
检查传入的请求及其上下文
在传递给下一个中间件或处理器前修改请求
通过调用 call_next() 执行链中的下一个中间件/处理器
在返回前检查并修改响应
处理执行过程中发生的错误
关键在于,中间件形成一条链,每个环节决定是继续处理还是完全终止链的执行。
如果你熟悉 ASGI 中间件,FastMCP 中间件的基本结构会感觉似曾相识。其核心是一个可调用类,接收一个包含当前 JSON-RPC 消息信息的上下文对象,以及一个用于继续中间件链的处理器函数。
重要的是要理解,MCP 基于 JSON-RPC 规范 运行。虽然 FastMCP 以熟悉的方式呈现请求和响应,但其本质是 JSON-RPC 消息,而非 Web 应用中常见的 HTTP 请求/响应对。FastMCP 中间件适用于所有 传输类型 ,包括本地 stdio 传输和 HTTP 传输,但并非所有中间件实现都兼容所有传输类型(例如,检查 HTTP 头部的中间件无法在 stdio 传输中工作)。
实现中间件最基础的方式是重写 Middleware 基类的 call 方法:
from fastmcp.server.middleware import Middleware, MiddlewareContextclass RawMiddleware(Middleware):async def __call__(self, context: MiddlewareContext, call_next):# 此方法接收所有消息,无论类型print(f"原始中间件正在处理:{context.method}")result = await call_next(context)print(f"原始中间件处理完成:{context.method}")return result
中间件钩子
为便于用户针对特定类型的消息,FastMCP 中间件提供了一系列专用钩子。您可以重写特定的钩子方法(而非实现原始的 __call__
方法),这些方法仅在特定类型的操作时被调用,从而允许您精确地定位中间件逻辑所需的粒度。
钩子层级与执行顺序
FastMCP 提供多个按不同粒度调用的钩子。理解此层级结构对有效设计中间件至关重要。
当请求进入时,同一请求可能触发多个钩子调用,执行顺序由泛化到具体:
on_message
- 为所有 MCP 消息(请求和通知)调用on_request
或on_notification
- 根据消息类型调用- 操作特定钩子 - 为特定 MCP 操作调用,如
on_call_tool
例如,当客户端调用工具时,您的中间件将收到多次钩子调用:
on_message
和on_request
用于任何初始工具发现操作(如 list_tools)on_message
(因为它是任何 MCP 消息)用于工具调用本身on_request
(因为工具调用期望响应)用于工具调用本身on_call_tool
(因为它是具体的工具执行)用于工具调用本身
请注意,MCP SDK 可能会执行额外操作(如为缓存目的列出工具),这将触发超出直接工具执行范围的额外中间件调用。
此层级结构允许您以适当的粒度定位中间件逻辑。对广泛关注点(如日志)使用 on_message
,对身份验证使用 on_request
,对工具特定逻辑(如性能监控)使用 on_call_tool
。
可用钩子
on_message
: 为所有 MCP 消息(请求和通知)调用on_request
: 专为 MCP 请求(期望响应)调用on_notification
: 专为 MCP 通知(即发即弃)调用on_call_tool
: 在执行工具时调用on_read_resource
: 在读取资源时调用on_get_prompt
: 在获取提示时调用on_list_tools
: 在列出可用工具时调用on_list_resources
: 在列出可用资源时调用on_list_resource_templates
: 在列出资源模板时调用on_list_prompts
: 在列出可用提示时调用
中间件中的组件访问
理解如何在中间件中访问组件信息(工具、资源、提示)对构建强大的中间件功能至关重要。访问模式在列出操作与执行操作之间存在显著差异。
列出操作 vs 执行操作
FastMCP 中间件以不同方式处理两种类型的操作:
列出操作 (on_list_tools
, on_list_resources
, on_list_prompts
等):
- 中间件接收FastMCP 组件对象,包含完整元数据
- 这些对象包含 FastMCP 特有属性(如
tags
),可直接从组件访问 - 结果在转换为 MCP 格式前包含完整组件信息
- 标签包含在返回给 MCP 客户端的组件
meta
字段中
执行操作 (on_call_tool
, on_read_resource
, on_get_prompt
):
- 中间件在组件执行前运行
- 中间件结果为执行结果,或组件未找到时的错误
- 组件元数据在钩子参数中不可直接访问
在执行期间访问组件元数据
如果需要在执行操作期间检查组件属性(如标签),请使用通过上下文获取的 FastMCP 服务器实例:
from fastmcp.server.middleware import Middleware, MiddlewareContext
from fastmcp.exceptions import ToolErrorclass TagBasedMiddleware(Middleware):async def on_call_tool(self, context: MiddlewareContext, call_next):# 访问工具对象以检查其元数据if context.fastmcp_context:try:tool = await context.fastmcp_context.fastmcp.get_tool(context.message.name)# 检查此工具是否带有 "private" 标签if "private" in tool.tags:raise ToolError("访问被拒绝:私有工具")# 检查工具是否启用if not tool.enabled:raise ToolError("工具当前已禁用")except Exception:# 工具未找到或其他错误 - 让执行继续# 并自然处理错误passreturn await call_next(context)
相同模式适用于资源和提示:
from fastmcp.server.middleware import Middleware, MiddlewareContext
from fastmcp.exceptions import ResourceError, PromptErrorclass ComponentAccessMiddleware(Middleware):async def on_read_resource(self, context: MiddlewareContext, call_next):if context.fastmcp_context:try:resource = await context.fastmcp_context.fastmcp.get_resource(context.message.uri)if "restricted" in resource.tags:raise ResourceError("访问被拒绝:受限资源")except Exception:passreturn await call_next(context)async def on_get_prompt(self, context: MiddlewareContext, call_next):if context.fastmcp_context:try:prompt = await context.fastmcp_context.fastmcp.get_prompt(context.message.name)if not prompt.enabled:raise PromptError("提示当前已禁用")except Exception:passreturn await call_next(context)
处理列出结果
对于列出操作,中间件 call_next
函数在组件转换为 MCP 格式前返回 FastMCP 组件列表。您可以过滤或修改此列表并将其返回给客户端。例如:
from fastmcp.server.middleware import Middleware, MiddlewareContextclass ListingFilterMiddleware(Middleware):async def on_list_tools(self, context: MiddlewareContext, call_next):result = await call_next(context)# 过滤掉带有 "private" 标签的工具filtered_tools = [tool for tool in result if "private" not in tool.tags]# 返回修改后的列表return filtered_tools
此过滤在组件转换为 MCP 格式并返回给客户端前进行。标签在过滤期间可访问,并包含在最终列出响应的组件 meta
字段中。
在列出操作中过滤组件时,请确保也在相应的执行钩子(
on_call_tool
、on_read_resource
、on_get_prompt
)中阻止已过滤组件的执行,以保持一致性。
工具调用拒绝
您可以通过在中间件中抛出 ToolError
来拒绝访问特定工具。这是阻止工具执行的正确方式,因为它与 FastMCP 错误处理系统正确集成
from fastmcp.server.middleware import Middleware, MiddlewareContext
from fastmcp.exceptions import ToolErrorclass AuthMiddleware(Middleware):async def on_call_tool(self, context: MiddlewareContext, call_next):tool_name = context.message.name# 拒绝访问受限工具if tool_name.lower() in ["delete", "admin_config"]:raise ToolError("访问被拒绝:工具需要管理员权限")# 允许其他工具继续执行return await call_next(context)
拒绝工具调用时,务必抛出
ToolError
,而非返回ToolResult
对象或其他值。ToolError
确保错误通过中间件链正确传播,并转换为正确的 MCP 错误响应格式。
工具调用修改
对于工具调用等执行操作,您可以在执行前修改参数,或在执行后转换结果:
from fastmcp.server.middleware import Middleware, MiddlewareContextclass ToolCallMiddleware(Middleware):async def on_call_tool(self, context: MiddlewareContext, call_next):# 在执行前修改参数if context.message.name == "calculate":# 确保输入为正数if context.message.arguments.get("value", 0) < 0:context.message.arguments["value"] = abs(context.message.arguments["value"])result = await call_next(context)# 在执行后转换结果if context.message.name == "get_data":# 向结果添加元数据if result.structured_content:result.structured_content["processed_at"] = "2024-01-01T00:00:00Z"return result
对于更复杂的工具重写场景,请考虑使用 工具转换 模式,它为创建修改后的工具变体提供了更结构化的方法。
钩子剖析
每个中间件钩子遵循相同的模式。让我们通过 on_message
钩子来理解其结构:
async def on_message(self, context: MiddlewareContext, call_next):# 1. 预处理:检查并可选地修改请求print(f"正在处理 {context.method}")# 2. 链式延续:调用下一个中间件/处理器result = await call_next(context)# 3. 后处理:检查并可选地修改响应print(f"已完成 {context.method}")# 4. 返回结果(可能已修改)return result
每个钩子接收两个参数:
-
context: MiddlewareContext
- 包含当前请求信息:context.method
- MCP 方法名称(如 "tools/call")context.source
- 请求来源("client" 或 "server")context.type
- 消息类型("request" 或 "notification")context.message
- MCP 消息数据context.timestamp
- 请求接收时间context.fastmcp_context
- FastMCP Context 对象(如可用)
-
call_next
- 用于继续中间件链的函数。除非您希望完全停止处理,否则必须调用此函数。
开发者对请求流拥有完全控制权:
- 继续处理:调用
await call_next(context)
以继续 - 修改请求:在调用
call_next
前更改上下文 - 修改响应:在调用
call_next
后更改结果 - 停止链:不调用
call_next
(极少需要) - 处理错误:在 try/catch 块中包装
call_next
除了修改请求和响应,您还可以存储状态数据,供工具(可选)稍后访问。为此,请使用 FastMCP Context 适当调用 set_state
或 get_state
。
创建中间件
FastMCP 中间件通过继承 Middleware
基类并重写所需钩子来实现。
from fastmcp import FastMCP
from fastmcp.server.middleware import Middleware, MiddlewareContextclass LoggingMiddleware(Middleware):"""记录所有 MCP 操作的中间件。"""async def on_message(self, context: MiddlewareContext, call_next):"""为所有 MCP 消息调用。"""print(f"正在处理来自 {context.source} 的 {context.method}")result = await call_next(context)print(f"{context.method} 处理完成")return result# 将中间件添加到您的服务器
mcp = FastMCP("MyServer")
mcp.add_middleware(LoggingMiddleware())
向服务器添加中间件
中间件按添加到服务器的顺序执行。最先添加的中间件在进入时最先运行,在退出时最后运行:
mcp = FastMCP("MyServer")mcp.add_middleware(AuthenticationMiddleware("secret-token"))
mcp.add_middleware(PerformanceMiddleware())
mcp.add_middleware(LoggingMiddleware())
这将创建以下执行流:
- AuthenticationMiddleware(预处理)
- PerformanceMiddleware(预处理)
- LoggingMiddleware(预处理)
- 实际工具/资源处理器
- LoggingMiddleware(后处理)
- PerformanceMiddleware(后处理)
- AuthenticationMiddleware(后处理)
组合服务器与中间件
当使用 服务器组合(下面提的Composition) (如 mount
或 import_server
)时,中间件行为遵循以下规则:
- 父服务器中间件为所有请求运行,包括路由到挂载服务器的请求
- 挂载服务器中间件仅为由该特定服务器处理的请求运行
- 中间件顺序在每个服务器内保持不变
# 带有中间件的父服务器
parent = FastMCP("Parent")
parent.add_middleware(AuthenticationMiddleware("token"))# 带有自身中间件的子服务器
child = FastMCP("Child")
child.add_middleware(LoggingMiddleware())@child.tool
def child_tool() -> str:return "from child"# 挂载子服务器
parent.mount(child, prefix="child")
当客户端调用 "child_tool" 时,请求将首先流经父服务器的身份验证中间件,然后路由到子服务器,在子服务器中再经过其日志中间件。
内置中间件
FastMCP 包含多个中间件实现,展示了最佳实践并提供立即可用的功能。让我们通过构建简化版本来探索每种类型的工作原理,然后了解如何使用完整实现。
计时中间件
性能监控对于理解服务器行为和识别瓶颈至关重要。FastMCP 在 fastmcp.server.middleware.timing
中包含计时中间件。
以下是其工作方式的示例:
import time
from fastmcp.server.middleware import Middleware, MiddlewareContextclass SimpleTimingMiddleware(Middleware):async def on_request(self, context: MiddlewareContext, call_next):start_time = time.perf_counter()try:result = await call_next(context)duration_ms = (time.perf_counter() - start_time) * 1000print(f"请求 {context.method} 在 {duration_ms:.2f}ms 内完成")return resultexcept Exception as e:duration_ms = (time.perf_counter() - start_time) * 1000print(f"请求 {context.method} 在 {duration_ms:.2f}ms 后失败:{e}")raise
要使用具有正确日志和配置的完整版本:
from fastmcp.server.middleware.timing import (TimingMiddleware, DetailedTimingMiddleware
)# 对所有请求进行基础计时
mcp.add_middleware(TimingMiddleware())# 详细的操作级计时(工具、资源、提示)
mcp.add_middleware(DetailedTimingMiddleware())
内置版本包括自定义日志支持、正确格式化,且 DetailedTimingMiddleware 提供 on_call_tool
和 on_read_resource
等操作特定钩子,以实现精细计时。
日志中间件
请求和响应日志记录对于调试、监控和理解 MCP 服务器中的使用模式至关重要。FastMCP 在 fastmcp.server.middleware.logging
中提供全面的日志中间件。
以下是其工作方式的示例:
from fastmcp.server.middleware import Middleware, MiddlewareContextclass SimpleLoggingMiddleware(Middleware):async def on_message(self, context: MiddlewareContext, call_next):print(f"正在处理来自 {context.source} 的 {context.method}")try:result = await call_next(context)print(f"{context.method} 处理完成")return resultexcept Exception as e:print(f"{context.method} 失败:{e}")raise
要使用具有高级功能的完整版本:
from fastmcp.server.middleware.logging import (LoggingMiddleware, StructuredLoggingMiddleware
)# 支持负载的人类可读日志
mcp.add_middleware(LoggingMiddleware(include_payloads=True,max_payload_length=1000
))# 用于日志聚合工具的 JSON 结构化日志
mcp.add_middleware(StructuredLoggingMiddleware(include_payloads=True))
内置版本包括负载日志、结构化 JSON 输出、自定义日志支持、负载大小限制以及用于精细控制的操作特定钩子。
速率限制中间件
速率限制对于保护服务器免受滥用、确保公平资源使用以及在负载下保持性能至关重要。FastMCP 在 fastmcp.server.middleware.rate_limiting
中包含复杂的速率限制中间件。
以下是其工作方式的示例:
import time
from collections import defaultdict
from fastmcp.server.middleware import Middleware, MiddlewareContext
from mcp import McpError
from mcp.types import ErrorDataclass SimpleRateLimitMiddleware(Middleware):def __init__(self, requests_per_minute: int = 60):self.requests_per_minute = requests_per_minuteself.client_requests = defaultdict(list)async def on_request(self, context: MiddlewareContext, call_next):current_time = time.time()client_id = "default" # 实际中,从头部或上下文中提取# 清理旧请求并检查限制cutoff_time = current_time - 60self.client_requests[client_id] = [req_time for req_time in self.client_requests[client_id]if req_time > cutoff_time]if len(self.client_requests[client_id]) >= self.requests_per_minute:raise McpError(ErrorData(code=-32000, message="超出速率限制"))self.client_requests[client_id].append(current_time)return await call_next(context)
要使用具有高级算法的完整版本:
from fastmcp.server.middleware.rate_limiting import (RateLimitingMiddleware, SlidingWindowRateLimitingMiddleware
)# 令牌桶速率限制(允许受控突发)
mcp.add_middleware(RateLimitingMiddleware(max_requests_per_second=10.0,burst_capacity=20
))# 滑动窗口速率限制(精确的基于时间的控制)
mcp.add_middleware(SlidingWindowRateLimitingMiddleware(max_requests=100,window_minutes=1
))
内置版本包括令牌桶算法、按客户端识别、全局速率限制以及具有可配置客户端识别功能的异步安全实现。
错误处理中间件
一致的错误处理和恢复对于健壮的 MCP 服务器至关重要。FastMCP 在 fastmcp.server.middleware.error_handling
中提供全面的错误处理中间件。
以下是其工作方式的示例:
import logging
from fastmcp.server.middleware import Middleware, MiddlewareContextclass SimpleErrorHandlingMiddleware(Middleware):def __init__(self):self.logger = logging.getLogger("errors")self.error_counts = {}async def on_message(self, context: MiddlewareContext, call_next):try:return await call_next(context)except Exception as error:# 记录错误并跟踪统计信息error_key = f"{type(error).__name__}:{context.method}"self.error_counts[error_key] = self.error_counts.get(error_key, 0) + 1self.logger.error(f"{context.method} 中发生错误:{type(error).__name__}: {error}")raise
要使用具有高级功能的完整版本:
from fastmcp.server.middleware.error_handling import (ErrorHandlingMiddleware, RetryMiddleware
)# 全面的错误日志和转换
mcp.add_middleware(ErrorHandlingMiddleware(include_traceback=True,transform_errors=True,error_callback=my_error_callback
))# 带指数退避的自动重试
mcp.add_middleware(RetryMiddleware(max_retries=3,retry_exceptions=(ConnectionError, TimeoutError)
))
内置版本包括错误转换、自定义回调、可配置的重试逻辑以及正确的 MCP 错误格式化。
组合中间件
from fastmcp import FastMCP
from fastmcp.server.middleware.timing import TimingMiddleware
from fastmcp.server.middleware.logging import LoggingMiddleware
from fastmcp.server.middleware.rate_limiting import RateLimitingMiddleware
from fastmcp.server.middleware.error_handling import ErrorHandlingMiddlewaremcp = FastMCP("Production Server")# 按逻辑顺序添加中间件
mcp.add_middleware(ErrorHandlingMiddleware()) # 首先处理错误
mcp.add_middleware(RateLimitingMiddleware(max_requests_per_second=50))
mcp.add_middleware(TimingMiddleware()) # 计时实际执行
mcp.add_middleware(LoggingMiddleware()) # 记录所有内容@mcp.tool
def my_tool(data: str) -> str:return f"已处理:{data}"
Composition
随着MCP 应用规模扩大,你可能希望将工具、资源和提示按逻辑模块组织,或复用现有的服务器组件。FastMCP 通过两种方法支持服务器组合:
import_server
:一次性复制组件并添加前缀(静态组合)。mount
:创建实时链接,主服务器在运行时将请求委托给子服务器(动态组合)。
为什么要组合服务器
- 模块化:将大型应用拆分为更小、更专注的服务器(例如
WeatherServer
、DatabaseServer
、CalendarServer
)。 - 可复用性:创建通用工具服务器(例如
TextProcessingServer
),并在需要时挂载。 - 团队协作:不同团队可分别开发独立的 FastMCP 服务器,后期再进行组合。
- 逻辑组织:将相关功能按逻辑分组,便于管理。
导入vs挂载
选择导入还是挂载取决于您的具体用例和需求。
特性 | 导入 | 挂载 |
---|---|---|
方法 | FastMCP.import_server(server, prefix=None) |
FastMCP.mount(server, prefix=None) |
组合类型 | 一次性复制(静态) | 实时链接(动态) |
更新同步 | 子服务器的变更不会反映到主服务器 | 子服务器的变更立即反映到主服务器 |
性能 | 快速 — 无运行时委托开销 | 较慢 — 受最慢挂载服务器影响 |
前缀 | 可选 — 省略则保留原名称 | 可选 — 省略则保留原名称 |
适用场景 | 打包最终组件、性能敏感场景 | 运行时模块化组合 |
导入
import_server()
方法将一个 FastMCP
实例(子服务器)中的所有组件(工具、资源、模板、提示)复制到另一个实例(主服务器)中。可选提供 prefix
以避免命名冲突。若未提供前缀,组件将按原样导入。当多个服务器使用相同前缀(或无前缀)导入时,最后导入的服务器组件将覆盖先前导入的同名组件。
from fastmcp import FastMCP
import asyncio# 定义子服务器
weather_mcp = FastMCP(name="WeatherService")@weather_mcp.tool
def get_forecast(city: str) -> dict:"""获取天气预报。"""return {"city": city, "forecast": "Sunny"}@weather_mcp.resource("data://cities/supported")
def list_supported_cities() -> list[str]:"""列出支持天气查询的城市。"""return ["London", "Paris", "Tokyo"]# 定义主服务器
main_mcp = FastMCP(name="MainApp")# 导入子服务器
async def setup():await main_mcp.import_server(weather_mcp, prefix="weather")# 结果:main_mcp 现包含带前缀的组件:
# - 工具: "weather_get_forecast"
# - 资源: "data://weather/cities/supported" if __name__ == "__main__":asyncio.run(setup())main_mcp.run()
导入的工作原理
当你调用 await main_mcp.import_server(subserver, prefix={whatever})
时:
- 工具:
subserver
的所有工具被添加到main_mcp
,名称前缀为{prefix}_
。subserver.tool(name="my_tool")
变为main_mcp.tool(name="{prefix}_my_tool")
。
- 资源:所有资源的 URI 和名称均被添加前缀。
- URI:
subserver.resource(uri="data://info")
变为main_mcp.resource(uri="data://{prefix}/info")
。 - 名称:
resource.name
变为"{prefix}_{resource.name}"
。
- URI:
- 资源模板:模板的前缀规则与资源类似。
- URI:
subserver.resource(uri="data://{id}")
变为main_mcp.resource(uri="data://{prefix}/{id}")
。 - 名称:
template.name
变为"{prefix}_{template.name}"
。
- URI:
- 提示:所有提示的名称被添加前缀
{prefix}_
。subserver.prompt(name="my_prompt")
变为main_mcp.prompt(name="{prefix}_my_prompt")
。
请注意,import_server
执行的是一次性复制。在导入之后对 subserver
所做的更改不会反映在 main_mcp
中。subserver
的 lifespan
上下文也不会由主服务器执行。
prefix
参数是可选的。如果省略,组件将按原样导入,不进行修改,这样组件将保留其原始名称。当导入多个具有相同前缀或无前缀的服务器时,最后导入的服务器的组件将优先。
挂载
mount()
方法在 main_mcp
服务器与 subserver
之间创建一个实时链接。它不复制组件,而是在运行时将匹配可选 prefix
的组件请求委托给 subserver
处理。若未提供前缀,则子服务器的组件可通过原始名称直接访问。当多个服务器使用相同前缀(或无前缀)挂载时,对于冲突的组件名称,最后挂载的服务器将优先。
import asyncio
from fastmcp import FastMCP, Client# 定义子服务器
dynamic_mcp = FastMCP(name="DynamicService")@dynamic_mcp.tool
def initial_tool():"""初始工具演示。"""return "Initial Tool Exists"# 挂载子服务器(同步操作)
main_mcp = FastMCP(name="MainAppLive")
main_mcp.mount(dynamic_mcp, prefix="dynamic")# 在挂载后添加工具 — 仍可通过 main_mcp 访问
@dynamic_mcp.tool
def added_later():"""挂载后添加的工具。"""return "Tool Added Dynamically!"# 测试访问已挂载的工具
async def test_dynamic_mount():tools = await main_mcp.get_tools()print("可用工具:", list(tools.keys()))# 输出:['dynamic_initial_tool', 'dynamic_added_later']async with Client(main_mcp) as client:result = await client.call_tool("dynamic_added_later")print("结果:", result.data)# 输出:"Tool Added Dynamically!"if __name__ == "__main__":asyncio.run(test_dynamic_mount())
挂载的工作原理
配置挂载后:
- 实时链接:父服务器与挂载的服务器建立连接。
- 动态更新:对挂载服务器的更改在通过父服务器访问时立即生效。
- 前缀访问:父服务器使用前缀将请求路由到挂载的服务器。
- 委托:对匹配前缀的组件的请求在运行时委托给挂载的服务器处理。
命名工具、资源、模板和提示的前缀规则与 import_server
相同。这包括为资源和模板的 URI/键及名称添加前缀,以便在多服务器配置中更好地识别。
由于“实时链接”的存在,父服务器上的 list_tools()
等操作会受到最慢挂载服务器速度的影响。特别是,基于 HTTP 的挂载服务器可能引入显著延迟(300-400ms,而本地工具仅需 1-2ms),并且这种减速会影响整个服务器,而不仅仅是与 HTTP 代理工具的交互。如果性能至关重要,通过 import_server()
导入工具可能是更合适的解决方案,因为它在启动时一次性复制组件,而不是在运行时委托请求。