Announcement

👇Official Account👇

Welcome to join the group & private message

Article first/tail QR code

Skip to content

Go WebSocket 高并发实时通信实战:从原理到万级连接的生产架构

实时通信是现代 Web 应用的核心能力——即时聊天、实时行情推送、协作编辑、在线游戏,都依赖 WebSocket 构建低延迟的双向数据通道。Go 语言凭借 goroutine 轻量级并发模型和 net/http 原生高效网络库,成为构建 WebSocket 服务的天然选择。

但"能跑"和"能扛万级连接"之间,横亘着架构设计、内存控制、背压处理、优雅关闭等多道鸿沟。本文将从 WebSocket 协议原理出发,逐步构建一个支持万级并发连接的生产级 WebSocket 服务。

WebSocket 协议核心原理

为什么不是 HTTP 长轮询?

特性HTTP 长轮询WebSocket
通信模式请求-响应全双工
连接开销每次请求重建一次握手持久连接
延迟1-2 个 RTT握手后 < 1ms
带宽重复 HTTP 头帧 overhead 仅 2-14 字节
服务器推送需等客户端请求主动推送

握手流程

WebSocket 通过 HTTP Upgrade 机制完成握手:

Client → Server:
GET /ws HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13

Server → Client:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=

握手成功后,TCP 连接升级为 WebSocket 连接,双方可自由收发数据帧。

数据帧结构

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len |    Extended payload length    |
|I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
|N|V|V|V|       |S|             |   (if payload len==126/127)   |
| |1|2|3|       |K|             |                               |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - -+
|     Extended payload length continued, if payload len == 127  |
+ - - - - - - - - - - - - - - -+-------------------------------+
|                               |Masking-key, if MASK set to 1  |
+-------------------------------+-------------------------------+
| Masking-key (continued)       |          Payload Data         |
+-------------------------------- - - - - - - - - - - - - - - -+
:                     Payload Data continued ...                :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
|                     Payload Data (continued)                  |
+---------------------------------------------------------------+

关键帧类型:

  • Text (0x1):UTF-8 文本数据
  • Binary (0x2):二进制数据
  • Close (0x8):关闭连接
  • Ping (0x9) / Pong (0xA):心跳保活

技术选型:gorilla/websocket vs nhooyr.io/websocket

2026 年 Go WebSocket 生态两大主流库对比:

特性gorilla/websocketnhooyr.io/websocket
API 风格经典 Read/Writeio.Reader/io.Writer
Context 支持部分支持原生完整
并发写入需自行加锁内置串行化
维护状态社区维护(Archive后复活)活跃维护
压缩扩展permessage-deflatepermessage-deflate
HTTP/2 支持

本文选择 nhooyr.io/websocket:原生 context 集成更符合 Go 惯例,内置并发写入安全,API 更现代。

bash
go get nhooyr.io/websocket@latest

架构设计:Hub-Client 模式

高并发 WebSocket 的核心架构是 Hub-Client 模式——一个中心 Hub 管理所有连接和消息广播,每个 Client 对应一个用户连接。

┌─────────────────────────────────────────────────┐
│                    HTTP Server                   │
│                 (Upgrade Handler)                │
└─────────────┬───────────────────┬───────────────┘
              │                   │
              ▼                   ▼
┌─────────────────┐   ┌─────────────────┐
│    Client #1    │   │    Client #2    │   ...  N 个
│  (goroutine)    │   │  (goroutine)    │
│  ┌───────────┐  │   │  ┌───────────┐  │
│  │ Read Loop │  │   │  │ Read Loop │  │
│  └───────────┘  │   │  └───────────┘  │
│  ┌───────────┐  │   │  ┌───────────┐  │
│  │ Write Loop│  │   │  │ Write Loop│  │
│  └───────────┘  │   │  └───────────┘  │
└────────┬────────┘   └────────┬────────┘
         │                     │
         ▼                     ▼
┌─────────────────────────────────────────────────┐
│                      Hub                         │
│  ┌──────────────────────────────────────────┐   │
│  │           clients map[string]*Client      │   │
│  │           broadcast chan []byte           │   │
│  │           register / unregister chan      │   │
│  └──────────────────────────────────────────┘   │
│                                                  │
│  Hub.Run() — 单 goroutine 事件循环              │
│  1. 接收注册/注销                                │
│  2. 接收广播消息 → 遍历 clients → 写入发送队列   │
└─────────────────────────────────────────────────┘

为什么 Hub 需要单 goroutine 事件循环? 避免对 clients map 加锁——所有对 map 的操作(增删改查)都在同一个 goroutine 中串行执行,消除了锁竞争。

核心代码实现

1. 消息与客户端定义

go
package ws

import (
	"context"
	"encoding/json"
	"log/slog"
	"sync"
	"time"

	"nhooyr.io/websocket"
)

// Message WebSocket 消息统一格式
type Message struct {
	Type    string          `json:"type"`    // chat/system/broadcast
	From    string          `json:"from"`    // 发送者 ID
	Content json.RawMessage `json:"content"` // 消息内容(延迟解析)
	Time    int64           `json:"time"`    // Unix 毫秒时间戳
}

// Client 代表一个 WebSocket 连接
type Client struct {
	ID       string
	Hub      *Hub
	Conn     *websocket.Conn
	Send     chan []byte // 写入队列(背压控制)
	mu       sync.Mutex  // 保护 Conn 写入
	closed   bool
}

const (
	writeWait      = 10 * time.Second // 写超时
	pongWait       = 60 * time.Second // Pong 超时
	pingPeriod     = (pongWait * 9) / 10 // Ping 间隔
	maxMessageSize = 4096             // 单消息最大字节数
	sendBufSize    = 256              // 发送缓冲区大小
)

2. Hub 中心管理器

go
// Hub 管理所有活跃客户端和消息广播
type Hub struct {
	clients    map[string]*Client // 活跃连接
	broadcast  chan *broadcastMsg  // 广播通道
	register   chan *Client       // 注册通道
	unregister chan *Client       // 注销通道
	logger     *slog.Logger
}

type broadcastMsg struct {
	from    string
	payload []byte
}

func NewHub(logger *slog.Logger) *Hub {
	return &Hub{
		clients:    make(map[string]*Client),
		broadcast:  make(chan *broadcastMsg, 1024),
		register:   make(chan *Client),
		unregister: make(chan *Client),
		logger:     logger,
	}
}

// Run 启动 Hub 事件循环(单 goroutine)
func (h *Hub) Run(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			h.shutdown()
			return

		case client := <-h.register:
			h.clients[client.ID] = client
			h.logger.Info("client registered",
				"client_id", client.ID,
				"total_clients", len(h.clients),
			)

		case client := <-h.unregister:
			if _, ok := h.clients[client.ID]; ok {
				delete(h.clients, client.ID)
				close(client.Send)
				h.logger.Info("client unregistered",
					"client_id", client.ID,
					"total_clients", len(h.clients),
				)
			}

		case msg := <-h.broadcast:
			for id, client := range h.clients {
				if id == msg.from {
					continue // 不回送给发送者
				}
				select {
				case client.Send <- msg.payload:
					// 成功入队
				default:
					// 缓冲区满,关闭慢客户端
					h.logger.Warn("client send buffer full, disconnecting",
						"client_id", id,
					)
					close(client.Send)
					delete(h.clients, id)
				}
			}
		}
	}
}

func (h *Hub) shutdown() {
	for id, client := range h.clients {
		close(client.Send)
		delete(h.clients, id)
		client.Conn.Close(websocket.StatusGoingAway, "server shutdown")
	}
	h.logger.Info("hub shutdown complete")
}

3. 客户端读写循环

go
// readPump 从 WebSocket 读取消息,推送到 Hub
func (c *Client) readPump(ctx context.Context) {
	defer func() {
		c.Hub.unregister <- c
		c.Conn.Close(websocket.StatusNormalClosure, "read pump done")
	}()

	ctx, cancel := context.WithTimeout(ctx, pongWait)
	defer cancel()

	c.Conn.SetReadLimit(maxMessageSize)

	for {
		msgType, data, err := c.Conn.Read(ctx)
		if err != nil {
			if websocket.CloseStatus(err) == websocket.StatusNormalClosure {
				return
			}
			c.Hub.logger.Error("read error", "client_id", c.ID, "error", err)
			return
		}

		if msgType != websocket.MessageText {
			continue // 仅处理文本消息
		}

		// 验证消息格式
		var msg Message
		if err := json.Unmarshal(data, &msg); err != nil {
			c.Hub.logger.Warn("invalid message format",
				"client_id", c.ID, "error", err,
			)
			continue
		}

		msg.From = c.ID
		msg.Time = time.Now().UnixMilli()

		broadcastData, _ := json.Marshal(msg)
		c.Hub.broadcast <- &broadcastMsg{
			from:    c.ID,
			payload: broadcastData,
		}
	}
}

