roll back changes and add more metrics
filimonov committed Jan 22, 2024
1 parent 6af6d62 · commit 3c1e3bc
Showing 3 changed files with 46 additions and 35 deletions.
src/Common/ProfileEvents.cpp: 6 additions & 0 deletions
@@ -77,12 +77,18 @@
M(GlobalThreadPoolJobScheduleMicroseconds, "Total time spent waiting to schedule a job in the global thread pool. This metric accounts for the time elapsed from the moment a job scheduling request is made until the job is successfully queued in the global thread pool, reflecting the responsiveness and scheduling efficiency of the pool.") \
M(GlobalThreadPoolThreadCreationMicroseconds, "") \
M(GlobalThreadPoolJobScheduleLockWaitMicroseconds, "") \
M(GlobalThreadPoolJobEmplacementMicroseconds, "") \
M(GlobalThreadPoolCondVarWaitingMicroseconds, "") \
M(GlobalThreadPoolJobsCounter, "") \
\
M(LocalThreadPoolExpansions, "Counts the total number of times threads were borrowed from the global thread pool to expand local thread pools.") \
M(LocalThreadPoolShrinks, "Counts the total number of times threads were returned to the global thread pool from local thread pools.") \
M(LocalThreadPoolJobScheduleMicroseconds, "Total time spent waiting to schedule a job in a local thread pool. This metric measures the time elapsed from when a job scheduling request is initiated until the job is successfully queued in the local thread pool. Shows how much time the jobs were waiting for a free slot in the local pool.") \
M(LocalThreadPoolThreadCreationMicroseconds, "") \
M(LocalThreadPoolJobScheduleLockWaitMicroseconds, "") \
M(LocalThreadPoolJobEmplacementMicroseconds, "") \
M(LocalThreadPoolCondVarWaitingMicroseconds, "") \
M(LocalThreadPoolJobsCounter, "") \
\
M(DiskS3GetRequestThrottlerCount, "Number of DiskS3 GET and SELECT requests passed through throttler.") \
M(DiskS3GetRequestThrottlerSleepMicroseconds, "Total time a query was sleeping to conform DiskS3 GET and SELECT request throttling.") \
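The six new M(...) entries above follow the X-macro convention used by the rest of ProfileEvents.cpp: each row both names an event and carries its documentation string (empty for the new thread-pool events for now). Below is a rough, self-contained sketch of how such a macro list turns into incrementable counters; the APPLY_FOR_EVENTS macro, the increment() helper, and the shortened event list are simplified stand-ins, not the actual ClickHouse ProfileEvents machinery.

```cpp
#include <atomic>
#include <cstdint>
#include <cstdio>

// Simplified event list in the same M(name, description) style as ProfileEvents.cpp.
#define APPLY_FOR_EVENTS(M) \
    M(GlobalThreadPoolJobScheduleMicroseconds, "Time spent waiting to schedule a job.") \
    M(GlobalThreadPoolCondVarWaitingMicroseconds, "Time spent blocked on the condition variable.") \
    M(GlobalThreadPoolJobsCounter, "Number of jobs scheduled on the global pool.")

// Expand the list once into an enum of event ids...
enum Event : unsigned
{
#define M(NAME, DOC) NAME,
    APPLY_FOR_EVENTS(M)
#undef M
    END
};

// ...and use the enum size to create one atomic counter per event.
static std::atomic<std::uint64_t> counters[END];

// Stand-in for ProfileEvents::increment(): add to a counter without locking.
void increment(Event event, std::uint64_t amount = 1)
{
    counters[event].fetch_add(amount, std::memory_order_relaxed);
}

int main()
{
    increment(GlobalThreadPoolJobsCounter);
    increment(GlobalThreadPoolJobScheduleMicroseconds, 42);
    std::printf("jobs scheduled: %llu\n",
                static_cast<unsigned long long>(counters[GlobalThreadPoolJobsCounter].load()));
    return 0;
}
```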
src/Common/ThreadPool.cpp: 40 additions & 34 deletions
Expand Up @@ -36,11 +36,18 @@ namespace ProfileEvents
extern const Event GlobalThreadPoolJobScheduleMicroseconds;
extern const Event GlobalThreadPoolThreadCreationMicroseconds;
extern const Event GlobalThreadPoolJobScheduleLockWaitMicroseconds;
extern const Event GlobalThreadPoolJobEmplacementMicroseconds;
extern const Event GlobalThreadPoolJobsCounter;
extern const Event GlobalThreadPoolCondVarWaitingMicroseconds;

extern const Event LocalThreadPoolExpansions;
extern const Event LocalThreadPoolShrinks;
extern const Event LocalThreadPoolJobScheduleMicroseconds;
extern const Event LocalThreadPoolThreadCreationMicroseconds;
extern const Event LocalThreadPoolJobScheduleLockWaitMicroseconds;
extern const Event LocalThreadPoolJobEmplacementMicroseconds;
extern const Event LocalThreadPoolJobsCounter;
extern const Event LocalThreadPoolCondVarWaitingMicroseconds;
}

class JobWithPriority
@@ -113,19 +120,19 @@ ThreadPoolImpl<Thread>::ThreadPoolImpl(
, queue_size(queue_size_ ? std::max(queue_size_, max_threads) : 0 /* zero means the queue is unlimited */)
, shutdown_on_exception(shutdown_on_exception_)
{
std::lock_guard lock(mutex);
jobs.reserve(queue_size);
while (threads.size() < std::min(max_threads, scheduled_jobs + (std::is_same_v<Thread, std::thread> ? 256 : 1) ) )
{
try
{
createThreadNoLock();
}
catch (...)
{
break; /// failed to start more threads
}
}
// std::lock_guard lock(mutex);
// jobs.reserve(queue_size);
// while (threads.size() < std::min(max_threads, scheduled_jobs + (std::is_same_v<Thread, std::thread> ? min_free_threads : 1) ) )
// {
// try
// {
// createThreadNoLock();
// }
// catch (...)
// {
// break; /// failed to start more threads
// }
// }
}

template <typename Thread>
@@ -267,7 +274,7 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolJobScheduleLockWaitMicroseconds : ProfileEvents::LocalThreadPoolJobScheduleLockWaitMicroseconds,
watch.elapsedMicroseconds());


Stopwatch watch2;
auto pred = [this] { return !queue_size || scheduled_jobs < queue_size || shutdown; };

if (wait_microseconds) /// Check for optional. Condition is true if the optional is set and the value is zero.
@@ -278,17 +285,21 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:
else
job_finished.wait(lock, pred);

ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolCondVarWaitingMicroseconds : ProfileEvents::LocalThreadPoolCondVarWaitingMicroseconds,
watch2.elapsedMicroseconds());

if (shutdown)
return on_error("shutdown");

/// We must not allocate any memory after we have emplaced a job into the queue.
/// Otherwise, if an exception were thrown, we would not notify a thread about the job.

/// Check if there are enough threads to process job
/// Check if there are enough threads to process job.
if (threads.size() < std::min(max_threads, scheduled_jobs + 1))
{
while (threads.size() < std::min(max_threads, scheduled_jobs + (std::is_same_v<Thread, std::thread> ? 32 : 1) ) )
{
// while (threads.size() < std::min(max_threads, scheduled_jobs + (std::is_same_v<Thread, std::thread> ? min_free_threads : 1) ) )
// {
try
{
createThreadNoLock();
@@ -301,9 +312,10 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:
{
on_error("can not start new thread");
}
}
// }
}

Stopwatch watch3;
jobs.emplace(std::move(job),
priority,
metric_scheduled_jobs,
@@ -312,16 +324,22 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:
/// capture_frame_pointers
DB::Exception::enable_job_stack_trace);

ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolJobEmplacementMicroseconds : ProfileEvents::LocalThreadPoolJobEmplacementMicroseconds,
watch3.elapsedMicroseconds());

++scheduled_jobs;
}

/// Wake up a free thread to run the new job.
new_job_or_shutdown.notify_one();


ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolJobScheduleMicroseconds : ProfileEvents::LocalThreadPoolJobScheduleMicroseconds,
watch.elapsedMicroseconds());

ProfileEvents::increment( std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolJobsCounter : ProfileEvents::LocalThreadPoolJobsCounter);

return static_cast<ReturnType>(true);
}
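In the hunks above, scheduleImpl() gains extra stopwatches: watch2 measures time blocked on the job_finished condition variable waiting for a free queue slot, watch3 measures emplacement of the job into the queue, and each elapsed time is accounted to one of the new profile events. A minimal sketch of that bracketing pattern follows; Stopwatch is rebuilt here on std::chrono and plain atomics stand in for ProfileEvents::increment, so the names and types are illustrative stand-ins rather than the real ClickHouse classes.

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>

// Stand-in for ClickHouse's Stopwatch: elapsed wall-clock time since construction.
struct Stopwatch
{
    std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();

    std::uint64_t elapsedMicroseconds() const
    {
        return std::chrono::duration_cast<std::chrono::microseconds>(
            std::chrono::steady_clock::now() - start).count();
    }
};

// Stand-ins for the profile-event counters incremented in scheduleImpl().
std::atomic<std::uint64_t> lock_wait_microseconds{0};
std::atomic<std::uint64_t> cond_var_wait_microseconds{0};

std::mutex pool_mutex;
std::condition_variable job_finished;
bool queue_has_room = true;

void timedScheduleStep()
{
    Stopwatch watch;                              // covers mutex acquisition
    std::unique_lock lock(pool_mutex);
    lock_wait_microseconds.fetch_add(watch.elapsedMicroseconds(), std::memory_order_relaxed);

    Stopwatch watch2;                             // covers the wait for a free queue slot
    job_finished.wait(lock, [] { return queue_has_room; });
    cond_var_wait_microseconds.fetch_add(watch2.elapsedMicroseconds(), std::memory_order_relaxed);

    // ... emplace the job under the lock, timing it with another stopwatch ...
}

int main()
{
    timedScheduleStep();   // queue_has_room is already true, so this returns immediately
    return 0;
}
```

In the diff itself the outer watch keeps running and is charged to GlobalThreadPoolJobScheduleMicroseconds (or the local-pool equivalent) at the very end, so the lock-wait, condition-variable and emplacement counters break down parts of that overall scheduling time.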

@@ -351,8 +369,7 @@ void ThreadPoolImpl<Thread>::wait()
/// If threads are waiting on condition variables while there are still jobs in the queue,
/// waking them here prevents a deadlock.
new_job_or_shutdown.notify_all();

no_jobs.wait(lock, [this] { return scheduled_jobs == 0; });
job_finished.wait(lock, [this] { return scheduled_jobs == 0; });

if (first_exception)
{
@@ -467,20 +484,9 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_

--scheduled_jobs;

if (!shutdown)
{
job_finished.notify_one();
if (!scheduled_jobs)
{
no_jobs.notify_all();
}
}
else
{
job_finished.notify_one();
job_finished.notify_all();
if (shutdown)
new_job_or_shutdown.notify_all(); /// `shutdown` was set, wake up other threads so they can finish themselves.
no_jobs.notify_all();
}
}

new_job_or_shutdown.wait(lock, [&] { return !jobs.empty() || shutdown || threads.size() > std::min(max_threads, scheduled_jobs + max_free_threads); });
src/Common/ThreadPool.h: 0 additions & 1 deletion
@@ -115,7 +115,6 @@ class ThreadPoolImpl
mutable std::mutex mutex;
std::condition_variable job_finished;
std::condition_variable new_job_or_shutdown;
std::condition_variable no_jobs;

Metric metric_threads;
Metric metric_active_threads;
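The header change removes the dedicated no_jobs condition variable that the rolled-back code had introduced. After this commit, wait() blocks on job_finished until scheduled_jobs reaches zero, and worker() calls job_finished.notify_all() for every finished job, which wakes both schedulers waiting for queue space and threads parked in wait(). A condensed model of that single-condition-variable protocol is sketched below; the member names follow the diff, but the surrounding class is a simplified stand-in rather than the real ThreadPoolImpl.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Condensed model of the ThreadPool wait protocol after this commit (not the full class).
struct PoolWaitModel
{
    std::mutex mutex;
    std::condition_variable job_finished;
    std::size_t scheduled_jobs = 0;

    void onJobScheduled()
    {
        std::lock_guard lock(mutex);
        ++scheduled_jobs;
    }

    // Called by a worker thread when a job completes.
    void onJobFinished()
    {
        std::lock_guard lock(mutex);
        --scheduled_jobs;
        // notify_all() wakes schedulers blocked on a full queue *and* threads in wait().
        job_finished.notify_all();
    }

    // Blocks until every scheduled job has finished.
    void wait()
    {
        std::unique_lock lock(mutex);
        job_finished.wait(lock, [this] { return scheduled_jobs == 0; });
    }
};

int main()
{
    PoolWaitModel pool;
    pool.onJobScheduled();
    pool.onJobFinished();
    pool.wait();   // returns immediately: no jobs left
    return 0;
}
```

Using notify_all() instead of a targeted notify_one() is what lets one condition variable serve both purposes, at the cost of some spurious wake-ups.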
