0.Overview
生产者消费者模型,在并发编程中经常会用到,Unreal Engine中也封装了相应的无锁数据结构:
TQueue
- 一个无锁的不限制大小的队列,支持SPSC(单生产者单消费者)/MPSC(多生产者单消费者)两种模式。TCircularQueue
- 一个无锁环形队列,SPSC(单生产者单消费者)模式下线程安全。TTripleBuffer
- 一个无锁的三缓存实现,SPSC(单生产者单消费者)模式下线程安全(UE源码中没有用到,所以本文不涉及其用法,不过其思路在物理模块中有用到)。
另外,针对MPMC/SPMC(多消费者)模式,需要自行实现,示例代码中做了简单实现,供参考。本文主要包含:
- 生产者消费者模型及其模式
- UE中
TQueue
和TCircularQueue
的实现和用法 - 一个双缓冲的简单实现,双缓冲主要用来优化队列读写锁定的开销,在服务器中一般用于IO线程和逻辑线程之间做数据交换。
- 一个MPMC模式的简实现,用到锁和事件。
1.生产者消费者模式
生产者消费者问题(Producer-consumer problem),也称有限缓冲问题(Bounded-buffer problem),是一个多进程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个进程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。
一般根据生产者和消费者的数量,进行组合,可以分为下面几种,针对不同的情况,会有对应的优化策略:
SPSC(Single Producer & Single Consumer),对于单生产者单消费者,只有同步没用互斥,只用保证缓冲区满的时候,生产者不会继续向缓冲区放数据,缓冲区空的时候,消费者不会继续从缓冲区取数据,而不存在同时有两个生产者使用缓冲区资源,造成数据不一致的状态。
MPSC(Multiple Producer & Simple Consumer),对于多生产者单消费者来说,多生产者之间具有互斥关系,需要加锁。
SPMC(Single Producer & Multiple Consumer),类似MPMC模式。
MPMC(Multiple Producer & Multiple Consumer),对于多生产者多消费者问题,是一个同步+互斥问题,不仅需要生产者和消费者之间的同步协作,还需要实现对缓冲区资源的互斥访问。
2.UE中的并发数据结构
2.1.TQueue
TQueue实现了一个不限制大小的LockFree链表,节点数据以值形式存储。该数据结构可以用于两种模式:
EQueueMode::Spsc
,单生产者单消费者模式。EQueueMode::Mpsc
,多生产者单消费者模式。
两种模式下都是线程安全的:
- 在消费者线程中调用的
Dequeue
方法只操作Tail,不存在竞争 - 在生产者线程中调用的
Enqueue
方法,在SPSC模式下,操作Head,不存在竞争(保证内存访问顺序一致即可,使用了Memory Barrier);在MPSC模式,多个生产者对Head的访问,通过原子操作CAS实现。
实现代码片段:
1 | bool Enqueue(const ItemType& Item) |
用法代码片段:
1 | // 自定义数据 |
2.2.TCircularQueue/TCircularBuffer
TCircularQueue
,一个基于环形数组实现的LockFree的FIFO的队列,只能用于SPSC模式下:
Enqueue
,在队尾(Tail)添加一个对象,成功返回true;若队列已满(Capacity-1个元素)返回false,添加失败。Dequeue
,从队列的前面(Head)移除一个元素,成功返回true;若队列已经没有元素了,返回false。
用法代码片段:
1 | template<uint32 Num> |
3.MPMC的简单实现
对于多生产者多消费者问题,是一个同步+互斥问题,不仅需要生产者和消费者之间的同步协作,还需要实现对缓冲区资源的互斥访问。
简单实现如下:
1 | template <typename ItemType, int QueueSize> |
测试代码如下:
1 | struct MPMCTest |
4.双缓冲技术
在服务器开发中 通常的做法是把逻辑处理线程和IO处理线程分离。 I/0处理线程负责网络数据的发送和接收,连接的建立和维护。 逻辑处理线程处理从IO线程接收到的包。
通常逻辑处理线程和IO处理线程是通过数据队列来交换数据,就是生产者–消费者模型。这个数据队列是多个线程在共享,每次访问都需要加锁,因此如何减少互斥/同步的开销就显得尤为重要。可以通过双缓冲队列来优化这种场景。
所谓双缓冲数据就是两个队列:
- 一个负责从里写入数据
- 一个负责读取数据。
当逻辑线程读完数据后负责将自己的队列和IO线程的队列进行交换。这种操作需要在两个地方加锁:
- IO线程向队列写入数据。
- 两个队列进行交换时。
简单实现如下:
1 | template <typename ItemType> |
测试代码如下:
1 | void Test_DoubleBuffer() |
5.参考资料
- https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem
Engine/Source/Runtime/Core/Public/Containers/Queue.h
Engine/Source/Runtime/Core/Public/Containers/CircularQueue.h
Engine/Source/Runtime/Core/Public/Containers/TripleBuffer.h
Engine/Source/Runtime/Experimental/Chaos/Public/Framework/TripleBufferedData.h