diff --git a/src/main.rs b/src/main.rs index 8becf24..ed449e0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,5 @@ use r2r::{sensor_msgs, std_msgs, ur_script_msgs}; -use r2r::{Context, Node, ParameterValue, Publisher, ActionServerGoal, ServiceRequest}; +use r2r::{Context, Node, ParameterValue, Publisher, ServiceRequest}; use std::io::{Error, ErrorKind}; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -16,10 +16,9 @@ use ur_script_msgs::srv::DashboardCommand as DBCommand; struct DriverState { running: bool, - // only handle one goal at the time. - // later think about allowing goals to be queued up - goal: Option>, - goal_sender: Option>, + // only handle one goal at the time. reply with true/false if + // script executed successfully. + goal_sender: Option>, robot_state: i32, program_state: i32, joint_values: Vec, @@ -48,7 +47,6 @@ impl DriverState { fn new() -> Self { DriverState { running: true, - goal: None, goal_sender: None, robot_state: 0, program_state: 0, @@ -96,7 +94,7 @@ async fn handle_dashboard_commands( let ok = ret.is_ok(); let resp = DBCommand::Response { ok }; - req.respond(resp); + req.respond(resp).expect("could not send service response"); } None => break, } @@ -114,15 +112,6 @@ async fn action_server( match requests.next().await { Some(req) => { - if driver_state.lock().unwrap().goal.is_some() { - println!( - "Already have a goal, rejecting request with goal id: {}, script: '{}'", - req.uuid, req.goal.script - ); - req.reject().expect("could not reject goal"); - continue; - } - if driver_state.lock().unwrap().robot_state != 1 { //todo println!( "Robot is in protective stop, rejecting request with goal id: {}, script: '{}'", @@ -137,7 +126,7 @@ async fn action_server( req.uuid, req.goal.script ); - let (goal_sender, goal_receiver) = oneshot::channel::<()>(); + let (goal_sender, goal_receiver) = oneshot::channel::(); let (mut g, mut cancel) = req.accept().expect("could not accept goal"); println!("making a new connection to the driver."); @@ -156,16 +145,20 @@ async fn action_server( { let mut ds = driver_state.lock().unwrap(); - ds.goal = Some(g.clone()); ds.goal_sender = Some(goal_sender); } match future::select(goal_receiver, cancel.next()).await { - Either::Left(_) => { + Either::Left((res, _cancel_stream)) => { // success. println!("goal completed!"); - let result_msg = ExecuteScript::Result { ok: true }; - g.succeed(result_msg).expect("could not send result"); + if let Ok(ok) = res { + let result_msg = ExecuteScript::Result { ok }; + // TODO: perhaps g.abort here if ok is false. + g.succeed(result_msg).expect("could not send result"); + } else { + println!("future appears canceled..."); + } }, Either::Right((request, goal_receiver)) => { if let Some(request) = request { @@ -181,29 +174,30 @@ async fn action_server( .try_send((DashboardCommand::Stop, sender)) .expect("could not send..."); - let driver_state_task = driver_state.clone(); - match future::select(cancel_receiver, goal_receiver).await { Either::Left((res, _goal_receiver)) => { // cancelled using dashboard // todo: check res - let result_msg = ExecuteScript::Result { ok: false }; - if let Some(mut goal) = driver_state_task.lock().unwrap().goal.take() { - goal.cancel(result_msg).expect("could not cancel goal"); + if let Ok(ok) = res { + let result_msg = ExecuteScript::Result { ok }; + g.cancel(result_msg).expect("could not cancel goal"); } }, Either::Right((res, _cancel_receiver)) => { // finished executing anyway // todo: check res - let result_msg = ExecuteScript::Result { ok: true }; - if let Some(mut goal) = driver_state_task.lock().unwrap().goal.take() { - goal.cancel(result_msg).expect("could not cancel goal"); + if let Ok(ok) = res { + let result_msg = ExecuteScript::Result { ok }; + g.succeed(result_msg).expect("could not succeed goal"); } } } } } }; + + // at this point we have processed the goal. + driver_state.lock().unwrap().goal_sender = None; } None => break, } @@ -238,8 +232,7 @@ async fn realtime_reader( ur_address: String, ) -> Result<(), std::io::Error> { let mut checking_for_1 = false; - // let mut checking_for_1_since = None; - let mut cancelling = false; + let mut checking_for_2_since = None; let mut stream = connect_loop(&ur_address).await; let mut size_bytes = [0u8; 4]; @@ -253,6 +246,14 @@ async fn realtime_reader( if let Err(_) = ret { println!("timeout on read, reconnecting..."); stream = connect_loop(&ur_address).await; + // reset state + { + let mut ds = driver_state.lock().unwrap(); + ds.goal_sender = None; + } + checking_for_1 = false; + checking_for_2_since = None; + continue; } else if let Ok(ret) = ret { if let Err(e) = ret { @@ -332,33 +333,35 @@ async fn realtime_reader( (*ds).output_bit5 = digital_outputs & 32 == 32; (*ds).output_bit6 = digital_outputs & 64 == 64; (*ds).output_bit7 = digital_outputs & 128 == 128; - (*ds).goal.is_some() + (*ds).goal_sender.is_some() }; - // if program_running && checking_for_1_since.is_none() { - // // flank check for when we requested program (waiting for program state 2) - // checking_for_1_since = Some(std::time::Instant::now()); - // } - - // if program_running && checking_for_1_since.is_some() && program_state == 1 { - // // we are currently waiting for program state == 2 - // let elapsed_since_request = checking_for_1_since.unwrap().elapsed(); - // if elapsed_since_request > std::time::Duration::from_millis(100) { - // let result_msg = ExecuteScript::Result { ok: false }; - // { - // let mut ds = driver_state.lock().unwrap(); - // if let Some(mut goal) = ds.goal.take() { - // println!("program state never changed"); - // goal.abort(result_msg).expect("could not abort goal"); - // } - // } - // } - // } + let checking_for_2 = program_state == 1 && !checking_for_1; + if program_running && checking_for_2 && checking_for_2_since.is_none() { + // flank check for when we requested program (waiting for program state 2) + checking_for_2_since = Some(std::time::Instant::now()); + } else if program_running && checking_for_2 && checking_for_2_since.is_some() { + // we are currently waiting for program state == 2 + let elapsed_since_request = checking_for_2_since.unwrap().elapsed(); + if elapsed_since_request > std::time::Duration::from_millis(500) { + // if there's been more than 500 millis without the program + // entering the running state, abort this request. + checking_for_2_since = None; + { + let mut ds = driver_state.lock().unwrap(); + if let Some(goal_sender) = ds.goal_sender.take() { + println!("program state never changed to running"); + goal_sender.send(false).expect("goal receiver dropped"); + } + } + } + } // when we have a goal, first wait until program_state reaches 2 if program_running && program_state == 2 && !checking_for_1 { println!("program started, waiting for finish"); checking_for_1 = true; + checking_for_2_since = None; } // when the program state has been 2 and goes back to @@ -369,12 +372,11 @@ async fn realtime_reader( checking_for_1 = false; // we are finished. succeed and remove the action goal handle. - let result_msg = ExecuteScript::Result { ok: true }; { let mut ds = driver_state.lock().unwrap(); - if let Some(mut goal) = ds.goal.take() { + if let Some(goal_sender) = ds.goal_sender.take() { println!("goal succeeded"); - goal.succeed(result_msg).expect("could not set result"); + goal_sender.send(true).expect("goal receiver dropped"); } else { println!("we fininshed but probably canceled the goal before..."); } @@ -386,12 +388,13 @@ async fn realtime_reader( // there is an active goal, abort it. we are // finished. succeed and remove the action goal // handle. - let result_msg = ExecuteScript::Result { ok: false }; { let mut ds = driver_state.lock().unwrap(); - if let Some(mut goal) = ds.goal.take() { + checking_for_1 = false; + checking_for_2_since = None; + if let Some(goal_sender) = ds.goal_sender.take() { println!("aborting due to protective stop"); - goal.abort(result_msg).expect("could not abort goal"); + goal_sender.send(false).expect("goal receiver dropped"); } } } @@ -574,7 +577,6 @@ async fn run() -> Result<(), Box> { .create_publisher::("measured", r2r::qos::QosProfile::default())?; - let (tx, rx) = mpsc::channel::(10); let (tx_dashboard, rx_dashboard) = mpsc::channel::<(DashboardCommand, oneshot::Sender)>(10); @@ -585,17 +587,6 @@ async fn run() -> Result<(), Box> { let shared_state = Arc::new(Mutex::new(DriverState::new())); - let shared_state_cb = shared_state.clone(); - let handle_goal_cb = move |g: r2r::ActionServerGoal| { - // since we already know that we do not accept goal unless we - // don't already have one, simply set this goal handle as the - // currently active goal... - (*shared_state_cb.lock().unwrap()).goal.replace(g.clone()); - // ... and pass the ur script on to the driver - tx.try_send(g.goal.script.clone()) - .expect("could not send new script"); - }; - let server_requests = node .create_action_server::("ur_script")?; @@ -605,13 +596,6 @@ async fn run() -> Result<(), Box> { tx_dashboard, server_requests); - // let _server = node.create_action_server::( - // "ur_script", - // Box::new(move |uuid, goal| accept_goal_cb(shared_state_cb.clone(), uuid, goal)), - // Box::new(accept_cancel_cb), - // Box::new(handle_goal_cb), - // )?; - let task_shared_state = shared_state.clone(); let realtime_reader = realtime_reader(