TaskGraph是UE中基于任务的并发机制。可以创建任务在指定类型的线程中执行,同时提供了等待机制,其强大之处在于可以调度一系列有依赖关系的任务,这些任务组成了一个有向无环的任务网络(DAG),并且任务的执行可以分布在不同的线程中。
TaskGraph支持两种类型的线程:
- 一种是由TaskGraph系统后台创建的线程,称之为AnyThread。
- 另一种是外部线程,包括系统线程(比如:主线程)或者其他基于
FRunnableThread
创建的线程,初始化的时候需要Attach到TaskGraph系统,此类线程成为NamedThread。
1.TaskGraph简介
1.1.AnyThread
AnyThread是由TaskGraph系统创建的后台线程,会持续地从相应优先级队列中拿任务执行,线程的个数由当前运行的系统及CPU核心数决定的。对于AnyThread,有优先级和线程集合的概念。
线程优先级(Thread Priority):
对于AnyThread,TaskGraph系统在初始化时,会根据需要创建多个优先级的线程(线程名 - 描述,线程类型标记,系统线程优先级):
- TaskGraphThreadNP X - 正常优先级,
NormalThreadPriority
,TPri_BelowNormal
- TaskGraphThreadHP X - 高优先级,
HighThreadPriority
,TPri_SlightlyBelowNormal
- TaskGraphThreadBP X - 低优先级,
BackgroundThreadPriority
,TPri_Lowest
线程集(Thread Set):
- 一组多个优先级的线程,称为为ThreadSet
- 至少1个,最多3个(由
CREATE_HIPRI_TASK_THREADS
和CREATE_BACKGROUND_TASK_THREADS
决定是否创建高/低优先级线程)
1.2.NamedThread
NamedThread是外部创建的线程,该类型初始化时,可以通过Attach操作,设置TLS指向相应的Worker对象:
1 | // 绑定主线程(GameThread) |
目前支持的NamedThread有:
- StatsThread - 统计性能线程,
FStatsThread
- RHIThread - 渲染硬件接口层线程,
FRHIThread
- AudioThread - 音频线程,
FAudioThread
- GameThread - 游戏逻辑线程,主线程
- RenderThread - 渲染线程,
FRenderingThread
NameThread的支持两个任务队列(由QueueIndex指定):
1 | FThreadTaskQueue Queues[ENamedThreads::NumQueues]; |
MainQueue
- 对应Queues[0]
(默认)LocalQueue
- 对应Queues[1]
1.3.Task Priority
AnyThread和NamedThread都支持两种优先级的任务:
- 正常优先级 -
NormalTaskPriority
,对应的FStallingTaskQueue
中的PriorityQueues[0]
- 高优先级 -
HighTaskPriority
,对应的FStallingTaskQueue
中的PriorityQueues[
]`
AnyThread和NamedThread都类似的任务队列,使用的无锁优先级队列,该队列优先Pop出高优先级任务,具体分析参见之前文章:《原子操作及其在TaskGraph中的应用》:1
FStallingTaskQueue<FBaseGraphTask, PLATFORM_CACHE_LINE_SIZE, 2> StallQueue;
1.4.ThreadAndIndex
TaskGraph中许多接口需要指定线程类型,比如:
1 | // 要在该类型线程中执行 |
为了降低参数的个数,UE把一些标记也融合进了线程类型变量,比如:
1 | // NamedThread中的LocalQueue |
同时也提供了帮助函数:
1 | // 计算线程类型:NameThread类型或者AnyThread |
1.5.Simple Examples
一个简单示例感受下:
1 | inline void Test_GraphTask_Simple1() |
2.TaskGraph的实现
2.1.TaskGraph的类结构
接口层核心类:
FTaskGraphInterface
- TaskGraph的接口类,可以通过FTaskGraphInterface::Get()
来访问。FBaseGraphTask
- 任务基类,在线程中执行时会调用ExecuteTask
。FGraphEvent
- 后续任务的集合(SubseuentList),依赖的任务完成后,这些后续任务才会被放入TaskGraph的任务队列进行执行,Graph Event的生命周期由引用计数控制。一般用FGraphEventRef
代表一个任务事件,FGraphEventArray
代表一组任务事件。TGraphTask
- 基于模板的实现,内置一个用户自定义任务,该任务类必须满足下面的约束:
1 | class FGenericTask |
实现层:
FTaskGraphImplementation
- TaskGraph系统的实现类,下面会详细介绍。FWorkerThread
- TaskGraph包含多个FWorkerThread
对象,该结构有下面几个变量:RunnableThread
- 线程对象。- AnyThread时创建一个线程
- NamedThread时为Null
TaskGraphWorker
- Woker对象,负责调度和执行任务。- AnyThread时指向
FTaskThreadAnyThread
对象 - NamedThread时指向
FNamedTaskThread
对象
- AnyThread时指向
2.2.TaskGraph的实现细节
其中:
WokerQueue:Woker线程队列。分两部分:
- NamedThread,其数量为
NumNamedThread
- AnyThread,其数量为:ThreadSet的大小 x ThreadSet的数量
- NamedThread,其数量为
AnyThread:
- 放入任务:根据线程优先级和任务优先级,把任务放进相应的队列
- 执行任务:每个AnyThread对应的线程,会一直从
IncommingAnyThreadTasks[Priority]
中拿任务执行,空闲则挂起(无锁、可挂起、优先级队列)。
NamedThread:
- 放入任务:根据QueueIndex和任务优先级,把任务放进相应的队列。
- 执行任务:通过在相应线程中手动执行
WaitUntilTaskCompletes
来执行队列里面的任务。
FTaskGraphImplement的成员变量及其说明:
1 | // 后台线程及数据(Windows平台下,最多有83个线程) |
其中:
1 | enum |
TaskGraph系统的初始化入口:
1 | // FEngineLoop::PreInitPreStartupScreen |
2.3.TaskGraph的实现示例
一个DAG的例子:
代码片段:
1 | FGraphEventRef TaskA, TaskB, TaskC, TaskD, TaskE; |
对象结构:
一个任务主要由两部分构成:
- Task对象,表示任务本身
- GraphEvent对象,表示任务之间的依赖关系(后续任务集合)
整个任务DAG由上面两部组成,如下所示:
Wait操作的实现:
无论哪种Wait操作:
Event->Wait()
FTaskGraphInterface::Get().WaitUntilTaskCompletes()
最终调用的都是FTaskGraphImplementation::WaitUntilTasksComplete
a. 对于AnyThread来说,Wait操作相当于给DAG最后再加一个Trigger任务节点,挂起到该Trigger任务执行完成:
- 会创建一个
FTriggerEventGraphTask
对象 - 然后使用
FEvent
挂起到该Trigger任务完成(调用FEvent::Trigger
)
b. 对于NamedThread来说,Wait操作也是给DAG最后加了一个任务节点(FReturnGraphTask
),执行NamedThread里面的任务,直到这个ReturnTask完成。
3.TaskGraph的简单用法
3.1.自定义任务
定义一个两个示例任务:
- FGraphTaskSimple - 一次性任务(
ESubsequentsMode::FireAndForget
) - FTask - 有依赖关系的任务(
ESubsequentsMode::TrackSubsequents
)
代码示例:
1 | // 定义一个一次性任务 |
3.2.一次性任务
代码示例:
1 | inline void Test_GraphTask_Simple() |
3.3.顺序依赖任务
有三个任务,需要按照顺序执行,任务本身在不同的AnyThread中执行:
代码示例:
1 | // TaskA -> TaskB -> TaskC |
3.4.Gather/Fence任务
FNullGraphTask
,一个执行体为空的任务,用于等待多个任务结束后的点,类似Fork-Join模型中的Join操作。
代码示例:
1 | // |
3.5.Delegate任务
支持两种代理任务:
FSimpleDelegateGraphTask
- Delegate对象没有参数FDelegateGraphTask
- Delegate对象有两个参数,形如:TaskDelegate(NamedThreads::Type CurrentThread, const FGraphEventRef& MyCompletionGraphEvent)
,和上面定义的任务的DoTask参数相同。
代码示例:
1 | inline void Test_GraphTask_Delegate() |
3.6.Function任务
封装了TUniqueFunction,可以直接用于执行某一个函数对象,或者Lambda函数。
代码示例:
1 | inline void Test_GraphTask_Function() |
4.并行计算Fibonacci数列的示例
使用TaskGraph实现一个同步和异步模式的,并行计算斐波拉切数列的例子。通过下面示例,感受下其强大之处,使用TaskGraph可以以简洁的代码轻松实现Map/Reduce或Fork/Join模式。
形式1:同步模式
通过递归实现,每次调用创建两个任务,并等待其结束。直到所有子任务完成,返回结果。
1 | int64 Fibonacci(int64 N) |
形式2:异步模式
也是通过递归调用构建一个树状的任务网络,然后返回它的GraphEvent对象(根结点)。然后任务的执行分配在不同的AnyThread中执行,遇到结束条件时,执行ResEvent->DispatchSubsequents
,然后父任务才算完成,完成事件依次往上抛,到最终的根结点。
1 | FGraphEventRef Fib(int64 N, int64* Res) |
5.小结
通过以上示例和实现分析,可以看到UE的TaskGraph提供了一套非常方便的,基于任务的并发机制。加上低层无锁任务队列的实现,让其任务调度的性能有了保证。
另外有一点要一直强调:任务执行体中的代码,一定要明确知道它在什么类型的线程中执行,是否存在数据竞争。比如:Gameplay相关的对象操作,如Actor的创建和删除的任务,只能在GameThread类型的线程中执行,若放入其它线程执行就会有问题。
PS.之前在一游戏项目中实现过类似的异步任务机制,后台多个线程执行任务,最终Wait操作只能在游戏逻辑主线程中,比起UE的TaskGraph简直就是小巫见大巫了。代码片段如下:
https://github.com/david-pp/tinyworld/blob/master/common/async.h
6.参考资料
示例完整代码:
https://github.com/david-pp/UESnippets/blob/main/SnippetAsync/Private/SimpleGraphTask.h
UE源码:
Engine/Source/Runtime/Core/Public/Async/TaskGraphInterfaces.h
Engine/Source/Runtime/Core/Private/Tests/Async/TaskGraphTest.cpp