Browse Source

cleaning code by adding classes

video-streaming
mht 4 weeks ago
parent
commit
2e112c3a69
  1. 234
      app.py
  2. 114
      conection.py
  3. 1
      shared_manager.py
  4. 71
      utils.py

234
app.py

@ -5,26 +5,20 @@ from datetime import datetime, timedelta
from time import sleep
from typing import List
import os
import time
import threading # Import threading module
from cvStreamer import cvStreamer
import utils
from detector import Detector
# Add the proto directory to the Python path
proto_dir = os.path.join(os.path.dirname(__file__), 'message_queue', 'proto')
if proto_dir not in sys.path:
sys.path.append(proto_dir)
# Debug: Print the updated Python path
print("Updated Python Path:", sys.path)
import requests
from PyQt5.QtCore import QCoreApplication, Qt, pyqtSlot, QThread, QTimer
import numpy as np
from icecream import ic
# print("Updated Python Path:", sys.path)
from PyQt5.QtCore import QCoreApplication, Qt
import conection
from conection import ConnectionThread
import shared_manager
from utils import manager_callback, handle_camera_status, findUsbCams
from configs import ConfigManager
from core import Core
from tracker import Tracker
@ -32,157 +26,17 @@ from message_queue.Bridge import Bridge
from message_queue.Manager import Manager
from message_queue.proto.ImageMessage_pb2 import ImageMessage, TrackMode
from message_queue.proto.Message_pb2 import Message
from message_queue.proto.Point_pb2 import Point
from message_queue.proto.TrackCommandMessage_pb2 import TrackCommand
from message_queue.proto.TrackCoordsMessage_pb2 import TrackCoordsMessage
from message_queue.proto.enums_pb2 import MessageType
from message_queue.proto.ConnectStatus_pb2 import ConnectStatus
from video_streamer.gst_video_streamer import GstVideoStreamer
import cv2
config_manager = ConfigManager('config.yaml')
rtsp_links = config_manager.configs['rtsp_links'].get()
debug = config_manager.configs['debug'].get()
def findUsbCams():
cv_cameras = []
for i in range(50):
cap = cvStreamer(i)
if cap.isOpened():
cv_cameras.append(cap)
return cv_cameras
def handle_camera_status(status: int):
m = Message()
m.msgType = MessageType.MESSAGE_TYPE_CAMERA_CONNECTION_STATUS
m.cam_status.status = status
manager.send_message(m.SerializeToString())
# Helper class to track client connection status
class ConnectionTracker:
def __init__(self):
self.last_message_time = time.time()
self.timeout = 15 # 15-second timeout
self.last_client_ip = None # Track last connected client IP
def update_last_message_time(self, client_ip=None):
self.last_message_time = time.time()
if client_ip:
self.last_client_ip = client_ip # Update last known client
def is_client_active(self):
"""Check if the last client is still active within the last 15 seconds."""
return (time.time() - self.last_message_time) < self.timeout
# Create an instance of ConnectionTracker
connection_tracker = ConnectionTracker()
class ConnectionThread(threading.Thread):
def __init__(self):
super().__init__()
self.running = True
def run(self):
while self.running:
sleep(0.5)
if connection_tracker.last_client_ip:
if debug:
print(f"Checking if last client {connection_tracker.last_client_ip} is still available...")
if self.query_last_client(connection_tracker.last_client_ip):
connection_tracker.update_last_message_time()
if debug:
print(f"Last client {connection_tracker.last_client_ip} responded. Continuing...")
continue # Skip discovering a new client
if not connection_tracker.is_client_active():
print("Client inactive for 15 seconds. Searching for another client...")
cl_ip, _ = self.start_discovery_service(12345)
print(f"New client found: {cl_ip}")
# Reinitialize the Manager with the new client address
global manager
manager = Manager(f"tcp://{cl_ip}:5558", f"tcp://{cl_ip}:5557")
manager.start(manager_callback)
connection_tracker.update_last_message_time(cl_ip) # Update with new client
def query_last_client(self, client_ip):
"""Send a heartbeat packet to the last known client IP on port 29170 and wait for a response."""
if debug:
print(f"Sending heartbeat to {client_ip}:29170") # Debugging
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(5) # 5-second timeout
heartbeat_port = 29170 # New port for heartbeat
try:
sock.sendto(b"HEARTBEAT", (client_ip, heartbeat_port)) # Send heartbeat message
data, addr = sock.recvfrom(1024) # Wait for response
if debug:
print(f"Received response from {addr}: {data.decode()}") # Debugging
if data.decode() == "HEARTBEAT_ACK":
connection_tracker.update_last_message_time() # Update the last message time
if debug:
print(f"Client {client_ip} is still responding.")
return True
except socket.timeout:
print(f"Client {client_ip} did not respond. Marking as inactive.")
finally:
sock.close()
return False # Client did not respond
def start_discovery_service(self, port):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('0.0.0.0', port))
print(f"Discovery service listening on port {port}...")
while True:
data, addr = sock.recvfrom(1024)
if data.decode() == "DISCOVER_SERVER":
print(f"Received discovery request from {addr}")
sock.sendto(b"SERVER_RESPONSE", addr)
break
sock.close()
return addr
def stop(self):
self.running = False
if __name__ == '__main__':
QCoreApplication.setAttribute(Qt.AA_EnableHighDpiScaling)
app = QCoreApplication(sys.argv)
# cv_cameras = []
# for i in range(3):
# cap = cvStreamer(i)
# if cap.isOpened():
# cv_cameras.append(cap)
videoStreamers = []
for idx, rtsp_link in enumerate(rtsp_links):
videoStreamer = GstVideoStreamer(rtsp_link, [1920, 1080, 3], str(idx), fps=15)
@ -199,73 +53,13 @@ if __name__ == '__main__':
else:
core = Core(videoStreamers, tracker, detector)
utils.core = core
def manager_callback(msg_str):
msg = Message()
msg.ParseFromString(msg_str)
if msg.msgType == MessageType.MESSAGE_TYPE_TOGGLE_TRACK:
if msg.track_cmd.command == TrackCommand.TRACK_COMMAND_STOP:
core.stop_track()
elif msg.track_cmd.command == TrackCommand.TRACK_COMMAND_START:
core.start_track(msg.track_cmd.x, msg.track_cmd.y, msg.track_cmd.w, msg.track_cmd.h)
elif msg.track_cmd.command == TrackCommand.TRACK_COMMAND_START_DETECT:
core.start_detect(msg.track_cmd.x, msg.track_cmd.y, msg.track_cmd.w, msg.track_cmd.h)
elif msg.msgType == MessageType.MESSAGE_TYPE_TRACK_SETTINGS:
core.set_thickness(msg.track_settings.thickness)
elif msg.msgType == MessageType.MESSAGE_TYPE_SWITCH_CAMERA:
print(f"switch camera detected ,primary value is : {msg.cam_switch.primaryCamType}")
print(f"switch camera detected ,secondary value is : {msg.cam_switch.secondaryCamType}")
core.set_source(msg.cam_switch.primaryCamType)
elif msg.msgType == MessageType.MESSAGE_TYPE_SET_CAMERA:
if msg.cam_set.cameraSource == 1:
print(f"set camera source to network")
ip = msg.cam_set.ip
videoStreamers = []
for src in range(2):
rtsp_link = f'rtsp://admin:Abc.12345@{ip}:554/ch{src}/stream0'
# ic(rtsp_link)
videoStreamer = GstVideoStreamer(rtsp_link, [1920, 1080, 3], str(idx), fps=15)
videoStreamer.cameraStatus.connect(handle_camera_status)
print(f'{videoStreamer.id} connected')
videoStreamers.append(videoStreamer)
core.set_video_sources(videoStreamers)
# videoStreamers = []
# for idx, rtsp_link in enumerate(rtsp_links):
# videoStreamer = GstVideoStreamer(rtsp_link, [1920, 1080, 3], str(idx), fps=15)
# videoStreamer.cameraStatus.connect(handle_camera_status)
# print(f'{videoStreamer.id} connected')
# videoStreamers.append(videoStreamer)
# core.set_video_sources(videoStreamers)
else:
sources = findUsbCams()
if len(sources) >= 2 :
print("set camera source to captutre card")
core.set_video_sources(findUsbCams())
import socket
def start_discovery_service(port):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('0.0.0.0', port))
print(f"Discovery service listening on port {port}...")
while True:
data, addr = sock.recvfrom(1024)
if data.decode() == "DISCOVER_SERVER":
print(f"Received discovery request from {addr}")
sock.sendto(b"SERVER_RESPONSE", addr)
break
sock.close()
return addr
cl_ip, _ = start_discovery_service(12345)
cl_ip, _ = conection.start_discovery_service(12345)
print(cl_ip)
global manager
manager = Manager(f"tcp://{cl_ip}:5558", f"tcp://{cl_ip}:5557")
manager.start(manager_callback)
shared_manager.manager = Manager(f"tcp://{cl_ip}:5558", f"tcp://{cl_ip}:5557")
shared_manager.manager.start(manager_callback)
def gotNewFrame(bboxes, id_, isDetection, ctime):
#print(f"Got new frame, bboxes : {bboxes} Id: {id} Is detection {isDetection}")
@ -287,7 +81,7 @@ if __name__ == '__main__':
m.image.trackMode = TrackMode.TRACK_MODE_DETECT
else:
m.image.trackMode = TrackMode.TRACK_MODE_TRACK
manager.send_message(m.SerializeToString())
shared_manager.manager.send_message(m.SerializeToString())
def gotCoords(id_, coord, successful):
m = Message()
@ -296,13 +90,13 @@ if __name__ == '__main__':
m.track_coords.center.x = coord[0]
m.track_coords.center.y = coord[1]
m.track_coords.isLost = not successful
manager.send_message(m.SerializeToString())
shared_manager.manager.send_message(m.SerializeToString())
core.newFrame.connect(gotNewFrame)
core.coordsUpdated.connect(gotCoords)
# Start the connection thread
connection_thread = ConnectionThread()
connection_thread = ConnectionThread(debug,core)
connection_thread.start()
try:

