diff --git a/app.py b/app.py index a300407..dd0b0f7 100755 --- a/app.py +++ b/app.py @@ -5,26 +5,20 @@ from datetime import datetime, timedelta from time import sleep from typing import List import os -import time -import threading # Import threading module - -from cvStreamer import cvStreamer +import utils from detector import Detector # Add the proto directory to the Python path proto_dir = os.path.join(os.path.dirname(__file__), 'message_queue', 'proto') if proto_dir not in sys.path: sys.path.append(proto_dir) - # Debug: Print the updated Python path -print("Updated Python Path:", sys.path) - -import requests -from PyQt5.QtCore import QCoreApplication, Qt, pyqtSlot, QThread, QTimer - -import numpy as np -from icecream import ic - +# print("Updated Python Path:", sys.path) +from PyQt5.QtCore import QCoreApplication, Qt +import conection +from conection import ConnectionThread +import shared_manager +from utils import manager_callback, handle_camera_status, findUsbCams from configs import ConfigManager from core import Core from tracker import Tracker @@ -32,157 +26,17 @@ from message_queue.Bridge import Bridge from message_queue.Manager import Manager from message_queue.proto.ImageMessage_pb2 import ImageMessage, TrackMode from message_queue.proto.Message_pb2 import Message -from message_queue.proto.Point_pb2 import Point -from message_queue.proto.TrackCommandMessage_pb2 import TrackCommand -from message_queue.proto.TrackCoordsMessage_pb2 import TrackCoordsMessage from message_queue.proto.enums_pb2 import MessageType -from message_queue.proto.ConnectStatus_pb2 import ConnectStatus - from video_streamer.gst_video_streamer import GstVideoStreamer -import cv2 - config_manager = ConfigManager('config.yaml') rtsp_links = config_manager.configs['rtsp_links'].get() debug = config_manager.configs['debug'].get() -def findUsbCams(): - cv_cameras = [] - for i in range(50): - cap = cvStreamer(i) - if cap.isOpened(): - cv_cameras.append(cap) - return cv_cameras - - -def handle_camera_status(status: int): - m = Message() - m.msgType = MessageType.MESSAGE_TYPE_CAMERA_CONNECTION_STATUS - m.cam_status.status = status - manager.send_message(m.SerializeToString()) - -# Helper class to track client connection status -class ConnectionTracker: - def __init__(self): - self.last_message_time = time.time() - self.timeout = 15 # 15-second timeout - self.last_client_ip = None # Track last connected client IP - - def update_last_message_time(self, client_ip=None): - self.last_message_time = time.time() - if client_ip: - self.last_client_ip = client_ip # Update last known client - - def is_client_active(self): - """Check if the last client is still active within the last 15 seconds.""" - return (time.time() - self.last_message_time) < self.timeout -# Create an instance of ConnectionTracker -connection_tracker = ConnectionTracker() - - -class ConnectionThread(threading.Thread): - def __init__(self): - super().__init__() - self.running = True - - def run(self): - while self.running: - - - - sleep(0.5) - - if connection_tracker.last_client_ip: - - if debug: - print(f"Checking if last client {connection_tracker.last_client_ip} is still available...") - - if self.query_last_client(connection_tracker.last_client_ip): - connection_tracker.update_last_message_time() - - if debug: - print(f"Last client {connection_tracker.last_client_ip} responded. Continuing...") - continue # Skip discovering a new client - if not connection_tracker.is_client_active(): - - - print("Client inactive for 15 seconds. Searching for another client...") - cl_ip, _ = self.start_discovery_service(12345) - print(f"New client found: {cl_ip}") - - # Reinitialize the Manager with the new client address - global manager - manager = Manager(f"tcp://{cl_ip}:5558", f"tcp://{cl_ip}:5557") - manager.start(manager_callback) - - connection_tracker.update_last_message_time(cl_ip) # Update with new client - - def query_last_client(self, client_ip): - """Send a heartbeat packet to the last known client IP on port 29170 and wait for a response.""" - - if debug: - print(f"Sending heartbeat to {client_ip}:29170") # Debugging - - sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - sock.settimeout(5) # 5-second timeout - heartbeat_port = 29170 # New port for heartbeat - - try: - sock.sendto(b"HEARTBEAT", (client_ip, heartbeat_port)) # Send heartbeat message - data, addr = sock.recvfrom(1024) # Wait for response - - if debug: - print(f"Received response from {addr}: {data.decode()}") # Debugging - - if data.decode() == "HEARTBEAT_ACK": - connection_tracker.update_last_message_time() # Update the last message time - - if debug: - print(f"Client {client_ip} is still responding.") - return True - - except socket.timeout: - - print(f"Client {client_ip} did not respond. Marking as inactive.") - - finally: - sock.close() - - return False # Client did not respond - - - def start_discovery_service(self, port): - sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - sock.bind(('0.0.0.0', port)) - print(f"Discovery service listening on port {port}...") - - while True: - data, addr = sock.recvfrom(1024) - if data.decode() == "DISCOVER_SERVER": - print(f"Received discovery request from {addr}") - sock.sendto(b"SERVER_RESPONSE", addr) - break - sock.close() - return addr - - - - - def stop(self): - self.running = False - - if __name__ == '__main__': QCoreApplication.setAttribute(Qt.AA_EnableHighDpiScaling) app = QCoreApplication(sys.argv) - # cv_cameras = [] - # for i in range(3): - # cap = cvStreamer(i) - # if cap.isOpened(): - # cv_cameras.append(cap) - - videoStreamers = [] for idx, rtsp_link in enumerate(rtsp_links): videoStreamer = GstVideoStreamer(rtsp_link, [1920, 1080, 3], str(idx), fps=15) @@ -199,73 +53,13 @@ if __name__ == '__main__': else: core = Core(videoStreamers, tracker, detector) + utils.core = core - def manager_callback(msg_str): - msg = Message() - msg.ParseFromString(msg_str) - - if msg.msgType == MessageType.MESSAGE_TYPE_TOGGLE_TRACK: - if msg.track_cmd.command == TrackCommand.TRACK_COMMAND_STOP: - core.stop_track() - elif msg.track_cmd.command == TrackCommand.TRACK_COMMAND_START: - core.start_track(msg.track_cmd.x, msg.track_cmd.y, msg.track_cmd.w, msg.track_cmd.h) - elif msg.track_cmd.command == TrackCommand.TRACK_COMMAND_START_DETECT: - core.start_detect(msg.track_cmd.x, msg.track_cmd.y, msg.track_cmd.w, msg.track_cmd.h) - elif msg.msgType == MessageType.MESSAGE_TYPE_TRACK_SETTINGS: - core.set_thickness(msg.track_settings.thickness) - elif msg.msgType == MessageType.MESSAGE_TYPE_SWITCH_CAMERA: - print(f"switch camera detected ,primary value is : {msg.cam_switch.primaryCamType}") - print(f"switch camera detected ,secondary value is : {msg.cam_switch.secondaryCamType}") - core.set_source(msg.cam_switch.primaryCamType) - elif msg.msgType == MessageType.MESSAGE_TYPE_SET_CAMERA: - if msg.cam_set.cameraSource == 1: - print(f"set camera source to network") - ip = msg.cam_set.ip - videoStreamers = [] - for src in range(2): - rtsp_link = f'rtsp://admin:Abc.12345@{ip}:554/ch{src}/stream0' - # ic(rtsp_link) - videoStreamer = GstVideoStreamer(rtsp_link, [1920, 1080, 3], str(idx), fps=15) - videoStreamer.cameraStatus.connect(handle_camera_status) - print(f'{videoStreamer.id} connected') - videoStreamers.append(videoStreamer) - core.set_video_sources(videoStreamers) - # videoStreamers = [] - # for idx, rtsp_link in enumerate(rtsp_links): - # videoStreamer = GstVideoStreamer(rtsp_link, [1920, 1080, 3], str(idx), fps=15) - # videoStreamer.cameraStatus.connect(handle_camera_status) - # print(f'{videoStreamer.id} connected') - # videoStreamers.append(videoStreamer) - # core.set_video_sources(videoStreamers) - else: - sources = findUsbCams() - if len(sources) >= 2 : - print("set camera source to captutre card") - core.set_video_sources(findUsbCams()) - - import socket - - def start_discovery_service(port): - sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - sock.bind(('0.0.0.0', port)) - print(f"Discovery service listening on port {port}...") - - while True: - data, addr = sock.recvfrom(1024) - if data.decode() == "DISCOVER_SERVER": - print(f"Received discovery request from {addr}") - sock.sendto(b"SERVER_RESPONSE", addr) - break - - sock.close() - return addr - - cl_ip, _ = start_discovery_service(12345) + cl_ip, _ = conection.start_discovery_service(12345) print(cl_ip) - global manager - manager = Manager(f"tcp://{cl_ip}:5558", f"tcp://{cl_ip}:5557") - manager.start(manager_callback) + shared_manager.manager = Manager(f"tcp://{cl_ip}:5558", f"tcp://{cl_ip}:5557") + shared_manager.manager.start(manager_callback) def gotNewFrame(bboxes, id_, isDetection, ctime): #print(f"Got new frame, bboxes : {bboxes} Id: {id} Is detection {isDetection}") @@ -287,7 +81,7 @@ if __name__ == '__main__': m.image.trackMode = TrackMode.TRACK_MODE_DETECT else: m.image.trackMode = TrackMode.TRACK_MODE_TRACK - manager.send_message(m.SerializeToString()) + shared_manager.manager.send_message(m.SerializeToString()) def gotCoords(id_, coord, successful): m = Message() @@ -296,13 +90,13 @@ if __name__ == '__main__': m.track_coords.center.x = coord[0] m.track_coords.center.y = coord[1] m.track_coords.isLost = not successful - manager.send_message(m.SerializeToString()) + shared_manager.manager.send_message(m.SerializeToString()) core.newFrame.connect(gotNewFrame) core.coordsUpdated.connect(gotCoords) # Start the connection thread - connection_thread = ConnectionThread() + connection_thread = ConnectionThread(debug,core) connection_thread.start() try: diff --git a/conection.py b/conection.py new file mode 100644 index 0000000..fc66aef --- /dev/null +++ b/conection.py @@ -0,0 +1,114 @@ +# Helper class to track client connection status +import threading +import time +import socket + +from message_queue.Manager import Manager +import shared_manager +from utils import manager_callback + + +def start_discovery_service(port): + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.bind(('0.0.0.0', port)) + print(f"Discovery service listening on port {port}...") + + while True: + data, addr = sock.recvfrom(1024) + if data.decode() == "DISCOVER_SERVER": + print(f"Received discovery request from {addr}") + sock.sendto(b"SERVER_RESPONSE", addr) + break + sock.close() + return addr + +class ConnectionTracker: + def __init__(self): + self.last_message_time = time.time() + self.timeout = 15 # 15-second timeout + self.last_client_ip = None # Track last connected client IP + + def update_last_message_time(self, client_ip=None): + self.last_message_time = time.time() + if client_ip: + self.last_client_ip = client_ip # Update last known client + + def is_client_active(self): + """Check if the last client is still active within the last 15 seconds.""" + return (time.time() - self.last_message_time) < self.timeout +# Create an instance of ConnectionTracker +connection_tracker = ConnectionTracker() + + +class ConnectionThread(threading.Thread): + def __init__(self,debug = False,core = None): + super().__init__() + self.running = True + self.__debug = debug + self.__core = core + + def run(self): + while self.running: + time.sleep(0.5) + + if connection_tracker.last_client_ip: + + if self.__debug: + print(f"Checking if last client {connection_tracker.last_client_ip} is still available...") + + if self.query_last_client(connection_tracker.last_client_ip): + connection_tracker.update_last_message_time() + + if self.__debug: + print(f"Last client {connection_tracker.last_client_ip} responded. Continuing...") + continue # Skip discovering a new client + if not connection_tracker.is_client_active(): + + + print("Client inactive for 15 seconds. Searching for another client...") + cl_ip, _ = start_discovery_service(12345) + print(f"New client found: {cl_ip}") + + # Reinitialize the Manager with the new client address + shared_manager.manager = Manager(f"tcp://{cl_ip}:5558", f"tcp://{cl_ip}:5557") + shared_manager.manager.start(manager_callback) + + connection_tracker.update_last_message_time(cl_ip) # Update with new client + + def query_last_client(self, client_ip): + """Send a heartbeat packet to the last known client IP on port 29170 and wait for a response.""" + + if self.__debug: + print(f"Sending heartbeat to {client_ip}:29170") # Debugging + + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.settimeout(5) # 5-second timeout + heartbeat_port = 29170 # New port for heartbeat + + try: + sock.sendto(b"HEARTBEAT", (client_ip, heartbeat_port)) # Send heartbeat message + data, addr = sock.recvfrom(1024) # Wait for response + + if self.__debug: + print(f"Received response from {addr}: {data.decode()}") # Debugging + + if data.decode() == "HEARTBEAT_ACK": + connection_tracker.update_last_message_time() # Update the last message time + + if self.__debug: + print(f"Client {client_ip} is still responding.") + return True + + except socket.timeout: + + print(f"Client {client_ip} did not respond. Marking as inactive.") + + finally: + sock.close() + + return False # Client did not respond + + + def stop(self): + self.running = False + diff --git a/shared_manager.py b/shared_manager.py new file mode 100644 index 0000000..6395bac --- /dev/null +++ b/shared_manager.py @@ -0,0 +1 @@ +manager = "nothing" \ No newline at end of file diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..1835f07 --- /dev/null +++ b/utils.py @@ -0,0 +1,71 @@ +import os +import sys +proto_dir = os.path.join(os.path.dirname(__file__), 'message_queue', 'proto') +if proto_dir not in sys.path: + sys.path.append(proto_dir) +import shared_manager +from cvStreamer import cvStreamer +from message_queue.proto.Message_pb2 import Message +from message_queue.proto.TrackCommandMessage_pb2 import TrackCommand +from message_queue.proto.enums_pb2 import MessageType +from video_streamer.gst_video_streamer import GstVideoStreamer + +def findUsbCams(): + cv_cameras = [] + for i in range(50): + cap = cvStreamer(i) + if cap.isOpened(): + cv_cameras.append(cap) + return cv_cameras + +core = "a" + +def manager_callback(msg_str): + msg = Message() + msg.ParseFromString(msg_str) + + if msg.msgType == MessageType.MESSAGE_TYPE_TOGGLE_TRACK: + if msg.track_cmd.command == TrackCommand.TRACK_COMMAND_STOP: + core.stop_track() + elif msg.track_cmd.command == TrackCommand.TRACK_COMMAND_START: + core.start_track(msg.track_cmd.x, msg.track_cmd.y, msg.track_cmd.w, msg.track_cmd.h) + elif msg.track_cmd.command == TrackCommand.TRACK_COMMAND_START_DETECT: + core.start_detect(msg.track_cmd.x, msg.track_cmd.y, msg.track_cmd.w, msg.track_cmd.h) + elif msg.msgType == MessageType.MESSAGE_TYPE_TRACK_SETTINGS: + core.set_thickness(msg.track_settings.thickness) + elif msg.msgType == MessageType.MESSAGE_TYPE_SWITCH_CAMERA: + print(f"switch camera detected ,primary value is : {msg.cam_switch.primaryCamType}") + print(f"switch camera detected ,secondary value is : {msg.cam_switch.secondaryCamType}") + core.set_source(msg.cam_switch.primaryCamType) + elif msg.msgType == MessageType.MESSAGE_TYPE_SET_CAMERA: + if msg.cam_set.cameraSource == 1: + print(f"set camera source to network") + ip = msg.cam_set.ip + videoStreamers = [] + for src in range(2): + rtsp_link = f'rtsp://admin:Abc.12345@{ip}:554/ch{src}/stream0' + # ic(rtsp_link) + videoStreamer = GstVideoStreamer(rtsp_link, [1920, 1080, 3], str(idx), fps=15) + videoStreamer.cameraStatus.connect(handle_camera_status) + print(f'{videoStreamer.id} connected') + videoStreamers.append(videoStreamer) + core.set_video_sources(videoStreamers) + # videoStreamers = [] + # for idx, rtsp_link in enumerate(rtsp_links): + # videoStreamer = GstVideoStreamer(rtsp_link, [1920, 1080, 3], str(idx), fps=15) + # videoStreamer.cameraStatus.connect(handle_camera_status) + # print(f'{videoStreamer.id} connected') + # videoStreamers.append(videoStreamer) + # core.set_video_sources(videoStreamers) + else: + sources = findUsbCams() + if len(sources) >= 2: + print("set camera source to captutre card") + core.set_video_sources(findUsbCams()) + + +def handle_camera_status(status: int): + m = Message() + m.msgType = MessageType.MESSAGE_TYPE_CAMERA_CONNECTION_STATUS + m.cam_status.status = status + shared_manager.manager.send_message(m.SerializeToString()) \ No newline at end of file