Announcement

👇Official Account👇

图片

Welcome to join the group & private message

Article first/tail QR code

Skip to content

gRPC 与 Protobuf 实战:Go 微服务通信最佳实践

在微服务架构中,服务间通信是核心问题。gRPC 作为 Google 开源的高性能 RPC 框架,结合 Protocol Buffers 序列化协议,已成为 Go 微服务开发的首选方案。

为什么选择 gRPC?

核心优势

特性HTTP/JSONgRPC/Protobuf
序列化性能文本,体积大二进制,体积小
传输效率需多次握手HTTP/2 多路复用
强类型弱类型,易出错强类型,编译期检查
流式支持不支持双向流式
代码生成手动编写自动生成

适用场景

  • 微服务间通信:内部服务调用,低延迟要求高
  • 多语言环境:支持 10+ 语言,便于异构系统
  • 实时数据流:需要双向流式通信的场景
  • 移动端 API:节省流量,提升响应速度

Protocol Buffers 基础

定义服务

protobuf
syntax = "proto3";

package user;
option go_package = "github.com/pfinal/userpb";

// 用户服务定义
service UserService {
  // 获取用户信息
  rpc GetUser(GetUserRequest) returns (User);
  
  // 创建用户
  rpc CreateUser(CreateUserRequest) returns (User);
  
  // 更新用户
  rpc UpdateUser(UpdateUserRequest) returns (User);
  
  // 删除用户
  rpc DeleteUser(DeleteUserRequest) returns (DeleteUserResponse);
  
  // 流式获取用户列表
  rpc ListUsers(ListUsersRequest) returns (stream User);
  
  // 双向流式聊天
  rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}

// 用户消息定义
message User {
  string id = 1;
  string username = 2;
  string email = 3;
  int32 age = 4;
  UserStatus status = 5;
  google.protobuf.Timestamp created_at = 6;
}

enum UserStatus {
  UNKNOWN = 0;
  ACTIVE = 1;
  INACTIVE = 2;
  BANNED = 3;
}

message GetUserRequest {
  string id = 1;
}

message CreateUserRequest {
  string username = 1;
  string email = 2;
  int32 age = 3;
}

message UpdateUserRequest {
  string id = 1;
  string username = 2;
  string email = 3;
}

message DeleteUserRequest {
  string id = 1;
}

message DeleteUserResponse {
  bool success = 1;
}

message ListUsersRequest {
  int32 page_size = 1;
  string page_token = 2;
}

message ChatMessage {
  string user_id = 1;
  string content = 2;
  int64 timestamp = 3;
}

代码生成

bash
# 安装 protoc 编译器和 Go 插件
brew install protobuf

# 安装 Go 插件
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

# 生成代码
protoc --go_out=. --go_opt=paths=source_relative \
       --go-grpc_out=. --go-grpc_opt=paths=source_relative \
       user.proto

gRPC 服务端实现

基础服务实现

go
package main

import (
    "context"
    "fmt"
    "log"
    "net"
    "sync"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
    pb "github.com/pfinal/userpb"
)

// UserServer 实现 UserService 接口
type UserServer struct {
    pb.UnimplementedUserServiceServer
    users  map[string]*pb.User
    mu     sync.RWMutex
    nextID int64
}

func NewUserServer() *UserServer {
    return &UserServer{
        users:  make(map[string]*pb.User),
        nextID: 1,
    }
}

// GetUser 获取用户信息
func (s *UserServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
    s.mu.RLock()
    defer s.mu.RUnlock()

    user, exists := s.users[req.Id]
    if !exists {
        return nil, status.Errorf(codes.NotFound, "user not found: %s", req.Id)
    }
    return user, nil
}

