Announcement

👇Official Account👇

图片

Welcome to join the group & private message

Article first/tail QR code

Skip to content

蜜罐部署实战:构建主动防御体系

传统的安全防御是被动的,等待攻击发生后再响应。蜜罐(Honeypot)技术改变了这一模式,通过部署诱饵系统主动吸引攻击者,从而发现威胁、收集情报、延缓攻击。

安全防御系列文章:

蜜罐技术概述

什么是蜜罐?

蜜罐是一种安全资源,其价值在于被探测、攻击或入侵。它模拟真实系统,但没有任何生产价值,因此任何对蜜罐的访问都是可疑的。

蜜罐分类

┌─────────────────────────────────────────────────────────────┐
│                      蜜罐分类体系                            │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────────────┐    ┌─────────────────┐                │
│  │   按交互级别     │    │   按部署目标     │                │
│  ├─────────────────┤    ├─────────────────┤                │
│  │ • 低交互蜜罐     │    │ • 研究型蜜罐     │                │
│  │   (模拟服务)     │    │   (收集情报)     │                │
│  │ • 中交互蜜罐     │    │ • 生产型蜜罐     │                │
│  │   (模拟系统)     │    │   (保护资产)     │                │
│  │ • 高交互蜜罐     │    │                 │                │
│  │   (真实系统)     │    │                 │                │
│  └─────────────────┘    └─────────────────┘                │
│                                                             │
│  ┌─────────────────┐    ┌─────────────────┐                │
│  │   按模拟服务     │    │   按部署位置     │                │
│  ├─────────────────┤    ├─────────────────┤                │
│  │ • SSH 蜜罐       │    │ • 内网蜜罐       │                │
│  │ • Web 蜜罐       │    │ • 外网蜜罐       │                │
│  │ • 数据库蜜罐     │    │ • 云蜜罐         │                │
│  │ • 工控蜜罐       │    │ • 容器蜜罐       │                │
│  │ • 物联网蜜罐     │    │                 │                │
│  └─────────────────┘    └─────────────────┘                │
│                                                             │
└─────────────────────────────────────────────────────────────┘

蜜罐 vs 传统防御

特性防火墙/IDS蜜罐
防御方式被动拦截主动诱捕
误报率较高极低
威胁情报有限丰富
攻击延缓
成本中等

SSH 蜜罐:Cowrie 部署

Cowrie 是一个中等交互的 SSH/Telnet 蜜罐,可以记录暴力破解和 shell 交互。

Docker 快速部署

bash
# 创建数据目录
mkdir -p ~/cowrie/{log,downloads,keys}

# 运行 Cowrie
docker run -d \
  --name cowrie \
  -p 2222:2222 \
  -p 2223:2223 \
  -v ~/cowrie/log:/cowrie/cowrie-git/var/log/cowrie \
  -v ~/cowrie/downloads:/cowrie/cowrie-git/var/lib/cowrie/downloads \
  cowrie/cowrie:latest

# 查看日志
docker logs -f cowrie
tail -f ~/cowrie/log/cowrie/cowrie.log

高级配置

python
# cowrie.cfg
[honeypot]
hostname = web-server-01
log_path = var/log/cowrie
download_path = var/lib/cowrie/downloads
contents_path = var/lib/cowrie/contents
filesystem_file = share/cowrie/fs.pickle

# SSH 配置
[ssh]
enabled = true
version = SSH-2.0-OpenSSH_8.2p1 Ubuntu-4ubuntu0.5
listen_endpoints = tcp:2222:interface=0.0.0.0

# 启用 SFTP
[sftp]
enabled = true

# 启用 Telnet
[telnet]
enabled = true
listen_endpoints = tcp:2223:interface=0.0.0.0

# 输出配置
[output_jsonlog]
enabled = true
logfile = ${honeypot:log_path}/cowrie.json

# 集成 Elasticsearch
[output_elasticsearch]
enabled = true
host = elasticsearch
port = 9200
index = cowrie

# 集成 MISP
[output_misp]
enabled = true
base_url = https://misp.example.com
api_key = YOUR_MISP_API_KEY

自定义文件系统

python
# 创建逼真的文件系统
from cowrie.core.config import CONFIG
import pickle

