#from multiprocessing import shared_memory from typing import Optional import cv2 import glob import numpy as np from time import sleep from threading import Thread from .utils import get_stream_codec from PyQt5.QtCore import QObject, pyqtSignal class GstVideoStreamer(QObject): cameraStatus = pyqtSignal(int) def __init__(self, stream_source, shape, id_, is_init=None, fps: int = 4, nChannels: int = 3, shm_name: Optional[str] = None, flip=False): super().__init__() self.__stream_source = stream_source self.__id = id_ self.__is_init = is_init self.__shape = shape self.__fps = fps self.__nChannels = nChannels self.__shm_name = shm_name self.__shm = None self.__cap = None self.__flip = flip self.__running = False self.connected = False self.__frame_error_occurred = 0 self.__frame: np.ndarray = np.zeros(self.__shape, 'uint8') self.__framing_started = False self.__reconnect_thread = None self.__reconnect_interval = 1 # in seconds self.__reconnecting = False self.__start() def __init_shape(self, nChannels): width = int(self.__cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(self.__cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) self.__shape[0] = height self.__shape[1] = width self.__shape[2] = nChannels self.__d_size = np.dtype('uint8').itemsize * np.prod(self.__shape) @property def id(self): return self.__id def __init_camera(self): # region connecting to camera self.__cap = self.__connect_to_camera(self.__stream_source) sleep(2) # endregion # self.__init_shape(self.__nChannels) self.connected = True self.__frame_error_occurred = 0 self.__frame: np.ndarray = np.zeros(self.__shape, 'uint8') self.__framing_started = False self.__reconnect_thread = None self.__reconnect_interval = 1 # in seconds self.__reconnecting = False def __start(self): def start(): self.__init_camera() self.__read_frame() self.__stream_th = Thread(target=start) self.__stream_th.start() @property def shape(self): return self.__shape @property def d_size(self): return self.__d_size def __connect_to_camera(self, stream_source): codec = None while codec is None: codec = get_stream_codec(stream_source) sleep(0.5) pipeline = '' if codec == 'hevc': pipeline = 'rtph265depay ! h265parse ! avdec_h265' elif codec == 'h264': pipeline = 'rtph264depay ! h264parse ! avdec_h264' stream_pipeline = f'rtspsrc protocols=tcp location="{stream_source}" latency=100 ! queue {"! " + pipeline if pipeline else ""} {"! videoflip method=rotate-180" if self.__flip else ""} ! videoconvert ! appsink max-buffers=1 drop=true' camera = cv2.VideoCapture(stream_pipeline, cv2.CAP_GSTREAMER) while not camera.isOpened(): sleep(1) camera.open(stream_pipeline, cv2.CAP_GSTREAMER) print('camera opened') camera.setExceptionMode(True) return camera def __start_reconnect_thread(self): self.connected = False self.__reconnecting = True self.__reconnect_thread = Thread(target=self.__reconnect_loop) self.__reconnect_thread.start() def __reconnect_loop(self): print('start camera reconnecting...') self.__framing_started = False while self.__reconnecting: try: if self.__cap is not None: self.__cap.release() sleep(0.1) del self.__cap self.__cap = None self.__cap = self.__connect_to_camera(self.__stream_source) assert self.__cap.isOpened() assert self.__cap.read()[1] is not None self.__frame_error_occurred = 0 self.__reconnecting = False self.connected = True print('camera reconnected') break except Exception as e: print(e) sleep(self.__reconnect_interval) def __read_frame(self): self.__running = True while self.__running: if self.__frame_error_occurred > 5 and not self.__reconnecting: self.__frame = None self.__start_reconnect_thread() self.cameraStatus.emit(0) elif self.__frame_error_occurred > 600: pass try: if self.connected: if not self.__cap.isOpened(): raise Exception('frame is None') ok, frame = self.__cap.read() if ok: self.__frame = frame self.__framing_started = True self.__frame_error_occurred = 0 self.cameraStatus.emit(1) sleep(1/self.__fps) elif self.__framing_started: raise Exception('frame is None') else: sleep(0.2) except Exception as e: print(f"Error while read frame from camera {e}") self.__frame_error_occurred += 1 sleep(0.2) def stop(self): self.__reconnecting = False self.__running = False if self.__stream_th: self.__stream_th.join(0.1) if self.__reconnect_thread: self.__reconnect_thread.join(0.1) def get_frame(self) -> Optional[np.ndarray]: if self.__frame is not None: return self.__frame.astype(np.uint8) return None