【.NET并发编程 – 15】TPL Dataflow 流水线编程

15. TPL Dataflow 流水线编程

本章 GitHub 仓库csharp-concurrency-cookbook

欢迎 Star 和 Fork!所有代码示例都可以在仓库中找到并运行。


一、本章导读

本文目标:从零构建生产级 Dataflow 流水线,掌握完整的设计思路、核心原理和最佳实践。

我先问你一个问题:如果让你处理 10 万张图片(下载 → 压缩 → 水印 → 上传),你会怎么写?

很多人的第一反应是:

// 方案1:串行处理 —— 太慢了
foreach (var url in imageUrls)
{
    var data = await DownloadAsync(url);
    var compressed = await CompressAsync(data);
    var watermarked = await AddWatermarkAsync(compressed);
    await UploadAsync(watermarked);
}

// 方案2:全部并行 —— 炸了
await Task.WhenAll(imageUrls.Select(async url =>
{
    // ... 10万个Task同时跑,内存几十GB,系统直接崩溃
}));

方案 1 太慢,10 万张图片可能要跑几天。
方案 2 太猛,瞬间把内存撑爆,系统直接 OOM。

那怎么办?你可能想到用 SemaphoreSlim 或 Channel 来限流:

var sem = new SemaphoreSlim(10); // 限制并发数
await Task.WhenAll(imageUrls.Select(async url =>
{
    await sem.WaitAsync();
    try
    {
        // ... 处理逻辑
    }
    finally
    {
        sem.Release();
    }
}));

这确实能跑,但有几个问题:

  1. 各阶段混在一起:下载、压缩、上传的并发度应该不同(I/O 密集 vs CPU 密集),但现在是统一控制
  2. 缺乏背压机制:如果下载很快但上传很慢,中间数据会堆积在内存里
  3. 错误处理复杂:某个文件失败了要记录日志、重试、还要继续处理其他文件
  4. 代码难维护:加个新阶段(比如”人脸识别”),代码要大改

二、Dataflow 的核心思想

2.1 流水线的本质

你去过工厂吗?现代工厂的生产线是这样的:

[原料] → [工位1:切割] → [工位2:打磨] → [工位3:组装] → [工位4:包装] → [成品]
          ↓ 3人并行       ↓ 5人并行      ↓ 2人并行        ↓ 1人

每个工位的特征:

  • 独立负责一道工序:切割的人只管切割,不管打磨
  • 有自己的工人数量(并行度):打磨工序慢,多配5个人
  • 有中间缓冲区(队列):前道工序快了,产品在这里排队等待
  • 出问题只影响本工位:组装工位停机,不影响切割继续生产

这样做的好处是什么?

对比维度 串行处理 流水线处理
效率 处理完一个产品才能开始下一个 同时处理多个产品(不同阶段)
并行度 无法针对不同工序调整 慢工序多配人,快工序少配人
故障影响 一个环节出问题,全线停工 只影响本工位,其他继续运行
资源利用 CPU和I/O经常空闲 CPU和I/O同时工作

TPL Dataflow 把这套思想搬到代码里

graph LR A[数据源] –> B[BlockA<br/>并行度:3<br/>缓冲:10个] B –> C[BlockB<br/>并行度:5<br/>缓冲:20个] C –> D[BlockC<br/>并行度:2<br/>缓冲:5个] D –> E[结果] style B fill:#1976d2,stroke:#0d47a1,stroke-width:2px,color:#fff style C fill:#388e3c,stroke:#1b5e20,stroke-width:2px,color:#fff style D fill:#f57c00,stroke:#e65100,stroke-width:2px,color:#fff

每个 Block(块) 就是一个工位,它:

  1. 接收上游数据(或主动拉取)→ 相当于从上一道工序拿产品
  2. 执行自己的逻辑(同步或异步)→ 相当于本工位的加工操作
  3. 把结果传给下游(或丢弃)→ 相当于把成品放到下一道工序的缓冲区

2.2 核心Block类型

相关类在System.Threading.Tasks.Dataflow命名空间里面,系统自带,无须额外nuget包。

