深入浅出 IAsyncEnumerable:超越 async/await 的异步流式处理实战指南

作者:微信公众号:【架构师老卢】
9-20 10:3
1

异步编程在现代 .NET 开发中无处不在。任务(Task)、async/await 和并行性是每个开发人员每天都要处理的主题。但是,当您需要异步流式传输数据、处理大型数据集、使用分页 API 或处理慢速 I/O 源时,该怎么办?这就是 IAsyncEnumerable 的用武之地。

如果您和大多数开发人员一样,可能听说过 IAsyncEnumerable,但还没有找到合适的理由或正确的解释将其应用到实际项目中。在本次深度探讨中,我将改变这一现状。

我们将远远超出基础示例的范围。我将解释:

  • 什么是异步流(async streams)。
  • 为什么它们不仅仅是另一个异步噱头。
  • 它们如何解决现实世界中的后端问题?
  • 以及何时应避免使用它们。

这是多部分系列的第一部分。在本部分中,我们将介绍理解 IAsyncEnumerable 所需的核心概念和实际使用模式。

之后,我们将深入探讨高级模式、性能注意事项和流式 API。

在本指南结束时,您将通过实际可运行的示例(而不仅仅是理论)牢固掌握如何使用以及何时使用异步流。

让我们开始吧。

简史 异步流自 .NET Core 3.0 和 C# 8.0 起就成为 .NET 的一部分。引入它们是为了解决一个简单但重要的问题:如何在不一次性加载所有内容的情况下处理大型或连续数据?

在 IAsyncEnumerable 出现之前,您只有两个不好的选择:

  1. 使用 Task<List<T>> 加载所有内容,即将整个数据集加载到内存中。
  2. 使用基于事件或回调的设计,这会使您的代码复杂化。

有了 IAsyncEnumerable,.NET 为我们提供了一个简洁、惰性、异步的流模型。您可以在数据产生时拉取数据,而不会阻塞线程或导致内存爆炸。

什么是 IAsyncEnumerable? 从其核心来说,IAsyncEnumerable<T> 正如其名:是 IEnumerable 的异步版本。但您不是同步拉取项,而是使用 await foreach 异步拉取它们。

可以将其视为一个惰性数据管道:

  • 您的方法不返回完整的列表。
  • 它在一个项可用时逐个返回它们。
  • 每个项都可以异步产生(例如,从文件读取一行或等待 API 响应之后)。

基础示例:产生一个异步流

async IAsyncEnumerable<string> GetDataAsync()
{
    yield return await Task.FromResult("Item 1");
    yield return await Task.FromResult("Item 2");
    yield return await Task.FromResult("Item 3");
}

每个 yield return 都会暂停执行并发出一个新值。async 关键字允许您在产生(yield)之前等待(await)异步操作。

消费一个异步流:await foreach 要读取异步流,您可以像这样使用 await foreach

await foreach (var item in GetDataAsync())
{
    Console.WriteLine(item);
}

每个项一旦可用就会被处理,无需等待完整的数据集。

为什么不直接使用 Task<List>? 因为 Task<List<T>> 迫使您等到整个数据集准备就绪。异步流让您能够:

  • 立即开始处理数据。
  • 在结果产生时进行流式传输。
  • 高效处理海量或无限的数据集。

这是基础。接下来,让我们谈谈如何产生和消费现实世界中的异步流。

产生异步流 创建异步流意味着编写一个返回 IAsyncEnumerable<T> 的方法,并使用 yield return 一次发射一个数据片段。

可以将其视为构建一个惰性数据工厂,该工厂在请求项时异步产生它们。

示例:带延迟的流式传输项 这是一个模拟随时间推移生成数据的简单示例:

async IAsyncEnumerable<string> GenerateDataAsync()
{
    for (int i = 1; i <= 5; i++)
    {
        await Task.Delay(500); // 模拟异步工作(API 调用、文件读取等)
        yield return $"Item {i}";
    }
}

每个 yield return 在模拟延迟后发出下一个项。

实际示例:逐行读取大文件 您可以异步地逐行产生(yield)内容,而不是将整个文件读入内存:

async IAsyncEnumerable<string> ReadLinesAsync(string filePath)
{
    using var reader = new StreamReader(filePath);
    while (!reader.EndOfStream)
    {
        yield return await reader.ReadLineAsync();
    }
}

这使得消费者可以在每行被读取时处理它,而无需等待整个文件加载完毕。

资源作用域 请始终记住:

  • 在流内部使用 using 块来管理可释放资源。
  • 一旦异步流完成,任何本地资源(如文件句柄)都会被释放。

现在您知道了如何产生流,接下来让我们探索如何有效地消费它们。

消费异步流 产生异步流只是故事的一半。要利用它,您需要以异步、逐项的方式消费流。

这就是 await foreach 发挥作用的地方。

基础消费示例

await foreach (var item in GenerateDataAsync())
{
    Console.WriteLine($"Received: {item}");
}

每个项一旦被产生(yielded)就会被立即处理,无需等待整个数据集。

为什么这很重要 与等待完整列表相比:

  • 您可以立即开始工作。
  • 您可以保持低内存使用率。
  • 您可以优雅地处理连续或无限的数据流。

不要陷入这个陷阱 这样做是错误的:

var list = await GenerateDataAsync().ToListAsync();

为什么?您将高效的流重新转换为批量加载的列表,扼杀了它的主要优势。

如果您的目标是流式传输,请坚持使用 await foreach

实时示例:处理大文件 使用我们之前的 ReadLinesAsync 方法:

await foreach (var line in ReadLinesAsync(filePath))
{
    Console.WriteLine($"Line: {line}");
}

每一行都被实时读取、处理和显示,非常适合等待会损害性能的大文件。

既然您已经看到了两个方面(产生和消费),接下来让我们将异步流应用到实际用例中。

实际用例 让我们超越玩具示例。以下是一些异步流可以大放异彩的现实场景。

1. 大文件处理 代替将整个文件读入内存:

await foreach (var line in ReadLinesAsync(filePath))
{
    ProcessLine(line);  // 在每行被读取时立即处理它。
}

非常适合:

  • 日志处理。
  • 大型 CSV 解析。
  • 实时文件监控。

2. 流式传输 API 数据 从远程 API 逐页获取数据?不要缓冲整个响应。对其进行流式传输。

将分页 API 包装为异步流的示例:

async IAsyncEnumerable<Order> FetchOrdersAsync()
{
    int page = 1;
    while (true)
    {
        var orders = await FetchPageAsync(page);
        if (orders.Count == 0)
            yield break;
        foreach (var order in orders)
            yield return order;
        page++;
    }
}

这让您的消费者可以在订单到达时处理它们:

await foreach (var order in FetchOrdersAsync())
{
    HandleOrder(order);
}

无需等待整个数据集。您的 UI 或后端可以实时持续工作。

文件流和分页 API 流这两种模式是最常见的实际应用。它们也是异步流通常优于传统的基于集合的方法的地方。

接下来,让我们讨论同样重要的事情:何时不应使用异步流。

何时不应使用异步流 尽管有诸多好处,但 IAsyncEnumerable 并不总是合适的工具。在以下情况下应避免使用它:

小型数据集?不必麻烦 如果您正在处理:

  • 十几个左右的记录,
  • 一个简短的项列表,
  • 快速的内存数据……

……就没有理由引入异步迭代的开销。一个简单的 Task<List<T>> 甚至普通的同步循环更快、更清晰。

需要立即获取完整结果? 有时,您需要所有数据才能开始处理,可能是因为:

  • 您正在聚合总计,
  • 应用全局排序,或
  • 将数据传递给仅接受列表的库。

在这些情况下,异步流可能会在没有真正收益的情况下使您的代码复杂化。

您认为它是并行的(剧透:它不是) 异步流默认是顺序的。它们允许您惰性地、异步地处理数据,但不是并行地。

如果您的目标是并行性,您将需要:

  • 手动缓冲项,
  • 启动任务,或
  • 使用其他模式,如 Parallel.ForEachAsync

常见错误和误解 即使经验丰富的开发人员也会误用 IAsyncEnumerable。让我们澄清一些最常见的误解:

错误 1:认为它是并行的 许多开发人员将异步与并行混淆。 IAsyncEnumerable 顺序处理项,只是是异步的。每次迭代都会等待前一次迭代完成,除非您显式引入并行性。

错误 2:忘记 await foreach 与常规的 foreach 不同,消费异步流需要 await foreach。忘记这一点会导致令人困惑的编译错误。

await foreach (var item in GetDataAsync())
{
    // 处理项
}

错误 3:将流转回列表 最讽刺的错误之一:

var list = await GetDataAsync().ToListAsync();

这破坏了流式传输的全部意义。您刚刚强制将流一次性全部放入内存。

错误 4:误解惰性执行 异步流是惰性的。生产者直到您开始迭代时才会运行。

  • 声明方法时不会立即返回任何内容。
  • 当通过 await foreach 请求第一个项时,执行才开始。

注意:异步流关乎效率,而非复杂性。除非它们能简化您的代码或改善内存/资源使用情况,否则不要使用它们。

这就是 IAsyncEnumerable 的真实面貌。它不是某种理论抽象;而是一种工具,当您处理大型或慢速数据时,它可以解决实际问题。我们已经探讨了如何构建异步流、如何正确消费它们以及它们在哪些场景下有意义。更重要的是,您现在知道了在哪些场景下它们不适用。在下一部分中,我将带您更深入地了解实际场景:流式 API、性能注意事项、错误处理、取消以及 .NET 10 通过异步 LINQ 带来的新特性。这才是真正有趣的地方。

相关留言评论
昵称:
邮箱:
阅读排行