#!/usr/bin/env python3
import os
import torch
import numpy as np
import glob
import matplotlib.pyplot as plt
from pathlib import Path
import sys
import json
from tqdm import tqdm
import inspect
import argparse
from collections import OrderedDict, defaultdict
import time  # <<< IMPORT TIME >>>

# Add the project root to path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

# Try to force deterministic algorithms
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True
print("PYTHON SCRIPT: Set cuDNN benchmark=False, deterministic=True")

# Import model wrappers
from pytracking.features.net_wrappers import DiMPTorchScriptWrapper
# For loading AtomIoUNet from source
from ltr.models.bbreg.atom_iou_net import AtomIoUNet
# Add import for new modular comparison
from model_comparison.bbreg_comparison import compare_debug_tensors

SCRIPT_DIR_FOR_INIT = os.path.dirname(os.path.abspath(__file__))
ROOT_DIR_FOR_INIT = os.path.dirname(SCRIPT_DIR_FOR_INIT)


# --- Model Configurations ---
def get_model_configs(root_dir_param):
    # ... (rest of get_model_configs, ensuring it uses root_dir_param if needed)
    # For now, assume it doesn't strictly need root_dir_param for paths if they are relative to the script,
    # or if model_dir in DiMPTorchScriptWrapper handles it.
    return {
        # ... (existing model_configs definitions)
        'ResNet': {
            'python_model_loader': lambda: DiMPTorchScriptWrapper(os.path.join(root_dir_param, 'pytracking_models/dimp50_ausdruck_ep0050.pth.tar')),
            'cpp_output_subdir': 'resnet',
            'python_output_subdir': 'resnet_py',
            'outputs_to_compare': {
                'Conv1': ('conv1_output.pt', 'conv1'),
                'Debug ResNet Conv1->BN1 Input': ('debug_resnet_conv1_output_for_bn1_input.pt', 'conv1_pre_bn'),
                # BN1 final output (manual C++ vs manual Python pre-ReLU)
                'BN1': ('bn1_output.pt', 'bn1_post_relu_pre'),
                # BN1 intermediate comparisons
                'BN1 Centered X': ('bn1_centered_x.pt', 'bn1_centered_x_py'),
                'BN1 Var+Eps': ('bn1_variance_plus_eps.pt', 'bn1_variance_plus_eps_py'),
                'BN1 InvStd': ('bn1_inv_std.pt', 'bn1_inv_std_py'),
                'BN1 Normalized X': ('bn1_normalized_x.pt', 'bn1_normalized_x_py'),
                'ReLU1': ('relu1_output.pt', 'conv1'),
                'MaxPool': 'maxpool_output.pt',
                'Features': 'features.pt',
                'Layer1': 'layer1.pt',
                'Layer2': 'layer2.pt',
                'Layer3': 'layer3.pt',
                'Layer4': 'layer4.pt',
                'Layer1.0 Shortcut': 'layer1_0_shortcut_output.pt'
            }
        },
        'Classifier': {
            'python_model_loader': lambda: DiMPTorchScriptWrapper(os.path.join(root_dir_param, 'pytracking_models/dimp50_ausdruck_ep0050.pth.tar')),
            'cpp_output_subdir': 'classifier',
            'python_output_subdir': 'classifier_py',
            'outputs_to_compare': {
                'Features': 'features.pt',
            }
        },
        'BBRegressor': {
            'python_model_loader': lambda: DiMPTorchScriptWrapper(os.path.join(root_dir_param, 'pytracking_models/dimp50_ausdruck_ep0050.pth.tar')),
            'cpp_output_subdir': 'bb_regressor',
            'python_output_subdir': 'bb_regressor_py',
            'outputs_to_compare': {
                'IoUPred': 'iou_scores.pt',
                'PyIoUFeat0': ('iou_feat0.pt', True),  # True indicates Python-specific output name
                'CppIoUFeat0': 'iou_feat0.pt',
                'PyIoUFeat1': ('iou_feat1.pt', True),
                'CppIoUFeat1': 'iou_feat1.pt',
                'PyMod0': ('mod_vec0.pt', True),
                'CppMod0': 'mod_vec0.pt',
                'PyMod1': ('mod_vec1.pt', True),
                'CppMod1': 'mod_vec1.pt',
            }
        },
    }


class ComparisonRunner:
    def __init__(self, root_dir, model_configs, cpp_output_dir, python_output_dir,
                 num_samples=-1, plot_histograms=True, plot_scatter=True):
        self.root_dir = root_dir
        self.model_configs = model_configs
        self.cpp_output_dir = cpp_output_dir
        self.python_output_dir = 
python_output_dir self.num_samples = num_samples self.plot_histograms = plot_histograms self.plot_scatter = plot_scatter self.all_comparison_stats = defaultdict(lambda: defaultdict(list)) self.python_wrapper = None # ADDED: To store the DiMPTorchScriptWrapper instance self.models = {} # To store loaded Python sub-models like ResNet, Classifier self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # Ensure comparison directory exists self.comparison_dir = os.path.join(self.root_dir, "test/comparison") if not os.path.exists(self.comparison_dir): os.makedirs(self.comparison_dir) print("PYTHON: Attempting to load 'traced_resnet50.pth'...") try: self.models['ResNet'] = torch.jit.load('traced_resnet50.pth', map_location=self.device) print("PYTHON: Successfully loaded 'traced_resnet50.pth'.") self.models['ResNet'].eval() print("PYTHON: ResNet JIT model set to eval().") except Exception as e: print(f"PYTHON: CRITICAL ERROR loading 'traced_resnet50.pth': {e}") self.models['ResNet'] = None # Ensure it's None if loading failed # Print sums of ResNet.bn1 running_mean and running_var from state_dict print("PYTHON: Attempting to access ResNet state_dict (if model loaded)...") if self.models.get('ResNet'): try: resnet_state_dict = self.models['ResNet'].state_dict() print("PYTHON ResNet state_dict keys:", list(resnet_state_dict.keys())) # PRINT ALL KEYS py_bn1_running_mean = resnet_state_dict.get('bn1.running_mean') py_bn1_running_var = resnet_state_dict.get('bn1.running_var') if py_bn1_running_mean is not None and py_bn1_running_var is not None: print(f"PYTHON ResNet.bn1 running_mean sum (from state_dict): {py_bn1_running_mean.sum().item():.10f}") print(f"PYTHON ResNet.bn1 running_var sum (from state_dict): {py_bn1_running_var.sum().item():.10f}") else: print("PYTHON: ResNet.bn1 running_mean or running_var is None in state_dict.") except Exception as e: print(f"PYTHON: Error accessing ResNet.bn1 state_dict: {e}") # Load other models if necessary (e.g., BBRegressor, Classifier) def load_python_models(self): print("DEBUG: ComparisonRunner.load_python_models() ENTERED") # DEBUG PRINT """Initialize Python models""" print("Loading Python models...") self.python_wrapper = DiMPTorchScriptWrapper( model_dir=str(Path(self.root_dir) / 'exported_weights'), device=self.device, backbone_sd='backbone_regenerated', # CORRECTED: Ensure this uses regenerated weights classifier_sd='classifier', bbregressor_sd='bb_regressor' ) # Populate self.models AFTER python_wrapper is initialized if self.python_wrapper: # Check if wrapper was successfully initialized print("DEBUG: self.python_wrapper initialized. 
Populating self.models.") # DEBUG PRINT if hasattr(self.python_wrapper, 'backbone') and self.python_wrapper.backbone is not None: # Check for 'backbone' self.models['ResNet'] = self.python_wrapper.backbone # Assign from .backbone print(f"DEBUG: self.models['ResNet'] populated with type: {type(self.models['ResNet'])}") # DEBUG PRINT else: print("ERROR: python_wrapper does not have a 'backbone' attribute or it is None.") self.models['ResNet'] = None if hasattr(self.python_wrapper, 'classifier') and self.python_wrapper.classifier is not None: self.models['Classifier'] = self.python_wrapper.classifier print(f"DEBUG: self.models['Classifier'] populated with type: {type(self.models['Classifier'])}") # DEBUG PRINT else: print("ERROR: python_wrapper does not have a 'classifier' attribute or it is None.") self.models['Classifier'] = None if hasattr(self.python_wrapper, 'bb_regressor') and self.python_wrapper.bb_regressor is not None: self.models['BBRegressor'] = self.python_wrapper.bb_regressor print(f"DEBUG: self.models['BBRegressor'] populated with type: {type(self.models['BBRegressor'])}") # DEBUG PRINT else: print("ERROR: python_wrapper does not have a 'bb_regressor' attribute or it is None.") self.models['BBRegressor'] = None else: print("CRITICAL ERROR: self.python_wrapper is None after DiMPTorchScriptWrapper instantiation.") # Ensure self.models has keys to prevent crashes later, though values will be None self.models['ResNet'] = None self.models['Classifier'] = None self.models['BBRegressor'] = None # Initialize BBRegressor from source for get_modulation fallback self.bb_regressor_from_source = AtomIoUNet( input_dim=(512, 1024), pred_input_dim=(256, 256), pred_inter_dim=(256, 256) ) ComparisonRunner.load_weights_for_custom_model( self.bb_regressor_from_source, 'bb_regressor', # model_name for path and doc file self.root_dir, self.device ) self.bb_regressor_from_source.eval().to(self.device) print("Python models loaded.") # New check: Compare the conv1.weight actually used by Python ResNet vs C++ ResNet from backbone_regenerated print("\n--- COMPARING CURRENTLY USED conv1.weight (Python vs C++) ---") python_resnet_conv1_weight = None if self.models.get('ResNet') and hasattr(self.models['ResNet'], 'conv1'): python_resnet_conv1_weight = self.models['ResNet'].conv1.weight.detach().cpu() print(f" Python ResNet model's conv1.weight shape: {python_resnet_conv1_weight.shape}") cpp_conv1_path = os.path.join(self.root_dir, "exported_weights/backbone_regenerated/conv1_weight.pt") cpp_resnet_conv1_weight = None if os.path.exists(cpp_conv1_path): try: cpp_resnet_conv1_weight = torch.load(cpp_conv1_path, map_location='cpu', weights_only=False) print(f" C++ (loaded from {cpp_conv1_path}) conv1.weight shape: {cpp_resnet_conv1_weight.shape}") except Exception as e: print(f" Error loading C++ conv1.weight from {cpp_conv1_path}: {e}") if python_resnet_conv1_weight is not None and cpp_resnet_conv1_weight is not None: if isinstance(python_resnet_conv1_weight, torch.Tensor) and isinstance(cpp_resnet_conv1_weight, torch.Tensor): print(f" torch.allclose(python_model_conv1, cpp_loaded_conv1): {torch.allclose(python_resnet_conv1_weight, cpp_resnet_conv1_weight)}") abs_diff = torch.abs(python_resnet_conv1_weight - cpp_resnet_conv1_weight) print(f" Max abs diff for conv1.weight: {torch.max(abs_diff).item()}") print(f" Mean abs diff for conv1.weight: {torch.mean(abs_diff).item()}") else: print(" Skipping conv1.weight comparison due to type mismatch after loading.") else: print(" Skipping conv1.weight comparison because 
one or both tensors could not be obtained.") print("--- END CURRENTLY USED conv1.weight COMPARISON ---\n") # New check: Compare ResNet bn1 parameters print("\n--- COMPARING CURRENTLY USED bn1 PARAMS (Python vs C++) ---") bn1_param_names = ['weight', 'bias', 'running_mean', 'running_var'] python_resnet_bn1_params = {} if self.models.get('ResNet') and hasattr(self.models['ResNet'], 'bn1'): bn1_module = self.models['ResNet'].bn1 for p_name in bn1_param_names: if hasattr(bn1_module, p_name): param_tensor = getattr(bn1_module, p_name) if param_tensor is not None: python_resnet_bn1_params[p_name] = param_tensor.detach().cpu() print(f" Python ResNet model's bn1.{p_name} shape: {python_resnet_bn1_params[p_name].shape}") else: print(f" Python ResNet model's bn1.{p_name} is None.") else: print(f" Python ResNet model's bn1 does not have attribute {p_name}.") cpp_resnet_bn1_params = {} for p_name in bn1_param_names: # Adjust filename for C++ saved tensors (e.g., bn1_running_mean.pt) cpp_param_filename = f"bn1_{p_name.replace('.', '_')}.pt" cpp_param_path = os.path.join(self.root_dir, "exported_weights/backbone_regenerated", cpp_param_filename) if os.path.exists(cpp_param_path): try: cpp_resnet_bn1_params[p_name] = torch.load(cpp_param_path, map_location='cpu', weights_only=False) print(f" C++ (loaded from {cpp_param_path}) bn1.{p_name} shape: {cpp_resnet_bn1_params[p_name].shape}") except Exception as e: print(f" Error loading C++ bn1.{p_name} from {cpp_param_path}: {e}") else: print(f" C++ bn1 parameter file not found: {cpp_param_path}") for p_name in bn1_param_names: py_tensor = python_resnet_bn1_params.get(p_name) cpp_tensor = cpp_resnet_bn1_params.get(p_name) print(f" Comparison for bn1.{p_name}:") if py_tensor is not None and cpp_tensor is not None: if isinstance(py_tensor, torch.Tensor) and isinstance(cpp_tensor, torch.Tensor): print(f" torch.allclose(python_bn1_{p_name}, cpp_bn1_{p_name}): {torch.allclose(py_tensor, cpp_tensor)}") abs_diff = torch.abs(py_tensor - cpp_tensor) print(f" Max abs diff for bn1.{p_name}: {torch.max(abs_diff).item()}") print(f" Mean abs diff for bn1.{p_name}: {torch.mean(abs_diff).item()}") else: print(f" Skipping bn1.{p_name} comparison due to type mismatch after loading.") else: print(f" Skipping bn1.{p_name} comparison because one or both tensors could not be obtained.") print("--- END CURRENTLY USED bn1 PARAMS COMPARISON ---\n") # New check: Compare ResNet layer1.0 parameters print("\n--- COMPARING CURRENTLY USED layer1.0 PARAMS (Python vs C++) ---") layer1_0_block_prefix = "layer1.0." layer1_0_components = { "conv1": ["weight"], "bn1": ["weight", "bias", "running_mean", "running_var"], "conv2": ["weight"], "bn2": ["weight", "bias", "running_mean", "running_var"], "conv3": ["weight"], "bn3": ["weight", "bias", "running_mean", "running_var"], "downsample.0": ["weight"], # Downsample Conv "downsample.1": ["weight", "bias", "running_mean", "running_var"] # Downsample BN } if self.models.get('ResNet') and hasattr(self.models['ResNet'], 'layer1') and len(self.models['ResNet'].layer1) > 0: py_layer1_0_module = self.models['ResNet'].layer1[0] for comp_name, param_list in layer1_0_components.items(): py_comp_module = py_layer1_0_module try: # Handle nested modules like downsample.0 for part_name in comp_name.split('.'): py_comp_module = getattr(py_comp_module, part_name) except AttributeError: print(f" Python ResNet model's layer1.0 does not have component {comp_name}. 
Skipping.") continue for p_name in param_list: py_param_tensor_name = f"{layer1_0_block_prefix}{comp_name}.{p_name}" cpp_param_filename = f"{layer1_0_block_prefix.replace('.', '_')}{comp_name.replace('.', '_')}_{p_name}.pt" py_param_tensor = None if hasattr(py_comp_module, p_name): param_tensor_val = getattr(py_comp_module, p_name) if param_tensor_val is not None: py_param_tensor = param_tensor_val.detach().cpu() print(f" Python ResNet {py_param_tensor_name} shape: {py_param_tensor.shape}") else: print(f" Python ResNet {py_param_tensor_name} is None.") else: print(f" Python ResNet module {comp_name} does not have param {p_name}.") cpp_param_path = os.path.join(self.root_dir, "exported_weights/backbone_regenerated", cpp_param_filename) cpp_param_tensor = None if os.path.exists(cpp_param_path): try: cpp_param_tensor = torch.load(cpp_param_path, map_location='cpu', weights_only=False) # print(f" C++ (loaded from {cpp_param_path}) {cpp_param_filename} shape: {cpp_param_tensor.shape}") # Optional: less verbose except Exception as e: print(f" Error loading C++ {cpp_param_filename} from {cpp_param_path}: {e}") # Adjusted to cpp_param_filename else: print(f" Warning: C++ {cpp_param_filename} file not found: {cpp_param_path}") # Adjusted print(f" Comparison for {py_param_tensor_name} vs {cpp_param_filename}:") # More specific if py_param_tensor is not None and cpp_param_tensor is not None: if isinstance(py_param_tensor, torch.Tensor) and isinstance(cpp_param_tensor, torch.Tensor): all_close = torch.allclose(py_param_tensor, cpp_param_tensor) print(f" torch.allclose: {all_close}") if not all_close: abs_diff = torch.abs(py_param_tensor - cpp_param_tensor) print(f" Max abs diff: {torch.max(abs_diff).item()}") print(f" Mean abs diff: {torch.mean(abs_diff).item()}") else: print(f" Skipping comparison due to type mismatch after loading.") else: print(f" Skipping comparison because one or both tensors could not be obtained.") else: print(" Skipping layer1.0 parameter comparison: ResNet model or its layer1 not found/empty.") print("--- END CURRENTLY USED layer1.0 PARAMS COMPARISON ---\n") # Corrected to \n # --- START WEIGHT COMPARISON FOR layer1.1 and layer1.2 --- for block_idx_in_layer1 in [1, 2]: # For layer1.1 and layer1.2 print(f"\n--- COMPARING CURRENTLY USED layer1.{block_idx_in_layer1} PARAMS (Python vs C++) ---") layer1_block_prefix = f"layer1.{block_idx_in_layer1}." # Components within a standard bottleneck block (no downsample for these) block_components = { "conv1": ["weight"], "bn1": ["weight", "bias", "running_mean", "running_var", "num_batches_tracked"], "conv2": ["weight"], "bn2": ["weight", "bias", "running_mean", "running_var", "num_batches_tracked"], "conv3": ["weight"], "bn3": ["weight", "bias", "running_mean", "running_var", "num_batches_tracked"], } if self.models.get('ResNet') and hasattr(self.models['ResNet'], 'layer1') and len(self.models['ResNet'].layer1) > block_idx_in_layer1: py_layer1_block_module = self.models['ResNet'].layer1[block_idx_in_layer1] for comp_name, param_list in block_components.items(): py_comp_module = py_layer1_block_module try: # No nested modules like 'downsample' for these blocks py_comp_module = getattr(py_comp_module, comp_name) except AttributeError: print(f" Python ResNet model's layer1.{block_idx_in_layer1} does not have component {comp_name}. 
Skipping.") continue for p_name in param_list: py_param_tensor_name = f"{layer1_block_prefix}{comp_name}.{p_name}" # C++ saves files like layer1_0_bn1_weight.pt or layer1_1_bn1_weight.pt cpp_param_filename = f"{layer1_block_prefix.replace('.', '_')}{comp_name.replace('.', '_')}_{p_name}.pt" py_param_tensor = None if hasattr(py_comp_module, p_name): param_tensor_val = getattr(py_comp_module, p_name) if param_tensor_val is not None: py_param_tensor = param_tensor_val.detach().cpu() print(f" Python ResNet {py_param_tensor_name} shape: {py_param_tensor.shape}") else: print(f" Python ResNet {py_param_tensor_name} is None.") elif p_name == "num_batches_tracked" and isinstance(py_comp_module, torch.nn.BatchNorm2d): # PyTorch stores num_batches_tracked in _buffers, not as a direct attribute usually if py_comp_module.num_batches_tracked is not None: py_param_tensor = py_comp_module.num_batches_tracked.detach().cpu() print(f" Python ResNet {py_param_tensor_name} (from buffer) shape: {py_param_tensor.shape}") else: print(f" Python ResNet {py_param_tensor_name} (from buffer) is None.") else: print(f" Python ResNet module {comp_name} does not have param/buffer {p_name}.") cpp_param_path = os.path.join(self.root_dir, "exported_weights/backbone_regenerated", cpp_param_filename) cpp_param_tensor = None if os.path.exists(cpp_param_path): try: cpp_param_tensor = torch.load(cpp_param_path, map_location='cpu', weights_only=False) except Exception as e: print(f" Error loading C++ {cpp_param_filename} from {cpp_param_path}: {e}") else: print(f" Warning: C++ {cpp_param_filename} file not found: {cpp_param_path}") print(f" Comparison for {py_param_tensor_name} vs {cpp_param_filename}:") if py_param_tensor is not None and cpp_param_tensor is not None: if isinstance(py_param_tensor, torch.Tensor) and isinstance(cpp_param_tensor, torch.Tensor): # Ensure tensors are float for allclose if one is int (e.g. 
num_batches_tracked) py_param_tensor_float = py_param_tensor.float() cpp_param_tensor_float = cpp_param_tensor.float() all_close = torch.allclose(py_param_tensor_float, cpp_param_tensor_float) print(f" torch.allclose: {all_close}") if not all_close: abs_diff = torch.abs(py_param_tensor_float - cpp_param_tensor_float) mae = torch.mean(abs_diff).item() max_abs_err = torch.max(abs_diff).item() print(f" MAE (Weight/Buffer): {mae:.4e}") print(f" Max Abs Err (Weight/Buffer): {max_abs_err:.4e}") # Also print L2 norms for context l2_py = torch.linalg.norm(py_param_tensor_float.flatten()).item() l2_cpp = torch.linalg.norm(cpp_param_tensor_float.flatten()).item() print(f" L2 Norm Python: {l2_py:.4e}") print(f" L2 Norm C++: {l2_cpp:.4e}") else: print(f" Skipping comparison due to type mismatch after loading for {py_param_tensor_name}.") else: print(f" Skipping comparison because one or both tensors could not be obtained for {py_param_tensor_name}.") else: print(f" Skipping layer1.{block_idx_in_layer1} parameter comparison: ResNet model or its layer1 not found/long enough.") print(f"--- END CURRENTLY USED layer1.{block_idx_in_layer1} PARAMS COMPARISON ---\n") # --- END WEIGHT COMPARISON FOR layer1.1 and layer1.2 --- # --- END TEMPORARY WEIGHT COMPARISON --- # This marker is now after layer1.0 checks print("\n--- Types at END of load_python_models: ---") if 'ResNet' in self.models: print(f" self.models['ResNet'] type: {type(self.models['ResNet'])}") if 'Classifier' in self.models: print(f" self.models['Classifier'] type: {type(self.models['Classifier'])}") if 'BBRegressor' in self.models: print(f" self.models['BBRegressor'] type: {type(self.models['BBRegressor'])}") def compare_classifier(self): """Compare classifier model outputs between Python and C++""" print("\nComparing classifier outputs...") # Python model needs C++ ResNet output as its input cpp_input_dir_path = Path(os.path.join(self.cpp_output_dir, 'resnet')) cpp_output_classifier_dir = Path(os.path.join(self.cpp_output_dir, 'classifier')) if not cpp_input_dir_path.exists() or not cpp_output_classifier_dir.exists(): print(f"Classifier input (C++ ResNet features from {cpp_input_dir_path}) or C++ Classifier output dir ({cpp_output_classifier_dir}) not found. 
Skipping Classifier comparison.") # Populate NaN for all expected Classifier comparisons if dirs are missing for i in range(self.num_samples): sample_key_base = f"Clf_Sample_{i}" current_errors = {} self._compare_tensor_data(None, None, "Classifier Features", i, current_errors) self.all_comparison_stats[sample_key_base] = current_errors return print("\nClassifier - Comparing Samples...") for i in tqdm(range(self.num_samples), desc="Classifier samples"): current_errors = {} # For this sample py_clf_feat = None cpp_clf_feat = None # Input for Python classifier is the layer3 output of C++ ResNet cpp_resnet_layer3_for_py_path = cpp_input_dir_path / f'sample_{i}_layer3.pt' # C++ classifier output cpp_classifier_feat_path = cpp_output_classifier_dir / f'sample_{i}_features.pt' if not cpp_resnet_layer3_for_py_path.exists() or not cpp_classifier_feat_path.exists(): print(f"Warning: Skipping classifier sample {i}, files not found: C++ ResNet output {cpp_resnet_layer3_for_py_path} or C++ Clf output {cpp_classifier_feat_path}.") else: feat_from_cpp_resnet = self.load_cpp_tensor(cpp_resnet_layer3_for_py_path, self.device) if feat_from_cpp_resnet is None: print(f"Critical: Failed to load C++ ResNet output tensor {cpp_resnet_layer3_for_py_path} for classifier sample {i}.") else: try: with torch.no_grad(): if self.models.get('Classifier'): py_clf_feat = self.models['Classifier'].extract_classification_feat(feat_from_cpp_resnet) else: print("ERROR: Python Classifier model not found in self.models") except Exception as e: print(f"ERROR: Python model extract_classification_feat failed for sample {i}: {e}") cpp_clf_feat = self.load_cpp_tensor(cpp_classifier_feat_path, self.device) if cpp_clf_feat is None: print(f"Warning: Failed to load C++ output tensor {cpp_classifier_feat_path} for classifier sample {i}.") self._compare_tensor_data(py_clf_feat, cpp_clf_feat, "Classifier Features", i, current_errors) if current_errors: self.all_comparison_stats[f"Clf_Sample_{i}"] = current_errors # Removed the separate "Test Samples" loop for classifier for simplification # The C++ test_models only produces one set of classifier outputs per sample. def compare_bb_regressor(self): """Compare bb_regressor model outputs between Python and C++""" print("\nComparing bb_regressor outputs...") # Python model inputs come from 'common' C++ generated/loaded files # C++ model outputs are in 'output/bb_regressor' py_input_common_dir = os.path.join(self.root_dir, 'test', 'input_samples', 'common') cpp_output_bb_reg_dir = os.path.join(self.cpp_output_dir, 'bb_regressor') cpp_resnet_output_dir = os.path.join(self.cpp_output_dir, 'resnet') # Convert to Path for exists check py_input_common_dir_path = Path(py_input_common_dir) cpp_output_bb_reg_dir_path = Path(cpp_output_bb_reg_dir) cpp_resnet_output_dir_path = Path(cpp_resnet_output_dir) if not py_input_common_dir_path.exists() or not cpp_output_bb_reg_dir_path.exists() or not cpp_resnet_output_dir_path.exists(): print(f"BB Regressor input ({py_input_common_dir_path}), C++ ResNet output ({cpp_resnet_output_dir_path}), or C++ BB Reg output dir ({cpp_output_bb_reg_dir_path}) not found. 
Skipping BB Regressor comparison.") # Populate NaN for all expected BB Regressor comparisons if dirs are missing for i in range(self.num_samples): sample_key_base = f"BBReg_Sample_{i}" current_errors = {} self._compare_tensor_data(None, None, "BBReg PyIoUFeat0 vs CppIoUFeat0", i, current_errors) self._compare_tensor_data(None, None, "BBReg PyIoUFeat1 vs CppIoUFeat1", i, current_errors) self._compare_tensor_data(None, None, "BBReg PyMod0 vs CppMod0", i, current_errors) self._compare_tensor_data(None, None, "BBReg PyMod1 vs CppMod1", i, current_errors) self._compare_tensor_data(None, None, "BBReg IoUPred", i, current_errors) self.all_comparison_stats[sample_key_base] = current_errors return for i in tqdm(range(self.num_samples), desc="BB Regressor samples"): current_errors = {} # For this sample # --- Python Model Path --- # For BBRegressor, the Python model needs to run its own ResNet pass # using the common input image. py_image_input_path = py_input_common_dir_path / f'sample_{i}_image.pt' py_init_bbox_path = py_input_common_dir_path / f'sample_{i}_bb.pt' py_proposals_path = py_input_common_dir_path / f'sample_{i}_proposals.pt' # --- C++ Model Outputs --- cpp_iou_feat0_path = cpp_output_bb_reg_dir_path / f'sample_{i}_iou_feat0.pt' cpp_iou_feat1_path = cpp_output_bb_reg_dir_path / f'sample_{i}_iou_feat1.pt' cpp_mod_vec0_path = cpp_output_bb_reg_dir_path / f'sample_{i}_mod_vec0.pt' cpp_mod_vec1_path = cpp_output_bb_reg_dir_path / f'sample_{i}_mod_vec1.pt' cpp_iou_scores_path = cpp_output_bb_reg_dir_path / f'sample_{i}_iou_scores.pt' # Paths for debug C++ outputs cpp_debug_conv3_1t_path = cpp_output_bb_reg_dir_path / f'sample_{i}_debug_conv3_1t_output.pt' cpp_debug_conv4_1t_path = cpp_output_bb_reg_dir_path / f'sample_{i}_debug_conv4_1t_output.pt' # Load initial inputs for Python model py_image_tensor = self.load_cpp_tensor(py_image_input_path, self.device) py_init_bbox = self.load_cpp_tensor(py_init_bbox_path, self.device) py_proposals = self.load_cpp_tensor(py_proposals_path, self.device) py_feat_layer2, py_feat_layer3 = None, None if py_image_tensor is not None: try: with torch.no_grad(): # Run Python ResNet backbone via the wrapper's method to include preprocessing if self.python_wrapper: py_backbone_outputs = self.python_wrapper.extract_backbone(py_image_tensor) else: print("ERROR: self.python_wrapper is None, cannot extract backbone features.") py_backbone_outputs = {} # Ensure it's a dict # Assign ResNet outputs to be used by BB Regressor py_feat_layer2 = py_backbone_outputs.get('layer2') py_feat_layer3 = py_backbone_outputs.get('layer3') except Exception as e: print(f"ERROR: Python ResNet backbone failed for sample {i}: {e}") else: print(f"Warning: Skipping Python BB Regressor for sample {i}, image input not found at {py_image_input_path}") # ---- Intermediate debug outputs for conv3_1t and conv4_1t ---- py_debug_conv3_1t_out = None py_debug_conv4_1t_out = None if py_feat_layer2 is not None: try: _feat2_for_debug_conv3_1t = py_feat_layer2 if _feat2_for_debug_conv3_1t.dim() == 5: _feat2_for_debug_conv3_1t = _feat2_for_debug_conv3_1t.reshape(-1, *_feat2_for_debug_conv3_1t.shape[-3:]) with torch.no_grad(): # Ensure no_grad context py_debug_conv3_1t_out = self.bb_regressor_from_source.conv3_1t(_feat2_for_debug_conv3_1t) except Exception as e: print(f"ERROR calculating Python Debug_Conv3_1t for sample {i}: {e}") if py_feat_layer3 is not None: try: _feat3_for_debug_conv4_1t = py_feat_layer3 if _feat3_for_debug_conv4_1t.dim() == 5: _feat3_for_debug_conv4_1t = 
_feat3_for_debug_conv4_1t.reshape(-1, *_feat3_for_debug_conv4_1t.shape[-3:]) with torch.no_grad(): # Ensure no_grad context py_debug_conv4_1t_out = self.bb_regressor_from_source.conv4_1t(_feat3_for_debug_conv4_1t) except Exception as e: print(f"ERROR calculating Python Debug_Conv4_1t for sample {i}: {e}") # ---- End intermediate debug outputs ---- # Get Python IoU features py_iou_feat_list = [None, None] # Initialize as a list of two Nones if py_feat_layer2 is not None and py_feat_layer3 is not None: try: # Use from-source get_iou_feat for consistent 256-channel features # DiMPTorchScriptWrapper.bb_regressor.get_iou_feat returns features with different channel counts temp_iou_feat = self.bb_regressor_from_source.get_iou_feat([py_feat_layer2, py_feat_layer3], i) if isinstance(temp_iou_feat, tuple): temp_iou_feat = list(temp_iou_feat) if len(temp_iou_feat) >= 2: py_iou_feat_list = [temp_iou_feat[0], temp_iou_feat[1]] elif len(temp_iou_feat) == 1: py_iou_feat_list[0] = temp_iou_feat[0] # print(f"Sample {i}: Py from-source get_iou_feat. Shapes: {[f.shape for f in py_iou_feat_list if f is not None]}") except Exception as e_iou_source: print(f"Sample {i}: Py from-source get_iou_feat failed: {e_iou_source}") # Get Python modulation vectors py_modulation_list = [None, None] # Initialize as a list of two Nones if py_feat_layer2 is not None and py_feat_layer3 is not None and py_init_bbox is not None: py_features_list = [py_feat_layer2, py_feat_layer3] squeezed_init_bbox = py_init_bbox if py_init_bbox.ndim == 3 and py_init_bbox.shape[0] > 0 and py_init_bbox.shape[1] == 1: squeezed_init_bbox = py_init_bbox.squeeze(1) try: # Using Torchscript model for modulation if self.python_wrapper and self.python_wrapper.bb_regressor: temp_mod = self.python_wrapper.bb_regressor.get_modulation(py_features_list, squeezed_init_bbox) else: print("ERROR: self.python_wrapper.bb_regressor is not available for get_modulation.") temp_mod = [None, None] if isinstance(temp_mod, tuple): temp_mod = list(temp_mod) if len(temp_mod) >= 2: py_modulation_list = [temp_mod[0], temp_mod[1]] elif len(temp_mod) == 1: py_modulation_list[0] = temp_mod[0] # print(f"Sample {i}: Py TorchScript get_modulation. Shapes: {[f.shape for f in py_modulation_list if f is not None]}") except Exception as e_ts: print(f"Sample {i}: Py TorchScript get_modulation failed: {e_ts}. Trying from-source.") try: temp_mod_source = self.bb_regressor_from_source.get_modulation(py_features_list, squeezed_init_bbox) if isinstance(temp_mod_source, tuple): temp_mod_source = list(temp_mod_source) if len(temp_mod_source) >=2: py_modulation_list = [temp_mod_source[0], temp_mod_source[1]] elif len(temp_mod_source) == 1: py_modulation_list[0] = temp_mod_source[0] # print(f"Sample {i}: Py from-source get_modulation. 
Shapes: {[f.shape for f in py_modulation_list if f is not None]}") except Exception as e_source: print(f"Sample {i}: Py from-source get_modulation also failed: {e_source}") # Run Python bb_regressor's predict_iou (from TorchScript model) py_iou_pred = None if all(f is not None for f in py_iou_feat_list) and \ all(m is not None for m in py_modulation_list) and \ py_proposals is not None: try: with torch.no_grad(): if self.python_wrapper and self.python_wrapper.bb_regressor: py_iou_pred = self.python_wrapper.bb_regressor.predict_iou(py_modulation_list, py_iou_feat_list, py_proposals) else: print("ERROR: self.python_wrapper.bb_regressor is not available for predict_iou.") py_iou_pred = None # print(f"Sample {i}: Py predict_iou output shape: {py_iou_pred.shape if py_iou_pred is not None else 'N/A'}") except Exception as e: print(f"ERROR: Python model predict_iou failed for sample {i}: {e}") # Load C++ outputs cpp_iou_feat0 = self.load_cpp_tensor(cpp_iou_feat0_path, self.device) cpp_iou_feat1 = self.load_cpp_tensor(cpp_iou_feat1_path, self.device) cpp_mod_vec0 = self.load_cpp_tensor(cpp_mod_vec0_path, self.device) cpp_mod_vec1 = self.load_cpp_tensor(cpp_mod_vec1_path, self.device) cpp_iou_scores = self.load_cpp_tensor(cpp_iou_scores_path, self.device) # Load debug C++ tensors cpp_debug_conv3_1t_tensor = self.load_cpp_tensor(cpp_debug_conv3_1t_path, self.device) cpp_debug_conv4_1t_tensor = self.load_cpp_tensor(cpp_debug_conv4_1t_path, self.device) # Comparisons self._compare_tensor_data(py_debug_conv3_1t_out, cpp_debug_conv3_1t_tensor, "BBReg Debug_Conv3_1t", i, current_errors) self._compare_tensor_data(py_debug_conv4_1t_out, cpp_debug_conv4_1t_tensor, "BBReg Debug_Conv4_1t", i, current_errors) self._compare_tensor_data(py_iou_feat_list[0], cpp_iou_feat0, "BBReg PyIoUFeat0 vs CppIoUFeat0", i, current_errors) self._compare_tensor_data(py_iou_feat_list[1], cpp_iou_feat1, "BBReg PyIoUFeat1 vs CppIoUFeat1", i, current_errors) self._compare_tensor_data(py_modulation_list[0], cpp_mod_vec0, "BBReg PyMod0 vs CppMod0", i, current_errors) self._compare_tensor_data(py_modulation_list[1], cpp_mod_vec1, "BBReg PyMod1 vs CppMod1", i, current_errors) self._compare_tensor_data(py_iou_pred, cpp_iou_scores, "BBReg IoUPred", i, current_errors) if current_errors: self.all_comparison_stats[f"BBReg_Sample_{i}"] = current_errors # For sample 0, save these tensors for comparison if i == 0: out_dir = Path('test/input_samples/common') out_dir.mkdir(parents=True, exist_ok=True) if py_feat_layer2 is not None: save_tensor_jit(py_feat_layer2.cpu(), out_dir / f'sample_0_layer2.pt') if py_feat_layer3 is not None: save_tensor_jit(py_feat_layer3.cpu(), out_dir / f'sample_0_layer3.pt') if py_proposals is not None: save_tensor_jit(py_proposals.cpu(), out_dir / f'sample_0_proposals.pt') if py_init_bbox is not None: save_tensor_jit(py_init_bbox.cpu(), out_dir / f'sample_0_bb.pt') def compare_resnet_outputs(self): print("\\n--- Comparing ResNet Outputs ---") if not self.models.get('ResNet'): print("PYTHON: ResNet model not loaded, skipping ResNet comparison.") return resnet_model = self.models['ResNet'] config = self.model_configs['ResNet'] cpp_resnet_dir = os.path.join(self.cpp_output_dir, config['cpp_output_subdir']) python_resnet_save_dir = os.path.join(self.python_output_dir, config.get('python_output_subdir', config['cpp_output_subdir'])) if not os.path.exists(python_resnet_save_dir): os.makedirs(python_resnet_save_dir, exist_ok=True) num_samples_to_process = self.num_samples if num_samples_to_process == -1: # If -1, determine 
from available C++ output files # This logic can be complex if C++ output is sparse. For now, let's assume if -1 it means process all *common* inputs. # A safer way for -1 would be to count common input samples first. common_input_glob = os.path.join(self.root_dir, "test", "input_samples", "common", "sample_*_image.pt") num_samples_to_process = len(glob.glob(common_input_glob)) print(f"INFO: num_samples set to -1, determined {num_samples_to_process} common input samples.") processed_samples_count = 0 # Renamed from processed_samples to avoid conflict sample_input_base_dir = os.path.join(self.root_dir, "test", "input_samples", "common") # Loop exactly self.num_samples times (or detected count if -1) for sample_idx in tqdm(range(num_samples_to_process), desc="Comparing ResNet samples"): current_errors = {} # Initialize for each sample python_intermediate_outputs_cache = {} # Reset for each sample # Construct the input file path based on sample_idx sample_input_file_path = os.path.join(sample_input_base_dir, f"sample_{sample_idx}_image.pt") if not os.path.exists(sample_input_file_path): print(f"Warning: Input sample file {sample_input_file_path} not found for sample index {sample_idx}. Skipping ResNet sample.") empty_errors_for_skipped_sample = {} for output_key_config in config['outputs_to_compare'].keys(): self._compare_tensor_data(None, None, output_key_config, sample_idx, empty_errors_for_skipped_sample) if empty_errors_for_skipped_sample: self.all_comparison_stats[f"ResNet_Sample_{sample_idx}"] = empty_errors_for_skipped_sample continue # --- START REINSTATED INPUT LOADING AND PREPROCESSING --- input_tensor = self.load_cpp_tensor(sample_input_file_path, self.device, is_image=True) if input_tensor is None: print(f"Warning: Failed to load a valid tensor for ResNet input sample {sample_input_file_path} (sample {sample_idx}) using self.load_cpp_tensor. Skipping.") # Populate NaNs for all expected outputs for this sample empty_errors_for_skipped_sample = {} for output_key_config in config['outputs_to_compare'].keys(): self._compare_tensor_data(None, None, output_key_config, sample_idx, empty_errors_for_skipped_sample) if empty_errors_for_skipped_sample: self.all_comparison_stats[f"ResNet_Sample_{sample_idx}"] = empty_errors_for_skipped_sample continue if not isinstance(input_tensor, torch.Tensor): print(f"Warning: self.load_cpp_tensor for {sample_input_file_path} did not return a Tensor (got {type(input_tensor)}). Skipping sample {sample_idx}.") # Populate NaNs for all expected outputs for this sample empty_errors_for_skipped_sample = {} for output_key_config in config['outputs_to_compare'].keys(): self._compare_tensor_data(None, None, output_key_config, sample_idx, empty_errors_for_skipped_sample) if empty_errors_for_skipped_sample: self.all_comparison_stats[f"ResNet_Sample_{sample_idx}"] = empty_errors_for_skipped_sample continue # Preprocess the input tensor for Python's ResNet if hasattr(self.python_wrapper, 'preprocess_image'): processed_input_tensor = self.python_wrapper.preprocess_image(input_tensor.clone()) # Use clone else: print("Warning: python_wrapper.preprocess_image not found. 
Using input_tensor as is.") processed_input_tensor = input_tensor.to(self.device) # Ensure device # --- END REINSTATED INPUT LOADING AND PREPROCESSING --- # --- Save preprocessed input for sample 0 --- if sample_idx == 0: preprocessed_dir = Path(self.cpp_output_dir) / 'resnet' preprocessed_dir.mkdir(parents=True, exist_ok=True) py_preprocessed_path = preprocessed_dir / f'sample_{sample_idx}_image_preprocessed_python.pt' torch.save(processed_input_tensor.cpu(), py_preprocessed_path) print(f"Saved Python preprocessed image for sample {sample_idx} to {py_preprocessed_path}") # --- END save preprocessed input --- # Initialize dictionaries to store Python-side outputs for the current sample python_outputs = {} # To store outputs from the Python model for this sample try: # Python ResNet forward pass (assuming it's a JIT model or similar) # The output of a JIT ResNet model might be a dictionary or a list/tuple of tensors # We need to ensure we can map these to the 'outputs_to_compare' keys print(f"PYTHON ResNet forward pass for sample {sample_idx}...") # For ResNet, the output is a dictionary from its forward method. # output_layers = list(config['outputs_to_compare'].keys()) # This might be too broad initially # Define the layers we actually need from the Python ResNet forward pass. # These should match the keys used in the Python ResNet's forward method. # e.g., ['layer1', 'layer2', 'layer3', 'layer4', 'conv1_output', 'bn1_output', etc.] # For now, let's define specific layers needed for the comparison. # The JIT ResNet model we have should output a dictionary. py_output_layers_needed = ['conv1', 'layer1', 'layer2', 'layer3', 'layer4'] # Only request layers that the Python ResNet model actually supports # The Python ResNet model only supports standard layers, not intermediate debug layers # Add 'fc' if configured, though not typically used in these comparisons if 'fc' in config['outputs_to_compare']: py_output_layers_needed.append('fc') # Deduplicate, just in case (though construction above should be fine) py_output_layers_needed = list(OrderedDict.fromkeys(py_output_layers_needed)) print(f"DEBUG: Requesting these layers from Python ResNet: {py_output_layers_needed}") # Call the Python ResNet forward # The `self.models['ResNet']` should be the loaded JIT model # It expects the output_layers argument. # The DiMPTorchScriptWrapper's backbone should also support this. if hasattr(resnet_model, 'forward') and callable(getattr(resnet_model, 'forward')) and 'output_layers' in inspect.signature(resnet_model.forward).parameters: python_model_outputs_dict = resnet_model.forward(processed_input_tensor, output_layers=py_output_layers_needed) elif hasattr(self.python_wrapper, 'extract_backbone') and callable(getattr(self.python_wrapper, 'extract_backbone')): # This is the case if ResNet is accessed via the DiMPTorchScriptWrapper's extract_backbone, # which internally calls the backbone's forward with output_layers. python_model_outputs_dict = self.python_wrapper.extract_backbone(input_tensor.clone()) # extract_backbone handles preprocessing else: print(f"ERROR: Cannot call forward on Python ResNet model. 
Type: {type(resnet_model)}") continue # DEBUG: Print keys from Python model output if isinstance(python_model_outputs_dict, dict): print(f"DEBUG RN_CMP: Keys from python_model_outputs_dict (sample {sample_idx}): {list(python_model_outputs_dict.keys())}") else: print(f"DEBUG RN_CMP: python_model_outputs_dict is not a dict (sample {sample_idx}), type: {type(python_model_outputs_dict)}") # Populate python_outputs based on the python_model_outputs_dict # This maps the Python output names to the keys used in 'outputs_to_compare' if isinstance(python_model_outputs_dict, dict): python_outputs = python_model_outputs_dict # If 'features' is an alias for 'layer4' in Python output if 'layer4' in python_outputs and 'features' not in python_outputs: python_outputs['features'] = python_outputs['layer4'] if 'conv1_output' in python_outputs: python_intermediate_outputs_cache['conv1_output'] = python_outputs['conv1_output'] else: print(f"ERROR: Python ResNet output is not a dict. Got {type(python_model_outputs_dict)}") # Handle tuple/list output if necessary, mapping by order or specific logic. # For now, we assume dict output from our ResNet. continue except Exception as e: print(f"Error during Python ResNet forward pass for sample {sample_idx}: {e}") import traceback traceback.print_exc() continue # Skip to next sample for output_key, cpp_output_filename_or_tuple in config['outputs_to_compare'].items(): is_python_specific_name = isinstance(cpp_output_filename_or_tuple, tuple) cpp_output_filename = cpp_output_filename_or_tuple[0] if is_python_specific_name else cpp_output_filename_or_tuple # Corrected path construction for C++ ResNet tensors: # The sample index is already part of the cpp_output_filename for ResNet outputs from C++. # (e.g., sample_0_conv1_output.pt) # So, we join cpp_resnet_dir directly with this filename. # However, the C++ code actually saves ResNet outputs as sample_X_LAYERNAME.pt directly in cpp_resnet_dir, # not in a per-sample subdirectory for ResNet outputs. # Let's check how test_models.cpp saves them. # test_models.cpp -> save_resnet_outputs -> file_path = resnet_output_dir + "/sample_" + std::to_string(sample_idx) + "_" + output_name; # This means filenames are like "sample_0_conv1_output.pt" directly in "../test/output/resnet/" correct_cpp_tensor_filename = f"sample_{sample_idx}_{cpp_output_filename}" cpp_tensor_path = os.path.join(cpp_resnet_dir, correct_cpp_tensor_filename) # <<< START ADDED DEBUG PRINTS >>> print(f"DEBUG RN_CMP: Attempting to load C++ tensor for '{output_key}' (sample {sample_idx}) from: {cpp_tensor_path}") # <<< END ADDED DEBUG PRINTS >>> try: cpp_tensor = self.load_cpp_tensor(cpp_tensor_path, self.device) # <<< START ADDED DEBUG PRINTS >>> loaded_status = "None" if cpp_tensor is not None: loaded_status = f"Tensor with shape {cpp_tensor.shape}, dtype {cpp_tensor.dtype}, device {cpp_tensor.device}" print(f"DEBUG RN_CMP: Loaded C++ tensor for '{output_key}' (sample {sample_idx}): {loaded_status}") # <<< END ADDED DEBUG PRINTS >>> if cpp_tensor is None: print(f"Warning: C++ tensor {cpp_output_filename} for sample {sample_idx} ('{output_key}') is None or loading failed. 
Skipping comparison for this output.") # _compare_tensor_data will be called with cpp_tensor=None, which handles NaN population # Fall through to _compare_tensor_data to record NaNs # continue # This would skip the _compare_tensor_data call entirely # Get the corresponding Python tensor python_tensor = None python_output_save_path = os.path.join(python_resnet_save_dir, f"sample_{sample_idx}", cpp_output_filename) # Save with same name as C++ for consistency # Map the 'output_key' from config to the key used in 'python_outputs' dictionary # This requires knowing how 'outputs_to_compare' keys map to Python model output dict keys. # Example: 'Conv1' maps to 'conv1_output', 'Features' to 'features' (which might be 'layer4'), etc. py_dict_key = None if output_key == 'Conv1': py_dict_key = 'conv1' # Python ResNet outputs conv1 directly elif output_key == 'Debug ResNet Conv1->BN1 Input': print(f"Warning: Python ResNet does not support intermediate debug layers. Skipping {output_key}.") continue elif output_key == 'BN1': print(f"Warning: Python ResNet does not support intermediate BN1 output. Skipping {output_key}.") continue elif output_key == 'BN1 Centered X': print(f"Warning: Python ResNet does not support intermediate BN1 layers. Skipping {output_key}.") continue elif output_key == 'BN1 Var+Eps': print(f"Warning: Python ResNet does not support intermediate BN1 layers. Skipping {output_key}.") continue elif output_key == 'BN1 InvStd': print(f"Warning: Python ResNet does not support intermediate BN1 layers. Skipping {output_key}.") continue elif output_key == 'BN1 Normalized X': print(f"Warning: Python ResNet does not support intermediate BN1 layers. Skipping {output_key}.") continue elif output_key == 'ReLU1': print(f"Warning: Python ResNet does not support intermediate ReLU1 output. Skipping {output_key}.") continue elif output_key == 'MaxPool': # MaxPool is applied *after* 'conv1' (conv1+bn1+relu) block in Python ResNet. # However, the Python ResNet forward doesn't have a separate 'maxpool' output key. # The output of layer1 is *after* maxpool. # C++ saves maxpool_output.pt *before* layer1. # This means we need to save python_outputs['conv1'] (after conv1,bn1,relu) then apply maxpool to it manually for comparison. # OR, recognize that C++ output for maxpool is input to layer1. # For now, this is tricky. Let's see if layer1 input in C++ matches python maxpool output. # The Python output named 'layer1' is after the nn.Sequential that IS layer1. # The input to C++ layer1 is the output of C++ maxpool. # The input to Python model.layer1 is the output of model.maxpool(model.relu(model.bn1(model.conv1(x)))). # So, Python's 'conv1' output, when passed through an nn.MaxPool2d, should match C++ 'maxpool_output.pt'. print(f"Warning: Direct Python equivalent for C++ 'MaxPool' output is complex. Requires manual maxpool application to Python's 'conv1' output. Skipping {output_key} for now.") continue # Skip this key for now elif output_key == 'Layer1': py_dict_key = 'layer1' elif output_key == 'Layer2': py_dict_key = 'layer2' elif output_key == 'Layer3': py_dict_key = 'layer3' elif output_key == 'Layer4': py_dict_key = 'layer4' elif output_key == 'Features': py_dict_key = 'layer4' # 'Features' is an alias for 'layer4' elif output_key == 'Layer1.0 Shortcut': # Shortcut outputs are not available from the Python ResNet forward method. print(f"Warning: Shortcut output '{output_key}' cannot be directly fetched from Python ResNet. 
Skipping.") continue else: print(f"Warning: Unknown output_key '{output_key}' in ResNet config for Python tensor mapping. Skipping.") continue if py_dict_key and py_dict_key in python_outputs: python_tensor = python_outputs[py_dict_key] else: # DEBUG: Print info if key is not found print(f"DEBUG RN_CMP: py_dict_key '{py_dict_key}' not found in python_outputs (keys: {list(python_outputs.keys())}) for output_key '{output_key}', sample {sample_idx}") if python_tensor is None: print(f"Warning: Python tensor for {output_key} is None for sample {sample_idx}. Skipping.") continue # Save the Python tensor (always, for record-keeping) os.makedirs(os.path.dirname(python_output_save_path), exist_ok=True) torch.save(python_tensor.cpu(), python_output_save_path) # print(f"Saved Python tensor for {output_key} (sample {sample_idx}) to {python_output_save_path}") # Perform comparison self._compare_tensor_data(python_tensor.to(self.device) if python_tensor is not None else None, cpp_tensor, output_key, sample_idx, current_errors) # current_errors is populated in place # The line above was changed to handle python_tensor being None before .to(self.device) # current_errors is populated by _compare_tensor_data directly. # self.all_comparison_stats is updated after this inner loop completes for the sample. except FileNotFoundError: print(f"Warning: C++ output file not found: {cpp_tensor_path}. Skipping for sample {sample_idx}, output {output_key}.") # Populate NaNs for this missing C++ file self._compare_tensor_data(None, None, output_key, sample_idx, current_errors) except Exception as e: print(f"Error comparing {output_key} for sample {sample_idx}: {e}") import traceback traceback.print_exc() # Populate NaNs on error self._compare_tensor_data(None, None, output_key, sample_idx, current_errors) # After processing all output_keys for this sample, store the collected current_errors if current_errors: # If any comparisons were attempted (even if they resulted in NaNs) self.all_comparison_stats[f"ResNet_Sample_{sample_idx}"] = current_errors # processed_samples += 1 # This variable is no longer used as loop is range-based print("--- ResNet Output Comparison Complete ---") def generate_html_report(self): print("\nGenerating HTML report...") report_path = os.path.join(self.comparison_dir, "report.html") # Prepare data for the report: group by model and comparison type report_data = { } for sample_key, comparisons in self.all_comparison_stats.items(): # sample_key examples: "Clf_Train_Sample_0", "Clf_Test_Sample_0", "BBReg_Sample_0" parts = sample_key.split("_") model_prefix = parts[0] # Clf, BBReg, ResNet sample_type_str = "" sample_idx = -1 if model_prefix == "Clf": sample_type_str = parts[1] # Train or Test sample_idx = int(parts[-1]) model_name_key = f"Classifier {sample_type_str}" elif model_prefix == "BBReg": sample_idx = int(parts[-1]) model_name_key = "BB Regressor" elif model_prefix == "ResNet": # Added this case sample_idx = int(parts[-1]) model_name_key = "ResNet" else: print(f"WARNING: Unknown sample key format in all_comparison_stats: {sample_key}") continue for comparison_name, stats in comparisons.items(): # comparison_name examples: "Classifier Features Train", "BBReg PyIoUFeat0 vs CppIoUFeat0" # Unpack all 11 metrics now mae, max_err, diff_arr, mean_py_val, std_abs_err, \ l2_py, l2_cpp, l2_diff, cos_sim, pearson, mre = stats full_comparison_key = f"{model_name_key} - {comparison_name}" if full_comparison_key not in report_data: report_data[full_comparison_key] = { "samples": {}, "all_maes": [], 
"all_max_errs": [], "all_mean_py_vals": [], "all_std_abs_errs": [], # Renamed from all_std_errs "all_l2_py_vals": [], "all_l2_cpp_vals": [], "all_l2_diff_vals": [], "all_cos_sim_vals": [], "all_pearson_vals": [], "all_mre_vals": [] } relative_plot_path = None # Initialize relative_plot_path plot_filename = f"{model_name_key.replace(' ', '_')}_{comparison_name.replace(' ', '_')}_{sample_idx}.png" plot_abs_path = os.path.join(self.comparison_dir, plot_filename) if os.path.exists(plot_abs_path): # relative_plot_path = Path(plot_filename) # Old relative_plot_path = plot_filename # New, already just filename self._generate_single_plot(diff_arr, comparison_name, plot_abs_path, mean_py_val, std_abs_err, mae, max_err) report_data[full_comparison_key]["samples"][sample_idx] = { "mae": mae, "max_err": max_err, "mean_py_val": mean_py_val, "std_abs_err": std_abs_err, # Renamed from std_err "l2_py": l2_py, "l2_cpp": l2_cpp, "l2_diff": l2_diff, "cos_sim": cos_sim, "pearson": pearson, "mre": mre, "plot_path": relative_plot_path # Store relative path for HTML } if not np.isnan(mae): report_data[full_comparison_key]["all_maes"].append(mae) if not np.isnan(max_err): report_data[full_comparison_key]["all_max_errs"].append(max_err) if not np.isnan(mean_py_val): report_data[full_comparison_key]["all_mean_py_vals"].append(mean_py_val) if not np.isnan(std_abs_err): report_data[full_comparison_key]["all_std_abs_errs"].append(std_abs_err) if not np.isnan(l2_py): report_data[full_comparison_key]["all_l2_py_vals"].append(l2_py) if not np.isnan(l2_cpp): report_data[full_comparison_key]["all_l2_cpp_vals"].append(l2_cpp) if not np.isnan(l2_diff): report_data[full_comparison_key]["all_l2_diff_vals"].append(l2_diff) if not np.isnan(cos_sim): report_data[full_comparison_key]["all_cos_sim_vals"].append(cos_sim) if not np.isnan(pearson): report_data[full_comparison_key]["all_pearson_vals"].append(pearson) if not np.isnan(mre): report_data[full_comparison_key]["all_mre_vals"].append(mre) # Calculate overall stats for comp_key, data in report_data.items(): data["overall_mae_mean"] = np.mean(data["all_maes"]) if data["all_maes"] else float('nan') data["overall_mae_std"] = np.std(data["all_maes"]) if data["all_maes"] else float('nan') data["overall_max_err_mean"] = np.mean(data["all_max_errs"]) if data["all_max_errs"] else float('nan') data["overall_mean_py_val_mean"] = np.mean(data["all_mean_py_vals"]) if data["all_mean_py_vals"] else float('nan') data["overall_std_abs_err_mean"] = np.mean(data["all_std_abs_errs"]) if data["all_std_abs_errs"] else float('nan') # Renamed data["overall_l2_py_mean"] = np.mean(data["all_l2_py_vals"]) if data["all_l2_py_vals"] else float('nan') data["overall_l2_cpp_mean"] = np.mean(data["all_l2_cpp_vals"]) if data["all_l2_cpp_vals"] else float('nan') data["overall_l2_diff_mean"] = np.mean(data["all_l2_diff_vals"]) if data["all_l2_diff_vals"] else float('nan') data["overall_cos_sim_mean"] = np.mean(data["all_cos_sim_vals"]) if data["all_cos_sim_vals"] else float('nan') data["overall_pearson_mean"] = np.mean(data["all_pearson_vals"]) if data["all_pearson_vals"] else float('nan') data["overall_mre_mean"] = np.mean(data["all_mre_vals"]) if data["all_mre_vals"] else float('nan') # HTML Generation html_content = """ Model Comparison Report

Model Comparison Report

Number of samples per model component: {self.num_samples}

Understanding the Metrics (an illustrative per-sample computation follows the definitions below):

Mean MAE (Mean Absolute Error)
Calculation: Average of the absolute differences between corresponding elements of the Python and C++ tensors (mean(abs(py - cpp))). The "Mean MAE" in the summary table is the average of these MAEs over all samples for a given comparison.
Range & Interpretation: 0 to ∞. Closer to 0 indicates better agreement. This metric shows the average magnitude of error.
Std MAE (Standard Deviation of MAE)
Calculation: Standard deviation of the MAE values calculated for each sample within a comparison group.
Range & Interpretation: 0 to ∞. A smaller value indicates that the MAE is consistent across samples. A larger value suggests variability in agreement from sample to sample.
Mean Max Error
Calculation: Average of the maximum absolute differences found between Python and C++ tensors for each sample (mean(max(abs(py - cpp))) over samples).
Range & Interpretation: 0 to ∞. Closer to 0 is better. Indicates the average of the worst-case discrepancies per sample.
Mean Py Val (Mean Python Tensor Value)
Calculation: Average of the mean values of the Python reference tensors over all samples (mean(mean(py_tensor_sample_N))).
Range & Interpretation: Problem-dependent. Provides context about the typical magnitude of the Python model's output values.
Mean Std Abs Err (Mean Standard Deviation of Absolute Errors)
Calculation: Average of the standard deviations of the absolute error arrays (abs(py - cpp)) for each sample. The "Err Std" in plot titles is this value for that specific sample.
Range & Interpretation: 0 to ∞. A smaller value indicates that the errors are concentrated around their mean (MAE), implying less spread in error magnitudes within a sample.
Mean L2 Py (Mean L2 Norm of Python Tensor)
Calculation: Average of the L2 norms (Euclidean norm) of the flattened Python tensors over all samples.
Range & Interpretation: 0 to ∞. Represents the average magnitude or "length" of the Python output vectors.
Mean L2 Cpp (Mean L2 Norm of C++ Tensor)
Calculation: Average of the L2 norms of the flattened C++ tensors over all samples.
Range & Interpretation: 0 to ∞. Represents the average magnitude of the C++ output vectors. Should be comparable to Mean L2 Py if models agree in scale.
Mean L2 Diff (Mean L2 Norm of Difference)
Calculation: Average of the L2 norms of the flattened difference tensors (py - cpp) over all samples.
Range & Interpretation: 0 to ∞. Closer to 0 indicates better agreement. This is the average magnitude of the per-sample difference vectors.
Mean Cosine Sim (Mean Cosine Similarity)
Calculation: Average of the cosine similarities between the flattened Python and C++ tensors over all samples. Cosine similarity is dot(py, cpp) / (norm(py) * norm(cpp)).
Range & Interpretation: -1 to 1 (typically 0 to 1 for non-negative features). Closer to 1 indicates that the tensors point in the same direction (high similarity in terms of orientation, ignoring magnitude). Values near 0 suggest orthogonality, and near -1 suggest opposite directions.
Mean Pearson Corr (Mean Pearson Correlation Coefficient)
Calculation: Average of the Pearson correlation coefficients between the flattened Python and C++ tensors over all samples. Measures linear correlation.
Range & Interpretation: -1 to 1. Closer to 1 indicates strong positive linear correlation. Closer to -1 indicates strong negative linear correlation. Closer to 0 indicates weak or no linear correlation.
Mean MRE (Mean Relative Error)
Calculation: Average of the mean relative errors per sample, where relative error is mean(abs(py - cpp) / (abs(py) + epsilon)). Epsilon is a small value to prevent division by zero.
Range & Interpretation: 0 to ∞. Closer to 0 is better. This metric normalizes the absolute error by the magnitude of the Python reference values, useful for understanding error relative to signal strength.
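Worked Example (illustrative values only)
The numbers below are made up to show how the metrics relate on a tiny tensor; they do not come from any model output. For a single sample with py = [1, 2, 2] and cpp = [1, 2, 3]: abs(py - cpp) = [0, 0, 1], so MAE = 1/3 ≈ 0.333, Max Error = 1, and Std Abs Err = std([0, 0, 1]) ≈ 0.471.
L2 Py = 3, L2 Cpp = sqrt(14) ≈ 3.742, L2 Diff = 1. Cosine Sim = dot(py, cpp) / (L2 Py * L2 Cpp) = 11 / (3 * 3.742) ≈ 0.980, Pearson Corr ≈ 0.866, and MRE = mean([0/1, 0/2, 1/2]) ≈ 0.167 (with a negligible epsilon).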
""" sorted_report_keys = sorted(report_data.keys()) html_content += "

Overall Comparison Statistics

" for comp_key in sorted_report_keys: data = report_data[comp_key] html_content += f""" """ html_content += "
Comparison Key | Mean MAE | Std MAE | Mean Max Error | Mean Py Val | Mean Std Abs Err | Mean L2 Py | Mean L2 Cpp | Mean L2 Diff | Mean Cosine Sim | Mean Pearson Corr | Mean MRE
{comp_key} {f"{data['overall_mae_mean']:.4e}" if not np.isnan(data['overall_mae_mean']) else 'N/A'} {f"{data['overall_mae_std']:.4e}" if not np.isnan(data['overall_mae_std']) else 'N/A'} {f"{data['overall_max_err_mean']:.4e}" if not np.isnan(data['overall_max_err_mean']) else 'N/A'} {f"{data['overall_mean_py_val_mean']:.4e}" if not np.isnan(data['overall_mean_py_val_mean']) else 'N/A'} {f"{data['overall_std_abs_err_mean']:.4e}" if not np.isnan(data['overall_std_abs_err_mean']) else 'N/A'} {f"{data['overall_l2_py_mean']:.4e}" if not np.isnan(data['overall_l2_py_mean']) else 'N/A'} {f"{data['overall_l2_cpp_mean']:.4e}" if not np.isnan(data['overall_l2_cpp_mean']) else 'N/A'} {f"{data['overall_l2_diff_mean']:.4e}" if not np.isnan(data['overall_l2_diff_mean']) else 'N/A'} {f"{data['overall_cos_sim_mean']:.4f}" if not np.isnan(data['overall_cos_sim_mean']) else 'N/A'} {f"{data['overall_pearson_mean']:.4f}" if not np.isnan(data['overall_pearson_mean']) else 'N/A'} {f"{data['overall_mre_mean']:.4e}" if not np.isnan(data['overall_mre_mean']) else 'N/A'}
" for comp_key in sorted_report_keys: data = report_data[comp_key] html_content += f"

Details for: {comp_key}

" html_content += f"""

Overall Mean MAE: {f'{data["overall_mae_mean"]:.4e}' if not np.isnan(data['overall_mae_mean']) else 'N/A'}

""" html_content += "" for sample_idx in sorted(data["samples"].keys()): sample_data = data["samples"][sample_idx] img_tag = f'Error histogram for {comp_key} sample {sample_idx}' if sample_data["plot_path"] else "N/A" html_content += f""" """ html_content += "
Sample Index | MAE | Max Error | Mean Py Val | Std Abs Err | L2 Py | L2 Cpp | L2 Diff | Cosine Sim | Pearson Corr | MRE | Error Distribution Plot
{sample_idx} {f"{sample_data['mae']:.4e}" if not np.isnan(sample_data['mae']) else 'N/A'} {f"{sample_data['max_err']:.4e}" if not np.isnan(sample_data['max_err']) else 'N/A'} {f"{sample_data['mean_py_val']:.4e}" if not np.isnan(sample_data['mean_py_val']) else 'N/A'} {f"{sample_data['std_abs_err']:.4e}" if not np.isnan(sample_data['std_abs_err']) else 'N/A'} {f"{sample_data['l2_py']:.4e}" if not np.isnan(sample_data['l2_py']) else 'N/A'} {f"{sample_data['l2_cpp']:.4e}" if not np.isnan(sample_data['l2_cpp']) else 'N/A'} {f"{sample_data['l2_diff']:.4e}" if not np.isnan(sample_data['l2_diff']) else 'N/A'} {f"{sample_data['cos_sim']:.4f}" if not np.isnan(sample_data['cos_sim']) else 'N/A'} {f"{sample_data['pearson']:.4f}" if not np.isnan(sample_data['pearson']) else 'N/A'} {f"{sample_data['mre']:.4e}" if not np.isnan(sample_data['mre']) else 'N/A'} {img_tag}
" html_content += """ """ with open(report_path, 'w') as f: f.write(html_content) print(f"HTML report generated at {report_path}") def _generate_single_plot(self, error_array, title, plot_path, mean_val, std_abs_err, mae, max_err): if error_array is None or len(error_array) == 0 or np.all(np.isnan(error_array)): # print(f"Skipping plot for {title} as error_array is empty or all NaNs.") return plt.figure(figsize=(8, 6)) plt.hist(error_array, bins=50, color='skyblue', edgecolor='black') stats_text = f"Ref Mean: {mean_val:.3e} | MAE: {mae:.3e} | MaxErr: {max_err:.3e} | Err Std: {std_abs_err:.3e}" plt.title(f"{title}\n{stats_text}", fontsize=10) plt.xlabel("Error Value") plt.ylabel("Frequency") plt.grid(True, linestyle='--', alpha=0.7) try: plt.tight_layout() plt.savefig(plot_path) except Exception as e: print(f"ERROR: Failed to save plot {plot_path}: {e}") plt.close() def run_all_tests(self): print("DEBUG: ComparisonRunner.run_all_tests() ENTERED") # DEBUG PRINT self.all_comparison_stats = {} # Initialize/clear for the new run self.load_python_models() self.compare_resnet_outputs() self.compare_classifier() self.compare_bb_regressor() self.compare_preprocessed_inputs() # ADDED self.generate_html_report() print("All tests completed!") def compare_preprocessed_inputs(self): print("\nComparing preprocessed input images (Python vs C++)...") # Directory where both Python and C++ are saving their preprocessed images # As per current changes, this is self.cpp_output_dir / 'resnet' preprocessed_dir = Path(self.cpp_output_dir) / 'resnet' issues_found = False for i in tqdm(range(self.num_samples), desc="Preprocessed Input Samples"): py_input_path = preprocessed_dir / f'sample_{i}_image_preprocessed_python.pt' cpp_input_path = preprocessed_dir / f'sample_{i}_image_preprocessed_cpp.pt' py_tensor = None cpp_tensor = None if py_input_path.exists(): try: py_tensor = torch.load(str(py_input_path), map_location=self.device) except Exception as e: print(f"Error loading Python preprocessed input for sample {i} from {py_input_path}: {e}") else: print(f"Python preprocessed input for sample {i} not found at {py_input_path}") if cpp_input_path.exists(): try: cpp_tensor = self.load_cpp_tensor(str(cpp_input_path), self.device) # Use existing loader except Exception as e: print(f"Error loading C++ preprocessed input for sample {i} from {cpp_input_path}: {e}") else: print(f"C++ preprocessed input for sample {i} not found at {cpp_input_path}") if py_tensor is not None and cpp_tensor is not None: if py_tensor.shape != cpp_tensor.shape: print(f" Sample {i}: SHAPE MISMATCH! 
Python: {py_tensor.shape}, C++: {cpp_tensor.shape}") issues_found = True continue are_close = torch.allclose(py_tensor, cpp_tensor, atol=1e-7) # Using a slightly stricter tolerance for direct input comparison max_abs_diff = torch.max(torch.abs(py_tensor - cpp_tensor)).item() if not are_close else 0.0 mean_abs_diff = torch.mean(torch.abs(py_tensor - cpp_tensor)).item() if not are_close else 0.0 print(f" Sample {i}: torch.allclose(): {are_close}, Max Abs Diff: {max_abs_diff:.4e}, Mean Abs Diff: {mean_abs_diff:.4e}") if not are_close: issues_found = True else: print(f" Sample {i}: Skipping comparison due to missing tensor(s).") issues_found = True # Count missing files as an issue if not issues_found: print("Preprocessed input comparison: All samples matched or were close!") else: print("Preprocessed input comparison: ISSUES FOUND (details above).") def load_cpp_tensor(self, file_path_str, device, is_image=False): if not os.path.exists(file_path_str): return None attempt_jit_extraction = False loaded_object_from_direct_load = None try: # Attempt direct load first loaded_object_from_direct_load = torch.load(file_path_str, map_location=device, weights_only=False) if isinstance(loaded_object_from_direct_load, torch.Tensor): return loaded_object_from_direct_load.to(device) # Successfully loaded a tensor directly else: # Loaded something, but it's not a tensor. It's likely a JIT module. attempt_jit_extraction = True print(f"INFO: Initial torch.load of {file_path_str} yielded a non-Tensor (type: {type(loaded_object_from_direct_load)}). Will attempt JIT extraction.") except Exception as e_initial_load: # Initial load failed (e.g., it's a JIT module not readable by plain torch.load, or other error) attempt_jit_extraction = True print(f"INFO: Initial torch.load failed for {file_path_str}: {e_initial_load}. Will attempt JIT extraction.") # Common JIT tensor extraction logic def extract_tensor_from_jit_module(module_path, jit_loaded_obj, dev): print(f"DEBUG JIT EXTRACTION: For {module_path}, loaded_obj type: {type(jit_loaded_obj)}") print(f"DEBUG JIT EXTRACTION: str(loaded_obj): {str(jit_loaded_obj)}") # print(f"DEBUG JIT EXTRACTION: dir(loaded_obj): {dir(jit_loaded_obj)}") # Verbose extracted_tensor = None # 1. Try calling if 'forward' method exists if hasattr(jit_loaded_obj, 'forward') and callable(getattr(jit_loaded_obj, 'forward')): print(f"DEBUG JIT EXTRACTION: Attempting jit_loaded_obj.forward()") try: extracted_tensor = jit_loaded_obj.forward() if not isinstance(extracted_tensor, torch.Tensor): print(f"DEBUG JIT EXTRACTION: jit_loaded_obj.forward() did not return a tensor, got {type(extracted_tensor)}. Trying with dummy input.") extracted_tensor = None # Reset before trying with dummy try: print(f"DEBUG JIT EXTRACTION: Attempting jit_loaded_obj.forward(torch.empty(0))") extracted_tensor = jit_loaded_obj.forward(torch.empty(0, device=dev)) if not isinstance(extracted_tensor, torch.Tensor): print(f"DEBUG JIT EXTRACTION: jit_loaded_obj.forward(dummy) also did not return a tensor, got {type(extracted_tensor)}") extracted_tensor = None except Exception as e_fwd_dummy: print(f"DEBUG JIT EXTRACTION: Error calling jit_loaded_obj.forward(dummy): {e_fwd_dummy}") extracted_tensor = None except Exception as e_fwd: # This covers cases where forward exists but call fails (e.g. signature mismatch) print(f"DEBUG JIT EXTRACTION: Error calling jit_loaded_obj.forward(): {e_fwd}. 
Trying with dummy input as fallback.") extracted_tensor = None # Reset try: print(f"DEBUG JIT EXTRACTION: Attempting jit_loaded_obj.forward(torch.empty(0)) after error.") extracted_tensor = jit_loaded_obj.forward(torch.empty(0, device=dev)) if not isinstance(extracted_tensor, torch.Tensor): print(f"DEBUG JIT EXTRACTION: jit_loaded_obj.forward(dummy) after error also did not return a tensor, got {type(extracted_tensor)}") extracted_tensor = None except Exception as e_fwd_dummy_after_error: print(f"DEBUG JIT EXTRACTION: Error calling jit_loaded_obj.forward(dummy) after initial fwd error: {e_fwd_dummy_after_error}") extracted_tensor = None # 1b. Try calling the module directly if forward attribute exists (covers some cases) # This is after trying explicit .forward() as direct call might have side effects or different interpretation if extracted_tensor is None and callable(jit_loaded_obj) and hasattr(jit_loaded_obj, 'forward'): print(f"DEBUG JIT EXTRACTION: Attempting callable jit_loaded_obj()") try: extracted_tensor = jit_loaded_obj() if not isinstance(extracted_tensor, torch.Tensor): print(f"DEBUG JIT EXTRACTION: callable jit_loaded_obj() did not return a tensor, got {type(extracted_tensor)}") extracted_tensor = None except Exception as e_call_obj: print(f"DEBUG JIT EXTRACTION: Error calling callable jit_loaded_obj() (it had a forward attr): {e_call_obj}") extracted_tensor = None # 2. Check if 'forward' attribute *itself* is a tensor if extracted_tensor is None and hasattr(jit_loaded_obj, 'forward') and isinstance(getattr(jit_loaded_obj, 'forward'), torch.Tensor): print(f"DEBUG JIT EXTRACTION: jit_loaded_obj.forward IS a tensor.") extracted_tensor = getattr(jit_loaded_obj, 'forward') # 3. Look for common direct tensor attributes if extracted_tensor is None and hasattr(jit_loaded_obj, 'tensor') and isinstance(getattr(jit_loaded_obj, 'tensor'), torch.Tensor): print(f"DEBUG JIT EXTRACTION: Found tensor in jit_loaded_obj.tensor") extracted_tensor = jit_loaded_obj.tensor if extracted_tensor is None and hasattr(jit_loaded_obj, 'data') and isinstance(getattr(jit_loaded_obj, 'data'), torch.Tensor): print(f"DEBUG JIT EXTRACTION: Found tensor in jit_loaded_obj.data") extracted_tensor = jit_loaded_obj.data if extracted_tensor is None and hasattr(jit_loaded_obj, 'tensor_data') and isinstance(getattr(jit_loaded_obj, 'tensor_data'), torch.Tensor): print(f"DEBUG JIT EXTRACTION: Found tensor in jit_loaded_obj.tensor_data") extracted_tensor = jit_loaded_obj.tensor_data # 4. Iterate through named_buffers (common for wrapped tensors) if extracted_tensor is None: print(f"DEBUG JIT EXTRACTION: Iterating named_buffers for a tensor...") try: for name, buffer_tensor in jit_loaded_obj.named_buffers(): if isinstance(buffer_tensor, torch.Tensor): print(f"DEBUG JIT EXTRACTION: Found tensor in named_buffers: {name}") extracted_tensor = buffer_tensor break except Exception as e_buffers: print(f"DEBUG JIT EXTRACTION: Error iterating named_buffers: {e_buffers}") # 5. Iterate through named_parameters if extracted_tensor is None: print(f"DEBUG JIT EXTRACTION: Iterating named_parameters for a tensor...") try: for name, param_tensor in jit_loaded_obj.named_parameters(): if isinstance(param_tensor, torch.Tensor): print(f"DEBUG JIT EXTRACTION: Found tensor in named_parameters: {name}") extracted_tensor = param_tensor break except Exception as e_params: print(f"DEBUG JIT EXTRACTION: Error iterating named_parameters: {e_params}") # 6. 
Iterate through members (attributes) using inspect.getmembers - potentially fragile if extracted_tensor is None: print(f"DEBUG JIT EXTRACTION: Attempting to iterate members using inspect.getmembers...") try: for name, member_obj in inspect.getmembers(jit_loaded_obj): if isinstance(member_obj, torch.Tensor): # Avoid re-picking already checked common names if they are somehow also members if name not in ['tensor', 'data', 'tensor_data', 'forward']: print(f"DEBUG JIT EXTRACTION: Found tensor in member (inspect.getmembers): {name}") extracted_tensor = member_obj break except RuntimeError as e_inspect: # Specifically catch RuntimeError that was observed: "Method 'forward' is not defined" print(f"DEBUG JIT EXTRACTION: inspect.getmembers failed with RuntimeError: {e_inspect}. Skipping this method.") except Exception as e_inspect_other: print(f"DEBUG JIT EXTRACTION: inspect.getmembers failed with other Exception: {e_inspect_other}. Skipping this method.") # 7. Iterate through named_children and inspect if extracted_tensor is None: print(f"DEBUG JIT EXTRACTION: Iterating named_children...") try: for child_name, child_module in jit_loaded_obj.named_children(): print(f"DEBUG JIT EXTRACTION: Inspecting child: {child_name} of type {type(child_module)}") # Try common ways to get tensor from child if hasattr(child_module, 'forward') and callable(getattr(child_module, 'forward')) : try: temp_tensor = child_module.forward() if isinstance(temp_tensor, torch.Tensor): print(f"DEBUG JIT EXTRACTION: Found tensor by calling child {child_name}.forward()") extracted_tensor = temp_tensor; break except: pass if extracted_tensor is None and callable(child_module) and hasattr(child_module, 'forward'): # Added hasattr forward here try: temp_tensor = child_module() if isinstance(temp_tensor, torch.Tensor): print(f"DEBUG JIT EXTRACTION: Found tensor by calling child {child_name}()") extracted_tensor = temp_tensor; break except: pass if extracted_tensor is None and hasattr(child_module, 'forward') and isinstance(getattr(child_module, 'forward'), torch.Tensor): extracted_tensor = getattr(child_module, 'forward') print(f"DEBUG JIT EXTRACTION: Found tensor in child {child_name}.forward (as attribute)") break if extracted_tensor is None and hasattr(child_module, 'tensor') and isinstance(getattr(child_module, 'tensor'), torch.Tensor): extracted_tensor = child_module.tensor print(f"DEBUG JIT EXTRACTION: Found tensor in child {child_name}.tensor") break if extracted_tensor is None and hasattr(child_module, 'data') and isinstance(getattr(child_module, 'data'), torch.Tensor): extracted_tensor = child_module.data print(f"DEBUG JIT EXTRACTION: Found tensor in child {child_name}.data") break if extracted_tensor is None and hasattr(child_module, 'tensor_data') and isinstance(getattr(child_module, 'tensor_data'), torch.Tensor): extracted_tensor = child_module.tensor_data print(f"DEBUG JIT EXTRACTION: Found tensor in child {child_name}.tensor_data") break if extracted_tensor is None: # Check general members of child if direct attributes fail try: for name, member_obj in inspect.getmembers(child_module): if isinstance(member_obj, torch.Tensor): print(f"DEBUG JIT EXTRACTION: Found tensor in member {name} of child {child_name}") extracted_tensor = member_obj; break if extracted_tensor is not None: break except Exception as e_child_inspect: print(f"DEBUG JIT EXTRACTION: inspect.getmembers on child {child_name} failed: {e_child_inspect}") if extracted_tensor is not None: print(f"DEBUG JIT EXTRACTION: Tensor found in a child module.") else: 
print(f"DEBUG JIT EXTRACTION: Tensor not found in direct children.") except Exception as e_children: print(f"DEBUG JIT EXTRACTION: Error iterating named_children: {e_children}") if isinstance(extracted_tensor, torch.Tensor): print(f"DEBUG JIT EXTRACTION: Successfully extracted tensor of shape {extracted_tensor.shape} from JIT module {module_path}") return extracted_tensor.to(dev) else: print(f"Warning: JIT EXTRACTION: Could not extract tensor from JIT module: {module_path}. Final extracted_type: {type(extracted_tensor)}. THIS FILE WILL BE SKIPPED.") return None if attempt_jit_extraction: # If primary_jit_load_needed was true, loaded_object_from_direct_load might be the JIT module already. # Otherwise, we need to load it with torch.jit.load. # The critical part is that C++ outputs are *always* JIT modules now if not raw tensors. jit_module_to_process = None if loaded_object_from_direct_load is not None and not isinstance(loaded_object_from_direct_load, torch.Tensor): # This means torch.load succeeded but returned a JIT module directly # (common for files saved with torch.jit.save that are actually modules) print(f"DEBUG JIT: Using object from initial torch.load (type: {type(loaded_object_from_direct_load)}) for JIT extraction for {file_path_str}.") jit_module_to_process = loaded_object_from_direct_load else: # This means initial torch.load either failed OR it was an image and returned a JIT module (handled above), # OR it was not an image and returned a tensor (already returned). # So, if we are here, it means torch.load failed, or we need to fresh load as JIT. try: print(f"DEBUG JIT: Attempting torch.jit.load for {file_path_str} as fallback/primary JIT path.") jit_module_to_process = torch.jit.load(file_path_str, map_location=device) except Exception as e_jit_load_explicit: print(f"Error: torch.jit.load also failed for {file_path_str}: {e_jit_load_explicit}. Traceback: {traceback.format_exc()}. SKIPPING.") return None if jit_module_to_process is not None: final_tensor = extract_tensor_from_jit_module(file_path_str, jit_module_to_process, device) if final_tensor is not None: return final_tensor else: print(f"Warning: JIT extraction path for {file_path_str} (using {type(jit_module_to_process)}) failed to extract tensor. SKIPPING file.") return None else: # This case should be rare if torch.jit.load was attempted and failed, as it would have returned None above. print(f"Warning: jit_module_to_process is None for {file_path_str} before calling extraction. SKIPPING file.") return None # If we reach here, it means initial torch.load returned a tensor (and it was returned), # or all attempts to load and extract have failed. print(f"Warning: load_cpp_tensor is returning None for {file_path_str} after all attempts. This indicates an issue with file content or loading logic for this specific file type when is_image={is_image}.") return None def _compare_tensor_data(self, tensor1, tensor2, name, sample_idx, current_errors_dict_to_populate): """Compare two tensors and return error metrics. 
Modifies current_errors_dict_to_populate in place.""" num_metrics = 11 # mae, max_err, diff_arr, mean_py_val, std_abs_err, l2_py, l2_cpp, l2_diff, cos_sim, pearson, mre nan_metrics_tuple = ( float('nan'), float('nan'), [], float('nan'), float('nan'), # Original 5 float('nan'), float('nan'), float('nan'), float('nan'), float('nan'), float('nan') # New 6 ) if tensor1 is None or tensor2 is None: py_mean = float('nan') py_l2 = float('nan') if tensor1 is not None: # Python tensor exists t1_cpu_temp = tensor1.cpu().detach().numpy().astype(np.float32) py_mean = np.mean(t1_cpu_temp) py_l2 = np.linalg.norm(t1_cpu_temp.flatten()) # Populate current_errors_dict_to_populate directly current_errors_dict_to_populate[name] = ( float('nan'), float('nan'), [], py_mean, float('nan'), py_l2, float('nan'), float('nan'), float('nan'), float('nan'), float('nan') ) print(f"Warning: Cannot compare '{name}' for sample {sample_idx}, one or both tensors are None.") return # Return None as the function modifies dict in place t1_cpu = tensor1.cpu().detach().numpy().astype(np.float32) t2_cpu = tensor2.cpu().detach().numpy().astype(np.float32) if t1_cpu.shape != t2_cpu.shape: print(f"Warning: Shape mismatch for '{name}' sample {sample_idx}. Py: {t1_cpu.shape}, Cpp: {t2_cpu.shape}. Skipping most comparisons.") current_errors_dict_to_populate[name] = ( float('nan'), float('nan'), [], np.mean(t1_cpu), float('nan'), # MAE, MaxErr, diff_arr, MeanPy, StdAbsErr np.linalg.norm(t1_cpu.flatten()), np.linalg.norm(t2_cpu.flatten()), float('nan'), # L2Py, L2Cpp, L2Diff float('nan'), float('nan'), float('nan') # CosSim, Pearson, MRE ) return # Return None # All calculations from here assume shapes match and tensors are not None t1_flat = t1_cpu.flatten() t2_flat = t2_cpu.flatten() abs_diff_elements = np.abs(t1_cpu - t2_cpu) mae = np.mean(abs_diff_elements) max_err = np.max(abs_diff_elements) diff_arr_for_hist = abs_diff_elements.flatten() # For histogram mean_py_val = np.mean(t1_cpu) std_abs_err = np.std(diff_arr_for_hist) l2_norm_py = np.linalg.norm(t1_flat) l2_norm_cpp = np.linalg.norm(t2_flat) l2_norm_diff = np.linalg.norm(t1_flat - t2_flat) # Cosine Similarity dot_product = np.dot(t1_flat, t2_flat) if l2_norm_py == 0 or l2_norm_cpp == 0: cosine_sim = float('nan') else: cosine_sim = dot_product / (l2_norm_py * l2_norm_cpp) # Pearson Correlation Coefficient if len(t1_flat) < 2: pearson_corr = float('nan') else: std_t1 = np.std(t1_flat) std_t2 = np.std(t2_flat) if std_t1 == 0 or std_t2 == 0: # If either is constant if std_t1 == 0 and std_t2 == 0 and np.allclose(t1_flat, t2_flat): pearson_corr = 1.0 # Both constant and identical else: pearson_corr = float('nan') # Otherwise, undefined or not meaningfully 1 else: try: corr_matrix = np.corrcoef(t1_flat, t2_flat) if corr_matrix.ndim == 2: pearson_corr = corr_matrix[0, 1] else: # Should be a scalar if inputs were effectively constant, already handled by std checks pearson_corr = float(corr_matrix) if np.isscalar(corr_matrix) else float('nan') except Exception: pearson_corr = float('nan') # Mean Relative Error (MRE) epsilon_rel_err = 1e-9 # Small epsilon to avoid division by zero and extreme values # Calculate relative error where abs(t1_cpu) is not zero (or very small) # For elements where t1_cpu is zero (or very small): # - If t2_cpu is also zero (small), error is small. # - If t2_cpu is not zero, relative error is infinite/large. # Using (abs(t1_cpu) + epsilon) in denominator handles this. 
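# Illustrative example (made-up values, not from a real tensor): if t1_cpu = [1.0, 0.0] and
# t2_cpu = [1.1, 0.0], then abs_diff_elements = [0.1, 0.0] and the per-element relative errors are
# [0.1 / (1.0 + eps), 0.0 / (0.0 + eps)] ~= [0.1, 0.0], so mean_rel_err ~= 0.05.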
mean_rel_err = np.mean(abs_diff_elements / (np.abs(t1_cpu) + epsilon_rel_err)) # Populate current_errors_dict_to_populate directly current_errors_dict_to_populate[name] = ( mae, max_err, diff_arr_for_hist, mean_py_val, std_abs_err, l2_norm_py, l2_norm_cpp, l2_norm_diff, cosine_sim, pearson_corr, mean_rel_err ) # Optional: print detailed error for specific high-error cases # if mae > 1e-4: # print(f"High MAE for {name}, sample {sample_idx}: {mae:.6f}") # The function implicitly returns None as it modifies current_errors in place. # For direct use, if needed, it could return the tuple: # return (mae, max_err, diff_arr_for_hist, mean_py_val, std_abs_err, l2_norm_py, l2_norm_cpp, l2_norm_diff, cosine_sim, pearson_corr, mean_rel_err) @staticmethod def load_weights_for_custom_model(model, base_model_dir, model_name, device): print(f"Loading weights for custom model {model_name} from {base_model_dir}") tensor_dir = os.path.join(base_model_dir, model_name) doc_file = Path(tensor_dir) / (model_name + '_weights_doc.txt') if not doc_file.exists(): print(f"Warning: Documentation file not found: {doc_file} for {model_name}. Skipping weight loading for source model.") return with open(doc_file, 'r') as f: lines = f.readlines() i = 0 while i < len(lines): line = lines[i] if line.startswith('## '): key = line.strip()[3:] j = i + 1 while j < len(lines) and 'File:' not in lines[j]: j += 1 if j < len(lines) and 'File:' in lines[j]: file_name = lines[j].split('File:')[1].strip() tensor_path = tensor_dir / file_name if tensor_path.exists(): try: tensor_data = torch.load(str(tensor_path), map_location=device) # For .pt files that might be RecursiveScriptModule, try to extract tensor if isinstance(tensor_data, torch.jit.RecursiveScriptModule): if hasattr(tensor_data, 'weight'): tensor = tensor_data.weight elif hasattr(tensor_data, 'bias'): tensor = tensor_data.bias elif len(list(tensor_data.parameters())) > 0: tensor = list(tensor_data.parameters())[0] else: tensor = tensor_data() # Try calling it else: tensor = tensor_data parts = key.split('.') module_to_set = model for part in parts[:-1]: module_to_set = getattr(module_to_set, part) param_name = parts[-1] if hasattr(module_to_set, param_name): if param_name in module_to_set._parameters: module_to_set._parameters[param_name] = torch.nn.Parameter(tensor.to(device)) elif param_name in module_to_set._buffers: module_to_set._buffers[param_name] = tensor.to(device) else: # Direct attribute assignment setattr(module_to_set, param_name, tensor.to(device)) # print(f"Loaded {key} from {file_name} into source {model_name}") else: print(f"Warning: Attribute {key} not found in source model {model_name}.") except Exception as e: print(f"Error loading tensor for {key} from {tensor_path} for source {model_name}: {e}") else: print(f"Warning: Tensor file not found: {tensor_path} for source {model_name}") i = j i += 1 model.eval().to(device) # Add this function near the top-level utilities def save_tensor_jit(tensor, path): """Save a tensor as a JIT-wrapped module for compatibility with C++ outputs.""" class TensorContainer(torch.nn.Module): def __init__(self, t): super().__init__() self.tensor = torch.nn.Parameter(t) tc = TensorContainer(tensor) scripted = torch.jit.script(tc) torch.jit.save(scripted, str(path)) if __name__ == "__main__": # Define ROOT_DIR for standalone script execution SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) ROOT_DIR = os.path.dirname(SCRIPT_DIR) # cpp_tracker directory # Parse command line arguments parser = 
argparse.ArgumentParser(description="Compare Python and C++ model outputs.") parser.add_argument("--num_samples", type=int, default=3, help="Number of samples to compare (-1 for all).") args = parser.parse_args() # Define model configurations # Ensure get_model_configs uses ROOT_DIR if it constructs absolute paths for models model_configs = get_model_configs(ROOT_DIR) # Create a ComparisonRunner instance runner = ComparisonRunner( root_dir=ROOT_DIR, model_configs=model_configs, cpp_output_dir=os.path.join(ROOT_DIR, "test/output"), python_output_dir=os.path.join(ROOT_DIR, "test/output_py"), num_samples=args.num_samples ) # The one-off raw vs processed conv1.weight check can remain here for now raw_conv1_path_ref = os.path.join(ROOT_DIR, "exported_weights/raw_backbone/conv1.weight.pt") runner.run_all_tests() # runner.generate_html_report() # This is called within run_all_tests # print(f"HTML report generated at {runner.comparison_dir / 'report.html'}") # Old, caused TypeError print(f"HTML report generated at {os.path.join(runner.comparison_dir, 'report.html')}") # New, fixed TypeError # Compare BB regressor debug tensors (BatchNorm and ReLU outputs) cpp_bbreg_dir = os.path.join("test", "output", "bb_regressor") py_bbreg_dir = os.path.join("test", "output_py", "bb_regressor") print("\n=== BB Regressor Debug Tensor Comparison (BatchNorm and ReLU) ===") compare_debug_tensors(cpp_bbreg_dir, py_bbreg_dir, sample_idx=0, verbose=True)
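# --- Usage notes (illustrative; the script filename below is an assumption, adjust to the real one) ---
# Run the comparison over the first 5 samples:
#   python compare_models.py --num_samples 5
# The default (--num_samples 3) compares the first three exported samples.
#
# Minimal sketch of how a tensor written by save_tensor_jit() can be read back, mirroring the
# named_parameters() branch of the JIT-extraction logic in load_cpp_tensor(). The file name here
# is hypothetical:
#
#   import torch
#   module = torch.jit.load("some_tensor.pt", map_location="cpu")
#   tensor = dict(module.named_parameters())["tensor"]
#   print(tensor.shape)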