A Deep Dive into Rust Async Programming: Future, Waker, and Executors Under the Hood

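Rust provides powerful asynchronous programming support that handles large numbers of concurrent tasks efficiently. Future is one of the core concepts in Rust's async model: it represents an asynchronous operation that may not have completed yet, and it provides a way to wait for that operation to finish. This article looks closely at how Future works, focusing on how poll and wake drive an asynchronous task forward and thereby make task scheduling and efficient execution possible.

This article introduces the basic concepts of the Future trait in Rust, its execution mechanism, and its role in asynchronous programming. By analyzing how poll and wake are implemented, it explains how task scheduling stays efficient and non-blocking. It also shows how to build a simple executor that handles multiple concurrent asynchronous tasks, and how integrating with system IO enables asynchronous network operations with better performance and scalability.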

Rust async programming

Under the hood: executing Futures and tasks

2.1 Future trait

Future trait

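  • The Future trait is the core of async programming in Rust
  • A Future is an asynchronous computation that can produce a value
  • A type that implements Future represents a value that may not be available yet

Here is a simplified version of the Future trait: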

trait SimpleFuture {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending,
}
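  • A Future can represent:

  • the arrival of the next network packet

  • the next mouse movement

  • or simply a point in time after some duration has elapsed

  • A Future represents an operation whose completion you can check

  • A Future makes progress when its poll function is called
  • The poll function drives the Future as close to completion as possible
  • If the Future has completed, poll returns Poll::Ready(result), where result is the final value
  • If the Future cannot complete yet, poll returns Poll::Pending and arranges for the waker's wake() function to be called once the Future is ready to make more progress
  • The only thing you can do with a Future is to keep knocking on it with poll until a value falls out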
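
The "keep knocking until a value falls out" idea can be sketched with the simplified trait above. The `Countdown` future and the `drive` helper below are hypothetical names made up for this illustration; `drive` simply polls in a loop, which is the naive strategy that wake() is meant to improve on.

```rust
enum Poll<T> {
    Ready(T),
    Pending,
}

trait SimpleFuture {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

/// Hypothetical future: completes after being polled `remaining` more times.
struct Countdown {
    remaining: u32,
}

impl SimpleFuture for Countdown {
    type Output = &'static str;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.remaining == 0 {
            Poll::Ready("liftoff")
        } else {
            self.remaining -= 1;
            // More progress is possible right away, so ask to be polled again.
            wake();
            Poll::Pending
        }
    }
}

/// A wake callback that does nothing; `drive` polls unconditionally anyway.
fn noop_wake() {}

/// Polls `fut` until it is ready; returns the value and the number of polls.
fn drive<F: SimpleFuture>(mut fut: F) -> (F::Output, u32) {
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(value) = fut.poll(noop_wake) {
            return (value, polls);
        }
    }
}

fn main() {
    let (value, polls) = drive(Countdown { remaining: 3 });
    println!("{value} after {polls} polls");
}
```

Three polls return Pending and the fourth returns Ready, so the value only "falls out" once the future has had the chance to do all of its work.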

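The wake() function

  • When wake() is called:
  • The executor polls the Future again so that it can make more progress
  • Without wake(), the executor would have no way of knowing when a particular Future can make progress, and would have to poll constantly
  • With wake(), the executor knows exactly which Futures are ready to be polled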
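
A minimal sketch of this notification flow, still using the simplified trait. Everything outside the trait is an assumption made for the illustration: the statics stand in for an event source and for the executor's "ready" flag, and `deliver_data` simulates the outside world. The point is that the loop only polls after `notify_executor` has been called.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

enum Poll<T> {
    Ready(T),
    Pending,
}

trait SimpleFuture {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

// Hypothetical event source: a "data arrived" flag plus the registered callback.
static DATA_ARRIVED: AtomicBool = AtomicBool::new(false);
static CALLBACK: Mutex<Option<fn()>> = Mutex::new(None);
// Set by the wake callback; tells the loop that polling is worthwhile.
static SHOULD_POLL: AtomicBool = AtomicBool::new(false);

/// The wake callback handed to the future.
fn notify_executor() {
    SHOULD_POLL.store(true, Ordering::SeqCst);
}

struct WaitForData;

impl SimpleFuture for WaitForData {
    type Output = ();

    fn poll(&mut self, wake: fn()) -> Poll<()> {
        if DATA_ARRIVED.load(Ordering::SeqCst) {
            Poll::Ready(())
        } else {
            // Not ready: register the callback and let the caller do other work.
            *CALLBACK.lock().unwrap() = Some(wake);
            Poll::Pending
        }
    }
}

/// Simulates the outside world: data arrives and the registered callback runs.
fn deliver_data() {
    DATA_ARRIVED.store(true, Ordering::SeqCst);
    let callback = CALLBACK.lock().unwrap().take();
    if let Some(callback) = callback {
        callback();
    }
}

/// Runs the future and returns how many times it was polled.
fn run() -> u32 {
    DATA_ARRIVED.store(false, Ordering::SeqCst);
    SHOULD_POLL.store(true, Ordering::SeqCst); // always poll once to start
    let mut fut = WaitForData;
    let mut polls = 0;
    loop {
        if SHOULD_POLL.swap(false, Ordering::SeqCst) {
            polls += 1;
            if let Poll::Ready(()) = fut.poll(notify_executor) {
                return polls;
            }
        } else {
            // Nothing is ready. A real executor would sleep here;
            // this sketch simulates the event arriving instead.
            deliver_data();
        }
    }
}

fn main() {
    println!("polled {} times", run());
}
```

The future is polled exactly twice: once to start it, and once after the wake callback reports that progress is possible.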

Example

pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // The socket has data -- read it into a buffer and return it.
            Poll::Ready(self.socket.read_buf())
        } else {
            // The socket does not yet have data.
            //
            // Arrange for `wake` to be called once data is available.
            // When data becomes available, `wake` will be called, and the
            // user of this `Future` will know to call `poll` again and
            // receive data.
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}

Example

  • Multiple asynchronous operations can be composed without intermediate allocations
  • Running several Futures concurrently or chaining them one after another can be implemented as allocation-free state machines
/// A SimpleFuture that runs two other futures to completion concurrently.
///
/// Concurrency is achieved via the fact that calls to `poll` each future
/// may be interleaved, allowing each future to advance itself at its own pace.
pub struct Join<FutureA, FutureB> {
    // Each field may contain a future that should be run to completion.
    // If the future has already completed, the field is set to `None`.
    // This prevents us from polling a future after it has completed, which
    // would violate the contract of the `Future` trait.
    a: Option<FutureA>,
    b: Option<FutureB>,
}

impl<FutureA, FutureB> SimpleFuture for Join<FutureA, FutureB>
where
    FutureA: SimpleFuture<Output = ()>,
    FutureB: SimpleFuture<Output = ()>,
{
    type Output = ();
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        // Attempt to complete future `a`.
        if let Some(a) = &mut self.a {
            if let Poll::Ready(()) = a.poll(wake) {
                self.a.take();
            }
        }

        // Attempt to complete future `b`.
        if let Some(b) = &mut self.b {
            if let Poll::Ready(()) = b.poll(wake) {
                self.b.take();
            }
        }

        if self.a.is_none() && self.b.is_none() {
            // Both futures have completed -- we can return successfully
            Poll::Ready(())
        } else {
            // One or both futures returned `Poll::Pending` and still have
            // work to do. They will call `wake()` when progress can be made.
            Poll::Pending
        }
    }
}
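
To see the interleaving, here is a small self-contained run of the same Join logic. The `Step` future and the shared log are hypothetical helpers added only for this sketch: each `Step` records its name every time it is polled and finishes after a fixed number of polls.

```rust
use std::cell::RefCell;
use std::rc::Rc;

enum Poll<T> {
    Ready(T),
    Pending,
}

trait SimpleFuture {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

struct Join<A, B> {
    a: Option<A>,
    b: Option<B>,
}

impl<A, B> SimpleFuture for Join<A, B>
where
    A: SimpleFuture<Output = ()>,
    B: SimpleFuture<Output = ()>,
{
    type Output = ();
    fn poll(&mut self, wake: fn()) -> Poll<()> {
        if let Some(a) = &mut self.a {
            if let Poll::Ready(()) = a.poll(wake) {
                self.a.take();
            }
        }
        if let Some(b) = &mut self.b {
            if let Poll::Ready(()) = b.poll(wake) {
                self.b.take();
            }
        }
        if self.a.is_none() && self.b.is_none() {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

/// Hypothetical future: logs its name on every poll and returns
/// `Pending` for the first `polls_left` polls.
struct Step {
    name: &'static str,
    polls_left: u32,
    log: Rc<RefCell<Vec<&'static str>>>,
}

impl SimpleFuture for Step {
    type Output = ();
    fn poll(&mut self, _wake: fn()) -> Poll<()> {
        self.log.borrow_mut().push(self.name);
        if self.polls_left == 0 {
            Poll::Ready(())
        } else {
            self.polls_left -= 1;
            Poll::Pending
        }
    }
}

fn noop() {}

/// Drives a `Join` of two `Step`s and returns the order in which they were polled.
fn run_join() -> Vec<&'static str> {
    let log: Rc<RefCell<Vec<&'static str>>> = Rc::new(RefCell::new(Vec::new()));
    let mut join = Join {
        a: Some(Step { name: "a", polls_left: 1, log: log.clone() }),
        b: Some(Step { name: "b", polls_left: 2, log: log.clone() }),
    };
    while let Poll::Pending = join.poll(noop) {}
    let order = log.borrow().to_vec();
    order
}

fn main() {
    println!("{:?}", run_join());
}
```

The log comes out as a, b, a, b, b: both futures advance in turn, and once `a` has finished it is never polled again.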

Example

Multiple Futures can also be run in sequence, one after another

/// A SimpleFuture that runs two futures to completion, one after another.
//
// Note: for the purposes of this simple example, `AndThenFut` assumes both
// the first and second futures are available at creation-time. The real
// `AndThen` combinator allows creating the second future based on the output
// of the first future, like `get_breakfast.and_then(|food| eat(food))`.
pub struct AndThenFut<FutureA, FutureB> {
    first: Option<FutureA>,
    second: FutureB,
}

impl<FutureA, FutureB> SimpleFuture for AndThenFut<FutureA, FutureB>
where
    FutureA: SimpleFuture<Output = ()>,
    FutureB: SimpleFuture<Output = ()>,
{
    type Output = ();
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if let Some(first) = &mut self.first {
            match first.poll(wake) {
                // We've completed the first future -- remove it and start on
                // the second!
                Poll::Ready(()) => self.first.take(),
                // We couldn't yet complete the first future.
                Poll::Pending => return Poll::Pending,
            };
        }
        // Now that the first future is done, attempt to complete the second.
        self.second.poll(wake)
    }
}

The real Future trait

trait Future {
    type Output;
    fn poll(
        // Note the change from `&mut self` to `Pin<&mut Self>`:
        self: Pin<&mut Self>,
        // and the change from `wake: fn()` to `cx: &mut Context<'_>`:
        cx: &mut Context<'_>,
    ) -> Poll<Self::Output>;
}
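  • The type of self is no longer &mut self but Pin<&mut Self>
  • This lets us create futures that are immovable
  • Immovable objects can store pointers between their own fields
  • Pin is required in order to enable async/await

  • wake: fn() has become cx: &mut Context<'_>

  • In SimpleFuture:
  • We called a function pointer (fn()) to tell the executor that the Future in question should be polled
  • Because fn() is just a function pointer, it cannot store any data about which Future called wake
  • Context provides access to a value of type Waker, which can be used to wake up a specific task
  • For example, a real-world web server may have thousands of open connections, and the wake-ups for each of them must be managed separately

In short, a wake-up in a real program has to carry data with it. Context provides a Waker value for exactly this purpose: waking one specific task.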
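
As a sketch of a wake-up that "carries data", the example below builds a Waker from a struct that remembers a task ID, using the standard library's `std::task::Wake` trait. The `TaskWaker` and `YieldOnce` types and the task IDs are assumptions made for illustration, not part of the article's code.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical per-task waker: remembers which task it belongs to.
struct TaskWaker {
    task_id: usize,
    woken: Arc<Mutex<Vec<usize>>>,
}

impl Wake for TaskWaker {
    fn wake(self: Arc<Self>) {
        // A real executor would re-queue the task; here we only record its ID.
        self.woken.lock().unwrap().push(self.task_id);
    }
}

/// Returns `Pending` on the first poll (after requesting a wake-up),
/// and `Ready` on every later poll.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Polls one future per task ID and returns the IDs that asked to be woken.
fn poll_tasks() -> Vec<usize> {
    let woken = Arc::new(Mutex::new(Vec::new()));
    for task_id in [7, 42] {
        let waker = Waker::from(Arc::new(TaskWaker {
            task_id,
            woken: woken.clone(),
        }));
        let mut cx = Context::from_waker(&waker);
        let mut fut = YieldOnce { yielded: false };
        assert!(Pin::new(&mut fut).poll(&mut cx).is_pending());
    }
    let ids = woken.lock().unwrap().clone();
    ids
}

fn main() {
    println!("tasks that asked to be woken: {:?}", poll_tasks());
}
```

Unlike a bare fn(), each Waker here knows exactly which task it should wake, which is what makes managing thousands of independent wake-ups feasible.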

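2.2 Waking tasks with Waker

The role of the Waker type

  • A Future usually cannot complete the first time it is polled, so it must make sure it gets polled again once it is ready to make more progress
  • Every time a Future is polled, it is polled as part of a task
  • A task is a top-level Future that has been submitted to an executor

The Waker type

  • Waker provides a wake() method that tells the executor the associated task should be woken up
  • When wake() is called, the executor knows that the task associated with the Waker is ready to make progress, and that its Future should be polled again
  • Waker implements clone(), so it can be copied and stored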
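
A short sketch of cloning and storing a Waker, assuming a hypothetical `CountingWaker` that only counts how often it is woken. It also calls `Waker::will_wake`, which the standard library documents as a best-effort check, so its result is printed rather than relied on.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

/// Hypothetical waker that counts wake-ups instead of scheduling a task.
struct CountingWaker {
    wakes: AtomicUsize,
}

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
}

/// Wakes through the original waker and a stored clone; returns the total count.
fn count_wakes() -> usize {
    let state = Arc::new(CountingWaker {
        wakes: AtomicUsize::new(0),
    });
    let waker = Waker::from(state.clone());

    // A future would typically keep a clone like this in its shared state.
    let stored = waker.clone();

    stored.wake_by_ref(); // wakes without consuming the waker
    stored.wake(); // consumes the clone
    waker.wake(); // consumes the original

    state.wakes.load(Ordering::SeqCst)
}

fn main() {
    let waker = Waker::from(Arc::new(CountingWaker {
        wakes: AtomicUsize::new(0),
    }));
    let stored = waker.clone();
    // Best-effort check of whether both wakers would wake the same task.
    println!("will_wake: {}", stored.will_wake(&waker));
    println!("wake-ups recorded: {}", count_wakes());
}
```

All three calls reach the same underlying `CountingWaker`, which is why a clone stored by another thread can wake the right task later.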

Example

  • Use Waker to implement a simple timer Future
  • Build the timer
~/rust via 🅒 base
➜ cargo new timer_future
     Created binary (application) `timer_future` package

~/rust via 🅒 base
➜ cd timer_future

timer_future on  master [?] via 🦀 1.67.1 via 🅒 base
➜ c

lib.rs

use std::{
    future::Future,
    pin::Pin,
    sync::{Arc, Mutex},
    task::{Context, Poll, Waker},
    thread,
    time::Duration,
};

pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

/// Shared state between the future and the waiting thread
struct SharedState {
    /// Whether or not the sleep time has elapsed
    completed: bool,

    /// The waker for the task that `TimerFuture` is running on.
    /// The thread can use this after setting `completed = true` to tell
    /// `TimerFuture`'s task to wake up, see that `completed = true`, and
    /// move forward.
    waker: Option<Waker>,
}

impl Future for TimerFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Look at the shared state to see if the timer has already completed.
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            Poll::Ready(())
        } else {
            // Set waker so that the thread can wake up the current task
            // when the timer has completed, ensuring that the future is polled
            // again and sees that `completed = true`.
            //
            // It's tempting to do this once rather than repeatedly cloning
            // the waker each time. However, the `TimerFuture` can move between
            // tasks on the executor, which could cause a stale waker pointing
            // to the wrong task, preventing `TimerFuture` from waking up
            // correctly.
            //
            // N.B. it's possible to check for this using the `Waker::will_wake`
            // function, but we omit that here to keep things simple.
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

impl TimerFuture {
    /// Create a new `TimerFuture` which will complete after the provided
    /// timeout.
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));

        // Spawn the new thread
        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            thread::sleep(duration); // Sleep for the requested duration; this is the timer
            let mut shared_state = thread_shared_state.lock().unwrap();
            // Signal that the timer has completed and wake up the last
            // task on which the future was polled, if one exists.
            shared_state.completed = true;
            if let Some(waker) = shared_state.waker.take() {
                waker.wake()
            }
        });

        TimerFuture { shared_state }
    }
}

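2.3 Building an executor

Future executors

  • Futures are lazy: they do nothing unless they are actively driven to completion
  • One way to drive a Future is to .await it inside an async function, but that only pushes the problem up one level
  • Who runs the Future returned by the top-level async function?
  • That is the job of a Future executor
  • An executor takes a set of top-level Futures and runs them to completion by calling poll whenever a Future can make progress
  • Typically an executor polls each Future once to get it started
  • When a Future signals that it is ready to make progress by calling wake(), it is put back on a queue and polled again; this repeats until the Future completes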
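
Before building a queue-based executor, it may help to see about the smallest executor possible: a `block_on` that runs a single top-level Future on the current thread. This is a sketch using only the standard library, along the lines of the `std::task::Wake` documentation; `ThreadWaker` and `YieldOnce` are names chosen for this illustration.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waker that unparks the thread running `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Runs one future to completion on the current thread.
fn block_on<F: Future>(fut: F) -> F::Output {
    // Pin the future on the heap so it can be polled.
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            // Sleep until some waker calls `unpark`, then poll again.
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical future that returns `Pending` once before completing.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

fn main() {
    let answer = block_on(async {
        YieldOnce(false).await;
        40 + 2
    });
    println!("answer = {answer}");
}
```

The loop is the whole executor: poll once to start, sleep while the future is pending, and poll again after a wake-up. The example that follows generalizes this to many tasks by replacing the parked thread with a queue.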

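Example

  • Build a simple executor that can run a large number of top-level Futures concurrently to completion
  • This uses the ArcWake trait from the futures crate:
  • It provides an easy way to construct a Waker

lib.rs (the same timer as before, with println! tracing added)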

use std::{
    future::Future,
    pin::Pin,
    sync::{Arc, Mutex},
    task::{Context, Poll, Waker},
    thread,
    time::Duration,
};

pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

/// Shared state between the future and the waiting thread
struct SharedState {
    /// Whether or not the sleep time has elapsed
    completed: bool,

    /// The waker for the task that `TimerFuture` is running on.
    /// The thread can use this after setting `completed = true` to tell
    /// `TimerFuture`'s task to wake up, see that `completed = true`, and
    /// move forward.
    waker: Option<Waker>,
}

impl Future for TimerFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        println!("[{:?}] Polling TimerFuture...", thread::current().id());
        // Look at the shared state to see if the timer has already completed.
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            println!("[{:?}] TimerFuture completed...", thread::current().id());
            Poll::Ready(())
        } else {
            // Set waker so that the thread can wake up the current task
            // when the timer has completed, ensuring that the future is polled
            // again and sees that `completed = true`.
            //
            // It's tempting to do this once rather than repeatedly cloning
            // the waker each time. However, the `TimerFuture` can move between
            // tasks on the executor, which could cause a stale waker pointing
            // to the wrong task, preventing `TimerFuture` from waking up
            // correctly.
            //
            // N.B. it's possible to check for this using the `Waker::will_wake`
            // function, but we omit that here to keep things simple.
            shared_state.waker = Some(cx.waker().clone());
            println!("[{:?}] TimerFuture pending...", thread::current().id());
            Poll::Pending
        }
    }
}

impl TimerFuture {
    /// Create a new `TimerFuture` which will complete after the provided
    /// timeout.
    pub fn new(duration: Duration) -> Self {
        println!("[{:?}] 开始创建新的 TimerFuture...", thread::current().id());
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));

        // Spawn the new thread
        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            println!(
                "[{:?}] TimerFuture 生成新的线程并开始睡眠...",
                thread::current().id()
            );
            thread::sleep(duration); // Sleep for the requested duration; this is the timer
            let mut shared_state = thread_shared_state.lock().unwrap();
            // Signal that the timer has completed and wake up the last
            // task on which the future was polled, if one exists.
            shared_state.completed = true;
            if let Some(waker) = shared_state.waker.take() {
                println!(
                    "[{:?}] TimerFuture 新线程获得 waker,并进行 wake()...",
                    thread::current().id()
                );
                waker.wake()
            } else {
                println!(
                    "[{:?}] TimerFuture 新线程没获得 waker...",
                    thread::current().id()
                );
            }
        });

        println!("[{:?}] 返回新的 TimerFuture...", thread::current().id());
        TimerFuture { shared_state }
    }
}

main.rs

use futures::{
    future::{BoxFuture, FutureExt},
    task::{waker_ref, ArcWake},
};
use std::{
    thread,
    future::Future,
    sync::mpsc::{sync_channel, Receiver, SyncSender},
    sync::{Arc, Mutex},
    task::Context,
    time::Duration,
};
// The timer we wrote in the previous section:
use timer_future::TimerFuture;

/// Task executor that receives tasks off of a channel and runs them.
struct Executor {
    ready_queue: Receiver<Arc<Task>>,
}

/// `Spawner` spawns new futures onto the task channel.
#[derive(Clone)]
struct Spawner {
    task_sender: SyncSender<Arc<Task>>,
}

/// A future that can reschedule itself to be polled by an `Executor`.
struct Task {
    /// In-progress future that should be pushed to completion.
    ///
    /// The `Mutex` is not necessary for correctness, since we only have
    /// one thread executing tasks at once. However, Rust isn't smart
    /// enough to know that `future` is only mutated from one thread,
    /// so we need to use the `Mutex` to prove thread-safety. A production
    /// executor would not need this, and could use `UnsafeCell` instead.
    future: Mutex<Option<BoxFuture<'static, ()>>>,

    /// Handle to place the task itself back onto the task queue.
    task_sender: SyncSender<Arc<Task>>,
}

fn new_executor_and_spawner() -> (Executor, Spawner) {
    // Maximum number of tasks to allow queueing in the channel at once.
    // This is just to make `sync_channel` happy, and wouldn't be present in
    // a real executor.
    const MAX_QUEUED_TASKS: usize = 10_000;
    let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
    println!("[{:?}] 生成 Executor 和 Spawner (含发送端、接收端)...", thread::current().id());
    (Executor { ready_queue }, Spawner { task_sender })
}

impl Spawner {
    // Wrap the future in a task and send it onto the channel
    fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
        let future = future.boxed();
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            task_sender: self.task_sender.clone(),
        });
        println!("[{:?}] 将 Future 组成 Task,放入 Channel...", thread::current().id());
        self.task_sender.send(task).expect("too many tasks queued");
    }
}

impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // Implement `wake` by sending this task back onto the task channel
        // so that it will be polled again by the executor.
        println!("[{:?}] wake_by_ref...", thread::current().id());
        let cloned = arc_self.clone();
        arc_self
            .task_sender
            .send(cloned)
            .expect("too many tasks queued");
    }
}

impl Executor {
    fn run(&self) {
        println!("[{:?}] Executor running...", thread::current().id());
        while let Ok(task) = self.ready_queue.recv() { // Keep receiving tasks from the channel
            println!("[{:?}] 接收到任务...", thread::current().id());
            // Take the future, and if it has not yet completed (is still Some),
            // poll it in an attempt to complete it.
            let mut future_slot = task.future.lock().unwrap();
            if let Some(mut future) = future_slot.take() {
                println!("[{:?}] 从任务中取得 Future...", thread::current().id());
                // Create a waker (a `WakerRef`) from the task itself
                let waker = waker_ref(&task);
                println!("[{:?}] 获得 waker by ref...", thread::current().id());
                let context = &mut Context::from_waker(&waker);
                // `BoxFuture<T>` is a type alias for
                // `Pin<Box<dyn Future<Output = T> + Send + 'static>>`.
                // We can get a `Pin<&mut dyn Future + Send + 'static>`
                // from it by calling the `Pin::as_mut` method.
                println!("[{:?}] 获得 context,准备进行 poll()...", thread::current().id());
                if future.as_mut().poll(context).is_pending() {
                    // We're not done processing the future, so put it
                    // back in its task to be run again in the future.
                    *future_slot = Some(future);
                    println!("[{:?}] Poll::Pending ====", thread::current().id());
                } else {
                    println!("[{:?}] Poll::Ready....", thread::current().id());
                }
            }
        }
        println!("[{:?}] Excutor run 结束", thread::current().id());
    }
}

fn main() {
    // Create an executor and a spawner
    let (executor, spawner) = new_executor_and_spawner();

    // Spawn a task to print before and after waiting on a timer.
    spawner.spawn(async {
        println!("[{:?}] howdy!", thread::current().id());
        // Wait for our timer future to complete after two seconds.
        TimerFuture::new(Duration::new(2, 0)).await;
        println!("[{:?}] done!", thread::current().id());
    });

    // Drop the spawner so that our executor knows it is finished and won't
    // receive more incoming tasks to run.
    println!("[{:?}] drop Spawner!", thread::current().id());
    drop(spawner);

    // Run the executor until the task queue is empty.
    // This will print "howdy!", pause, and then print "done!".
    executor.run();
}
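
The executor above runs a single task. To illustrate several top-level Futures making progress concurrently, here is a reduced sketch of the same queue-based design that depends only on the standard library: it swaps `ArcWake` for `std::task::Wake`, and the `YieldOnce` future, the log, and the task names are assumptions made for the illustration.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc::{sync_channel, SyncSender};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

type BoxedFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

struct Task {
    future: Mutex<Option<BoxedFuture>>,
    sender: SyncSender<Arc<Task>>,
}

impl Wake for Task {
    fn wake(self: Arc<Self>) {
        // Waking a task means putting it back on the ready queue.
        let sender = self.sender.clone();
        sender.send(self).expect("ready queue full or closed");
    }
}

/// Hypothetical future that gives other tasks one turn before completing.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Spawns two tasks, runs them to completion, and returns the order of events.
fn run_two_tasks() -> Vec<&'static str> {
    let (sender, ready_queue) = sync_channel::<Arc<Task>>(16);
    let log: Arc<Mutex<Vec<&'static str>>> = Arc::new(Mutex::new(Vec::new()));

    for (first, second) in [("a1", "a2"), ("b1", "b2")] {
        let log = log.clone();
        let future: BoxedFuture = Box::pin(async move {
            log.lock().unwrap().push(first);
            YieldOnce(false).await;
            log.lock().unwrap().push(second);
        });
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            sender: sender.clone(),
        });
        sender.send(task).expect("ready queue full");
    }
    // Once every task is finished and dropped, no senders remain and `recv` fails.
    drop(sender);

    while let Ok(task) = ready_queue.recv() {
        let mut slot = task.future.lock().unwrap();
        if let Some(mut future) = slot.take() {
            let waker = Waker::from(task.clone());
            let mut cx = Context::from_waker(&waker);
            if future.as_mut().poll(&mut cx).is_pending() {
                *slot = Some(future);
            }
        }
    }

    let events = log.lock().unwrap().clone();
    events
}

fn main() {
    println!("{:?}", run_two_tasks());
}
```

The recorded order is a1, b1, a2, b2: task a yields after its first step, task b gets its turn, and each task resumes when the queue comes back around to it, all on one thread.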

Run

timer_future on  master [?] is 📦 0.1.0 via 🦀 1.67.1 via 🅒 base took 2.9s 
➜ cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 0.06s
     Running `target/debug/timer_future`
[ThreadId(1)] 生成 Executor 和 Spawner (含发送端、接收端)...
[ThreadId(1)] 将 Future 组成 Task,放入 Channel...
[ThreadId(1)] drop Spawner!
[ThreadId(1)] Executor running...
[ThreadId(1)] 接收到任务...
[ThreadId(1)] 从任务中取得 Future...
[ThreadId(1)] 获得 waker by ref...
[ThreadId(1)] 获得 context,准备进行 poll()...
[ThreadId(1)] howdy!
[ThreadId(1)] 开始创建新的 TimerFuture...
[ThreadId(1)] 返回新的 TimerFuture...
[ThreadId(1)] Polling TimerFuture...
[ThreadId(1)] TimerFuture pending...
[ThreadId(1)] Poll::Pending ====
[ThreadId(2)] TimerFuture 生成新的线程并开始睡眠...
[ThreadId(2)] TimerFuture 新线程获得 waker,并进行 wake()...
[ThreadId(2)] wake_by_ref...
[ThreadId(1)] 接收到任务...
[ThreadId(1)] 从任务中取得 Future...
[ThreadId(1)] 获得 waker by ref...
[ThreadId(1)] 获得 context,准备进行 poll()...
[ThreadId(1)] Polling TimerFuture...
[ThreadId(1)] TimerFuture completed...
[ThreadId(1)] done!
[ThreadId(1)] Poll::Ready....
[ThreadId(1)] Excutor run 结束

timer_future on  master [?] is 📦 0.1.0 via 🦀 1.67.1 via 🅒 base took 2.6s 
➜ 

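2.4 Executors and system IO

A Future that performs an asynchronous read on a socket

  • This Future reads whatever data is available on the socket
  • If there is no data, it yields to the executor and asks for its task to be woken when the socket becomes readable again
  • In this example we do not know how the Socket type is implemented, and in particular how set_readable_callback works. So how do we arrange for wake() to be called when the socket becomes readable again?
  • One option is a thread that continually checks whether the socket is readable (inefficient!)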
pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // The socket has data -- read it into a buffer and return it.
            Poll::Ready(self.socket.read_buf())
        } else {
            // The socket does not yet have data.
            //
            // Arrange for `wake` to be called once data is available.
            // When data becomes available, `wake` will be called, and the
            // user of this `Future` will know to call `poll` again and
            // receive data.
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}
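  • In practice, this problem is solved by integrating with an IO-aware system blocking primitive
  • Examples are epoll on Linux, kqueue on FreeBSD and macOS, IOCP on Windows, and ports on Fuchsia
  • All of these are exposed through the cross-platform Rust crate mio
  • These primitives all allow a thread to block on multiple asynchronous IO events and return as soon as one of them completes.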
struct IoBlocker {
    /* ... */
}

struct Event {
    // An ID uniquely identifying the event that occurred and was listened for.
    id: usize,

    // A set of signals to wait for, or which occurred.
    signals: Signals,
}

impl IoBlocker {
    /// Create a new collection of asynchronous IO events to block on.
    fn new() -> Self { /* ... */ }

    /// Express an interest in a particular IO event.
    fn add_io_event_interest(
        &self,

        // The object on which the event will occur
        io_object: &IoObject,

        // A set of signals that may appear on the `io_object` for
        // which an event should be triggered, paired with
        // an ID to give to events that result from this interest.
        event: Event,
    ) { /* ... */ }

    /// Block until one of the events occurs.
    fn block(&self) -> Event { /* ... */ }
}

let mut io_blocker = IoBlocker::new();
io_blocker.add_io_event_interest(
    &socket_1,
    Event { id: 1, signals: READABLE },
);
io_blocker.add_io_event_interest(
    &socket_2,
    Event { id: 2, signals: READABLE | WRITABLE },
);
let event = io_blocker.block();

// prints e.g. "Socket 1 is now READABLE" if socket one became readable.
println!("Socket {:?} is now {:?}", event.id, event.signals);

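Pseudocode for Socket::set_readable_callback

  • A Future executor can use these primitives to provide asynchronous IO objects, such as sockets, that can be configured with a callback to run when a particular IO event occurs
  • For our SocketRead example, the pseudocode for Socket::set_readable_callback looks roughly like this: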
impl Socket {
    fn set_readable_callback(&self, waker: Waker) {
        // `local_executor` is a reference to the local executor.
        // this could be provided at creation of the socket, but in practice
        // many executor implementations pass it down through thread local
        // storage for convenience.
        let local_executor = self.local_executor;

        // Unique ID for this IO object.
        let id = self.id;

        // Store the local waker in the executor's map so that it can be called
        // once the IO event arrives.
        local_executor.event_map.insert(id, waker);
        local_executor.add_io_event_interest(
            &self.socket_file_descriptor,
            Event { id, signals: READABLE },
        );
    }
}

We now need just one executor thread. It receives IO events and dispatches each one to the appropriate Waker, which wakes the corresponding task; the executor then polls that task and drives it forward before going back to check for more IO events. This cycle repeats until the socket is closed.
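
The dispatch step can be sketched without any real OS primitive. In the example below an mpsc channel stands in for the blocking call (the role `IoBlocker::block` plays in the pseudocode), and `Reactor`, `Event`, and `RecordingWaker` are hypothetical types for illustration only. A real implementation would block on epoll, kqueue, or IOCP, typically through mio.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex};
use std::task::{Wake, Waker};

/// Stand-in for an OS readiness event.
struct Event {
    id: usize,
}

/// Maps IO object IDs to the wakers of the tasks waiting on them.
struct Reactor {
    event_map: HashMap<usize, Waker>,
    // Simulated event source; a real reactor would block on the OS instead.
    events: Receiver<Event>,
}

impl Reactor {
    /// Blocks for events until the source closes, waking the matching task for each.
    fn run(&mut self) {
        while let Ok(event) = self.events.recv() {
            // Events nobody registered interest in are ignored.
            if let Some(waker) = self.event_map.remove(&event.id) {
                waker.wake();
            }
        }
    }
}

/// Hypothetical waker that records which task was woken.
struct RecordingWaker {
    task_id: usize,
    woken: Arc<Mutex<Vec<usize>>>,
}

impl Wake for RecordingWaker {
    fn wake(self: Arc<Self>) {
        self.woken.lock().unwrap().push(self.task_id);
    }
}

/// Registers two wakers, delivers three events, and returns the wake order.
fn simulate() -> Vec<usize> {
    let (event_source, events) = channel();
    let woken = Arc::new(Mutex::new(Vec::new()));
    let mut reactor = Reactor {
        event_map: HashMap::new(),
        events,
    };

    for id in [1, 2] {
        let waker = Waker::from(Arc::new(RecordingWaker {
            task_id: id,
            woken: woken.clone(),
        }));
        reactor.event_map.insert(id, waker);
    }

    // Socket 2 becomes readable first, then an unknown ID, then socket 1.
    for id in [2, 3, 1] {
        event_source.send(Event { id }).unwrap();
    }
    drop(event_source);

    reactor.run();
    let ids = woken.lock().unwrap().clone();
    ids
}

fn main() {
    println!("woken in order: {:?}", simulate());
}
```

Only the tasks whose IDs were registered are woken, in the order their events arrived, which is the "dispatch to the appropriate Waker" step described above.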

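Summary

Rust's asynchronous programming model achieves efficient task scheduling and execution through the interplay of the Future trait with poll and wake. The executor design lets many asynchronous tasks run concurrently without blocking the thread while they wait, which keeps programs responsive. Combined with system-level IO primitives, Rust stays non-blocking when handling network and other IO, which makes it a strong choice for building high-performance concurrent systems.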
