蜜罐部署实战:构建主动防御体系
传统的安全防御是被动的,等待攻击发生后再响应。蜜罐(Honeypot)技术改变了这一模式,通过部署诱饵系统主动吸引攻击者,从而发现威胁、收集情报、延缓攻击。
安全防御系列文章:
- SSH 安全加固指南 2025 - 暴力破解防护
- [Golang Web 应用完整安全指南](./golang Web应用完整安全指南.md) - 应用安全开发
- Web 安全三大漏洞攻防实战 - 常见漏洞防护
- Nginx 安全网关配置完全指南 - Web 服务器安全
- Kubernetes 基础入门 - 容器安全
蜜罐技术概述
什么是蜜罐?
蜜罐是一种安全资源,其价值在于被探测、攻击或入侵。它模拟真实系统,但没有任何生产价值,因此任何对蜜罐的访问都是可疑的。
蜜罐分类
┌─────────────────────────────────────────────────────────────┐
│ 蜜罐分类体系 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ 按交互级别 │ │ 按部署目标 │ │
│ ├─────────────────┤ ├─────────────────┤ │
│ │ • 低交互蜜罐 │ │ • 研究型蜜罐 │ │
│ │ (模拟服务) │ │ (收集情报) │ │
│ │ • 中交互蜜罐 │ │ • 生产型蜜罐 │ │
│ │ (模拟系统) │ │ (保护资产) │ │
│ │ • 高交互蜜罐 │ │ │ │
│ │ (真实系统) │ │ │ │
│ └─────────────────┘ └─────────────────┘ │
│ │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ 按模拟服务 │ │ 按部署位置 │ │
│ ├─────────────────┤ ├─────────────────┤ │
│ │ • SSH 蜜罐 │ │ • 内网蜜罐 │ │
│ │ • Web 蜜罐 │ │ • 外网蜜罐 │ │
│ │ • 数据库蜜罐 │ │ • 云蜜罐 │ │
│ │ • 工控蜜罐 │ │ • 容器蜜罐 │ │
│ │ • 物联网蜜罐 │ │ │ │
│ └─────────────────┘ └─────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘蜜罐 vs 传统防御
| 特性 | 防火墙/IDS | 蜜罐 |
|---|---|---|
| 防御方式 | 被动拦截 | 主动诱捕 |
| 误报率 | 较高 | 极低 |
| 威胁情报 | 有限 | 丰富 |
| 攻击延缓 | 无 | 有 |
| 成本 | 低 | 中等 |
SSH 蜜罐:Cowrie 部署
Cowrie 是一个中等交互的 SSH/Telnet 蜜罐,可以记录暴力破解和 shell 交互。
Docker 快速部署
bash
# 创建数据目录
mkdir -p ~/cowrie/{log,downloads,keys}
# 运行 Cowrie
docker run -d \
--name cowrie \
-p 2222:2222 \
-p 2223:2223 \
-v ~/cowrie/log:/cowrie/cowrie-git/var/log/cowrie \
-v ~/cowrie/downloads:/cowrie/cowrie-git/var/lib/cowrie/downloads \
cowrie/cowrie:latest
# 查看日志
docker logs -f cowrie
tail -f ~/cowrie/log/cowrie/cowrie.log高级配置
python
# cowrie.cfg
[honeypot]
hostname = web-server-01
log_path = var/log/cowrie
download_path = var/lib/cowrie/downloads
contents_path = var/lib/cowrie/contents
filesystem_file = share/cowrie/fs.pickle
# SSH 配置
[ssh]
enabled = true
version = SSH-2.0-OpenSSH_8.2p1 Ubuntu-4ubuntu0.5
listen_endpoints = tcp:2222:interface=0.0.0.0
# 启用 SFTP
[sftp]
enabled = true
# 启用 Telnet
[telnet]
enabled = true
listen_endpoints = tcp:2223:interface=0.0.0.0
# 输出配置
[output_jsonlog]
enabled = true
logfile = ${honeypot:log_path}/cowrie.json
# 集成 Elasticsearch
[output_elasticsearch]
enabled = true
host = elasticsearch
port = 9200
index = cowrie
# 集成 MISP
[output_misp]
enabled = true
base_url = https://misp.example.com
api_key = YOUR_MISP_API_KEY自定义文件系统
python
# 创建逼真的文件系统
from cowrie.core.config import CONFIG
import pickle
fs = {
'/': {
'type': 'dir',
'contents': {
'etc': {
'type': 'dir',
'contents': {
'passwd': {
'type': 'file',
'contents': '''root:x:0:0:root:/root:/bin/bash
daemon:x:1:1:daemon:/usr/sbin:/usr/sbin/nologin
bin:x:2:2:bin:/bin:/usr/sbin/nologin
sys:x:3:3:sys:/dev:/usr/sbin/nologin
sync:x:4:65534:sync:/bin:/bin/sync
'''
},
'shadow': {
'type': 'file',
'contents': 'root:$6$xyz$hash:18967:0:99999:7:::'
}
}
},
'home': {
'type': 'dir',
'contents': {
'admin': {
'type': 'dir',
'contents': {
'.ssh': {
'type': 'dir',
'contents': {
'id_rsa': {'type': 'file', 'contents': 'FAKE_KEY'},
'authorized_keys': {'type': 'file', 'contents': ''}
}
}
}
}
}
}
}
}
}
with open('fs.pickle', 'wb') as f:
pickle.dump(fs, f)Web 蜜罐:多种方案
1. SNARE/TANNER - 智能 Web 蜜罐
bash
# 克隆目标网站
pip3 install snare
docker pull mushorg/snare
# 克隆网站
mkdir -p ~/snare/pages
cd ~/snare/pages
snare --clone https://example.com --target-dir example
# 运行蜜罐
docker run -d \
--name snare \
-p 80:80 \
-v ~/snare/pages:/opt/snare/pages \
-e TANNER=tanner \
mushorg/snare \
--page-dir example
# 运行 TANNER 分析引擎
docker run -d \
--name tanner \
-p 8090:8090 \
mushorg/tanner2. WordPress 蜜罐
php
<?php
// wp-honeypot.php - 模拟 WordPress 登录
class WPHoneypot {
private $logFile;
private $fakeUsers;
public function __construct() {
$this->logFile = '/var/log/wp-honeypot.log';
$this->fakeUsers = [
'admin' => '$P$Bxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx/',
'wordpress' => '$P$Byxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx/.',
];
}
public function handleLogin() {
$ip = $_SERVER['REMOTE_ADDR'];
$userAgent = $_SERVER['HTTP_USER_AGENT'] ?? 'Unknown';
$username = $_POST['log'] ?? '';
$password = $_POST['pwd'] ?? '';
// 记录攻击
$this->logAttack([
'timestamp' => date('Y-m-d H:i:s'),
'ip' => $ip,
'username' => $username,
'password' => $password,
'user_agent' => $userAgent,
'request_uri' => $_SERVER['REQUEST_URI'],
'post_data' => $_POST,
]);
// 模拟登录延迟(增加攻击成本)
sleep(2);
// 总是返回错误,但响应要逼真
wp_redirect(wp_login_url() . '?log=' . urlencode($username) . '&pwd=1');
exit;
}
private function logAttack($data) {
$json = json_encode($data, JSON_UNESCAPED_UNICODE);
file_put_contents($this->logFile, $json . "\n", FILE_APPEND | LOCK_EX);
// 发送到 SIEM
$this->sendToSIEM($data);
}
private function sendToSIEM($data) {
$ch = curl_init('https://siem.example.com/api/events');
curl_setopt($ch, CURLOPT_POST, 1);
curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($data));
curl_setopt($ch, CURLOPT_HTTPHEADER, ['Content-Type: application/json']);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
curl_exec($ch);
curl_close($ch);
}
}
// 模拟 WordPress 登录页面
function renderLoginPage() {
?>
<!DOCTYPE html>
<html>
<head>
<title>WordPress › Log In</title>
<style>
/* 复制真实 WordPress 登录页样式 */
body { background: #f0f0f1; font-family: sans-serif; }
#login { width: 320px; margin: 100px auto; }
/* ... */
</style>
</head>
<body>
<div id="login">
<h1>WordPress</h1>
<form name="loginform" action="" method="post">
<p><input type="text" name="log" placeholder="Username"></p>
<p><input type="password" name="pwd" placeholder="Password"></p>
<p><input type="submit" value="Log In"></p>
</form>
</div>
</body>
</html>
<?php
}
// 启动
$honeypot = new WPHoneypot();
if ($_SERVER['REQUEST_METHOD'] === 'POST') {
$honeypot->handleLogin();
} else {
renderLoginPage();
}
?>3. API 蜜罐
go
package main
import (
"encoding/json"
"log"
"net/http"
"time"
"github.com/gin-gonic/gin"
)
type APIHoneypot struct {
logger *log.Logger
}
type AttackEvent struct {
Timestamp time.Time `json:"timestamp"`
IP string `json:"ip"`
Method string `json:"method"`
Path string `json:"path"`
Headers map[string]string `json:"headers"`
Query map[string]string `json:"query"`
Body map[string]interface{} `json:"body"`
UserAgent string `json:"user_agent"`
}
func NewAPIHoneypot() *APIHoneypot {
return &APIHoneypot{
logger: log.New(os.Stdout, "[HONEYPOT] ", log.LstdFlags),
}
}
func (h *APIHoneypot) logAttack(c *gin.Context, body map[string]interface{}) {
event := AttackEvent{
Timestamp: time.Now(),
IP: c.ClientIP(),
Method: c.Request.Method,
Path: c.Request.URL.Path,
Headers: h.extractHeaders(c),
Query: h.extractQuery(c),
Body: body,
UserAgent: c.Request.UserAgent(),
}
data, _ := json.Marshal(event)
h.logger.Println(string(data))
// 发送到 Kafka/Redis
h.publishEvent(event)
}
func (h *APIHoneypot) extractHeaders(c *gin.Context) map[string]string {
headers := make(map[string]string)
for k, v := range c.Request.Header {
if len(v) > 0 {
headers[k] = v[0]
}
}
return headers
}
func (h *APIHoneypot) extractQuery(c *gin.Context) map[string]string {
query := make(map[string]string)
for k, v := range c.Request.URL.Query() {
if len(v) > 0 {
query[k] = v[0]
}
}
return query
}
func (h *APIHoneypot) publishEvent(event AttackEvent) {
// 实现消息队列发送
}
func main() {
h := NewAPIHoneypot()
r := gin.New()
// 模拟常见 API 端点
// 1. 用户登录 API
r.POST("/api/v1/auth/login", func(c *gin.Context) {
var body map[string]interface{}
c.ShouldBindJSON(&body)
h.logAttack(c, body)
// 返回逼真的错误响应
c.JSON(401, gin.H{
"error": "Invalid credentials",
"code": "AUTH_FAILED",
})
})
// 2. 管理员后台
r.POST("/api/v1/admin/users", func(c *gin.Context) {
var body map[string]interface{}
c.ShouldBindJSON(&body)
h.logAttack(c, body)
c.JSON(403, gin.H{
"error": "Admin access required",
"code": "FORBIDDEN",
})
})
// 3. 文件上传
r.POST("/api/v1/upload", func(c *gin.Context) {
h.logAttack(c, map[string]interface{}{
"content_type": c.ContentType(),
})
c.JSON(413, gin.H{
"error": "File too large",
"code": "FILE_TOO_LARGE",
})
})
// 4. 数据库查询接口
r.GET("/api/v1/query", func(c *gin.Context) {
h.logAttack(c, map[string]interface{}{
"sql": c.Query("sql"),
})
c.JSON(500, gin.H{
"error": "Database connection failed",
"code": "DB_ERROR",
})
})
// 5. 敏感文件访问
r.GET("/.env", func(c *gin.Context) {
h.logAttack(c, nil)
c.String(404, "Not Found")
})
r.GET("/config.php", func(c *gin.Context) {
h.logAttack(c, nil)
c.String(403, "Forbidden")
})
r.Run(":8080")
}数据库蜜罐
python
#!/usr/bin/env python3
"""
MySQL 蜜罐 - 记录连接尝试和查询
"""
import socket
import struct
import hashlib
import json
import logging
from datetime import datetime
from threading import Thread
class MySQLHoneypot:
def __init__(self, host='0.0.0.0', port=3306):
self.host = host
self.port = port
self.logger = self._setup_logger()
def _setup_logger(self):
logger = logging.getLogger('mysql_honeypot')
logger.setLevel(logging.INFO)
handler = logging.FileHandler('/var/log/mysql-honeypot.log')
formatter = logging.Formatter('%(asctime)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def _create_greeting(self):
"""创建 MySQL 握手包"""
protocol_version = b'\x0a'
server_version = b'5.7.38-log\x00'
connection_id = struct.pack('<I', 12345)
auth_plugin_data_part1 = b'\x12\x34\x56\x78\x9a\xbc\xde\xf0'
filler = b'\x00'
capability_flags = struct.pack('<H', 0xffff)
character_set = b'\x21'
status_flags = struct.pack('<H', 0x0002)
capability_flags2 = struct.pack('<H', 0xfc7f)
auth_plugin_data_len = b'\x15'
reserved = b'\x00' * 10
auth_plugin_data_part2 = b'\x11\x22\x33\x44\x55\x66\x77\x88\x99\xaa\xbb\x00'
auth_plugin_name = b'mysql_native_password\x00'
payload = (protocol_version + server_version + connection_id +
auth_plugin_data_part1 + filler + capability_flags +
character_set + status_flags + capability_flags2 +
auth_plugin_data_len + reserved + auth_plugin_data_part2 +
auth_plugin_name)
return self._create_packet(payload, 0)
def _create_packet(self, payload, sequence_id):
"""创建 MySQL 协议包"""
length = len(payload)
header = struct.pack('<I', length)[:3] + struct.pack('B', sequence_id)
return header + payload
def _create_error_packet(self, sequence_id, code, message):
"""创建错误包"""
payload = b'\xff' + struct.pack('<H', code) + b'#' + b'28000' + message.encode()
return self._create_packet(payload, sequence_id)
def _parse_handshake_response(self, data):
"""解析客户端握手响应"""
# 跳过包头部
pos = 4
# 解析能力标志
capability_flags = struct.unpack('<I', data[pos:pos+4])[0]
pos += 4
# 最大包大小
max_packet_size = struct.unpack('<I', data[pos:pos+4])[0]
pos += 4
# 字符集
character_set = data[pos]
pos += 1
# 跳过保留字节
pos += 23
# 用户名
username_end = data.find(b'\x00', pos)
username = data[pos:username_end].decode('utf-8')
pos = username_end + 1
# 密码长度
auth_len = data[pos]
pos += 1
# 密码
password = data[pos:pos+auth_len].hex()
return {
'username': username,
'password': password,
'capability_flags': capability_flags,
}
def _log_event(self, client_addr, event_type, data):
"""记录事件"""
event = {
'timestamp': datetime.now().isoformat(),
'client_ip': client_addr[0],
'client_port': client_addr[1],
'type': event_type,
'data': data,
}
self.logger.info(json.dumps(event))
def _handle_client(self, conn, addr):
"""处理客户端连接"""
try:
# 发送握手包
greeting = self._create_greeting()
conn.send(greeting)
# 接收客户端响应
data = conn.recv(1024)
if not data:
return
# 解析登录信息
login_info = self._parse_handshake_response(data)
self._log_event(addr, 'login_attempt', login_info)
# 总是返回认证失败
error = self._create_error_packet(2, 1045,
f"Access denied for user '{login_info['username']}'@'{addr[0]}'")
conn.send(error)
except Exception as e:
self._log_event(addr, 'error', {'message': str(e)})
finally:
conn.close()
def start(self):
"""启动蜜罐"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((self.host, self.port))
sock.listen(5)
print(f"MySQL Honeypot listening on {self.host}:{self.port}")
while True:
conn, addr = sock.accept()
Thread(target=self._handle_client, args=(conn, addr)).start()
if __name__ == '__main__':
honeypot = MySQLHoneypot()
honeypot.start()蜜罐网络架构
yaml
# docker-compose.yml - 完整蜜罐网络
version: '3.8'
services:
# SSH 蜜罐
cowrie:
image: cowrie/cowire:latest
ports:
- "2222:2222"
- "2223:2223"
volumes:
- ./cowrie/log:/cowrie/cowrie-git/var/log/cowrie
networks:
- honeynet
# Web 蜜罐
snare:
image: mushorg/snare:latest
ports:
- "80:80"
environment:
- TANNER=tanner
networks:
- honeynet
tanner:
image: mushorg/tanner:latest
networks:
- honeynet
# 数据库蜜罐
mysql-honeypot:
build: ./mysql-honeypot
ports:
- "3306:3306"
networks:
- honeynet
# 日志收集
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.5.0
environment:
- discovery.type=single-node
- xpack.security.enabled=false
volumes:
- es_data:/usr/share/elasticsearch/data
networks:
- honeynet
kibana:
image: docker.elastic.co/kibana/kibana:8.5.0
ports:
- "5601:5601"
environment:
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
networks:
- honeynet
# 告警系统
elastalert:
image: jertel/elastalert2:latest
volumes:
- ./elastalert/rules:/opt/elastalert/rules
networks:
- honeynet
networks:
honeynet:
driver: bridge
ipam:
config:
- subnet: 172.20.0.0/24
volumes:
es_data:威胁情报集成
python
# threat_intel.py - 威胁情报处理
import requests
import json
from datetime import datetime
from typing import List, Dict
class ThreatIntel:
def __init__(self):
self.misp_url = "https://misp.example.com"
self.misp_key = "YOUR_MISP_API_KEY"
self.abuseipdb_key = "YOUR_ABUSEIPDB_KEY"
def check_ip_reputation(self, ip: str) -> Dict:
"""检查 IP 信誉"""
# AbuseIPDB 查询
headers = {
'Key': self.abuseipdb_key,
'Accept': 'application/json'
}
params = {
'ipAddress': ip,
'maxAgeInDays': 90,
'verbose': ''
}
response = requests.get(
'https://api.abuseipdb.com/api/v2/check',
headers=headers,
params=params
)
return response.json()
def create_misp_event(self, attack_data: Dict) -> str:
"""创建 MISP 事件"""
headers = {
'Authorization': self.misp_key,
'Accept': 'application/json',
'Content-Type': 'application/json'
}
event = {
'Event': {
'info': f"Honeypot Attack from {attack_data['ip']}",
'distribution': 1,
'threat_level_id': 2,
'analysis': 0,
'date': datetime.now().strftime('%Y-%m-%d'),
'Attribute': [
{
'type': 'ip-src',
'value': attack_data['ip'],
'category': 'Network activity',
},
{
'type': 'text',
'value': json.dumps(attack_data),
'category': 'Other',
}
]
}
}
response = requests.post(
f'{self.misp_url}/events',
headers=headers,
json=event
)
return response.json()['Event']['id']
def block_ip(self, ip: str, duration: int = 3600):
"""联动防火墙阻断 IP"""
# 调用防火墙 API
firewall_api = "https://firewall.example.com/api/block"
requests.post(firewall_api, json={
'ip': ip,
'duration': duration,
'reason': 'Honeypot detection'
})
# 使用示例
intel = ThreatIntel()
# 处理蜜罐事件
def handle_honeypot_event(event):
ip = event['ip']
# 检查 IP 信誉
reputation = intel.check_ip_reputation(ip)
if reputation['data']['abuseConfidenceScore'] > 50:
# 创建威胁情报事件
event_id = intel.create_misp_event(event)
# 自动阻断
intel.block_ip(ip, duration=86400)最佳实践
- 部署位置:内网关键网段、DMZ、云平台
- 仿真度:高交互蜜罐更真实但风险更高
- 隔离性:蜜罐必须与生产网络严格隔离
- 监控告警:实时分析日志,及时响应
- 法律合规:确保部署符合当地法律法规
总结
蜜罐是主动防御的重要组成部分,通过本文你已经掌握了:
- SSH 蜜罐(Cowrie)的部署与配置
- Web 蜜罐的多种实现方案
- 数据库蜜罐的协议模拟
- 蜜罐网络架构设计
- 威胁情报联动
建议从低交互蜜罐开始,逐步构建多层次的蜜罐网络。
参考资源:

