Skip to content

Commit

Permalink
Switch to ncnn (#217)
Browse files Browse the repository at this point in the history
* use async core

* update sensor

* break at night

* update run

* temp

* vision using ultralytics

* fix deps

* update dep

* fix path

* fix path 2

* indent

* fix path

* imgsz

* fix model loading

* fix test vision

* use temp folder

* fix engine

* adapt test  core

* add test async

* mypy

* split build

* docstring

* drop nvidia

* style

* fix mypy

* crop unused

* header

* fix import

* unused import

* fix day time

* put back alerts

* use ultralytics image

* update deps

* missing deps

* fix version

* fix vision sha

* drop git install

* 2 digits

* get back to 30s

* update install

* fix docker

* fix path

* drop cpu limit

* imprrove vision test

* add missing tests

* unused variable

* missing blank line
  • Loading branch information
MateoLostanlen committed Jul 16, 2024
1 parent 040552c commit 57e4b4c
Show file tree
Hide file tree
Showing 17 changed files with 1,548 additions and 499 deletions.
19 changes: 9 additions & 10 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,25 +1,24 @@
FROM python:3.8.16-slim

# set environment variables
ENV PYTHONPATH "${PYTHONPATH}:/usr/src/app"
ENV PATH /usr/local/bin:$PATH
ENV LANG C.UTF-8
ENV PYTHONUNBUFFERED 1
ENV PYTHONDONTWRITEBYTECODE 1

ENV PATH="/usr/local/bin:$PATH"
ENV LANG="C.UTF-8"
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1
# set work directory
WORKDIR /usr/src/app

COPY ./README.md /tmp/README.md
COPY ./setup.py /tmp/setup.py

# install git
RUN apt-get update && apt-get install git -y

COPY ./src/requirements.txt /tmp/requirements.txt

RUN apt-get update && apt-get install ffmpeg libsm6 libxext6 -y\
&& pip install --upgrade pip setuptools wheel \
&& pip install --default-timeout=500 -r /tmp/requirements.txt \
&& pip install --upgrade pip setuptools wheel

COPY ./src/requirements.txt /tmp/requirements.txt
RUN pip install --default-timeout=500 -r /tmp/requirements.txt \
&& pip cache purge \
&& rm -rf /root/.cache/pip

Expand Down
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ build-optional-lib:

# Run the engine wrapper
run:
bash scripts/setup-docker-compose.sh
docker build . -t pyronear/pyro-engine:latest
docker compose up -d

Expand Down
4 changes: 0 additions & 4 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ services:
- ./data:/usr/src/app/data
restart: always
network_mode: host
deploy:
resources:
limits:
cpus: "3"
logging:
driver: "json-file"
options:
Expand Down
8 changes: 3 additions & 5 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,17 @@ classifiers = [
]
dynamic = ["version"]
dependencies = [
"Pillow>=8.4.0",
"onnxruntime>=1.10.0,<2.0.0",
"numpy>=1.19.5,<2.0.0",
"ultralytics==8.2.50",
"pyroclient @ git+https://github.com/pyronear/pyro-api.git@767be30a781b52b29d68579d543e3f45ac8c4713#egg=pyroclient&subdirectory=client",
"requests>=2.20.0,<3.0.0",
"opencv-python==4.5.5.64",
"tqdm>=4.62.0",
"huggingface_hub==0.23.1",
]

[project.optional-dependencies]
test = [
"pytest>=5.3.2",
"pytest-asyncio>=0.14.0",
"coverage[toml]>=4.5.4",
"requests>=2.20.0,<3.0.0",
"python-dotenv>=0.15.0",
Expand All @@ -66,6 +64,7 @@ docs = [
dev = [
# test
"pytest>=5.3.2",
"pytest-asyncio>=0.14.0",
"coverage[toml]>=4.5.4",
"requests>=2.20.0,<3.0.0",
# style
Expand Down Expand Up @@ -93,7 +92,6 @@ zip-safe = true
[tool.setuptools.packages.find]
exclude = ["docs*", "scripts*", "tests*", "src*"]


[tool.mypy]
files = "pyroengine/"
show_error_codes = true
Expand Down
184 changes: 89 additions & 95 deletions pyroengine/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,51 +3,49 @@
# This program is licensed under the Apache License 2.0.
# See LICENSE or go to <https://opensource.org/licenses/Apache-2.0> for full license details.

import asyncio
import logging
import signal
import time
from datetime import datetime
from multiprocessing import Manager, Pool
from multiprocessing import Queue as MPQueue
from types import FrameType
from typing import List, Optional, Tuple, cast
from typing import Any, List

import numpy as np
import urllib3
from PIL import Image

from .engine import Engine
from .sensors import ReolinkCamera

__all__ = ["SystemController", "is_day_time"]


urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Configure logging
logging.basicConfig(format="%(asctime)s | %(levelname)s: %(message)s", level=logging.INFO, force=True)


def is_day_time(cache, frame, strategy, delta=0):
"""This function allows to know if it is daytime or not. We have two strategies.
The first one is to take the current time and compare it to the sunset time.
The second is to see if we have a color image. The ir cameras switch to ir mode at night and
therefore produce black and white images. This function can use one or more strategies depending on the use case.
"""
Determines if it is daytime using specified strategies.
Strategies:
1. Time-based: Compares the current time with sunrise and sunset times.
2. IR-based: Analyzes the color of the image; IR cameras produce black and white images at night.
Args:
cache (Path): cache folder where sunset_sunrise.txt is located
frame (PIL image): frame to analyze with ir strategy
strategy (str): Strategy to define day time [None, time, ir or both]
delta (int): delta before and after sunset / sunrise in sec
cache (Path): Cache folder where `sunset_sunrise.txt` is located.
frame (PIL.Image): Frame to analyze with the IR strategy.
strategy (str): Strategy to define daytime ("time", "ir", or "both").
delta (int): Time delta in seconds before and after sunrise/sunset.
Returns:
bool: is day time
bool: True if it is daytime, False otherwise.
"""
is_day = True
if strategy in ["both", "time"]:
with open(cache.joinpath("sunset_sunrise.txt")) as f:
lines = f.readlines()
sunrise = datetime.strptime(lines[0][:-1], "%H:%M")
sunset = datetime.strptime(lines[1][:-1], "%H:%M")
sunrise = datetime.strptime(lines[0].strip(), "%H:%M")
sunset = datetime.strptime(lines[1].strip(), "%H:%M")
now = datetime.strptime(datetime.now().isoformat().split("T")[1][:5], "%H:%M")
if (now - sunrise).total_seconds() < -delta or (sunset - now).total_seconds() < -delta:
is_day = False
Expand All @@ -60,150 +58,146 @@ def is_day_time(cache, frame, strategy, delta=0):
return is_day


def handler(signum: int, frame: Optional[FrameType]) -> None:
"""
Signal handler for timeout.
Args:
signum (int): The signal number.
frame (Optional[FrameType]): The current stack frame (or None).
"""
raise Exception("Analyze stream timeout")


def capture_camera_image(args: Tuple[ReolinkCamera, MPQueue]) -> None:
async def capture_camera_image(camera: ReolinkCamera, image_queue: asyncio.Queue) -> None:
"""
Captures an image from the camera and puts it into a queue.
Args:
args (tuple): A tuple containing the camera instance and a queue.
camera (ReolinkCamera): The camera instance.
image_queue (asyncio.Queue): The queue to put the captured image.
"""
camera, queue = args

cam_id = camera.ip_address
try:
if camera.cam_type == "ptz":
for pose_id in camera.cam_poses:
for idx, pose_id in enumerate(camera.cam_poses):
cam_id = f"{camera.ip_address}_{pose_id}"
frame = camera.capture(pose_id)
frame = camera.capture()
# Move camera to the next pose to avoid waiting
next_pos_id = camera.cam_poses[(idx + 1) % len(camera.cam_poses)]
camera.move_camera("ToPos", idx=int(next_pos_id), speed=50)
if frame is not None:
queue.put((cam_id, frame))
await image_queue.put((cam_id, frame))
await asyncio.sleep(0) # Yield control
else:
frame = camera.capture()
if frame is not None:
queue.put((cam_id, frame))
await image_queue.put((cam_id, frame))
await asyncio.sleep(0) # Yield control
except Exception as e:
logging.exception(f"Error during image capture from camera {cam_id}: {e}")


class SystemController:
"""
Implements the full system controller for capturing and analyzing camera streams.
Controls the system for capturing and analyzing camera streams.
Attributes:
engine (Engine): The image analyzer engine.
cameras (List[ReolinkCamera]): The list of cameras to get the visual streams from.
cameras (List[ReolinkCamera]): List of cameras to capture streams from.
"""

def __init__(self, engine: Engine, cameras: List[ReolinkCamera]) -> None:
"""
Initializes the SystemController with an engine and a list of cameras.
Initializes the SystemController.
Args:
engine (Engine): The image analyzer engine.
cameras (List[ReolinkCamera]): The list of cameras to get the visual streams from.
cameras (List[ReolinkCamera]): List of cameras to capture streams from.
"""
self.engine = engine
self.cameras = cameras
self.day_time = True

def capture_images(self) -> MPQueue:
async def capture_images(self, image_queue: asyncio.Queue) -> None:
"""
Captures images from all cameras using multiprocessing.
Captures images from all cameras using asyncio.
Returns:
MPQueue: A queue containing the captured images and their camera IDs.
Args:
image_queue (asyncio.Queue): The queue to put the captured images.
"""
tasks = [capture_camera_image(camera, image_queue) for camera in self.cameras]
await asyncio.gather(*tasks)

manager = Manager()
queue: MPQueue = cast(MPQueue, manager.Queue()) # Cast to MPQueue

# Create a list of arguments to pass to capture_camera_image
args_list: List[Tuple[ReolinkCamera, MPQueue]] = [(camera, queue) for camera in self.cameras]

# Use a pool of processes to capture images concurrently
with Pool(processes=len(self.cameras)) as pool:
pool.map(capture_camera_image, args_list)

return queue

def analyze_stream(self, img: Image.Image, cam_id: str) -> None:
async def analyze_stream(self, image_queue: asyncio.Queue) -> None:
"""
Analyzes the image stream from a specific camera.
Analyzes the image stream from the queue.
Args:
img (Image.Image): The image to analyze.
cam_id (str): The ID of the camera.
image_queue (asyncio.Queue): The queue with images to analyze.
"""
# Run the prediction using the engine
self.engine.predict(img, cam_id)
while True:
item = await image_queue.get()
if item is None:
break
cam_id, frame = item
try:
self.engine.predict(frame, cam_id)
except Exception as e:
logging.error(f"Error running prediction: {e}")
finally:
image_queue.task_done() # Mark the task as done

def check_day_time(self) -> None:
"""
Checks and updates the day_time attribute based on the current frame.
"""
try:
frame = self.cameras[0].capture()
if frame is not None:
self.day_time = is_day_time(None, frame, "ir")
except Exception as e:
logging.exception(f"Exception during initial day time check: {e}")

def run(self, period: int = 30) -> None:
async def run(self, period: int = 30) -> None:
"""
Captures and analyzes all camera streams, then processes alerts.
Args:
period (int): The time period between captures in seconds.
"""

try:
# Set the signal alarm
signal.signal(signal.SIGALRM, handler)
signal.alarm(period)

if not self.day_time:
self.check_day_time()
self.check_day_time()

if self.day_time:
# Capture images
queue = None
try:
queue = self.capture_images()
except Exception as e:
logging.error(f"Error capturing images: {e}")

# Analyze each captured frame
if queue:
while not queue.empty():
cam_id, frame = queue.get()
try:
if frame is not None:
self.analyze_stream(frame, cam_id)
except Exception as e:
logging.error(f"Error running prediction: {e}")

# Use the last frame to check if it's day_time
if frame is not None:
self.day_time = is_day_time(None, frame, "ir")
image_queue: asyncio.Queue[Any] = asyncio.Queue()

# Start the image processor task
processor_task = asyncio.create_task(self.analyze_stream(image_queue))

# Capture images concurrently
await self.capture_images(image_queue)

# Wait for the image processor to finish processing
await image_queue.join() # Ensure all tasks are marked as done

# Signal the image processor to stop processing
await image_queue.put(None)
await processor_task # Ensure the processor task completes

# Process alerts
try:
self.engine._process_alerts()
except Exception as e:
logging.error(f"Error processing alerts: {e}")

# Disable the alarm
signal.alarm(0)
except Exception:
logging.warning("Analyze stream timeout")
except Exception as e:
logging.warning(f"Analyze stream error: {e}")

async def main_loop(self, period: int) -> None:
"""
Main loop to capture and process images at regular intervals.
Args:
period (int): The time period between captures in seconds.
"""
while True:
start_ts = time.time()
await self.run(period)
# Sleep only once all images are processed
loop_time = time.time() - start_ts
sleep_time = max(period - (loop_time), 0)
logging.info(f"Loop run under {loop_time:.2f} seconds, sleeping for {sleep_time:.2f}")
await asyncio.sleep(sleep_time)

def __repr__(self) -> str:
"""
Expand Down
Loading

0 comments on commit 57e4b4c

Please sign in to comment.