fs = {
    '/': {
        'type': 'dir',
        'contents': {
            'etc': {
                'type': 'dir',
                'contents': {
                    'passwd': {
                        'type': 'file',
                        'contents': '''root:x:0:0:root:/root:/bin/bash
daemon:x:1:1:daemon:/usr/sbin:/usr/sbin/nologin
bin:x:2:2:bin:/bin:/usr/sbin/nologin
sys:x:3:3:sys:/dev:/usr/sbin/nologin
sync:x:4:65534:sync:/bin:/bin/sync
'''
                    },
                    'shadow': {
                        'type': 'file',
                        'contents': 'root:$6$xyz$hash:18967:0:99999:7:::'
                    }
                }
            },
            'home': {
                'type': 'dir',
                'contents': {
                    'admin': {
                        'type': 'dir',
                        'contents': {
                            '.ssh': {
                                'type': 'dir',
                                'contents': {
                                    'id_rsa': {'type': 'file', 'contents': 'FAKE_KEY'},
                                    'authorized_keys': {'type': 'file', 'contents': ''}
                                }
                            }
                        }
                    }
                }
            }
        }
    }
}

with open('fs.pickle', 'wb') as f:
    pickle.dump(fs, f)

Web 蜜罐:多种方案

1. SNARE/TANNER - 智能 Web 蜜罐

bash
# 克隆目标网站
pip3 install snare
docker pull mushorg/snare

# 克隆网站
mkdir -p ~/snare/pages
cd ~/snare/pages
snare --clone https://example.com --target-dir example

# 运行蜜罐
docker run -d \
  --name snare \
  -p 80:80 \
  -v ~/snare/pages:/opt/snare/pages \
  -e TANNER=tanner \
  mushorg/snare \
  --page-dir example

# 运行 TANNER 分析引擎
docker run -d \
  --name tanner \
  -p 8090:8090 \
  mushorg/tanner

2. WordPress 蜜罐

php
<?php
// wp-honeypot.php - 模拟 WordPress 登录
class WPHoneypot {
    private $logFile;
    private $fakeUsers;
    
    public function __construct() {
        $this->logFile = '/var/log/wp-honeypot.log';
        $this->fakeUsers = [
            'admin' => '$P$Bxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx/',
            'wordpress' => '$P$Byxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx/.',
        ];
    }
    
    public function handleLogin() {
        $ip = $_SERVER['REMOTE_ADDR'];
        $userAgent = $_SERVER['HTTP_USER_AGENT'] ?? 'Unknown';
        $username = $_POST['log'] ?? '';
        $password = $_POST['pwd'] ?? '';
        
        // 记录攻击
        $this->logAttack([
            'timestamp' => date('Y-m-d H:i:s'),
            'ip' => $ip,
            'username' => $username,
            'password' => $password,
            'user_agent' => $userAgent,
            'request_uri' => $_SERVER['REQUEST_URI'],
            'post_data' => $_POST,
        ]);
        
        // 模拟登录延迟(增加攻击成本)
        sleep(2);
        
        // 总是返回错误,但响应要逼真
        wp_redirect(wp_login_url() . '?log=' . urlencode($username) . '&pwd=1');
        exit;
    }
    
    private function logAttack($data) {
        $json = json_encode($data, JSON_UNESCAPED_UNICODE);
        file_put_contents($this->logFile, $json . "\n", FILE_APPEND | LOCK_EX);
        
        // 发送到 SIEM
        $this->sendToSIEM($data);
    }
    
    private function sendToSIEM($data) {
        $ch = curl_init('https://siem.example.com/api/events');
        curl_setopt($ch, CURLOPT_POST, 1);
        curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($data));
        curl_setopt($ch, CURLOPT_HTTPHEADER, ['Content-Type: application/json']);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_exec($ch);
        curl_close($ch);
    }
}

// 模拟 WordPress 登录页面
function renderLoginPage() {
    ?>
<!DOCTYPE html>
<html>
<head>
    <title>WordPress Log In</title>
    <style>
        /* 复制真实 WordPress 登录页样式 */
        body { background: #f0f0f1; font-family: sans-serif; }
        #login { width: 320px; margin: 100px auto; }
        /* ... */
    </style>
</head>
<body>
    <div id="login">
        <h1>WordPress</h1>
        <form name="loginform" action="" method="post">
            <p><input type="text" name="log" placeholder="Username"></p>
            <p><input type="password" name="pwd" placeholder="Password"></p>
            <p><input type="submit" value="Log In"></p>
        </form>
    </div>
</body>
</html>
    <?php
}

// 启动
$honeypot = new WPHoneypot();
if ($_SERVER['REQUEST_METHOD'] === 'POST') {
    $honeypot->handleLogin();
} else {
    renderLoginPage();
}
?>

3. API 蜜罐

go
package main

import (
    "encoding/json"
    "log"
    "net/http"
    "time"
    
    "github.com/gin-gonic/gin"
)

type APIHoneypot struct {
    logger *log.Logger
}

type AttackEvent struct {
    Timestamp   time.Time              `json:"timestamp"`
    IP          string                 `json:"ip"`
    Method      string                 `json:"method"`
    Path        string                 `json:"path"`
    Headers     map[string]string      `json:"headers"`
    Query       map[string]string      `json:"query"`
    Body        map[string]interface{} `json:"body"`
    UserAgent   string                 `json:"user_agent"`
}

func NewAPIHoneypot() *APIHoneypot {
    return &APIHoneypot{
        logger: log.New(os.Stdout, "[HONEYPOT] ", log.LstdFlags),
    }
}

func (h *APIHoneypot) logAttack(c *gin.Context, body map[string]interface{}) {
    event := AttackEvent{
        Timestamp: time.Now(),
        IP:        c.ClientIP(),
        Method:    c.Request.Method,
        Path:      c.Request.URL.Path,
        Headers:   h.extractHeaders(c),
        Query:     h.extractQuery(c),
        Body:      body,
        UserAgent: c.Request.UserAgent(),
    }
    
    data, _ := json.Marshal(event)
    h.logger.Println(string(data))
    
    // 发送到 Kafka/Redis
    h.publishEvent(event)
}

func (h *APIHoneypot) extractHeaders(c *gin.Context) map[string]string {
    headers := make(map[string]string)
    for k, v := range c.Request.Header {
        if len(v) > 0 {
            headers[k] = v[0]
        }
    }
    return headers
}

func (h *APIHoneypot) extractQuery(c *gin.Context) map[string]string {
    query := make(map[string]string)
    for k, v := range c.Request.URL.Query() {
        if len(v) > 0 {
            query[k] = v[0]
        }
    }
    return query
}

func (h *APIHoneypot) publishEvent(event AttackEvent) {
    // 实现消息队列发送
}

func main() {
    h := NewAPIHoneypot()
    r := gin.New()
    
    // 模拟常见 API 端点
    
    // 1. 用户登录 API
    r.POST("/api/v1/auth/login", func(c *gin.Context) {
        var body map[string]interface{}
        c.ShouldBindJSON(&body)
        h.logAttack(c, body)
        
        // 返回逼真的错误响应
        c.JSON(401, gin.H{
            "error": "Invalid credentials",
            "code": "AUTH_FAILED",
        })
    })
    
    // 2. 管理员后台
    r.POST("/api/v1/admin/users", func(c *gin.Context) {
        var body map[string]interface{}
        c.ShouldBindJSON(&body)
        h.logAttack(c, body)
        
        c.JSON(403, gin.H{
            "error": "Admin access required",
            "code": "FORBIDDEN",
        })
    })
    
    // 3. 文件上传
    r.POST("/api/v1/upload", func(c *gin.Context) {
        h.logAttack(c, map[string]interface{}{
            "content_type": c.ContentType(),
        })
        
        c.JSON(413, gin.H{
            "error": "File too large",
            "code": "FILE_TOO_LARGE",
        })
    })
    
    // 4. 数据库查询接口
    r.GET("/api/v1/query", func(c *gin.Context) {
        h.logAttack(c, map[string]interface{}{
            "sql": c.Query("sql"),
        })
        
        c.JSON(500, gin.H{
            "error": "Database connection failed",
            "code": "DB_ERROR",
        })
    })
    
    // 5. 敏感文件访问
    r.GET("/.env", func(c *gin.Context) {
        h.logAttack(c, nil)
        c.String(404, "Not Found")
    })
    
    r.GET("/config.php", func(c *gin.Context) {
        h.logAttack(c, nil)
        c.String(403, "Forbidden")
    })
    
    r.Run(":8080")
}

数据库蜜罐

python
#!/usr/bin/env python3
"""
MySQL 蜜罐 - 记录连接尝试和查询
"""

import socket
import struct
import hashlib
import json
import logging
from datetime import datetime
from threading import Thread

class MySQLHoneypot:
    def __init__(self, host='0.0.0.0', port=3306):
        self.host = host
        self.port = port
        self.logger = self._setup_logger()
        
    def _setup_logger(self):
        logger = logging.getLogger('mysql_honeypot')
        logger.setLevel(logging.INFO)
        
        handler = logging.FileHandler('/var/log/mysql-honeypot.log')
        formatter = logging.Formatter('%(asctime)s - %(message)s')
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        
        return logger
    
    def _create_greeting(self):
        """创建 MySQL 握手包"""
        protocol_version = b'\x0a'
        server_version = b'5.7.38-log\x00'
        connection_id = struct.pack('<I', 12345)
        auth_plugin_data_part1 = b'\x12\x34\x56\x78\x9a\xbc\xde\xf0'
        filler = b'\x00'
        capability_flags = struct.pack('<H', 0xffff)
        character_set = b'\x21'
        status_flags = struct.pack('<H', 0x0002)
        capability_flags2 = struct.pack('<H', 0xfc7f)
        auth_plugin_data_len = b'\x15'
        reserved = b'\x00' * 10
        auth_plugin_data_part2 = b'\x11\x22\x33\x44\x55\x66\x77\x88\x99\xaa\xbb\x00'
        auth_plugin_name = b'mysql_native_password\x00'
        
        payload = (protocol_version + server_version + connection_id +
                  auth_plugin_data_part1 + filler + capability_flags +
                  character_set + status_flags + capability_flags2 +
                  auth_plugin_data_len + reserved + auth_plugin_data_part2 +
                  auth_plugin_name)
        
        return self._create_packet(payload, 0)
    
    def _create_packet(self, payload, sequence_id):
        """创建 MySQL 协议包"""
        length = len(payload)
        header = struct.pack('<I', length)[:3] + struct.pack('B', sequence_id)
        return header + payload
    
    def _create_error_packet(self, sequence_id, code, message):
        """创建错误包"""
        payload = b'\xff' + struct.pack('<H', code) + b'#' + b'28000' + message.encode()
        return self._create_packet(payload, sequence_id)
    
    def _parse_handshake_response(self, data):
        """解析客户端握手响应"""
        # 跳过包头部
        pos = 4
        
        # 解析能力标志
        capability_flags = struct.unpack('<I', data[pos:pos+4])[0]
        pos += 4
        
        # 最大包大小
        max_packet_size = struct.unpack('<I', data[pos:pos+4])[0]
        pos += 4
        
        # 字符集
        character_set = data[pos]
        pos += 1
        
        # 跳过保留字节
        pos += 23
        
        # 用户名
        username_end = data.find(b'\x00', pos)
        username = data[pos:username_end].decode('utf-8')
        pos = username_end + 1
        
        # 密码长度
        auth_len = data[pos]
        pos += 1
        
        # 密码
        password = data[pos:pos+auth_len].hex()
        
        return {
            'username': username,
            'password': password,
            'capability_flags': capability_flags,
        }
    
    def _log_event(self, client_addr, event_type, data):
        """记录事件"""
        event = {
            'timestamp': datetime.now().isoformat(),
            'client_ip': client_addr[0],
            'client_port': client_addr[1],
            'type': event_type,
            'data': data,
        }
        self.logger.info(json.dumps(event))
    
    def _handle_client(self, conn, addr):
        """处理客户端连接"""
        try:
            # 发送握手包
            greeting = self._create_greeting()
            conn.send(greeting)
            
            # 接收客户端响应
            data = conn.recv(1024)
            if not data:
                return
            
            # 解析登录信息
            login_info = self._parse_handshake_response(data)
            self._log_event(addr, 'login_attempt', login_info)
            
            # 总是返回认证失败
            error = self._create_error_packet(2, 1045, 
                f"Access denied for user '{login_info['username']}'@'{addr[0]}'")
            conn.send(error)
            
        except Exception as e:
            self._log_event(addr, 'error', {'message': str(e)})
        finally:
            conn.close()
    
    def start(self):
        """启动蜜罐"""
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((self.host, self.port))
        sock.listen(5)
        
        print(f"MySQL Honeypot listening on {self.host}:{self.port}")
        
        while True:
            conn, addr = sock.accept()
            Thread(target=self._handle_client, args=(conn, addr)).start()

if __name__ == '__main__':
    honeypot = MySQLHoneypot()
    honeypot.start()

蜜罐网络架构

yaml
# docker-compose.yml - 完整蜜罐网络
version: '3.8'

services:
  # SSH 蜜罐
  cowrie:
    image: cowrie/cowire:latest
    ports:
      - "2222:2222"
      - "2223:2223"
    volumes:
      - ./cowrie/log:/cowrie/cowrie-git/var/log/cowrie
    networks:
      - honeynet

  # Web 蜜罐
  snare:
    image: mushorg/snare:latest
    ports:
      - "80:80"
    environment:
      - TANNER=tanner
    networks:
      - honeynet

  tanner:
    image: mushorg/tanner:latest
    networks:
      - honeynet

  # 数据库蜜罐
  mysql-honeypot:
    build: ./mysql-honeypot
    ports:
      - "3306:3306"
    networks:
      - honeynet

  # 日志收集
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.5.0
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
    volumes:
      - es_data:/usr/share/elasticsearch/data
    networks:
      - honeynet

  kibana:
    image: docker.elastic.co/kibana/kibana:8.5.0
    ports:
      - "5601:5601"
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    networks:
      - honeynet

  # 告警系统
  elastalert:
    image: jertel/elastalert2:latest
    volumes:
      - ./elastalert/rules:/opt/elastalert/rules
    networks:
      - honeynet

networks:
  honeynet:
    driver: bridge
    ipam:
      config:
        - subnet: 172.20.0.0/24

volumes:
  es_data:

威胁情报集成

python
# threat_intel.py - 威胁情报处理
import requests
import json
from datetime import datetime
from typing import List, Dict

class ThreatIntel:
    def __init__(self):
        self.misp_url = "https://misp.example.com"
        self.misp_key = "YOUR_MISP_API_KEY"
        self.abuseipdb_key = "YOUR_ABUSEIPDB_KEY"
        
    def check_ip_reputation(self, ip: str) -> Dict:
        """检查 IP 信誉"""
        # AbuseIPDB 查询
        headers = {
            'Key': self.abuseipdb_key,
            'Accept': 'application/json'
        }
        params = {
            'ipAddress': ip,
            'maxAgeInDays': 90,
            'verbose': ''
        }
        
        response = requests.get(
            'https://api.abuseipdb.com/api/v2/check',
            headers=headers,
            params=params
        )
        
        return response.json()
    
    def create_misp_event(self, attack_data: Dict) -> str:
        """创建 MISP 事件"""
        headers = {
            'Authorization': self.misp_key,
            'Accept': 'application/json',
            'Content-Type': 'application/json'
        }
        
        event = {
            'Event': {
                'info': f"Honeypot Attack from {attack_data['ip']}",
                'distribution': 1,
                'threat_level_id': 2,
                'analysis': 0,
                'date': datetime.now().strftime('%Y-%m-%d'),
                'Attribute': [
                    {
                        'type': 'ip-src',
                        'value': attack_data['ip'],
                        'category': 'Network activity',
                    },
                    {
                        'type': 'text',
                        'value': json.dumps(attack_data),
                        'category': 'Other',
                    }
                ]
            }
        }
        
        response = requests.post(
            f'{self.misp_url}/events',
            headers=headers,
            json=event
        )
        
        return response.json()['Event']['id']
    
    def block_ip(self, ip: str, duration: int = 3600):
        """联动防火墙阻断 IP"""
        # 调用防火墙 API
        firewall_api = "https://firewall.example.com/api/block"
        requests.post(firewall_api, json={
            'ip': ip,
            'duration': duration,
            'reason': 'Honeypot detection'
        })

# 使用示例
intel = ThreatIntel()

# 处理蜜罐事件
def handle_honeypot_event(event):
    ip = event['ip']
    
    # 检查 IP 信誉
    reputation = intel.check_ip_reputation(ip)
    
    if reputation['data']['abuseConfidenceScore'] > 50:
        # 创建威胁情报事件
        event_id = intel.create_misp_event(event)
        
        # 自动阻断
        intel.block_ip(ip, duration=86400)

最佳实践

  1. 部署位置:内网关键网段、DMZ、云平台
  2. 仿真度:高交互蜜罐更真实但风险更高
  3. 隔离性:蜜罐必须与生产网络严格隔离
  4. 监控告警:实时分析日志,及时响应
  5. 法律合规:确保部署符合当地法律法规

总结

蜜罐是主动防御的重要组成部分,通过本文你已经掌握了:

  • SSH 蜜罐(Cowrie)的部署与配置
  • Web 蜜罐的多种实现方案
  • 数据库蜜罐的协议模拟
  • 蜜罐网络架构设计
  • 威胁情报联动

建议从低交互蜜罐开始,逐步构建多层次的蜜罐网络。


参考资源:

上次更新于: