import base64 import re import sys from datetime import datetime, timedelta from time import sleep from typing import List import sys import os # Add the proto directory to the Python path proto_dir = os.path.join(os.path.dirname(__file__), 'message_queue', 'proto') if proto_dir not in sys.path: sys.path.append(proto_dir) # Debug: Print the updated Python path print("Updated Python Path:", sys.path) import requests from PyQt5.QtCore import QCoreApplication, Qt, pyqtSlot, QThread, QTimer import numpy as np from icecream import ic from configs import ConfigManager from core import Core from message_queue.Bridge import Bridge from message_queue.Manager import Manager from message_queue.proto.ImageMessage_pb2 import ImageMessage, TrackMode from message_queue.proto.Message_pb2 import Message from message_queue.proto.Point_pb2 import Point from message_queue.proto.TrackCommandMessage_pb2 import TrackCommand from message_queue.proto.TrackCoordsMessage_pb2 import TrackCoordsMessage from message_queue.proto.enums_pb2 import MessageType from message_queue.proto.ConnectStatus_pb2 import ConnectStatus from video_streamer.gst_video_streamer import GstVideoStreamer import cv2 import os import time # Ensure time module is imported config_manager = ConfigManager('config.yaml') rtsp_links = config_manager.configs['rtsp_links'].get() debug = config_manager.configs['debug'].get() def handle_camera_status(status: int): m = Message() m.msgType = MessageType.MESSAGE_TYPE_CAMERA_CONNECTION_STATUS m.cam_status.status = status manager.send_message(m.SerializeToString()) # Helper class to track client connection status class ConnectionTracker: def __init__(self): self.last_message_time = time.time() # Use time.time() to get the current time self.timeout = 15 # Timeout in seconds (adjust as needed) def update_last_message_time(self): self.last_message_time = time.time() # Update with the current time def is_client_connected(self): return (time.time() - self.last_message_time) < self.timeout # Use time.time() # Create an instance of ConnectionTracker connection_tracker = ConnectionTracker() def check_client_connection(): if not connection_tracker.is_client_connected(): print("Client disconnected. Searching for another client...") cl_ip, _ = start_discovery_service(12345) ic(cl_ip) # Reinitialize the Manager with the new client addressimport time # Ensure time module is imported # Helper class to track client connection status class ConnectionTracker: def __init__(self): self.last_message_time = time.time() # Use time.time() to get the current time self.timeout = 15 # Timeout in seconds (adjust as needed) def update_last_message_time(self): self.last_message_time = time.time() # Update with the current time def is_client_connected(self): return (time.time() - self.last_message_time) < self.timeout # Use time.time() # Create an instance of ConnectionTracker connection_tracker = ConnectionTracker() def check_client_connection(): if not connection_tracker.is_client_connected(): print("Client disconnected. Searching for another client...") cl_ip, _ = start_discovery_service(12345) print(cl_ip) # Reinitialize the Manager with the new client address global manager manager = Manager(f"tcp://{cl_ip}:5558", f"tcp://{cl_ip}:5557") manager.start(manager_callback) # Restart the Manager and re-register the callback connection_tracker.update_last_message_time() # Reset the timer after reconnecting if __name__ == '__main__': QCoreApplication.setAttribute(Qt.AA_EnableHighDpiScaling) app = QCoreApplication(sys.argv) videoStreamers: List[GstVideoStreamer] = [] streamerThreads = [] for idx, rtsp_link in enumerate(rtsp_links): videoStreamer = GstVideoStreamer(rtsp_link, [1920, 1080, 3], str(idx), fps=15) videoStreamer.cameraStatus.connect(handle_camera_status) print(f'{videoStreamer.id} connected') videoStreamers.append(videoStreamer) core = Core(videoStreamers) def manager_callback(msg_str): msg = Message() msg.ParseFromString(msg_str) connection_tracker.update_last_message_time() # Update the last message time if msg.msgType == MessageType.MESSAGE_TYPE_TOGGLE_TRACK: if msg.track_cmd.command == TrackCommand.TRACK_COMMAND_STOP: core.stop_track() elif msg.track_cmd.command == TrackCommand.TRACK_COMMAND_START: core.start_track(msg.track_cmd.x, msg.track_cmd.y, msg.track_cmd.w, msg.track_cmd.h) elif msg.track_cmd.command == TrackCommand.TRACK_COMMAND_START_DETECT: core.start_detect(msg.track_cmd.x, msg.track_cmd.y, msg.track_cmd.w, msg.track_cmd.h) elif msg.msgType == MessageType.MESSAGE_TYPE_TRACK_SETTINGS: core.set_thickness(msg.track_settings.thickness) elif msg.msgType == MessageType.MESSAGE_TYPE_SWITCH_CAMERA: core.set_source(msg.cam_switch.primaryCamType) elif msg.msgType == MessageType.MESSAGE_TYPE_SET_CAMERA: ip = msg.cam_set.ip videoStreamers = [] for src in range(2): rtsp_link = f'rtsp://admin:Abc.12345@{ip}:554/ch{src}/stream0' # ic(rtsp_link) videoStreamer = GstVideoStreamer(rtsp_link, [1920, 1080, 3], str(idx), fps=15) videoStreamer.cameraStatus.connect(handle_camera_status) print(f'{videoStreamer.id} connected') videoStreamers.append(videoStreamer) core.set_video_sources(videoStreamers) import socket def start_discovery_service(port): sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.bind(('0.0.0.0', port)) print(f"Discovery service listening on port {port}...") while True: data, addr = sock.recvfrom(1024) if data.decode() == "DISCOVER_SERVER": print(f"Received discovery request from {addr}") sock.sendto(b"SERVER_RESPONSE", addr) break return addr cl_ip, _ = start_discovery_service(12345) print(cl_ip) manager = Manager(f"tcp://{cl_ip}:5558", f"tcp://{cl_ip}:5557") manager.start(manager_callback) def gotNewFrame(bboxes, id_, isDetection, ctime): m = Message() m.msgType = MessageType.MESSAGE_TYPE_IMAGE m.image.timestamp = int(ctime.value) for bbox in bboxes: box = m.image.boxes.add() box.x = bbox[0] box.y = bbox[1] box.w = bbox[2] box.h = bbox[3] m.image.camType = id_ if isDetection: m.image.trackMode = TrackMode.TRACK_MODE_DETECT else: m.image.trackMode = TrackMode.TRACK_MODE_TRACK manager.send_message(m.SerializeToString()) def gotCoords(id_, coord, successful): m = Message() m.msgType = MessageType.MESSAGE_TYPE_TRACK_COORD m.track_coords.camType = id_ m.track_coords.center.x = coord[0] m.track_coords.center.y = coord[1] m.track_coords.isLost = not successful manager.send_message(m.SerializeToString()) core.newFrame.connect(gotNewFrame) core.coordsUpdated.connect(gotCoords) # Set up the QTimer to check client connection every 10 seconds timer = QTimer() timer.timeout.connect(check_client_connection) timer.start(10000) # 10 seconds try: app.exec_() except KeyboardInterrupt: sys.exit(0)