Skip to content

Commit

Permalink
refactor(api): align with latest serialization changes
Browse files Browse the repository at this point in the history
  • Loading branch information
YuanYuYuan committed Oct 2, 2024
1 parent d022967 commit 1cfc9f6
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 297 deletions.
174 changes: 49 additions & 125 deletions rmw_zenoh_cpp/src/detail/attachment_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,46 +16,17 @@

#include <cstdlib>
#include <cstring>
#include <string>
#include <stdexcept>
#include <string_view>
#include <utility>

#include "rmw/types.h"

#include "logging_macros.hpp"

#include "attachment_helpers.hpp"

namespace rmw_zenoh_cpp
{

attachement_context_t::attachement_context_t(std::unique_ptr<attachement_data_t> && _data)
: data(std::move(_data)) {}

bool create_attachment_iter(z_owned_bytes_t * kv_pair, void * context)
{
attachement_context_t * ctx = reinterpret_cast<attachement_context_t *>(context);
z_owned_bytes_t k, v;

if (ctx->idx == 0) {
z_bytes_serialize_from_str(&k, "sequence_number");
z_bytes_serialize_from_int64(&v, ctx->data->sequence_number);
} else if (ctx->idx == 1) {
z_bytes_serialize_from_str(&k, "source_timestamp");
z_bytes_serialize_from_int64(&v, ctx->data->source_timestamp);
} else if (ctx->idx == 2) {
z_bytes_serialize_from_str(&k, "source_gid");
z_bytes_serialize_from_buf(
&v, ctx->data->source_gid,
RMW_GID_STORAGE_SIZE);
} else {
return false;
}

z_bytes_from_pair(kv_pair, z_move(k), z_move(v));
ctx->idx += 1;
return true;
}

attachement_data_t::attachement_data_t(
const int64_t _sequence_number,
const int64_t _source_timestamp,
Expand All @@ -66,117 +37,70 @@ attachement_data_t::attachement_data_t(
memcpy(source_gid, _source_gid, RMW_GID_STORAGE_SIZE);
}

z_result_t attachement_data_t::serialize_to_zbytes(z_owned_bytes_t * attachment)
attachement_data_t::attachement_data_t(attachement_data_t && data)
{
attachement_context_t context =
attachement_context_t(std::make_unique<attachement_data_t>(*this));
return z_bytes_from_iter(
attachment, create_attachment_iter,
reinterpret_cast<void *>(&context));
sequence_number = std::move(data.sequence_number);
source_timestamp = std::move(data.source_timestamp);
memcpy(source_gid, data.source_gid, RMW_GID_STORAGE_SIZE);
}


bool get_attachment(
const z_loaned_bytes_t * const attachment,
const std::string & key, z_owned_bytes_t * val)
void attachement_data_t::serialize_to_zbytes(z_owned_bytes_t * attachment)
{
if (attachment == NULL || z_bytes_is_empty(attachment)) {
return false;
}

z_bytes_iterator_t iter = z_bytes_get_iterator(attachment);
z_owned_bytes_t pair, key_;
bool found = false;

while (z_bytes_iterator_next(&iter, &pair)) {
z_bytes_deserialize_into_pair(z_loan(pair), &key_, val);
z_owned_string_t key_string;
z_bytes_deserialize_into_string(z_loan(key_), &key_string);
ze_owned_serializer_t serializer;
ze_serializer_empty(&serializer);
ze_serializer_serialize_str(z_loan_mut(serializer), "sequence_number");
ze_serializer_serialize_int64(z_loan_mut(serializer), this->sequence_number);
ze_serializer_serialize_str(z_loan_mut(serializer), "source_timestamp");
ze_serializer_serialize_int64(z_loan_mut(serializer), this->source_timestamp);
ze_serializer_serialize_str(z_loan_mut(serializer), "source_gid");
ze_serializer_serialize_buf(z_loan_mut(serializer), this->source_gid, RMW_GID_STORAGE_SIZE);
ze_serializer_finish(z_move(serializer), attachment);
}

const char * key_string_ptr = z_string_data(z_loan(key_string));
size_t key_string_len = z_string_len(z_loan(key_string));
if (key_string_len == key.length() && strncmp(key_string_ptr, key.c_str(), key.length()) == 0) {
found = true;
}
attachement_data_t::attachement_data_t(const z_loaned_bytes_t * attachment)
{
ze_deserializer_t deserializer = ze_deserializer_from_bytes(attachment);
z_owned_string_t key;

z_drop(z_move(pair));
z_drop(z_move(key_));
z_drop(z_move(key_string));
ze_deserializer_deserialize_string(&deserializer, &key);

if (found) {
break;
} else {
z_drop(z_move(*val));
}
// Deserialize the sequence_number
if (std::string_view(z_string_data(z_loan(key)),
z_string_len(z_loan(key))) != "sequence_number")
{
throw std::runtime_error("sequence_number is not found in the attachment.");
}

if (!found) {
return false;
z_drop(z_move(key));
if (ze_deserializer_deserialize_int64(&deserializer, &this->sequence_number)) {
throw std::runtime_error("Failed to deserialize the sequence_number.");
}

if (z_bytes_is_empty(z_loan(*val))) {
return false;
// Deserialize the source_timestamp
ze_deserializer_deserialize_string(&deserializer, &key);
if (std::string_view(z_string_data(z_loan(key)),
z_string_len(z_loan(key))) != "source_timestamp")
{
throw std::runtime_error("source_timestamp is not found in the attachment");
}

return true;
}

bool get_gid_from_attachment(
const z_loaned_bytes_t * const attachment,
uint8_t gid[RMW_GID_STORAGE_SIZE])
{
if (attachment == NULL || z_bytes_is_empty(attachment)) {
return false;
z_drop(z_move(key));
if (ze_deserializer_deserialize_int64(&deserializer, &this->source_timestamp)) {
throw std::runtime_error("Failed to deserialize the source_timestamp.");
}

z_owned_bytes_t val;
if (!get_attachment(attachment, "source_gid", &val)) {
return false;
// Deserialize the source_gid
ze_deserializer_deserialize_string(&deserializer, &key);
if (std::string_view(z_string_data(z_loan(key)), z_string_len(z_loan(key))) != "source_gid") {
throw std::runtime_error("Invalid attachment: the key source_gid is not found");
}

z_drop(z_move(key));
z_owned_slice_t slice;
z_bytes_deserialize_into_slice(z_loan(val), &slice);
if (ze_deserializer_deserialize_slice(&deserializer, &slice)) {
throw std::runtime_error("Failed to deserialize the source_gid.");
}
if (z_slice_len(z_loan(slice)) != RMW_GID_STORAGE_SIZE) {
RMW_ZENOH_LOG_ERROR_NAMED("rmw_zenoh_cpp", "GID length mismatched.")
return false;
throw std::runtime_error("The length of source_gid mismatched.");
}
memcpy(gid, z_slice_data(z_loan(slice)), z_slice_len(z_loan(slice)));

z_drop(z_move(val));
memcpy(this->source_gid, z_slice_data(z_loan(slice)), z_slice_len(z_loan(slice)));
z_drop(z_move(slice));

return true;
}

int64_t get_int64_from_attachment(
const z_loaned_bytes_t * const attachment,
const std::string & name)
{
// A valid request must have had an attachment
if (attachment == NULL || z_bytes_is_empty(attachment)) {
return -1;
}

// TODO(yuyuan): This key should be specific
z_owned_bytes_t val;
if (!get_attachment(attachment, name, &val)) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp", "Failed to deserialize int64 from the attachment.")
return false;
}

int64_t num;
if (z_bytes_deserialize_into_int64(z_loan(val), &num) != Z_OK) {
return -1;
}

if (num == 0) {
// This is an error regardless; the client should never send this
return -1;
}

z_drop(z_move(val));

return num;
}
} // namespace rmw_zenoh_cpp
31 changes: 6 additions & 25 deletions rmw_zenoh_cpp/src/detail/attachment_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@

#include <zenoh.h>

#include <string>
#include <memory>

#include "rmw/types.h"

namespace rmw_zenoh_cpp
Expand All @@ -28,35 +25,19 @@ namespace rmw_zenoh_cpp
class attachement_data_t final
{
public:
int64_t sequence_number;
int64_t source_timestamp;
uint8_t source_gid[RMW_GID_STORAGE_SIZE];
explicit attachement_data_t(
const int64_t _sequence_number,
const int64_t _source_timestamp,
const uint8_t _source_gid[RMW_GID_STORAGE_SIZE]);
z_result_t serialize_to_zbytes(z_owned_bytes_t *);
};
explicit attachement_data_t(const z_loaned_bytes_t *);
explicit attachement_data_t(attachement_data_t && data);

class attachement_context_t final
{
public:
std::unique_ptr<attachement_data_t> data;
int length = 3;
int idx = 0;
int64_t sequence_number;
int64_t source_timestamp;
uint8_t source_gid[RMW_GID_STORAGE_SIZE];

attachement_context_t(std::unique_ptr<attachement_data_t> && _data);
void serialize_to_zbytes(z_owned_bytes_t *);
};

bool get_attachment(
const z_loaned_bytes_t * const attachment,
const std::string & key, z_owned_bytes_t * val);

bool get_gid_from_attachment(
const z_loaned_bytes_t * const attachment, uint8_t gid[RMW_GID_STORAGE_SIZE]);

int64_t get_int64_from_attachment(
const z_loaned_bytes_t * const attachment, const std::string & name);
} // namespace rmw_zenoh_cpp

#endif // DETAIL__ATTACHMENT_HELPERS_HPP_
53 changes: 10 additions & 43 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,9 @@ namespace rmw_zenoh_cpp
saved_msg_data::saved_msg_data(
z_owned_slice_t p,
uint64_t recv_ts,
const uint8_t pub_gid[RMW_GID_STORAGE_SIZE],
int64_t seqnum,
int64_t source_ts)
: payload(p), recv_timestamp(recv_ts), sequence_number(seqnum), source_timestamp(source_ts)
attachement_data_t && attachment_)
: payload(p), recv_timestamp(recv_ts), attachment(std::move(attachment_))
{
memcpy(publisher_gid, pub_gid, RMW_GID_STORAGE_SIZE);
}

///=============================================================================
Expand Down Expand Up @@ -146,10 +143,11 @@ void rmw_subscription_data_t::add_new_message(
}

// Check for messages lost if the new sequence number is not monotonically increasing.
const size_t gid_hash = hash_gid(msg->publisher_gid);
const size_t gid_hash = hash_gid(msg->attachment.source_gid);
auto last_known_pub_it = last_known_published_msg_.find(gid_hash);
if (last_known_pub_it != last_known_published_msg_.end()) {
const int64_t seq_increment = std::abs(msg->sequence_number - last_known_pub_it->second);
const int64_t seq_increment = std::abs(msg->attachment.sequence_number -
last_known_pub_it->second);
if (seq_increment > 1) {
const size_t num_msg_lost = seq_increment - 1;
total_messages_lost_ += num_msg_lost;
Expand All @@ -162,7 +160,7 @@ void rmw_subscription_data_t::add_new_message(
}
}
// Always update the last known sequence number for the publisher
last_known_published_msg_[gid_hash] = msg->sequence_number;
last_known_published_msg_[gid_hash] = msg->attachment.sequence_number;

message_queue_.emplace_back(std::move(msg));

Expand Down Expand Up @@ -432,48 +430,17 @@ void sub_data_handler(
return;
}

uint8_t pub_gid[RMW_GID_STORAGE_SIZE];
const z_loaned_bytes_t * attachment = z_sample_attachment(sample);
if (!get_gid_from_attachment(attachment, pub_gid)) {
// We failed to get the GID from the attachment. While this isn't fatal,
// it is unusual and so we should report it.
memset(pub_gid, 0, RMW_GID_STORAGE_SIZE);
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to obtain publisher GID from the attachment.");
}

int64_t sequence_number = get_int64_from_attachment(attachment, "sequence_number");
if (sequence_number < 0) {
// We failed to get the sequence number from the attachment. While this
// isn't fatal, it is unusual and so we should report it.
sequence_number = 0;
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp", "Unable to obtain sequence number from the attachment.");
}

int64_t source_timestamp = get_int64_from_attachment(attachment, "source_timestamp");
if (source_timestamp < 0) {
// We failed to get the source timestamp from the attachment. While this
// isn't fatal, it is unusual and so we should report it.
source_timestamp = 0;
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to obtain sequence number from the attachment.");
}

attachement_data_t attachment(z_sample_attachment(sample));
const z_loaned_bytes_t * payload = z_sample_payload(sample);

z_owned_slice_t slice;
z_bytes_deserialize_into_slice(payload, &slice);
z_bytes_to_slice(payload, &slice);

sub_data->add_new_message(
std::make_unique<saved_msg_data>(
slice,
z_timestamp_ntp64_time(z_sample_timestamp(sample)),
pub_gid,
sequence_number,
source_timestamp),
std::move(attachment)),
z_string_data(z_loan(keystr)));
}

Expand Down Expand Up @@ -572,7 +539,7 @@ void client_data_handler(z_loaned_reply_t * reply, void * data)
const z_loaned_bytes_t * err_payload = z_reply_err_payload(err);

z_owned_string_t err_str;
z_bytes_deserialize_into_string(err_payload, &err_str);
z_bytes_to_string(err_payload, &err_str);
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"z_reply_is_ok returned False for keyexpr %s. Reason: %.*s",
Expand Down
12 changes: 3 additions & 9 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "message_type_support.hpp"
#include "rmw_wait_set_data.hpp"
#include "service_type_support.hpp"
#include "attachment_helpers.hpp"

/// Structs for various type erased data fields.

Expand All @@ -50,20 +51,13 @@ void sub_data_handler(z_loaned_sample_t * sample, void * sub_data);

struct saved_msg_data
{
explicit saved_msg_data(
z_owned_slice_t p,
uint64_t recv_ts,
const uint8_t pub_gid[RMW_GID_STORAGE_SIZE],
int64_t seqnum,
int64_t source_ts);
explicit saved_msg_data(z_owned_slice_t p, uint64_t recv_ts, attachement_data_t && attachment);

~saved_msg_data();

z_owned_slice_t payload;
uint64_t recv_timestamp;
uint8_t publisher_gid[RMW_GID_STORAGE_SIZE];
int64_t sequence_number;
int64_t source_timestamp;
attachement_data_t attachment;
};

///=============================================================================
Expand Down
Loading

0 comments on commit 1cfc9f6

Please sign in to comment.