news 2026/6/11 9:24:24

Rust 异步编程:从 Future Trait 到手写一个简易 Runtime

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Rust 异步编程:从 Future Trait 到手写一个简易 Runtime

Rust 异步编程:从 Future Trait 到手写一个简易 Runtime

一、异步编程的"黑盒"困境:用 Tokio 但不懂为什么

Rust 的 async/await 语法看起来和 JavaScript、Python 一样简单——加个async,写个.await,但底层机制完全不同。JavaScript 是单线程事件循环,Python 有 GIL,而 Rust 的 Future 是惰性的:声明 async 函数不会执行任何代码,必须有人调用.await推进状态机。这个"有人"就是 Runtime。Tokio 是最常用的 Runtime,但如果不懂 Future 的底层机制,遇到异步代码不执行、任务卡死、性能抖动等问题就只能靠猜。理解 Future Trait 和 Runtime 的工作原理,是从"会用 Tokio"到"能调异步问题"的关键一步。

graph TB A[async fn] --> B[编译器生成 Future 状态机] B --> C[.await 挂起当前任务] C --> D[Runtime 调度其他任务] D --> E[IO 就绪后唤醒 Waker] E --> F[Runtime 重新 Poll 该 Future] F --> G{Poll 结果} G -->|Pending| C G -->|Ready| H[返回结果] style B fill:#fff3e0 style E fill:#e8f5e9

二、Future Trait 与 Runtime 的底层机制

2.1 Future Trait 的核心定义

Future 只有一个方法poll:Runtime 调用poll推进状态机,返回Pending(需要等待)或Ready(value)(已完成)。Pending时必须注册Waker,让 Runtime 在条件满足时重新 poll。

stateDiagram-v2 [*] --> Poll0: 首次 poll Poll0 --> Pending0: IO 未就绪<br/>注册 Waker Pending0 --> Poll1: Waker 唤醒 Poll1 --> Pending1: IO 仍未就绪 Pending1 --> Poll2: Waker 再次唤醒 Poll2 --> Ready: IO 就绪<br/>返回值 Ready --> [*]

2.2 Waker 的唤醒机制

Waker 是 Future 与 Runtime 之间的桥梁。当 IO 操作(如 epoll_wait)检测到文件描述符就绪时,操作系统通知 Runtime,Runtime 调用 Waker 唤醒对应的 Future,重新 poll。Waker 内部包含一个虚表(vtable)指针和数据指针,允许不同 Runtime 实现不同的唤醒策略。

2.3 Runtime 的调度循环

Runtime 的核心是一个循环:不断从就绪队列取任务执行 poll,将返回 Pending 的任务挂起等待 Waker 唤醒,唤醒后重新放入就绪队列。

三、手写简易 Runtime 与 Future 实现

3.1 Future Trait 的简化实现

use std::task::{Context, Poll, Waker, RawWaker, RawWakerVTable}; use std::collections::VecDeque; use std::sync::{Arc, Mutex}; /// 简易 Future Trait(与标准库一致) pub trait SimpleFuture { type Output; fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>; } /// 一个定时器 Future:在指定时间后完成 pub struct TimerFuture { shared_state: Arc<Mutex<TimerState>>, } struct TimerState { completed: bool, waker: Option<Waker>, } impl TimerFuture { pub fn new(duration: std::time::Duration) -> Self { let shared_state = Arc::new(Mutex::new(TimerState { completed: false, waker: None, })); // 启动后台线程模拟异步等待 let state = Arc::clone(&shared_state); std::thread::spawn(move || { std::thread::sleep(duration); let mut state = state.lock().unwrap(); state.completed = true; // 通知 Runtime 重新 poll if let Some(waker) = state.waker.take() { waker.wake(); } }); TimerFuture { shared_state } } } impl SimpleFuture for TimerFuture { type Output = (); fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let mut state = self.shared_state.lock().unwrap(); if state.completed { Poll::Ready(()) } else { // 注册 Waker,以便完成时能被唤醒 state.waker = Some(cx.waker().clone()); Poll::Pending } } }

3.2 简易 Runtime 实现

/// 简易异步 Runtime pub struct MiniRuntime { task_queue: Arc<Mutex<VecDeque<Task>>>, } /// 任务封装:Box 化的 Future + Waker type Task = std::pin::Pin<Box<dyn SimpleFuture<Output = ()> + Send>>; impl MiniRuntime { pub fn new() -> Self { Self { task_queue: Arc::new(Mutex::new(VecDeque::new())), } } /// 生成一个 Waker,唤醒时将任务放回就绪队列 fn create_waker(&self, task_id: usize) -> Waker { let queue = Arc::clone(&self.task_queue); // 使用 RawWaker 实现自定义唤醒逻辑 let data = Box::into_raw(Box::new((task_id, queue))) as *const (); let vtable = &RawWakerVTable::new( Self::clone_raw, Self::wake_raw, Self::wake_by_ref_raw, Self::drop_raw, ); unsafe { Waker::from_raw(RawWaker::new(data, vtable)) } } unsafe fn clone_raw(data: *const ()) -> RawWaker { let (task_id, queue) = &*(data as *const (usize, Arc<Mutex<VecDeque<Task>>>)); let cloned = Box::new((*task_id, Arc::clone(queue))); let data = Box::into_raw(cloned) as *const (); RawWaker::new(data, &RawWakerVTable::new( Self::clone_raw, Self::wake_raw, Self::wake_by_ref_raw, Self::drop_raw )) } unsafe fn wake_raw(data: *const ()) { let (task_id, queue) = Box::from_raw(data as *mut (usize, Arc<Mutex<VecDeque<Task>>>)); // 唤醒时将任务重新放入队列(此处简化,实际需要保存 Future) let _ = (task_id, queue); } unsafe fn wake_by_ref_raw(data: *const ()) { Self::wake_raw(data); } unsafe fn drop_raw(data: *const ()) { let _ = Box::from_raw(data as *mut (usize, Arc<Mutex<VecDeque<Task>>>)); } /// 运行所有任务直到完成 pub fn block_on<F>(&self, future: F) where F: SimpleFuture<Output = ()> + Send + 'static, { let mut task = Box::pin(future); let waker = self.create_waker(0); let mut cx = Context::from_waker(&waker); loop { match task.as_mut().poll(&mut cx) { Poll::Ready(()) => break, Poll::Pending => { // 简化处理:让出 CPU 等待唤醒 std::thread::yield_now(); } } } } }

3.3 组合多个 Future:Join 与 Select

/// Join: 等待所有 Future 完成 pub struct JoinAll<F> { futures: Vec<Option<F>>, pending_count: usize, } impl<F: SimpleFuture + Unpin> JoinAll<F> { pub fn new(futures: Vec<F>) -> Self { let pending_count = futures.len(); Self { futures: futures.into_iter().map(Some).collect(), pending_count, } } } impl<F: SimpleFuture<Output = ()> + Unpin> SimpleFuture for JoinAll<F> { type Output = (); fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { for future_slot in &mut self.futures { if let Some(future) = future_slot { match future.poll(cx) { Poll::Ready(()) => { *future_slot = None; // 已完成,移除 self.pending_count -= 1; } Poll::Pending => {} } } } if self.pending_count == 0 { Poll::Ready(()) } else { Poll::Pending } } } /// Select: 返回最先完成的 Future 的结果 pub struct Select<F1, F2> { future1: Option<F1>, future2: Option<F2>, } impl<F1: SimpleFuture + Unpin, F2: SimpleFuture + Unpin> Select<F1, F2> { pub fn new(f1: F1, f2: F2) -> Self { Self { future1: Some(f1), future2: Some(f2) } } } impl<F1, F2> SimpleFuture for Select<F1, F2> where F1: SimpleFuture + Unpin, F2: SimpleFuture + Unpin, { type Output = (); fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // Poll 第一个 Future if let Some(ref mut f1) = self.future1 { if let Poll::Ready(()) = f1.poll(cx) { return Poll::Ready(()); } } // Poll 第二个 Future if let Some(ref mut f2) = self.future2 { if let Poll::Ready(()) = f2.poll(cx) { return Poll::Ready(()); } } Poll::Pending } }

四、异步 Runtime 的架构权衡

4.1 单线程 vs 多线程 Runtime

维度单线程 Runtime多线程 Runtime (Tokio)
任务窃取Work-Stealing 调度
CPU 利用单核多核
锁竞争有(全局就绪队列)
适用场景嵌入式/轻量服务高并发服务端

4.2 协作式调度 vs 抢占式调度

Rust 的 Future 是协作式调度:任务必须主动.await让出控制权。如果某个 Future 在 poll 中执行长时间计算而不 yield,会阻塞整个线程。Tokio 提供了tokio::task::yield_now()强制让出,以及coop机制限制单次 poll 的预算。

4.3 适用边界与禁用场景

异步适用:

  • IO 密集型任务(网络请求、文件读写)
  • 高并发连接(HTTP 服务器、WebSocket)
  • 需要同时等待多个 IO 事件

异步禁用:

  • CPU 密集型计算(用spawn_blocking委托给线程池)
  • 简单的顺序逻辑(同步代码更清晰)
  • 不需要并发的场景(过度设计)

五、总结

Rust 的异步模型是"零成本抽象"的典范:async 函数编译为状态机,没有隐式的堆分配和运行时开销。但代价是必须显式管理 Runtime——Future 是惰性的,不会自动执行。理解pollWakerContext三者的协作关系,是诊断异步问题的前提。手写 Runtime 虽然不适合生产使用,但能揭示 Tokio 的核心机制:就绪队列、Waker 唤醒、任务调度。从"会用 async/await"到"理解底层状态机",这个跨越决定了面对异步 Bug 时的排查效率。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/11 9:24:05

为什么你的微信聊天记录需要专业管理?WeChatMsg终极数据归档指南

为什么你的微信聊天记录需要专业管理&#xff1f;WeChatMsg终极数据归档指南 【免费下载链接】WeChatMsg 提取微信聊天记录&#xff0c;将其导出成HTML、Word、CSV文档永久保存&#xff0c;对聊天记录进行分析生成年度聊天报告 项目地址: https://gitcode.com/GitHub_Trendin…

作者头像 李华
网站建设 2026/6/11 9:23:53

MC9S12G端口集成模块(PIM)详解:从GPIO配置到实战应用

1. 从零开始&#xff1a;理解MC9S12G的端口集成模块&#xff08;PIM&#xff09;如果你正在使用NXP的MC9S12G系列微控制器&#xff0c;并且已经厌倦了在数据手册里翻找那些零散的GPIO配置说明&#xff0c;那么这篇文章就是为你准备的。我花了相当长的时间&#xff0c;在汽车电子…

作者头像 李华
网站建设 2026/6/11 9:23:37

Hadoop-StringTokenizer

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站。 StringTokenizer(String str)&#xff1a;构造一个用来解析str的StringTokenizer对象。 java默认的分隔符是“空格”、“制表符(‘\t’)…

作者头像 李华
网站建设 2026/6/11 9:23:26

Linux部署zerotier局域网工具,并搭建moon;客户端配置moon

在线安装zerotier curl -s https://install.zerotier.com | sudo bash 查看安装zerotier版本 sudo zerotier-cli status 加入一个netWork sudo zerotier-cli join ################&#xff08;networkid&#xff09; 查看加入的网络的信息&#xff0c;比如network sudo zeroti…

作者头像 李华