Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix lighthouse geometry estimation example #459

Merged
merged 1 commit into from
Jun 27, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 52 additions & 35 deletions examples/lighthouse/multi_bs_geometry_estimation.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
REFERENCE_DIST = 1.0


def record_angles_average(scf: SyncCrazyflie) -> LhCfPoseSample:
def record_angles_average(scf: SyncCrazyflie, timeout: float = 5.0) -> LhCfPoseSample:
"""Record angles and average over the samples to reduce noise"""
recorded_angles = None

Expand All @@ -85,7 +85,10 @@ def ready_cb(averages):

reader = LighthouseSweepAngleAverageReader(scf.cf, ready_cb)
reader.start_angle_collection()
is_ready.wait()

if not is_ready.wait(timeout):
print('Recording timed out.')
return None

angles_calibrated = {}
for bs_id, data in recorded_angles.items():
Expand Down Expand Up @@ -303,6 +306,49 @@ def estimate_from_file(file_name: str):
estimate_geometry(origin, x_axis, xy_plane, samples)


def get_recording(scf: SyncCrazyflie):
data = None
while True: # Infinite loop, will break on valid measurement
input('Press return when ready. ')
print(' Recording...')
measurement = record_angles_average(scf)
if measurement is not None:
data = measurement
break # Exit the loop if a valid measurement is obtained
else:
time.sleep(1)
print('Invalid measurement, please try again.')
return data


def get_multiple_recordings(scf: SyncCrazyflie):
data = []
first_attempt = True

while True:
if first_attempt:
user_input = input('Press return to record a measurement: ').lower()
first_attempt = False
else:
user_input = input('Press return to record another measurement, or "q" to continue: ').lower()

if user_input == 'q' and data:
break
elif user_input == 'q' and not data:
print('You must record at least one measurement.')
continue

print(' Recording...')
measurement = record_angles_average(scf)
if measurement is not None:
data.append(measurement)
else:
time.sleep(1)
print('Invalid measurement, please try again.')

return data


def connect_and_estimate(uri: str, file_name: str | None = None):
"""Connect to a Crazyflie, collect data and estimate the geometry of the system"""
print(f'Step 1. Connecting to the Crazyflie on uri {uri}...')
Expand All @@ -313,44 +359,15 @@ def connect_and_estimate(uri: str, file_name: str | None = None):

print('Step 2. Put the Crazyflie where you want the origin of your coordinate system.')

origin = None
do_repeat = True
while do_repeat:
input('Press return when ready. ')
print(' Recording...')
measurement = record_angles_average(scf)
do_repeat = False
if measurement is not None:
origin = measurement
else:
do_repeat = True
origin = get_recording(scf)

print(f'Step 3. Put the Crazyflie on the positive X-axis, exactly {REFERENCE_DIST} meters from the origin. ' +
'This position defines the direction of the X-axis, but it is also used for scaling of the system.')
x_axis = []
do_repeat = True
while do_repeat:
input('Press return when ready. ')
print(' Recording...')
measurement = record_angles_average(scf)
do_repeat = False
if measurement is not None:
x_axis = [measurement]
else:
do_repeat = True
x_axis = [get_recording(scf)]

print('Step 4. Put the Crazyflie somehere in the XY-plane, but not on the X-axis.')
print('Multiple samples can be recorded if you want to, type "r" before you hit enter to repeat the step.')
xy_plane = []
do_repeat = True
while do_repeat:
do_repeat = 'r' == input('Press return when ready. ').lower()
print(' Recording...')
measurement = record_angles_average(scf)
if measurement is not None:
xy_plane.append(measurement)
else:
do_repeat = True
print('Multiple samples can be recorded if you want to.')
xy_plane = get_multiple_recordings(scf)

print()
print('Step 5. We will now record data from the space you plan to fly in and optimize the base station ' +
Expand Down
Loading