diff --git a/src/Common/ThreadPool.cpp b/src/Common/ThreadPool.cpp index a6cfec9521e7..e596aa154221 100644 --- a/src/Common/ThreadPool.cpp +++ b/src/Common/ThreadPool.cpp @@ -12,6 +12,8 @@ #include #include #include +#include +#include namespace DB { @@ -128,9 +130,12 @@ ThreadPoolImpl::ThreadPoolImpl( , queue_size(queue_size_ ? std::max(queue_size_, max_threads) : 0 /* zero means the queue is unlimited */) , shutdown_on_exception(shutdown_on_exception_) { - // std::lock_guard lock(mutex); - // jobs.reserve(queue_size); - // while (threads.size() < std::min(max_threads, scheduled_jobs + (std::is_same_v ? min_free_threads : 1) ) ) + std::lock_guard lock(mutex); + jobs.reserve(max_free_threads); + LOG_ERROR(&Poco::Logger::get("ThreadPoolImpl"), + "ThreadPoolImpl constructor [Instance Address: {}]: max_threads = {}, max_free_threads = {}, queue_size = {}, StackTrace: {}", + static_cast(this), max_threads, max_free_threads, queue_size, StackTrace().toString()); + // while (threads.size() < max_free_threads) // { // try // { @@ -285,7 +290,9 @@ ReturnType ThreadPoolImpl::scheduleImpl(Job job, Priority priority, std: Stopwatch watch2; auto pred = [this] { return !queue_size || scheduled_jobs < queue_size || shutdown; }; - if (wait_microseconds) /// Check for optional. Condition is true if the optional is set and the value is zero. + /// if (wait_microseconds.has_value() && wait_microseconds.value() > 0) + + if (wait_microseconds) /// Check for optional. Condition is also true if the optional has value=0. { if (!job_finished.wait_for(lock, std::chrono::microseconds(*wait_microseconds), pred)) return on_error(fmt::format("no free thread (timeout={})", *wait_microseconds)); @@ -306,7 +313,7 @@ ReturnType ThreadPoolImpl::scheduleImpl(Job job, Priority priority, std: /// Check if there are enough threads to process job. if (threads.size() < std::min(max_threads, scheduled_jobs + 1)) { - // while (threads.size() < std::min(max_threads, scheduled_jobs + (std::is_same_v ? min_free_threads : 1) ) ) + // while (threads.size() < std::min(max_threads, scheduled_jobs + 1) ) // { try { @@ -320,7 +327,7 @@ ReturnType ThreadPoolImpl::scheduleImpl(Job job, Priority priority, std: { on_error("can not start new thread"); } - // } + // } } Stopwatch watch3; @@ -393,6 +400,10 @@ ThreadPoolImpl::~ThreadPoolImpl() /// Note: should not use logger from here, /// because it can be an instance of GlobalThreadPool that is a global variable /// and the destruction order of global variables is unspecified. + LOG_ERROR(&Poco::Logger::get("ThreadPoolImpl"), + "ThreadPoolImpl destructor [Instance Address: {}]: threads.size() = {}", + static_cast(this), threads.size()); + finalize(); onDestroy();