ThreadPool: Move thread creation out of the lock
filimonov committed Aug 23, 2024
1 parent 522991c commit 3ba5dac
Showing 2 changed files with 89 additions and 58 deletions.
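The gist of the commit: thread construction can stall under load, so it is now performed while the pool mutex is temporarily released, and a std::promise / std::shared_future pair hands the new thread its iterator into `threads` only after the push into the list has succeeded. Below is a minimal, self-contained sketch of that pattern, not part of the commit: it assumes a bare std::list<std::thread>, and the names TinyPool, spawnWorker and workerLoop are hypothetical. The real code wraps the unlock/relock in the ScopedUnlocker guard shown in the diff and reports profile events, which this sketch omits.

#include <future>
#include <list>
#include <memory>
#include <mutex>
#include <thread>

/// Sketch of the "create the thread outside the lock" pattern used by this commit.
class TinyPool
{
public:
    void spawnWorker()
    {
        std::unique_lock<std::mutex> lock(mutex);

        std::promise<std::list<std::thread>::iterator> promise_it;
        auto future_it = promise_it.get_future().share();
        std::unique_ptr<std::thread> thread_ptr;

        /// Thread construction may be slow, so release the pool lock while it runs.
        lock.unlock();
        thread_ptr = std::make_unique<std::thread>([this, ft = future_it]
        {
            try
            {
                /// Block until the pool has inserted this thread into `threads`,
                /// so the iterator handed to the worker loop is valid.
                workerLoop(ft.get());
            }
            catch (...)
            {
                /// The pool failed to register the thread; exit quietly in this sketch.
            }
        });
        lock.lock();

        try
        {
            threads.push_front(std::move(*thread_ptr));
        }
        catch (...)
        {
            /// Wake the blocked thread with an exception and join it before rethrowing.
            promise_it.set_exception(std::current_exception());
            thread_ptr->join();
            throw;
        }
        promise_it.set_value(threads.begin());
    }

    ~TinyPool()
    {
        for (auto & t : threads)
            if (t.joinable())
                t.join();
    }

private:
    void workerLoop(std::list<std::thread>::iterator /*self*/) { /* dequeue and run jobs here */ }

    std::mutex mutex;
    std::list<std::thread> threads;
};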
117 changes: 62 additions & 55 deletions src/Common/ThreadPool.cpp
@@ -12,6 +12,7 @@
#include <Poco/Util/Application.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <base/demangle.h>
#include <future>

namespace DB
{
@@ -47,48 +48,21 @@ namespace ProfileEvents

}

class JobWithPriority
class ScopedUnlocker
{
public:
using Job = std::function<void()>;

Job job;
Priority priority;
CurrentMetrics::Increment metric_increment;
DB::OpenTelemetry::TracingContextOnThread thread_trace_context;

/// Call stacks of all jobs' schedulings leading to this one
std::vector<StackTrace::FramePointers> frame_pointers;
bool enable_job_stack_trace = false;
Stopwatch job_create_time;

JobWithPriority(
Job job_, Priority priority_, CurrentMetrics::Metric metric,
const DB::OpenTelemetry::TracingContextOnThread & thread_trace_context_,
bool capture_frame_pointers)
: job(job_), priority(priority_), metric_increment(metric),
thread_trace_context(thread_trace_context_), enable_job_stack_trace(capture_frame_pointers)
{
if (!capture_frame_pointers)
return;
/// Save all previous jobs call stacks and append with current
frame_pointers = DB::Exception::getThreadFramePointers();
frame_pointers.push_back(StackTrace().getFramePointers());
}
explicit ScopedUnlocker(std::unique_lock<std::mutex>& lock) : lock_(lock) { lock_.unlock(); }

bool operator<(const JobWithPriority & rhs) const
{
return priority > rhs.priority; // Reversed for `priority_queue` max-heap to yield minimum value (i.e. highest priority) first
}

UInt64 elapsedMicroseconds() const
{
return job_create_time.elapsedMicroseconds();
}
~ScopedUnlocker() { lock_.lock(); }

ScopedUnlocker(const ScopedUnlocker&) = delete;
ScopedUnlocker& operator=(const ScopedUnlocker&) = delete;

private:
std::unique_lock<std::mutex>& lock_;
};


static constexpr auto DEFAULT_THREAD_NAME = "ThreadPool";

template <typename Thread>
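The ScopedUnlocker above is the RAII inverse of std::unique_lock: it releases an already-held lock for the lifetime of a scope and re-acquires it on scope exit, even if the enclosed code throws. The following usage sketch is standalone and illustrative only; it carries its own copy of the guard, and doSlowWorkOutsideLock is a hypothetical function, not pool code.

#include <mutex>

/// Inverse RAII guard: unlocks an already-held std::unique_lock on construction
/// and re-acquires it on destruction (mirrors the ScopedUnlocker added above).
class ScopedUnlockerDemo
{
public:
    explicit ScopedUnlockerDemo(std::unique_lock<std::mutex> & lock) : lock_(lock) { lock_.unlock(); }
    ~ScopedUnlockerDemo() { lock_.lock(); }

    ScopedUnlockerDemo(const ScopedUnlockerDemo &) = delete;
    ScopedUnlockerDemo & operator=(const ScopedUnlockerDemo &) = delete;

private:
    std::unique_lock<std::mutex> & lock_;
};

void doSlowWorkOutsideLock(std::unique_lock<std::mutex> & lock)
{
    /// `lock` is held by the caller here.
    {
        ScopedUnlockerDemo unlocker(lock);  /// lock released
        /// ... slow work (e.g. creating a thread) runs without holding the mutex ...
    }                                       /// lock re-acquired here, even on exception
    /// `lock` is held again.
}

int main()
{
    std::mutex m;
    std::unique_lock<std::mutex> lock(m);
    doSlowWorkOutsideLock(lock);
    return 0;
}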
@@ -230,31 +204,62 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:
/// Check if there are enough threads to process job.
if (threads.size() < std::min(max_threads, scheduled_jobs + 1))
{
try
{
threads.emplace_front();
}
catch (...)
{
/// Most likely this is a std::bad_alloc exception
return on_error("cannot allocate thread slot");
}
bool push_successful = false;
std::promise<typename std::list<Thread>::iterator> promise_thread_it;
std::shared_future<typename std::list<Thread>::iterator> future_thread_id = promise_thread_it.get_future().share();
std::unique_ptr<Thread> thread_ptr;

try
{
Stopwatch watch2;
threads.front() = Thread([this, it = threads.begin()] { worker(it); });
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolThreadCreationMicroseconds : ProfileEvents::LocalThreadPoolThreadCreationMicroseconds,
watch2.elapsedMicroseconds());
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolExpansions : ProfileEvents::LocalThreadPoolExpansions);
{
ScopedUnlocker unlocker(lock);

/// In certain conditions thread creation can be slow, so we do it outside the critical section;
/// otherwise it may lead to huge delays in the thread pool.

Stopwatch watch2;

/// We use a future here for two reasons:
/// 1) to pass the list iterator to the thread only after it has been added to the list;
/// 2) to hold back the thread's work until it has been added to the list, otherwise
/// it can grab the lock first and then wait on a condition variable forever.
thread_ptr = std::make_unique<Thread>([this, ft = std::move(future_thread_id)] mutable
{
auto thread_it = ft.get();
worker(thread_it);
});

ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolThreadCreationMicroseconds : ProfileEvents::LocalThreadPoolThreadCreationMicroseconds,
watch2.elapsedMicroseconds());

}

threads.push_front(std::move(*thread_ptr));
push_successful = true;
promise_thread_it.set_value(threads.begin());
}
catch (...)
{
threads.pop_front();
// Set the exception in the promise, so the thread will be stopped with an exception
promise_thread_it.set_exception(std::current_exception());
if (push_successful)
{
if (threads.front().joinable())
threads.front().join();

threads.pop_front();
}
else if (thread_ptr && thread_ptr->joinable())
{
thread_ptr->join();
}

return on_error("cannot allocate thread");
}
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolExpansions : ProfileEvents::LocalThreadPoolExpansions);

}

jobs.emplace(std::move(job),
@@ -489,16 +494,18 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_

/// boost::priority_queue does not provide interface for getting non-const reference to an element
/// to prevent us from modifying its priority. We have to use const_cast to force move semantics on JobWithPriority::job.

job = std::move(const_cast<Job &>(jobs.top().job));
parent_thread_trace_context = std::move(const_cast<DB::OpenTelemetry::TracingContextOnThread &>(jobs.top().thread_trace_context));
DB::Exception::enable_job_stack_trace = jobs.top().enable_job_stack_trace;
auto elapsed = jobs.top().job_create_time.elapsedMicroseconds();
if (DB::Exception::enable_job_stack_trace)
thread_frame_pointers = std::move(const_cast<std::vector<StackTrace::FramePointers> &>(jobs.top().frame_pointers));
jobs.pop();

ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolJobWaitTimeMicroseconds : ProfileEvents::LocalThreadPoolJobWaitTimeMicroseconds,
job_data->elapsedMicroseconds());
elapsed);

/// We don't run jobs after `shutdown` is set, but we have to properly dequeue all jobs and finish them.
if (shutdown)
@@ -526,7 +533,7 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
if constexpr (!std::is_same_v<Thread, std::thread>)
{
Stopwatch watch;
job_data->job();
job();
// This metric is less relevant for the global thread pool, as it would show large values (time while
// a thread was used by local pools) and increment only when local pools are destroyed.
//
@@ -536,7 +543,7 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
}
else
{
job_data->job();
job();
}

if (thread_trace_context.root_span.isTraceEnabled())
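Why the shared_future handoff matters: a worker started inside scheduleImpl could otherwise grab the pool mutex before the spawning code has pushed it into `threads` and enqueued the job, and then wait on the condition variable forever with an iterator that is not yet valid. The standalone sketch below (plain std::thread, hypothetical names, not pool code) shows the handoff in isolation: the worker starts immediately but blocks in get() until the spawner publishes the value it needs.

#include <future>
#include <iostream>
#include <thread>

int main()
{
    std::promise<int> slot_promise;
    std::shared_future<int> slot_future = slot_promise.get_future().share();

    std::thread worker([slot_future]
    {
        /// Blocks here until set_value() or set_exception() is called by the spawner.
        int slot = slot_future.get();
        std::cout << "worker got slot " << slot << '\n';
    });

    /// ... the spawner finishes its bookkeeping (e.g. inserts the thread into a list) ...
    slot_promise.set_value(42);

    worker.join();
    return 0;
}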
30 changes: 27 additions & 3 deletions src/Common/ThreadPool.h
@@ -23,8 +23,6 @@
#include <Common/Exception.h>
#include <base/scope_guard.h>

class JobWithPriority;

/** Very simple thread pool similar to boost::threadpool.
* Advantages:
* - catches exceptions and rethrows on wait.
@@ -125,8 +123,34 @@ class ThreadPoolImpl
bool threads_remove_themselves = true;
const bool shutdown_on_exception = true;

boost::heap::priority_queue<JobWithPriority,boost::heap::stable<true>> jobs;
struct JobWithPriority
{
Job job;
Priority priority;
DB::OpenTelemetry::TracingContextOnThread thread_trace_context;

/// Call stacks of all jobs' schedulings leading to this one
std::vector<StackTrace::FramePointers> frame_pointers;
bool enable_job_stack_trace = false;
Stopwatch job_create_time;

JobWithPriority(Job job_, Priority priority_, const DB::OpenTelemetry::TracingContextOnThread & thread_trace_context_, bool capture_frame_pointers = false)
: job(job_), priority(priority_), thread_trace_context(thread_trace_context_), enable_job_stack_trace(capture_frame_pointers)
{
if (!capture_frame_pointers)
return;
/// Save all previous jobs' call stacks and append the current one
frame_pointers = DB::Exception::thread_frame_pointers;
frame_pointers.push_back(StackTrace().getFramePointers());
}

bool operator<(const JobWithPriority & rhs) const
{
return priority > rhs.priority; // Reversed for `priority_queue` max-heap to yield minimum value (i.e. highest priority) first
}
};

boost::heap::priority_queue<JobWithPriority,boost::heap::stable<true>> jobs;
std::list<Thread> threads;
std::exception_ptr first_exception;
std::stack<OnDestroyCallback> on_destroy_callbacks;
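JobWithPriority keeps the reversed operator< so that the max-heap priority_queue pops the job with the numerically smallest Priority value (i.e. the highest priority) first. A small illustration of that trick, using std::priority_queue as a stand-in for boost::heap::priority_queue (which the pool uses with stable<true> to keep FIFO order among equal priorities); the Item struct and values are hypothetical.

#include <iostream>
#include <queue>

struct Item
{
    int priority;       /// lower value = higher priority, like Priority in ThreadPool
    const char * name;

    bool operator<(const Item & rhs) const
    {
        /// Reversed: the max-heap's "largest" element is the one with the smallest
        /// priority value, so top() always returns the highest-priority item.
        return priority > rhs.priority;
    }
};

int main()
{
    std::priority_queue<Item> q;
    q.push({10, "low"});
    q.push({0, "high"});
    q.push({5, "normal"});

    while (!q.empty())
    {
        std::cout << q.top().name << " (priority " << q.top().priority << ")\n";
        q.pop();  /// prints: high, normal, low
    }
    return 0;
}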
