From f467c167cb89adbbf52f9b85d3c34c1398c68036 Mon Sep 17 00:00:00 2001 From: Tim Camise Date: Wed, 21 Aug 2024 12:59:27 -0700 Subject: [PATCH] Cleanup / testing. About to add state management --- .../open_gopro/api/ble_commands.py | 7 + .../open_gopro/api/http_commands.py | 7 + .../open_gopro/api/params.py | 15 ++ .../open_gopro/constants.py | 2 +- .../open_gopro/demos/log_battery.py | 154 ++---------------- .../open_gopro/demos/video.py | 10 +- 6 files changed, 52 insertions(+), 143 deletions(-) diff --git a/demos/python/sdk_wireless_camera_control/open_gopro/api/ble_commands.py b/demos/python/sdk_wireless_camera_control/open_gopro/api/ble_commands.py index 71551e4d..6e58d176 100644 --- a/demos/python/sdk_wireless_camera_control/open_gopro/api/ble_commands.py +++ b/demos/python/sdk_wireless_camera_control/open_gopro/api/ble_commands.py @@ -1081,6 +1081,13 @@ def __init__(self, communicator: GoProBle): ) """File type of photo output""" + self.video_duration: BleSetting[Params.VideoDuration] = BleSetting[Params.VideoDuration]( + communicator, + SettingId.VIDEO_DURATION, + Params.VideoDuration + ) + """If set, a video will automatically be stopped after recording for this long.""" + super().__init__(communicator) diff --git a/demos/python/sdk_wireless_camera_control/open_gopro/api/http_commands.py b/demos/python/sdk_wireless_camera_control/open_gopro/api/http_commands.py index af577f84..e0305a95 100644 --- a/demos/python/sdk_wireless_camera_control/open_gopro/api/http_commands.py +++ b/demos/python/sdk_wireless_camera_control/open_gopro/api/http_commands.py @@ -711,6 +711,7 @@ def __init__(self, communicator: GoProHttp): self.hindsight: HttpSetting[Params.Hindsight] = HttpSetting[Params.Hindsight]( communicator, + SettingId.HINDSIGHT, ) """Hindsight time / disable""" @@ -733,4 +734,10 @@ def __init__(self, communicator: GoProHttp): ) """File type of photo output""" + self.video_duration: HttpSetting[Params.VideoDuration] = HttpSetting[Params.VideoDuration]( + communicator, + SettingId.VIDEO_DURATION + ) + """If set, a video will automatically be stopped after recording for this long.""" + super().__init__(communicator) diff --git a/demos/python/sdk_wireless_camera_control/open_gopro/api/params.py b/demos/python/sdk_wireless_camera_control/open_gopro/api/params.py index e2715b6f..2fcd2594 100644 --- a/demos/python/sdk_wireless_camera_control/open_gopro/api/params.py +++ b/demos/python/sdk_wireless_camera_control/open_gopro/api/params.py @@ -419,6 +419,7 @@ class PhotoDuration(GoProIntEnum): MIN_15 = 5 MIN_30 = 6 HOUR_1 = 7 + HOUR_2 = 8 HOUR_3 = 9 @@ -434,3 +435,17 @@ class PresetGroup(GoProIntEnum): VIDEO = 1000 PHOTO = 1001 TIMELAPSE = 1002 + + +class VideoDuration(GoProIntEnum): + DUR_15_SECONDS = 1 + DUR_30_SECONDS = 2 + DUR_1_MINUTE = 3 + DUR_5_MINUTES = 4 + DUR_15_MINUTES = 5 + DUR_30_MINUTES = 6 + DUR_1_HOUR = 7 + DUR_2_HOURS = 8 + DUR_3_HOURS = 9 + DUR_5_SECONDS = 10 + DUR_NO_LIMIT = 100 diff --git a/demos/python/sdk_wireless_camera_control/open_gopro/constants.py b/demos/python/sdk_wireless_camera_control/open_gopro/constants.py index 46b0f811..29353eb5 100644 --- a/demos/python/sdk_wireless_camera_control/open_gopro/constants.py +++ b/demos/python/sdk_wireless_camera_control/open_gopro/constants.py @@ -225,7 +225,7 @@ class SettingId(GoProIntEnum): INTERNAL_153 = 153 INTERNAL_154 = 154 INTERNAL_155 = 155 - INTERNAL_156 = 156 + VIDEO_DURATION = 156 INTERNAL_157 = 157 INTERNAL_158 = 158 INTERNAL_159 = 159 diff --git a/demos/python/sdk_wireless_camera_control/open_gopro/demos/log_battery.py b/demos/python/sdk_wireless_camera_control/open_gopro/demos/log_battery.py index 0239cc4e..5eef02aa 100644 --- a/demos/python/sdk_wireless_camera_control/open_gopro/demos/log_battery.py +++ b/demos/python/sdk_wireless_camera_control/open_gopro/demos/log_battery.py @@ -1,157 +1,33 @@ -# log_battery.py/Open GoPro, Version 2.0 (C) Copyright 2021 GoPro, Inc. (http://gopro.com/OpenGoPro). -# This copyright was auto-generated on Wed, Sep 1, 2021 5:05:45 PM - -"""Example to continuously read the battery (with no Wifi connection)""" - -import argparse import asyncio -import csv -import logging -from dataclasses import dataclass -from datetime import datetime from pathlib import Path -from typing import Optional - -from rich.console import Console - -from open_gopro import WirelessGoPro, types -from open_gopro.constants import StatusId -from open_gopro.logger import set_stream_logging_level, setup_logging -from open_gopro.util import add_cli_args_and_parse, ainput - -console = Console() - -last_percentage = 0 -last_bars = 0 - - -@dataclass -class Sample: - """Simple class to store battery samples""" - - index: int - percentage: int - bars: int - - def __post_init__(self) -> None: - self.time = datetime.now() - - def __str__(self) -> str: - return f"Index {self.index} @ time {self.time.strftime('%H:%M:%S')} --> bars: {self.bars}, percentage: {self.percentage}" - -SAMPLE_INDEX = 0 -SAMPLES: list[Sample] = [] +from open_gopro import WirelessGoPro +from open_gopro.logger import setup_logging -def dump_results_as_csv(location: Path) -> None: - """Write all of the samples to a csv file +async def main() -> None: + logger = setup_logging(__name__, Path("demo.log")) - Args: - location (Path): File to write to - """ - console.print(f"Dumping results as CSV to {location}") - with open(location, mode="w") as f: - w = csv.writer(f, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL) - w.writerow(["index", "time", "percentage", "bars"]) - initial_time = SAMPLES[0].time - for s in SAMPLES: - w.writerow([s.index, (s.time - initial_time).seconds, s.percentage, s.bars]) + gopro: WirelessGoPro | None = None + async with WirelessGoPro(enable_wifi=False) as gopro: + logger.critical("Set AP Mode on") + await gopro.ble_command.enable_wifi_ap(enable=True) + logger.critical("Wait 10 seconds") + await asyncio.sleep(10) -async def process_battery_notifications(update: types.UpdateType, value: int) -> None: - """Handle asynchronous battery update notifications + logger.critical("Initiate AP scan") + await gopro.ble_command.scan_wifi_networks() - Args: - update (types.UpdateType): type of update - value (int): value of update - """ + logger.critical("Wait 5 minutes before disconnecting") + await asyncio.sleep(300) - global last_percentage - global last_bars - - if update == StatusId.INT_BATT_PER: - last_percentage = value - elif update == StatusId.BATT_LEVEL: - last_bars = value - - # Append and print sample - global SAMPLE_INDEX - SAMPLES.append(Sample(index=SAMPLE_INDEX, percentage=last_percentage, bars=last_bars)) - console.print(str(SAMPLES[-1])) - SAMPLE_INDEX += 1 - - -async def main(args: argparse.Namespace) -> None: - logger = setup_logging(__name__, args.log) - - gopro: Optional[WirelessGoPro] = None - try: - async with WirelessGoPro(args.identifier, enable_wifi=False) as gopro: - set_stream_logging_level(logging.ERROR) - - async def log_battery() -> None: - global SAMPLE_INDEX - if args.poll: - with console.status("[bold green]Polling the battery until it dies..."): - while True: - SAMPLES.append( - Sample( - index=SAMPLE_INDEX, - percentage=(await gopro.ble_status.int_batt_per.get_value()).data, - bars=(await gopro.ble_status.batt_level.get_value()).data, - ) - ) - console.print(str(SAMPLES[-1])) - SAMPLE_INDEX += 1 - await asyncio.sleep(args.poll) - else: # Not polling. Set up notifications - global last_bars - global last_percentage - - console.print("Configuring battery notifications...") - # Enable notifications of the relevant battery statuses. Also store initial values. - last_bars = ( - await gopro.ble_status.batt_level.register_value_update(process_battery_notifications) - ).data - last_percentage = ( - await gopro.ble_status.int_batt_per.register_value_update(process_battery_notifications) - ).data - # Append initial sample - SAMPLES.append(Sample(index=SAMPLE_INDEX, percentage=last_percentage, bars=last_bars)) - SAMPLE_INDEX += 1 - console.print(str(SAMPLES[-1])) - console.print("[bold green]Receiving battery notifications until it dies...") - - asyncio.create_task(log_battery()) - await ainput("[purple]Press enter to exit.", console.print) - console.print("Exiting...") - - except KeyboardInterrupt: - logger.warning("Received keyboard interrupt. Shutting down...") - if SAMPLES: - csv_location = Path(args.log.parent) / "battery_results.csv" - dump_results_as_csv(csv_location) if gopro: await gopro.close() -def parse_arguments() -> argparse.Namespace: - parser = argparse.ArgumentParser( - description="Connect to the GoPro via BLE only and continuously read the battery (either by polling or notifications)." - ) - parser.add_argument( - "-p", - "--poll", - type=int, - help="Set to poll the battery at a given interval. If not set, battery level will be notified instead. Defaults to notifications.", - default=None, - ) - return add_cli_args_and_parse(parser, wifi=False) - - def entrypoint() -> None: - asyncio.run(main(parse_arguments())) + asyncio.run(main()) if __name__ == "__main__": diff --git a/demos/python/sdk_wireless_camera_control/open_gopro/demos/video.py b/demos/python/sdk_wireless_camera_control/open_gopro/demos/video.py index ef80585b..b98a6432 100644 --- a/demos/python/sdk_wireless_camera_control/open_gopro/demos/video.py +++ b/demos/python/sdk_wireless_camera_control/open_gopro/demos/video.py @@ -33,9 +33,11 @@ async def main(args: argparse.Namespace) -> None: media_set_before = set((await gopro.http_command.get_media_list()).data.files) # Take a video console.print("Capturing a video...") + # assert (await gopro.http_command.set_shutter(shutter=Params.Toggle.ENABLE)).ok + # await asyncio.sleep(args.record_time) + # assert (await gopro.http_command.set_shutter(shutter=Params.Toggle.DISABLE)).ok + assert (await gopro.http_setting.video_duration.set(Params.VideoDuration.DUR_15_SECONDS)) assert (await gopro.http_command.set_shutter(shutter=Params.Toggle.ENABLE)).ok - await asyncio.sleep(args.record_time) - assert (await gopro.http_command.set_shutter(shutter=Params.Toggle.DISABLE)).ok # Get the media list after media_set_after = set((await gopro.http_command.get_media_list()).data.files) @@ -57,9 +59,11 @@ async def main(args: argparse.Namespace) -> None: def parse_arguments() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Connect to a GoPro camera, take a video, then download it.") parser.add_argument( - "record_time", + "-r", + "--record_time", type=float, help="How long to record for", + default=2.0 ) parser.add_argument( "-o",