Commit d7879c9
Merge pull request #743 from roboflow/fix/byte-track-0-fps
Fix/byte track 0 fps with webrtc
PawelPeczek-Roboflow authored Oct 11, 2024
2 parents f6e130c + 34da916 commit d7879c9
Showing 6 changed files with 142 additions and 146 deletions.
61 changes: 0 additions & 61 deletions inference/core/interfaces/camera/entities.py
@@ -1,17 +1,11 @@
import logging
import time
from collections import deque
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from threading import Event, Lock
from typing import Callable, Dict, Optional, Tuple, Union

import numpy as np

from inference.core import logger
from inference.core.utils.function import experimental

FrameTimestamp = datetime
FrameID = int

@@ -103,59 +97,4 @@ def initialize_source_properties(self, properties: Dict[str, float]):
        pass


class WebRTCVideoFrameProducer(VideoFrameProducer):
    @experimental(
        reason="Usage of WebRTCVideoFrameProducer with `InferencePipeline` is an experimental feature."
        "Please report any issues here: https://github.com/roboflow/inference/issues"
    )
    def __init__(
        self, to_inference_queue: deque, to_inference_lock: Lock, stop_event: Event
    ):
        self.to_inference_queue: deque = to_inference_queue
        self.to_inference_lock: Lock = to_inference_lock
        self._stop_event = stop_event
        self._w: Optional[int] = None
        self._h: Optional[int] = None
        self._fps_buff = []
        self._is_opened = True

    def grab(self) -> bool:
        return self._is_opened

    def retrieve(self) -> Tuple[bool, np.ndarray]:
        while not self._stop_event.is_set() and not self.to_inference_queue:
            time.sleep(0.1)
        if self._stop_event.is_set():
            logger.info("Received termination signal, closing.")
            self._is_opened = False
            return False, None
        with self.to_inference_lock:
            img = self.to_inference_queue.pop()
        return True, img

    def release(self):
        self._is_opened = False

    def isOpened(self) -> bool:
        return self._is_opened

    def discover_source_properties(self) -> SourceProperties:
        max_ts = max(self._fps_buff, key=lambda x: x["ts"]) if self._fps_buff else 0
        min_ts = min(self._fps_buff, key=lambda x: x["ts"]) if self._fps_buff else 0
        if max_ts == min_ts:
            max_ts += 0.1
        fps = len(self._fps_buff) / (max_ts - min_ts)
        return SourceProperties(
            width=self._w,
            height=self._h,
            total_frames=-1,
            is_file=False,
            fps=fps,
            is_reconnectable=False,
        )

    def initialize_source_properties(self, properties: Dict[str, float]):
        pass


VideoSourceIdentifier = Union[str, int, Callable[[], VideoFrameProducer]]
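
Note on the removed producer above: nothing in the deleted class ever appends to _fps_buff, so discover_source_properties always divided len([]) by the 0.1 fallback interval and reported an FPS of 0, which appears to be the zero-FPS behaviour named in the branch fix/byte-track-0-fps. A standalone reproduction of that arithmetic (illustrative only, not Roboflow code):

# Standalone reproduction of the fps computation from the removed
# discover_source_properties(). The buffer is empty by construction,
# because the deleted class never populated _fps_buff.
fps_buff: list = []

max_ts = max(fps_buff, key=lambda x: x["ts"]) if fps_buff else 0
min_ts = min(fps_buff, key=lambda x: x["ts"]) if fps_buff else 0
if max_ts == min_ts:
    max_ts += 0.1  # fallback interval used by the removed code
fps = len(fps_buff) / (max_ts - min_ts)

print(fps)  # 0.0 -- a video source reporting zero FPS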
@@ -92,6 +92,7 @@ class InitialiseWebRTCPipelinePayload(InitialisePipelinePayload):
    stream_output: Optional[List[str]] = Field(default_factory=list)
    data_output: Optional[List[str]] = Field(default_factory=list)
    webrtc_peer_timeout: float = 1
    webcam_fps: Optional[float] = None


class ConsumeResultsPayload(BaseModel):
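
The only functional change to this model is the new optional webcam_fps field. Below is a hedged sketch of how such a field behaves on a pydantic model; the field set is deliberately reduced, and the real InitialiseWebRTCPipelinePayload has additional required fields not visible in this hunk.

from typing import Optional

from pydantic import BaseModel


class WebRTCPayloadSketch(BaseModel):
    # mirrors the two fields visible in the hunk above
    webrtc_peer_timeout: float = 1
    webcam_fps: Optional[float] = None


print(WebRTCPayloadSketch().webcam_fps)               # None -- field may be omitted
print(WebRTCPayloadSketch(webcam_fps=30).webcam_fps)  # 30.0 -- declared camera frame rate

Letting the client declare the camera frame rate up front presumably spares the pipeline from deriving it on the receiving side, as the removed producer attempted to do from an empty timestamp buffer.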
@@ -2,6 +2,7 @@
import os
import signal
import threading
import time
from collections import deque
from dataclasses import asdict
from functools import partial
@@ -10,7 +11,6 @@
from types import FrameType
from typing import Deque, Dict, Optional, Tuple

from aiortc import RTCPeerConnection
from pydantic import ValidationError

from inference.core import logger
@@ -19,10 +19,7 @@
    RoboflowAPINotAuthorizedError,
    RoboflowAPINotNotFoundError,
)
from inference.core.interfaces.camera.entities import (
    VideoFrame,
    WebRTCVideoFrameProducer,
)
from inference.core.interfaces.camera.entities import VideoFrame
from inference.core.interfaces.camera.exceptions import StreamOperationNotAllowedError
from inference.core.interfaces.http.orjson_utils import (
    serialise_single_workflow_result_element,
@@ -41,12 +38,13 @@
    InitialisePipelinePayload,
    InitialiseWebRTCPipelinePayload,
    OperationStatus,
    WebRTCOffer,
)
from inference.core.interfaces.stream_manager.manager_app.serialisation import (
    describe_error,
)
from inference.core.interfaces.stream_manager.manager_app.webrtc import (
    RTCPeerConnectionWithFPS,
    WebRTCVideoFrameProducer,
    init_rtc_peer_connection,
)
from inference.core.workflows.execution_engine.entities.base import WorkflowImageData
@@ -202,18 +200,13 @@ def _start_webrtc(self, request_id: str, payload: dict):
watchdog = BasePipelineWatchDog()

webrtc_offer = parsed_payload.webrtc_offer
webcam_fps = parsed_payload.webcam_fps
to_inference_queue = deque()
to_inference_lock = Lock()
from_inference_queue = deque()
from_inference_lock = Lock()

stop_event = Event()
webrtc_producer = partial(
WebRTCVideoFrameProducer,
to_inference_lock=to_inference_lock,
to_inference_queue=to_inference_queue,
stop_event=stop_event,
)

def start_loop(loop: asyncio.AbstractEventLoop):
asyncio.set_event_loop(loop)
@@ -232,10 +225,19 @@ def start_loop(loop: asyncio.AbstractEventLoop):
from_inference_lock=from_inference_lock,
webrtc_peer_timeout=parsed_payload.webrtc_peer_timeout,
feedback_stop_event=stop_event,
webcam_fps=webcam_fps,
),
loop,
)
peer_connection = future.result()
peer_connection: RTCPeerConnectionWithFPS = future.result()

webrtc_producer = partial(
WebRTCVideoFrameProducer,
to_inference_lock=to_inference_lock,
to_inference_queue=to_inference_queue,
stop_event=stop_event,
webrtc_video_transform_track=peer_connection.video_transform_track,
)

def webrtc_sink(
prediction: Dict[str, WorkflowImageData], video_frame: VideoFrame

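In the handler above, the WebRTCVideoFrameProducer factory is now built only after init_rtc_peer_connection resolves, so it can receive peer_connection.video_transform_track. The pattern relies on VideoSourceIdentifier accepting a zero-argument callable (see entities.py earlier) and on functools.partial binding constructor arguments that only exist late. A minimal sketch with stand-in classes, not the real aiortc or Roboflow types:

from functools import partial


class TrackStub:
    """Stand-in for the peer connection's video_transform_track."""


class PeerConnectionStub:
    def __init__(self) -> None:
        self.video_transform_track = TrackStub()


class ProducerStub:
    """Stand-in for WebRTCVideoFrameProducer."""

    def __init__(self, webrtc_video_transform_track: TrackStub) -> None:
        self.track = webrtc_video_transform_track


peer_connection = PeerConnectionStub()  # in the real code: future.result()

# Bind the late-created track now; the pipeline invokes the factory later.
producer_factory = partial(
    ProducerStub,
    webrtc_video_transform_track=peer_connection.video_transform_track,
)

producer = producer_factory()
print(producer.track is peer_connection.video_transform_track)  # True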