这是一个很普通很简单的函数,它会运行一秒钟:
use std::thread;
use std::time::Duration;
fn foo(n: u64) {
println!("start {n}");
thread::sleep(Duration::from_secs(1));
println!("end {n}");
}
如果我们想同时调用多次 foo,可以为每个调用生成一个线程:
fn main() {
let mut thread_handles = Vec::new();
for n in 1..=10 {
thread_handles.push(thread::spawn(move || foo(n)));
}
for handle in thread_handles {
handle.join().unwrap();
}
}
可以看到运行这个程序只需一秒左右而非十秒。但是它的输出却是乱序的,也就是说不同线程同时运行时,其执行顺序是不可预测的。
上述代码虽然是多线程并发执行的,但依然属于同步(synchronous)代码的范畴,这是由于每个线程内部仍然按照顺序执行语句,并不会在执行过程中“暂停”去执行其他任务。线程只是提供了一种并发执行多个函数的机制,但函数本身仍然是同步执行的。
另外,线程虽然可以实现并发执行,但它并不“轻量”。创建线程需要分配独立的栈空间,并涉及操作系统调度,因此当并发任务数量较大时,使用线程的成本会迅速增加。
为此,Rust 提供了一种不同于线程的并发模型:异步编程(asynchronous programming)。在异步模型中,当一个任务需要等待时,它可以暂停自身执行,并让出当前线程的执行权,这允许线程去执行其他任务。当等待结束后,再恢复执行。相比之下,同步模型当中处于等待的任务会一直阻塞所在线程直至等待结束。
上面的 foo 函数中使用了下面这条语句:
thread::sleep(Duration::from_secs(1));
thread::sleep 会阻塞当前线程。这意味着在此线程休眠期间,它无法执行任何其他任务。
如果我们将这个例子改成异步的,大概是这样:
use tokio::time::{sleep, Duration};
async fn foo(n: u64) {
println!("start {n}");
sleep(Duration::from_secs(1)).await;
println!("end {n}");
}
#[tokio::main]
async fn main() {
let mut handles = Vec::new();
for n in 1..=10 {
handles.push(tokio::spawn(foo(n)));
}
for handle in handles {
handle.await.unwrap();
}
}
有很多看不懂?不要紧,后面会详细介绍这个例子中出现的新奇玩意。
Future异步 Rust 在语法上的核心是两个关键字——async 与 .await。它们是 Rust 内置语法,用于像编写同步代码那样编写异步函数。
通过在一个函数的 fn 前面加上关键字 async 来表明这是一个异步函数。对异步函数的返回值使用 .await 语法会创建一个“暂停点”(或称“等待点”),意思是在这里允许当前异步函数暂停执行,并把控制权交还给执行器(executor),等将来条件满足时再继续执行。如果在 main 中不对异步函数使用 .await,该函数无法被执行(这也是为什么说 Rust 的异步执行是惰性的)。
事实上 async 会把一段代码转化为一个实现了 Future 特征的状态机。在普通的同步方法中调用阻塞函数会阻塞整个线程,而阻塞的 future 却是放弃对线程的控制,转而允许其他 future 来执行。为了执行 future,我们需要执行器(executor)。

还是上面的异步 foo 函数:
use tokio::time::{sleep, Duration};
async fn foo(n: u64) {
println!("start {n}");
sleep(Duration::from_secs(1)).await;
println!("end {n}");
}
对于这个简单的异步函数,其对应的非异步版本1是:
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
fn foo(n: u64) -> Foo {
let started = false;
let duration = Duration::from_secs(1);
let sleep = Box::pin(tokio::time::sleep(duration));
Foo { n, started, sleep }
}
struct Foo {
n: u64,
started: bool,
sleep: Pin<Box<tokio::time::Sleep>>,
}
impl Future for Foo {
type Output = ();
fn poll(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<()> {
if !self.started {
println!("start {}", self.n);
self.started = true;
}
if self.sleep.as_mut().poll(context).is_pending() {
return Poll::Pending;
}
println!("end {}", self.n);
Poll::Ready(())
}
}
从头开始看,foo 是一个返回 Foo 结构体的普通函数。Foo 是一个 future,因为它实现了 future 特征。按照惯例,异步函数使用小写名称,而它返回的 future 使用大写名称。因此 foo 函数返回一个 Foo future,你创建的 sleep 异步函数也会返回一个 Sleep future。
任何 future 都只能通过 poll 方法来推进执行。也就是说任何 future 在等待结束之后,都需要调用 poll 才能继续往下执行。
以下是标准库中完整的 Future 定义:
pub trait Future {
type Output;
// Required method
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
Pin<&mut T> 是我们用来调用 Future::poll 的指针类型。它的所有权版本是 Pin<Box<T>>,这里我始终使用 Box::pin 来构造它——虽然有其他的构造方式——这样可以避免讨论“pin 投影”(pin projection)。Pin 实际上为异步 Rust 解决了一个关键问题:它确保在安全的代码中某些 future 被轮询后不能被移动。我们将在下文讨论为什么这很重要。
每次调用 Future::poll 都会从调用者那里接收一个 Context。当一个 poll 方法调用另一个时(比如 Foo::poll 调用 Sleep::poll),它会传递那个 Context。
Future::poll 会返回 Poll 类型。这东西是一个枚举,其中 Ready 是携带值的变体,Pending 是不携带值的变体。一个 future 可能返回任意次数的 Pending,直到最终返回 Ready。Future::poll 返回 Pending 意味着 future 尚未完成,poll 将在未来某个时刻再次被调用;返回 Ready 意味着 future 已完成其工作,并包含 Output 值(如果有的话)。前面的例子中,异步函数 foo 没有返回值,因此 Foo future 没有 Output。
那么下一个问题是,poll 什么时候会被再次调用?一个简短的回答是:等一切就绪之后。当 future 所需的资源全部准备好,有人会告诉执行器去重新执行对应的 poll 方法,而那个人正是 Waker。
Waker 和 ContextContext——正如其名——表示一个异步任务的上下文。目前而言,它只用于提供对 &Waker 的访问,该引用会用于唤醒当前任务。
Waker 通过通知其执行器已准备就绪,从而唤醒该任务。该类型的典型的生命周期是这样的:首先由执行器创建 Waker,随后它的引用会被包装进 Context 中,并传递给 Future::poll。接着,如果该 future 选择返回 Poll::Pending,则 future 会通过某种方式保存该 Waker,并在 future 需要再次被 poll 时调用 Waker::wake()。
Waker 类型实现了 Clone、Send 和 Sync 特征,这是为了可以从任意线程调用 Waker(以唤醒任务)。想象一下,引起 future 暂停的原因多种多样:可能是某个线程正进行的 I/O,也可能是操作系统的事件,这些需要等待的位置与 future 很可能分处不同的线程。为了将一切准备就绪,需要在这些地方都安排一个可以通知执行器的东西,所以我们允许 Waker 跨线程被调用。这样的话,future 在此线程被 poll,而在另外的线程被唤醒,其中也包括那些完全不由执行器管理的线程。
而 Context 并没有实现 Clone、Send 或 Sync,因此,它无法自由地在线程之间传递。
PinPin 类型(以及一系列相关的概念)是构建整个 Rust 异步生态系统的基础模块。它的出现是为了解决异步 Rust 中出现的自引用类型问题。
本小节会解决两个问题:为什么说自引用类型是危险的,以及异步 Rust 中为什么会出现它的身影。
考虑下面这个例子:
async fn foo<'a>(z: &'a mut i32) { /* ... */ }
async fn bar(x: i32, y: i32) -> i32 {
let mut z = x + y;
foo(&mut z).await;
z
}
这两个异步函数都生成一个匿名 future,这个 future 会包含它可能暂停的每一步的状态:启动时、每个 await 点以及完成时。
为了进行清晰的说明,我们将 foo 得到的匿名 future 称为 Foo<'a>('a 是参数 z 的生命周期),将 bar 得到的匿名 future 称为 Bar。Bar 大致将是下面这样:
enum Bar {
/// 当它启动时,仅包含其参数
Start { x: i32, y: i32 },
/// 第一个 await 时,它必须包含 `z`
/// 以及引用了 `z` 的 `Foo` future
FirstAwait { z: i32, foo: Foo<'?> },
/// 完成时,不需要任何数据
Complete,
}
请注意 Foo<'_> future 生命周期中的 '?:这会是个什么样的生命周期呢?它并不是一个比 Bar 活得更久的生命周期,Bar 本身并没有生命周期。实际上 Foo 借用了 Bar 的 z 字段,而该字段与它存储在同一个结构体中,这使得 Bar 变成了自引用类型。这就是为什么自引用会出现在异步 Rust 当中。
无论自引用类型是如何定义的,一旦它存在,就会带来内存安全问题。想象一下,Bar 已经进入了 FirstAwait 状态,因此它包含对自身 z 字段的引用。如果此时 Bar 被移动,这些引用就会变成极其危险的悬垂指针。因此,一旦 Bar 被置于 FirstAwait 状态就不能再被移动是至关重要的。而这也是 Pin 所要做的。
Unpin 特征正式介绍 Pin 之前,先来了解一下它的孪生兄弟——Unpin 特征。
对绝大多数类型而言,它们都不是自引用的——这表明它们可以安全地在内存中移动——并且也不在乎是否被移动,所以 Rust 设计出 Unpin 这个由编译器自动实现的标记特征。它表示该类型在内存中移动是安全的。
而如果某类型是自引用的——它无法被安全地移动——那么它应该被标为 !Unpin(即没有实现 Unpin)。
后面 Pin 会依赖这个特性决定自身的行为。
Pin需要提前说明的是,Pin 的目标并不是安全地创建自引用类型,而是使得操作自引用类型变得安全,这些自引用类型通常是由编译器从异步函数生成的,或是像 Tokio 这样的运行时使用 unsafe 代码从底层实现的。
只要不让自引用类型发生内存层面的移动,操作就是安全的。那么何时会发生移动?比如说对其赋值或将值传递给函数时,而这需要该类型的可变引用。因此 Pin 选择通过限制用户获取可变引用来让操作变得安全。
Pin 是一个包装器,它可以包装任何种类的指针(包括内置的引用类型和像 Box 这样的智能指针)。它的含义是:该指针会将其目标对象在内存中固定,也就是绝不能再被移动。当代码需要修改被固定的对象时,它必须使用 unsafe API 来获取底层数据的可变引用,并向编译器人工保证该对象不会通过那个可变引用被意外移动。
如果用 Pin 包装指向 Unpin 类型的指针,那么用户无需 unsafe 就能取出可变引用。
而如果用 Pin 包装指向 !Unpin 类型的指针,那么 Pin 会保证只会在 unsafe 的情况下给用户提供可变引用,而在 Safe Rust 中没有任何获取可变引用的途径。通过杜绝获取可变引用的可能,Pin 保证了在 Safe 情况下自引用类型不会被移动。
总结一下,Pin 会包装一个指针,如果指针指向的数据是 Unpin 的,那么你依然可以轻易获取其可变引用并进行那些会触发移动的操作;但如果指针指向的数据未实现 Unpin,那么一旦被 Pin 包装,除非你使用 unsafe,否则不可能获取到该数据的可变引用。没有可变引用,你自然无法做出那些会触发移动的操作,也就保障了内存安全。
这里 foo 真正返回的应该是 impl Future<Output = ()> 匿名类型。为了简化表述而用 Foo 代之。 ↩