Rust Stream Trait 的异步迭代与并发处理实践

Rust Stream Trait 的异步迭代与并发处理实践

Rust 的 Stream trait 是异步编程中的核心概念之一,它与 Future trait 类似,但具有能够生成多个值的能力,类似于同步中的 Iterator。在本篇文章中,我们将深入探讨 Rust 中的 Stream trait,了解如何利用它进行高效的并发处理和迭代操作,并通过示例代码展示如何在异步编程中高效地处理流数据。

本文介绍了 Rust 的 Stream trait,阐述了其在异步编程中的使用方式。与 Future trait 类似,Stream 允许我们处理多个异步值,但它的设计使得我们可以像同步的 Iterator 一样进行值的迭代。文章深入探讨了 Stream 的定义与工作原理,通过示例代码展示如何使用 Stream 处理异步流数据,如何利用并发处理提升性能,并重点介绍了 for_each_concurrent 和 try_for_each_concurrent 方法的应用。

Streams

Stream trait

  • Stream trait 和 Future trait 类似,但它可以在完成前产生多个值,这点和标准库 Iterator trait 也很像
trait Stream {
    /// The type of the value yielded by the stream.
    // Stream生成的值的类型
    type Item;

    /// Attempt to resolve the next item in the stream.
    /// Returns `Poll::Pending` if not ready, `Poll::Ready(Some(x))` if a value
    /// is ready, and `Poll::Ready(None)` if the stream has completed.
    // 尝试去解析Stream中的下一个值,
    // 若无数据,返回`Poll::Pending`, 若有数据,返回 `Poll::Ready(Some(x))`, `Stream`完成则返回 `Poll::Ready(None)`
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<Self::Item>>;
}

关于 Stream 的一个常见例子是消息通道( futures 包中的)的消费者 Receiver。每次有消息从 Send 端发送后,它都可以接收到一个 Some(val) 值, 一旦 Send 端关闭(drop),且消息通道中没有消息后,它会接收到一个 None 值。

async fn send_recv() {
    const BUFFER_SIZE: usize = 10;
    let (mut tx, mut rx) = mpsc::channel::<i32>(BUFFER_SIZE);

    tx.send(1).await.unwrap();
    tx.send(2).await.unwrap();
    drop(tx);

    // `StreamExt::next` is similar to `Iterator::next`, but returns a
    // type that implements `Future<Output = Option<T>>`.
    // `StreamExt::next` 类似于 `Iterator::next`, 但是前者返回的不是值,而是一个 `Future<Output = Option<T>>`,
    // 因此还需要使用`.await`来获取具体的值
    assert_eq!(Some(1), rx.next().await);
    assert_eq!(Some(2), rx.next().await);
    assert_eq!(None, rx.next().await);
}

Iteration and Concurrency 迭代与并发

  • 与同步的 Iterator 类似,有很多方法迭代和处理 Stream 中的值:
  • 组合器风格的:map、filter、fold
  • 相应的 "early-exit-on-error" 版本:try_map、try_filter、try_fold
  • for 循环无法和 Stream 一起使用
  • 命令式的 while let 和 next/try_next 函数可以与 Stream 一起使用

例子

async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 {
    use futures::stream::StreamExt; // for `next`
    let mut sum = 0;
    while let Some(item) = stream.next().await {
        sum += item;
    }
    sum
}

async fn sum_with_try_next(
    mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>,
) -> Result<i32, io::Error> {
    use futures::stream::TryStreamExt; // for `try_next`
    let mut sum = 0;
    while let Some(item) = stream.try_next().await? {
        sum += item;
    }
    Ok(sum)
}

上面代码是一次处理一个值的模式,但是需要注意的是:如果你选择一次处理一个值的模式,可能会造成无法并发,这就失去了异步编程的意义。 因此,如果可以的话我们还是要选择从一个 Stream 并发处理多个值的方式,通过 for_each_concurrenttry_for_each_concurrent 方法来实现:

async fn jump_around(
    mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>,
) -> Result<(), io::Error> {
    use futures::stream::TryStreamExt; // for `try_for_each_concurrent`
    const MAX_CONCURRENT_JUMPERS: usize = 100;

    stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num| async move {
        jump_n_times(num).await?;
        report_n_jumps(num).await?;
        Ok(())
    }).await?;

    Ok(())
}

总结

通过对 Stream trait 的深入解析,我们看到它为 Rust 的异步编程提供了强大的支持,特别是在处理多个异步值时的迭代和并发控制。通过适当地使用异步流和并发处理,我们可以极大地提升 Rust 程序的性能和响应能力。理解和掌握这些概念将帮助开发者更好地利用 Rust 在高性能并发应用中的潜力。

全部评论(0)