Compare commits
merge into: hs_tohidi:develop
pull from: hs_tohidi:video-streaming
31 Commits
develop...video-streaming
26 changed files with 829 additions and 202 deletions
BIN  Oxygen-Sys-Warning.wav
171  app.py
114  conection.py
  2  config.yaml
191  core.py
 52  cvStreamer.py
  2  detector/demo.py
 47  gpuMonitor.py
  1  message_queue/proto/SetCameraMessage.proto
 11  message_queue/proto/SetCameraMessage_pb2.py
  2  run.sh
140  server.py
  1  shared_manager.py
  1  tracker/ltr/external/PreciseRoIPooling/pytorch/prroi_pool/functional.py
  2  tracker/ltr/models/backbone/resnet.py
  1  tracker/ltr/models/bbreg/atom_iou_net.py
  2  tracker/ltr/models/layers/distance.py
  2  tracker/ltr/models/target_classifier/features.py
  6  tracker/pytracking/features/augmentation.py
  7  tracker/pytracking/features/preprocessing.py
  1  tracker/pytracking/libs/dcf.py
  3  tracker/pytracking/tracker/dimp/dimp.py
  2  tracker/pytracking/utils/params.py
 72  utils.py
BIN  video_streamer/vision_service.cpython-37m-x86_64-linux-gnu.so
136  watchdogs.py
conection.py
@@ -0,0 +1,114 @@
# Helper class to track client connection status
import threading
import time
import socket

from message_queue.Manager import Manager
import shared_manager
from utils import manager_callback


def start_discovery_service(port):
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(('0.0.0.0', port))
    print(f"Discovery service listening on port {port}...")

    while True:
        data, addr = sock.recvfrom(1024)
        if data.decode() == "DISCOVER_SERVER":
            print(f"Received discovery request from {addr}")
            sock.sendto(b"SERVER_RESPONSE", addr)
            break
    sock.close()
    return addr


class ConnectionTracker:
    def __init__(self):
        self.last_message_time = time.time()
        self.timeout = 15  # 15-second timeout
        self.last_client_ip = None  # Track last connected client IP

    def update_last_message_time(self, client_ip=None):
        self.last_message_time = time.time()
        if client_ip:
            self.last_client_ip = client_ip  # Update last known client

    def is_client_active(self):
        """Check if the last client is still active within the last 15 seconds."""
        return (time.time() - self.last_message_time) < self.timeout


# Create an instance of ConnectionTracker
connection_tracker = ConnectionTracker()


class ConnectionThread(threading.Thread):
    def __init__(self, debug=False, core=None):
        super().__init__()
        self.running = True
        self.__debug = debug
        self.__core = core

    def run(self):
        while self.running:
            time.sleep(0.5)

            if connection_tracker.last_client_ip:
                if self.__debug:
                    print(f"Checking if last client {connection_tracker.last_client_ip} is still available...")

                if self.query_last_client(connection_tracker.last_client_ip):
                    connection_tracker.update_last_message_time()

                    if self.__debug:
                        print(f"Last client {connection_tracker.last_client_ip} responded. Continuing...")
                    continue  # Skip discovering a new client

            if not connection_tracker.is_client_active():
                print("Client inactive for 15 seconds. Searching for another client...")
                cl_ip, _ = start_discovery_service(12345)
                print(f"New client found: {cl_ip}")

                # Reinitialize the Manager with the new client address
                shared_manager.manager = Manager(f"tcp://{cl_ip}:5558", f"tcp://{cl_ip}:5557")
                shared_manager.manager.start(manager_callback)

                connection_tracker.update_last_message_time(cl_ip)  # Update with new client

    def query_last_client(self, client_ip):
        """Send a heartbeat packet to the last known client IP on port 29170 and wait for a response."""
        if self.__debug:
            print(f"Sending heartbeat to {client_ip}:29170")  # Debugging

        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.settimeout(5)  # 5-second timeout
        heartbeat_port = 29170  # New port for heartbeat

        try:
            sock.sendto(b"HEARTBEAT", (client_ip, heartbeat_port))  # Send heartbeat message
            data, addr = sock.recvfrom(1024)  # Wait for response

            if self.__debug:
                print(f"Received response from {addr}: {data.decode()}")  # Debugging

            if data.decode() == "HEARTBEAT_ACK":
                connection_tracker.update_last_message_time()  # Update the last message time

                if self.__debug:
                    print(f"Client {client_ip} is still responding.")
                return True

        except socket.timeout:
            print(f"Client {client_ip} did not respond. Marking as inactive.")

        finally:
            sock.close()

        return False  # Client did not respond

    def stop(self):
        self.running = False
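For context, the exchange above implies a client-side counterpart: broadcast DISCOVER_SERVER on UDP 12345, accept SERVER_RESPONSE, and answer HEARTBEAT probes on UDP 29170 with HEARTBEAT_ACK. The sketch below is not part of this PR; the file name client_heartbeat.py, the helper names, and the use of the broadcast address are assumptions.

# client_heartbeat.py -- hypothetical client-side counterpart (not part of this PR)
import socket
import threading

def discover_server(port=12345, timeout=5):
    """Broadcast DISCOVER_SERVER and return the address of the first server that answers."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    sock.settimeout(timeout)
    try:
        sock.sendto(b"DISCOVER_SERVER", ("255.255.255.255", port))
        data, addr = sock.recvfrom(1024)
        return addr if data == b"SERVER_RESPONSE" else None
    except socket.timeout:
        return None
    finally:
        sock.close()

def run_heartbeat_responder(port=29170):
    """Answer HEARTBEAT probes with HEARTBEAT_ACK so the server keeps this client marked active."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(("0.0.0.0", port))
    while True:
        data, addr = sock.recvfrom(1024)
        if data == b"HEARTBEAT":
            sock.sendto(b"HEARTBEAT_ACK", addr)

if __name__ == "__main__":
    print("Server found at:", discover_server())
    run_heartbeat_responder()  # blocks, answering heartbeats from ConnectionThread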
cvStreamer.py
@@ -0,0 +1,52 @@
import cv2


class cvStreamer():
    def __init__(self, idx):
        self.cap = cv2.VideoCapture(idx)
        self.idx = idx

    def isOpened(self):
        isOpen = self.cap.isOpened()

        if not isOpen:
            self.release()
        else:
            print(f"usb cam open at {self.idx}")

        return isOpen

    def release(self):
        self.cap.release()

    def get_frame(self):
        ret, frame = self.cap.read()
        if not ret or frame is None:
            return None  # Read failed; caller should skip this frame

        # Get the original dimensions of the frame
        height, width = frame.shape[:2]

        # Define the maximum dimensions
        max_width = 1920
        max_height = 1080

        # Calculate the aspect ratio
        aspect_ratio = width / height

        # Resize the frame if it exceeds the maximum dimensions
        if width > max_width or height > max_height:
            if aspect_ratio > 1:  # Landscape orientation
                new_width = max_width
                new_height = int(new_width / aspect_ratio)
            else:  # Portrait orientation
                new_height = max_height
                new_width = int(new_height * aspect_ratio)

            # Resize the frame
            frame = cv2.resize(frame, (new_width, new_height))

        return frame

    def __del__(self):
        self.release()
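A minimal usage sketch for cvStreamer, assuming a camera exists at index 0 (utils.py later in this diff enumerates indices the same way):

# Hypothetical usage of cvStreamer; device index 0 is an assumption
from cvStreamer import cvStreamer

cam = cvStreamer(0)
if cam.isOpened():
    frame = cam.get_frame()  # BGR frame, downscaled to fit within 1920x1080
    if frame is not None:
        print("frame shape:", frame.shape)
    cam.release()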
gpuMonitor.py
@@ -0,0 +1,47 @@
import pynvml
import time
from colorama import Fore, Style, init
import os

# Initialize colorama
init(autoreset=True)


def monitor_gpu_ram_usage(interval=2, threshold_gb=2):
    # Initialize NVML
    pynvml.nvmlInit()
    try:
        device_count = pynvml.nvmlDeviceGetCount()
        print(f"Found {device_count} GPU(s).")

        while True:
            for i in range(device_count):
                handle = pynvml.nvmlDeviceGetHandleByIndex(i)
                info = pynvml.nvmlDeviceGetMemoryInfo(handle)

                print(f"GPU {i}:")
                print(f"  Total RAM: {info.total / 1024 ** 2:.2f} MB")
                if info.used / 1024 ** 2 >= threshold_gb * 1024:
                    print(Fore.RED + f"  Used RAM: {info.used / 1024 ** 2:.2f} MB")
                    os.system("aplay /home/rog/repos/Tracker/NE-Smart-Tracker/Oxygen-Sys-Warning.wav")
                else:
                    print(f"  Used RAM: {info.used / 1024 ** 2:.2f} MB")
                print(f"  Free RAM: {info.free / 1024 ** 2:.2f} MB")
                print(Fore.GREEN + "-" * 30)
            print(Fore.GREEN)

            time.sleep(interval)  # Wait for the specified interval before checking again

    except KeyboardInterrupt:
        print("Monitoring stopped by user.")

    finally:
        # Shutdown NVML
        pynvml.nvmlShutdown()


if __name__ == "__main__":
    monitor_gpu_ram_usage(interval=2, threshold_gb=2)  # Check every 2 seconds, threshold is 2 GB
server.py
@@ -0,0 +1,140 @@
import gi
import threading
import numpy as np
import socket
import cv2

gi.require_version('Gst', '1.0')
gi.require_version('GstRtspServer', '1.0')
from gi.repository import Gst, GstRtspServer, GLib


def get_local_ip():
    """Retrieve the local IP address of the machine."""
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.connect(("8.8.8.8", 80))
        ip = s.getsockname()[0]
        s.close()
        return ip
    except Exception as e:
        return str(e)


class VideoStream(GstRtspServer.RTSPMediaFactory):
    def __init__(self, fps, width, height):
        super(VideoStream, self).__init__()
        self.fps = fps
        self.width = width
        self.height = height
        self.frame = np.zeros((height, width, 3), dtype=np.uint8)  # Initial black frame
        self.lock = threading.Lock()

    def update_frame(self, frame):
        """Externally updates the current frame."""
        with self.lock:
            self.frame = cv2.resize(frame, (self.width, self.height))

    def on_need_data(self, src, length):
        """Provides frames to the pipeline when requested."""
        with self.lock:
            frame_rgb = cv2.cvtColor(self.frame, cv2.COLOR_BGR2RGB)
            data = frame_rgb.tobytes()

        buf = Gst.Buffer.new_allocate(None, len(data), None)
        buf.fill(0, data)
        buf.duration = Gst.SECOND // self.fps
        buf.pts = buf.dts = Gst.CLOCK_TIME_NONE
        src.emit("push-buffer", buf)

    def do_create_element(self, url):
        """Creates the GStreamer pipeline for RTSP streaming."""
        pipeline_str = (
            "appsrc name=source is-live=true format=GST_FORMAT_TIME "
            "caps=video/x-raw,format=RGB,width={},height={},framerate={}/1 "
            "! videoconvert ! video/x-raw,format=I420 "
            "! x264enc tune=zerolatency bitrate=500 speed-preset=ultrafast "
            "! h264parse ! rtph264pay config-interval=1 name=pay0 pt=96"
        ).format(self.width, self.height, self.fps)

        pipeline = Gst.parse_launch(pipeline_str)
        src = pipeline.get_by_name("source")
        src.connect("need-data", self.on_need_data)
        return pipeline


class RTSPServer:
    def __init__(self, ip="0.0.0.0", port=8554, mount_point="/stream"):
        Gst.init(None)
        self.server = GstRtspServer.RTSPServer()
        self.server.set_address(ip)
        self.server.set_service(str(port))
        self.mount_point = mount_point

        self.video_stream = None
        self.current_fps = None
        self.current_width = None
        self.current_height = None
        self.lock = threading.Lock()

        self.server.attach(None)

    def create_stream(self, fps, width, height):
        """Create a new RTSP stream with the given resolution and FPS."""
        with self.lock:
            # Check if we need to recreate the stream
            if (self.video_stream and
                    self.current_fps == fps and
                    self.current_width == width and
                    self.current_height == height):
                return  # No need to recreate

            # Remove old stream if it exists
            if self.video_stream:
                print("Recreating RTSP stream for new resolution/fps...")
                self.server.get_mount_points().remove_factory(self.mount_point)

            # Create a new stream with the updated settings
            self.video_stream = VideoStream(fps, width, height)
            self.server.get_mount_points().add_factory(self.mount_point, self.video_stream)

            self.current_fps = fps
            self.current_width = width
            self.current_height = height

            print(f"New RTSP stream created: {width}x{height} at {fps}fps")

    def update_frame(self, frame):
        """Push frames to the current streaming channel."""
        if frame is None or frame.size == 0:
            return

        height, width, _ = frame.shape
        fps = 25  # Default FPS

        # Ensure we have a valid stream for the given resolution
        self.create_stream(fps, width, height)

        # Update the current stream with the new frame
        if self.video_stream:
            self.video_stream.update_frame(frame)


# Global RTSP server instance
# rtsp_server = RTSPServer(get_local_ip(), 41231)

def run_server(server):
    """Start the RTSP server loop."""
    print(f"RTSP Server running at rtsp://{server.server.get_address()}:{server.server.get_service()}/stream")
    loop = GLib.MainLoop()
    loop.run()

# def stream_webcam():
#     cap = cv2.VideoCapture("/home/mht/Downloads/bcd2890d71caaf0e095b95c9b525973f61186656-360p.mp4")  # Open webcam
#     while cap.isOpened():
#         ret, frame = cap.read()
#         if ret:
#             rtsp_server.update_frame(frame)  # Send frame to RTSP server

# if __name__ == "__main__":
#     # Start RTSP server in a separate thread
#     threading.Thread(target=run_server, daemon=True).start()
#
#     # Stream webcam frames
#     stream_webcam()
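A minimal end-to-end sketch of this RTSP server, adapted from the commented-out example above; the capture source and the port are placeholders, not values from this PR:

# Sketch only: serve frames from a local capture device over RTSP
import threading
import cv2
from server import RTSPServer, run_server, get_local_ip

rtsp_server = RTSPServer(get_local_ip(), 8554)
threading.Thread(target=run_server, args=(rtsp_server,), daemon=True).start()

cap = cv2.VideoCapture(0)  # placeholder source; any cv2-readable input works
while cap.isOpened():
    ret, frame = cap.read()
    if ret:
        rtsp_server.update_frame(frame)  # stream is (re)created to match the frame size

With this setup the stream would be reachable at rtsp://<local-ip>:8554/stream, matching the default mount point set in RTSPServer.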
shared_manager.py
@@ -0,0 +1 @@
manager = "nothing"
utils.py
@@ -0,0 +1,72 @@
import os
import sys

proto_dir = os.path.join(os.path.dirname(__file__), 'message_queue', 'proto')
if proto_dir not in sys.path:
    sys.path.append(proto_dir)

import shared_manager
from cvStreamer import cvStreamer
from message_queue.proto.Message_pb2 import Message
from message_queue.proto.TrackCommandMessage_pb2 import TrackCommand
from message_queue.proto.enums_pb2 import MessageType
from video_streamer.gst_video_streamer import GstVideoStreamer


def findUsbCams():
    cv_cameras = []
    for i in range(50):
        cap = cvStreamer(i)
        if cap.isOpened():
            cv_cameras.append(cap)
    return cv_cameras


core = "a placeholder for the Core object"


def manager_callback(msg_str):
    msg = Message()
    msg.ParseFromString(msg_str)

    if msg.msgType == MessageType.MESSAGE_TYPE_TOGGLE_TRACK:
        if msg.track_cmd.command == TrackCommand.TRACK_COMMAND_STOP:
            core.stop_track()
        elif msg.track_cmd.command == TrackCommand.TRACK_COMMAND_START:
            core.start_track(msg.track_cmd.x, msg.track_cmd.y, msg.track_cmd.w, msg.track_cmd.h)
        elif msg.track_cmd.command == TrackCommand.TRACK_COMMAND_START_DETECT:
            core.start_detect(msg.track_cmd.x, msg.track_cmd.y, msg.track_cmd.w, msg.track_cmd.h)
    elif msg.msgType == MessageType.MESSAGE_TYPE_TRACK_SETTINGS:
        core.set_thickness(msg.track_settings.thickness)
        print(f"thickness: {msg.track_settings.thickness}")
    elif msg.msgType == MessageType.MESSAGE_TYPE_SWITCH_CAMERA:
        print(f"switch camera detected, primary value is: {msg.cam_switch.primaryCamType}")
        print(f"switch camera detected, secondary value is: {msg.cam_switch.secondaryCamType}")
        core.set_source(msg.cam_switch.primaryCamType)
    elif msg.msgType == MessageType.MESSAGE_TYPE_SET_CAMERA:
        if msg.cam_set.cameraSource == 1:
            print("set camera source to network")
            ip = msg.cam_set.ip
            videoStreamers = []
            for src in range(2):
                rtsp_link = f'rtsp://admin:Abc.12345@{ip}:554/ch{src}/stream0'
                # ic(rtsp_link)
                videoStreamer = GstVideoStreamer(rtsp_link, [1920, 1080, 3], str(src), fps=15)
                videoStreamer.cameraStatus.connect(handle_camera_status)
                print(f'{videoStreamer.id} connected')
                videoStreamers.append(videoStreamer)
            core.set_video_sources(videoStreamers)
            # videoStreamers = []
            # for idx, rtsp_link in enumerate(rtsp_links):
            #     videoStreamer = GstVideoStreamer(rtsp_link, [1920, 1080, 3], str(idx), fps=15)
            #     videoStreamer.cameraStatus.connect(handle_camera_status)
            #     print(f'{videoStreamer.id} connected')
            #     videoStreamers.append(videoStreamer)
            # core.set_video_sources(videoStreamers)
        else:
            sources = findUsbCams()
            if len(sources) >= 2:
                print("set camera source to capture card")
                core.set_video_sources(sources)


def handle_camera_status(status: int):
    m = Message()
    m.msgType = MessageType.MESSAGE_TYPE_CAMERA_CONNECTION_STATUS
    m.cam_status.status = status
    shared_manager.manager.send_message(m.SerializeToString())
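For reference, a sketch of how a client might construct the SET_CAMERA message that manager_callback handles. The field names mirror the accesses above; the cameraSource value 1 is the network branch from the callback, and the example IP is a placeholder.

# Sketch: build the SET_CAMERA payload parsed by manager_callback above
from message_queue.proto.Message_pb2 import Message
from message_queue.proto.enums_pb2 import MessageType

msg = Message()
msg.msgType = MessageType.MESSAGE_TYPE_SET_CAMERA
msg.cam_set.cameraSource = 1       # 1 = network (RTSP) source, per the branch above
msg.cam_set.ip = "192.168.1.64"    # placeholder camera IP
payload = msg.SerializeToString()  # bytes that the server-side callback parses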
watchdogs.py
@@ -0,0 +1,136 @@
import subprocess
import time
import re
import psutil
import torch
import sys
from threading import Thread, Event
from queue import Queue, Empty
from colorama import Fore, Style, init
import os


def get_gpu_memory_usage():
    """Get current GPU memory usage percentage"""
    if torch.cuda.is_available():
        device = torch.cuda.current_device()
        total_mem = torch.cuda.get_device_properties(device).total_memory
        allocated_mem = torch.cuda.memory_allocated(device)
        return (allocated_mem / total_mem) * 100
    return 0


def check_for_cuda_errors(log_queue):
    """Check application output for CUDA-related errors"""
    cuda_error_patterns = [
        r"CUDA error",
        r"out of memory",
        r"cudaError",
        r"RuntimeError: CUDA",
        r"CUDA runtime error",
        r"CUDA out of memory",
        r"CUDA kernel failed"
    ]

    try:
        while True:
            line = log_queue.get_nowait()
            for pattern in cuda_error_patterns:
                if re.search(pattern, line, re.IGNORECASE):
                    return True
    except Empty:
        pass
    return False


def read_output(pipe, log_queue, print_event):
    """Read output from subprocess and distribute it"""
    try:
        for line in iter(pipe.readline, ''):
            log_queue.put(line)
            if print_event.is_set():
                print(line, end='', flush=True)
    except Exception:
        pass


def run_application(command, max_gpu_usage=90, check_interval=10):
    """Run application with watchdog functionality and live output"""
    print_event = Event()
    print_event.set()  # Enable printing by default

    while True:
        print(f"\n{'=' * 40}")
        print(f"Starting application: {' '.join(command)}")
        print(f"{'=' * 40}\n")

        log_queue = Queue()
        process = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            bufsize=1,
            universal_newlines=True
        )

        output_thread = Thread(
            target=read_output,
            args=(process.stdout, log_queue, print_event)
        )
        output_thread.daemon = True
        output_thread.start()

        try:
            while True:
                if process.poll() is not None:
                    print("\nApplication exited with code:", process.returncode)
                    break

                gpu_usage = get_gpu_memory_usage()
                if gpu_usage > max_gpu_usage:
                    print(f"\nGPU memory usage exceeded threshold ({gpu_usage:.1f}%)")
                    break

                if check_for_cuda_errors(log_queue):
                    print("\nCUDA error detected in application output")
                    break

                time.sleep(check_interval)

            print("\nWaiting 1.5 seconds before restart...")
            print(Fore.RED + f"{30 * '-'}")
            print(Fore.RED + "RESTARTING...")
            print(Fore.RED + f"{30 * '-'}")
            print(Fore.WHITE)
            os.system("aplay /home/rog/repos/Tracker/NE-Smart-Tracker/Oxygen-Sys-Warning.wav")

            # Clean up
            try:
                if process.poll() is None:
                    parent = psutil.Process(process.pid)
                    for child in parent.children(recursive=True):
                        child.kill()
                    parent.kill()
            except psutil.NoSuchProcess:
                pass

            if torch.cuda.is_available():
                torch.cuda.empty_cache()

            time.sleep(1.5)

        except KeyboardInterrupt:
            print("\nStopping watchdog...")
            print_event.clear()
            output_thread.join()
            try:
                process.kill()
            except Exception:
                pass
            break


if __name__ == "__main__":
    # Configure these parameters
    APP_COMMAND = ["python", "app.py"]  # Your application command
    MAX_GPU_USAGE = 90  # Percentage threshold for GPU memory usage
    CHECK_INTERVAL = 0.5  # Seconds between checks

    run_application(APP_COMMAND, MAX_GPU_USAGE, CHECK_INTERVAL)