You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
187 lines
5.9 KiB
187 lines
5.9 KiB
#from multiprocessing import shared_memory
|
|
from typing import Optional
|
|
|
|
import cv2
|
|
import glob
|
|
import numpy as np
|
|
from time import sleep
|
|
from threading import Thread
|
|
|
|
from .utils import get_stream_codec
|
|
|
|
from PyQt5.QtCore import QObject, pyqtSignal
|
|
|
|
class GstVideoStreamer(QObject):
|
|
cameraStatus = pyqtSignal(int)
|
|
|
|
def __init__(self, stream_source, shape, id_, is_init=None, fps: int = 4, nChannels: int = 3,
|
|
shm_name: Optional[str] = None, flip=False):
|
|
super().__init__()
|
|
|
|
self.__stream_source = stream_source
|
|
self.__id = id_
|
|
self.__is_init = is_init
|
|
self.__shape = shape
|
|
self.__fps = fps
|
|
self.__nChannels = nChannels
|
|
self.__shm_name = shm_name
|
|
self.__shm = None
|
|
self.__cap = None
|
|
self.__flip = flip
|
|
self.__running = False
|
|
|
|
self.connected = False
|
|
|
|
self.__frame_error_occurred = 0
|
|
self.__frame: np.ndarray = np.zeros(self.__shape, 'uint8')
|
|
self.__framing_started = False
|
|
|
|
self.__reconnect_thread = None
|
|
self.__reconnect_interval = 1 # in seconds
|
|
self.__reconnecting = False
|
|
|
|
self.__start()
|
|
|
|
def __init_shape(self, nChannels):
|
|
width = int(self.__cap.get(cv2.CAP_PROP_FRAME_WIDTH))
|
|
height = int(self.__cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
|
self.__shape[0] = height
|
|
self.__shape[1] = width
|
|
self.__shape[2] = nChannels
|
|
self.__d_size = np.dtype('uint8').itemsize * np.prod(self.__shape)
|
|
|
|
@property
|
|
def id(self):
|
|
return self.__id
|
|
|
|
def __init_camera(self):
|
|
# region connecting to camera
|
|
self.__cap = self.__connect_to_camera(self.__stream_source)
|
|
sleep(2)
|
|
# endregion
|
|
|
|
# self.__init_shape(self.__nChannels)
|
|
|
|
self.connected = True
|
|
|
|
self.__frame_error_occurred = 0
|
|
self.__frame: np.ndarray = np.zeros(self.__shape, 'uint8')
|
|
self.__framing_started = False
|
|
|
|
self.__reconnect_thread = None
|
|
self.__reconnect_interval = 1 # in seconds
|
|
self.__reconnecting = False
|
|
|
|
def __start(self):
|
|
def start():
|
|
self.__init_camera()
|
|
self.__read_frame()
|
|
|
|
self.__stream_th = Thread(target=start)
|
|
self.__stream_th.start()
|
|
|
|
@property
|
|
def shape(self):
|
|
return self.__shape
|
|
|
|
@property
|
|
def d_size(self):
|
|
return self.__d_size
|
|
|
|
def __connect_to_camera(self, stream_source):
|
|
codec = None
|
|
while codec is None:
|
|
codec = get_stream_codec(stream_source)
|
|
sleep(0.5)
|
|
pipeline = ''
|
|
if codec == 'hevc':
|
|
pipeline = 'rtph265depay ! h265parse ! avdec_h265'
|
|
elif codec == 'h264':
|
|
pipeline = 'rtph264depay ! h264parse ! avdec_h264'
|
|
|
|
stream_pipeline = f'rtspsrc protocols=tcp location="{stream_source}" latency=100 ! queue {"! " + pipeline if pipeline else ""} {"! videoflip method=rotate-180" if self.__flip else ""} ! videoconvert ! appsink max-buffers=1 drop=true'
|
|
|
|
camera = cv2.VideoCapture(stream_pipeline,
|
|
cv2.CAP_GSTREAMER)
|
|
|
|
while not camera.isOpened():
|
|
sleep(1)
|
|
camera.open(stream_pipeline,
|
|
cv2.CAP_GSTREAMER)
|
|
|
|
print('camera opened')
|
|
|
|
camera.setExceptionMode(True)
|
|
|
|
return camera
|
|
|
|
def __start_reconnect_thread(self):
|
|
self.connected = False
|
|
self.__reconnecting = True
|
|
self.__reconnect_thread = Thread(target=self.__reconnect_loop)
|
|
self.__reconnect_thread.start()
|
|
|
|
def __reconnect_loop(self):
|
|
print('start camera reconnecting...')
|
|
self.__framing_started = False
|
|
while self.__reconnecting:
|
|
try:
|
|
if self.__cap is not None:
|
|
self.__cap.release()
|
|
sleep(0.1)
|
|
del self.__cap
|
|
self.__cap = None
|
|
self.__cap = self.__connect_to_camera(self.__stream_source)
|
|
assert self.__cap.isOpened()
|
|
assert self.__cap.read()[1] is not None
|
|
self.__frame_error_occurred = 0
|
|
self.__reconnecting = False
|
|
self.connected = True
|
|
print('camera reconnected')
|
|
break
|
|
except Exception as e:
|
|
print(e)
|
|
sleep(self.__reconnect_interval)
|
|
|
|
def __read_frame(self):
|
|
self.__running = True
|
|
while self.__running:
|
|
if self.__frame_error_occurred > 5 and not self.__reconnecting:
|
|
self.__frame = None
|
|
self.__start_reconnect_thread()
|
|
self.cameraStatus.emit(0)
|
|
|
|
elif self.__frame_error_occurred > 600:
|
|
pass
|
|
try:
|
|
if self.connected:
|
|
if not self.__cap.isOpened():
|
|
raise Exception('frame is None')
|
|
ok, frame = self.__cap.read()
|
|
if ok:
|
|
self.__frame = frame
|
|
self.__framing_started = True
|
|
self.__frame_error_occurred = 0
|
|
self.cameraStatus.emit(1)
|
|
sleep(1/self.__fps)
|
|
elif self.__framing_started:
|
|
raise Exception('frame is None')
|
|
else:
|
|
sleep(0.2)
|
|
except Exception as e:
|
|
print(f"Error while read frame from camera {e}")
|
|
self.__frame_error_occurred += 1
|
|
sleep(0.2)
|
|
|
|
def stop(self):
|
|
self.__reconnecting = False
|
|
self.__running = False
|
|
if self.__stream_th:
|
|
self.__stream_th.join(0.1)
|
|
if self.__reconnect_thread:
|
|
self.__reconnect_thread.join(0.1)
|
|
|
|
def get_frame(self) -> Optional[np.ndarray]:
|
|
if self.__frame is not None:
|
|
return self.__frame.astype(np.uint8)
|
|
return None
|