【.NET并发编程 – 12】并发集合与线程安全类型

12. 并发集合与线程安全类型

本章 GitHub 仓库csharp-concurrency-cookbook

欢迎 Star 和 Fork!所有代码示例都可以在仓库中找到并运行。


本章导读

本文目标:搞清楚 System.Collections.Concurrent 命名空间里那些并发集合的正确用法和内部原理,了解 Channel<T> 这个现代生产者-消费者利器,以及不可变集合(ImmutableCollections)的线程安全之道。

上一篇中,我们把 C# 里的各种锁摸了个遍——从 lockSemaphoreSlim,从 SpinLockAsyncLock。但是,或许有人问这个问题:

“我只是想在多线程里维护一个字典 / 队列,难道每次我都要自己加锁吗?

答案当然是:不用。.NET 早就替你把这部分活儿干了,这就是今天要聊的——并发集合

不过,并发集合不是银弹。它们各自有适合的场景,也有很多初学者踩过的坑。今天我们就系统地把这张牌打开,让你对每一张都心里有数。


🤔 先想想:为什么普通 List/Dictionary 不是线程安全的?

这是个好问题,很多人用了很久并发集合,却从没认真想过这一层。

List<T>.Add 为例

我们来看看 List<T> 内部大概长什么样(简化版):

// List<T> 内部核心字段(简化)
private T[] _items;    // 底层数组
private int _size;     // 当前元素个数

public void Add(T item)
{
	// 第一步:容量不够时扩容(翻倍策略,创建新数组并复制旧数据)
	if (_size == _items.Length)
		EnsureCapacity();

	// 第二步:写入数据
	_items[_size] = item;

	// 第三步:更新计数
	_size++;
}

现在想象两个线程同时调用 Add_size 当前是 5

线程A:读取 _size = 5
线程B:读取 _size = 5
线程A:_items[5] = "A"
线程B:_items[5] = "B"  ← 覆盖了线程A写的!
线程A:_size = 6
线程B:_size = 6        ← 两次Add,_size只增加了1,数据丢失!

这就是经典的 竞态条件(Race Condition)。更糟糕的是,当 EnsureCapacity 触发扩容时,线程A在复制旧数组,线程B在往旧数组写数据,结果可能是 IndexOutOfRangeException,也可能是数据静默丢失——而你完全不知道发生了什么。

Dictionary<K,V> 呢?

更麻烦。字典内部是哈希表,涉及到:

  1. 计算哈希值
  2. 找到 bucket(桶)
  3. 处理哈希冲突(链表/开放寻址)
  4. 扩容(rehash,整个数据结构重建)

任何一个步骤被中途打断,都可能导致:

  • 数据丢失
  • 哈希链表成环(死循环!.NET Framework 4.0 以前确实有这个 bug)
  • 抛出 InvalidOperationException(集合在枚举过程中被修改)

结论:普通集合的设计目标是单线程高性能,它们为了减少开销,完全没有考虑线程安全。


并发集合的线程安全是怎么实现的?

在进入具体集合之前,先聊聊实现原理,这会帮你更好地理解为什么要选这个而不是那个。

并发集合主要用了三种技术:

1. 细粒度锁(Fine-grained Locking)

不是”一把大锁锁住整个集合”,而是把集合分成若干段(Segment)或桶(Bucket),每段一把锁

ConcurrentDictionary 内部结构(简化):

  _buckets 数组(每个 bucket 是链表头,哈希冲突时向后追加节点):
  ┌───────────────┬───────────────┬───────────────┬───────────────┐
  │  Bucket[0]    │  Bucket[1]    │  Bucket[2]    │  Bucket[3]    │ ...
  │ keyA→keyE→…   │   keyB        │ keyC→keyF→…   │   keyD        │
  └───────────────┴───────────────┴───────────────┴───────────────┘
          │               │               │               │
          └───────┬───────┘               └───────┬───────┘
                  ↓                               ↓
              _locks[0]                        _locks[1]           ...

  映射规则:lock 索引 = bucketIndex % locks.Length
  → Bucket[0]、Bucket[1] 共用 _locks[0]
  → Bucket[2]、Bucket[3] 共用 _locks[1]

线程A 写 keyA(落在 Bucket[0])→ 只锁 _locks[0]
线程B 写 keyC(落在 Bucket[2])→ 只锁 _locks[1]
← 两个线程可以并行!读操作完全不加锁!

