You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 

204 lines
7.5 KiB

import base64
import re
import sys
from datetime import datetime, timedelta
from time import sleep
from typing import List
import sys
import os
# Put <this file's directory>/message_queue/proto on the module search path.
# NOTE(review): presumably needed so the generated *_pb2 modules can import
# each other by bare module name -- confirm against the generated code.
_script_dir = os.path.dirname(__file__)
proto_dir = os.path.join(_script_dir, 'message_queue', 'proto')
if proto_dir not in sys.path:
    # Appended (not inserted), so existing entries keep priority.
    sys.path.append(proto_dir)

# Debug aid: show the search path that the imports below will use.
print("Updated Python Path:", sys.path)
import requests
from PyQt5.QtCore import QCoreApplication, Qt, pyqtSlot, QThread, QTimer
import numpy as np
from icecream import ic
from configs import ConfigManager
from core import Core
from message_queue.Bridge import Bridge
from message_queue.Manager import Manager
from message_queue.proto.ImageMessage_pb2 import ImageMessage, TrackMode
from message_queue.proto.Message_pb2 import Message
from message_queue.proto.Point_pb2 import Point
from message_queue.proto.TrackCommandMessage_pb2 import TrackCommand
from message_queue.proto.TrackCoordsMessage_pb2 import TrackCoordsMessage
from message_queue.proto.enums_pb2 import MessageType
from message_queue.proto.ConnectStatus_pb2 import ConnectStatus
from video_streamer.gst_video_streamer import GstVideoStreamer
import cv2
import os
import time # Ensure time module is imported
# Read the settings this script needs from config.yaml once, at start-up:
#   rtsp_links -- iterated at start-up to create one video streamer per entry
#   debug      -- debug setting (not read anywhere in the visible part of this file)
config_manager = ConfigManager('config.yaml')
rtsp_links, debug = (
    config_manager.configs[setting].get() for setting in ('rtsp_links', 'debug')
)
def handle_camera_status(status: int):
    """Report a camera connection status to the client.

    Builds a ``MESSAGE_TYPE_CAMERA_CONNECTION_STATUS`` message carrying
    ``status`` and sends its serialized form through the module-level
    ``manager``. Used as the slot for each streamer's ``cameraStatus`` signal.

    Args:
        status: Status code placed in ``cam_status.status`` unchanged.
    """
    # NOTE(review): ``manager`` is only assigned once client discovery has
    # finished; a status signal delivered before that would hit a NameError.
    status_msg = Message()
    status_msg.msgType = MessageType.MESSAGE_TYPE_CAMERA_CONNECTION_STATUS
    status_msg.cam_status.status = status
    payload = status_msg.SerializeToString()
    manager.send_message(payload)
# Helper class to track client connection status
class ConnectionTracker:
    """Track how long ago the client last sent a message.

    The client counts as connected while the most recent message is younger
    than ``timeout`` seconds.
    """

    def __init__(self, timeout=15):
        """Start the clock as if a message had just arrived.

        Args:
            timeout: Seconds of silence after which the client is treated as
                disconnected.
        """
        # Only elapsed time matters here, so use the monotonic clock: the
        # wall clock (time.time()) can jump when the system time is adjusted.
        self.last_message_time = time.monotonic()
        self.timeout = timeout

    def update_last_message_time(self):
        """Record that a message from the client has just been seen."""
        self.last_message_time = time.monotonic()

    def is_client_connected(self):
        """Return True while the last message is younger than ``timeout``."""
        silence = time.monotonic() - self.last_message_time
        return silence < self.timeout


# Create an instance of ConnectionTracker
connection_tracker = ConnectionTracker()


