From 0275ba1e3c7400b610baa712df8c03278ed703c4 Mon Sep 17 00:00:00 2001 From: Mikhail Filimonov Date: Tue, 6 Feb 2024 13:45:23 +0100 Subject: [PATCH] use promise / futute --- src/Common/ThreadPool.cpp | 99 ++++++++++++++++++++++----------------- src/Common/ThreadPool.h | 13 +---- 2 files changed, 58 insertions(+), 54 deletions(-) diff --git a/src/Common/ThreadPool.cpp b/src/Common/ThreadPool.cpp index 66d76b3533ae..1f4fbf6a2b67 100644 --- a/src/Common/ThreadPool.cpp +++ b/src/Common/ThreadPool.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include namespace DB @@ -82,10 +83,10 @@ class JobWithPriority static constexpr auto DEFAULT_THREAD_NAME = "ThreadPool"; static constexpr const size_t GLOBAL_THREAD_POOL_EXPANSION_THREADS = 1; -static constexpr const size_t GLOBAL_THREAD_POOL_MIN_FREE_THREADS = 12; +static constexpr const size_t GLOBAL_THREAD_POOL_MIN_FREE_THREADS = 1; static constexpr const size_t LOCAL_THREAD_POOL_MIN_FREE_THREADS = 1; -static constexpr const size_t GLOBAL_THREAD_POOL_EXPANSION_STEP = 8; +static constexpr const size_t GLOBAL_THREAD_POOL_EXPANSION_STEP = 1; static constexpr const size_t GLOBAL_THREAD_POOL_HOUSEKEEP_INTERVAL_MILLISECONDS = 10000; // 10 seconds /// static constexpr const size_t GLOBAL_THREAD_POOL_HOUSEKEEP_HISTORY_WINDOW_SECONDS = 600; // 10 minutes @@ -127,6 +128,10 @@ ThreadPoolImpl::ThreadPoolImpl( , shutdown_on_exception(shutdown_on_exception_) { std::unique_lock lock(mutex); + // LOG_ERROR(&Poco::Logger::get("ThreadPoolImpl"), + // "ThreadPoolImpl constructor [Instance Address: {}]: max_threads = {}, max_free_threads = {}, queue_size = {}, StackTrace: {}", + // static_cast(this), max_threads, max_free_threads, queue_size, StackTrace().toString()); + calculateDesiredThreadPoolSizeNoLock(); jobs.reserve(queue_size ? queue_size : desired_pool_size); @@ -209,8 +214,8 @@ void ThreadPoolImpl::calculateDesiredThreadPoolSizeNoLock() { desired_pool_size = std::min(max_threads, scheduled_jobs + max_free_threads); - // ExponentiallySmoothedCounter.h / ExponentiallySmoothedAverage - // + // ExponentiallySmoothedCounter.h / ExponentiallySmoothedAverage + // /// TODO /// our desired_pool_size should be at least as big as minimum utilization over last 10 minutes /// and at least as small as maximum utilization over last 10 minutes @@ -271,7 +276,7 @@ ReturnType ThreadPoolImpl::scheduleImpl(Job job, Priority priority, std: /// Check if there are enough threads to process job. try { - startThreads(std::is_same_v, lock); // async for global thread pool + startThreads(false, lock); // async for global thread pool } catch (const DB::Exception & e) { @@ -303,18 +308,6 @@ ReturnType ThreadPoolImpl::scheduleImpl(Job job, Priority priority, std: return static_cast(true); } -template -void ThreadPoolImpl::removeThread(std::shared_ptr thread_it_holder) -{ - if (thread_it_holder->thread_it.has_value()) { - auto thread_it = thread_it_holder->thread_it.value(); - thread_it->detach(); - threads.erase(thread_it); - ProfileEvents::increment( - std::is_same_v ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks); - } -} - template void ThreadPoolImpl::startThreads(bool async, std::unique_lock & lock) { @@ -329,35 +322,44 @@ void ThreadPoolImpl::startThreads(bool async, std::unique_lock= desired_pool_size) + if (shutdown || threads.size() >= desired_pool_size) return; pool_grow_thread_cv.notify_all(); } else // sync { - while (threads.size() < desired_pool_size) + while (threads.size() < desired_pool_size && !shutdown) { try { - lock.unlock(); - auto thread_it_holder = std::make_shared(); + std::promise::iterator> promise_thread_it; + std::future::iterator> future_thread_id = promise_thread_it.get_future(); + + lock.unlock() Stopwatch watch; - auto thread = Thread(&ThreadPoolImpl::worker, this, thread_it_holder); + auto thread = Thread(&ThreadPoolImpl::worker, this, std::move(future_thread_id)); ProfileEvents::increment( std::is_same_v ? ProfileEvents::GlobalThreadPoolThreadCreationMicroseconds : ProfileEvents::LocalThreadPoolThreadCreationMicroseconds, watch.elapsedMicroseconds()); + + lock.lock(); + ProfileEvents::increment( std::is_same_v ? ProfileEvents::GlobalThreadPoolExpansions : ProfileEvents::LocalThreadPoolExpansions); - lock.lock(); + // if (shutdown) + // { + // thread.detach(); + // return; + // } threads.push_front(std::move(thread)); - thread_it_holder->set( std::move(threads.begin()) ); + future_thread_id.set_value( threads.begin() ); } catch (const std::exception & e) { - lock.lock(); + //lock.lock(); throw DB::Exception(DB::ErrorCodes::CANNOT_SCHEDULE_TASK, "cannot start thread: {})", e.what()); } catch (...) @@ -404,7 +406,7 @@ void ThreadPoolImpl::scheduleOrThrow(Job job, Priority priority, uint64_ template void ThreadPoolImpl::wait() { - + //LOG_TRACE(&Poco::Logger::get("ThreadPoolImpl"), "Waiting for ThreadPool to finish"); Stopwatch watch; std::unique_lock lock(mutex); ProfileEvents::increment( @@ -416,6 +418,7 @@ void ThreadPoolImpl::wait() new_job_or_shutdown.notify_all(); job_finished.wait(lock, [this] { return scheduled_jobs == 0; }); + //LOG_TRACE(&Poco::Logger::get("ThreadPoolImpl"), "ThreadPool has finished"); if (first_exception) { @@ -440,6 +443,11 @@ template void ThreadPoolImpl::finalize() { { + // LOG_ERROR(&Poco::Logger::get("ThreadPoolImpl"), + // "ThreadPoolImpl destructor [Instance Address: {}]: threads.size() = {}, StackTrace: {}", + // static_cast(this), threads.size(), StackTrace().toString()); + + std::lock_guard lock(mutex); shutdown = true; /// We don't want threads to remove themselves from `threads` anymore, otherwise `thread.join()` will go wrong below in this function. @@ -448,27 +456,26 @@ void ThreadPoolImpl::finalize() } /// Wake up threads so they can finish themselves. + new_job_or_shutdown.notify_all(); + /// Wait for all currently running jobs to finish (we don't wait for all scheduled jobs here like the function wait() does). + for (auto & thread : threads) + { + + thread.join(); + ProfileEvents::increment( + std::is_same_v ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks); + } + threads.clear(); + housekeeping_thread_cv.notify_all(); pool_grow_thread_cv.notify_all(); + for (auto & thread : service_threads) { - std::lock_guard lock(mutex); - /// Wait for all currently running jobs to finish (we don't wait for all scheduled jobs here like the function wait() does). - for (auto & thread : threads) - { - thread.join(); - ProfileEvents::increment( - std::is_same_v ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks); - } - threads.clear(); - - for (auto & thread : service_threads) - { - thread.join(); - } - service_threads.clear(); + thread.join(); } + service_threads.clear(); } template @@ -601,8 +608,9 @@ bool ThreadPoolImpl::finished() const } template -void ThreadPoolImpl::worker(std::shared_ptr thread_it_holder) +void ThreadPoolImpl::worker(std::future::iterator> promise_thread_it) { + auto thread_it = promise_thread_it.get(); DENY_ALLOCATIONS_IN_SCOPE; CurrentMetrics::Increment metric_pool_threads(metric_threads); @@ -661,7 +669,12 @@ void ThreadPoolImpl::worker(std::shared_ptr thread // - or shutdown happened AND all jobs are already handled. if (threads_remove_themselves) - removeThread(thread_it_holder); + { + thread_it->detach(); + threads.erase(thread_it); + ProfileEvents::increment( + std::is_same_v ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks); + } return; } diff --git a/src/Common/ThreadPool.h b/src/Common/ThreadPool.h index ab37226a7168..bdfa223dfe88 100644 --- a/src/Common/ThreadPool.h +++ b/src/Common/ThreadPool.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -129,14 +130,6 @@ class ThreadPoolImpl bool threads_remove_themselves = true; const bool shutdown_on_exception = true; - struct ThreadIterarorHolder { - std::optional::iterator> thread_it; - - void set(std::list::iterator it) { - thread_it = it; - } - }; - boost::heap::priority_queue jobs; std::list threads; std::list service_threads; // threads that are not used for running jobs, but for housekeeping tasks (only for global thread pool) @@ -158,9 +151,7 @@ class ThreadPoolImpl void calculateDesiredThreadPoolSizeNoLock(); - void worker(std::shared_ptr thread_it_holder); - - void removeThread(std::shared_ptr thread_it); + void worker(std::future::iterator> promise_thread_it); /// if number of threads is less than desired, creates new threads /// for async mode it creates a task that creates new threads