Commit

Introduced performance metrics for better monitoring and troubleshooting of ThreadPool.

Make the jobs queue in the ThreadPool stable (i.e. FIFO for the same priority); otherwise some jobs can stay in the queue untaken for a long time.
filimonov committed Aug 23, 2024
1 parent 8e9be16 commit 522991c
Showing 3 changed files with 133 additions and 28 deletions.
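
The queue-stability half of the change comes down to one template argument: the job queue in src/Common/ThreadPool.h (last file below) becomes boost::heap::priority_queue<JobWithPriority, boost::heap::stable<true>>. The following standalone sketch is not part of the commit — a simplified Job struct with an int priority stands in for JobWithPriority — but it shows the behaviour the stable<true> option buys: jobs of equal priority are popped in insertion order (FIFO), while higher-priority jobs still jump ahead.

#include <boost/heap/priority_queue.hpp>
#include <iostream>

struct Job
{
    int priority; /// lower value = higher priority, as with ClickHouse's Priority
    int id;       /// insertion order, for demonstration only

    bool operator<(const Job & rhs) const
    {
        /// Reversed so the max-heap yields the smallest priority value first,
        /// the same trick JobWithPriority::operator< uses in ThreadPool.cpp.
        return priority > rhs.priority;
    }
};

int main()
{
    /// Without boost::heap::stable<true> the pop order of equal-priority jobs is unspecified,
    /// so an unlucky job can sit in the queue for a long time; with it, equal-priority jobs
    /// come out strictly FIFO.
    boost::heap::priority_queue<Job, boost::heap::stable<true>> jobs;

    jobs.push({0, 1});
    jobs.push({0, 2});
    jobs.push({-1, 3}); /// smaller value = higher priority, jumps ahead of the others
    jobs.push({0, 4});

    while (!jobs.empty())
    {
        std::cout << jobs.top().id << ' '; /// prints: 3 1 2 4
        jobs.pop();
    }
    std::cout << '\n';
}

Since JobWithPriority keeps its reversed operator<, only the queue's template arguments needed to change for same-priority jobs to be taken in FIFO order.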
14 changes: 14 additions & 0 deletions src/Common/ProfileEvents.cpp
@@ -72,6 +72,20 @@
M(NetworkReceiveBytes, "Total number of bytes received from network. Only ClickHouse-related network interaction is included, not by 3rd party libraries.") \
M(NetworkSendBytes, "Total number of bytes send to network. Only ClickHouse-related network interaction is included, not by 3rd party libraries.") \
\
M(GlobalThreadPoolExpansions, "Counts the total number of times new threads have been added to the global thread pool. This metric indicates the frequency of expansions in the global thread pool to accommodate increased processing demands.") \
M(GlobalThreadPoolShrinks, "Counts the total number of times the global thread pool has shrunk by removing threads. This occurs when the number of idle threads exceeds max_thread_pool_free_size, indicating adjustments in the global thread pool size in response to decreased thread utilization.") \
M(GlobalThreadPoolThreadCreationMicroseconds, "Total time spent waiting for new threads to start.") \
M(GlobalThreadPoolLockWaitMicroseconds, "Total time threads have spent waiting for locks in the global thread pool.") \
M(GlobalThreadPoolJobs, "Counts the number of jobs that have been pushed to the global thread pool.") \
M(GlobalThreadPoolJobWaitTimeMicroseconds, "Measures the elapsed time from when a job is scheduled in the thread pool to when it is picked up for execution by a worker thread. This metric helps identify delays in job processing, indicating the responsiveness of the thread pool to new tasks.") \
M(LocalThreadPoolExpansions, "Counts the total number of times threads have been borrowed from the global thread pool to expand local thread pools.") \
M(LocalThreadPoolShrinks, "Counts the total number of times threads have been returned to the global thread pool from local thread pools.") \
M(LocalThreadPoolThreadCreationMicroseconds, "Total time local thread pools have spent waiting to borrow a thread from the global pool.") \
M(LocalThreadPoolLockWaitMicroseconds, "Total time threads have spent waiting for locks in the local thread pools.") \
M(LocalThreadPoolJobs, "Counts the number of jobs that have been pushed to the local thread pools.") \
M(LocalThreadPoolBusyMicroseconds, "Total time threads have spent executing the actual work.") \
M(LocalThreadPoolJobWaitTimeMicroseconds, "Measures the elapsed time from when a job is scheduled in the thread pool to when it is picked up for execution by a worker thread. This metric helps identify delays in job processing, indicating the responsiveness of the thread pool to new tasks.") \
\
M(DiskS3GetRequestThrottlerCount, "Number of DiskS3 GET and SELECT requests passed through throttler.") \
M(DiskS3GetRequestThrottlerSleepMicroseconds, "Total time a query was sleeping to conform DiskS3 GET and SELECT request throttling.") \
M(DiskS3PutRequestThrottlerCount, "Number of DiskS3 PUT, COPY, POST and LIST requests passed through throttler.") \
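
For orientation (the function below is hypothetical, only to show the pattern): every counter added to the M(...) list above is defined once in ProfileEvents.cpp, and call sites pick it up by declaring it extern const Event and calling ProfileEvents::increment — exactly what the ThreadPool.cpp changes that follow do, e.g. for lock-wait accounting.

#include <mutex>
#include <Common/ProfileEvents.h>
#include <Common/Stopwatch.h>

namespace ProfileEvents
{
    /// Declared at the call site; the definitions live in the M(...) list in ProfileEvents.cpp.
    extern const Event GlobalThreadPoolLockWaitMicroseconds;
    extern const Event GlobalThreadPoolJobs;
}

/// Hypothetical call site illustrating the declare-and-increment pattern.
void recordScheduling(std::mutex & mutex)
{
    Stopwatch watch;              /// starts timing immediately
    std::unique_lock lock(mutex); /// possibly contended
    ProfileEvents::increment(ProfileEvents::GlobalThreadPoolLockWaitMicroseconds, watch.elapsedMicroseconds());
    ProfileEvents::increment(ProfileEvents::GlobalThreadPoolJobs);
}

Like other ProfileEvents counters, these then show up in the system.events table, which is where the "better monitoring and troubleshooting" from the commit message takes place.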
118 changes: 116 additions & 2 deletions src/Common/ThreadPool.cpp
@@ -1,4 +1,5 @@
#include <Common/ThreadPool.h>
#include <Common/ProfileEvents.h>
#include <Common/setThreadName.h>
#include <Common/Exception.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
@@ -27,6 +28,67 @@ namespace CurrentMetrics
extern const Metric GlobalThreadActive;
}

namespace ProfileEvents
{
extern const Event GlobalThreadPoolExpansions;
extern const Event GlobalThreadPoolShrinks;
extern const Event GlobalThreadPoolThreadCreationMicroseconds;
extern const Event GlobalThreadPoolLockWaitMicroseconds;
extern const Event GlobalThreadPoolJobs;
extern const Event GlobalThreadPoolJobWaitTimeMicroseconds;

extern const Event LocalThreadPoolExpansions;
extern const Event LocalThreadPoolShrinks;
extern const Event LocalThreadPoolThreadCreationMicroseconds;
extern const Event LocalThreadPoolLockWaitMicroseconds;
extern const Event LocalThreadPoolJobs;
extern const Event LocalThreadPoolBusyMicroseconds;
extern const Event LocalThreadPoolJobWaitTimeMicroseconds;

}

class JobWithPriority
{
public:
using Job = std::function<void()>;

Job job;
Priority priority;
CurrentMetrics::Increment metric_increment;
DB::OpenTelemetry::TracingContextOnThread thread_trace_context;

/// Call stacks of all jobs' schedulings leading to this one
std::vector<StackTrace::FramePointers> frame_pointers;
bool enable_job_stack_trace = false;
Stopwatch job_create_time;

JobWithPriority(
Job job_, Priority priority_, CurrentMetrics::Metric metric,
const DB::OpenTelemetry::TracingContextOnThread & thread_trace_context_,
bool capture_frame_pointers)
: job(job_), priority(priority_), metric_increment(metric),
thread_trace_context(thread_trace_context_), enable_job_stack_trace(capture_frame_pointers)
{
if (!capture_frame_pointers)
return;
/// Save all previous jobs call stacks and append with current
frame_pointers = DB::Exception::getThreadFramePointers();
frame_pointers.push_back(StackTrace().getFramePointers());
}

bool operator<(const JobWithPriority & rhs) const
{
return priority > rhs.priority; // Reversed for `priority_queue` max-heap to yield minimum value (i.e. highest priority) first
}

UInt64 elapsedMicroseconds() const
{
return job_create_time.elapsedMicroseconds();
}


};

static constexpr auto DEFAULT_THREAD_NAME = "ThreadPool";

template <typename Thread>
@@ -143,11 +205,15 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:
};

{
Stopwatch watch;
std::unique_lock lock(mutex);
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolLockWaitMicroseconds : ProfileEvents::LocalThreadPoolLockWaitMicroseconds,
watch.elapsedMicroseconds());

auto pred = [this] { return !queue_size || scheduled_jobs < queue_size || shutdown; };

if (wait_microseconds) /// Check for optional. Condition is true if the optional is set and the value is zero.
if (wait_microseconds) /// Check for optional. Condition is true if the optional is set. Even if the value is zero.
{
if (!job_finished.wait_for(lock, std::chrono::microseconds(*wait_microseconds), pred))
return on_error(fmt::format("no free thread (timeout={})", *wait_microseconds));
@@ -176,7 +242,13 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:

try
{
Stopwatch watch2;
threads.front() = Thread([this, it = threads.begin()] { worker(it); });
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolThreadCreationMicroseconds : ProfileEvents::LocalThreadPoolThreadCreationMicroseconds,
watch2.elapsedMicroseconds());
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolExpansions : ProfileEvents::LocalThreadPoolExpansions);
}
catch (...)
{
@@ -198,6 +270,8 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:
/// Wake up a free thread to run the new job.
new_job_or_shutdown.notify_one();

ProfileEvents::increment(std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolJobs : ProfileEvents::LocalThreadPoolJobs);

return static_cast<ReturnType>(true);
}

@@ -221,7 +295,14 @@ void ThreadPoolImpl<Thread>::startNewThreadsNoLock()

try
{
Stopwatch watch;
threads.front() = Thread([this, it = threads.begin()] { worker(it); });
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolThreadCreationMicroseconds : ProfileEvents::LocalThreadPoolThreadCreationMicroseconds,
watch.elapsedMicroseconds());
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolExpansions : ProfileEvents::LocalThreadPoolExpansions);

}
catch (...)
{
@@ -252,7 +333,11 @@ void ThreadPoolImpl<Thread>::scheduleOrThrow(Job job, Priority priority, uint64_
template <typename Thread>
void ThreadPoolImpl<Thread>::wait()
{
Stopwatch watch;
std::unique_lock lock(mutex);
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolLockWaitMicroseconds : ProfileEvents::LocalThreadPoolLockWaitMicroseconds,
watch.elapsedMicroseconds());
/// Signal here just in case.
/// If threads are waiting on condition variables, but there are some jobs in the queue
/// then it will prevent us from deadlock.
@@ -293,7 +378,11 @@ void ThreadPoolImpl<Thread>::finalize()

/// Wait for all currently running jobs to finish (we don't wait for all scheduled jobs here like the function wait() does).
for (auto & thread : threads)
{
thread.join();
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks);
}

threads.clear();
}
@@ -355,7 +444,11 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
Job job;

{
Stopwatch watch;
std::unique_lock lock(mutex);
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolLockWaitMicroseconds : ProfileEvents::LocalThreadPoolLockWaitMicroseconds,
watch.elapsedMicroseconds());

// Finish with previous job if any
if (job_is_done)
@@ -388,6 +481,8 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
{
thread_it->detach();
threads.erase(thread_it);
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks);
}
return;
}
@@ -401,6 +496,10 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
thread_frame_pointers = std::move(const_cast<std::vector<StackTrace::FramePointers> &>(jobs.top().frame_pointers));
jobs.pop();

ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolJobWaitTimeMicroseconds : ProfileEvents::LocalThreadPoolJobWaitTimeMicroseconds,
job_data->elapsedMicroseconds());

/// We don't run jobs after `shutdown` is set, but we have to properly dequeue all jobs and finish them.
if (shutdown)
{
@@ -423,7 +522,22 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_

CurrentMetrics::Increment metric_active_pool_threads(metric_active_threads);

job();

if constexpr (!std::is_same_v<Thread, std::thread>)
{
Stopwatch watch;
job_data->job();
// This metric is less relevant for the global thread pool, as it would show large values (time while
// a thread was used by local pools) and increment only when local pools are destroyed.
//
// In cases where global pool threads are used directly (without a local thread pool), distinguishing
// them is difficult.
ProfileEvents::increment(ProfileEvents::LocalThreadPoolBusyMicroseconds, watch.elapsedMicroseconds());
}
else
{
job_data->job();
}

if (thread_trace_context.root_span.isTraceEnabled())
{
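
Two timings in the worker loop above are worth spelling out: the job-wait time runs from the job_create_time Stopwatch in JobWithPriority (started when the job is scheduled) until a worker pops the job and reports it via the *JobWaitTimeMicroseconds events, while the busy time covers the execution of the job body itself and, per the in-code comment above, is only recorded for local pools. A reduced, self-contained sketch of the same measurement scheme — made-up names, std::chrono instead of ClickHouse's Stopwatch:

#include <chrono>
#include <cstdint>
#include <deque>
#include <functional>
#include <iostream>

using Clock = std::chrono::steady_clock;

struct TimedJob
{
    std::function<void()> job;
    Clock::time_point created = Clock::now(); /// stamped when the job is scheduled
};

int main()
{
    std::deque<TimedJob> queue;
    queue.push_back({[] { /* the actual work would run here */ }});

    /// In the worker: wait time = pop time - creation time (the *JobWaitTimeMicroseconds events),
    /// busy time = how long the job body runs (LocalThreadPoolBusyMicroseconds).
    TimedJob job = std::move(queue.front());
    queue.pop_front();

    auto picked_up = Clock::now();
    uint64_t wait_us = std::chrono::duration_cast<std::chrono::microseconds>(picked_up - job.created).count();

    job.job();
    uint64_t busy_us = std::chrono::duration_cast<std::chrono::microseconds>(Clock::now() - picked_up).count();

    std::cout << "wait: " << wait_us << " us, busy: " << busy_us << " us\n";
}

A long wait time combined with a short busy time is the signal the commit message is after: jobs are ready but the pool is slow to pick them up.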
29 changes: 3 additions & 26 deletions src/Common/ThreadPool.h
@@ -23,6 +23,8 @@
#include <Common/Exception.h>
#include <base/scope_guard.h>

class JobWithPriority;

/** Very simple thread pool similar to boost::threadpool.
* Advantages:
* - catches exceptions and rethrows on wait.
@@ -123,33 +125,8 @@ class ThreadPoolImpl
bool threads_remove_themselves = true;
const bool shutdown_on_exception = true;

struct JobWithPriority
{
Job job;
Priority priority;
DB::OpenTelemetry::TracingContextOnThread thread_trace_context;

/// Call stacks of all jobs' schedulings leading to this one
std::vector<StackTrace::FramePointers> frame_pointers;
bool enable_job_stack_trace = false;

JobWithPriority(Job job_, Priority priority_, const DB::OpenTelemetry::TracingContextOnThread & thread_trace_context_, bool capture_frame_pointers = false)
: job(job_), priority(priority_), thread_trace_context(thread_trace_context_), enable_job_stack_trace(capture_frame_pointers)
{
if (!capture_frame_pointers)
return;
/// Save all previous jobs call stacks and append with current
frame_pointers = DB::Exception::thread_frame_pointers;
frame_pointers.push_back(StackTrace().getFramePointers());
}

bool operator<(const JobWithPriority & rhs) const
{
return priority > rhs.priority; // Reversed for `priority_queue` max-heap to yield minimum value (i.e. highest priority) first
}
};
boost::heap::priority_queue<JobWithPriority,boost::heap::stable<true>> jobs;

boost::heap::priority_queue<JobWithPriority> jobs;
std::list<Thread> threads;
std::exception_ptr first_exception;
std::stack<OnDestroyCallback> on_destroy_callbacks;
