Announcement

👇Official Account👇

Welcome to join the group & private message

Article first/tail QR code

Skip to content

FastMCP 3.0 实战:用 Python 构建生产级 MCP Server

前言

Model Context Protocol(MCP)已经成为 AI Agent 生态的事实标准。2026 年 1 月 19 日发布的 FastMCP 3.0 是这个领域最具影响力的更新之一——它引入了 Streamable HTTP 传输、组件版本控制、细粒度授权等生产级特性。OpenAI、Google、Microsoft 都已宣布支持 MCP,这意味着你构建的 MCP Server 可以在多个 AI 平台上运行。

本文将从零开始,带你用 FastMCP 3.0 构建一个生产级的 MCP Server,涵盖架构设计、核心代码、授权机制、测试部署全流程。

为什么选择 FastMCP 3.0?

在 MCP Server 开发领域,你有多个选择:

方案语言优势劣势
官方 TypeScript SDKTypeScript官方维护,功能完整模板代码多
官方 Python SDKPython官方维护抽象层次低
FastMCP 3.0Python极简 API,Pythonic 风格第三方维护
手动实现任意完全控制工作量巨大

FastMCP 的核心设计哲学是 "用最小的代码量做最多的事"。一个 30 行的 echo server,在 FastMCP 中就是几行代码:

python
from fastmcp import FastMCP

mcp = FastMCP("Echo Server")

@mcp.tool()
def echo(text: str) -> str:
    """Echo back the input text."""
    return f"Echo: {text}"

mcp.run()

但 FastMCP 3.0 远不止是"简化版 SDK"。它提供了一套完整的生产级能力。

FastMCP 3.0 核心新特性

1. Streamable HTTP Transport

MCP 协议最早使用 stdio 传输,但 stdio 不适合生产环境——无法水平扩展、无法负载均衡、无法做健康检查。之后引入了 SSE(Server-Sent Events),但 SSE 存在连接管理复杂、代理兼容性问题。

FastMCP 3.0 引入了 Streamable HTTP,这是一种基于 HTTP 的双向流式传输协议:

