Rust 异步编程:从 Future Trait 到手写一个简易 Runtime
一、异步编程的"黑盒"困境:用 Tokio 但不懂为什么
Rust 的 async/await 语法看起来和 JavaScript、Python 一样简单——加个async,写个.await,但底层机制完全不同。JavaScript 是单线程事件循环,Python 有 GIL,而 Rust 的 Future 是惰性的:声明 async 函数不会执行任何代码,必须有人调用.await推进状态机。这个"有人"就是 Runtime。Tokio 是最常用的 Runtime,但如果不懂 Future 的底层机制,遇到异步代码不执行、任务卡死、性能抖动等问题就只能靠猜。理解 Future Trait 和 Runtime 的工作原理,是从"会用 Tokio"到"能调异步问题"的关键一步。
graph TB A[async fn] --> B[编译器生成 Future 状态机] B --> C[.await 挂起当前任务] C --> D[Runtime 调度其他任务] D --> E[IO 就绪后唤醒 Waker] E --> F[Runtime 重新 Poll 该 Future] F --> G{Poll 结果} G -->|Pending| C G -->|Ready| H[返回结果] style B fill:#fff3e0 style E fill:#e8f5e9二、Future Trait 与 Runtime 的底层机制
2.1 Future Trait 的核心定义
Future 只有一个方法poll:Runtime 调用poll推进状态机,返回Pending(需要等待)或Ready(value)(已完成)。Pending时必须注册Waker,让 Runtime 在条件满足时重新 poll。
stateDiagram-v2 [*] --> Poll0: 首次 poll Poll0 --> Pending0: IO 未就绪<br/>注册 Waker Pending0 --> Poll1: Waker 唤醒 Poll1 --> Pending1: IO 仍未就绪 Pending1 --> Poll2: Waker 再次唤醒 Poll2 --> Ready: IO 就绪<br/>返回值 Ready --> [*]2.2 Waker 的唤醒机制
Waker 是 Future 与 Runtime 之间的桥梁。当 IO 操作(如 epoll_wait)检测到文件描述符就绪时,操作系统通知 Runtime,Runtime 调用 Waker 唤醒对应的 Future,重新 poll。Waker 内部包含一个虚表(vtable)指针和数据指针,允许不同 Runtime 实现不同的唤醒策略。
2.3 Runtime 的调度循环
Runtime 的核心是一个循环:不断从就绪队列取任务执行 poll,将返回 Pending 的任务挂起等待 Waker 唤醒,唤醒后重新放入就绪队列。
三、手写简易 Runtime 与 Future 实现
3.1 Future Trait 的简化实现
use std::task::{Context, Poll, Waker, RawWaker, RawWakerVTable}; use std::collections::VecDeque; use std::sync::{Arc, Mutex}; /// 简易 Future Trait(与标准库一致) pub trait SimpleFuture { type Output; fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>; } /// 一个定时器 Future:在指定时间后完成 pub struct TimerFuture { shared_state: Arc<Mutex<TimerState>>, } struct TimerState { completed: bool, waker: Option<Waker>, } impl TimerFuture { pub fn new(duration: std::time::Duration) -> Self { let shared_state = Arc::new(Mutex::new(TimerState { completed: false, waker: None, })); // 启动后台线程模拟异步等待 let state = Arc::clone(&shared_state); std::thread::spawn(move || { std::thread::sleep(duration); let mut state = state.lock().unwrap(); state.completed = true; // 通知 Runtime 重新 poll if let Some(waker) = state.waker.take() { waker.wake(); } }); TimerFuture { shared_state } } } impl SimpleFuture for TimerFuture { type Output = (); fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let mut state = self.shared_state.lock().unwrap(); if state.completed { Poll::Ready(()) } else { // 注册 Waker,以便完成时能被唤醒 state.waker = Some(cx.waker().clone()); Poll::Pending } } }3.2 简易 Runtime 实现
/// 简易异步 Runtime pub struct MiniRuntime { task_queue: Arc<Mutex<VecDeque<Task>>>, } /// 任务封装:Box 化的 Future + Waker type Task = std::pin::Pin<Box<dyn SimpleFuture<Output = ()> + Send>>; impl MiniRuntime { pub fn new() -> Self { Self { task_queue: Arc::new(Mutex::new(VecDeque::new())), } } /// 生成一个 Waker,唤醒时将任务放回就绪队列 fn create_waker(&self, task_id: usize) -> Waker { let queue = Arc::clone(&self.task_queue); // 使用 RawWaker 实现自定义唤醒逻辑 let data = Box::into_raw(Box::new((task_id, queue))) as *const (); let vtable = &RawWakerVTable::new( Self::clone_raw, Self::wake_raw, Self::wake_by_ref_raw, Self::drop_raw, ); unsafe { Waker::from_raw(RawWaker::new(data, vtable)) } } unsafe fn clone_raw(data: *const ()) -> RawWaker { let (task_id, queue) = &*(data as *const (usize, Arc<Mutex<VecDeque<Task>>>)); let cloned = Box::new((*task_id, Arc::clone(queue))); let data = Box::into_raw(cloned) as *const (); RawWaker::new(data, &RawWakerVTable::new( Self::clone_raw, Self::wake_raw, Self::wake_by_ref_raw, Self::drop_raw )) } unsafe fn wake_raw(data: *const ()) { let (task_id, queue) = Box::from_raw(data as *mut (usize, Arc<Mutex<VecDeque<Task>>>)); // 唤醒时将任务重新放入队列(此处简化,实际需要保存 Future) let _ = (task_id, queue); } unsafe fn wake_by_ref_raw(data: *const ()) { Self::wake_raw(data); } unsafe fn drop_raw(data: *const ()) { let _ = Box::from_raw(data as *mut (usize, Arc<Mutex<VecDeque<Task>>>)); } /// 运行所有任务直到完成 pub fn block_on<F>(&self, future: F) where F: SimpleFuture<Output = ()> + Send + 'static, { let mut task = Box::pin(future); let waker = self.create_waker(0); let mut cx = Context::from_waker(&waker); loop { match task.as_mut().poll(&mut cx) { Poll::Ready(()) => break, Poll::Pending => { // 简化处理:让出 CPU 等待唤醒 std::thread::yield_now(); } } } } }3.3 组合多个 Future:Join 与 Select
/// Join: 等待所有 Future 完成 pub struct JoinAll<F> { futures: Vec<Option<F>>, pending_count: usize, } impl<F: SimpleFuture + Unpin> JoinAll<F> { pub fn new(futures: Vec<F>) -> Self { let pending_count = futures.len(); Self { futures: futures.into_iter().map(Some).collect(), pending_count, } } } impl<F: SimpleFuture<Output = ()> + Unpin> SimpleFuture for JoinAll<F> { type Output = (); fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { for future_slot in &mut self.futures { if let Some(future) = future_slot { match future.poll(cx) { Poll::Ready(()) => { *future_slot = None; // 已完成,移除 self.pending_count -= 1; } Poll::Pending => {} } } } if self.pending_count == 0 { Poll::Ready(()) } else { Poll::Pending } } } /// Select: 返回最先完成的 Future 的结果 pub struct Select<F1, F2> { future1: Option<F1>, future2: Option<F2>, } impl<F1: SimpleFuture + Unpin, F2: SimpleFuture + Unpin> Select<F1, F2> { pub fn new(f1: F1, f2: F2) -> Self { Self { future1: Some(f1), future2: Some(f2) } } } impl<F1, F2> SimpleFuture for Select<F1, F2> where F1: SimpleFuture + Unpin, F2: SimpleFuture + Unpin, { type Output = (); fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // Poll 第一个 Future if let Some(ref mut f1) = self.future1 { if let Poll::Ready(()) = f1.poll(cx) { return Poll::Ready(()); } } // Poll 第二个 Future if let Some(ref mut f2) = self.future2 { if let Poll::Ready(()) = f2.poll(cx) { return Poll::Ready(()); } } Poll::Pending } }四、异步 Runtime 的架构权衡
4.1 单线程 vs 多线程 Runtime
| 维度 | 单线程 Runtime | 多线程 Runtime (Tokio) |
|---|---|---|
| 任务窃取 | 无 | Work-Stealing 调度 |
| CPU 利用 | 单核 | 多核 |
| 锁竞争 | 无 | 有(全局就绪队列) |
| 适用场景 | 嵌入式/轻量服务 | 高并发服务端 |
4.2 协作式调度 vs 抢占式调度
Rust 的 Future 是协作式调度:任务必须主动.await让出控制权。如果某个 Future 在 poll 中执行长时间计算而不 yield,会阻塞整个线程。Tokio 提供了tokio::task::yield_now()强制让出,以及coop机制限制单次 poll 的预算。
4.3 适用边界与禁用场景
异步适用:
- IO 密集型任务(网络请求、文件读写)
- 高并发连接(HTTP 服务器、WebSocket)
- 需要同时等待多个 IO 事件
异步禁用:
- CPU 密集型计算(用
spawn_blocking委托给线程池) - 简单的顺序逻辑(同步代码更清晰)
- 不需要并发的场景(过度设计)
五、总结
Rust 的异步模型是"零成本抽象"的典范:async 函数编译为状态机,没有隐式的堆分配和运行时开销。但代价是必须显式管理 Runtime——Future 是惰性的,不会自动执行。理解poll、Waker、Context三者的协作关系,是诊断异步问题的前提。手写 Runtime 虽然不适合生产使用,但能揭示 Tokio 的核心机制:就绪队列、Waker 唤醒、任务调度。从"会用 async/await"到"理解底层状态机",这个跨越决定了面对异步 Bug 时的排查效率。