|
@ -7,8 +7,8 @@ from typing import List |
|
|
import numpy as np |
|
|
import numpy as np |
|
|
from PyQt5.QtCore import QThread, pyqtSlot, pyqtSignal, QUrl, QDir, pyqtProperty |
|
|
from PyQt5.QtCore import QThread, pyqtSlot, pyqtSignal, QUrl, QDir, pyqtProperty |
|
|
#from icecream import ic |
|
|
#from icecream import ic |
|
|
|
|
|
|
|
|
from detector import Detector |
|
|
|
|
|
|
|
|
# |
|
|
|
|
|
# from detector import Detector |
|
|
from detector.utils import get_bbox_by_point |
|
|
from detector.utils import get_bbox_by_point |
|
|
from tracker import Tracker |
|
|
from tracker import Tracker |
|
|
from video_streamer.videostreamer import VideoStreamer |
|
|
from video_streamer.videostreamer import VideoStreamer |
|
@ -27,7 +27,7 @@ class Core(QThread): |
|
|
def __init__(self, video_sources: List[VideoStreamer], parent=None): |
|
|
def __init__(self, video_sources: List[VideoStreamer], parent=None): |
|
|
super(QThread, self).__init__(parent) |
|
|
super(QThread, self).__init__(parent) |
|
|
|
|
|
|
|
|
self.__detector = Detector(classes=[0, 2, 5, 7]) |
|
|
|
|
|
|
|
|
# self.__detector = Detector(classes=[0, 2, 5, 7]) |
|
|
self.__tracker = Tracker() |
|
|
self.__tracker = Tracker() |
|
|
|
|
|
|
|
|
self.__video_sources = video_sources |
|
|
self.__video_sources = video_sources |
|
@ -58,29 +58,30 @@ class Core(QThread): |
|
|
self.set_source(0) |
|
|
self.set_source(0) |
|
|
|
|
|
|
|
|
def __detection(self): |
|
|
def __detection(self): |
|
|
while self.__is_detecting: |
|
|
|
|
|
try: |
|
|
|
|
|
source = self.__processing_source |
|
|
|
|
|
roi = self.__detection_roi |
|
|
|
|
|
frame = source.get_frame() |
|
|
|
|
|
cropped_frame = frame[roi[1]:roi[3], roi[0]:roi[2]] |
|
|
|
|
|
results = self.__detector.predict(cropped_frame) |
|
|
|
|
|
global_bboxes = list() |
|
|
|
|
|
for result in results: |
|
|
|
|
|
cls = result[0] |
|
|
|
|
|
bbox = result[1:] |
|
|
|
|
|
bbox[:2] += roi[:2] |
|
|
|
|
|
global_bboxes.append(bbox) |
|
|
|
|
|
# color = (0, 0, 255) if cls == 0 else (80, 127, 255) |
|
|
|
|
|
# self.__draw_bbox(frame, bbox, color) |
|
|
|
|
|
|
|
|
|
|
|
self.newFrame.emit(global_bboxes, self.__processing_id, True) |
|
|
|
|
|
self.__detection_bboxes = np.array(global_bboxes) |
|
|
|
|
|
self.__detection_frame = frame.copy() |
|
|
|
|
|
sleep(0.03) |
|
|
|
|
|
except Exception as e: |
|
|
|
|
|
print(e) |
|
|
|
|
|
sleep(0.1) |
|
|
|
|
|
|
|
|
pass |
|
|
|
|
|
# while self.__is_detecting: |
|
|
|
|
|
# try: |
|
|
|
|
|
# source = self.__processing_source |
|
|
|
|
|
# roi = self.__detection_roi |
|
|
|
|
|
# frame = source.get_frame() |
|
|
|
|
|
# cropped_frame = frame[roi[1]:roi[3], roi[0]:roi[2]] |
|
|
|
|
|
# results = self.__detector.predict(cropped_frame) |
|
|
|
|
|
# global_bboxes = list() |
|
|
|
|
|
# for result in results: |
|
|
|
|
|
# cls = result[0] |
|
|
|
|
|
# bbox = result[1:] |
|
|
|
|
|
# bbox[:2] += roi[:2] |
|
|
|
|
|
# global_bboxes.append(bbox) |
|
|
|
|
|
# # color = (0, 0, 255) if cls == 0 else (80, 127, 255) |
|
|
|
|
|
# # self.__draw_bbox(frame, bbox, color) |
|
|
|
|
|
# |
|
|
|
|
|
# self.newFrame.emit(global_bboxes, self.__processing_id, True) |
|
|
|
|
|
# self.__detection_bboxes = np.array(global_bboxes) |
|
|
|
|
|
# self.__detection_frame = frame.copy() |
|
|
|
|
|
# sleep(0.03) |
|
|
|
|
|
# except Exception as e: |
|
|
|
|
|
# print(e) |
|
|
|
|
|
# sleep(0.1) |
|
|
|
|
|
|
|
|
def __tracking(self): |
|
|
def __tracking(self): |
|
|
source = self.__processing_source |
|
|
source = self.__processing_source |
|
@ -99,21 +100,23 @@ class Core(QThread): |
|
|
sleep(0.05) |
|
|
sleep(0.05) |
|
|
|
|
|
|
|
|
def start_detect(self, x: int, y: int, w: int, h: int): |
|
|
def start_detect(self, x: int, y: int, w: int, h: int): |
|
|
self.__detection_roi = [x, y, x + w, y + h] |
|
|
|
|
|
|
|
|
|
|
|
if not self.__is_detecting: |
|
|
|
|
|
if self.__detection_thread is not None: |
|
|
|
|
|
self.__detection_thread.join() |
|
|
|
|
|
self.__is_detecting = True |
|
|
|
|
|
self.__detection_thread = Thread(target=self.__detection) |
|
|
|
|
|
self.__detection_thread.start() |
|
|
|
|
|
|
|
|
pass |
|
|
|
|
|
# self.__detection_roi = [x, y, x + w, y + h] |
|
|
|
|
|
# |
|
|
|
|
|
# if not self.__is_detecting: |
|
|
|
|
|
# if self.__detection_thread is not None: |
|
|
|
|
|
# self.__detection_thread.join() |
|
|
|
|
|
# self.__is_detecting = True |
|
|
|
|
|
# self.__detection_thread = Thread(target=self.__detection) |
|
|
|
|
|
# self.__detection_thread.start() |
|
|
|
|
|
|
|
|
def stop_detection(self): |
|
|
def stop_detection(self): |
|
|
self.__is_detecting = False |
|
|
|
|
|
if self.__detection_thread is not None: |
|
|
|
|
|
self.__detection_thread.join() |
|
|
|
|
|
|
|
|
|
|
|
self.__detection_thread = None |
|
|
|
|
|
|
|
|
pass |
|
|
|
|
|
# self.__is_detecting = False |
|
|
|
|
|
# if self.__detection_thread is not None: |
|
|
|
|
|
# self.__detection_thread.join() |
|
|
|
|
|
# |
|
|
|
|
|
# self.__detection_thread = None |
|
|
|
|
|
|
|
|
def start_track(self, x: int, y: int, w: int = 0, h: int = 0): |
|
|
def start_track(self, x: int, y: int, w: int = 0, h: int = 0): |
|
|
try: |
|
|
try: |
|
|