Skip to content

Commit

Permalink
Adding metrics to debug ThreadPool misbehaviour (slow schedule on the…
Browse files Browse the repository at this point in the history
… active usage)
  • Loading branch information
filimonov committed Jan 19, 2024
1 parent 9d36fe4 commit 0350103
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 4 deletions.
6 changes: 3 additions & 3 deletions src/Common/CurrentMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@
M(GlobalThread, "Number of threads in global thread pool.") \
M(GlobalThreadActive, "Number of threads in global thread pool running a task.") \
M(GlobalThreadScheduled, "Number of queued or active jobs in global thread pool.") \
M(LocalThread, "Number of threads in local thread pools. The threads in local thread pools are taken from the global thread pool.") \
M(LocalThreadActive, "Number of threads in local thread pools running a task.") \
M(LocalThreadScheduled, "Number of queued or active jobs in local thread pools.") \
M(LocalThread, "Obsolete. Number of threads in all local thread pools. The threads in local thread pools are taken from the global thread pool.") /* see https://github.com/ClickHouse/ClickHouse/pull/47880 */ \
M(LocalThreadActive, "Obsolete. Number of threads in local thread pools running a task.") \
M(LocalThreadScheduled, "Obsolete. Number of queued or active jobs in local thread pools.") \
M(MergeTreeDataSelectExecutorThreads, "Number of threads in the MergeTreeDataSelectExecutor thread pool.") \
M(MergeTreeDataSelectExecutorThreadsActive, "Number of threads in the MergeTreeDataSelectExecutor thread pool running a task.") \
M(MergeTreeDataSelectExecutorThreadsScheduled, "Number of queued or active jobs in the MergeTreeDataSelectExecutor thread pool.") \
Expand Down
7 changes: 7 additions & 0 deletions src/Common/ProfileEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@
M(NetworkReceiveBytes, "Total number of bytes received from network. Only ClickHouse-related network interaction is included, not by 3rd party libraries.") \
M(NetworkSendBytes, "Total number of bytes send to network. Only ClickHouse-related network interaction is included, not by 3rd party libraries.") \
\
M(GlobalThreadPoolExpansions, "Counts the total number of times new threads were added to the global thread pool. This metric indicates the frequency of global thread pool expansions to accommodate increased processing demands.") \
M(GlobalThreadPoolShrinks, "Counts the total number of times the global thread pool was shrunk by removing threads, triggered when the number of idle threads exceeded max_thread_pool_free_size. This metric highlights the adjustments in the global thread pool size in response to decreased thread utilization") \
M(GlobalThreadPoolJobScheduleMicroseconds, "Total time spent waiting to schedule a job in the global thread pool. This metric accounts for the time elapsed from the moment a job scheduling request is made until the job is successfully queued in the global thread pool, reflecting the responsiveness and scheduling efficiency of the pool.") \
M(LocalThreadPoolExpansions, "Counts the total number of times threads were borrowed from the global thread pool to expand local thread pools.") \
M(LocalThreadPoolShrinks, "Counts the total number of times threads were returned to the global thread pool from local thread pools.") \
M(LocalThreadPoolJobScheduleMicroseconds, "Total time spent waiting to schedule a job in a local thread pool. This metric measures the time elapsed from when a job scheduling request is initiated until the job is successfully queued in the local thread pool. Shows how much time the jobs were waiting for a free slot in the local pool.") \
\
M(DiskS3GetRequestThrottlerCount, "Number of DiskS3 GET and SELECT requests passed through throttler.") \
M(DiskS3GetRequestThrottlerSleepMicroseconds, "Total time a query was sleeping to conform DiskS3 GET and SELECT request throttling.") \
M(DiskS3PutRequestThrottlerCount, "Number of DiskS3 PUT, COPY, POST and LIST requests passed through throttler.") \
Expand Down
31 changes: 30 additions & 1 deletion src/Common/ThreadPool.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <Common/ThreadPool.h>
#include <Common/ProfileEvents.h>
#include <Common/setThreadName.h>
#include <Common/Exception.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
Expand Down Expand Up @@ -28,6 +29,16 @@ namespace CurrentMetrics
extern const Metric GlobalThreadScheduled;
}

namespace ProfileEvents
{
extern const Event GlobalThreadPoolExpansions;
extern const Event GlobalThreadPoolShrinks;
extern const Event GlobalThreadPoolJobScheduleMicroseconds;
extern const Event LocalThreadPoolExpansions;
extern const Event LocalThreadPoolShrinks;
extern const Event LocalThreadPoolJobScheduleMicroseconds;
}

class JobWithPriority
{
public:
Expand Down Expand Up @@ -162,6 +173,7 @@ template <typename Thread>
template <typename ReturnType>
ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std::optional<uint64_t> wait_microseconds, bool propagate_opentelemetry_tracing_context)
{
Stopwatch watch;
auto on_error = [&](const std::string & reason)
{
if constexpr (std::is_same_v<ReturnType, void>)
Expand Down Expand Up @@ -215,6 +227,10 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:
try
{
threads.front() = Thread([this, it = threads.begin()] { worker(it); });
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolExpansions : ProfileEvents::LocalThreadPoolExpansions
);

}
catch (...)
{
Expand All @@ -236,7 +252,9 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:

/// Wake up a free thread to run the new job.
new_job_or_shutdown.notify_one();

ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolJobScheduleMicroseconds : ProfileEvents::LocalThreadPoolJobScheduleMicroseconds,
watch.elapsedMicroseconds());
return static_cast<ReturnType>(true);
}

Expand All @@ -261,6 +279,9 @@ void ThreadPoolImpl<Thread>::startNewThreadsNoLock()
try
{
threads.front() = Thread([this, it = threads.begin()] { worker(it); });
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolExpansions : ProfileEvents::LocalThreadPoolExpansions
);
}
catch (...)
{
Expand Down Expand Up @@ -332,7 +353,12 @@ void ThreadPoolImpl<Thread>::finalize()

/// Wait for all currently running jobs to finish (we don't wait for all scheduled jobs here like the function wait() does).
for (auto & thread : threads)
{
thread.join();
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks
);
}

threads.clear();
}
Expand Down Expand Up @@ -422,6 +448,9 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
{
thread_it->detach();
threads.erase(thread_it);
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks
);
}
return;
}
Expand Down

0 comments on commit 0350103

Please sign in to comment.