Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor engine #194

Closed
wants to merge 30 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
894cd8d
add new reolink controler
MateoLostanlen May 20, 2024
1df949b
add debug setup script
MateoLostanlen May 20, 2024
b2754e4
remove old scrpit
MateoLostanlen May 20, 2024
b2ade8e
ignore notebooks
MateoLostanlen May 20, 2024
213270c
prepare engine to multiprocessing
MateoLostanlen May 20, 2024
147db4d
split task by process
MateoLostanlen May 20, 2024
12e7b95
adapt run accordingly
MateoLostanlen May 20, 2024
0882585
keep args name
MateoLostanlen May 20, 2024
b58d0e6
update tests
MateoLostanlen May 20, 2024
fc43633
drop comment ssl
MateoLostanlen May 20, 2024
dfeb4e5
fix file path
MateoLostanlen May 20, 2024
9d9e8f2
style
MateoLostanlen May 20, 2024
420c1d7
header
MateoLostanlen May 20, 2024
8b7520c
drop random save
MateoLostanlen May 20, 2024
5a20709
header
MateoLostanlen May 20, 2024
0646d12
mypy
MateoLostanlen May 20, 2024
2322269
improove coverage
MateoLostanlen May 21, 2024
3d1cc6c
Merge remote-tracking branch 'origin/develop' into refactor_engine
MateoLostanlen May 21, 2024
0f7b4f8
imprrove coverage
MateoLostanlen May 21, 2024
b8fe0d8
fix flake8
MateoLostanlen May 21, 2024
9c9574a
style
MateoLostanlen May 21, 2024
98e4303
fix test
MateoLostanlen May 21, 2024
fce9652
Merge remote-tracking branch 'origin/develop' into refactor_engine
MateoLostanlen May 21, 2024
ca08f97
fix docstring
MateoLostanlen May 21, 2024
8d8d064
header
MateoLostanlen May 21, 2024
905cc00
idx
MateoLostanlen May 21, 2024
fd75930
feat: add test for subprocess testing
May 28, 2024
9881f9f
feat: add test for terminate process
May 28, 2024
4df8af3
feat: add a test for day_time = False
May 28, 2024
511864c
feat: add test ptz
May 28, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ target/

# Jupyter Notebook
.ipynb_checkpoints
*.ipynb

# pyenv
.python-version
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ RUN pip install -e /tmp/. \
&& rm -rf /root/.cache/pip

COPY ./src/run.py /usr/src/app/run.py
COPY ./src/capture.py /usr/src/app/capture.py
COPY ./src/control_reolink_cam.py /usr/src/app/control_reolink_cam.py
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,7 @@ source = ["pyroengine"]
[tool.black]
line-length = 120
target-version = ['py38']

[too.flake8]
max-line-length = 120

226 changes: 196 additions & 30 deletions pyroengine/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,58 +5,224 @@

import logging
import signal
import time
from multiprocessing import Process, Queue, current_process
from queue import Full
from types import FrameType
from typing import List, Optional
from typing import Optional, Tuple

import numpy as np
import urllib3
from PIL import Image

from .engine import Engine
from .sensors import ReolinkCamera
from pyroengine.engine import is_day_time

__all__ = ["SystemController"]

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

logging.basicConfig(format="%(asctime)s | %(levelname)s: %(message)s", level=logging.INFO, force=True)
PredictionResult = Tuple[np.ndarray, Image.Image, str]


def handler(signum: int, frame: Optional[FrameType]) -> None:
raise Exception("Analyze stream timeout")


class SystemController:
"""Implements the full system controller
def start_process(name, target, args):
"""Start a new process with the given name, target function, and arguments."""
process = Process(name=name, target=target, args=args, daemon=True)
process.start()
return process


def terminate_processes(processes):
"""Terminate the given list of processes."""
logging.info("Terminating processes due to signal interruption...")
for process in processes:
process.terminate()
process.join()
logging.info("Processes terminated successfully.")
exit(0)

Args:
engine: the image analyzer
cameras: the cameras to get the visual streams from
"""

def __init__(self, engine: Engine, cameras: List[ReolinkCamera]) -> None:
class SystemController:
def __init__(self, engine, cameras):
self.engine = engine
self.cameras = cameras

def analyze_stream(self, idx: int) -> None:
assert 0 <= idx < len(self.cameras)
try:
img = self.cameras[idx].capture()
self.day_time = True

def capture_images(self, capture_queue: Queue, capture_interval: int = 30, run_for_seconds: Optional[int] = None):

Check notice on line 55 in pyroengine/core.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

pyroengine/core.py#L55

SystemController.capture_images is too complex (17) (MC0001)
"""Capture images from cameras and put them into the queue."""
process_name = current_process().name
logging.debug(f"[{process_name}] Capture process started")
start_time = time.time()
while True:
if run_for_seconds and (time.time() - start_time) > run_for_seconds:
break
try:
start_ts = time.time()

if not self.day_time:
try:
frame = self.cameras[0].capture()
self.day_time = is_day_time(None, frame, "ir")
except Exception as e:
logging.exception(f"[{process_name}] Exception during initial day time check: {e}")
continue

Check warning on line 72 in pyroengine/core.py

View check run for this annotation

Codecov / codecov/patch

pyroengine/core.py#L70-L72

Added lines #L70 - L72 were not covered by tests

try:
for idx, camera in enumerate(self.cameras):
try:
if camera.cam_type == "ptz":
for pose_id in camera.cam_poses:
frame = camera.capture(pose_id)
if frame is not None:
logging.debug(

Check warning on line 81 in pyroengine/core.py

View check run for this annotation

Codecov / codecov/patch

pyroengine/core.py#L78-L81

Added lines #L78 - L81 were not covered by tests
f"[{process_name}] Captured frame from camera {camera.ip_address}"
f"at pose {pose_id}"
)

self.process_frame(idx, frame, capture_queue, camera, pose_id)

Check warning on line 86 in pyroengine/core.py

View check run for this annotation

Codecov / codecov/patch

pyroengine/core.py#L86

Added line #L86 was not covered by tests
else:
logging.error(

Check warning on line 88 in pyroengine/core.py

View check run for this annotation

Codecov / codecov/patch

pyroengine/core.py#L88

Added line #L88 was not covered by tests
f"[{process_name}] Failed to capture image from camera {camera.ip_address}"
f"at pose {pose_id}"
)
else:
frame = camera.capture()
if frame is not None:
logging.debug(f"[{process_name}] Captured frame from camera {camera.ip_address}")
self.process_frame(idx, frame, capture_queue, camera)
except Exception as e:
logging.exception(

Check warning on line 98 in pyroengine/core.py

View check run for this annotation

Codecov / codecov/patch

pyroengine/core.py#L97-L98

Added lines #L97 - L98 were not covered by tests
f"[{process_name}] Exception during image capture from camera {camera.ip_address}: {e}"
)
except Exception as e:
logging.exception(f"[{process_name}] Exception during image capture loop: {e}")

Check warning on line 102 in pyroengine/core.py

View check run for this annotation

Codecov / codecov/patch

pyroengine/core.py#L101-L102

Added lines #L101 - L102 were not covered by tests

sleep_duration = max(capture_interval - (time.time() - start_ts), 0)
logging.debug(f"[{process_name}] Sleeping for {sleep_duration:.2f} seconds")
# Ensure capturing an image every capture_interval seconds
time.sleep(sleep_duration)
except Exception as e:
logging.exception(f"[{process_name}] Unexpected error in capture process: {e}")
time.sleep(1)

Check warning on line 110 in pyroengine/core.py

View check run for this annotation

Codecov / codecov/patch

pyroengine/core.py#L108-L110

Added lines #L108 - L110 were not covered by tests

def process_frame(self, idx, frame, capture_queue, camera, pose_id=None):
"""Process a captured frame and put it into the capture queue if conditions are met."""
process_name = current_process().name
self.day_time = is_day_time(None, frame, "ir")
if self.day_time:
try:
self.engine.predict(img, self.cameras[idx].ip_address)
except Exception:
logging.warning(f"Unable to analyze stream from camera {self.cameras[idx]}")
except Exception:
logging.warning(f"Unable to fetch stream from camera {self.cameras[idx]}")

def run(self, period=30):
"""Analyzes all camera streams"""
for idx in range(len(self.cameras)):
cam_id = f"{camera.ip_address}_{pose_id}" if pose_id is not None else camera.ip_address
capture_queue.put_nowait((frame, cam_id))
logging.debug(f"[{process_name}] Putting frame from camera {camera.ip_address} into capture queue")
logging.debug(f"[{process_name}] Capture queue size: {capture_queue.qsize()}")
except Full:
logging.warning(

Check warning on line 123 in pyroengine/core.py

View check run for this annotation

Codecov / codecov/patch

pyroengine/core.py#L122-L123

Added lines #L122 - L123 were not covered by tests
f"[{process_name}] Capture queue is full. Dropping frame from camera {camera.ip_address}"
)
else:
logging.info(f"[{process_name}] Not running prediction at night on camera {camera.ip_address}")

def run_predictions(self, capture_queue: Queue, prediction_queue: Queue, run_for_seconds: Optional[int] = None):
"""Run predictions on captured images."""
process_name = current_process().name
start_time = time.time()
while True:
if run_for_seconds and (time.time() - start_time) > run_for_seconds:
break
if not capture_queue.empty():
try:
logging.debug(f"[{process_name}] Waiting for frames in capture queue")
frame, cam_id = capture_queue.get(timeout=5)
logging.debug(f"[{process_name}] Received frame from capture queue")
preds = self.engine.predict(frame, cam_id)
logging.debug(
f"[{process_name}] Putting prediction results for camera {cam_id} into prediction queue"
)
prediction_queue.put((preds, frame, cam_id))
except Exception as e:
logging.exception(f"[{process_name}] Exception during prediction : {e}")

Check warning on line 147 in pyroengine/core.py

View check run for this annotation

Codecov / codecov/patch

pyroengine/core.py#L146-L147

Added lines #L146 - L147 were not covered by tests
else:
time.sleep(1)

def process_alerts(self, prediction_queue: Queue, run_for_seconds: Optional[int] = None):
"""Process prediction results and send alerts."""
process_name = current_process().name
logging.debug(f"[{process_name}] Alert process started")
start_time = time.time()
while True:
if run_for_seconds and (time.time() - start_time) > run_for_seconds:
break
try:
signal.signal(signal.SIGALRM, handler)
signal.alarm(int(period / len(self.cameras)))
self.analyze_stream(idx)
signal.alarm(0)
except Exception:
logging.warning(f"Analyze stream timeout on {self.cameras[idx]}")
if not prediction_queue.empty():
logging.debug(f"[{process_name}] Waiting for prediction results in prediction queue")
preds, frame, cam_id = prediction_queue.get(timeout=5)
logging.debug(f"[{process_name}] Processing prediction results for camera {cam_id}")
self.engine.process_prediction(preds, frame, cam_id)
logging.debug(
f"[{process_name}] Prediction queue size after processing: {prediction_queue.qsize()}"
)

# Process all pending alerts
if len(self.engine._alerts) > 0:
logging.debug(f"[{process_name}] Processing pending alerts")
self.engine._process_alerts()

Check warning on line 172 in pyroengine/core.py

View check run for this annotation

Codecov / codecov/patch

pyroengine/core.py#L171-L172

Added lines #L171 - L172 were not covered by tests
else:
time.sleep(1)
except Exception as e:
logging.exception(f"[{process_name}] Unexpected error in alert process: {e}")
time.sleep(1)

def run(
self,
capture_interval: int = 30,
capture_queue_size: int = 10,
prediction_queue_size: int = 10,
watchdog_interval: int = 5,
):
"""Run the system with separate processes for capturing, predicting, and alerting."""
capture_queue: Queue = Queue(maxsize=capture_queue_size) # Increased size for the queue for captured frames
prediction_queue: Queue = Queue(maxsize=prediction_queue_size) # Queue for prediction results

capture_process = start_process("capture_process", self.capture_images, (capture_queue, capture_interval))
prediction_process = start_process(
"prediction_process", self.run_predictions, (capture_queue, prediction_queue)
)
alert_process = start_process("alert_process", self.process_alerts, (prediction_queue,))

processes = [capture_process, prediction_process, alert_process]

signal.signal(signal.SIGTERM, lambda signum, frame: terminate_processes(processes))
signal.signal(signal.SIGINT, lambda signum, frame: terminate_processes(processes))

try:
# Infinite loop to monitor and restart processes if they stop
while True:
if not capture_process.is_alive():
logging.warning("Capture process stopped, restarting...")
capture_process = start_process(
"capture_process", self.capture_images, (capture_queue, capture_interval)
)
if not prediction_process.is_alive():
logging.warning("Prediction process stopped, restarting...")
prediction_process = start_process(
"prediction_process", self.run_predictions, (capture_queue, prediction_queue)
)
if not alert_process.is_alive():
logging.warning("Alert process stopped, restarting...")
alert_process = start_process("alert_process", self.process_alerts, (prediction_queue,))

processes = [capture_process, prediction_process, alert_process]

logging.debug(f"Capture queue size: {capture_queue.qsize()}")
logging.debug(f"Prediction queue size: {prediction_queue.qsize()}")
time.sleep(watchdog_interval) # Interval to check if the process is alive
except KeyboardInterrupt:
logging.info("Terminating processes...")
terminate_processes(processes)

Check warning on line 225 in pyroengine/core.py

View check run for this annotation

Codecov / codecov/patch

pyroengine/core.py#L224-L225

Added lines #L224 - L225 were not covered by tests

def __repr__(self) -> str:
repr_str = f"{self.__class__.__name__}("
Expand Down
Loading
Loading