2026 AI Agent 工程化:从原型到生产的 10 个关键决策
概述
2026 年,AI Agent 已从实验室走向生产环境。Anthropic 的 MCP 协议达到 1.0 里程碑,Google 推出 A2A(Agent-to-Agent)与 A2UI 协议,OpenAI 发布 Agents SDK 和 Secure MCP Tunnel。基础设施已经就位,但从 Demo 到 Production 的工程化鸿沟依然是最大挑战。
本文梳理了 10 个关键工程决策,每一个都直接影响 Agent 的可靠性、成本和安全。
决策全景图
AI Agent 生产化关键决策矩阵
════════════════════════════════════════════════════════════════
架构层 执行层 运维层
┌──────────┐ ┌──────────┐ ┌──────────┐
│ 1. 单/多Agent│ │ 5. 重试策略│ │ 8. 可观测性│
│ 2. 编排模式 │ │ 6. 并发控制│ │ 9. 评估体系│
│ 3. MCP 传输 │ │ 7. 安全护栏│ │10. 渐进部署│
│ 4. 记忆架构 │ │ │ │ │
└──────────┘ └──────────┘ └──────────┘
════════════════════════════════════════════════════════════════决策 1:单 Agent 还是 Multi-Agent?
错误示范:一上来就搭 Multi-Agent,结果调度开销比业务逻辑还大。
# ❌ 过度设计:简单任务用 Multi-Agent
from crewai import Agent, Task, Crew
researcher = Agent(role="Researcher", goal="Research topics", llm=llm)
writer = Agent(role="Writer", goal="Write summary", llm=llm)
# 一个简单的查询 → 2 轮 LLM 调用 + 调度开销决策框架:
| 场景 | 推荐架构 | 理由 |
|---|---|---|
| 单一工具调用(查天气、查数据库) | 单 Agent + Function Call | 一次 LLM 调用即可 |
| 多步骤固定流程(ETL) | 单 Agent + 工具链 | 不需要"角色分工" |
| 需要不同专长领域 | 2-3 Agent 协作 | 保持简单 |
| 复杂自主研究 | Multi-Agent(3-5个) | 需要规划+执行+验证 |
经验法则:能用 Function Calling 解决的不用 Agent,能用单 Agent 解决的不用 Multi-Agent。
单 Agent 的正确姿势
import asyncio
from openai import AsyncOpenAI
client = AsyncOpenAI()
tools = [
{
"type": "function",
"function": {
"name": "search_database",
"description": "Query the product database",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string"},
"filters": {"type": "object"}
},
"required": ["query"]
}
}
}
]
async def run_agent(user_query: str) -> str:
messages = [{"role": "user", "content": user_query}]
while True:
response = await client.chat.completions.create(
model="gpt-4o",
messages=messages,
tools=tools,
tool_choice="auto"
)
msg = response.choices[0].message
messages.append(msg)
# 如果模型决定回复文本,结束循环
if not msg.tool_calls:
return msg.content
# 执行工具调用
for tool_call in msg.tool_calls:
result = await execute_tool(tool_call)
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": str(result)
})决策 2:编排模式 — 硬编码 vs LLM 自主决策
核心问题是:流程由谁控制?
编排模式光谱
═══════════════════════════════════════════════════
确定性 ────────────────────────────► 灵活性
[硬编码] [状态机] [LLM路由] [自主Agent]
DAG/Pipeline LangGraph Router Agent AutoGPT
100%可控 可控+可观测 灵活+可解释 灵活+黑盒
═══════════════════════════════════════════════════推荐:90% 的场景用 状态机(LangGraph) 模式,同时保留 LLM 在关键节点的决策权。
from langgraph.graph import StateGraph, END
from typing import TypedDict, Literal
class AgentState(TypedDict):
query: str
intent: str
search_results: list
final_response: str
def classify_intent(state: AgentState) -> Literal["search", "calculate", "chat"]:
"""LLM 分类意图"""
response = llm.invoke(
f"Classify this query into one of [search, calculate, chat]: {state['query']}"
)
return response.strip()
def search_handler(state: AgentState) -> AgentState:
"""执行搜索"""
results = search_api(state["query"])
return {"search_results": results}
def calculate_handler(state: AgentState) -> AgentState:
"""执行计算"""
result = eval_expression(state["query"])
return {"final_response": str(result)}
# 构建状态图
workflow = StateGraph(AgentState)
workflow.add_node("classify", classify_intent)
workflow.add_node("search", search_handler)
workflow.add_node("calculate", calculate_handler)
workflow.add_node("respond", generate_response)
workflow.add_conditional_edges("classify", {
"search": "search",
"calculate": "calculate",
"chat": "respond"
})
workflow.add_edge("search", "respond")
workflow.add_edge("calculate", END)
app = workflow.compile()为什么不用全自动 Agent? 自主 Agent 容易出现循环调用、成本爆炸、结果不可预测。生产环境首选可控编排。
决策 3:MCP 传输层选择
2026 年 MCP 协议支持三种传输模式:
| 传输方式 | 延迟 | 适用场景 | 安全性 |
|---|---|---|---|
| stdio | 最低 | 本地工具、CLI | 进程隔离 |
| SSE (HTTP) | 中等 | 同网络服务 | 需内网部署 |
| Streamable HTTP | 中等 | 远程服务 | 需认证 |
| Secure Tunnel | 较高 | 跨网络/企业 | E2E 加密 |
# 本地 MCP Server 连接示例
import asyncio
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
async def connect_local_tool():
server_params = StdioServerParameters(
command="python",
args=["-m", "my_mcp_server"]
)
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
# 发现可用工具
tools = await session.list_tools()
print(f"Available tools: {[t.name for t in tools.tools]}")
# 调用工具
result = await session.call_tool(
"search_files",
arguments={"pattern": "*.go", "path": "/src"}
)
return result生产环境推荐:本地工具用 stdio(零网络开销),远程工具用 Secure Tunnel(特别是跨 VPC 场景)。
决策 4:记忆架构 — 短期 vs 长期
Agent 记忆分层架构
═══════════════════════════════════════════════════
[工作记忆] 对话窗口内的上下文
messages[-N:] ← 受 token 限制
[短期记忆] 跨对话的摘要
Redis/Memcached ← TTL 过期
[长期记忆] 向量数据库中的知识
pgvector/Qdrant ← 语义检索
[过程记忆] 任务执行历史
PostgreSQL ← 经验积累
═══════════════════════════════════════════════════class AgentMemory:
def __init__(self, vector_store, db_session):
self.vector_store = vector_store
self.db = db_session
async def remember(self, session_id: str, context: str) -> None:
"""存储对话上下文到长期记忆"""
embedding = await self.embed(context)
await self.vector_store.upsert(
collection="agent_memory",
points=[{
"id": f"{session_id}:{uuid4()}",
"vector": embedding,
"payload": {
"session_id": session_id,
"content": context,
"timestamp": time.time()
}
}]
)
async def recall(self, session_id: str, query: str, top_k: int = 5) -> list[str]:
"""语义检索相关记忆"""
embedding = await self.embed(query)
results = await self.vector_store.search(
collection="agent_memory",
query_vector=embedding,
filter_condition={"must": [{"key": "session_id", "match": {"value": session_id}}]},
limit=top_k
)
return [hit.payload["content"] for hit in results]决策 5:工具调用的可靠性工程
LLM 调用工具时最常见的问题:
# ❌ 这些问题都会导致 Agent 崩溃
# 1. 工具返回格式不符合预期
# 2. 工具执行超时
# 3. 工具抛出异常
# 4. 参数格式错误(LLM 幻觉)
# 5. 返回结果过大超出 token 限制生产级工具包装器
import asyncio
import json
from functools import wraps
from typing import Any, Callable
def production_tool(
timeout: float = 30.0,
max_retries: int = 2,
max_result_length: int = 4000,
validate_params: bool = True
):
"""生产级工具装饰器"""
def decorator(func: Callable):
@wraps(func)
async def wrapper(**kwargs):
last_error = None
for attempt in range(max_retries + 1):
try:
# 超时控制
result = await asyncio.wait_for(
func(**kwargs),
timeout=timeout
)
# 结果截断
result_str = str(result)
if len(result_str) > max_result_length:
result_str = result_str[:max_result_length] + "\n...[truncated]"
return {
"success": True,
"data": result_str,
"attempt": attempt + 1
}
except asyncio.TimeoutError:
last_error = f"Tool timeout after {timeout}s"
except Exception as e:
last_error = f"Tool error: {type(e).__name__}: {str(e)}"
if attempt < max_retries:
await asyncio.sleep(2 ** attempt) # 指数退避
return {
"success": False,
"error": last_error,
"attempts": max_retries + 1
}
return wrapper
return decorator
# 使用示例
@production_tool(timeout=10, max_retries=2)
async def search_documents(query: str, limit: int = 10):
# 业务逻辑
results = await elasticsearch.search(query, size=limit)
return [hit["_source"] for hit in results["hits"]["hits"]]输出格式约束
使用 json_schema 或 structured outputs 强制 LLM 返回结构化数据:
from pydantic import BaseModel
class SearchPlan(BaseModel):
keywords: list[str]
filters: dict[str, str] | None
sort_by: str = "relevance"
max_results: int = 10
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": "Find recent Go articles about concurrency"}],
response_format={
"type": "json_schema",
"json_schema": {
"name": "search_plan",
"schema": SearchPlan.model_json_schema()
}
}
)
plan = SearchPlan.model_validate_json(response.choices[0].message.content)决策 6:并发控制与速率限制
Agent 的并发场景:同时调用多个工具、处理多个用户请求。
import asyncio
from asyncio import Semaphore
class AgentRateLimiter:
def __init__(self, max_concurrent: int = 10, max_per_minute: int = 60):
self._concurrent = Semaphore(max_concurrent)
self._minute_bucket = asyncio.Semaphore(max_per_minute)
# 每分钟重置令牌桶
asyncio.create_task(self._refill_bucket())
async def _refill_bucket(self):
while True:
await asyncio.sleep(60)
# 重置令牌(简化实现)
for _ in range(self._minute_bucket._value):
try:
self._minute_bucket.release()
except ValueError:
pass
async def acquire(self):
await self._concurrent.acquire()
await self._minute_bucket.acquire()
def release(self):
self._concurrent.release()
# minute bucket 不立即释放,等待 refill
# 使用
limiter = AgentRateLimiter(max_concurrent=5, max_per_minute=30)
async def process_user_query(query: str):
await limiter.acquire()
try:
return await run_agent(query)
finally:
limiter.release()决策 7:安全护栏(Guardrails)
Agent 安全的三层防线:
Agent 安全护栏架构
═══════════════════════════════════════════════════
第 1 层:输入过滤
┌─────────────────────────────────────┐
│ • Prompt Injection 检测 │
│ • 敏感词过滤 │
│ • 输入长度限制 │
└──────────────┬──────────────────────┘
▼
第 2 层:执行沙箱
┌─────────────────────────────────────┐
│ • 工具调用白名单 │
│ • 参数验证与净化 │
│ • 权限最小化(只读/只写分离) │
│ • 操作审计日志 │
└──────────────┬──────────────────────┘
▼
第 3 层:输出审查
┌─────────────────────────────────────┐
│ • 敏感信息脱敏(PII/密钥) │
│ • 内容安全检测 │
│ • Human-in-the-loop 审批 │
└─────────────────────────────────────┘
═══════════════════════════════════════════════════class AgentGuardrails:
DANGEROUS_PATTERNS = [
r"rm\s+-rf\s+/",
r"DROP\s+TABLE",
r"DELETE\s+FROM",
r"os\.system\(|subprocess\.|eval\(|exec\(",
]
@staticmethod
def validate_input(user_input: str) -> tuple[bool, str]:
"""检查注入攻击"""
for pattern in AgentGuardrails.DANGEROUS_PATTERNS:
if re.search(pattern, user_input, re.IGNORECASE):
return False, f"Blocked dangerous pattern: {pattern}"
return True, ""
@staticmethod
def validate_tool_call(tool_name: str, args: dict) -> tuple[bool, str]:
"""工具调用白名单和参数验证"""
ALLOWED_TOOLS = {"search_docs", "query_db", "send_email", "create_task"}
if tool_name not in ALLOWED_TOOLS:
return False, f"Tool '{tool_name}' is not in allowed list"
if tool_name == "send_email":
if len(args.get("to", "")) > 50:
return False, "Email recipient too long"
return True, ""
@staticmethod
def sanitize_output(output: str) -> str:
"""脱敏输出中的敏感信息"""
# 脱敏 API 密钥
output = re.sub(r'sk-[A-Za-z0-9]{32,}', '[API_KEY_REDACTED]', output)
# 脱敏邮箱
output = re.sub(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',
'[EMAIL_REDACTED]', output)
return output决策 8:可观测性
Agent 的调试难度远超传统应用,因为决策链路涉及 LLM 调用、工具执行、多次推理循环。
import time
import structlog
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
logger = structlog.get_logger()
tracer = trace.get_tracer(__name__)
async def observed_tool_call(tool_name: str, args: dict) -> dict:
"""带完整可观测性的工具调用"""
start = time.time()
span_id = str(uuid4())[:8]
logger.info("tool_call_start",
tool=tool_name, args=args, span_id=span_id)
with tracer.start_as_current_span(f"tool.{tool_name}") as span:
span.set_attribute("tool.name", tool_name)
span.set_attribute("tool.args", str(args)[:1000])
try:
result = await execute_tool(tool_name, args)
elapsed = time.time() - start
span.set_attribute("tool.success", True)
span.set_attribute("tool.duration_ms", elapsed * 1000)
logger.info("tool_call_success",
tool=tool_name, duration_ms=elapsed*1000,
result_len=len(str(result)),
span_id=span_id)
return result
except Exception as e:
elapsed = time.time() - start
span.set_status(Status(StatusCode.ERROR))
span.record_exception(e)
logger.error("tool_call_failed",
tool=tool_name, error=str(e),
duration_ms=elapsed*1000,
span_id=span_id)
raise关键指标仪表盘:
| 指标 | 含义 | 告警阈值 |
|---|---|---|
tool_latency_p99 | 工具调用 P99 延迟 | > 30s |
tool_error_rate | 工具调用错误率 | > 5% |
llm_tokens_per_query | 每次查询 token 消耗 | > 10K |
agent_loop_iterations | Agent 推理循环次数 | > 10 |
fallback_rate | 触发降级比例 | > 20% |
决策 9:评估体系
没有评估就没有迭代方向。
class AgentEval:
"""Agent 评估框架"""
METRICS = {
"task_success_rate": "任务成功率",
"tool_call_accuracy": "工具调用准确率",
"response_relevance": "回复相关性",
"latency_p95": "P95 延迟",
"token_efficiency": "Token 效率"
}
@staticmethod
async def evaluate(
agent,
test_cases: list[dict],
judge_model: str = "gpt-4o"
) -> dict:
"""
test_cases: [{"input": "...", "expected_tools": [...], "expected_output": "..."}]
"""
results = []
for case in test_cases:
start = time.time()
try:
output = await agent.run(case["input"])
latency = time.time() - start
# LLM-as-Judge 评估回复质量
judge_score = await AgentEval._judge(
case["input"],
case["expected_output"],
output,
judge_model
)
results.append({
"input": case["input"],
"output": output,
"latency": latency,
"judge_score": judge_score,
"passed": judge_score > 0.7
})
except Exception as e:
results.append({
"input": case["input"],
"error": str(e),
"passed": False
})
success_rate = sum(1 for r in results if r["passed"]) / len(results)
avg_latency = sum(r.get("latency", 0) for r in results) / len(results)
return {
"success_rate": success_rate,
"avg_latency": avg_latency,
"details": results
}
@staticmethod
async def _judge(input_text: str, expected: str, actual: str, model: str) -> float:
prompt = f"""Rate the quality of the Agent's response on a scale of 0-1.
User Query: {input_text}
Expected: {expected}
Actual Response: {actual}
Score based on accuracy, completeness, and relevance. Return only a number."""
response = await llm_call(prompt, model=model)
try:
return float(response.strip())
except ValueError:
return 0.5决策 10:渐进式部署
Agent 灰度发布策略
═══════════════════════════════════════════════════
[阶段 1] [阶段 2] [阶段 3] [阶段 4]
Shadow Canary A/B Full
───────── ───────── ───────── ─────────
影子模式 金丝雀 流量对比 全量
只记录不响应 5% 流量 50% vs 旧版 100%
验证正确性 监控异常 对比指标 持续监控
═══════════════════════════════════════════════════class GradualRollout:
def __init__(self):
self.new_agent = NewAgent()
self.old_agent = OldAgent()
self.shadow_mode = True
self.canary_pct = 0.05
async def handle(self, user_id: str, query: str) -> str:
# 阶段 1: Shadow — 新 Agent 静默执行,只记录结果
if self.shadow_mode:
asyncio.create_task(self._shadow_run(query))
return await self.old_agent.run(query)
# 阶段 2: Canary — 小比例流量走新 Agent
if self._in_canary(user_id):
try:
return await self.new_agent.run(query)
except Exception:
return await self.old_agent.run(query) # 降级
return await self.old_agent.run(query)
async def _shadow_run(self, query: str):
"""影子模式:记录新 Agent 结果用于对比"""
try:
new_result = await self.new_agent.run(query)
self._log_comparison(query, new_result)
except Exception as e:
logger.warning("shadow_agent_error", error=str(e))
def _in_canary(self, user_id: str) -> bool:
return hash(user_id) % 100 < self.canary_pct * 100总结
AI Agent 生产化的核心不是 AI 能力本身,而是工程化基础设施:
Agent 生产化成熟度模型
═══════════════════════════════════════════════════
Level 1 — Demo 单次对话演示
Level 2 — Script 固定流程 + 工具调用
Level 3 — App 错误处理 + 重试 + 监控
Level 4 — Service 多 Agent + 记忆 + 评估
Level 5 — Platform 自优化 + 灰度 + 成本治理
═══════════════════════════════════════════════════10 个决策不是先后顺序,而是一个整体 —— 缺任何一个,生产环境都会用故障来教你补课。