Block 类型 功能 典型场景
ActionBlock 消费数据,无输出 日志记录、数据库写入
TransformBlock<TIn, TOut> 转换数据(1→1) 图片压缩、数据清洗
TransformManyBlock<TIn, TOut> 转换数据(1→多) 拆分大文件、展开嵌套数据
BatchBlock 批量打包(多→1) 批量入库、日志聚合
BroadcastBlock 广播数据(1→多份) 缓存+持久化双写
BufferBlock 缓冲队列 解耦生产者和消费者

2.3 关键配置参数

var options = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 4,        // 并行度(同时处理4条数据)
    BoundedCapacity = 10,               // 缓冲容量(最多排队10条)
    CancellationToken = cts.Token       // 取消令牌
};

var block = new TransformBlock<string, byte[]>(
    async url => await DownloadAsync(url),
    options
);

为什么 Dataflow 比手写流水线优雅?

手写流水线要自己做的事 Dataflow 内置支持
手动管理队列(Channel、Queue) 每个块自带缓冲队列
手动控制并行度(SemaphoreSlim) MaxDegreeOfParallelism 配置
手动实现背压(队列满了要等待) BoundedCapacity 自动节流
手动传播完成状态(上游结束了通知下游) PropagateCompletion 自动传播
手动处理异常(某个任务失败了要记录) 每个块独立异常处理
手动组装流水线(代码像意大利面条) 声明式 LinkTo 连接

三、何时使用 Dataflow?

适合的场景

场景 为什么选 Dataflow
多阶段数据处理 每个阶段独立配置并行度和缓冲
I/O 和 CPU 混合任务 下载(I/O密集)并行20,压缩(CPU密集)并行4
需要背压控制 自动节流,避免内存爆炸
复杂数据流向 支持分支、合并、过滤、广播
长时间运行的流水线 可观测性强(监控、统计、故障隔离)

不适合的场景

场景 为什么不选 Dataflow 推荐方案
简单并行任务 Dataflow 有学习成本和性能开销 Parallel.ForEachAsync
实时事件处理 Dataflow 不是事件驱动架构 System.Reactive (Rx)
需要严格顺序 默认是并行的,手动保证顺序很麻烦 Channel 或串行处理

对比其他方案

// 方案1:Parallel.ForEachAsync —— 简单并行任务首选
await Parallel.ForEachAsync(urls, new ParallelOptions { MaxDegreeOfParallelism = 10 },
    async (url, ct) => await ProcessAsync(url));

// 方案2:Channel<T> —— 单生产者/单消费者场景
var channel = Channel.CreateBounded<string>(10);
var producer = Task.Run(async () => { /* ... */ });
var consumer = Task.Run(async () => { /* ... */ });

// 方案3:Dataflow —— 多阶段流水线
var pipeline = CreatePipeline(); // 本文重点!
await pipeline.SendAsync(data);

四、实战:构建生产级图片处理流水线

完整代码位置Dataflow/Production/ProductionImagePipeline.cs

4.1 架构总览

我们要实现一个真实生产环境可用的图片处理流水线

graph TB A[输入: 图片URL] –> B[阶段1: 下载<br/>I/O密集, 并行20<br/>支持重试] B –> C[阶段2: 压缩<br/>CPU密集, 并行4<br/>背压控制] C –> D[阶段3: 水印<br/>CPU密集, 并行4] D –> E[阶段4: 上传<br/>I/O密集, 并行10<br/>故障隔离] E –> F[阶段5: 日志<br/>串行, 落盘] F –> G[完成] H[监控线程] -.定期巡检.-> B H -.定期巡检.-> C H -.定期巡检.-> D H -.定期巡检.-> E style B fill:#1976d2,stroke:#0d47a1,stroke-width:2px,color:#fff style C fill:#388e3c,stroke:#1b5e20,stroke-width:2px,color:#fff style D fill:#f57c00,stroke:#e65100,stroke-width:2px,color:#fff style E fill:#d32f2f,stroke:#b71c1c,stroke-width:2px,color:#fff style F fill:#7b1fa2,stroke:#4a148c,stroke-width:2px,color:#fff style H fill:#455a64,stroke:#263238,stroke-width:2px,color:#fff

设计要点

  1. 故障隔离:某张图片失败了,不影响其他图片继续处理
  2. 重试机制:下载/上传支持指数退避重试
  3. 监控能力:实时统计成功率、吞吐量、队列积压
  4. 优雅关闭:取消时等待当前任务完成,不丢数据

4.2 核心代码实现

️ 生产级代码的三大要求

  1. 配置外部化ExecutionDataflowBlockOptions 应由调用方注入(从配置文件读取)
  2. 全链路取消CancellationToken 必须传递到每个 Block
  3. 可观测性:暴露 Block 的公共方法(InputCountCompletion 等)供监控使用

Step 0: 配置化设计(生产环境必备)

/// <summary>
/// 流水线配置选项(由调用方通过配置文件注入)
/// </summary>
public class PipelineOptions
{
    /// <summary>下载块配置(I/O 密集,默认高并行)</summary>
    public ExecutionDataflowBlockOptions? DownloadOptions { get; set; }

    /// <summary>压缩块配置(CPU 密集,默认=核心数)</summary>
    public ExecutionDataflowBlockOptions? CompressOptions { get; set; }

    /// <summary>上传块配置(I/O 密集,默认高并行)</summary>
    public ExecutionDataflowBlockOptions? UploadOptions { get; set; }

    /// <summary>HTTP 超时时间(默认30秒)</summary>
    public TimeSpan HttpTimeout { get; set; } = TimeSpan.FromSeconds(30);

    /// <summary>上传重试次数(默认3次)</summary>
    public int MaxUploadRetries { get; set; } = 3;

    /// <summary>监控报告间隔(默认5秒)</summary>
    public TimeSpan MonitorInterval { get; set; } = TimeSpan.FromSeconds(5);

    /// <summary>获取默认配置</summary>
    public static PipelineOptions Default => new()
    {
        DownloadOptions = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 10,
            BoundedCapacity = 20
        },
        // ... 其他配置
    };
}

// 构造函数支持配置注入
public ProductionImagePipeline(
    ILogger<ProductionImagePipeline> logger,
    PipelineOptions? options = null)  //  如果为 null 则使用默认配置
{
    _options = options ?? PipelineOptions.Default;
    // ...
}

为什么要配置外部化?

硬编码配置 外部化配置
不同环境(开发/测试/生产)参数相同 根据环境调整(生产环境并行度更高)
修改参数需要重新编译、发布 修改配置文件即可,无需重新部署
无法动态调优性能 可以根据监控数据调整配置

生产环境使用示例

// appsettings.json
{
  "Pipeline": {
    "DownloadParallelism": 20,
    "CompressParallelism": 8,
    "HttpTimeoutSeconds": 60
  }
}

// Startup.cs
var config = Configuration.GetSection("Pipeline");
var options = new PipelineOptions
{
    DownloadOptions = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = config.GetValue<int>("DownloadParallelism"),
        BoundedCapacity = 50
    },
    HttpTimeout = TimeSpan.FromSeconds(config.GetValue<int>("HttpTimeoutSeconds"))
};

var pipeline = new ProductionImagePipeline(logger, options);

Step 1: 定义数据结构

// 流水线中流转的数据(携带状态,而不是抛异常)
public record ProcessResult
{
    public string ImageUrl { get; init; } = "";
    public byte[]? ImageData { get; set; }
    public byte[]? CompressedData { get; set; }
    public byte[]? WatermarkedData { get; set; }
    public string? UploadedUrl { get; set; }
    
    public bool IsSuccess { get; set; } = true;
    public string? ErrorMessage { get; set; }
    public int RetryCount { get; set; }
    
    public ProcessResult WithError(string error)
    {
        IsSuccess = false;
        ErrorMessage = error;
        return this;
    }
}

为什么用 ProcessResult 而不是抛异常?

方案 优点 缺点
抛异常 代码简洁 一个异常会让整个 Block 停止,影响其他数据
返回结果对象 故障隔离,失败的数据继续往下走,记录日志 需要在每个阶段检查 IsSuccess

Step 2: 创建各阶段Block

下载Block(支持重试)

private TransformBlock<ProcessResult, ProcessResult> CreateDownloadBlock()
{
    return new TransformBlock<ProcessResult, ProcessResult>(
        async result =>
        {
            if (!result.IsSuccess) return result; // 上游已失败,跳过

            for (int retry = 0; retry <= 3; retry++)
            {
                try
                {
                    _logger.LogInformation("下载 {Url} (重试 {Retry})", result.ImageUrl, retry);
                    result.ImageData = await DownloadImageAsync(result.ImageUrl);
                    _stats.IncrementDownloaded();
                    return result;
                }
                catch (Exception ex)
                {
                    if (retry == 3)
                    {
                        _logger.LogError(ex, "下载失败 {Url}", result.ImageUrl);
                        return result.WithError($"下载失败: {ex.Message}");
                    }
                    await Task.Delay(TimeSpan.FromSeconds(Math.Pow(2, retry))); // 指数退避
                }
            }
            return result;
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 20, // I/O密集,高并发
            BoundedCapacity = 50,         // 限制排队数量
            CancellationToken = _cts.Token
        }
    );
}

压缩Block(CPU密集)

private TransformBlock<ProcessResult, ProcessResult> CreateCompressBlock()
{
    return new TransformBlock<ProcessResult, ProcessResult>(
        result =>
        {
            if (!result.IsSuccess) return result;
            try
            {
                result.CompressedData = CompressImage(result.ImageData!);
                _stats.IncrementCompressed();
                return result;
            }
            catch (Exception ex)
            {
                return result.WithError($"压缩失败: {ex.Message}");
            }
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4,  // CPU密集,不宜过高
            BoundedCapacity = 20,
            CancellationToken = _cts.Token
        }
    );
}

Step 3: 组装流水线

public class ProductionImagePipeline : IDisposable
{
    private readonly TransformBlock<ProcessResult, ProcessResult> _downloadBlock;
    private readonly TransformBlock<ProcessResult, ProcessResult> _compressBlock;
    private readonly TransformBlock<ProcessResult, ProcessResult> _watermarkBlock;
    private readonly TransformBlock<ProcessResult, ProcessResult> _uploadBlock;
    private readonly ActionBlock<ProcessResult> _logBlock;

    public ProductionImagePipeline(ILogger<ProductionImagePipeline> logger)
    {
        _logger = logger;
        
        // 创建各个阶段
        _downloadBlock = CreateDownloadBlock();
        _compressBlock = CreateCompressBlock();
        _watermarkBlock = CreateWatermarkBlock();
        _uploadBlock = CreateUploadBlock();
        _logBlock = CreateLogBlock();

        // 连接流水线(重点!)
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        _downloadBlock.LinkTo(_compressBlock, linkOptions);
        _compressBlock.LinkTo(_watermarkBlock, linkOptions);
        _watermarkBlock.LinkTo(_uploadBlock, linkOptions);
        _uploadBlock.LinkTo(_logBlock, linkOptions);
    }

    public async Task ProcessAsync(string imageUrl)
    {
        var result = new ProcessResult { ImageUrl = imageUrl };
        await _downloadBlock.SendAsync(result);
    }

    public async Task StopAsync()
    {
        _downloadBlock.Complete(); // 标记入口为"不再接收新数据"
        await _logBlock.Completion;  // 等待最后一个阶段完成
    }
}

关键:PropagateCompletion

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
_downloadBlock.LinkTo(_compressBlock, linkOptions);

这行代码的作用:

sequenceDiagram participant A as DownloadBlock participant B as CompressBlock participant C as UploadBlock Note over A: Complete() 被调用 A->>A: 处理完剩余数据 A->>B: 自动调用 Complete() B->>B: 处理完剩余数据 B->>C: 自动调用 Complete() C->>C: 处理完剩余数据 Note over C: Completion 任务完成

Step 4: 监控与统计

private async Task MonitorProgressAsync()
{
    using var timer = new PeriodicTimer(TimeSpan.FromSeconds(5));

    while (await timer.WaitForNextTickAsync(_cts.Token))
    {
        _logger.LogInformation(
            " 统计: 下载 {Downloaded}, 压缩 {Compressed}, 上传 {Uploaded}, 失败 {Failed}",
            _stats.Downloaded, _stats.Compressed, _stats.Uploaded, _stats.Failed
        );

        _logger.LogInformation(
            " 队列积压: 下载 {DQ}, 压缩 {CQ}, 上传 {UQ}",
            _downloadBlock.InputCount, _compressBlock.InputCount, _uploadBlock.InputCount
        );
    }
}

