Announcement

👇Official Account👇

Welcome to join the group & private message

Article first/tail QR code

Skip to content

AI 辅助渗透测试实战:Multi-Agent 自动化攻防框架搭建

⚠️ 本文所有技术内容仅适用于合法授权的安全测试场景:CTF 竞赛、授权渗透测试、企业安全自检。未经授权对他人系统使用本文技术属于违法行为。

2026 年,AI 在安全领域的渗透已从"辅助写脚本"升级到全自动漏洞挖掘与利用。长亭科技、腾讯云等安全厂商相继发布 Multi-Agent 渗透测试框架,在腾讯云黑盲松黑客松中取得了超越人类选手的成绩。

本文从工程实现角度,带你深入了解这套技术的核心架构和落地方法。

一、传统渗透测试的瓶颈

传统渗透测试面临四大痛点:

传统渗透测试模型
─────────────────────────────────────────────
效率瓶颈    → 高度依赖安全专家人工经验
覆盖不全    → 难以系统覆盖所有攻击面
响应迟缓    → 无法快速适应动态漏洞环境
攻击链弱    → 单点发现难以串联成完整攻击链
─────────────────────────────────────────────

即使是经验丰富的渗透测试工程师,面对复杂 Web 应用,往往也需要数天才能完成一次全面测试。而 AI Multi-Agent 框架可以在数小时内完成同等工作。


二、Multi-Agent 渗透测试架构

2.1 整体架构图

┌─────────────────────────────────────────────────────┐
│                   Orchestrator                        │
│              (任务规划与协调层)                        │
└──────┬──────────────┬──────────────┬─────────────────┘
       │              │              │
       ▼              ▼              ▼
┌──────────┐  ┌──────────────┐  ┌──────────────┐
│Plan Agent│  │ Vuln Agents  │  │ Report Agent │
│(攻击规划) │  │ (专项漏洞)   │  │ (报告生成)   │
└──────────┘  └──────┬───────┘  └──────────────┘

        ┌────────────┼────────────┐
        ▼            ▼            ▼
  ┌──────────┐ ┌──────────┐ ┌──────────┐
  │XSS Agent │ │SQLI Agent│ │SSRF Agent│
  └──────────┘ └──────────┘ └──────────┘
        │            │            │
        ▼            ▼            ▼
  ┌─────────────────────────────────────┐
  │         MCP Tool Layer              │
  │  (nmap / sqlmap / xray / burp ...)  │
  └─────────────────────────────────────┘

2.2 三大核心组件

组件职责技术特点
Plan Agent统筹整体攻击流程生成结构化攻击方案(XML 格式指令)
专项漏洞 Agent垂直化漏洞利用XSS、SQLi、SSRF、SSTI 等专用模块
并行执行引擎多任务资源调度共享内存 + 最大并发控制

三、DSPy:用编程替代 Prompt 工程

3.1 传统 Prompt 工程的问题

传统做法是写长篇 Prompt 来引导 AI 行为,但 Prompt 脆弱、难以调试、版本混乱。

**DSPy(Programming-not-prompting)**核心理念:用 Python 代码定义 Agent 的行为逻辑,让 LLM 只负责"填空",而不是"指挥"。

3.2 DSPy 在渗透测试中的应用

python
import dspy

class VulnScanSignature(dspy.Signature):
    """分析 HTTP 响应,判断是否存在 XSS 漏洞"""
    
    response_body: str = dspy.InputField(desc="HTTP 响应体")
    payload_used: str = dspy.InputField(desc="测试时使用的 Payload")
    
    is_vulnerable: bool = dspy.OutputField(desc="是否存在 XSS 漏洞")
    evidence: str = dspy.OutputField(desc="漏洞证据或判断依据")
    next_payload: str = dspy.OutputField(desc="建议下一步测试的 Payload")

class XSSAgent(dspy.Module):
    def __init__(self):
        self.analyze = dspy.ChainOfThought(VulnScanSignature)
        self.payloads = self._load_payload_db()
    
    def forward(self, target_url: str, response_body: str):
        for payload in self.payloads:
            result = self.analyze(
                response_body=response_body,
                payload_used=payload
            )
            if result.is_vulnerable:
                return {
                    "status": "vulnerable",
                    "payload": payload,
                    "evidence": result.evidence
                }
            # 智能切换到下一个 payload
            payload = result.next_payload
        
        return {"status": "not_found"}

