#import os
#os.environ['YOLO_VERBOSE'] = "false"

from threading import Event, Thread
from typing import List

import numpy as np
from PyQt5.QtCore import QObject, QThread, pyqtSlot, pyqtSignal, QUrl, QDir, pyqtProperty
#from icecream import ic

from detector import Detector
from detector.utils import get_bbox_by_point
from tracker import Tracker
from video_streamer.videostreamer import VideoStreamer
from time import sleep
import time

import ctypes
from ctypes import c_int64


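# Core runs object detection and single-target tracking for one of several video
# sources. The class inherits QThread (kept from the original design), but the
# worker loops themselves run on plain threading.Thread instances. Results are
# published through two Qt signals:
#   newFrame(bboxes, source_id, is_detection, timestamp_ms)
#   coordsUpdated(source_id, center, tracker_success)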
class Core(QThread):
    # bboxes, source_id, is_detection, timestamp_ms
    newFrame = pyqtSignal(object, int, bool, ctypes.c_int64)
    # source_id, target center, tracker success
    coordsUpdated = pyqtSignal(int, object, bool)

    def __init__(self, video_sources: List[VideoStreamer], parent=None):
        super().__init__(parent)

        self.__detector = Detector(classes=[0, 2, 5, 7])
        self.__tracker = Tracker()

        self.__video_sources = video_sources
        self.__processing_source = video_sources[0]

        self.__detection_roi = list()
        self.__is_detecting = False
        self.__detection_thread = None
        self.__thickness = 2
        self.__detection_bboxes = np.empty((0, 4))  # rows of [x, y, w, h]; empty until a detection pass runs
        self.__detection_frame = None

        self.__is_tracking = False
        self.__tracking_thread = None

        self.__processing_id = 0
        # ic()

    def set_thickness(self, thickness: int):
        self.__thickness = thickness

    def set_source(self, source_id: int):
        self.__processing_source = self.__video_sources[source_id]
        self.__processing_id = source_id

    def set_video_sources(self, video_sources: List[VideoStreamer]):
        self.__video_sources = video_sources
        self.set_source(0)

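    # Detection worker: runs on a background thread while __is_detecting is set.
    # Each pass crops the current frame to the requested ROI, runs the detector,
    # shifts the resulting boxes back into full-frame coordinates, and emits them.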
    def __detection(self):
        while self.__is_detecting:
            try:
                source = self.__processing_source
                roi = self.__detection_roi
                frame = source.get_frame()
                cropped_frame = frame[roi[1]:roi[3], roi[0]:roi[2]]
                results = self.__detector.predict(cropped_frame)
                global_bboxes = list()
                for result in results:
                    cls = result[0]
                    bbox = result[1:]
                    # boxes are predicted in cropped-frame coordinates; shift by the ROI origin
                    bbox[:2] += roi[:2]
                    global_bboxes.append(bbox)
                    # color = (0, 0, 255) if cls == 0 else (80, 127, 255)
                    # self.__draw_bbox(frame, bbox, color)

                # newFrame is declared with four arguments, so include the millisecond timestamp
                ctime = c_int64(int(time.time() * 1000))
                self.newFrame.emit(global_bboxes, self.__processing_id, True, ctime)
                self.__detection_bboxes = np.array(global_bboxes)
                self.__detection_frame = frame.copy()
                sleep(0.03)
            except Exception as e:
                print(e)
                sleep(0.1)

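    # Tracking worker: runs on a background thread while __is_tracking is set.
    # Each pass updates the tracker on the latest frame and emits the tracked box
    # (timestamped in milliseconds) plus, when a box is available, its centre point.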
    def __tracking(self):
        source = self.__processing_source
        while self.__is_tracking:
            ctime = c_int64(int(time.time() * 1000))  # convert to c_int64 milliseconds
            frame = source.get_frame()
            bbox, success = self.__tracker.update(frame)
            center = None
            if bbox is not None:
                center = bbox[:2] + bbox[2:] // 2
                self.coordsUpdated.emit(self.__processing_id, center, success)
                self.newFrame.emit([bbox], self.__processing_id, False, ctime)
                sleep(0.01)
            else:
                self.newFrame.emit([bbox], self.__processing_id, False, ctime)
                sleep(0.05)

    def start_detect(self, x: int, y: int, w: int, h: int):
        self.__detection_roi = [x, y, x + w, y + h]

        if not self.__is_detecting:
            if self.__detection_thread is not None:
                self.__detection_thread.join()
            self.__is_detecting = True
            self.__detection_thread = Thread(target=self.__detection)
            self.__detection_thread.start()

    def stop_detection(self):
        self.__is_detecting = False
        if self.__detection_thread is not None:
            self.__detection_thread.join()

        self.__detection_thread = None

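    # start_track has two modes: with only (x, y) it looks up the detection box under
    # the clicked point (via get_bbox_by_point) in the last detection frame; with an
    # explicit (w, h) it initialises the tracker on the user-supplied box directly.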
    def start_track(self, x: int, y: int, w: int = 0, h: int = 0):
        try:
            self.__is_detecting = False
            self.__is_tracking = False
            bbox = None
            if w == 0:
                if len(self.__detection_bboxes):
                    bbox = get_bbox_by_point(self.__detection_bboxes, np.array([x, y]))
                    frame = self.__detection_frame
            else:
                bbox = np.array([x, y, w, h])
                frame = self.__processing_source.get_frame()

            self.__tracker.stop()

            if bbox is not None:
                self.__tracker.init(frame, bbox)
            else:
                return
        except Exception as e:
            print(e)
            return

        if self.__tracking_thread is not None:
            self.__tracking_thread.join()
        self.__is_tracking = True
        self.__tracking_thread = Thread(target=self.__tracking)
        self.__tracking_thread.start()
        sleep(0.03)

    def stop_track(self):
        self.stop_detection()
        self.__tracker.stop()
        self.__is_tracking = False
        if self.__tracking_thread is not None:
            self.__tracking_thread.join()

        self.__tracking_thread = None

    def __draw_bbox(self, img: np.ndarray, bbox, color):
        # drawing is currently disabled; re-enabling the call below also requires `import cv2`
        thickness = self.__thickness
        # cv2.rectangle(img, (bbox[0], bbox[1]), (bbox[2] + bbox[0], bbox[3] + bbox[1]),
        #               color, thickness)
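
# --- Usage sketch (illustrative only, not part of the original module) ---
# How the VideoStreamer instances are constructed is project-specific and assumed
# here; the handler names are likewise placeholders.
#
#   sources = [VideoStreamer(...)]                # built elsewhere in the project
#   core = Core(sources)
#   core.newFrame.connect(on_new_frame)           # (bboxes, source_id, is_detection, timestamp_ms)
#   core.coordsUpdated.connect(on_coords_updated) # (source_id, center, tracker_success)
#   core.start_detect(0, 0, 640, 480)             # detect inside a 640x480 ROI at the frame origin
#   core.start_track(320, 240)                    # then track the detection under a clicked point
#   core.stop_track()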