Announcement

👇Official Account👇

Welcome to join the group & private message

Article first/tail QR code

Skip to content

2026 AI Agent 工程化:从原型到生产的 10 个关键决策

概述

2026 年,AI Agent 已从实验室走向生产环境。Anthropic 的 MCP 协议达到 1.0 里程碑,Google 推出 A2A(Agent-to-Agent)与 A2UI 协议,OpenAI 发布 Agents SDK 和 Secure MCP Tunnel。基础设施已经就位,但从 Demo 到 Production 的工程化鸿沟依然是最大挑战。

本文梳理了 10 个关键工程决策,每一个都直接影响 Agent 的可靠性、成本和安全。

决策全景图

AI Agent 生产化关键决策矩阵
════════════════════════════════════════════════════════════════

  架构层          执行层          运维层
  ┌──────────┐   ┌──────────┐   ┌──────────┐
  │ 1. 单/多Agent│   │ 5. 重试策略│   │ 8. 可观测性│
  │ 2. 编排模式  │   │ 6. 并发控制│   │ 9. 评估体系│
  │ 3. MCP 传输  │   │ 7. 安全护栏│   │10. 渐进部署│
  │ 4. 记忆架构  │   │           │   │           │
  └──────────┘   └──────────┘   └──────────┘

════════════════════════════════════════════════════════════════

决策 1:单 Agent 还是 Multi-Agent?

错误示范:一上来就搭 Multi-Agent,结果调度开销比业务逻辑还大。

python
# ❌ 过度设计:简单任务用 Multi-Agent
from crewai import Agent, Task, Crew

researcher = Agent(role="Researcher", goal="Research topics", llm=llm)
writer = Agent(role="Writer", goal="Write summary", llm=llm)
# 一个简单的查询 → 2 轮 LLM 调用 + 调度开销

决策框架

场景推荐架构理由
单一工具调用(查天气、查数据库)单 Agent + Function Call一次 LLM 调用即可
多步骤固定流程(ETL)单 Agent + 工具链不需要"角色分工"
需要不同专长领域2-3 Agent 协作保持简单
复杂自主研究Multi-Agent(3-5个)需要规划+执行+验证

经验法则:能用 Function Calling 解决的不用 Agent,能用单 Agent 解决的不用 Multi-Agent。

单 Agent 的正确姿势

python
import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI()

tools = [
    {
        "type": "function",
        "function": {
            "name": "search_database",
            "description": "Query the product database",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {"type": "string"},
                    "filters": {"type": "object"}
                },
                "required": ["query"]
            }
        }
    }
]

async def run_agent(user_query: str) -> str:
    messages = [{"role": "user", "content": user_query}]

    while True:
        response = await client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            tools=tools,
            tool_choice="auto"
        )

        msg = response.choices[0].message
        messages.append(msg)

        # 如果模型决定回复文本,结束循环
        if not msg.tool_calls:
            return msg.content

        # 执行工具调用
        for tool_call in msg.tool_calls:
            result = await execute_tool(tool_call)
            messages.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": str(result)
            })

决策 2:编排模式 — 硬编码 vs LLM 自主决策

核心问题是:流程由谁控制?

编排模式光谱
═══════════════════════════════════════════════════

  确定性 ────────────────────────────► 灵活性

  [硬编码]      [状态机]        [LLM路由]      [自主Agent]
  DAG/Pipeline   LangGraph      Router Agent    AutoGPT
  100%可控       可控+可观测     灵活+可解释      灵活+黑盒
═══════════════════════════════════════════════════

推荐:90% 的场景用 状态机(LangGraph) 模式,同时保留 LLM 在关键节点的决策权。

python
from langgraph.graph import StateGraph, END
from typing import TypedDict, Literal

class AgentState(TypedDict):
    query: str
    intent: str
    search_results: list
    final_response: str

