Rust 异步编程从基础到实战:async/await 与 Tokio 详解
一、为什么 Rust 需要异步?
1.1 线程模型的局限
Rust 标准库的线程(std::thread)是 OS 线程:
rust
use std::thread;
// 每个线程开销 ~8MB 栈空间
for i in 0..100 {
thread::spawn(move || {
// 处理任务
println!("线程 {i}");
});
}问题:
- 100 个线程在桌面端尚可,10 万个线程将耗尽内存
- 线程切换需要内核态系统调用
- 大量 I/O 等待(网络请求、文件读写)导致 CPU 空转
1.2 Go goroutine 的启发
Go 的 goroutine 是用户态协程,初始栈仅 2KB,可动态增长:
go
// Go 可以轻松启动数十万 goroutine
for i := 0; i < 100000; i++ {
go handleRequest()
}Rust 的 async 与之类似——在用户态调度轻量级任务,但零 GC 开销且无运行时依赖(可选)。
1.3 Rust async 的设计哲学
┌─────────────────────────────────────────────────┐
│ async fn → 编译为状态机(零成本抽象) │
│ Future trait → 惰性执行(不 poll 就不运行) │
│ 运行时可选 → Tokio / async-std / smol │
│ 取消安全 → Drop 时自动清理 │
│ Send/Sync → 编译期保证并发安全 │
└─────────────────────────────────────────────────┘核心特点:
- 零成本抽象:
async fn编译为状态机,没有 Go 那样的 goroutine 结构体 - 惰性求值:Future 不
.await就不执行,不像 goroutine 一启动就运行 - 可取消:Drop 一个 Future 立即停止执行,无需手动管理生命周期
二、Future trait 与 async/await
2.1 Future 的本质
rust
use std::pin::Pin;
use std::task::{Context, Poll};
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}Poll 有两种状态:
Poll::Pending:还没准备好,请稍后再 pollPoll::Ready(value):完成了,返回结果
2.2 async/await 语法糖
rust
// async fn 等价于返回 impl Future 的普通函数
async fn fetch_data(url: &str) -> String {
// 实际的异步操作
String::from("data")
}
// 等价于:
fn fetch_data(url: &str) -> impl Future<Output = String> + '_ {
async move {
String::from("data")
}
}2.3 状态机转换
rust
async fn example() -> i32 {
let a = step_one().await;
let b = step_two().await;
a + b
}
// 编译器生成的简化状态机:
enum ExampleStateMachine {
Start,
WaitingStepOne { fut: StepOneFut },
WaitingStepTwo { a: i32, fut: StepTwoFut },
Done,
}每次 .await 都是一个潜在的挂起点,状态机在这些点之间转换。
三、Tokio 运行时
3.1 运行时架构
rust
use tokio::runtime::Runtime;
// 手工创建运行时
let rt = Runtime::new().unwrap();
rt.block_on(async {
println!("Hello from Tokio");
});Tokio 的核心组件:
| 组件 | 作用 |
|---|---|
| I/O 事件循环 | 基于 epoll/kqueue/iocp 的事件驱动 |
| 任务调度器 | 工作窃取(work-stealing)多线程调度 |
| 定时器 | 高效的时间轮(timing wheel) |
| 异步同步原语 | Mutex、RwLock、Semaphore、Barrier |
3.2 多线程 vs 单线程
rust
// 多线程运行时(默认)
#[tokio::main]
async fn main() {
// 使用多线程工作窃取调度器
}
// 单线程运行时(适用于 CPU 密集型、避免数据竞争)
#[tokio::main(flavor = "current_thread")]
async fn main() {
// 所有任务在同一个线程执行
}3.3 任务 spawn
rust
#[tokio::main]
async fn main() {
// spawn 一个后台任务
let handle = tokio::spawn(async {
// 在后台运行
"task result"
});
// 可以同时运行多个任务
let (r1, r2) = tokio::join!(
async { 1 + 2 },
async { 3 + 4 },
);
println!("{r1} {r2}"); // 3 7
// 等待后台任务
let result = handle.await.unwrap();
println!("{result}");
}四、异步 I/O 实战
4.1 TCP 服务器
rust
use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
loop {
let (mut socket, addr) = listener.accept().await?;
println!("新连接: {addr}");
// 每个连接一个独立任务
tokio::spawn(async move {
let mut buf = [0; 1024];
loop {
match socket.read(&mut buf).await {
Ok(0) => break, // 连接关闭
Ok(n) => {
// 回声服务
if let Err(e) = socket.write_all(&buf[..n]).await {
eprintln!("写入错误: {e}");
break;
}
}
Err(e) => {
eprintln!("读取错误: {e}");
break;
}
}
}
});
}
}4.2 HTTP 客户端
rust
use std::time::Duration;
use tokio::time::timeout;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 并发请求多个 API
let urls = vec![
"https://api.example.com/data1",
"https://api.example.com/data2",
"https://api.example.com/data3",
];
let mut handles = vec![];
for url in urls {
handles.push(tokio::spawn(async move {
fetch_with_timeout(url, Duration::from_secs(5)).await
}));
}
for handle in handles {
match handle.await? {
Ok(data) => println!("成功: {:?}", data),
Err(e) => eprintln!("失败: {e}"),
}
}
Ok(())
}
async fn fetch_with_timeout(url: &str, dur: Duration) -> Result<String, &'static str> {
match timeout(dur, fetch_url(url)).await {
Ok(Ok(data)) => Ok(data),
Ok(Err(_)) => Err("请求失败"),
Err(_) => Err("超时"),
}
}
async fn fetch_url(url: &str) -> Result<String, reqwest::Error> {
reqwest::get(url).await?.text().await
}4.3 异步文件操作
rust
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
async fn write_log(path: &str, content: &str) -> std::io::Result<()> {
let mut file = File::create(path).await?;
file.write_all(content.as_bytes()).await?;
file.flush().await?;
Ok(())
}五、异步同步原语
5.1 tokio::sync::Mutex vs std::sync::Mutex
rust
use std::sync::Mutex;
use std::sync::Arc;
// ❌ 不能在 .await 之间持有 std Mutex
// async fn bad() {
// let data = Mutex::new(0);
// let guard = data.lock().unwrap();
// some_async_fn().await; // 持有锁跨 .await — 高危!
// *guard += 1;
// }
// ✅ tokio Mutex 支持跨 .await
use tokio::sync::Mutex;
async fn good() {
let data = Mutex::new(0);
let mut guard = data.lock().await;
some_async_fn().await; // 安全,锁会被当前任务持有
*guard += 1;
}选择指南:
| 场景 | 使用 |
|---|---|
| 短临界区(< 1μs) | std::sync::Mutex(更快) |
| 长临界区或需要跨 .await | tokio::sync::Mutex |
| 读多写少 | tokio::sync::RwLock |
5.2 异步 Channel
rust
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
// 多生产者单消费者
let (tx, mut rx) = mpsc::channel::<String>(32);
// 生产者
let tx1 = tx.clone();
tokio::spawn(async move {
tx1.send("来自任务1的消息".to_string()).await.unwrap();
});
let tx2 = tx.clone();
tokio::spawn(async move {
tx2.send("来自任务2的消息".to_string()).await.unwrap();
});
// 丢弃发送端,让接收端能正确关闭
drop(tx);
// 消费者
while let Some(msg) = rx.recv().await {
println!("收到: {msg}");
}
}5.3 oneshot 和 broadcast
rust
use tokio::sync::{oneshot, broadcast};
// oneshot:一次发送一次接收
async fn oneshot_example() {
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
tx.send("一次性消息").unwrap();
});
let msg = rx.await.unwrap();
println!("{msg}");
}
// broadcast:广播给所有消费者
async fn broadcast_example() {
let (tx, mut rx1) = broadcast::channel(16);
let mut rx2 = tx.subscribe();
tx.send("广播消息").unwrap();
assert_eq!(rx1.recv().await.unwrap(), "广播消息");
assert_eq!(rx2.recv().await.unwrap(), "广播消息");
}六、错误处理与取消安全
6.1 传播异步错误
rust
use anyhow::{Result, Context};
async fn read_config(path: &str) -> Result<String> {
let content = tokio::fs::read_to_string(path)
.await
.with_context(|| format!("读取配置文件失败: {path}"))?;
Ok(content)
}6.2 取消安全(Cancellation Safety)
rust
// ❌ 取消不安全
async fn bad_read() {
let mut buf = vec![0u8; 1024];
// 如果这里被取消,socket 的读取状态可能不一致
let n = socket_read(&mut buf).await;
process_data(&buf[..n]);
}
// ✅ 取消安全:使用 Tokio 提供的内置函数
use tokio::io::AsyncReadExt;
async fn safe_read(mut socket: impl AsyncReadExt + Unpin) {
let mut buf = vec![0u8; 1024];
match socket.read_exact(&mut buf).await {
Ok(n) => process_data(&buf[..n]),
Err(_) => handle_error(),
}
}取消安全原则:
.await后不要假定任何状态- 使用事务或补偿操作处理部分完成
- 优先使用 well-tested 的库函数
- 使用
std::mem::ManuallyDrop手动管理资源
七、性能调优
7.1 避免阻塞
rust
// ❌ 不要在线程池中执行阻塞操作
#[tokio::main]
async fn main() {
// std::thread::sleep(Duration::from_secs(10)); // 会阻塞整个线程池!
tokio::time::sleep(Duration::from_secs(10)).await; // ✅ 正确
}7.2 使用 spawn_blocking
rust
use tokio::task::spawn_blocking;
async fn hash_password(password: &str) -> Result<String, &'static str> {
// CPU 密集型计算放到专用线程池
let result = spawn_blocking(move || {
// argon2 哈希计算(CPU 密集型)
compute_argon2_hash(password)
}).await.map_err(|_| "join error")?;
Ok(result)
}7.3 控制并发数
rust
use tokio::sync::Semaphore;
#[tokio::main]
async fn main() {
let semaphore = Semaphore::new(10); // 最多 10 个并发
let mut handles = vec![];
for url in urls {
let permit = semaphore.acquire().await.unwrap();
handles.push(tokio::spawn(async move {
let _permit = permit; // 保持 permit 存活
fetch_url(url).await
}));
}
}7.4 使用 tracing 分析性能
rust
use tracing::{info, instrument};
#[instrument]
async fn handle_request(req: Request) -> Response {
info!("处理请求");
// ...
}八、Go vs Rust 异步对比
| 维度 | Go goroutine | Rust async/await |
|---|---|---|
| 启动开销 | ~2KB 栈,动态增长 | 状态机大小,编译期确定 |
| 调度 | Go runtime 内置 | 第三方运行时(Tokio 等) |
| 创建代价 | go f() 立即执行 | async fn 惰性,.await 才执行 |
| 取消 | 无内置机制(需 context) | Drop 即取消 |
| 并发安全 | 竞态检测器(运行时) | Send/Sync trait(编译期) |
| 栈跟踪 | ✅ 友好 | ❌ 复杂(可用 tokio-console) |
| 内存占用 | 每个 goroutine 有结构体 | 零额外开销 |
| 学习曲线 | 低 | 高(需要理解生命周期、Pin) |
rust
// 启动 100,000 个异步任务
#[tokio::main]
async fn main() {
let mut handles = vec![];
for i in 0..100_000 {
handles.push(tokio::spawn(async move {
// 极轻量 — 状态机仅几个字节
i
}));
}
for handle in handles {
let _ = handle.await;
}
}九、常见陷阱
9.1 递归 async fn
rust
// ❌ 递归 async fn 编译错误(状态机大小无限)
async fn recurse(n: u32) {
if n > 0 {
recurse(n - 1).await;
}
}
// ✅ 使用 Box 包装
fn recurse(n: u32) -> Pin<Box<dyn Future<Output = ()>>> {
Box::pin(async move {
if n > 0 {
recurse(n - 1).await;
}
})
}9.2 在 async fn 中持有非 Send 类型
rust
// ❌ 错误:Rc 不是 Send
// async fn bad() {
// let rc = Rc::new(42);
// some_async().await;
// println!("{rc}");
// }
// ✅ 重构成不跨 .await
async fn good() {
let rc = Rc::new(42);
println!("{rc}"); // 在 .await 之前使用
some_async().await;
}9.3 死锁
rust
// ❌ 同线程 async 死锁
async fn deadlock() {
let mutex = Arc::new(tokio::sync::Mutex::new(0));
let guard = mutex.lock().await;
// 在同一个任务中再次锁同一个 mutex
let guard2 = mutex.lock().await; // 死锁!
*guard += 1;
}十、生态与工具
推荐库
| 类别 | 库 | 说明 |
|---|---|---|
| 运行时 | Tokio | 事实标准,功能最全 |
| HTTP 服务 | axum | Tokio 团队出品,类型安全 |
| HTTP 客户端 | reqwest | 简洁的异步 HTTP 客户端 |
| 数据库 | sqlx | 编译期 SQL 检查 |
| WebSocket | tokio-tungstenite | 异步 WebSocket |
| 消息队列 | lapin | AMQP/RabbitMQ |
| 日志 | tracing | 结构化异步日志 |
调试工具
bash
# tokio-console:async 任务监控
cargo install tokio-console
# 查看运行时状态
$ tokio-console
# 显示每个任务的运行时间、poll 次数、等待原因本文是 Rust 系列的第二篇,后续将覆盖 FFI、宏系统、WASM 等高级主题。