// CreateUser 创建用户
func (s *UserServer) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.User, error) {
    // 参数校验
    if req.Username == "" {
        return nil, status.Errorf(codes.InvalidArgument, "username is required")
    }
    if req.Email == "" {
        return nil, status.Errorf(codes.InvalidArgument, "email is required")
    }

    s.mu.Lock()
    defer s.mu.Unlock()

    user := &pb.User{
        Id:        fmt.Sprintf("%d", s.nextID),
        Username:  req.Username,
        Email:     req.Email,
        Age:       req.Age,
        Status:    pb.UserStatus_ACTIVE,
        CreatedAt: timestamppb.Now(),
    }

    s.users[user.Id] = user
    s.nextID++

    log.Printf("Created user: %v", user)
    return user, nil
}

// UpdateUser 更新用户
func (s *UserServer) UpdateUser(ctx context.Context, req *pb.UpdateUserRequest) (*pb.User, error) {
    s.mu.Lock()
    defer s.mu.Unlock()

    user, exists := s.users[req.Id]
    if !exists {
        return nil, status.Errorf(codes.NotFound, "user not found: %s", req.Id)
    }

    if req.Username != "" {
        user.Username = req.Username
    }
    if req.Email != "" {
        user.Email = req.Email
    }

    return user, nil
}

// DeleteUser 删除用户
func (s *UserServer) DeleteUser(ctx context.Context, req *pb.DeleteUserRequest) (*pb.DeleteUserResponse, error) {
    s.mu.Lock()
    defer s.mu.Unlock()

    if _, exists := s.users[req.Id]; !exists {
        return nil, status.Errorf(codes.NotFound, "user not found: %s", req.Id)
    }

    delete(s.users, req.Id)
    return &pb.DeleteUserResponse{Success: true}, nil
}

// ListUsers 流式返回用户列表
func (s *UserServer) ListUsers(req *pb.ListUsersRequest, stream pb.UserService_ListUsersServer) error {
    s.mu.RLock()
    defer s.mu.RUnlock()

    count := 0
    for _, user := range s.users {
        if count >= int(req.PageSize) && req.PageSize > 0 {
            break
        }
        if err := stream.Send(user); err != nil {
            return err
        }
        count++
        time.Sleep(100 * time.Millisecond) // 模拟延迟
    }
    return nil
}

// Chat 双向流式聊天
func (s *UserServer) Chat(stream pb.UserService_ChatServer) error {
    ctx := stream.Context()
    
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }

        msg, err := stream.Recv()
        if err != nil {
            return err
        }

        log.Printf("Received from %s: %s", msg.UserId, msg.Content)

        // 广播消息
        response := &pb.ChatMessage{
            UserId:    "server",
            Content:   fmt.Sprintf("Echo: %s", msg.Content),
            Timestamp: time.Now().Unix(),
        }
        
        if err := stream.Send(response); err != nil {
            return err
        }
    }
}

func main() {
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }

    // 创建 gRPC 服务器
    grpcServer := grpc.NewServer(
        grpc.UnaryInterceptor(unaryInterceptor),
        grpc.StreamInterceptor(streamInterceptor),
    )

    // 注册服务
    pb.RegisterUserServiceServer(grpcServer, NewUserServer())

    log.Println("gRPC server starting on :50051")
    if err := grpcServer.Serve(lis); err != nil {
        log.Fatalf("failed to serve: %v", err)
    }
}

拦截器实现

go
// unaryInterceptor 一元拦截器
func unaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    start := time.Now()
    
    // 认证检查
    md, ok := metadata.FromIncomingContext(ctx)
    if !ok {
        return nil, status.Errorf(codes.Unauthenticated, "missing metadata")
    }
    
    token := md.Get("authorization")
    if len(token) == 0 {
        return nil, status.Errorf(codes.Unauthenticated, "missing token")
    }
    
    // 调用实际方法
    resp, err := handler(ctx, req)
    
    // 记录日志
    log.Printf("Method: %s, Duration: %v, Error: %v", info.FullMethod, time.Since(start), err)
    
    return resp, err
}

// streamInterceptor 流式拦截器
func streamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
    log.Printf("Stream started: %s", info.FullMethod)
    
    err := handler(srv, ss)
    
    log.Printf("Stream ended: %s, Error: %v", info.FullMethod, err)
    return err
}