def classify_intent(state: AgentState) -> Literal["search", "calculate", "chat"]:
    """LLM 分类意图"""
    response = llm.invoke(
        f"Classify this query into one of [search, calculate, chat]: {state['query']}"
    )
    return response.strip()

def search_handler(state: AgentState) -> AgentState:
    """执行搜索"""
    results = search_api(state["query"])
    return {"search_results": results}

def calculate_handler(state: AgentState) -> AgentState:
    """执行计算"""
    result = eval_expression(state["query"])
    return {"final_response": str(result)}

# 构建状态图
workflow = StateGraph(AgentState)
workflow.add_node("classify", classify_intent)
workflow.add_node("search", search_handler)
workflow.add_node("calculate", calculate_handler)
workflow.add_node("respond", generate_response)

workflow.add_conditional_edges("classify", {
    "search": "search",
    "calculate": "calculate",
    "chat": "respond"
})
workflow.add_edge("search", "respond")
workflow.add_edge("calculate", END)

app = workflow.compile()

为什么不用全自动 Agent? 自主 Agent 容易出现循环调用、成本爆炸、结果不可预测。生产环境首选可控编排。

决策 3:MCP 传输层选择

2026 年 MCP 协议支持三种传输模式:

传输方式延迟适用场景安全性
stdio最低本地工具、CLI进程隔离
SSE (HTTP)中等同网络服务需内网部署
Streamable HTTP中等远程服务需认证
Secure Tunnel较高跨网络/企业E2E 加密
python
# 本地 MCP Server 连接示例
import asyncio
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def connect_local_tool():
    server_params = StdioServerParameters(
        command="python",
        args=["-m", "my_mcp_server"]
    )

    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            # 发现可用工具
            tools = await session.list_tools()
            print(f"Available tools: {[t.name for t in tools.tools]}")

            # 调用工具
            result = await session.call_tool(
                "search_files",
                arguments={"pattern": "*.go", "path": "/src"}
            )
            return result

生产环境推荐:本地工具用 stdio(零网络开销),远程工具用 Secure Tunnel(特别是跨 VPC 场景)。

决策 4:记忆架构 — 短期 vs 长期

Agent 记忆分层架构
═══════════════════════════════════════════════════

  [工作记忆]          对话窗口内的上下文
  messages[-N:]       ← 受 token 限制

  [短期记忆]          跨对话的摘要
  Redis/Memcached     ← TTL 过期

  [长期记忆]          向量数据库中的知识
  pgvector/Qdrant     ← 语义检索

  [过程记忆]          任务执行历史
  PostgreSQL          ← 经验积累
═══════════════════════════════════════════════════
python
class AgentMemory:
    def __init__(self, vector_store, db_session):
        self.vector_store = vector_store
        self.db = db_session

    async def remember(self, session_id: str, context: str) -> None:
        """存储对话上下文到长期记忆"""
        embedding = await self.embed(context)
        await self.vector_store.upsert(
            collection="agent_memory",
            points=[{
                "id": f"{session_id}:{uuid4()}",
                "vector": embedding,
                "payload": {
                    "session_id": session_id,
                    "content": context,
                    "timestamp": time.time()
                }
            }]
        )

    async def recall(self, session_id: str, query: str, top_k: int = 5) -> list[str]:
        """语义检索相关记忆"""
        embedding = await self.embed(query)
        results = await self.vector_store.search(
            collection="agent_memory",
            query_vector=embedding,
            filter_condition={"must": [{"key": "session_id", "match": {"value": session_id}}]},
            limit=top_k
        )
        return [hit.payload["content"] for hit in results]

决策 5:工具调用的可靠性工程

LLM 调用工具时最常见的问题:

python
# ❌ 这些问题都会导致 Agent 崩溃
# 1. 工具返回格式不符合预期
# 2. 工具执行超时
# 3. 工具抛出异常
# 4. 参数格式错误(LLM 幻觉)
# 5. 返回结果过大超出 token 限制

生产级工具包装器

python
import asyncio
import json
from functools import wraps
from typing import Any, Callable

def production_tool(
    timeout: float = 30.0,
    max_retries: int = 2,
    max_result_length: int = 4000,
    validate_params: bool = True
):
    """生产级工具装饰器"""
    def decorator(func: Callable):
        @wraps(func)
        async def wrapper(**kwargs):
            last_error = None

            for attempt in range(max_retries + 1):
                try:
                    # 超时控制
                    result = await asyncio.wait_for(
                        func(**kwargs),
                        timeout=timeout
                    )

                    # 结果截断
                    result_str = str(result)
                    if len(result_str) > max_result_length:
                        result_str = result_str[:max_result_length] + "\n...[truncated]"

                    return {
                        "success": True,
                        "data": result_str,
                        "attempt": attempt + 1
                    }

                except asyncio.TimeoutError:
                    last_error = f"Tool timeout after {timeout}s"
                except Exception as e:
                    last_error = f"Tool error: {type(e).__name__}: {str(e)}"

                if attempt < max_retries:
                    await asyncio.sleep(2 ** attempt)  # 指数退避

            return {
                "success": False,
                "error": last_error,
                "attempts": max_retries + 1
            }

        return wrapper
    return decorator

# 使用示例
@production_tool(timeout=10, max_retries=2)
async def search_documents(query: str, limit: int = 10):
    # 业务逻辑
    results = await elasticsearch.search(query, size=limit)
    return [hit["_source"] for hit in results["hits"]["hits"]]

输出格式约束

使用 json_schemastructured outputs 强制 LLM 返回结构化数据:

python
from pydantic import BaseModel

class SearchPlan(BaseModel):
    keywords: list[str]
    filters: dict[str, str] | None
    sort_by: str = "relevance"
    max_results: int = 10

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Find recent Go articles about concurrency"}],
    response_format={
        "type": "json_schema",
        "json_schema": {
            "name": "search_plan",
            "schema": SearchPlan.model_json_schema()
        }
    }
)

plan = SearchPlan.model_validate_json(response.choices[0].message.content)

决策 6:并发控制与速率限制

Agent 的并发场景:同时调用多个工具、处理多个用户请求。

python
import asyncio
from asyncio import Semaphore

class AgentRateLimiter:
    def __init__(self, max_concurrent: int = 10, max_per_minute: int = 60):
        self._concurrent = Semaphore(max_concurrent)
        self._minute_bucket = asyncio.Semaphore(max_per_minute)
        # 每分钟重置令牌桶
        asyncio.create_task(self._refill_bucket())

    async def _refill_bucket(self):
        while True:
            await asyncio.sleep(60)
            # 重置令牌(简化实现)
            for _ in range(self._minute_bucket._value):
                try:
                    self._minute_bucket.release()
                except ValueError:
                    pass

    async def acquire(self):
        await self._concurrent.acquire()
        await self._minute_bucket.acquire()

    def release(self):
        self._concurrent.release()
        # minute bucket 不立即释放,等待 refill

# 使用
limiter = AgentRateLimiter(max_concurrent=5, max_per_minute=30)

async def process_user_query(query: str):
    await limiter.acquire()
    try:
        return await run_agent(query)
    finally:
        limiter.release()

决策 7:安全护栏(Guardrails)

Agent 安全的三层防线:

Agent 安全护栏架构
═══════════════════════════════════════════════════

  第 1 层:输入过滤
  ┌─────────────────────────────────────┐
  │ • Prompt Injection 检测             │
  │ • 敏感词过滤                        │
  │ • 输入长度限制                      │
  └──────────────┬──────────────────────┘

  第 2 层:执行沙箱
  ┌─────────────────────────────────────┐
  │ • 工具调用白名单                    │
  │ • 参数验证与净化                    │
  │ • 权限最小化(只读/只写分离)       │
  │ • 操作审计日志                      │
  └──────────────┬──────────────────────┘

  第 3 层:输出审查
  ┌─────────────────────────────────────┐
  │ • 敏感信息脱敏(PII/密钥)          │
  │ • 内容安全检测                      │
  │ • Human-in-the-loop 审批            │
  └─────────────────────────────────────┘

═══════════════════════════════════════════════════
python
class AgentGuardrails:
    DANGEROUS_PATTERNS = [
        r"rm\s+-rf\s+/",
        r"DROP\s+TABLE",
        r"DELETE\s+FROM",
        r"os\.system\(|subprocess\.|eval\(|exec\(",
    ]

    @staticmethod
    def validate_input(user_input: str) -> tuple[bool, str]:
        """检查注入攻击"""
        for pattern in AgentGuardrails.DANGEROUS_PATTERNS:
            if re.search(pattern, user_input, re.IGNORECASE):
                return False, f"Blocked dangerous pattern: {pattern}"
        return True, ""

    @staticmethod
    def validate_tool_call(tool_name: str, args: dict) -> tuple[bool, str]:
        """工具调用白名单和参数验证"""
        ALLOWED_TOOLS = {"search_docs", "query_db", "send_email", "create_task"}

        if tool_name not in ALLOWED_TOOLS:
            return False, f"Tool '{tool_name}' is not in allowed list"

        if tool_name == "send_email":
            if len(args.get("to", "")) > 50:
                return False, "Email recipient too long"

        return True, ""

    @staticmethod
    def sanitize_output(output: str) -> str:
        """脱敏输出中的敏感信息"""
        # 脱敏 API 密钥
        output = re.sub(r'sk-[A-Za-z0-9]{32,}', '[API_KEY_REDACTED]', output)
        # 脱敏邮箱
        output = re.sub(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',
                        '[EMAIL_REDACTED]', output)
        return output

决策 8:可观测性

Agent 的调试难度远超传统应用,因为决策链路涉及 LLM 调用、工具执行、多次推理循环。

python
import time
import structlog
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode

logger = structlog.get_logger()
tracer = trace.get_tracer(__name__)

async def observed_tool_call(tool_name: str, args: dict) -> dict:
    """带完整可观测性的工具调用"""
    start = time.time()
    span_id = str(uuid4())[:8]

    logger.info("tool_call_start",
                tool=tool_name, args=args, span_id=span_id)

    with tracer.start_as_current_span(f"tool.{tool_name}") as span:
        span.set_attribute("tool.name", tool_name)
        span.set_attribute("tool.args", str(args)[:1000])

        try:
            result = await execute_tool(tool_name, args)
            elapsed = time.time() - start

            span.set_attribute("tool.success", True)
            span.set_attribute("tool.duration_ms", elapsed * 1000)

            logger.info("tool_call_success",
                        tool=tool_name, duration_ms=elapsed*1000,
                        result_len=len(str(result)),
                        span_id=span_id)

            return result

        except Exception as e:
            elapsed = time.time() - start
            span.set_status(Status(StatusCode.ERROR))
            span.record_exception(e)

            logger.error("tool_call_failed",
                         tool=tool_name, error=str(e),
                         duration_ms=elapsed*1000,
                         span_id=span_id)
            raise

关键指标仪表盘

指标含义告警阈值
tool_latency_p99工具调用 P99 延迟> 30s
tool_error_rate工具调用错误率> 5%
llm_tokens_per_query每次查询 token 消耗> 10K
agent_loop_iterationsAgent 推理循环次数> 10
fallback_rate触发降级比例> 20%

决策 9:评估体系

没有评估就没有迭代方向。