关键优势

  • AI 负责语义理解(判断漏洞),代码负责流程控制(循环、条件、并发)
  • 行为可测试、可复现、可版本化管理
  • 与常规软件工程实践无缝融合

四、实战:自动化 XSS 漏洞挖掘

4.1 反射型 XSS 自动通杀方案

python
import asyncio
from typing import AsyncIterator
import httpx
import dspy

class ReflectedXSSScanner:
    """反射型 XSS 全自动扫描器"""
    
    # 分层 Payload 策略:从简单到绕过
    PAYLOAD_TIERS = [
        # Tier 1:基础探针
        ["<script>alert(1)</script>", "<img src=x onerror=alert(1)>"],
        # Tier 2:引号绕过
        ["'\"><script>alert(1)</script>", "javascript:alert(1)"],
        # Tier 3:编码绕过
        ["<scr\x00ipt>alert(1)</script>", "&#x3C;script&#x3E;alert(1)&#x3C;/script&#x3E;"],
        # Tier 4:AI 生成的自适应 Payload(由 LLM 根据上下文生成)
        []
    ]
    
    def __init__(self, llm_backend: str = "claude"):
        self.analyzer = dspy.ChainOfThought(VulnScanSignature)
        self.client = httpx.AsyncClient(timeout=10.0)
    
    async def scan_parameter(self, url: str, param: str) -> dict:
        """扫描单个参数的 XSS 漏洞"""
        
        for tier_idx, payloads in enumerate(self.PAYLOAD_TIERS):
            if tier_idx == 3:
                # 最后一层:让 AI 根据之前的响应生成自适应 Payload
                payloads = await self._generate_adaptive_payloads(url, param)
            
            for payload in payloads:
                result = await self._test_payload(url, param, payload)
                
                if result["reflected"]:
                    # 调用 LLM 确认是否真实漏洞(降低误报)
                    analysis = self.analyzer(
                        response_body=result["body"],
                        payload_used=payload
                    )
                    
                    if analysis.is_vulnerable:
                        return {
                            "vulnerable": True,
                            "url": url,
                            "parameter": param,
                            "payload": payload,
                            "tier": tier_idx,
                            "evidence": analysis.evidence,
                            "poc": f"{url}?{param}={payload}"
                        }
        
        return {"vulnerable": False, "url": url, "parameter": param}
    
    async def _test_payload(self, url: str, param: str, payload: str) -> dict:
        """发送请求并检查 Payload 是否被反射"""
        try:
            resp = await self.client.get(url, params={param: payload})
            reflected = payload in resp.text or payload.lower() in resp.text.lower()
            return {"reflected": reflected, "body": resp.text[:5000]}
        except Exception as e:
            return {"reflected": False, "body": str(e)}
    
    async def _generate_adaptive_payloads(self, url: str, param: str) -> list:
        """让 AI 根据目标特征生成自适应 Payload"""
        # 先获取正常响应以了解上下文
        resp = await self.client.get(url, params={param: "XSS_TEST_PROBE_12345"})
        
        generate = dspy.ChainOfThought("""
            根据以下 HTTP 响应,生成 5 个针对性的 XSS Payload。
            响应体: {response_body}
            要绕过的可能过滤: {filters_detected}
            输出: payloads (list of strings)
        """)
        
        result = generate(
            response_body=resp.text[:2000],
            filters_detected=self._detect_filters(resp.text)
        )
        return result.payloads[:5]
    
    def _detect_filters(self, response_body: str) -> str:
        """检测目标可能使用的过滤机制"""
        filters = []
        if "alert" not in response_body:
            filters.append("keyword-filter:alert")
        if "<script" not in response_body.lower():
            filters.append("tag-filter:script")
        return ", ".join(filters) if filters else "none-detected"

4.2 并行扫描多个参数

