Skip to content

Commit

Permalink
goal handling logic
Browse files Browse the repository at this point in the history
  • Loading branch information
m-dahl committed Jan 17, 2022
1 parent edda868 commit 1037ee7
Showing 1 changed file with 60 additions and 76 deletions.
136 changes: 60 additions & 76 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use r2r::{sensor_msgs, std_msgs, ur_script_msgs};
use r2r::{Context, Node, ParameterValue, Publisher, ActionServerGoal, ServiceRequest};
use r2r::{Context, Node, ParameterValue, Publisher, ServiceRequest};
use std::io::{Error, ErrorKind};
use std::sync::{Arc, Mutex};
use std::time::Duration;
Expand All @@ -16,10 +16,9 @@ use ur_script_msgs::srv::DashboardCommand as DBCommand;

struct DriverState {
running: bool,
// only handle one goal at the time.
// later think about allowing goals to be queued up
goal: Option<ActionServerGoal<ExecuteScript::Action>>,
goal_sender: Option<oneshot::Sender<()>>,
// only handle one goal at the time. reply with true/false if
// script executed successfully.
goal_sender: Option<oneshot::Sender<bool>>,
robot_state: i32,
program_state: i32,
joint_values: Vec<f64>,
Expand Down Expand Up @@ -48,7 +47,6 @@ impl DriverState {
fn new() -> Self {
DriverState {
running: true,
goal: None,
goal_sender: None,
robot_state: 0,
program_state: 0,
Expand Down Expand Up @@ -96,7 +94,7 @@ async fn handle_dashboard_commands(
let ok = ret.is_ok();

let resp = DBCommand::Response { ok };
req.respond(resp);
req.respond(resp).expect("could not send service response");
}
None => break,
}
Expand All @@ -114,15 +112,6 @@ async fn action_server(
match requests.next().await {
Some(req) => {

if driver_state.lock().unwrap().goal.is_some() {
println!(
"Already have a goal, rejecting request with goal id: {}, script: '{}'",
req.uuid, req.goal.script
);
req.reject().expect("could not reject goal");
continue;
}

if driver_state.lock().unwrap().robot_state != 1 { //todo
println!(
"Robot is in protective stop, rejecting request with goal id: {}, script: '{}'",
Expand All @@ -137,7 +126,7 @@ async fn action_server(
req.uuid, req.goal.script
);

let (goal_sender, goal_receiver) = oneshot::channel::<()>();
let (goal_sender, goal_receiver) = oneshot::channel::<bool>();
let (mut g, mut cancel) = req.accept().expect("could not accept goal");

println!("making a new connection to the driver.");
Expand All @@ -156,16 +145,20 @@ async fn action_server(

{
let mut ds = driver_state.lock().unwrap();
ds.goal = Some(g.clone());
ds.goal_sender = Some(goal_sender);
}

match future::select(goal_receiver, cancel.next()).await {
Either::Left(_) => {
Either::Left((res, _cancel_stream)) => {
// success.
println!("goal completed!");
let result_msg = ExecuteScript::Result { ok: true };
g.succeed(result_msg).expect("could not send result");
if let Ok(ok) = res {
let result_msg = ExecuteScript::Result { ok };
// TODO: perhaps g.abort here if ok is false.
g.succeed(result_msg).expect("could not send result");
} else {
println!("future appears canceled...");
}
},
Either::Right((request, goal_receiver)) => {
if let Some(request) = request {
Expand All @@ -181,29 +174,30 @@ async fn action_server(
.try_send((DashboardCommand::Stop, sender))
.expect("could not send...");

let driver_state_task = driver_state.clone();

match future::select(cancel_receiver, goal_receiver).await {
Either::Left((res, _goal_receiver)) => {
// cancelled using dashboard
// todo: check res
let result_msg = ExecuteScript::Result { ok: false };
if let Some(mut goal) = driver_state_task.lock().unwrap().goal.take() {
goal.cancel(result_msg).expect("could not cancel goal");
if let Ok(ok) = res {
let result_msg = ExecuteScript::Result { ok };
g.cancel(result_msg).expect("could not cancel goal");
}
},
Either::Right((res, _cancel_receiver)) => {
// finished executing anyway
// todo: check res
let result_msg = ExecuteScript::Result { ok: true };
if let Some(mut goal) = driver_state_task.lock().unwrap().goal.take() {
goal.cancel(result_msg).expect("could not cancel goal");
if let Ok(ok) = res {
let result_msg = ExecuteScript::Result { ok };
g.succeed(result_msg).expect("could not succeed goal");
}
}
}
}
}
};

// at this point we have processed the goal.
driver_state.lock().unwrap().goal_sender = None;
}
None => break,
}
Expand Down Expand Up @@ -238,8 +232,7 @@ async fn realtime_reader(
ur_address: String,
) -> Result<(), std::io::Error> {
let mut checking_for_1 = false;
// let mut checking_for_1_since = None;
let mut cancelling = false;
let mut checking_for_2_since = None;
let mut stream = connect_loop(&ur_address).await;
let mut size_bytes = [0u8; 4];

Expand All @@ -253,6 +246,14 @@ async fn realtime_reader(
if let Err(_) = ret {
println!("timeout on read, reconnecting...");
stream = connect_loop(&ur_address).await;
// reset state
{
let mut ds = driver_state.lock().unwrap();
ds.goal_sender = None;
}
checking_for_1 = false;
checking_for_2_since = None;

continue;
} else if let Ok(ret) = ret {
if let Err(e) = ret {
Expand Down Expand Up @@ -332,33 +333,35 @@ async fn realtime_reader(
(*ds).output_bit5 = digital_outputs & 32 == 32;
(*ds).output_bit6 = digital_outputs & 64 == 64;
(*ds).output_bit7 = digital_outputs & 128 == 128;
(*ds).goal.is_some()
(*ds).goal_sender.is_some()
};

// if program_running && checking_for_1_since.is_none() {
// // flank check for when we requested program (waiting for program state 2)
// checking_for_1_since = Some(std::time::Instant::now());
// }

// if program_running && checking_for_1_since.is_some() && program_state == 1 {
// // we are currently waiting for program state == 2
// let elapsed_since_request = checking_for_1_since.unwrap().elapsed();
// if elapsed_since_request > std::time::Duration::from_millis(100) {
// let result_msg = ExecuteScript::Result { ok: false };
// {
// let mut ds = driver_state.lock().unwrap();
// if let Some(mut goal) = ds.goal.take() {
// println!("program state never changed");
// goal.abort(result_msg).expect("could not abort goal");
// }
// }
// }
// }
let checking_for_2 = program_state == 1 && !checking_for_1;
if program_running && checking_for_2 && checking_for_2_since.is_none() {
// flank check for when we requested program (waiting for program state 2)
checking_for_2_since = Some(std::time::Instant::now());
} else if program_running && checking_for_2 && checking_for_2_since.is_some() {
// we are currently waiting for program state == 2
let elapsed_since_request = checking_for_2_since.unwrap().elapsed();
if elapsed_since_request > std::time::Duration::from_millis(500) {
// if there's been more than 500 millis without the program
// entering the running state, abort this request.
checking_for_2_since = None;
{
let mut ds = driver_state.lock().unwrap();
if let Some(goal_sender) = ds.goal_sender.take() {
println!("program state never changed to running");
goal_sender.send(false).expect("goal receiver dropped");
}
}
}
}

// when we have a goal, first wait until program_state reaches 2
if program_running && program_state == 2 && !checking_for_1 {
println!("program started, waiting for finish");
checking_for_1 = true;
checking_for_2_since = None;
}

// when the program state has been 2 and goes back to
Expand All @@ -369,12 +372,11 @@ async fn realtime_reader(
checking_for_1 = false;

// we are finished. succeed and remove the action goal handle.
let result_msg = ExecuteScript::Result { ok: true };
{
let mut ds = driver_state.lock().unwrap();
if let Some(mut goal) = ds.goal.take() {
if let Some(goal_sender) = ds.goal_sender.take() {
println!("goal succeeded");
goal.succeed(result_msg).expect("could not set result");
goal_sender.send(true).expect("goal receiver dropped");
} else {
println!("we fininshed but probably canceled the goal before...");
}
Expand All @@ -386,12 +388,13 @@ async fn realtime_reader(
// there is an active goal, abort it. we are
// finished. succeed and remove the action goal
// handle.
let result_msg = ExecuteScript::Result { ok: false };
{
let mut ds = driver_state.lock().unwrap();
if let Some(mut goal) = ds.goal.take() {
checking_for_1 = false;
checking_for_2_since = None;
if let Some(goal_sender) = ds.goal_sender.take() {
println!("aborting due to protective stop");
goal.abort(result_msg).expect("could not abort goal");
goal_sender.send(false).expect("goal receiver dropped");
}
}
}
Expand Down Expand Up @@ -574,7 +577,6 @@ async fn run() -> Result<(), Box<dyn std::error::Error>> {
.create_publisher::<ur_script_msgs::msg::Measured>("measured",
r2r::qos::QosProfile::default())?;

let (tx, rx) = mpsc::channel::<String>(10);
let (tx_dashboard, rx_dashboard) =
mpsc::channel::<(DashboardCommand, oneshot::Sender<bool>)>(10);

Expand All @@ -585,17 +587,6 @@ async fn run() -> Result<(), Box<dyn std::error::Error>> {

let shared_state = Arc::new(Mutex::new(DriverState::new()));

let shared_state_cb = shared_state.clone();
let handle_goal_cb = move |g: r2r::ActionServerGoal<ExecuteScript::Action>| {
// since we already know that we do not accept goal unless we
// don't already have one, simply set this goal handle as the
// currently active goal...
(*shared_state_cb.lock().unwrap()).goal.replace(g.clone());
// ... and pass the ur script on to the driver
tx.try_send(g.goal.script.clone())
.expect("could not send new script");
};

let server_requests = node
.create_action_server::<ExecuteScript::Action>("ur_script")?;

Expand All @@ -605,13 +596,6 @@ async fn run() -> Result<(), Box<dyn std::error::Error>> {
tx_dashboard,
server_requests);

// let _server = node.create_action_server::<ExecuteScript::Action>(
// "ur_script",
// Box::new(move |uuid, goal| accept_goal_cb(shared_state_cb.clone(), uuid, goal)),
// Box::new(accept_cancel_cb),
// Box::new(handle_goal_cb),
// )?;

let task_shared_state = shared_state.clone();

let realtime_reader = realtime_reader(
Expand Down

0 comments on commit 1037ee7

Please sign in to comment.