From 3ba5dac0db3643c4e332bed6ffd12b614986530c Mon Sep 17 00:00:00 2001
From: Mikhail Filimonov
Date: Thu, 22 Aug 2024 09:37:48 +0200
Subject: [PATCH] ThreadPool: Move thread creation out of the lock

---
 src/Common/ThreadPool.cpp | 117 ++++++++++++++++++++------------------
 src/Common/ThreadPool.h   |  30 +++++++++-
 2 files changed, 89 insertions(+), 58 deletions(-)

diff --git a/src/Common/ThreadPool.cpp b/src/Common/ThreadPool.cpp
index 9984dc57c320..58fd578ce1af 100644
--- a/src/Common/ThreadPool.cpp
+++ b/src/Common/ThreadPool.cpp
@@ -12,6 +12,7 @@
 #include
 #include
 #include
+#include <future>
 
 namespace DB
 {
@@ -47,48 +48,21 @@ namespace ProfileEvents
 }
 
-class JobWithPriority
+class ScopedUnlocker
 {
 public:
-    using Job = std::function<void()>;
-
-    Job job;
-    Priority priority;
-    CurrentMetrics::Increment metric_increment;
-    DB::OpenTelemetry::TracingContextOnThread thread_trace_context;
-
-    /// Call stacks of all jobs' schedulings leading to this one
-    std::vector<StackTrace::FramePointers> frame_pointers;
-    bool enable_job_stack_trace = false;
-    Stopwatch job_create_time;
-
-    JobWithPriority(
-        Job job_, Priority priority_, CurrentMetrics::Metric metric,
-        const DB::OpenTelemetry::TracingContextOnThread & thread_trace_context_,
-        bool capture_frame_pointers)
-        : job(job_), priority(priority_), metric_increment(metric),
-        thread_trace_context(thread_trace_context_), enable_job_stack_trace(capture_frame_pointers)
-    {
-        if (!capture_frame_pointers)
-            return;
-        /// Save all previous jobs call stacks and append with current
-        frame_pointers = DB::Exception::getThreadFramePointers();
-        frame_pointers.push_back(StackTrace().getFramePointers());
-    }
+    explicit ScopedUnlocker(std::unique_lock<std::mutex> & lock) : lock_(lock) { lock_.unlock(); }
 
-    bool operator<(const JobWithPriority & rhs) const
-    {
-        return priority > rhs.priority; // Reversed for `priority_queue` max-heap to yield minimum value (i.e. highest priority) first
-    }
-
-    UInt64 elapsedMicroseconds() const
-    {
-        return job_create_time.elapsedMicroseconds();
-    }
+    ~ScopedUnlocker() { lock_.lock(); }
+    ScopedUnlocker(const ScopedUnlocker &) = delete;
+    ScopedUnlocker & operator=(const ScopedUnlocker &) = delete;
+private:
+    std::unique_lock<std::mutex> & lock_;
 };
 
 static constexpr auto DEFAULT_THREAD_NAME = "ThreadPool";
 
 template <typename Thread>
@@ -230,31 +204,62 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std::optional<uint64_t> wait_microseconds, bool propagate_opentelemetry_tracing_context)
         /// Check if there are enough threads to process job.
         if (threads.size() < std::min(max_threads, scheduled_jobs + 1))
         {
-            try
-            {
-                threads.emplace_front();
-            }
-            catch (...)
-            {
-                /// Most likely this is a std::bad_alloc exception
-                return on_error("cannot allocate thread slot");
-            }
+            bool push_successful = false;
+            std::promise<typename std::list<Thread>::iterator> promise_thread_it;
+            std::shared_future<typename std::list<Thread>::iterator> future_thread_id = promise_thread_it.get_future().share();
+            std::unique_ptr<Thread> thread_ptr;
 
             try
             {
-                Stopwatch watch2;
-                threads.front() = Thread([this, it = threads.begin()] { worker(it); });
-                ProfileEvents::increment(
-                    std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolThreadCreationMicroseconds : ProfileEvents::LocalThreadPoolThreadCreationMicroseconds,
-                    watch2.elapsedMicroseconds());
-                ProfileEvents::increment(
-                    std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolExpansions : ProfileEvents::LocalThreadPoolExpansions);
+                {
+                    ScopedUnlocker unlocker(lock);
+
+                    /// In certain conditions thread creation can be slow, so we do it outside the critical section;
+                    /// otherwise it may lead to huge delays in the thread pool.
+
+                    Stopwatch watch2;
+
+                    /// We use a future here for two reasons:
+                    /// 1) to pass the list iterator to the thread only after the thread is added to the list;
+                    /// 2) to hold the thread's work until it is added to the list; otherwise it can get
+                    ///    the lock faster and then wait on a condition variable forever.
+                    thread_ptr = std::make_unique<Thread>([this, ft = std::move(future_thread_id)] mutable
+                    {
+                        auto thread_it = ft.get();
+                        worker(thread_it);
+                    });
+
+                    ProfileEvents::increment(
+                        std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolThreadCreationMicroseconds : ProfileEvents::LocalThreadPoolThreadCreationMicroseconds,
+                        watch2.elapsedMicroseconds());
+                }
+
+                threads.push_front(std::move(*thread_ptr));
+                push_successful = true;
+                promise_thread_it.set_value(threads.begin());
             }
             catch (...)
             {
-                threads.pop_front();
+                // Set the exception in the promise, so the thread will be stopped with an exception
+                promise_thread_it.set_exception(std::current_exception());
+                if (push_successful)
+                {
+                    if (threads.front().joinable())
+                        threads.front().join();
+
+                    threads.pop_front();
+                }
+                else if (thread_ptr && thread_ptr->joinable())
+                {
+                    thread_ptr->join();
+                }
                 return on_error("cannot allocate thread");
             }
+
+            ProfileEvents::increment(
+                std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolExpansions : ProfileEvents::LocalThreadPoolExpansions);
         }
 
         jobs.emplace(std::move(job),
@@ -489,16 +494,18 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_it)
 
             /// boost::priority_queue does not provide interface for getting non-const reference to an element
             /// to prevent us from modifying its priority. We have to use const_cast to force move semantics on JobWithPriority::job.
+
             job = std::move(const_cast<Job &>(jobs.top().job));
             parent_thread_trace_context = std::move(const_cast<DB::OpenTelemetry::TracingContextOnThread &>(jobs.top().thread_trace_context));
             DB::Exception::enable_job_stack_trace = jobs.top().enable_job_stack_trace;
+            auto elapsed = jobs.top().job_create_time.elapsedMicroseconds();
             if (DB::Exception::enable_job_stack_trace)
                 thread_frame_pointers = std::move(const_cast<std::vector<StackTrace::FramePointers> &>(jobs.top().frame_pointers));
             jobs.pop();
 
             ProfileEvents::increment(
                 std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolJobWaitTimeMicroseconds : ProfileEvents::LocalThreadPoolJobWaitTimeMicroseconds,
-                job_data->elapsedMicroseconds());
+                elapsed);
 
             /// We don't run jobs after `shutdown` is set, but we have to properly dequeue all jobs and finish them.
             if (shutdown)
@@ -526,7 +533,7 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_it)
             if constexpr (!std::is_same_v<Thread, std::thread>)
             {
                 Stopwatch watch;
-                job_data->job();
+                job();
                 // This metric is less relevant for the global thread pool, as it would show large values (time while
                 // a thread was used by local pools) and increment only when local pools are destroyed.
                 //
@@ -536,7 +543,7 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_it)
             }
             else
             {
-                job_data->job();
+                job();
             }
 
             if (thread_trace_context.root_span.isTraceEnabled())
diff --git a/src/Common/ThreadPool.h b/src/Common/ThreadPool.h
index 71f59891a1c6..abc75cefb75f 100644
--- a/src/Common/ThreadPool.h
+++ b/src/Common/ThreadPool.h
@@ -23,8 +23,6 @@
 #include
 #include
 
-class JobWithPriority;
-
 /** Very simple thread pool similar to boost::threadpool.
   * Advantages:
   * - catches exceptions and rethrows on wait.
@@ -125,8 +123,34 @@ class ThreadPoolImpl
     bool threads_remove_themselves = true;
     const bool shutdown_on_exception = true;
 
-    boost::heap::priority_queue<JobWithPriority> jobs;
+    struct JobWithPriority
+    {
+        Job job;
+        Priority priority;
+        DB::OpenTelemetry::TracingContextOnThread thread_trace_context;
+
+        /// Call stacks of all jobs' schedulings leading to this one
+        std::vector<StackTrace::FramePointers> frame_pointers;
+        bool enable_job_stack_trace = false;
+        Stopwatch job_create_time;
+
+        JobWithPriority(Job job_, Priority priority_, const DB::OpenTelemetry::TracingContextOnThread & thread_trace_context_, bool capture_frame_pointers = false)
+            : job(job_), priority(priority_), thread_trace_context(thread_trace_context_), enable_job_stack_trace(capture_frame_pointers)
+        {
+            if (!capture_frame_pointers)
+                return;
+            /// Save all previous jobs' call stacks and append the current one
+            frame_pointers = DB::Exception::thread_frame_pointers;
+            frame_pointers.push_back(StackTrace().getFramePointers());
+        }
+
+        bool operator<(const JobWithPriority & rhs) const
+        {
+            return priority > rhs.priority; // Reversed for `priority_queue` max-heap to yield minimum value (i.e. highest priority) first
+        }
+    };
+
+    boost::heap::priority_queue<JobWithPriority> jobs;
     std::list<Thread> threads;
     std::exception_ptr first_exception;
     std::stack<OnDestroyCallback> on_destroy_callbacks;
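
Note: the patch relies on the ScopedUnlocker idiom, which is worth seeing in isolation. Below is a minimal standalone sketch (not ClickHouse code; the free-standing mutex and scheduleSomething() are illustrative stand-ins): a held std::unique_lock is released for the duration of a slow operation and re-acquired on scope exit, even if the slow operation throws.

#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>

class ScopedUnlocker
{
public:
    explicit ScopedUnlocker(std::unique_lock<std::mutex> & lock) : lock_(lock) { lock_.unlock(); }
    ~ScopedUnlocker() { lock_.lock(); }
    ScopedUnlocker(const ScopedUnlocker &) = delete;
    ScopedUnlocker & operator=(const ScopedUnlocker &) = delete;
private:
    std::unique_lock<std::mutex> & lock_;
};

std::mutex mutex;

void scheduleSomething()
{
    std::unique_lock lock(mutex);
    /// ... checks on shared state, done under the lock ...
    {
        ScopedUnlocker unlocker(lock);
        /// Slow work (thread creation in the patch, a sleep here) runs with the
        /// mutex released, so other threads can schedule jobs meanwhile.
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
    /// The lock is held again here; shared state may have changed while it was
    /// released, so invariants must be re-checked before relying on them.
    std::cout << "done" << std::endl;
}

int main()
{
    scheduleSomething();
}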
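
Note: the promise/shared_future handshake in scheduleImpl is the subtle part of the patch. Here is a minimal sketch of just that handshake, assuming plain std::thread and std::list rather than the pool's Thread template parameter: the worker thread is created first (outside any lock in the patch), but it blocks on the future until the thread object has been pushed into the list and its iterator published; on failure the scheduler calls set_exception() instead, so ft.get() throws and the worker exits.

#include <future>
#include <iostream>
#include <list>
#include <memory>
#include <thread>

int main()
{
    std::list<std::thread> threads;

    std::promise<std::list<std::thread>::iterator> promise_thread_it;
    std::shared_future<std::list<std::thread>::iterator> future_thread_it = promise_thread_it.get_future().share();

    /// Create the thread first; it cannot start working yet because
    /// ft.get() blocks until the iterator is delivered.
    auto thread_ptr = std::make_unique<std::thread>([ft = future_thread_it]
    {
        auto thread_it = ft.get(); /// rethrows if set_exception() was used instead
        (void)thread_it;           /// the real worker uses it to remove itself from the list
        std::cout << "worker started after its iterator was published" << std::endl;
    });

    /// Only after a successful push does a valid iterator exist; publish it.
    threads.push_front(std::move(*thread_ptr));
    promise_thread_it.set_value(threads.begin());

    threads.front().join();
}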
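
Note: the inverted operator< in JobWithPriority is easy to misread, so here is a small sketch of the ordering it produces. It uses std::priority_queue instead of boost::heap::priority_queue for brevity (both are max-heaps with respect to operator<), and a plain int standing in for the Priority type, where a lower value means a higher priority.

#include <iostream>
#include <queue>

struct JobWithPriority
{
    int priority;

    bool operator<(const JobWithPriority & rhs) const
    {
        /// Reversed, so the max-heap pops the smallest value (highest priority) first.
        return priority > rhs.priority;
    }
};

int main()
{
    std::priority_queue<JobWithPriority> jobs;
    jobs.push({5});
    jobs.push({-1});
    jobs.push({0});

    while (!jobs.empty())
    {
        std::cout << jobs.top().priority << '\n'; /// prints -1, then 0, then 5
        jobs.pop();
    }
}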