Skip to content

Commit

Permalink
Merge pull request #1502 from mavlink/pr-comm-fixes
Browse files Browse the repository at this point in the history
Various commication fixes
  • Loading branch information
julianoes authored Aug 6, 2021
2 parents 3204b40 + 4e70b28 commit 562b3f6
Show file tree
Hide file tree
Showing 13 changed files with 223 additions and 103 deletions.
105 changes: 60 additions & 45 deletions src/core/mavlink_commands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,28 +85,11 @@ void MavlinkCommandSender::queue_command_async(
// LogDebug() << "Command " << (int)(command.command) << " to send to "
// << (int)(command.target_system_id)<< ", " << (int)(command.target_component_id);

auto new_work = std::make_shared<Work>(_parent.timeout_s());

mavlink_msg_command_int_pack(
_parent.get_own_system_id(),
_parent.get_own_component_id(),
&new_work->mavlink_message,
command.target_system_id,
command.target_component_id,
command.frame,
command.command,
command.current,
command.autocontinue,
command.params.param1,
command.params.param2,
command.params.param3,
command.params.param4,
command.params.x,
command.params.y,
command.params.z);

new_work->callback = callback;
auto new_work = std::make_shared<Work>();
new_work->timeout_s = _parent.timeout_s();
new_work->command = command;
new_work->mavlink_command = command.command;
new_work->callback = callback;
_work_queue.push_back(new_work);
}

Expand All @@ -116,25 +99,11 @@ void MavlinkCommandSender::queue_command_async(
// LogDebug() << "Command " << (int)(command.command) << " to send to "
// << (int)(command.target_system_id)<< ", " << (int)(command.target_component_id);

auto new_work = std::make_shared<Work>(_parent.timeout_s());
mavlink_msg_command_long_pack(
_parent.get_own_system_id(),
_parent.get_own_component_id(),
&new_work->mavlink_message,
command.target_system_id,
command.target_component_id,
command.command,
command.confirmation,
command.params.param1,
command.params.param2,
command.params.param3,
command.params.param4,
command.params.param5,
command.params.param6,
command.params.param7);

new_work->callback = callback;
auto new_work = std::make_shared<Work>();
new_work->timeout_s = _parent.timeout_s();
new_work->command = command;
new_work->mavlink_command = command.command;
new_work->callback = callback;
new_work->time_started = _parent.get_time().steady_time();
_work_queue.push_back(new_work);
}
Expand Down Expand Up @@ -264,7 +233,8 @@ void MavlinkCommandSender::receive_timeout(const uint16_t command)
<< " s, retries to do: " << work->retries_to_do << " ("
<< work->mavlink_command << ").";

if (!_parent.send_message(work->mavlink_message)) {
mavlink_message_t message = create_mavlink_message(work->command);
if (!_parent.send_message(message)) {
LogErr() << "connection send error in retransmit (" << work->mavlink_command
<< ").";
temp_callback = work->callback;
Expand Down Expand Up @@ -307,7 +277,7 @@ void MavlinkCommandSender::do_work()
return;
}

LogDebug() << "sending it the first time (" << work->mavlink_command << ")";
// LogDebug() << "sending it the first time (" << work->mavlink_command << ")";
work->time_started = _parent.get_time().steady_time();

{
Expand All @@ -318,9 +288,10 @@ void MavlinkCommandSender::do_work()
// The command is inserted in the list only if another command with the same id does
// not exist. If such an element exists, we refuse to send the new command.
if (!was_inserted) {
LogWarn()
<< "A command cannot be sent if another command with the same command_id is still processing! Ignoring command "
<< static_cast<int>(work->mavlink_command);
// LogWarn()
// << "A command cannot be sent if another command with the same command_id is still
// processing! Ignoring command "
// << static_cast<int>(work->mavlink_command);
auto temp_callback = work->callback;
auto temp_result = std::make_pair<Result, float>(Result::CommandDenied, NAN);

Expand All @@ -331,7 +302,8 @@ void MavlinkCommandSender::do_work()
return;
}

if (!_parent.send_message(work->mavlink_message)) {
mavlink_message_t message = create_mavlink_message(work->command);
if (!_parent.send_message(message)) {
LogErr() << "connection send error (" << work->mavlink_command << ")";
auto temp_callback = work->callback;
auto temp_result = std::make_pair<Result, float>(Result::ConnectionError, NAN);
Expand Down Expand Up @@ -367,6 +339,49 @@ void MavlinkCommandSender::call_callback(
_parent.call_user_callback([callback, result, progress]() { callback(result, progress); });
}

mavlink_message_t MavlinkCommandSender::create_mavlink_message(const Command& command)
{
mavlink_message_t message;

if (auto command_int = std::get_if<CommandInt>(&command)) {
mavlink_msg_command_int_pack(
_parent.get_own_system_id(),
_parent.get_own_component_id(),
&message,
command_int->target_system_id,
command_int->target_component_id,
command_int->frame,
command_int->command,
command_int->current,
command_int->autocontinue,
command_int->params.param1,
command_int->params.param2,
command_int->params.param3,
command_int->params.param4,
command_int->params.x,
command_int->params.y,
command_int->params.z);

} else if (auto command_long = std::get_if<CommandLong>(&command)) {
mavlink_msg_command_long_pack(
_parent.get_own_system_id(),
_parent.get_own_component_id(),
&message,
command_long->target_system_id,
command_long->target_component_id,
command_long->command,
command_long->confirmation,
command_long->params.param1,
command_long->params.param2,
command_long->params.param3,
command_long->params.param4,
command_long->params.param5,
command_long->params.param6,
command_long->params.param7);
}
return message;
}

MavlinkCommandReceiver::MavlinkCommandReceiver(SystemImpl& system_impl) : _parent(system_impl)
{
_parent.register_mavlink_message_handler(
Expand Down
13 changes: 9 additions & 4 deletions src/core/mavlink_commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <mutex>
#include <optional>
#include <unordered_map>
#include <variant>

namespace mavsdk {

Expand Down Expand Up @@ -112,24 +113,28 @@ class MavlinkCommandSender {
const MavlinkCommandSender& operator=(const MavlinkCommandSender&) = delete;

private:
// The std::monostate is required to work around the fact that
// the default ctor of CommandLong and CommandInt is ill-defined.
using Command = std::variant<std::monostate, CommandLong, CommandInt>;

struct Work {
int retries_to_do{3};
double timeout_s{0.5};
uint16_t mavlink_command{0};
bool already_sent{false};
mavlink_message_t mavlink_message{};
Command command;
uint16_t mavlink_command{};
CommandResultCallback callback{};
dl_time_t time_started{};
void* timeout_cookie = nullptr;

explicit Work(double new_timeout_s) : timeout_s(new_timeout_s) {}
};

void receive_command_ack(mavlink_message_t message);
void receive_timeout(const uint16_t command);

void call_callback(const CommandResultCallback& callback, Result result, float progress);

mavlink_message_t create_mavlink_message(const Command& command);

SystemImpl& _parent;
LockedQueue<Work> _work_queue{};
std::unordered_map<uint16_t, std::shared_ptr<Work>> _sent_commands{};
Expand Down
6 changes: 6 additions & 0 deletions src/core/mavlink_include.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,10 @@
#pragma GCC system_header
#endif

#define MAVLINK_GET_CHANNEL_STATUS

#include "mavlink/v2.0/mavlink_types.h"

extern mavlink_status_t* mavlink_get_channel_status(uint8_t chan);

#include "mavlink/v2.0/common/mavlink.h"
21 changes: 20 additions & 1 deletion src/core/mavlink_mission_transfer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,9 @@ void MAVLinkMissionTransfer::UploadWorkItem::send_count()
return;
}

// LogDebug() << "Sending send_count, count: " << _items.size() << ", retries: " <<
// _retries_done;

++_retries_done;
}

Expand Down Expand Up @@ -283,6 +286,7 @@ void MAVLinkMissionTransfer::UploadWorkItem::process_mission_request(
mavlink_mission_request_t request;
mavlink_msg_mission_request_decode(&request_message, &request);

// FIXME: this will mess with the sequence number.
mavlink_message_t request_int_message;
mavlink_msg_mission_request_int_pack(
request_message.sysid,
Expand Down Expand Up @@ -329,6 +333,9 @@ void MAVLinkMissionTransfer::UploadWorkItem::process_mission_request_int(

_step = Step::SendItems;

// LogDebug() << "Process mission_request_int, seq: " << request_int.seq
// << ", next expected sequence: " << _next_sequence;

if (_next_sequence < request_int.seq) {
// We should not go back to a previous one.
// TODO: figure out if we should error here.
Expand All @@ -338,6 +345,7 @@ void MAVLinkMissionTransfer::UploadWorkItem::process_mission_request_int(
} else if (_next_sequence > request_int.seq) {
// We have already sent that one before.
if (_retries_done >= retries) {
LogWarn() << "mission_request_int: retries exceeded";
_timeout_handler.remove(_cookie);
callback_and_reset(Result::Timeout);
return;
Expand Down Expand Up @@ -382,6 +390,9 @@ void MAVLinkMissionTransfer::UploadWorkItem::send_mission_item()
_items[_next_sequence].z,
_type);

// LogDebug() << "Sending mission_item_int seq: " << _next_sequence
// << ", retry: " << _retries_done;

++_next_sequence;

if (!_sender.send_message(message)) {
Expand All @@ -400,6 +411,8 @@ void MAVLinkMissionTransfer::UploadWorkItem::process_mission_ack(const mavlink_m
mavlink_mission_ack_t mission_ack;
mavlink_msg_mission_ack_decode(&message, &mission_ack);

// LogDebug() << "Received mission_ack type: " << static_cast<int>(mission_ack.type);

_timeout_handler.remove(_cookie);

switch (mission_ack.type) {
Expand Down Expand Up @@ -454,7 +467,10 @@ void MAVLinkMissionTransfer::UploadWorkItem::process_timeout()
{
std::lock_guard<std::mutex> lock(_mutex);

// LogDebug() << "Timeout triggered, retries: " << _retries_done;

if (_retries_done >= retries) {
LogWarn() << "timeout: retries exceeded";
callback_and_reset(Result::Timeout);
return;
}
Expand All @@ -466,7 +482,10 @@ void MAVLinkMissionTransfer::UploadWorkItem::process_timeout()
break;

case Step::SendItems:
callback_and_reset(Result::Timeout);
// When waiting for items requested we should wait longer than
// just our timeout, otherwise we give up too quickly.
++_retries_done;
_timeout_handler.add([this]() { process_timeout(); }, _timeout_s, &_cookie);
break;
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/core/mavlink_mission_transfer.h
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ class MAVLinkMissionTransfer {
unsigned _retries_done{0};
};

static constexpr unsigned retries = 4;
static constexpr unsigned retries = 5;

using TimeoutSCallback = std::function<double()>;

Expand Down
11 changes: 10 additions & 1 deletion src/core/mavlink_mission_transfer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -718,9 +718,18 @@ TEST_F(MAVLinkMissionTransferTest, UploadMissionTimeoutAfterSendMissionItem)

message_handler.process_message(make_mission_request_int(MAV_MISSION_TYPE_MISSION, 0));

time.sleep_for(std::chrono::milliseconds(static_cast<int>(timeout_s * 1.1 * 1000.)));
// Make sure single timeout does not trigger it yet.
time.sleep_for(std::chrono::milliseconds(static_cast<int>(timeout_s * 1000. + 250)));
timeout_handler.run_once();

EXPECT_EQ(fut.wait_for(std::chrono::milliseconds(50)), std::future_status::timeout);

// But multiple do.
for (unsigned i = 0; i < (MAVLinkMissionTransfer::retries - 1); ++i) {
time.sleep_for(std::chrono::milliseconds(static_cast<int>(timeout_s * 1000.)));
timeout_handler.run_once();
}

EXPECT_EQ(fut.wait_for(std::chrono::seconds(1)), std::future_status::ready);

// Ignore later (wrong) ack.
Expand Down
8 changes: 8 additions & 0 deletions src/core/mavsdk_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@
#include "cli_arg.h"
#include "version.h"

static mavlink_status_t status_;

mavlink_status_t* mavlink_get_channel_status(uint8_t chan)
{
UNUSED(chan);
return &status_;
}

namespace mavsdk {

MavsdkImpl::MavsdkImpl() : timeout_handler(_time), call_every_handler(_time)
Expand Down
8 changes: 8 additions & 0 deletions src/core/timeout_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ void TimeoutHandler::add(std::function<void()> callback, double duration_s, void

void TimeoutHandler::refresh(const void* cookie)
{
if (cookie == nullptr) {
return;
}

std::lock_guard<std::mutex> lock(_timeouts_mutex);

auto it = _timeouts.find(const_cast<void*>(cookie));
Expand All @@ -38,6 +42,10 @@ void TimeoutHandler::refresh(const void* cookie)

void TimeoutHandler::remove(const void* cookie)
{
if (cookie == nullptr) {
return;
}

std::lock_guard<std::mutex> lock(_timeouts_mutex);

auto it = _timeouts.find(const_cast<void*>(cookie));
Expand Down
14 changes: 11 additions & 3 deletions src/plugins/camera/camera_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -499,9 +499,13 @@ void CameraImpl::subscribe_information(const Camera::InformationCallback& callba
}

if (callback) {
_parent->add_call_every([this]() { request_status(); }, 1.0, &_status.call_every_cookie);
if (_status.call_every_cookie == nullptr) {
_parent->add_call_every(
[this]() { request_status(); }, 5.0, &_status.call_every_cookie);
}
} else {
_parent->remove_call_every(_status.call_every_cookie);
_status.call_every_cookie = nullptr;
}
}

Expand Down Expand Up @@ -702,7 +706,7 @@ void CameraImpl::subscribe_mode(const Camera::ModeCallback callback)

if (callback) {
_parent->add_call_every(
[this]() { request_camera_settings(); }, 1.0, &_mode.call_every_cookie);
[this]() { request_camera_settings(); }, 5.0, &_mode.call_every_cookie);
} else {
_parent->remove_call_every(_mode.call_every_cookie);
}
Expand Down Expand Up @@ -732,9 +736,13 @@ void CameraImpl::subscribe_status(const Camera::StatusCallback callback)
_status.subscription_callback = callback;

if (callback) {
_parent->add_call_every([this]() { request_status(); }, 1.0, &_status.call_every_cookie);
if (_status.call_every_cookie == nullptr) {
_parent->add_call_every(
[this]() { request_status(); }, 5.0, &_status.call_every_cookie);
}
} else {
_parent->remove_call_every(_status.call_every_cookie);
_status.call_every_cookie = nullptr;
}
}

Expand Down
Loading

0 comments on commit 562b3f6

Please sign in to comment.