Announcement

👇Official Account👇

图片

Welcome to join the group & private message

Article first/tail QR code

Skip to content

分布式链路追踪最佳实践

在微服务架构中,一个请求可能经过数十个服务,分布式链路追踪帮助我们理解请求在系统中的完整路径,快速定位性能瓶颈和故障点。

为什么需要分布式追踪

传统监控的局限

  • 日志分散:每个服务独立记录日志,难以关联
  • 指标聚合:只能看到整体指标,无法追踪单个请求
  • 故障定位困难:跨服务问题难以快速定位

分布式追踪的价值

  • 请求可视化:完整展示请求链路
  • 性能分析:识别慢调用和瓶颈
  • 故障定位:快速找到错误发生的服务
  • 依赖分析:理解服务间的调用关系

核心概念

Trace(追踪)

Trace 表示一个完整的端到端请求链路:

Trace ID: abc123def456
├── Service A (100ms)
│   ├── Service B (50ms)
│   │   └── Database (10ms)
│   └── Service C (30ms)
└── Service D (20ms)

Span(跨度)

Span 是追踪的基本单元:

go
type Span struct {
    TraceID      string    // 追踪ID
    SpanID       string    // 跨度ID
    ParentSpanID string    // 父跨度ID
    Operation    string    // 操作名称
    StartTime    time.Time // 开始时间
    Duration     int64     // 持续时间(ns)
    Tags         map[string]string  // 标签
    Logs         []Log             // 日志事件
}

上下文传播

Request Headers:
└── traceparent: 00-abc123def456-xyz789-01
    ├── version: 00
    ├── trace-id: abc123def456
    ├── parent-id: xyz789
    └── trace-flags: 01

架构设计

数据流

┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│ Application │───▶│   Agent     │───▶│  Collector  │
│  (SDK/Auto) │    │  (Sidecar)  │    │             │
└─────────────┘    └─────────────┘    └──────┬──────┘

                    ┌─────────────────────────┼─────────────────────────┐
                    ▼                         ▼                         ▼
              ┌─────────┐               ┌─────────┐               ┌─────────┐
              │ Storage │               │  Kafka  │               │  Other  │
              │(ES/Cass)│               │ (Queue) │               │(Custom) │
              └────┬────┘               └─────────┘               └─────────┘


              ┌─────────┐
              │   UI    │
              │(Jaeger) │
              └─────────┘

组件说明

组件职责常用方案
SDK埋点采集OpenTelemetry SDK
Agent本地代理OpenTelemetry Collector
Collector数据收集Jaeger Collector
Storage数据存储Elasticsearch, Cassandra
UI可视化Jaeger UI, Grafana

Go 语言实现

基础埋点

go
package tracing

import (
    "context"
    "net/http"
    
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/propagation"
    "go.opentelemetry.io/otel/trace"
)

var tracer = otel.Tracer("my-service")

// HTTP 中间件
func TracingMiddleware(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        // 提取上游传递的上下文
        ctx := otel.GetTextMapPropagator().Extract(r.Context(), 
            propagation.HeaderCarrier(r.Header))
        
        // 创建 Span
        ctx, span := tracer.Start(ctx, r.URL.Path,
            trace.WithSpanKind(trace.SpanKindServer),
            trace.WithAttributes(
                attribute.String("http.method", r.Method),
                attribute.String("http.url", r.URL.String()),
                attribute.String("http.target", r.URL.Path),
            ),
        )
        defer span.End()
        
        // 将 trace context 注入响应头
        w.Header().Set("X-Trace-ID", span.SpanContext().TraceID().String())
        
        // 继续处理请求
        next.ServeHTTP(w, r.WithContext(ctx))
    })
}

// HTTP 客户端
func HTTPClient(ctx context.Context, method, url string) (*http.Response, error) {
    ctx, span := tracer.Start(ctx, "HTTP "+method,
        trace.WithSpanKind(trace.SpanKindClient),
        trace.WithAttributes(
            attribute.String("http.method", method),
            attribute.String("http.url", url),
        ),
    )
    defer span.End()
    
    req, err := http.NewRequestWithContext(ctx, method, url, nil)
    if err != nil {
        span.RecordError(err)
        return nil, err
    }
    
    // 注入 trace context 到请求头
    otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(req.Header))
    
    client := &http.Client{}
    resp, err := client.Do(req)
    if err != nil {
        span.RecordError(err)
        return nil, err
    }
    
    span.SetAttributes(attribute.Int("http.status_code", resp.StatusCode))
    return resp, nil
}

数据库追踪

go
package tracing

import (
    "context"
    "database/sql"
    "time"
    
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/trace"
)

type TracedDB struct {
    db *sql.DB
}

func (t *TracedDB) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
    ctx, span := tracer.Start(ctx, "db.query",
        trace.WithSpanKind(trace.SpanKindClient),
        trace.WithAttributes(
            attribute.String("db.system", "mysql"),
            attribute.String("db.statement", query),
            attribute.String("db.operation", "SELECT"),
        ),
    )
    defer span.End()
    
    start := time.Now()
    rows, err := t.db.QueryContext(ctx, query, args...)
    duration := time.Since(start)
    
    span.SetAttributes(attribute.Int64("db.duration_ms", duration.Milliseconds()))
    
    if err != nil {
        span.RecordError(err)
        span.SetAttributes(attribute.Bool("error", true))
    }
    
    return rows, err
}

func (t *TracedDB) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
    ctx, span := tracer.Start(ctx, "db.exec",
        trace.WithSpanKind(trace.SpanKindClient),
        trace.WithAttributes(
            attribute.String("db.system", "mysql"),
            attribute.String("db.statement", query),
            attribute.String("db.operation", "EXEC"),
        ),
    )
    defer span.End()
    
    return t.db.ExecContext(ctx, query, args...)
}

