|
@ -1,31 +1,31 @@ |
|
|
#import os |
|
|
|
|
|
#os.environ['YOLO_VERBOSE'] = "false" |
|
|
|
|
|
|
|
|
|
|
|
from threading import Event, Thread |
|
|
|
|
|
from typing import List |
|
|
|
|
|
|
|
|
import datetime |
|
|
|
|
|
|
|
|
|
|
|
import pygame |
|
|
|
|
|
import cv2 |
|
|
import numpy as np |
|
|
import numpy as np |
|
|
import torch |
|
|
import torch |
|
|
|
|
|
from threading import Event, Thread |
|
|
|
|
|
from typing import List |
|
|
from PyQt5.QtCore import QThread, pyqtSlot, pyqtSignal, QUrl, QDir, pyqtProperty |
|
|
from PyQt5.QtCore import QThread, pyqtSlot, pyqtSignal, QUrl, QDir, pyqtProperty |
|
|
#from icecream import ic |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from matplotlib import pyplot as plt |
|
|
from detector import Detector |
|
|
from detector import Detector |
|
|
from detector.utils import get_bbox_by_point |
|
|
from detector.utils import get_bbox_by_point |
|
|
from tracker import Tracker |
|
|
from tracker import Tracker |
|
|
from video_streamer.videostreamer import VideoStreamer |
|
|
from video_streamer.videostreamer import VideoStreamer |
|
|
from time import sleep |
|
|
from time import sleep |
|
|
import time |
|
|
import time |
|
|
|
|
|
|
|
|
from PyQt5.QtCore import QObject, pyqtSignal |
|
|
from PyQt5.QtCore import QObject, pyqtSignal |
|
|
import ctypes |
|
|
import ctypes |
|
|
from ctypes import c_int64 |
|
|
from ctypes import c_int64 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
showTrack = False |
|
|
|
|
|
|
|
|
class Core(QThread): |
|
|
class Core(QThread): |
|
|
newFrame = pyqtSignal(object, int, bool,ctypes.c_int64) |
|
|
|
|
|
|
|
|
newFrame = pyqtSignal(object, int, bool, ctypes.c_int64) |
|
|
coordsUpdated = pyqtSignal(int, object, bool) |
|
|
coordsUpdated = pyqtSignal(int, object, bool) |
|
|
|
|
|
|
|
|
def __init__(self, video_sources: List[VideoStreamer], tracker = None, detector = None , parent=None): |
|
|
|
|
|
|
|
|
def __init__(self, video_sources: List[VideoStreamer], tracker=None, detector=None, parent=None): |
|
|
super(QThread, self).__init__(parent) |
|
|
super(QThread, self).__init__(parent) |
|
|
|
|
|
|
|
|
self.__detector = detector |
|
|
self.__detector = detector |
|
@ -45,7 +45,12 @@ class Core(QThread): |
|
|
self.__tracking_thread = None |
|
|
self.__tracking_thread = None |
|
|
|
|
|
|
|
|
self.__processing_id = 0 |
|
|
self.__processing_id = 0 |
|
|
# ic() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.__frame = None # Frame property for Pygame |
|
|
|
|
|
|
|
|
|
|
|
@pyqtProperty(np.ndarray) |
|
|
|
|
|
def frame(self): |
|
|
|
|
|
return self.__frame |
|
|
|
|
|
|
|
|
def set_thickness(self, thickness: int): |
|
|
def set_thickness(self, thickness: int): |
|
|
self.__thickness = thickness |
|
|
self.__thickness = thickness |
|
@ -73,10 +78,8 @@ class Core(QThread): |
|
|
bbox = result[1:] |
|
|
bbox = result[1:] |
|
|
bbox[:2] += roi[:2] |
|
|
bbox[:2] += roi[:2] |
|
|
global_bboxes.append(bbox) |
|
|
global_bboxes.append(bbox) |
|
|
# color = (0, 0, 255) if cls == 0 else (80, 127, 255) |
|
|
|
|
|
# self.__draw_bbox(frame, bbox, color) |
|
|
|
|
|
|
|
|
|
|
|
self.newFrame.emit(global_bboxes, self.__processing_id, True, c_int64(int(time.time()*1e3))) |
|
|
|
|
|
|
|
|
self.newFrame.emit(global_bboxes, self.__processing_id, True, c_int64(int(time.time() * 1e3))) |
|
|
self.__detection_bboxes = np.array(global_bboxes) |
|
|
self.__detection_bboxes = np.array(global_bboxes) |
|
|
self.__detection_frame = frame.copy() |
|
|
self.__detection_frame = frame.copy() |
|
|
sleep(0.03) |
|
|
sleep(0.03) |
|
@ -84,22 +87,94 @@ class Core(QThread): |
|
|
print(e) |
|
|
print(e) |
|
|
sleep(0.1) |
|
|
sleep(0.1) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def __tracking(self): |
|
|
def __tracking(self): |
|
|
source = self.__processing_source |
|
|
source = self.__processing_source |
|
|
|
|
|
if showTrack: |
|
|
|
|
|
pygame.init() |
|
|
|
|
|
|
|
|
|
|
|
# Get actual screen resolution |
|
|
|
|
|
info = pygame.display.Info() |
|
|
|
|
|
screen_width, screen_height = info.current_w, info.current_h |
|
|
|
|
|
screen = pygame.display.set_mode((screen_width, screen_height), pygame.FULLSCREEN) |
|
|
|
|
|
pygame.display.set_caption('Tracking Frame') |
|
|
|
|
|
|
|
|
|
|
|
clock = pygame.time.Clock() # Add a clock to control frame rate |
|
|
|
|
|
|
|
|
while self.__is_tracking: |
|
|
while self.__is_tracking: |
|
|
|
|
|
if showTrack: |
|
|
|
|
|
for event in pygame.event.get(): # Prevent freezing by handling events |
|
|
|
|
|
if event.type == pygame.QUIT: |
|
|
|
|
|
pygame.quit() |
|
|
|
|
|
return |
|
|
|
|
|
|
|
|
ctime = c_int64(int(time.time() * 1000)) # Convert to c_int64 |
|
|
ctime = c_int64(int(time.time() * 1000)) # Convert to c_int64 |
|
|
|
|
|
|
|
|
frame = source.get_frame() |
|
|
frame = source.get_frame() |
|
|
bbox, success = self.__tracker.update(frame) |
|
|
bbox, success = self.__tracker.update(frame) |
|
|
center = None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if bbox is not None: |
|
|
if bbox is not None: |
|
|
center = bbox[:2] + bbox[2:] // 2 |
|
|
center = bbox[:2] + bbox[2:] // 2 |
|
|
self.coordsUpdated.emit(self.__processing_id, center, success) |
|
|
self.coordsUpdated.emit(self.__processing_id, center, success) |
|
|
self.newFrame.emit([bbox], self.__processing_id, False, ctime) |
|
|
self.newFrame.emit([bbox], self.__processing_id, False, ctime) |
|
|
sleep(0.01) |
|
|
|
|
|
else: |
|
|
|
|
|
# self.newFrame.emit([bbox], self.__processing_id, False, ctime) |
|
|
|
|
|
print("null bbox") |
|
|
|
|
|
sleep(0.05) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if showTrack: |
|
|
|
|
|
x, y, w, h = map(int, bbox) |
|
|
|
|
|
box_color = (0, 255, 0) if success else (255, 0, 0) |
|
|
|
|
|
cv2.rectangle(frame, (x, y), (x + w, y + h), box_color, 2) |
|
|
|
|
|
|
|
|
|
|
|
# Convert OpenCV frame (BGR) to RGB |
|
|
|
|
|
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) |
|
|
|
|
|
|
|
|
|
|
|
font = cv2.FONT_HERSHEY_SIMPLEX |
|
|
|
|
|
font_scale = 2.25 |
|
|
|
|
|
font_color = (255, 255, 0) |
|
|
|
|
|
thickness = 6 |
|
|
|
|
|
position = (50, 450) # Bottom-left corner of the text in the image |
|
|
|
|
|
now = datetime.datetime.now() |
|
|
|
|
|
time_string = now.strftime("%H:%M:%S.%f")[:-3] |
|
|
|
|
|
|
|
|
|
|
|
# 4. Use cv2.putText() to write time on the image |
|
|
|
|
|
cv2.putText(frame, time_string, position, font, font_scale, font_color, thickness, cv2.LINE_AA) |
|
|
|
|
|
cv2.putText(frame, f"{ctime}", (50, 380), font, font_scale, (255,255,255), thickness, cv2.LINE_AA) |
|
|
|
|
|
# print(ctime) |
|
|
|
|
|
|
|
|
|
|
|
frame = cv2.flip(frame, 1) # Flip horizontally |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Resize frame while maintaining aspect ratio |
|
|
|
|
|
frame_height, frame_width, _ = frame.shape |
|
|
|
|
|
aspect_ratio = frame_width / frame_height |
|
|
|
|
|
|
|
|
|
|
|
if aspect_ratio > (screen_width / screen_height): # Wider than screen |
|
|
|
|
|
new_width = screen_width |
|
|
|
|
|
new_height = int(screen_width / aspect_ratio) |
|
|
|
|
|
else: # Taller than screen |
|
|
|
|
|
new_height = screen_height |
|
|
|
|
|
new_width = int(screen_height * aspect_ratio) |
|
|
|
|
|
|
|
|
|
|
|
resized_frame = cv2.resize(frame, (new_width, new_height)) |
|
|
|
|
|
|
|
|
|
|
|
# Convert to Pygame surface without unnecessary rotation |
|
|
|
|
|
frame_surface = pygame.surfarray.make_surface(resized_frame) |
|
|
|
|
|
|
|
|
|
|
|
# Optional: If rotation is needed, use pygame.transform.rotate() |
|
|
|
|
|
frame_surface = pygame.transform.rotate(frame_surface, -90) # Example rotation |
|
|
|
|
|
|
|
|
|
|
|
# Center the frame |
|
|
|
|
|
x_offset = (screen_width - new_width) // 2 |
|
|
|
|
|
y_offset = (screen_height - new_height) // 2 |
|
|
|
|
|
|
|
|
|
|
|
screen.fill((0, 0, 0)) # Clear screen |
|
|
|
|
|
screen.blit(frame_surface, (x_offset, y_offset)) |
|
|
|
|
|
pygame.display.flip() |
|
|
|
|
|
|
|
|
|
|
|
clock.tick(30) # Limit FPS to prevent excessive CPU usage |
|
|
|
|
|
|
|
|
def start_detect(self, x: int, y: int, w: int, h: int): |
|
|
def start_detect(self, x: int, y: int, w: int, h: int): |
|
|
self.__detection_roi = [x, y, x + w, y + h] |
|
|
self.__detection_roi = [x, y, x + w, y + h] |
|
@ -113,54 +188,53 @@ class Core(QThread): |
|
|
|
|
|
|
|
|
def stop_detection(self): |
|
|
def stop_detection(self): |
|
|
self.__is_detecting = False |
|
|
self.__is_detecting = False |
|
|
if self.__detection_thread is not None: |
|
|
|
|
|
|
|
|
if self.__detection_thread is not None and self.__detection_thread.is_alive(): |
|
|
self.__detection_thread.join() |
|
|
self.__detection_thread.join() |
|
|
|
|
|
|
|
|
self.__detection_thread = None |
|
|
self.__detection_thread = None |
|
|
|
|
|
|
|
|
def start_track(self, x: int, y: int, w: int = 0, h: int = 0): |
|
|
def start_track(self, x: int, y: int, w: int = 0, h: int = 0): |
|
|
print(f"start tracking: {x}, {y}, {w}, {h}") |
|
|
|
|
|
try: |
|
|
|
|
|
self.__is_detecting = False |
|
|
|
|
|
self.__is_tracking = False |
|
|
|
|
|
bbox = None |
|
|
|
|
|
if w == 0: |
|
|
|
|
|
if len(self.__detection_bboxes): |
|
|
|
|
|
bbox = get_bbox_by_point(self.__detection_bboxes, np.array([x, y])) |
|
|
|
|
|
frame = self.__detection_frame |
|
|
|
|
|
else: |
|
|
|
|
|
bbox = np.array([x, y, w, h]) |
|
|
|
|
|
frame = self.__processing_source.get_frame() |
|
|
|
|
|
|
|
|
print(f"start tracking: {x}, {y}, {w}, {h}") |
|
|
|
|
|
try: |
|
|
|
|
|
self.__is_detecting = False |
|
|
|
|
|
self.__is_tracking = False |
|
|
|
|
|
bbox = None |
|
|
|
|
|
if w == 0: |
|
|
|
|
|
if len(self.__detection_bboxes): |
|
|
|
|
|
bbox = get_bbox_by_point(self.__detection_bboxes, np.array([x, y])) |
|
|
|
|
|
frame = self.__detection_frame |
|
|
|
|
|
else: |
|
|
|
|
|
bbox = np.array([x, y, w, h]) |
|
|
|
|
|
frame = self.__processing_source.get_frame() |
|
|
|
|
|
|
|
|
self.__tracker.stop() |
|
|
|
|
|
|
|
|
self.__tracker.stop() |
|
|
|
|
|
|
|
|
if bbox is not None: |
|
|
|
|
|
self.__tracker.init(frame, bbox) |
|
|
|
|
|
else: |
|
|
|
|
|
|
|
|
if bbox is not None: |
|
|
|
|
|
self.__tracker.init(frame, bbox) |
|
|
|
|
|
else: |
|
|
|
|
|
return |
|
|
|
|
|
except Exception as e: |
|
|
|
|
|
print(e) |
|
|
return |
|
|
return |
|
|
except Exception as e: |
|
|
|
|
|
print(e) |
|
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
|
|
if self.__tracking_thread is not None: |
|
|
|
|
|
self.__tracking_thread.join() |
|
|
|
|
|
self.stop_track() |
|
|
|
|
|
self.__is_tracking = True |
|
|
|
|
|
self.__tracking_thread = Thread(target=self.__tracking) |
|
|
|
|
|
self.__tracking_thread.start() |
|
|
|
|
|
sleep(0.03) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if self.__tracking_thread is not None: |
|
|
|
|
|
self.__tracking_thread.join() |
|
|
|
|
|
self.stop_track() |
|
|
|
|
|
self.__is_tracking = True |
|
|
|
|
|
self.__tracking_thread = Thread(target=self.__tracking) |
|
|
|
|
|
self.__tracking_thread.start() |
|
|
|
|
|
sleep(0.03) |
|
|
|
|
|
|
|
|
def stop_track(self): |
|
|
def stop_track(self): |
|
|
|
|
|
if showTrack: |
|
|
|
|
|
pygame.quit() |
|
|
print("stop tracking") |
|
|
print("stop tracking") |
|
|
self.stop_detection() |
|
|
self.stop_detection() |
|
|
self.__tracker.stop() |
|
|
self.__tracker.stop() |
|
|
self.__is_tracking = False |
|
|
self.__is_tracking = False |
|
|
if self.__tracking_thread is not None: |
|
|
|
|
|
self.__tracking_thread.join() |
|
|
|
|
|
|
|
|
|
|
|
self.__tracking_thread = None |
|
|
self.__tracking_thread = None |
|
|
|
|
|
|
|
|
def __draw_bbox(self, img: np.ndarray, bbox, color): |
|
|
def __draw_bbox(self, img: np.ndarray, bbox, color): |
|
|
thickness = self.__thickness |
|
|
thickness = self.__thickness |
|
|
# cv2.rectangle(img, (bbox[0], bbox[1]), (bbox[2] + bbox[0], bbox[3] + bbox[1]), |
|
|
|
|
|
# color, thickness) |
|
|
|
|
|
|
|
|
cv2.rectangle(img, (bbox[0], bbox[1]), (bbox[2] + bbox[0], bbox[3] + bbox[1]), |
|
|
|
|
|
color, thickness) |