Skip to content

Commit

Permalink
make thread creation async by using a separate thread for it
Browse files Browse the repository at this point in the history
  • Loading branch information
filimonov committed Jan 24, 2024
1 parent e30b1ab commit 3a4dfeb
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 77 deletions.
196 changes: 121 additions & 75 deletions src/Common/ThreadPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,15 @@ ThreadPoolImpl<Thread>::ThreadPoolImpl(
, queue_size(queue_size_ ? std::max(queue_size_, max_threads) : 0 /* zero means the queue is unlimited */)
, shutdown_on_exception(shutdown_on_exception_)
{
std::lock_guard lock(mutex);
jobs.reserve(max_free_threads);
LOG_ERROR(&Poco::Logger::get("ThreadPoolImpl"),
"ThreadPoolImpl constructor [Instance Address: {}]: max_threads = {}, max_free_threads = {}, queue_size = {}, StackTrace: {}",
static_cast<void*>(this), max_threads, max_free_threads, queue_size, StackTrace().toString());
// LOG_ERROR(&Poco::Logger::get("ThreadPoolImpl"),
// "ThreadPoolImpl constructor [Instance Address: {}]: max_threads = {}, max_free_threads = {}, queue_size = {}, StackTrace: {}",
// static_cast<void*>(this), max_threads, max_free_threads, queue_size, StackTrace().toString());

// TODO: actually it seems like for a global pool one thread is not enough for a very dynamic thread pool expansion
housekeepeing_thread = std::thread(&ThreadPoolImpl<Thread>::threadPoolHousekeep, this);

// jobs.reserve(max_free_threads);

// while (threads.size() < max_free_threads)
// {
// try
Expand Down Expand Up @@ -162,20 +166,11 @@ void ThreadPoolImpl<Thread>::setMaxThreads(size_t value)
queue_size = queue_size ? std::max(queue_size, max_threads) : 0;
jobs.reserve(queue_size);

desired_pool_size = std::min(max_threads, scheduled_jobs);

if (need_start_threads)
{
/// Start new threads while there are more scheduled jobs in the queue and the limit `max_threads` is not reached.
while (!shutdown && threads.size() < std::min(max_threads, scheduled_jobs))
{
try
{
createThreadNoLock();
}
catch (...)
{
break; /// failed to start more threads
}
}
threads_cv.notify_one();
}
else if (need_finish_free_threads)
{
Expand All @@ -195,12 +190,10 @@ template <typename Thread>
void ThreadPoolImpl<Thread>::setMaxFreeThreads(size_t value)
{
std::lock_guard lock(mutex);
bool need_finish_free_threads = (value < max_free_threads);

max_free_threads = std::min(value, max_threads);

if (need_finish_free_threads)
{
if (current_pool_size > scheduled_jobs + max_free_threads) {
desired_pool_size = std::min(max_threads, scheduled_jobs + max_free_threads);
/// Wake up free threads so they can finish themselves.
new_job_or_shutdown.notify_all();
}
Expand Down Expand Up @@ -275,7 +268,7 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:
}
throw DB::Exception(DB::ErrorCodes::CANNOT_SCHEDULE_TASK,
"Cannot schedule a task: {} (threads={}, jobs={})", reason,
threads.size(), scheduled_jobs);
current_pool_size, scheduled_jobs);
}
else
return false;
Expand Down Expand Up @@ -311,23 +304,10 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:
/// Because if an exception would be thrown, we won't notify a thread about job occurrence.

/// Check if there are enough threads to process job.
if (threads.size() < std::min(max_threads, scheduled_jobs + 1))
if (desired_pool_size < std::min(max_threads, scheduled_jobs + 1))
{
// while (threads.size() < std::min(max_threads, scheduled_jobs + 1) )
// {
try
{
createThreadNoLock();
}
catch (DB::Exception & e)
{
on_error(e.what());
}
catch (...)
{
on_error("can not start new thread");
}
// }
desired_pool_size = std::min(max_threads, scheduled_jobs + 1);
threads_cv.notify_one();
}

Stopwatch watch3;
Expand Down Expand Up @@ -400,10 +380,9 @@ ThreadPoolImpl<Thread>::~ThreadPoolImpl()
/// Note: should not use logger from here,
/// because it can be an instance of GlobalThreadPool that is a global variable
/// and the destruction order of global variables is unspecified.
LOG_ERROR(&Poco::Logger::get("ThreadPoolImpl"),
"ThreadPoolImpl destructor [Instance Address: {}]: threads.size() = {}",
static_cast<void*>(this), threads.size());

// LOG_ERROR(&Poco::Logger::get("ThreadPoolImpl"),
// "ThreadPoolImpl destructor [Instance Address: {}]: threads.size() = {}",
// static_cast<void*>(this), threads.size());

finalize();
onDestroy();
Expand All @@ -418,20 +397,12 @@ void ThreadPoolImpl<Thread>::finalize()
/// We don't want threads to remove themselves from `threads` anymore, otherwise `thread.join()` will go wrong below in this function.
threads_remove_themselves = false;
}
desired_pool_size = 0;
threads_cv.notify_all();

/// Wake up threads so they can finish themselves.
new_job_or_shutdown.notify_all();

/// Wait for all currently running jobs to finish (we don't wait for all scheduled jobs here like the function wait() does).
for (auto & thread : threads)
{
thread.join();
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks
);
}

threads.clear();
housekeepeing_thread.join();
}

template <typename Thread>
Expand Down Expand Up @@ -466,13 +437,78 @@ bool ThreadPoolImpl<Thread>::finished() const
return shutdown;
}

template <typename Thread>
void ThreadPoolImpl<Thread>::threadPoolHousekeep()
{
while (true) {
{
std::unique_lock<std::mutex> lock(threads_mutex);

// Wait for notification or timeout
if (threads_cv.wait_for(lock, std::chrono::seconds(5), [this]{ return desired_pool_size != current_pool_size; }))
{
if (desired_pool_size == 0 && current_pool_size > 0) // shutdown
{

/// Wait for all currently running jobs to finish (we don't wait for all scheduled jobs here like the function wait() does).
for (auto & thread : threads)
{
thread.join();
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks
);
}

threads.clear();
break;
}

while (desired_pool_size > threads.size())
{
try
{
createThreadNoLock();
current_pool_size = threads.size();
}
catch (DB::Exception & e)
{
LOG_ERROR(&Poco::Logger::get("ThreadPool"),
"ThreadPoolImpl createThreadNoLock failed: {}", e.what());
break;
}
catch (...)
{
LOG_ERROR(&Poco::Logger::get("ThreadPool"),
"ThreadPoolImpl createThreadNoLock failed: unknown exception");
break;
}
}
// todo: check if we have to shrink the pool
// else if (threads.size() > desired_pool_size) {
// // We have to wake up threads so they can finish themselves.
// new_job_or_shutdown.notify_all();
// }
}
else
{
// timer expired
// TODO: check if we have to shrink the pool
}
}
}
}


template <typename Thread>
void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_it)
{
DENY_ALLOCATIONS_IN_SCOPE;
CurrentMetrics::Increment metric_pool_threads(metric_threads);

bool job_is_done = false;
bool thread_is_not_needed_anymore = false;
bool thread_should_remove_itself = false;

std::exception_ptr exception_from_job;

/// We'll run jobs in this worker while there are scheduled jobs and until some special event occurs (e.g. shutdown, or decreasing the number of max_threads).
Expand Down Expand Up @@ -517,41 +553,51 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
new_job_or_shutdown.notify_all(); /// `shutdown` was set, wake up other threads so they can finish themselves.
}

new_job_or_shutdown.wait(lock, [&] { return !jobs.empty() || shutdown || threads.size() > std::min(max_threads, scheduled_jobs + max_free_threads); });
new_job_or_shutdown.wait(lock, [&] { return !jobs.empty() || shutdown || desired_pool_size < current_pool_size; });

if (jobs.empty() || threads.size() > std::min(max_threads, scheduled_jobs + max_free_threads))
if (jobs.empty() || desired_pool_size < current_pool_size )
{
// We enter here if:
// - either this thread is not needed anymore due to max_free_threads excess;
// - or shutdown happened AND all jobs are already handled.
if (threads_remove_themselves)
{
thread_it->detach();
threads.erase(thread_it);
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks
);
}
return;
}

/// boost::priority_queue does not provide interface for getting non-const reference to an element
/// to prevent us from modifying its priority. We have to use const_cast to force move semantics on JobWithPriority.
job_data = std::move(const_cast<JobWithPriority &>(jobs.top()));
jobs.pop();

/// We don't run jobs after `shutdown` is set, but we have to properly dequeue all jobs and finish them.
if (shutdown)
{
job_is_done = true;
continue;
thread_is_not_needed_anymore = true; // we will remove this thread from the pool after leaving the critical section
thread_should_remove_itself = threads_remove_themselves; // we will not be able to access thread_should_remove_itself after leaving the critical section
}
else
{
/// boost::priority_queue does not provide interface for getting non-const reference to an element
/// to prevent us from modifying its priority. We have to use const_cast to force move semantics on JobWithPriority.
job_data = std::move(const_cast<JobWithPriority &>(jobs.top()));
jobs.pop();

/// We don't run jobs after `shutdown` is set, but we have to properly dequeue all jobs and finish them.
if (shutdown)
{
job_is_done = true;
continue;
}
}
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolWorkerLockHoldingMicroseconds : ProfileEvents::LocalThreadPoolWorkerLockHoldingMicroseconds,
watch.elapsedMicroseconds());
}

if (thread_is_not_needed_anymore)
{
if (thread_should_remove_itself)
{
std::lock_guard lock(threads_mutex);
thread_it->detach();
threads.erase(thread_it);
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks
);
current_pool_size--;
}
return;
}

ALLOW_ALLOCATIONS_IN_SCOPE;

/// Set up tracing context for this thread by its parent context.
Expand Down
10 changes: 10 additions & 0 deletions src/Common/ThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,18 @@ class ThreadPoolImpl
const bool shutdown_on_exception = true;

boost::heap::priority_queue<JobWithPriority> jobs;

std::list<Thread> threads;
mutable std::mutex threads_mutex; // used only for threads list manipulations
std::thread housekeepeing_thread; // thread that maintains the number of the threads in pool close to desired (asynchronously)
std::atomic<size_t> desired_pool_size = 0;
std::atomic<size_t> current_pool_size = 0;
std::condition_variable threads_cv;

std::exception_ptr first_exception;
std::stack<OnDestroyCallback> on_destroy_callbacks;


template <typename ReturnType>
ReturnType scheduleImpl(Job job, Priority priority, std::optional<uint64_t> wait_microseconds, bool propagate_opentelemetry_tracing_context = true);

Expand All @@ -144,6 +152,8 @@ class ThreadPoolImpl

void finalize();
void onDestroy();

void threadPoolHousekeep();
};


Expand Down
2 changes: 2 additions & 0 deletions src/Databases/MySQL/MySQLBinlogEventsDispatcher.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#include "MySQLBinlogEventsDispatcher.h"
#include <boost/algorithm/string/join.hpp>
#include <Common/logger_useful.h>
#include <Common/ThreadPool.h>


namespace DB::ErrorCodes
{
Expand Down
2 changes: 1 addition & 1 deletion src/Databases/MySQL/MySQLBinlogEventsDispatcher.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once

#include <Databases/MySQL/MySQLBinlog.h>
#include <Common/ThreadPool.h>
#include <Common/ThreadPool_fwd.h>
#include <Poco/Logger.h>
#include <base/unit.h>

Expand Down
2 changes: 1 addition & 1 deletion src/Processors/Formats/Impl/ParallelParsingInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include <Processors/Formats/IInputFormat.h>
#include <Formats/FormatFactory.h>
#include <Common/CurrentThread.h>
#include <Common/ThreadPool.h>
#include <Common/ThreadPool_fwd.h>
#include <Common/setThreadName.h>
#include <Common/logger_useful.h>
#include <Common/CurrentMetrics.h>
Expand Down

0 comments on commit 3a4dfeb

Please sign in to comment.