Skip to content

Commit

Permalink
[perf] change thread pool default config.
Browse files Browse the repository at this point in the history
  • Loading branch information
ChunelFeng committed Aug 19, 2024
1 parent 4e0cf90 commit b58c6e0
Show file tree
Hide file tree
Showing 11 changed files with 44 additions and 28 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ int main() {
* 优化perf功能

[2024.07.27 - v2.6.1 - Chunel]
* 提供`pipeline`的静态执行的方式
* 提供`pipeline`的静态执行的方式,提供微任务机制
* 优化`event`(事件)机制,异步事件可以等待结束

</details>
Expand Down
1 change: 1 addition & 0 deletions src/UtilsCtrl/ThreadPool/UThreadPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ CStatus UThreadPool::submit(const UTaskGroup& taskGroup, CMSec ttl) {
CGRAPH_ASSERT_INIT(true)

std::vector<std::future<CVoid>> futures;
futures.reserve(taskGroup.getSize());
for (const auto& task : taskGroup.task_arr_) {
futures.emplace_back(commit(task));
}
Expand Down
7 changes: 3 additions & 4 deletions src/UtilsCtrl/ThreadPool/UThreadPoolDefine.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ static const CMSec CGRAPH_MAX_BLOCK_TTL = 1999999999;
static const CUInt CGRAPH_DEFAULT_RINGBUFFER_SIZE = 64; // 默认环形队列的大小
static const CIndex CGRAPH_MAIN_THREAD_ID = -1; // 启动线程id标识(非上述主线程)
static const CIndex CGRAPH_SECONDARY_THREAD_COMMON_ID = -2; // 辅助线程统一id标识
static const CInt CGRAPH_DEFAULT_PRIORITY = 0; // 默认优先级


static const CInt CGRAPH_DEFAULT_TASK_STRATEGY = -1; // 默认线程调度策略
Expand All @@ -52,14 +51,14 @@ static const CInt CGRAPH_LONG_TIME_TASK_STRATEGY = -101;
*/
static const CInt CGRAPH_DEFAULT_THREAD_SIZE = 8; // 默认开启主线程个数
static const CInt CGRAPH_SECONDARY_THREAD_SIZE = 0; // 默认开启辅助线程个数
static const CInt CGRAPH_MAX_THREAD_SIZE = 16; // 最大线程个数
static const CInt CGRAPH_MAX_TASK_STEAL_RANGE = 2; // 盗取机制相邻范围
static const CInt CGRAPH_MAX_THREAD_SIZE = 8; // 最大线程个数
static const CInt CGRAPH_MAX_TASK_STEAL_RANGE = 7; // 盗取机制相邻范围
static const CBool CGRAPH_BATCH_TASK_ENABLE = false; // 是否开启批量任务功能
static const CInt CGRAPH_MAX_LOCAL_BATCH_SIZE = 2; // 批量执行本地任务最大值
static const CInt CGRAPH_MAX_POOL_BATCH_SIZE = 2; // 批量执行通用任务最大值
static const CInt CGRAPH_MAX_STEAL_BATCH_SIZE = 2; // 批量盗取任务最大值
static const CInt CGRAPH_PRIMARY_THREAD_BUSY_EPOCH = 10; // 主线程进入wait状态的轮数,数值越大,理论性能越高,但空转可能性也越大
static const CMSec CGRAPH_PRIMARY_THREAD_EMPTY_INTERVAL = 50; // 主线程进入休眠状态的默认时间
static const CMSec CGRAPH_PRIMARY_THREAD_EMPTY_INTERVAL = 10; // 主线程进入休眠状态的默认时间
static const CSec CGRAPH_SECONDARY_THREAD_TTL = 10; // 辅助线程ttl,单位为s
static const CBool CGRAPH_MONITOR_ENABLE = false; // 是否开启监控程序
static const CSec CGRAPH_MONITOR_SPAN = 5; // 监控线程执行间隔,单位为s
Expand Down
6 changes: 4 additions & 2 deletions test/Functional/test-functional-01.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ using namespace CGraph;
void test_functional_01() {
GPipelinePtr pipeline = GPipelineFactory::create();
CStatus status;
const int runTimes = 500000;

GElementPtr a, b, c, d, e, f, g, h, i, j = nullptr;
status += pipeline->registerGElement<TestAdd1GNode>(&a, {});
status += pipeline->registerGElement<TestAdd1GNode>(&b, {});
Expand All @@ -27,14 +29,14 @@ void test_functional_01() {

{
UTimeCounter counter("test_functional_01");
status = pipeline->process(100000);
status = pipeline->process(runTimes);
}

if (status.isErr()) {
std::cout << status.getInfo() << std::endl;
}

if (g_test_node_cnt != 1000000) {
if (g_test_node_cnt != runTimes * 10) {
std::cout << "test_functional_01: g_test_node_cnt is not right : " << g_test_node_cnt << std::endl;
}

Expand Down
6 changes: 4 additions & 2 deletions test/Functional/test-functional-02.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ using namespace CGraph;
void test_functional_02() {
GPipelinePtr pipeline = GPipelineFactory::create();
CStatus status;
const int runTimes = 5000;

GElementPtr a,b,c,d,e,f,g,h,i,j,k,l,m,n = nullptr;
GElementPtr region1, region2, cluster1, cluster2 = nullptr;

Expand Down Expand Up @@ -46,14 +48,14 @@ void test_functional_02() {

{
UTimeCounter counter("test_functional_02");
status += pipeline->process(1000);
status += pipeline->process(runTimes);
}

if (status.isErr()) {
std::cout << status.getInfo() << std::endl;
}

if (g_test_node_cnt != 50000) {
if (g_test_node_cnt != (runTimes * 50)) {
std::cout << "test_functional_02: g_test_node_cnt is not right : " << g_test_node_cnt << std::endl;
}

Expand Down
9 changes: 6 additions & 3 deletions test/Functional/test-functional-03.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ using namespace CGraph;
void test_functional_03() {
CStatus status;
GPipelinePtr pipeline = GPipelineFactory::create();
const int runTimes = 100000;

GElementPtr a, b_cluster, c, d_region, e = nullptr;

b_cluster = pipeline->createGGroup<GCluster>({
Expand All @@ -35,21 +37,22 @@ void test_functional_03() {
status += pipeline->registerGElement<TestAdd1GNode>(&c, {a, b_cluster}, "nodeC", 1);
status += pipeline->registerGGroup(&d_region, {a, b_cluster}, "regionD", 2); // 将名为regionD,依赖{a,b_cluster}执行且自循环2次的region信息,注册入pipeline中
status += pipeline->registerGElement<TestAdd1GNode>(&e, {c, d_region}, "nodeE", 1);
pipeline->addGAspect<TestMaterialAdd1GAspect>();

if (!status.isOK()) {
return;
}

{
UTimeCounter counter("test_functional_03");
pipeline->addGAspect<TestMaterialAdd1GAspect>();
status = pipeline->process(2000);
status = pipeline->process(runTimes);
}

if (status.isErr()) {
std::cout << status.getInfo() << std::endl;
}

if (g_test_node_cnt != 116000) {
if (g_test_node_cnt != (runTimes * 58)) {
std::cout << "test_functional_03: g_test_node_cnt is not right : " << g_test_node_cnt << std::endl;
}

Expand Down
14 changes: 7 additions & 7 deletions test/Functional/test-functional-04.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,34 @@
using namespace CGraph;

void test_functional_04() {
const int HALF_ARR_SIZE = 32;
const int RUN_TIMES = 100000;
const int halfArrSize = 32;
const int runTimes = 100000;
CGRAPH_CREATE_MESSAGE_TOPIC(TestGMessageParam, g_test_message_key, 100)
std::unique_ptr<TestGMessageParam> mp(new TestGMessageParam());
CGRAPH_SEND_MPARAM(TestGMessageParam, g_test_message_key, mp, GMessagePushStrategy::WAIT)

GPipelinePtr pipeline = GPipelineFactory::create();
GElementPtr arr[HALF_ARR_SIZE * 2]; // 前面一半是串行的,后面一半是并行的
GElementPtr arr[halfArrSize * 2]; // 前面一半是串行的,后面一半是并行的
GElementPtrSet linearSet;

CStatus status = pipeline->registerGElement<TestRecvMessageGNode>(&arr[0]);
linearSet.insert(arr[0]);
for (int i = 1; i < HALF_ARR_SIZE; i++) {
for (int i = 1; i < halfArrSize; i++) {
status += pipeline->registerGElement<TestRecvMessageGNode>(&arr[i], {arr[i-1]});
linearSet.insert(arr[i]);
}

for (int j = HALF_ARR_SIZE; j < HALF_ARR_SIZE * 2; j++) {
for (int j = halfArrSize; j < halfArrSize * 2; j++) {
status += pipeline->registerGElement<TestRecvMessageGNode>(&arr[j], linearSet);
}

{
UTimeCounter counter("test_functional_04");
status += pipeline->process(RUN_TIMES);
status += pipeline->process(runTimes);
}

status += CGRAPH_RECV_MPARAM_WITH_TIMEOUT(TestGMessageParam, g_test_message_key, mp, 25)
if (mp->num_ != HALF_ARR_SIZE * RUN_TIMES * 2) {
if (mp->num_ != halfArrSize * runTimes * 2) {
CGRAPH_ECHO("result num is wrong, num is [%lu]", mp->num_);
}

Expand Down
9 changes: 6 additions & 3 deletions test/Performance/test-performance-01.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ void test_performance_01() {
// 并行的执行32次,对应第1个例子,8thread,32并发,50w次
GPipelinePtr pipeline = GPipelineFactory::create();
CStatus status;
GElementPtr arr[32] = {};
const int runTimes = 500000;
const int size = 32;
GElementPtr arr[size] = {};

UThreadPoolConfig config;
config.default_thread_size_ = 8;
config.secondary_thread_size_ = 0;
Expand All @@ -31,12 +34,12 @@ void test_performance_01() {

{
UTimeCounter counter("test_performance_01");
for (int t = 0; t < 500000; t++) {
for (int t = 0; t < runTimes; t++) {
pipeline->run();
}
}

if (16000000 != g_test_node_cnt) {
if ((runTimes * size) != g_test_node_cnt) {
std::cout << "test_performance_01: g_test_node_cnt is not right : " << g_test_node_cnt << std::endl;
}

Expand Down
11 changes: 7 additions & 4 deletions test/Performance/test-performance-02.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,25 @@ void test_performance_02() {
// 串行执行32次,对应第二个例子,1thread,32串行,100w次
GPipelinePtr pipeline = GPipelineFactory::create();
CStatus status;
GElementPtr arr[32];
const int runTimes = 1000000;
const int size = 32;
GElementPtr arr[size];

pipeline->registerGElement<TestAdd1GNode>(&arr[0]);
for (int i = 1; i < 32; i++) {
for (int i = 1; i < size; i++) {
pipeline->registerGElement<TestAdd1GNode>(&arr[i], {arr[i - 1]});
}
pipeline->makeSerial();
pipeline->setAutoCheck(false);
status += pipeline->init();
{
UTimeCounter counter("test_performance_02");
for (int t = 0; t < 1000000; t++) {
for (int t = 0; t < runTimes; t++) {
pipeline->run();
}
}

if (32000000 != g_test_node_cnt) {
if ((runTimes * size) != g_test_node_cnt) {
std::cout << "test_performance_02: g_test_node_cnt is not right : " << g_test_node_cnt << std::endl;
}

Expand Down
6 changes: 4 additions & 2 deletions test/Performance/test-performance-03.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ void test_performance_03() {
GPipelinePtr pipeline = GPipelineFactory::create();
CStatus status;
GElementPtr a,b1,b2,c1,c2,d = nullptr;
const int runTimes = 1000000;

UThreadPoolConfig config;
config.default_thread_size_ = 2;
config.secondary_thread_size_ = 0;
Expand All @@ -35,12 +37,12 @@ void test_performance_03() {

{
UTimeCounter counter("test_performance_03");
for (int t = 0; t < 1000000; t++) {
for (int t = 0; t < runTimes; t++) {
pipeline->run();
}
}

if (6000000 != g_test_node_cnt) {
if ((runTimes * 6) != g_test_node_cnt) {
std::cout << "test_performance_03: g_test_node_cnt is not right : " << g_test_node_cnt << std::endl;
}

Expand Down
1 change: 1 addition & 0 deletions test/Performance/test-performance-04.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ void test_performance_04() {
config.default_thread_size_ = nodePerLayer;
config.max_thread_size_ = nodePerLayer;
config.max_task_steal_range_ = nodePerLayer - 1;
config.primary_thread_busy_epoch_ = 500;
pipeline->setUniqueThreadPoolConfig(config);
pipeline->setAutoCheck(false);

Expand Down

0 comments on commit b58c6e0

Please sign in to comment.