#import os
#os.environ['YOLO_VERBOSE'] = "false"
import ctypes
import time
from ctypes import c_int64
from threading import Event, Thread
from time import sleep
from typing import List

import numpy as np
from PyQt5.QtCore import (
    QDir,
    QObject,
    QThread,
    QUrl,
    pyqtProperty,
    pyqtSignal,
    pyqtSlot,
)
#from icecream import ic

from detector import Detector
from detector.utils import get_bbox_by_point
from tracker import Tracker
from video_streamer.videostreamer import VideoStreamer


def _timestamp_ms() -> c_int64:
    """Return the current wall-clock time in milliseconds as a ``c_int64``."""
    return c_int64(int(time.time() * 1000))


class Core(QThread):
    """Run detection and tracking loops over one of several video sources.

    Each loop runs in its own ``threading.Thread`` and reports its results
    through Qt signals.

    Signals:
        newFrame(bboxes, source_id, is_detection, timestamp_ms):
            ``bboxes`` is a list of boxes, ``is_detection`` is ``True`` when
            emitted by the detection loop and ``False`` when emitted by the
            tracking loop, ``timestamp_ms`` is a ``ctypes.c_int64``.
        coordsUpdated(source_id, center, success):
            Emitted by the tracking loop whenever the tracker returns a box.
    """

    newFrame = pyqtSignal(object, int, bool, ctypes.c_int64)
    coordsUpdated = pyqtSignal(int, object, bool)

    def __init__(self, video_sources: List[VideoStreamer], parent=None):
        """Create the detector and tracker and select the first video source.

        Args:
            video_sources: Available streams; must contain at least one item
                because the first one becomes the active source.
            parent: Optional Qt parent object.

        Raises:
            IndexError: If ``video_sources`` is empty.
        """
        # Plain super() so that QThread itself is not skipped in the MRO.
        super().__init__(parent)

        self.__detector = Detector(classes=[0, 2, 5, 7])
        self.__tracker = Tracker()

        self.__video_sources = video_sources
        self.__processing_source = video_sources[0]
        self.__processing_id = 0

        self.__detection_roi = list()
        self.__is_detecting = False
        self.__detection_thread = None
        self.__thickness = 2
        # Sized (empty) array: len() works on it, unlike the 0-d array
        # produced by np.empty([]), which raises TypeError on len().
        self.__detection_bboxes = np.empty((0, 4))
        self.__detection_frame = None

        self.__is_tracking = False
        self.__tracking_thread = None
        # ic()

    def set_thickness(self, thickness: int):
        """Store the line thickness used by the bbox drawing helper."""
        self.__thickness = thickness

    def set_source(self, source_id: int):
        """Make ``video_sources[source_id]`` the active source.

        Raises:
            IndexError: If ``source_id`` is out of range.
        """
        self.__processing_source = self.__video_sources[source_id]
        self.__processing_id = source_id

    def set_video_sources(self, video_sources: List[VideoStreamer]):
        """Replace the list of sources and switch to the first one."""
        self.__video_sources = video_sources
        self.set_source(0)

    def __detection(self):
        """Detection loop: detect inside the ROI and publish global boxes.

        Runs until ``__is_detecting`` is cleared. Each iteration crops the
        current frame to the ROI, runs the detector on the crop, shifts the
        first two components of every box by the ROI origin, stores the boxes
        and a copy of the frame for ``start_track`` and emits ``newFrame``.
        """
        while self.__is_detecting:
            try:
                source = self.__processing_source
                roi = self.__detection_roi
                frame = source.get_frame()
                cropped_frame = frame[roi[1]:roi[3], roi[0]:roi[2]]
                results = self.__detector.predict(cropped_frame)

                global_bboxes = list()
                for result in results:
                    # result[0] is the class id (not used here); the rest is
                    # the box, whose origin is moved from ROI-local to frame
                    # coordinates.
                    bbox = result[1:]
                    bbox[:2] += roi[:2]
                    global_bboxes.append(bbox)

                # Keep the latest detections so start_track can pick a box by
                # a clicked point.
                self.__detection_bboxes = np.array(global_bboxes)
                self.__detection_frame = frame.copy()

                # newFrame is declared with four arguments; the timestamp
                # must be supplied or emit() raises TypeError.
                self.newFrame.emit(
                    global_bboxes, self.__processing_id, True, _timestamp_ms()
                )
                sleep(0.03)
            except Exception as e:
                # Best effort: a bad frame or detector failure must not kill
                # the worker thread; report it and retry after a short pause.
                print(e)
                sleep(0.1)

    def __tracking(self):
        """Tracking loop: update the tracker and publish its box.

        Runs until ``__is_tracking`` is cleared. The source is captured once
        on entry, so switching sources does not affect a running loop.
        """
        source = self.__processing_source
        while self.__is_tracking:
            ctime = _timestamp_ms()
            frame = source.get_frame()
            bbox, success = self.__tracker.update(frame)

            if bbox is not None:
                # Presumably bbox is (x, y, w, h), the layout start_track
                # passes to the tracker, so this is the box centre.
                center = bbox[:2] + bbox[2:] // 2
                self.coordsUpdated.emit(self.__processing_id, center, success)
                delay = 0.01
            else:
                # No box this round: poll less aggressively.
                delay = 0.05

            # The frame is published either way; bbox may be None.
            self.newFrame.emit([bbox], self.__processing_id, False, ctime)
            sleep(delay)

    def start_detect(self, x: int, y: int, w: int, h: int):
        """Set the detection ROI and start the detection thread if needed.

        The ROI is stored as ``[x, y, x + w, y + h]``. When detection is
        already running only the ROI is updated.
        """
        self.__detection_roi = [x, y, x + w, y + h]
        if not self.__is_detecting:
            # Wait for a previous worker that was asked to stop.
            if self.__detection_thread is not None:
                self.__detection_thread.join()
            self.__is_detecting = True
            self.__detection_thread = Thread(target=self.__detection)
            self.__detection_thread.start()

    def stop_detection(self):
        """Ask the detection loop to stop and wait for its thread to exit."""
        self.__is_detecting = False
        if self.__detection_thread is not None:
            self.__detection_thread.join()
            self.__detection_thread = None

    def start_track(self, x: int, y: int, w: int = 0, h: int = 0):
        """Start tracking a target.

        With ``w == 0`` the point ``(x, y)`` selects one of the latest
        detection boxes and tracking starts from the stored detection frame.
        Otherwise ``(x, y, w, h)`` is used as the box on the current frame.
        Nothing is started when no box can be determined or when tracker
        initialisation raises.
        """
        try:
            self.__is_detecting = False
            self.__is_tracking = False

            bbox = None
            frame = None
            if w == 0:
                if len(self.__detection_bboxes):
                    bbox = get_bbox_by_point(
                        self.__detection_bboxes, np.array([x, y])
                    )
                    frame = self.__detection_frame
            else:
                bbox = np.array([x, y, w, h])
                frame = self.__processing_source.get_frame()

            self.__tracker.stop()
            if bbox is None:
                return
            self.__tracker.init(frame, bbox)
        except Exception as e:
            # Best effort: report the failure and leave tracking stopped.
            print(e)
            return

        # Let the previous tracking loop finish before starting a new one.
        if self.__tracking_thread is not None:
            self.__tracking_thread.join()
        self.__is_tracking = True
        self.__tracking_thread = Thread(target=self.__tracking)
        self.__tracking_thread.start()
        sleep(0.03)

    def stop_track(self):
        """Stop detection, stop the tracker and join the tracking thread."""
        self.stop_detection()
        self.__tracker.stop()
        self.__is_tracking = False
        if self.__tracking_thread is not None:
            self.__tracking_thread.join()
            self.__tracking_thread = None

    def __draw_bbox(self, img: np.ndarray, bbox, color):
        """Placeholder for drawing ``bbox`` on ``img``; drawing is disabled."""
        thickness = self.__thickness
        # cv2.rectangle(img, (bbox[0], bbox[1]), (bbox[2] + bbox[0], bbox[3] + bbox[1]),
        #               color, thickness)