-
Notifications
You must be signed in to change notification settings - Fork 0
/
fastweb.py
93 lines (69 loc) · 2.22 KB
/
fastweb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import cv2
from threading import Thread, Lock
from pydantic import BaseModel
from fps import FpsCounter
from controller import Directions
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.staticfiles import StaticFiles
import uvicorn
import time
_app = FastAPI()
_fps = FpsCounter()
_wait = Lock()
_frameLock = Lock()
_frame: cv2.Mat = None
class Controll(BaseModel):
controll: str = ""
def pushFrame(frame):
with _frameLock:
global _frame
_frame = frame
if _wait.locked() == True:
_wait.release()
def getFps():
return _fps.get()
def init(steeringCallback, changeControllCallback, log_level = 'warning', port = 5000):
global _port, _log_level, _steeringCallback, _changeControllCallback
_port = port
_log_level = log_level
_steeringCallback = steeringCallback
_changeControllCallback = changeControllCallback
_wait.acquire()
def _start():
print(f'Web server running on: http://0.0.0.0:{_port}')
uvicorn.run("fastweb:_app", host="0.0.0.0", log_level=_log_level,
port=_port)
def start():
t = Thread(target=_start, args=())
t.name = 'Web_Server'
t.daemon = True
t.start()
def _gen():
while True:
_wait.acquire()
start = time.time()
with _frameLock:
frame = _frame
_fps.update()
frame = cv2.resize(frame, (int(
frame.shape[1]/2), int(frame.shape[0]/2)), interpolation=cv2.INTER_AREA)
data = cv2.imencode(
'.jpg', frame, [int(cv2.IMWRITE_JPEG_QUALITY), 75])[1]
yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' +
bytearray(data) + b'\r\n')
end = time.time()
wait_time = 1/26 - (end - start)
if wait_time < 0:
wait_time = 0
time.sleep(wait_time)
@_app.get('/video_feed')
def video_feed():
return StreamingResponse(_gen(), media_type="multipart/x-mixed-replace;boundary=frame")
@_app.post('/directions')
def directions(item: Directions):
return _steeringCallback(item)
@_app.post('/controll')
def directions(item: Controll):
return _changeControllCallback(item)
_app.mount("/", StaticFiles(directory="templates", html=True), name="static")