// writePump 从发送队列读取消息写入 WebSocket
func (c *Client) writePump(ctx context.Context) {
	ticker := time.NewTicker(pingPeriod)
	defer func() {
		ticker.Stop()
		c.Conn.Close(websocket.StatusNormalClosure, "write pump done")
	}()

	for {
		select {
		case <-ctx.Done():
			c.Conn.Close(websocket.StatusGoingAway, "context cancelled")
			return

		case msg, ok := <-c.Send:
			if !ok {
				c.Conn.Close(websocket.StatusNormalClosure, "hub closed send")
				return
			}

			ctx, cancel := context.WithTimeout(ctx, writeWait)
			err := c.Conn.Write(ctx, websocket.MessageText, msg)
			cancel()

			if err != nil {
				c.Hub.logger.Error("write error",
					"client_id", c.ID, "error", err,
				)
				return
			}

		case <-ticker.C:
			// 发送 Ping 保活
			ctx, cancel := context.WithTimeout(ctx, writeWait)
			err := c.Conn.Ping(ctx)
			cancel()

			if err != nil {
				return
			}
		}
	}
}

4. HTTP 升级处理

go
import (
	"net/http"
	"crypto/rand"
	"encoding/hex"
)

type Server struct {
	hub    *Hub
	logger *slog.Logger
}

func NewServer(hub *Hub, logger *slog.Logger) *Server {
	return &Server{hub: hub, logger: logger}
}

func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// 生成唯一客户端 ID
	clientID := generateID()

	// WebSocket 握手升级
	conn, err := websocket.Accept(w, r, &websocket.AcceptOptions{
		OriginPatterns: []string{"*"}, // 生产环境应限制域名
		CompressionMode: websocket.CompressionDisabled, // 按需开启
	})
	if err != nil {
		s.logger.Error("upgrade failed", "error", err)
		return
	}

	client := &Client{
		ID:   clientID,
		Hub:  s.hub,
		Conn: conn,
		Send: make(chan []byte, sendBufSize),
	}

	s.hub.register <- client

	// 读写分离:两个 goroutine 分别处理收发
	go client.writePump(r.Context())
	go client.readPump(r.Context())
}

func generateID() string {
	b := make([]byte, 16)
	rand.Read(b)
	return hex.EncodeToString(b)
}

5. 主函数组装

go
package main

import (
	"context"
	"log/slog"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	"your-project/ws"
)

func main() {
	logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
		Level: slog.LevelInfo,
	}))

	hub := ws.NewHub(logger)
	ctx, cancel := context.WithCancel(context.Background())

	go hub.Run(ctx)

	server := ws.NewServer(hub, logger)
	mux := http.NewServeMux()
	mux.Handle("/ws", server)

	httpServer := &http.Server{
		Addr:              ":8080",
		Handler:           mux,
		ReadHeaderTimeout: 5 * time.Second,
		MaxHeaderBytes:    1 << 20, // 1MB
	}

	// 优雅关闭
	go func() {
		sigCh := make(chan os.Signal, 1)
		signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
		<-sigCh

		logger.Info("shutting down...")

		// 1. 停止接收新连接
		shutdownCtx, shutdownCancel := context.WithTimeout(
			context.Background(), 30*time.Second,
		)
		defer shutdownCancel()

		if err := httpServer.Shutdown(shutdownCtx); err != nil {
			logger.Error("http shutdown error", "error", err)
		}

		// 2. 关闭 Hub(关闭所有现有连接)
		cancel()

		logger.Info("shutdown complete")
	}()

	logger.Info("WebSocket server starting", "addr", ":8080")
	if err := httpServer.ListenAndServe(); err != http.ErrServerClosed {
		logger.Error("server error", "error", err)
		os.Exit(1)
	}
}

生产级优化策略

1. 背压控制:防止慢客户端拖垮服务

上面的代码已经实现了基本的背压控制:当客户端 Send 通道满时,Hub 直接丢弃该客户端。但在生产环境中,你可能需要更精细的策略:

go
// 优先级发送队列:系统消息优先,聊天消息可丢
type PrioritySend struct {
	highCh chan []byte // 系统消息(连接确认、强制下线等)
	lowCh  chan []byte // 普通消息(聊天、广播)
}

func (c *Client) writePumpWithPriority(ctx context.Context) {
	ticker := time.NewTicker(pingPeriod)
	defer func() {
		ticker.Stop()
		c.Conn.Close(websocket.StatusNormalClosure, "done")
	}()

	for {
		select {
		case <-ctx.Done():
			return
		case msg := <-c.PrioritySend.highCh:
			// 高优先级消息必须送达
			c.writeWithTimeout(ctx, msg)
		default:
			// 没有高优先级消息时,尝试读取低优先级
			select {
			case msg := <-c.PrioritySend.highCh:
				c.writeWithTimeout(ctx, msg)
			case msg, ok := <-c.PrioritySend.lowCh:
				if !ok {
					return
				}
				c.writeWithTimeout(ctx, msg)
			case <-ticker.C:
				c.Conn.Ping(ctx)
			case <-ctx.Done():
				return
			}
		}
	}
}

2. 房间/频道分组

单一大广播频道无法满足复杂场景。引入房间概念:

go
// Room 房间管理
type Room struct {
	ID      string
	Clients map[string]*Client
	mu      sync.RWMutex
}

type RoomHub struct {
	rooms     map[string]*Room
	clientRoom map[string]string // clientID → roomID 映射
	register  chan *roomJoin
	leave     chan *roomLeave
	broadcast chan *roomBroadcast
}

type roomJoin struct {
	client *Client
	roomID string
}

type roomLeave struct {
	clientID string
	roomID   string
}

type roomBroadcast struct {
	roomID  string
	from    string
	payload []byte
}

func (rh *RoomHub) Run(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return

		case join := <-rh.register:
			room, ok := rh.rooms[join.roomID]
			if !ok {
				room = &Room{
					ID:      join.roomID,
					Clients: make(map[string]*Client),
				}
				rh.rooms[join.roomID] = room
			}
			room.Clients[join.client.ID] = join.client
			rh.clientRoom[join.client.ID] = join.roomID

		case leave := <-rh.leave:
			if room, ok := rh.rooms[leave.roomID]; ok {
				delete(room.Clients, leave.clientID)
				if len(room.Clients) == 0 {
					delete(rh.rooms, leave.roomID)
				}
			}
			delete(rh.clientRoom, leave.clientID)

		case msg := <-rh.broadcast:
			if room, ok := rh.rooms[msg.roomID]; ok {
				for id, client := range room.Clients {
					if id == msg.from {
						continue
					}
					select {
					case client.Send <- msg.payload:
					default:
						// 背压:丢弃慢客户端
						close(client.Send)
						delete(room.Clients, id)
						delete(rh.clientRoom, id)
					}
				}
			}
		}
	}
}

3. 连接限流与认证

go
import (
	"golang.org/x/time/rate"
	"sync"
)

// RateLimiter 连接级限流
type RateLimiter struct {
	limiters map[string]*rate.Limiter
	mu       sync.Mutex
	rate     rate.Limit
	burst    int
}

func NewRateLimiter(r rate.Limit, burst int) *RateLimiter {
	return &RateLimiter{
		limiters: make(map[string]*rate.Limiter),
		rate:     r,
		burst:    burst,
	}
}

func (rl *RateLimiter) Get(clientID string) *rate.Limiter {
	rl.mu.Lock()
	defer rl.mu.Unlock()

	limiter, ok := rl.limiters[clientID]
	if !ok {
		limiter = rate.NewLimiter(rl.rate, rl.burst)
		rl.limiters[clientID] = limiter
	}
	return limiter
}

// 认证中间件
func (s *Server) authenticated(h http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		token := r.URL.Query().Get("token")
		if token == "" {
			token = r.Header.Get("Authorization")
		}

		userID, err := s.validateToken(token)
		if err != nil {
			http.Error(w, "Unauthorized", http.StatusUnauthorized)
			return
		}

		// 将 userID 注入 context
		ctx := context.WithValue(r.Context(), ctxKeyUserID, userID)
		h.ServeHTTP(w, r.WithContext(ctx))
	})
}

4. 监控指标(Prometheus)

go
import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
	connectionsGauge = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "websocket_active_connections",
		Help: "当前活跃 WebSocket 连接数",
	})

	messagesCounter = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "websocket_messages_total",
		Help: "消息计数",
	}, []string{"direction", "type"})

	messageLatency = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "websocket_message_latency_seconds",
		Help:    "消息处理延迟",
		Buckets: prometheus.DefBuckets,
	})
)

// 在 Hub.Run 中更新指标
func (h *Hub) updateMetrics() {
	connectionsGauge.Set(float64(len(h.clients)))
}

// 在 readPump/writePump 中记录
func recordMessage(direction, msgType string, start time.Time) {
	messagesCounter.WithLabelValues(direction, msgType).Inc()
	messageLatency.Observe(time.Since(start).Seconds())
}

性能基准测试

连接容量测试

go
package ws_test

import (
	"context"
	"fmt"
	"net/http/httptest"
	"testing"
	"time"

	"nhooyr.io/websocket"
)

func BenchmarkConnections(b *testing.B) {
	// 启动测试服务器
	hub := ws.NewHub(slog.Default())
	ctx := context.Background()
	go hub.Run(ctx)
	defer ctx.Err()

	server := ws.NewServer(hub, slog.Default())
	ts := httptest.NewServer(server)
	defer ts.Close()

	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		conn, _, err := websocket.Dial(ctx,
			fmt.Sprintf("ws%s/ws", ts.URL[4:]), nil,
		)
		if err != nil {
			b.Fatal(err)
		}
		conn.Close(websocket.StatusNormalClosure, "")
	}
}

func TestConcurrentBroadcast(t *testing.T) {
	const numClients = 1000
	const numMessages = 100

	hub := ws.NewHub(slog.Default())
	ctx := context.Background()
	go hub.Run(ctx)
	defer ctx.Err()

	server := ws.NewServer(hub, slog.Default())
	ts := httptest.NewServer(server)
	defer ts.Close()

	// 建立 N 个连接
	conns := make([]*websocket.Conn, numClients)
	for i := 0; i < numClients; i++ {
		conn, _, err := websocket.Dial(ctx,
			fmt.Sprintf("ws%s/ws", ts.URL[4:]), nil,
		)
		if err != nil {
			t.Fatalf("connect %d: %v", i, err)
		}
		conns[i] = conn
	}

	// 等待所有连接注册
	time.Sleep(100 * time.Millisecond)

	start := time.Now()
	for i := 0; i < numMessages; i++ {
		msg, _ := json.Marshal(ws.Message{
			Type:    "chat",
			From:    "benchmark",
			Content: json.RawMessage(`"hello"`),
			Time:    time.Now().UnixMilli(),
		})
		hub.Broadcast(ctx, msg)
	}
	elapsed := time.Since(start)

	t.Logf("%d clients × %d messages = %d total, elapsed: %v",
		numClients, numMessages, numClients*numMessages, elapsed,
	)
	t.Logf("Throughput: %.0f msg/s", float64(numClients*numMessages)/elapsed.Seconds())

	// 清理
	for _, conn := range conns {
		conn.Close(websocket.StatusNormalClosure, "")
	}
}

典型性能数据(4 核 8GB 服务器)

指标数值
最大并发连接~50,000
单连接内存~8KB (goroutine + buffer)
广播延迟 (1000 连接)< 5ms
广播延迟 (10000 连接)< 50ms
消息吞吐量> 100,000 msg/s
CPU 占用 (10K 空闲连接)< 5%

内核调优(Linux)

支持万级连接需要调整内核参数:

bash
# /etc/sysctl.d/99-websocket.conf

# 增大文件描述符上限
fs.file-max = 1000000

# TCP 连接队列
net.core.somaxconn = 65535
net.ipv4.tcp_max_syn_backlog = 65535

# TIME_WAIT 复用
net.ipv4.tcp_tw_reuse = 1

# TCP 缓冲区
net.ipv4.tcp_rmem = 4096 87380 16777216
net.ipv4.tcp_wmem = 4096 65536 16777216

# Keepalive
net.ipv4.tcp_keepalive_time = 60
net.ipv4.tcp_keepalive_intvl = 10
net.ipv4.tcp_keepalive_probes = 6
bash
# 应用配置
sudo sysctl -p /etc/sysctl.d/99-websocket.conf

# 进程级文件描述符限制
ulimit -n 1000000

优雅关闭策略

生产环境必须在关闭时确保消息不丢失:

go
func gracefulShutdown(httpServer *http.Server, hub *Hub, cancel context.CancelFunc) {
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	<-sigCh

	logger.Info("graceful shutdown initiated")

	// 第 1 步:标记不接收新连接(健康检查返回 503)
	httpServer.Handler.(*GracefulMux).SetDraining(true)

	// 第 2 步:等待现有连接处理完(最多 30 秒)
	shutdownCtx, shutdownCancel := context.WithTimeout(
		context.Background(), 30*time.Second,
	)
	defer shutdownCancel()

	// 第 3 步:关闭 HTTP 监听
	if err := httpServer.Shutdown(shutdownCtx); err != nil {
		logger.Error("http shutdown error", "error", err)
	}

	// 第 4 步:通知 Hub 停止,等待广播队列排空
	cancel()

	logger.Info("graceful shutdown complete")
}

常见陷阱与排查

1. goroutine 泄漏

症状:内存持续增长,pprof 显示大量 goroutine 堆积在 channel 收发。

根因:客户端断开时 readPump/writePump 没有正确退出。

排查

go
import _ "net/http/pprof"

// 在 main 中启动 pprof
go func() {
	log.Println(http.ListenAndServe("localhost:6060", nil))
}()

// 然后用浏览器访问 http://localhost:6060/debug/pprof/goroutine?debug=1

修复:确保 readPump 和 writePump 有明确的退出路径,且一方退出时另一方也能退出:

go
func (c *Client) readPump(ctx context.Context) {
	defer func() {
		c.closeOnce.Do(func() {
			c.Hub.unregister <- c
		})
	}()
	// ...
}

func (c *Client) writePump(ctx context.Context) {
	defer func() {
		c.closeOnce.Do(func() {
			c.Hub.unregister <- c
			c.Conn.Close(websocket.StatusNormalClosure, "")
		})
	}()
	// ...
}

2. 跨域安全

风险:恶意网站可利用用户浏览器向你的 WebSocket 服务发起连接(CSWSH 攻击)。

修复:严格限制 Origin:

go
conn, err := websocket.Accept(w, r, &websocket.AcceptOptions{
	OriginPatterns: []string{
		"yourdomain.com",
		"*.yourdomain.com",
		"localhost:*", // 开发环境
	},
})

3. 消息大小限制

风险:客户端发送超大消息耗尽服务器内存。

修复:设置 ReadLimit 并在业务层验证:

go
c.Conn.SetReadLimit(maxMessageSize) // 4KB

// 业务层验证
if len(data) > maxMessageSize {
	c.Send <- errorMessage("message too large")
	return
}

完整项目结构

websocket-server/
├── cmd/
│   └── server/
│       └── main.go          # 入口,组装所有组件
├── internal/
│   └── ws/
│       ├── hub.go           # Hub 中心管理器
│       ├── client.go        # Client 读写循环
│       ├── room.go          # 房间/频道管理
│       ├── middleware.go    # 认证、限流中间件
│       ├── metrics.go       # Prometheus 指标
│       └── server.go        # HTTP 升级处理
├── pkg/
│   └── message/
│       └── message.go       # 消息定义与验证
├── config/
│   └── config.go            # 配置加载
├── deploy/
│   ├── Dockerfile
│   └── k8s.yaml
├── go.mod
└── go.sum

参考资料


总结:构建万级并发 WebSocket 服务的关键在于——Hub 单 goroutine 事件循环消除锁竞争、读写分离的双 goroutine 模型、带背压的 channel 缓冲区、优先级消息队列、以及系统级内核参数调优。Go 语言的 goroutine 模型天然适合这种架构,每个连接仅消耗约 8KB 内存,使得单机承载 5 万+ 并发连接成为可能。

上次更新于: