ActionBlock、TransformBlock 揭秘:微软流处理管道的核心组件与秘密武器

作者:微信公众号:【架构师老卢】
7-30 7:54
22

你的 .NET 代码可能整洁、异步且完全可测试。

但它能每分钟处理 50,000 条消息... 而不会崩溃吗?

大多数开发者会使用:

foreach (var item in items) await ProcessAsync(item)
Task.WhenAll(...)

甚至原生的 Task.Run

这确实有效 —— 直到失效为止。

微软多年前就通过创建一个生产级的管道库解决了这个问题,它能处理: ✅ 并行处理 ✅ 节流 ✅ 反压 (Backpressure) ✅ 重试 ✅ 有界队列 (Bounded queues) ✅ 优雅关闭 (Graceful shutdown)

它叫做 System.Threading.Tasks.Dataflow

这篇博客将向你展示这个库为何存在,它解决了什么问题,以及如何像微软那样使用它——没有废话,只有经过生产验证的指导。

我们曾在一个每秒处理数千条消息的 Azure Functions IoT 解决方案中使用过 TPL Dataflow —— 对于高吞吐量、实时工作负载来说,它极其有效。

缩放图片将会显示

🤔 TPL Dataflow 为何存在? 来自微软官方文档:

“Dataflow 库使你能够在数据可用时立即处理它们,使用支持并发、有界容量和异步处理的管道。”

Task.RunParallel.ForEach,甚至 Channel<T> 这样的传统构造是:

  • 低级的
  • 不处理节流
  • 没有原生的反压机制
  • 与异步 I/O 不能很好地扩展

Dataflow 是微软为在 .NET 中构建持久、高吞吐量管道提供的解决方案 —— 尤其是在以下场景:

  • 消息持续到达
  • 每个项目需要异步转换
  • 你需要故障隔离、重试和批处理

🛠️ 它解决的问题

🧱 什么是 TPL Dataflow? System.Threading.Tasks.Dataflow 是一个微软提供的 .NET 库,专为以下目的设计:

✅ 并行处理 ✅ 反压 (Backpressure) ✅ 节流 (Throttling) ✅ 有界容量 (Bounded capacity) ✅ 流处理 (Stream processing)

它将你的工作流建模为一个由块 (blocks) 组成的图 (graph) —— 每个块处理一个处理阶段,并内置了异步和重试语义。

📦 安装:

dotnet add package System.Threading.Tasks.Dataflow

🧱 核心构建块(附微软官方示例)

1️⃣ BufferBlock<T> – 异步队列 充当消息的暂存队列。

```csharp
var buffer = new BufferBlock<string>();
await buffer.SendAsync("msg1");
```
✅ 用于解耦生产者与消费者
✅ 支持反压

2️⃣ TransformBlock<TInput, TOutput> – 并行异步处理 并行应用异步逻辑。

```csharp
var transform = new TransformBlock<string, string>(
    async input => {
        await Task.Delay(100); // 模拟 I/O
        return input.ToUpper();
    },
    new ExecutionDataflowBlockOptions {
        MaxDegreeOfParallelism = 8,
        BoundedCapacity = 100
    });
```
✅ 非常适合 API 调用、解析、数据丰富 (enrichment)

3️⃣ ActionBlock<T> – 终端接收器 (Terminal Sink) 消费消息但不发出结果。

```csharp
var action = new ActionBlock<string>(
    async msg => {
        await SaveToDatabaseAsync(msg);
    },
    new ExecutionDataflowBlockOptions {
        MaxDegreeOfParallelism = 4,
        BoundedCapacity = 50
    });
```
✅ 用于持久化、网络调用、日志记录

4️⃣ BatchBlock<T> – 批处理以提高吞吐量 将输入分组为批次。

```csharp
var batch = new BatchBlock<MyDto>(batchSize: 10);
```
✅ 在进行批量插入或 API 调用之前非常有用

5️⃣ BroadcastBlock<T> – 发布到多个目标 csharp var broadcast = new BroadcastBlock<string>(msg => msg); ✅ 用于扇出 (fan-out) 模式 —— 例如,同时记录和处理

6️⃣ JoinBlock<T1, T2> – 合并流 csharp var join = new JoinBlock<string, int>(); 在所有输入可用时,将多个流组合成元组 (tuples)。

🧪 使用 Dataflow 的真实世界管道 这些不仅仅是假设。微软自身就使用这些模式。

示例 1:文件处理管道

var read = new TransformBlock<string, string>(
    async path => await File.ReadAllTextAsync(path));

var enrich = new TransformBlock<string, string>(
    async content => await EnrichWithAIAsync(content));

var save = new ActionBlock<string>(
    async enriched => await SaveToBlobAsync(enriched));

read.LinkTo(enrich, new DataflowLinkOptions { PropagateCompletion = true });
enrich.LinkTo(save, new DataflowLinkOptions { PropagateCompletion = true });

// 启动流程
read.Post("file1.txt");
read.Post("file2.txt");
read.Complete(); // 确保管道优雅关闭
await save.Completion;

💡 示例 2:使用 Polly 的 API 聚合器

var retryPolicy = Policy
    .Handle<Exception>()
    .WaitAndRetryAsync(3, i => TimeSpan.FromMilliseconds(200));

var enrichBlock = new TransformBlock<InputDto, OutputDto>(
    async input => await retryPolicy.ExecuteAsync(async () =>
    {
        var response = await httpClient.GetAsync($"https://api.com/data/{input.Id}");
        var data = await response.Content.ReadAsStringAsync();
        return new OutputDto
        {
            Id = input.Id,
            Data = data
        };
    }));

💡 示例 3:用于并发消息处理的后台工作者

var messageProcessor = new ActionBlock<Message>(
    async message =>
    {
        await HandleAsync(message); // 你的自定义异步处理逻辑
    },
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 10,  // 并行处理最多 10 条消息
        BoundedCapacity = 100         // 通过限制缓冲区大小防止内存压力
    });

💡 示例 4:Azure Function 中的真实生产管道 用例:

  • 触发器:Azure 服务总线 (Azure Service Bus)
  • 步骤:反序列化 → 丰富 (API 调用) → 保存到 Cosmos DB
  • 性能:负载下每分钟 10K+ 条消息
public class ProcessMessagePipeline
{
    private readonly ActionBlock<ServiceBusReceivedMessage> _entryBlock;
    private readonly TransformBlock<ServiceBusReceivedMessage, MyDto> _transformBlock;
    private readonly ActionBlock<MyDto> _saveBlock;

    public ProcessMessagePipeline()
    {
        _transformBlock = new TransformBlock<ServiceBusReceivedMessage, MyDto>(
            async message => await TransformMessageAsync(message),
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 16,
                BoundedCapacity = 200
            });

        _saveBlock = new ActionBlock<MyDto>(
            async dto => await SaveToCosmosAsync(dto),
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 8,
                BoundedCapacity = 100
            });

        _transformBlock.LinkTo(_saveBlock, new DataflowLinkOptions { PropagateCompletion = true });

        _entryBlock = new ActionBlock<ServiceBusReceivedMessage>(
            async message => await _transformBlock.SendAsync(message),
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 1,
                BoundedCapacity = 1000
            });
    }

    public async Task ProcessAsync(ServiceBusReceivedMessage message)
    {
        await _entryBlock.SendAsync(message);
    }

    public void Complete() => _entryBlock.Complete();

    public async Task Completion => await _saveBlock.Completion;

    private async Task<MyDto> TransformMessageAsync(ServiceBusReceivedMessage message)
    {
        var dto = JsonSerializer.Deserialize<MyDto>(message.Body);
        dto.Enriched = await CallExternalApiAsync(dto);
        return dto;
    }

    private async Task SaveToCosmosAsync(MyDto dto)
    {
        await Task.Yield(); // 如果你的方法不是 CPU 密集型的,请移除此行
        await CosmosRepository.SaveAsync(dto);
    }
}

🧭 示例 4 — 可视化管道架构

[Azure 服务总线触发器]
            |
     [ActionBlock: 接收消息]
            |
[TransformBlock: 反序列化 + 丰富]
            |
 [ActionBlock: 保存到 Cosmos DB]

📉 何时不应使用 TPL Dataflow

  • 对于极其简单的顺序处理(foreach 可能就够了)。
  • 当你的处理步骤完全是同步且轻量级时(并行循环可能更简单)。
  • 当工作流不是基于消息/数据流,而是复杂的、有状态的协调任务时(考虑 Actor 模型或状态机)。
  • 当需要跨机器分布处理时(考虑分布式流处理框架如 Azure Stream Analytics, Kafka Streams, Spark Streaming)。

📈 在 Azure 中监控你的管道(微软推荐)

“使用块属性来检查吞吐量、队列长度和完成状态。” — TPL 文档

跟踪:

  • InputCount, OutputCount
  • Completion.IsCompleted
  • 通过 BoundedCapacity 跟踪队列深度

记录指标:

_telemetry.TrackMetric("BlockQueueLength", block.InputCount);

✅ 使用 TrySendAsync() 配合回退机制来检测溢出。

🧯 关闭管道而不丢失数据(也能安心睡觉) 在关闭时:

pipeline.Complete();
await pipeline.Completion;

“这确保所有处理中的消息都能优雅地完成。” — 微软文档

⚖️ 与其他并发工具的比较

| 工具/构造 | 主要优点 | 主要缺点(对比 Dataflow) | 适用场景 | | :-------------------- | :-------------------------------------------- | :-------------------------------------------------- | :------------------------------------------- | | TPL Dataflow | 内置反压、节流、有界容量、块链接、优雅关闭 | 学习曲线稍陡峭,配置选项多 | 复杂、高吞吐、多阶段异步数据流管道 | | Task.Run + async | 简单,易于理解 | 手动管理并发度、无内置反压、队列失控风险高 | 简单后台任务,少量并发 | | Parallel.For/ForEach| CPU 密集型并行计算高效 | 不适用于异步 I/O、阻塞调用线程、无反压 | CPU 密集型循环操作 | | Channel<T> | 高效的生产者/消费者队列,比 BlockingCollection 更优 | 仅提供队列,不提供处理逻辑、并行度、节流或链接 | 需要解耦生产者/消费者的简单场景,作为基础块 | | Reactive Extensions (Rx) | 强大的事件流组合、时间处理、错误处理 | 学习曲线陡峭,更偏向事件处理而非数据处理管道 | 事件驱动编程、复杂的事件流组合与转换 | | Azure Durable Functions | 无服务器编排、状态管理、内置可靠性(重试、超时) | 运行在无服务器环境可能有冷启动、状态管理开销 | 跨函数编排、有状态工作流、无服务器环境 |

🧠 要点 如果你正在构建:

  • ETL 处理器
  • 消息队列
  • 文件或批处理作业
  • 异步 API 管道
  • 并行代理 (agents)

...而你没有使用 System.Threading.Tasks.Dataflow,你可能:

  • 编写了比实际需要更多的代码
  • 错失了内置的节流功能
  • 正在与无界队列 (unbounded queues) 引发的 Bug 作斗争
相关留言评论
昵称:
邮箱:
阅读排行