Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python sdk maintenance #446

Merged
merged 6 commits into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions demos/python/sdk_wireless_camera_control/docs/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,15 @@ and this project adheres to `Semantic Versioning <https://semver.org/spec/v2.0.0

Unreleased
----------
* Fix livestream demo.

0.15.0 (December-6-2021)
------------------------
* Add alpha support for COHN (Camera-on-the-Home-Network)
* A real implementation is going to require a major rearchitecture to dynamically add connection types.
* Remove TKinter GUI. Will be replaced with Textual TUI in the future
* Improve wifi SSID matching
* Fix unhashable pydantic base models

0.14.1 (September-21-2022)
--------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ To additionally install the extra dependencies to run the GUI demos:
$ pip install open-gopro[gui]


External Dependencies
^^^^^^^^^^^^^^^^^^^^^

In order to use any of the Webcam API's, ensure first that your system is setup to
`Use the GoPro as a Webcam <https://community.gopro.com/s/article/GoPro-Webcam?language=de>`_


From sources
------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,69 +5,84 @@

import argparse
import asyncio
from typing import Any

from rich.console import Console

from open_gopro import Params, WirelessGoPro
from open_gopro.constants import WebcamError, WebcamStatus
from open_gopro import Params, WirelessGoPro, constants, proto
from open_gopro.logger import setup_logging
from open_gopro.util import add_cli_args_and_parse, ainput

console = Console()
console = Console() # rich consoler printer


async def wait_for_webcam_status(gopro: WirelessGoPro, status: WebcamStatus, timeout: int = 10) -> bool:
"""Wait for a specified webcam status for a given timeout
async def main(args: argparse.Namespace) -> None:
setup_logging(__name__, args.log)

Args:
gopro (WirelessGoPro): gopro to communicate with
status (WebcamStatus): status to wait for
timeout (int): timeout in seconds. Defaults to 10.
async with WirelessGoPro(args.identifier, enable_wifi=False) as gopro:
await gopro.ble_command.set_shutter(shutter=Params.Toggle.DISABLE)
await gopro.ble_command.register_livestream_status(
register=[proto.EnumRegisterLiveStreamStatus.REGISTER_LIVE_STREAM_STATUS_STATUS]
)

Returns:
bool: True if status was received before timing out, False if timed out or received error
"""
console.print(f"[yellow]Connecting to {args.ssid}...")
await gopro.connect_to_access_point(args.ssid, args.password)

async def poll_for_status() -> bool:
# Poll until status is received
while True:
response = (await gopro.http_command.webcam_status()).data
if response.error != WebcamError.SUCCESS:
# Something bad happened
return False
if response.status == status:
# We found the desired status
return True
# Start livestream
livestream_is_ready = asyncio.Event()

# Wait for either status or timeout
try:
return await asyncio.wait_for(poll_for_status(), timeout)
except TimeoutError:
return False
async def wait_for_livestream_start(_: Any, update: proto.NotifyLiveStreamStatus) -> None:
if update.live_stream_status == proto.EnumLiveStreamStatus.LIVE_STREAM_STATE_READY:
livestream_is_ready.set()

console.print("[yellow]Configuring livestream...")
gopro.register_update(wait_for_livestream_start, constants.ActionId.LIVESTREAM_STATUS_NOTIF)
await gopro.ble_command.set_livestream_mode(
url=args.url,
window_size=args.resolution,
minimum_bitrate=args.min_bit,
maximum_bitrate=args.max_bit,
starting_bitrate=args.start_bit,
lens=args.fov,
)

async def main(args: argparse.Namespace) -> None:
setup_logging(__name__, args.log)
# Wait to receive livestream started status
console.print("[yellow]Waiting for livestream to be ready...\n")
await livestream_is_ready.wait()

async with WirelessGoPro(args.identifier) as gopro:
await gopro.ble_command.set_shutter(shutter=Params.Toggle.DISABLE)
if (await gopro.http_command.webcam_status()).data.status != WebcamStatus.OFF:
console.print("[blue]Webcam is currently on. Turning if off.")
assert (await gopro.http_command.webcam_stop()).ok
await wait_for_webcam_status(gopro, WebcamStatus.OFF)
# TODO Is this still needed
await asyncio.sleep(2)

console.print("[yellow]Starting livestream")
assert (await gopro.ble_command.set_shutter(shutter=Params.Toggle.ENABLE)).ok

console.print("[blue]Starting webcam...")
await gopro.http_command.webcam_start()
await wait_for_webcam_status(gopro, WebcamStatus.HIGH_POWER_PREVIEW)
console.print("Livestream is now streaming and should be available for viewing.")
await ainput("Press enter to stop livestreaming...\n")

await ainput("Press enter to exit.", console.print)
await gopro.ble_command.set_shutter(shutter=Params.Toggle.DISABLE)
await gopro.ble_command.release_network()


def parse_arguments() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Connect to the GoPro via BLE and Wifi, start and view wireless webcam."
description="Connect to the GoPro via BLE only, configure then start a Livestream, then display it with CV2."
)
parser.add_argument("ssid", type=str, help="WiFi SSID to connect to.")
parser.add_argument("password", type=str, help="Password of WiFi SSID.")
parser.add_argument("url", type=str, help="RTMP server URL to stream to.")
parser.add_argument("--min_bit", type=int, help="Minimum bitrate.", default=1000)
parser.add_argument("--max_bit", type=int, help="Maximum bitrate.", default=1000)
parser.add_argument("--start_bit", type=int, help="Starting bitrate.", default=1000)
parser.add_argument(
"--resolution",
help="Resolution.",
choices=list(proto.EnumWindowSize.values()),
default=proto.EnumWindowSize.WINDOW_SIZE_720,
)
parser.add_argument(
"--fov", help="Field of View.", choices=list(proto.EnumLens.values()), default=proto.EnumLens.LENS_WIDE
)
return add_cli_args_and_parse(parser)
return add_cli_args_and_parse(parser, wifi=False)


def entrypoint() -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ class CustomBaseModel(BaseModel):

def __hash__(self) -> int:
h = hash((type(self),))
for field in self.__fields__.keys():
if isinstance(field, (dict, list)):
h += json.loads(field)
# Base case
h += hash(field)
for v in self.__dict__.values():
if isinstance(v, (dict, list)):
h += hash(json.dumps(v))
else:
h += hash(v)
return h

def __str__(self) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from pathlib import Path
from typing import Optional

from pydantic import Field
from pydantic import ConfigDict, Field

from open_gopro import constants
from open_gopro.models.bases import CustomBaseModel
Expand All @@ -20,6 +20,7 @@
class CameraInfo(CustomBaseModel):
"""General camera info"""

model_config = ConfigDict(protected_namespaces=())
model_number: int #: Camera model number
model_name: str #: Camera model name as string
firmware_version: str #: Complete firmware version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ def _detect_driver(self) -> WifiController:
if "VALID PASSWORD" not in cmd(f'echo "{self._password}" | sudo -S echo "VALID PASSWORD"'):
raise RuntimeError("Invalid password")

# try nmcli (Ubuntu 14.04)
if which("nmcli"):
# try nmcli (Ubuntu 14.04). Allow for use in Snap Package
if which("nmcli") or which("nmcli", path="/snap/bin/"):
version = cmd("nmcli --version").split()[-1]
return (
Nmcli0990Wireless(password=self._password)
Expand Down Expand Up @@ -691,6 +691,10 @@ def connect(self, ssid: str, password: str, timeout: float = 15) -> bool:
response = cmd(
r"/System/Library/PrivateFrameworks/Apple80211.framework/Versions/Current/Resources/airport --scan"
)
# TODO Sometimes the response is blank?
if not response:
logger.warning("MacOS did not return a response to SSID scanning.")
continue
lines = response.splitlines()
ssid_end_index = lines[0].index("SSID") + 4 # Find where the SSID column ends

Expand Down
Loading
Loading