Skip to content

Commit

Permalink
New/improved support for initialization and finalization routines in …
Browse files Browse the repository at this point in the history
…ThreadPool (#19)

* clang-tidy fixes

* Improvements to ThreadPool constructors

* Support for finalization functions in ThreadPool

* Bumped version

* PTL_INSTALL_HEADERS + PTL_INSTALL_CONFIG
  • Loading branch information
jrmadsen authored Nov 30, 2021
1 parent 50bc10a commit 6da7f0f
Show file tree
Hide file tree
Showing 11 changed files with 139 additions and 72 deletions.
2 changes: 2 additions & 0 deletions .clang-tidy
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ modernize-*,\
-modernize-use-using,\
-modernize-use-auto,\
-modernize-concat-nested-namespaces,\
-modernize-use-nodiscard,\
performance-*,\
readability-*,\
-readability-function-size,\
Expand All @@ -38,6 +39,7 @@ readability-*,\
-readability-static-accessed-through-instance,\
-readability-const-return-type,\
-readability-function-cognitive-complexity,\
-readability-redundant-access-specifiers,\
"
HeaderFilterRegex: 'source/[^/]*\.(hh|cc)$'
CheckOptions:
Expand Down
8 changes: 7 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ include(PTLBuildSettings)
# User options
ptl_add_option(PTL_USE_TBB "Enable TBB" ON)
ptl_add_option(PTL_USE_LOCKS "Enable mutex locking in task subqueues for extra safety" OFF)
ptl_add_option(PTL_INSTALL_HEADERS "Install the headers" ON)
ptl_add_option(PTL_INSTALL_CONFIG "Install the cmake configuration" ON)

if(DEFINED PTL_DEVELOPER_INSTALL)
set(PTL_INSTALL_HEADERS ${PTL_DEVELOPER_INSTALL} CACHE BOOL "Set via PTL_DEVELOPER_INSTALL" FORCE)
set(PTL_INSTALL_CONFIG ${PTL_DEVELOPER_INSTALL} CACHE BOOL "Set via PTL_DEVELOPER_INSTALL" FORCE)
endif()

################################################################################
# Build Dependencies
Expand Down Expand Up @@ -92,4 +99,3 @@ endif()
if(PTL_MASTER_PROJECT)
ptl_print_features()
endif()

2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.1.0
2.2.0
24 changes: 14 additions & 10 deletions source/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,17 @@ endif()
# #
################################################################################

# install export
install(EXPORT ${PROJECT_NAME}Targets
NAMESPACE PTL::
DESTINATION ${PTL_INSTALL_CMAKEDIR}
COMPONENT Development)

# headers
install(FILES ${ptl_headers}
DESTINATION ${PTL_INSTALL_INCLUDEDIR}/PTL
COMPONENT Development)
if(PTL_INSTALL_CONFIG)
# install export
install(EXPORT ${PROJECT_NAME}Targets
NAMESPACE PTL::
DESTINATION ${PTL_INSTALL_CMAKEDIR}
COMPONENT Development)
endif()

if(PTL_INSTALL_HEADERS)
# headers
install(FILES ${ptl_headers}
DESTINATION ${PTL_INSTALL_INCLUDEDIR}/PTL
COMPONENT Development)
endif()
5 changes: 3 additions & 2 deletions source/PTL/TaskGroup.hh
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "PTL/Task.hh"
#include "PTL/ThreadData.hh"
#include "PTL/ThreadPool.hh"
#include "PTL/Utility.hh"

#include <atomic>
#include <cstdint>
Expand Down Expand Up @@ -113,9 +114,9 @@ public:
// define move-construct
TaskGroup(this_type&& rhs) = default;
// delete copy-assign
this_type& operator=(const this_type& rhs) = delete;
TaskGroup& operator=(const this_type& rhs) = delete;
// define move-assign
this_type& operator=(this_type&& rhs) = default;
TaskGroup& operator=(this_type&& rhs) = default;

public:
template <typename Up>
Expand Down
1 change: 1 addition & 0 deletions source/PTL/TaskGroup.icc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
// ---------------------------------------------------------------

#include "PTL/Types.hh"
#include "PTL/Utility.hh"

namespace PTL
{
Expand Down
71 changes: 51 additions & 20 deletions source/PTL/ThreadPool.hh
Original file line number Diff line number Diff line change
Expand Up @@ -84,25 +84,50 @@ public:
using task_pointer = std::shared_ptr<task_type>;
using task_queue_t = VUserTaskQueue;
// containers
typedef std::deque<ThreadId> thread_list_t;
typedef std::vector<bool> bool_list_t;
typedef std::map<ThreadId, uintmax_t> thread_id_map_t;
typedef std::map<uintmax_t, ThreadId> thread_index_map_t;
using thread_vec_t = std::vector<Thread>;
using thread_data_t = std::vector<std::shared_ptr<ThreadData>>;
using thread_list_t = std::deque<ThreadId>;
using bool_list_t = std::vector<bool>;
using thread_id_map_t = std::map<ThreadId, uintmax_t>;
using thread_index_map_t = std::map<uintmax_t, ThreadId>;
using thread_vec_t = std::vector<Thread>;
using thread_data_t = std::vector<std::shared_ptr<ThreadData>>;
// functions
typedef std::function<void()> initialize_func_t;
typedef std::function<intmax_t(intmax_t)> affinity_func_t;
using initialize_func_t = std::function<void()>;
using finalize_func_t = std::function<void()>;
using affinity_func_t = std::function<intmax_t(intmax_t)>;

static affinity_func_t& affinity_functor()
{
static affinity_func_t _v = [](intmax_t) {
static std::atomic<intmax_t> assigned;
intmax_t _assign = assigned++;
return _assign % Thread::hardware_concurrency();
};
return _v;
}

static initialize_func_t& initialization_functor()
{
static initialize_func_t _v = []() {};
return _v;
}

static finalize_func_t& finalization_functor()
{
static finalize_func_t _v = []() {};
return _v;
}

public:
// Constructor and Destructors
ThreadPool(const size_type& pool_size, VUserTaskQueue* task_queue = nullptr,
bool _use_affinity = GetEnv<bool>("PTL_CPU_AFFINITY", false),
affinity_func_t = [](intmax_t) {
static std::atomic<intmax_t> assigned;
intmax_t _assign = assigned++;
return _assign % Thread::hardware_concurrency();
});
affinity_func_t = affinity_functor(),
initialize_func_t = initialization_functor(),
finalize_func_t = finalization_functor());
ThreadPool(const size_type& pool_size, initialize_func_t, finalize_func_t,
bool _use_affinity = GetEnv<bool>("PTL_CPU_AFFINITY", false),
affinity_func_t = affinity_functor(),
VUserTaskQueue* task_queue = nullptr);
// Virtual destructors are required by abstract classes
// so add it by default, just in case
virtual ~ThreadPool();
Expand Down Expand Up @@ -149,11 +174,16 @@ public:
// only relevant when compiled with PTL_USE_TBB
static tbb_global_control_t*& tbb_global_control();

void set_initialization(initialize_func_t f) { m_init_func = f; }
void set_initialization(initialize_func_t f) { m_init_func = std::move(f); }
void set_finalization(finalize_func_t f) { m_fini_func = std::move(f); }

void reset_initialization()
{
auto f = []() {};
m_init_func = f;
m_init_func = []() {};
}
void reset_finalization()
{
m_fini_func = []() {};
}

public:
Expand All @@ -175,7 +205,7 @@ public:
return (m_thread_awake) ? m_thread_awake->load() : 0;
}

