You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

187 lines
5.9 KiB

#from multiprocessing import shared_memory
from typing import Optional
import cv2
import glob
import numpy as np
from time import sleep
from threading import Thread
from .utils import get_stream_codec
from PyQt5.QtCore import QObject, pyqtSignal
class GstVideoStreamer(QObject):
cameraStatus = pyqtSignal(int)
def __init__(self, stream_source, shape, id_, is_init=None, fps: int = 4, nChannels: int = 3,
shm_name: Optional[str] = None, flip=False):
super().__init__()
self.__stream_source = stream_source
self.__id = id_
self.__is_init = is_init
self.__shape = shape
self.__fps = fps
self.__nChannels = nChannels
self.__shm_name = shm_name
self.__shm = None
self.__cap = None
self.__flip = flip
self.__running = False
self.connected = False
self.__frame_error_occurred = 0
self.__frame: np.ndarray = np.zeros(self.__shape, 'uint8')
self.__framing_started = False
self.__reconnect_thread = None
self.__reconnect_interval = 1 # in seconds
self.__reconnecting = False
self.__start()
def __init_shape(self, nChannels):
width = int(self.__cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(self.__cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
self.__shape[0] = height
self.__shape[1] = width
self.__shape[2] = nChannels
self.__d_size = np.dtype('uint8').itemsize * np.prod(self.__shape)
@property
def id(self):
return self.__id
def __init_camera(self):
# region connecting to camera
self.__cap = self.__connect_to_camera(self.__stream_source)
sleep(2)
# endregion
# self.__init_shape(self.__nChannels)
self.connected = True
self.__frame_error_occurred = 0
self.__frame: np.ndarray = np.zeros(self.__shape, 'uint8')
self.__framing_started = False
self.__reconnect_thread = None
self.__reconnect_interval = 1 # in seconds
self.__reconnecting = False
def __start(self):
def start():
self.__init_camera()
self.__read_frame()
self.__stream_th = Thread(target=start)
self.__stream_th.start()
@property
def shape(self):
return self.__shape
@property
def d_size(self):
return self.__d_size
def __connect_to_camera(self, stream_source):
codec = None
while codec is None:
codec = get_stream_codec(stream_source)
sleep(0.5)
pipeline = ''
if codec == 'hevc':
pipeline = 'rtph265depay ! h265parse ! avdec_h265'
elif codec == 'h264':
pipeline = 'rtph264depay ! h264parse ! avdec_h264'
stream_pipeline = f'rtspsrc protocols=tcp location="{stream_source}" latency=100 ! queue {"! " + pipeline if pipeline else ""} {"! videoflip method=rotate-180" if self.__flip else ""} ! videoconvert ! appsink max-buffers=1 drop=true'
camera = cv2.VideoCapture(stream_pipeline,
cv2.CAP_GSTREAMER)
while not camera.isOpened():
sleep(1)
camera.open(stream_pipeline,
cv2.CAP_GSTREAMER)
print('camera opened')
camera.setExceptionMode(True)
return camera
def __start_reconnect_thread(self):
self.connected = False
self.__reconnecting = True
self.__reconnect_thread = Thread(target=self.__reconnect_loop)
self.__reconnect_thread.start()
def __reconnect_loop(self):
print('start camera reconnecting...')
self.__framing_started = False
while self.__reconnecting:
try:
if self.__cap is not None:
self.__cap.release()
sleep(0.1)
del self.__cap
self.__cap = None
self.__cap = self.__connect_to_camera(self.__stream_source)
assert self.__cap.isOpened()
assert self.__cap.read()[1] is not None
self.__frame_error_occurred = 0
self.__reconnecting = False
self.connected = True
print('camera reconnected')
break
except Exception as e:
print(e)
sleep(self.__reconnect_interval)
def __read_frame(self):
self.__running = True
while self.__running:
if self.__frame_error_occurred > 5 and not self.__reconnecting:
self.__frame = None
self.__start_reconnect_thread()
self.cameraStatus.emit(0)
elif self.__frame_error_occurred > 600:
pass
try:
if self.connected:
if not self.__cap.isOpened():
raise Exception('frame is None')
ok, frame = self.__cap.read()
if ok:
self.__frame = frame
self.__framing_started = True
self.__frame_error_occurred = 0
self.cameraStatus.emit(1)
sleep(1/self.__fps)
elif self.__framing_started:
raise Exception('frame is None')
else:
sleep(0.2)
except Exception as e:
print(f"Error while read frame from camera {e}")
self.__frame_error_occurred += 1
sleep(0.2)
def stop(self):
self.__reconnecting = False
self.__running = False
if self.__stream_th:
self.__stream_th.join(0.1)
if self.__reconnect_thread:
self.__reconnect_thread.join(0.1)
def get_frame(self) -> Optional[np.ndarray]:
if self.__frame is not None:
return self.__frame.astype(np.uint8)
return None