114
conection.py

@ -0,0 +1,114 @@
# Helper class to track client connection status
import threading
import time
import socket
from message_queue.Manager import Manager
import shared_manager
from utils import manager_callback
def start_discovery_service(port):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('0.0.0.0', port))
print(f"Discovery service listening on port {port}...")
while True:
data, addr = sock.recvfrom(1024)
if data.decode() == "DISCOVER_SERVER":
print(f"Received discovery request from {addr}")
sock.sendto(b"SERVER_RESPONSE", addr)
break
sock.close()
return addr
class ConnectionTracker:
def __init__(self):
self.last_message_time = time.time()
self.timeout = 15 # 15-second timeout
self.last_client_ip = None # Track last connected client IP
def update_last_message_time(self, client_ip=None):
self.last_message_time = time.time()
if client_ip:
self.last_client_ip = client_ip # Update last known client
def is_client_active(self):
"""Check if the last client is still active within the last 15 seconds."""
return (time.time() - self.last_message_time) < self.timeout
# Create an instance of ConnectionTracker
connection_tracker = ConnectionTracker()
class ConnectionThread(threading.Thread):
def __init__(self,debug = False,core = None):
super().__init__()
self.running = True
self.__debug = debug
self.__core = core
def run(self):
while self.running:
time.sleep(0.5)
if connection_tracker.last_client_ip:
if self.__debug:
print(f"Checking if last client {connection_tracker.last_client_ip} is still available...")
if self.query_last_client(connection_tracker.last_client_ip):
connection_tracker.update_last_message_time()
if self.__debug:
print(f"Last client {connection_tracker.last_client_ip} responded. Continuing...")
continue # Skip discovering a new client
if not connection_tracker.is_client_active():
print("Client inactive for 15 seconds. Searching for another client...")
cl_ip, _ = start_discovery_service(12345)
print(f"New client found: {cl_ip}")
# Reinitialize the Manager with the new client address
shared_manager.manager = Manager(f"tcp://{cl_ip}:5558", f"tcp://{cl_ip}:5557")
shared_manager.manager.start(manager_callback)
connection_tracker.update_last_message_time(cl_ip) # Update with new client
def query_last_client(self, client_ip):
"""Send a heartbeat packet to the last known client IP on port 29170 and wait for a response."""
if self.__debug:
print(f"Sending heartbeat to {client_ip}:29170") # Debugging
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(5) # 5-second timeout
heartbeat_port = 29170 # New port for heartbeat
try:
sock.sendto(b"HEARTBEAT", (client_ip, heartbeat_port)) # Send heartbeat message
data, addr = sock.recvfrom(1024) # Wait for response
if self.__debug:
print(f"Received response from {addr}: {data.decode()}") # Debugging
if data.decode() == "HEARTBEAT_ACK":
connection_tracker.update_last_message_time() # Update the last message time
if self.__debug:
print(f"Client {client_ip} is still responding.")
return True
except socket.timeout:
print(f"Client {client_ip} did not respond. Marking as inactive.")
finally:
sock.close()
return False # Client did not respond
def stop(self):
self.running = False

