Skip to content

Commit

Permalink
Adds a context pointer to subscriptions (#107)
Browse files Browse the repository at this point in the history
(cherry picked from commit 5b11c5a)

# Conflicts:
#	rclc_examples/CMakeLists.txt
  • Loading branch information
BrettRD authored and mergify-bot committed Jul 13, 2021
1 parent 0263722 commit 4ee3509
Show file tree
Hide file tree
Showing 8 changed files with 450 additions and 9 deletions.
2 changes: 2 additions & 0 deletions NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,5 @@ eProsima
Antonio Cuadros <[email protected]>

Loïc Dauphin <[email protected]>

Brett Downing <[email protected]>
33 changes: 33 additions & 0 deletions rclc/include/rclc/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,39 @@ rclc_executor_add_subscription(
rclc_callback_t callback,
rclc_executor_handle_invocation_t invocation);

/**
* Adds a subscription to an executor.
* * An error is returned, if {@link rclc_executor_t.handles} array is full.
* * The total number_of_subscriptions field of {@link rclc_executor_t.info}
* is incremented by one.
*
* <hr>
* Attribute | Adherence
* ------------------ | -------------
* Allocates Memory | No
* Thread-Safe | No
* Uses Atomics | No
* Lock-Free | Yes
*
* \param [inout] executor pointer to initialized executor
* \param [in] subscription pointer to an allocated subscription
* \param [in] msg pointer to an allocated message
* \param [in] callback function pointer to a callback
* \param [in] context type-erased ptr to additional callback context
* \param [in] invocation invocation type for the callback (ALWAYS or only ON_NEW_DATA)
* \return `RCL_RET_OK` if add-operation was successful
* \return `RCL_RET_INVALID_ARGUMENT` if any parameter is a null pointer (NULL context is ignored)
* \return `RCL_RET_ERROR` if any other error occured
*/
rcl_ret_t
rclc_executor_add_subscription_with_context(
rclc_executor_t * executor,
rcl_subscription_t * subscription,
void * msg,
rclc_subscription_callback_with_context_t callback,
void * context,
rclc_executor_handle_invocation_t invocation);

/**
* Adds a timer to an executor.
* * An error is returned, if {@link rclc_executor_t.handles} array is full.
Expand Down
14 changes: 12 additions & 2 deletions rclc/include/rclc/executor_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@ typedef enum
/// Type definition for callback function.
typedef void (* rclc_callback_t)(const void *);

/// Type definition for subscription callback function
/// - incoming message
// typedef void (* rclc_subscription_callback_t)(const void *);

/// Type definition for subscription callback function
/// - incoming message
/// - additional callback context
typedef void (* rclc_subscription_callback_with_context_t)(const void *, void *);

/// Type definition for client callback function
/// - request message
/// - response message
Expand Down Expand Up @@ -114,8 +123,8 @@ typedef struct
/// only for service - ptr to response message
void * data_response_msg;

/// only for service - ptr to additional service context
void * service_context;
/// ptr to additional callback context
void * callback_context;

// TODO(jst3si) new type to be stored as data for
// service/client objects
Expand All @@ -129,6 +138,7 @@ typedef struct
/// Storage for callbacks
union {
rclc_callback_t callback;
rclc_subscription_callback_with_context_t subscription_callback_with_context;
rclc_service_callback_t service_callback;
rclc_service_callback_with_request_id_t service_callback_with_reqid;
rclc_service_callback_with_context_t service_callback_with_context;
Expand Down
81 changes: 75 additions & 6 deletions rclc/src/rclc/executor.c
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ rclc_executor_add_subscription(
executor->handles[executor->index].subscription = subscription;
executor->handles[executor->index].data = msg;
executor->handles[executor->index].callback = callback;
executor->handles[executor->index].callback_type = CB_WITHOUT_REQUEST_ID;
executor->handles[executor->index].invocation = invocation;
executor->handles[executor->index].initialized = true;

Expand All @@ -244,6 +245,56 @@ rclc_executor_add_subscription(
return ret;
}


rcl_ret_t
rclc_executor_add_subscription_with_context(
rclc_executor_t * executor,
rcl_subscription_t * subscription,
void * msg,
rclc_subscription_callback_with_context_t callback,
void * context,
rclc_executor_handle_invocation_t invocation)
{
RCL_CHECK_ARGUMENT_FOR_NULL(executor, RCL_RET_INVALID_ARGUMENT);
RCL_CHECK_ARGUMENT_FOR_NULL(subscription, RCL_RET_INVALID_ARGUMENT);
RCL_CHECK_ARGUMENT_FOR_NULL(msg, RCL_RET_INVALID_ARGUMENT);
RCL_CHECK_ARGUMENT_FOR_NULL(callback, RCL_RET_INVALID_ARGUMENT);
rcl_ret_t ret = RCL_RET_OK;
// array bound check
if (executor->index >= executor->max_handles) {
RCL_SET_ERROR_MSG("Buffer overflow of 'executor->handles'. Increase 'max_handles'");
return RCL_RET_ERROR;
}

// assign data fields
executor->handles[executor->index].type = SUBSCRIPTION;
executor->handles[executor->index].subscription = subscription;
executor->handles[executor->index].data = msg;
executor->handles[executor->index].subscription_callback_with_context = callback;
executor->handles[executor->index].callback_type = CB_WITH_CONTEXT;
executor->handles[executor->index].invocation = invocation;
executor->handles[executor->index].initialized = true;
executor->handles[executor->index].callback_context = context;

// increase index of handle array
executor->index++;

// invalidate wait_set so that in next spin_some() call the
// 'executor->wait_set' is updated accordingly
if (rcl_wait_set_is_valid(&executor->wait_set)) {
ret = rcl_wait_set_fini(&executor->wait_set);
if (RCL_RET_OK != ret) {
RCL_SET_ERROR_MSG("Could not reset wait_set in rclc_executor_add_subscription_with_context.");
return ret;
}
}

executor->info.number_of_subscriptions++;

RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "Added a subscription.");
return ret;
}

rcl_ret_t
rclc_executor_add_timer(
rclc_executor_t * executor,
Expand Down Expand Up @@ -460,7 +511,7 @@ rclc_executor_add_service_with_context(
executor->handles[executor->index].callback_type = CB_WITH_CONTEXT;
executor->handles[executor->index].invocation = ON_NEW_DATA; // invoce when request came in
executor->handles[executor->index].initialized = true;
executor->handles[executor->index].service_context = context;
executor->handles[executor->index].callback_context = context;

// increase index of handle array
executor->index++;
Expand Down Expand Up @@ -923,10 +974,28 @@ _rclc_execute(rclc_executor_handle_t * handle)
if (invoke_callback) {
switch (handle->type) {
case SUBSCRIPTION:
if (handle->data_available) {
handle->callback(handle->data);
} else {
handle->callback(NULL);
switch (handle->callback_type) {
case CB_WITHOUT_REQUEST_ID:
if (handle->data_available) {
handle->callback(handle->data);
} else {
handle->callback(NULL);
}
break;
case CB_WITH_CONTEXT:
if (handle->data_available) {
handle->subscription_callback_with_context(
handle->data,
handle->callback_context);
} else {
handle->subscription_callback_with_context(
NULL,
handle->callback_context);
}
break;
default:
PRINT_RCLC_ERROR(rclc_execute, unknown_callback_type);
break;
}
break;

Expand Down Expand Up @@ -955,7 +1024,7 @@ _rclc_execute(rclc_executor_handle_t * handle)
handle->service_callback_with_context(
handle->data,
handle->data_response_msg,
handle->service_context);
handle->callback_context);
break;
default:
PRINT_RCLC_ERROR(rclc_execute, unknown_callback_type);
Expand Down
2 changes: 1 addition & 1 deletion rclc/src/rclc/executor_handle.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ rclc_executor_handle_init(

handle->data = NULL;
handle->data_response_msg = NULL;
handle->service_context = NULL;
handle->callback_context = NULL;

handle->callback = NULL;
// because of union structure:
Expand Down
115 changes: 115 additions & 0 deletions rclc/test/rclc/test_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,20 @@ void int32_callback5(const void * msgin)
}
}

void int32_callback_with_context(const void * msgin, void * context)
{
const std_msgs__msg__Int32 * msg = (const std_msgs__msg__Int32 *)msgin;
if (msg == NULL) {
printf("(int32_callback_with_context): msg is NULL\n");
}
if (context == NULL) {
printf("(int32_callback_with_context): context is NULL\n");
} else {
// This side effect allows the test to check that the subscription received its context
int32_t * sub_context_value = reinterpret_cast<int32_t *>( context );
*sub_context_value = msg->data;
}
}

void service_callback(const void * req_msg, void * resp_msg)
{
Expand Down Expand Up @@ -697,6 +711,66 @@ TEST_F(TestDefaultExecutor, executor_add_subscription) {
EXPECT_EQ(RCL_RET_OK, rc) << rcl_get_error_string().str;
}

TEST_F(TestDefaultExecutor, executor_add_subscription_with_context) {
rcl_ret_t rc;
rclc_executor_t executor;
// test with normal arguemnt and NULL pointers as arguments
rc = rclc_executor_init(&executor, &this->context, 10, this->allocator_ptr);
EXPECT_EQ(RCL_RET_OK, rc) << rcl_get_error_string().str;

int32_t sub_context_value = 0;
void * sub_context_ptr = reinterpret_cast<void *>( &sub_context_value );

// normal case
rc = rclc_executor_add_subscription_with_context(
&executor, &this->sub1, &this->sub1_msg,
&int32_callback_with_context, sub_context_ptr, ON_NEW_DATA);
EXPECT_EQ(RCL_RET_OK, rc) << rcl_get_error_string().str;
size_t num_subscriptions = 1;
EXPECT_EQ(executor.info.number_of_subscriptions, num_subscriptions) <<
"number of subscriptions is expected to be one";

// test NULL pointer for executor
rc = rclc_executor_add_subscription_with_context(
NULL, &this->sub1, &this->sub1_msg, &int32_callback_with_context,
sub_context_ptr, ON_NEW_DATA);
EXPECT_EQ(RCL_RET_INVALID_ARGUMENT, rc) << rcl_get_error_string().str;
rcutils_reset_error();
EXPECT_EQ(executor.info.number_of_subscriptions, num_subscriptions) <<
"number of subscriptions is expected to be one";

// test NULL pointer for subscription
rc = rclc_executor_add_subscription_with_context(
&executor, NULL, &this->sub1_msg, &int32_callback_with_context,
sub_context_ptr, ON_NEW_DATA);
EXPECT_EQ(RCL_RET_INVALID_ARGUMENT, rc) << rcl_get_error_string().str;
rcutils_reset_error();
EXPECT_EQ(executor.info.number_of_subscriptions, num_subscriptions) <<
"number of subscriptions is expected to be one";

// test NULL pointer for message
rc = rclc_executor_add_subscription_with_context(
&executor, &this->sub1, NULL, &int32_callback_with_context,
sub_context_ptr, ON_NEW_DATA);
EXPECT_EQ(RCL_RET_INVALID_ARGUMENT, rc) << rcl_get_error_string().str;
rcutils_reset_error();
EXPECT_EQ(executor.info.number_of_subscriptions, num_subscriptions) <<
"number of subscriptions is expected to be one";

// test NULL pointer for callback
rc = rclc_executor_add_subscription_with_context(
&executor, &this->sub1, &this->sub1_msg, NULL,
sub_context_ptr, ON_NEW_DATA);
EXPECT_EQ(RCL_RET_INVALID_ARGUMENT, rc) << rcl_get_error_string().str;
rcutils_reset_error();
EXPECT_EQ(executor.info.number_of_subscriptions, num_subscriptions) <<
"number of subscriptions is expected to be one";

// tear down
rc = rclc_executor_fini(&executor);
EXPECT_EQ(RCL_RET_OK, rc) << rcl_get_error_string().str;
}

TEST_F(TestDefaultExecutor, executor_add_subscription_too_many) {
rcl_ret_t rc;
rclc_executor_t executor;
Expand Down Expand Up @@ -2447,6 +2521,47 @@ TEST_F(TestDefaultExecutor, executor_test_service_with_context) {
example_interfaces__srv__AddTwoInts_Response__fini(&cli_resp);
}

TEST_F(TestDefaultExecutor, executor_test_subscription_with_context) {
// This unit test tests, that a subscription with context receives the correct context pointer
rcl_ret_t rc;
rclc_executor_t executor;
executor = rclc_executor_get_zero_initialized_executor();
rc = rclc_executor_init(&executor, &this->context, 10, this->allocator_ptr);
EXPECT_EQ(RCL_RET_OK, rc) << rcl_get_error_string().str;

// prepare a context pointer carrying a value that differs from the published value
int32_t sub_context_value = 0;
void * sub_context_ptr = reinterpret_cast<void *>( &sub_context_value );

std_msgs__msg__Int32__init(&this->pub1_msg);
this->pub1_msg.data = 42;

// create a subscription on the same topic as our publisher
EXPECT_EQ(executor.info.number_of_subscriptions, (size_t) 0) <<
"number of subscriptions was not initialised at zero";

rc = rclc_executor_add_subscription_with_context(
&executor, &this->sub1, &this->sub1_msg,
&int32_callback_with_context, sub_context_ptr, ON_NEW_DATA);
EXPECT_EQ(RCL_RET_OK, rc) << rcl_get_error_string().str;
EXPECT_EQ(executor.info.number_of_subscriptions, (size_t) 1) <<
"number of subscriptions is expected to be one";

// run the callback
rc = rcl_publish(&this->pub1, &this->pub1_msg, nullptr);
EXPECT_EQ(RCL_RET_OK, rc) << " pub1 not published";
std::this_thread::sleep_for(rclc_test_sleep_time);
rclc_executor_spin_some(&executor, rclc_test_timeout_ns);

// check the side effect
EXPECT_EQ(this->pub1_msg.data, sub_context_value) <<
"subscription did not alter context value";

// tear down
rc = rclc_executor_fini(&executor);
EXPECT_EQ(RCL_RET_OK, rc) << rcl_get_error_string().str;
}

TEST_F(TestDefaultExecutor, executor_test_guard_condition) {
// Test guard_condition.
rcl_ret_t rc;
Expand Down
9 changes: 9 additions & 0 deletions rclc_examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,13 @@ ament_target_dependencies(example_client_node rcl rclc example_interfaces)
add_executable(example_lifecycle_node src/example_lifecycle_node.c)
ament_target_dependencies(example_lifecycle_node rcutils rcl rcl_lifecycle rclc rclc_lifecycle lifecycle_msgs)

<<<<<<< HEAD
add_executable(example_parameter_server src/example_parameter_server.c)
ament_target_dependencies(example_parameter_server rclc rclc_parameter)
=======
add_executable(example_sub_context src/example_sub_context.c)
ament_target_dependencies(example_sub_context rcl rclc std_msgs)
>>>>>>> 5b11c5a (Adds a context pointer to subscriptions (#107))

install(TARGETS
example_executor
Expand All @@ -57,7 +62,11 @@ install(TARGETS
example_lifecycle_node
example_service_node
example_client_node
<<<<<<< HEAD
example_parameter_server
=======
example_sub_context
>>>>>>> 5b11c5a (Adds a context pointer to subscriptions (#107))
DESTINATION lib/${PROJECT_NAME}
)

Expand Down
Loading

0 comments on commit 4ee3509

Please sign in to comment.