你的 .NET 代码可能整洁、异步且完全可测试。
但它能每分钟处理 50,000 条消息... 而不会崩溃吗?
大多数开发者会使用:
foreach (var item in items) await ProcessAsync(item)
Task.WhenAll(...)
甚至原生的 Task.Run
这确实有效 —— 直到失效为止。
微软多年前就通过创建一个生产级的管道库解决了这个问题,它能处理: ✅ 并行处理 ✅ 节流 ✅ 反压 (Backpressure) ✅ 重试 ✅ 有界队列 (Bounded queues) ✅ 优雅关闭 (Graceful shutdown)
它叫做 System.Threading.Tasks.Dataflow。
这篇博客将向你展示这个库为何存在,它解决了什么问题,以及如何像微软那样使用它——没有废话,只有经过生产验证的指导。
我们曾在一个每秒处理数千条消息的 Azure Functions IoT 解决方案中使用过 TPL Dataflow —— 对于高吞吐量、实时工作负载来说,它极其有效。
缩放图片将会显示
🤔 TPL Dataflow 为何存在? 来自微软官方文档:
“Dataflow 库使你能够在数据可用时立即处理它们,使用支持并发、有界容量和异步处理的管道。”
像 Task.Run
、Parallel.ForEach
,甚至 Channel<T>
这样的传统构造是:
Dataflow 是微软为在 .NET 中构建持久、高吞吐量管道提供的解决方案 —— 尤其是在以下场景:
🛠️ 它解决的问题
🧱 什么是 TPL Dataflow?
System.Threading.Tasks.Dataflow
是一个微软提供的 .NET 库,专为以下目的设计:
✅ 并行处理 ✅ 反压 (Backpressure) ✅ 节流 (Throttling) ✅ 有界容量 (Bounded capacity) ✅ 流处理 (Stream processing)
它将你的工作流建模为一个由块 (blocks) 组成的图 (graph) —— 每个块处理一个处理阶段,并内置了异步和重试语义。
📦 安装:
dotnet add package System.Threading.Tasks.Dataflow
🧱 核心构建块(附微软官方示例)
1️⃣ BufferBlock<T>
– 异步队列
充当消息的暂存队列。
```csharp
var buffer = new BufferBlock<string>();
await buffer.SendAsync("msg1");
```
✅ 用于解耦生产者与消费者
✅ 支持反压
2️⃣ TransformBlock<TInput, TOutput>
– 并行异步处理
并行应用异步逻辑。
```csharp
var transform = new TransformBlock<string, string>(
async input => {
await Task.Delay(100); // 模拟 I/O
return input.ToUpper();
},
new ExecutionDataflowBlockOptions {
MaxDegreeOfParallelism = 8,
BoundedCapacity = 100
});
```
✅ 非常适合 API 调用、解析、数据丰富 (enrichment)
3️⃣ ActionBlock<T>
– 终端接收器 (Terminal Sink)
消费消息但不发出结果。
```csharp
var action = new ActionBlock<string>(
async msg => {
await SaveToDatabaseAsync(msg);
},
new ExecutionDataflowBlockOptions {
MaxDegreeOfParallelism = 4,
BoundedCapacity = 50
});
```
✅ 用于持久化、网络调用、日志记录
4️⃣ BatchBlock<T>
– 批处理以提高吞吐量
将输入分组为批次。
```csharp
var batch = new BatchBlock<MyDto>(batchSize: 10);
```
✅ 在进行批量插入或 API 调用之前非常有用
5️⃣ BroadcastBlock<T>
– 发布到多个目标
csharp var broadcast = new BroadcastBlock<string>(msg => msg);
✅ 用于扇出 (fan-out) 模式 —— 例如,同时记录和处理
6️⃣ JoinBlock<T1, T2>
– 合并流
csharp var join = new JoinBlock<string, int>();
在所有输入可用时,将多个流组合成元组 (tuples)。
🧪 使用 Dataflow 的真实世界管道 这些不仅仅是假设。微软自身就使用这些模式。
示例 1:文件处理管道
var read = new TransformBlock<string, string>(
async path => await File.ReadAllTextAsync(path));
var enrich = new TransformBlock<string, string>(
async content => await EnrichWithAIAsync(content));
var save = new ActionBlock<string>(
async enriched => await SaveToBlobAsync(enriched));
read.LinkTo(enrich, new DataflowLinkOptions { PropagateCompletion = true });
enrich.LinkTo(save, new DataflowLinkOptions { PropagateCompletion = true });
// 启动流程
read.Post("file1.txt");
read.Post("file2.txt");
read.Complete(); // 确保管道优雅关闭
await save.Completion;
💡 示例 2:使用 Polly 的 API 聚合器
var retryPolicy = Policy
.Handle<Exception>()
.WaitAndRetryAsync(3, i => TimeSpan.FromMilliseconds(200));
var enrichBlock = new TransformBlock<InputDto, OutputDto>(
async input => await retryPolicy.ExecuteAsync(async () =>
{
var response = await httpClient.GetAsync($"https://api.com/data/{input.Id}");
var data = await response.Content.ReadAsStringAsync();
return new OutputDto
{
Id = input.Id,
Data = data
};
}));
💡 示例 3:用于并发消息处理的后台工作者
var messageProcessor = new ActionBlock<Message>(
async message =>
{
await HandleAsync(message); // 你的自定义异步处理逻辑
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 10, // 并行处理最多 10 条消息
BoundedCapacity = 100 // 通过限制缓冲区大小防止内存压力
});
💡 示例 4:Azure Function 中的真实生产管道 用例:
public class ProcessMessagePipeline
{
private readonly ActionBlock<ServiceBusReceivedMessage> _entryBlock;
private readonly TransformBlock<ServiceBusReceivedMessage, MyDto> _transformBlock;
private readonly ActionBlock<MyDto> _saveBlock;
public ProcessMessagePipeline()
{
_transformBlock = new TransformBlock<ServiceBusReceivedMessage, MyDto>(
async message => await TransformMessageAsync(message),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 16,
BoundedCapacity = 200
});
_saveBlock = new ActionBlock<MyDto>(
async dto => await SaveToCosmosAsync(dto),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 8,
BoundedCapacity = 100
});
_transformBlock.LinkTo(_saveBlock, new DataflowLinkOptions { PropagateCompletion = true });
_entryBlock = new ActionBlock<ServiceBusReceivedMessage>(
async message => await _transformBlock.SendAsync(message),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1,
BoundedCapacity = 1000
});
}
public async Task ProcessAsync(ServiceBusReceivedMessage message)
{
await _entryBlock.SendAsync(message);
}
public void Complete() => _entryBlock.Complete();
public async Task Completion => await _saveBlock.Completion;
private async Task<MyDto> TransformMessageAsync(ServiceBusReceivedMessage message)
{
var dto = JsonSerializer.Deserialize<MyDto>(message.Body);
dto.Enriched = await CallExternalApiAsync(dto);
return dto;
}
private async Task SaveToCosmosAsync(MyDto dto)
{
await Task.Yield(); // 如果你的方法不是 CPU 密集型的,请移除此行
await CosmosRepository.SaveAsync(dto);
}
}
🧭 示例 4 — 可视化管道架构
[Azure 服务总线触发器]
|
[ActionBlock: 接收消息]
|
[TransformBlock: 反序列化 + 丰富]
|
[ActionBlock: 保存到 Cosmos DB]
📉 何时不应使用 TPL Dataflow
foreach
可能就够了)。Actor
模型或状态机)。📈 在 Azure 中监控你的管道(微软推荐)
“使用块属性来检查吞吐量、队列长度和完成状态。” — TPL 文档
✅ 跟踪:
InputCount
, OutputCount
Completion.IsCompleted
BoundedCapacity
跟踪队列深度✅ 记录指标:
_telemetry.TrackMetric("BlockQueueLength", block.InputCount);
✅ 使用 TrySendAsync()
配合回退机制来检测溢出。
🧯 关闭管道而不丢失数据(也能安心睡觉) 在关闭时:
pipeline.Complete();
await pipeline.Completion;
“这确保所有处理中的消息都能优雅地完成。” — 微软文档
⚖️ 与其他并发工具的比较
| 工具/构造 | 主要优点 | 主要缺点(对比 Dataflow) | 适用场景 |
| :-------------------- | :-------------------------------------------- | :-------------------------------------------------- | :------------------------------------------- |
| TPL Dataflow | 内置反压、节流、有界容量、块链接、优雅关闭 | 学习曲线稍陡峭,配置选项多 | 复杂、高吞吐、多阶段异步数据流管道 |
| Task.Run
+ async
| 简单,易于理解 | 手动管理并发度、无内置反压、队列失控风险高 | 简单后台任务,少量并发 |
| Parallel.For/ForEach
| CPU 密集型并行计算高效 | 不适用于异步 I/O、阻塞调用线程、无反压 | CPU 密集型循环操作 |
| Channel<T>
| 高效的生产者/消费者队列,比 BlockingCollection
更优 | 仅提供队列,不提供处理逻辑、并行度、节流或链接 | 需要解耦生产者/消费者的简单场景,作为基础块 |
| Reactive Extensions (Rx) | 强大的事件流组合、时间处理、错误处理 | 学习曲线陡峭,更偏向事件处理而非数据处理管道 | 事件驱动编程、复杂的事件流组合与转换 |
| Azure Durable Functions | 无服务器编排、状态管理、内置可靠性(重试、超时) | 运行在无服务器环境可能有冷启动、状态管理开销 | 跨函数编排、有状态工作流、无服务器环境 |
🧠 要点 如果你正在构建:
...而你没有使用 System.Threading.Tasks.Dataflow
,你可能: