Commit c0032f2: wip2
filimonov committed Feb 10, 2024 (1 parent: 22c7fa4)
Showing 2 changed files with 134 additions and 112 deletions.
226 changes: 122 additions & 104 deletions src/Common/ThreadPool.cpp
@@ -8,7 +8,6 @@

#include <cassert>
#include <type_traits>
#include <future>

#include <Poco/Util/Application.h>
#include <Poco/Util/LayeredConfiguration.h>
@@ -96,7 +95,7 @@ class JobWithPriority

static constexpr auto DEFAULT_THREAD_NAME = "ThreadPool";
static constexpr const size_t GLOBAL_THREAD_POOL_MIN_FREE_THREADS = 32;
static constexpr const size_t LOCAL_THREAD_POOL_MIN_FREE_THREADS = 0;
static constexpr const size_t LOCAL_THREAD_POOL_MIN_FREE_THREADS = 1;

static constexpr const size_t GLOBAL_THREAD_POOL_HOUSEKEEP_INTERVAL_MILLISECONDS = 10000; // 10 seconds
/// static constexpr const size_t GLOBAL_THREAD_POOL_HOUSEKEEP_HISTORY_WINDOW_SECONDS = 600; // 10 minutes
@@ -138,7 +137,6 @@ ThreadPoolImpl<Thread>::ThreadPoolImpl(
, queue_size(queue_size_ ? std::max(queue_size_, max_threads) : 0 /* zero means the queue is unlimited */)
, shutdown_on_exception(shutdown_on_exception_)
{
std::unique_lock lock(mutex);
// LOG_ERROR(&Poco::Logger::get("ThreadPoolImpl"),
// "ThreadPoolImpl constructor [Instance Address: {}]: max_threads = {}, max_free_threads = {}, queue_size = {}, StackTrace: {}",
// static_cast<void*>(this), max_threads, max_free_threads, queue_size, StackTrace().toString());
@@ -148,14 +146,15 @@ ThreadPoolImpl<Thread>::ThreadPoolImpl(

if constexpr (std::is_same_v<Thread, std::thread>) // global thread pool
{
// for the global thread pool we need to start a housekeeping thread
// that will run for the whole lifetime of the global thread pool
housekeeping_thread.emplace(&ThreadPoolImpl<Thread>::housekeep, this);

// {
// std::lock_guard lock(mutex);
// // for global thread pool we need to start housekeeping thread
// // it will run during the whole lifetime of the global thread pool
// // housekeeping_thread.emplace(&ThreadPoolImpl<Thread>::housekeep, this);
// }
// start GLOBAL_THREAD_POOL_MIN_FREE_THREADS threads immediately
adjustThreadPoolSize();
}

}
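
The body of housekeep() is outside this diff, so the following is only a sketch of the usual shape of such a loop, assuming it sleeps on housekeeping_thread_cv with a timeout of GLOBAL_THREAD_POOL_HOUSEKEEP_INTERVAL_MILLISECONDS and re-evaluates the pool size on each wakeup; all names below are illustrative stand-ins, not the commit's members:

#include <chrono>
#include <condition_variable>
#include <mutex>

std::mutex housekeep_mutex;
std::condition_variable housekeep_cv;
bool housekeep_stop = false;

void housekeepLoop()
{
    std::unique_lock lock(housekeep_mutex);
    while (!housekeep_stop)
    {
        /// Wakes on the 10 s timer, or earlier when shutdown calls notify_all().
        housekeep_cv.wait_for(lock, std::chrono::milliseconds(10000));
        if (housekeep_stop)
            break;
        /// ... recompute the desired pool size and grow/shrink the pool here ...
    }
}

Waiting with a timeout (rather than a plain sleep) is what lets shutdown interrupt the 10-second nap immediately.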

template <typename Thread>
@@ -172,7 +171,8 @@ void ThreadPoolImpl<Thread>::setMaxThreads(size_t value)
jobs.reserve(queue_size ? queue_size : desired_pool_size.load());
}

adjustThreadPoolSize();
if (current_pool_size > 0)
adjustThreadPoolSize();
}

template <typename Thread>
@@ -190,7 +190,9 @@ void ThreadPoolImpl<Thread>::setMaxFreeThreads(size_t value)
max_free_threads = std::min(value, max_threads);
calculateDesiredThreadPoolSizeNoLock();
}
adjustThreadPoolSize();

if (current_pool_size > 0)
adjustThreadPoolSize();
}

template <typename Thread>
@@ -220,23 +222,58 @@ void ThreadPoolImpl<Thread>::calculateDesiredThreadPoolSizeNoLock()
{
desired_pool_size = std::min(max_threads, scheduled_jobs + (std::is_same_v<Thread, std::thread> ? GLOBAL_THREAD_POOL_MIN_FREE_THREADS : LOCAL_THREAD_POOL_MIN_FREE_THREADS));
}
// else if (current_pool_size > std::min(max_threads, scheduled_jobs + max_free_threads))
// {
// /// desired_pool_size = std::min(max_threads, scheduled_jobs + max_free_threads);
else if (current_pool_size > std::min(max_threads, scheduled_jobs + max_free_threads))
{
desired_pool_size = std::min(max_threads, scheduled_jobs + max_free_threads);

// // ExponentiallySmoothedCounter.h / ExponentiallySmoothedAverage
// //
// /// TODO
// /// our desired_pool_size should be at least as big as minimum utilization over last 10 minutes
// /// and at least as small as maximum utilization over last 10 minutes
// // // ExponentiallySmoothedCounter.h / ExponentiallySmoothedAverage
// // //
// // /// TODO
// // /// our desired_pool_size should be at least as big as minimum utilization over last 10 minutes
// // /// and at least as small as maximum utilization over last 10 minutes

// // we are in allowed range, let's try to guess the optimal number of threads based on the current & history of utilization
// // we want to have some free threads in the pool, but not too many
// // // we are in allowed range, let's try to guess the optimal number of threads based on the current & history of utilization
// // // we want to have some free threads in the pool, but not too many

// }
}
// TODO: shrink the pool if there are too many free threads
}
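
The condition of the first branch sits above this hunk, so this is a hedged restatement of the sizing rule as it reads here: grow toward scheduled_jobs plus a minimum reserve of spare threads (GLOBAL_THREAD_POOL_MIN_FREE_THREADS = 32 for the global pool, LOCAL_THREAD_POOL_MIN_FREE_THREADS = 1 for local pools after this commit), shrink when the pool exceeds scheduled_jobs + max_free_threads, otherwise leave it alone, with everything capped by max_threads. A standalone sketch:

#include <algorithm>
#include <cstddef>

size_t desiredSize(size_t current, size_t max_threads, size_t scheduled_jobs,
                   size_t min_free, size_t max_free)
{
    size_t lower = std::min(max_threads, scheduled_jobs + min_free);
    size_t upper = std::min(max_threads, scheduled_jobs + max_free);
    if (current < lower)
        return lower;   /// grow: keep at least min_free spare threads
    if (current > upper)
        return upper;   /// shrink: keep at most max_free spare threads
    return current;     /// inside the allowed band: no change
}

For example (hypothetical numbers), a global pool with 100 scheduled jobs and 50 threads grows to 132 (100 jobs + 32 spare); the same pool with 500 threads and max_free_threads = 200 shrinks to 300.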

template <typename Thread>
typename ThreadPoolImpl<Thread>::PreparedThread ThreadPoolImpl<Thread>::prepareThread()
{
std::promise<typename std::list<Thread>::iterator> promise_thread_it;
std::shared_future<typename std::list<Thread>::iterator> future_thread_id = promise_thread_it.get_future().share();

Stopwatch watch;
std::unique_ptr<Thread> thread_ptr = std::make_unique<Thread>([this, ft = std::move(future_thread_id)] mutable
{
auto thread_it = ft.get();
worker(thread_it);
});

ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolThreadCreationMicroseconds : ProfileEvents::LocalThreadPoolThreadCreationMicroseconds,
watch.elapsedMicroseconds());

return {
std::move(promise_thread_it),
std::move(thread_ptr)
};
}

template <typename Thread>
void ThreadPoolImpl<Thread>::activateThread(typename ThreadPoolImpl<Thread>::PreparedThread & prepared_thread)
{
threads.push_front(std::move(*prepared_thread.thread_ptr));
prepared_thread.thread_ptr.reset();

prepared_thread.promise_thread_it.set_value(threads.begin());
current_pool_size = threads.size();
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolExpansions : ProfileEvents::LocalThreadPoolExpansions);
}
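
prepareThread() and activateThread() split construction from registration because of a chicken-and-egg problem: the worker needs the iterator to its own node in `threads`, but that iterator only exists after the Thread object has been moved into the list. The promise/shared_future pair lets the thread start first and learn its identity later. A minimal self-contained illustration of the same handshake with a plain std::thread:

#include <future>
#include <iostream>
#include <list>
#include <thread>

int main()
{
    std::list<std::thread> threads;

    std::promise<std::list<std::thread>::iterator> promise_it;
    std::shared_future<std::list<std::thread>::iterator> future_it
        = promise_it.get_future().share();

    std::thread t([future_it]
    {
        auto self_it = future_it.get();  /// blocks until the creator publishes the iterator
        (void)self_it;                   /// a real worker would later detach and erase *self_it
        std::cout << "worker received its own list node\n";
    });

    threads.push_front(std::move(t));       /// list nodes are stable, so the iterator stays valid
    promise_it.set_value(threads.begin());  /// unblock the worker with its own position

    threads.front().join();
}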


template <typename Thread>
template <typename ReturnType>
@@ -261,6 +298,25 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:
return false;
};

std::optional<PreparedThread> prepared_thread;

if (current_pool_size < desired_pool_size)
{
try
{
prepared_thread = prepareThread();
}
catch (const std::exception & e)
{
std::lock_guard lock(mutex);
return on_error(fmt::format("cannot start threads: {}", e.what()));
}
catch (...)
{
std::lock_guard lock(mutex);
return on_error("cannot start thread");
}
}
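
The effect of the block above is a two-phase start: the expensive step of growing the pool (constructing the OS thread) now happens before the pool mutex is taken, and only the cheap list insertion in activateThread() runs inside the critical section further down. A sketch of the pattern in isolation, with illustrative names:

#include <mutex>
#include <optional>
#include <thread>

std::mutex pool_mutex;

struct Prepared { std::thread t; };  /// illustrative stand-in for PreparedThread

Prepared prepareExpensive()
{
    /// Thread creation can take a while (clone(), stack setup), so do it unlocked.
    return Prepared{std::thread([] { /* worker loop */ })};
}

void schedule()
{
    std::optional<Prepared> prepared = prepareExpensive();  /// outside the lock

    {
        std::lock_guard lock(pool_mutex);  /// short critical section
        /// ... link prepared->t into shared structures, enqueue the job ...
    }

    prepared->t.join();  /// only so this sketch terminates cleanly
}

int main() { schedule(); }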

{
Stopwatch watch;
@@ -269,6 +325,12 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolLockWaitMicroseconds : ProfileEvents::LocalThreadPoolLockWaitMicroseconds,
watch.elapsedMicroseconds());

if (prepared_thread)
{
activateThread(*prepared_thread);
prepared_thread.reset();
}

auto pred = [this] { return !queue_size || scheduled_jobs < queue_size || shutdown; };

if (wait_microseconds) /// Check for optional. Condition is true if the optional is set. Even if the value is zero.
@@ -299,68 +361,9 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:
/// Wake up a free thread to run the new job.
new_job_or_shutdown.notify_one(); // maybe while we are starting the new thread one of the existing ones will become free and take the job?

if (current_pool_size < desired_pool_size)
{
try
{
startThreads();
}
catch (const DB::Exception & e)
{
return on_error(fmt::format("cannot start threads: {}", e.what()));
}
catch (...)
{
return on_error("cannot start threads");
}
}

return static_cast<ReturnType>(true);
}

template <typename Thread>
void ThreadPoolImpl<Thread>::startThreads()
{
std::lock_guard lock(threads_mutex);

while (threads.size() < desired_pool_size)
{
try
{
std::promise<typename std::list<Thread>::iterator> promise_thread_it;
std::shared_future<typename std::list<Thread>::iterator> future_thread_id = promise_thread_it.get_future().share();

Stopwatch watch;

std::unique_ptr<Thread> thread_ptr = std::make_unique<Thread>([this, ft = std::move(future_thread_id)] mutable
{
auto thread_it = ft.get();
worker(thread_it);
});

ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolThreadCreationMicroseconds : ProfileEvents::LocalThreadPoolThreadCreationMicroseconds,
watch.elapsedMicroseconds());

ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolExpansions : ProfileEvents::LocalThreadPoolExpansions);

threads.push_front(std::move(*thread_ptr));
promise_thread_it.set_value( threads.begin() );

current_pool_size = threads.size();
new_job_or_shutdown.notify_one(); // let's notify the new thread just in case
}
catch (const std::exception & e)
{
throw DB::Exception(DB::ErrorCodes::CANNOT_SCHEDULE_TASK, "cannot start thread: {})", e.what());
}
catch (...)
{
throw DB::Exception(DB::ErrorCodes::CANNOT_SCHEDULE_TASK, "cannot start thread");
}
}
}

template <typename Thread>
void ThreadPoolImpl<Thread>::adjustThreadPoolSize()
@@ -369,7 +372,24 @@ void ThreadPoolImpl<Thread>::adjustThreadPoolSize()
auto desired = desired_pool_size.load();
if (pool_size < desired)
{
startThreads();
std::lock_guard lock(mutex);

while (threads.size() < desired_pool_size)
{
try
{
auto prepared_thread = prepareThread();
activateThread(prepared_thread);
}
catch (const std::exception & e)
{
throw DB::Exception(DB::ErrorCodes::CANNOT_SCHEDULE_TASK, "cannot start thread: {})", e.what());
}
catch (...)
{
throw DB::Exception(DB::ErrorCodes::CANNOT_SCHEDULE_TASK, "cannot start thread");
}
}
}
else if (pool_size > desired)
{
@@ -452,17 +472,25 @@ void ThreadPoolImpl<Thread>::finalize()
/// Wake up threads so they can finish themselves.
new_job_or_shutdown.notify_all();

/// Wait for all currently running jobs to finish (we don't wait for all scheduled jobs here like the function wait() does).
for (auto & thread : threads)
{
thread.join();
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks);
/// Wait for all currently running jobs to finish (we don't wait for all scheduled jobs here like the function wait() does).
for (auto & thread : threads)
{
thread.join();
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks);
}
threads.clear();
}
threads.clear();

housekeeping_thread_cv.notify_all();
housekeeping_thread.reset();

if (housekeeping_thread)
{
std::lock_guard lock(mutex);
housekeeping_thread->join();
housekeeping_thread.reset();
}
}
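
One caveat, hedged because housekeep() itself is outside this diff: the join above runs while `mutex` is held, and if the housekeeping loop sleeps on a condition variable guarded by that same mutex, the join would deadlock. The conventional ordering is flag, notify, then join with the lock released, as in this sketch with illustrative names:

#include <condition_variable>
#include <mutex>
#include <thread>

std::mutex m;
std::condition_variable cv;
bool stop = false;

void shutdownWorker(std::thread & worker)
{
    {
        std::lock_guard lock(m);  /// publish the flag under the lock
        stop = true;
    }
    cv.notify_all();              /// wake the sleeper
    worker.join();                /// join with the mutex released, so the worker can finish
}

int main()
{
    std::thread worker([]
    {
        std::unique_lock lock(m);
        cv.wait(lock, [] { return stop; });
    });
    shutdownWorker(worker);
}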

template <typename Thread>
@@ -569,7 +597,6 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
CurrentMetrics::Increment metric_pool_threads(metric_threads);

bool job_is_done = false;
bool should_remove_myself = false;

std::exception_ptr exception_from_job;

@@ -627,12 +654,14 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_

if (threads_remove_themselves)
{
should_remove_myself = true; // we will do it out the the lock
thread_it->detach();
threads.erase(thread_it);
current_pool_size = threads.size();
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks);
}
else
return;
return;
}
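
With this change the worker removes itself from `threads` directly, and the detach() before erase() is load-bearing: destroying a std::thread that is still joinable calls std::terminate(), and a thread cannot join() itself. A self-contained illustration (illustrative names, crude synchronization for brevity):

#include <chrono>
#include <list>
#include <mutex>
#include <thread>

std::mutex list_mutex;
std::list<std::thread> worker_threads;

void selfRemovingWorker(std::list<std::thread>::iterator self_it)
{
    std::lock_guard lock(list_mutex);
    self_it->detach();              /// our std::thread object is no longer joinable
    worker_threads.erase(self_it);  /// so destroying it during erase() is a safe no-op
}

int main()
{
    {
        std::lock_guard lock(list_mutex);
        worker_threads.emplace_front();              /// placeholder node
        auto it = worker_threads.begin();
        *it = std::thread(selfRemovingWorker, it);   /// hand the thread its own node
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(100));  /// let the detached thread finish
}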

/// boost::priority_queue does not provide interface for getting non-const reference to an element
/// to prevent us from modifying its priority. We have to use const_cast to force move semantics on JobWithPriority.
job_data = std::move(const_cast<JobWithPriority &>(jobs.top()));
Expand All @@ -650,17 +679,6 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
}
}

if (should_remove_myself)
{
std::lock_guard lock(threads_mutex);
thread_it->detach();
threads.erase(thread_it);
current_pool_size = threads.size();
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks);
return;
}

// if (have_more_jobs)
// {
// new_job_or_shutdown.notify_one(); // just in case some other thread missed the cond var signal
20 changes: 12 additions & 8 deletions src/Common/ThreadPool.h
@@ -9,6 +9,7 @@
#include <list>
#include <optional>
#include <atomic>
#include <future>
#include <stack>

#include <boost/heap/priority_queue.hpp>
@@ -109,9 +110,14 @@ class ThreadPoolImpl
using OnDestroyCallback = std::function<void()>;
void addOnDestroyCallback(OnDestroyCallback && callback);


private:
friend class GlobalThreadPool;

struct PreparedThread
{
    std::promise<typename std::list<Thread>::iterator> promise_thread_it;
    std::unique_ptr<Thread> thread_ptr;
};

mutable std::mutex mutex;
std::condition_variable job_finished;
@@ -135,9 +141,11 @@ class ThreadPoolImpl
/// boost::heap::stable<true> is used to preserve the FIFO order of jobs with same priority
boost::heap::priority_queue<JobWithPriority, boost::heap::stable<true>> jobs;

mutable std::mutex threads_mutex;
std::list<Thread> threads; // modified when threads_mutex is locked
std::atomic<size_t> current_pool_size = 0; // modified when threads_mutex is locked
std::list<Thread> threads;
std::atomic<size_t> current_pool_size = 0;

PreparedThread prepareThread();
void activateThread(PreparedThread & prepared_thread);

std::exception_ptr first_exception;
std::stack<OnDestroyCallback> on_destroy_callbacks;
@@ -150,11 +158,7 @@

void worker(typename std::list<Thread>::iterator thread_it);


/// if number of threads is less than desired, creates new threads
void startThreads();

/// will incrase number of threads if needed or decrease if there are too many
/// will increase number of threads if needed or decrease if there are too many
void adjustThreadPoolSize();

void finalize();
