Rust Stream Trait 的异步迭代与并发处理实践
Rust 的 Stream trait 是异步编程中的核心概念之一,它与 Future trait 类似,但具有能够生成多个值的能力,类似于同步中的 Iterator。在本篇文章中,我们将深入探讨 Rust 中的 Stream trait,了解如何利用它进行高效的并发处理和迭代操作,并通过示例代码展示如何在异步编程中高效地处理流数据。
本文介绍了 Rust 的 Stream trait,阐述了其在异步编程中的使用方式。与 Future trait 类似,Stream 允许我们处理多个异步值,但它的设计使得我们可以像同步的 Iterator 一样进行值的迭代。文章深入探讨了 Stream 的定义与工作原理,通过示例代码展示如何使用 Stream 处理异步流数据,如何利用并发处理提升性能,并重点介绍了 for_each_concurrent 和 try_for_each_concurrent 方法的应用。
Streams
Stream trait
- Stream trait 和 Future trait 类似,但它可以在完成前产生多个值,这点和标准库 Iterator trait 也很像
trait Stream {
/// The type of the value yielded by the stream.
// Stream生成的值的类型
type Item;
/// Attempt to resolve the next item in the stream.
/// Returns `Poll::Pending` if not ready, `Poll::Ready(Some(x))` if a value
/// is ready, and `Poll::Ready(None)` if the stream has completed.
// 尝试去解析Stream中的下一个值,
// 若无数据,返回`Poll::Pending`, 若有数据,返回 `Poll::Ready(Some(x))`, `Stream`完成则返回 `Poll::Ready(None)`
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Self::Item>>;
}
关于 Stream
的一个常见例子是消息通道( futures
包中的)的消费者 Receiver
。每次有消息从 Send
端发送后,它都可以接收到一个 Some(val)
值, 一旦 Send
端关闭(drop),且消息通道中没有消息后,它会接收到一个 None
值。
async fn send_recv() {
const BUFFER_SIZE: usize = 10;
let (mut tx, mut rx) = mpsc::channel::<i32>(BUFFER_SIZE);
tx.send(1).await.unwrap();
tx.send(2).await.unwrap();
drop(tx);
// `StreamExt::next` is similar to `Iterator::next`, but returns a
// type that implements `Future<Output = Option<T>>`.
// `StreamExt::next` 类似于 `Iterator::next`, 但是前者返回的不是值,而是一个 `Future<Output = Option<T>>`,
// 因此还需要使用`.await`来获取具体的值
assert_eq!(Some(1), rx.next().await);
assert_eq!(Some(2), rx.next().await);
assert_eq!(None, rx.next().await);
}
Iteration and Concurrency 迭代与并发
- 与同步的 Iterator 类似,有很多方法迭代和处理 Stream 中的值:
- 组合器风格的:map、filter、fold
- 相应的 "early-exit-on-error" 版本:try_map、try_filter、try_fold
- for 循环无法和 Stream 一起使用
- 命令式的 while let 和 next/try_next 函数可以与 Stream 一起使用
例子
async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 {
use futures::stream::StreamExt; // for `next`
let mut sum = 0;
while let Some(item) = stream.next().await {
sum += item;
}
sum
}
async fn sum_with_try_next(
mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>,
) -> Result<i32, io::Error> {
use futures::stream::TryStreamExt; // for `try_next`
let mut sum = 0;
while let Some(item) = stream.try_next().await? {
sum += item;
}
Ok(sum)
}
上面代码是一次处理一个值的模式,但是需要注意的是:如果你选择一次处理一个值的模式,可能会造成无法并发,这就失去了异步编程的意义。 因此,如果可以的话我们还是要选择从一个 Stream
并发处理多个值的方式,通过 for_each_concurrent
或 try_for_each_concurrent
方法来实现:
async fn jump_around(
mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>,
) -> Result<(), io::Error> {
use futures::stream::TryStreamExt; // for `try_for_each_concurrent`
const MAX_CONCURRENT_JUMPERS: usize = 100;
stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num| async move {
jump_n_times(num).await?;
report_n_jumps(num).await?;
Ok(())
}).await?;
Ok(())
}
总结
通过对 Stream trait 的深入解析,我们看到它为 Rust 的异步编程提供了强大的支持,特别是在处理多个异步值时的迭代和并发控制。通过适当地使用异步流和并发处理,我们可以极大地提升 Rust 程序的性能和响应能力。理解和掌握这些概念将帮助开发者更好地利用 Rust 在高性能并发应用中的潜力。