监控两个维度

  1. 业务指标:成功/失败数量
  2. 性能指标:各阶段队列积压(InputCount

Step 5: CancellationToken 全链路传递(生产环境必备)

//  错误做法:没有传递 CancellationToken
var options = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 10
    // 缺少 CancellationToken!
};

//  正确做法:合并内部和外部 CancellationToken
var options = _options.DownloadOptions ?? PipelineOptions.Default.DownloadOptions!;
options.CancellationToken = CancellationTokenSource
    .CreateLinkedTokenSource(_internalCts.Token, options.CancellationToken)
    .Token;

// Block 内部也要支持取消
await client.GetByteArrayAsync(url, options.CancellationToken);

为什么要全链路传递?

场景 没有 CancellationToken 有 CancellationToken
用户按 Ctrl+C Block 继续运行,无法停止 立即停止,释放资源
超时 卡死在某个Block,整个流水线挂起 自动取消,返回控制权
健康检查 监控系统无法判断是否健康 可以检测 IsCancellationRequested

完整示例

// 外部调用方
using var cts = new CancellationTokenSource();
Console.CancelKeyPress += (s, e) =>
{
    e.Cancel = true;
    cts.Cancel(); // 用户按 Ctrl+C
};

await pipeline.ProcessAsync(urls, cts.Token); //  传递进去

Step 6: Block 的重要公共方法(生产环境常用)

TransformBlockActionBlock 有一些非常重要的公共方法/属性,文档经常漏讲:

方法/属性 功能 典型场景
InputCount 获取输入队列长度 监控背压、性能调优
OutputCount 获取输出队列长度 诊断下游消费慢的问题
Completion 获取完成状态 Task 等待流水线结束、监控故障
Complete() 标记为”不再接收新数据” 优雅关闭流水线
Fault(Exception) 手动触发故障状态 严重错误时强制停止
SendAsync(data) 异步发送数据(支持背压) 生产者推送数据
Post(data) 同步发送数据(队列满则丢弃) 非关键数据推送

实战示例 1:监控队列积压

// 暴露公共方法供监控系统调用
public (int Download, int Compress, int Upload) GetQueueLengths()
{
    return (
        _downloadBlock.InputCount,  //  如果持续增长,说明下载太快或压缩太慢
        _compressBlock.InputCount,
        _uploadBlock.InputCount
    );
}

// 监控任务
while (true)
{
    var queues = pipeline.GetQueueLengths();
    if (queues.Download > 100)
    {
        _logger.LogWarning("️  下载队列积压严重: {Count}", queues.Download);
        // 触发告警、限流等
    }
    await Task.Delay(5000);
}

实战示例 2:健康检查

// Kubernetes 或监控系统定期调用
public bool IsHealthy()
{
    //  检查是否进入故障状态
    return !_downloadBlock.Completion.IsFaulted &&
           !_compressBlock.Completion.IsFaulted &&
           _downloadBlock.InputCount < 200; // 队列积压不能太多
}

// 监控端点
app.MapGet("/health", () =>
{
    return pipeline.IsHealthy() 
        ? Results.Ok("Healthy") 
        : Results.StatusCode(503); // Service Unavailable
});

实战示例 3:优雅关闭

public async Task StopAsync(TimeSpan timeout)
{
    _downloadBlock.Complete(); //  1. 停止接收新数据

    var completionTask = _logBlock.Completion; //  2. 获取完成Task
    var timeoutTask = Task.Delay(timeout);

    if (await Task.WhenAny(completionTask, timeoutTask) == timeoutTask)
    {
        _logger.LogWarning("超时,强制取消");
        _internalCts.Cancel(); //  3. 触发所有Block的CancellationToken
    }

    try
    {
        await completionTask; //  4. 等待真正结束
    }
    catch (OperationCanceledException)
    {
        // 正常取消
    }
}

五、运行效果与使用示例

5.1 运行方式

cd Dataflow
dotnet run
# 选择选项 5: 生产级流水线演示

输出示例(100张图片):

[12:00:05] 下载 https://example.com/image1.jpg (重试 0)
[12:00:05] 下载 https://example.com/image2.jpg (重试 0)
...
[12:00:10]  统计: 下载 45, 压缩 38, 上传 30, 失败 2
[12:00:10]  队列积压: 下载 15, 压缩 7, 上传 8
...
[12:00:35]  处理完成!成功 98 张,失败 2 张

5.2 性能对比

// Demo: 对比串行/并行/Dataflow 三种方案
// 位置:Dataflow/Production/ProductionPipelineDemo.cs -> RunComparisonDemo()

// 结果(100张图片):
// 串行处理:   125 秒
// 全并行:     内存 2.8GB(崩溃风险)
// Dataflow流水线: 18 秒,内存 150MB

为什么 Dataflow 既快又省内存?

方案 问题 Dataflow 的解法
串行 下载完一张才处理下一张,CPU和网络都闲着 流水线并行:下载图2的同时压缩图1
全并行 10万个Task同时跑,中间结果都在内存里 背压控制:队列满了上游就等待

5.3 其他Demo

项目中还包含了基础Block的教学示例:

# 选项 1: 基础Block演示(ActionBlock/TransformBlock/BufferBlock)
# 选项 2: LinkTo路由演示(按条件分流数据)
# 选项 3: 背压控制演示(BoundedCapacity效果)
# 选项 4: 图片流水线简化版(不含重试和监控)

六、常见问题与最佳实践

Q1: 如何设置合理的并行度?

经验公式

// I/O 密集型(网络/磁盘):并发数 = 2 * CPU核心数 ~ 100
MaxDegreeOfParallelism = 20;

// CPU 密集型(图片压缩/加密):并发数 = CPU核心数
MaxDegreeOfParallelism = Environment.ProcessorCount;

// 混合型(既有I/O又有CPU):根据实测调整,通常 4~8
MaxDegreeOfParallelism = 6;

Q2: BoundedCapacity 设置多大合适?

推荐值

// 上游很快、下游很慢的场景:设置较小的值(10-50)
BoundedCapacity = 20; // 避免内存堆积

// 上游不稳定(网络波动):设置较大的值(100-500)
BoundedCapacity = 200; // 缓冲流量峰值

// 实时性要求高:设置 1 或不设置
BoundedCapacity = DataflowBlockOptions.Unbounded; // 立即处理

Q3: 异常处理的最佳实践?

Dataflow 的异常处理是生产环境中最容易踩坑的地方。让我们对比三种方案:

方案对比

方案 优点 缺点 适用场景
Block内 try-catch 简单直接 某个Block异常会停止整个流水线 快速原型
返回 Result 对象 故障隔离,失败数据继续流转 代码稍复杂 生产环境推荐
监控 Block.Completion 捕获未处理异常 只能记录,无法恢复 兜底日志

推荐模式(本文采用)

// 1. 每个Block内捕获异常,返回失败结果
var block = new TransformBlock<ProcessResult, ProcessResult>(result =>
{
    if (!result.IsSuccess) return result; // 短路:上游已失败,直接跳过
    try
    {
        // ... 业务逻辑
        return result;
    }
    catch (Exception ex)
    {
        _logger.LogWarning(ex, "处理失败: {Url}", result.ImageUrl);
        return result.WithError(ex.Message); // 继续往下游传递失败结果
    }
});

// 2. 最后一个阶段统一记录失败日志
var logBlock = new ActionBlock<ProcessResult>(result =>
{
    if (!result.IsSuccess)
    {
        _logger.LogError(" 最终失败: {Url}, 原因: {Error}", 
            result.ImageUrl, result.ErrorMessage);
        _stats.IncrementFailed();
    }
    else
    {
        _logger.LogInformation(" 处理成功: {Url}", result.UploadedUrl);
        _stats.IncrementSuccess();
    }
});

// 3. 监控 Block.Completion(兜底)
_ = Task.Run(async () =>
{
    try
    {
        await _downloadBlock.Completion;
    }
    catch (Exception ex)
    {
        _logger.LogCritical(ex, "️ 下载Block发生未捕获异常,流水线已中断!");
        // 触发告警、重启流水线等
    }
});

常见错误

// 错误1: 直接抛异常,导致整个流水线停止
var block = new TransformBlock<string, byte[]>(async url =>
{
    return await DownloadAsync(url); // 如果抛异常,后续数据都无法处理!
});

// 错误2: 不检查上游失败状态,浪费计算资源
var compressBlock = new TransformBlock<ProcessResult, ProcessResult>(result =>
{
    // 缺少:if (!result.IsSuccess) return result;
    return CompressImage(result.ImageData); // 上游失败了,ImageData是null,这里会崩溃!
});

// 错误3: 吞掉异常不记录日志
catch (Exception ex)
{
    return result.WithError("失败"); //  丢失了异常详情!
}

异常处理决策树

graph TD A[Block内发生异常] –> B{是否希望继续处理<br/>其他数据?} B –>|是| C[捕获异常<br/>返回失败Result] B –>|否| D[不捕获<br/>让Block.Completion异常] C –> E{是否需要重试?} E –>|是| F[在Block内实现<br/>指数退避重试] E –>|否| G[直接返回<br/>WithError结果] F –> H[最后一个阶段<br/>统一记录日志] G –> H D –> I[监控Completion<br/>触发告警/重启] style C fill:#388e3c,stroke:#1b5e20,stroke-width:2px,color:#fff style F fill:#1976d2,stroke:#0d47a1,stroke-width:2px,color:#fff style H fill:#7b1fa2,stroke:#4a148c,stroke-width:2px,color:#fff style D fill:#d32f2f,stroke:#b71c1c,stroke-width:2px,color:#fff

Q4: 如何实现条件路由(数据分流)?

// 场景:大图片走压缩流水线,小图片直接上传
var downloadBlock = new TransformBlock<string, ImageData>(...);
var compressBlock = new TransformBlock<ImageData, ImageData>(...);
var uploadBlock = new ActionBlock<ImageData>(...);

// 大图片 -> 压缩 -> 上传
downloadBlock.LinkTo(compressBlock, new DataflowLinkOptions { PropagateCompletion = true },
    data => data.Size > 1_000_000); // 过滤条件

// 小图片 -> 直接上传
downloadBlock.LinkTo(uploadBlock, new DataflowLinkOptions { PropagateCompletion = true },
    data => data.Size <= 1_000_000);

compressBlock.LinkTo(uploadBlock);

流程图

graph LR A[下载] –> B{文件大小} B –>|> 1MB| C[压缩] B –>|<= 1MB| D[上传] C –> D style B fill:#ff9800,stroke:#e65100,stroke-width:2px,color:#fff style C fill:#1976d2,stroke:#0d47a1,stroke-width:2px,color:#fff style D fill:#388e3c,stroke:#1b5e20,stroke-width:2px,color:#fff

Q5: 如何优雅关闭流水线?

public async Task StopAsync()
{
    // 1. 停止接收新数据
    _downloadBlock.Complete();
    
    // 2. 等待流水线处理完剩余数据(最多30秒)
    var completionTask = _logBlock.Completion;
    var timeoutTask = Task.Delay(TimeSpan.FromSeconds(30));
    
    if (await Task.WhenAny(completionTask, timeoutTask) == timeoutTask)
    {
        // 3. 超时则取消
        _logger.LogWarning("流水线关闭超时,强制取消");
        _cts.Cancel();
    }
    
    // 4. 等待真正结束
    try
    {
        await completionTask;
    }
    catch (OperationCanceledException)
    {
        _logger.LogInformation("流水线已取消");
    }
}

Q6: Dataflow vs Channel ?

特性 Dataflow Channel
学习曲线 陡峭(Block类型多) 平缓(类似队列)
多阶段流水线 原生支持 需要手动串联多个Channel
背压控制 BoundedCapacity BoundedChannel
并行度控制 MaxDegreeOfParallelism 需要手动创建多个消费者
性能 稍慢(功能多) 更快(轻量级)
适用场景 复杂流水线 简单生产者-消费者

选择建议

  • 1-2个阶段 → Channel
  • 3+个阶段,且需要不同并行度 → Dataflow

七、小结

本章核心要点

  1. Dataflow 的本质:把”工厂流水线”搬到代码里,每个Block是一个工位
  2. 三大核心能力
    • 独立配置每个阶段的并行度和缓冲容量
    • 自动背压控制,避免内存爆炸
    • 声明式连接,代码清晰易维护
  3. 生产级实践
    • 用 ProcessResult 携带状态,而不是抛异常
    • 监控队列积压和业务指标
    • 支持重试、优雅关闭、取消令牌

进阶学习

文章摘自:https://www.cnblogs.com/diamondhusky/p/20362777