python
async def scan_target(target_url: str) -> list:
    """并发扫描目标的所有参数"""
    
    # 1. 爬取目标,发现所有参数
    params = await discover_parameters(target_url)
    print(f"发现 {len(params)} 个参数: {params}")
    
    # 2. 并行扫描,最多 5 个并发
    scanner = ReflectedXSSScanner()
    semaphore = asyncio.Semaphore(5)
    
    async def scan_with_limit(param):
        async with semaphore:
            return await scanner.scan_parameter(target_url, param)
    
    tasks = [scan_with_limit(param) for param in params]
    results = await asyncio.gather(*tasks)
    
    # 3. 过滤出漏洞
    vulnerabilities = [r for r in results if r.get("vulnerable")]
    print(f"发现 {len(vulnerabilities)} 个 XSS 漏洞")
    return vulnerabilities

五、智能自愈:动态环境适应机制

这是 AI 渗透测试框架最关键的能力——当目标环境变化时,自动调整策略

5.1 典型场景:端口服务变更

在腾讯云黑盲松 CTF 比赛中,出现了以下真实场景:

初始状态:目标服务运行于 8000 端口
异常发生:8000 端口服务失效

Agent 自动响应流程:
┌─────────────────────────────────────────┐
│ 1. 检测到连接失败 (ConnectionRefused)   │
│ 2. 触发端口发现子任务                    │
│    → 执行: nmap -sV -p 1-65535 target   │
│    → 或: for p in 80 8080 8888 3000;    │
│         curl -s http://target:$p        │
│ 3. 发现 80 端口有登录页面               │
│ 4. 更新内存中的目标状态                  │
│ 5. 继续原攻击流程,目标改为 80 端口     │
└─────────────────────────────────────────┘

5.2 实现代码

python
class AdaptiveAttackEngine:
    """具备自愈能力的自适应攻击引擎"""
    
    def __init__(self):
        self.target_state = {}  # 运行时目标状态
        self.retry_limit = 3
    
    async def execute_with_retry(self, attack_func, target: dict) -> dict:
        """执行攻击,失败时自动尝试重新发现目标"""
        
        for attempt in range(self.retry_limit):
            try:
                result = await attack_func(target)
                return result
                
            except ConnectionError as e:
                print(f"连接失败 (第 {attempt+1} 次): {e}")
                
                if attempt < self.retry_limit - 1:
                    # 自动重新发现可用服务
                    new_target = await self.rediscover_target(target)
                    if new_target:
                        print(f"发现新的目标端口: {new_target['port']}")
                        target = new_target
                    else:
                        await asyncio.sleep(2 ** attempt)  # 指数退避
        
        return {"status": "failed", "reason": "所有重试均失败"}
    
    async def rediscover_target(self, target: dict) -> dict | None:
        """重新发现目标可用端口"""
        host = target["host"]
        common_ports = [80, 443, 8080, 8443, 3000, 8000, 8888, 9090]
        
        for port in common_ports:
            try:
                async with httpx.AsyncClient(timeout=3.0) as client:
                    resp = await client.get(f"http://{host}:{port}")
                    if resp.status_code < 500:
                        return {**target, "port": port, "base_url": f"http://{host}:{port}"}
            except:
                continue
        
        return None

六、CTF 实战:AI 辅助解题流程

6.1 标准 CTF 解题 Agent 流程

python
class CTFSolverAgent:
    """CTF 自动解题 Agent"""
    
    async def solve(self, challenge_url: str, category: str) -> str:
        """
        category: web / pwn / reverse / crypto / misc
        """
        
        # 1. 信息收集
        recon = await self.recon_phase(challenge_url)
        print(f"信息收集完成: {recon['tech_stack']}")
        
        # 2. 根据类别选择专项 Agent
        agent_map = {
            "web": WebExploitAgent(),
            "crypto": CryptoSolverAgent(),
        }
        specialist = agent_map.get(category, GenericSolverAgent())
        
        # 3. 专项解题
        result = await specialist.solve(challenge_url, recon)
        
        # 4. 提取 Flag
        flag = self.extract_flag(result["output"])
        if flag:
            print(f"✅ 找到 Flag: {flag}")
            return flag
        
        # 5. 若未找到,让 AI 分析失败原因并调整策略
        adjusted = await self.analyze_and_retry(result, recon)
        return adjusted
    
    def extract_flag(self, text: str) -> str | None:
        """提取标准 CTF Flag 格式"""
        import re
        # 常见 Flag 格式: flag{...} / ctf{...} / FLAG{...}
        patterns = [
            r'flag\{[^}]+\}',
            r'ctf\{[^}]+\}',
            r'FLAG\{[^}]+\}',
            r'[A-Z0-9_]+\{[^}]+\}',  # 赛题自定义格式
        ]
        for pattern in patterns:
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                return match.group(0)
        return None

