use promise / future
filimonov committed Feb 6, 2024
1 parent 4b9f9c5 commit 0275ba1
Showing 2 changed files with 58 additions and 54 deletions.
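
The change replaces the shared ThreadIterarorHolder with a std::promise / std::future pair: the spawning thread publishes the new worker's position in the `threads` list through the promise, and the worker blocks on the matching future until that position exists. A minimal standalone sketch of the pattern (illustrative names only, not the ClickHouse sources):

    #include <future>
    #include <iostream>
    #include <list>
    #include <thread>

    int main()
    {
        std::list<std::thread> threads;

        std::promise<std::list<std::thread>::iterator> promise_it;
        std::future<std::list<std::thread>::iterator> future_it = promise_it.get_future();

        // The worker cannot know its own position in `threads` at start-up,
        // so it waits until the spawner publishes the iterator.
        std::thread t([fut = std::move(future_it)]() mutable
        {
            std::list<std::thread>::iterator self = fut.get(); // blocks until set_value()
            (void)self; // a real pool would use it to erase its own entry on exit
            std::cout << "worker learned its list position\n";
        });

        threads.push_front(std::move(t));
        promise_it.set_value(threads.begin()); // publish only after insertion

        threads.front().join();
    }

Compared with a shared holder guarded by an optional, the future makes the handoff explicit: get() cannot return before set_value() has run, so the worker can never observe a missing or half-initialized iterator.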
99 changes: 56 additions & 43 deletions src/Common/ThreadPool.cpp
@@ -12,6 +12,7 @@
#include <Poco/Util/Application.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <base/demangle.h>
+#include <Common/StackTrace.h>
#include <Common/logger_useful.h>

namespace DB
@@ -82,10 +83,10 @@ class JobWithPriority

static constexpr auto DEFAULT_THREAD_NAME = "ThreadPool";
static constexpr const size_t GLOBAL_THREAD_POOL_EXPANSION_THREADS = 1;
-static constexpr const size_t GLOBAL_THREAD_POOL_MIN_FREE_THREADS = 12;
+static constexpr const size_t GLOBAL_THREAD_POOL_MIN_FREE_THREADS = 1;
static constexpr const size_t LOCAL_THREAD_POOL_MIN_FREE_THREADS = 1;

-static constexpr const size_t GLOBAL_THREAD_POOL_EXPANSION_STEP = 8;
+static constexpr const size_t GLOBAL_THREAD_POOL_EXPANSION_STEP = 1;
static constexpr const size_t GLOBAL_THREAD_POOL_HOUSEKEEP_INTERVAL_MILLISECONDS = 10000; // 10 seconds
/// static constexpr const size_t GLOBAL_THREAD_POOL_HOUSEKEEP_HISTORY_WINDOW_SECONDS = 600; // 10 minutes

@@ -127,6 +128,10 @@ ThreadPoolImpl<Thread>::ThreadPoolImpl(
, shutdown_on_exception(shutdown_on_exception_)
{
std::unique_lock lock(mutex);
+// LOG_ERROR(&Poco::Logger::get("ThreadPoolImpl"),
+//     "ThreadPoolImpl constructor [Instance Address: {}]: max_threads = {}, max_free_threads = {}, queue_size = {}, StackTrace: {}",
+//     static_cast<void*>(this), max_threads, max_free_threads, queue_size, StackTrace().toString());
+
calculateDesiredThreadPoolSizeNoLock();
jobs.reserve(queue_size ? queue_size : desired_pool_size);

@@ -209,8 +214,8 @@ void ThreadPoolImpl<Thread>::calculateDesiredThreadPoolSizeNoLock()
{
desired_pool_size = std::min(max_threads, scheduled_jobs + max_free_threads);

-// ExponentiallySmoothedCounter.h / ExponentiallySmoothedAverage
-//
+// ExponentiallySmoothedCounter.h / ExponentiallySmoothedAverage
+//
/// TODO
/// our desired_pool_size should be at least as big as minimum utilization over last 10 minutes
/// and at least as small as maximum utilization over last 10 minutes
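
The TODO above wants the pool size bounded by utilization extremes over a trailing window. As a rough illustration of the exponential-smoothing idea the comment points at (assumed semantics, not the actual ExponentiallySmoothedAverage helper from ExponentiallySmoothedCounter.h):

    #include <cmath>

    // Each new sample decays older history, approximating a recent-window
    // average without storing the window itself.
    struct SmoothedUtilization
    {
        double value = 0.0;
        double half_decay_seconds;

        explicit SmoothedUtilization(double half_decay_seconds_)
            : half_decay_seconds(half_decay_seconds_) {}

        void add(double sample, double elapsed_seconds)
        {
            // The weight of the accumulated value halves every half_decay_seconds.
            double decay = std::exp2(-elapsed_seconds / half_decay_seconds);
            value = value * decay + sample * (1.0 - decay);
        }
    };

Tracking two such estimates with different decay rates (or a smoothed minimum and maximum) would give the bounds the TODO describes.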
@@ -271,7 +276,7 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:
/// Check if there are enough threads to process job.
try
{
-startThreads(std::is_same_v<Thread, std::thread>, lock); // async for global thread pool
+startThreads(false, lock); // async for global thread pool
}
catch (const DB::Exception & e)
{
@@ -303,18 +308,6 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:
return static_cast<ReturnType>(true);
}

-template <typename Thread>
-void ThreadPoolImpl<Thread>::removeThread(std::shared_ptr<ThreadIterarorHolder> thread_it_holder)
-{
-if (thread_it_holder->thread_it.has_value()) {
-auto thread_it = thread_it_holder->thread_it.value();
-thread_it->detach();
-threads.erase(thread_it);
-ProfileEvents::increment(
-std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks);
-}
-}

template <typename Thread>
void ThreadPoolImpl<Thread>::startThreads(bool async, std::unique_lock<std::mutex> & lock)
{
@@ -329,35 +322,44 @@ void ThreadPoolImpl<Thread>::startThreads(bool async, std::unique_lock<std::mute

if (async)
{
-if (threads.size() >= desired_pool_size)
+if (shutdown || threads.size() >= desired_pool_size)
return;
pool_grow_thread_cv.notify_all();
}
else // sync
{
-while (threads.size() < desired_pool_size)
+while (threads.size() < desired_pool_size && !shutdown)
{
try
{
-lock.unlock();
-auto thread_it_holder = std::make_shared<ThreadIterarorHolder>();
+std::promise<typename std::list<Thread>::iterator> promise_thread_it;
+std::future<typename std::list<Thread>::iterator> future_thread_it = promise_thread_it.get_future();
+
+lock.unlock();

Stopwatch watch;
-auto thread = Thread(&ThreadPoolImpl<Thread>::worker, this, thread_it_holder);
+auto thread = Thread(&ThreadPoolImpl<Thread>::worker, this, std::move(future_thread_it));

ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolThreadCreationMicroseconds : ProfileEvents::LocalThreadPoolThreadCreationMicroseconds,
watch.elapsedMicroseconds());

-lock.lock();

ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolExpansions : ProfileEvents::LocalThreadPoolExpansions);

+lock.lock();
+// if (shutdown)
+// {
+// thread.detach();
+// return;
+// }
threads.push_front(std::move(thread));
-thread_it_holder->set( std::move(threads.begin()) );
+promise_thread_it.set_value( threads.begin() );
}
catch (const std::exception & e)
{
-lock.lock();
+//lock.lock();
throw DB::Exception(DB::ErrorCodes::CANNOT_SCHEDULE_TASK, "cannot start thread: {})", e.what());
}
catch (...)
@@ -404,7 +406,7 @@ void ThreadPoolImpl<Thread>::scheduleOrThrow(Job job, Priority priority, uint64_
template <typename Thread>
void ThreadPoolImpl<Thread>::wait()
{

+//LOG_TRACE(&Poco::Logger::get("ThreadPoolImpl"), "Waiting for ThreadPool to finish");
Stopwatch watch;
std::unique_lock lock(mutex);
ProfileEvents::increment(
@@ -416,6 +418,7 @@ void ThreadPoolImpl<Thread>::wait()
new_job_or_shutdown.notify_all();

job_finished.wait(lock, [this] { return scheduled_jobs == 0; });
+//LOG_TRACE(&Poco::Logger::get("ThreadPoolImpl"), "ThreadPool has finished");

if (first_exception)
{
@@ -440,6 +443,11 @@ template <typename Thread>
void ThreadPoolImpl<Thread>::finalize()
{
{
+// LOG_ERROR(&Poco::Logger::get("ThreadPoolImpl"),
+//     "ThreadPoolImpl destructor [Instance Address: {}]: threads.size() = {}, StackTrace: {}",
+//     static_cast<void*>(this), threads.size(), StackTrace().toString());
+
+
std::lock_guard lock(mutex);
shutdown = true;
/// We don't want threads to remove themselves from `threads` anymore, otherwise `thread.join()` will go wrong below in this function.
@@ -448,27 +456,26 @@ void ThreadPoolImpl<Thread>::finalize()
}

/// Wake up threads so they can finish themselves.
new_job_or_shutdown.notify_all();
+housekeeping_thread_cv.notify_all();
+pool_grow_thread_cv.notify_all();

-/// Wait for all currently running jobs to finish (we don't wait for all scheduled jobs here like the function wait() does).
-for (auto & thread : threads)
-{
-    thread.join();
-    ProfileEvents::increment(
-        std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks);
-}
-threads.clear();
-
-for (auto & thread : service_threads)
-{
-    thread.join();
-}
-service_threads.clear();
+{
+    std::lock_guard lock(mutex);
+    /// Wait for all currently running jobs to finish (we don't wait for all scheduled jobs here like the function wait() does).
+    for (auto & thread : threads)
+    {
+        thread.join();
+        ProfileEvents::increment(
+            std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks);
+    }
+    threads.clear();
+
+    for (auto & thread : service_threads)
+    {
+        thread.join();
+    }
+    service_threads.clear();
+}
}

template <typename Thread>
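
The comment near the top of finalize() ("We don't want threads to remove themselves from `threads` anymore...") encodes an ownership handoff: while the pool is alive, a worker erases its own list entry on exit; once finalize() starts joining, self-erasure must stop or it would race with the join loop. A minimal sketch of that invariant (illustrative, not the ClickHouse sources):

    #include <list>
    #include <mutex>
    #include <thread>

    // Either a worker detaches and erases its own entry, or the owner joins
    // and erases it -- never both.
    struct MiniPool
    {
        std::mutex mutex;
        std::list<std::thread> threads;
        bool threads_remove_themselves = true;

        void workerExit(std::list<std::thread>::iterator self)
        {
            std::lock_guard lock(mutex);
            if (threads_remove_themselves)
            {
                self->detach();      // the owner must not join a detached thread
                threads.erase(self); // safe: the owner is not iterating now
            }
            // else: leave the entry; finalize() will join and erase it
        }

        void finalize()
        {
            {
                std::lock_guard lock(mutex);
                threads_remove_themselves = false; // freeze the list layout
            }
            for (auto & thread : threads) // safe: no concurrent erase anymore
                thread.join();
            threads.clear();
        }
    };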
@@ -601,8 +608,9 @@ bool ThreadPoolImpl<Thread>::finished() const
}

template <typename Thread>
-void ThreadPoolImpl<Thread>::worker(std::shared_ptr<ThreadIterarorHolder> thread_it_holder)
+void ThreadPoolImpl<Thread>::worker(std::future<typename std::list<Thread>::iterator> future_thread_it)
{
+auto thread_it = future_thread_it.get();
DENY_ALLOCATIONS_IN_SCOPE;
CurrentMetrics::Increment metric_pool_threads(metric_threads);

@@ -661,7 +669,12 @@ void ThreadPoolImpl<Thread>::worker(std::shared_ptr<ThreadIterarorHolder> thread
// - or shutdown happened AND all jobs are already handled.

if (threads_remove_themselves)
-removeThread(thread_it_holder);
+{
+thread_it->detach();
+threads.erase(thread_it);
+ProfileEvents::increment(
+std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks);
+}

return;
}
13 changes: 2 additions & 11 deletions src/Common/ThreadPool.h
@@ -5,6 +5,7 @@
#include <mutex>
#include <condition_variable>
#include <functional>
+#include <future>
#include <queue>
#include <list>
#include <optional>
@@ -129,14 +130,6 @@ class ThreadPoolImpl
bool threads_remove_themselves = true;
const bool shutdown_on_exception = true;

-struct ThreadIterarorHolder {
-std::optional<typename std::list<Thread>::iterator> thread_it;
-
-void set(std::list<Thread>::iterator it) {
-thread_it = it;
-}
-};

boost::heap::priority_queue<JobWithPriority> jobs;
std::list<Thread> threads;
std::list<Thread> service_threads; // threads that are not used for running jobs, but for housekeeping tasks (only for global thread pool)
@@ -158,9 +151,7 @@ class ThreadPoolImpl

void calculateDesiredThreadPoolSizeNoLock();

-void worker(std::shared_ptr<ThreadIterarorHolder> thread_it_holder);
-
-void removeThread(std::shared_ptr<ThreadIterarorHolder> thread_it);
+void worker(std::future<typename std::list<Thread>::iterator> future_thread_it);

/// if number of threads is less than desired, creates new threads
/// for async mode it creates a task that creates new threads
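
The comment above ("for async mode it creates a task that creates new threads") describes the async expansion path: the scheduling code only signals pool_grow_thread_cv, and a dedicated service thread performs the comparatively slow thread creation. A rough sketch of that division of labor (illustrative names, worker loop and teardown omitted):

    #include <condition_variable>
    #include <cstddef>
    #include <list>
    #include <mutex>
    #include <thread>

    struct MiniGrowablePool
    {
        std::mutex mutex;
        std::condition_variable pool_grow_thread_cv;
        std::list<std::thread> threads;
        std::size_t desired_pool_size = 0;
        bool shutdown = false;

        // Runs on a service thread; owns the actual thread creation.
        void poolGrowThread()
        {
            std::unique_lock lock(mutex);
            while (!shutdown)
            {
                pool_grow_thread_cv.wait(lock,
                    [this] { return shutdown || threads.size() < desired_pool_size; });
                while (!shutdown && threads.size() < desired_pool_size)
                {
                    lock.unlock(); // do not hold the lock across thread creation
                    std::thread t([] { /* worker loop omitted */ });
                    lock.lock();
                    threads.push_front(std::move(t));
                }
            }
        }

        // Called from the scheduling path: cheap, just a notification.
        void requestGrowth(std::size_t desired)
        {
            {
                std::lock_guard lock(mutex);
                desired_pool_size = desired;
            }
            pool_grow_thread_cv.notify_all();
        }
    };

Keeping creation off the scheduling path matters because scheduling runs under the pool mutex on a hot path, while spawning an OS thread can take comparatively long (the commit also measures this via GlobalThreadPoolThreadCreationMicroseconds).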