1
shared_manager.py

@ -0,0 +1 @@
manager = "nothing"

71
utils.py

@ -0,0 +1,71 @@
import os
import sys
proto_dir = os.path.join(os.path.dirname(__file__), 'message_queue', 'proto')
if proto_dir not in sys.path:
sys.path.append(proto_dir)
import shared_manager
from cvStreamer import cvStreamer
from message_queue.proto.Message_pb2 import Message
from message_queue.proto.TrackCommandMessage_pb2 import TrackCommand
from message_queue.proto.enums_pb2 import MessageType
from video_streamer.gst_video_streamer import GstVideoStreamer
def findUsbCams():
cv_cameras = []
for i in range(50):
cap = cvStreamer(i)
if cap.isOpened():
cv_cameras.append(cap)
return cv_cameras
core = "a"
def manager_callback(msg_str):
msg = Message()
msg.ParseFromString(msg_str)
if msg.msgType == MessageType.MESSAGE_TYPE_TOGGLE_TRACK:
if msg.track_cmd.command == TrackCommand.TRACK_COMMAND_STOP:
core.stop_track()
elif msg.track_cmd.command == TrackCommand.TRACK_COMMAND_START:
core.start_track(msg.track_cmd.x, msg.track_cmd.y, msg.track_cmd.w, msg.track_cmd.h)
elif msg.track_cmd.command == TrackCommand.TRACK_COMMAND_START_DETECT:
core.start_detect(msg.track_cmd.x, msg.track_cmd.y, msg.track_cmd.w, msg.track_cmd.h)
elif msg.msgType == MessageType.MESSAGE_TYPE_TRACK_SETTINGS:
core.set_thickness(msg.track_settings.thickness)
elif msg.msgType == MessageType.MESSAGE_TYPE_SWITCH_CAMERA:
print(f"switch camera detected ,primary value is : {msg.cam_switch.primaryCamType}")
print(f"switch camera detected ,secondary value is : {msg.cam_switch.secondaryCamType}")
core.set_source(msg.cam_switch.primaryCamType)
elif msg.msgType == MessageType.MESSAGE_TYPE_SET_CAMERA:
if msg.cam_set.cameraSource == 1:
print(f"set camera source to network")
ip = msg.cam_set.ip
videoStreamers = []
for src in range(2):
rtsp_link = f'rtsp://admin:Abc.12345@{ip}:554/ch{src}/stream0'
# ic(rtsp_link)
videoStreamer = GstVideoStreamer(rtsp_link, [1920, 1080, 3], str(idx), fps=15)
videoStreamer.cameraStatus.connect(handle_camera_status)
print(f'{videoStreamer.id} connected')
videoStreamers.append(videoStreamer)
core.set_video_sources(videoStreamers)
# videoStreamers = []
# for idx, rtsp_link in enumerate(rtsp_links):
# videoStreamer = GstVideoStreamer(rtsp_link, [1920, 1080, 3], str(idx), fps=15)
# videoStreamer.cameraStatus.connect(handle_camera_status)
# print(f'{videoStreamer.id} connected')
# videoStreamers.append(videoStreamer)
# core.set_video_sources(videoStreamers)
else:
sources = findUsbCams()
if len(sources) >= 2:
print("set camera source to captutre card")
core.set_video_sources(findUsbCams())
def handle_camera_status(status: int):
m = Message()
m.msgType = MessageType.MESSAGE_TYPE_CAMERA_CONNECTION_STATUS
m.cam_status.status = status
shared_manager.manager.send_message(m.SerializeToString())
Loading…
Cancel
Save