6.2 Web 类型专项 Agent

python
class WebExploitAgent(dspy.Module):
    """Web 漏洞利用专项 Agent"""
    
    def __init__(self):
        self.plan = dspy.ChainOfThought("技术栈信息 -> 攻击向量列表")
        self.exploit = dspy.ReAct(
            "攻击目标, 当前状态 -> 下一步操作, 操作结果",
            tools=[
                self.http_get,
                self.http_post,
                self.run_sqlmap,
                self.run_xray,
                self.decode_base64,
                self.crack_jwt,
            ]
        )
    
    async def solve(self, url: str, recon: dict) -> dict:
        # 让 AI 规划攻击向量
        attack_vectors = self.plan(tech_stack=str(recon["tech_stack"]))
        
        # ReAct 循环:行动 → 观察 → 思考 → 行动
        state = {"url": url, "vectors": attack_vectors, "findings": []}
        
        for step in range(20):  # 最多 20 步
            result = self.exploit(
                attack_target=url,
                current_state=str(state)
            )
            
            state["findings"].append(result.操作结果)
            
            # 检查是否找到 Flag
            if "flag{" in result.操作结果.lower():
                return {"status": "solved", "output": result.操作结果}
        
        return {"status": "unsolved", "output": str(state["findings"])}

七、MCP 与安全工具集成

将传统安全工具接入 MCP 协议,AI 就能直接调用:

python
# 用 Python 实现安全工具 MCP Server
from mcp.server import Server
from mcp.types import Tool, TextContent
import subprocess

app = Server("security-toolkit")

@app.list_tools()
async def list_tools() -> list[Tool]:
    return [
        Tool(
            name="run_nmap",
            description="执行 nmap 端口扫描",
            inputSchema={
                "type": "object",
                "properties": {
                    "target": {"type": "string", "description": "目标 IP 或域名"},
                    "ports": {"type": "string", "description": "端口范围,如 '1-1000' 或 '80,443,8080'"},
                    "scan_type": {"type": "string", "enum": ["-sV", "-sS", "-sU"], "description": "扫描类型"}
                },
                "required": ["target"]
            }
        ),
        Tool(
            name="run_sqlmap",
            description="执行 sqlmap SQL 注入测试(仅限授权目标)",
            inputSchema={
                "type": "object",
                "properties": {
                    "url": {"type": "string"},
                    "level": {"type": "integer", "minimum": 1, "maximum": 5}
                },
                "required": ["url"]
            }
        )
    ]

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    if name == "run_nmap":
        target = arguments["target"]
        ports = arguments.get("ports", "1-1000")
        scan_type = arguments.get("scan_type", "-sV")
        
        cmd = ["nmap", scan_type, "-p", ports, target]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
        
        return [TextContent(type="text", text=result.stdout or result.stderr)]

八、道德与法律边界

AI 辅助渗透测试的能力边界必须清晰:

✅ 合规场景❌ 违法场景
CTF 比赛(题目靶机)未授权扫描真实网站
企业内部安全自检攻击政府/金融系统
有书面授权的渗透测试DDoS、勒索软件制作
漏洞赏金计划(HackerOne 等)拿到数据后泄露
沙箱/虚拟机环境研究任何未经授权的访问

核心原则:授权是一切的前提。 再强大的 AI 工具,没有书面授权都不能对真实系统使用。


九、总结

AI 多 Agent 渗透测试框架代表了安全行业的下一个范式:

  1. 效率革命:自动化漫扫从"天"缩短到"小时"
  2. 覆盖提升:并行 Agent 同时检测多个攻击面
  3. 智能适应:动态环境变化时自动调整策略,不需要人工干预
  4. 门槛降低:中级安全工程师借助 AI 框架可完成高级渗透任务

但这也意味着防御方需要更快速地应对:AI 辅助的 WAF 规则生成、实时行为检测、主动蜜罐技术,将成为安全防御的新重心。

攻防从未停止,AI 只是让博弈的速度更快。


参考资料

上次更新于: