Skip to content

Commit

Permalink
integrated camera_factory in video_input (#232)
Browse files Browse the repository at this point in the history
* integrated camera_factory in video_input

* added missing parameters

* removed cluster estimation test file

* accommodate video input for new camera factory and logger changes in common

* refactor for moving camera config classes into each camera module

* add comments and change empty to string to a valid iamge prefix

* Fix comments

* refactor video_input save_prefix to image_name, for proper use with logger.save_image

* bug fix for video_input integration tests

---------

Co-authored-by: Maxwell Lou <[email protected]>
  • Loading branch information
Aleksa-M and maxlou05 authored Dec 22, 2024
1 parent a0bd731 commit a268391
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 34 deletions.
27 changes: 21 additions & 6 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,40 @@
queue_max_size: 10

video_input:
camera_name: 0
worker_period: 1.0 # seconds
save_prefix: "log_image"
camera_enum: 0 # Enum values can be found in camera_factory.py
width: 1920
height: 1200
# For camera_enum=0, use the OpenCV camera config. For camera_enum=1, use the PiCamera2 config
# OpenCV camera config (regular cameras, enum 0)
camera_config:
device_index: 0
# PiCamera2 camera config (PiCamera NoIR, enum 1)
# camera_config:
# exposure_time: 250 # microseconds
# analogue_gain: 64.0 # Sets ISO, 1.0 for normal, 64.0 for max, 0.0 for min
# contrast: 1.0 # Contrast, 1.0 for nomral, 32.0 for max, 0.0 for min
# lens_position: null # Focal length, 1/m (0 for infinity, null for auto focus)
log_images: true # Set to true to save images
image_name: "log_image" # Image name when saving images

detect_target:
worker_count: 1
option: 0 # 0 is for Ultralytics (from detect_target_factory.py)
device: 0
model_path: "tests/model_example/yolov8s_ultralytics_pretrained_default.pt" # TODO: update
model_path: "tests/model_example/yolov8s_ultralytics_pretrained_default.pt" # See autonomy OneDrive for latest model
save_prefix: "log_comp"

flight_interface:
address: "tcp:127.0.0.1:14550"
timeout: 10.0 # seconds
# Port 5762 connects directly to the simulated auto pilot, which is more realistic
# than connecting to port 14550, which is the ground station
address: "tcp:localhost:5762"
timeout: 30.0 # seconds
baud_rate: 57600 # symbol rate
worker_period: 0.1 # seconds

data_merge:
timeout: 10.0 # seconds
timeout: 30.0 # seconds

geolocation:
resolution_x: 1920
Expand Down
44 changes: 34 additions & 10 deletions main_2024.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
# Used in type annotation of flight interface output
# pylint: disable-next=unused-import
from modules import odometry_and_time
from modules.common.modules.camera import camera_factory
from modules.common.modules.camera import camera_opencv
from modules.common.modules.camera import camera_picamera2
from modules.communications import communications_worker
from modules.detect_target import detect_target_factory
from modules.detect_target import detect_target_worker
Expand Down Expand Up @@ -81,19 +84,37 @@ def main() -> int:
# pylint: disable=invalid-name
QUEUE_MAX_SIZE = config["queue_max_size"]

VIDEO_INPUT_CAMERA_NAME = config["video_input"]["camera_name"]
VIDEO_INPUT_WORKER_PERIOD = config["video_input"]["worker_period"]
VIDEO_INPUT_SAVE_NAME_PREFIX = config["video_input"]["save_prefix"]
VIDEO_INPUT_SAVE_PREFIX = str(pathlib.Path(logging_path, VIDEO_INPUT_SAVE_NAME_PREFIX))
VIDEO_INPUT_OPTION = camera_factory.CameraOption(config["video_input"]["camera_enum"])
VIDEO_INPUT_WIDTH = config["video_input"]["width"]
VIDEO_INPUT_HEIGHT = config["video_input"]["height"]
match VIDEO_INPUT_OPTION:
case camera_factory.CameraOption.OPENCV:
VIDEO_INPUT_CAMERA_CONFIG = camera_opencv.ConfigOpenCV(
**config["video_input"]["camera_config"]
)
case camera_factory.CameraOption.PICAM2:
VIDEO_INPUT_CAMERA_CONFIG = camera_picamera2.ConfigPiCamera2(
**config["video_input"]["camera_config"]
)
case _:
main_logger.error(f"Inputted an invalid camera option: {VIDEO_INPUT_OPTION}", True)
return -1

VIDEO_INPUT_IMAGE_NAME = (
config["video_input"]["image_name"] if config["video_input"]["log_images"] else None
)

DETECT_TARGET_WORKER_COUNT = config["detect_target"]["worker_count"]
DETECT_TARGET_OPTION_INT = config["detect_target"]["option"]
DETECT_TARGET_OPTION = detect_target_factory.DetectTargetOption(DETECT_TARGET_OPTION_INT)
DETECT_TARGET_OPTION = detect_target_factory.DetectTargetOption(
config["detect_target"]["option"]
)
DETECT_TARGET_DEVICE = "cpu" if args.cpu else config["detect_target"]["device"]
DETECT_TARGET_MODEL_PATH = config["detect_target"]["model_path"]
DETECT_TARGET_OVERRIDE_FULL_PRECISION = args.full
DETECT_TARGET_SAVE_NAME_PREFIX = config["detect_target"]["save_prefix"]
DETECT_TARGET_SAVE_PREFIX = str(pathlib.Path(logging_path, DETECT_TARGET_SAVE_NAME_PREFIX))
DETECT_TARGET_SAVE_PREFIX = str(
pathlib.Path(logging_path, config["detect_target"]["save_prefix"])
)
DETECT_TARGET_SHOW_ANNOTATED = args.show_annotated

FLIGHT_INTERFACE_ADDRESS = config["flight_interface"]["address"]
Expand Down Expand Up @@ -125,7 +146,7 @@ def main() -> int:
main_logger.error(f"Config key(s) not found: {exception}", True)
return -1
except ValueError as exception:
main_logger.error(f"Could not convert detect target option into enum: {exception}", True)
main_logger.error(f"{exception}", True)
return -1

# Setup
Expand Down Expand Up @@ -199,9 +220,12 @@ def main() -> int:
count=1,
target=video_input_worker.video_input_worker,
work_arguments=(
VIDEO_INPUT_CAMERA_NAME,
VIDEO_INPUT_OPTION,
VIDEO_INPUT_WIDTH,
VIDEO_INPUT_HEIGHT,
VIDEO_INPUT_CAMERA_CONFIG,
VIDEO_INPUT_IMAGE_NAME,
VIDEO_INPUT_WORKER_PERIOD,
VIDEO_INPUT_SAVE_PREFIX,
),
input_queues=[],
output_queues=[video_input_to_detect_target_queue],
Expand Down
59 changes: 54 additions & 5 deletions modules/video_input/video_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,72 @@
"""

from .. import image_and_time
from ..common.modules.camera import camera_device
from ..common.modules.camera import base_camera
from ..common.modules.camera import camera_factory
from ..common.modules.camera import camera_opencv
from ..common.modules.camera import camera_picamera2
from ..common.modules.logger import logger


class VideoInput:
"""
Combines image and timestamp together.
"""

def __init__(self, camera_name: "int | str", save_name: str = "") -> None:
self.device = camera_device.CameraDevice(camera_name, 1, save_name)
__create_key = object()

def run(self) -> "tuple[bool, image_and_time.ImageAndTime | None]":
@classmethod
def create(
cls,
camera_option: camera_factory.CameraOption,
width: int,
height: int,
config: camera_opencv.ConfigOpenCV | camera_picamera2.ConfigPiCamera2,
maybe_image_name: str | None,
local_logger: logger.Logger,
) -> "tuple[True, VideoInput] | tuple[False, None]":
"""
camera_option specifies which camera driver to use.
width is the width of the images the camera takes in pixels.
height is the height of the images the camera takes in pixels.
camera_config specifies camera settings.
maybe_image_name is the filename to save the image as. Set to None to not log any images.
"""
result, camera = camera_factory.create_camera(camera_option, width, height, config)
if not result:
local_logger.error(
f"Camera factory failed. Current configs were: {camera_option=}, {width=}, {height=}, {config=}."
)
return False, None

return True, VideoInput(cls.__create_key, camera, maybe_image_name, local_logger)

def __init__(
self,
class_private_create_key: object,
camera: base_camera.BaseCameraDevice,
maybe_image_name: str | None,
local_logger: logger.Logger,
) -> None:
"""
Private constructor, use create() method.
"""
assert class_private_create_key is VideoInput.__create_key, "Use create() method."

self.__device = camera
self.__maybe_image_name = maybe_image_name
self.__logger = local_logger

def run(self) -> "tuple[True, image_and_time.ImageAndTime] | tuple[False, None]":
"""
Returns a possible ImageAndTime with current timestamp.
"""
result, image = self.device.get_image()
result, image = self.__device.run()
if not result:
self.__logger.warning("Failed to take image")
return False, None

if self.__maybe_image_name is not None:
self.__logger.save_image(image, self.__maybe_image_name)

return image_and_time.ImageAndTime.create(image)
38 changes: 32 additions & 6 deletions modules/video_input/video_input_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,56 @@
Gets images from the camera.
"""

import os
import pathlib
import time

from utilities.workers import queue_proxy_wrapper
from utilities.workers import worker_controller
from . import video_input
from ..common.modules.camera import camera_factory
from ..common.modules.camera import camera_opencv
from ..common.modules.camera import camera_picamera2
from ..common.modules.logger import logger


def video_input_worker(
camera_name: "int | str",
camera_option: camera_factory.CameraOption,
width: int,
height: int,
camera_config: camera_opencv.ConfigOpenCV | camera_picamera2.ConfigPiCamera2,
maybe_image_name: str | None,
period: float,
save_name: str,
output_queue: queue_proxy_wrapper.QueueProxyWrapper,
controller: worker_controller.WorkerController,
) -> None:
"""
Worker process.
camera_name is initial setting.
period is minimum period between loops.
save_name is path for logging.
period is the minimum period between image captures in seconds.
output_queue is the data queue.
controller is how the main process communicates to this worker process.
"""
input_device = video_input.VideoInput(camera_name, save_name)
worker_name = pathlib.Path(__file__).stem
process_id = os.getpid()
result, local_logger = logger.Logger.create(f"{worker_name}_{process_id}", True)
if not result:
print("ERROR: Worker failed to create logger")
return

assert local_logger is not None

local_logger.info("Logger initialized")

result, input_device = video_input.VideoInput.create(
camera_option, width, height, camera_config, maybe_image_name, local_logger
)
if not result:
local_logger.error("Worker failed to create class object")
return

# Get Pylance to stop complaining
assert input_device is not None

while not controller.is_exit_requested():
controller.check_pause()
Expand Down
25 changes: 21 additions & 4 deletions tests/integration/test_video_input_hardware.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,38 @@
Simple hardware test, requires camera.
"""

import pathlib

from modules.common.modules.camera import camera_factory
from modules.common.modules.camera import camera_opencv
from modules.common.modules.logger import logger
from modules.video_input import video_input


CAMERA = 0
# Modify as needed
CAMERA = camera_factory.CameraOption.OPENCV
WIDTH = 640
HEIGHT = 480
CONFIG = camera_opencv.ConfigOpenCV(0)
IMAGE_NAME = None # Not saving any pictures


def main() -> int:
"""
Main function.
"""
# Logger
test_name = pathlib.Path(__file__).stem
result, local_logger = logger.Logger.create(test_name, False)
assert result
assert local_logger is not None

# Setup
# TODO: Common change logging option
camera = video_input.VideoInput(
CAMERA,
result, camera = video_input.VideoInput.create(
CAMERA, WIDTH, HEIGHT, CONFIG, IMAGE_NAME, local_logger
)
assert result
assert camera is not None

# Run
result, image = camera.run()
Expand Down
20 changes: 18 additions & 2 deletions tests/integration/test_video_input_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,21 @@
import queue
import time

from modules.common.modules.camera import camera_factory
from modules.common.modules.camera import camera_opencv
from modules.video_input import video_input_worker
from modules import image_and_time
from utilities.workers import queue_proxy_wrapper
from utilities.workers import worker_controller


# Modify these settings as needed
VIDEO_INPUT_WORKER_PERIOD = 1.0
CAMERA = 0
CAMERA = camera_factory.CameraOption.OPENCV
WIDTH = 640
HEIGHT = 480
CONFIG = camera_opencv.ConfigOpenCV(0)
IMAGE_NAME = None # Not saving any pictures


def main() -> int:
Expand All @@ -29,7 +36,16 @@ def main() -> int:

worker = mp.Process(
target=video_input_worker.video_input_worker,
args=(CAMERA, VIDEO_INPUT_WORKER_PERIOD, "", out_queue, controller),
args=(
CAMERA,
WIDTH,
HEIGHT,
CONFIG,
IMAGE_NAME,
VIDEO_INPUT_WORKER_PERIOD,
out_queue,
controller,
),
)

# Run
Expand Down

0 comments on commit a268391

Please sign in to comment.