void set_affinity(affinity_func_t f) { m_affinity_func = f; }
void set_affinity(affinity_func_t f) { m_affinity_func = std::move(f); }
void set_affinity(intmax_t i, Thread&);

void set_verbose(int n) { m_verbose = n; }
Expand Down Expand Up @@ -257,8 +287,9 @@ private:
tbb_task_group_t* m_tbb_task_group = nullptr;

// functions
initialize_func_t m_init_func = []() {};
affinity_func_t m_affinity_func;
initialize_func_t m_init_func = initialization_functor();
finalize_func_t m_fini_func = finalization_functor();
affinity_func_t m_affinity_func = affinity_functor();

private:
// Private static variables
Expand Down Expand Up @@ -354,7 +385,7 @@ ThreadPool::run_on_this(task_pointer&& _task)

if(m_tbb_tp && m_tbb_task_group)
{
auto _arena = get_task_arena();
auto* _arena = get_task_arena();
_arena->execute([this, _func]() { this->m_tbb_task_group->run(_func); });
}
else
Expand Down
36 changes: 0 additions & 36 deletions source/PTL/Types.hh
Original file line number Diff line number Diff line change
Expand Up @@ -123,42 +123,6 @@ GetSharedPointerPairMasterInstance()

//======================================================================================//

struct ScopeDestructor
{
template <typename FuncT>
ScopeDestructor(FuncT&& _func)
: m_functor(std::forward<FuncT>(_func))
{}

// delete copy operations
ScopeDestructor(const ScopeDestructor&) = delete;
ScopeDestructor& operator=(const ScopeDestructor&) = delete;

// allow move operations
ScopeDestructor(ScopeDestructor&& rhs) noexcept
: m_functor(std::move(rhs.m_functor))
{
rhs.m_functor = []() {};
}

ScopeDestructor& operator=(ScopeDestructor&& rhs) noexcept
{
if(this != &rhs)
{
m_functor = std::move(rhs.m_functor);
rhs.m_functor = []() {};
}
return *this;
}

~ScopeDestructor() { m_functor(); }

private:
std::function<void()> m_functor = []() {};
};

//======================================================================================//

} // namespace PTL

// Forward declation of void type argument for usage in direct object
Expand Down
39 changes: 38 additions & 1 deletion source/PTL/Utility.hh
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include <chrono>
#include <cstdlib>
#include <functional>
#include <iomanip>
#include <iostream>
#include <map>
Expand Down Expand Up @@ -312,7 +313,7 @@ GetEnv(const std::string& env_id, const EnvChoiceList<Tp>& _choices, Tp _default

template <typename Tp>
Tp
GetChoice(const EnvChoiceList<Tp>& _choices, const std::string str_var)
GetChoice(const EnvChoiceList<Tp>& _choices, const std::string& str_var)
{
auto asupper = [](std::string var) {
for(auto& itr : var)
Expand Down Expand Up @@ -363,4 +364,40 @@ PrintEnv(std::ostream& os = std::cout)

//--------------------------------------------------------------------------------------//

struct ScopeDestructor
{
template <typename FuncT>
ScopeDestructor(FuncT&& _func)
: m_functor(std::forward<FuncT>(_func))
{}

// delete copy operations
ScopeDestructor(const ScopeDestructor&) = delete;
ScopeDestructor& operator=(const ScopeDestructor&) = delete;

// allow move operations
ScopeDestructor(ScopeDestructor&& rhs) noexcept
: m_functor(std::move(rhs.m_functor))
{
rhs.m_functor = []() {};
}

ScopeDestructor& operator=(ScopeDestructor&& rhs) noexcept
{
if(this != &rhs)
{
m_functor = std::move(rhs.m_functor);
rhs.m_functor = []() {};
}
return *this;
}

~ScopeDestructor() { m_functor(); }

private:
std::function<void()> m_functor = []() {};
};

//--------------------------------------------------------------------------------------//

} // namespace PTL
22 changes: 21 additions & 1 deletion source/ThreadPool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "PTL/Globals.hh"
#include "PTL/ThreadData.hh"
#include "PTL/UserTaskQueue.hh"
#include "PTL/Utility.hh"
#include "PTL/VUserTaskQueue.hh"

#include <cstdlib>
Expand Down Expand Up @@ -138,11 +139,14 @@ ThreadPool::get_this_thread_id()
//======================================================================================//

ThreadPool::ThreadPool(const size_type& pool_size, VUserTaskQueue* task_queue,
bool _use_affinity, affinity_func_t _affinity_func)
bool _use_affinity, affinity_func_t _affinity_func,
initialize_func_t _init_func, finalize_func_t _fini_func)
: m_use_affinity(_use_affinity)
, m_pool_state(std::make_shared<std::atomic_short>(thread_pool::state::NONINIT))
, m_task_queue(task_queue)
, m_affinity_func(std::move(_affinity_func))
, m_init_func(std::move(_init_func))
, m_fini_func(std::move(_fini_func))
{
auto master_id = get_this_thread_id();
if(master_id != 0 && m_verbose > 1)
Expand All @@ -157,6 +161,17 @@ ThreadPool::ThreadPool(const size_type& pool_size, VUserTaskQueue* task_queue,
m_task_queue = new UserTaskQueue(m_pool_size);
}

ThreadPool::ThreadPool(const size_type& pool_size, initialize_func_t _init_func,
finalize_func_t _fini_func, bool _use_affinity,
affinity_func_t _affinity_func, VUserTaskQueue* task_queue)
: ThreadPool{ pool_size,
task_queue,
_use_affinity,
std::move(_affinity_func),
std::move(_init_func),
std::move(_fini_func) }
{}

//======================================================================================//

ThreadPool::~ThreadPool()
Expand Down Expand Up @@ -251,6 +266,8 @@ ThreadPool::initialize_threadpool(size_type proposed_size)
// create task group (used for async)
if(!m_tbb_task_group)
m_tbb_task_group = new tbb_task_group_t();

execute_on_all_threads([this]() { m_init_func(); });
return m_pool_size;
}
#endif
Expand Down Expand Up @@ -375,6 +392,7 @@ ThreadPool::destroy_threadpool()
#if defined(PTL_USE_TBB)
if(m_tbb_task_group)
{
execute_on_all_threads([this]() { m_fini_func(); });
auto _func = [&]() { m_tbb_task_group->wait(); };
if(m_tbb_task_arena)
m_tbb_task_arena->execute(_func);
Expand Down Expand Up @@ -552,6 +570,8 @@ ThreadPool::execute_thread(VUserTaskQueue* _task_queue)

// initialization function
m_init_func();
// finalization function (executed when scope is destroyed)
ScopeDestructor _fini{ [this]() { m_fini_func(); } };

ThreadId tid = ThisThread::get_id();
ThreadData* data = thread_data();
Expand Down
1 change: 1 addition & 0 deletions source/UserTaskQueue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "PTL/Task.hh"
#include "PTL/TaskGroup.hh"
#include "PTL/ThreadPool.hh"
#include "PTL/Utility.hh"

#include <cassert>

Expand Down

0 comments on commit 6da7f0f

Please sign in to comment.