Skip to content

Commit

Permalink
Python sdk maintenance (#446)
Browse files Browse the repository at this point in the history
* Update dependencies
* Fix livestream demo
* Test on Mac
  • Loading branch information
tcamise-gpsw authored Dec 7, 2023
1 parent fceba44 commit 222fe5c
Show file tree
Hide file tree
Showing 8 changed files with 2,310 additions and 2,152 deletions.
5 changes: 5 additions & 0 deletions demos/python/sdk_wireless_camera_control/docs/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,15 @@ and this project adheres to `Semantic Versioning <https://semver.org/spec/v2.0.0

Unreleased
----------
* Fix livestream demo.

0.15.0 (December-6-2021)
------------------------
* Add alpha support for COHN (Camera-on-the-Home-Network)
* A real implementation is going to require a major rearchitecture to dynamically add connection types.
* Remove TKinter GUI. Will be replaced with Textual TUI in the future
* Improve wifi SSID matching
* Fix unhashable pydantic base models

0.14.1 (September-21-2022)
--------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ To additionally install the extra dependencies to run the GUI demos:
$ pip install open-gopro[gui]
External Dependencies
^^^^^^^^^^^^^^^^^^^^^

In order to use any of the Webcam API's, ensure first that your system is setup to
`Use the GoPro as a Webcam <https://community.gopro.com/s/article/GoPro-Webcam?language=de>`_


From sources
------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,69 +5,84 @@

import argparse
import asyncio
from typing import Any

from rich.console import Console

from open_gopro import Params, WirelessGoPro
from open_gopro.constants import WebcamError, WebcamStatus
from open_gopro import Params, WirelessGoPro, constants, proto
from open_gopro.logger import setup_logging
from open_gopro.util import add_cli_args_and_parse, ainput

console = Console()
console = Console() # rich consoler printer


async def wait_for_webcam_status(gopro: WirelessGoPro, status: WebcamStatus, timeout: int = 10) -> bool:
"""Wait for a specified webcam status for a given timeout
async def main(args: argparse.Namespace) -> None:
setup_logging(__name__, args.log)

Args:
gopro (WirelessGoPro): gopro to communicate with
status (WebcamStatus): status to wait for
timeout (int): timeout in seconds. Defaults to 10.
async with WirelessGoPro(args.identifier, enable_wifi=False) as gopro:
await gopro.ble_command.set_shutter(shutter=Params.Toggle.DISABLE)
await gopro.ble_command.register_livestream_status(
register=[proto.EnumRegisterLiveStreamStatus.REGISTER_LIVE_STREAM_STATUS_STATUS]
)

Returns:
bool: True if status was received before timing out, False if timed out or received error
"""
console.print(f"[yellow]Connecting to {args.ssid}...")
await gopro.connect_to_access_point(args.ssid, args.password)

async def poll_for_status() -> bool:
# Poll until status is received
while True:
response = (await gopro.http_command.webcam_status()).data
if response.error != WebcamError.SUCCESS:
# Something bad happened
return False
if response.status == status:
# We found the desired status
return True
# Start livestream
livestream_is_ready = asyncio.Event()

# Wait for either status or timeout
try:
return await asyncio.wait_for(poll_for_status(), timeout)
except TimeoutError:
return False
async def wait_for_livestream_start(_: Any, update: proto.NotifyLiveStreamStatus) -> None:
if update.live_stream_status == proto.EnumLiveStreamStatus.LIVE_STREAM_STATE_READY:
livestream_is_ready.set()

console.print("[yellow]Configuring livestream...")
gopro.register_update(wait_for_livestream_start, constants.ActionId.LIVESTREAM_STATUS_NOTIF)
await gopro.ble_command.set_livestream_mode(
url=args.url,
window_size=args.resolution,
minimum_bitrate=args.min_bit,
maximum_bitrate=args.max_bit,
starting_bitrate=args.start_bit,
lens=args.fov,
)

async def main(args: argparse.Namespace) -> None:
setup_logging(__name__, args.log)
# Wait to receive livestream started status
console.print("[yellow]Waiting for livestream to be ready...\n")
await livestream_is_ready.wait()

async with WirelessGoPro(args.identifier) as gopro:
await gopro.ble_command.set_shutter(shutter=Params.Toggle.DISABLE)
if (await gopro.http_command.webcam_status()).data.status != WebcamStatus.OFF:
console.print("[blue]Webcam is currently on. Turning if off.")
assert (await gopro.http_command.webcam_stop()).ok
await wait_for_webcam_status(gopro, WebcamStatus.OFF)
# TODO Is this still needed
await asyncio.sleep(2)

console.print("[yellow]Starting livestream")
assert (await gopro.ble_command.set_shutter(shutter=Params.Toggle.ENABLE)).ok

console.print("[blue]Starting webcam...")
await gopro.http_command.webcam_start()
await wait_for_webcam_status(gopro, WebcamStatus.HIGH_POWER_PREVIEW)
console.print("Livestream is now streaming and should be available for viewing.")
await ainput("Press enter to stop livestreaming...\n")

await ainput("Press enter to exit.", console.print)
await gopro.ble_command.set_shutter(shutter=Params.Toggle.DISABLE)
await gopro.ble_command.release_network()


def parse_arguments() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Connect to the GoPro via BLE and Wifi, start and view wireless webcam."
description="Connect to the GoPro via BLE only, configure then start a Livestream, then display it with CV2."
)
parser.add_argument("ssid", type=str, help="WiFi SSID to connect to.")
parser.add_argument("password", type=str, help="Password of WiFi SSID.")
parser.add_argument("url", type=str, help="RTMP server URL to stream to.")
parser.add_argument("--min_bit", type=int, help="Minimum bitrate.", default=1000)
parser.add_argument("--max_bit", type=int, help="Maximum bitrate.", default=1000)
parser.add_argument("--start_bit", type=int, help="Starting bitrate.", default=1000)
parser.add_argument(
"--resolution",
help="Resolution.",
choices=list(proto.EnumWindowSize.values()),
default=proto.EnumWindowSize.WINDOW_SIZE_720,
)
parser.add_argument(
"--fov", help="Field of View.", choices=list(proto.EnumLens.values()), default=proto.EnumLens.LENS_WIDE
)
return add_cli_args_and_parse(parser)
return add_cli_args_and_parse(parser, wifi=False)


