C#中具有 IAsyncEnumerable 和 System.Linq.Async 的异步处理

作者:微信公众号:【架构师老卢】
7-19 8:9
26

概述:在现代软件开发中,响应式应用程序开发是主要要求之一。但是,处理耗时的操作(如 I/O 绑定任务和数据检索)通常会带来挑战,尤其是使用传统的同步方法时。 并且是 C# 8 中引入的两个强大功能,旨在彻底改变异步数据处理。在这篇文章中,我将讨论示例场景和功能,并揭示它们如何解决传统异步方法的局限性。在进入异步范式之前,了解传统同步编程的约束至关重要。Blocking Calls同步操作会暂停程序执行,直到程序完成,从而导致应用程序响应能力降低。Memory Overhead同步加载大型数据集可能会使内存资源紧张,从而可能导致性能下降或内存不足异常。Limited Scalability同步处理可能

在现代软件开发中,响应式应用程序开发是主要要求之一。但是,处理耗时的操作(如 I/O 绑定任务和数据检索)通常会带来挑战,尤其是使用传统的同步方法时。 并且是 C# 8 中引入的两个强大功能,旨在彻底改变异步数据处理。在这篇文章中,我将讨论示例场景和功能,并揭示它们如何解决传统异步方法的局限性。

在进入异步范式之前,了解传统同步编程的约束至关重要。

Blocking Calls

同步操作会暂停程序执行,直到程序完成,从而导致应用程序响应能力降低。

Memory Overhead

同步加载大型数据集可能会使内存资源紧张,从而可能导致性能下降或内存不足异常。

Limited Scalability

同步处理可能会遇到 I/O 绑定操作或大型数据集,因为它依赖于阻塞线程,而这些线程是有限的资源。

Traditional Asynchronous Data Retrieval

传统的异步数据检索通常涉及使用 .NET Framework 提供的异步方法,例如 .以下示例说明了如何使用此方法异步提取数据:

public async Task<List<int>> FetchDataAsync()
{
    List<int> result = new List<int>();
    for (int page = 1; page <= 3; page++)
    {
        var data = await GetDataFromApiAsync(page);
        result.AddRange(data);
    }
    return result;
}

public async Task<List<int>> GetDataFromApiAsync(int page)
{
    // Simulate fetching data from API asynchronously
    await Task.Delay(100);
    // Return data for the specified page
    return Enumerable.Range((page - 1) * 10 + 1, 10).ToList();
}

在这种方法中,使用 从 Web API 的多个页面中异步检索数据。但是,请注意,方法签名返回一个 ,表示将在任务完成之前获取整个数据集。

使用 IAsyncEnumerable 进行异步数据检索

IAsyncEnumerable提供一种更高效、响应更迅速的方式来处理异步数据流。让我们重构我们的数据检索示例以利用:

public async IAsyncEnumerable<int> FetchDataAsync()
{
    for (int page = 1; page <= 3; page++)
    {
        var data = await GetDataFromApiAsync(page);
        foreach (var item in data)
        {
            yield return item;
        }
    }
}

public async Task<List<int>> GetDataFromApiAsync(int page)
{
    // Simulate fetching data from API asynchronously
    await Task.Delay(1000);
    // Return data for the specified page
    return Enumerable.Range((page - 1) * 10 + 1, 10).ToList();
}

在此代码中,异步从 Web API 提取数据,并在元素可用时生成元素。这样可以更有效地使用资源,并在数据到达时进行处理。

IAsyncEnumerable 的高级概念

错误处理

IAsyncEnumerable支持在生成元素的异步方法中使用 try-catch 块进行错误处理。此外,您可以利用这些方法(例如,)优雅地处理错误。

public async IAsyncEnumerable<int> FetchDataAsync(CancellationToken cancellationToken = default)
{
    for (int page = 1; page <= 3; page++)
    {
        try
        {
            var data = await GetDataFromApiAsync(page, cancellationToken);
            foreach (var item in data)
            {
                yield return item;
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"An error occurred: {ex.Message}");
        }
    }
}

public async Task<List<int>> GetDataFromApiAsync(int page, CancellationToken cancellationToken)
{
    // Simulate fetching data from API asynchronously
    await Task.Delay(100, cancellationToken);
    // Return data for the specified page
    return Enumerable.Range((page - 1) * 10 + 1, 10).ToList();
}

在此示例中,该方法通过捕获异常并记录异常来正常处理错误。

取消

异步操作通常需要可取消,以提高响应能力和资源利用率。 支持通过使用 取消。您可以将 a 传递给生成元素的异步方法,并使用 定期检查是否取消。

public async IAsyncEnumerable<int> FetchDataAsync(CancellationToken cancellationToken = default)
{
    for (int page = 1; page <= 3; page++)
    {
        cancellationToken.ThrowIfCancellationRequested();
        var data = await GetDataFromApiAsync(page, cancellationToken);
        foreach (var item in data)
        {
            yield return item;
        }
    }
}

public async Task<List<int>> GetDataFromApiAsync(int page, CancellationToken cancellationToken)
{
    // Simulate fetching data from API asynchronously
    await Task.Delay(100, cancellationToken);
    // Return data for the specified page
    return Enumerable.Range((page - 1) * 10 + 1, 10).ToList();
}

在这里,该方法接受一个参数并定期检查取消。

缓冲和批处理

在一次处理一个元素效率低下的情况下,可以使用 实现缓冲和批处理技术。这涉及在产生元素之前缓冲一定数量的元素,或者根据指定的大小或时间间隔对元素进行批处理。

public async IAsyncEnumerable<List<int>> FetchDataBatchedAsync(int batchSize, CancellationToken cancellationToken = default)
{
    List<int> batch = new List<int>();
    for (int page = 1; page <= 3; page++)
    {
        cancellationToken.ThrowIfCancellationRequested();
        var data = await GetDataFromApiAsync(page, cancellationToken);
        batch.AddRange(data);
        if (batch.Count >= batchSize)
        {
            yield return batch;
            batch = new List<int>();
        }
    }
    if (batch.Count > 0)
    {
        yield return batch;
    }
}

在此代码中,该方法从 API 检索数据,并根据指定的元素生成成批的元素。

并行处理

IAsyncEnumerable可以与并行处理技术结合使用,以提高吞吐量和性能。您可以使用异步方法(如 TPL Dataflow)或并行处理库(如 TPL Dataflow)来并发处理元素,同时保持序列的异步性质。

public async Task ProcessDataParallelAsync(CancellationToken cancellationToken = default)
{
    var dataStream = FetchDataAsync(cancellationToken);
    var tasks = new List<Task>();

    await foreach (var batch in dataStream)
    {
        tasks.Add(ProcessBatchAsync(batch, cancellationToken));
    }

    await Task.WhenAll(tasks);
}

public async Task ProcessBatchAsync(List<int> batch, CancellationToken cancellationToken)
{
    // Simulate processing batch asynchronously
    await Task.Delay(100, cancellationToken);
    foreach (var item in batch)
    {
        Console.WriteLine($"Processing item: {item}");
    }
}

在此示例中,该方法使用 异步处理从 API 检索到的数据批次。

高级查询

高级 LINQ 运算符和技术可以增强数据处理能力。这包括像 、 和 这样的运算符,以及使用方法链或查询语法组成复杂的查询表达式。

public async Task ProcessDataAsync(CancellationToken cancellationToken = default)  
{  
    var dataStream = FetchDataAsync(cancellationToken)  
                        .Where(x => x % 2 == 0)  
                        .OrderByDescending(x => x);  
  
    await foreach (var item in dataStream)  
    {  
        Console.WriteLine($"Processed item: {item}");  
    }  
}

在此示例中,该方法使用 LINQ 运算符异步处理从 API 检索的筛选和排序数据。

过滤异步数据

// Filter even numbers asynchronously  
await foreach (var evenNumber in FetchDataAsync().WhereAsync(x => x % 2 == 0))  
{  
    Console.WriteLine(evenNumber);  
}

WhereAsync筛选 返回的异步数据流,使其仅包含偶数。循环异步迭代过滤后的序列,并将每个偶数打印到控制台。

异步数据

// Project each number asynchronously to its square  
await foreach (var squaredNumber in FetchDataAsync().SelectAsync(async x => await SquareAsync(x)))  
{  
    Console.WriteLine(squaredNumber);  
}

在这里,使用该方法将返回的异步数据流中的每个项目异步投影到其方块。该循环异步迭代投影序列,并将每个平方数字打印到控制台。

聚合异步数据

// Calculate the sum of all numbers asynchronously  
var sum = await FetchDataAsync().SumAsync();  
Console.WriteLine($"Sum: {sum}");

SumAsync计算 返回的异步数据流中所有数字的总和。异步等待结果,然后打印到控制台。

结合过滤

// Filter even numbers and project each number to its square asynchronously  
await foreach (var squaredEvenNumber in FetchDataAsync()  
                                        .WhereAsync(x => x % 2 == 0)  
                                        .SelectAsync(async x => await SquareAsync(x)))  
{  
    Console.WriteLine(squaredEvenNumber);  
}

此示例演示如何合并 and 筛选偶数,并将每个偶数异步投影到其平方。

合并多个异步流

// Combine two asynchronous data streams and filter their union  
await foreach (var number in FetchDataAsync()  
                             .UnionAsync(FetchMoreDataAsync())  
                             .WhereAsync(x => x % 3 == 0))  
{  
    Console.WriteLine(number);  
}

UnionAsync合并了 和 返回的两个异步数据流。然后,过滤两个流的并集,以仅包含可被 3 整除的数字。

异步分组

// Group numbers by their remainder when divided by 3 asynchronously  
await foreach (var group in FetchDataAsync()  
                            .GroupByAsync(x => x % 3))  
{  
    Console.WriteLine($"Group {group.Key}: {string.Join(", ", group)}");  
}

GroupByAsync当异步除以 3 时,将数字按其余数分组。然后,在循环中迭代生成的组,并且每个组都与其键一起打印到控制台中。

异步非同一性

// Select distinct numbers asynchronously  
await foreach (var number in FetchDataAsync().DistinctAsync())  
{  
    Console.WriteLine(number);  
}

DistinctAsync从 返回的异步数据流中选择不同的数字。然后,在循环中迭代不同的数字,并打印到控制台。

优势和最佳实践

  • 高效的资源利用: 异步流通过增量处理数据、减少内存占用和增强响应能力来优化资源使用。
  • 与 Async/Await 集成: 携手合作,使编写清晰高效的异步代码变得容易。IAsyncEnumerable
  • 可扩展且响应迅速的应用程序: 使用异步数据处理使应用程序能够顺利处理大型数据集和 I/O 绑定任务,从而确保它们保持可扩展性和响应能力。

IAsyncEnumerable并提供效率和响应能力。使用这些功能时,可以克服较慢的同步方法的限制。这意味着您的应用程序可以以更顺畅、更高效的方式处理数据。

阅读排行