Rust 异步编程全解析:async/.await 实践与原理
异步编程是现代软件开发中的重要工具,尤其在高并发和 I/O 密集型场景中,Rust 提供了独特的 async/.await 语法,结合其高性能和内存安全特性,成为异步编程领域的亮点。本文将深入讲解 Rust 中的异步编程模型,探讨 async/.await 的基本用法、实现细节及其工作原理,并通过实际代码示例帮助你掌握这一强大的工具。
本文详细介绍了 Rust 中的异步编程语法 async/.await,包括两种使用方式(异步函数和异步块)、运行原理(惰性 Future 和执行器的协作)、生命周期管理以及 async move 的作用。此外,还探讨了跨线程的 Future 执行以及多线程场景中变量的安全性问题。通过丰富的代码示例,本文将帮助开发者从基础到进阶,全面掌握 Rust 异步编程的核心知识。
async & .await
什么是 async/.await
- async/.await 是 Rust 的特殊语法,在发生阻塞时,它让放弃当前线程的控制权成为可能,这就允许在等待操作完成的时候,允许其它代码取得进展
使用 async 的两种方式
有两种方式可以使用async
: async fn
用于声明函数,async { ... }
用于声明语句块,它们会返回一个实现 Future
特征的值:
- async 和 async blocks:
- 都返回实现了 Future trait 的值
- async 体和其它 future都是惰性的:
- 在真正运行之前什么都不做
- 使用
.await
是最常见的运行future 的方式: - 对 future 使用
.await
就会尝试驱动Future运行至完成 - 如果 Future 被阻塞:
- 它会放弃当前线程的控制权
- 当可取得更多进展时,执行器会捡起这个 Future 并恢复执行,最终由
.await
完成解析
// `foo()` returns a type that implements `Future<Output = u8>`.
// `foo().await` will result in a value of type `u8`.
// `foo()`返回一个`Future<Output = u8>`,
// 当调用`foo().await`时,该`Future`将被运行,当调用结束后我们将获取到一个`u8`值
async fn foo() -> u8 { 5 }
fn bar() -> impl Future<Output = u8> {
// This `async` block results in a type that implements
// `Future<Output = u8>`.
// 下面的`async`语句块返回`Future<Output = u8>`
async {
let x: u8 = foo().await;
x + 5
}
}
例子
use async_std::io::prelude::*;
use async_std::net;
use async_std::task;
// 异步函数以 async fn 开始 里面由3个异步函数 .await
// 无需调整 async fn 的返回类型,Rust 自动把它当成相应的 Future 类型
// 返回的 Future 包含所需相关信息:参数、本地变量空间...
// Future 的具体类型由编译器基于函数体和参数自动生成
// 该类型没有名称
// 它实现了 Future<Output=R>
// 第一次对 cheapo_request 进行 poll 时:
// 从函数体顶部开始执行
// 直到第一个 await(针对 TcpStream::connect 返回的 Future)
// 随着 cheapo_request 的 Future 不断被 poll,其执行就是从一个 await 到下一个 await,而且只有子 Future变成 Ready 之后才继续
// cheapo_reauest 的 Future 会追踪:
// 下一次 poll 应恢复继续的那个店
// 以及所需的本地状态(变量、参数、临时变量等)
// 这种途中能暂停执行,然后恢复执行的能力是 async 所独有的
// 由于 await 表达式依赖于“可恢复执行”这个特性,所以 await 只能用在 async 里
// 暂停执行时线程在做什么?
// 它不是在干等,而是在做其它工作。
async fn cheapo_request(host: &str, port: u16, path: &str) -> std::io::Result<String> {
// .await 会等待,直到 future 变成 ready, await 最终会解析出 future 的值
// connect 当调用 async 函数时,在其函数体执行前,它就会立即返回
// 这个 await 表达式会对 connect 的Future进行 poll:
// 如果没完成 -> 返回 Pending
// 针对 cheapo_request 的 poll 也无法继续,
// 直到 connect 的 Future 返回 Ready
let mut socket = net::TcpStream::connect((host, port)).await?;
let request = format!("GET {} HTTP/1.1\r\nHost: {}\r\n\r\n", path, host);
socket.write_all(request.as_bytes()).await?;
socket.shutdown(net::Shutdown::Write)?;
let mut response = String::new();
socket.read_to_string(&mut response).await?;
Ok(response)
}
fn main() -> std::io::Result<()> {
// 注意:
// 下一次对 cheapo_request 的 Future 进行 poll 时:
// 并不在函数体顶部开始执行
// 它会在 connect Future 进行 poll 的地方继续执行
// 直到它变成 Ready,才会继续在函数体往下走
let response = task::block_on(cheapo_request("example.com", 80, "/"))?;
println!("{}", response);
Ok(())
}
- await:
- 获得 Future 的所有权,并对其进行 poll
- 如果 Future Ready,其最终值就是 await 表达式的值,这时执行就可以继续了
- 否则就返回 Pending 给调用者
async 的生命周期
- 与传统函数不同:async fn,如果它的参数是引用或是其它非 'static 的,那么它返回的 Future 就会绑定到参数的生命周期上。
- 这意味着 async fn 返回的 future,在 .await 的同时,fn 的非 'static 的参数必须保持有效
// This function:
async fn foo(x: &u8) -> u8 { *x }
// Is equivalent to this function:
// 上面的函数跟下面的函数是等价的:
fn foo_expanded<'a>(x: &'a u8) -> impl Future<Output = u8> + 'a {
async move { *x }
}
存储 future 或传递 future
- 通常,async 的函数在调用后会立即 .await,这就不是问题:
- 例如:
foo(&x).await
- 如果存储 future 或将其传递给其它任务或线程,就有问题了...
- 一种变通解决办法:
- 思路:把使用引用作为参数的 async fn 转为一个 'static future
- 做法:在 async 块里,将参数和 async fn 的 调用捆绑到一起(延长参数的生命周期来匹配 future)
fn bad() -> impl Future<Output = u8> {
let x = 5;
borrow_x(&x) // ERROR: `x` does not live long enough
}
// 将参数和对 async fn 的调用放在同一个 async 语句块
fn good() -> impl Future<Output = u8> {
async {
let x = 5;
borrow_x(&x).await
}
}
以上代码会报错,因为 x
的生命周期只到 bad
函数的结尾。 但是 Future
显然会活得更久
通过将参数移动到 async
语句块内, 将它的生命周期扩展到 'static
, 并跟返回的 Future
保持了一致。
async move
- async 块和闭包都支持 move
- async move 块会获得其引用变量的所有权:
- 允许其比当前所在的作用域活得长
- 但同时也放弃了与其它代码共享这些变量的能力
/// `async` block:
///
/// Multiple different `async` blocks can access the same local variable
/// so long as they're executed within the variable's scope
// 多个不同的 `async` 语句块可以访问同一个本地变量,只要它们在该变量的作用域内执行
async fn blocks() {
let my_string = "foo".to_string();
let future_one = async {
// ...
println!("{my_string}");
};
let future_two = async {
// ...
println!("{my_string}");
};
// Run both futures to completion, printing "foo" twice:
// 运行两个 Future 直到完成
let ((), ()) = futures::join!(future_one, future_two);
}
/// `async move` block:
///
/// Only one `async move` block can access the same captured variable, since
/// captures are moved into the `Future` generated by the `async move` block.
/// However, this allows the `Future` to outlive the original scope of the
/// variable:
// 由于`async move`会捕获环境中的变量,因此只有一个`async move`语句块可以访问该变量,
// 但是它也有非常明显的好处: 变量可以转移到返回的 Future 中,不再受借用生命周期的限制
fn move_block() -> impl Future<Output = ()> {
let my_string = "foo".to_string();
async move {
// ...
println!("{my_string}");
}
}
在多线程执行者上进行 .await
- 当使用多线程 future 执行者时,future 就可以在线程间移动:
- 所以 async 体里面用的变量必须能够在线程间移动
- 因为任何的 .await 都可能导致切换到一个新线程
- 这意味着使用以下类型时不安全的:
- Rc、&RefCell 和任何其它没有实现 Send trait 的类型,包括没实现 Sync trait 的引用
- 注意:调用 .await 时,只要这些类型不在作用域内,就可以使用它们。
- 在跨域一个 .await 期间,持有传统的、对 future 无感知的锁,也不是好主意:
- 可导致线程池锁定
- 为此,可使用 futures::lock 里的 Mutex 而不是 std::sync 里的
总结
Rust 的 async/.await 提供了一种高效而安全的异步编程方式。通过对 Future 的惰性执行、生命周期管理和多线程支持的深入理解,开发者可以更好地应对高并发编程的挑战。本文不仅讲解了基础知识,还通过实际代码演示了如何将这些概念应用到真实项目中。通过掌握这些内容,你将能够在 Rust 的异步生态中如鱼得水。