Rust 异步编程全解析:async/.await 实践与原理

Rust 异步编程全解析:async/.await 实践与原理

异步编程是现代软件开发中的重要工具,尤其在高并发和 I/O 密集型场景中,Rust 提供了独特的 async/.await 语法,结合其高性能和内存安全特性,成为异步编程领域的亮点。本文将深入讲解 Rust 中的异步编程模型,探讨 async/.await 的基本用法、实现细节及其工作原理,并通过实际代码示例帮助你掌握这一强大的工具。

本文详细介绍了 Rust 中的异步编程语法 async/.await,包括两种使用方式(异步函数和异步块)、运行原理(惰性 Future 和执行器的协作)、生命周期管理以及 async move 的作用。此外,还探讨了跨线程的 Future 执行以及多线程场景中变量的安全性问题。通过丰富的代码示例,本文将帮助开发者从基础到进阶,全面掌握 Rust 异步编程的核心知识。

async & .await

什么是 async/.await

  • async/.await 是 Rust 的特殊语法,在发生阻塞时,它让放弃当前线程的控制权成为可能,这就允许在等待操作完成的时候,允许其它代码取得进展

使用 async 的两种方式

有两种方式可以使用asyncasync fn用于声明函数,async { ... }用于声明语句块,它们会返回一个实现 Future 特征的值:

  • async 和 async blocks:
  • 都返回实现了 Future trait 的值
  • async 体和其它 future都是惰性的:
  • 在真正运行之前什么都不做
  • 使用 .await是最常见的运行future 的方式:
  • 对 future 使用 .await就会尝试驱动Future运行至完成
  • 如果 Future 被阻塞:
  • 它会放弃当前线程的控制权
  • 当可取得更多进展时,执行器会捡起这个 Future 并恢复执行,最终由 .await 完成解析
// `foo()` returns a type that implements `Future<Output = u8>`.
// `foo().await` will result in a value of type `u8`.
// `foo()`返回一个`Future<Output = u8>`,
// 当调用`foo().await`时,该`Future`将被运行,当调用结束后我们将获取到一个`u8`值
async fn foo() -> u8 { 5 }

fn bar() -> impl Future<Output = u8> {
    // This `async` block results in a type that implements
    // `Future<Output = u8>`.
    // 下面的`async`语句块返回`Future<Output = u8>`
    async {
        let x: u8 = foo().await;
        x + 5
    }
}

例子

use async_std::io::prelude::*;
use async_std::net;
use async_std::task;

// 异步函数以 async fn 开始  里面由3个异步函数 .await
// 无需调整 async fn 的返回类型,Rust 自动把它当成相应的 Future 类型
// 返回的 Future 包含所需相关信息:参数、本地变量空间...
// Future 的具体类型由编译器基于函数体和参数自动生成
// 该类型没有名称
// 它实现了 Future<Output=R>
// 第一次对 cheapo_request 进行 poll 时:
// 从函数体顶部开始执行
// 直到第一个 await(针对 TcpStream::connect 返回的 Future)
// 随着 cheapo_request 的 Future 不断被 poll,其执行就是从一个 await 到下一个 await,而且只有子 Future变成 Ready 之后才继续
// cheapo_reauest 的 Future 会追踪:
// 下一次 poll 应恢复继续的那个店
// 以及所需的本地状态(变量、参数、临时变量等)
// 这种途中能暂停执行,然后恢复执行的能力是 async 所独有的
// 由于 await 表达式依赖于“可恢复执行”这个特性,所以 await 只能用在 async 里
// 暂停执行时线程在做什么?
// 它不是在干等,而是在做其它工作。
async fn cheapo_request(host: &str, port: u16, path: &str) -> std::io::Result<String> {
  // .await 会等待,直到 future 变成 ready, await 最终会解析出 future 的值
  // connect 当调用 async 函数时,在其函数体执行前,它就会立即返回
  // 这个 await 表达式会对 connect 的Future进行 poll:
  // 如果没完成 -> 返回 Pending
  // 针对 cheapo_request 的 poll 也无法继续,
  // 直到 connect 的 Future 返回 Ready
  let mut socket = net::TcpStream::connect((host, port)).await?;
  let request = format!("GET {} HTTP/1.1\r\nHost: {}\r\n\r\n", path, host);

  socket.write_all(request.as_bytes()).await?;
  socket.shutdown(net::Shutdown::Write)?;
  let mut response = String::new();
  socket.read_to_string(&mut response).await?;
  Ok(response)
}

fn main() -> std::io::Result<()> {
  // 注意:
  // 下一次对 cheapo_request 的 Future 进行 poll 时:
  // 并不在函数体顶部开始执行
  // 它会在 connect Future 进行 poll 的地方继续执行
  // 直到它变成 Ready,才会继续在函数体往下走
  let response = task::block_on(cheapo_request("example.com", 80, "/"))?;
  println!("{}", response);
  Ok(())
}
  • await:
  • 获得 Future 的所有权,并对其进行 poll
  • 如果 Future Ready,其最终值就是 await 表达式的值,这时执行就可以继续了
  • 否则就返回 Pending 给调用者

async 的生命周期

  • 与传统函数不同:async fn,如果它的参数是引用或是其它非 'static 的,那么它返回的 Future 就会绑定到参数的生命周期上。
  • 这意味着 async fn 返回的 future,在 .await 的同时,fn 的非 'static 的参数必须保持有效
// This function:
async fn foo(x: &u8) -> u8 { *x }

// Is equivalent to this function:
// 上面的函数跟下面的函数是等价的:
fn foo_expanded<'a>(x: &'a u8) -> impl Future<Output = u8> + 'a {
    async move { *x }
}

存储 future 或传递 future

  • 通常,async 的函数在调用后会立即 .await,这就不是问题:
  • 例如:foo(&x).await
  • 如果存储 future 或将其传递给其它任务或线程,就有问题了...
  • 一种变通解决办法:
  • 思路:把使用引用作为参数的 async fn 转为一个 'static future
  • 做法:在 async 块里,将参数和 async fn 的 调用捆绑到一起(延长参数的生命周期来匹配 future)
fn bad() -> impl Future<Output = u8> {
    let x = 5;
    borrow_x(&x) // ERROR: `x` does not live long enough
}

// 将参数和对 async fn 的调用放在同一个 async 语句块
fn good() -> impl Future<Output = u8> {
    async {
        let x = 5;
        borrow_x(&x).await
    }
}

以上代码会报错,因为 x 的生命周期只到 bad 函数的结尾。 但是 Future 显然会活得更久

通过将参数移动到 async 语句块内, 将它的生命周期扩展到 'static, 并跟返回的 Future 保持了一致。

async move

  • async 块和闭包都支持 move
  • async move 块会获得其引用变量的所有权:
  • 允许其比当前所在的作用域活得长
  • 但同时也放弃了与其它代码共享这些变量的能力
/// `async` block:
///
/// Multiple different `async` blocks can access the same local variable
/// so long as they're executed within the variable's scope
// 多个不同的 `async` 语句块可以访问同一个本地变量,只要它们在该变量的作用域内执行
async fn blocks() {
    let my_string = "foo".to_string();

    let future_one = async {
        // ...
        println!("{my_string}");
    };

    let future_two = async {
        // ...
        println!("{my_string}");
    };

    // Run both futures to completion, printing "foo" twice:
    // 运行两个 Future 直到完成
    let ((), ()) = futures::join!(future_one, future_two);
}

/// `async move` block:
///
/// Only one `async move` block can access the same captured variable, since
/// captures are moved into the `Future` generated by the `async move` block.
/// However, this allows the `Future` to outlive the original scope of the
/// variable:
// 由于`async move`会捕获环境中的变量,因此只有一个`async move`语句块可以访问该变量,
// 但是它也有非常明显的好处: 变量可以转移到返回的 Future 中,不再受借用生命周期的限制
fn move_block() -> impl Future<Output = ()> {
    let my_string = "foo".to_string();
    async move {
        // ...
        println!("{my_string}");
    }
}

在多线程执行者上进行 .await

  • 当使用多线程 future 执行者时,future 就可以在线程间移动:
  • 所以 async 体里面用的变量必须能够在线程间移动
  • 因为任何的 .await 都可能导致切换到一个新线程
  • 这意味着使用以下类型时不安全的:
  • Rc、&RefCell 和任何其它没有实现 Send trait 的类型,包括没实现 Sync trait 的引用
  • 注意:调用 .await 时,只要这些类型不在作用域内,就可以使用它们。
  • 在跨域一个 .await 期间,持有传统的、对 future 无感知的锁,也不是好主意:
  • 可导致线程池锁定
  • 为此,可使用 futures::lock 里的 Mutex 而不是 std::sync 里的

总结

Rust 的 async/.await 提供了一种高效而安全的异步编程方式。通过对 Future 的惰性执行、生命周期管理和多线程支持的深入理解,开发者可以更好地应对高并发编程的挑战。本文不仅讲解了基础知识,还通过实际代码演示了如何将这些概念应用到真实项目中。通过掌握这些内容,你将能够在 Rust 的异步生态中如鱼得水。

全部评论(0)