def entrypoint() -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ class CustomBaseModel(BaseModel):

def __hash__(self) -> int:
h = hash((type(self),))
for field in self.__fields__.keys():
if isinstance(field, (dict, list)):
h += json.loads(field)
# Base case
h += hash(field)
for v in self.__dict__.values():
if isinstance(v, (dict, list)):
h += hash(json.dumps(v))
else:
h += hash(v)
return h

def __str__(self) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from pathlib import Path
from typing import Optional

from pydantic import Field
from pydantic import ConfigDict, Field

from open_gopro import constants
from open_gopro.models.bases import CustomBaseModel
Expand All @@ -20,6 +20,7 @@
class CameraInfo(CustomBaseModel):
"""General camera info"""

model_config = ConfigDict(protected_namespaces=())
model_number: int #: Camera model number
model_name: str #: Camera model name as string
firmware_version: str #: Complete firmware version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ def _detect_driver(self) -> WifiController:
if "VALID PASSWORD" not in cmd(f'echo "{self._password}" | sudo -S echo "VALID PASSWORD"'):
raise RuntimeError("Invalid password")

# try nmcli (Ubuntu 14.04)
if which("nmcli"):
# try nmcli (Ubuntu 14.04). Allow for use in Snap Package
if which("nmcli") or which("nmcli", path="/snap/bin/"):
version = cmd("nmcli --version").split()[-1]
return (
Nmcli0990Wireless(password=self._password)
Expand Down Expand Up @@ -691,6 +691,10 @@ def connect(self, ssid: str, password: str, timeout: float = 15) -> bool:
response = cmd(
r"/System/Library/PrivateFrameworks/Apple80211.framework/Versions/Current/Resources/airport --scan"
)
# TODO Sometimes the response is blank?
if not response:
logger.warning("MacOS did not return a response to SSID scanning.")
continue
lines = response.splitlines()
ssid_end_index = lines[0].index("SSID") + 4 # Find where the SSID column ends

Expand Down
Loading

0 comments on commit 222fe5c

Please sign in to comment.