From a985b81168884b7674ae9464de3ae190f7bb0e22 Mon Sep 17 00:00:00 2001 From: Brett Downing Date: Tue, 13 Jul 2021 21:19:08 +1000 Subject: [PATCH] Adds a context pointer to subscriptions (#107) (cherry picked from commit 5b11c5a9ca2e0f120f8d1dfeb30bf480689abfa9) # Conflicts: # rclc_examples/CMakeLists.txt --- NOTICE | 2 + rclc/include/rclc/executor.h | 33 ++++ rclc/include/rclc/executor_handle.h | 14 +- rclc/src/rclc/executor.c | 81 +++++++++- rclc/src/rclc/executor_handle.c | 2 +- rclc/test/rclc/test_executor.cpp | 115 ++++++++++++++ rclc_examples/CMakeLists.txt | 9 ++ rclc_examples/src/example_sub_context.c | 203 ++++++++++++++++++++++++ 8 files changed, 450 insertions(+), 9 deletions(-) create mode 100644 rclc_examples/src/example_sub_context.c diff --git a/NOTICE b/NOTICE index 766e803c..376daf75 100644 --- a/NOTICE +++ b/NOTICE @@ -35,3 +35,5 @@ eProsima Antonio Cuadros Loïc Dauphin + +Brett Downing diff --git a/rclc/include/rclc/executor.h b/rclc/include/rclc/executor.h index 473d2228..daa6f9ac 100644 --- a/rclc/include/rclc/executor.h +++ b/rclc/include/rclc/executor.h @@ -221,6 +221,39 @@ rclc_executor_add_subscription( rclc_callback_t callback, rclc_executor_handle_invocation_t invocation); +/** + * Adds a subscription to an executor. + * * An error is returned, if {@link rclc_executor_t.handles} array is full. + * * The total number_of_subscriptions field of {@link rclc_executor_t.info} + * is incremented by one. + * + *
+ * Attribute | Adherence + * ------------------ | ------------- + * Allocates Memory | No + * Thread-Safe | No + * Uses Atomics | No + * Lock-Free | Yes + * + * \param [inout] executor pointer to initialized executor + * \param [in] subscription pointer to an allocated subscription + * \param [in] msg pointer to an allocated message + * \param [in] callback function pointer to a callback + * \param [in] context type-erased ptr to additional callback context + * \param [in] invocation invocation type for the callback (ALWAYS or only ON_NEW_DATA) + * \return `RCL_RET_OK` if add-operation was successful + * \return `RCL_RET_INVALID_ARGUMENT` if any parameter is a null pointer (NULL context is ignored) + * \return `RCL_RET_ERROR` if any other error occured + */ +rcl_ret_t +rclc_executor_add_subscription_with_context( + rclc_executor_t * executor, + rcl_subscription_t * subscription, + void * msg, + rclc_subscription_callback_with_context_t callback, + void * context, + rclc_executor_handle_invocation_t invocation); + /** * Adds a timer to an executor. * * An error is returned, if {@link rclc_executor_t.handles} array is full. diff --git a/rclc/include/rclc/executor_handle.h b/rclc/include/rclc/executor_handle.h index 05ff9de0..f9825c71 100644 --- a/rclc/include/rclc/executor_handle.h +++ b/rclc/include/rclc/executor_handle.h @@ -58,6 +58,15 @@ typedef enum /// Type definition for callback function. typedef void (* rclc_callback_t)(const void *); +/// Type definition for subscription callback function +/// - incoming message +// typedef void (* rclc_subscription_callback_t)(const void *); + +/// Type definition for subscription callback function +/// - incoming message +/// - additional callback context +typedef void (* rclc_subscription_callback_with_context_t)(const void *, void *); + /// Type definition for client callback function /// - request message /// - response message @@ -114,8 +123,8 @@ typedef struct /// only for service - ptr to response message void * data_response_msg; - /// only for service - ptr to additional service context - void * service_context; + /// ptr to additional callback context + void * callback_context; // TODO(jst3si) new type to be stored as data for // service/client objects @@ -129,6 +138,7 @@ typedef struct /// Storage for callbacks union { rclc_callback_t callback; + rclc_subscription_callback_with_context_t subscription_callback_with_context; rclc_service_callback_t service_callback; rclc_service_callback_with_request_id_t service_callback_with_reqid; rclc_service_callback_with_context_t service_callback_with_context; diff --git a/rclc/src/rclc/executor.c b/rclc/src/rclc/executor.c index a8a945fc..9968d268 100644 --- a/rclc/src/rclc/executor.c +++ b/rclc/src/rclc/executor.c @@ -222,6 +222,7 @@ rclc_executor_add_subscription( executor->handles[executor->index].subscription = subscription; executor->handles[executor->index].data = msg; executor->handles[executor->index].callback = callback; + executor->handles[executor->index].callback_type = CB_WITHOUT_REQUEST_ID; executor->handles[executor->index].invocation = invocation; executor->handles[executor->index].initialized = true; @@ -244,6 +245,56 @@ rclc_executor_add_subscription( return ret; } + +rcl_ret_t +rclc_executor_add_subscription_with_context( + rclc_executor_t * executor, + rcl_subscription_t * subscription, + void * msg, + rclc_subscription_callback_with_context_t callback, + void * context, + rclc_executor_handle_invocation_t invocation) +{ + RCL_CHECK_ARGUMENT_FOR_NULL(executor, RCL_RET_INVALID_ARGUMENT); + RCL_CHECK_ARGUMENT_FOR_NULL(subscription, RCL_RET_INVALID_ARGUMENT); + RCL_CHECK_ARGUMENT_FOR_NULL(msg, RCL_RET_INVALID_ARGUMENT); + RCL_CHECK_ARGUMENT_FOR_NULL(callback, RCL_RET_INVALID_ARGUMENT); + rcl_ret_t ret = RCL_RET_OK; + // array bound check + if (executor->index >= executor->max_handles) { + RCL_SET_ERROR_MSG("Buffer overflow of 'executor->handles'. Increase 'max_handles'"); + return RCL_RET_ERROR; + } + + // assign data fields + executor->handles[executor->index].type = SUBSCRIPTION; + executor->handles[executor->index].subscription = subscription; + executor->handles[executor->index].data = msg; + executor->handles[executor->index].subscription_callback_with_context = callback; + executor->handles[executor->index].callback_type = CB_WITH_CONTEXT; + executor->handles[executor->index].invocation = invocation; + executor->handles[executor->index].initialized = true; + executor->handles[executor->index].callback_context = context; + + // increase index of handle array + executor->index++; + + // invalidate wait_set so that in next spin_some() call the + // 'executor->wait_set' is updated accordingly + if (rcl_wait_set_is_valid(&executor->wait_set)) { + ret = rcl_wait_set_fini(&executor->wait_set); + if (RCL_RET_OK != ret) { + RCL_SET_ERROR_MSG("Could not reset wait_set in rclc_executor_add_subscription_with_context."); + return ret; + } + } + + executor->info.number_of_subscriptions++; + + RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "Added a subscription."); + return ret; +} + rcl_ret_t rclc_executor_add_timer( rclc_executor_t * executor, @@ -460,7 +511,7 @@ rclc_executor_add_service_with_context( executor->handles[executor->index].callback_type = CB_WITH_CONTEXT; executor->handles[executor->index].invocation = ON_NEW_DATA; // invoce when request came in executor->handles[executor->index].initialized = true; - executor->handles[executor->index].service_context = context; + executor->handles[executor->index].callback_context = context; // increase index of handle array executor->index++; @@ -923,10 +974,28 @@ _rclc_execute(rclc_executor_handle_t * handle) if (invoke_callback) { switch (handle->type) { case SUBSCRIPTION: - if (handle->data_available) { - handle->callback(handle->data); - } else { - handle->callback(NULL); + switch (handle->callback_type) { + case CB_WITHOUT_REQUEST_ID: + if (handle->data_available) { + handle->callback(handle->data); + } else { + handle->callback(NULL); + } + break; + case CB_WITH_CONTEXT: + if (handle->data_available) { + handle->subscription_callback_with_context( + handle->data, + handle->callback_context); + } else { + handle->subscription_callback_with_context( + NULL, + handle->callback_context); + } + break; + default: + PRINT_RCLC_ERROR(rclc_execute, unknown_callback_type); + break; } break; @@ -955,7 +1024,7 @@ _rclc_execute(rclc_executor_handle_t * handle) handle->service_callback_with_context( handle->data, handle->data_response_msg, - handle->service_context); + handle->callback_context); break; default: PRINT_RCLC_ERROR(rclc_execute, unknown_callback_type); diff --git a/rclc/src/rclc/executor_handle.c b/rclc/src/rclc/executor_handle.c index c3d8a7b2..e87b6075 100644 --- a/rclc/src/rclc/executor_handle.c +++ b/rclc/src/rclc/executor_handle.c @@ -48,7 +48,7 @@ rclc_executor_handle_init( handle->data = NULL; handle->data_response_msg = NULL; - handle->service_context = NULL; + handle->callback_context = NULL; handle->callback = NULL; // because of union structure: diff --git a/rclc/test/rclc/test_executor.cpp b/rclc/test/rclc/test_executor.cpp index 7a78bc1b..f517cf42 100644 --- a/rclc/test/rclc/test_executor.cpp +++ b/rclc/test/rclc/test_executor.cpp @@ -268,6 +268,20 @@ void int32_callback5(const void * msgin) } } +void int32_callback_with_context(const void * msgin, void * context) +{ + const std_msgs__msg__Int32 * msg = (const std_msgs__msg__Int32 *)msgin; + if (msg == NULL) { + printf("(int32_callback_with_context): msg is NULL\n"); + } + if (context == NULL) { + printf("(int32_callback_with_context): context is NULL\n"); + } else { + // This side effect allows the test to check that the subscription received its context + int32_t * sub_context_value = reinterpret_cast( context ); + *sub_context_value = msg->data; + } +} void service_callback(const void * req_msg, void * resp_msg) { @@ -697,6 +711,66 @@ TEST_F(TestDefaultExecutor, executor_add_subscription) { EXPECT_EQ(RCL_RET_OK, rc) << rcl_get_error_string().str; } +TEST_F(TestDefaultExecutor, executor_add_subscription_with_context) { + rcl_ret_t rc; + rclc_executor_t executor; + // test with normal arguemnt and NULL pointers as arguments + rc = rclc_executor_init(&executor, &this->context, 10, this->allocator_ptr); + EXPECT_EQ(RCL_RET_OK, rc) << rcl_get_error_string().str; + + int32_t sub_context_value = 0; + void * sub_context_ptr = reinterpret_cast( &sub_context_value ); + + // normal case + rc = rclc_executor_add_subscription_with_context( + &executor, &this->sub1, &this->sub1_msg, + &int32_callback_with_context, sub_context_ptr, ON_NEW_DATA); + EXPECT_EQ(RCL_RET_OK, rc) << rcl_get_error_string().str; + size_t num_subscriptions = 1; + EXPECT_EQ(executor.info.number_of_subscriptions, num_subscriptions) << + "number of subscriptions is expected to be one"; + + // test NULL pointer for executor + rc = rclc_executor_add_subscription_with_context( + NULL, &this->sub1, &this->sub1_msg, &int32_callback_with_context, + sub_context_ptr, ON_NEW_DATA); + EXPECT_EQ(RCL_RET_INVALID_ARGUMENT, rc) << rcl_get_error_string().str; + rcutils_reset_error(); + EXPECT_EQ(executor.info.number_of_subscriptions, num_subscriptions) << + "number of subscriptions is expected to be one"; + + // test NULL pointer for subscription + rc = rclc_executor_add_subscription_with_context( + &executor, NULL, &this->sub1_msg, &int32_callback_with_context, + sub_context_ptr, ON_NEW_DATA); + EXPECT_EQ(RCL_RET_INVALID_ARGUMENT, rc) << rcl_get_error_string().str; + rcutils_reset_error(); + EXPECT_EQ(executor.info.number_of_subscriptions, num_subscriptions) << + "number of subscriptions is expected to be one"; + + // test NULL pointer for message + rc = rclc_executor_add_subscription_with_context( + &executor, &this->sub1, NULL, &int32_callback_with_context, + sub_context_ptr, ON_NEW_DATA); + EXPECT_EQ(RCL_RET_INVALID_ARGUMENT, rc) << rcl_get_error_string().str; + rcutils_reset_error(); + EXPECT_EQ(executor.info.number_of_subscriptions, num_subscriptions) << + "number of subscriptions is expected to be one"; + + // test NULL pointer for callback + rc = rclc_executor_add_subscription_with_context( + &executor, &this->sub1, &this->sub1_msg, NULL, + sub_context_ptr, ON_NEW_DATA); + EXPECT_EQ(RCL_RET_INVALID_ARGUMENT, rc) << rcl_get_error_string().str; + rcutils_reset_error(); + EXPECT_EQ(executor.info.number_of_subscriptions, num_subscriptions) << + "number of subscriptions is expected to be one"; + + // tear down + rc = rclc_executor_fini(&executor); + EXPECT_EQ(RCL_RET_OK, rc) << rcl_get_error_string().str; +} + TEST_F(TestDefaultExecutor, executor_add_subscription_too_many) { rcl_ret_t rc; rclc_executor_t executor; @@ -2447,6 +2521,47 @@ TEST_F(TestDefaultExecutor, executor_test_service_with_context) { example_interfaces__srv__AddTwoInts_Response__fini(&cli_resp); } +TEST_F(TestDefaultExecutor, executor_test_subscription_with_context) { + // This unit test tests, that a subscription with context receives the correct context pointer + rcl_ret_t rc; + rclc_executor_t executor; + executor = rclc_executor_get_zero_initialized_executor(); + rc = rclc_executor_init(&executor, &this->context, 10, this->allocator_ptr); + EXPECT_EQ(RCL_RET_OK, rc) << rcl_get_error_string().str; + + // prepare a context pointer carrying a value that differs from the published value + int32_t sub_context_value = 0; + void * sub_context_ptr = reinterpret_cast( &sub_context_value ); + + std_msgs__msg__Int32__init(&this->pub1_msg); + this->pub1_msg.data = 42; + + // create a subscription on the same topic as our publisher + EXPECT_EQ(executor.info.number_of_subscriptions, (size_t) 0) << + "number of subscriptions was not initialised at zero"; + + rc = rclc_executor_add_subscription_with_context( + &executor, &this->sub1, &this->sub1_msg, + &int32_callback_with_context, sub_context_ptr, ON_NEW_DATA); + EXPECT_EQ(RCL_RET_OK, rc) << rcl_get_error_string().str; + EXPECT_EQ(executor.info.number_of_subscriptions, (size_t) 1) << + "number of subscriptions is expected to be one"; + + // run the callback + rc = rcl_publish(&this->pub1, &this->pub1_msg, nullptr); + EXPECT_EQ(RCL_RET_OK, rc) << " pub1 not published"; + std::this_thread::sleep_for(rclc_test_sleep_time); + rclc_executor_spin_some(&executor, rclc_test_timeout_ns); + + // check the side effect + EXPECT_EQ(this->pub1_msg.data, sub_context_value) << + "subscription did not alter context value"; + + // tear down + rc = rclc_executor_fini(&executor); + EXPECT_EQ(RCL_RET_OK, rc) << rcl_get_error_string().str; +} + TEST_F(TestDefaultExecutor, executor_test_guard_condition) { // Test guard_condition. rcl_ret_t rc; diff --git a/rclc_examples/CMakeLists.txt b/rclc_examples/CMakeLists.txt index c68d4e92..0ca8b893 100644 --- a/rclc_examples/CMakeLists.txt +++ b/rclc_examples/CMakeLists.txt @@ -47,8 +47,13 @@ ament_target_dependencies(example_client_node rcl rclc example_interfaces) add_executable(example_lifecycle_node src/example_lifecycle_node.c) ament_target_dependencies(example_lifecycle_node rcutils rcl rcl_lifecycle rclc rclc_lifecycle lifecycle_msgs) +<<<<<<< HEAD add_executable(example_parameter_server src/example_parameter_server.c) ament_target_dependencies(example_parameter_server rclc rclc_parameter) +======= +add_executable(example_sub_context src/example_sub_context.c) +ament_target_dependencies(example_sub_context rcl rclc std_msgs) +>>>>>>> 5b11c5a (Adds a context pointer to subscriptions (#107)) install(TARGETS example_executor @@ -57,7 +62,11 @@ install(TARGETS example_lifecycle_node example_service_node example_client_node +<<<<<<< HEAD example_parameter_server +======= + example_sub_context +>>>>>>> 5b11c5a (Adds a context pointer to subscriptions (#107)) DESTINATION lib/${PROJECT_NAME} ) diff --git a/rclc_examples/src/example_sub_context.c b/rclc_examples/src/example_sub_context.c new file mode 100644 index 00000000..3341fe38 --- /dev/null +++ b/rclc_examples/src/example_sub_context.c @@ -0,0 +1,203 @@ +// Copyright (c) 2020 - for information on the respective copyright owner +// see the NOTICE file and/or the repository https://github.com/micro-ROS/rclc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include + +// This example shows the use of context pointers in subscriptions +// context pointers allow you to avoid using global state +// avoiding the use of global state makes your code more self-contained +// this is useful if you need identical subscriptions doing different things + +// Instead of creating some global variables, +// we can define some data structures point to the local state info we care about +typedef struct { + int some_int; + char* some_text; +} sub_context_t; + + +/***************************** CALLBACKS ***********************************/ + +// subscriptions with context allow you to pass +// additional state information to your subscription callback + +void my_subscriber_callback_with_context(const void * msgin, void* context_void_ptr) +{ + const std_msgs__msg__String * msg = (const std_msgs__msg__String *)msgin; + if (msg == NULL) { + printf("Callback: msg NULL\n"); + } else { + printf("Callback: I heard: %s\n", msg->data.data); + } + + if (context_void_ptr == NULL) { + printf("Callback: context is empty\n"); + } else { + // cast the context pointer into the appropriate type + sub_context_t * context_ptr = (sub_context_t * ) context_void_ptr; + // then you can access the context data + printf("Callback: context contains: %s\n", context_ptr->some_text); + printf("Callback: context also contains: %d\n", context_ptr->some_int); + // this context data is in main(), and we can change it from here. + context_ptr->some_int++; + } +} + +/******************** MAIN PROGRAM ****************************************/ +int main(int argc, const char * argv[]) +{ + rcl_allocator_t allocator = rcl_get_default_allocator(); + rclc_support_t support; + rcl_ret_t rc; + + // within main, we can create the state information our subscriptions work with + const unsigned int n_topics=3; + const char * topic_names[] = {"topic_foo", "topic_bar", "topic_baz"}; + sub_context_t my_contexts[]= { + {0,"foo counting from zero"}, + {100,"bar counting from 100"}, + {300,"baz counting from 300"}, + }; + rcl_publisher_t my_pubs[n_topics]; + std_msgs__msg__String pub_msgs[n_topics]; + rcl_subscription_t my_subs[n_topics]; + std_msgs__msg__String sub_msgs[n_topics]; + + // create init_options + rc = rclc_support_init(&support, argc, argv, &allocator); + if (rc != RCL_RET_OK) { + printf("Error rclc_support_init.\n"); + return -1; + } + + // create rcl_node + rcl_node_t my_node = rcl_get_zero_initialized_node(); + rc = rclc_node_init_default(&my_node, "node_0", "executor_examples", &support); + if (rc != RCL_RET_OK) { + printf("Error in rclc_node_init_default\n"); + return -1; + } + + const rosidl_message_type_support_t * my_type_support = + ROSIDL_GET_MSG_TYPE_SUPPORT(std_msgs, msg, String); + + //initialise each publisher and subscriber + for(unsigned int i=0; i