From ed6c50d3ccfaba0e84442ca77c4036528867cfa9 Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Mon, 29 Jan 2024 15:30:49 +0100 Subject: [PATCH] Reworked code to use way less shared ptr locks Signed-off-by: Janosch Machowinski --- .../include/rclcpp/executors/cbg_executor.hpp | 26 ++-- .../detail/any_executable_weak_ref.hpp | 134 +++++++++++++++++- .../executors/detail/rcl_to_rclcpp_map.hpp | 116 ++++++--------- rclcpp/src/rclcpp/executors/cbg_executor.cpp | 63 +++----- rclcpp/test/benchmark/benchmark_executor.cpp | 17 ++- 5 files changed, 210 insertions(+), 146 deletions(-) diff --git a/rclcpp/include/rclcpp/executors/cbg_executor.hpp b/rclcpp/include/rclcpp/executors/cbg_executor.hpp index 774ac29fec..103bc5fe7a 100644 --- a/rclcpp/include/rclcpp/executors/cbg_executor.hpp +++ b/rclcpp/include/rclcpp/executors/cbg_executor.hpp @@ -21,6 +21,7 @@ #include #include +#include "rclcpp/executors/detail/any_executable_weak_ref.hpp" #include "rclcpp/executor.hpp" #include "rclcpp/executors/callback_group_state.hpp" #include "rclcpp/macros.hpp" @@ -48,9 +49,10 @@ class ExecutionGroup next_unprocessed_ready_executable = 0; } - void add_ready_executable(const Executable & e) + void add_ready_executable(AnyExecutableWeakRef & e) { - ready_executables.push_back(e); + ready_executables.push_back(&e); +// ready_executables.push_back(std::get(e.executable)); } bool has_unprocessed_executables() @@ -58,9 +60,9 @@ class ExecutionGroup for (; next_unprocessed_ready_executable < ready_executables.size(); next_unprocessed_ready_executable++) { - const auto & ready_executable = ready_executables[next_unprocessed_ready_executable]; + auto & ready_executable = ready_executables[next_unprocessed_ready_executable]; - if (ready_executable.lock()) { + if (ready_executable->executable_alive()) { return true; } } @@ -74,7 +76,7 @@ class ExecutionGroup { const auto & ready_executable = ready_executables[next_unprocessed_ready_executable]; - if (fill_any_executable(any_executable, ready_executable)) { + if (fill_any_executable(any_executable, std::get(ready_executable->executable))) { // mark the current element as processed next_unprocessed_ready_executable++; @@ -108,7 +110,7 @@ class ExecutionGroup return false; } - any_executable.data = *data; +// any_executable.data = *data; return true; //RCUTILS_LOG_INFO("Executing timer"); @@ -142,7 +144,7 @@ class ExecutionGroup return false; } - std::vector ready_executables; + std::vector ready_executables; size_t next_unprocessed_ready_executable = 0; }; @@ -166,11 +168,11 @@ class CallbackGroupScheduler void clear_and_prepare(const CallbackGroupState & cb_elements); - void add_ready_executable(const rclcpp::SubscriptionBase::WeakPtr & executable); - void add_ready_executable(const rclcpp::ServiceBase::WeakPtr & executable); - void add_ready_executable(const rclcpp::TimerBase::WeakPtr & executable); - void add_ready_executable(const rclcpp::ClientBase::WeakPtr & executable); - void add_ready_executable(const rclcpp::Waitable::WeakPtr & executable); + void add_ready_timer(AnyExecutableWeakRef & executable); + void add_ready_subscription(AnyExecutableWeakRef & executable); + void add_ready_service(AnyExecutableWeakRef & executable); + void add_ready_client(AnyExecutableWeakRef & executable); + void add_ready_waitable(AnyExecutableWeakRef & executable); enum Priorities { diff --git a/rclcpp/include/rclcpp/executors/detail/any_executable_weak_ref.hpp b/rclcpp/include/rclcpp/executors/detail/any_executable_weak_ref.hpp index ea7ecc406e..19b432fec2 100644 --- a/rclcpp/include/rclcpp/executors/detail/any_executable_weak_ref.hpp +++ b/rclcpp/include/rclcpp/executors/detail/any_executable_weak_ref.hpp @@ -5,6 +5,7 @@ #include "rclcpp/client.hpp" #include "rclcpp/waitable.hpp" #include "rclcpp/guard_condition.hpp" +#include "rclcpp/executors/callback_group_state.hpp" namespace rclcpp::executors { @@ -17,27 +18,72 @@ struct AnyExecutableWeakRef AnyExecutableWeakRef(const rclcpp::SubscriptionBase::WeakPtr & p, int16_t callback_group_index) : executable(p), callback_group_index(callback_group_index) - {} + { + if(auto shr = p.lock()) + { + rcl_handle_shr_ptr = shr->get_subscription_handle(); + } + else + { + rcl_handle_shr_ptr = std::monostate(); + } + } AnyExecutableWeakRef(const rclcpp::TimerBase::WeakPtr & p, int16_t callback_group_index) : executable(p), callback_group_index(callback_group_index) - {} + { + if(auto shr = p.lock()) + { + rcl_handle_shr_ptr = shr->get_timer_handle(); + } + else + { + rcl_handle_shr_ptr = std::monostate(); + } + } AnyExecutableWeakRef(const rclcpp::ServiceBase::WeakPtr & p, int16_t callback_group_index) : executable(p), callback_group_index(callback_group_index) - {} + { + if(auto shr = p.lock()) + { + rcl_handle_shr_ptr = shr->get_service_handle(); + } + else + { + rcl_handle_shr_ptr = std::monostate(); + } + } AnyExecutableWeakRef(const rclcpp::ClientBase::WeakPtr & p, int16_t callback_group_index) : executable(p), callback_group_index(callback_group_index) - {} + { + if(auto shr = p.lock()) + { + rcl_handle_shr_ptr = shr->get_client_handle(); + } + else + { + rcl_handle_shr_ptr = std::monostate(); + } + } AnyExecutableWeakRef(const rclcpp::Waitable::WeakPtr & p, int16_t callback_group_index) : executable(p), callback_group_index(callback_group_index) - {} + { + if(auto shr = p.lock()) + { + rcl_handle_shr_ptr = shr; + } + else + { + rcl_handle_shr_ptr = std::monostate(); + } + } AnyExecutableWeakRef( const rclcpp::GuardCondition::WeakPtr & p, @@ -49,6 +95,76 @@ struct AnyExecutableWeakRef { //special case, guard conditions are auto processed by waking up the wait set // therefore they shall never create a real executable + + { + if(auto shr = p.lock()) + { + rcl_handle_shr_ptr = shr; + } + else + { + rcl_handle_shr_ptr = std::monostate(); + } + } + + + } + + /** + * Checks, if the executable still exists, or if was deleted + */ + bool executable_alive() + { + auto check_valid = [this] (const auto &shr_ptr) + { + auto use_cnt = shr_ptr.use_count(); + if(use_cnt <= 1) + { + rcl_handle_shr_ptr = std::monostate(); + return false; + } + + return true; + }; + + switch(rcl_handle_shr_ptr.index()) { + case AnyExecutableWeakRef::ExecutableIndex::Subscription: + { + return check_valid(std::get>(rcl_handle_shr_ptr)); + } + break; + case AnyExecutableWeakRef::ExecutableIndex::Timer: + { + return check_valid(std::get>(rcl_handle_shr_ptr)); + } + break; + case AnyExecutableWeakRef::ExecutableIndex::Service: + { + return check_valid(std::get>(rcl_handle_shr_ptr)); + } + break; + case AnyExecutableWeakRef::ExecutableIndex::Client: + { + return check_valid(std::get>(rcl_handle_shr_ptr)); + } + break; + case AnyExecutableWeakRef::ExecutableIndex::Waitable: + { + return check_valid(std::get>(rcl_handle_shr_ptr)); + } + break; + case AnyExecutableWeakRef::ExecutableIndex::GuardCondition: + { + return check_valid(std::get>(rcl_handle_shr_ptr)); + } + case AnyExecutableWeakRef::ExecutableIndex::Deleted: + { + return false; + } + break; + + } + return false; } AnyExecutableWeakRef(const AnyExecutableWeakRef &) = delete; @@ -69,10 +185,16 @@ struct AnyExecutableWeakRef Client, Waitable, GuardCondition, + Deleted, }; // shared_ptr holding the rcl handle during wait - std::shared_ptr rcl_handle_shr_ptr; + + using RclHandleVariant = std::variant, std::shared_ptr, + std::shared_ptr, std::shared_ptr, + rclcpp::Waitable::SharedPtr, rclcpp::GuardCondition::SharedPtr, std::monostate>; + + RclHandleVariant rcl_handle_shr_ptr; // A function that should be executed if the executable is a guard condition and ready std::function handle_guard_condition_fun; diff --git a/rclcpp/include/rclcpp/executors/detail/rcl_to_rclcpp_map.hpp b/rclcpp/include/rclcpp/executors/detail/rcl_to_rclcpp_map.hpp index c42bb87c39..244122876b 100644 --- a/rclcpp/include/rclcpp/executors/detail/rcl_to_rclcpp_map.hpp +++ b/rclcpp/include/rclcpp/executors/detail/rcl_to_rclcpp_map.hpp @@ -114,50 +114,61 @@ struct RCLToRCLCPPMap bool add_to_wait_set_and_mapping(rcl_wait_set_s & ws, AnyExecutableWeakRef & executable_ref) { - switch (executable_ref.executable.index()) { + auto check_usage_and_delete_or_add = [this, &executable_ref, &ws] (const auto &shr_ptr) + { + auto use_cnt = shr_ptr.use_count(); + if(use_cnt <= 1) + { + // got deleted, we are the last one holding a reference + executable_ref.rcl_handle_shr_ptr = std::monostate(); + return true; + } + + return add_to_wait_set_and_mapping( + ws, + shr_ptr, executable_ref); + }; + + switch (executable_ref.rcl_handle_shr_ptr.index()) { case AnyExecutableWeakRef::ExecutableIndex::Subscription: { - return add_to_wait_set_and_mapping( - ws, - std::get( - executable_ref.executable), executable_ref); + return check_usage_and_delete_or_add(std::get>( + executable_ref.rcl_handle_shr_ptr)); } break; case AnyExecutableWeakRef::ExecutableIndex::Timer: { - return add_to_wait_set_and_mapping( - ws, - std::get(executable_ref.executable), executable_ref); + return check_usage_and_delete_or_add(std::get>( + executable_ref.rcl_handle_shr_ptr)); } break; case AnyExecutableWeakRef::ExecutableIndex::Service: { - return add_to_wait_set_and_mapping( - ws, - std::get(executable_ref.executable), - executable_ref); + return check_usage_and_delete_or_add(std::get>( + executable_ref.rcl_handle_shr_ptr)); } break; case AnyExecutableWeakRef::ExecutableIndex::Client: { - return add_to_wait_set_and_mapping( - ws, - std::get(executable_ref.executable), executable_ref); + return check_usage_and_delete_or_add(std::get>( + executable_ref.rcl_handle_shr_ptr)); } break; case AnyExecutableWeakRef::ExecutableIndex::Waitable: { - return add_to_wait_set_and_mapping( - ws, - std::get(executable_ref.executable), executable_ref); + return check_usage_and_delete_or_add(std::get>( + executable_ref.rcl_handle_shr_ptr)); } break; case AnyExecutableWeakRef::ExecutableIndex::GuardCondition: { - return add_to_wait_set_and_mapping( - ws, - std::get( - executable_ref.executable), executable_ref); + return check_usage_and_delete_or_add(std::get( + executable_ref.rcl_handle_shr_ptr)); + } + break; + case AnyExecutableWeakRef::ExecutableIndex::Deleted: + { + return true; } break; } @@ -168,17 +179,9 @@ struct RCLToRCLCPPMap bool add_to_wait_set_and_mapping( rcl_wait_set_s & ws, - const rclcpp::SubscriptionBase::WeakPtr & sub_weak_ptr, + const std::shared_ptr & handle_shr_ptr, AnyExecutableWeakRef & executable_ref) { - const rclcpp::SubscriptionBase::SharedPtr & sub_ptr = sub_weak_ptr.lock(); - if (!sub_ptr) { - // got deleted, we just ignore it - return true; - } - - auto handle_shr_ptr = sub_ptr->get_subscription_handle(); - executable_ref.rcl_handle_shr_ptr = handle_shr_ptr; subscription_map.emplace_back(&executable_ref); size_t idx; @@ -202,17 +205,9 @@ struct RCLToRCLCPPMap bool add_to_wait_set_and_mapping( rcl_wait_set_s & ws, - const rclcpp::ClientBase::WeakPtr & client_weak_ptr, + const std::shared_ptr & handle_shr_ptr, AnyExecutableWeakRef & any_exec) { - const rclcpp::ClientBase::SharedPtr & client_ptr = client_weak_ptr.lock(); - if (!client_ptr) { - // got deleted, we just ignore it from now on - return true; - } - - auto handle_shr_ptr = client_ptr->get_client_handle(); - any_exec.rcl_handle_shr_ptr = handle_shr_ptr; clients_map.emplace_back(&any_exec); size_t idx; @@ -231,17 +226,9 @@ struct RCLToRCLCPPMap bool add_to_wait_set_and_mapping( rcl_wait_set_s & ws, - const rclcpp::ServiceBase::WeakPtr & weak_ptr, + const std::shared_ptr & handle_shr_ptr, AnyExecutableWeakRef & any_exec) { - const rclcpp::ServiceBase::SharedPtr & shr_ptr = weak_ptr.lock(); - if (!shr_ptr) { - // got deleted, we just ignore it from now on - return true; - } - - auto handle_shr_ptr = shr_ptr->get_service_handle(); - any_exec.rcl_handle_shr_ptr = handle_shr_ptr; services_map.emplace_back(&any_exec); size_t idx; @@ -259,17 +246,10 @@ struct RCLToRCLCPPMap } bool add_to_wait_set_and_mapping( - rcl_wait_set_s & ws, const rclcpp::TimerBase::WeakPtr & weak_ptr, + rcl_wait_set_s & ws, + const std::shared_ptr & handle_shr_ptr, AnyExecutableWeakRef & any_exec) { - const rclcpp::TimerBase::SharedPtr & shr_ptr = weak_ptr.lock(); - if (!shr_ptr) { - // got deleted, we just ignore it from now on - return true; - } - - auto handle_shr_ptr = shr_ptr->get_timer_handle(); - any_exec.rcl_handle_shr_ptr = handle_shr_ptr; timer_map.emplace_back(&any_exec); size_t idx; @@ -287,15 +267,9 @@ struct RCLToRCLCPPMap } bool add_to_wait_set_and_mapping( - rcl_wait_set_s & ws, const rclcpp::Waitable::WeakPtr & weak_ptr, + rcl_wait_set_s & ws, const rclcpp::Waitable::SharedPtr & waitable_ptr, AnyExecutableWeakRef & any_exec) { - const rclcpp::Waitable::SharedPtr & waitable_ptr = weak_ptr.lock(); - if (!waitable_ptr) { - // got deleted, we just ignore it from now on - return true; - } - const size_t old_client_index = ws.client_index; const size_t old_event_index = ws.event_index; const size_t old_guard_condition_index = ws.guard_condition_index; @@ -353,19 +327,11 @@ struct RCLToRCLCPPMap bool add_to_wait_set_and_mapping( rcl_wait_set_s & ws, - const rclcpp::GuardCondition::WeakPtr & weak_ptr, + const rclcpp::GuardCondition::SharedPtr &gc_ptr, AnyExecutableWeakRef & any_exec) { - rclcpp::GuardCondition::SharedPtr shr_ptr = weak_ptr.lock(); - - if (!shr_ptr) { - return false; - } - - const auto & gc = shr_ptr->get_rcl_guard_condition(); - size_t idx = 200; - rcl_ret_t ret = rcl_wait_set_add_guard_condition(&ws, &gc, &idx); + rcl_ret_t ret = rcl_wait_set_add_guard_condition(&ws, &(gc_ptr->get_rcl_guard_condition()), &idx); if (RCL_RET_OK != ret) { rclcpp::exceptions::throw_from_rcl_error( diff --git a/rclcpp/src/rclcpp/executors/cbg_executor.cpp b/rclcpp/src/rclcpp/executors/cbg_executor.cpp index 071e7fba18..5c1c1ef77a 100644 --- a/rclcpp/src/rclcpp/executors/cbg_executor.cpp +++ b/rclcpp/src/rclcpp/executors/cbg_executor.cpp @@ -164,24 +164,24 @@ bool CallbackGroupScheduler::has_unprocessed_executables() ready_waitables.has_unprocessed_executables(); } -void CallbackGroupScheduler::add_ready_executable( - const rclcpp::SubscriptionBase::WeakPtr & executable) + +void CallbackGroupScheduler::add_ready_subscription(AnyExecutableWeakRef & executable) { ready_subscriptions.add_ready_executable(executable); } -void CallbackGroupScheduler::add_ready_executable(const rclcpp::ServiceBase::WeakPtr & executable) +void CallbackGroupScheduler::add_ready_service(AnyExecutableWeakRef & executable) { ready_services.add_ready_executable(executable); } -void CallbackGroupScheduler::add_ready_executable(const rclcpp::TimerBase::WeakPtr & executable) +void CallbackGroupScheduler::add_ready_timer(AnyExecutableWeakRef & executable) { ready_timers.add_ready_executable(executable); } -void CallbackGroupScheduler::add_ready_executable(const rclcpp::ClientBase::WeakPtr & executable) +void CallbackGroupScheduler::add_ready_client(AnyExecutableWeakRef & executable) { ready_clients.add_ready_executable(executable); } -void CallbackGroupScheduler::add_ready_executable(const rclcpp::Waitable::WeakPtr & executable) +void CallbackGroupScheduler::add_ready_waitable(AnyExecutableWeakRef & executable) { ready_waitables.add_ready_executable(executable); } @@ -428,43 +428,33 @@ void CBGExecutor::fill_callback_group_data( return; } - switch (ready_exec.executable.index()) { + switch (ready_exec.rcl_handle_shr_ptr.index()) { case AnyExecutableWeakRef::ExecutableIndex::Subscription: { - idle_callback_groups[ready_exec.callback_group_index]->scheduler->add_ready_executable( - std::get( - ready_exec.executable)); + idle_callback_groups[ready_exec.callback_group_index]->scheduler->add_ready_subscription(ready_exec); } break; case AnyExecutableWeakRef::ExecutableIndex::Timer: { - idle_callback_groups[ready_exec.callback_group_index]->scheduler->add_ready_executable( - std::get( - ready_exec.executable)); + idle_callback_groups[ready_exec.callback_group_index]->scheduler->add_ready_timer(ready_exec); } break; case AnyExecutableWeakRef::ExecutableIndex::Service: { - idle_callback_groups[ready_exec.callback_group_index]->scheduler->add_ready_executable( - std::get( - ready_exec.executable)); + idle_callback_groups[ready_exec.callback_group_index]->scheduler->add_ready_service(ready_exec); } break; case AnyExecutableWeakRef::ExecutableIndex::Client: { - idle_callback_groups[ready_exec.callback_group_index]->scheduler->add_ready_executable( - std::get( - ready_exec.executable)); + idle_callback_groups[ready_exec.callback_group_index]->scheduler->add_ready_client(ready_exec); } break; case AnyExecutableWeakRef::ExecutableIndex::Waitable: { - const rclcpp::Waitable::WeakPtr & waitable_weak = - std::get(ready_exec.executable); - rclcpp::Waitable::SharedPtr waitable = waitable_weak.lock(); + rclcpp::Waitable::SharedPtr &waitable = std::get(ready_exec.rcl_handle_shr_ptr); + if (waitable && waitable->is_ready(&wait_set)) { - idle_callback_groups[ready_exec.callback_group_index]->scheduler->add_ready_executable( - waitable_weak); + idle_callback_groups[ready_exec.callback_group_index]->scheduler->add_ready_waitable(ready_exec); } } break; @@ -476,7 +466,8 @@ void CBGExecutor::fill_callback_group_data( } } break; - + case AnyExecutableWeakRef::ExecutableIndex::Deleted: + break; } ready_exec.processed = true; @@ -488,10 +479,6 @@ void CBGExecutor::fill_callback_group_data( //RCUTILS_LOG_I("Found ready client"); add_executable(ready_exec); } - else - { - ready_exec.rcl_handle_shr_ptr.reset(); - } } for (size_t i = 0; i < mapping.events_map.size(); ++i) { AnyExecutableWeakRef & ready_exec(*mapping.events_map[i]); @@ -499,10 +486,6 @@ void CBGExecutor::fill_callback_group_data( // RCUTILS_LOG_INFO("Found ready events"); add_executable(ready_exec); } - else - { - ready_exec.rcl_handle_shr_ptr.reset(); - } } for (size_t i = 0; i < mapping.guard_conditions_map.size(); ++i) { if (wait_set.guard_conditions[i]) { @@ -519,10 +502,6 @@ void CBGExecutor::fill_callback_group_data( // RCUTILS_LOG_INFO("Found ready services"); add_executable(ready_exec); } - else - { - ready_exec.rcl_handle_shr_ptr.reset(); - } } for (size_t i = 0; i < mapping.subscription_map.size(); ++i) { AnyExecutableWeakRef & ready_exec(*mapping.subscription_map[i]); @@ -531,10 +510,6 @@ void CBGExecutor::fill_callback_group_data( add_executable(ready_exec); } - else - { - ready_exec.rcl_handle_shr_ptr.reset(); - } } for (size_t i = 0; i < mapping.timer_map.size(); ++i) { AnyExecutableWeakRef & ready_exec(*mapping.timer_map[i]); @@ -542,10 +517,6 @@ void CBGExecutor::fill_callback_group_data( // RCUTILS_LOG_INFO("Found ready timers"); add_executable(ready_exec); } - else - { - ready_exec.rcl_handle_shr_ptr.reset(); - } } } @@ -606,7 +577,7 @@ CBGExecutor::execute_any_executable(AnyExecutable & any_exec) } if (any_exec.timer) { // RCUTILS_LOG_ERROR_NAMED("rclcpp", "Executing Timer"); - rclcpp::Executor::execute_timer(any_exec.timer, any_exec.data); + rclcpp::Executor::execute_timer(any_exec.timer); } if (any_exec.subscription) { // RCUTILS_LOG_ERROR_NAMED("rclcpp", "Executing subscription"); diff --git a/rclcpp/test/benchmark/benchmark_executor.cpp b/rclcpp/test/benchmark/benchmark_executor.cpp index 9e197cc7ae..5caa24f8bc 100644 --- a/rclcpp/test/benchmark/benchmark_executor.cpp +++ b/rclcpp/test/benchmark/benchmark_executor.cpp @@ -28,6 +28,7 @@ using namespace std::chrono_literals; using performance_test_fixture::PerformanceTest; constexpr unsigned int kNumberOfNodes = 10; +constexpr unsigned int kNumberOfPubSubs = 10; class PerformanceTestExecutor : public PerformanceTest { @@ -39,14 +40,16 @@ class PerformanceTestExecutor : public PerformanceTest for (unsigned int i = 0u; i < kNumberOfNodes; i++) { nodes.push_back(std::make_shared("my_node_" + std::to_string(i))); - publishers.push_back( - nodes[i]->create_publisher( - "/empty_msgs_" + std::to_string(i), rclcpp::QoS(10))); + for (unsigned int j = 0u; j < kNumberOfPubSubs; j++) { + publishers.push_back( + nodes[i]->create_publisher( + "/empty_msgs_" + std::to_string(i) + "_" + std::to_string(j), rclcpp::QoS(10))); - auto callback = [this](test_msgs::msg::Empty::ConstSharedPtr) {this->callback_count++;}; - subscriptions.push_back( - nodes[i]->create_subscription( - "/empty_msgs_" + std::to_string(i), rclcpp::QoS(10), std::move(callback))); + auto callback = [this](test_msgs::msg::Empty::ConstSharedPtr) {this->callback_count++;}; + subscriptions.push_back( + nodes[i]->create_subscription( + "/empty_msgs_" + std::to_string(i) + "_" + std::to_string(j), rclcpp::QoS(10), std::move(callback))); + } } PerformanceTest::SetUp(st); }