Announcement

👇Official Account👇

图片

Welcome to join the group & private message

Article first/tail QR code

Skip to content

Go 通道(Channel)详解:Goroutine 通信的艺术

什么是 Channel?

Channel(通道)是 Go 语言中用于Goroutine 之间通信和同步的核心机制。它基于 CSP(Communicating Sequential Processes)模型,遵循 Go 的并发哲学:

"不要通过共享内存来通信,而要通过通信来共享内存"

Channel 提供了一种类型安全的方式,让 Goroutine 之间可以安全地传递数据,避免了传统多线程编程中的锁和竞态条件问题。

创建 Channel

基本语法

go
// 无缓冲 Channel(同步)
ch := make(chan int)

// 有缓冲 Channel(异步)
ch := make(chan int, 10) // 缓冲区大小为 10

Channel 类型

go
// 双向 Channel
ch := make(chan int)

// 只发送 Channel
sendOnly := make(chan<- int)

// 只接收 Channel
recvOnly := make(<-chan int)

Channel 操作

发送数据

go
ch <- value // 将 value 发送到 Channel

接收数据

go
value := <-ch      // 接收数据并赋值
<-ch               // 接收并丢弃数据
value, ok := <-ch  // 检查 Channel 是否关闭

关闭 Channel

go
close(ch) // 关闭 Channel

注意:

  • 只有发送方应该关闭 Channel
  • 向已关闭的 Channel 发送数据会导致 panic
  • 从已关闭的 Channel 接收会返回零值,可以通过 ok 判断

无缓冲 Channel(同步 Channel)

无缓冲 Channel 是同步的:发送和接收必须同时准备好,否则操作会阻塞。

go
package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan string) // 无缓冲 Channel
    
    go func() {
        fmt.Println("Goroutine: 准备发送数据")
        ch <- "Hello" // 阻塞,直到有接收者
        fmt.Println("Goroutine: 数据已发送")
    }()
    
    time.Sleep(1 * time.Second)
    fmt.Println("Main: 准备接收数据")
    msg := <-ch // 接收数据,发送者才能继续
    fmt.Println("Main: 收到", msg)
    
    time.Sleep(500 * time.Millisecond)
}

输出:

Goroutine: 准备发送数据
Main: 准备接收数据
Goroutine: 数据已发送
Main: 收到 Hello

有缓冲 Channel(异步 Channel)

有缓冲 Channel 是异步的:发送方在缓冲区未满时不会阻塞,接收方在缓冲区非空时不会阻塞。

go
package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan int, 3) // 缓冲区大小为 3
    
    // 发送 3 个数据,不会阻塞
    ch <- 1
    ch <- 2
    ch <- 3
    fmt.Println("发送了 3 个数据")
    
    // 第 4 个会阻塞,直到有接收者
    go func() {
        time.Sleep(1 * time.Second)
        fmt.Println("接收:", <-ch)
    }()
    
    ch <- 4 // 阻塞等待
    fmt.Println("第 4 个数据已发送")
}

遍历 Channel

使用 for-range

go
package main

import "fmt"

func main() {
    ch := make(chan int, 5)
    
    go func() {
        for i := 0; i < 5; i++ {
            ch <- i
        }
        close(ch) // 发送完毕后关闭
    }()
    
    // 使用 for-range 遍历
    for value := range ch {
        fmt.Println("Received:", value)
    }
    fmt.Println("Channel closed")
}

Select 语句

select 语句用于在多个 Channel 操作中进行选择,类似于 switch,但用于 Channel。

基本用法

go
package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)
    
    go func() {
        time.Sleep(1 * time.Second)
        ch1 <- "from ch1"
    }()
    
    go func() {
        time.Sleep(2 * time.Second)
        ch2 <- "from ch2"
    }()
    
    // 等待任意一个 Channel 有数据
    select {
    case msg1 := <-ch1:
        fmt.Println(msg1)
    case msg2 := <-ch2:
        fmt.Println(msg2)
    }
}

非阻塞操作

go
package main

import "fmt"

func main() {
    ch := make(chan int)
    
    // 非阻塞发送
    select {
    case ch <- 1:
        fmt.Println("Sent")
    default:
        fmt.Println("Channel full, skip")
    }
    
    // 非阻塞接收
    select {
    case v := <-ch:
        fmt.Println("Received:", v)
    default:
        fmt.Println("No data available")
    }
}

超时处理

go
package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan string)
    
    go func() {
        time.Sleep(2 * time.Second)
        ch <- "result"
    }()
    
    select {
    case result := <-ch:
        fmt.Println("Received:", result)
    case <-time.After(1 * time.Second):
        fmt.Println("Timeout!")
    }
}

使用 select 进行多路复用

go
package main

import (
    "fmt"
    "time"
)

func main() {
    tick := time.Tick(500 * time.Millisecond)
    boom := time.After(2 * time.Second)
    
    for {
        select {
        case <-tick:
            fmt.Println("tick.")
        case <-boom:
            fmt.Println("BOOM!")
            return
        default:
            fmt.Println("    .")
            time.Sleep(100 * time.Millisecond)
        }
    }
}

Channel 方向

函数参数中的 Channel 方向

go
package main

import "fmt"

// 只能发送的 Channel
func sender(ch chan<- int) {
    for i := 0; i < 5; i++ {
        ch <- i
    }
    close(ch)
}

// 只能接收的 Channel
func receiver(ch <-chan int) {
    for v := range ch {
        fmt.Println("Received:", v)
    }
}

func main() {
    ch := make(chan int)
    
    go sender(ch)
    receiver(ch)
}

这种类型约束可以在编译期防止错误的使用方式。

常见模式

1. 工作池模式(Worker Pool)

go
package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for job := range jobs {
        fmt.Printf("Worker %d processing job %d\n", id, job)
        time.Sleep(time.Second) // 模拟工作
        results <- job * 2
    }
}

func main() {
    const numJobs = 10
    const numWorkers = 3
    
    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)
    
    var wg sync.WaitGroup
    
    // 启动 workers
    for w := 1; w <= numWorkers; w++ {
        wg.Add(1)
        go worker(w, jobs, results, &wg)
    }
    
    // 发送任务
    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    close(jobs)
    
    // 等待所有 worker 完成
    go func() {
        wg.Wait()
        close(results)
    }()
    
    // 收集结果
    for result := range results {
        fmt.Println("Result:", result)
    }
}

2. 扇出/扇入模式(Fan-out/Fan-in)

go
package main

import (
    "fmt"
    "sync"
)

// 生成数据
func producer(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

// 处理数据
func processor(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n // 平方
        }
        close(out)
    }()
    return out
}

// 合并多个 Channel
func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)
    
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            out <- n
        }
    }
    
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }
    
    go func() {
        wg.Wait()
        close(out)
    }()
    
    return out
}

func main() {
    in := producer(1, 2, 3, 4, 5)
    
    // 扇出:多个处理器
    c1 := processor(in)
    c2 := processor(in)
    
    // 扇入:合并结果
    for n := range merge(c1, c2) {
        fmt.Println(n)
    }
}

3. 优雅退出

go
package main

import (
    "context"
    "fmt"
    "time"
)

func worker(ctx context.Context, ch chan<- int) {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("Worker exiting:", ctx.Err())
            return
        case ch <- 42:
            time.Sleep(100 * time.Millisecond)
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
    defer cancel()
    
    ch := make(chan int)
    go worker(ctx, ch)
    
    for i := 0; i < 5; i++ {
        fmt.Println(<-ch)
    }
}

Channel 的 nil 值

nil Channel 的行为:

  • 发送操作:永久阻塞
  • 接收操作:永久阻塞
  • 关闭操作:panic
go
package main

import "fmt"

func main() {
    var ch chan int // nil Channel
    
    // 这些操作都会永久阻塞
    // ch <- 1      // 阻塞
    // <-ch         // 阻塞
    // close(ch)    // panic
    
    // 在 select 中,nil Channel 的分支永远不会被选中
    select {
    case <-ch:
        fmt.Println("Received")
    default:
        fmt.Println("Default case") // 会执行这里
    }
}

性能考虑

缓冲大小的选择

  • 无缓冲 Channel:适合同步场景,确保发送和接收同时发生
  • 小缓冲 Channel:适合平滑突发流量
  • 大缓冲 Channel:适合解耦生产者和消费者,但会增加内存占用

避免 Channel 泄漏

确保所有 Goroutine 都能正常退出,避免无限等待 Channel:

go
// 不好的做法:可能永远阻塞
func bad() int {
    ch := make(chan int)
    go func() {
        // 某些条件下可能不会发送数据
        if someCondition {
            ch <- 42
        }
    }()
    return <-ch // 可能永远阻塞
}

// 好的做法:使用 select 和 timeout
func good() (int, error) {
    ch := make(chan int)
    go func() {
        if someCondition {
            ch <- 42
        }
    }()
    
    select {
    case result := <-ch:
        return result, nil
    case <-time.After(5 * time.Second):
        return 0, errors.New("timeout")
    }
}

常见陷阱

1. 向已关闭的 Channel 发送数据

go
ch := make(chan int)
close(ch)
ch <- 1 // panic: send on closed channel

2. 重复关闭 Channel

go
ch := make(chan int)
close(ch)
close(ch) // panic: close of closed channel

3. 在多个 Goroutine 中关闭 Channel

应该只有发送方关闭 Channel,且只关闭一次。

4. 忘记关闭 Channel

使用 for-range 遍历 Channel 时,如果 Channel 未关闭,会导致死锁。

总结

Channel 是 Go 并发编程的核心机制,掌握 Channel 的使用是编写高质量 Go 程序的关键:

特性无缓冲 Channel有缓冲 Channel
同步性同步异步(缓冲区未满时)
用途同步、信号传递解耦、批量处理
性能较低(需要配对)较高(可缓冲)

最佳实践:

  1. 使用 Channel 进行 Goroutine 通信,避免共享内存
  2. 合理选择缓冲大小,平衡性能和内存
  3. 明确 Channel 的所有权(谁发送谁关闭)
  4. 使用 select 处理多个 Channel 操作
  5. 使用 Context 控制 Goroutine 生命周期

相关文章推荐:

上次更新于: