diff --git a/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp b/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp index 6183b96c..54ea7dae 100644 --- a/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp +++ b/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp @@ -16,46 +16,17 @@ #include #include -#include +#include +#include #include #include "rmw/types.h" -#include "logging_macros.hpp" - #include "attachment_helpers.hpp" namespace rmw_zenoh_cpp { -attachement_context_t::attachement_context_t(std::unique_ptr && _data) -: data(std::move(_data)) {} - -bool create_attachment_iter(z_owned_bytes_t * kv_pair, void * context) -{ - attachement_context_t * ctx = reinterpret_cast(context); - z_owned_bytes_t k, v; - - if (ctx->idx == 0) { - z_bytes_serialize_from_str(&k, "sequence_number"); - z_bytes_serialize_from_int64(&v, ctx->data->sequence_number); - } else if (ctx->idx == 1) { - z_bytes_serialize_from_str(&k, "source_timestamp"); - z_bytes_serialize_from_int64(&v, ctx->data->source_timestamp); - } else if (ctx->idx == 2) { - z_bytes_serialize_from_str(&k, "source_gid"); - z_bytes_serialize_from_buf( - &v, ctx->data->source_gid, - RMW_GID_STORAGE_SIZE); - } else { - return false; - } - - z_bytes_from_pair(kv_pair, z_move(k), z_move(v)); - ctx->idx += 1; - return true; -} - attachement_data_t::attachement_data_t( const int64_t _sequence_number, const int64_t _source_timestamp, @@ -66,117 +37,70 @@ attachement_data_t::attachement_data_t( memcpy(source_gid, _source_gid, RMW_GID_STORAGE_SIZE); } -z_result_t attachement_data_t::serialize_to_zbytes(z_owned_bytes_t * attachment) +attachement_data_t::attachement_data_t(attachement_data_t && data) { - attachement_context_t context = - attachement_context_t(std::make_unique(*this)); - return z_bytes_from_iter( - attachment, create_attachment_iter, - reinterpret_cast(&context)); + sequence_number = std::move(data.sequence_number); + source_timestamp = std::move(data.source_timestamp); + memcpy(source_gid, data.source_gid, RMW_GID_STORAGE_SIZE); } - -bool get_attachment( - const z_loaned_bytes_t * const attachment, - const std::string & key, z_owned_bytes_t * val) +void attachement_data_t::serialize_to_zbytes(z_owned_bytes_t * attachment) { - if (attachment == NULL || z_bytes_is_empty(attachment)) { - return false; - } - - z_bytes_iterator_t iter = z_bytes_get_iterator(attachment); - z_owned_bytes_t pair, key_; - bool found = false; - - while (z_bytes_iterator_next(&iter, &pair)) { - z_bytes_deserialize_into_pair(z_loan(pair), &key_, val); - z_owned_string_t key_string; - z_bytes_deserialize_into_string(z_loan(key_), &key_string); + ze_owned_serializer_t serializer; + ze_serializer_empty(&serializer); + ze_serializer_serialize_str(z_loan_mut(serializer), "sequence_number"); + ze_serializer_serialize_int64(z_loan_mut(serializer), this->sequence_number); + ze_serializer_serialize_str(z_loan_mut(serializer), "source_timestamp"); + ze_serializer_serialize_int64(z_loan_mut(serializer), this->source_timestamp); + ze_serializer_serialize_str(z_loan_mut(serializer), "source_gid"); + ze_serializer_serialize_buf(z_loan_mut(serializer), this->source_gid, RMW_GID_STORAGE_SIZE); + ze_serializer_finish(z_move(serializer), attachment); +} - const char * key_string_ptr = z_string_data(z_loan(key_string)); - size_t key_string_len = z_string_len(z_loan(key_string)); - if (key_string_len == key.length() && strncmp(key_string_ptr, key.c_str(), key.length()) == 0) { - found = true; - } +attachement_data_t::attachement_data_t(const z_loaned_bytes_t * attachment) +{ + ze_deserializer_t deserializer = ze_deserializer_from_bytes(attachment); + z_owned_string_t key; - z_drop(z_move(pair)); - z_drop(z_move(key_)); - z_drop(z_move(key_string)); + ze_deserializer_deserialize_string(&deserializer, &key); - if (found) { - break; - } else { - z_drop(z_move(*val)); - } + // Deserialize the sequence_number + if (std::string_view(z_string_data(z_loan(key)), + z_string_len(z_loan(key))) != "sequence_number") + { + throw std::runtime_error("sequence_number is not found in the attachment."); } - - if (!found) { - return false; + z_drop(z_move(key)); + if (ze_deserializer_deserialize_int64(&deserializer, &this->sequence_number)) { + throw std::runtime_error("Failed to deserialize the sequence_number."); } - if (z_bytes_is_empty(z_loan(*val))) { - return false; + // Deserialize the source_timestamp + ze_deserializer_deserialize_string(&deserializer, &key); + if (std::string_view(z_string_data(z_loan(key)), + z_string_len(z_loan(key))) != "source_timestamp") + { + throw std::runtime_error("source_timestamp is not found in the attachment"); } - - return true; -} - -bool get_gid_from_attachment( - const z_loaned_bytes_t * const attachment, - uint8_t gid[RMW_GID_STORAGE_SIZE]) -{ - if (attachment == NULL || z_bytes_is_empty(attachment)) { - return false; + z_drop(z_move(key)); + if (ze_deserializer_deserialize_int64(&deserializer, &this->source_timestamp)) { + throw std::runtime_error("Failed to deserialize the source_timestamp."); } - z_owned_bytes_t val; - if (!get_attachment(attachment, "source_gid", &val)) { - return false; + // Deserialize the source_gid + ze_deserializer_deserialize_string(&deserializer, &key); + if (std::string_view(z_string_data(z_loan(key)), z_string_len(z_loan(key))) != "source_gid") { + throw std::runtime_error("Invalid attachment: the key source_gid is not found"); } - + z_drop(z_move(key)); z_owned_slice_t slice; - z_bytes_deserialize_into_slice(z_loan(val), &slice); + if (ze_deserializer_deserialize_slice(&deserializer, &slice)) { + throw std::runtime_error("Failed to deserialize the source_gid."); + } if (z_slice_len(z_loan(slice)) != RMW_GID_STORAGE_SIZE) { - RMW_ZENOH_LOG_ERROR_NAMED("rmw_zenoh_cpp", "GID length mismatched.") - return false; + throw std::runtime_error("The length of source_gid mismatched."); } - memcpy(gid, z_slice_data(z_loan(slice)), z_slice_len(z_loan(slice))); - - z_drop(z_move(val)); + memcpy(this->source_gid, z_slice_data(z_loan(slice)), z_slice_len(z_loan(slice))); z_drop(z_move(slice)); - - return true; -} - -int64_t get_int64_from_attachment( - const z_loaned_bytes_t * const attachment, - const std::string & name) -{ - // A valid request must have had an attachment - if (attachment == NULL || z_bytes_is_empty(attachment)) { - return -1; - } - - // TODO(yuyuan): This key should be specific - z_owned_bytes_t val; - if (!get_attachment(attachment, name, &val)) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", "Failed to deserialize int64 from the attachment.") - return false; - } - - int64_t num; - if (z_bytes_deserialize_into_int64(z_loan(val), &num) != Z_OK) { - return -1; - } - - if (num == 0) { - // This is an error regardless; the client should never send this - return -1; - } - - z_drop(z_move(val)); - - return num; } } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp b/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp index c31714c1..2648b667 100644 --- a/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp +++ b/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp @@ -17,9 +17,6 @@ #include -#include -#include - #include "rmw/types.h" namespace rmw_zenoh_cpp @@ -28,35 +25,19 @@ namespace rmw_zenoh_cpp class attachement_data_t final { public: - int64_t sequence_number; - int64_t source_timestamp; - uint8_t source_gid[RMW_GID_STORAGE_SIZE]; explicit attachement_data_t( const int64_t _sequence_number, const int64_t _source_timestamp, const uint8_t _source_gid[RMW_GID_STORAGE_SIZE]); - z_result_t serialize_to_zbytes(z_owned_bytes_t *); -}; + explicit attachement_data_t(const z_loaned_bytes_t *); + explicit attachement_data_t(attachement_data_t && data); -class attachement_context_t final -{ -public: - std::unique_ptr data; - int length = 3; - int idx = 0; + int64_t sequence_number; + int64_t source_timestamp; + uint8_t source_gid[RMW_GID_STORAGE_SIZE]; - attachement_context_t(std::unique_ptr && _data); + void serialize_to_zbytes(z_owned_bytes_t *); }; - -bool get_attachment( - const z_loaned_bytes_t * const attachment, - const std::string & key, z_owned_bytes_t * val); - -bool get_gid_from_attachment( - const z_loaned_bytes_t * const attachment, uint8_t gid[RMW_GID_STORAGE_SIZE]); - -int64_t get_int64_from_attachment( - const z_loaned_bytes_t * const attachment, const std::string & name); } // namespace rmw_zenoh_cpp #endif // DETAIL__ATTACHMENT_HELPERS_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index ab0fa72e..2f9e24c7 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -54,12 +54,9 @@ namespace rmw_zenoh_cpp saved_msg_data::saved_msg_data( z_owned_slice_t p, uint64_t recv_ts, - const uint8_t pub_gid[RMW_GID_STORAGE_SIZE], - int64_t seqnum, - int64_t source_ts) -: payload(p), recv_timestamp(recv_ts), sequence_number(seqnum), source_timestamp(source_ts) + attachement_data_t && attachment_) +: payload(p), recv_timestamp(recv_ts), attachment(std::move(attachment_)) { - memcpy(publisher_gid, pub_gid, RMW_GID_STORAGE_SIZE); } ///============================================================================= @@ -146,10 +143,11 @@ void rmw_subscription_data_t::add_new_message( } // Check for messages lost if the new sequence number is not monotonically increasing. - const size_t gid_hash = hash_gid(msg->publisher_gid); + const size_t gid_hash = hash_gid(msg->attachment.source_gid); auto last_known_pub_it = last_known_published_msg_.find(gid_hash); if (last_known_pub_it != last_known_published_msg_.end()) { - const int64_t seq_increment = std::abs(msg->sequence_number - last_known_pub_it->second); + const int64_t seq_increment = std::abs(msg->attachment.sequence_number - + last_known_pub_it->second); if (seq_increment > 1) { const size_t num_msg_lost = seq_increment - 1; total_messages_lost_ += num_msg_lost; @@ -162,7 +160,7 @@ void rmw_subscription_data_t::add_new_message( } } // Always update the last known sequence number for the publisher - last_known_published_msg_[gid_hash] = msg->sequence_number; + last_known_published_msg_[gid_hash] = msg->attachment.sequence_number; message_queue_.emplace_back(std::move(msg)); @@ -432,48 +430,17 @@ void sub_data_handler( return; } - uint8_t pub_gid[RMW_GID_STORAGE_SIZE]; - const z_loaned_bytes_t * attachment = z_sample_attachment(sample); - if (!get_gid_from_attachment(attachment, pub_gid)) { - // We failed to get the GID from the attachment. While this isn't fatal, - // it is unusual and so we should report it. - memset(pub_gid, 0, RMW_GID_STORAGE_SIZE); - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain publisher GID from the attachment."); - } - - int64_t sequence_number = get_int64_from_attachment(attachment, "sequence_number"); - if (sequence_number < 0) { - // We failed to get the sequence number from the attachment. While this - // isn't fatal, it is unusual and so we should report it. - sequence_number = 0; - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", "Unable to obtain sequence number from the attachment."); - } - - int64_t source_timestamp = get_int64_from_attachment(attachment, "source_timestamp"); - if (source_timestamp < 0) { - // We failed to get the source timestamp from the attachment. While this - // isn't fatal, it is unusual and so we should report it. - source_timestamp = 0; - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain sequence number from the attachment."); - } - + attachement_data_t attachment(z_sample_attachment(sample)); const z_loaned_bytes_t * payload = z_sample_payload(sample); z_owned_slice_t slice; - z_bytes_deserialize_into_slice(payload, &slice); + z_bytes_to_slice(payload, &slice); sub_data->add_new_message( std::make_unique( slice, z_timestamp_ntp64_time(z_sample_timestamp(sample)), - pub_gid, - sequence_number, - source_timestamp), + std::move(attachment)), z_string_data(z_loan(keystr))); } @@ -572,7 +539,7 @@ void client_data_handler(z_loaned_reply_t * reply, void * data) const z_loaned_bytes_t * err_payload = z_reply_err_payload(err); z_owned_string_t err_str; - z_bytes_deserialize_into_string(err_payload, &err_str); + z_bytes_to_string(err_payload, &err_str); RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "z_reply_is_ok returned False for keyexpr %s. Reason: %.*s", diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index e5311091..0456b260 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -39,6 +39,7 @@ #include "message_type_support.hpp" #include "rmw_wait_set_data.hpp" #include "service_type_support.hpp" +#include "attachment_helpers.hpp" /// Structs for various type erased data fields. @@ -50,20 +51,13 @@ void sub_data_handler(z_loaned_sample_t * sample, void * sub_data); struct saved_msg_data { - explicit saved_msg_data( - z_owned_slice_t p, - uint64_t recv_ts, - const uint8_t pub_gid[RMW_GID_STORAGE_SIZE], - int64_t seqnum, - int64_t source_ts); + explicit saved_msg_data(z_owned_slice_t p, uint64_t recv_ts, attachement_data_t && attachment); ~saved_msg_data(); z_owned_slice_t payload; uint64_t recv_timestamp; - uint8_t publisher_gid[RMW_GID_STORAGE_SIZE]; - int64_t sequence_number; - int64_t source_timestamp; + attachement_data_t attachment; }; ///============================================================================= diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index 5abdb313..2e44d661 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -287,34 +287,31 @@ rmw_ret_t PublisherData::publish( const size_t data_length = ser.get_serialized_data_length(); - z_owned_bytes_t attachment; - auto free_attachment = rcpputils::make_scope_exit( - [&attachment]() { - z_drop(z_move(attachment)); - }); - if (!create_map_and_set_sequence_num(&attachment, sequence_number_++, gid_)) { - // create_map_and_set_sequence_num already set the error - return RMW_RET_ERROR; - } - // The encoding is simply forwarded and is useful when key expressions in the // session use different encoding formats. In our case, all key expressions // will be encoded with CDR so it does not really matter. z_publisher_put_options_t options; z_publisher_put_options_default(&options); - free_attachment.cancel(); + z_owned_bytes_t attachment; + create_map_and_set_sequence_num(&attachment, sequence_number_++, gid_); options.attachment = z_move(attachment); z_owned_bytes_t payload; if (shmbuf.has_value()) { - z_bytes_serialize_from_shm_mut(&payload, z_move(shmbuf.value())); + z_bytes_from_shm_mut(&payload, z_move(shmbuf.value())); } else { - z_bytes_serialize_from_buf(&payload, reinterpret_cast(msg_bytes), data_length); + z_bytes_copy_from_buf(&payload, reinterpret_cast(msg_bytes), data_length); } - if (z_publisher_put(z_loan(pub_), z_move(payload), &options) != Z_OK) { - RMW_SET_ERROR_MSG("unable to publish message"); - return RMW_RET_ERROR; + z_result_t res = z_publisher_put(z_loan(pub_), z_move(payload), &options); + if (res != Z_OK) { + if (res == Z_ESESSION_CLOSED) { + RMW_ZENOH_LOG_WARN_NAMED("rmw_zenoh_cpp", + "unable to publish message since the zenoh session is closed"); + } else { + RMW_SET_ERROR_MSG("unable to publish message"); + return RMW_RET_ERROR; + } } return RMW_RET_OK; @@ -335,16 +332,6 @@ rmw_ret_t PublisherData::publish_serialized_message( std::lock_guard lock(mutex_); - z_owned_bytes_t attachment; - auto free_attachment = rcpputils::make_scope_exit( - [&attachment]() { - z_drop(z_move(attachment)); - }); - if (!create_map_and_set_sequence_num(&attachment, sequence_number_++, gid_)) { - // create_map_and_set_sequence_num already set the error - return RMW_RET_ERROR; - } - const size_t data_length = ser.get_serialized_data_length(); // The encoding is simply forwarded and is useful when key expressions in the @@ -352,11 +339,13 @@ rmw_ret_t PublisherData::publish_serialized_message( // will be encoded with CDR so it does not really matter. z_publisher_put_options_t options; z_publisher_put_options_default(&options); - free_attachment.cancel(); + z_owned_bytes_t attachment; + create_map_and_set_sequence_num(&attachment, sequence_number_++, gid_); + options.attachment = z_move(attachment); z_owned_bytes_t payload; - z_bytes_serialize_from_buf(&payload, serialized_message->buffer, data_length); + z_bytes_copy_from_buf(&payload, serialized_message->buffer, data_length); if (z_publisher_put(z_loan(pub_), z_move(payload), &options) != Z_OK) { RMW_SET_ERROR_MSG("unable to publish message"); diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp index 8d4feaa2..f6b82255 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp @@ -69,9 +69,6 @@ class PublisherData final // Copy the GID of this PublisherData into an rmw_gid_t. void copy_gid(rmw_gid_t * gid) const; - // Returns true if liveliness token is still valid. - bool liveliness_is_valid() const; - // Get the events manager of this PublisherData. std::shared_ptr events_mgr() const; diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp index 0edee5d0..3321034b 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.cpp @@ -16,14 +16,13 @@ #include "zenoh_utils.hpp" #include "attachment_helpers.hpp" -#include "logging_macros.hpp" #include "rmw/types.h" namespace rmw_zenoh_cpp { ///============================================================================= -bool +void create_map_and_set_sequence_num( z_owned_bytes_t * out_bytes, int64_t sequence_number, @@ -34,13 +33,6 @@ create_map_and_set_sequence_num( int64_t source_timestamp = now_ns.count(); rmw_zenoh_cpp::attachement_data_t data(sequence_number, source_timestamp, gid); - if (data.serialize_to_zbytes(out_bytes)) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Failed to serialize the attachment"); - return false; - } - - return true; + data.serialize_to_zbytes(out_bytes); } } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp index 74687824..fba21bad 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_utils.hpp @@ -22,7 +22,7 @@ namespace rmw_zenoh_cpp { ///============================================================================= -bool +void create_map_and_set_sequence_num( z_owned_bytes_t * out_bytes, int64_t sequence_number, uint8_t gid[RMW_GID_STORAGE_SIZE]); diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index d0eaa885..1fc7d9d6 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -746,10 +746,6 @@ rmw_publisher_assert_liveliness(const rmw_publisher_t * publisher) auto pub_data = node_data->get_pub_data(publisher); RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT); - if (!pub_data->liveliness_is_valid()) { - return RMW_RET_ERROR; - } - return RMW_RET_OK; } @@ -1354,13 +1350,13 @@ __rmw_take_one( } if (message_info != nullptr) { - message_info->source_timestamp = msg_data->source_timestamp; + message_info->source_timestamp = msg_data->attachment.source_timestamp; message_info->received_timestamp = msg_data->recv_timestamp; - message_info->publication_sequence_number = msg_data->sequence_number; + message_info->publication_sequence_number = msg_data->attachment.sequence_number; // TODO(clalancette): fill in reception_sequence_number message_info->reception_sequence_number = 0; message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; - memcpy(message_info->publisher_gid.data, msg_data->publisher_gid, RMW_GID_STORAGE_SIZE); + memcpy(message_info->publisher_gid.data, msg_data->attachment.source_gid, RMW_GID_STORAGE_SIZE); message_info->from_intra_process = false; } @@ -1569,13 +1565,13 @@ __rmw_take_serialized( *taken = true; if (message_info != nullptr) { - message_info->source_timestamp = msg_data->source_timestamp; + message_info->source_timestamp = msg_data->attachment.source_timestamp; message_info->received_timestamp = msg_data->recv_timestamp; - message_info->publication_sequence_number = msg_data->sequence_number; + message_info->publication_sequence_number = msg_data->attachment.sequence_number; // TODO(clalancette): fill in reception_sequence_number message_info->reception_sequence_number = 0; message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; - memcpy(message_info->publisher_gid.data, msg_data->publisher_gid, RMW_GID_STORAGE_SIZE); + memcpy(message_info->publisher_gid.data, msg_data->attachment.source_gid, RMW_GID_STORAGE_SIZE); message_info->from_intra_process = false; } @@ -2075,12 +2071,8 @@ rmw_send_request( z_get_options_default(&opts); z_owned_bytes_t attachment; - if (!rmw_zenoh_cpp::create_map_and_set_sequence_num(&attachment, *sequence_id, - client_data->client_gid)) - { - // create_map_and_set_sequence_num already set the error - return RMW_RET_ERROR; - } + rmw_zenoh_cpp::create_map_and_set_sequence_num(&attachment, *sequence_id, + client_data->client_gid); auto free_attachment = rcpputils::make_scope_exit( [&attachment]() { z_drop(z_move(attachment)); @@ -2104,7 +2096,7 @@ rmw_send_request( opts.consolidation = z_query_consolidation_latest(); z_owned_bytes_t payload; - z_bytes_serialize_from_buf( + z_bytes_copy_from_buf( &payload, reinterpret_cast(request_bytes), data_length); opts.payload = z_move(payload); @@ -2160,7 +2152,7 @@ rmw_take_response( } z_owned_slice_t payload; - z_bytes_deserialize_into_slice(z_sample_payload(sample), &payload); + z_bytes_to_slice(z_sample_payload(sample), &payload); // Object that manages the raw buffer eprosima::fastcdr::FastBuffer fastbuffer( @@ -2180,28 +2172,20 @@ rmw_take_response( // Fill in the request_header - request_header->request_id.sequence_number = - rmw_zenoh_cpp::get_int64_from_attachment(z_sample_attachment(sample), "sequence_number"); + rmw_zenoh_cpp::attachement_data_t attachment(z_sample_attachment(sample)); + + request_header->request_id.sequence_number = attachment.sequence_number; if (request_header->request_id.sequence_number < 0) { RMW_SET_ERROR_MSG("Failed to get sequence_number from client call attachment"); return RMW_RET_ERROR; } - request_header->source_timestamp = - rmw_zenoh_cpp::get_int64_from_attachment(z_sample_attachment(sample), "source_timestamp"); + request_header->source_timestamp = attachment.source_timestamp; if (request_header->source_timestamp < 0) { RMW_SET_ERROR_MSG("Failed to get source_timestamp from client call attachment"); return RMW_RET_ERROR; } - if (!rmw_zenoh_cpp::get_gid_from_attachment( - z_sample_attachment(sample), - request_header->request_id.writer_guid)) - { - RMW_SET_ERROR_MSG("Could not get client gid from attachment"); - return RMW_RET_ERROR; - } - auto now = std::chrono::system_clock::now().time_since_epoch(); auto now_ns = std::chrono::duration_cast(now); request_header->received_timestamp = now_ns.count(); @@ -2631,7 +2615,7 @@ rmw_take_request( // DESERIALIZE MESSAGE ======================================================== z_owned_slice_t payload; - z_bytes_deserialize_into_slice(z_query_payload(loaned_query), &payload); + z_bytes_to_slice(z_query_payload(loaned_query), &payload); // Object that manages the raw buffer eprosima::fastcdr::FastBuffer fastbuffer( @@ -2651,32 +2635,20 @@ rmw_take_request( // Fill in the request header. - // Get the sequence_number out of the attachment - const z_loaned_bytes_t * attachment = z_query_attachment(loaned_query); + rmw_zenoh_cpp::attachement_data_t attachment(z_query_attachment(loaned_query)); - request_header->request_id.sequence_number = - rmw_zenoh_cpp::get_int64_from_attachment(attachment, "sequence_number"); + request_header->request_id.sequence_number = attachment.sequence_number; if (request_header->request_id.sequence_number < 0) { RMW_SET_ERROR_MSG("Failed to get sequence_number from client call attachment"); return RMW_RET_ERROR; } - request_header->source_timestamp = rmw_zenoh_cpp::get_int64_from_attachment( - attachment, - "source_timestamp"); + request_header->source_timestamp = attachment.source_timestamp; if (request_header->source_timestamp < 0) { RMW_SET_ERROR_MSG("Failed to get source_timestamp from client call attachment"); return RMW_RET_ERROR; } - if (!rmw_zenoh_cpp::get_gid_from_attachment( - attachment, - request_header->request_id.writer_guid)) - { - RMW_SET_ERROR_MSG("Could not get client GID from attachment"); - return RMW_RET_ERROR; - } - auto now = std::chrono::system_clock::now().time_since_epoch(); auto now_ns = std::chrono::duration_cast(now); request_header->received_timestamp = now_ns.count(); @@ -2768,16 +2740,12 @@ rmw_send_response( z_query_reply_options_default(&options); z_owned_bytes_t attachment; - if (!rmw_zenoh_cpp::create_map_and_set_sequence_num( - &attachment, request_header->sequence_number, request_header->writer_guid)) - { - // create_map_and_set_sequence_num already set the error - return RMW_RET_ERROR; - } + rmw_zenoh_cpp::create_map_and_set_sequence_num(&attachment, request_header->sequence_number, + request_header->writer_guid); options.attachment = z_move(attachment); z_owned_bytes_t payload; - z_bytes_serialize_from_buf( + z_bytes_copy_from_buf( &payload, reinterpret_cast(response_bytes), data_length); z_query_reply( loaned_query, z_loan(service_data->keyexpr), z_move(payload), &options);