diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index 5e11bc0e055f..0ae5eb37ab0e 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -77,12 +77,18 @@ M(GlobalThreadPoolJobScheduleMicroseconds, "Total time spent waiting to schedule a job in the global thread pool. This metric accounts for the time elapsed from the moment a job scheduling request is made until the job is successfully queued in the global thread pool, reflecting the responsiveness and scheduling efficiency of the pool.") \ M(GlobalThreadPoolThreadCreationMicroseconds, "") \ M(GlobalThreadPoolJobScheduleLockWaitMicroseconds, "") \ + M(GlobalThreadPoolJobEmplacementMicroseconds, "") \ + M(GlobalThreadPoolCondVarWaitingMicroseconds, "") \ + M(GlobalThreadPoolJobsCounter, "") \ \ M(LocalThreadPoolExpansions, "Counts the total number of times threads were borrowed from the global thread pool to expand local thread pools.") \ M(LocalThreadPoolShrinks, "Counts the total number of times threads were returned to the global thread pool from local thread pools.") \ M(LocalThreadPoolJobScheduleMicroseconds, "Total time spent waiting to schedule a job in a local thread pool. This metric measures the time elapsed from when a job scheduling request is initiated until the job is successfully queued in the local thread pool. Shows how much time the jobs were waiting for a free slot in the local pool.") \ M(LocalThreadPoolThreadCreationMicroseconds, "") \ M(LocalThreadPoolJobScheduleLockWaitMicroseconds, "") \ + M(LocalThreadPoolJobEmplacementMicroseconds, "") \ + M(LocalThreadPoolCondVarWaitingMicroseconds, "") \ + M(LocalThreadPoolJobsCounter, "") \ \ M(DiskS3GetRequestThrottlerCount, "Number of DiskS3 GET and SELECT requests passed through throttler.") \ M(DiskS3GetRequestThrottlerSleepMicroseconds, "Total time a query was sleeping to conform DiskS3 GET and SELECT request throttling.") \ diff --git a/src/Common/ThreadPool.cpp b/src/Common/ThreadPool.cpp index 1cab39f37012..e17bf2e2b7e1 100644 --- a/src/Common/ThreadPool.cpp +++ b/src/Common/ThreadPool.cpp @@ -36,11 +36,18 @@ namespace ProfileEvents extern const Event GlobalThreadPoolJobScheduleMicroseconds; extern const Event GlobalThreadPoolThreadCreationMicroseconds; extern const Event GlobalThreadPoolJobScheduleLockWaitMicroseconds; + extern const Event GlobalThreadPoolJobEmplacementMicroseconds; + extern const Event GlobalThreadPoolJobsCounter; + extern const Event GlobalThreadPoolCondVarWaitingMicroseconds; + extern const Event LocalThreadPoolExpansions; extern const Event LocalThreadPoolShrinks; extern const Event LocalThreadPoolJobScheduleMicroseconds; extern const Event LocalThreadPoolThreadCreationMicroseconds; extern const Event LocalThreadPoolJobScheduleLockWaitMicroseconds; + extern const Event LocalThreadPoolJobEmplacementMicroseconds; + extern const Event LocalThreadPoolJobsCounter; + extern const Event LocalThreadPoolCondVarWaitingMicroseconds; } class JobWithPriority @@ -113,19 +120,19 @@ ThreadPoolImpl::ThreadPoolImpl( , queue_size(queue_size_ ? std::max(queue_size_, max_threads) : 0 /* zero means the queue is unlimited */) , shutdown_on_exception(shutdown_on_exception_) { - std::lock_guard lock(mutex); - jobs.reserve(queue_size); - while (threads.size() < std::min(max_threads, scheduled_jobs + (std::is_same_v ? 256 : 1) ) ) - { - try - { - createThreadNoLock(); - } - catch (...) - { - break; /// failed to start more threads - } - } + // std::lock_guard lock(mutex); + // jobs.reserve(queue_size); + // while (threads.size() < std::min(max_threads, scheduled_jobs + (std::is_same_v ? min_free_threads : 1) ) ) + // { + // try + // { + // createThreadNoLock(); + // } + // catch (...) + // { + // break; /// failed to start more threads + // } + // } } template @@ -267,7 +274,7 @@ ReturnType ThreadPoolImpl::scheduleImpl(Job job, Priority priority, std: std::is_same_v ? ProfileEvents::GlobalThreadPoolJobScheduleLockWaitMicroseconds : ProfileEvents::LocalThreadPoolJobScheduleLockWaitMicroseconds, watch.elapsedMicroseconds()); - + Stopwatch watch2; auto pred = [this] { return !queue_size || scheduled_jobs < queue_size || shutdown; }; if (wait_microseconds) /// Check for optional. Condition is true if the optional is set and the value is zero. @@ -278,17 +285,21 @@ ReturnType ThreadPoolImpl::scheduleImpl(Job job, Priority priority, std: else job_finished.wait(lock, pred); + ProfileEvents::increment( + std::is_same_v ? ProfileEvents::GlobalThreadPoolCondVarWaitingMicroseconds : ProfileEvents::LocalThreadPoolCondVarWaitingMicroseconds, + watch2.elapsedMicroseconds()); + if (shutdown) return on_error("shutdown"); /// We must not to allocate any memory after we emplaced a job in a queue. /// Because if an exception would be thrown, we won't notify a thread about job occurrence. - /// Check if there are enough threads to process job + /// Check if there are enough threads to process job. if (threads.size() < std::min(max_threads, scheduled_jobs + 1)) { - while (threads.size() < std::min(max_threads, scheduled_jobs + (std::is_same_v ? 32 : 1) ) ) - { + // while (threads.size() < std::min(max_threads, scheduled_jobs + (std::is_same_v ? min_free_threads : 1) ) ) + // { try { createThreadNoLock(); @@ -301,9 +312,10 @@ ReturnType ThreadPoolImpl::scheduleImpl(Job job, Priority priority, std: { on_error("can not start new thread"); } - } + // } } + Stopwatch watch3; jobs.emplace(std::move(job), priority, metric_scheduled_jobs, @@ -312,16 +324,22 @@ ReturnType ThreadPoolImpl::scheduleImpl(Job job, Priority priority, std: /// capture_frame_pointers DB::Exception::enable_job_stack_trace); + ProfileEvents::increment( + std::is_same_v ? ProfileEvents::GlobalThreadPoolJobEmplacementMicroseconds : ProfileEvents::LocalThreadPoolJobEmplacementMicroseconds, + watch3.elapsedMicroseconds()); + ++scheduled_jobs; } /// Wake up a free thread to run the new job. new_job_or_shutdown.notify_one(); - ProfileEvents::increment( std::is_same_v ? ProfileEvents::GlobalThreadPoolJobScheduleMicroseconds : ProfileEvents::LocalThreadPoolJobScheduleMicroseconds, watch.elapsedMicroseconds()); + + ProfileEvents::increment( std::is_same_v ? ProfileEvents::GlobalThreadPoolJobsCounter : ProfileEvents::LocalThreadPoolJobsCounter); + return static_cast(true); } @@ -351,8 +369,7 @@ void ThreadPoolImpl::wait() /// If threads are waiting on condition variables, but there are some jobs in the queue /// then it will prevent us from deadlock. new_job_or_shutdown.notify_all(); - - no_jobs.wait(lock, [this] { return scheduled_jobs == 0; }); + job_finished.wait(lock, [this] { return scheduled_jobs == 0; }); if (first_exception) { @@ -467,20 +484,9 @@ void ThreadPoolImpl::worker(typename std::list::iterator thread_ --scheduled_jobs; - if (!shutdown) - { - job_finished.notify_one(); - if (!scheduled_jobs) - { - no_jobs.notify_all(); - } - } - else - { - job_finished.notify_one(); + job_finished.notify_all(); + if (shutdown) new_job_or_shutdown.notify_all(); /// `shutdown` was set, wake up other threads so they can finish themselves. - no_jobs.notify_all(); - } } new_job_or_shutdown.wait(lock, [&] { return !jobs.empty() || shutdown || threads.size() > std::min(max_threads, scheduled_jobs + max_free_threads); }); diff --git a/src/Common/ThreadPool.h b/src/Common/ThreadPool.h index 6a839a5da24f..c312e465b3ff 100644 --- a/src/Common/ThreadPool.h +++ b/src/Common/ThreadPool.h @@ -115,7 +115,6 @@ class ThreadPoolImpl mutable std::mutex mutex; std::condition_variable job_finished; std::condition_variable new_job_or_shutdown; - std::condition_variable no_jobs; Metric metric_threads; Metric metric_active_threads;