Announcement

👇Official Account👇

图片

Welcome to join the group & private message

Article first/tail QR code

Skip to content

Go 语言并发模式实战指南

并发编程是 Go 语言的核心优势之一。掌握并发模式,能让你的程序性能提升数倍。

一、并发基础回顾

1.1 Goroutine 与 Channel

go
// 基础并发示例
func main() {
    ch := make(chan int)
    
    go func() {
        ch <- 42
    }()
    
    value := <-ch
    fmt.Println(value) // 42
}

1.2 select 多路复用

go
select {
case v1 := <-ch1:
    fmt.Println("ch1:", v1)
case v2 := <-ch2:
    fmt.Println("ch2:", v2)
case <-time.After(1 * time.Second):
    fmt.Println("timeout")
default:
    fmt.Println("no channel ready")
}

二、Worker Pool 模式

2.1 基础实现

go
type Task struct {
    ID   int
    Data interface{}
}

type WorkerPool struct {
    workers  int
    taskChan chan Task
    wg       sync.WaitGroup
}

func NewWorkerPool(workers int) *WorkerPool {
    return &WorkerPool{
        workers:  workers,
        taskChan: make(chan Task, 100),
    }
}

func (wp *WorkerPool) Start() {
    for i := 0; i < wp.workers; i++ {
        wp.wg.Add(1)
        go wp.worker(i)
    }
}

func (wp *WorkerPool) worker(id int) {
    defer wp.wg.Done()
    
    for task := range wp.taskChan {
        fmt.Printf("Worker %d processing task %d\n", id, task.ID)
        // 处理任务
        process(task)
    }
}

func (wp *WorkerPool) Submit(task Task) {
    wp.taskChan <- task
}

func (wp *WorkerPool) Stop() {
    close(wp.taskChan)
    wp.wg.Wait()
}

2.2 使用示例

go
func main() {
    pool := NewWorkerPool(5)
    pool.Start()
    
    // 提交任务
    for i := 0; i < 20; i++ {
        pool.Submit(Task{ID: i, Data: fmt.Sprintf("task-%d", i)})
    }
    
    pool.Stop()
}

三、Pipeline 模式

3.1 三阶段 Pipeline

go
// Stage 1: 生成数据
func generator(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

// Stage 2: 处理数据
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

// Stage 3: 合并结果
func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)
    
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            out <- n
        }
    }
    
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }
    
    go func() {
        wg.Wait()
        close(out)
    }()
    
    return out
}

3.2 使用 Pipeline

go
func main() {
    // 创建 Pipeline
    in := generator(1, 2, 3, 4, 5)
    
    // 分发到多个处理通道
    c1 := square(in)
    c2 := square(in)
    
    // 合并结果
    for n := range merge(c1, c2) {
        fmt.Println(n)
    }
}

四、Fan-out/Fan-in 模式

4.1 Fan-out(分发)

go
func fanOut(input <-chan int, n int) []<-chan int {
    outputs := make([]<-chan int, n)
    
    for i := 0; i < n; i++ {
        ch := make(chan int)
        outputs[i] = ch
        
        go func(out chan<- int) {
            defer close(out)
            for v := range input {
                out <- v
            }
        }(ch)
    }
    
    return outputs
}

4.2 Fan-in(汇聚)

go
func fanIn(inputs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)
    
    collect := func(in <-chan int) {
        defer wg.Done()
        for v := range in {
            out <- v
        }
    }
    
    wg.Add(len(inputs))
    for _, in := range inputs {
        go collect(in)
    }
    
    go func() {
        wg.Wait()
        close(out)
    }()
    
    return out
}

五、Context 控制

5.1 超时控制

go
func workerWithTimeout(ctx context.Context, tasks <-chan Task) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case task, ok := <-tasks:
            if !ok {
                return nil
            }
            if err := processWithContext(ctx, task); err != nil {
                return err
            }
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    
    tasks := make(chan Task)
    go generateTasks(tasks)
    
    if err := workerWithTimeout(ctx, tasks); err != nil {
        log.Printf("Worker stopped: %v", err)
    }
}

5.2 取消信号传播

go
func pipelineWithCancel(ctx context.Context) {
    stage1 := make(chan int)
    stage2 := make(chan int)
    
    // Stage 1
    go func() {
        defer close(stage1)
        for i := 0; i < 100; i++ {
            select {
            case <-ctx.Done():
                return
            case stage1 <- i:
            }
        }
    }()
    
    // Stage 2
    go func() {
        defer close(stage2)
        for v := range stage1 {
            select {
            case <-ctx.Done():
                return
            case stage2 <- v * 2:
            }
        }
    }()
    
    // 消费
    for v := range stage2 {
        fmt.Println(v)
    }
}

六、错误处理模式

6.1 ErrGroup 模式

go
import "golang.org/x/sync/errgroup"

func processBatch(items []Item) error {
    g, ctx := errgroup.WithContext(context.Background())
    
    for _, item := range items {
        item := item // 捕获循环变量
        g.Go(func() error {
            return processItem(ctx, item)
        })
    }
    
    return g.Wait()
}

6.2 错误通道模式

go
type Result struct {
    Value int
    Error error
}

func workerWithError(input <-chan int) <-chan Result {
    out := make(chan Result)
    
    go func() {
        defer close(out)
        for v := range input {
            result, err := compute(v)
            out <- Result{Value: result, Error: err}
        }
    }()
    
    return out
}

七、性能优化技巧

7.1 避免 Goroutine 泄漏

go
// ❌ 错误:可能泄漏
go func() {
    result := <-ch // 如果 ch 永远不关闭,goroutine 会永远阻塞
    process(result)
}()

// ✅ 正确:使用 select + done channel
done := make(chan struct{})
go func() {
    defer close(done)
    select {
    case result := <-ch:
        process(result)
    case <-time.After(5 * time.Second):
        return
    }
}()

7.2 控制并发数量

go
func limitedConcurrency(items []Item, maxConcurrent int) error {
    sem := make(chan struct{}, maxConcurrent)
    var wg sync.WaitGroup
    
    for _, item := range items {
        wg.Add(1)
        sem <- struct{}{} // 获取信号量
        
        go func(i Item) {
            defer wg.Done()
            defer func() { <-sem }() // 释放信号量
            
            process(i)
        }(item)
    }
    
    wg.Wait()
    return nil
}

八、实战案例:并发爬虫

go
type Crawler struct {
    maxDepth    int
    concurrency int
    visited     sync.Map
}

func (c *Crawler) Crawl(ctx context.Context, startURL string) {
    urls := make(chan string, 100)
    results := make(chan CrawlResult, 100)
    
    // 启动 Worker
    var wg sync.WaitGroup
    for i := 0; i < c.concurrency; i++ {
        wg.Add(1)
        go c.worker(ctx, &wg, urls, results)
    }
    
    // 发送起始 URL
    urls <- startURL
    
    // 收集结果并发现新 URL
    go func() {
        for result := range results {
            fmt.Printf("Crawled: %s (depth: %d)\n", result.URL, result.Depth)
            
            if result.Depth < c.maxDepth {
                for _, link := range result.Links {
                    if _, loaded := c.visited.LoadOrStore(link, true); !loaded {
                        select {
                        case urls <- link:
                        case <-ctx.Done():
                            return
                        }
                    }
                }
            }
        }
    }()
    
    // 等待完成
    wg.Wait()
    close(results)
}

func (c *Crawler) worker(ctx context.Context, wg *sync.WaitGroup, urls <-chan string, results chan<- CrawlResult) {
    defer wg.Done()
    
    for url := range urls {
        select {
        case <-ctx.Done():
            return
        default:
            result := c.fetch(url)
            results <- result
        }
    }
}

九、总结

模式适用场景核心要点
Worker Pool大量任务需要并发处理控制并发数,复用 goroutine
Pipeline数据需要多阶段处理每个阶段独立,通过 channel 连接
Fan-out/Fan-in任务分发和结果汇聚动态扩展处理能力
Context超时和取消控制信号传播,优雅退出
ErrGroup批量任务错误处理任一错误立即返回

掌握这些并发模式,你就能写出高效、可靠的 Go 并发程序。

上次更新于: