-
Notifications
You must be signed in to change notification settings - Fork 2
/
jtfp_mod.py
executable file
·351 lines (301 loc) · 12.7 KB
/
jtfp_mod.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
#!/usr/bin/env python
# Copyright (c) 2013-2018, Rethink Robotics Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
SDK Joint Trajectory Example: file playback
"""
import argparse
import operator
import sys
import threading
from bisect import bisect
from copy import copy
from os import path
import rospy
import actionlib
from control_msgs.msg import (
FollowJointTrajectoryAction,
FollowJointTrajectoryGoal,
)
from trajectory_msgs.msg import (
JointTrajectoryPoint,
)
import intera_interface
from intera_interface import CHECK_VERSION
import argparse
class Trajectory(object):
def __init__(self, limb="right"):
#create our action server clients
self._limb_client = actionlib.SimpleActionClient(
'robot/limb/right/follow_joint_trajectory',
FollowJointTrajectoryAction,
)
#verify joint trajectory action servers are available
is_server_up = self._limb_client.wait_for_server(rospy.Duration(10.0))
if not is_server_up:
msg = ("Action server not available."
" Verify action server availability.")
rospy.logerr(msg)
rospy.signal_shutdown(msg)
sys.exit(1)
#create our goal request
self.goal = FollowJointTrajectoryGoal()
#limb interface - current angles needed for start move
self.arm = intera_interface.Limb(limb)
self.limb = limb
self.gripper_name = '_'.join([limb, 'gripper'])
#gripper interface - for gripper command playback
#TODO: Gripper not detected excepted
try:
self.gripper = intera_interface.Gripper(self.gripper_name)
except:
self.gripper = None
rospy.loginfo("Did not detect a connected electric gripper.")
#flag to signify the arm trajectories have begun executing
self._arm_trajectory_started = False
#reentrant lock to prevent same-thread lockout
self._lock = threading.RLock()
# Verify Grippers Have No Errors and are Calibrated
if self.gripper:
if self.gripper.has_error():
self.gripper.reboot()
if not self.gripper.is_calibrated():
self.gripper.calibrate()
#gripper goal trajectories
self.grip = FollowJointTrajectoryGoal()
#gripper control rate
self._gripper_rate = 20.0 # Hz
# Timing offset to prevent gripper playback before trajectory has started
self._slow_move_offset = 0.0
self._trajectory_start_offset = rospy.Duration(0.0)
self._trajectory_actual_offset = rospy.Duration(0.0)
#param namespace
self._param_ns = '/rsdk_joint_trajectory_action_server/'
def _execute_gripper_commands(self):
start_time = rospy.get_time() - self._trajectory_actual_offset.to_sec()
grip_cmd = self.grip.trajectory.points
pnt_times = [pnt.time_from_start.to_sec() for pnt in grip_cmd]
end_time = pnt_times[-1]
rate = rospy.Rate(self._gripper_rate)
now_from_start = rospy.get_time() - start_time
while(now_from_start < end_time + (1.0 / self._gripper_rate) and
not rospy.is_shutdown()):
idx = bisect(pnt_times, now_from_start) - 1
if self.gripper:
self.gripper.set_position(grip_cmd[idx].positions[0])
rate.sleep()
now_from_start = rospy.get_time() - start_time
def _clean_line(self, line, joint_names):
"""
Cleans a single line of recorded joint positions
@param line: the line described in a list to process
@param joint_names: joint name keys
@return command: returns dictionary {joint: value} of valid commands
@return line: returns list of current line values stripped of commas
"""
def try_float(x):
try:
return float(x)
except ValueError:
return None
#convert the line of strings to a float or None
line = [try_float(x) for x in line.rstrip().split(',')]
#zip the values with the joint names
combined = zip(joint_names[1:], line[1:])
#take out any tuples that have a none value
cleaned = [x for x in combined if x[1] is not None]
#convert it to a dictionary with only valid commands
command = dict(cleaned)
return (command, line,)
def _add_point(self, positions, side, time):
"""
Appends trajectory with new point
@param positions: joint positions
@param side: limb to command point
@param time: time from start for point in seconds
"""
#creates a point in trajectory with time_from_start and positions
point = JointTrajectoryPoint()
point.positions = copy(positions)
point.time_from_start = rospy.Duration(time)
if side == self.limb:
self.goal.trajectory.points.append(point)
elif self.gripper and side == self.gripper_name:
self.grip.trajectory.points.append(point)
def parse_file(self, filename):
"""
Parses input file into FollowJointTrajectoryGoal format
@param filename: input filename
"""
#open recorded file
with open(filename, 'r') as f:
lines = f.readlines()
#read joint names specified in file
joint_names = lines[0].rstrip().split(',')
#parse joint names for right limb
for name in joint_names:
if self.limb == name[:-3]:
self.goal.trajectory.joint_names.append(name)
def find_start_offset(pos):
#create empty lists
cur = []
cmd = []
dflt_vel = []
vel_param = self._param_ns + "%s_default_velocity"
#for all joints find our current and first commanded position
#reading default velocities from the parameter server if specified
for name in joint_names:
if self.limb == name[:-3]:
cmd.append(pos[name])
cur.append(self.arm.joint_angle(name))
prm = rospy.get_param(vel_param % name, 0.25)
dflt_vel.append(prm)
diffs = map(operator.sub, cmd, cur)
diffs = map(operator.abs, diffs)
#determine the largest time offset necessary across all joints
offset = max(map(operator.div, diffs, dflt_vel))
return offset
for idx, values in enumerate(lines[1:]):
#clean each line of file
cmd, values = self._clean_line(values, joint_names)
#find allowable time offset for move to start position
if idx == 0:
# Set the initial position to be the current pose.
# This ensures we move slowly to the starting point of the
# trajectory from the current pose - The user may have moved
# arm since recording
cur_cmd = [self.arm.joint_angle(jnt) for jnt in self.goal.trajectory.joint_names]
self._add_point(cur_cmd, self.limb, 0.0)
start_offset = find_start_offset(cmd)
# Gripper playback won't start until the starting movement's
# duration has passed, and the actual trajectory playback begins
self._slow_move_offset = start_offset
self._trajectory_start_offset = rospy.Duration(start_offset + values[0])
#add a point for this set of commands with recorded time
cur_cmd = [cmd[jnt] for jnt in self.goal.trajectory.joint_names]
self._add_point(cur_cmd, self.limb, values[0] + start_offset)
if self.gripper:
cur_cmd = [cmd[self.gripper_name]]
self._add_point(cur_cmd, self.gripper_name, values[0] + start_offset)
def _feedback(self, data):
# Test to see if the actual playback time has exceeded
# the move-to-start-pose timing offset
if (not self._get_trajectory_flag() and
data.actual.time_from_start >= self._trajectory_start_offset):
self._set_trajectory_flag(value=True)
self._trajectory_actual_offset = data.actual.time_from_start
def _set_trajectory_flag(self, value=False):
with self._lock:
# Assign a value to the flag
self._arm_trajectory_started = value
def _get_trajectory_flag(self):
temp_flag = False
with self._lock:
# Copy to external variable
temp_flag = self._arm_trajectory_started
return temp_flag
def start(self):
"""
Sends FollowJointTrajectoryAction request
"""
self._limb_client.send_goal(self.goal, feedback_cb=self._feedback)
# Syncronize playback by waiting for the trajectories to start
while not rospy.is_shutdown() and not self._get_trajectory_flag():
rospy.sleep(0.05)
if self.gripper:
self._execute_gripper_commands()
def stop(self):
"""
Preempts trajectory execution by sending cancel goals
"""
if (self._limb_client.gh is not None and
self._limb_client.get_state() == actionlib.GoalStatus.ACTIVE):
self._limb_client.cancel_goal()
#delay to allow for terminating handshake
rospy.sleep(0.1)
def wait(self):
"""
Waits for and verifies trajectory execution result
"""
#create a timeout for our trajectory execution
#total time trajectory expected for trajectory execution plus a buffer
last_time = self.goal.trajectory.points[-1].time_from_start.to_sec()
time_buffer = rospy.get_param(self._param_ns + 'goal_time', 0.0) + 1.5
timeout = rospy.Duration(self._slow_move_offset +
last_time +
time_buffer)
finish = self._limb_client.wait_for_result(timeout)
result = (self._limb_client.get_result().error_code == 0)
#verify result
if all([finish, result]):
return True
else:
msg = ("Trajectory action failed or did not finish before "
"timeout/interrupt.")
rospy.logwarn(msg)
return False
def main():
"""SDK Joint Trajectory Example: File Playback
Plays back joint positions honoring timestamps recorded
via the joint_recorder example.
Run the joint_recorder.py example first to create a recording
file for use with this example. Then make sure to start the
joint_trajectory_action_server before running this example.
This example will use the joint trajectory action server
with velocity control to follow the positions and times of
the recorded motion, accurately replicating movement speed
necessary to hit each trajectory point on time.
"""
epilog = """
Related examples:
joint_recorder.py; joint_position_file_playback.py.
"""
parser = argparse.ArgumentParser(description="Playback joint trajectory")
parser.add_argument('-if', '--input-file', type=str, default='traj_final.txt',
help="Input trajectory file")
arg = parser.parse_args()
print(arg.input_file)
rp = intera_interface.RobotParams()
valid_limbs = rp.get_limb_names()
if not valid_limbs:
rp.log_message(("Cannot detect any limb parameters on this robot. "
"Exiting."), "ERROR")
return
print("Initializing node... ")
rospy.init_node("sdk_joint_trajectory_file_playback")
print("Getting robot state... ")
rs = intera_interface.RobotEnable(CHECK_VERSION)
print("Enabling robot... ")
rs.enable()
print("Running. Ctrl-c to quit")
traj = Trajectory(valid_limbs[0])
traj.parse_file(path.expanduser(arg.input_file))
#for safe interrupt handling
rospy.on_shutdown(traj.stop)
result = True
loop_cnt = 1
loopstr = str(1)
numloops = 1
if numloops == 0:
numloops = float('inf')
loopstr = "forever"
while (result == True and loop_cnt <= numloops
and not rospy.is_shutdown()):
print("Playback loop %d of %s" % (loop_cnt, loopstr,))
traj.start()
result = traj.wait()
loop_cnt = loop_cnt + 1
print("Exiting - File Playback Complete")
if __name__ == "__main__":
main()