diff --git a/src/Common/ThreadPool.cpp b/src/Common/ThreadPool.cpp
index 1cd9812ac30c..6f9e2d0a18b2 100644
--- a/src/Common/ThreadPool.cpp
+++ b/src/Common/ThreadPool.cpp
@@ -8,6 +8,7 @@
 #include
 #include
+#include
 #include
 #include
 
@@ -93,60 +94,10 @@ class JobWithPriority
 };
 
-/// helper class to unlock the mutex temporary in RAII style
-class ScopeUnlock {
-public:
-    explicit ScopeUnlock(std::unique_lock<std::mutex>& mtx) : _mtx(mtx) {
-        // Unlock the mutex when this object is created
-        _mtx.unlock();
-    }
-
-    ~ScopeUnlock() {
-        // Re-lock the mutex when this object is destroyed
-        _mtx.lock();
-    }
-
-    // Delete copy constructor and copy assignment operator to prevent copying
-    ScopeUnlock(const ScopeUnlock&) = delete;
-    ScopeUnlock& operator=(const ScopeUnlock&) = delete;
-
-private:
-    std::unique_lock<std::mutex>& _mtx;
-};
-
-// Helper class to increment a variable upon construction and decrement it upon destruction if not committed
-class Increment {
-public:
-    Increment(size_t& variable) : refVar(variable), committed(false) {
-        // Increment the variable upon construction
-        refVar++;
-    }
-
-    ~Increment() {
-        // Decrement the variable upon destruction if not committed
-        if (!committed) {
-            refVar--;
-        }
-    }
-
-    void commit() {
-        // Disable the automatic decrement
-        committed = true;
-    }
-
-private:
-    size_t& refVar; // Reference to the variable to be managed
-    bool committed; // Whether the increment should be kept
-};
-
-
 static constexpr auto DEFAULT_THREAD_NAME = "ThreadPool";
 
-static constexpr const size_t GLOBAL_THREAD_POOL_EXPANSION_THREADS = 2;
-static constexpr const size_t GLOBAL_THREAD_POOL_MIN_FREE_THREADS = 64;
+static constexpr const size_t GLOBAL_THREAD_POOL_MIN_FREE_THREADS = 32;
 static constexpr const size_t LOCAL_THREAD_POOL_MIN_FREE_THREADS = 0;
-static constexpr const size_t GLOBAL_THREAD_POOL_EXPANSION_STEP = 128;
 
 static constexpr const size_t GLOBAL_THREAD_POOL_HOUSEKEEP_INTERVAL_MILLISECONDS = 10000; // 10 seconds
 /// static constexpr const size_t GLOBAL_THREAD_POOL_HOUSEKEEP_HISTORY_WINDOW_SECONDS = 600; // 10 minutes
 
@@ -193,21 +144,16 @@ ThreadPoolImpl<Thread>::ThreadPoolImpl(
     //     static_cast<void *>(this), max_threads, max_free_threads, queue_size, StackTrace().toString());
 
     calculateDesiredThreadPoolSizeNoLock();
-    jobs.reserve(queue_size ? queue_size : desired_pool_size);
+    jobs.reserve(queue_size ? queue_size : desired_pool_size.load());
 
     if constexpr (std::is_same_v<Thread, std::thread>) // global thread pool
     {
-        {
-            // for global thread pool we need to start housekeeping threads synchronously
-            // they will run during the whole lifetime of the global thread pool
-            service_threads.emplace_front(&ThreadPoolImpl::threadPoolHousekeep, this);
+        // for the global thread pool we need to start the housekeeping thread;
+        // it will run during the whole lifetime of the global thread pool
+        housekeeping_thread.emplace(&ThreadPoolImpl::housekeep, this);
 
-            for (size_t i = 0; i < GLOBAL_THREAD_POOL_EXPANSION_THREADS; ++i)
-            {
-                service_threads.emplace_front(&ThreadPoolImpl::threadPoolGrow, this);
-            }
-        }
-        adjustThreadPoolSize(std::move(lock));
+        // we will start GLOBAL_THREAD_POOL_MIN_FREE_THREADS threads immediately
+        adjustThreadPoolSize();
     }
 }
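
The constructor hunk above replaces the TPoolGrow/TPoolHousekeep service threads with a single long-lived housekeeping thread. A minimal self-contained sketch of that pattern follows; the struct, its member names, and the 10-second interval are illustrative assumptions, not the pool's actual API:

```cpp
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <optional>
#include <thread>

struct PoolWithHousekeeping
{
    std::mutex mutex;
    std::condition_variable housekeeping_cv;
    bool shutdown = false;
    size_t desired_pool_size = 0;
    size_t current_pool_size = 0;
    std::optional<std::thread> housekeeping_thread;

    PoolWithHousekeeping()
    {
        // One long-lived thread instead of a pool of service threads.
        housekeeping_thread.emplace([this]
        {
            std::unique_lock lock(mutex);
            while (true)
            {
                // Wake up periodically, or as soon as the pool is out of shape.
                housekeeping_cv.wait_for(lock, std::chrono::seconds(10),
                    [this] { return shutdown || desired_pool_size != current_pool_size; });
                if (shutdown)
                    return;
                // ... grow or shrink the pool here ...
            }
        });
    }

    ~PoolWithHousekeeping()
    {
        {
            std::lock_guard lock(mutex);
            shutdown = true;
        }
        housekeeping_cv.notify_all();
        if (housekeeping_thread && housekeeping_thread->joinable())
            housekeeping_thread->join();
    }
};
```
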
@@ -215,16 +161,18 @@
 template <typename Thread>
 void ThreadPoolImpl<Thread>::setMaxThreads(size_t value)
 {
-    std::unique_lock lock(mutex);
+    {
+        std::lock_guard lock(mutex);
 
-    max_threads = value;
-    max_free_threads = std::min(max_free_threads, max_threads);
-    /// We have to also adjust queue size, because it limits the number of scheduled and already running jobs in total.
-    queue_size = queue_size ? std::max(queue_size, max_threads) : 0;
-    calculateDesiredThreadPoolSizeNoLock();
-    jobs.reserve(queue_size ? queue_size : desired_pool_size);
+        max_threads = value;
+        max_free_threads = std::min(max_free_threads, max_threads);
+        /// We have to also adjust queue size, because it limits the number of scheduled and already running jobs in total.
+        queue_size = queue_size ? std::max(queue_size, max_threads) : 0;
+        calculateDesiredThreadPoolSizeNoLock();
+        jobs.reserve(queue_size ? queue_size : desired_pool_size.load());
+    }
 
-    adjustThreadPoolSize(std::move(lock));
+    adjustThreadPoolSize();
 }
 
 template <typename Thread>
@@ -237,10 +185,12 @@ size_t ThreadPoolImpl<Thread>::getMaxThreads() const
 template <typename Thread>
 void ThreadPoolImpl<Thread>::setMaxFreeThreads(size_t value)
 {
-    std::unique_lock lock(mutex);
-    max_free_threads = std::min(value, max_threads);
-    calculateDesiredThreadPoolSizeNoLock();
-    adjustThreadPoolSize(std::move(lock));
+    {
+        std::lock_guard lock(mutex);
+        max_free_threads = std::min(value, max_threads);
+        calculateDesiredThreadPoolSizeNoLock();
+    }
+    adjustThreadPoolSize();
 }
 
 template <typename Thread>
@@ -249,7 +199,7 @@ void ThreadPoolImpl<Thread>::setQueueSize(size_t value)
     std::lock_guard lock(mutex);
     queue_size = value ? std::max(value, max_threads) : 0;
     calculateDesiredThreadPoolSizeNoLock();
-    jobs.reserve(queue_size ? queue_size : desired_pool_size);
+    jobs.reserve(queue_size ? queue_size : desired_pool_size.load());
 }
 
 template <typename Thread>
@@ -268,22 +218,22 @@ void ThreadPoolImpl<Thread>::calculateDesiredThreadPoolSizeNoLock()
     // so every time schedule will be called it will spawn a new thread(s) to get to the 'desired' state
     else if (desired_pool_size < scheduled_jobs + (std::is_same_v<Thread, std::thread> ? GLOBAL_THREAD_POOL_MIN_FREE_THREADS : LOCAL_THREAD_POOL_MIN_FREE_THREADS))
     {
-        desired_pool_size = std::min(max_threads, scheduled_jobs + (std::is_same_v<Thread, std::thread> ? GLOBAL_THREAD_POOL_EXPANSION_STEP : LOCAL_THREAD_POOL_MIN_FREE_THREADS));
+        desired_pool_size = std::min(max_threads, scheduled_jobs + (std::is_same_v<Thread, std::thread> ? GLOBAL_THREAD_POOL_MIN_FREE_THREADS : LOCAL_THREAD_POOL_MIN_FREE_THREADS));
     }
-    else if (threads.size() > std::min(max_threads, scheduled_jobs + max_free_threads))
-    {
-        desired_pool_size = std::min(max_threads, scheduled_jobs + max_free_threads);
+    // else if (current_pool_size > std::min(max_threads, scheduled_jobs + max_free_threads))
+    // {
+    //     /// desired_pool_size = std::min(max_threads, scheduled_jobs + max_free_threads);
 
-        // ExponentiallySmoothedCounter.h / ExponentiallySmoothedAverage
-        //
-        /// TODO
-        /// our desired_pool_size should be at least as big as minimum utilization over last 10 minutes
-        /// and at least as small as maximum utilization over last 10 minutes
+    //     // ExponentiallySmoothedCounter.h / ExponentiallySmoothedAverage
+    //     //
+    //     /// TODO
+    //     /// our desired_pool_size should be at least as big as minimum utilization over last 10 minutes
+    //     /// and at least as small as maximum utilization over last 10 minutes
 
-        // we are in allowed range, let's try to guess the optimal number of threads based on the current & history of utilization
-        // we want to have some free threads in the pool, but not too many
+    //     // we are in allowed range, let's try to guess the optimal number of threads based on the current & history of utilization
+    //     // we want to have some free threads in the pool, but not too many
 
-    }
+    // }
 
     // todo: we need to shrink the pool if there are too many free threads
 }
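
With GLOBAL_THREAD_POOL_EXPANSION_STEP gone, the rule above keeps the desired size at scheduled_jobs plus the per-pool minimum of free threads, capped by max_threads. A simplified model of that rule, with the patch's new constants plugged in (the function name is illustrative, not part of the pool):

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Mirrors the branch above: keep `min_free` spare threads on top of the
// number of scheduled jobs, but never exceed `max_threads`.
size_t desiredPoolSize(size_t max_threads, size_t scheduled_jobs, size_t min_free)
{
    return std::min(max_threads, scheduled_jobs + min_free);
}

int main()
{
    // Global pool: min_free is now GLOBAL_THREAD_POOL_MIN_FREE_THREADS = 32.
    assert(desiredPoolSize(10000, 100, 32) == 132);    // headroom of 32 threads
    assert(desiredPoolSize(10000, 9990, 32) == 10000); // capped by max_threads
    // Local pools: LOCAL_THREAD_POOL_MIN_FREE_THREADS = 0, no headroom.
    assert(desiredPoolSize(16, 4, 0) == 4);
}
```
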
@@ -305,12 +255,13 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std::optional<uint64_t> wait_microseconds, bool propagate_opentelemetry_tracing_context)
             }
             throw DB::Exception(DB::ErrorCodes::CANNOT_SCHEDULE_TASK,
                 "Cannot schedule a task: {} (threads={}, jobs={})", reason,
-                threads.size(), scheduled_jobs);
+                current_pool_size, scheduled_jobs);
         }
         else
             return false;
     };
 
+
     {
         Stopwatch watch;
         std::unique_lock lock(mutex);
 
@@ -331,27 +282,6 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std::optional<uint64_t> wait_microseconds, bool propagate_opentelemetry_tracing_context)
         if (shutdown)
             return on_error("shutdown");
 
-        Increment inc(scheduled_jobs);
-        calculateDesiredThreadPoolSizeNoLock();
-
-        /// we have a place to grow
-        if (threads.size() < desired_pool_size)
-        {
-            /// Check if there are enough threads to process job.
-            try
-            {
-                startThreads(std::is_same_v<Thread, std::thread>, lock); // async for global thread pool
-            }
-            catch (const DB::Exception & e)
-            {
-                return on_error(fmt::format("cannot start threads: {}", e.what()));
-            }
-            catch (...)
-            {
-                return on_error("cannot start threads");
-            }
-        }
-
         jobs.emplace(std::move(job),
             priority,
             metric_scheduled_jobs,
@@ -360,97 +290,88 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std::optional<uint64_t> wait_microseconds, bool propagate_opentelemetry_tracing_context)
             /// capture_frame_pointers
             DB::Exception::enable_job_stack_trace);
 
-        inc.commit();
-
+        ++scheduled_jobs;
+        calculateDesiredThreadPoolSizeNoLock();
     }
 
+    ProfileEvents::increment(
+        std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolJobs : ProfileEvents::LocalThreadPoolJobs);
+
     /// Wake up a free thread to run the new job.
-    new_job_or_shutdown.notify_one();
+    new_job_or_shutdown.notify_one(); // maybe while we are starting the new thread some of the existing ones will become free and take the job?
 
-    ProfileEvents::increment(
-        std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolJobs : ProfileEvents::LocalThreadPoolJobs);
+    if (current_pool_size < desired_pool_size)
+    {
+        try
+        {
+            startThreads();
+        }
+        catch (const DB::Exception & e)
+        {
+            return on_error(fmt::format("cannot start threads: {}", e.what()));
+        }
+        catch (...)
+        {
+            return on_error("cannot start threads");
+        }
+    }
 
     return static_cast<ReturnType>(true);
 }
 
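The startThreads() hunk below keeps the promise/shared_future handshake that the removed comment explains: the worker must not run before its own iterator into `threads` has been published, otherwise it can grab the lock first and then wait on the condition variable forever. A minimal standalone sketch of that handshake, simplified from the pool's code:

```cpp
#include <future>
#include <list>
#include <thread>

int main()
{
    std::list<std::thread> threads;

    std::promise<std::list<std::thread>::iterator> promise_thread_it;
    std::shared_future<std::list<std::thread>::iterator> future_thread_it
        = promise_thread_it.get_future().share();

    std::thread thread([future_thread_it]
    {
        // Blocks until the creator has inserted this thread into the list
        // and published the iterator; only then may the worker use it.
        std::list<std::thread>::iterator self = future_thread_it.get();
        (void)self; // the real pool calls worker(self) here
    });

    threads.push_front(std::move(thread));
    promise_thread_it.set_value(threads.begin());

    threads.front().join();
}
```
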
 template <typename Thread>
-void ThreadPoolImpl<Thread>::startThreads(bool async, std::unique_lock<std::mutex> & lock)
+void ThreadPoolImpl<Thread>::startThreads()
 {
-    // global thread pool expansion can be very expensive (under heavy load even 1 thread creation can take 100ms-1000ms)
-    // so it's better to make that work async in the background by special threads which will be waken up for starting new threads
-    //
-    // in contrast, local thread pool expansion is very cheap (it's just pushing a new job to a global pool to borrow a thread)
-    // and local thread pools are usually small and we don't want them to have excessive threads just to start new ones
-    //
-    // so for the global thread pool we just signal TPoolGrow threads to start a new thread(s) via condition variable
-    // and for the local thread pool we 'start' a new thread here
-
-    if (async)
-    {
-        if (shutdown || threads.size() >= desired_pool_size)
-            return;
-        pool_grow_thread_cv.notify_all();
-    }
-    else // sync
+    std::lock_guard lock(threads_mutex);
+
+    while (threads.size() < desired_pool_size)
     {
-        while (threads.size() < desired_pool_size && !shutdown)
+        try
         {
-            try
-            {
-                std::promise<typename std::list<Thread>::iterator> promise_thread_it;
-                std::shared_future<typename std::list<Thread>::iterator> future_thread_id = promise_thread_it.get_future().share();
-
-                std::unique_ptr<Thread> thread_ptr;
-
-                {
-                    // Unlock the mutex temporarily to allow the new thread to add itself to the list
-                    ScopeUnlock unlock(lock);
-
-                    Stopwatch watch;
+            std::promise<typename std::list<Thread>::iterator> promise_thread_it;
+            std::shared_future<typename std::list<Thread>::iterator> future_thread_id = promise_thread_it.get_future().share();
 
-                    /// we use future here for 2 reasons:
-                    /// 1) just passing a ref to list iterator after adding the thread to the list
-                    /// 2) hold the thread work until the thread is added to the list, otherwise
-                    ///    is can get the lock faster and then can wait for a cond_variable forever
-                    thread_ptr = std::make_unique<Thread>([this, ft = std::move(future_thread_id)] mutable
-                    {
-                        auto thread_it = ft.get();
-                        worker(thread_it);
-                    });
+            Stopwatch watch;
+            std::unique_ptr<Thread> thread_ptr = std::make_unique<Thread>([this, ft = std::move(future_thread_id)] mutable
+            {
+                auto thread_it = ft.get();
+                worker(thread_it);
+            });
 
-                    ProfileEvents::increment(
-                        std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolThreadCreationMicroseconds : ProfileEvents::LocalThreadPoolThreadCreationMicroseconds,
-                        watch.elapsedMicroseconds());
+            ProfileEvents::increment(
+                std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolThreadCreationMicroseconds : ProfileEvents::LocalThreadPoolThreadCreationMicroseconds,
+                watch.elapsedMicroseconds());
 
-                }
+            ProfileEvents::increment(
+                std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolExpansions : ProfileEvents::LocalThreadPoolExpansions);
 
-                ProfileEvents::increment(
-                    std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolExpansions : ProfileEvents::LocalThreadPoolExpansions);
+            threads.push_front(std::move(*thread_ptr));
+            promise_thread_it.set_value(threads.begin());
 
-                threads.push_front(std::move(*thread_ptr));
-                promise_thread_it.set_value(threads.begin());
-                new_job_or_shutdown.notify_one();
-            }
-            catch (const std::exception & e)
-            {
-                throw DB::Exception(DB::ErrorCodes::CANNOT_SCHEDULE_TASK, "cannot start thread: {}", e.what());
-            }
-            catch (...)
-            {
-                throw DB::Exception(DB::ErrorCodes::CANNOT_SCHEDULE_TASK, "cannot start thread");
-            }
+            current_pool_size = threads.size();
+            new_job_or_shutdown.notify_one(); // let's notify the new thread just in case
+        }
+        catch (const std::exception & e)
+        {
+            throw DB::Exception(DB::ErrorCodes::CANNOT_SCHEDULE_TASK, "cannot start thread: {}", e.what());
+        }
+        catch (...)
+        {
+            throw DB::Exception(DB::ErrorCodes::CANNOT_SCHEDULE_TASK, "cannot start thread");
         }
     }
 }
 
 template <typename Thread>
-void ThreadPoolImpl<Thread>::adjustThreadPoolSize(std::unique_lock<std::mutex> && lock)
+void ThreadPoolImpl<Thread>::adjustThreadPoolSize()
 {
-    if (threads.size() < desired_pool_size)
+    auto pool_size = current_pool_size.load();
+    auto desired = desired_pool_size.load();
+    if (pool_size < desired)
     {
-        startThreads(std::is_same_v<Thread, std::thread> /* async for global thread pool */, lock);
+        startThreads();
     }
-    else if (threads.size() > desired_pool_size)
+    else if (pool_size > desired)
     {
         /// Wake up free threads so they can finish themselves.
         new_job_or_shutdown.notify_all();
@@ -491,8 +412,7 @@ void ThreadPoolImpl<Thread>::wait()
         std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolLockWaitMicroseconds : ProfileEvents::LocalThreadPoolLockWaitMicroseconds,
         watch.elapsedMicroseconds());
 
-    job_finished.wait(lock, [this] { return scheduled_jobs == 0; });
-    //LOG_TRACE(&Poco::Logger::get("ThreadPoolImpl"), "ThreadPool has finished");
+    no_jobs.wait(lock, [this] { return scheduled_jobs == 0; });
 
     if (first_exception)
     {
@@ -530,12 +450,11 @@ void ThreadPoolImpl<Thread>::finalize()
     }
 
     /// Wake up threads so they can finish themselves.
-    new_job_or_shutdown.notify_all();
+
     /// Wait for all currently running jobs to finish (we don't wait for all scheduled jobs here like the function wait() does).
     for (auto & thread : threads)
     {
-        thread.join();
         ProfileEvents::increment(
             std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks);
 
@@ -543,17 +462,11 @@
     threads.clear();
 
     housekeeping_thread_cv.notify_all();
-    pool_grow_thread_cv.notify_all();
-
-    for (auto & thread : service_threads)
-    {
-        thread.join();
-    }
-    service_threads.clear();
+    housekeeping_thread.reset();
 }
 
 template <typename Thread>
-void ThreadPoolImpl<Thread>::threadPoolHousekeep()
+void ThreadPoolImpl<Thread>::housekeep()
 {
     setThreadName("TPoolHousekeep");
     while (true)
@@ -565,7 +478,7 @@
         if (housekeeping_thread_cv.wait_for(
             lock,
             std::chrono::milliseconds(GLOBAL_THREAD_POOL_HOUSEKEEP_INTERVAL_MILLISECONDS),
-            [this]{ return shutdown || desired_pool_size != threads.size(); }
+            [this]{ return shutdown || desired_pool_size != current_pool_size; }
         ))
         {
             if (shutdown)
@@ -606,7 +519,7 @@
             // {
             //     // memory error or similar, but we still want thread to continue working
             //     LOG_ERROR(&Poco::Logger::get("ThreadPoolImpl"),
-            //         "ThreadPoolImpl::threadPoolHousekeep(): exception while updating utilization history");
+            //         "ThreadPoolImpl::housekeep(): exception while updating utilization history");
             // }
             //
             // TODO: check if we have to shrink the pool
@@ -617,38 +530,6 @@
         }
     }
 }
-
-
-template <typename Thread>
-void ThreadPoolImpl<Thread>::threadPoolGrow()
-{
-    setThreadName("TPoolGrow");
-    while (true)
-    {
-        std::unique_lock lock(mutex);
-        // Wait for notification or timeout
-        pool_grow_thread_cv.wait(lock, [this]{ return shutdown || desired_pool_size > threads.size(); });
-        if (shutdown)
-            return;
-
-        try
-        {
-            startThreads(/* async = */ false, lock);
-        }
-        // catch (const DB::Exception & e)
-        // {
-        //     LOG_ERROR(&Poco::Logger::get("ThreadPoolImpl"),
-        //         "ThreadPoolImpl::threadPoolGrow(): exception while starting threads: {}",
-        //         e.what());
-        // }
-        catch (...)
-        {
-            // LOG_ERROR(&Poco::Logger::get("ThreadPoolImpl"),
-            //     "ThreadPoolImpl::threadPoolGrow(): unknown exception while starting threads");
-        }
-    }
-}
-
-
 template <typename Thread>
 void ThreadPoolImpl<Thread>::addOnDestroyCallback(OnDestroyCallback && callback)
 {
@@ -688,6 +569,7 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_it)
     CurrentMetrics::Increment metric_pool_threads(metric_threads);
 
     bool job_is_done = false;
+    bool should_remove_myself = false;
     std::exception_ptr exception_from_job;
 
@@ -727,18 +609,17 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_it)
 
             --scheduled_jobs;
             calculateDesiredThreadPoolSizeNoLock();
-
-            if (scheduled_jobs == 0)
+            if (!scheduled_jobs)
             {
-                job_finished.notify_all(); // one of the threads can be waiting in wait() for that condition
+                no_jobs.notify_all(); // notify the wait() method
             }
         }
 
         job_finished.notify_one();
 
-        new_job_or_shutdown.wait(lock, [&] { return !jobs.empty() || shutdown || desired_pool_size < threads.size(); });
+        new_job_or_shutdown.wait(lock, [&] { return !jobs.empty() || shutdown || desired_pool_size < current_pool_size; });
 
-        if (jobs.empty() || desired_pool_size < threads.size())
+        if (jobs.empty() || desired_pool_size < current_pool_size)
         {
             // We enter here if:
             //  - either this thread is not needed anymore due to max_free_threads excess;
@@ -746,13 +627,10 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_it)
 
             if (threads_remove_themselves)
             {
-                thread_it->detach();
-                threads.erase(thread_it);
-                ProfileEvents::increment(
-                    std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks);
+                should_remove_myself = true; // we will do it outside of the lock
             }
-
-            return;
+            else
+                return;
         }
 
         /// boost::priority_queue does not provide interface for getting non-const reference to an element
@@ -770,13 +648,24 @@
                 job_is_done = true;
                 continue;
             }
-            else if (scheduled_jobs > 1)
-            {
-                new_job_or_shutdown.notify_all();
-            }
+        }
+        if (should_remove_myself)
+        {
+            std::lock_guard lock(threads_mutex);
+            thread_it->detach();
+            threads.erase(thread_it);
+            current_pool_size = threads.size();
+            ProfileEvents::increment(
+                std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks);
+            return;
         }
 
+        // if (have_more_jobs)
+        // {
+        //     new_job_or_shutdown.notify_one(); // just in case some other thread missed the cond var signal
+        // }
+
         ALLOW_ALLOCATIONS_IN_SCOPE;
 
         /// Set up tracing context for this thread by its parent context.
diff --git a/src/Common/ThreadPool.h b/src/Common/ThreadPool.h
index 5764b167c732..f7a2040c9b4c 100644
--- a/src/Common/ThreadPool.h
+++ b/src/Common/ThreadPool.h
@@ -5,7 +5,6 @@
 #include
 #include
 #include
-#include
 #include
 #include
 #include
 
@@ -113,9 +112,12 @@ class ThreadPoolImpl
 private:
     friend class GlobalThreadPool;
+
     mutable std::mutex mutex;
     std::condition_variable job_finished;
     std::condition_variable new_job_or_shutdown;
+    std::condition_variable housekeeping_thread_cv;
+    std::condition_variable no_jobs;
 
     Metric metric_threads;
     Metric metric_active_threads;
 
@@ -130,18 +132,12 @@ class ThreadPoolImpl
     bool threads_remove_themselves = true;
     const bool shutdown_on_exception = true;
 
-    boost::heap::priority_queue<JobWithPriority> jobs;
-    std::list<Thread> threads;
-    std::list<Thread> service_threads; // threads that are not used for running jobs, but for housekeeping tasks (only for global thread pool)
-
-    // housekeepeing_thread is used only for global thread pool
-    // it monitors regularly the demand for threads in the pool
-    // adjusts the size of the pool by decreasing or increasing
-    // its size depoending on the load
-    size_t desired_pool_size = 0;
-    std::condition_variable housekeeping_thread_cv;
-    std::condition_variable pool_grow_thread_cv;
+    /// boost::heap::stable<true> is used to preserve the FIFO order of jobs with the same priority
+    boost::heap::priority_queue<JobWithPriority, boost::heap::stable<true>> jobs;
+    mutable std::mutex threads_mutex;
+    std::list<Thread> threads; // modified when threads_mutex is locked
+    std::atomic<size_t> current_pool_size = 0; // modified when threads_mutex is locked
 
     std::exception_ptr first_exception;
     std::stack<OnDestroyCallback> on_destroy_callbacks;
 
@@ -149,23 +145,28 @@ class ThreadPoolImpl
     template <typename ReturnType>
     ReturnType scheduleImpl(Job job, Priority priority, std::optional<uint64_t> wait_microseconds, bool propagate_opentelemetry_tracing_context = true);
 
+    std::atomic<size_t> desired_pool_size = 0; // modified when mutex is locked
     void calculateDesiredThreadPoolSizeNoLock();
 
-    void worker(typename std::list<Thread>::iterator future_thread_it);
+    void worker(typename std::list<Thread>::iterator thread_it);
 
     /// if number of threads is less than desired, creates new threads
-    /// for async mode it creates a task that creates new threads
-    /// otherwise it creates new threads synchronously in the current thread
-    void startThreads(bool async, std::unique_lock<std::mutex> & lock);
+    void startThreads();
 
-    void adjustThreadPoolSize(std::unique_lock<std::mutex> && lock);
+    /// will increase the number of threads if needed, or decrease it if there are too many
+    void adjustThreadPoolSize();
 
     void finalize();
     void onDestroy();
 
-    void threadPoolHousekeep();
-    void threadPoolGrow();
+    // housekeeping_thread is used only for the global thread pool;
+    // it regularly monitors the demand for threads in the pool
+    // and adjusts the size of the pool, decreasing or increasing
+    // it depending on the load
+    std::optional<Thread> housekeeping_thread;
+
+    void housekeep();
 };
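
The new comment on `jobs` points at boost::heap's stability option. A small self-contained demonstration of why `boost::heap::stable<true>` matters (the Job type here is illustrative, not the pool's JobWithPriority): without it, equal-priority elements may pop in an unspecified order; with it, they pop in insertion (FIFO) order.

```cpp
#include <boost/heap/priority_queue.hpp>
#include <iostream>

struct Job
{
    int priority;
    int seq; // submission order, for display only
    bool operator<(const Job & other) const { return priority < other.priority; }
};

int main()
{
    // stable<true> preserves FIFO order among jobs with equal priority.
    boost::heap::priority_queue<Job, boost::heap::stable<true>> jobs;
    jobs.push({1, 0});
    jobs.push({1, 1});
    jobs.push({2, 2});

    while (!jobs.empty())
    {
        std::cout << "priority=" << jobs.top().priority << " seq=" << jobs.top().seq << '\n';
        jobs.pop();
    }
    // Prints: priority=2 seq=2, then priority=1 seq=0, then priority=1 seq=1.
}
```
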