def check_client_connection():
    """Look for a new client once the current one has gone silent.

    Does nothing while ``connection_tracker`` still reports the client as
    connected. Otherwise it runs the discovery service on port 12345, rebinds
    the module-level ``manager`` to the discovered address, restarts it with
    ``manager_callback`` and resets the silence timer.

    Note that ``start_discovery_service`` does not return until a discovery
    request has been received, so this function blocks for that long.
    """
    global manager

    if connection_tracker.is_client_connected():
        return

    print("Client disconnected. Searching for another client...")
    cl_ip, _ = start_discovery_service(12345)
    print(cl_ip)

    # Reinitialize the Manager with the new client address
    # NOTE(review): the previous Manager is replaced without being stopped;
    # confirm that dropping it releases its connections.
    manager = Manager(f"tcp://{cl_ip}:5558", f"tcp://{cl_ip}:5557")
    manager.start(manager_callback)  # Restart the Manager and re-register the callback
    connection_tracker.update_last_message_time()  # Reset the timer after reconnecting
if __name__ == '__main__':
    # Set the application attribute first, then create the Qt application.
    QCoreApplication.setAttribute(Qt.AA_EnableHighDpiScaling)
    app = QCoreApplication(sys.argv)

    videoStreamers: List[GstVideoStreamer] = []
    streamerThreads = []

    # One streamer per configured RTSP link; its position in the list becomes
    # its string id. Every streamer is created as 1920x1080x3 at 15 fps.
    for idx, rtsp_link in enumerate(rtsp_links):
        videoStreamer = GstVideoStreamer(rtsp_link, [1920, 1080, 3], str(idx), fps=15)
        # Camera status changes are forwarded to the client.
        videoStreamer.cameraStatus.connect(handle_camera_status)
        print(f'{videoStreamer.id} connected')
        videoStreamers.append(videoStreamer)

    # The processing core works on the complete list of streamers.
    core = Core(videoStreamers)
def manager_callback(msg_str):
    """Handle one serialized ``Message`` received from the client.

    Every message refreshes ``connection_tracker``. The message is then
    dispatched on ``msgType``:

    * ``MESSAGE_TYPE_TOGGLE_TRACK``  -- stop tracking, start tracking or start
      detection, depending on ``track_cmd.command``.
    * ``MESSAGE_TYPE_TRACK_SETTINGS`` -- pass the thickness to ``core``.
    * ``MESSAGE_TYPE_SWITCH_CAMERA``  -- select the primary camera.
    * ``MESSAGE_TYPE_SET_CAMERA``     -- build two new streamers (channels 0
      and 1) for the camera at ``cam_set.ip`` and hand them to ``core``.

    Any other message type is ignored.

    Args:
        msg_str: Serialized ``Message`` as accepted by ``ParseFromString``.
    """
    msg = Message()
    msg.ParseFromString(msg_str)
    # Any inbound message shows that the client is still there.
    connection_tracker.update_last_message_time()

    msg_type = msg.msgType
    if msg_type == MessageType.MESSAGE_TYPE_TOGGLE_TRACK:
        cmd = msg.track_cmd
        if cmd.command == TrackCommand.TRACK_COMMAND_STOP:
            core.stop_track()
        elif cmd.command == TrackCommand.TRACK_COMMAND_START:
            core.start_track(cmd.x, cmd.y, cmd.w, cmd.h)
        elif cmd.command == TrackCommand.TRACK_COMMAND_START_DETECT:
            core.start_detect(cmd.x, cmd.y, cmd.w, cmd.h)
    elif msg_type == MessageType.MESSAGE_TYPE_TRACK_SETTINGS:
        core.set_thickness(msg.track_settings.thickness)
    elif msg_type == MessageType.MESSAGE_TYPE_SWITCH_CAMERA:
        core.set_source(msg.cam_switch.primaryCamType)
    elif msg_type == MessageType.MESSAGE_TYPE_SET_CAMERA:
        ip = msg.cam_set.ip
        new_streamers = []
        for src in range(2):
            # NOTE(review): the camera credentials are hard-coded in this URL;
            # consider moving them to configuration.
            rtsp_link = f'rtsp://admin:Abc.12345@{ip}:554/ch{src}/stream0'
            # The streamer id is the channel number. The previous code used
            # ``idx``, a leftover global from the start-up loop, which gave
            # both streamers the same id.
            streamer = GstVideoStreamer(rtsp_link, [1920, 1080, 3], str(src), fps=15)
            streamer.cameraStatus.connect(handle_camera_status)
            print(f'{streamer.id} connected')
            new_streamers.append(streamer)
        core.set_video_sources(new_streamers)
import socket
def start_discovery_service(port):
    """Wait for a client's UDP discovery request and return its address.

    Listens on all interfaces on ``port``. Datagrams other than the exact
    payload ``DISCOVER_SERVER`` are ignored. The first matching datagram is
    answered with ``SERVER_RESPONSE`` and ends the wait. This call blocks
    until that happens.

    Args:
        port: UDP port to listen on.

    Returns:
        The sender address of the discovery request, as returned by
        ``recvfrom`` (a ``(host, port)`` tuple for AF_INET).
    """
    # The context manager closes the socket on every exit path, so the port
    # is free again the next time discovery runs.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.bind(('0.0.0.0', port))
        print(f"Discovery service listening on port {port}...")
        while True:
            data, addr = sock.recvfrom(1024)
            # Compare raw bytes: decoding arbitrary network input could raise
            # UnicodeDecodeError and take the service down.
            if data == b"DISCOVER_SERVER":
                print(f"Received discovery request from {addr}")
                sock.sendto(b"SERVER_RESPONSE", addr)
                return addr
# Wait for the first client to announce itself on UDP port 12345, then open
# the message channels towards it (ports 5558 and 5557 on the client's IP)
# and start delivering incoming messages to manager_callback.
cl_ip, _ = start_discovery_service(12345)
print(cl_ip)
manager = Manager(
    f"tcp://{cl_ip}:5558",
    f"tcp://{cl_ip}:5557",
)
manager.start(manager_callback)
def gotNewFrame(bboxes, id_, isDetection, ctime):
    """Send the boxes of a processed frame to the client.

    Builds a ``MESSAGE_TYPE_IMAGE`` message and sends its serialized form
    through the module-level ``manager``.

    Args:
        bboxes: Iterable of boxes; elements 0..3 of each box are sent as
            x, y, w, h.
        id_: Camera identifier, stored in ``image.camType``.
        isDetection: Truthy selects ``TRACK_MODE_DETECT``; otherwise
            ``TRACK_MODE_TRACK`` is sent.
        ctime: Object whose ``.value`` is converted to ``int`` and sent as the
            frame timestamp.
    """
    frame_msg = Message()
    frame_msg.msgType = MessageType.MESSAGE_TYPE_IMAGE

    image = frame_msg.image
    image.timestamp = int(ctime.value)
    for bbox in bboxes:
        box = image.boxes.add()
        box.x, box.y, box.w, box.h = bbox[0], bbox[1], bbox[2], bbox[3]
    image.camType = id_
    image.trackMode = (
        TrackMode.TRACK_MODE_DETECT if isDetection else TrackMode.TRACK_MODE_TRACK
    )

    manager.send_message(frame_msg.SerializeToString())
def gotCoords(id_, coord, successful):
    """Send the current track centre to the client.

    Builds a ``MESSAGE_TYPE_TRACK_COORD`` message and sends its serialized
    form through the module-level ``manager``.

    Args:
        id_: Camera identifier, stored in ``track_coords.camType``.
        coord: Indexable pair; elements 0 and 1 are sent as centre x and y.
        successful: Whether tracking succeeded; the message carries the
            negation as ``isLost``.
    """
    coords_msg = Message()
    coords_msg.msgType = MessageType.MESSAGE_TYPE_TRACK_COORD

    track = coords_msg.track_coords
    track.camType = id_
    track.center.x = coord[0]
    track.center.y = coord[1]
    track.isLost = not successful

    manager.send_message(coords_msg.SerializeToString())
# Forward the core's results to the client as they are produced.
core.newFrame.connect(gotNewFrame)
core.coordsUpdated.connect(gotCoords)

# Check every 10 seconds (interval given in milliseconds) whether the client
# is still sending messages.
timer = QTimer()
timer.timeout.connect(check_client_connection)
timer.start(10 * 1000)

# Run the Qt event loop; exit with status 0 on KeyboardInterrupt.
try:
    app.exec_()
except KeyboardInterrupt:
    sys.exit(0)