@ -1,34 +1,35 @@
#import os
#os.environ['YOLO_VERBOSE'] = "false"
import datetime
import pygame
import cv2
import numpy as np
import torch
from threading import Event , Thread
from threading import Event , Thread
from typing import List
from typing import List
import numpy as np
from PyQt5.QtCore import QThread , pyqtSlot , pyqtSignal , QUrl , QDir , pyqtProperty
from PyQt5.QtCore import QThread , pyqtSlot , pyqtSignal , QUrl , QDir , pyqtProperty
#from icecream import ic
from matplotlib import pyplot as plt
from detector import Detector
from detector import Detector
from detector.utils import get_bbox_by_point
from detector.utils import get_bbox_by_point
from tracker import Tracker
from tracker import Tracker
from video_streamer.videostreamer import VideoStreamer
from video_streamer.videostreamer import VideoStreamer
from time import sleep
from time import sleep
import time
import time
from PyQt5.QtCore import QObject , pyqtSignal
from PyQt5.QtCore import QObject , pyqtSignal
import ctypes
import ctypes
from ctypes import c_int64
from ctypes import c_int64
showTrack = False
class Core ( QThread ) :
class Core ( QThread ) :
newFrame = pyqtSignal ( object , int , bool , ctypes . c_int64 )
newFrame = pyqtSignal ( object , int , bool , ctypes . c_int64 )
coordsUpdated = pyqtSignal ( int , object , bool )
coordsUpdated = pyqtSignal ( int , object , bool )
def __init__ ( self , video_sources : List [ VideoStreamer ] , parent = None ) :
def __init__ ( self , video_sources : List [ VideoStreamer ] , tracker = None , detector = None , parent = None ) :
super ( QThread , self ) . __init__ ( parent )
super ( QThread , self ) . __init__ ( parent )
self . __detector = Detector ( classes = [ 0 , 2 , 5 , 7 ] )
self . __tracker = Tracker ( )
self . __detector = detector
self . __tracker = tracker
self . __video_sources = video_sources
self . __video_sources = video_sources
self . __processing_source = video_sources [ 0 ]
self . __processing_source = video_sources [ 0 ]
@ -44,7 +45,12 @@ class Core(QThread):
self . __tracking_thread = None
self . __tracking_thread = None
self . __processing_id = 0
self . __processing_id = 0
# ic()
self . __frame = None # Frame property for Pygame
@pyqtProperty ( np . ndarray )
def frame ( self ) :
return self . __frame
def set_thickness ( self , thickness : int ) :
def set_thickness ( self , thickness : int ) :
self . __thickness = thickness
self . __thickness = thickness
@ -60,6 +66,7 @@ class Core(QThread):
def __detection ( self ) :
def __detection ( self ) :
while self . __is_detecting :
while self . __is_detecting :
try :
try :
torch . cuda . empty_cache ( )
source = self . __processing_source
source = self . __processing_source
roi = self . __detection_roi
roi = self . __detection_roi
frame = source . get_frame ( )
frame = source . get_frame ( )
@ -71,10 +78,8 @@ class Core(QThread):
bbox = result [ 1 : ]
bbox = result [ 1 : ]
bbox [ : 2 ] + = roi [ : 2 ]
bbox [ : 2 ] + = roi [ : 2 ]
global_bboxes . append ( bbox )
global_bboxes . append ( bbox )
# color = (0, 0, 255) if cls == 0 else (80, 127, 255)
# self.__draw_bbox(frame, bbox, color)
self . newFrame . emit ( global_bboxes , self . __processing_id , True )
self . newFrame . emit ( global_bboxes , self . __processing_id , True , c_int64 ( int ( time . time ( ) * 1e3 ) ) )
self . __detection_bboxes = np . array ( global_bboxes )
self . __detection_bboxes = np . array ( global_bboxes )
self . __detection_frame = frame . copy ( )
self . __detection_frame = frame . copy ( )
sleep ( 0.03 )
sleep ( 0.03 )
@ -82,21 +87,94 @@ class Core(QThread):
print ( e )
print ( e )
sleep ( 0.1 )
sleep ( 0.1 )
def __tracking ( self ) :
def __tracking ( self ) :
source = self . __processing_source
source = self . __processing_source
if showTrack :
pygame . init ( )
# Get actual screen resolution
info = pygame . display . Info ( )
screen_width , screen_height = info . current_w , info . current_h
screen = pygame . display . set_mode ( ( screen_width , screen_height ) , pygame . FULLSCREEN )
pygame . display . set_caption ( ' Tracking Frame ' )
clock = pygame . time . Clock ( ) # Add a clock to control frame rate
while self . __is_tracking :
while self . __is_tracking :
if showTrack :
for event in pygame . event . get ( ) : # Prevent freezing by handling events
if event . type == pygame . QUIT :
pygame . quit ( )
return
ctime = c_int64 ( int ( time . time ( ) * 1000 ) ) # Convert to c_int64
ctime = c_int64 ( int ( time . time ( ) * 1000 ) ) # Convert to c_int64
frame = source . get_frame ( )
frame = source . get_frame ( )
bbox , success = self . __tracker . update ( frame )
bbox , success = self . __tracker . update ( frame )
center = None
if bbox is not None :
if bbox is not None :
center = bbox [ : 2 ] + bbox [ 2 : ] / / 2
center = bbox [ : 2 ] + bbox [ 2 : ] / / 2
self . coordsUpdated . emit ( self . __processing_id , center , success )
self . coordsUpdated . emit ( self . __processing_id , center , success )
self . newFrame . emit ( [ bbox ] , self . __processing_id , False , ctime )
self . newFrame . emit ( [ bbox ] , self . __processing_id , False , ctime )
sleep ( 0.01 )
else :
self . newFrame . emit ( [ bbox ] , self . __processing_id , False , ctime )
sleep ( 0.05 )
if showTrack :
x , y , w , h = map ( int , bbox )
box_color = ( 0 , 255 , 0 ) if success else ( 255 , 0 , 0 )
cv2 . rectangle ( frame , ( x , y ) , ( x + w , y + h ) , box_color , 2 )
# Convert OpenCV frame (BGR) to RGB
frame = cv2 . cvtColor ( frame , cv2 . COLOR_BGR2RGB )
font = cv2 . FONT_HERSHEY_SIMPLEX
font_scale = 2.25
font_color = ( 255 , 255 , 0 )
thickness = 6
position = ( 50 , 450 ) # Bottom-left corner of the text in the image
now = datetime . datetime . now ( )
time_string = now . strftime ( " % H: % M: % S. %f " ) [ : - 3 ]
# 4. Use cv2.putText() to write time on the image
cv2 . putText ( frame , time_string , position , font , font_scale , font_color , thickness , cv2 . LINE_AA )
cv2 . putText ( frame , f " {ctime} " , ( 50 , 380 ) , font , font_scale , ( 255 , 255 , 255 ) , thickness , cv2 . LINE_AA )
# print(ctime)
frame = cv2 . flip ( frame , 1 ) # Flip horizontally
# Resize frame while maintaining aspect ratio
frame_height , frame_width , _ = frame . shape
aspect_ratio = frame_width / frame_height
if aspect_ratio > ( screen_width / screen_height ) : # Wider than screen
new_width = screen_width
new_height = int ( screen_width / aspect_ratio )
else : # Taller than screen
new_height = screen_height
new_width = int ( screen_height * aspect_ratio )
resized_frame = cv2 . resize ( frame , ( new_width , new_height ) )
# Convert to Pygame surface without unnecessary rotation
frame_surface = pygame . surfarray . make_surface ( resized_frame )
# Optional: If rotation is needed, use pygame.transform.rotate()
frame_surface = pygame . transform . rotate ( frame_surface , - 90 ) # Example rotation
# Center the frame
x_offset = ( screen_width - new_width ) / / 2
y_offset = ( screen_height - new_height ) / / 2
screen . fill ( ( 0 , 0 , 0 ) ) # Clear screen
screen . blit ( frame_surface , ( x_offset , y_offset ) )
pygame . display . flip ( )
clock . tick ( 30 ) # Limit FPS to prevent excessive CPU usage
def start_detect ( self , x : int , y : int , w : int , h : int ) :
def start_detect ( self , x : int , y : int , w : int , h : int ) :
self . __detection_roi = [ x , y , x + w , y + h ]
self . __detection_roi = [ x , y , x + w , y + h ]
@ -110,12 +188,13 @@ class Core(QThread):
def stop_detection ( self ) :
def stop_detection ( self ) :
self . __is_detecting = False
self . __is_detecting = False
if self . __detection_thread is not None :
if self . __detection_thread is not None and self . __detection_thread . is_alive ( ) :
self . __detection_thread . join ( )
self . __detection_thread . join ( )
self . __detection_thread = None
self . __detection_thread = None
def start_track ( self , x : int , y : int , w : int = 0 , h : int = 0 ) :
def start_track ( self , x : int , y : int , w : int = 0 , h : int = 0 ) :
print ( f " start tracking: {x}, {y}, {w}, {h} " )
try :
try :
self . __is_detecting = False
self . __is_detecting = False
self . __is_tracking = False
self . __is_tracking = False
@ -140,21 +219,22 @@ class Core(QThread):
if self . __tracking_thread is not None :
if self . __tracking_thread is not None :
self . __tracking_thread . join ( )
self . __tracking_thread . join ( )
self . stop_track ( )
self . __is_tracking = True
self . __is_tracking = True
self . __tracking_thread = Thread ( target = self . __tracking )
self . __tracking_thread = Thread ( target = self . __tracking )
self . __tracking_thread . start ( )
self . __tracking_thread . start ( )
sleep ( 0.03 )
sleep ( 0.03 )
def stop_track ( self ) :
def stop_track ( self ) :
if showTrack :
pygame . quit ( )
print ( " stop tracking " )
self . stop_detection ( )
self . stop_detection ( )
self . __tracker . stop ( )
self . __tracker . stop ( )
self . __is_tracking = False
self . __is_tracking = False
if self . __tracking_thread is not None :
self . __tracking_thread . join ( )
self . __tracking_thread = None
self . __tracking_thread = None
def __draw_bbox ( self , img : np . ndarray , bbox , color ) :
def __draw_bbox ( self , img : np . ndarray , bbox , color ) :
thickness = self . __thickness
thickness = self . __thickness
# cv2.rectangle(img, (bbox[0], bbox[1]), (bbox[2] + bbox[0], bbox[3] + bbox[1]) ,
# color, thickness )
cv2 . rectangle ( img , ( bbox [ 0 ] , bbox [ 1 ] ) , ( bbox [ 2 ] + bbox [ 0 ] , bbox [ 3 ] + bbox [ 1 ] ) ,
color , thickness )