python
class AgentEval:
    """Agent 评估框架"""

    METRICS = {
        "task_success_rate": "任务成功率",
        "tool_call_accuracy": "工具调用准确率",
        "response_relevance": "回复相关性",
        "latency_p95": "P95 延迟",
        "token_efficiency": "Token 效率"
    }

    @staticmethod
    async def evaluate(
        agent,
        test_cases: list[dict],
        judge_model: str = "gpt-4o"
    ) -> dict:
        """
        test_cases: [{"input": "...", "expected_tools": [...], "expected_output": "..."}]
        """
        results = []

        for case in test_cases:
            start = time.time()

            try:
                output = await agent.run(case["input"])
                latency = time.time() - start

                # LLM-as-Judge 评估回复质量
                judge_score = await AgentEval._judge(
                    case["input"],
                    case["expected_output"],
                    output,
                    judge_model
                )

                results.append({
                    "input": case["input"],
                    "output": output,
                    "latency": latency,
                    "judge_score": judge_score,
                    "passed": judge_score > 0.7
                })

            except Exception as e:
                results.append({
                    "input": case["input"],
                    "error": str(e),
                    "passed": False
                })

        success_rate = sum(1 for r in results if r["passed"]) / len(results)
        avg_latency = sum(r.get("latency", 0) for r in results) / len(results)

        return {
            "success_rate": success_rate,
            "avg_latency": avg_latency,
            "details": results
        }

    @staticmethod
    async def _judge(input_text: str, expected: str, actual: str, model: str) -> float:
        prompt = f"""Rate the quality of the Agent's response on a scale of 0-1.

User Query: {input_text}
Expected: {expected}
Actual Response: {actual}

Score based on accuracy, completeness, and relevance. Return only a number."""

        response = await llm_call(prompt, model=model)
        try:
            return float(response.strip())
        except ValueError:
            return 0.5

决策 10:渐进式部署

Agent 灰度发布策略
═══════════════════════════════════════════════════

  [阶段 1]        [阶段 2]        [阶段 3]        [阶段 4]
  Shadow          Canary          A/B              Full
  ─────────       ─────────       ─────────        ─────────
  影子模式        金丝雀           流量对比          全量
  只记录不响应    5% 流量         50% vs 旧版      100%
  验证正确性      监控异常        对比指标          持续监控
═══════════════════════════════════════════════════
python
class GradualRollout:
    def __init__(self):
        self.new_agent = NewAgent()
        self.old_agent = OldAgent()
        self.shadow_mode = True
        self.canary_pct = 0.05

    async def handle(self, user_id: str, query: str) -> str:
        # 阶段 1: Shadow — 新 Agent 静默执行,只记录结果
        if self.shadow_mode:
            asyncio.create_task(self._shadow_run(query))
            return await self.old_agent.run(query)

        # 阶段 2: Canary — 小比例流量走新 Agent
        if self._in_canary(user_id):
            try:
                return await self.new_agent.run(query)
            except Exception:
                return await self.old_agent.run(query)  # 降级

        return await self.old_agent.run(query)

    async def _shadow_run(self, query: str):
        """影子模式:记录新 Agent 结果用于对比"""
        try:
            new_result = await self.new_agent.run(query)
            self._log_comparison(query, new_result)
        except Exception as e:
            logger.warning("shadow_agent_error", error=str(e))

    def _in_canary(self, user_id: str) -> bool:
        return hash(user_id) % 100 < self.canary_pct * 100

总结

AI Agent 生产化的核心不是 AI 能力本身,而是工程化基础设施

Agent 生产化成熟度模型
═══════════════════════════════════════════════════

  Level 1 — Demo       单次对话演示
  Level 2 — Script     固定流程 + 工具调用
  Level 3 — App        错误处理 + 重试 + 监控
  Level 4 — Service    多 Agent + 记忆 + 评估
  Level 5 — Platform   自优化 + 灰度 + 成本治理

═══════════════════════════════════════════════════

10 个决策不是先后顺序,而是一个整体 —— 缺任何一个,生产环境都会用故障来教你补课。

参考资料

上次更新于: