Announcement

👇Official Account👇

Welcome to join the group & private message

Article first/tail QR code

Skip to content

Rust 异步编程从基础到实战:async/await 与 Tokio 详解

一、为什么 Rust 需要异步?

1.1 线程模型的局限

Rust 标准库的线程(std::thread)是 OS 线程:

rust
use std::thread;

// 每个线程开销 ~8MB 栈空间
for i in 0..100 {
    thread::spawn(move || {
        // 处理任务
        println!("线程 {i}");
    });
}

问题

  • 100 个线程在桌面端尚可,10 万个线程将耗尽内存
  • 线程切换需要内核态系统调用
  • 大量 I/O 等待(网络请求、文件读写)导致 CPU 空转

1.2 Go goroutine 的启发

Go 的 goroutine 是用户态协程,初始栈仅 2KB,可动态增长:

go
// Go 可以轻松启动数十万 goroutine
for i := 0; i < 100000; i++ {
    go handleRequest()
}

Rust 的 async 与之类似——在用户态调度轻量级任务,但零 GC 开销无运行时依赖(可选)。

1.3 Rust async 的设计哲学

┌─────────────────────────────────────────────────┐
│  async fn → 编译为状态机(零成本抽象)              │
│  Future trait → 惰性执行(不 poll 就不运行)       │
│  运行时可选 → Tokio / async-std / smol            │
│  取消安全 → Drop 时自动清理                        │
│  Send/Sync → 编译期保证并发安全                    │
└─────────────────────────────────────────────────┘

核心特点

  • 零成本抽象async fn 编译为状态机,没有 Go 那样的 goroutine 结构体
  • 惰性求值:Future 不 .await 就不执行,不像 goroutine 一启动就运行
  • 可取消:Drop 一个 Future 立即停止执行,无需手动管理生命周期

二、Future trait 与 async/await

2.1 Future 的本质

rust
use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Future {
    type Output;
    
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

Poll 有两种状态:

  • Poll::Pending:还没准备好,请稍后再 poll
  • Poll::Ready(value):完成了,返回结果

2.2 async/await 语法糖

rust
// async fn 等价于返回 impl Future 的普通函数
async fn fetch_data(url: &str) -> String {
    // 实际的异步操作
    String::from("data")
}

// 等价于:
fn fetch_data(url: &str) -> impl Future<Output = String> + '_ {
    async move {
        String::from("data")
    }
}

2.3 状态机转换

rust
async fn example() -> i32 {
    let a = step_one().await;
    let b = step_two().await;
    a + b
}

// 编译器生成的简化状态机:
enum ExampleStateMachine {
    Start,
    WaitingStepOne { fut: StepOneFut },
    WaitingStepTwo { a: i32, fut: StepTwoFut },
    Done,
}

每次 .await 都是一个潜在的挂起点,状态机在这些点之间转换。

三、Tokio 运行时

3.1 运行时架构

rust
use tokio::runtime::Runtime;

// 手工创建运行时
let rt = Runtime::new().unwrap();
rt.block_on(async {
    println!("Hello from Tokio");
});

Tokio 的核心组件

组件作用
I/O 事件循环基于 epoll/kqueue/iocp 的事件驱动
任务调度器工作窃取(work-stealing)多线程调度
定时器高效的时间轮(timing wheel)
异步同步原语Mutex、RwLock、Semaphore、Barrier

3.2 多线程 vs 单线程

rust
// 多线程运行时(默认)
#[tokio::main]
async fn main() {
    // 使用多线程工作窃取调度器
}

// 单线程运行时(适用于 CPU 密集型、避免数据竞争)
#[tokio::main(flavor = "current_thread")]
async fn main() {
    // 所有任务在同一个线程执行
}

3.3 任务 spawn

rust
#[tokio::main]
async fn main() {
    // spawn 一个后台任务
    let handle = tokio::spawn(async {
        // 在后台运行
        "task result"
    });
    
    // 可以同时运行多个任务
    let (r1, r2) = tokio::join!(
        async { 1 + 2 },
        async { 3 + 4 },
    );
    
    println!("{r1} {r2}");  // 3 7
    
    // 等待后台任务
    let result = handle.await.unwrap();
    println!("{result}");
}

四、异步 I/O 实战

4.1 TCP 服务器

rust
use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    
    loop {
        let (mut socket, addr) = listener.accept().await?;
        println!("新连接: {addr}");
        
        // 每个连接一个独立任务
        tokio::spawn(async move {
            let mut buf = [0; 1024];
            
            loop {
                match socket.read(&mut buf).await {
                    Ok(0) => break,  // 连接关闭
                    Ok(n) => {
                        // 回声服务
                        if let Err(e) = socket.write_all(&buf[..n]).await {
                            eprintln!("写入错误: {e}");
                            break;
                        }
                    }
                    Err(e) => {
                        eprintln!("读取错误: {e}");
                        break;
                    }
                }
            }
        });
    }
}

4.2 HTTP 客户端

rust
use std::time::Duration;
use tokio::time::timeout;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 并发请求多个 API
    let urls = vec![
        "https://api.example.com/data1",
        "https://api.example.com/data2",
        "https://api.example.com/data3",
    ];
    
    let mut handles = vec![];
    for url in urls {
        handles.push(tokio::spawn(async move {
            fetch_with_timeout(url, Duration::from_secs(5)).await
        }));
    }
    
    for handle in handles {
        match handle.await? {
            Ok(data) => println!("成功: {:?}", data),
            Err(e) => eprintln!("失败: {e}"),
        }
    }
    
    Ok(())
}

async fn fetch_with_timeout(url: &str, dur: Duration) -> Result<String, &'static str> {
    match timeout(dur, fetch_url(url)).await {
        Ok(Ok(data)) => Ok(data),
        Ok(Err(_)) => Err("请求失败"),
        Err(_) => Err("超时"),
    }
}

async fn fetch_url(url: &str) -> Result<String, reqwest::Error> {
    reqwest::get(url).await?.text().await
}

4.3 异步文件操作

rust
use tokio::fs::File;
use tokio::io::AsyncWriteExt;

async fn write_log(path: &str, content: &str) -> std::io::Result<()> {
    let mut file = File::create(path).await?;
    file.write_all(content.as_bytes()).await?;
    file.flush().await?;
    Ok(())
}

五、异步同步原语

5.1 tokio::sync::Mutex vs std::sync::Mutex

rust
use std::sync::Mutex;
use std::sync::Arc;

// ❌ 不能在 .await 之间持有 std Mutex
// async fn bad() {
//     let data = Mutex::new(0);
//     let guard = data.lock().unwrap();
//     some_async_fn().await;  // 持有锁跨 .await — 高危!
//     *guard += 1;
// }

// ✅ tokio Mutex 支持跨 .await
use tokio::sync::Mutex;

async fn good() {
    let data = Mutex::new(0);
    let mut guard = data.lock().await;
    some_async_fn().await;  // 安全,锁会被当前任务持有
    *guard += 1;
}

选择指南

场景使用
短临界区(< 1μs)std::sync::Mutex(更快)
长临界区或需要跨 .awaittokio::sync::Mutex
读多写少tokio::sync::RwLock

5.2 异步 Channel

rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // 多生产者单消费者
    let (tx, mut rx) = mpsc::channel::<String>(32);
    
    // 生产者
    let tx1 = tx.clone();
    tokio::spawn(async move {
        tx1.send("来自任务1的消息".to_string()).await.unwrap();
    });
    
    let tx2 = tx.clone();
    tokio::spawn(async move {
        tx2.send("来自任务2的消息".to_string()).await.unwrap();
    });
    
    // 丢弃发送端,让接收端能正确关闭
    drop(tx);
    
    // 消费者
    while let Some(msg) = rx.recv().await {
        println!("收到: {msg}");
    }
}

5.3 oneshot 和 broadcast

rust
use tokio::sync::{oneshot, broadcast};

// oneshot:一次发送一次接收
async fn oneshot_example() {
    let (tx, rx) = oneshot::channel();
    
    tokio::spawn(async move {
        tx.send("一次性消息").unwrap();
    });
    
    let msg = rx.await.unwrap();
    println!("{msg}");
}

// broadcast:广播给所有消费者
async fn broadcast_example() {
    let (tx, mut rx1) = broadcast::channel(16);
    let mut rx2 = tx.subscribe();
    
    tx.send("广播消息").unwrap();
    
    assert_eq!(rx1.recv().await.unwrap(), "广播消息");
    assert_eq!(rx2.recv().await.unwrap(), "广播消息");
}

六、错误处理与取消安全

6.1 传播异步错误

rust
use anyhow::{Result, Context};

async fn read_config(path: &str) -> Result<String> {
    let content = tokio::fs::read_to_string(path)
        .await
        .with_context(|| format!("读取配置文件失败: {path}"))?;
    
    Ok(content)
}

6.2 取消安全(Cancellation Safety)

rust
// ❌ 取消不安全
async fn bad_read() {
    let mut buf = vec![0u8; 1024];
    // 如果这里被取消,socket 的读取状态可能不一致
    let n = socket_read(&mut buf).await;
    process_data(&buf[..n]);
}

// ✅ 取消安全:使用 Tokio 提供的内置函数
use tokio::io::AsyncReadExt;

async fn safe_read(mut socket: impl AsyncReadExt + Unpin) {
    let mut buf = vec![0u8; 1024];
    match socket.read_exact(&mut buf).await {
        Ok(n) => process_data(&buf[..n]),
        Err(_) => handle_error(),
    }
}

取消安全原则

  1. .await 后不要假定任何状态
  2. 使用事务或补偿操作处理部分完成
  3. 优先使用 well-tested 的库函数
  4. 使用 std::mem::ManuallyDrop 手动管理资源

七、性能调优

7.1 避免阻塞

rust
// ❌ 不要在线程池中执行阻塞操作
#[tokio::main]
async fn main() {
    // std::thread::sleep(Duration::from_secs(10));  // 会阻塞整个线程池!
    tokio::time::sleep(Duration::from_secs(10)).await;  // ✅ 正确
}

7.2 使用 spawn_blocking

rust
use tokio::task::spawn_blocking;

async fn hash_password(password: &str) -> Result<String, &'static str> {
    // CPU 密集型计算放到专用线程池
    let result = spawn_blocking(move || {
        // argon2 哈希计算(CPU 密集型)
        compute_argon2_hash(password)
    }).await.map_err(|_| "join error")?;
    
    Ok(result)
}

7.3 控制并发数

rust
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let semaphore = Semaphore::new(10);  // 最多 10 个并发
    let mut handles = vec![];
    
    for url in urls {
        let permit = semaphore.acquire().await.unwrap();
        handles.push(tokio::spawn(async move {
            let _permit = permit;  // 保持 permit 存活
            fetch_url(url).await
        }));
    }
}

7.4 使用 tracing 分析性能

rust
use tracing::{info, instrument};

#[instrument]
async fn handle_request(req: Request) -> Response {
    info!("处理请求");
    // ...
}

八、Go vs Rust 异步对比

维度Go goroutineRust async/await
启动开销~2KB 栈,动态增长状态机大小,编译期确定
调度Go runtime 内置第三方运行时(Tokio 等)
创建代价go f() 立即执行async fn 惰性,.await 才执行
取消无内置机制(需 context)Drop 即取消
并发安全竞态检测器(运行时)Send/Sync trait(编译期)
栈跟踪✅ 友好❌ 复杂(可用 tokio-console
内存占用每个 goroutine 有结构体零额外开销
学习曲线高(需要理解生命周期、Pin)
rust
// 启动 100,000 个异步任务
#[tokio::main]
async fn main() {
    let mut handles = vec![];
    for i in 0..100_000 {
        handles.push(tokio::spawn(async move {
            // 极轻量 — 状态机仅几个字节
            i
        }));
    }
    
    for handle in handles {
        let _ = handle.await;
    }
}

九、常见陷阱

9.1 递归 async fn

rust
// ❌ 递归 async fn 编译错误(状态机大小无限)
async fn recurse(n: u32) {
    if n > 0 {
        recurse(n - 1).await;
    }
}

// ✅ 使用 Box 包装
fn recurse(n: u32) -> Pin<Box<dyn Future<Output = ()>>> {
    Box::pin(async move {
        if n > 0 {
            recurse(n - 1).await;
        }
    })
}

9.2 在 async fn 中持有非 Send 类型

rust
// ❌ 错误:Rc 不是 Send
// async fn bad() {
//     let rc = Rc::new(42);
//     some_async().await;
//     println!("{rc}");
// }

// ✅ 重构成不跨 .await
async fn good() {
    let rc = Rc::new(42);
    println!("{rc}");  // 在 .await 之前使用
    some_async().await;
}

9.3 死锁

rust
// ❌ 同线程 async 死锁
async fn deadlock() {
    let mutex = Arc::new(tokio::sync::Mutex::new(0));
    let guard = mutex.lock().await;
    // 在同一个任务中再次锁同一个 mutex
    let guard2 = mutex.lock().await;  // 死锁!
    *guard += 1;
}

十、生态与工具

推荐库

类别说明
运行时Tokio事实标准,功能最全
HTTP 服务axumTokio 团队出品,类型安全
HTTP 客户端reqwest简洁的异步 HTTP 客户端
数据库sqlx编译期 SQL 检查
WebSockettokio-tungstenite异步 WebSocket
消息队列lapinAMQP/RabbitMQ
日志tracing结构化异步日志

调试工具

bash
# tokio-console:async 任务监控
cargo install tokio-console

# 查看运行时状态
$ tokio-console
# 显示每个任务的运行时间、poll 次数、等待原因

本文是 Rust 系列的第二篇,后续将覆盖 FFI、宏系统、WASM 等高级主题。

上次更新于: