Rust 异步 101

小葵花宝宝课堂开课啦!

问题是啥

你是 XX 公司老板,现在你有一万个任务亟待解决。你有如下几种方案:

  • 方案一(同步 Synchronous):叫一个人过来把这一万个任务一件一件地做完。恭喜你,你的公司倒闭了!
  • 方案二(并行 Parallel):叫一万个人过来,每个人负责一件任务。恭喜你,你的公司倒闭了!(当然,你确实有钱则另论)
  • 方案三(线程池):叫一百个人过来,每个人负责一百个任务。还行,公司还没死,但也差不多了!

你意识到一个问题,很多事件被浪费在 等待 上了。你在公共区沙发上发现五个躺着的员工,一问,在等打印文件呢;活动室里三人,一问,在等甲方回复呢。阳台上那人,你盯了他一天了,最终下班终于忍不住去问,在等 Rust 编译呢。

你忍无可忍,“他们就不能在等待时干点活吗??”

异步

异步 Asynchronous,和同步对应。在同步中,事情总是按照顺序来,一个接一个;而在异步中,事情不一定需要有序:你可以先打印文件,在等待的时候去做报表,打印好了又回来继续工作。

在实际编程中,这里需要 等待 的任务,一般就是 IO 操作(文件读写、网络)。如果是计算密集的任务,异步并不会比并行带来更多好处。

基于回调的异步

一个相当符合直觉的异步实现方式就是使用回调。当一个 IO 操作完成后,系统调用一个你提供的回调函数。调用 IO 操作函数本身是立即返回的(非阻塞)。

这样做有几个问题。假设你需要读取两个文件,然后把它们写到一个文件中。

1
2
3
4
5
6
7
read_file("file1.txt", |content| {
read_file("file2.txt", |content2| {
write_file("file3.txt", content + content2, |result| {
println!("Done: {}", result);
});
});
});

哇,依托史!这被称为回调地狱(Callback Hell)。代码难以编写、难以阅读,难以维护。

其次这里有一个比较隐晦的点:回调是谁在执行?我们知道 IO 操作是内核在负责,要它跑一个用户态的逻辑回调无疑很搞笑。想象一下,你正忙着干活,被内核反手中断(Interrupt)了,你晕头转向中听到它告诉你你好你点的豪华 read 套餐好了,请速来取餐,回头一看你家炸了。因此依赖于回调来直接实现你的程序逻辑往往是不现实的。

基于 Future 的异步

很多语言都有 Future 这个概念。大体意思就是,这是一个将来会出结果的操作。现在我去要结果,它可能会告诉我还没好,我就去先干别的事。等了一会儿一看,欸好了,那就把结果拿来用。

这里还需要引入一个 await 的概念。它的作用就是等待一个 Future 的结果。当一个 Future 还没好的时候,await 会把当前任务挂起,等到这个 Future 准备好了再继续。

1
2
3
let content1 = read_file("file1.txt").await;
let content2 = read_file("file2.txt").await;
write_file("file3.txt", content1 + content2).await;

你可能会想,这里 await 要怎么实现?什么叫“把当前任务挂起”?该不会又是操作系统来中断我吧?但实际上一般的异步实现并没有什么高级魔法,你的异步任务只是被编译为了一个状态机。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
let mut state = 0;
let mut content1 = None;
let mut content2 = None;
let mut future1 = None;
let mut future2 = None;

// 一个可以被多次调用的函数
// 这里并不是 Rust 真正的 Future,只作为示例
let task = || {
match state {
0 => {
if future1.is_none() {
future1 = Some(read_file("file1.txt"));
}
if future1.ready() {
content1 = Some(future1.value());
state = 1;
}
}
1 => {
if future2.is_none() {
future2 = Some(read_file("file2.txt"));
}
...
}
...
}
};

这里要注意我们的任务实质上也是一个 Future

在这上面我们可以建立一个异步运行时的概念。它内部有一个线程池(当然也有单线程的版本),每个线程(被称为 worker)每个时刻在跑一个任务。当一个任务跑去调用了 IO 操作,我这个线程就可以把这个 IO 的 Future 甩给一个管理线程,然后继续跑别的任务。等到 IO 操作完成,管理线程就标记这个 Future 为完成,这样下一个空出来的线程就可以继续跑这个任务。

接下来我们考虑怎么实现 Future。最基础的方法就是记一个 is_ready,然后开一个线程或者让内核去跑费时费力的操作,跑完了就标记 is_ready = true。但你会发现你写的管理线程变成了这样:

1
2
3
4
5
6
7
8
9
let list_of_futures = ...;
loop {
for future in list_of_futures {
if future.is_ready() {
let task = /* 等待 Future 的任务 */;
dispatch_to_worker(task); // 分发给某个 worker
}
}
}

你发现你的管理线程在不停轮流检查每个 Future 是否完成,这是很浪费的(Busy Waiting)。我们来看看 Rust 标准库中的 Future

1
2
3
4
5
trait Future {
type Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}

大致意思是,异步运行时可以通过 poll 去获取这个 Future 的状态。如果 Future 已经有结果了(Resolved),那么皆大欢喜,返回 Poll::Ready(value);如果还没好,欸那这个 Future 会收下异步运行时在 Context 里面提供的一个 waker(本质上是一个回调函数),等我好了我就调用这个 waker 告诉你。这样一来我们的异步运行时就不会面临 Busy Waiting 的问题。

操作系统的异步方式

基于中断

上面很自然地就引出了基于中断的异步方式。我们把 IO 操作交给内核,等到操作完成后内核会通过中断(SIGIO)来通知我们,我们这时调用 waker 就好了。

看上去很不错,但想象一下你有几千个文件读取请求,如果每一次操作完成都需要内核通过中断来唤醒任务,这个上下文切换的开销是很大的。

基于轮询

一种常见的轮询是 POSIX 提供的 poll 函数。

1
int poll(pollfd *fds, unsigned long nfds, int timeout);

我们可以传给它一系列文件描述符,函数会阻塞直到其中一个文件描述符准备好下一次读/写了。但这里每次都需要把所有文件描述符传给内核,比较浪费。另一个 epoll(Linux)可以维护一个用户感兴趣的文件描述符列表,这样就不用每次都传了。

基于准备和完成的 IO

我们注意到上面的 pollepoll 都是工作在文件描述符上面的。他们的原件不是 IO 操作而是 IO 对象。它会告诉我们,那个 IO 对象已经准备好了,而不是告诉我们哪些 IO 操作已经完成了。这种被称为 Readiness-based IO,和 Completion-based IO 相对。

基于完成和准备的 IO 的一个显著的区别是,在基于完成的 IO 中,你需要确保你的 buffer 是完好的,因为涉及到一个取消的问题。

想象你的基于完成 IO 是这样的:

1
2
3
4
5
async fn foo() {
let mut buffer = [0; 1024];
file.submit_read_task(&mut buffer).await;
// ...
}

但此时,BOOM 的一声,你的 foo 还在等但突然被取消了(比如外面的人把你 drop 了),此时 buffer 也跟着似了。如果这时候系统 IO 再往里面写数据那就完蛋喽!

基于准备的 IO 就不会有这种问题,因为它大致是这样的:

1
2
3
4
5
async fn foo() {
let mut buffer = [0; 1024];
file.wait_for_read().await;
file.read(&mut buffer);
}

因此中途打断也不会有问题。

作者

Mivik

发布于

2024-04-30

更新于

2024-04-30

许可协议

评论