在现代软件开发中,响应式应用程序开发是主要要求之一。但是,处理耗时的操作(如 I/O 绑定任务和数据检索)通常会带来挑战,尤其是使用传统的同步方法时。 并且是 C# 8 中引入的两个强大功能,旨在彻底改变异步数据处理。在这篇文章中,我将讨论示例场景和功能,并揭示它们如何解决传统异步方法的局限性。
在进入异步范式之前,了解传统同步编程的约束至关重要。
同步操作会暂停程序执行,直到程序完成,从而导致应用程序响应能力降低。
同步加载大型数据集可能会使内存资源紧张,从而可能导致性能下降或内存不足异常。
同步处理可能会遇到 I/O 绑定操作或大型数据集,因为它依赖于阻塞线程,而这些线程是有限的资源。
传统的异步数据检索通常涉及使用 .NET Framework 提供的异步方法,例如 .以下示例说明了如何使用此方法异步提取数据:
public async Task<List<int>> FetchDataAsync()
{
List<int> result = new List<int>();
for (int page = 1; page <= 3; page++)
{
var data = await GetDataFromApiAsync(page);
result.AddRange(data);
}
return result;
}
public async Task<List<int>> GetDataFromApiAsync(int page)
{
// Simulate fetching data from API asynchronously
await Task.Delay(100);
// Return data for the specified page
return Enumerable.Range((page - 1) * 10 + 1, 10).ToList();
}
在这种方法中,使用 从 Web API 的多个页面中异步检索数据。但是,请注意,方法签名返回一个 ,表示将在任务完成之前获取整个数据集。
IAsyncEnumerable提供一种更高效、响应更迅速的方式来处理异步数据流。让我们重构我们的数据检索示例以利用:
public async IAsyncEnumerable<int> FetchDataAsync()
{
for (int page = 1; page <= 3; page++)
{
var data = await GetDataFromApiAsync(page);
foreach (var item in data)
{
yield return item;
}
}
}
public async Task<List<int>> GetDataFromApiAsync(int page)
{
// Simulate fetching data from API asynchronously
await Task.Delay(1000);
// Return data for the specified page
return Enumerable.Range((page - 1) * 10 + 1, 10).ToList();
}
在此代码中,异步从 Web API 提取数据,并在元素可用时生成元素。这样可以更有效地使用资源,并在数据到达时进行处理。
IAsyncEnumerable支持在生成元素的异步方法中使用 try-catch 块进行错误处理。此外,您可以利用这些方法(例如,)优雅地处理错误。
public async IAsyncEnumerable<int> FetchDataAsync(CancellationToken cancellationToken = default)
{
for (int page = 1; page <= 3; page++)
{
try
{
var data = await GetDataFromApiAsync(page, cancellationToken);
foreach (var item in data)
{
yield return item;
}
}
catch (Exception ex)
{
Console.WriteLine($"An error occurred: {ex.Message}");
}
}
}
public async Task<List<int>> GetDataFromApiAsync(int page, CancellationToken cancellationToken)
{
// Simulate fetching data from API asynchronously
await Task.Delay(100, cancellationToken);
// Return data for the specified page
return Enumerable.Range((page - 1) * 10 + 1, 10).ToList();
}
在此示例中,该方法通过捕获异常并记录异常来正常处理错误。
异步操作通常需要可取消,以提高响应能力和资源利用率。 支持通过使用 取消。您可以将 a 传递给生成元素的异步方法,并使用 定期检查是否取消。
public async IAsyncEnumerable<int> FetchDataAsync(CancellationToken cancellationToken = default)
{
for (int page = 1; page <= 3; page++)
{
cancellationToken.ThrowIfCancellationRequested();
var data = await GetDataFromApiAsync(page, cancellationToken);
foreach (var item in data)
{
yield return item;
}
}
}
public async Task<List<int>> GetDataFromApiAsync(int page, CancellationToken cancellationToken)
{
// Simulate fetching data from API asynchronously
await Task.Delay(100, cancellationToken);
// Return data for the specified page
return Enumerable.Range((page - 1) * 10 + 1, 10).ToList();
}
在这里,该方法接受一个参数并定期检查取消。
在一次处理一个元素效率低下的情况下,可以使用 实现缓冲和批处理技术。这涉及在产生元素之前缓冲一定数量的元素,或者根据指定的大小或时间间隔对元素进行批处理。
public async IAsyncEnumerable<List<int>> FetchDataBatchedAsync(int batchSize, CancellationToken cancellationToken = default)
{
List<int> batch = new List<int>();
for (int page = 1; page <= 3; page++)
{
cancellationToken.ThrowIfCancellationRequested();
var data = await GetDataFromApiAsync(page, cancellationToken);
batch.AddRange(data);
if (batch.Count >= batchSize)
{
yield return batch;
batch = new List<int>();
}
}
if (batch.Count > 0)
{
yield return batch;
}
}
在此代码中,该方法从 API 检索数据,并根据指定的元素生成成批的元素。
IAsyncEnumerable可以与并行处理技术结合使用,以提高吞吐量和性能。您可以使用异步方法(如 TPL Dataflow)或并行处理库(如 TPL Dataflow)来并发处理元素,同时保持序列的异步性质。
public async Task ProcessDataParallelAsync(CancellationToken cancellationToken = default)
{
var dataStream = FetchDataAsync(cancellationToken);
var tasks = new List<Task>();
await foreach (var batch in dataStream)
{
tasks.Add(ProcessBatchAsync(batch, cancellationToken));
}
await Task.WhenAll(tasks);
}
public async Task ProcessBatchAsync(List<int> batch, CancellationToken cancellationToken)
{
// Simulate processing batch asynchronously
await Task.Delay(100, cancellationToken);
foreach (var item in batch)
{
Console.WriteLine($"Processing item: {item}");
}
}
在此示例中,该方法使用 异步处理从 API 检索到的数据批次。
高级 LINQ 运算符和技术可以增强数据处理能力。这包括像 、 和 这样的运算符,以及使用方法链或查询语法组成复杂的查询表达式。
public async Task ProcessDataAsync(CancellationToken cancellationToken = default)
{
var dataStream = FetchDataAsync(cancellationToken)
.Where(x => x % 2 == 0)
.OrderByDescending(x => x);
await foreach (var item in dataStream)
{
Console.WriteLine($"Processed item: {item}");
}
}
在此示例中,该方法使用 LINQ 运算符异步处理从 API 检索的筛选和排序数据。
// Filter even numbers asynchronously
await foreach (var evenNumber in FetchDataAsync().WhereAsync(x => x % 2 == 0))
{
Console.WriteLine(evenNumber);
}
WhereAsync筛选 返回的异步数据流,使其仅包含偶数。循环异步迭代过滤后的序列,并将每个偶数打印到控制台。
// Project each number asynchronously to its square
await foreach (var squaredNumber in FetchDataAsync().SelectAsync(async x => await SquareAsync(x)))
{
Console.WriteLine(squaredNumber);
}
在这里,使用该方法将返回的异步数据流中的每个项目异步投影到其方块。该循环异步迭代投影序列,并将每个平方数字打印到控制台。
// Calculate the sum of all numbers asynchronously
var sum = await FetchDataAsync().SumAsync();
Console.WriteLine($"Sum: {sum}");
SumAsync计算 返回的异步数据流中所有数字的总和。异步等待结果,然后打印到控制台。
// Filter even numbers and project each number to its square asynchronously
await foreach (var squaredEvenNumber in FetchDataAsync()
.WhereAsync(x => x % 2 == 0)
.SelectAsync(async x => await SquareAsync(x)))
{
Console.WriteLine(squaredEvenNumber);
}
此示例演示如何合并 and 筛选偶数,并将每个偶数异步投影到其平方。
// Combine two asynchronous data streams and filter their union
await foreach (var number in FetchDataAsync()
.UnionAsync(FetchMoreDataAsync())
.WhereAsync(x => x % 3 == 0))
{
Console.WriteLine(number);
}
UnionAsync合并了 和 返回的两个异步数据流。然后,过滤两个流的并集,以仅包含可被 3 整除的数字。
// Group numbers by their remainder when divided by 3 asynchronously
await foreach (var group in FetchDataAsync()
.GroupByAsync(x => x % 3))
{
Console.WriteLine($"Group {group.Key}: {string.Join(", ", group)}");
}
GroupByAsync当异步除以 3 时,将数字按其余数分组。然后,在循环中迭代生成的组,并且每个组都与其键一起打印到控制台中。
// Select distinct numbers asynchronously
await foreach (var number in FetchDataAsync().DistinctAsync())
{
Console.WriteLine(number);
}
DistinctAsync从 返回的异步数据流中选择不同的数字。然后,在循环中迭代不同的数字,并打印到控制台。
IAsyncEnumerable并提供效率和响应能力。使用这些功能时,可以克服较慢的同步方法的限制。这意味着您的应用程序可以以更顺畅、更高效的方式处理数据。