You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

160 lines
5.3 KiB

#import os
#os.environ['YOLO_VERBOSE'] = "false"
from threading import Event, Thread
from typing import List
import numpy as np
from PyQt5.QtCore import QThread, pyqtSlot, pyqtSignal, QUrl, QDir, pyqtProperty
#from icecream import ic
from detector import Detector
from detector.utils import get_bbox_by_point
from tracker import Tracker
from video_streamer.videostreamer import VideoStreamer
from time import sleep
import time
from PyQt5.QtCore import QObject, pyqtSignal
import ctypes
from ctypes import c_int64
class Core(QThread):
newFrame = pyqtSignal(object, int, bool,ctypes.c_int64)
coordsUpdated = pyqtSignal(int, object, bool)
def __init__(self, video_sources: List[VideoStreamer], parent=None):
super(QThread, self).__init__(parent)
self.__detector = Detector(classes=[0, 2, 5, 7])
self.__tracker = Tracker()
self.__video_sources = video_sources
self.__processing_source = video_sources[0]
self.__detection_roi = list()
self.__is_detecting = False
self.__detection_thread = None
self.__thickness = 2
self.__detection_bboxes = np.empty([])
self.__detection_frame = None
self.__is_tracking = False
self.__tracking_thread = None
self.__processing_id = 0
# ic()
def set_thickness(self, thickness: int):
self.__thickness = thickness
def set_source(self, source_id: int):
self.__processing_source = self.__video_sources[source_id]
self.__processing_id = source_id
def set_video_sources(self, video_sources: List[VideoStreamer]):
self.__video_sources = video_sources
self.set_source(0)
def __detection(self):
while self.__is_detecting:
try:
source = self.__processing_source
roi = self.__detection_roi
frame = source.get_frame()
cropped_frame = frame[roi[1]:roi[3], roi[0]:roi[2]]
results = self.__detector.predict(cropped_frame)
global_bboxes = list()
for result in results:
cls = result[0]
bbox = result[1:]
bbox[:2] += roi[:2]
global_bboxes.append(bbox)
# color = (0, 0, 255) if cls == 0 else (80, 127, 255)
# self.__draw_bbox(frame, bbox, color)
self.newFrame.emit(global_bboxes, self.__processing_id, True)
self.__detection_bboxes = np.array(global_bboxes)
self.__detection_frame = frame.copy()
sleep(0.03)
except Exception as e:
print(e)
sleep(0.1)
def __tracking(self):
source = self.__processing_source
while self.__is_tracking:
ctime = c_int64(int(time.time() * 1000)) # Convert to c_int64
frame = source.get_frame()
bbox, success = self.__tracker.update(frame)
center = None
if bbox is not None:
center = bbox[:2] + bbox[2:] // 2
self.coordsUpdated.emit(self.__processing_id, center, success)
self.newFrame.emit([bbox], self.__processing_id, False, ctime)
sleep(0.01)
else:
self.newFrame.emit([bbox], self.__processing_id, False, ctime)
sleep(0.05)
def start_detect(self, x: int, y: int, w: int, h: int):
self.__detection_roi = [x, y, x + w, y + h]
if not self.__is_detecting:
if self.__detection_thread is not None:
self.__detection_thread.join()
self.__is_detecting = True
self.__detection_thread = Thread(target=self.__detection)
self.__detection_thread.start()
def stop_detection(self):
self.__is_detecting = False
if self.__detection_thread is not None:
self.__detection_thread.join()
self.__detection_thread = None
def start_track(self, x: int, y: int, w: int = 0, h: int = 0):
try:
self.__is_detecting = False
self.__is_tracking = False
bbox = None
if w == 0:
if len(self.__detection_bboxes):
bbox = get_bbox_by_point(self.__detection_bboxes, np.array([x, y]))
frame = self.__detection_frame
else:
bbox = np.array([x, y, w, h])
frame = self.__processing_source.get_frame()
self.__tracker.stop()
if bbox is not None:
self.__tracker.init(frame, bbox)
else:
return
except Exception as e:
print(e)
return
if self.__tracking_thread is not None:
self.__tracking_thread.join()
self.__is_tracking = True
self.__tracking_thread = Thread(target=self.__tracking)
self.__tracking_thread.start()
sleep(0.03)
def stop_track(self):
self.stop_detection()
self.__tracker.stop()
self.__is_tracking = False
if self.__tracking_thread is not None:
self.__tracking_thread.join()
self.__tracking_thread = None
def __draw_bbox(self, img: np.ndarray, bbox, color):
thickness = self.__thickness
# cv2.rectangle(img, (bbox[0], bbox[1]), (bbox[2] + bbox[0], bbox[3] + bbox[1]),
# color, thickness)