Skip to content

Commit

Permalink
add stacktrace and some debug info into contructors / destructors
Browse files Browse the repository at this point in the history
  • Loading branch information
filimonov committed Jan 23, 2024
1 parent 7cbfee6 commit e30b1ab
Showing 1 changed file with 17 additions and 6 deletions.
23 changes: 17 additions & 6 deletions src/Common/ThreadPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
#include <Poco/Util/Application.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <base/demangle.h>
#include <Common/StackTrace.h>
#include <Common/logger_useful.h>

namespace DB
{
Expand Down Expand Up @@ -128,9 +130,12 @@ ThreadPoolImpl<Thread>::ThreadPoolImpl(
, queue_size(queue_size_ ? std::max(queue_size_, max_threads) : 0 /* zero means the queue is unlimited */)
, shutdown_on_exception(shutdown_on_exception_)
{
// std::lock_guard lock(mutex);
// jobs.reserve(queue_size);
// while (threads.size() < std::min(max_threads, scheduled_jobs + (std::is_same_v<Thread, std::thread> ? min_free_threads : 1) ) )
std::lock_guard lock(mutex);
jobs.reserve(max_free_threads);
LOG_ERROR(&Poco::Logger::get("ThreadPoolImpl"),
"ThreadPoolImpl constructor [Instance Address: {}]: max_threads = {}, max_free_threads = {}, queue_size = {}, StackTrace: {}",
static_cast<void*>(this), max_threads, max_free_threads, queue_size, StackTrace().toString());
// while (threads.size() < max_free_threads)
// {
// try
// {
Expand Down Expand Up @@ -285,7 +290,9 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:
Stopwatch watch2;
auto pred = [this] { return !queue_size || scheduled_jobs < queue_size || shutdown; };

if (wait_microseconds) /// Check for optional. Condition is true if the optional is set and the value is zero.
/// if (wait_microseconds.has_value() && wait_microseconds.value() > 0)

if (wait_microseconds) /// Check for optional. Condition is also true if the optional has value=0.
{
if (!job_finished.wait_for(lock, std::chrono::microseconds(*wait_microseconds), pred))
return on_error(fmt::format("no free thread (timeout={})", *wait_microseconds));
Expand All @@ -306,7 +313,7 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:
/// Check if there are enough threads to process job.
if (threads.size() < std::min(max_threads, scheduled_jobs + 1))
{
// while (threads.size() < std::min(max_threads, scheduled_jobs + (std::is_same_v<Thread, std::thread> ? min_free_threads : 1) ) )
// while (threads.size() < std::min(max_threads, scheduled_jobs + 1) )
// {
try
{
Expand All @@ -320,7 +327,7 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:
{
on_error("can not start new thread");
}
// }
// }
}

Stopwatch watch3;
Expand Down Expand Up @@ -393,6 +400,10 @@ ThreadPoolImpl<Thread>::~ThreadPoolImpl()
/// Note: should not use logger from here,
/// because it can be an instance of GlobalThreadPool that is a global variable
/// and the destruction order of global variables is unspecified.
LOG_ERROR(&Poco::Logger::get("ThreadPoolImpl"),
"ThreadPoolImpl destructor [Instance Address: {}]: threads.size() = {}",
static_cast<void*>(this), threads.size());


finalize();
onDestroy();
Expand Down

0 comments on commit e30b1ab

Please sign in to comment.