Announcement

👇Official Account👇

图片

Welcome to join the group & private message

Article first/tail QR code

Skip to content

Go 微服务治理:熔断、限流与降级实战

在分布式系统中,服务间的依赖关系错综复杂。当某个服务出现故障时,如果不加以控制,故障可能会像滚雪球一样蔓延,导致整个系统瘫痪。熔断、限流和降级是保障微服务稳定性的三大法宝。

为什么需要服务治理?

雪崩效应

用户服务 → 订单服务 → 库存服务 → 数据库

            数据库故障

    订单服务超时 → 用户服务超时 → 全站不可用

三大保护机制

机制作用触发条件
熔断快速失败,防止故障扩散错误率超过阈值
限流控制流量,保护系统资源请求量超过容量
降级牺牲非核心功能,保障核心系统负载过高

熔断器实现

熔断器状态机

        失败率 < 阈值              失败率 >= 阈值
    ┌──────────────┐           ┌──────────────┐
    │              │           │              │
    ▼              │           ▼              │
┌────────┐         │      ┌──────────┐        │
│ CLOSED │─────────┘      │   OPEN   │────────┘
│ (正常) │  成功           │ (熔断)   │  超时
└────────┘                 └────┬─────┘
    ▲                           │
    │     熔断时间到达          │
    └───────────────────────────┘


          ┌──────────┐
          │ HALF-OPEN│
          │(半开状态)│
          └──────────┘

完整熔断器实现

go
package circuitbreaker

import (
    "errors"
    "sync"
    "sync/atomic"
    "time"
)

// State 熔断器状态
type State int32

const (
    StateClosed    State = iota // 关闭状态,正常请求
    StateOpen                   // 开启状态,拒绝请求
    StateHalfOpen               // 半开状态,试探请求
)

func (s State) String() string {
    switch s {
    case StateClosed:
        return "closed"
    case StateOpen:
        return "open"
    case StateHalfOpen:
        return "half-open"
    default:
        return "unknown"
    }
}

// Config 熔断器配置
type Config struct {
    // 触发熔断的失败率阈值 (0-1)
    FailureThreshold float64
    // 触发熔断的最小请求数
    MinRequests int64
    // 熔断持续时间
    Timeout time.Duration
    // 半开状态允许的请求数
    HalfOpenMaxRequests int64
    // 统计窗口大小
    WindowSize time.Duration
}

// DefaultConfig 默认配置
func DefaultConfig() Config {
    return Config{
        FailureThreshold:    0.5,
        MinRequests:         10,
        Timeout:             30 * time.Second,
        HalfOpenMaxRequests: 3,
        WindowSize:          10 * time.Second,
    }
}

// CircuitBreaker 熔断器
type CircuitBreaker struct {
    name   string
    config Config
    
    state     int32
    lastFail  int64
    
    // 统计信息
    requests  int64 // 总请求数
    failures  int64 // 失败数
    successes int64 // 成功数
    
    mu sync.RWMutex
}

// New 创建熔断器
func New(name string, config Config) *CircuitBreaker {
    return &CircuitBreaker{
        name:   name,
        config: config,
        state:  int32(StateClosed),
    }
}

// State 获取当前状态
func (cb *CircuitBreaker) State() State {
    return State(atomic.LoadInt32(&cb.state))
}

// Allow 检查是否允许请求
func (cb *CircuitBreaker) Allow() bool {
    state := cb.State()
    
    switch state {
    case StateClosed:
        return true
        
    case StateOpen:
        // 检查是否超过熔断时间
        lastFail := atomic.LoadInt64(&cb.lastFail)
        if time.Now().UnixNano()-lastFail > int64(cb.config.Timeout) {
            // 切换到半开状态
            if atomic.CompareAndSwapInt32(&cb.state, int32(StateOpen), int32(StateHalfOpen)) {
                atomic.StoreInt64(&cb.requests, 0)
                atomic.StoreInt64(&cb.failures, 0)
                atomic.StoreInt64(&cb.successes, 0)
            }
            return true
        }
        return false
        
    case StateHalfOpen:
        // 半开状态限制请求数
        requests := atomic.AddInt64(&cb.requests, 1)
        return requests <= cb.config.HalfOpenMaxRequests
        
    default:
        return false
    }
}

// Execute 执行受保护的函数
func (cb *CircuitBreaker) Execute(fn func() error) error {
    if !cb.Allow() {
        return errors.New("circuit breaker is open")
    }
    
    err := fn()
    cb.RecordResult(err)
    return err
}

// RecordResult 记录执行结果
func (cb *CircuitBreaker) RecordResult(err error) {
    if err != nil {
        cb.recordFailure()
    } else {
        cb.recordSuccess()
    }
}

func (cb *CircuitBreaker) recordFailure() {
    atomic.AddInt64(&cb.failures, 1)
    atomic.StoreInt64(&cb.lastFail, time.Now().UnixNano())
    
    cb.checkThreshold()
}

func (cb *CircuitBreaker) recordSuccess() {
    atomic.AddInt64(&cb.successes, 1)
    
    // 半开状态下成功则关闭熔断器
    if cb.State() == StateHalfOpen {
        atomic.StoreInt32(&cb.state, int32(StateClosed))
    }
}

func (cb *CircuitBreaker) checkThreshold() {
    cb.mu.Lock()
    defer cb.mu.Unlock()
    
    requests := atomic.LoadInt64(&cb.requests)
    failures := atomic.LoadInt64(&cb.failures)
    
    // 检查是否达到最小请求数
    if requests < cb.config.MinRequests {
        return
    }
    
    // 计算失败率
    failureRate := float64(failures) / float64(requests)
    
    // 超过阈值则熔断
    if failureRate >= cb.config.FailureThreshold {
        atomic.StoreInt32(&cb.state, int32(StateOpen))
        atomic.StoreInt64(&cb.lastFail, time.Now().UnixNano())
    }
}

// Stats 获取统计信息
func (cb *CircuitBreaker) Stats() map[string]interface{} {
    return map[string]interface{}{
        "name":      cb.name,
        "state":     cb.State().String(),
        "requests":  atomic.LoadInt64(&cb.requests),
        "failures":  atomic.LoadInt64(&cb.failures),
        "successes": atomic.LoadInt64(&cb.successes),
    }
}

// Reset 重置熔断器
func (cb *CircuitBreaker) Reset() {
    cb.mu.Lock()
    defer cb.mu.Unlock()
    
    atomic.StoreInt32(&cb.state, int32(StateClosed))
    atomic.StoreInt64(&cb.requests, 0)
    atomic.StoreInt64(&cb.failures, 0)
    atomic.StoreInt64(&cb.successes, 0)
    atomic.StoreInt64(&cb.lastFail, 0)
}

使用示例

go
func main() {
    // 创建熔断器
    cb := circuitbreaker.New("order-service", circuitbreaker.Config{
        FailureThreshold:    0.5,  // 50% 失败率触发熔断
        MinRequests:         10,   // 至少 10 个请求才统计
        Timeout:             30 * time.Second,
        HalfOpenMaxRequests: 3,
    })
    
    // 执行受保护的操作
    err := cb.Execute(func() error {
        return callOrderService()
    })
    
    if err != nil {
        log.Printf("Request failed: %v", err)
    }
    
    // 查看熔断器状态
    stats := cb.Stats()
    log.Printf("Circuit breaker stats: %+v", stats)
}

func callOrderService() error {
    // 模拟调用订单服务
    resp, err := http.Get("http://order-service/api/orders")
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    
    if resp.StatusCode != 200 {
        return fmt.Errorf("service returned %d", resp.StatusCode)
    }
    return nil
}

限流算法实现

1. 令牌桶算法

go
package ratelimit

import (
    "context"
    "sync"
    "time"
)

// TokenBucket 令牌桶限流器
type TokenBucket struct {
    capacity int64         // 桶容量
    tokens   int64         // 当前令牌数
    rate     time.Duration // 令牌生成间隔
    lastTime int64         // 上次更新时间
    mu       sync.Mutex
}

// NewTokenBucket 创建令牌桶
// capacity: 桶容量
// rate: 每秒生成的令牌数
func NewTokenBucket(capacity int64, rate int) *TokenBucket {
    return &TokenBucket{
        capacity: capacity,
        tokens:   capacity,
        rate:     time.Second / time.Duration(rate),
        lastTime: time.Now().UnixNano(),
    }
}

// Allow 检查是否允许请求
func (tb *TokenBucket) Allow() bool {
    return tb.AllowN(1)
}

// AllowN 检查是否允许 n 个请求
func (tb *TokenBucket) AllowN(n int64) bool {
    tb.mu.Lock()
    defer tb.mu.Unlock()
    
    now := time.Now().UnixNano()
    elapsed := now - tb.lastTime
    
    // 计算生成的令牌数
    newTokens := elapsed / int64(tb.rate)
    if newTokens > 0 {
        tb.tokens = min(tb.tokens+newTokens, tb.capacity)
        tb.lastTime = now
    }
    
    // 检查令牌是否足够
    if tb.tokens >= n {
        tb.tokens -= n
        return true
    }
    
    return false
}

// Wait 等待直到获取令牌
func (tb *TokenBucket) Wait(ctx context.Context) error {
    return tb.WaitN(ctx, 1)
}

// WaitN 等待直到获取 n 个令牌
func (tb *TokenBucket) WaitN(ctx context.Context, n int64) error {
    for {
        if tb.AllowN(n) {
            return nil
        }
        
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(time.Millisecond * 10):
            // 继续尝试
        }
    }
}

func min(a, b int64) int64 {
    if a < b {
        return a
    }
    return b
}

2. 漏桶算法

go
// LeakyBucket 漏桶限流器
type LeakyBucket struct {
    capacity   int64         // 桶容量
    water      int64         // 当前水量
    leakRate   time.Duration // 漏水间隔
    lastLeak   int64         // 上次漏水时间
    mu         sync.Mutex
}

// NewLeakyBucket 创建漏桶
// capacity: 桶容量
// rate: 每秒处理的请求数
func NewLeakyBucket(capacity int64, rate int) *LeakyBucket {
    return &LeakyBucket{
        capacity: capacity,
        leakRate: time.Second / time.Duration(rate),
        lastLeak: time.Now().UnixNano(),
    }
}

// Allow 检查是否允许请求
func (lb *LeakyBucket) Allow() bool {
    lb.mu.Lock()
    defer lb.mu.Unlock()
    
    now := time.Now().UnixNano()
    elapsed := now - lb.lastLeak
    
    // 计算漏掉的水量
    leaked := elapsed / int64(lb.leakRate)
    if leaked > 0 {
        lb.water = max(0, lb.water-leaked)
        lb.lastLeak = now
    }
    
    // 检查桶是否已满
    if lb.water < lb.capacity {
        lb.water++
        return true
    }
    
    return false
}

func max(a, b int64) int64 {
    if a > b {
        return a
    }
    return b
}

3. 滑动窗口限流

go
// SlidingWindow 滑动窗口限流器
type SlidingWindow struct {
    limit    int           // 窗口内最大请求数
    window   time.Duration // 窗口大小
    requests []int64       // 请求时间戳
    mu       sync.Mutex
}

// NewSlidingWindow 创建滑动窗口限流器
func NewSlidingWindow(limit int, window time.Duration) *SlidingWindow {
    return &SlidingWindow{
        limit:    limit,
        window:   window,
        requests: make([]int64, 0, limit),
    }
}

// Allow 检查是否允许请求
func (sw *SlidingWindow) Allow() bool {
    sw.mu.Lock()
    defer sw.mu.Unlock()
    
    now := time.Now().UnixNano()
    windowStart := now - int64(sw.window)
    
    // 清理过期的请求记录
    validRequests := make([]int64, 0, len(sw.requests))
    for _, t := range sw.requests {
        if t > windowStart {
            validRequests = append(validRequests, t)
        }
    }
    sw.requests = validRequests
    
    // 检查是否超过限制
    if len(sw.requests) < sw.limit {
        sw.requests = append(sw.requests, now)
        return true
    }
    
    return false
}

降级策略实现

go
package fallback

import (
    "context"
    "encoding/json"
    "log"
    "time"
)

// FallbackFunc 降级函数类型
type FallbackFunc func(ctx context.Context, req interface{}) (interface{}, error)

// Service 带降级保护的服务
type Service struct {
    name         string
    primary      func(ctx context.Context, req interface{}) (interface{}, error)
    fallback     FallbackFunc
    timeout      time.Duration
    enableCache  bool
    cache        map[string]*cacheItem
}

type cacheItem struct {
    data      interface{}
    timestamp time.Time
    ttl       time.Duration
}

// NewService 创建带降级保护的服务
func NewService(name string, primary func(ctx context.Context, req interface{}) (interface{}, error), opts ...Option) *Service {
    s := &Service{
        name:    name,
        primary: primary,
        timeout: 5 * time.Second,
        cache:   make(map[string]*cacheItem),
    }
    
    for _, opt := range opts {
        opt(s)
    }
    
    return s
}

type Option func(*Service)

func WithFallback(fn FallbackFunc) Option {
    return func(s *Service) {
        s.fallback = fn
    }
}

func WithTimeout(timeout time.Duration) Option {
    return func(s *Service) {
        s.timeout = timeout
    }
}

func WithCache(ttl time.Duration) Option {
    return func(s *Service) {
        s.enableCache = true
    }
}

// Execute 执行服务调用
func (s *Service) Execute(ctx context.Context, req interface{}) (interface{}, error) {
    ctx, cancel := context.WithTimeout(ctx, s.timeout)
    defer cancel()
    
    // 尝试主服务
    result, err := s.primary(ctx, req)
    if err == nil {
        // 更新缓存
        if s.enableCache {
            s.updateCache(req, result)
        }
        return result, nil
    }
    
    log.Printf("Primary service failed: %v, trying fallback", err)
    
    // 尝试缓存
    if s.enableCache {
        if cached := s.getCache(req); cached != nil {
            log.Printf("Returning cached data")
            return cached, nil
        }
    }
    
    // 执行降级
    if s.fallback != nil {
        return s.fallback(ctx, req)
    }
    
    return nil, err
}

func (s *Service) updateCache(req interface{}, data interface{}) {
    key := s.cacheKey(req)
    s.cache[key] = &cacheItem{
        data:      data,
        timestamp: time.Now(),
        ttl:       5 * time.Minute,
    }
}

func (s *Service) getCache(req interface{}) interface{} {
    key := s.cacheKey(req)
    item, exists := s.cache[key]
    if !exists {
        return nil
    }
    
    if time.Since(item.timestamp) > item.ttl {
        delete(s.cache, key)
        return nil
    }
    
    return item.data
}

func (s *Service) cacheKey(req interface{}) string {
    data, _ := json.Marshal(req)
    return string(data)
}

// 常用降级策略

// StaticFallback 返回静态默认值
func StaticFallback(defaultValue interface{}) FallbackFunc {
    return func(ctx context.Context, req interface{}) (interface{}, error) {
        return defaultValue, nil
    }
}

// EmptyFallback 返回空结果
func EmptyFallback() FallbackFunc {
    return func(ctx context.Context, req interface{}) (interface{}, error) {
        return nil, nil
    }
}

// ErrorFallback 返回友好错误
func ErrorFallback(errMsg string) FallbackFunc {
    return func(ctx context.Context, req interface{}) (interface{}, error) {
        return nil, &FriendlyError{Message: errMsg}
    }
}

type FriendlyError struct {
    Message string
}

func (e *FriendlyError) Error() string {
    return e.Message
}

综合实战:HTTP 服务治理

go
package main

import (
    "context"
    "fmt"
    "log"
    "net/http"
    "time"
    
    "github.com/pfinal/circuitbreaker"
    "github.com/pfinal/ratelimit"
    "github.com/pfinal/fallback"
)

// ServiceClient 带治理的 HTTP 客户端
type ServiceClient struct {
    baseURL        string
    circuitBreaker *circuitbreaker.CircuitBreaker
    rateLimiter    *ratelimit.TokenBucket
    client         *http.Client
}

func NewServiceClient(baseURL string) *ServiceClient {
    return &ServiceClient{
        baseURL: baseURL,
        circuitBreaker: circuitbreaker.New("api-service", circuitbreaker.Config{
            FailureThreshold: 0.5,
            MinRequests:      10,
            Timeout:          30 * time.Second,
        }),
        rateLimiter: ratelimit.NewTokenBucket(100, 50), // 容量100,每秒50个
        client: &http.Client{
            Timeout: 5 * time.Second,
        },
    }
}

func (c *ServiceClient) Call(ctx context.Context, method, path string) (*http.Response, error) {
    // 1. 限流检查
    if !c.rateLimiter.Allow() {
        return nil, fmt.Errorf("rate limit exceeded")
    }
    
    // 2. 熔断检查并执行
    var resp *http.Response
    var err error
    
    cbErr := c.circuitBreaker.Execute(func() error {
        url := c.baseURL + path
        req, reqErr := http.NewRequestWithContext(ctx, method, url, nil)
        if reqErr != nil {
            return reqErr
        }
        
        resp, err = c.client.Do(req)
        if err != nil {
            return err
        }
        
        if resp.StatusCode >= 500 {
            return fmt.Errorf("server error: %d", resp.StatusCode)
        }
        
        return nil
    })
    
    if cbErr != nil {
        return nil, cbErr
    }
    
    return resp, err
}

// 使用示例
func main() {
    client := NewServiceClient("http://api.example.com")
    
    // 创建带降级的服务
    userService := fallback.NewService("user-service",
        func(ctx context.Context, req interface{}) (interface{}, error) {
            resp, err := client.Call(ctx, "GET", "/users/"+req.(string))
            if err != nil {
                return nil, err
            }
            // 解析响应...
            return resp, nil
        },
        fallback.WithFallback(fallback.StaticFallback(map[string]interface{}{
            "id":       req.(string),
            "name":     "Unknown",
            "fallback": true,
        })),
        fallback.WithTimeout(3*time.Second),
        fallback.WithCache(5*time.Minute),
    )
    
    // 执行调用
    result, err := userService.Execute(context.Background(), "123")
    if err != nil {
        log.Printf("Error: %v", err)
    } else {
        log.Printf("Result: %+v", result)
    }
}

监控与告警

go
// Metrics 监控指标
type Metrics struct {
    CircuitBreakerState *prometheus.GaugeVec
    RequestTotal        *prometheus.CounterVec
    RequestDuration     *prometheus.HistogramVec
    RateLimitHits     prometheus.Counter
}

func NewMetrics() *Metrics {
    return &Metrics{
        CircuitBreakerState: prometheus.NewGaugeVec(prometheus.GaugeOpts{
            Name: "circuit_breaker_state",
            Help: "Circuit breaker state (0=closed, 1=half-open, 2=open)",
        }, []string{"name"}),
        RequestTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
            Name: "http_requests_total",
            Help: "Total HTTP requests",
        }, []string{"service", "status"}),
        RequestDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
            Name:    "http_request_duration_seconds",
            Help:    "HTTP request duration",
            Buckets: prometheus.DefBuckets,
        }, []string{"service"}),
        RateLimitHits: prometheus.NewCounter(prometheus.CounterOpts{
            Name: "rate_limit_hits_total",
            Help: "Total rate limit hits",
        }),
    }
}

最佳实践总结

  1. 熔断器配置:根据业务特点调整阈值,避免过于敏感或迟钝
  2. 限流粒度:按用户、IP、接口多维度限流
  3. 降级策略:优先返回缓存,其次静态数据,最后友好错误
  4. 监控告警:实时关注熔断、限流触发频率
  5. 渐进恢复:半开状态逐步放量,避免瞬时压垮
  6. 测试验证:定期进行混沌测试,验证治理效果

总结

通过本文的实战示例,你已经掌握了:

  • 熔断器的三种状态及转换机制
  • 令牌桶、漏桶、滑动窗口三种限流算法
  • 降级策略的设计与实现
  • 综合应用:HTTP 服务治理方案

这些机制是构建高可用微服务系统的基础,建议在生产环境中结合 Prometheus、Grafana 进行监控,形成完整的服务治理体系。


参考资源:

上次更新于: