UE并发-Async & Future & Promise

Simplicity is the ultimate form of sophistication ! - Leonardo Da Vinci

UnrealEngine有了跨平台的线程、线程池和TaskGraph,再来看看Async/ParalleFor和Future/Promise的实现,由衷的感受下上面的话:简约是复杂的最终形式。这些操作实现都不复杂,却有着强大的能力,让异步并发代码写起来简洁易懂。下面简单介绍下:

  • Future & Promise的实现和简单用法
  • Async系列接口的用法
  • Paralle接口的用法

另外这里相关接口可以类比C++标准库中的概念。UE中这些接口是基于之前提到的几种并发机制实现的,废话不多说,上代码,看注释。

1.Future & Promise

1.1.实现

Future和Promise的概念,和C++标准中的std::future/std::promise类似:

  • Future - 一个会在未来某个点返回的值
  • Promise - 一个异步函数,在某个并发执行体中设置,然后Future变得有效

实现比较简单,类图如下:

Future

  • TPromise<ResultType> - Promise
  • TFuture<ResultType> - Future
  • TFutureState<ResultType> - Future和Promise共享的状态。
    • Promise创建时创建一个State
    • GetFuture()时返回一个共享该状态的Future对象
    • 基于FEvent实现,等待和触发

1.2.基本用法

Future和Promise这两个概念非常形象,可以理解为:

  • Promise - 给别人一承诺
  • Promise.GetFuture() - 承诺一个未来
  • Promise.SetValue(X) - 兑现承诺
  • Future.IsReady() - 承诺是否兑现?

代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
inline void Test_FuturePromise()
{
// 承诺给一个bool结果
TPromise<bool> Promise;
// 返回一个尚未兑现的未来
TFuture<bool> Future = Promise.GetFuture();

// AnyThread中执行
FFunctionGraphTask::CreateAndDispatchWhenReady([&Promise]()
{
FPlatformProcess::Sleep(3);
AsyncLog("do the promise");
// 实现承诺(Promise)
Promise.SetValue(true);
});

// 等待实现承诺
// - Wait - 等到天荒地老
// - WaitFor - 等一个时间段
// - WaitUntil - 等到某年某月某一天
UE_LOG(LogTemp, Display, TEXT("waiting for the promise..."));
// Future.Wait();
Future.WaitFor(FTimespan::FromSeconds(5));
// Future.WaitUntil(FDateTime(2022, 1, 1));

// 承诺已兑现
if (Future.IsReady())
{
// Future.Get() - 看下结果
UE_LOG(LogTemp, Display, TEXT("keep the promise, future is %d"), Future.Get());
}
// 承诺尚未兑现
else
{
Promise.SetValue(false);
UE_LOG(LogTemp, Display, TEXT("break the promise"));
}
}

输出:

1
2
3
LogTemp: Display: waiting for the promise...
LogTemp: Display: TaskGraphThreadNP 0[16164], do the promise
LogTemp: Display: keep the promise, future is 1

1.3.带回调的Promise

Promise在设置完,在进行设置操作的线程中,执行一个回调。

代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
inline void Test_FuturePromise2()
{
// 带回调的Promise,Promise实现时,调用该回调
TPromise<bool> Promise([]()
{
AsyncLog("the promise is set !");
});

TFuture<bool> Future = Promise.GetFuture();

// TaskGraph中运行并等其完成
FFunctionGraphTask::CreateAndDispatchWhenReady([&Promise]()
{
FPlatformProcess::Sleep(3);

AsyncLog("do the promise");
Promise.SetValue(true);
})->Wait(ENamedThreads::GameThread);
}

输出:

1
2
LogTemp: Display: TaskGraphThreadNP 0[15864], do the promise
LogTemp: Display: TaskGraphThreadNP 0[15864], the promise is set !

1.4.创建一个异步函数

定义一个返回Future的异步函数,然后拿着Future返回值,可以进行Wait/Then等操作。

定义一个异步执行的函数,并返回Future对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
TFuture<int> DoSthAsync(int Value)
{
TPromise<int> Promise;
TFuture<int> Future = Promise.GetFuture();

class FGraphTaskSimple
{
public:
FGraphTaskSimple(TPromise<int>&& ThePromise, int TheValue)
: Promise(MoveTemp(ThePromise)), FutureValue(TheValue)
{
}

FORCEINLINE TStatId GetStatId() const
{
RETURN_QUICK_DECLARE_CYCLE_STAT(FGraphTaskSimple, STATGROUP_TaskGraphTasks);
}

static ENamedThreads::Type GetDesiredThread()
{
return ENamedThreads::AnyThread;
}

static ESubsequentsMode::Type GetSubsequentsMode()
{
return ESubsequentsMode::FireAndForget;
}

void DoTask(ENamedThreads::Type CurrentThread, const FGraphEventRef& MyCompletionGraphEvent)
{
AsyncLog("DoSthAsync.... Begin");
FPlatformProcess::Sleep(3);
AsyncLog("DoSthAsync.... Done");

Promise.SetValue(FutureValue);
}

private:
TPromise<int> Promise;
int FutureValue;
};

TGraphTask<FGraphTaskSimple>::CreateTask().ConstructAndDispatchWhenReady(MoveTemp(Promise), MoveTemp(Value));

return MoveTemp(Future);
}

代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
inline void Test_FuturePromise3()
{
// usage1: 返回Future,通过Get操作等待Promise的兑现
{
auto Future = DoSthAsync(100);

// todo other ...
FPlatformProcess::Sleep(1);
UE_LOG(LogTemp, Display, TEXT("Do something else .."));

// waiting for the value
int Value = Future.Get();
UE_LOG(LogTemp, Display, TEXT("Value = %d"), Value);
}

// usage2 : Promise兑现的时候,执行一系列Then操作(在相应的线程中)
DoSthAsync(1)
.Then([](TFuture<int> Future) -> int
{
AsyncLog("then1 .. ");
UE_LOG(LogTemp, Display, TEXT("Value = %d"), Future.Get());
return 2;
})
.Then([](TFuture<int> Future)
{
AsyncLog("then2 .. ");
UE_LOG(LogTemp, Display, TEXT("Value = %d"), Future.Get());
})
.Wait();

AsyncLog("Over....");
}

输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// usage1:
LogTemp: Display: TaskGraphThreadNP 0[4611], DoSthAsync.... Begin
LogTemp: Display: Do something else ..
LogTemp: Display: TaskGraphThreadNP 0[4611], DoSthAsync.... Done
LogTemp: Display: Value = 100

// usage2:
LogTemp: Display: TaskGraphThreadNP 0[4611], DoSthAsync.... Begin
LogTemp: Display: TaskGraphThreadNP 0[4611], DoSthAsync.... Done
LogTemp: Display: TaskGraphThreadNP 0[4611], then1 ..
LogTemp: Display: Value = 1
LogTemp: Display: TaskGraphThreadNP 0[4611], then2 ..
LogTemp: Display: Value = 2
LogTemp: Display: GameThread[259], Over....

2.Async接口

Async系列接口支持异步执行一个函数,并返回Future对象(包含函数返回值),可以通过参数控制使用何种UE中的并发机制。

Async执行方式:

  • TaskGraph - 在TaskGraph中执行,适合运行时间较短的任务
  • TaskGraphMainThread - 在TaskGraph中并且指定主线程中执行,适合运行时间较短的任务
  • Thread - 创建一个线程对象执行,适合运行时较长的任务
  • ThreadPool - 在全局线程池中运行(GThreadPool
  • LargeThreadPool - Editor模式下在全局GLargeThreadPool中运行

Async接口:

  • Async - 异步执行的统一接口,需要指定执行方式(EAsyncExecution),并返回一个Future对象
  • AsyncThread - 创建一个线程来执行
  • AsyncPool - 在指定的线程池中执行
  • AsyncTask - TaskGraph中指定线程类型(ENamedThreads)中执行

代码示例1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
int SimpleAsyncFunc()
{
AsyncLog("SimpleAsyncFunc .... Begin");
FPlatformProcess::Sleep(1);
AsyncLog("SimpleAsyncFunc .... Over");
return 123;
}

inline void Test_Async1()
{
// 在TaskGraph中执行一个函数,函数的返回结果放到Future对象
auto Future = Async(EAsyncExecution::TaskGraph, SimpleAsyncFunc);

// 干点其他事情
FPlatformProcess::Sleep(1);
UE_LOG(LogTemp, Display, TEXT("Do something else .."));

// 等待SimpleAsyncFunc在TaskGraph中执行完成
int Value = Future.Get();
UE_LOG(LogTemp, Display, TEXT("Value = %d"), Value);
}

代码示例2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
inline void Test_Async2()
{
// 使用全局函数
Async(EAsyncExecution::Thread, SimpleAsyncFunc);

// 使用TUniqueFunction
// TUniqueFunction<int()> Task = SimpleTestFunc;
TUniqueFunction<int()> Task = []()
{
AsyncLog("Lambda1 .... Begin");
FPlatformProcess::Sleep(1);
AsyncLog("Lambda1 .... Over");
return 123;
};

Async(EAsyncExecution::Thread, MoveTemp(Task));

// 使用Labmda函数
Async(EAsyncExecution::Thread, []()
{
AsyncLog("Inline Lambda .... Begin");
FPlatformProcess::Sleep(1);
AsyncLog("Inline Lambda .... Over");
});

// 带完成回调
Async(EAsyncExecution::ThreadPool,
[]()
{
AsyncLog("Inline Lambda2 .... Begin");
FPlatformProcess::Sleep(1);
AsyncLog("Inline Lambda2 .... Over");
},
[]()
{
AsyncLog("Inline Lambda2 .... Completion Callback");
});

// 创建一个线程来执行
AsyncThread([]()
{
AsyncLog("AsyncThread Function .... Begin");
FPlatformProcess::Sleep(1);
AsyncLog("AsyncThread Function .... Over");
});

// 在指定的线程池中执行
AsyncPool(*GThreadPool, []()
{
AsyncLog("AsyncPool Function .... Begin");
FPlatformProcess::Sleep(1);
AsyncLog("AsyncPool Function .... Over");
});

// TaskGraph中指定线程类型,来执行
AsyncTask(ENamedThreads::AnyThread, []()
{
AsyncLog("AsyncTask Function .... Begin");
FPlatformProcess::Sleep(1);
AsyncLog("AsyncTask Function .... Over");
});
}

3.ParallelFor接口

把某一个大型任务划分成可以并行执行的N个子任务,划分方法比如采用:大的数据结构拆成块,每一块并行处理。

ParallelFor就是用来干这个事情的(基于TaskGraph实现):

  • 指定划分的数量
  • 和执行函数,其中执行函数的参数为划分的索引
  • 运行所有子任务直到结束,ParallelFor返回

代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
inline void Test_Parallel()
{
// parallel
ParallelFor(100, [](int32 Index)
{
int32 ThreadId = FPlatformTLS::GetCurrentThreadId();
FString ThreadName = FThreadManager::Get().GetThreadName(ThreadId);
UE_LOG(LogTemp, Display, TEXT("%s[%d],Parallel Task, Index:%d"), *ThreadName, ThreadId, Index);
});

// done
UE_LOG(LogTemp, Display, TEXT("Over...."));
}

输出:

1
2
3
4
5
...
LogTemp: Display: TaskGraphThreadNP 4[3075],Parallel Task, Index:98
LogTemp: Display: TaskGraphThreadNP 3[4355],Parallel Task, Index:96
...
LogTemp: Display: Over....

4.参考资料

完整代码示例:

UE源码:

  • Engine/Source/Runtime/Core/Public/Async/Future.h
  • Engine/Source/Runtime/Core/Public/Async/Async.h
  • Engine/Source/Runtime/Core/Public/Async/ParallelFor.h
David++