Skip to content

Commit

Permalink
Backport #41 to branch foxy (#47)
Browse files Browse the repository at this point in the history
* Add environment variable to control override of DomainParticipantQos
* Add entry for RMW_CONNEXT_PARTICIPANT_QOS_OVERRIDE_POLICY to README
* Add note about relation to RMW_CONNEXT_DISABLE_FAST_ENDPOINT_DISCOVERY
* Remove accidentally committed debug statement

Signed-off-by: Andrea Sorbini <[email protected]>
  • Loading branch information
asorbini authored Apr 27, 2021
1 parent f86592a commit 46b23ca
Show file tree
Hide file tree
Showing 5 changed files with 262 additions and 139 deletions.
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ variables.
- [RMW_CONNEXT_ENDPOINT_QOS_OVERRIDE_POLICY](#RMW_CONNEXT_ENDPOINT_QOS_OVERRIDE_POLICY)
- [RMW_CONNEXT_INITIAL_PEERS](#RMW_CONNEXT_INITIAL_PEERS)
- [RMW_CONNEXT_LEGACY_RMW_COMPATIBILITY_MODE](#RMW_CONNEXT_LEGACY_RMW_COMPATIBILITY_MODE)
- [RMW_CONNEXT_PARTICIPANT_QOS_OVERRIDE_POLICY](#RMW_CONNEXT_PARTICIPANT_QOS_OVERRIDE_POLICY)
- [RMW_CONNEXT_REQUEST_REPLY_MAPPING](#RMW_CONNEXT_REQUEST_REPLY_MAPPING)
- [RMW_CONNEXT_UDP_INTERFACE](#RMW_CONNEXT_UDP_INTERFACE)
- [RMW_CONNEXT_USE_DEFAULT_PUBLISH_MODE](#RMW_CONNEXT_USE_DEFAULT_PUBLISH_MODE)
Expand Down Expand Up @@ -275,6 +276,26 @@ In particular, when this mode is enabled, `rmw_connextdds` will revert to adding
a suffix (`_`) to the end of the names of the attributes of the ROS2 data types
propagated via DDS discovery.

### RMW_CONNEXT_PARTICIPANT_QOS_OVERRIDE_POLICY

Control how `rmw_connextdds` will override the default DomainParticipantQos obtained
from Connext.

If this variable is unspecified, or set to `all`, then `rmw_connextdds` will modify
the default DomainParticipantQos with settings derived from ROS 2 options (e.g.
"localhost only", or "node enclave"), and some additional optimizations meant to
improve the out of the box experiene (e.g. speed up endpoint discovery, and increase
the size of type information shared via discovery).

If the variable is set to `basic`, then only those settings associated with ROS 2
options will be modified.

If the variable is set to `never`, then no settings will be modified and the
DomainParticipantQos will be used as is.

Note that values `basic` and `never` will disable the same endpoint discovery
optimizations controlled by [RMW_CONNEXT_DISABLE_FAST_ENDPOINT_DISCOVERY](#RMW_CONNEXT_DISABLE_FAST_ENDPOINT_DISCOVERY).

### RMW_CONNEXT_REQUEST_REPLY_MAPPING

The [DDS-RPC specification](https://www.omg.org/spec/DDS-RPC/About-DDS-RPC/)
Expand Down
21 changes: 21 additions & 0 deletions rmw_connextdds_common/include/rmw_connextdds/context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,25 @@ struct rmw_context_impl_t
bool optimize_large_data{true};
#endif /* RMW_CONNEXT_DEFAULT_LARGE_DATA_OPTIMIZATIONS */

enum class participant_qos_override_policy_t
{
// Always override the default DomainParticipantQoS obtained at runtime from
// Connext with RMW-specific configuration. This will include settings derived
// from ROS 2 configuration parameters (e.g. "localhost_only", or "enclave"),
// but also some additional configurations that the RMW performs arbitrarly
// to improve the out of the box experience. Note that some of these customizations
// can also be disabled individually (e.g. fast endpoint discovery).
All,
// Only perform basic modifications on the default DomainParticipantQos value
// based on ROS 2 configuration parameters (e.g. "localhost only", and "enclave").
// All other RMW-specific customizations will not be applied.
Basic,
// Use the default DomainParticipantQoS returned by Connext without any modification.
Never,
};

participant_qos_override_policy_t participant_qos_override_policy;

enum class endpoint_qos_override_policy_t
{
// Use default QoS policy got from the DDS qos profile file applying topic filters
Expand All @@ -106,6 +125,8 @@ struct rmw_context_impl_t
endpoint_qos_override_policy_t endpoint_qos_override_policy;
std::regex endpoint_qos_override_policy_topics_regex;

struct DDS_StringSeq initial_peers = DDS_SEQUENCE_INITIALIZER;

/* Participant reference count*/
size_t node_count{0};
std::mutex initialization_mutex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,12 @@

#ifndef RMW_CONNEXT_ENV_ENDPOINT_QOS_OVERRIDE_POLICY
#define RMW_CONNEXT_ENV_ENDPOINT_QOS_OVERRIDE_POLICY "RMW_CONNEXT_ENDPOINT_QOS_OVERRIDE_POLICY"
#endif /* RMW_CONNEXT_ENV_ALLOW_TOPIC_QOS_PROFILES */
#endif /* RMW_CONNEXT_ENV_ENDPOINT_QOS_OVERRIDE_POLICY */

#ifndef RMW_CONNEXT_ENV_PARTICIPANT_QOS_OVERRIDE_POLICY
#define RMW_CONNEXT_ENV_PARTICIPANT_QOS_OVERRIDE_POLICY \
"RMW_CONNEXT_PARTICIPANT_QOS_OVERRIDE_POLICY"
#endif /* RMW_CONNEXT_ENV_PARTICIPANT_QOS_OVERRIDE_POLICY */

/******************************************************************************
* DDS Implementation
Expand Down
226 changes: 151 additions & 75 deletions rmw_connextdds_common/src/common/rmw_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,35 +91,14 @@ rmw_connextdds_initialize_participant_qos(
return RMW_RET_ERROR;
}

/* Lookup and configure initial peer from environment */
const char * initial_peers = nullptr;
const char * lookup_rc =
rcutils_get_env(RMW_CONNEXT_ENV_INITIAL_PEERS, &initial_peers);

if (nullptr != lookup_rc || nullptr == initial_peers) {
RMW_CONNEXT_LOG_ERROR_A_SET(
"failed to lookup from environment: "
"var=%s, "
"rc=%s ",
RMW_CONNEXT_ENV_INITIAL_PEERS,
lookup_rc)
return RMW_RET_ERROR;
}

if ('\0' != initial_peers[0]) {
rmw_ret_t rc = rmw_connextdds_parse_string_list(
initial_peers,
&dp_qos.discovery.initial_peers,
',' /* delimiter */,
true /* trim_elements */,
false /* allow_empty_elements */,
false /* append_values */);
if (RMW_RET_OK != rc) {
RMW_CONNEXT_LOG_ERROR_A(
"failed to parse initial peers: '%s'", initial_peers)
return rc;
if (ctx->participant_qos_override_policy ==
rmw_context_impl_t::participant_qos_override_policy_t::All &&
DDS_StringSeq_get_length(&ctx->initial_peers) > 0)
{
if (!DDS_StringSeq_copy(&dp_qos.discovery.initial_peers, &ctx->initial_peers)) {
RMW_CONNEXT_LOG_ERROR_SET("failed to copy initial peers sequence")
return RMW_RET_ERROR;
}
RMW_CONNEXT_LOG_DEBUG_A("initial DDS peers: %s", initial_peers)
}

return RMW_RET_OK;
Expand Down Expand Up @@ -175,60 +154,13 @@ rmw_context_impl_t::initialize_node(
return RMW_RET_OK;
}


rmw_ret_t
rmw_context_impl_t::initialize_participant(const bool localhost_only)
{
RMW_CONNEXT_LOG_DEBUG("initializing DDS DomainParticipant")

this->localhost_only = localhost_only;

/* Lookup RMW_CONNEXT_ENV_ALLOW_TOPIC_QOS_PROFILES env variable.*/
const char * endpoint_qos_policy = nullptr;
const char * lookup_rc = rcutils_get_env(
RMW_CONNEXT_ENV_ENDPOINT_QOS_OVERRIDE_POLICY, &endpoint_qos_policy);

if (nullptr != lookup_rc || nullptr == endpoint_qos_policy) {
RMW_CONNEXT_LOG_ERROR_A_SET(
"failed to lookup from environment: "
"var=%s, "
"rc=%s ",
RMW_CONNEXT_ENV_ENDPOINT_QOS_OVERRIDE_POLICY,
lookup_rc)
return RMW_RET_ERROR;
}

this->endpoint_qos_override_policy = rmw_context_impl_t::endpoint_qos_override_policy_t::Always;
const char dds_topic_policy_prefix[] = "dds_topics: ";
const char never_policy[] = "never";
const char always_policy[] = "always";
if (
0 == strncmp(
endpoint_qos_policy, dds_topic_policy_prefix, sizeof(dds_topic_policy_prefix) - 1u))
{
this->endpoint_qos_override_policy =
rmw_context_impl_t::endpoint_qos_override_policy_t::DDSTopics;
try {
this->endpoint_qos_override_policy_topics_regex =
&endpoint_qos_policy[sizeof(dds_topic_policy_prefix) - 1u];
} catch (std::regex_error & err) {
RMW_CONNEXT_LOG_ERROR_A_SET(
"regex expression provided in {%s} environment variable is invalid: %s\n",
RMW_CONNEXT_ENV_ENDPOINT_QOS_OVERRIDE_POLICY,
err.what());
return RMW_RET_ERROR;
}
} else if (0 == strcmp(endpoint_qos_policy, never_policy)) {
this->endpoint_qos_override_policy = rmw_context_impl_t::endpoint_qos_override_policy_t::Never;
} else if (endpoint_qos_policy[0] != '\0' && strcmp(endpoint_qos_policy, always_policy) != 0) {
RMW_CONNEXT_LOG_ERROR_A_SET(
"Environment variable {%s} has an unexpected value {%s}. "
"Allowed values are {always}, {never} or {dds_topics: <regex_expression>}.\n",
RMW_CONNEXT_ENV_ENDPOINT_QOS_OVERRIDE_POLICY,
endpoint_qos_policy);
return RMW_RET_ERROR;
}

if (nullptr == RMW_Connext_gv_DomainParticipantFactory) {
RMW_CONNEXT_LOG_ERROR("DDS DomainParticipantFactory not initialized")
return RMW_RET_ERROR;
Expand Down Expand Up @@ -708,6 +640,69 @@ rmw_api_connextdds_init_options_fini(rmw_init_options_t * init_options)
return ret;
}

static
rmw_ret_t
rmw_connextdds_parse_participant_qos_override_policy(
const char * const user_input,
rmw_context_impl_t::participant_qos_override_policy_t & policy)
{
static const char pfx_never[] = "never";
static const char pfx_all[] = "all";
static const char pfx_basic[] = "basic";

policy = rmw_context_impl_t::participant_qos_override_policy_t::All;

if (0 == strcmp(user_input, pfx_never)) {
policy = rmw_context_impl_t::participant_qos_override_policy_t::Never;
} else if (0 == strcmp(user_input, pfx_basic)) {
policy = rmw_context_impl_t::participant_qos_override_policy_t::Basic;
} else if (user_input[0] != '\0' && strcmp(user_input, pfx_all) != 0) {
RMW_CONNEXT_LOG_ERROR_A_SET(
"unexpected value for participant qos override policy. "
"Allowed values are {all}, {basic}, or {never}: %s",
user_input);
return RMW_RET_ERROR;
}

return RMW_RET_OK;
}

static
rmw_ret_t
rmw_connextdds_parse_endpoint_qos_override_policy(
const char * const user_input,
rmw_context_impl_t::endpoint_qos_override_policy_t & policy,
std::regex & policy_regex)
{
static const char pfx_dds_topics[] = "dds_topics: ";
static const size_t pfx_dds_topics_len = sizeof(pfx_dds_topics) - 1u;
static const char pfx_never[] = "never";
static const char pfx_always[] = "always";

policy = rmw_context_impl_t::endpoint_qos_override_policy_t::Always;

if (0 == strncmp(user_input, pfx_dds_topics, pfx_dds_topics_len)) {
policy = rmw_context_impl_t::endpoint_qos_override_policy_t::DDSTopics;
try {
policy_regex = &user_input[pfx_dds_topics_len];
} catch (std::regex_error & err) {
RMW_CONNEXT_LOG_ERROR_A_SET(
"failed to parse regex for endpoint qos override policy: %s",
err.what());
return RMW_RET_ERROR;
}
} else if (0 == strcmp(user_input, pfx_never)) {
policy = rmw_context_impl_t::endpoint_qos_override_policy_t::Never;
} else if (user_input[0] != '\0' && strcmp(user_input, pfx_always) != 0) {
RMW_CONNEXT_LOG_ERROR_A_SET(
"unexpected value for endpoint qos override policy. "
"Allowed values are {always}, {never} or {dds_topics: <regex_expression>}: %s",
user_input);
return RMW_RET_ERROR;
}

return RMW_RET_OK;
}

rmw_ret_t
rmw_api_connextdds_init(
Expand Down Expand Up @@ -810,6 +805,56 @@ rmw_api_connextdds_init(
}
ctx->use_default_publish_mode = '\0' != use_default_publish_mode_env[0];

// Check if the user specified a custom override policy for participant qos.
const char * participant_qos_policy = nullptr;
lookup_rc = rcutils_get_env(
RMW_CONNEXT_ENV_PARTICIPANT_QOS_OVERRIDE_POLICY, &participant_qos_policy);

if (nullptr != lookup_rc || nullptr == participant_qos_policy) {
RMW_CONNEXT_LOG_ERROR_A_SET(
"failed to lookup from environment: "
"var=%s, "
"rc=%s ",
RMW_CONNEXT_ENV_PARTICIPANT_QOS_OVERRIDE_POLICY,
lookup_rc)
return RMW_RET_ERROR;
}

rc = rmw_connextdds_parse_participant_qos_override_policy(
participant_qos_policy, ctx->participant_qos_override_policy);
if (RMW_RET_OK != rc) {
RMW_CONNEXT_LOG_ERROR_A_SET(
"failed to parse value for environment variable {%s}",
RMW_CONNEXT_ENV_PARTICIPANT_QOS_OVERRIDE_POLICY);
return RMW_RET_ERROR;
}

// Check if the user specified a custom override policy for endpoint qos.
const char * endpoint_qos_policy = nullptr;
lookup_rc = rcutils_get_env(
RMW_CONNEXT_ENV_ENDPOINT_QOS_OVERRIDE_POLICY, &endpoint_qos_policy);

if (nullptr != lookup_rc || nullptr == endpoint_qos_policy) {
RMW_CONNEXT_LOG_ERROR_A_SET(
"failed to lookup from environment: "
"var=%s, "
"rc=%s ",
RMW_CONNEXT_ENV_ENDPOINT_QOS_OVERRIDE_POLICY,
lookup_rc)
return RMW_RET_ERROR;
}

rc = rmw_connextdds_parse_endpoint_qos_override_policy(
endpoint_qos_policy,
ctx->endpoint_qos_override_policy,
ctx->endpoint_qos_override_policy_topics_regex);
if (RMW_RET_OK != rc) {
RMW_CONNEXT_LOG_ERROR_A_SET(
"failed to parse value for environment variable {%s}",
RMW_CONNEXT_ENV_ENDPOINT_QOS_OVERRIDE_POLICY);
return RMW_RET_ERROR;
}

// Check if we should run in "compatibility mode" with Cyclone DDS.
const char * cyclone_compatible_env = nullptr;
lookup_rc = rcutils_get_env(
Expand Down Expand Up @@ -922,6 +967,37 @@ rmw_api_connextdds_init(
ctx->optimize_large_data = '\0' == disable_optimize_large_data_env[0];
#endif /* RMW_CONNEXT_DEFAULT_LARGE_DATA_OPTIMIZATIONS */

/* Lookup and configure initial peer from environment */
const char * initial_peers = nullptr;
lookup_rc =
rcutils_get_env(RMW_CONNEXT_ENV_INITIAL_PEERS, &initial_peers);

if (nullptr != lookup_rc || nullptr == initial_peers) {
RMW_CONNEXT_LOG_ERROR_A_SET(
"failed to lookup from environment: "
"var=%s, "
"rc=%s ",
RMW_CONNEXT_ENV_INITIAL_PEERS,
lookup_rc)
return RMW_RET_ERROR;
}

if ('\0' != initial_peers[0]) {
rmw_ret_t rc = rmw_connextdds_parse_string_list(
initial_peers,
&ctx->initial_peers,
',' /* delimiter */,
true /* trim_elements */,
false /* allow_empty_elements */,
false /* append_values */);
if (RMW_RET_OK != rc) {
RMW_CONNEXT_LOG_ERROR_A(
"failed to parse initial peers: '%s'", initial_peers)
return rc;
}
RMW_CONNEXT_LOG_DEBUG_A("initial DDS peers: %s", initial_peers)
}

if (nullptr == RMW_Connext_gv_DomainParticipantFactory) {
RMW_CONNEXT_LOG_DEBUG("initializing DDS DomainParticipantFactory")

Expand Down
Loading

0 comments on commit 46b23ca

Please sign in to comment.