ConcurrentDictionary 内部默认用 concurrencyLevel(通常等于 CPU 核心数)个锁来分段保护,大幅减少锁竞争。

2. 无锁算法(Lock-free)+ CAS(Compare-And-Swap)

ConcurrentQueue<T> 就是典型的无锁实现。它依赖 CPU 提供的原子指令——Interlocked.CompareExchange(即 CAS)

// CAS 的语义(伪代码):
if (当前值 == 预期值)
{
	当前值 = 新值;
	return true;
}
else
{
	return false; // 有人抢先修改了,重试
}

这个操作在硬件层面是原子的,不需要操作系统内核参与,所以比锁快得多。ConcurrentQueue 用分段数组 + CAS 来管理队头队尾,入队/出队都不用加锁。

如果你对 CAS 和 Interlocked 感兴趣,后续在第14篇(低级内存模型)中会深入讲解。

3. 不可变 + 写时复制(Immutable + Copy-on-Write)

ImmutableList<T>ImmutableDictionary<K,V> 这类不可变集合天然线程安全——因为它们根本不允许修改。每次”修改”都返回一个新的集合对象,原来的对象保持不变,所有线程都可以安全地读取。


Part 1:并发集合大家族

1.1 ConcurrentDictionary<TKey, TValue>

这是最常用的并发集合,没有之一。如果你的多线程代码需要共享字典,第一个应该想到的就是它

基本用法

var dict = new ConcurrentDictionary<string, int>();

// 添加
dict.TryAdd("apple", 1);

// 获取(不存在时添加)
int count = dict.GetOrAdd("banana", 0);

// 更新(原子地读-改-写)
dict.AddOrUpdate(
	key: "apple",
	addValue: 1,
	updateValueFactory: (key, oldValue) => oldValue + 1
);

// 条件更新(仅当值等于预期值时才更新)
dict.TryUpdate("apple", newValue: 3, comparisonValue: 2);

// 删除
dict.TryRemove("banana", out int removed);

️ 最大的坑:非原子的复合操作

ConcurrentDictionary 的每个单独方法是原子的,但方法之间的组合不是

//  错误写法:这两步之间可能被其他线程打断
if (!dict.ContainsKey("key"))
{
	dict.TryAdd("key", 1);
}

//  正确写法:GetOrAdd 是原子操作
dict.GetOrAdd("key", 1);
//  错误写法:读-改-写不是原子的
var old = dict["key"];
dict["key"] = old + 1;

//  正确写法:AddOrUpdate 保证原子性
dict.AddOrUpdate("key", 1, (k, v) => v + 1);

和 Dictionary + lock 的性能对比

在高并发读多写少的场景下,ConcurrentDictionaryDictionary + lock 快很多。在 .NET Core 版本中,读操作是完全无锁的——内部通过 volatile 读取节点引用来保证可见性,不需要获取任何锁,多线程并发读可以完全并行。写操作才会用分段锁,且只锁住目标 bucket 对应的那一把锁。

.NET Framework 版本的 ConcurrentDictionary 读操作仍需短暂持锁,这是两个版本性能差距的主要来源之一。如果你在 Framework 项目里觉得 ConcurrentDictionary 读性能不够理想,这就是原因。

但在单线程场景下,ConcurrentDictionary 比普通 Dictionary 要慢一些(因为它有额外的原子操作开销)。所以:

规则:只在真正多线程共享的场景使用 ConcurrentDictionary,单线程就老老实实用 Dictionary

实际应用场景

// 场景一:缓存(多线程读,偶尔写)
var cache = new ConcurrentDictionary<string, UserInfo>();

// 场景二:统计计数(多线程并发递增)
var counters = new ConcurrentDictionary<string, long>();
counters.AddOrUpdate("pageView", 1, (k, v) => v + 1);

// 场景三:防重(同一Key只处理一次)
var processing = new ConcurrentDictionary<Guid, bool>();
if (processing.TryAdd(orderId, true))
{
	// 只有一个线程能进来
	await ProcessOrderAsync(orderId);
	processing.TryRemove(orderId, out _);
}

1.2 ConcurrentQueue<T> 和 ConcurrentStack<T>

这两个是无锁实现的队列和栈,非常适合”一端进、一端出”的场景。

ConcurrentQueue<T>:先进先出(FIFO)

var queue = new ConcurrentQueue<string>();

// 入队
queue.Enqueue("task1");
queue.Enqueue("task2");

// 出队(可能失败,因为队列可能是空的)
if (queue.TryDequeue(out string? item))
{
	Console.WriteLine($"处理: {item}");
}

// 只看不取(peek)
if (queue.TryPeek(out string? first))
{
	Console.WriteLine($"下一个是: {first}");
}

ConcurrentStack<T>:后进先出(LIFO)

var stack = new ConcurrentStack<int>();

// 入栈(支持批量)
stack.Push(1);
stack.PushRange([2, 3, 4]);

// 出栈
if (stack.TryPop(out int top))
{
	Console.WriteLine($"弹出: {top}"); // 4
}

// 批量出栈
int[] batch = new int[3];
int count = stack.TryPopRange(batch);

内部原理

ConcurrentQueue<T> 内部使用分段数组(Segmented Array)+ CAS 的设计。.NET Framework 版本用的是链表节点 + CAS——每个元素都是一个独立的链表节点,内存不连续,CPU 缓存命中率低。.NET Core 2.0 对此进行了重大重写,改为固定大小(32槽)的数组段,段之间用链表连接,这样同一个段内的元素在内存上是连续的,大幅提升了缓存局部性(Cache Locality),高并发吞吐量有明显改善:

ConcurrentQueue 内部结构(.NET Core / .NET 5+,简化):

  Segment1[32] → Segment2[32] → Segment3[32]
	 ↑                                  ↑
   _head                             _tail

每个 Segment 是固定 32 槽的数组,槽内元素内存连续,缓存友好
入队:向 _tail 所在 Segment 的下一槽写入,满了就 CAS 追加新 Segment
出队:从 _head 所在 Segment 读取,空了就 CAS 移动 _head 到下一 Segment

所有的 head/tail 指针移动都是用 Interlocked.CompareExchange 完成的,整个过程没有一把锁,这就是它在高并发下性能优异的原因。


1.3 ConcurrentBag<T>:无序并发包

ConcurrentBag<T> 是个很特殊的集合——它不保证顺序,读出来的元素顺序是不确定的。听起来有点奇怪,但它有一个非常适合的场景:“自产自销”模式,即添加和读取操作由同一个线程执行

内部优化:线程本地存储

ConcurrentBag<T> 内部为每个线程维护了一个本地双端链表(Thread-Local Deque)

线程A 的本地链表: [A1] ↔ [A2] ↔ [A3]   ← 线程A 从头部 Add / TryTake
线程B 的本地链表: [B1] ↔ [B2]            ← 线程B 从头部 Add / TryTake
线程C 的本地链表: [C1]

当线程B 本地列表为空时:从线程A 的链表【尾部】偷取 → work-stealing

当线程 A 向自己的本地链表 Add 时,绝大多数情况下只需要操作头部,开销极低。当线程 A TryTake 时,也优先从自己的头部取,无需跨线程竞争。只有当本地链表为空需要 work-stealing 时,才会对目标线程的链表加锁(锁的是尾部,而宿主线程操作头部,两端互不干扰,冲突概率很低)。

.NET Framework 版本的 ConcurrentBag 实现相对简单,work-stealing 的加锁粒度更粗。.NET Core 对其进行了重写,引入了双端链表设计,使得 work-stealing 对宿主线程的干扰降到最低。

var bag = new ConcurrentBag<int>();

// 最适合的场景:多个线程各自产生数据,各自消费
Parallel.For(0, 100, i =>
{
	bag.Add(i * i);      // 添加到本线程本地列表,无竞争
	bag.TryTake(out _);  // 先从本线程本地列表取,无竞争
});

️ 什么时候不该用 ConcurrentBag

//  生产者和消费者是不同线程 → ConcurrentBag 性能很差
// 应该用 ConcurrentQueue 或 Channel
var bag = new ConcurrentBag<WorkItem>();

// 生产线程
Task.Run(() => {
	while (true) bag.Add(GetNextWork()); // 存到线程1的本地列表
});

// 消费线程
Task.Run(() => {
	while (bag.TryTake(out var item)) // 线程2本地列表是空的,一直在"偷"
		Process(item);                 // 大量锁竞争!
});

1.4 BlockingCollection<T>:有边界的阻塞集合

BlockingCollection<T> 是一个包装器,可以把 ConcurrentQueueConcurrentStackConcurrentBag 包装成一个支持阻塞等待和容量限制的集合。

它是经典的生产者-消费者模式实现,在 Channel<T> 出现之前,这是最主流的做法。

// 创建一个容量上限为 100 的有界队列
var bc = new BlockingCollection<string>(boundedCapacity: 100);

// 生产者线程
var producer = Task.Run(() =>
{
	for (int i = 0; i < 1000; i++)
	{
		bc.Add($"item-{i}"); // 队列满了会阻塞,直到有位置
	}
	bc.CompleteAdding(); // ← 重要!告诉消费者"没有更多数据了"
});

// 消费者线程
var consumer = Task.Run(() =>
{
	// GetConsumingEnumerable:自动阻塞等待,直到 CompleteAdding 被调用且集合为空
	foreach (var item in bc.GetConsumingEnumerable())
	{
		Console.WriteLine($"消费: {item}");
	}
});

await Task.WhenAll(producer, consumer);

关键特性

特性 说明
有界限制 new BlockingCollection<T>(capacity) 限制最大容量
生产者阻塞 队列满时,Add 会阻塞(或用 TryAdd + 超时)
消费者阻塞 队列空时,Take 会阻塞(或用 TryTake + 超时)
优雅关闭 CompleteAdding() 通知消费者不会再有新数据
取消支持 Add/Take 接受 CancellationToken

BlockingCollection<T> 本质上是同步阻塞的。在 async/await 异步代码中,阻塞等待会浪费线程。如果你的代码是异步的,用下面讲的 Channel<T> 才是正确选择。


Part 2:Channel<T>——现代生产者-消费者的首选

Channel<T>(位于 System.Threading.Channels,.NET Core 3.0+ 内置)是专为异步生产者-消费者模式设计的,可以看作是 BlockingCollection 的异步升级版。

为什么要有 Channel<T>?

BlockingCollection<T> 虽好,但它的 Take() 是同步阻塞的——等待时会占用一个线程什么都不干。在高并发的异步系统里,这是一种浪费。

Channel<T> 解决的就是这个问题:等待时不阻塞线程,而是挂起异步操作(await),等有数据了再被唤醒。

创建 Channel

using System.Threading.Channels;

// 无界 Channel:生产者永远不会阻塞(但要注意内存)
var unbounded = Channel.CreateUnbounded<string>();

// 有界 Channel:超出容量时,行为可配置
var bounded = Channel.CreateBounded<string>(new BoundedChannelOptions(capacity: 100)
{
	// 队列满时的策略:
	// Wait = 等待(生产者挂起,默认)
	// DropWrite = 丢弃新写入
	// DropOldest = 丢弃最旧的
	FullMode = BoundedChannelFullMode.Wait,

	// 单生产者/单消费者模式可以开启,性能更好
	SingleWriter = false,
	SingleReader = false
});

基本用法

var channel = Channel.CreateBounded<WorkItem>(100);
var writer = channel.Writer;
var reader = channel.Reader;

// 生产者(异步写入)
Exception? fault = null;
try
{
	await writer.WriteAsync(new WorkItem("task1")); // 满了则等待
	writer.TryWrite(new WorkItem("task2"));          // 满了则返回 false,不等待
}
catch (Exception ex)
{
	fault = ex;
	throw;
}
finally
{
	//  无论正常还是异常退出,Complete 一定被调用,消费者不会永久挂起
	// fault=null 时等同于 Complete();有异常时把异常传递给消费者
	writer.TryComplete(fault);
}

// 消费者(异步读取)
try
{
	await foreach (var item in reader.ReadAllAsync()) // 自动等待,直到 Writer Complete
	{
		await ProcessAsync(item);
	}
}
catch (ChannelClosedException ex) when (ex.InnerException is not null)
{
	// 生产者通过 TryComplete(fault) 传来的异常,在此处理
}

多生产者、多消费者示例

var channel = Channel.CreateBounded<string>(50);

// 多个生产者并发写入
var producers = Enumerable.Range(0, 3).Select(i => Task.Run(async () =>
{
	for (int j = 0; j < 20; j++)
	{
		await channel.Writer.WriteAsync($"P{i}-item{j}");
		await Task.Delay(10);
	}
})).ToArray();

//  用 try/finally 协调多生产者的 Complete,不用 fire-and-forget 的 ContinueWith
// ContinueWith 的异常默认会被吸掉,且时序不可控;try/finally 明确保证任何情况下 Complete 一定被调用
_ = Task.Run(async () =>
{
	Exception? fault = null;
	try
	{
		await Task.WhenAll(producers);
	}
	catch (Exception ex)
	{
		fault = ex;
	}
	finally
	{
		channel.Writer.TryComplete(fault);
	}
});

// 多个消费者并发读取
var consumers = Enumerable.Range(0, 2).Select(i => Task.Run(async () =>
{
	await foreach (var item in channel.Reader.ReadAllAsync())
	{
		Console.WriteLine($"Consumer{i} 处理: {item}");
	}
})).ToArray();

await Task.WhenAll(consumers);

Channel vs BlockingCollection 选谁?

对比项 BlockingCollection<T> Channel<T>
等待方式 同步阻塞,占用线程 异步挂起,不占线程
适合场景 同步代码、旧项目 异步代码、新项目
背压(Backpressure) 有界阻塞 有界 + 多种策略
完成通知 CompleteAdding() Complete()
遍历方式 GetConsumingEnumerable() ReadAllAsync()
性能 较好 更好(异步友好)

如果你在写新的异步代码,优先选 Channel<T>,它是 .NET 对生产者-消费者模式最现代的答案。


🧊 Part 3:不可变集合(ImmutableCollections)

不可变集合在 System.Collections.Immutable 命名空间下(NuGet 包:System.Collections.Immutable,.NET 5+ 已内置)。

为什么不可变集合是线程安全的?

答案极其简单:因为它们根本不能被修改

线程安全问题的根源是多个线程对同一块内存进行竞争性的读写。如果一个对象创建后永远不会变,那么多少线程同时读都不会有问题——根本不存在”写”的那一方,竞争消失了。

var list = ImmutableList.Create(1, 2, 3);

// 所有"修改"操作都返回一个新对象,原来的 list 不变
var list2 = list.Add(4);          // list 还是 [1,2,3],list2 是 [1,2,3,4]
var list3 = list2.Remove(2);      // list2 不变,list3 是 [1,3,4]
var list4 = list.SetItem(0, 99);  // list4 是 [99,2,3],list 不变

Console.WriteLine(string.Join(",", list));  // 1,2,3(没变!)
Console.WriteLine(string.Join(",", list2)); // 1,2,3,4

常用不可变集合

// ImmutableList<T>
var immutableList = ImmutableList<string>.Empty
	.Add("a")
	.Add("b")
	.Add("c");

// ImmutableDictionary<TKey, TValue>
var immutableDict = ImmutableDictionary<string, int>.Empty
	.Add("one", 1)
	.Add("two", 2);
int val = immutableDict["one"]; // 读取,线程安全

// ImmutableHashSet<T>
var immutableSet = ImmutableHashSet.Create("a", "b", "c");

// ImmutableArray<T>(结构体,零额外内存开销)
var immutableArr = ImmutableArray.Create(1, 2, 3, 4, 5);

内部实现:结构共享(Structural Sharing)

你可能会担心:每次”修改”都创建新对象,内存开销不会很大吗?

不可变集合用了一个非常聪明的技术叫结构共享(Structural Sharing)。以 ImmutableList<T> 为例,它内部用的是AVL 平衡树(二叉搜索树),而不是数组:

原始列表 [1, 2, 3, 4, 5] 的树结构:

		 3
		/ \
	   2   5
	  /   /
	 1   4

执行 Add(6) 后,新叶子 6 的路径是 3 → 5 → 6,路径上的节点需要创建新副本(因为子指针变了),路径之外的节点直接共享:

		 3'         ← 新建(右子指针指向新的 5')
		/ \
	   2   5'        ← 新建(右子指针指向新的 6)
	  /   / \
	 1   4   6   ← 新建叶子

节点 1、2、4 继续被新旧两个版本共享,没有复制!

这就是为什么 ImmutableList 的操作是 O(log n) 而不是 O(n)——不需要复制整个列表。

适合哪些场景?

// 场景一:配置数据(初始化后不变,多线程读取)
public static readonly ImmutableDictionary<string, string> AppConfig = 
	ImmutableDictionary.Create<string, string>()
		.Add("host", "localhost")
		.Add("port", "5432");

// 场景二:多版本并发控制(MVCC)
private volatile ImmutableList<LogEntry> _logs = ImmutableList<LogEntry>.Empty;

public void AddLog(LogEntry entry)
{
	// 用 Interlocked 原子替换引用,实现无锁更新
	ImmutableList<LogEntry> original, updated;
	do
	{
		original = _logs;
		updated = original.Add(entry);
	} while (Interlocked.CompareExchange(ref _logs, updated, original) != original);
}

// 任何线程随时读取 _logs,都拿到一个完整快照,不会读到中间状态
public ImmutableList<LogEntry> GetLogs() => _logs;

️ ImmutableList vs ImmutableArray

对比项 ImmutableList<T> ImmutableArray<T>
内部结构 AVL 树 普通数组
随机访问 O(log n) O(1)
添加/删除 O(log n) O(n),需完整复制
内存 树节点有额外开销 零额外开销(结构体)
适合场景 频繁修改 一次创建,只读

综合性能对比

理论归理论,最终还是要在你的具体场景下测试。下面是一些经验性的对比参考:

Dictionary 相关

方案 并发写 并发读 混合读写
Dictionary + lock 低(全锁) 低(全锁)
Dictionary + ReaderWriterLockSlim 高(读并发) 中高
ConcurrentDictionary 高(分段锁) 极高(无锁读) 极高
ImmutableDictionary + atomic swap 极高(写慢,O(log n)) 极高(完全无锁) 视读写比

队列相关

方案 生产者等待 消费者等待 背压 异步友好
ConcurrentQueue 无阻塞 轮询(busy-wait)
BlockingCollection 同步阻塞 同步阻塞
Channel<T> 异步等待 异步等待 有(可配置)

️ 实战示例:用 Channel 构建日志管道

来一个完整的实战例子,把 Channel<T> 运用到实际的日志系统中:

// 见示例代码:ConcurrentCollections 项目

(完整代码见本章配套项目)


本章总结

集合 内部机制 适合场景
ConcurrentDictionary<K,V> 分段锁 + 部分无锁读 多线程共享字典、缓存、计数器
ConcurrentQueue<T> 无锁 CAS + 分段数组 线程安全 FIFO 队列,轻量生产消费
ConcurrentStack<T> 无锁 CAS + 链表 线程安全 LIFO 栈
ConcurrentBag<T> 线程本地列表 + work-stealing 同线程”自产自销”
BlockingCollection<T> 包装器(同步阻塞) 同步代码的有界生产消费
Channel<T> 异步信号量 + 无锁队列 异步代码的生产消费,首选
ImmutableXxx<T> 持久化数据结构(树/数组) 只读共享、配置、快照

一句话版本

  • 多线程共享字典 → ConcurrentDictionary
  • 异步生产消费 → Channel<T>
  • 同步生产消费 → BlockingCollection<T>
  • 自产自销 → ConcurrentBag<T>
  • 只读共享 → Immutable 系列
  • 不知道用啥 → 先翻第11篇复习锁,想清楚再选

快速参考卡片

┌─────────────────────────────────────────────────────────────────┐
│                  并发集合选择指南                                  │
├─────────────────────────────────────────────────────────────────┤
│  需要字典?                                                        │
│    └─ 多线程 → ConcurrentDictionary                               │
│    └─ 单线程 → Dictionary(更快)                                  │
│                                                                   │
│  需要队列/生产消费?                                                │
│    └─ 异步代码 → Channel<T>(首选)                                │
│    └─ 同步代码 → BlockingCollection<T>                            │
│    └─ 无需等待/无界 → ConcurrentQueue<T>                          │
│                                                                   │
│  需要栈? → ConcurrentStack<T>                                    │
│                                                                   │
│  同线程自产自销? → ConcurrentBag<T>                              │
│                                                                   │
│  只读共享/配置? → ImmutableXxx<T>                                │
└─────────────────────────────────────────────────────────────────┘

参考资源

文章摘自:https://www.cnblogs.com/diamondhusky/p/20234541