┌─────────────┐         HTTP POST + SSE          ┌─────────────┐
│             │ ──────────────────────────────────▶│             │
│  MCP Client │                                    │ MCP Server  │
│  (Claude/   │ ◀──────────────────────────────────│  (FastMCP)  │
│   GPT/Copilot│        Streamable Response        │             │
└─────────────┘                                    └─────────────┘

优势:

  • 标准 HTTP 协议,兼容所有反向代理(Nginx、Envoy、Cloudflare)
  • 支持水平扩展和负载均衡
  • 原生支持 Web 框架集成
  • 连接复用和断线重连
python
from fastmcp import FastMCP

mcp = FastMCP("MyServer", transport="streamable-http")

@mcp.tool()
def search_docs(query: str) -> str:
    """Search internal documentation."""
    return f"Results for: {query}"

# 使用 Starlette/Uvicorn 启动
mcp.run(transport="streamable-http", host="0.0.0.0", port=8000)

2. 组件版本控制

在生产环境中,工具需要独立版本管理。FastMCP 3.0 支持对每个 Tool/Resource/Prompt 进行版本控制:

python
from fastmcp import FastMCP
from fastmcp.tools import Tool

mcp = FastMCP("VersionedServer")

@mcp.tool(version="2.1.0", description="Search with hybrid ranking (v2.1)")
def search(query: str, top_k: int = 10) -> list:
    """Enhanced search with semantic + keyword ranking."""
    # 新版实现:混合排序
    results = hybrid_search(query, top_k)
    return results

@mcp.tool(version="1.0.0", deprecated=True, 
          deprecation_message="Use search v2.1.0 instead")
def search_v1(query: str) -> list:
    """Legacy search - please upgrade."""
    return legacy_search(query)

MCP Client 可以指定需要的工具版本,服务端根据版本号提供对应的实现。这解决了 API 演进中的兼容性问题。

3. 细粒度授权(Fine-Grained Authorization)

FastMCP 3.0 引入了工具级别的访问控制。你可以为每个工具定义权限策略:

python
from fastmcp import FastMCP
from fastmcp.auth import require_auth, AuthLevel

mcp = FastMCP("SecureServer")

@mcp.tool()
async def public_info(query: str) -> str:
    """Everyone can access this."""
    return f"Public data for: {query}"

@mcp.tool()
@require_auth(level=AuthLevel.USER)
async def user_profile(user_id: str) -> dict:
    """Requires user-level authentication."""
    return {"id": user_id, "name": "..."}

@mcp.tool()
@require_auth(level=AuthLevel.ADMIN)
async def delete_user(user_id: str) -> bool:
    """Requires admin-level authentication."""
    return True

认证系统支持 OAuth 2.0、API Key、JWT 三种模式:

python
from fastmcp.auth import OAuth2Provider, APIKeyProvider

# OAuth 2.0
mcp.add_auth_provider(OAuth2Provider(
    issuer_url="https://auth.example.com",
    client_id="mcp-server-001",
    required_scopes=["mcp:read", "mcp:write"]
))

# API Key(适合内部工具)
mcp.add_auth_provider(APIKeyProvider(
    keys=["sk-internal-key-1", "sk-internal-key-2"],
    header_name="X-API-Key"
))

4. Resources 增强

FastMCP 3.0 改进了 Resources 系统,支持文件系统、数据库、API 端点等多种资源类型:

python
from fastmcp.resources import FileResource, DatabaseResource

# 文件资源——自动监听变化
@mcp.resource("file://docs/{path}")
async def doc_resource(path: str) -> str:
    """Serve documentation files."""
    from pathlib import Path
    doc_path = Path("/data/docs") / path
    if not doc_path.exists():
        raise FileNotFoundError(f"Document not found: {path}")
    return doc_path.read_text()

# 数据库资源——支持模板查询
@mcp.resource("db://users/{user_id}")
async def user_resource(user_id: str) -> dict:
    """Query user by ID from database."""
    import asyncpg
    conn = await asyncpg.connect("postgresql://...")
    row = await conn.fetchrow(
        "SELECT id, name, email FROM users WHERE id = $1", 
        int(user_id)
    )
    await conn.close()
    return dict(row) if row else {}

实战:构建企业级 Wiki 搜索 MCP Server

接下来,我们构建一个完整的企业级 MCP Server——Wiki 搜索助手。它连接公司内部的 Wiki 知识库,提供搜索、文档查询、目录浏览等工具。

项目结构

wiki-mcp-server/
├── server.py              # 主入口
├── tools/
│   ├── __init__.py
│   ├── search.py          # 搜索工具
│   └── document.py        # 文档工具
├── resources/
│   ├── __init__.py
│   └── wiki_cache.py      # Wiki 缓存资源
├── auth/
│   └── provider.py        # 认证配置
├── middleware/
│   ├── logging.py         # 日志中间件
│   └── metrics.py         # 监控中间件
├── requirements.txt
└── Dockerfile

Step 1: 安装依赖

txt
# requirements.txt
fastmcp>=3.0.0
uvicorn>=0.30.0
httpx>=0.28.0
redis>=5.0.0
pydantic>=2.0.0
python-jose[cryptography]>=3.3.0
opentelemetry-api>=1.28.0
opentelemetry-sdk>=1.28.0
bash
pip install -r requirements.txt

Step 2: 核心 Server 代码

python
# server.py
from fastmcp import FastMCP
from fastmcp.auth import OAuth2Provider
from tools.search import register_search_tools
from tools.document import register_document_tools
from resources.wiki_cache import register_cache_resources
from middleware.logging import LoggingMiddleware
from middleware.metrics import MetricsMiddleware

# 创建 MCP Server 实例
mcp = FastMCP(
    name="WikiSearchAssistant",
    version="1.0.0",
    description="企业 Wiki 知识库搜索助手 — 支持全文搜索、文档查询和智能推荐",
    transport="streamable-http"
)

# 注册认证
mcp.add_auth_provider(OAuth2Provider(
    issuer_url="https://sso.company.com",
    client_id="wiki-mcp-server",
    required_scopes=["wiki:search", "wiki:read"]
))

# 注册中间件
mcp.add_middleware(LoggingMiddleware())
mcp.add_middleware(MetricsMiddleware())

# 注册工具和资源
register_search_tools(mcp)
register_document_tools(mcp)
register_cache_resources(mcp)

# 健康检查
@mcp.tool()
async def health_check() -> dict:
    """Server health check endpoint."""
    return {
        "status": "healthy",
        "version": "1.0.0",
        "uptime": "OK"
    }

if __name__ == "__main__":
    mcp.run(
        transport="streamable-http",
        host="0.0.0.0",
        port=8000,
        workers=4  # 生产环境多 worker
    )

Step 3: 搜索工具实现

python
# tools/search.py
from fastmcp import FastMCP
from fastmcp.tools import ToolContext
from typing import Optional
import httpx

async def _search_wiki(query: str, space: Optional[str], limit: int) -> list:
    """对接企业内部 Wiki API 进行搜索"""
    async with httpx.AsyncClient(timeout=10.0) as client:
        params = {
            "q": query,
            "limit": min(limit, 50),
            "space": space or ""
        }
        resp = await client.get(
            "https://wiki.company.com/api/v2/search",
            params=params,
            headers={"Authorization": "Bearer internal-api-key"}
        )
        resp.raise_for_status()
        data = resp.json()
        return [
            {
                "title": r["title"],
                "url": r["url"],
                "snippet": r.get("excerpt", ""),
                "space": r.get("space", "General"),
                "updated": r.get("updated_at", "")
            }
            for r in data.get("results", [])
        ]

def register_search_tools(mcp: FastMCP):
    """注册所有搜索相关工具"""

    @mcp.tool(version="2.0.0")
    async def wiki_search(
        query: str,
        space: Optional[str] = None,
        limit: int = 10,
        ctx: ToolContext = None
    ) -> dict:
        """
        搜索企业内部 Wiki 知识库。

        Args:
            query: 搜索关键词,支持 AND/OR/NOT 布尔运算
            space: 限定搜索空间(如 'Engineering', 'Product')
            limit: 返回结果数量(1-50)

        Returns:
            包含搜索结果和元信息的字典

        Examples:
            wiki_search("Kubernetes deployment 最佳实践")
            wiki_search("微服务", space="Engineering", limit=5)
        """
        # 记录搜索日志
        if ctx:
            await ctx.info(f"Searching wiki for: {query}")

        try:
            results = await _search_wiki(query, space, limit)

            return {
                "query": query,
                "total": len(results),
                "results": results,
                "search_timestamp": "2026-06-11T00:00:00Z"
            }
        except Exception as e:
            return {
                "error": str(e),
                "query": query,
                "results": [],
                "message": "搜索失败,请稍后重试"
            }

    @mcp.tool(version="1.0.0")
    async def wiki_suggest(query: str, max_suggestions: int = 5) -> dict:
        """
        根据用户输入提供搜索建议(自动补全)。

        Args:
            query: 用户输入的部分查询词
            max_suggestions: 最大建议数量

        Returns:
            {"suggestions": ["建议1", "建议2", ...]}
        """
        with httpx.Client() as client:
            resp = client.get(
                "https://wiki.company.com/api/v2/suggest",
                params={"prefix": query, "limit": max_suggestions}
            )
            data = resp.json()
            return {
                "query": query,
                "suggestions": data.get("suggestions", [])
            }

Step 4: 文档工具实现

python
# tools/document.py
from fastmcp import FastMCP

def register_document_tools(mcp: FastMCP):

    @mcp.tool(version="1.0.0")
    async def wiki_get_document(doc_id: str, include_body: bool = True) -> dict:
        """
        获取 Wiki 文档的完整内容。

        Args:
            doc_id: 文档唯一标识符(如 'ENG-2026-001')
            include_body: 是否包含文档正文(False 则只返回元信息)

        Returns:
            包含标题、正文、作者、更新时间等字段的字典
        """
        import httpx
        async with httpx.AsyncClient() as client:
            resp = await client.get(
                f"https://wiki.company.com/api/v2/documents/{doc_id}",
                params={"body": str(include_body).lower()}
            )
            resp.raise_for_status()
            doc = resp.json()

            return {
                "id": doc["id"],
                "title": doc["title"],
                "body": doc.get("body", "") if include_body else "[Content omitted]",
                "author": doc.get("author", {}).get("name", "Unknown"),
                "created": doc.get("created_at", ""),
                "updated": doc.get("updated_at", ""),
                "space": doc.get("space", "General"),
                "tags": doc.get("labels", [])
            }

    @mcp.tool(version="1.0.0")
    async def wiki_list_spaces() -> dict:
        """
        列出所有可用的 Wiki 空间。

        Returns:
            {"spaces": [{"key": "ENG", "name": "Engineering", "doc_count": 1500}, ...]}
        """
        # 模拟返回(实际应调用 Wiki API)
        return {
            "spaces": [
                {"key": "ENG", "name": "Engineering", "doc_count": 1523},
                {"key": "PROD", "name": "Product", "doc_count": 892},
                {"key": "DESIGN", "name": "Design", "doc_count": 456},
                {"key": "OPS", "name": "Operations", "doc_count": 678},
                {"key": "HR", "name": "Human Resources", "doc_count": 234},
            ],
            "total_documents": 3783
        }

Step 5: 缓存资源与中间件

python
# resources/wiki_cache.py
from fastmcp import FastMCP
from fastmcp.resources import DatabaseResource
import redis.asyncio as redis
import json

# Redis 连接池
redis_pool = redis.ConnectionPool(
    host="localhost", port=6379, db=0,
    max_connections=20
)

def register_cache_resources(mcp: FastMCP):

    @mcp.resource("cache://search/{query_hash}")
    async def cached_search_result(query_hash: str) -> str:
        """
        读取缓存的搜索结果。
        缓存 key 为搜索查询的 SHA256 哈希值。
        """
        r = redis.Redis(connection_pool=redis_pool)
        try:
            result = await r.get(f"search:{query_hash}")
            if result:
                return result.decode("utf-8")
            return json.dumps({"cached": False, "message": "Cache miss"})
        finally:
            await r.close()
python
# middleware/logging.py
import logging
import time
from fastmcp.tools import ToolContext

logger = logging.getLogger("wiki-mcp")

class LoggingMiddleware:
    """请求日志中间件——记录每次工具调用的耗时和结果"""

    async def __call__(self, context: ToolContext, call_next):
        start = time.time()
        tool_name = context.tool_name

        logger.info(f"[{tool_name}] Started")

        try:
            result = await call_next(context)
            elapsed = time.time() - start
            logger.info(f"[{tool_name}] Completed in {elapsed:.2f}s")
            return result
        except Exception as e:
            elapsed = time.time() - start
            logger.error(f"[{tool_name}] Failed after {elapsed:.2f}s: {e}")
            raise
python
# middleware/metrics.py
from prometheus_client import Counter, Histogram, generate_latest
from fastmcp.tools import ToolContext

TOOL_CALLS = Counter(
    "mcp_tool_calls_total",
    "Total MCP tool calls",
    ["tool_name", "status"]
)
TOOL_DURATION = Histogram(
    "mcp_tool_duration_seconds",
    "MCP tool call duration",
    ["tool_name"]
)

class MetricsMiddleware:
    """Prometheus 监控中间件"""

    async def __call__(self, context: ToolContext, call_next):
        import time
        start = time.time()
        tool_name = context.tool_name

        try:
            result = await call_next(context)
            TOOL_CALLS.labels(tool_name=tool_name, status="success").inc()
            return result
        except Exception:
            TOOL_CALLS.labels(tool_name=tool_name, status="error").inc()
            raise
        finally:
            TOOL_DURATION.labels(tool_name=tool_name).observe(
                time.time() - start
            )

Step 6: Dockerfile 生产部署

dockerfile
# Dockerfile
FROM python:3.12-slim

WORKDIR /app

# 安装系统依赖
RUN apt-get update && apt-get install -y --no-install-recommends \
    curl ca-certificates && \
    rm -rf /var/lib/apt/lists/*

# 安装 Python 依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用代码
COPY . .

# 健康检查
HEALTHCHECK --interval=30s --timeout=3s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# 非 root 用户运行
RUN useradd -m -u 1000 mcpuser
USER mcpuser

EXPOSE 8000
CMD ["python", "server.py"]
yaml
# docker-compose.yml
version: "3.9"
services:
  wiki-mcp-server:
    build: .
    ports:
      - "8000:8000"
    environment:
      - WIKI_API_KEY=${WIKI_API_KEY}
      - REDIS_URL=redis://redis:6379
      - OAUTH_ISSUER=https://sso.company.com
    depends_on:
      redis:
        condition: service_healthy
    restart: unless-stopped
    deploy:
      replicas: 3
      resources:
        limits:
          memory: 256M
          cpus: "0.5"

  redis:
    image: redis:7-alpine
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
    volumes:
      - redis_data:/data

  nginx:
    image: nginx:alpine
    ports:
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    depends_on:
      - wiki-mcp-server

volumes:
  redis_data:
nginx
# nginx.conf — 反向代理 + TLS 终止
upstream mcp_backend {
    least_conn;
    server wiki-mcp-server:8000;
    server wiki-mcp-server:8001;
    server wiki-mcp-server:8002;
}

server {
    listen 443 ssl http2;
    server_name mcp.company.com;

    ssl_certificate     /etc/ssl/certs/mcp.crt;
    ssl_certificate_key /etc/ssl/private/mcp.key;

    location / {
        proxy_pass http://mcp_backend;

        # Streamable HTTP 需要长连接支持
        proxy_http_version 1.1;
        proxy_set_header Connection "";
        proxy_buffering off;          # 关键:禁用缓冲以支持流式传输
        proxy_read_timeout 300s;       # 长连接超时

        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    }

    location /health {
        proxy_pass http://mcp_backend/health;
        access_log off;
    }
}

MCP Server 开发最佳实践

通过这个实战项目,总结 8 条生产级 MCP Server 开发原则:

1. 工具有一定"温度"的文档字符串

MCP Client(如 Claude、GPT)会将工具的 docstring 作为 prompt 的一部分发送给 LLM。写得越清晰、越有例子,LLM 就越能正确使用你的工具。

python
# ❌ 不好
@mcp.tool()
def search(q: str) -> str:
    """Search."""
    ...

# ✅ 好
@mcp.tool()
def wiki_search(
    query: str,
    space: Optional[str] = None,
    limit: int = 10
) -> dict:
    """
    搜索企业内部 Wiki 知识库。

    Args:
        query: 搜索关键词,支持 AND/OR/NOT 布尔运算。
               例: "Kubernetes AND deployment"
        space: 限定搜索空间。可选值: Engineering, Product, Design, Ops。
              不指定则搜索全部空间。
        limit: 返回结果数量。默认10,最大50。

    Returns:
        dict: {"results": [...], "total": int, "query": str}

    Examples:
        wiki_search("微服务架构最佳实践")
        wiki_search("CI/CD", space="Engineering", limit=5)
    """

2. 错误处理——返回结构化错误而非抛异常

python
# ✅ 永远返回结构化数据
@mcp.tool()
async def safe_search(query: str) -> dict:
    try:
        results = await api.search(query)
        return {"success": True, "data": results}
    except TimeoutError:
        return {"success": False, "error": "TIMEOUT", 
                "message": "搜索超时,请缩小搜索范围"}
    except PermissionError:
        return {"success": False, "error": "FORBIDDEN",
                "message": "你没有访问该空间的权限"}

3. 工具粒度适中

  • 工具太少 → LLM 难以精确调用
  • 工具太多 → LLM 选择困难
  • 黄金法则:每个工具做一件事,做好它

4. 参数验证前置

python
from pydantic import BaseModel, Field, validator

class SearchParams(BaseModel):
    query: str = Field(..., min_length=1, max_length=500)
    limit: int = Field(default=10, ge=1, le=50)
    space: Optional[str] = Field(default=None, pattern=r"^[A-Z]{2,10}$")

5. 监控可观测性

每个工具调用都应该记录:

  • 调用次数和频率
  • 平均响应时间、P99 延迟
  • 错误率和错误类型
  • 资源使用情况

6. 缓存策略

python
from functools import lru_cache
import hashlib

@mcp.tool()
async def cached_search(query: str) -> dict:
    cache_key = hashlib.sha256(query.encode()).hexdigest()

    # 先查 Redis 缓存
    cached = await redis.get(f"search:{cache_key}")
    if cached:
        return json.loads(cached)

    # 缓存未命中,执行搜索
    result = await api.search(query)

    # 写入缓存,TTL 5 分钟
    await redis.setex(f"search:{cache_key}", 300, json.dumps(result))
    return result

7. 速率限制

python
import asyncio
from collections import defaultdict
import time

class RateLimiter:
    def __init__(self, max_calls: int = 100, window: int = 60):
        self.max_calls = max_calls
        self.window = window
        self.calls = defaultdict(list)

    async def check(self, client_id: str):
        now = time.time()
        window_start = now - self.window
        self.calls[client_id] = [
            t for t in self.calls[client_id] if t > window_start
        ]
        if len(self.calls[client_id]) >= self.max_calls:
            raise Exception("Rate limit exceeded. Try again later.")
        self.calls[client_id].append(now)

8. 灰度发布

python
@mcp.tool(version="2.0.0-beta", experimental=True)
async def ai_summarize(doc_id: str) -> dict:
    """[Experimental] AI-generated document summary."""
    ...

@mcp.tool(version="2.0.0", replaces="ai_summarize@2.0.0-beta")
async def ai_summarize(doc_id: str) -> dict:
    """Generate AI summary for a document."""
    # 稳定版本实现
    ...

对比:FastMCP vs 官方 SDK

维度官方 Python SDKFastMCP 3.0
代码量(echo server)~50 行~8 行
版本控制需手动实现内置
认证授权需手动实现内置 OAuth/API Key/JWT
中间件系统内置
Streamable HTTP需自行集成开箱即用
类型安全手动校验Pydantic 自动校验
生产特性(限流/缓存)全部手写装饰器即用
社区活跃度官方维护非常活跃

总结

FastMCP 3.0 让 Python 构建生产级 MCP Server 变得前所未有的简单。它的核心价值在于:

  1. 极低的上手成本——30 行代码跑起来第一个 server
  2. 完整的生产特性——认证、授权、版本控制、监控开箱即用
  3. Streamable HTTP——解决了 SSE 的生产部署痛点
  4. Pythonic 风格——装饰器 + 类型注解,符合 Python 开发者直觉

如果你正在为 AI Agent 构建工具链,FastMCP 3.0 是目前最值得投入学习的 MCP 框架。

参考资料

上次更新于: