在当今世界,应用程序常常需要处理大量数据或进行实时更新。无论是股票价格的流式传输、日志处理,还是用户生成的内容,设计一个响应迅速且高效的数据管道都至关重要。借助 C# 的异步流和 IAsyncEnumerable
,我们能够创建异步数据处理的无缝流程,同时保持出色的可读性和性能。
在本文中,我们将探讨异步流如何简化复杂的工作流程、实现数据处理管道以及应对现实世界中的各种挑战。
在 C# 8.0 中引入的异步流将异步编程的强大功能与可枚举集合相结合。它们使用 IAsyncEnumerable<T>
接口来允许对异步操作的集合进行迭代。
主要优点:
以下是一个简单示例:
async IAsyncEnumerable<int> GenerateNumbersAsync()
{
for (int i = 1; i <= 5; i++)
{
await Task.Delay(500); // 模拟异步工作
yield return i;
}
}
await foreach (var number in GenerateNumbersAsync())
{
Console.WriteLine(number);
}
输出:
1
2
3
4
5
让我们构建一个用于实时处理日志文件的数据管道。想象一下,有一个系统会生成日志条目,这些条目需要经过筛选、转换,然后保存到数据库中。以下展示了异步流如何简化这一工作流程:
步骤 1:生成日志(数据源) 第一步是创建一个异步日志生成器。
async IAsyncEnumerable<string> GenerateLogsAsync()
{
string[] logLevels = { "INFO", "WARN", "ERROR" };
for (int i = 1; i <= 10; i++)
{
await Task.Delay(300); // 模拟日志生成延迟
yield return $"{DateTime.UtcNow}: {logLevels[i % 3]} Log entry {i}";
}
}
步骤 2:筛选日志(处理过程) 让我们筛选出级别为“INFO”的日志。
async IAsyncEnumerable<string> FilterLogsAsync(IAsyncEnumerable<string> logs)
{
await foreach (var log in logs)
{
if (!log.Contains("INFO"))
{
yield return log;
}
}
}
步骤 3:转换日志(处理过程) 将日志条目转换为结构化格式。
record LogEntry(DateTime Timestamp, string Level, string Message);
async IAsyncEnumerable<LogEntry> TransformLogsAsync(IAsyncEnumerable<string> logs)
{
await foreach (var log in logs)
{
var parts = log.Split(' ', 3);
yield return new LogEntry(
DateTime.Parse(parts[0]),
parts[1].TrimEnd(':'),
parts[2]
);
}
}
步骤 4:保存日志(数据汇) 最后,将日志存储到数据库中。
async Task SaveLogsAsync(IAsyncEnumerable<LogEntry> logs)
{
await foreach (var log in logs)
{
Console.WriteLine($"Saving log to DB: {log}");
await Task.Delay(100); // 模拟数据库保存延迟
}
}
步骤 5:组装管道 将所有步骤整合在一起:
async Task ProcessLogsAsync()
{
var logs = GenerateLogsAsync();
var filteredLogs = FilterLogsAsync(logs);
var structuredLogs = TransformLogsAsync(filteredLogs);
await SaveLogsAsync(structuredLogs);
}
await ProcessLogsAsync();
输出:
Saving log to DB: LogEntry { Timestamp = 2024-11-22T12:00:00Z, Level = WARN, Message = Log entry 2 }
Saving log to DB: LogEntry { Timestamp = 2024-11-22T12:00:01Z, Level = ERROR, Message = Log entry 3 }
...
在管道的任何阶段都可能出现错误。在 await foreach
循环内部或生成器中使用 try-catch
块:
async IAsyncEnumerable<string> SafeGenerateLogsAsync()
{
try
{
for (int i = 1; i <= 5; i++)
{
if (i == 3) throw new Exception("Simulated error");
yield return $"Log {i}";
}
}
catch (Exception ex)
{
yield return $"Error: {ex.Message}";
}
}
在管道中处理错误:
await foreach (var log in SafeGenerateLogsAsync())
{
Console.WriteLine(log);
}
Channel
或 Parallel.ForEachAsync
来进行可控的并行处理。以下是一个流式传输和处理股票价格的示例:
async IAsyncEnumerable<(string Symbol, decimal Price)> FetchStockPricesAsync(string[] symbols)
{
var random = new Random();
foreach (var symbol in symbols)
{
await Task.Delay(500); // 模拟数据获取
yield return (symbol, random.Next(100, 200) + random.NextDecimal());
}
}
转换并筛选数据:
async IAsyncEnumerable<string> AnalyzeStockPricesAsync(IAsyncEnumerable<(string Symbol, decimal Price)> prices)
{
await foreach (var (symbol, price) in prices)
{
if (price > 150)
{
yield return $"{symbol}: {price} is above the threshold!";
}
}
}
异步流 IAsyncEnumerable
为构建响应迅速且可扩展的数据管道提供了一种优雅的解决方案。从日志处理到实时数据分析,其潜在应用非常广泛。通过将异步编程与可枚举集合相结合,它们简化了复杂的工作流程,减少了内存开销,并提升了性能。