15. TPL Dataflow 流水线编程
本章 GitHub 仓库:csharp-concurrency-cookbook ⭐
欢迎 Star 和 Fork!所有代码示例都可以在仓库中找到并运行。
一、本章导读
本文目标:从零构建生产级 Dataflow 流水线,掌握完整的设计思路、核心原理和最佳实践。
我先问你一个问题:如果让你处理 10 万张图片(下载 → 压缩 → 水印 → 上传),你会怎么写?
很多人的第一反应是:
// 方案1:串行处理 —— 太慢了
foreach (var url in imageUrls)
{
var data = await DownloadAsync(url);
var compressed = await CompressAsync(data);
var watermarked = await AddWatermarkAsync(compressed);
await UploadAsync(watermarked);
}
// 方案2:全部并行 —— 炸了
await Task.WhenAll(imageUrls.Select(async url =>
{
// ... 10万个Task同时跑,内存几十GB,系统直接崩溃
}));
方案 1 太慢,10 万张图片可能要跑几天。
方案 2 太猛,瞬间把内存撑爆,系统直接 OOM。
那怎么办?你可能想到用 SemaphoreSlim 或 Channel 来限流:
var sem = new SemaphoreSlim(10); // 限制并发数
await Task.WhenAll(imageUrls.Select(async url =>
{
await sem.WaitAsync();
try
{
// ... 处理逻辑
}
finally
{
sem.Release();
}
}));
这确实能跑,但有几个问题:
- 各阶段混在一起:下载、压缩、上传的并发度应该不同(I/O 密集 vs CPU 密集),但现在是统一控制
- 缺乏背压机制:如果下载很快但上传很慢,中间数据会堆积在内存里
- 错误处理复杂:某个文件失败了要记录日志、重试、还要继续处理其他文件
- 代码难维护:加个新阶段(比如”人脸识别”),代码要大改
二、Dataflow 的核心思想
2.1 流水线的本质
你去过工厂吗?现代工厂的生产线是这样的:
[原料] → [工位1:切割] → [工位2:打磨] → [工位3:组装] → [工位4:包装] → [成品]
↓ 3人并行 ↓ 5人并行 ↓ 2人并行 ↓ 1人
每个工位的特征:
- 独立负责一道工序:切割的人只管切割,不管打磨
- 有自己的工人数量(并行度):打磨工序慢,多配5个人
- 有中间缓冲区(队列):前道工序快了,产品在这里排队等待
- 出问题只影响本工位:组装工位停机,不影响切割继续生产
这样做的好处是什么?
| 对比维度 | 串行处理 | 流水线处理 |
|---|---|---|
| 效率 | 处理完一个产品才能开始下一个 | 同时处理多个产品(不同阶段) |
| 并行度 | 无法针对不同工序调整 | 慢工序多配人,快工序少配人 |
| 故障影响 | 一个环节出问题,全线停工 | 只影响本工位,其他继续运行 |
| 资源利用 | CPU和I/O经常空闲 | CPU和I/O同时工作 |
TPL Dataflow 把这套思想搬到代码里:
graph LR A[数据源] –> B[BlockA<br/>并行度:3<br/>缓冲:10个] B –> C[BlockB<br/>并行度:5<br/>缓冲:20个] C –> D[BlockC<br/>并行度:2<br/>缓冲:5个] D –> E[结果] style B fill:#1976d2,stroke:#0d47a1,stroke-width:2px,color:#fff style C fill:#388e3c,stroke:#1b5e20,stroke-width:2px,color:#fff style D fill:#f57c00,stroke:#e65100,stroke-width:2px,color:#fff
每个 Block(块) 就是一个工位,它:
- 接收上游数据(或主动拉取)→ 相当于从上一道工序拿产品
- 执行自己的逻辑(同步或异步)→ 相当于本工位的加工操作
- 把结果传给下游(或丢弃)→ 相当于把成品放到下一道工序的缓冲区
2.2 核心Block类型
相关类在System.Threading.Tasks.Dataflow命名空间里面,系统自带,无须额外nuget包。
| Block 类型 | 功能 | 典型场景 |
|---|---|---|
| ActionBlock | 消费数据,无输出 | 日志记录、数据库写入 |
| TransformBlock<TIn, TOut> | 转换数据(1→1) | 图片压缩、数据清洗 |
| TransformManyBlock<TIn, TOut> | 转换数据(1→多) | 拆分大文件、展开嵌套数据 |
| BatchBlock | 批量打包(多→1) | 批量入库、日志聚合 |
| BroadcastBlock | 广播数据(1→多份) | 缓存+持久化双写 |
| BufferBlock | 缓冲队列 | 解耦生产者和消费者 |
2.3 关键配置参数
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4, // 并行度(同时处理4条数据)
BoundedCapacity = 10, // 缓冲容量(最多排队10条)
CancellationToken = cts.Token // 取消令牌
};
var block = new TransformBlock<string, byte[]>(
async url => await DownloadAsync(url),
options
);
为什么 Dataflow 比手写流水线优雅?
| 手写流水线要自己做的事 | Dataflow 内置支持 |
|---|---|
| 手动管理队列(Channel、Queue) | 每个块自带缓冲队列 |
| 手动控制并行度(SemaphoreSlim) | MaxDegreeOfParallelism 配置 |
| 手动实现背压(队列满了要等待) | BoundedCapacity 自动节流 |
| 手动传播完成状态(上游结束了通知下游) | PropagateCompletion 自动传播 |
| 手动处理异常(某个任务失败了要记录) | 每个块独立异常处理 |
| 手动组装流水线(代码像意大利面条) | 声明式 LinkTo 连接 |
三、何时使用 Dataflow?
适合的场景
| 场景 | 为什么选 Dataflow |
|---|---|
| 多阶段数据处理 | 每个阶段独立配置并行度和缓冲 |
| I/O 和 CPU 混合任务 | 下载(I/O密集)并行20,压缩(CPU密集)并行4 |
| 需要背压控制 | 自动节流,避免内存爆炸 |
| 复杂数据流向 | 支持分支、合并、过滤、广播 |
| 长时间运行的流水线 | 可观测性强(监控、统计、故障隔离) |
不适合的场景
| 场景 | 为什么不选 Dataflow | 推荐方案 |
|---|---|---|
| 简单并行任务 | Dataflow 有学习成本和性能开销 | Parallel.ForEachAsync |
| 实时事件处理 | Dataflow 不是事件驱动架构 | System.Reactive (Rx) |
| 需要严格顺序 | 默认是并行的,手动保证顺序很麻烦 | Channel 或串行处理 |
对比其他方案
// 方案1:Parallel.ForEachAsync —— 简单并行任务首选
await Parallel.ForEachAsync(urls, new ParallelOptions { MaxDegreeOfParallelism = 10 },
async (url, ct) => await ProcessAsync(url));
// 方案2:Channel<T> —— 单生产者/单消费者场景
var channel = Channel.CreateBounded<string>(10);
var producer = Task.Run(async () => { /* ... */ });
var consumer = Task.Run(async () => { /* ... */ });
// 方案3:Dataflow —— 多阶段流水线
var pipeline = CreatePipeline(); // 本文重点!
await pipeline.SendAsync(data);
四、实战:构建生产级图片处理流水线
完整代码位置:
Dataflow/Production/ProductionImagePipeline.cs
4.1 架构总览
我们要实现一个真实生产环境可用的图片处理流水线:
graph TB A[输入: 图片URL] –> B[阶段1: 下载<br/>I/O密集, 并行20<br/>支持重试] B –> C[阶段2: 压缩<br/>CPU密集, 并行4<br/>背压控制] C –> D[阶段3: 水印<br/>CPU密集, 并行4] D –> E[阶段4: 上传<br/>I/O密集, 并行10<br/>故障隔离] E –> F[阶段5: 日志<br/>串行, 落盘] F –> G[完成] H[监控线程] -.定期巡检.-> B H -.定期巡检.-> C H -.定期巡检.-> D H -.定期巡检.-> E style B fill:#1976d2,stroke:#0d47a1,stroke-width:2px,color:#fff style C fill:#388e3c,stroke:#1b5e20,stroke-width:2px,color:#fff style D fill:#f57c00,stroke:#e65100,stroke-width:2px,color:#fff style E fill:#d32f2f,stroke:#b71c1c,stroke-width:2px,color:#fff style F fill:#7b1fa2,stroke:#4a148c,stroke-width:2px,color:#fff style H fill:#455a64,stroke:#263238,stroke-width:2px,color:#fff
设计要点:
- 故障隔离:某张图片失败了,不影响其他图片继续处理
- 重试机制:下载/上传支持指数退避重试
- 监控能力:实时统计成功率、吞吐量、队列积压
- 优雅关闭:取消时等待当前任务完成,不丢数据
4.2 核心代码实现
️ 生产级代码的三大要求:
- 配置外部化:
ExecutionDataflowBlockOptions应由调用方注入(从配置文件读取)- 全链路取消:
CancellationToken必须传递到每个 Block- 可观测性:暴露 Block 的公共方法(
InputCount、Completion等)供监控使用
Step 0: 配置化设计(生产环境必备)
/// <summary>
/// 流水线配置选项(由调用方通过配置文件注入)
/// </summary>
public class PipelineOptions
{
/// <summary>下载块配置(I/O 密集,默认高并行)</summary>
public ExecutionDataflowBlockOptions? DownloadOptions { get; set; }
/// <summary>压缩块配置(CPU 密集,默认=核心数)</summary>
public ExecutionDataflowBlockOptions? CompressOptions { get; set; }
/// <summary>上传块配置(I/O 密集,默认高并行)</summary>
public ExecutionDataflowBlockOptions? UploadOptions { get; set; }
/// <summary>HTTP 超时时间(默认30秒)</summary>
public TimeSpan HttpTimeout { get; set; } = TimeSpan.FromSeconds(30);
/// <summary>上传重试次数(默认3次)</summary>
public int MaxUploadRetries { get; set; } = 3;
/// <summary>监控报告间隔(默认5秒)</summary>
public TimeSpan MonitorInterval { get; set; } = TimeSpan.FromSeconds(5);
/// <summary>获取默认配置</summary>
public static PipelineOptions Default => new()
{
DownloadOptions = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 10,
BoundedCapacity = 20
},
// ... 其他配置
};
}
// 构造函数支持配置注入
public ProductionImagePipeline(
ILogger<ProductionImagePipeline> logger,
PipelineOptions? options = null) // 如果为 null 则使用默认配置
{
_options = options ?? PipelineOptions.Default;
// ...
}
为什么要配置外部化?
| 硬编码配置 | 外部化配置 |
|---|---|
| 不同环境(开发/测试/生产)参数相同 | 根据环境调整(生产环境并行度更高) |
| 修改参数需要重新编译、发布 | 修改配置文件即可,无需重新部署 |
| 无法动态调优性能 | 可以根据监控数据调整配置 |
生产环境使用示例:
// appsettings.json
{
"Pipeline": {
"DownloadParallelism": 20,
"CompressParallelism": 8,
"HttpTimeoutSeconds": 60
}
}
// Startup.cs
var config = Configuration.GetSection("Pipeline");
var options = new PipelineOptions
{
DownloadOptions = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = config.GetValue<int>("DownloadParallelism"),
BoundedCapacity = 50
},
HttpTimeout = TimeSpan.FromSeconds(config.GetValue<int>("HttpTimeoutSeconds"))
};
var pipeline = new ProductionImagePipeline(logger, options);
Step 1: 定义数据结构
// 流水线中流转的数据(携带状态,而不是抛异常)
public record ProcessResult
{
public string ImageUrl { get; init; } = "";
public byte[]? ImageData { get; set; }
public byte[]? CompressedData { get; set; }
public byte[]? WatermarkedData { get; set; }
public string? UploadedUrl { get; set; }
public bool IsSuccess { get; set; } = true;
public string? ErrorMessage { get; set; }
public int RetryCount { get; set; }
public ProcessResult WithError(string error)
{
IsSuccess = false;
ErrorMessage = error;
return this;
}
}
为什么用 ProcessResult 而不是抛异常?
| 方案 | 优点 | 缺点 |
|---|---|---|
| 抛异常 | 代码简洁 | 一个异常会让整个 Block 停止,影响其他数据 |
| 返回结果对象 | 故障隔离,失败的数据继续往下走,记录日志 | 需要在每个阶段检查 IsSuccess |
Step 2: 创建各阶段Block
下载Block(支持重试):
private TransformBlock<ProcessResult, ProcessResult> CreateDownloadBlock()
{
return new TransformBlock<ProcessResult, ProcessResult>(
async result =>
{
if (!result.IsSuccess) return result; // 上游已失败,跳过
for (int retry = 0; retry <= 3; retry++)
{
try
{
_logger.LogInformation("下载 {Url} (重试 {Retry})", result.ImageUrl, retry);
result.ImageData = await DownloadImageAsync(result.ImageUrl);
_stats.IncrementDownloaded();
return result;
}
catch (Exception ex)
{
if (retry == 3)
{
_logger.LogError(ex, "下载失败 {Url}", result.ImageUrl);
return result.WithError($"下载失败: {ex.Message}");
}
await Task.Delay(TimeSpan.FromSeconds(Math.Pow(2, retry))); // 指数退避
}
}
return result;
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 20, // I/O密集,高并发
BoundedCapacity = 50, // 限制排队数量
CancellationToken = _cts.Token
}
);
}
压缩Block(CPU密集):
private TransformBlock<ProcessResult, ProcessResult> CreateCompressBlock()
{
return new TransformBlock<ProcessResult, ProcessResult>(
result =>
{
if (!result.IsSuccess) return result;
try
{
result.CompressedData = CompressImage(result.ImageData!);
_stats.IncrementCompressed();
return result;
}
catch (Exception ex)
{
return result.WithError($"压缩失败: {ex.Message}");
}
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4, // CPU密集,不宜过高
BoundedCapacity = 20,
CancellationToken = _cts.Token
}
);
}
Step 3: 组装流水线
public class ProductionImagePipeline : IDisposable
{
private readonly TransformBlock<ProcessResult, ProcessResult> _downloadBlock;
private readonly TransformBlock<ProcessResult, ProcessResult> _compressBlock;
private readonly TransformBlock<ProcessResult, ProcessResult> _watermarkBlock;
private readonly TransformBlock<ProcessResult, ProcessResult> _uploadBlock;
private readonly ActionBlock<ProcessResult> _logBlock;
public ProductionImagePipeline(ILogger<ProductionImagePipeline> logger)
{
_logger = logger;
// 创建各个阶段
_downloadBlock = CreateDownloadBlock();
_compressBlock = CreateCompressBlock();
_watermarkBlock = CreateWatermarkBlock();
_uploadBlock = CreateUploadBlock();
_logBlock = CreateLogBlock();
// 连接流水线(重点!)
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
_downloadBlock.LinkTo(_compressBlock, linkOptions);
_compressBlock.LinkTo(_watermarkBlock, linkOptions);
_watermarkBlock.LinkTo(_uploadBlock, linkOptions);
_uploadBlock.LinkTo(_logBlock, linkOptions);
}
public async Task ProcessAsync(string imageUrl)
{
var result = new ProcessResult { ImageUrl = imageUrl };
await _downloadBlock.SendAsync(result);
}
public async Task StopAsync()
{
_downloadBlock.Complete(); // 标记入口为"不再接收新数据"
await _logBlock.Completion; // 等待最后一个阶段完成
}
}
关键:PropagateCompletion
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
_downloadBlock.LinkTo(_compressBlock, linkOptions);
这行代码的作用:
sequenceDiagram participant A as DownloadBlock participant B as CompressBlock participant C as UploadBlock Note over A: Complete() 被调用 A->>A: 处理完剩余数据 A->>B: 自动调用 Complete() B->>B: 处理完剩余数据 B->>C: 自动调用 Complete() C->>C: 处理完剩余数据 Note over C: Completion 任务完成
Step 4: 监控与统计
private async Task MonitorProgressAsync()
{
using var timer = new PeriodicTimer(TimeSpan.FromSeconds(5));
while (await timer.WaitForNextTickAsync(_cts.Token))
{
_logger.LogInformation(
" 统计: 下载 {Downloaded}, 压缩 {Compressed}, 上传 {Uploaded}, 失败 {Failed}",
_stats.Downloaded, _stats.Compressed, _stats.Uploaded, _stats.Failed
);
_logger.LogInformation(
" 队列积压: 下载 {DQ}, 压缩 {CQ}, 上传 {UQ}",
_downloadBlock.InputCount, _compressBlock.InputCount, _uploadBlock.InputCount
);
}
}
监控两个维度:
- 业务指标:成功/失败数量
- 性能指标:各阶段队列积压(
InputCount)
Step 5: CancellationToken 全链路传递(生产环境必备)
// 错误做法:没有传递 CancellationToken
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 10
// 缺少 CancellationToken!
};
// 正确做法:合并内部和外部 CancellationToken
var options = _options.DownloadOptions ?? PipelineOptions.Default.DownloadOptions!;
options.CancellationToken = CancellationTokenSource
.CreateLinkedTokenSource(_internalCts.Token, options.CancellationToken)
.Token;
// Block 内部也要支持取消
await client.GetByteArrayAsync(url, options.CancellationToken);
为什么要全链路传递?
| 场景 | 没有 CancellationToken | 有 CancellationToken |
|---|---|---|
| 用户按 Ctrl+C | Block 继续运行,无法停止 | 立即停止,释放资源 |
| 超时 | 卡死在某个Block,整个流水线挂起 | 自动取消,返回控制权 |
| 健康检查 | 监控系统无法判断是否健康 | 可以检测 IsCancellationRequested |
完整示例:
// 外部调用方
using var cts = new CancellationTokenSource();
Console.CancelKeyPress += (s, e) =>
{
e.Cancel = true;
cts.Cancel(); // 用户按 Ctrl+C
};
await pipeline.ProcessAsync(urls, cts.Token); // 传递进去
Step 6: Block 的重要公共方法(生产环境常用)
TransformBlock 和 ActionBlock 有一些非常重要的公共方法/属性,文档经常漏讲:
| 方法/属性 | 功能 | 典型场景 |
|---|---|---|
| InputCount | 获取输入队列长度 | 监控背压、性能调优 |
| OutputCount | 获取输出队列长度 | 诊断下游消费慢的问题 |
| Completion | 获取完成状态 Task | 等待流水线结束、监控故障 |
| Complete() | 标记为”不再接收新数据” | 优雅关闭流水线 |
| Fault(Exception) | 手动触发故障状态 | 严重错误时强制停止 |
| SendAsync(data) | 异步发送数据(支持背压) | 生产者推送数据 |
| Post(data) | 同步发送数据(队列满则丢弃) | 非关键数据推送 |
实战示例 1:监控队列积压
// 暴露公共方法供监控系统调用
public (int Download, int Compress, int Upload) GetQueueLengths()
{
return (
_downloadBlock.InputCount, // 如果持续增长,说明下载太快或压缩太慢
_compressBlock.InputCount,
_uploadBlock.InputCount
);
}
// 监控任务
while (true)
{
var queues = pipeline.GetQueueLengths();
if (queues.Download > 100)
{
_logger.LogWarning("️ 下载队列积压严重: {Count}", queues.Download);
// 触发告警、限流等
}
await Task.Delay(5000);
}
实战示例 2:健康检查
// Kubernetes 或监控系统定期调用
public bool IsHealthy()
{
// 检查是否进入故障状态
return !_downloadBlock.Completion.IsFaulted &&
!_compressBlock.Completion.IsFaulted &&
_downloadBlock.InputCount < 200; // 队列积压不能太多
}
// 监控端点
app.MapGet("/health", () =>
{
return pipeline.IsHealthy()
? Results.Ok("Healthy")
: Results.StatusCode(503); // Service Unavailable
});
实战示例 3:优雅关闭
public async Task StopAsync(TimeSpan timeout)
{
_downloadBlock.Complete(); // 1. 停止接收新数据
var completionTask = _logBlock.Completion; // 2. 获取完成Task
var timeoutTask = Task.Delay(timeout);
if (await Task.WhenAny(completionTask, timeoutTask) == timeoutTask)
{
_logger.LogWarning("超时,强制取消");
_internalCts.Cancel(); // 3. 触发所有Block的CancellationToken
}
try
{
await completionTask; // 4. 等待真正结束
}
catch (OperationCanceledException)
{
// 正常取消
}
}
五、运行效果与使用示例
5.1 运行方式
cd Dataflow
dotnet run
# 选择选项 5: 生产级流水线演示
输出示例(100张图片):
[12:00:05] 下载 https://example.com/image1.jpg (重试 0)
[12:00:05] 下载 https://example.com/image2.jpg (重试 0)
...
[12:00:10] 统计: 下载 45, 压缩 38, 上传 30, 失败 2
[12:00:10] 队列积压: 下载 15, 压缩 7, 上传 8
...
[12:00:35] 处理完成!成功 98 张,失败 2 张
5.2 性能对比
// Demo: 对比串行/并行/Dataflow 三种方案
// 位置:Dataflow/Production/ProductionPipelineDemo.cs -> RunComparisonDemo()
// 结果(100张图片):
// 串行处理: 125 秒
// 全并行: 内存 2.8GB(崩溃风险)
// Dataflow流水线: 18 秒,内存 150MB
为什么 Dataflow 既快又省内存?
| 方案 | 问题 | Dataflow 的解法 |
|---|---|---|
| 串行 | 下载完一张才处理下一张,CPU和网络都闲着 | 流水线并行:下载图2的同时压缩图1 |
| 全并行 | 10万个Task同时跑,中间结果都在内存里 | 背压控制:队列满了上游就等待 |
5.3 其他Demo
项目中还包含了基础Block的教学示例:
# 选项 1: 基础Block演示(ActionBlock/TransformBlock/BufferBlock)
# 选项 2: LinkTo路由演示(按条件分流数据)
# 选项 3: 背压控制演示(BoundedCapacity效果)
# 选项 4: 图片流水线简化版(不含重试和监控)
六、常见问题与最佳实践
Q1: 如何设置合理的并行度?
经验公式:
// I/O 密集型(网络/磁盘):并发数 = 2 * CPU核心数 ~ 100
MaxDegreeOfParallelism = 20;
// CPU 密集型(图片压缩/加密):并发数 = CPU核心数
MaxDegreeOfParallelism = Environment.ProcessorCount;
// 混合型(既有I/O又有CPU):根据实测调整,通常 4~8
MaxDegreeOfParallelism = 6;
Q2: BoundedCapacity 设置多大合适?
推荐值:
// 上游很快、下游很慢的场景:设置较小的值(10-50)
BoundedCapacity = 20; // 避免内存堆积
// 上游不稳定(网络波动):设置较大的值(100-500)
BoundedCapacity = 200; // 缓冲流量峰值
// 实时性要求高:设置 1 或不设置
BoundedCapacity = DataflowBlockOptions.Unbounded; // 立即处理
Q3: 异常处理的最佳实践?
Dataflow 的异常处理是生产环境中最容易踩坑的地方。让我们对比三种方案:
方案对比
| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| Block内 try-catch | 简单直接 | 某个Block异常会停止整个流水线 | 快速原型 |
| 返回 Result 对象 | 故障隔离,失败数据继续流转 | 代码稍复杂 | 生产环境推荐 |
| 监控 Block.Completion | 捕获未处理异常 | 只能记录,无法恢复 | 兜底日志 |
推荐模式(本文采用)
// 1. 每个Block内捕获异常,返回失败结果
var block = new TransformBlock<ProcessResult, ProcessResult>(result =>
{
if (!result.IsSuccess) return result; // 短路:上游已失败,直接跳过
try
{
// ... 业务逻辑
return result;
}
catch (Exception ex)
{
_logger.LogWarning(ex, "处理失败: {Url}", result.ImageUrl);
return result.WithError(ex.Message); // 继续往下游传递失败结果
}
});
// 2. 最后一个阶段统一记录失败日志
var logBlock = new ActionBlock<ProcessResult>(result =>
{
if (!result.IsSuccess)
{
_logger.LogError(" 最终失败: {Url}, 原因: {Error}",
result.ImageUrl, result.ErrorMessage);
_stats.IncrementFailed();
}
else
{
_logger.LogInformation(" 处理成功: {Url}", result.UploadedUrl);
_stats.IncrementSuccess();
}
});
// 3. 监控 Block.Completion(兜底)
_ = Task.Run(async () =>
{
try
{
await _downloadBlock.Completion;
}
catch (Exception ex)
{
_logger.LogCritical(ex, "️ 下载Block发生未捕获异常,流水线已中断!");
// 触发告警、重启流水线等
}
});
常见错误
// 错误1: 直接抛异常,导致整个流水线停止
var block = new TransformBlock<string, byte[]>(async url =>
{
return await DownloadAsync(url); // 如果抛异常,后续数据都无法处理!
});
// 错误2: 不检查上游失败状态,浪费计算资源
var compressBlock = new TransformBlock<ProcessResult, ProcessResult>(result =>
{
// 缺少:if (!result.IsSuccess) return result;
return CompressImage(result.ImageData); // 上游失败了,ImageData是null,这里会崩溃!
});
// 错误3: 吞掉异常不记录日志
catch (Exception ex)
{
return result.WithError("失败"); // 丢失了异常详情!
}
异常处理决策树
graph TD A[Block内发生异常] –> B{是否希望继续处理<br/>其他数据?} B –>|是| C[捕获异常<br/>返回失败Result] B –>|否| D[不捕获<br/>让Block.Completion异常] C –> E{是否需要重试?} E –>|是| F[在Block内实现<br/>指数退避重试] E –>|否| G[直接返回<br/>WithError结果] F –> H[最后一个阶段<br/>统一记录日志] G –> H D –> I[监控Completion<br/>触发告警/重启] style C fill:#388e3c,stroke:#1b5e20,stroke-width:2px,color:#fff style F fill:#1976d2,stroke:#0d47a1,stroke-width:2px,color:#fff style H fill:#7b1fa2,stroke:#4a148c,stroke-width:2px,color:#fff style D fill:#d32f2f,stroke:#b71c1c,stroke-width:2px,color:#fff
Q4: 如何实现条件路由(数据分流)?
// 场景:大图片走压缩流水线,小图片直接上传
var downloadBlock = new TransformBlock<string, ImageData>(...);
var compressBlock = new TransformBlock<ImageData, ImageData>(...);
var uploadBlock = new ActionBlock<ImageData>(...);
// 大图片 -> 压缩 -> 上传
downloadBlock.LinkTo(compressBlock, new DataflowLinkOptions { PropagateCompletion = true },
data => data.Size > 1_000_000); // 过滤条件
// 小图片 -> 直接上传
downloadBlock.LinkTo(uploadBlock, new DataflowLinkOptions { PropagateCompletion = true },
data => data.Size <= 1_000_000);
compressBlock.LinkTo(uploadBlock);
流程图:
graph LR A[下载] –> B{文件大小} B –>|> 1MB| C[压缩] B –>|<= 1MB| D[上传] C –> D style B fill:#ff9800,stroke:#e65100,stroke-width:2px,color:#fff style C fill:#1976d2,stroke:#0d47a1,stroke-width:2px,color:#fff style D fill:#388e3c,stroke:#1b5e20,stroke-width:2px,color:#fff
Q5: 如何优雅关闭流水线?
public async Task StopAsync()
{
// 1. 停止接收新数据
_downloadBlock.Complete();
// 2. 等待流水线处理完剩余数据(最多30秒)
var completionTask = _logBlock.Completion;
var timeoutTask = Task.Delay(TimeSpan.FromSeconds(30));
if (await Task.WhenAny(completionTask, timeoutTask) == timeoutTask)
{
// 3. 超时则取消
_logger.LogWarning("流水线关闭超时,强制取消");
_cts.Cancel();
}
// 4. 等待真正结束
try
{
await completionTask;
}
catch (OperationCanceledException)
{
_logger.LogInformation("流水线已取消");
}
}
Q6: Dataflow vs Channel ?
| 特性 | Dataflow | Channel |
|---|---|---|
| 学习曲线 | 陡峭(Block类型多) | 平缓(类似队列) |
| 多阶段流水线 | 原生支持 | 需要手动串联多个Channel |
| 背压控制 | BoundedCapacity | BoundedChannel |
| 并行度控制 | MaxDegreeOfParallelism | 需要手动创建多个消费者 |
| 性能 | 稍慢(功能多) | 更快(轻量级) |
| 适用场景 | 复杂流水线 | 简单生产者-消费者 |
选择建议:
- 1-2个阶段 → Channel
- 3+个阶段,且需要不同并行度 → Dataflow
七、小结
本章核心要点
- Dataflow 的本质:把”工厂流水线”搬到代码里,每个Block是一个工位
- 三大核心能力:
- 独立配置每个阶段的并行度和缓冲容量
- 自动背压控制,避免内存爆炸
- 声明式连接,代码清晰易维护
- 生产级实践:
- 用 ProcessResult 携带状态,而不是抛异常
- 监控队列积压和业务指标
- 支持重试、优雅关闭、取消令牌
进阶学习
文章摘自:https://www.cnblogs.com/diamondhusky/p/20362777
