C# 中的异步流与数据处理管道

作者:微信公众号:【架构师老卢】
11-24 20:28
60

在当今世界,应用程序常常需要处理大量数据或进行实时更新。无论是股票价格的流式传输、日志处理,还是用户生成的内容,设计一个响应迅速且高效的数据管道都至关重要。借助 C# 的异步流和 IAsyncEnumerable,我们能够创建异步数据处理的无缝流程,同时保持出色的可读性和性能。

在本文中,我们将探讨异步流如何简化复杂的工作流程、实现数据处理管道以及应对现实世界中的各种挑战。

什么是异步流?

在 C# 8.0 中引入的异步流将异步编程的强大功能与可枚举集合相结合。它们使用 IAsyncEnumerable<T> 接口来允许对异步操作的集合进行迭代。

主要优点

  • 异步执行:减少诸如 I/O 或网络调用等高延迟操作中的阻塞情况。
  • 惰性求值:仅在需要时处理数据,从而减少内存开销。
  • 简化代码:无需回调或事件驱动模型。

以下是一个简单示例:

async IAsyncEnumerable<int> GenerateNumbersAsync()
{
    for (int i = 1; i <= 5; i++)
    {
        await Task.Delay(500); // 模拟异步工作
        yield return i;
    }
}

await foreach (var number in GenerateNumbersAsync())
{
    Console.WriteLine(number);
}

输出

1
2
3
4
5

何时应该使用异步流?

  • 处理大型数据集(例如读取日志或文件流)。
  • 处理实时数据(例如物联网、实时推送)。
  • 管理包含长时间运行操作的工作流程。

构建数据处理管道

让我们构建一个用于实时处理日志文件的数据管道。想象一下,有一个系统会生成日志条目,这些条目需要经过筛选、转换,然后保存到数据库中。以下展示了异步流如何简化这一工作流程:

步骤 1:生成日志(数据源) 第一步是创建一个异步日志生成器。

async IAsyncEnumerable<string> GenerateLogsAsync()
{
    string[] logLevels = { "INFO", "WARN", "ERROR" };
    for (int i = 1; i <= 10; i++)
    {
        await Task.Delay(300); // 模拟日志生成延迟
        yield return $"{DateTime.UtcNow}: {logLevels[i % 3]} Log entry {i}";
    }
}

步骤 2:筛选日志(处理过程) 让我们筛选出级别为“INFO”的日志。

async IAsyncEnumerable<string> FilterLogsAsync(IAsyncEnumerable<string> logs)
{
    await foreach (var log in logs)
    {
        if (!log.Contains("INFO"))
        {
            yield return log;
        }
    }
}

步骤 3:转换日志(处理过程) 将日志条目转换为结构化格式。

record LogEntry(DateTime Timestamp, string Level, string Message);

async IAsyncEnumerable<LogEntry> TransformLogsAsync(IAsyncEnumerable<string> logs)
{
    await foreach (var log in logs)
    {
        var parts = log.Split(' ', 3);
        yield return new LogEntry(
            DateTime.Parse(parts[0]),
            parts[1].TrimEnd(':'),
            parts[2]
        );
    }
}

步骤 4:保存日志(数据汇) 最后,将日志存储到数据库中。

async Task SaveLogsAsync(IAsyncEnumerable<LogEntry> logs)
{
    await foreach (var log in logs)
    {
        Console.WriteLine($"Saving log to DB: {log}");
        await Task.Delay(100); // 模拟数据库保存延迟
    }
}

步骤 5:组装管道 将所有步骤整合在一起:

async Task ProcessLogsAsync()
{
    var logs = GenerateLogsAsync();
    var filteredLogs = FilterLogsAsync(logs);
    var structuredLogs = TransformLogsAsync(filteredLogs);
    await SaveLogsAsync(structuredLogs);
}

await ProcessLogsAsync();

输出

Saving log to DB: LogEntry { Timestamp = 2024-11-22T12:00:00Z, Level = WARN, Message = Log entry 2 }
Saving log to DB: LogEntry { Timestamp = 2024-11-22T12:00:01Z, Level = ERROR, Message = Log entry 3 }
...

异步流中的错误处理

在管道的任何阶段都可能出现错误。在 await foreach 循环内部或生成器中使用 try-catch 块:

async IAsyncEnumerable<string> SafeGenerateLogsAsync()
{
    try
    {
        for (int i = 1; i <= 5; i++)
        {
            if (i == 3) throw new Exception("Simulated error");
            yield return $"Log {i}";
        }
    }
    catch (Exception ex)
    {
        yield return $"Error: {ex.Message}";
    }
}

在管道中处理错误:

await foreach (var log in SafeGenerateLogsAsync())
{
    Console.WriteLine(log);
}

异步流的最佳实践

  • 最小化延迟:在每个步骤中使用异步操作以防止出现瓶颈。
  • 限制并发:使用 ChannelParallel.ForEachAsync 来进行可控的并行处理。
  • 资源管理:妥善处置文件句柄或数据库连接等资源。
  • 避免过载:不要获取或处理超出必要范围的数据。

现实世界用例:股票价格流式传输

以下是一个流式传输和处理股票价格的示例:

async IAsyncEnumerable<(string Symbol, decimal Price)> FetchStockPricesAsync(string[] symbols)
{
    var random = new Random();
    foreach (var symbol in symbols)
    {
        await Task.Delay(500); // 模拟数据获取
        yield return (symbol, random.Next(100, 200) + random.NextDecimal());
    }
}

转换并筛选数据:

async IAsyncEnumerable<string> AnalyzeStockPricesAsync(IAsyncEnumerable<(string Symbol, decimal Price)> prices)
{
    await foreach (var (symbol, price) in prices)
    {
        if (price > 150)
        {
            yield return $"{symbol}: {price} is above the threshold!";
        }
    }
}

异步流 IAsyncEnumerable 为构建响应迅速且可扩展的数据管道提供了一种优雅的解决方案。从日志处理到实时数据分析,其潜在应用非常广泛。通过将异步编程与可枚举集合相结合,它们简化了复杂的工作流程,减少了内存开销,并提升了性能。

相关留言评论
昵称:
邮箱:
阅读排行