Skip to content

Commit

Permalink
Add Async Function Handler (#168)
Browse files Browse the repository at this point in the history
  • Loading branch information
saikishor authored Aug 14, 2024
1 parent 46b4c44 commit e3dea81
Show file tree
Hide file tree
Showing 5 changed files with 612 additions and 0 deletions.
6 changes: 6 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ ament_target_dependencies(thread_priority PUBLIC ${THIS_PACKAGE_INCLUDE_DEPENDS}
# Unit Tests
if(BUILD_TESTING)
find_package(ament_cmake_gmock REQUIRED)
find_package(lifecycle_msgs REQUIRED)
find_package(rclcpp_lifecycle REQUIRED)
find_package(test_msgs REQUIRED)

ament_add_gmock(realtime_box_tests test/realtime_box_tests.cpp)
Expand Down Expand Up @@ -72,6 +74,10 @@ if(BUILD_TESTING)
test/realtime_server_goal_handle_tests.cpp)
target_link_libraries(realtime_server_goal_handle_tests realtime_tools)
ament_target_dependencies(realtime_server_goal_handle_tests test_msgs)

ament_add_gmock(test_async_function_handler test/test_async_function_handler.cpp)
target_link_libraries(test_async_function_handler realtime_tools thread_priority)
ament_target_dependencies(test_async_function_handler lifecycle_msgs rclcpp_lifecycle)
endif()

# Install
Expand Down
287 changes: 287 additions & 0 deletions include/realtime_tools/async_function_handler.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,287 @@
// Copyright 2024 PAL Robotics S.L.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/// \author Sai Kishor Kothakota

#ifndef REALTIME_TOOLS__ASYNC_FUNCTION_HANDLER_HPP_
#define REALTIME_TOOLS__ASYNC_FUNCTION_HANDLER_HPP_

#include <atomic>
#include <condition_variable>
#include <functional>
#include <limits>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "rclcpp/duration.hpp"
#include "rclcpp/logging.hpp"
#include "rclcpp/time.hpp"
#include "realtime_tools/thread_priority.hpp"

namespace realtime_tools
{
/**
* @brief Class to handle asynchronous function calls.
* AsyncFunctionHandler is a class that allows the user to have a asynchronous call to the parsed
* method and be able to set some thread specific parameters
*/
template <typename T>
class AsyncFunctionHandler
{
public:
AsyncFunctionHandler() = default;

~AsyncFunctionHandler() { stop_thread(); }

/// Initialize the AsyncFunctionHandler with the callback and thread_priority
/**
* @param callback Function that will be called asynchronously
* If the AsyncFunctionHandler is already initialized and is running, it will throw a runtime
* error.
* If the parsed functions are not valid, it will throw a runtime error.
*/
void init(
std::function<T(const rclcpp::Time &, const rclcpp::Duration &)> callback,
int thread_priority = 50)
{
if (callback == nullptr) {
throw std::runtime_error(
"AsyncFunctionHandler: parsed function to call asynchronously is not valid!");
}
if (thread_.joinable()) {
throw std::runtime_error(
"AsyncFunctionHandler: Cannot reinitialize while the thread is "
"running. Please stop the async callback first!");
}
async_function_ = callback;
thread_priority_ = thread_priority;
}

/// Initialize the AsyncFunctionHandler with the callback, trigger_predicate and thread_priority
/**
* @param callback Function that will be called asynchronously.
* @param trigger_predicate Predicate function that will be called to check if the async callback
* method should be triggered or not.
* @param thread_priority Priority of the async worker thread.
*
* \note The parsed trigger_predicate should be free from any concurrency issues. It is expected
* to be both thread-safe and reentrant.
*
* If the AsyncFunctionHandler is already initialized and is running, it will throw a runtime
* error.
* If the parsed functions are not valid, it will throw a runtime error.
*/
void init(
std::function<T(const rclcpp::Time &, const rclcpp::Duration &)> callback,
std::function<bool()> trigger_predicate, int thread_priority = 50)
{
if (trigger_predicate == nullptr) {
throw std::runtime_error("AsyncFunctionHandler: parsed trigger predicate is not valid!");
}
init(callback, thread_priority);
trigger_predicate_ = trigger_predicate;
}

/// Triggers the async callback method cycle
/**
* @param time Current time
* @param period Current period
* @return A pair with the first element being a boolean indicating if the async callback method was
* triggered and the second element being the last return value of the async function.
* If the AsyncFunctionHandler is not initialized properly, it will throw a runtime error.
* If the callback method is waiting for the trigger, it will notify the async thread to start
* the callback.
* If the async callback method is still running, it will return the last return value from the
* last trigger cycle.
*
* \note In the case of controllers, The controller manager is responsible
* for triggering and maintaining the controller's update rate, as it should be only acting as a
* scheduler. Same applies to the resource manager when handling the hardware components.
*/
std::pair<bool, T> trigger_async_callback(
const rclcpp::Time & time, const rclcpp::Duration & period)
{
if (!is_initialized()) {
throw std::runtime_error("AsyncFunctionHandler: need to be initialized first!");
}
if (!is_running()) {
throw std::runtime_error(
"AsyncFunctionHandler: need to start the async callback thread first before triggering!");
}
std::unique_lock<std::mutex> lock(async_mtx_, std::try_to_lock);
bool trigger_status = false;
if (lock.owns_lock() && !trigger_in_progress_ && trigger_predicate_()) {
{
std::unique_lock<std::mutex> scoped_lock(std::move(lock));
trigger_in_progress_ = true;
current_callback_time_ = time;
current_callback_period_ = period;
}
async_callback_condition_.notify_one();
trigger_status = true;
}
const T return_value = async_callback_return_;
return std::make_pair(trigger_status, return_value);
}

/// Waits until the current async callback method trigger cycle is finished
/**
* If the async method is running, it will wait for the current async method call to finish.
*/
void wait_for_trigger_cycle_to_finish()
{
if (is_running()) {
std::unique_lock<std::mutex> lock(async_mtx_);
cycle_end_condition_.wait(lock, [this] { return !trigger_in_progress_; });
}
}

/// Check if the AsyncFunctionHandler is initialized
/**
* @return True if the AsyncFunctionHandler is initialized, false otherwise
*/
bool is_initialized() const { return async_function_ && trigger_predicate_; }

/// Join the async callback thread
/**
* If the async method is running, it will join the async thread.
* If the async method is not running, it will return immediately.
*/
void join_async_callback_thread()
{
if (is_running()) {
thread_.join();
}
}

/// Check if the async worker thread is running
/**
* @return True if the async worker thread is running, false otherwise
*/
bool is_running() const { return thread_.joinable(); }

/// Check if the async callback is triggered to stop the cycle
/**
* @return True if the async callback is requested to be stopped, false otherwise
*/
bool is_stopped() const { return stop_async_callback_; }

/// Get the async worker thread
/**
* @return The async callback thread
*/
std::thread & get_thread() { return thread_; }

/// Check if the async callback method is in progress
/**
* @return True if the async callback method is in progress, false otherwise
*/
bool is_trigger_cycle_in_progress() const { return trigger_in_progress_; }

/// Stops the callback thread
/**
* If the async method is running, it will notify the async thread to stop and then joins the
* async thread.
*/
void stop_thread()
{
if (is_running()) {
{
std::unique_lock<std::mutex> lock(async_mtx_);
stop_async_callback_ = true;
}
async_callback_condition_.notify_one();
thread_.join();
}
}

/// Get the last execution time of the async callback method
/**
* @return The last execution time of the async callback method in seconds
*/
double get_last_execution_time() const { return last_execution_time_; }

/// Initializes and starts the callback thread
/**
* If the worker thread is not running, it will start the async callback thread.
* If the worker thread is already configured and running, does nothing and returns
* immediately.
*/
void start_thread()
{
if (!is_initialized()) {
throw std::runtime_error("AsyncFunctionHandler: need to be initialized first!");
}
if (!thread_.joinable()) {
stop_async_callback_ = false;
trigger_in_progress_ = false;
last_execution_time_ = 0.0;
async_callback_return_ = T();
thread_ = std::thread([this]() -> void {
if (!realtime_tools::configure_sched_fifo(thread_priority_)) {
RCLCPP_WARN(
rclcpp::get_logger("AsyncFunctionHandler"),
"Could not enable FIFO RT scheduling policy. Consider setting up your user to do FIFO "
"RT "
"scheduling. See "
"[https://control.ros.org/master/doc/ros2_control/controller_manager/doc/userdoc.html] "
"for details.");
}

while (!stop_async_callback_.load(std::memory_order_relaxed)) {
{
std::unique_lock<std::mutex> lock(async_mtx_);
async_callback_condition_.wait(
lock, [this] { return trigger_in_progress_ || stop_async_callback_; });
if (!stop_async_callback_) {
const auto start_time = std::chrono::steady_clock::now();
async_callback_return_ =
async_function_(current_callback_time_, current_callback_period_);
const auto end_time = std::chrono::steady_clock::now();
last_execution_time_ = std::chrono::duration<double>(end_time - start_time).count();
}
trigger_in_progress_ = false;
}
cycle_end_condition_.notify_all();
}
});
}
}

private:
rclcpp::Time current_callback_time_;
rclcpp::Duration current_callback_period_{0, 0};

std::function<T(const rclcpp::Time &, const rclcpp::Duration &)> async_function_;
std::function<bool()> trigger_predicate_ = []() { return true; };

// Async related variables
std::thread thread_;
int thread_priority_ = std::numeric_limits<int>::quiet_NaN();
std::atomic_bool stop_async_callback_{false};
std::atomic_bool trigger_in_progress_{false};
std::atomic<T> async_callback_return_;
std::condition_variable async_callback_condition_;
std::condition_variable cycle_end_condition_;
std::mutex async_mtx_;
std::atomic<double> last_execution_time_;
};
} // namespace realtime_tools

#endif // REALTIME_TOOLS__ASYNC_FUNCTION_HANDLER_HPP_
2 changes: 2 additions & 0 deletions package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
<depend>rclcpp_action</depend>

<test_depend>ament_cmake_gmock</test_depend>
<test_depend>lifecycle_msgs</test_depend>
<test_depend>rclcpp_lifecycle</test_depend>
<test_depend>test_msgs</test_depend>

<export>
Expand Down
Loading

0 comments on commit e3dea81

Please sign in to comment.