消息队列追踪

go
package tracing

import (
    "context"
    "encoding/json"
    
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/propagation"
    "go.opentelemetry.io/otel/trace"
)

// 生产者
func PublishMessage(ctx context.Context, topic string, msg interface{}) error {
    ctx, span := tracer.Start(ctx, "kafka.produce",
        trace.WithSpanKind(trace.SpanKindProducer),
        trace.WithAttributes(
            attribute.String("messaging.system", "kafka"),
            attribute.String("messaging.destination", topic),
        ),
    )
    defer span.End()
    
    // 序列化消息
    payload, err := json.Marshal(msg)
    if err != nil {
        span.RecordError(err)
        return err
    }
    
    // 注入 trace context
    headers := make(map[string]string)
    otel.GetTextMapPropagator().Inject(ctx, propagation.MapCarrier(headers))
    
    // 发送消息(包含 headers)
    return kafkaProducer.Send(topic, payload, headers)
}

// 消费者
func ConsumeMessage(ctx context.Context, topic string, handler func(context.Context, []byte) error) error {
    return kafkaConsumer.Subscribe(topic, func(msg *kafka.Message) error {
        // 提取 trace context
        headers := make(map[string]string)
        for _, h := range msg.Headers {
            headers[h.Key] = string(h.Value)
        }
        ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(headers))
        
        // 创建消费 Span
        ctx, span := tracer.Start(ctx, "kafka.consume",
            trace.WithSpanKind(trace.SpanKindConsumer),
            trace.WithAttributes(
                attribute.String("messaging.system", "kafka"),
                attribute.String("messaging.source", topic),
            ),
        )
        defer span.End()
        
        return handler(ctx, msg.Value)
    })
}

采样策略

头部采样

go
// 概率采样
sampler := sdktrace.TraceIDRatioBased(0.1) // 10% 采样

// 速率限制采样
sampler := sdktrace.ParentBased(
    sdktrace.TraceIDRatioBased(0.1),
)

尾部采样

yaml
# otel-collector-config.yaml
tail_sampling:
  policies:
    - name: errors
      type: status_code
      status_code: {status_codes: [ERROR]}
    
    - name: slow_requests
      type: latency
      latency: {threshold_ms: 1000}
    
    - name: probabilistic
      type: probabilistic
      probabilistic: {sampling_percentage: 10}

部署方案

Kubernetes 部署

yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: jaeger
spec:
  replicas: 1
  selector:
    matchLabels:
      app: jaeger
  template:
    metadata:
      labels:
        app: jaeger
    spec:
      containers:
        - name: jaeger
          image: jaegertracing/all-in-one:latest
          ports:
            - containerPort: 16686
            - containerPort: 14268
          env:
            - name: COLLECTOR_OTLP_ENABLED
              value: "true"
---
apiVersion: v1
kind: Service
metadata:
  name: jaeger
spec:
  selector:
    app: jaeger
  ports:
    - name: ui
      port: 16686
      targetPort: 16686
    - name: collector
      port: 14268
      targetPort: 14268

Sidecar 注入

yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-service
spec:
  template:
    spec:
      containers:
        - name: my-service
          image: my-service:latest
          env:
            - name: OTEL_EXPORTER_OTLP_ENDPOINT
              value: "http://otel-collector:4317"
            - name: OTEL_SERVICE_NAME
              value: "my-service"
        
        - name: otel-collector
          image: otel/opentelemetry-collector-contrib:latest
          command: ["--config=/conf/otel-agent-config.yaml"]
          volumeMounts:
            - name: otel-config
              mountPath: /conf
      volumes:
        - name: otel-config
          configMap:
            name: otel-agent-config

最佳实践

1. 命名规范

Span Name:
- HTTP: <method> <path> (GET /api/users)
- DB: <operation> <table> (SELECT users)
- MQ: <operation> <topic> (produce orders)
- Cache: <operation> <key> (GET user:123)

2. 标签设计

go
// 标准标签
span.SetAttributes(
    // HTTP
    attribute.String("http.method", "GET"),
    attribute.String("http.url", "/api/users"),
    attribute.Int("http.status_code", 200),
    
    // Database
    attribute.String("db.system", "mysql"),
    attribute.String("db.statement", "SELECT * FROM users"),
    attribute.String("db.operation", "SELECT"),
    
    // Messaging
    attribute.String("messaging.system", "kafka"),
    attribute.String("messaging.destination", "orders"),
)

3. 错误处理

go
func handleRequest(ctx context.Context) error {
    ctx, span := tracer.Start(ctx, "handleRequest")
    defer span.End()
    
    if err := doSomething(); err != nil {
        span.RecordError(err)
        span.SetStatus(codes.Error, err.Error())
        span.SetAttributes(attribute.Bool("error", true))
        return err
    }
    
    span.SetStatus(codes.Ok, "success")
    return nil
}

常见问题

1. 上下文丢失

go
// ❌ 错误:在新 goroutine 中丢失上下文
go func() {
    processData(ctx)  // ctx 可能已过期
}()

// ✅ 正确:传递上下文
go func(ctx context.Context) {
    processData(ctx)
}(ctx)

2. 循环依赖

go
// 避免 A -> B -> A 的循环调用导致 Span 爆炸
// 使用 Baggage 标记已访问服务
if baggage.Get(ctx, "visited") == "true" {
    return nil  // 跳过追踪
}

总结

分布式链路追踪是微服务架构中不可或缺的工具,通过合理的埋点和采样策略,可以有效提升系统的可观测性。


参考资源:

上次更新于: