Skip to content

Commit

Permalink
Cleanup / testing. About to add state management
Browse files Browse the repository at this point in the history
  • Loading branch information
tcamise-gpsw committed Sep 6, 2024
1 parent 50a342c commit 4cf26ea
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1081,6 +1081,13 @@ def __init__(self, communicator: GoProBle):
)
"""File type of photo output"""

self.video_duration: BleSetting[Params.VideoDuration] = BleSetting[Params.VideoDuration](
communicator,
SettingId.VIDEO_DURATION,
Params.VideoDuration
)
"""If set, a video will automatically be stopped after recording for this long."""

super().__init__(communicator)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,7 @@ def __init__(self, communicator: GoProHttp):

self.hindsight: HttpSetting[Params.Hindsight] = HttpSetting[Params.Hindsight](
communicator,

SettingId.HINDSIGHT,
)
"""Hindsight time / disable"""
Expand All @@ -733,4 +734,10 @@ def __init__(self, communicator: GoProHttp):
)
"""File type of photo output"""

self.video_duration: HttpSetting[Params.VideoDuration] = HttpSetting[Params.VideoDuration](
communicator,
SettingId.VIDEO_DURATION
)
"""If set, a video will automatically be stopped after recording for this long."""

super().__init__(communicator)
15 changes: 15 additions & 0 deletions demos/python/sdk_wireless_camera_control/open_gopro/api/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,7 @@ class PhotoDuration(GoProIntEnum):
MIN_15 = 5
MIN_30 = 6
HOUR_1 = 7

HOUR_2 = 8
HOUR_3 = 9

Expand All @@ -434,3 +435,17 @@ class PresetGroup(GoProIntEnum):
VIDEO = 1000
PHOTO = 1001
TIMELAPSE = 1002


class VideoDuration(GoProIntEnum):
DUR_15_SECONDS = 1
DUR_30_SECONDS = 2
DUR_1_MINUTE = 3
DUR_5_MINUTES = 4
DUR_15_MINUTES = 5
DUR_30_MINUTES = 6
DUR_1_HOUR = 7
DUR_2_HOURS = 8
DUR_3_HOURS = 9
DUR_5_SECONDS = 10
DUR_NO_LIMIT = 100
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ class SettingId(GoProIntEnum):
INTERNAL_153 = 153
INTERNAL_154 = 154
INTERNAL_155 = 155
INTERNAL_156 = 156
VIDEO_DURATION = 156
INTERNAL_157 = 157
INTERNAL_158 = 158
INTERNAL_159 = 159
Expand Down
154 changes: 15 additions & 139 deletions demos/python/sdk_wireless_camera_control/open_gopro/demos/log_battery.py
Original file line number Diff line number Diff line change
@@ -1,157 +1,33 @@
# log_battery.py/Open GoPro, Version 2.0 (C) Copyright 2021 GoPro, Inc. (http://gopro.com/OpenGoPro).
# This copyright was auto-generated on Wed, Sep 1, 2021 5:05:45 PM

"""Example to continuously read the battery (with no Wifi connection)"""

import argparse
import asyncio
import csv
import logging
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Optional

from rich.console import Console

from open_gopro import WirelessGoPro, types
from open_gopro.constants import StatusId
from open_gopro.logger import set_stream_logging_level, setup_logging
from open_gopro.util import add_cli_args_and_parse, ainput

console = Console()

last_percentage = 0
last_bars = 0


@dataclass
class Sample:
"""Simple class to store battery samples"""

index: int
percentage: int
bars: int

def __post_init__(self) -> None:
self.time = datetime.now()

def __str__(self) -> str:
return f"Index {self.index} @ time {self.time.strftime('%H:%M:%S')} --> bars: {self.bars}, percentage: {self.percentage}"


SAMPLE_INDEX = 0
SAMPLES: list[Sample] = []
from open_gopro import WirelessGoPro
from open_gopro.logger import setup_logging


def dump_results_as_csv(location: Path) -> None:
"""Write all of the samples to a csv file
async def main() -> None:
logger = setup_logging(__name__, Path("demo.log"))

Args:
location (Path): File to write to
"""
console.print(f"Dumping results as CSV to {location}")
with open(location, mode="w") as f:
w = csv.writer(f, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL)
w.writerow(["index", "time", "percentage", "bars"])
initial_time = SAMPLES[0].time
for s in SAMPLES:
w.writerow([s.index, (s.time - initial_time).seconds, s.percentage, s.bars])
gopro: WirelessGoPro | None = None
async with WirelessGoPro(enable_wifi=False) as gopro:
logger.critical("Set AP Mode on")
await gopro.ble_command.enable_wifi_ap(enable=True)

logger.critical("Wait 10 seconds")
await asyncio.sleep(10)

async def process_battery_notifications(update: types.UpdateType, value: int) -> None:
"""Handle asynchronous battery update notifications
logger.critical("Initiate AP scan")
await gopro.ble_command.scan_wifi_networks()

Args:
update (types.UpdateType): type of update
value (int): value of update
"""
logger.critical("Wait 5 minutes before disconnecting")
await asyncio.sleep(300)

global last_percentage
global last_bars

if update == StatusId.INT_BATT_PER:
last_percentage = value
elif update == StatusId.BATT_LEVEL:
last_bars = value

# Append and print sample
global SAMPLE_INDEX
SAMPLES.append(Sample(index=SAMPLE_INDEX, percentage=last_percentage, bars=last_bars))
console.print(str(SAMPLES[-1]))
SAMPLE_INDEX += 1


async def main(args: argparse.Namespace) -> None:
logger = setup_logging(__name__, args.log)

gopro: Optional[WirelessGoPro] = None
try:
async with WirelessGoPro(args.identifier, enable_wifi=False) as gopro:
set_stream_logging_level(logging.ERROR)

async def log_battery() -> None:
global SAMPLE_INDEX
if args.poll:
with console.status("[bold green]Polling the battery until it dies..."):
while True:
SAMPLES.append(
Sample(
index=SAMPLE_INDEX,
percentage=(await gopro.ble_status.int_batt_per.get_value()).data,
bars=(await gopro.ble_status.batt_level.get_value()).data,
)
)
console.print(str(SAMPLES[-1]))
SAMPLE_INDEX += 1
await asyncio.sleep(args.poll)
else: # Not polling. Set up notifications
global last_bars
global last_percentage

console.print("Configuring battery notifications...")
# Enable notifications of the relevant battery statuses. Also store initial values.
last_bars = (
await gopro.ble_status.batt_level.register_value_update(process_battery_notifications)
).data
last_percentage = (
await gopro.ble_status.int_batt_per.register_value_update(process_battery_notifications)
).data
# Append initial sample
SAMPLES.append(Sample(index=SAMPLE_INDEX, percentage=last_percentage, bars=last_bars))
SAMPLE_INDEX += 1
console.print(str(SAMPLES[-1]))
console.print("[bold green]Receiving battery notifications until it dies...")

asyncio.create_task(log_battery())
await ainput("[purple]Press enter to exit.", console.print)
console.print("Exiting...")

except KeyboardInterrupt:
logger.warning("Received keyboard interrupt. Shutting down...")
if SAMPLES:
csv_location = Path(args.log.parent) / "battery_results.csv"
dump_results_as_csv(csv_location)
if gopro:
await gopro.close()


def parse_arguments() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Connect to the GoPro via BLE only and continuously read the battery (either by polling or notifications)."
)
parser.add_argument(
"-p",
"--poll",
type=int,
help="Set to poll the battery at a given interval. If not set, battery level will be notified instead. Defaults to notifications.",
default=None,
)
return add_cli_args_and_parse(parser, wifi=False)


def entrypoint() -> None:
asyncio.run(main(parse_arguments()))
asyncio.run(main())


if __name__ == "__main__":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ async def main(args: argparse.Namespace) -> None:
media_set_before = set((await gopro.http_command.get_media_list()).data.files)
# Take a video
console.print("Capturing a video...")
# assert (await gopro.http_command.set_shutter(shutter=Params.Toggle.ENABLE)).ok
# await asyncio.sleep(args.record_time)
# assert (await gopro.http_command.set_shutter(shutter=Params.Toggle.DISABLE)).ok
assert (await gopro.http_setting.video_duration.set(Params.VideoDuration.DUR_15_SECONDS))
assert (await gopro.http_command.set_shutter(shutter=Params.Toggle.ENABLE)).ok
await asyncio.sleep(args.record_time)
assert (await gopro.http_command.set_shutter(shutter=Params.Toggle.DISABLE)).ok

# Get the media list after
media_set_after = set((await gopro.http_command.get_media_list()).data.files)
Expand All @@ -57,9 +59,11 @@ async def main(args: argparse.Namespace) -> None:
def parse_arguments() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Connect to a GoPro camera, take a video, then download it.")
parser.add_argument(
"record_time",
"-r",
"--record_time",
type=float,
help="How long to record for",
default=2.0
)
parser.add_argument(
"-o",
Expand Down

0 comments on commit 4cf26ea

Please sign in to comment.