Announcement

👇Official Account👇

图片

Welcome to join the group & private message

Article first/tail QR code

Skip to content

Go 通道(Channel)详解

掌握 Go 语言最核心的并发通信机制,编写优雅的并发程序

什么是 Channel

概念介绍

Channel(通道)是 Go 语言中的一种类型,用于在不同的 Goroutine 之间进行通信和同步。Go 语言通过 Channel 实现了 CSP(Communicating Sequential Processes)并发模型。

"不要通过共享内存来通信,而要通过通信来共享内存"

Channel 的特点

  • 类型安全:Channel 是类型化的,只能传输特定类型的数据
  • 同步机制:发送和接收操作是同步的,天然支持协程同步
  • 先进先出:数据按照发送顺序被接收
  • 阻塞特性:无缓冲 Channel 的发送和接收会阻塞

基础用法

创建 Channel

go
package main

import "fmt"

func main() {
    // 创建无缓冲 Channel
    ch1 := make(chan int)
    
    // 创建缓冲 Channel
    ch2 := make(chan string, 10)
    
    // 创建只读 Channel
    var readOnly <-chan int = ch1
    
    // 创建只写 Channel
    var writeOnly chan<- int = ch1
    
    fmt.Printf("ch1: %v\n", ch1)
    fmt.Printf("ch2: %v, buffer: %d\n", ch2, cap(ch2))
}

发送和接收数据

go
package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan string)
    
    // 在 Goroutine 中发送数据
    go func() {
        ch <- "Hello from Goroutine!"
    }()
    
    // 主 Goroutine 接收数据
    msg := <-ch
    fmt.Println(msg)
    
    // 使用缓冲 Channel
    bufferedCh := make(chan int, 3)
    
    // 发送数据(不会阻塞,因为有缓冲)
    bufferedCh <- 1
    bufferedCh <- 2
    bufferedCh <- 3
    
    // 接收数据
    fmt.Println(<-bufferedCh) // 1
    fmt.Println(<-bufferedCh) // 2
    fmt.Println(<-bufferedCh) // 3
}

关闭 Channel

go
package main

import "fmt"

func main() {
    ch := make(chan int, 3)
    
    ch <- 1
    ch <- 2
    ch <- 3
    
    // 关闭 Channel
    close(ch)
    
    // 继续接收已发送的数据
    for v := range ch {
        fmt.Println(v)
    }
    
    // 检查 Channel 是否关闭
    v, ok := <-ch
    fmt.Printf("Value: %d, Channel open: %v\n", v, ok) // 0, false
}

Channel 的高级用法

1. Select 语句

select 语句用于在多个 Channel 操作中进行选择:

go
package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)
    
    go func() {
        time.Sleep(1 * time.Second)
        ch1 <- "from ch1"
    }()
    
    go func() {
        time.Sleep(2 * time.Second)
        ch2 <- "from ch2"
    }()
    
    // 使用 select 等待多个 Channel
    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-ch1:
            fmt.Println("Received:", msg1)
        case msg2 := <-ch2:
            fmt.Println("Received:", msg2)
        }
    }
}

2. 超时处理

go
package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan string)
    
    go func() {
        time.Sleep(3 * time.Second)
        ch <- "result"
    }()
    
    select {
    case result := <-ch:
        fmt.Println("Received:", result)
    case <-time.After(2 * time.Second):
        fmt.Println("Timeout!")
    }
}

3. 非阻塞操作

go
package main

import "fmt"

func main() {
    ch := make(chan int)
    
    // 非阻塞发送
    select {
    case ch <- 1:
        fmt.Println("Sent successfully")
    default:
        fmt.Println("Send would block")
    }
    
    // 非阻塞接收
    select {
    case v := <-ch:
        fmt.Println("Received:", v)
    default:
        fmt.Println("No data available")
    }
}

实战模式

1. Worker Pool 模式

go
package main

import (
    "fmt"
    "sync"
    "time"
)

// Job 表示一个任务
type Job struct {
    ID   int
    Data string
}

// Result 表示任务结果
type Result struct {
    JobID int
    Value string
    Error error
}

// WorkerPool 工作池
type WorkerPool struct {
    workers  int
    jobs     chan Job
    results  chan Result
    wg       sync.WaitGroup
}

func NewWorkerPool(workers int) *WorkerPool {
    return &WorkerPool{
        workers: workers,
        jobs:    make(chan Job, 100),
        results: make(chan Result, 100),
    }
}

func (wp *WorkerPool) Start() {
    for i := 0; i < wp.workers; i++ {
        wp.wg.Add(1)
        go wp.worker(i)
    }
}

func (wp *WorkerPool) worker(id int) {
    defer wp.wg.Done()
    
    for job := range wp.jobs {
        fmt.Printf("Worker %d processing job %d\n", id, job.ID)
        
        // 模拟处理
        time.Sleep(100 * time.Millisecond)
        
        wp.results <- Result{
            JobID: job.ID,
            Value: "Processed: " + job.Data,
        }
    }
}

func (wp *WorkerPool) Submit(job Job) {
    wp.jobs <- job
}

func (wp *WorkerPool) Stop() {
    close(wp.jobs)
    wp.wg.Wait()
    close(wp.results)
}

func (wp *WorkerPool) Results() <-chan Result {
    return wp.results
}

func main() {
    pool := NewWorkerPool(3)
    pool.Start()
    
    // 提交任务
    go func() {
        for i := 1; i <= 10; i++ {
            pool.Submit(Job{
                ID:   i,
                Data: fmt.Sprintf("Task-%d", i),
            })
        }
        pool.Stop()
    }()
    
    // 收集结果
    for result := range pool.Results() {
        fmt.Printf("Job %d completed: %s\n", result.JobID, result.Value)
    }
}

2. Pipeline 模式

go
package main

import "fmt"

// Generator 生成数据
func Generator(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

// Square 计算平方
func Square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

// Filter 过滤奇数
func Filter(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            if n%2 == 0 {
                out <- n
            }
        }
        close(out)
    }()
    return out
}

func main() {
    // 构建 Pipeline
    nums := Generator(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    squares := Square(nums)
    filtered := Filter(squares)
    
    // 消费结果
    for n := range filtered {
        fmt.Println(n)
    }
}

3. Fan-Out / Fan-In 模式

go
package main

import (
    "fmt"
    "sync"
)

// Producer 生产数据
func Producer(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

// SquareWorker 计算平方
func SquareWorker(id int, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            fmt.Printf("Worker %d processing %d\n", id, n)
            out <- n * n
        }
        close(out)
    }()
    return out
}

// FanIn 合并多个 Channel
func FanIn(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    multiplexed := make(chan int)
    
    multiplex := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            multiplexed <- n
        }
    }
    
    wg.Add(len(channels))
    for _, ch := range channels {
        go multiplex(ch)
    }
    
    go func() {
        wg.Wait()
        close(multiplexed)
    }()
    
    return multiplexed
}

func main() {
    in := Producer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    
    // Fan-Out:启动多个 Worker
    c1 := SquareWorker(1, in)
    c2 := SquareWorker(2, in)
    c3 := SquareWorker(3, in)
    
    // Fan-In:合并结果
    for n := range FanIn(c1, c2, c3) {
        fmt.Println("Result:", n)
    }
}

常见陷阱与解决方案

1. 向已关闭的 Channel 发送数据

go
// ❌ 错误:向已关闭的 Channel 发送数据会导致 panic
ch := make(chan int)
close(ch)
ch <- 1 // panic: send on closed channel

// ✅ 正确:使用 select 和 ok 检查
func safeSend(ch chan int, value int) bool {
    select {
    case ch <- value:
        return true
    default:
        return false
    }
}

2. Channel 泄漏

go
// ❌ 错误:Goroutine 可能永远阻塞
func leaky() {
    ch := make(chan int)
    go func() {
        ch <- 42 // 如果没有人接收,永远阻塞
    }()
}

// ✅ 正确:使用缓冲 Channel 或确保有接收者
func notLeaky() {
    ch := make(chan int, 1) // 缓冲 Channel
    go func() {
        ch <- 42
    }()
}

3. 死锁

go
// ❌ 错误:无缓冲 Channel 的发送和接收在同一个 Goroutine
func deadlock() {
    ch := make(chan int)
    ch <- 1 // 阻塞,没有接收者
    <-ch
}

// ✅ 正确:在不同的 Goroutine 中进行发送和接收
func noDeadlock() {
    ch := make(chan int)
    go func() {
        ch <- 1
    }()
    <-ch
}

性能优化

1. 选择合适的 Channel 类型

go
// 无缓冲 Channel:强同步
ch := make(chan int)

// 缓冲 Channel:提高吞吐量
ch := make(chan int, 100)

// 根据场景选择
// - 需要强同步:无缓冲
// - 生产者-消费者模式:有缓冲
// - 批量处理:大缓冲

2. 批量发送

go
// ❌ 低效:逐个发送
for _, item := range items {
    ch <- item
}

// ✅ 高效:批量发送
batch := make([]Item, 0, batchSize)
for _, item := range items {
    batch = append(batch, item)
    if len(batch) >= batchSize {
        ch <- batch
        batch = batch[:0]
    }
}
if len(batch) > 0 {
    ch <- batch
}

3. 使用 nil Channel

go
// nil Channel 会永远阻塞,可用于禁用 select 分支
var ch chan int // nil

select {
case <-ch: // 永远不会执行
    // ...
case <-otherCh:
    // ...
}

总结

Channel 是 Go 并发编程的核心:

  1. 基本操作:创建、发送、接收、关闭
  2. 高级特性:select、超时、非阻塞
  3. 设计模式:Worker Pool、Pipeline、Fan-Out/Fan-In
  4. 注意事项
    • 避免向已关闭的 Channel 发送
    • 防止 Goroutine 泄漏
    • 小心死锁

掌握 Channel,你就能编写出优雅、高效的 Go 并发程序!


下一篇Go 并发模式:WaitGroup 与 Mutex

上次更新于: