Announcement

👇Official Account👇

图片

Welcome to join the group & private message

Article first/tail QR code

Skip to content

Go 微服务架构实战指南

从单体到微服务的架构演进,掌握构建高可用、可扩展系统的核心技能。

📚 目录


1. 为什么选择 Go 构建微服务

1.1 Go 的天然优势

go
// ✅ 轻量级协程,适合高并发场景
func handleRequest() {
    // 每个请求一个 goroutine,内存占用仅 ~2KB
    go process()
}

// ✅ 编译为单一二进制文件,部署简单
// ✅ 启动速度快(毫秒级)
// ✅ 强类型,编译时捕获错误
// ✅ 内置 HTTP/2 和 gRPC 支持

1.2 微服务技术栈对比

特性GoJava (Spring)Node.jsPython
内存占用低 (~5MB)高 (~100MB+)中等
启动速度快 (<100ms)慢 (>2s)中等
并发模型协程线程池事件循环GIL限制
部署方式二进制JAR/Dockernpmpip
gRPC 支持原生优秀良好一般一般

2. 核心组件详解

2.1 项目结构

microservices-demo/
├── api-gateway/          # API 网关
├── user-service/         # 用户服务
├── order-service/        # 订单服务
├── product-service/      # 商品服务
├── notification-service/ # 通知服务
└── shared/
    ├── proto/            # protobuf 定义
    └── pkg/              # 公共包

2.2 基础服务模板

go
// main.go - 微服务标准入口
package main

import (
    "context"
    "fmt"
    "log"
    "net/http"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/gin-gonic/gin"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

type Config struct {
    ServiceName string
    Port        int
}

func main() {
    cfg := Config{
        ServiceName: "user-service",
        Port:        8080,
    }

    // 初始化路由
    r := gin.Default()

    // 健康检查端点
    r.GET("/health", healthCheck)
    
    // 就绪检查端点
    r.GET("/ready", readinessCheck)
    
    // Prometheus 指标
    r.GET("/metrics", gin.WrapH(promhttp.Handler()))

    // API 路由
    v1 := r.Group("/api/v1")
    {
        v1.GET("/users/:id", getUserHandler)
        v1.POST("/users", createUserHandler)
        v1.PUT("/users/:id", updateUserHandler)
    }

    // 启动服务器
    srv := &http.Server{
        Addr:         fmt.Sprintf(":%d", cfg.Port),
        Handler:      r,
        ReadTimeout:  10 * time.Second,
        WriteTimeout: 10 * time.Second,
    }

    // 优雅关闭
    go func() {
        log.Printf("%s starting on :%d", cfg.ServiceName, cfg.Port)
        if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
            log.Fatalf("server error: %v", err)
        }
    }()

    // 等待中断信号
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    <-quit

    log.Println("Shutting down server...")
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    if err := srv.Shutdown(ctx); err != nil {
        log.Fatal("Server forced to shutdown:", err)
    }
    log.Println("Server exited")
}

3. 服务间通信

3.1 RESTful API

go
// handler/user_handler.go
package handler

import (
    "net/http"
    "strconv"

    "github.com/gin-gonic/gin"
)

type UserHandler struct {
    userService UserService
}

func NewUserHandler(svc UserService) *UserHandler {
    return &UserHandler{userService: svc}
}

// @Summary Get user by ID
// @Tags users
// @Produce json
// @Param id path int true "User ID"
// @Success 200 {object} UserResponse
// @Router /api/v1/users/{id} [get]
func (h *UserHandler) GetUser(c *gin.Context) {
    id, err := strconv.Atoi(c.Param("id"))
    if err != nil {
        c.JSON(http.StatusBadRequest, gin.H{"error": "invalid user id"})
        return
    }

    user, err := h.userService.GetUserByID(c.Request.Context(), uint(id))
    if err != nil {
        c.JSON(http.StatusNotFound, gin.H{"error": "user not found"})
        return
    }

    c.JSON(http.StatusOK, UserResponse{
        ID:       user.ID,
        Name:     user.Name,
        Email:    user.Email,
        CreatedAt: user.CreatedAt,
    })
}

3.2 gRPC 服务定义

protobuf
syntax = "proto3";

package user.v1;

option go_package = "github.com/demo/proto/gen/go/user/v1;userv1";

service UserService {
    rpc GetUser(GetUserRequest) returns (GetUserResponse);
    rpc CreateUser(CreateUserRequest) returns (CreateUserResponse);
    rpc ListUsers(ListUsersRequest) returns (ListUsersResponse);
}

message GetUserRequest {
    int64 id = 1;
}

message GetUserResponse {
    User user = 1;
}

message CreateUserRequest {
    string name = 1;
    string email = 2;
}

message CreateUserResponse {
    int64 id = 1;
}

message ListUsersRequest {
    int32 page_size = 1;
    string page_token = 2;
}

message ListUsersResponse {
    repeated User users = 1;
    string next_page_token = 2;
}

message User {
    int64 id = 1;
    string name = 2;
    string email = 3;
    string created_at = 4;
}

3.3 gRPC 服务实现

go
// server/user_server.go
package server

import (
    "context"
    "errors"
    "time"

    pb "github.com/demo/proto/gen/go/user/v1"
)

type UserServer struct {
    repo UserRepository
    pb.UnimplementedUserServiceServer
}

func NewUserServer(repo UserRepository) *UserServer {
    return &UserServer{repo: repo}
}

func (s *UserServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.GetUserResponse, error) {
    user, err := s.repo.GetByID(ctx, req.Id)
    if err != nil {
        return nil, status.Error(codes.NotFound, "user not found")
    }

    return &pb.GetUserResponse{
        User: &pb.User{
            Id:   user.ID,
            Name: user.Name,
        },
    }, nil
}

func (s *UserServer) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.CreateUserResponse, error) {
    if req.Name == "" || req.Email == "" {
        return nil, status.Error(codes.InvalidArgument, "name and email are required")
    }

    user := &domain.User{
        Name:      req.Name,
        Email:     req.Email,
        CreatedAt: time.Now(),
    }

    if err := s.repo.Create(ctx, user); err != nil {
        return nil, status.Error(codes.Internal, "failed to create user")
    }

    return &pb.CreateUserResponse{Id: user.ID}, nil
}

3.4 客户端调用示例

go
// client/user_client.go
package client

import (
    "context"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    pb "github.com/demo/proto/gen/go/user/v1"
)

type UserClient struct {
    conn   *grpc.ClientConn
    client pb.UserServiceClient
}

func NewUserClient(addr string) (*UserClient, error) {
    conn, err := grpc.Dial(
        addr,
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithDefaultCallOptions(grpc.MaxRecvMsgSize(10*1024*1024)),
    )
    if err != nil {
        return nil, err
    }
    return &UserClient{
        conn:   conn,
        client: pb.NewUserServiceClient(conn),
    }, nil
}

func (c *UserClient) GetUser(ctx context.Context, id int64) (*pb.User, error) {
    ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
    defer cancel()

    resp, err := c.client.GetUser(ctx, &pb.GetUserRequest{Id: id})
    if err != nil {
        return nil, err
    }
    return resp.User, nil
}

func (c *UserClient) Close() error {
    return c.conn.Close()
}

4. 服务发现与负载均衡

4.1 使用 Consul 作为注册中心

go
// registry/consul.go
package registry

import (
    "fmt"
    "net/http"
    "strings"

    consulapi "github.com/hashicorp/consul/api"
)

type ConsulRegistry struct {
    client *consulapi.Client
    config *ConsulConfig
}

type ConsulConfig struct {
    Address    string
    ServiceID  string
    ServiceName string
    Port       int
    Tags       []string
}

func NewConsulRegistry(config *ConsulConfig) (*ConsulRegistry, error) {
    client, err := consulapi.NewClient(&consulapi.Config{
        Address: config.Address,
    })
    if err != nil {
        return nil, fmt.Errorf("failed to connect to consul: %w", err)
    }
    return &ConsulRegistry{client: client, config: config}, nil
}

func (r *ConsulRegistry) Register() error {
    registration := new(consulapi.AgentServiceRegistration)

    registration.ID = r.config.ServiceID
    registration.Name = r.config.ServiceName
    registration.Port = r.config.Port
    registration.Tags = r.config.Tags
    
    // 健康检查配置
    registration.Check = &consulapi.AgentServiceCheck{
        HTTP:                           fmt.Sprintf("http://localhost:%d/health", r.config.Port),
        Interval:                       "10s",
        Timeout:                        "5s",
        DeregisterCriticalServiceAfter: "30s",
    }
    
    return r.client.Agent().ServiceRegister(registration)
}

func (r *ConsulRegistry) Deregister() error {
    return r.client.Agent().ServiceDeregister(r.config.ServiceID)
}

func (r *ConsulRegistry) Discover(serviceName string) ([]string, error) {
    services, _, err := r.client.Health().Service(serviceName, "", true, nil)
    if err != nil {
        return nil, err
    }
    
    var addresses []string
    for _, service := range services {
        address := fmt.Sprintf("%s:%d", 
            service.Service.Address, 
            service.Service.Port,
        )
        addresses = append(addresses, address)
    }
    return addresses, nil
}

4.2 客户端负载均衡器

go
// balancer/round_robin.go
package balancer

import (
    "sync"
    "sync/atomic"
)

// RoundRobinBalancer 轮询负载均衡器
type RoundRobinBalancer struct {
    mu       sync.RWMutex
    endpoints []string
    counter  atomic.Uint64
}

func NewRoundRobinBalancer(endpoints []string) *RoundRobinBalancer {
    b := &RoundRobinBalancer{}
    b.UpdateEndpoints(endpoints)
    return b
}

func (b *RoundRobinBalancer) UpdateEndpoints(endpoints []string) {
    b.mu.Lock()
    defer b.mu.Unlock()
    b.endpoints = endpoints
}

func (b *RoundRobinBalancer) Next() (string, error) {
    b.mu.RLock()
    defer b.mu.RUnlock()
    
    if len(b.endpoints) == 0 {
        return "", ErrNoEndpoints
    }
    
    idx := b.counter.Add(1) % uint64(len(b.endpoints))
    return b.endpoints[idx], nil
}

// WeightedRandomBalancer 加权随机负载均衡器
type WeightedRandomBalancer struct {
    mu       sync.RWMutex
    endpoints []*WeightedEndpoint
}

type WeightedEndpoint struct {
    Address string
    Weight  int
}

5. 配置管理

5.1 Viper 配置管理

go
// config/config.go
package config

import (
    "fmt"
    "os"
    "path/filepath"
    "time"

    "github.com/spf13/viper"
)

type Config struct {
    Server   ServerConfig   `mapstructure:"server"`
    Database DatabaseConfig `mapstructure:"database"`
    Redis    RedisConfig    `mapstructure:"redis"`
    Jaeger   JaegerConfig   `mapstructure:"jaeger"`
}

type ServerConfig struct {
    Host           string        `mapstructure:"host"`
    Port           int           `mapstructure:"port"`
    ReadTimeout    time.Duration `mapstructure:"read_timeout"`
    WriteTimeout   time.Duration `mapstructure:"write_timeout"`
    GracefulShutdown time.Duration `mapstructure:"graceful_shutdown"`
}

type DatabaseConfig struct {
    Host         string `mapstructure:"host"`
    Port         int    `mapstructure:"port"`
    Username     string `mapstructure:"username"`
    Password     string `mapstructure:"password"`
    DBName       string `mapstructure:"dbname"`
    MaxOpenConns int    `mapstructure:"max_open_conns"`
    MaxIdleConns int    `mapstructure:"max_idle_conns"`
}

func Load(path string) (*Config, error) {
    viper.SetConfigFile(path)
    viper.SetConfigType("yaml")

    // 支持环境变量覆盖
    viper.AutomaticEnv()
    viper.SetEnvPrefix("APP")
    viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))

    if err := viper.ReadInConfig(); err != nil {
        return nil, fmt.Errorf("failed to read config: %w", err)
    }

    var cfg Config
    if err := viper.Unmarshal(&cfg); err != nil {
        return nil, fmt.Errorf("failed to unmarshal config: %w", err)
    }

    return &cfg, nil
}

5.2 配置热更新

go
// config/watcher.go
package config

import (
    "log"
    "sync"

    "github.com/fsnotify/fsnotify"
)

type WatchableConfig struct {
    config *Config
    mu     sync.RWMutex
    onChange []func(*Config)
}

func WatchConfig(path string) (*WatchableConfig, error) {
    wc := &WatchableConfig{}

    cfg, err := Load(path)
    if err != nil {
        return nil, err
    }
    wc.config = cfg

    watcher, err := fsnotify.NewWatcher()
    if err != nil {
        return nil, err
    }

    go func() {
        for {
            select {
            case event, ok := <-watcher.Events:
                if !ok {
                    return
                }
                if event.Op&fsnotify.Write == fsnotify.Write {
                    newCfg, err := Load(path)
                    if err != nil {
                        log.Printf("config reload failed: %v", err)
                        continue
                    }
                    
                    wc.mu.Lock()
                    wc.config = newCfg
                    wc.mu.Unlock()
                    
                    for _, fn := range wc.onChange {
                        fn(newCfg)
                    }
                    log.Println("Config reloaded successfully")
                }
            case err, ok := <-watcher.Errors:
                if !ok {
                    return
                }
                log.Printf("watcher error: %v", err)
            }
        }
    }()

    watcher.Add(filepath.Dir(path))
    return wc, nil
}

6. 可观测性

6.1 结构化日志

go
// logger/zap_logger.go
package logger

import (
    "os"
    "time"

    "go.uber.org/zap"
    "go.uber.org/zap/zapcore"
)

var Log *zap.Logger

func Init(env string) error {
    var config zap.Config
    
    switch env {
    case "production":
        config = zap.Config{
            Level:       zap.NewAtomicLevelAt(zap.InfoLevel),
            Development: false,
            Encoding:    "json",
            EncoderConfig: zapcore.EncoderConfig{
                TimeKey:        "timestamp",
                LevelKey:       "level",
                NameKey:        "logger",
                CallerKey:      "caller",
                MessageKey:     "msg",
                StacktraceKey:  "stacktrace",
                LineEnding:     zapcore.DefaultLineEnding,
                EncodeLevel:    zapcore.LowercaseLevelEncoder,
                EncodeTime:     zapcore.ISO8601TimeEncoder,
                EncodeDuration: zapcore.SecondsDurationEncoder,
                EncodeCaller:   zapcore.ShortCallerEncoder,
            },
            OutputPaths:      []string{"stdout"},
            ErrorOutputPaths: []string{"stderr"},
        }
    default:
        config = zap.DevelopmentConfig()
    }
    
    var err error
    Log, err = config.Build(zap.AddCallerSkip(1))
    return err
}

// 使用示例
func handleRequest(userID string) {
    logger.Log.Info("handling request",
        zap.String("userID", userID),
        zap.String("requestID", getTraceID()),
    )
}

6.2 OpenTelemetry 集成

go
// telemetry/tracing.go
package telemetry

import (
    "context"
    "log"
    "time"

    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/exporters/jaeger"
    "go.opentelemetry.io/otel/sdk/resource"
    sdktrace "go.opentelemetry.io/otel/sdk/trace"
    semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
)

func InitTracer(serviceName, jaegerURL string) error {
    exporter, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(jaegerURL)))
    if err != nil {
        return err
    }

    tp := sdktrace.NewTracerProvider(
        sdktrace.WithBatcher(exporter),
        sdktrace.WithResource(resource.NewWithAttributes(
            semconv.SchemaURL,
            semconv.ServiceNameKey.String(serviceName),
        )),
    )

    otel.SetTracerProvider(tp)
    return nil
}

// 中间件集成
func TracingMiddleware() gin.HandlerFunc {
    return func(c *gin.Context) {
        ctx := c.Request.Context()
        
        tracer := otel.Tracer("gin-middleware")
        ctx, span := tracer.Start(ctx, c.FullPath())
        defer span.End()
        
        c.Request = c.Request.WithContext(ctx)
        c.Next()
    }
}

6.3 Prometheus 指标收集

go
// metrics/prometheus.go
package metrics

import (
    "github.com/gin-gonic/gin"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

var (
    HttpRequestsTotal = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "http_requests_total",
            Help: "Total number of HTTP requests",
        },
        []string{"method", "endpoint", "status"},
    )
    
    HttpRequestDuration = promauto.NewHistogramVec(
        prometheus.HistogramOpts{
            Name:    "http_request_duration_seconds",
            Help:    "HTTP request duration in seconds",
            Buckets: prometheus.DefBuckets,
        },
        []string{"method", "endpoint"},
    )
    
    ActiveConnectionsGauge = promauto.NewGauge(prometheus.GaugeOpts{
        Name: "active_connections",
        Help: "Number of active connections",
    })
)

func MetricsMiddleware() gin.HandlerFunc {
    return func(c *gin.Context) {
        start := time.Now()
        
        ActiveConnectionsGauge.Inc()
        defer ActiveConnectionsGauge.Decr()
        
        c.Next()
        
        duration := time.Since(start).Seconds()
        method := c.Request.Method
        endpoint := c.FullPath()
        status := c.Writer.Status()
        
        HttpRequestsTotal.WithLabelValues(method, endpoint, 
            fmt.Sprintf("%d", status)).Inc()
        HttpRequestDuration.WithLabelValues(method, endpoint).Observe(duration)
    }
}

func MetricsHandler() gin.HandlerFunc {
    return func(c *gin.Context) {
        promhttp.Handler().ServeHTTP(c.Writer, c.Request)
    }
}

7. 实战案例:订单系统

7.1 领域模型

go
// order/domain/order.go
package domain

import (
    "errors"
    "time"
)

var (
    ErrOrderNotFound     = errors.New("order not found")
    ErrInvalidOrderStatus = errors.New("invalid order status")
    ErrInsufficientStock = errors.New("insufficient stock")
)

type OrderStatus string

const (
    OrderStatusPending    OrderStatus = "pending"
    OrderStatusPaid       OrderStatus = "paid"
    OrderStatusShipped    OrderStatus = "shipped"
    OrderStatusDelivered  OrderStatus = "delivered"
    OrderStatusCancelled  OrderStatus = "cancelled"
)

type Order struct {
    ID          uint64
    UserID      uint64
    Items       []OrderItem
    TotalAmount float64
    Status      OrderStatus
    CreatedAt   time.Time
    UpdatedAt   time.Time
}

type OrderItem struct {
    ProductID uint64
    Quantity  int
    UnitPrice float64
}

func NewOrder(userID uint64, items []OrderItem) (*Order, error) {
    totalAmount := 0.0
    for _, item := range items {
        totalAmount += float64(item.Quantity) * item.UnitPrice
    }
    
    return &Order{
        UserID:      userID,
        Items:       items,
        TotalAmount: totalAmount,
        Status:      OrderStatusPending,
        CreatedAt:   time.Now(),
        UpdatedAt:   time.Now(),
    }, nil
}

func (o *Order) CanTransitionTo(newStatus OrderStatus) bool {
    validTransitions := map[OrderStatus][]OrderStatus{
        OrderStatusPending: {OrderStatusPaid, OrderStatusCancelled},
        OrderStatusPaid:    {OrderStatusShipped, OrderStatusCancelled},
        OrderStatusShipped: {OrderStatusDelivered},
    }
    
    allowed, ok := validTransitions[o.Status]
    if !ok {
        return false
    }
    
    for _, s := range allowed {
        if s == newStatus {
            return true
        }
    }
    return false
}

func (o *Order) TransitionTo(newStatus OrderStatus) error {
    if !o.CanTransitionTo(newStatus) {
        return ErrInvalidOrderStatus
    }
    o.Status = newStatus
    o.UpdatedAt = time.Now()
    return nil
}

7.2 事件驱动架构

go
// order/event/event_bus.go
package event

import (
    "context"
    "encoding/json"
    "log"
    "sync"
)

type Event interface {
    Type() string
    AggregateID() string
    Data() interface{}
}

type EventHandler func(ctx context.Context, e Event) error

type EventBus struct {
    handlers map[string][]EventHandler
    mu       sync.RWMutex
}

func NewEventBus() *EventBus {
    return &EventBus{
        handlers: make(map[string][]EventHandler),
    }
}

func (eb *EventBus) Subscribe(eventType string, handler EventHandler) {
    eb.mu.Lock()
    defer eb.mu.Unlock()
    eb.handlers[eventType] = append(eb.handlers[eventType], handler)
}

func (eb *EventBus) Publish(ctx context.Context, events ...Event) error {
    for _, e := range events {
        eb.mu.RLock()
        handlers := eb.handlers[e.Type()]
        eb.mu.RUnlock()
        
        for _, h := range handlers {
            if err := h(ctx, e); err != nil {
                return err
            }
        }
    }
    return nil
}

// 订单创建事件
type OrderCreatedEvent struct {
    OrderID     uint64
    UserID      uint64
    TotalAmount float64
    Items       []ItemData
    Timestamp   time.Time
}

func (e *OrderCreatedEvent) Type() string { return "order.created" }
func (e *OrderCreatedEvent) AggregateID() string { return fmt.Sprintf("%d", e.OrderID) }
func (e *OrderCreatedEvent) Data() interface{} { return e }

// 库存扣减处理器
func InventoryDeductionHandler(inventoryService InventoryService) EventHandler {
    return func(ctx context.Context, e Event) error {
        event := e.(*OrderCreatedEvent)
        for _, item := range event.Items {
            if err := inventoryService.DeductStock(ctx, item.ProductID, item.Quantity); err != nil {
                return err
            }
        }
        return nil
    }
}

// 发送确认邮件处理器
func SendConfirmationEmailHandler(emailService EmailService) EventHandler {
    return func(ctx context.Context, e Event) error {
        event := e.(*OrderCreatedEvent)
        return emailService.SendOrderConfirmation(ctx, event.UserID, event.OrderID)
    }
}

8. 常见陷阱与解决方案

8.1 分布式事务问题

问题:跨服务的数据一致性难以保证。

解决方案:Saga 模式或 TCC(Try-Confirm-Cancel)模式。

go
// saga/saga_orchestrator.go
package saga

type SagaStep struct {
    Name    string
    Execute func(context.Context) error
    Compensate func(context.Context) error
}

type SagaOrchestrator struct {
    steps []SagaStep
}

func NewSagaOrchestrator() *SagaOrchestrator {
    return &SagaOrchestrator{}
}

func (s *SagaOrchestrator) AddStep(step SagaStep) *SagaOrchestrator {
    s.steps = append(s.steps, step)
    return s
}

func (s *SagaOrchestrator) Execute(ctx context.Context) error {
    // 正向执行所有步骤
    completedSteps := 0
    for i, step := range s.steps {
        if err := step.Execute(ctx); err != nil {
            // 执行失败,开始补偿
            s.compensate(ctx, i-1)
            return err
        }
        completedSteps++
    }
    return nil
}

func (s *SagaOrchestrator) compensate(ctx context.Context, failedIndex int) {
    for i := failedIndex; i >= 0; i-- {
        step := s.steps[i]
        if err := step.Compensate(ctx); err != nil {
            log.Printf("compensation failed for step %s: %v", step.Name, err)
            // 继续补偿其他步骤
        }
    }
}

8.2 级联故障处理

问题:一个服务故障导致整个链路失败。

解决方案:熔断器和降级机制。

go
// circuitbreaker/circuit_breaker.go
package circuitbreaker

import (
    "errors"
    "sync"
    "time"
)

var (
    ErrCircuitOpen = errors.New("circuit breaker is open")
)

type State int

const (
    Closed State = iota
    Open
    HalfOpen
)

type CircuitBreaker struct {
    mu sync.Mutex
    
    state State
    
    failureThreshold int
    successThreshold int
    
    timeout time.Duration
    
    failures int
    successes int
    
    lastFailureTime time.Time
}

func NewCircuitBreaker(failureThreshold, successThreshold int, timeout time.Duration) *CircuitBreaker {
    return &CircuitBreaker{
        state:             Closed,
        failureThreshold:  failureThreshold,
        successThreshold:  successThreshold,
        timeout:           timeout,
    }
}

func (cb *CircuitBreaker) Allow() error {
    cb.mu.Lock()
    defer cb.mu.Unlock()
    
    switch cb.state {
    case Closed:
        return nil
        
    case Open:
        if time.Since(cb.lastFailureTime) > cb.timeout {
            cb.state = HalfOpen
            cb.successes = 0
            return nil
        }
        return ErrCircuitOpen
        
    case HalfOpen:
        return nil
    }
    return ErrCircuitOpen
}

func (cb *CircuitBreaker) RecordSuccess() {
    cb.mu.Lock()
    defer cb.mu.Unlock()
    
    if cb.state == HalfOpen {
        cb.successes++
        if cb.successes >= cb.successThreshold {
            cb.state = Closed
            cb.failures = 0
        }
    } else {
        cb.failures = 0
    }
}

func (cb *CircuitBreaker) RecordFailure() {
    cb.mu.Lock()
    defer cb.mu.Unlock()
    
    cb.failures++
    cb.lastFailureTime = time.Now()
    
    if cb.failures >= cb.failureThreshold {
        cb.state = Open
    }
}

9. 性能调优

9.1 连接池优化

go
// database/pool.go
package database

import (
    "database/sql"
    "time"

    _ "github.com/lib/pq"
)

func NewPostgresPool(cfg DatabaseConfig) (*sql.DB, error) {
    dsn := fmt.Sprintf(
        "host=%s port=%d user=%s password=%s dbname=%s sslmode=disable",
        cfg.Host, cfg.Port, cfg.Username, cfg.Password, cfg.DBName,
    )
    
    db, err := sql.Open("postgres", dsn)
    if err != nil {
        return nil, err
    }
    
    // 关键连接池参数设置
    db.SetMaxOpenConns(cfg.MaxOpenConns)       // 最大连接数
    db.SetMaxIdleConns(cfg.MaxIdleConns)       // 最大空闲连接
    db.SetConnMaxLifetime(30 * time.Minute)     // 连接最大生命周期
    db.SetConnMaxIdleTime(5 * time.Minute)     // 空闲连接超时时间
    
    return db, db.Ping()
}

9.2 Redis 缓存策略

go
// cache/redis_cache.go
package cache

import (
    "context"
    "encoding/json"
    "time"

    "github.com/redis/go-redis/v9"
)

type Cache interface {
    Get(ctx context.Context, key string, dest interface{}) error
    Set(ctx context.Context, key string, value interface{}, ttl time.Duration) error
    Delete(ctx context.Context, keys ...string) error
}

type RedisCache struct {
    client *redis.Client
}

func NewRedisCache(addr string) (*RedisCache, error) {
    client := redis.NewClient(&redis.Options{
        Addr: addr,
        PoolSize: 100,
        MinIdleConns: 10,
    })
    
    if err := client.Ping(context.Background()).Err(); err != nil {
        return nil, err
    }
    
    return &RedisCache{client: client}, nil
}

func (c *RedisCache) Get(ctx context.Context, key string, dest interface{}) error {
    data, err := c.client.Get(ctx, key).Bytes()
    if err != nil {
        return err
    }
    return json.Unmarshal(data, dest)
}

func (c *RedisCache) Set(ctx context.Context, key string, value interface{}, ttl time.Duration) error {
    data, err := json.Marshal(value)
    if err != nil {
        return err
    }
    return c.client.Set(ctx, key, data, ttl).Err()
}

// 缓存穿透保护 - 布隆过滤器
func (c *RedisCache) GetWithBloomFilter(ctx context.Context, key string, bf *BloomFilter, dest interface{}) error {
    // 先查布隆过滤器
    if !bf.MightContain(key) {
        return ErrNotFound
    }
    return c.Get(ctx, key, dest)
}

// 缓存击穿保护 - 分布式锁
func (c *RedisCache) GetWithLock(ctx context.Context, key string, lockTTL time.Duration, loader func() (interface{}, error), dest interface{}) error {
    // 尝试获取缓存
    err := c.Get(ctx, key, dest)
    if err == nil {
        return nil
    }
    if err != redis.Nil {
        return err
    }
    
    // 获取分布式锁
    lockKey := "lock:" + key
    acquired, _ := c.client.SetNX(ctx, lockKey, "1", lockTTL).Result()
    if !acquired {
        // 锁被其他进程持有,短暂等待后重试获取缓存
        time.Sleep(50 * time.Millisecond)
        return c.Get(ctx, key, dest)
    }
    defer c.client.Del(ctx, lockKey)
    
    // 加载数据并写入缓存
    value, err := loader()
    if err != nil {
        return err
    }
    
    data, err := json.Marshal(value)
    if err != nil {
        return err
    }
    c.client.Set(ctx, key, data, 10*time.Minute)
    return json.Unmarshal(data, dest)
}

10. 总结与展望

关键要点总结

方面推荐实践
服务拆分按业务边界拆分,避免过度细分
通信协议内部用 gRPC,对外用 REST
服务发现Consul / etcd + 客户端负载均衡
配置管理Viper + 配置中心
可观测性结构化日志 + OpenTelemetry + Prometheus
容错设计熔断器 + 降级 + 重试

进阶学习方向


💡 提示:微服务不是银弹。对于小型团队和项目,先从单体架构开始,当遇到真正的扩展性瓶颈时再考虑迁移到微服务。

上次更新于: