Skip to content

Commit

Permalink
very rough attempt to shard the global pool. That allows to reduce lo…
Browse files Browse the repository at this point in the history
…ck waiting time in the test from ~3.5 sec to ~0.008 sec
  • Loading branch information
filimonov committed Jan 22, 2024
1 parent 485917f commit 50fe57e
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 22 deletions.
35 changes: 20 additions & 15 deletions src/Common/ThreadPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -568,8 +568,9 @@ template class ThreadPoolImpl<std::thread>;
template class ThreadPoolImpl<ThreadFromGlobalPoolImpl<false>>;
template class ThreadFromGlobalPoolImpl<true>;

std::unique_ptr<GlobalThreadPool> GlobalThreadPool::the_instance;

/// std::unique_ptr<GlobalThreadPool> GlobalThreadPool::the_instance;
std::vector<std::unique_ptr<GlobalThreadPool>> GlobalThreadPool::the_pools;
size_t GlobalThreadPool::next_pool_index = 0;

GlobalThreadPool::GlobalThreadPool(
size_t max_threads_,
Expand All @@ -589,30 +590,34 @@ GlobalThreadPool::GlobalThreadPool(

void GlobalThreadPool::initialize(size_t max_threads, size_t max_free_threads, size_t queue_size)
{
if (the_instance)
{
if (!the_pools.empty()) {
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR,
"The global thread pool is initialized twice");
"The global thread pools are already initialized");
}

the_instance.reset(new GlobalThreadPool(max_threads, max_free_threads, queue_size, false /*shutdown_on_exception*/));
next_pool_index = 0;

// Initialize 5 instances of GlobalThreadPool
for (int i = 0; i < 5; ++i) {
the_pools.push_back(std::make_unique<GlobalThreadPool>(max_threads, max_free_threads, queue_size, false));
}
}

GlobalThreadPool & GlobalThreadPool::instance()
{
if (!the_instance)
{
// Allow implicit initialization. This is needed for old code that is
// impractical to redo now, especially Arcadia users and unit tests.
initialize();
if (the_pools.empty()) {
initialize(); // Default initialization if not already done
}

return *the_instance;
GlobalThreadPool &pool = *the_pools[next_pool_index];
next_pool_index = (next_pool_index + 1) % the_pools.size(); // Round-robin selection
return pool;
}
void GlobalThreadPool::shutdown()
{
if (the_instance)
{
the_instance->finalize();
for (auto &pool : the_pools) {
if (pool) {
pool->finalize();
}
}
}
14 changes: 7 additions & 7 deletions src/Common/ThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,18 +167,18 @@ using FreeThreadPool = ThreadPoolImpl<std::thread>;
*/
class GlobalThreadPool : public FreeThreadPool, private boost::noncopyable
{
static std::unique_ptr<GlobalThreadPool> the_instance;

public:
GlobalThreadPool(
size_t max_threads_,
size_t max_free_threads_,
size_t queue_size_,
bool shutdown_on_exception_);

public:
const bool shutdown_on_exception_);
static void initialize(size_t max_threads = 10000, size_t max_free_threads = 1000, size_t queue_size = 10000);
static GlobalThreadPool & instance();
static void shutdown();
static GlobalThreadPool &instance();
void shutdown();
private:
static std::vector<std::unique_ptr<GlobalThreadPool>> the_pools;
static size_t next_pool_index; // For round-robin selection
};


Expand Down

0 comments on commit 50fe57e

Please sign in to comment.