【.NET并发编程 – 10】Parallel 与 PLINQ:榨干多核 CPU

10. Parallel 与 PLINQ:榨干多核 CPU

本章 GitHub 仓库csharp-concurrency-cookbook

欢迎 Star 和 Fork!所有代码示例都可以在仓库中找到并运行。


本章导读

本文目标:彻底搞懂 Parallel.For/ForEach 和 PLINQ 的适用场景、正确用法与常见陷阱,知道什么时候该用并行,什么时候反而会帮倒忙。

先问你一个问题:你有一台 32 核的服务器,跑一段数据处理代码,CPU 使用率是 3%。

你是什么感觉?

是的,那 31 个核心,正在摸鱼。

今天我们要做的事情,就是让它们都给我干活——这就是 ParallelPLINQ 的价值所在。

不过在开始之前,我必须先说一句重要的前置话

Parallel 和 PLINQ 是为 CPU 密集型任务设计的。
如果你的瓶颈是网络请求、数据库查询、文件读写这类 IO 操作,那你应该用 async/await + Task.WhenAll(第02章到第04章讲过的那些),而不是今天这章的内容。

弄清楚这个前提,我们才能正确”榨干” CPU。


🧩 Parallel vs PLINQ,先搞清楚定位

Parallel.For / ForEach PLINQ
编程风格 命令式(像 for 循环) 声明式(像 LINQ)
适用场景 已有 for/foreach 循环要并行化 已有 LINQ 查询要并行化
中断控制 Break() / Stop() / CancellationToken ️ 有限,主要靠 CancellationToken
线程本地状态 localInit / localFinally 原生支持 需要手动处理
结果顺序 需要预分配数组来保证 AsOrdered() 保证,但有代价
代码改动量 较大(改写循环) 极小(加一行 AsParallel()

没有哪个更好,只有哪个更适合你当前的场景。


Part 1:Parallel.For / ForEach

1.1 最简单的用法

如果你有一个普通的 for 循环,里面每次迭代都互相独立,直接换成 Parallel.For

// 普通 for 循环
for (int i = 0; i < data.Length; i++)
{
	results[i] = Math.Sqrt(data[i]);
}

// 并行版本:只需换一下方法名
Parallel.For(0, data.Length, i =>
{
	results[i] = Math.Sqrt(data[i]);
	// 注意:每次迭代在不同线程上运行
});

就这么简单。Parallel.For 会自动把循环范围分成若干个分区,分配到线程池的不同线程上并行执行。

Parallel.ForEach 也是一样的道理,只不过接收的是集合而不是范围:

var urls = new[] { "page1", "page2", "page3", "page4", "page5" };
var results = new ConcurrentBag<string>();

Parallel.ForEach(urls, url =>
{
	// 每个 url 在独立线程上处理
	string result = Process(url);
	results.Add(result); // 注意:要用线程安全的集合!
});

这里有一个点要特别提醒:结果要写入 ConcurrentBag<T>ConcurrentDictionary<K,V> 等线程安全集合,千万不要直接写 List<T>——这是新手最容易踩的坑之一,后面的陷阱章节会详细演示。

1.2 并行循环的中断与退出

并行循环里的”中断”比普通 break 复杂一点,因为你根本不知道当前是哪个线程在运行哪个迭代。Parallel 提供了 ParallelLoopState 来处理这个问题:

Break():找到了,不需要再往后找了

Parallel.For(0, items.Length, (i, state) =>
{
	if (items[i].IsTarget)
	{
		state.Break(); // 通知:i 之后的迭代不需要了
		return;
	}
	Process(items[i]);
});

Break() 的语义是:“比我索引更大的迭代不需要了,但比我小的还要保证执行完”。这很适合”在有序集合里找第一个满足条件的元素”这类场景。

Stop():立刻叫停所有人

Parallel.For(0, items.Length, (i, state) =>
{
	if (state.IsStopped) return; // 先检查,再干活

	if (发现致命错误)
	{
		state.Stop(); // 通知所有线程停下来
		return;
	}
	Process(items[i]);
});

Stop() 更激进,它不保证任何顺序语义,就是”立刻停,能停多快停多快”。适合”发现问题,全部放弃”的场景。

通过 CancellationToken 从外部取消

这个是最推荐的做法,和我们在第06章讲的取消令牌机制完全一致:

var cts = new CancellationTokenSource();
var options = new ParallelOptions
{
	CancellationToken = cts.Token,
	MaxDegreeOfParallelism = 4  // 顺便限制并发度
};

// 从外部取消
cts.CancelAfter(TimeSpan.FromSeconds(5));

try
{
	Parallel.For(0, 10000, options, i =>
	{
		options.CancellationToken.ThrowIfCancellationRequested();
		HeavyWork(i);
	});
}
catch (OperationCanceledException)
{
	Console.WriteLine("循环已取消");
}

复习提示:不熟悉 CancellationToken 的同学,建议先看第06章,里面把取消机制讲得很透彻。

1.3 线程本地变量:避免锁争用的关键

这是很多人不知道的技巧,也是 Parallel.For 最强大的特性之一。

问题背景:假设你要并行计算 1000 万个数的总和。最直觉的写法:

//  错误:直接写共享变量,严重的数据竞争
long totalSum = 0;
Parallel.For(0, 10_000_000, i =>
{
	totalSum += (i % 100); // 多线程同时读写,结果不可预测!
});

totalSum += value 看起来是一行代码,实际上是三步:读 → 加 → 写。多线程同时执行这三步,会互相覆盖,结果每次都不一样。

正确做法是用线程本地变量:让每个线程维护自己的局部累计值,最后才合并:

//  正确:线程本地变量 + 最后 Interlocked 合并
long totalSum = 0;

Parallel.For(
	fromInclusive: 0,
	toExclusive: 10_000_000,
	localInit: () => 0L,           // 每个线程初始化自己的局部值
	body: (i, state, localSum) =>
	{
		return localSum + (i % 100); // 纯本地计算,不需要任何锁
	},
	localFinally: localSum =>
	{
		// 所有线程都结束后,才用原子操作合并一次
		// 锁争用从"千万次"降低到"线程数次"(通常是几十次)
		Interlocked.Add(ref totalSum, localSum);
	}
);

这个技巧的核心思想:把高频的竞争操作变成低频的合并操作。内循环完全无锁,只有最后合并时才有少量竞争。


Part 2:PLINQ —— 一行代码开启并行

2.1 .AsParallel():最低成本的并行化

如果你已经有一段 LINQ 查询,最懒(也是最安全)的并行化方式就是加一行 .AsParallel()

// 原始顺序查询
var primes = numbers
	.Where(IsPrime)
	.ToList();

// 并行版本:只加一行
var primes = numbers
	.AsParallel()       // ← 就这一行
	.Where(IsPrime)
	.ToList();

PLINQ 会自动把 numbers 分成若干个分区,每个核心处理一个分区,最后合并结果。代码几乎不变,性能可能翻倍。

实测在 1000 万个素数筛查中,8 核机器上 PLINQ 比顺序 LINQ 快 5-6 倍。当然,前提是数据量够大,计算够重——这个后面会细说。

2.2 WithDegreeOfParallelism:别把所有核心都占满

默认情况下,PLINQ 会使用所有 CPU 核心(Environment.ProcessorCount)。在本地开发环境这没问题,但在生产服务器上,你的 Web API 也在同一台机器上跑,如果一个后台任务把所有核心都占了,其他请求就饿着了。

int safeDegree = Math.Max(1, Environment.ProcessorCount / 2);

var result = data
	.AsParallel()
	.WithDegreeOfParallelism(safeDegree)  // 最多用一半核心
	.Where(HeavyComputation)
	.ToList();

这是生产环境中很重要的一个习惯:给系统的其他部分留呼吸空间

2.3 AsOrdered():保持原始顺序

PLINQ 默认不保证结果顺序。分区并行处理完之后,谁先好谁先输出,顺序是随机的。

如果你需要结果保持和输入一样的顺序:

var ordered = numbers
	.AsParallel()
	.AsOrdered()              // 保持原始顺序
	.Where(x => x % 2 == 0)
	.ToList();
// 结果一定是 2, 4, 6, 8, ...(不管哪个线程先完成)

代价提示AsOrdered() 会让 PLINQ 内部维护一个顺序合并的开销,性能比无序版本差一些。如果你只是想”并行处理完之后自己排序”,不如最后加一个 .OrderBy(),语义更清晰,有时候性能更好。

2.4 ForAll():并行副作用,省去合并开销

普通 LINQ 最后往往需要 .ToList() 把结果收集起来。但如果你只需要对每个元素执行一个副作用(写库、发消息、更新缓存),不需要收集结果呢?

ForAll() 就是为这个场景设计的:

// 不需要结果集合,直接并行执行副作用
processedOrders
	.AsParallel()
	.Where(o => o.Amount > 1000m)
	.ForAll(order =>
	{
		// ️ 这里是并行的!写共享资源必须线程安全
		_database.Insert(order);
	});

ForAll 省去了结果合并的步骤,比 .ToList() 之后再 foreach 效率更高。

2.5 PLINQ 的异常处理

PLINQ 的异常处理方式和我们在第07章讲的 Task.WhenAll 完全一样——所有并行发生的异常都会被打包成 AggregateException

try
{
	var results = data
		.AsParallel()
		.Select(x =>
		{
			if (x < 0) throw new ArgumentException($"负数: {x}");
			return Math.Sqrt(x);
		})
		.ToList();
}
catch (AggregateException ae)
{
	// 多个线程可能同时抛异常,全都在 InnerExceptions 里
	foreach (var ex in ae.InnerExceptions)
		Console.WriteLine(ex.Message);
}

复习提示AggregateException 的拆解技巧在第07章有详细讲解,包括 Flatten()Handle() 等方法,可以回去翻翻。


️ Part 3:何时使用并行?

这可能是本章最重要的部分。很多人学了 Parallel 和 PLINQ 之后,恨不得所有循环都加上 .AsParallel(),结果发现代码反而变慢了,然后开始怀疑人生。

3.1 并行的三个必要条件

满足以下三个条件,才值得并行:

① CPU 密集型

并行能加速的是计算,不是等待。如果你的循环主要在等网络、等磁盘,加 .AsParallel() 只是白白消耗线程,IO 该多慢还是多慢。

 适合并行:图像处理、数学计算、加密解密、数据压缩、AI 推理
 不适合:HTTP 请求、数据库查询、文件读写 → 用 async/await

② 任务相互独立

每次迭代之间不能有依赖,不能”第 i 次的结果依赖第 i-1 次”。否则并行会导致数据不一致。

③ 数据量/计算量足够大

创建线程分区、调度、合并结果——这些都是有开销的。数据量太小时,这个开销比计算本身还大,并行反而更慢:

1,000 个元素    → 顺序可能更快(并行开销占主导)
100,000 个元素  → 开始持平
1,000,000+ 个元素 → 并行优势明显

当然这不是绝对的,还要看单个元素的计算量。如果每个元素要跑 100ms 的计算,哪怕只有 10 个元素,并行也划算。

3.2 决策树

你的任务是否适合并行化?

┌─ 是 CPU 密集型吗?
│   ├── 否 → 用 async/await + Task.WhenAll(第02-04章)
│   └── 是 ↓
├─ 每个元素的计算相互独立吗?
│   ├── 否(需要共享状态)→ 考虑重构,或用线程安全结构
│   └── 是 ↓
├─ 数据量足够大或单元素计算足够重?
│   ├── 否 → 顺序执行更快(并行开销大于收益)
│   └── 是 ↓
└─  适合并行!选 Parallel.For/ForEach 或 PLINQ

3.3 典型适用场景

场景 推荐方案 原因
图像/视频像素处理 Parallel.For 每像素独立,数据量极大
海量数据统计分析 PLINQ 已有 LINQ,改动最小
批量加密/哈希计算 Parallel.ForEach CPU 密集,无共享状态
矩阵运算 Parallel.For 需要精确控制索引
并行搜索(找到即停) Parallel.For + Break() 需要中断控制
HTTP 批量请求 Task.WhenAll IO 密集,不用 Parallel

Part 4:常见陷阱

陷阱 1:向 List<T> 并行写入

//  危险!List<T> 不是线程安全的
var results = new List<int>();
data.AsParallel().ForAll(x => results.Add(x)); // 可能丢数据,可能抛异常

//  方案一:用 ConcurrentBag
var safeBag = new ConcurrentBag<int>();
data.AsParallel().ForAll(safeBag.Add);

//  方案二(最简单):让 PLINQ 内部合并
var safeList = data.AsParallel().Where(条件).ToList();

List<T>.Add 内部在容量不够时会重新分配数组,多线程同时触发这个操作,结果不可预测——有时丢数据,有时直接抛 IndexOutOfRangeException

陷阱 2:在 Parallel 里做 IO 操作

//  错误:Parallel + 阻塞 IO = 线程池耗尽
Parallel.ForEach(urls, url =>
{
	var html = new HttpClient().GetStringAsync(url).Result; // 阻塞!
	Process(html);
});

//  正确:async/await + Task.WhenAll
var tasks = urls.Select(url => FetchAndProcessAsync(url));
await Task.WhenAll(tasks);

Parallel.ForEach 里用 .Result 阻塞异步操作,会消耗线程池线程等待 IO——这和第08章讲的”异步转同步死锁”是同一类问题。IO 密集型任务永远应该选 async/await

陷阱 3:PLINQ 默认并行度在服务器上可能”太贪”

// ️ 在 ASP.NET Core 服务中直接用 PLINQ(并发请求多时有风险)
var result = data.AsParallel().Where(HeavyWork).ToList();
// 多个并发请求每个都想占满 CPU,互相打架

//  限制并行度,给服务器留余量
var result = data
	.AsParallel()
	.WithDegreeOfParallelism(Math.Max(1, Environment.ProcessorCount / 2))
	.Where(HeavyWork)
	.ToList();

陷阱 4:PLINQ 查询含有副作用

int counter = 0;

//  Select 里修改共享变量,数据竞争
var results = data.AsParallel()
	.Select(x => { counter++; return x * 2; }) // counter 每次结果不同
	.ToList();

//  查询本身保持纯函数,副作用用原子操作
var results = data.AsParallel()
	.Select(x => x * 2)
	.ToList();
Interlocked.Add(ref counter, results.Count); // 统一原子更新

原则:PLINQ 的 Where / Select 保持纯函数(无副作用),副作用只在 ForAll 中做,且必须线程安全。


Part 5:性能速查表

数据量 vs 并行收益(8核机器,素数判断为例)

数据量 顺序 PLINQ 是否划算
1,000 ~1ms ~5ms 不划算
100,000 ~50ms ~20ms 开始划算
1,000,000 ~500ms ~80ms 很划算
10,000,000 ~5000ms ~700ms 强烈推荐

以上数据为示意,实际值因机器配置、具体计算类型而差异较大

方案选择速查

有 for/foreach,需要中断/线程本地状态  → Parallel.For / ForEach
有 LINQ 查询,改动越小越好            → .AsParallel()
有 LINQ 查询,需要保证顺序            → .AsParallel().AsOrdered()
并行执行副作用,不需要收集结果         → .ForAll()
IO 密集型任务                        → async/await + Task.WhenAll(不要用 Parallel!)
数据量 < 10万,计算不重               → 顺序执行就够了,别过度优化

🧪 动手实验

示例代码在 Parallel-PLinq/ 项目中,运行方式:

dotnet run --project Parallel-PLinq

本章代码覆盖了以下场景:

文件 说明
ParallelBasic/ParallelForDemo.cs Parallel.For / ForEach 基本用法、线程本地变量
ParallelBasic/ParallelBreakDemo.cs Break、Stop、CancellationToken 取消演示
PLinqBasic/PLinqDemo.cs AsParallel、WithDegreeOfParallelism、AsOrdered、ForAll、异常处理
WhenToUse/WhenToUseParallelDemo.cs 决策树、图像处理、订单数据统计实战案例
Performance/PerformanceComparisonDemo.cs 不同数据量的加速比对比,Parallel.For vs PLINQ 对比
Pitfalls/CommonPitfallsDemo.cs 四大陷阱演示与修复方案

建议按顺序跑一遍,重点关注 PerformanceComparisonDemo 里不同数据量的耗时对比——亲眼看着”100个元素并行比顺序还慢”,比看一百遍文字都印象深刻。


本章小结

  • Parallel.For / ForEach:命令式并行,适合已有循环的改造,支持 BreakStopCancellationToken 和线程本地变量
  • PLINQ:声明式并行,适合已有 LINQ 查询的改造,一行 .AsParallel() 起步
  • 适用条件(缺一不可):① CPU 密集型 ② 任务独立 ③ 数据量/计算量足够大
  • 千万别用 Parallel 处理 IO —— IO 密集型用 async/await(这是这章最重要的一句话)
  • 生产环境记得限制并行度WithDegreeOfParallelism 给其他服务留 CPU

文章摘自:https://www.cnblogs.com/diamondhusky/p/20144055