A Deep Dive into Rust Async Programming: How Future, Waker, and Executors Work Under the Hood
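Rust provides powerful asynchronous programming capabilities that can efficiently handle large numbers of concurrent tasks. At the heart of Rust's async model is the Future: it represents an asynchronous operation that may not have completed yet, and it exposes a poll method that an executor calls to drive the operation forward. This article explores how Futures work, focusing on how poll and wake drive async tasks forward and make task scheduling and efficient execution possible.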
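It covers the basic concepts of the Future trait, how Futures are executed, and the role they play in async programming. By walking through implementations of poll and wake, it shows how task scheduling stays efficient and non-blocking. It also builds a simple executor that runs multiple concurrent async tasks, and explains how integrating with system IO enables asynchronous network operations with better performance and scalability.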
Rust async programming
Under the Hood: Executing Futures and Tasks
2.1 Future trait
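- The Future trait is at the core of async programming in Rust
- A Future is an asynchronous computation that can produce a value
- A type that implements Future represents a value that may not be available yet

Here is a simplified version of the Future trait: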
trait SimpleFuture {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending,
}
- A Future can represent:
  - the arrival of the next network packet
  - the next mouse movement
  - or simply the moment when a certain amount of time has passed
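- A Future represents an operation whose completion you can check
- A Future makes progress when its poll function is called
  - poll drives the Future as far toward completion as it can
  - If the Future completes, it returns Poll::Ready(result), where result is the final value
  - If the Future cannot complete yet, it returns Poll::Pending and arranges for the wake() function to be called once the Future is ready to make more progress
- The only thing you can do with a Future is keep knocking on it with poll until a value falls out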
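The following sketch makes the "keep knocking with poll" idea concrete. It repeats the simplified SimpleFuture trait and Poll enum so that it compiles on its own. Countdown and poll_to_completion are hypothetical names, and the driver deliberately busy-polls, which is exactly the waste that wake() is meant to avoid.

```rust
// Self-contained copies of the simplified trait and enum shown above.
trait SimpleFuture {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

#[derive(Debug, PartialEq)]
enum Poll<T> {
    Ready(T),
    Pending,
}

/// Hypothetical future: returns `Pending` `remaining` times, then `Ready(value)`.
struct Countdown {
    remaining: u32,
    value: &'static str,
}

impl SimpleFuture for Countdown {
    type Output = &'static str;

    fn poll(&mut self, _wake: fn()) -> Poll<Self::Output> {
        if self.remaining == 0 {
            Poll::Ready(self.value)
        } else {
            // Make a bit of progress and report that we are not done yet.
            self.remaining -= 1;
            Poll::Pending
        }
    }
}

// This naive driver never waits for a wakeup, so `wake` does nothing.
fn noop_wake() {}

/// Naive driver: keep polling until a value falls out.
/// Returns the value and how many times `poll` was called.
fn poll_to_completion<F: SimpleFuture>(mut fut: F) -> (F::Output, u32) {
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(value) = fut.poll(noop_wake) {
            return (value, polls);
        }
    }
}
```

With `Countdown { remaining: 3, value: "done" }`, poll_to_completion returns ("done", 4): three Pending results, then Ready.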
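The wake() function
- When wake() is called:
  - the executor polls the Future again so that it can make more progress
- Without wake(), the executor would not know when a particular Future can make progress (it would have to keep polling constantly)
- With wake(), the executor knows exactly which Futures are ready to be polled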
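A minimal sketch, assuming the SimpleFuture signature above, of an executor loop that polls only after wake() has fired. Because wake is a plain fn(), the signal has to live in a global static (WOKEN, a hypothetical name), which also previews the limitation discussed later: a bare function pointer cannot say which Future wants to be polled. TwoSteps is hypothetical and calls wake itself to stand in for a real event source.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

trait SimpleFuture {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending,
}

// A `fn()` cannot capture state, so the wake signal has to live in a static.
static WOKEN: AtomicBool = AtomicBool::new(false);

fn wake() {
    WOKEN.store(true, Ordering::SeqCst);
}

/// Hypothetical future that needs two steps. Each time it returns `Pending`
/// it calls `wake` itself to simulate an event source signalling readiness.
struct TwoSteps {
    step: u8,
}

impl SimpleFuture for TwoSteps {
    type Output = u8;

    fn poll(&mut self, wake: fn()) -> Poll<u8> {
        if self.step == 2 {
            return Poll::Ready(self.step);
        }
        self.step += 1;
        wake(); // in real code, an IO source or timer would call this later
        Poll::Pending
    }
}

/// Executor loop that only polls after `wake` has been called.
/// Returns the output and the number of polls performed.
fn run<F: SimpleFuture>(mut fut: F) -> (F::Output, u32) {
    let mut polls = 0;
    WOKEN.store(true, Ordering::SeqCst); // poll once to get started
    loop {
        // `swap(false)` consumes the signal: no signal means no poll.
        if WOKEN.swap(false, Ordering::SeqCst) {
            polls += 1;
            if let Poll::Ready(value) = fut.poll(wake) {
                return (value, polls);
            }
        } else {
            // Nothing is ready; a real executor would sleep here instead of spinning.
            std::thread::yield_now();
        }
    }
}
```

`run(TwoSteps { step: 0 })` returns (2, 3): every poll is triggered by a wake, never by guessing.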
Example: reading from a socket
// `Socket` is a hypothetical type; its methods are not shown here.
pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // The socket has data -- read it into a buffer and return it.
            Poll::Ready(self.socket.read_buf())
        } else {
            // The socket does not yet have data.
            //
            // Arrange for `wake` to be called once data is available.
            // When data becomes available, `wake` will be called, and the
            // user of this `Future` will know to call `poll` again and
            // receive data.
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}
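To see one full Pending → wake → Ready cycle, here is a sketch with an in-memory stand-in for Socket. The Socket type shown, its deliver method and the WOKEN flag are hypothetical; a real socket would be backed by the operating system.

```rust
use std::cell::{Cell, RefCell};
use std::sync::atomic::{AtomicBool, Ordering};

trait SimpleFuture {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending,
}

/// Hypothetical in-memory stand-in for the `Socket` used above.
struct Socket {
    buffer: RefCell<Vec<u8>>,
    on_readable: Cell<Option<fn()>>,
}

impl Socket {
    fn new() -> Self {
        Socket { buffer: RefCell::new(Vec::new()), on_readable: Cell::new(None) }
    }
    fn has_data_to_read(&self) -> bool {
        !self.buffer.borrow().is_empty()
    }
    fn read_buf(&self) -> Vec<u8> {
        std::mem::take(&mut *self.buffer.borrow_mut())
    }
    fn set_readable_callback(&self, wake: fn()) {
        self.on_readable.set(Some(wake));
    }
    /// Simulates bytes arriving from the network: buffer them, then fire the
    /// registered callback (at most once).
    fn deliver(&self, bytes: &[u8]) {
        self.buffer.borrow_mut().extend_from_slice(bytes);
        if let Some(wake) = self.on_readable.take() {
            wake();
        }
    }
}

struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            Poll::Ready(self.socket.read_buf())
        } else {
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}

static WOKEN: AtomicBool = AtomicBool::new(false);

fn wake() {
    WOKEN.store(true, Ordering::SeqCst);
}

/// Returns (first poll was Pending, wake fired, data from second poll).
fn demo() -> (bool, bool, Vec<u8>) {
    let socket = Socket::new();
    let mut fut = SocketRead { socket: &socket };
    let first_pending = matches!(fut.poll(wake), Poll::Pending);
    socket.deliver(b"hello");
    let woken = WOKEN.load(Ordering::SeqCst);
    let data = match fut.poll(wake) {
        Poll::Ready(bytes) => bytes,
        Poll::Pending => Vec::new(),
    };
    (first_pending, woken, data)
}
```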
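Example: combinators
- Multiple asynchronous operations can be combined without intermediate allocations
- Running several Futures concurrently, or one after another, can be implemented as an allocation-free state machine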
/// A SimpleFuture that runs two other futures to completion concurrently.
///
/// Concurrency is achieved via the fact that calls to `poll` each future
/// may be interleaved, allowing each future to advance itself at its own pace.
/// While one future is pending, the other can keep making progress.
pub struct Join<FutureA, FutureB> {
    // Each field may contain a future that should be run to completion.
    // If the future has already completed, the field is set to `None`.
    // This prevents us from polling a future after it has completed, which
    // would violate the contract of the `Future` trait.
    a: Option<FutureA>,
    b: Option<FutureB>,
}

impl<FutureA, FutureB> SimpleFuture for Join<FutureA, FutureB>
where
    FutureA: SimpleFuture<Output = ()>,
    FutureB: SimpleFuture<Output = ()>,
{
    type Output = ();

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        // Attempt to complete future `a`.
        if let Some(a) = &mut self.a {
            if let Poll::Ready(()) = a.poll(wake) {
                self.a.take();
            }
        }

        // Attempt to complete future `b`.
        if let Some(b) = &mut self.b {
            if let Poll::Ready(()) = b.poll(wake) {
                self.b.take();
            }
        }

        if self.a.is_none() && self.b.is_none() {
            // Both futures have completed -- we can return successfully.
            Poll::Ready(())
        } else {
            // One or both futures returned `Poll::Pending` and still have
            // work to do. They will call `wake()` when progress can be made.
            Poll::Pending
        }
    }
}
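A sketch of Join in action, assuming a hypothetical Logged future that records its name each time it is polled and finishes after a fixed number of polls. With a 2-poll future a and a 3-poll future b, the log shows the two being polled alternately until each completes.

```rust
use std::cell::RefCell;
use std::rc::Rc;

trait SimpleFuture {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending,
}

// `Join`, condensed from the code above.
struct Join<A, B> {
    a: Option<A>,
    b: Option<B>,
}

impl<A, B> SimpleFuture for Join<A, B>
where
    A: SimpleFuture<Output = ()>,
    B: SimpleFuture<Output = ()>,
{
    type Output = ();

    fn poll(&mut self, wake: fn()) -> Poll<()> {
        if let Some(a) = &mut self.a {
            if let Poll::Ready(()) = a.poll(wake) {
                self.a.take();
            }
        }
        if let Some(b) = &mut self.b {
            if let Poll::Ready(()) = b.poll(wake) {
                self.b.take();
            }
        }
        if self.a.is_none() && self.b.is_none() {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

/// Hypothetical future: logs `name` on every poll, finishes after `polls_left` polls.
struct Logged {
    name: &'static str,
    polls_left: u32,
    log: Rc<RefCell<Vec<&'static str>>>,
}

impl SimpleFuture for Logged {
    type Output = ();

    fn poll(&mut self, _wake: fn()) -> Poll<()> {
        self.log.borrow_mut().push(self.name);
        self.polls_left -= 1;
        if self.polls_left == 0 {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

fn noop_wake() {}

/// Drives a `Join` of a 2-poll future and a 3-poll future; returns the poll order.
fn join_order() -> Vec<&'static str> {
    let log = Rc::new(RefCell::new(Vec::new()));
    let mut join = Join {
        a: Some(Logged { name: "a", polls_left: 2, log: log.clone() }),
        b: Some(Logged { name: "b", polls_left: 3, log: log.clone() }),
    };
    while let Poll::Pending = join.poll(noop_wake) {}
    let order = log.borrow().clone();
    order
}
```

join_order() returns ["a", "b", "a", "b", "b"]: once a is done, Join stops polling it, so the underflow-prone `polls_left -= 1` is never reached again.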
Example
Multiple Futures can also run one after another:
/// A SimpleFuture that runs two futures to completion, one after another.
//
// Note: for the purposes of this simple example, `AndThenFut` assumes both
// the first and second futures are available at creation-time. The real
// `AndThen` combinator allows creating the second future based on the output
// of the first future, like `get_breakfast.and_then(|food| eat(food))`.
pub struct AndThenFut<FutureA, FutureB> {
    first: Option<FutureA>,
    second: FutureB,
}

impl<FutureA, FutureB> SimpleFuture for AndThenFut<FutureA, FutureB>
where
    FutureA: SimpleFuture<Output = ()>,
    FutureB: SimpleFuture<Output = ()>,
{
    type Output = ();

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if let Some(first) = &mut self.first {
            match first.poll(wake) {
                // We've completed the first future -- remove it and start on
                // the second!
                Poll::Ready(()) => self.first.take(),
                // We couldn't yet complete the first future.
                Poll::Pending => return Poll::Pending,
            };
        }
        // Now that the first future is done, attempt to complete the second.
        self.second.poll(wake)
    }
}
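The same hypothetical Logged future applied to AndThenFut shows the sequential behaviour: b is not polled until a has returned Ready, and b's first poll happens in the same call in which a completes.

```rust
use std::cell::RefCell;
use std::rc::Rc;

trait SimpleFuture {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending,
}

// `AndThenFut`, condensed from the code above.
struct AndThenFut<A, B> {
    first: Option<A>,
    second: B,
}

impl<A, B> SimpleFuture for AndThenFut<A, B>
where
    A: SimpleFuture<Output = ()>,
    B: SimpleFuture<Output = ()>,
{
    type Output = ();

    fn poll(&mut self, wake: fn()) -> Poll<()> {
        if let Some(first) = &mut self.first {
            match first.poll(wake) {
                Poll::Ready(()) => {
                    self.first.take();
                }
                Poll::Pending => return Poll::Pending,
            }
        }
        self.second.poll(wake)
    }
}

/// Hypothetical future: logs `name` on every poll, finishes after `polls_left` polls.
struct Logged {
    name: &'static str,
    polls_left: u32,
    log: Rc<RefCell<Vec<&'static str>>>,
}

impl SimpleFuture for Logged {
    type Output = ();

    fn poll(&mut self, _wake: fn()) -> Poll<()> {
        self.log.borrow_mut().push(self.name);
        self.polls_left -= 1;
        if self.polls_left == 0 {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

fn noop_wake() {}

/// Drives `a` (2 polls) and then `b` (2 polls); returns the poll order.
fn and_then_order() -> Vec<&'static str> {
    let log = Rc::new(RefCell::new(Vec::new()));
    let mut fut = AndThenFut {
        first: Some(Logged { name: "a", polls_left: 2, log: log.clone() }),
        second: Logged { name: "b", polls_left: 2, log: log.clone() },
    };
    while let Poll::Pending = fut.poll(noop_wake) {}
    let order = log.borrow().clone();
    order
}
```

and_then_order() returns ["a", "a", "b", "b"], in contrast with the interleaved order produced by Join.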
The real Future trait
trait Future {
    type Output;
    fn poll(
        // Note the change from `&mut self` to `Pin<&mut Self>`:
        self: Pin<&mut Self>,
        // and the change from `wake: fn()` to `cx: &mut Context<'_>`:
        cx: &mut Context<'_>,
    ) -> Poll<Self::Output>;
}
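- The type of self is no longer &mut self but Pin<&mut Self>:
  - it lets us create futures that cannot be moved
  - immovable objects can store pointers between their own fields
  - Pin is required to make async/await possible
- wake: fn() has become cx: &mut Context<'_>:
  - in SimpleFuture, we call a function pointer (fn()) to tell the Future's executor that the Future in question should be polled
  - however, because fn() is just a function pointer, it cannot store any data about which Future called wake
  - the Context type provides access to a value of type Waker, which can be used to wake up a specific task
  - for example, a real web server may have thousands of different connections, and the wakeups for each of them should be managed separately

In short, waking a task in real-world code requires carrying data along with the wakeup. The Context type does this by providing a Waker value that can wake up one specific task.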
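The following sketch, using the standard library's std::task::Wake trait (stable since Rust 1.51), shows what the Waker inside a Context buys over a bare fn(): each Waker carries its own task id. TaskWaker, YieldOnce and woken_ids are hypothetical names for illustration.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical waker: remembers which task it belongs to and, when woken,
/// pushes that task's id onto a shared ready list.
struct TaskWaker {
    id: usize,
    ready: Arc<Mutex<Vec<usize>>>,
}

impl Wake for TaskWaker {
    fn wake(self: Arc<Self>) {
        self.ready.lock().unwrap().push(self.id);
    }
}

/// A future that is pending on its first poll and immediately wakes its own task.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            // Unlike `wake: fn()`, the waker from `cx` knows which task to wake.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Polls two tasks once each and reports which task ids asked to be polled again.
fn woken_ids() -> Vec<usize> {
    let ready = Arc::new(Mutex::new(Vec::new()));
    let mut tasks = vec![YieldOnce { yielded: false }, YieldOnce { yielded: false }];
    for (index, task) in tasks.iter_mut().enumerate() {
        let waker = Waker::from(Arc::new(TaskWaker { id: index + 1, ready: ready.clone() }));
        let mut cx = Context::from_waker(&waker);
        let _ = Pin::new(task).poll(&mut cx);
    }
    let ids = ready.lock().unwrap().clone();
    ids
}
```

woken_ids() returns [1, 2]; an executor could use those ids to decide exactly which tasks to poll next.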
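2.2 Waking Tasks with Waker
What Waker is for
- A Future usually cannot complete the first time it is polled, so it must make sure it will be polled again once it is ready to make more progress
- Every time a Future is polled, it is polled as part of a task
- A task is a top-level Future that has been submitted to an executor
The Waker type
- Waker provides a wake() method that can be used to tell the executor that the associated task should be woken up
- When wake() is called, the executor knows that the task associated with the Waker is ready to make progress, and that its Future should be polled again
- Waker implements Clone, so it can be copied and stored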
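A small sketch of the Waker properties listed above, again built on std::task::Wake; CountingWaker and count_wakes are hypothetical. Cloning a Waker yields another handle to the same task, which can be stored or moved to another thread; the timer example in this section relies on exactly that.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};
use std::thread;

/// Hypothetical waker that just counts how many times its task was woken.
struct CountingWaker {
    wakes: AtomicUsize,
}

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
}

fn count_wakes() -> usize {
    let counter = Arc::new(CountingWaker { wakes: AtomicUsize::new(0) });
    let waker = Waker::from(counter.clone());

    // `wake_by_ref` wakes the task without consuming the Waker.
    waker.wake_by_ref();

    // A clone refers to the same task, so it can be stored elsewhere,
    // e.g. handed to another thread that signals completion later.
    let stored = waker.clone();
    thread::spawn(move || stored.wake()).join().unwrap();

    // `wake` consumes the Waker.
    waker.wake();
    counter.wakes.load(Ordering::SeqCst)
}
```

All three calls reach the same CountingWaker, so count_wakes() returns 3.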
Example
- Build a simple timer Future that uses a Waker
➜ cargo new timer_future
     Created binary (application) `timer_future` package
➜ cd timer_future
src/lib.rs file (create it next to main.rs)
use std::{
    future::Future,
    pin::Pin,
    sync::{Arc, Mutex},
    task::{Context, Poll, Waker},
    thread,
    time::Duration,
};

pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

/// Shared state between the future and the waiting thread
struct SharedState {
    /// Whether or not the sleep time has elapsed
    completed: bool,

    /// The waker for the task that `TimerFuture` is running on.
    /// The thread can use this after setting `completed = true` to tell
    /// `TimerFuture`'s task to wake up, see that `completed = true`, and
    /// move forward.
    waker: Option<Waker>,
}

impl Future for TimerFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Look at the shared state to see if the timer has already completed.
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            Poll::Ready(())
        } else {
            // Set waker so that the thread can wake up the current task
            // when the timer has completed, ensuring that the future is polled
            // again and sees that `completed = true`.
            //
            // It's tempting to do this once rather than repeatedly cloning
            // the waker each time. However, the `TimerFuture` can move between
            // tasks on the executor, which could cause a stale waker pointing
            // to the wrong task, preventing `TimerFuture` from waking up
            // correctly.
            //
            // N.B. it's possible to check for this using the `Waker::will_wake`
            // function, but we omit that here to keep things simple.
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

impl TimerFuture {
    /// Create a new `TimerFuture` which will complete after the provided
    /// timeout.
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));

        // Spawn the new thread
        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            // Sleep for the requested duration; this is what implements the timer.
            thread::sleep(duration);
            let mut shared_state = thread_shared_state.lock().unwrap();
            // Signal that the timer has completed and wake up the last
            // task on which the future was polled, if one exists.
            shared_state.completed = true;
            if let Some(waker) = shared_state.waker.take() {
                waker.wake()
            }
        });

        TimerFuture { shared_state }
    }
}
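To run TimerFuture without the futures crate, a minimal single-future executor can park the current thread until the Waker unparks it. This is a sketch modelled on the block_on example in the std::task::Wake documentation; block_on, ThreadWaker and timed_run are hypothetical names, and TimerFuture is repeated in condensed form so the block compiles on its own.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::{Duration, Instant};

/// Waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal single-future executor: poll, then park until a waker fires.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            // `park` can also return spuriously; that only costs an extra poll.
            Poll::Pending => thread::park(),
        }
    }
}

// Condensed copy of the `TimerFuture` above.
struct SharedState {
    completed: bool,
    waker: Option<Waker>,
}

struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

impl TimerFuture {
    fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState { completed: false, waker: None }));
        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            thread::sleep(duration);
            let mut state = thread_shared_state.lock().unwrap();
            state.completed = true;
            if let Some(waker) = state.waker.take() {
                waker.wake();
            }
        });
        TimerFuture { shared_state }
    }
}

impl Future for TimerFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut state = self.shared_state.lock().unwrap();
        if state.completed {
            Poll::Ready(())
        } else {
            state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

/// Awaits a 50 ms timer inside `block_on` and returns the elapsed wall-clock time.
fn timed_run() -> Duration {
    let start = Instant::now();
    block_on(async {
        TimerFuture::new(Duration::from_millis(50)).await;
    });
    start.elapsed()
}
```

timed_run() returns at least 50 ms, and the calling thread sleeps in park() rather than spinning while the timer thread runs. Because poll checks `completed` and stores the waker under the same lock, a wakeup cannot slip in between the two.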
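2.3 Building an Executor
Future executors
- Futures are lazy: they do nothing unless something drives them to completion
- One way to drive a Future is to .await it inside an async function, but that only pushes the problem up one level:
  - who runs the Future returned by the top-level async function?
  - the answer is a Future executor
- A Future executor takes a set of top-level Futures and runs them to completion by calling poll whenever a Future can make progress
  - Typically the executor polls a Future once to get it started
  - When a Future signals via wake() that it is ready to make progress, it is put back on a queue and poll is called again; this repeats until the Future completes
Example
- Build a simple executor that can run a large number of top-level Futures concurrently
- We use the ArcWake trait from the futures crate (add futures = "0.3" under [dependencies] in Cargo.toml):
  - it provides an easy way to construct a Waker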
src/lib.rs file (the timer from 2.2, with logging added)
use std::{
    future::Future,
    pin::Pin,
    sync::{Arc, Mutex},
    task::{Context, Poll, Waker},
    thread,
    time::Duration,
};

pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

/// Shared state between the future and the waiting thread
struct SharedState {
    /// Whether or not the sleep time has elapsed
    completed: bool,

    /// The waker for the task that `TimerFuture` is running on.
    /// The thread can use this after setting `completed = true` to tell
    /// `TimerFuture`'s task to wake up, see that `completed = true`, and
    /// move forward.
    waker: Option<Waker>,
}

impl Future for TimerFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        println!("[{:?}] Polling TimerFuture...", thread::current().id());
        // Look at the shared state to see if the timer has already completed.
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            println!("[{:?}] TimerFuture completed...", thread::current().id());
            Poll::Ready(())
        } else {
            // Set waker so that the thread can wake up the current task
            // when the timer has completed, ensuring that the future is polled
            // again and sees that `completed = true`.
            //
            // It's tempting to do this once rather than repeatedly cloning
            // the waker each time. However, the `TimerFuture` can move between
            // tasks on the executor, which could cause a stale waker pointing
            // to the wrong task, preventing `TimerFuture` from waking up
            // correctly.
            //
            // N.B. it's possible to check for this using the `Waker::will_wake`
            // function, but we omit that here to keep things simple.
            shared_state.waker = Some(cx.waker().clone());
            println!("[{:?}] TimerFuture pending...", thread::current().id());
            Poll::Pending
        }
    }
}

impl TimerFuture {
    /// Create a new `TimerFuture` which will complete after the provided
    /// timeout.
    pub fn new(duration: Duration) -> Self {
        println!("[{:?}] 开始创建新的 TimerFuture...", thread::current().id());
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));

        // Spawn the new thread
        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            println!(
                "[{:?}] TimerFuture 生成新的线程并开始睡眠...",
                thread::current().id()
            );
            // Sleep for the requested duration; this is what implements the timer.
            thread::sleep(duration);
            let mut shared_state = thread_shared_state.lock().unwrap();
            // Signal that the timer has completed and wake up the last
            // task on which the future was polled, if one exists.
            shared_state.completed = true;
            if let Some(waker) = shared_state.waker.take() {
                println!(
                    "[{:?}] TimerFuture 新线程获得 waker,并进行 wake()...",
                    thread::current().id()
                );
                waker.wake()
            } else {
                println!(
                    "[{:?}] TimerFuture 新线程没获得 waker...",
                    thread::current().id()
                );
            }
        });

        println!("[{:?}] 返回新的 TimerFuture...", thread::current().id());
        TimerFuture { shared_state }
    }
}
src/main.rs file
use futures::{
    future::{BoxFuture, FutureExt},
    task::{waker_ref, ArcWake},
};
use std::{
    future::Future,
    sync::mpsc::{sync_channel, Receiver, SyncSender},
    sync::{Arc, Mutex},
    task::Context,
    thread,
    time::Duration,
};
// The timer we wrote in the previous section:
use timer_future::TimerFuture;

/// Task executor that receives tasks off of a channel and runs them.
struct Executor {
    ready_queue: Receiver<Arc<Task>>,
}

/// `Spawner` spawns new futures onto the task channel.
#[derive(Clone)]
struct Spawner {
    task_sender: SyncSender<Arc<Task>>,
}

/// A future that can reschedule itself to be polled by an `Executor`.
struct Task {
    /// In-progress future that should be pushed to completion.
    ///
    /// The `Mutex` is not necessary for correctness, since we only have
    /// one thread executing tasks at once. However, Rust isn't smart
    /// enough to know that `future` is only mutated from one thread,
    /// so we need to use the `Mutex` to prove thread-safety. A production
    /// executor would not need this, and could use `UnsafeCell` instead.
    future: Mutex<Option<BoxFuture<'static, ()>>>,

    /// Handle to place the task itself back onto the task queue.
    task_sender: SyncSender<Arc<Task>>,
}

fn new_executor_and_spawner() -> (Executor, Spawner) {
    // Maximum number of tasks to allow queueing in the channel at once.
    // This is just to make `sync_channel` happy, and wouldn't be present in
    // a real executor.
    const MAX_QUEUED_TASKS: usize = 10_000;
    let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
    println!("[{:?}] 生成 Executor 和 Spawner (含发送端、接收端)...", thread::current().id());
    (Executor { ready_queue }, Spawner { task_sender })
}

impl Spawner {
    // Wrap the Future in a Task and send it onto the channel.
    fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
        let future = future.boxed();
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            task_sender: self.task_sender.clone(),
        });
        println!("[{:?}] 将 Future 组成 Task,放入 Channel...", thread::current().id());
        self.task_sender.send(task).expect("too many tasks queued");
    }
}

impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // Implement `wake` by sending this task back onto the task channel
        // so that it will be polled again by the executor.
        println!("[{:?}] wake_by_ref...", thread::current().id());
        let cloned = arc_self.clone();
        arc_self
            .task_sender
            .send(cloned)
            .expect("too many tasks queued");
    }
}

impl Executor {
    fn run(&self) {
        println!("[{:?}] Executor running...", thread::current().id());
        // Keep receiving tasks until every sender has been dropped.
        while let Ok(task) = self.ready_queue.recv() {
            println!("[{:?}] 接收到任务...", thread::current().id());
            // Take the future, and if it has not yet completed (is still Some),
            // poll it in an attempt to complete it.
            let mut future_slot = task.future.lock().unwrap();
            if let Some(mut future) = future_slot.take() {
                println!("[{:?}] 从任务中取得 Future...", thread::current().id());
                // Create a waker (a `WakerRef`) from the task itself.
                let waker = waker_ref(&task);
                println!("[{:?}] 获得 waker by ref...", thread::current().id());
                let context = &mut Context::from_waker(&waker);
                // `BoxFuture<'static, T>` is a type alias for
                // `Pin<Box<dyn Future<Output = T> + Send + 'static>>`.
                // We can get a `Pin<&mut dyn Future + Send + 'static>`
                // from it by calling the `Pin::as_mut` method.
                println!("[{:?}] 获得 context,准备进行 poll()...", thread::current().id());
                if future.as_mut().poll(context).is_pending() {
                    // We're not done processing the future, so put it
                    // back in its task to be run again in the future.
                    *future_slot = Some(future);
                    println!("[{:?}] Poll::Pending ====", thread::current().id());
                } else {
                    println!("[{:?}] Poll::Ready....", thread::current().id());
                }
            }
        }
        println!("[{:?}] Excutor run 结束", thread::current().id());
    }
}

fn main() {
    // Create an executor and a spawner for tasks.
    let (executor, spawner) = new_executor_and_spawner();

    // Spawn a task to print before and after waiting on a timer.
    // An async block is itself a Future.
    spawner.spawn(async {
        println!("[{:?}] howdy!", thread::current().id());
        // Wait for our timer future to complete after two seconds.
        TimerFuture::new(Duration::new(2, 0)).await;
        println!("[{:?}] done!", thread::current().id());
    });

    // Drop the spawner so that our executor knows it is finished and won't
    // receive more incoming tasks to run.
    println!("[{:?}] drop Spawner!", thread::current().id());
    drop(spawner);

    // Run the executor until the task queue is empty.
    // This will print "howdy!", pause, and then print "done!".
    executor.run();
}
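For comparison, here is a sketch of the same executor design using only the standard library: std::task::Wake stands in for ArcWake, and Waker::from replaces waker_ref. The logging is dropped, and YieldNow and run_tasks are hypothetical helpers; YieldNow returns Pending once after waking itself, which sends its task to the back of the queue.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Same shape as `futures::future::BoxFuture<'static, ()>`.
type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

struct Task {
    future: Mutex<Option<BoxFuture>>,
    task_sender: SyncSender<Arc<Task>>,
}

/// `std::task::Wake` plays the role of `ArcWake`: waking re-queues the task.
impl Wake for Task {
    fn wake(self: Arc<Self>) {
        // Ignore the error if the executor has already shut down.
        let _ = self.task_sender.send(self.clone());
    }
}

struct Spawner {
    task_sender: SyncSender<Arc<Task>>,
}

impl Spawner {
    fn spawn(&self, future: impl Future<Output = ()> + Send + 'static) {
        let future: BoxFuture = Box::pin(future);
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            task_sender: self.task_sender.clone(),
        });
        self.task_sender.send(task).expect("executor has shut down");
    }
}

struct Executor {
    ready_queue: Receiver<Arc<Task>>,
}

impl Executor {
    fn run(&self) {
        // `recv` fails once every sender (the Spawner and all Tasks) is gone.
        while let Ok(task) = self.ready_queue.recv() {
            let mut future_slot = task.future.lock().unwrap();
            if let Some(mut future) = future_slot.take() {
                let waker = Waker::from(task.clone());
                let mut context = Context::from_waker(&waker);
                if future.as_mut().poll(&mut context).is_pending() {
                    *future_slot = Some(future);
                }
            }
        }
    }
}

/// Hypothetical helper future: wakes itself and returns `Pending` once, then `Ready`.
struct YieldNow(bool);

impl Future for YieldNow {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

fn run_tasks() -> Vec<u32> {
    let (task_sender, ready_queue) = sync_channel(10_000);
    let (executor, spawner) = (Executor { ready_queue }, Spawner { task_sender });
    let results = Arc::new(Mutex::new(Vec::new()));
    for i in 1..=3u32 {
        let results = results.clone();
        spawner.spawn(async move {
            if i == 1 {
                YieldNow(false).await; // task 1 goes to the back of the queue
            }
            results.lock().unwrap().push(i);
        });
    }
    drop(spawner); // no more tasks: lets `run` return once the queue drains
    executor.run();
    let order = results.lock().unwrap().clone();
    order
}
```

run_tasks() returns [2, 3, 1]: task 1 yields, is re-queued through wake_by_ref, and finishes after tasks 2 and 3.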
Run
➜ cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 0.06s
     Running `target/debug/timer_future`
[ThreadId(1)] 生成 Executor 和 Spawner (含发送端、接收端)...
[ThreadId(1)] 将 Future 组成 Task,放入 Channel...
[ThreadId(1)] drop Spawner!
[ThreadId(1)] Executor running...
[ThreadId(1)] 接收到任务...
[ThreadId(1)] 从任务中取得 Future...
[ThreadId(1)] 获得 waker by ref...
[ThreadId(1)] 获得 context,准备进行 poll()...
[ThreadId(1)] howdy!
[ThreadId(1)] 开始创建新的 TimerFuture...
[ThreadId(1)] 返回新的 TimerFuture...
[ThreadId(1)] Polling TimerFuture...
[ThreadId(1)] TimerFuture pending...
[ThreadId(1)] Poll::Pending ====
[ThreadId(2)] TimerFuture 生成新的线程并开始睡眠...
[ThreadId(2)] TimerFuture 新线程获得 waker,并进行 wake()...
[ThreadId(2)] wake_by_ref...
[ThreadId(1)] 接收到任务...
[ThreadId(1)] 从任务中取得 Future...
[ThreadId(1)] 获得 waker by ref...
[ThreadId(1)] 获得 context,准备进行 poll()...
[ThreadId(1)] Polling TimerFuture...
[ThreadId(1)] TimerFuture completed...
[ThreadId(1)] done!
[ThreadId(1)] Poll::Ready....
[ThreadId(1)] Excutor run 结束
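2.4 Executors and System IO
A Future that performs an asynchronous read on a Socket
- This Future reads whatever data is available on the socket
- If no data is available, it yields to the executor, asking for its task to be woken once the socket becomes readable again
- In this example we don't know how the Socket type is implemented, and in particular how set_readable_callback works. So how can we arrange for wake() to be called once the socket becomes readable again?
  - One option is a thread that continually checks whether the socket is readable (inefficient!)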
pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // The socket has data -- read it into a buffer and return it.
            Poll::Ready(self.socket.read_buf())
        } else {
            // The socket does not yet have data.
            //
            // Arrange for `wake` to be called once data is available.
            // When data becomes available, `wake` will be called, and the
            // user of this `Future` will know to call `poll` again and
            // receive data.
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}
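- In practice, this problem is solved by integrating with IO-aware system blocking primitives
  - for example epoll on Linux, kqueue on FreeBSD and macOS, IOCP on Windows, and ports on Fuchsia
  - in Rust, epoll, kqueue and IOCP are exposed through the cross-platform mio crate
  - all of these primitives let a thread block on multiple asynchronous IO events and return as soon as one of those events completes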
// Pseudocode: `IoObject`, `Signals`, `READABLE` and `WRITABLE` are placeholders.
struct IoBlocker {
    /* ... */
}

struct Event {
    // An ID uniquely identifying the event that occurred and was listened for.
    id: usize,

    // A set of signals to wait for, or which occurred.
    signals: Signals,
}

impl IoBlocker {
    /// Create a new collection of asynchronous IO events to block on.
    fn new() -> Self { /* ... */ }

    /// Express an interest in a particular IO event.
    fn add_io_event_interest(
        &self,

        // The object on which the event will occur.
        io_object: &IoObject,

        // A set of signals that may appear on the `io_object` for
        // which an event should be triggered, paired with
        // an ID to give to events that result from this interest.
        event: Event,
    ) { /* ... */ }

    /// Block until one of the events occurs.
    fn block(&self) -> Event { /* ... */ }
}

let io_blocker = IoBlocker::new();
io_blocker.add_io_event_interest(
    &socket_1,
    Event { id: 1, signals: READABLE },
);
io_blocker.add_io_event_interest(
    &socket_2,
    Event { id: 2, signals: READABLE | WRITABLE },
);
let event = io_blocker.block();

// Prints e.g. "Socket 1 is now READABLE" if socket one became readable.
println!("Socket {:?} is now {:?}", event.id, event.signals);
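Since IoBlocker above is pseudocode, the sketch below simulates its interface with standard-library channels so that the blocking behaviour can actually be run. This is not epoll or kqueue: each registered interest is a thread that sleeps and then reports its Event, and the ready_after parameter, like the numeric READABLE/WRITABLE flags, is a hypothetical addition.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;
use std::time::Duration;

/// Hypothetical readiness flags (stand-ins for READABLE / WRITABLE).
const READABLE: u8 = 0b01;
const WRITABLE: u8 = 0b10;

#[derive(Debug, PartialEq)]
struct Event {
    id: usize,
    signals: u8,
}

/// Simulated IoBlocker: each "IO object" is a thread that reports readiness over a channel.
struct IoBlocker {
    tx: Sender<Event>,
    rx: Receiver<Event>,
}

impl IoBlocker {
    fn new() -> Self {
        let (tx, rx) = channel();
        IoBlocker { tx, rx }
    }

    /// Registers interest; `ready_after` simulates how long until the OS reports the event.
    fn add_io_event_interest(&self, ready_after: Duration, event: Event) {
        let tx = self.tx.clone();
        thread::spawn(move || {
            thread::sleep(ready_after);
            let _ = tx.send(event);
        });
    }

    /// Blocks the calling thread until one of the registered events occurs.
    fn block(&self) -> Event {
        self.rx.recv().expect("IoBlocker keeps its own sender, so this never disconnects")
    }
}

/// Registers two interests and returns events in the order they occur.
fn first_two_events() -> (Event, Event) {
    let io_blocker = IoBlocker::new();
    io_blocker.add_io_event_interest(Duration::from_millis(100), Event { id: 1, signals: READABLE });
    io_blocker.add_io_event_interest(
        Duration::from_millis(10),
        Event { id: 2, signals: READABLE | WRITABLE },
    );
    let first = io_blocker.block();
    let second = io_blocker.block();
    (first, second)
}
```

first_two_events() yields socket 2's event first (ready after 10 ms) and socket 1's second (ready after 100 ms), regardless of registration order; a single thread waits on both without polling either.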
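Pseudocode for Socket::set_readable_callback
- A Future executor can use these primitives to provide asynchronous IO objects, such as sockets, that can be configured to run a callback when a particular IO event occurs
- For our SocketRead example, the pseudocode for Socket::set_readable_callback looks roughly like this: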
impl Socket {
    fn set_readable_callback(&self, waker: Waker) {
        // `local_executor` is a reference to the local executor.
        // This could be provided at creation of the socket, but in practice
        // many executor implementations pass it down through thread local
        // storage for convenience.
        let local_executor = self.local_executor;

        // Unique ID for this IO object.
        let id = self.id;

        // Store the local waker in the executor's map so that it can be called
        // once the IO event arrives.
        local_executor.event_map.insert(id, waker);
        local_executor.add_io_event_interest(
            &self.socket_file_descriptor,
            Event { id, signals: READABLE },
        );
    }
}
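Now we need only a single executor thread. It receives IO events and dispatches each one to the appropriate Waker, which wakes the corresponding task. The executor then polls that task so it can continue, and drives more tasks to completion before going back to check for more IO events. This loop repeats, for example reading from a socket until the socket is closed.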
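The dispatch step described above can be sketched with a map from IO-object id to Waker, mirroring event_map in the set_readable_callback pseudocode. Reactor, RecordingWaker and dispatch_demo are hypothetical names, and the event ids are hard-coded to stand in for what the blocking primitive would return.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::task::{Wake, Waker};

/// Hypothetical waker that records which task it wakes.
struct RecordingWaker {
    task: &'static str,
    woken: Arc<Mutex<Vec<&'static str>>>,
}

impl Wake for RecordingWaker {
    fn wake(self: Arc<Self>) {
        self.woken.lock().unwrap().push(self.task);
    }
}

/// Hypothetical reactor: maps IO-object ids to the Waker of the task waiting on them.
struct Reactor {
    event_map: HashMap<usize, Waker>,
}

impl Reactor {
    /// What `set_readable_callback` does in the pseudocode: remember the waker.
    fn register(&mut self, id: usize, waker: Waker) {
        self.event_map.insert(id, waker);
    }

    /// Called for each event the blocking primitive returns: wake the waiting task.
    fn dispatch(&mut self, event_id: usize) {
        if let Some(waker) = self.event_map.remove(&event_id) {
            waker.wake();
        }
    }
}

fn dispatch_demo() -> Vec<&'static str> {
    let woken = Arc::new(Mutex::new(Vec::new()));
    let mut reactor = Reactor { event_map: HashMap::new() };
    for (id, task) in [(1, "task reading socket 1"), (2, "task reading socket 2")] {
        let waker = Waker::from(Arc::new(RecordingWaker { task, woken: woken.clone() }));
        reactor.register(id, waker);
    }
    // Simulated event stream: socket 2 first, then socket 1, then an id
    // that nobody is waiting on.
    for event_id in [2, 1, 7] {
        reactor.dispatch(event_id);
    }
    let order = woken.lock().unwrap().clone();
    order
}
```

dispatch_demo() returns ["task reading socket 2", "task reading socket 1"]; event 7 has no registered waker and is ignored. In a real executor, each wake would put the task back on the ready queue, as in section 2.3.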
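Summary
Rust's async programming model combines the Future trait with poll and wake to schedule and execute tasks efficiently. Executors let many asynchronous tasks run concurrently on a small number of threads, because a pending task hands its thread back instead of blocking it, which greatly improves responsiveness. Combined with system-level IO primitives, Rust keeps IO (networking in particular) efficient and non-blocking, which makes it a strong choice for building high-performance concurrent systems.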