UE并发-生产者消费者模式

0.Overview

生产者消费者模型,在并发编程中经常会用到,Unreal Engine中也封装了相应的无锁数据结构:

  • TQueue - 一个无锁的不限制大小的队列,支持SPSC(单生产者单消费者)/MPSC(多生产者单消费者)两种模式。
  • TCircularQueue - 一个无锁环形队列,SPSC(单生产者单消费者)模式下线程安全。
  • TTripleBuffer - 一个无锁的三缓存实现,SPSC(单生产者单消费者)模式下线程安全(UE源码中没有用到,所以本文不涉及其用法,不过其思路在物理模块中有用到)。

另外,针对MPMC/SPMC(多消费者)模式,需要自行实现,示例代码中做了简单实现,供参考。本文主要包含:

  • 生产者消费者模型及其模式
  • UE中TQueueTCircularQueue的实现和用法
  • 一个双缓冲的简单实现,双缓冲主要用来优化队列读写锁定的开销,在服务器中一般用于IO线程和逻辑线程之间做数据交换。
  • 一个MPMC模式的简实现,用到锁和事件。

1.生产者消费者模式

生产者消费者问题(Producer-consumer problem),也称有限缓冲问题(Bounded-buffer problem),是一个多进程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个进程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。

Lockfree

一般根据生产者和消费者的数量,进行组合,可以分为下面几种,针对不同的情况,会有对应的优化策略:

  • SPSC(Single Producer & Single Consumer),对于单生产者单消费者,只有同步没用互斥,只用保证缓冲区满的时候,生产者不会继续向缓冲区放数据,缓冲区空的时候,消费者不会继续从缓冲区取数据,而不存在同时有两个生产者使用缓冲区资源,造成数据不一致的状态。

  • MPSC(Multiple Producer & Simple Consumer),对于多生产者单消费者来说,多生产者之间具有互斥关系,需要加锁。

  • SPMC(Single Producer & Multiple Consumer),类似MPMC模式。

  • MPMC(Multiple Producer & Multiple Consumer),对于多生产者多消费者问题,是一个同步+互斥问题,不仅需要生产者和消费者之间的同步协作,还需要实现对缓冲区资源的互斥访问。

2.UE中的并发数据结构

2.1.TQueue

TQueue实现了一个不限制大小的LockFree链表,节点数据以值形式存储。该数据结构可以用于两种模式:

  • EQueueMode::Spsc,单生产者单消费者模式。
  • EQueueMode::Mpsc,多生产者单消费者模式。

两种模式下都是线程安全的:

  • 在消费者线程中调用的Dequeue方法只操作Tail,不存在竞争
  • 在生产者线程中调用的Enqueue方法,在SPSC模式下,操作Head,不存在竞争(保证内存访问顺序一致即可,使用了Memory Barrier);在MPSC模式,多个生产者对Head的访问,通过原子操作CAS实现。

Lockfree

实现代码片段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
bool Enqueue(const ItemType& Item)
{
        TNode* NewNode = new TNode(Item);
        if (NewNode == nullptr)
        {
            return false;
        }
        TNode* OldHead;
        if (Mode == EQueueMode::Mpsc)
        {
// CAS原子操作,设置节点指针和NextNode指针
            OldHead = (TNode*)FPlatformAtomics::InterlockedExchangePtr((void**)&Head, NewNode);
            TSAN_BEFORE(&OldHead->NextNode);
            FPlatformAtomics::InterlockedExchangePtr((void**)&OldHead->NextNode, NewNode);
        }
        else
        {
            OldHead = Head;
            Head = NewNode;
            TSAN_BEFORE(&OldHead->NextNode);
            FPlatformMisc::MemoryBarrier(); // 保证内存访问顺序一致
            OldHead->NextNode = NewNode;
        }
        return true;
 }

用法代码片段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 自定义数据
struct FMyItem
{
    FMyItem(uint32 TheId = 0const FString& TheName = TEXT("Item"))
        : Id(TheId), Name(TheName)
    {
    }
    uint32 Id;
    FString Name;
};

// SPSC队列
TQueue<FMyItem, EQueueMode::Spsc> ItemsQueue;

void Test_Queue()
{
    // Single Producer
    Async(EAsyncExecution::Thread, []()
    {
        for (uint32 Id = 1; ; Id++)
        {
// 生产一个对象
            ItemsQueue.Enqueue(FMyItem(Id, "Item"));
            UE_LOG(LogTemp, Display, TEXT("Produce: %d,%s"), Id, TEXT("Item"));
        }
    });
    // Single Consumer
    Async(EAsyncExecution::Thread, []()
    {
        while (true)
        {
            if (!ItemsQueue.IsEmpty())
            {
// 消费一个对象
FMyItem Item;
                ItemsQueue.Dequeue(Item);
                UE_LOG(LogTemp, Display, TEXT("Consume: %d,%s"), Item.Id, *Item.Name);
            }
        }
    });
}

2.2.TCircularQueue/TCircularBuffer

TCircularQueue,一个基于环形数组实现的LockFree的FIFO的队列,只能用于SPSC模式下:

  • Enqueue,在队尾(Tail)添加一个对象,成功返回true;若队列已满(Capacity-1个元素)返回false,添加失败。
  • Dequeue,从队列的前面(Head)移除一个元素,成功返回true;若队列已经没有元素了,返回false。

Lockfree

用法代码片段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
template<uint32 Num>
void TestTCircularQueue()
{
    // 环形队列
    TCircularQueue<uint32> Queue{ 100 };
    std::atomic<bool> bStop{ false };
    // 一个生产者(TaskGraph线程)
    FGraphEventRef Task = FFunctionGraphTask::CreateAndDispatchWhenReady(
        [&bStop, &Queue]
        {
            while (!bStop)
            {
                Queue.Enqueue(0);
            }
        }
        );
    // 消费者(当前线程)
    uint32 It = 0;
    while (It != Num)
    {
        uint32 El;
        if (Queue.Dequeue(El))
        {
            ++It;
        }
    }
    bStop = true;
    // 等待任务结束
    Task->Wait(ENamedThreads::GameThread);
}

// 运行示例
TestTCircularQueue<10'000'000>();

3.MPMC的简单实现

对于多生产者多消费者问题,是一个同步+互斥问题,不仅需要生产者和消费者之间的同步协作,还需要实现对缓冲区资源的互斥访问。

简单实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
template <typename ItemType, int QueueSize>
class TQueueMPMC
{
public:
    TQueueMPMC()
    {
        ItemsCount = 0;
        Items.Reserve(QueueSize);
    }
    void Enqueue(const ItemType& Item)
    {
        // 队列以满,则阻塞于此
        if (ItemsCount == QueueSize)
        {
            UE_LOG(LogTemp, Display, TEXT("Enque-Waiting...."));
            FullEvent->Wait();
        }

        // 插入数据
        ItemsMutex.Lock();
        Items.Push(Item);
        ItemsCount = Items.Num();
        ItemsMutex.Unlock();

        // 通知Consumer有数据了
        if (ItemsCount >= 1)
        {
            EmptyEvent->Trigger();
        }
    }
    ItemType Dequeue()
    {
        // 若队列为空,阻塞于此,直到有数据
        if (ItemsCount == 0)
        {
            UE_LOG(LogTemp, Display, TEXT("Dequeue-Waiting...."));
            EmptyEvent->Wait();
        }

        // 弹出数据
        ItemType Item;
        ItemsMutex.Lock();
        if (Items.Num() > 0) Item = Items.Pop();
        ItemsCount = Items.Num();
        ItemsMutex.Unlock();

        // 通知生产者有空间了
        if (ItemsCount == (QueueSize - 1))
            FullEvent->Trigger();
        return Item;
    }
private:
    FEventRef FullEvent; // 满队列事件/条件变量
    FEventRef EmptyEvent; // 空队列事件/条件变量

    TAtomic<int> ItemsCount;
    FCriticalSection ItemsMutex; // Buffer Mutex
    TArray<ItemType> Items; // Single Buffer
};

测试代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
struct MPMCTest
{
// 支持多生产者多消费者模式的队列
    static TQueueMPMC<int10> QueueMPMC;

// 生产者
    static void Producer(int ItemID = 0)
    {
        int Item = ItemID;
        while (true)
        {
            // 生产一个对象
            Item++;
            // 放入队列,若队列已满,等待被消费
            QueueMPMC.Enqueue(Item);
        }
    }

// 消费者
    static void Consumer()
    {
        uint32 CurrentThreadId = FPlatformTLS::GetCurrentThreadId();
        FString CurrentThreadName = FThreadManager::Get().GetThreadName(CurrentThreadId);
        while (true)
        {
// 拿出一个对象,若队列为空,等待生产者
            int Item = QueueMPMC.Dequeue();
            // 消费对象
            UE_LOG(LogTemp, Display, TEXT("Consumer@%s, Item=%d"), *CurrentThreadName, Item);
        }
    }
};

TQueueMPMC<int10> MPMCTest::QueueMPMC;

void Test_MPMC()
{
    const int NumProducer = 5;
    const int NumConsumer = 3;

    // Multiple Producer
    for (int i = 0; i < NumProducer; ++i)
    {
        Async(EAsyncExecution::Thread, [i]()
        {
            MPMCTest::Producer(i * 1000000);
        });
    }

    // Multiple Consumer
    for (int i = 0; i < NumConsumer; ++i)
    {
        Async(EAsyncExecution::Thread, &MPMCTest::Consumer);
    }
}

4.双缓冲技术

在服务器开发中 通常的做法是把逻辑处理线程和IO处理线程分离。 I/0处理线程负责网络数据的发送和接收,连接的建立和维护。 逻辑处理线程处理从IO线程接收到的包。

通常逻辑处理线程和IO处理线程是通过数据队列来交换数据,就是生产者–消费者模型。这个数据队列是多个线程在共享,每次访问都需要加锁,因此如何减少互斥/同步的开销就显得尤为重要。可以通过双缓冲队列来优化这种场景。

所谓双缓冲数据就是两个队列:

  • 一个负责从里写入数据
  • 一个负责读取数据。

当逻辑线程读完数据后负责将自己的队列和IO线程的队列进行交换。这种操作需要在两个地方加锁:

  • IO线程向队列写入数据。
  • 两个队列进行交换时。

简单实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
template <typename ItemType>
class TDoubleBuffer
{
public:
    TDoubleBuffer(uint32 Capacity = (uint32)-1)
        : MaxCapacity(Capacity)
    {
        WriteBuffer = new TArray<ItemType>();
        ReadBuffer = new TArray<ItemType>();
    }
    ~TDoubleBuffer()
    {
        delete WriteBuffer;
        delete ReadBuffer;
    }
// 生产者调用
    bool Enqueue(const ItemType& Item)
    {
// 写入时加锁
        FScopeLock Lock(&SwapMutex);
        if ((uint32)WriteBuffer->Num() > MaxCapacity)
            return false;
        WriteBuffer->Push(Item);
        return true;
    }

// 消费者调用
    bool Dequeue(ItemType& Item)
    {
        if (ReadBuffer->Num() == 0)
        {
            // 交换时加锁
            FScopeLock Lock(&SwapMutex);
            Swap(WriteBuffer, ReadBuffer);
            if (ReadBuffer->Num() == 0)
                return false;
        }
        if (ReadBuffer->Num() > 0)
        {
            Item = ReadBuffer->Pop();
            return true;
        }
        return false;
    }
private:
    uint32 MaxCapacity;
    FCriticalSection SwapMutex; // 交换操作锁
    TArray<ItemType>* WriteBuffer; // 写Buffer
    TArray<ItemType>* ReadBuffer; // 读Buffer
};

测试代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
void Test_DoubleBuffer()
{
    TDoubleBuffer<uint32> DoubleBuffer;
    std::atomic<bool> bStop{false};

    // 生产者线程
    auto IOThread = Async(EAsyncExecution::Thread, [&bStop, &DoubleBuffer]()
    {
        FRandomStream Rand;
        Rand.GenerateNewSeed();
        while (!bStop)
        {
            // 生产一个对象
            DoubleBuffer.Enqueue(Rand.GetUnsignedInt());
        }
    });

    // 消费者线程
    const uint32 MaxConsuming = 100000;
    for (uint32 i = 0; i < MaxConsuming;)
    {
        uint32 Item;
        if (DoubleBuffer.Dequeue(Item))
        {
            ++i;
            UE_LOG(LogTemp, Display, TEXT("Consumer %u, Item=%u"), i, Item);
        }
    }
    bStop = true;
    IOThread.Wait();
    
    UE_LOG(LogTemp, Display, TEXT("OVer...."));
}

5.参考资料

  • https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem
  • Engine/Source/Runtime/Core/Public/Containers/Queue.h
  • Engine/Source/Runtime/Core/Public/Containers/CircularQueue.h
  • Engine/Source/Runtime/Core/Public/Containers/TripleBuffer.h
  • Engine/Source/Runtime/Experimental/Chaos/Public/Framework/TripleBufferedData.h
David++