// wrappedStream 包装流,用于监控
type wrappedStream struct {
    grpc.ServerStream
}

func (w *wrappedStream) RecvMsg(m interface{}) error {
    err := w.ServerStream.RecvMsg(m)
    if err == nil {
        log.Printf("Received message: %T", m)
    }
    return err
}

func (w *wrappedStream) SendMsg(m interface{}) error {
    log.Printf("Sending message: %T", m)
    return w.ServerStream.SendMsg(m)
}

gRPC 客户端实现

基础客户端

go
package main

import (
    "context"
    "io"
    "log"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/grpc/metadata"
    pb "github.com/pfinal/userpb"
)

func main() {
    // 建立连接
    conn, err := grpc.Dial("localhost:50051",
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithDefaultCallOptions(
            grpc.MaxCallRecvMsgSize(1024*1024*10), // 10MB
        ),
    )
    if err != nil {
        log.Fatalf("did not connect: %v", err)
    }
    defer conn.Close()

    client := pb.NewUserServiceClient(conn)
    ctx := context.Background()

    // 1. 创建用户
    createUser(ctx, client)
    
    // 2. 获取用户
    getUser(ctx, client)
    
    // 3. 流式获取用户列表
    listUsers(ctx, client)
    
    // 4. 双向流式聊天
    chat(ctx, client)
}

func createUser(ctx context.Context, client pb.UserServiceClient) {
    // 添加认证信息
    md := metadata.New(map[string]string{
        "authorization": "Bearer token123",
    })
    ctx = metadata.NewOutgoingContext(ctx, md)

    resp, err := client.CreateUser(ctx, &pb.CreateUserRequest{
        Username: "pfinal",
        Email:    "pfinal@example.com",
        Age:      25,
    })
    if err != nil {
        log.Printf("CreateUser error: %v", err)
        return
    }
    log.Printf("Created user: %v", resp)
}

func getUser(ctx context.Context, client pb.UserServiceClient) {
    resp, err := client.GetUser(ctx, &pb.GetUserRequest{Id: "1"})
    if err != nil {
        log.Printf("GetUser error: %v", err)
        return
    }
    log.Printf("Got user: %v", resp)
}

func listUsers(ctx context.Context, client pb.UserServiceClient) {
    stream, err := client.ListUsers(ctx, &pb.ListUsersRequest{
        PageSize: 10,
    })
    if err != nil {
        log.Printf("ListUsers error: %v", err)
        return
    }

    for {
        user, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Printf("Recv error: %v", err)
            return
        }
        log.Printf("Received user: %s", user.Username)
    }
}

func chat(ctx context.Context, client pb.UserServiceClient) {
    stream, err := client.Chat(ctx)
    if err != nil {
        log.Printf("Chat error: %v", err)
        return
    }

    // 发送消息 goroutine
    go func() {
        messages := []string{"Hello!", "How are you?", "Goodbye!"}
        for _, msg := range messages {
            if err := stream.Send(&pb.ChatMessage{
                UserId:    "client1",
                Content:   msg,
                Timestamp: time.Now().Unix(),
            }); err != nil {
                log.Printf("Send error: %v", err)
                return
            }
            time.Sleep(time.Second)
        }
        stream.CloseSend()
    }()

    // 接收消息
    for {
        msg, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Printf("Recv error: %v", err)
            return
        }
        log.Printf("Received: %s", msg.Content)
    }
}

连接池与负载均衡

go
// ClientPool gRPC 客户端连接池
type ClientPool struct {
    connections []*grpc.ClientConn
    current     uint32
}

func NewClientPool(addresses []string) (*ClientPool, error) {
    pool := &ClientPool{
        connections: make([]*grpc.ClientConn, 0, len(addresses)),
    }

    for _, addr := range addresses {
        conn, err := grpc.Dial(addr,
            grpc.WithTransportCredentials(insecure.NewCredentials()),
            grpc.WithKeepaliveParams(keepalive.ClientParameters{
                Time:                10 * time.Second,
                Timeout:             3 * time.Second,
                PermitWithoutStream: true,
            }),
        )
        if err != nil {
            return nil, err
        }
        pool.connections = append(pool.connections, conn)
    }

    return pool, nil
}

func (p *ClientPool) Get() *grpc.ClientConn {
    idx := atomic.AddUint32(&p.current, 1) % uint32(len(p.connections))
    return p.connections[idx]
}

func (p *ClientPool) Close() {
    for _, conn := range p.connections {
        conn.Close()
    }
}

// 使用示例
func main() {
    pool, err := NewClientPool([]string{
        "localhost:50051",
        "localhost:50052",
        "localhost:50053",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer pool.Close()

    client := pb.NewUserServiceClient(pool.Get())
    // 使用 client 调用服务...
}

高级特性

1. 健康检查

go
import "google.golang.org/grpc/health"
import "google.golang.org/grpc/health/grpc_health_v1"

// 注册健康检查服务
healthServer := health.NewServer()
grpc_health_v1.RegisterHealthServer(grpcServer, healthServer)

// 设置服务状态
healthServer.SetServingStatus("user.UserService", grpc_health_v1.HealthCheckResponse_SERVING)

2. 重试与超时

go
// 客户端重试配置
retryOpts := []grpc.CallOption{
    grpc.MaxRetryRPCBufferSize(1024 * 1024),
}

// 带超时的调用
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

resp, err := client.GetUser(ctx, &pb.GetUserRequest{Id: "1"}, retryOpts...)

3. 元数据传输

go
// 发送元数据
md := metadata.Pairs(
    "request-id", uuid.New().String(),
    "user-agent", "grpc-client/1.0",
)
ctx = metadata.NewOutgoingContext(ctx, md)

// 接收元数据
var trailer metadata.MD
resp, err := client.GetUser(ctx, req, grpc.Trailer(&trailer))

性能优化

1. 连接优化

go
conn, err := grpc.Dial(addr,
    grpc.WithKeepaliveParams(keepalive.ClientParameters{
        Time:                10 * time.Second,
        Timeout:             3 * time.Second,
        PermitWithoutStream: true,
    }),
    grpc.WithDefaultServiceConfig(`{
        "loadBalancingPolicy": "round_robin",
        "healthCheckConfig": {"serviceName": ""}
    }`),
)

2. 消息大小限制

go
// 服务端
grpc.NewServer(
    grpc.MaxRecvMsgSize(1024*1024*100), // 100MB
    grpc.MaxSendMsgSize(1024*1024*100),
)

// 客户端
grpc.WithDefaultCallOptions(
    grpc.MaxCallRecvMsgSize(1024*1024*100),
    grpc.MaxCallSendMsgSize(1024*1024*100),
)

3. 压缩

go
import "google.golang.org/grpc/encoding/gzip"

// 启用 gzip 压缩
grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name))

最佳实践总结

  1. 服务定义:保持 proto 文件简洁,版本控制 proto 定义
  2. 错误处理:使用 gRPC 状态码,提供详细的错误信息
  3. 超时控制:为每个 RPC 调用设置合理的超时时间
  4. 连接管理:使用连接池,实现负载均衡
  5. 可观测性:添加拦截器记录日志、指标和追踪
  6. 安全:使用 TLS 加密,实现认证授权
  7. 限流熔断:在客户端实现限流和熔断机制

总结

gRPC 与 Protocol Buffers 为 Go 微服务提供了高性能、强类型的通信方案。通过本文的实战示例,你已经掌握了:

  • Protocol Buffers 服务定义和代码生成
  • gRPC 四种通信模式的实现
  • 拦截器、认证、负载均衡等高级特性
  • 性能优化和最佳实践

在下一篇文章中,我们将深入探讨微服务中的熔断、限流与降级实现。


参考资源:

上次更新于: