import datetime import threading import pygame import cv2 import numpy as np import torch from threading import Event, Thread from typing import List from PyQt5.QtCore import QThread, pyqtSlot, pyqtSignal, QUrl, QDir, pyqtProperty from matplotlib import pyplot as plt from detector import Detector from detector.utils import get_bbox_by_point from tracker import Tracker from video_streamer.videostreamer import VideoStreamer from time import sleep import time from PyQt5.QtCore import QObject, pyqtSignal import ctypes from ctypes import c_int64 from server import rtsp_server, run_server, RTSPServer showTrack = False class Core(QThread): newFrame = pyqtSignal(object, int, bool, ctypes.c_int64) coordsUpdated = pyqtSignal(int, object, bool) def __init__(self, video_sources, tracker=None, detector=None, parent=None): super(QThread, self).__init__(parent) self.__detector = detector self.__tracker = tracker self.__rtspserver = rtsp_server threading.Thread(target=run_server, daemon=True).start() self.__video_sources = video_sources self.__processing_source = video_sources[0] self.__detection_roi = list() self.__is_detecting = False self.__detection_thread = None self.__thickness = 2 self.__detection_bboxes = np.empty([]) self.__detection_frame = None self.__is_tracking = False self.__tracking_thread = None self.__processing_id = 0 self.__frame = None # Frame property for Pygame @pyqtProperty(np.ndarray) def frame(self): return self.__frame def set_thickness(self, thickness: int): self.__thickness = thickness def set_source(self, source_id: int): self.__processing_source = self.__video_sources[source_id] self.__processing_id = source_id def set_video_sources(self, video_sources): if len(video_sources) >= 2: self.__video_sources = video_sources self.set_source(0) def __detection(self): while self.__is_detecting: try: torch.cuda.empty_cache() source = self.__processing_source roi = self.__detection_roi frame = source.get_frame() cropped_frame = frame[roi[1]:roi[3], roi[0]:roi[2]] results = self.__detector.predict(cropped_frame) global_bboxes = list() for result in results: cls = result[0] bbox = result[1:] bbox[:2] += roi[:2] global_bboxes.append(bbox) self.newFrame.emit(global_bboxes, self.__processing_id, True, c_int64(int(time.time() * 1e3))) self.__detection_bboxes = np.array(global_bboxes) self.__detection_frame = frame.copy() sleep(0.03) except Exception as e: print(e) sleep(0.1) def __tracking(self): source = self.__processing_source if showTrack: pygame.init() # Get actual screen resolution info = pygame.display.Info() screen_width, screen_height = info.current_w, info.current_h screen = pygame.display.set_mode((screen_width, screen_height), pygame.FULLSCREEN) pygame.display.set_caption('Tracking Frame') clock = pygame.time.Clock() # Add a clock to control frame rate while self.__is_tracking: if showTrack: for event in pygame.event.get(): # Prevent freezing by handling events if event.type == pygame.QUIT: pygame.quit() return ctime = c_int64(int(time.time() * 1000)) # Convert to c_int64 frame = source.get_frame() bbox, success = self.__tracker.update(frame) if bbox is not None: center = bbox[:2] + bbox[2:] // 2 self.coordsUpdated.emit(self.__processing_id, center, success) self.newFrame.emit([bbox], self.__processing_id, False, ctime) x, y, w, h = map(int, bbox) box_color = (0, 255, 0) if success else (255, 0, 0) cv2.rectangle(frame, (x, y), (x + w, y + h), box_color, 2) self.__rtspserver.update_frame(frame) if showTrack: # Convert OpenCV frame (BGR) to RGB frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) font = cv2.FONT_HERSHEY_SIMPLEX font_scale = 2.25 font_color = (255, 255, 0) thickness = 6 position = (50, 450) # Bottom-left corner of the text in the image now = datetime.datetime.now() time_string = now.strftime("%H:%M:%S.%f")[:-3] # 4. Use cv2.putText() to write time on the image cv2.putText(frame, time_string, position, font, font_scale, font_color, thickness, cv2.LINE_AA) cv2.putText(frame, f"{ctime}", (50, 380), font, font_scale, (255,255,255), thickness, cv2.LINE_AA) # print(ctime) frame = cv2.flip(frame, 1) # Flip horizontally # Resize frame while maintaining aspect ratio frame_height, frame_width, _ = frame.shape aspect_ratio = frame_width / frame_height if aspect_ratio > (screen_width / screen_height): # Wider than screen new_width = screen_width new_height = int(screen_width / aspect_ratio) else: # Taller than screen new_height = screen_height new_width = int(screen_height * aspect_ratio) resized_frame = cv2.resize(frame, (new_width, new_height)) # Convert to Pygame surface without unnecessary rotation frame_surface = pygame.surfarray.make_surface(resized_frame) # Optional: If rotation is needed, use pygame.transform.rotate() frame_surface = pygame.transform.rotate(frame_surface, -90) # Example rotation # Center the frame x_offset = (screen_width - new_width) // 2 y_offset = (screen_height - new_height) // 2 screen.fill((0, 0, 0)) # Clear screen screen.blit(frame_surface, (x_offset, y_offset)) pygame.display.flip() clock.tick(30) # Limit FPS to prevent excessive CPU usage def start_detect(self, x: int, y: int, w: int, h: int): self.__detection_roi = [x, y, x + w, y + h] if not self.__is_detecting: if self.__detection_thread is not None: self.__detection_thread.join() self.__is_detecting = True self.__detection_thread = Thread(target=self.__detection) self.__detection_thread.start() def stop_detection(self): self.__is_detecting = False if self.__detection_thread is not None and self.__detection_thread.is_alive(): self.__detection_thread.join() self.__detection_thread = None def start_track(self, x: int, y: int, w: int = 0, h: int = 0): print(f"start tracking: {x}, {y}, {w}, {h}") try: self.__is_detecting = False self.__is_tracking = False bbox = None if w == 0: if len(self.__detection_bboxes): bbox = get_bbox_by_point(self.__detection_bboxes, np.array([x, y])) frame = self.__detection_frame else: bbox = np.array([x, y, w, h]) frame = self.__processing_source.get_frame() self.__tracker.stop() if bbox is not None: self.__tracker.init(frame, bbox) else: return except Exception as e: print(e) return if self.__tracking_thread is not None: self.__tracking_thread.join() self.stop_track() self.__is_tracking = True self.__tracking_thread = Thread(target=self.__tracking) self.__tracking_thread.start() sleep(0.03) def stop_track(self): if showTrack: pygame.quit() print("stop tracking") self.stop_detection() self.__tracker.stop() self.__is_tracking = False self.__tracking_thread = None def __draw_bbox(self, img: np.ndarray, bbox, color): thickness = self.__thickness cv2.rectangle(img, (bbox[0], bbox[1]), (bbox[2] + bbox[0], bbox[3] + bbox[1]), color, thickness)