Skip to content

Commit

Permalink
New metrics for ThreadPool
Browse files Browse the repository at this point in the history
- Introduced performance metrics for better monitoring and troubleshooting of ThreadPool.
- Implemented safety checks in ThreadPoolImpl constructor to prevent thread count from exceeding system limits.
- Set maximum thread count in ThreadPool to 32,768, aligning with Linux PID limits (also to detect incorrect invocations of the ThreadPools)
  • Loading branch information
filimonov committed Jan 26, 2024
1 parent 930ce11 commit 8df8f25
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 1 deletion.
11 changes: 11 additions & 0 deletions src/Common/ProfileEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,17 @@
M(NetworkReceiveBytes, "Total number of bytes received from network. Only ClickHouse-related network interaction is included, not by 3rd party libraries.") \
M(NetworkSendBytes, "Total number of bytes send to network. Only ClickHouse-related network interaction is included, not by 3rd party libraries.") \
\
M(GlobalThreadPoolExpansions, "Counts the total number of times new threads have been added to the global thread pool. This metric indicates the frequency of expansions in the global thread pool to accommodate increased processing demands.") \
M(GlobalThreadPoolShrinks, "Counts the total number of times the global thread pool has shrunk by removing threads. This occurs when the number of idle threads exceeds max_thread_pool_free_size, indicating adjustments in the global thread pool size in response to decreased thread utilization.") \
M(GlobalThreadPoolThreadCreationMicroseconds, "Total time spent waiting for new threads to start.") \
M(GlobalThreadPoolLockWaitMicroseconds, "Total time threads have spent waiting for locks in the global thread pool.") \
M(GlobalThreadPoolJobs, "Counts the number of jobs that have been pushed to the global thread pool.") \
M(LocalThreadPoolExpansions, "Counts the total number of times threads have been borrowed from the global thread pool to expand local thread pools.") \
M(LocalThreadPoolShrinks, "Counts the total number of times threads have been returned to the global thread pool from local thread pools.") \
M(LocalThreadPoolThreadCreationMicroseconds, "Total time local thread pools have spent waiting to borrow a thread from the global pool.") \
M(LocalThreadPoolLockWaitMicroseconds, "Total time threads have spent waiting for locks in the local thread pools.") \
M(LocalThreadPoolJobs, "Counts the number of jobs that have been pushed to the local thread pools.") \
\
M(DiskS3GetRequestThrottlerCount, "Number of DiskS3 GET and SELECT requests passed through throttler.") \
M(DiskS3GetRequestThrottlerSleepMicroseconds, "Total time a query was sleeping to conform DiskS3 GET and SELECT request throttling.") \
M(DiskS3PutRequestThrottlerCount, "Number of DiskS3 PUT, COPY, POST and LIST requests passed through throttler.") \
Expand Down
63 changes: 62 additions & 1 deletion src/Common/ThreadPool.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <Common/ThreadPool.h>
#include <Common/ProfileEvents.h>
#include <Common/setThreadName.h>
#include <Common/Exception.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
Expand Down Expand Up @@ -28,6 +29,21 @@ namespace CurrentMetrics
extern const Metric GlobalThreadScheduled;
}

namespace ProfileEvents
{
extern const Event GlobalThreadPoolExpansions;
extern const Event GlobalThreadPoolShrinks;
extern const Event GlobalThreadPoolThreadCreationMicroseconds;
extern const Event GlobalThreadPoolLockWaitMicroseconds;
extern const Event GlobalThreadPoolJobs;

extern const Event LocalThreadPoolExpansions;
extern const Event LocalThreadPoolShrinks;
extern const Event LocalThreadPoolThreadCreationMicroseconds;
extern const Event LocalThreadPoolLockWaitMicroseconds;
extern const Event LocalThreadPoolJobs;
}

class JobWithPriority
{
public:
Expand Down Expand Up @@ -63,6 +79,7 @@ class JobWithPriority
};

static constexpr auto DEFAULT_THREAD_NAME = "ThreadPool";
static constexpr auto MAX_THREADS_LIMIT = 32768; // This is the default PID limit on most Linux systems.

template <typename Thread>
ThreadPoolImpl<Thread>::ThreadPoolImpl(Metric metric_threads_, Metric metric_active_threads_, Metric metric_scheduled_jobs_)
Expand Down Expand Up @@ -98,11 +115,22 @@ ThreadPoolImpl<Thread>::ThreadPoolImpl(
, queue_size(queue_size_ ? std::max(queue_size_, max_threads) : 0 /* zero means the queue is unlimited */)
, shutdown_on_exception(shutdown_on_exception_)
{
if (max_threads == 0)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "The 'max_threads' parameter for the thread pool cannot be zero.");

if (max_threads > MAX_THREADS_LIMIT)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "The 'max_threads' parameter ({}) for the thread pool should not exceed the limit of {}.", max_threads, MAX_THREADS_LIMIT);
}

template <typename Thread>
void ThreadPoolImpl<Thread>::setMaxThreads(size_t value)
{
if (value == 0)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "The 'max_threads' value cannot be set to zero.");

if (value > MAX_THREADS_LIMIT)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "The 'max_threads' value ({}) for the thread pool should not exceed the limit of {}.", max_threads, MAX_THREADS_LIMIT);

std::lock_guard lock(mutex);
bool need_start_threads = (value > max_threads);
bool need_finish_free_threads = (value < max_free_threads);
Expand Down Expand Up @@ -181,11 +209,15 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:
};

{
Stopwatch watch;
std::unique_lock lock(mutex);
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolLockWaitMicroseconds : ProfileEvents::LocalThreadPoolLockWaitMicroseconds,
watch.elapsedMicroseconds());

auto pred = [this] { return !queue_size || scheduled_jobs < queue_size || shutdown; };

if (wait_microseconds) /// Check for optional. Condition is true if the optional is set and the value is zero.
if (wait_microseconds) /// Check for optional. Condition is true if the optional is set. Even if the value is zero.
{
if (!job_finished.wait_for(lock, std::chrono::microseconds(*wait_microseconds), pred))
return on_error(fmt::format("no free thread (timeout={})", *wait_microseconds));
Expand Down Expand Up @@ -214,7 +246,13 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:

try
{
Stopwatch watch2;
threads.front() = Thread([this, it = threads.begin()] { worker(it); });
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolThreadCreationMicroseconds : ProfileEvents::LocalThreadPoolThreadCreationMicroseconds,
watch2.elapsedMicroseconds());
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolExpansions : ProfileEvents::LocalThreadPoolExpansions);
}
catch (...)
{
Expand All @@ -237,6 +275,8 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:
/// Wake up a free thread to run the new job.
new_job_or_shutdown.notify_one();

ProfileEvents::increment( std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolJobs : ProfileEvents::LocalThreadPoolJobs);

return static_cast<ReturnType>(true);
}

Expand All @@ -260,7 +300,14 @@ void ThreadPoolImpl<Thread>::startNewThreadsNoLock()

try
{
Stopwatch watch;
threads.front() = Thread([this, it = threads.begin()] { worker(it); });
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolThreadCreationMicroseconds : ProfileEvents::LocalThreadPoolThreadCreationMicroseconds,
watch.elapsedMicroseconds());
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolExpansions : ProfileEvents::LocalThreadPoolExpansions);

}
catch (...)
{
Expand Down Expand Up @@ -291,7 +338,11 @@ void ThreadPoolImpl<Thread>::scheduleOrThrow(Job job, Priority priority, uint64_
template <typename Thread>
void ThreadPoolImpl<Thread>::wait()
{
Stopwatch watch;
std::unique_lock lock(mutex);
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolLockWaitMicroseconds : ProfileEvents::LocalThreadPoolLockWaitMicroseconds,
watch.elapsedMicroseconds());
/// Signal here just in case.
/// If threads are waiting on condition variables, but there are some jobs in the queue
/// then it will prevent us from deadlock.
Expand Down Expand Up @@ -332,7 +383,11 @@ void ThreadPoolImpl<Thread>::finalize()

/// Wait for all currently running jobs to finish (we don't wait for all scheduled jobs here like the function wait() does).
for (auto & thread : threads)
{
thread.join();
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks);
}

threads.clear();
}
Expand Down Expand Up @@ -389,7 +444,11 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
std::optional<JobWithPriority> job_data;

{
Stopwatch watch;
std::unique_lock lock(mutex);
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolLockWaitMicroseconds : ProfileEvents::LocalThreadPoolLockWaitMicroseconds,
watch.elapsedMicroseconds());

// Finish with previous job if any
if (job_is_done)
Expand Down Expand Up @@ -422,6 +481,8 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
{
thread_it->detach();
threads.erase(thread_it);
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks);
}
return;
}
Expand Down

0 comments on commit 8df8f25

Please sign in to comment.