import base64 import re import sys from datetime import datetime, timedelta from time import sleep from typing import List import os import time import threading # Import threading module from detector import Detector # Add the proto directory to the Python path proto_dir = os.path.join(os.path.dirname(__file__), 'message_queue', 'proto') if proto_dir not in sys.path: sys.path.append(proto_dir) # Debug: Print the updated Python path print("Updated Python Path:", sys.path) import requests from PyQt5.QtCore import QCoreApplication, Qt, pyqtSlot, QThread, QTimer import numpy as np from icecream import ic from configs import ConfigManager from core import Core from tracker import Tracker from message_queue.Bridge import Bridge from message_queue.Manager import Manager from message_queue.proto.ImageMessage_pb2 import ImageMessage, TrackMode from message_queue.proto.Message_pb2 import Message from message_queue.proto.Point_pb2 import Point from message_queue.proto.TrackCommandMessage_pb2 import TrackCommand from message_queue.proto.TrackCoordsMessage_pb2 import TrackCoordsMessage from message_queue.proto.enums_pb2 import MessageType from message_queue.proto.ConnectStatus_pb2 import ConnectStatus from video_streamer.gst_video_streamer import GstVideoStreamer import cv2 config_manager = ConfigManager('config.yaml') rtsp_links = config_manager.configs['rtsp_links'].get() debug = config_manager.configs['debug'].get() def handle_camera_status(status: int): m = Message() m.msgType = MessageType.MESSAGE_TYPE_CAMERA_CONNECTION_STATUS m.cam_status.status = status manager.send_message(m.SerializeToString()) # Helper class to track client connection status class ConnectionTracker: def __init__(self): self.last_message_time = time.time() self.timeout = 15 # 15-second timeout self.last_client_ip = None # Track last connected client IP def update_last_message_time(self, client_ip=None): self.last_message_time = time.time() if client_ip: self.last_client_ip = client_ip # Update last known client def is_client_active(self): """Check if the last client is still active within the last 30 seconds.""" return (time.time() - self.last_message_time) < self.timeout # Create an instance of ConnectionTracker connection_tracker = ConnectionTracker() class ConnectionThread(threading.Thread): def __init__(self): super().__init__() self.running = True def run(self): while self.running: sleep(0.5) if connection_tracker.last_client_ip: if debug: print(f"Checking if last client {connection_tracker.last_client_ip} is still available...") if self.query_last_client(connection_tracker.last_client_ip): connection_tracker.update_last_message_time() if debug: print(f"Last client {connection_tracker.last_client_ip} responded. Continuing...") continue # Skip discovering a new client if not connection_tracker.is_client_active(): print("Client inactive for 30 seconds. Searching for another client...") cl_ip, _ = self.start_discovery_service(12345) print(f"New client found: {cl_ip}") # Reinitialize the Manager with the new client address global manager manager = Manager(f"tcp://{cl_ip}:5558", f"tcp://{cl_ip}:5557") manager.start(manager_callback) connection_tracker.update_last_message_time(cl_ip) # Update with new client def query_last_client(self, client_ip): """Send a heartbeat packet to the last known client IP on port 29170 and wait for a response.""" if debug: print(f"Sending heartbeat to {client_ip}:29170") # Debugging sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.settimeout(5) # 5-second timeout heartbeat_port = 29170 # New port for heartbeat try: sock.sendto(b"HEARTBEAT", (client_ip, heartbeat_port)) # Send heartbeat message data, addr = sock.recvfrom(1024) # Wait for response if debug: print(f"Received response from {addr}: {data.decode()}") # Debugging if data.decode() == "HEARTBEAT_ACK": if debug: print(f"Client {client_ip} is still responding.") return True except socket.timeout: print(f"Client {client_ip} did not respond. Marking as inactive.") finally: sock.close() return False # Client did not respond def start_discovery_service(self, port): sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.bind(('0.0.0.0', port)) print(f"Discovery service listening on port {port}...") while True: data, addr = sock.recvfrom(1024) if data.decode() == "DISCOVER_SERVER": print(f"Received discovery request from {addr}") sock.sendto(b"SERVER_RESPONSE", addr) break sock.close() return addr def stop(self): self.running = False if __name__ == '__main__': QCoreApplication.setAttribute(Qt.AA_EnableHighDpiScaling) app = QCoreApplication(sys.argv) videoStreamers: List[GstVideoStreamer] = [] streamerThreads = [] for idx, rtsp_link in enumerate(rtsp_links): videoStreamer = GstVideoStreamer(rtsp_link, [1920, 1080, 3], str(idx), fps=15) videoStreamer.cameraStatus.connect(handle_camera_status) print(f'{videoStreamer.id} connected') videoStreamers.append(videoStreamer) tracker = Tracker() detector = Detector(classes=[0, 2, 5, 7]) core = Core(videoStreamers,tracker,detector) def manager_callback(msg_str): msg = Message() msg.ParseFromString(msg_str) connection_tracker.update_last_message_time() # Update the last message time if msg.msgType == MessageType.MESSAGE_TYPE_TOGGLE_TRACK: if msg.track_cmd.command == TrackCommand.TRACK_COMMAND_STOP: core.stop_track() elif msg.track_cmd.command == TrackCommand.TRACK_COMMAND_START: core.start_track(msg.track_cmd.x, msg.track_cmd.y, msg.track_cmd.w, msg.track_cmd.h) elif msg.track_cmd.command == TrackCommand.TRACK_COMMAND_START_DETECT: core.start_detect(msg.track_cmd.x, msg.track_cmd.y, msg.track_cmd.w, msg.track_cmd.h) elif msg.msgType == MessageType.MESSAGE_TYPE_TRACK_SETTINGS: core.set_thickness(msg.track_settings.thickness) elif msg.msgType == MessageType.MESSAGE_TYPE_SWITCH_CAMERA: core.set_source(msg.cam_switch.primaryCamType) elif msg.msgType == MessageType.MESSAGE_TYPE_SET_CAMERA: ip = msg.cam_set.ip videoStreamers = [] for src in range(2): rtsp_link = f'rtsp://admin:Abc.12345@{ip}:554/ch{src}/stream0' # ic(rtsp_link) videoStreamer = GstVideoStreamer(rtsp_link, [1920, 1080, 3], str(idx), fps=15) videoStreamer.cameraStatus.connect(handle_camera_status) print(f'{videoStreamer.id} connected') videoStreamers.append(videoStreamer) core.set_video_sources(videoStreamers) import socket def start_discovery_service(port): sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.bind(('0.0.0.0', port)) print(f"Discovery service listening on port {port}...") while True: data, addr = sock.recvfrom(1024) if data.decode() == "DISCOVER_SERVER": print(f"Received discovery request from {addr}") sock.sendto(b"SERVER_RESPONSE", addr) break sock.close() return addr cl_ip, _ = start_discovery_service(12345) print(cl_ip) global manager manager = Manager(f"tcp://{cl_ip}:5558", f"tcp://{cl_ip}:5557") manager.start(manager_callback) def gotNewFrame(bboxes, id_, isDetection, ctime): #print(f"Got new frame, bboxes : {bboxes} Id: {id} Is detection {isDetection}") m = Message() m.msgType = MessageType.MESSAGE_TYPE_IMAGE m.image.timestamp = int(ctime.value) for bbox in bboxes: # Skip if bbox is None, doesn't have exactly 4 elements, or contains None values if bbox is None or len(bbox) != 4 or not all(element is not None for element in bbox): continue # Add the bounding box to the image box = m.image.boxes.add() box.x, box.y, box.w, box.h = bbox m.image.camType = id_ if isDetection: m.image.trackMode = TrackMode.TRACK_MODE_DETECT else: m.image.trackMode = TrackMode.TRACK_MODE_TRACK manager.send_message(m.SerializeToString()) def gotCoords(id_, coord, successful): m = Message() m.msgType = MessageType.MESSAGE_TYPE_TRACK_COORD m.track_coords.camType = id_ m.track_coords.center.x = coord[0] m.track_coords.center.y = coord[1] m.track_coords.isLost = not successful manager.send_message(m.SerializeToString()) core.newFrame.connect(gotNewFrame) core.coordsUpdated.connect(gotCoords) # Start the connection thread connection_thread = ConnectionThread() connection_thread.start() try: app.exec_() except KeyboardInterrupt: connection_thread.stop() connection_thread.join() sys.exit(0)