Go 通道(Channel)详解
掌握 Go 语言最核心的并发通信机制,编写优雅的并发程序
什么是 Channel
概念介绍
Channel(通道)是 Go 语言中的一种类型,用于在不同的 Goroutine 之间进行通信和同步。Go 语言通过 Channel 实现了 CSP(Communicating Sequential Processes)并发模型。
"不要通过共享内存来通信,而要通过通信来共享内存"
Channel 的特点
- 类型安全:Channel 是类型化的,只能传输特定类型的数据
- 同步机制:发送和接收操作是同步的,天然支持协程同步
- 先进先出:数据按照发送顺序被接收
- 阻塞特性:无缓冲 Channel 的发送和接收会阻塞
基础用法
创建 Channel
go
package main
import "fmt"
func main() {
// 创建无缓冲 Channel
ch1 := make(chan int)
// 创建缓冲 Channel
ch2 := make(chan string, 10)
// 创建只读 Channel
var readOnly <-chan int = ch1
// 创建只写 Channel
var writeOnly chan<- int = ch1
fmt.Printf("ch1: %v\n", ch1)
fmt.Printf("ch2: %v, buffer: %d\n", ch2, cap(ch2))
}发送和接收数据
go
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan string)
// 在 Goroutine 中发送数据
go func() {
ch <- "Hello from Goroutine!"
}()
// 主 Goroutine 接收数据
msg := <-ch
fmt.Println(msg)
// 使用缓冲 Channel
bufferedCh := make(chan int, 3)
// 发送数据(不会阻塞,因为有缓冲)
bufferedCh <- 1
bufferedCh <- 2
bufferedCh <- 3
// 接收数据
fmt.Println(<-bufferedCh) // 1
fmt.Println(<-bufferedCh) // 2
fmt.Println(<-bufferedCh) // 3
}关闭 Channel
go
package main
import "fmt"
func main() {
ch := make(chan int, 3)
ch <- 1
ch <- 2
ch <- 3
// 关闭 Channel
close(ch)
// 继续接收已发送的数据
for v := range ch {
fmt.Println(v)
}
// 检查 Channel 是否关闭
v, ok := <-ch
fmt.Printf("Value: %d, Channel open: %v\n", v, ok) // 0, false
}Channel 的高级用法
1. Select 语句
select 语句用于在多个 Channel 操作中进行选择:
go
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(1 * time.Second)
ch1 <- "from ch1"
}()
go func() {
time.Sleep(2 * time.Second)
ch2 <- "from ch2"
}()
// 使用 select 等待多个 Channel
for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Println("Received:", msg1)
case msg2 := <-ch2:
fmt.Println("Received:", msg2)
}
}
}2. 超时处理
go
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan string)
go func() {
time.Sleep(3 * time.Second)
ch <- "result"
}()
select {
case result := <-ch:
fmt.Println("Received:", result)
case <-time.After(2 * time.Second):
fmt.Println("Timeout!")
}
}3. 非阻塞操作
go
package main
import "fmt"
func main() {
ch := make(chan int)
// 非阻塞发送
select {
case ch <- 1:
fmt.Println("Sent successfully")
default:
fmt.Println("Send would block")
}
// 非阻塞接收
select {
case v := <-ch:
fmt.Println("Received:", v)
default:
fmt.Println("No data available")
}
}实战模式
1. Worker Pool 模式
go
package main
import (
"fmt"
"sync"
"time"
)
// Job 表示一个任务
type Job struct {
ID int
Data string
}
// Result 表示任务结果
type Result struct {
JobID int
Value string
Error error
}
// WorkerPool 工作池
type WorkerPool struct {
workers int
jobs chan Job
results chan Result
wg sync.WaitGroup
}
func NewWorkerPool(workers int) *WorkerPool {
return &WorkerPool{
workers: workers,
jobs: make(chan Job, 100),
results: make(chan Result, 100),
}
}
func (wp *WorkerPool) Start() {
for i := 0; i < wp.workers; i++ {
wp.wg.Add(1)
go wp.worker(i)
}
}
func (wp *WorkerPool) worker(id int) {
defer wp.wg.Done()
for job := range wp.jobs {
fmt.Printf("Worker %d processing job %d\n", id, job.ID)
// 模拟处理
time.Sleep(100 * time.Millisecond)
wp.results <- Result{
JobID: job.ID,
Value: "Processed: " + job.Data,
}
}
}
func (wp *WorkerPool) Submit(job Job) {
wp.jobs <- job
}
func (wp *WorkerPool) Stop() {
close(wp.jobs)
wp.wg.Wait()
close(wp.results)
}
func (wp *WorkerPool) Results() <-chan Result {
return wp.results
}
func main() {
pool := NewWorkerPool(3)
pool.Start()
// 提交任务
go func() {
for i := 1; i <= 10; i++ {
pool.Submit(Job{
ID: i,
Data: fmt.Sprintf("Task-%d", i),
})
}
pool.Stop()
}()
// 收集结果
for result := range pool.Results() {
fmt.Printf("Job %d completed: %s\n", result.JobID, result.Value)
}
}2. Pipeline 模式
go
package main
import "fmt"
// Generator 生成数据
func Generator(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
// Square 计算平方
func Square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
// Filter 过滤奇数
func Filter(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
if n%2 == 0 {
out <- n
}
}
close(out)
}()
return out
}
func main() {
// 构建 Pipeline
nums := Generator(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
squares := Square(nums)
filtered := Filter(squares)
// 消费结果
for n := range filtered {
fmt.Println(n)
}
}3. Fan-Out / Fan-In 模式
go
package main
import (
"fmt"
"sync"
)
// Producer 生产数据
func Producer(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
// SquareWorker 计算平方
func SquareWorker(id int, in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
fmt.Printf("Worker %d processing %d\n", id, n)
out <- n * n
}
close(out)
}()
return out
}
// FanIn 合并多个 Channel
func FanIn(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
multiplexed := make(chan int)
multiplex := func(c <-chan int) {
defer wg.Done()
for n := range c {
multiplexed <- n
}
}
wg.Add(len(channels))
for _, ch := range channels {
go multiplex(ch)
}
go func() {
wg.Wait()
close(multiplexed)
}()
return multiplexed
}
func main() {
in := Producer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
// Fan-Out:启动多个 Worker
c1 := SquareWorker(1, in)
c2 := SquareWorker(2, in)
c3 := SquareWorker(3, in)
// Fan-In:合并结果
for n := range FanIn(c1, c2, c3) {
fmt.Println("Result:", n)
}
}常见陷阱与解决方案
1. 向已关闭的 Channel 发送数据
go
// ❌ 错误:向已关闭的 Channel 发送数据会导致 panic
ch := make(chan int)
close(ch)
ch <- 1 // panic: send on closed channel
// ✅ 正确:使用 select 和 ok 检查
func safeSend(ch chan int, value int) bool {
select {
case ch <- value:
return true
default:
return false
}
}2. Channel 泄漏
go
// ❌ 错误:Goroutine 可能永远阻塞
func leaky() {
ch := make(chan int)
go func() {
ch <- 42 // 如果没有人接收,永远阻塞
}()
}
// ✅ 正确:使用缓冲 Channel 或确保有接收者
func notLeaky() {
ch := make(chan int, 1) // 缓冲 Channel
go func() {
ch <- 42
}()
}3. 死锁
go
// ❌ 错误:无缓冲 Channel 的发送和接收在同一个 Goroutine
func deadlock() {
ch := make(chan int)
ch <- 1 // 阻塞,没有接收者
<-ch
}
// ✅ 正确:在不同的 Goroutine 中进行发送和接收
func noDeadlock() {
ch := make(chan int)
go func() {
ch <- 1
}()
<-ch
}性能优化
1. 选择合适的 Channel 类型
go
// 无缓冲 Channel:强同步
ch := make(chan int)
// 缓冲 Channel:提高吞吐量
ch := make(chan int, 100)
// 根据场景选择
// - 需要强同步:无缓冲
// - 生产者-消费者模式:有缓冲
// - 批量处理:大缓冲2. 批量发送
go
// ❌ 低效:逐个发送
for _, item := range items {
ch <- item
}
// ✅ 高效:批量发送
batch := make([]Item, 0, batchSize)
for _, item := range items {
batch = append(batch, item)
if len(batch) >= batchSize {
ch <- batch
batch = batch[:0]
}
}
if len(batch) > 0 {
ch <- batch
}3. 使用 nil Channel
go
// nil Channel 会永远阻塞,可用于禁用 select 分支
var ch chan int // nil
select {
case <-ch: // 永远不会执行
// ...
case <-otherCh:
// ...
}总结
Channel 是 Go 并发编程的核心:
- 基本操作:创建、发送、接收、关闭
- 高级特性:select、超时、非阻塞
- 设计模式:Worker Pool、Pipeline、Fan-Out/Fan-In
- 注意事项:
- 避免向已关闭的 Channel 发送
- 防止 Goroutine 泄漏
- 小心死锁
掌握 Channel,你就能编写出优雅、高效的 Go 并发程序!

