You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

1462 lines
92 KiB

#!/usr/bin/env python3
import os
import torch
import numpy as np
import glob
import matplotlib.pyplot as plt
from pathlib import Path
import sys
import json
from tqdm import tqdm
import inspect
import argparse
from collections import OrderedDict, defaultdict
import time # <<< IMPORT TIME >>>
# Make the parent of this script's directory importable. The project imports
# further down come after this append, so their order must not change
# (presumably `pytracking` and `ltr` live under that parent -- confirm).
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
# Request deterministic cuDNN behaviour: turn off the benchmark autotuner and
# turn on the deterministic flag. Runs at import time, before any model is built.
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True
print("PYTHON SCRIPT: Set cuDNN benchmark=False, deterministic=True")
# Wrapper class used below to load the exported backbone / classifier /
# bb_regressor weights.
from pytracking.features.net_wrappers import DiMPTorchScriptWrapper
# From-source AtomIoUNet implementation, instantiated directly by this script.
from ltr.models.bbreg.atom_iou_net import AtomIoUNet
# Directory containing this script, and its parent directory.
SCRIPT_DIR_FOR_INIT = os.path.dirname(os.path.abspath(__file__))
ROOT_DIR_FOR_INIT = os.path.dirname(SCRIPT_DIR_FOR_INIT)
# --- Model Configurations ---
def get_model_configs(root_dir_param):
    """Build the comparison configuration table, one entry per model.

    Args:
        root_dir_param: Directory that the checkpoint path is joined onto when
            a loader is eventually invoked.

    Returns:
        dict: Keyed by 'ResNet', 'Classifier' and 'BBRegressor'. Every entry
        holds:
            'python_model_loader': zero-argument callable that constructs a
                DiMPTorchScriptWrapper (nothing is loaded until it is called),
            'cpp_output_subdir' / 'python_output_subdir': sub-directory names,
            'outputs_to_compare': label -> file name, or (file name, True)
                where True marks a Python-specific output name.
    """
    # NOTE(review): the ' Ausdruck' fragment in this file name looks like a
    # paste artefact -- confirm the real checkpoint name. Kept verbatim here.
    checkpoint_rel_path = 'pytracking_models/dimp50_ Ausdruck_ep0050.pth.tar'

    def make_loader():
        # A fresh callable per entry; the path is only joined when it is called.
        return lambda: DiMPTorchScriptWrapper(os.path.join(root_dir_param, checkpoint_rel_path))

    resnet_outputs = {
        'Conv1': 'conv1_output.pt',
        'BN1': 'bn1_output.pt',
        'ReLU1': 'relu1_output.pt',  # captured before MaxPool
        'MaxPool': 'maxpool_output.pt',
        'Features': 'features.pt',
        'Layer1': 'layer1.pt',
        'Layer2': 'layer2.pt',
        'Layer3': 'layer3.pt',
        'Layer4': 'layer4.pt',
        'Layer1.0 Shortcut': 'layer1_0_shortcut_output.pt',
    }

    # Each BBRegressor tensor appears twice: a Python-flagged tuple entry
    # followed by the plain C++ file name, in that order.
    bbreg_outputs = {'IoUPred': 'iou_scores.pt'}
    for label, stem in (
        ('IoUFeat0', 'iou_feat0'),
        ('IoUFeat1', 'iou_feat1'),
        ('Mod0', 'mod_vec0'),
        ('Mod1', 'mod_vec1'),
    ):
        file_name = stem + '.pt'
        bbreg_outputs['Py' + label] = (file_name, True)
        bbreg_outputs['Cpp' + label] = file_name

    configs = {}
    configs['ResNet'] = {
        'python_model_loader': make_loader(),
        'cpp_output_subdir': 'resnet',
        'python_output_subdir': 'resnet_py',  # if Python outputs are saved separately
        'outputs_to_compare': resnet_outputs,
    }
    configs['Classifier'] = {
        'python_model_loader': make_loader(),
        'cpp_output_subdir': 'classifier',
        'python_output_subdir': 'classifier_py',
        'outputs_to_compare': {'Features': 'features.pt'},
    }
    configs['BBRegressor'] = {
        'python_model_loader': make_loader(),
        'cpp_output_subdir': 'bb_regressor',
        'python_output_subdir': 'bb_regressor_py',
        'outputs_to_compare': bbreg_outputs,
    }
    return configs
class ComparisonRunner:
def __init__(self, root_dir, model_configs, cpp_output_dir, python_output_dir, num_samples=-1, plot_histograms=True, plot_scatter=True):
self.root_dir = root_dir
self.model_configs = model_configs
self.cpp_output_dir = cpp_output_dir
self.python_output_dir = python_output_dir
self.num_samples = num_samples
self.plot_histograms = plot_histograms
self.plot_scatter = plot_scatter
self.all_comparison_stats = defaultdict(lambda: defaultdict(list))
self.python_wrapper = None # ADDED: To store the DiMPTorchScriptWrapper instance
self.models = {} # To store loaded Python sub-models like ResNet, Classifier
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Ensure comparison directory exists
self.comparison_dir = os.path.join(self.root_dir, "test/comparison")
if not os.path.exists(self.comparison_dir):
os.makedirs(self.comparison_dir)
print("PYTHON: Attempting to load 'traced_resnet50.pth'...")
try:
self.models['ResNet'] = torch.jit.load('traced_resnet50.pth', map_location=self.device)
print("PYTHON: Successfully loaded 'traced_resnet50.pth'.")
self.models['ResNet'].eval()
print("PYTHON: ResNet JIT model set to eval().")
except Exception as e:
print(f"PYTHON: CRITICAL ERROR loading 'traced_resnet50.pth': {e}")
self.models['ResNet'] = None # Ensure it's None if loading failed
# Print sums of ResNet.bn1 running_mean and running_var from state_dict
print("PYTHON: Attempting to access ResNet state_dict (if model loaded)...")
if self.models.get('ResNet'):
try:
resnet_state_dict = self.models['ResNet'].state_dict()
print("PYTHON ResNet state_dict keys:", list(resnet_state_dict.keys())) # PRINT ALL KEYS
py_bn1_running_mean = resnet_state_dict.get('bn1.running_mean')
py_bn1_running_var = resnet_state_dict.get('bn1.running_var')
if py_bn1_running_mean is not None and py_bn1_running_var is not None:
print(f"PYTHON ResNet.bn1 running_mean sum (from state_dict): {py_bn1_running_mean.sum().item():.10f}")
print(f"PYTHON ResNet.bn1 running_var sum (from state_dict): {py_bn1_running_var.sum().item():.10f}")
else:
print("PYTHON: ResNet.bn1 running_mean or running_var is None in state_dict.")
except Exception as e:
print(f"PYTHON: Error accessing ResNet.bn1 state_dict: {e}")
# Load other models if necessary (e.g., BBRegressor, Classifier)
def load_python_models(self):
    """Initialize the Python models and print weight-parity diagnostics.

    Steps, in order:
      1. Build a DiMPTorchScriptWrapper from ``<root_dir>/exported_weights``
         and copy its backbone / classifier / bb_regressor into
         ``self.models`` under 'ResNet' / 'Classifier' / 'BBRegressor'
         (``None`` where the wrapper lacks one).
      2. Build a from-source AtomIoUNet in ``self.bb_regressor_from_source``
         and load its weights via ``load_weights_for_custom_model``.
      3. Print comparisons between the Python ResNet's conv1, bn1 and
         layer1.0-layer1.2 parameters and the tensors stored under
         ``<root_dir>/exported_weights/backbone_regenerated``.

    The diagnostics only print; they do not modify the models.
    """
    print("DEBUG: ComparisonRunner.load_python_models() ENTERED")  # DEBUG PRINT
    print("Loading Python models...")
    self.python_wrapper = DiMPTorchScriptWrapper(
        model_dir=str(Path(self.root_dir) / 'exported_weights'),
        device=self.device,
        backbone_sd='backbone_regenerated',  # use the regenerated backbone weights
        classifier_sd='classifier',
        bbregressor_sd='bb_regressor'
    )
    # Populate self.models AFTER python_wrapper is initialized. Explicit None
    # test so the result does not depend on the wrapper's truth value.
    if self.python_wrapper is not None:
        print("DEBUG: self.python_wrapper initialized. Populating self.models.")  # DEBUG PRINT
        self._adopt_wrapper_submodule('ResNet', 'backbone')
        self._adopt_wrapper_submodule('Classifier', 'classifier')
        self._adopt_wrapper_submodule('BBRegressor', 'bb_regressor')
    else:
        print("CRITICAL ERROR: self.python_wrapper is None after DiMPTorchScriptWrapper instantiation.")
        # Keep the keys present so later lookups see None instead of KeyError.
        self.models['ResNet'] = None
        self.models['Classifier'] = None
        self.models['BBRegressor'] = None

    # Initialize BBRegressor from source for get_modulation fallback
    self.bb_regressor_from_source = AtomIoUNet(
        input_dim=(512, 1024),
        pred_input_dim=(256, 256),
        pred_inter_dim=(256, 256)
    )
    ComparisonRunner.load_weights_for_custom_model(
        self.bb_regressor_from_source,
        'bb_regressor',  # model_name for path and doc file
        self.root_dir,
        self.device
    )
    self.bb_regressor_from_source.eval().to(self.device)
    print("Python models loaded.")

    # Weight-parity diagnostics (Python model vs exported tensor files).
    self._report_conv1_weight_parity()
    self._report_bn1_param_parity()
    self._report_layer1_block0_param_parity()
    for block_idx_in_layer1 in [1, 2]:  # layer1.1 and layer1.2
        self._report_layer1_later_block_param_parity(block_idx_in_layer1)

    print("\n--- Types at END of load_python_models: ---")
    for model_key in ('ResNet', 'Classifier', 'BBRegressor'):
        if model_key in self.models:
            print(f" self.models['{model_key}'] type: {type(self.models[model_key])}")

def _adopt_wrapper_submodule(self, model_key, wrapper_attr):
    """Copy ``self.python_wrapper.<wrapper_attr>`` into ``self.models[model_key]`` (None if absent)."""
    submodule = getattr(self.python_wrapper, wrapper_attr, None)
    if submodule is not None:
        self.models[model_key] = submodule
        print(f"DEBUG: self.models['{model_key}'] populated with type: {type(submodule)}")  # DEBUG PRINT
    else:
        print(f"ERROR: python_wrapper does not have a '{wrapper_attr}' attribute or it is None.")
        self.models[model_key] = None

def _report_conv1_weight_parity(self):
    """Print how the Python ResNet's conv1.weight differs from the exported conv1_weight.pt."""
    print("\n--- COMPARING CURRENTLY USED conv1.weight (Python vs C++) ---")
    resnet = self.models.get('ResNet')
    python_resnet_conv1_weight = None
    if resnet is not None and hasattr(resnet, 'conv1'):
        python_resnet_conv1_weight = resnet.conv1.weight.detach().cpu()
        print(f" Python ResNet model's conv1.weight shape: {python_resnet_conv1_weight.shape}")
    cpp_conv1_path = os.path.join(self.root_dir, "exported_weights/backbone_regenerated/conv1_weight.pt")
    cpp_resnet_conv1_weight = None
    if os.path.exists(cpp_conv1_path):
        try:
            # NOTE(security): weights_only=False allows arbitrary unpickling;
            # only point this at weight files you produced yourself.
            cpp_resnet_conv1_weight = torch.load(cpp_conv1_path, map_location='cpu', weights_only=False)
            print(f" C++ (loaded from {cpp_conv1_path}) conv1.weight shape: {cpp_resnet_conv1_weight.shape}")
        except Exception as e:
            print(f" Error loading C++ conv1.weight from {cpp_conv1_path}: {e}")
    if python_resnet_conv1_weight is not None and cpp_resnet_conv1_weight is not None:
        if isinstance(python_resnet_conv1_weight, torch.Tensor) and isinstance(cpp_resnet_conv1_weight, torch.Tensor):
            print(f" torch.allclose(python_model_conv1, cpp_loaded_conv1): {torch.allclose(python_resnet_conv1_weight, cpp_resnet_conv1_weight)}")
            abs_diff = torch.abs(python_resnet_conv1_weight - cpp_resnet_conv1_weight)
            print(f" Max abs diff for conv1.weight: {torch.max(abs_diff).item()}")
            print(f" Mean abs diff for conv1.weight: {torch.mean(abs_diff).item()}")
        else:
            print(" Skipping conv1.weight comparison due to type mismatch after loading.")
    else:
        print(" Skipping conv1.weight comparison because one or both tensors could not be obtained.")
    print("--- END CURRENTLY USED conv1.weight COMPARISON ---\n")

def _report_bn1_param_parity(self):
    """Print how the Python ResNet's bn1 weight/bias/running stats differ from the exported bn1_*.pt files."""
    print("\n--- COMPARING CURRENTLY USED bn1 PARAMS (Python vs C++) ---")
    bn1_param_names = ['weight', 'bias', 'running_mean', 'running_var']

    # Pass 1: collect the tensors held by the Python model.
    python_resnet_bn1_params = {}
    resnet = self.models.get('ResNet')
    if resnet is not None and hasattr(resnet, 'bn1'):
        bn1_module = resnet.bn1
        for p_name in bn1_param_names:
            if hasattr(bn1_module, p_name):
                param_tensor = getattr(bn1_module, p_name)
                if param_tensor is not None:
                    python_resnet_bn1_params[p_name] = param_tensor.detach().cpu()
                    print(f" Python ResNet model's bn1.{p_name} shape: {python_resnet_bn1_params[p_name].shape}")
                else:
                    print(f" Python ResNet model's bn1.{p_name} is None.")
            else:
                print(f" Python ResNet model's bn1 does not have attribute {p_name}.")

    # Pass 2: load the exported tensors (file names look like bn1_running_mean.pt).
    cpp_resnet_bn1_params = {}
    for p_name in bn1_param_names:
        cpp_param_filename = f"bn1_{p_name.replace('.', '_')}.pt"
        cpp_param_path = os.path.join(self.root_dir, "exported_weights/backbone_regenerated", cpp_param_filename)
        if os.path.exists(cpp_param_path):
            try:
                # NOTE(security): weights_only=False allows arbitrary unpickling.
                cpp_resnet_bn1_params[p_name] = torch.load(cpp_param_path, map_location='cpu', weights_only=False)
                print(f" C++ (loaded from {cpp_param_path}) bn1.{p_name} shape: {cpp_resnet_bn1_params[p_name].shape}")
            except Exception as e:
                print(f" Error loading C++ bn1.{p_name} from {cpp_param_path}: {e}")
        else:
            print(f" C++ bn1 parameter file not found: {cpp_param_path}")

    # Pass 3: compare whatever both sides produced.
    for p_name in bn1_param_names:
        py_tensor = python_resnet_bn1_params.get(p_name)
        cpp_tensor = cpp_resnet_bn1_params.get(p_name)
        print(f" Comparison for bn1.{p_name}:")
        if py_tensor is not None and cpp_tensor is not None:
            if isinstance(py_tensor, torch.Tensor) and isinstance(cpp_tensor, torch.Tensor):
                print(f" torch.allclose(python_bn1_{p_name}, cpp_bn1_{p_name}): {torch.allclose(py_tensor, cpp_tensor)}")
                abs_diff = torch.abs(py_tensor - cpp_tensor)
                print(f" Max abs diff for bn1.{p_name}: {torch.max(abs_diff).item()}")
                print(f" Mean abs diff for bn1.{p_name}: {torch.mean(abs_diff).item()}")
            else:
                print(f" Skipping bn1.{p_name} comparison due to type mismatch after loading.")
        else:
            print(f" Skipping bn1.{p_name} comparison because one or both tensors could not be obtained.")
    print("--- END CURRENTLY USED bn1 PARAMS COMPARISON ---\n")

def _report_layer1_block0_param_parity(self):
    """Print how layer1.0's conv/bn/downsample parameters differ from the exported layer1_0_*.pt files."""
    print("\n--- COMPARING CURRENTLY USED layer1.0 PARAMS (Python vs C++) ---")
    layer1_0_block_prefix = "layer1.0."
    layer1_0_components = {
        "conv1": ["weight"],
        "bn1": ["weight", "bias", "running_mean", "running_var"],
        "conv2": ["weight"],
        "bn2": ["weight", "bias", "running_mean", "running_var"],
        "conv3": ["weight"],
        "bn3": ["weight", "bias", "running_mean", "running_var"],
        "downsample.0": ["weight"],  # Downsample Conv
        "downsample.1": ["weight", "bias", "running_mean", "running_var"]  # Downsample BN
    }
    resnet = self.models.get('ResNet')
    if resnet is not None and hasattr(resnet, 'layer1') and len(resnet.layer1) > 0:
        py_layer1_0_module = resnet.layer1[0]
        for comp_name, param_list in layer1_0_components.items():
            py_comp_module = py_layer1_0_module
            try:
                # Walk dotted names such as "downsample.0" one attribute at a time.
                for part_name in comp_name.split('.'):
                    py_comp_module = getattr(py_comp_module, part_name)
            except AttributeError:
                print(f" Python ResNet model's layer1.0 does not have component {comp_name}. Skipping.")
                continue
            for p_name in param_list:
                py_param_tensor_name = f"{layer1_0_block_prefix}{comp_name}.{p_name}"
                cpp_param_filename = f"{layer1_0_block_prefix.replace('.', '_')}{comp_name.replace('.', '_')}_{p_name}.pt"
                py_param_tensor = None
                if hasattr(py_comp_module, p_name):
                    param_tensor_val = getattr(py_comp_module, p_name)
                    if param_tensor_val is not None:
                        py_param_tensor = param_tensor_val.detach().cpu()
                        print(f" Python ResNet {py_param_tensor_name} shape: {py_param_tensor.shape}")
                    else:
                        print(f" Python ResNet {py_param_tensor_name} is None.")
                else:
                    print(f" Python ResNet module {comp_name} does not have param {p_name}.")
                cpp_param_path = os.path.join(self.root_dir, "exported_weights/backbone_regenerated", cpp_param_filename)
                cpp_param_tensor = None
                if os.path.exists(cpp_param_path):
                    try:
                        # NOTE(security): weights_only=False allows arbitrary unpickling.
                        cpp_param_tensor = torch.load(cpp_param_path, map_location='cpu', weights_only=False)
                    except Exception as e:
                        print(f" Error loading C++ {cpp_param_filename} from {cpp_param_path}: {e}")
                else:
                    print(f" Warning: C++ {cpp_param_filename} file not found: {cpp_param_path}")
                print(f" Comparison for {py_param_tensor_name} vs {cpp_param_filename}:")
                if py_param_tensor is not None and cpp_param_tensor is not None:
                    if isinstance(py_param_tensor, torch.Tensor) and isinstance(cpp_param_tensor, torch.Tensor):
                        all_close = torch.allclose(py_param_tensor, cpp_param_tensor)
                        print(f" torch.allclose: {all_close}")
                        if not all_close:
                            abs_diff = torch.abs(py_param_tensor - cpp_param_tensor)
                            print(f" Max abs diff: {torch.max(abs_diff).item()}")
                            print(f" Mean abs diff: {torch.mean(abs_diff).item()}")
                    else:
                        print(" Skipping comparison due to type mismatch after loading.")
                else:
                    print(" Skipping comparison because one or both tensors could not be obtained.")
    else:
        print(" Skipping layer1.0 parameter comparison: ResNet model or its layer1 not found/empty.")
    print("--- END CURRENTLY USED layer1.0 PARAMS COMPARISON ---\n")

def _report_layer1_later_block_param_parity(self, block_idx_in_layer1):
    """Print how layer1.<block_idx_in_layer1>'s conv/bn parameters and buffers differ from the exported files."""
    print(f"\n--- COMPARING CURRENTLY USED layer1.{block_idx_in_layer1} PARAMS (Python vs C++) ---")
    layer1_block_prefix = f"layer1.{block_idx_in_layer1}."
    # No downsample entries are listed for these blocks.
    block_components = {
        "conv1": ["weight"],
        "bn1": ["weight", "bias", "running_mean", "running_var", "num_batches_tracked"],
        "conv2": ["weight"],
        "bn2": ["weight", "bias", "running_mean", "running_var", "num_batches_tracked"],
        "conv3": ["weight"],
        "bn3": ["weight", "bias", "running_mean", "running_var", "num_batches_tracked"],
    }
    resnet = self.models.get('ResNet')
    if resnet is not None and hasattr(resnet, 'layer1') and len(resnet.layer1) > block_idx_in_layer1:
        py_layer1_block_module = resnet.layer1[block_idx_in_layer1]
        for comp_name, param_list in block_components.items():
            try:
                py_comp_module = getattr(py_layer1_block_module, comp_name)
            except AttributeError:
                print(f" Python ResNet model's layer1.{block_idx_in_layer1} does not have component {comp_name}. Skipping.")
                continue
            for p_name in param_list:
                py_param_tensor_name = f"{layer1_block_prefix}{comp_name}.{p_name}"
                # Exported files are named like layer1_1_bn1_weight.pt
                cpp_param_filename = f"{layer1_block_prefix.replace('.', '_')}{comp_name.replace('.', '_')}_{p_name}.pt"
                py_param_tensor = None
                # hasattr() on an nn.Module already resolves registered buffers
                # such as num_batches_tracked, so one lookup covers both
                # parameters and buffers; no separate buffer branch is needed.
                if hasattr(py_comp_module, p_name):
                    param_tensor_val = getattr(py_comp_module, p_name)
                    if param_tensor_val is not None:
                        py_param_tensor = param_tensor_val.detach().cpu()
                        print(f" Python ResNet {py_param_tensor_name} shape: {py_param_tensor.shape}")
                    else:
                        print(f" Python ResNet {py_param_tensor_name} is None.")
                else:
                    print(f" Python ResNet module {comp_name} does not have param/buffer {p_name}.")
                cpp_param_path = os.path.join(self.root_dir, "exported_weights/backbone_regenerated", cpp_param_filename)
                cpp_param_tensor = None
                if os.path.exists(cpp_param_path):
                    try:
                        # NOTE(security): weights_only=False allows arbitrary unpickling.
                        cpp_param_tensor = torch.load(cpp_param_path, map_location='cpu', weights_only=False)
                    except Exception as e:
                        print(f" Error loading C++ {cpp_param_filename} from {cpp_param_path}: {e}")
                else:
                    print(f" Warning: C++ {cpp_param_filename} file not found: {cpp_param_path}")
                print(f" Comparison for {py_param_tensor_name} vs {cpp_param_filename}:")
                if py_param_tensor is not None and cpp_param_tensor is not None:
                    if isinstance(py_param_tensor, torch.Tensor) and isinstance(cpp_param_tensor, torch.Tensor):
                        # Cast both sides to float so integer buffers
                        # (num_batches_tracked) can go through allclose.
                        py_param_tensor_float = py_param_tensor.float()
                        cpp_param_tensor_float = cpp_param_tensor.float()
                        all_close = torch.allclose(py_param_tensor_float, cpp_param_tensor_float)
                        print(f" torch.allclose: {all_close}")
                        if not all_close:
                            abs_diff = torch.abs(py_param_tensor_float - cpp_param_tensor_float)
                            mae = torch.mean(abs_diff).item()
                            max_abs_err = torch.max(abs_diff).item()
                            print(f" MAE (Weight/Buffer): {mae:.4e}")
                            print(f" Max Abs Err (Weight/Buffer): {max_abs_err:.4e}")
                            # L2 norms give the error some scale.
                            l2_py = torch.linalg.norm(py_param_tensor_float.flatten()).item()
                            l2_cpp = torch.linalg.norm(cpp_param_tensor_float.flatten()).item()
                            print(f" L2 Norm Python: {l2_py:.4e}")
                            print(f" L2 Norm C++: {l2_cpp:.4e}")
                    else:
                        print(f" Skipping comparison due to type mismatch after loading for {py_param_tensor_name}.")
                else:
                    print(f" Skipping comparison because one or both tensors could not be obtained for {py_param_tensor_name}.")
    else:
        print(f" Skipping layer1.{block_idx_in_layer1} parameter comparison: ResNet model or its layer1 not found/long enough.")
    print(f"--- END CURRENTLY USED layer1.{block_idx_in_layer1} PARAMS COMPARISON ---\n")
def compare_classifier(self):
"""Compare classifier model outputs between Python and C++"""
print("\nComparing classifier outputs...")
# Python model needs C++ ResNet output as its input
cpp_input_dir_path = Path(os.path.join(self.cpp_output_dir, 'resnet'))
cpp_output_classifier_dir = Path(os.path.join(self.cpp_output_dir, 'classifier'))
if not cpp_input_dir_path.exists() or not cpp_output_classifier_dir.exists():
print(f"Classifier input (C++ ResNet features from {cpp_input_dir_path}) or C++ Classifier output dir ({cpp_output_classifier_dir}) not found. Skipping Classifier comparison.")
# Populate NaN for all expected Classifier comparisons if dirs are missing
for i in range(self.num_samples):
sample_key_base = f"Clf_Sample_{i}"
current_errors = {}
self._compare_tensor_data(None, None, "Classifier Features", i, current_errors)
self.all_comparison_stats[sample_key_base] = current_errors
return
print("\nClassifier - Comparing Samples...")
for i in tqdm(range(self.num_samples), desc="Classifier samples"):
current_errors = {} # For this sample
py_clf_feat = None
cpp_clf_feat = None
# Input for Python classifier is the layer3 output of C++ ResNet
cpp_resnet_layer3_for_py_path = cpp_input_dir_path / f'sample_{i}_layer3.pt'
# C++ classifier output
cpp_classifier_feat_path = cpp_output_classifier_dir / f'sample_{i}_features.pt'
if not cpp_resnet_layer3_for_py_path.exists() or not cpp_classifier_feat_path.exists():
print(f"Warning: Skipping classifier sample {i}, files not found: C++ ResNet output {cpp_resnet_layer3_for_py_path} or C++ Clf output {cpp_classifier_feat_path}.")
else:
feat_from_cpp_resnet = self.load_cpp_tensor(cpp_resnet_layer3_for_py_path, self.device)
if feat_from_cpp_resnet is None:
print(f"Critical: Failed to load C++ ResNet output tensor {cpp_resnet_layer3_for_py_path} for classifier sample {i}.")
else:
try:
with torch.no_grad():
if self.models.get('Classifier'):
py_clf_feat = self.models['Classifier'].extract_classification_feat(feat_from_cpp_resnet)
else:
print("ERROR: Python Classifier model not found in self.models")
except Exception as e:
print(f"ERROR: Python model extract_classification_feat failed for sample {i}: {e}")
cpp_clf_feat = self.load_cpp_tensor(cpp_classifier_feat_path, self.device)
if cpp_clf_feat is None:
print(f"Warning: Failed to load C++ output tensor {cpp_classifier_feat_path} for classifier sample {i}.")
self._compare_tensor_data(py_clf_feat, cpp_clf_feat, "Classifier Features", i, current_errors)
if current_errors: self.all_comparison_stats[f"Clf_Sample_{i}"] = current_errors
# Removed the separate "Test Samples" loop for classifier for simplification
# The C++ test_models only produces one set of classifier outputs per sample.
def compare_bb_regressor(self):
"""Compare bb_regressor model outputs between Python and C++"""
print("\nComparing bb_regressor outputs...")
# Python model inputs come from 'common' C++ generated/loaded files
# C++ model outputs are in 'output/bb_regressor'
py_input_common_dir = os.path.join(self.root_dir, 'test', 'input_samples', 'common')
cpp_output_bb_reg_dir = os.path.join(self.cpp_output_dir, 'bb_regressor')
cpp_resnet_output_dir = os.path.join(self.cpp_output_dir, 'resnet')
# Convert to Path for exists check
py_input_common_dir_path = Path(py_input_common_dir)
cpp_output_bb_reg_dir_path = Path(cpp_output_bb_reg_dir)
cpp_resnet_output_dir_path = Path(cpp_resnet_output_dir)
if not py_input_common_dir_path.exists() or not cpp_output_bb_reg_dir_path.exists() or not cpp_resnet_output_dir_path.exists():
print(f"BB Regressor input ({py_input_common_dir_path}), C++ ResNet output ({cpp_resnet_output_dir_path}), or C++ BB Reg output dir ({cpp_output_bb_reg_dir_path}) not found. Skipping BB Regressor comparison.")
# Populate NaN for all expected BB Regressor comparisons if dirs are missing
for i in range(self.num_samples):
sample_key_base = f"BBReg_Sample_{i}"
current_errors = {}
self._compare_tensor_data(None, None, "BBReg PyIoUFeat0 vs CppIoUFeat0", i, current_errors)
self._compare_tensor_data(None, None, "BBReg PyIoUFeat1 vs CppIoUFeat1", i, current_errors)
self._compare_tensor_data(None, None, "BBReg PyMod0 vs CppMod0", i, current_errors)
self._compare_tensor_data(None, None, "BBReg PyMod1 vs CppMod1", i, current_errors)
self._compare_tensor_data(None, None, "BBReg IoUPred", i, current_errors)
self.all_comparison_stats[sample_key_base] = current_errors
return
for i in tqdm(range(self.num_samples), desc="BB Regressor samples"):
current_errors = {} # For this sample
# --- Python Model Path ---
# For BBRegressor, the Python model needs to run its own ResNet pass
# using the common input image.
py_image_input_path = py_input_common_dir_path / f'sample_{i}_image.pt'
py_init_bbox_path = py_input_common_dir_path / f'sample_{i}_bb.pt'
py_proposals_path = py_input_common_dir_path / f'sample_{i}_proposals.pt'
# --- C++ Model Outputs ---
cpp_iou_feat0_path = cpp_output_bb_reg_dir_path / f'sample_{i}_iou_feat0.pt'
cpp_iou_feat1_path = cpp_output_bb_reg_dir_path / f'sample_{i}_iou_feat1.pt'
cpp_mod_vec0_path = cpp_output_bb_reg_dir_path / f'sample_{i}_mod_vec0.pt'
cpp_mod_vec1_path = cpp_output_bb_reg_dir_path / f'sample_{i}_mod_vec1.pt'
cpp_iou_scores_path = cpp_output_bb_reg_dir_path / f'sample_{i}_iou_scores.pt'
# Load initial inputs for Python model
py_image_tensor = self.load_cpp_tensor(py_image_input_path, self.device)
py_init_bbox = self.load_cpp_tensor(py_init_bbox_path, self.device)
py_proposals = self.load_cpp_tensor(py_proposals_path, self.device)
py_feat_layer2, py_feat_layer3 = None, None
if py_image_tensor is not None:
try:
with torch.no_grad():
# Run Python ResNet backbone via the wrapper's method to include preprocessing
if self.python_wrapper:
py_backbone_outputs = self.python_wrapper.extract_backbone(py_image_tensor)
else:
print("ERROR: self.python_wrapper is None, cannot extract backbone features.")
py_backbone_outputs = {} # Ensure it's a dict
# Assign ResNet outputs to be used by BB Regressor
py_feat_layer2 = py_backbone_outputs.get('layer2')
py_feat_layer3 = py_backbone_outputs.get('layer3')
except Exception as e:
print(f"ERROR: Python ResNet backbone failed for sample {i}: {e}")
else:
print(f"Warning: Skipping Python BB Regressor for sample {i}, image input not found at {py_image_input_path}")
# Get Python IoU features
py_iou_feat_list = [None, None] # Initialize as a list of two Nones
if py_feat_layer2 is not None and py_feat_layer3 is not None:
try:
# Use from-source get_iou_feat for consistent 256-channel features
# DiMPTorchScriptWrapper.bb_regressor.get_iou_feat returns features with different channel counts
temp_iou_feat = self.bb_regressor_from_source.get_iou_feat([py_feat_layer2, py_feat_layer3])
if isinstance(temp_iou_feat, tuple): temp_iou_feat = list(temp_iou_feat)
if len(temp_iou_feat) >= 2:
py_iou_feat_list = [temp_iou_feat[0], temp_iou_feat[1]]
elif len(temp_iou_feat) == 1:
py_iou_feat_list[0] = temp_iou_feat[0]
# print(f"Sample {i}: Py from-source get_iou_feat. Shapes: {[f.shape for f in py_iou_feat_list if f is not None]}")
except Exception as e_iou_source:
print(f"Sample {i}: Py from-source get_iou_feat failed: {e_iou_source}")
# Get Python modulation vectors
py_modulation_list = [None, None] # Initialize as a list of two Nones
if py_feat_layer2 is not None and py_feat_layer3 is not None and py_init_bbox is not None:
py_features_list = [py_feat_layer2, py_feat_layer3]
squeezed_init_bbox = py_init_bbox
if py_init_bbox.ndim == 3 and py_init_bbox.shape[0] > 0 and py_init_bbox.shape[1] == 1:
squeezed_init_bbox = py_init_bbox.squeeze(1)
try:
# Using Torchscript model for modulation
if self.python_wrapper and self.python_wrapper.bb_regressor:
temp_mod = self.python_wrapper.bb_regressor.get_modulation(py_features_list, squeezed_init_bbox)
else:
print("ERROR: self.python_wrapper.bb_regressor is not available for get_modulation.")
temp_mod = [None, None]
if isinstance(temp_mod, tuple): temp_mod = list(temp_mod)
if len(temp_mod) >= 2:
py_modulation_list = [temp_mod[0], temp_mod[1]]
elif len(temp_mod) == 1:
py_modulation_list[0] = temp_mod[0]
# print(f"Sample {i}: Py TorchScript get_modulation. Shapes: {[f.shape for f in py_modulation_list if f is not None]}")
except Exception as e_ts:
print(f"Sample {i}: Py TorchScript get_modulation failed: {e_ts}. Trying from-source.")
try:
temp_mod_source = self.bb_regressor_from_source.get_modulation(py_features_list, squeezed_init_bbox)
if isinstance(temp_mod_source, tuple): temp_mod_source = list(temp_mod_source)
if len(temp_mod_source) >=2:
py_modulation_list = [temp_mod_source[0], temp_mod_source[1]]
elif len(temp_mod_source) == 1:
py_modulation_list[0] = temp_mod_source[0]
# print(f"Sample {i}: Py from-source get_modulation. Shapes: {[f.shape for f in py_modulation_list if f is not None]}")
except Exception as e_source:
print(f"Sample {i}: Py from-source get_modulation also failed: {e_source}")
# Run Python bb_regressor's predict_iou (from TorchScript model)
py_iou_pred = None
if all(f is not None for f in py_iou_feat_list) and \
all(m is not None for m in py_modulation_list) and \
py_proposals is not None:
try:
with torch.no_grad():
if self.python_wrapper and self.python_wrapper.bb_regressor:
py_iou_pred = self.python_wrapper.bb_regressor.predict_iou(py_modulation_list, py_iou_feat_list, py_proposals)
else:
print("ERROR: self.python_wrapper.bb_regressor is not available for predict_iou.")
py_iou_pred = None
# print(f"Sample {i}: Py predict_iou output shape: {py_iou_pred.shape if py_iou_pred is not None else 'N/A'}")
except Exception as e:
print(f"ERROR: Python model predict_iou failed for sample {i}: {e}")
# Load C++ outputs
cpp_iou_feat0 = self.load_cpp_tensor(cpp_iou_feat0_path, self.device)
cpp_iou_feat1 = self.load_cpp_tensor(cpp_iou_feat1_path, self.device)
cpp_mod_vec0 = self.load_cpp_tensor(cpp_mod_vec0_path, self.device)
cpp_mod_vec1 = self.load_cpp_tensor(cpp_mod_vec1_path, self.device)
cpp_iou_scores = self.load_cpp_tensor(cpp_iou_scores_path, self.device)
# Comparisons
self._compare_tensor_data(py_iou_feat_list[0], cpp_iou_feat0, "BBReg PyIoUFeat0 vs CppIoUFeat0", i, current_errors)
self._compare_tensor_data(py_iou_feat_list[1], cpp_iou_feat1, "BBReg PyIoUFeat1 vs CppIoUFeat1", i, current_errors)
self._compare_tensor_data(py_modulation_list[0], cpp_mod_vec0, "BBReg PyMod0 vs CppMod0", i, current_errors)
self._compare_tensor_data(py_modulation_list[1], cpp_mod_vec1, "BBReg PyMod1 vs CppMod1", i, current_errors)
self._compare_tensor_data(py_iou_pred, cpp_iou_scores, "BBReg IoUPred", i, current_errors)
if current_errors: self.all_comparison_stats[f"BBReg_Sample_{i}"] = current_errors
def compare_resnet_outputs(self):
    """Compare Python ResNet activations against the tensors exported by C++.

    For every sample index ``i`` in ``range(self.num_samples)``:

    * the raw image is loaded from
      ``<root_dir>/test/input_samples/common/sample_<i>_image.pt``;
    * the Python activations are produced by ``_run_python_resnet``;
    * each activation is compared with the matching ``sample_<i>_*.pt`` file
      in ``<cpp_output_dir>/resnet`` through ``self._compare_tensor_data``.

    Results are stored in ``self.all_comparison_stats["ResNet_Sample_<i>"]``.
    When the input directory or the C++ output directory is missing, every
    comparison is recorded with ``None`` tensors so the report still lists it.

    Side effects: creates ``self.py_resnet_output_dir`` (defaulting to
    ``<python_output_dir>/resnet``) and writes Python-side tensors to disk.
    """
    # (comparison name, stem of the C++ file name) in comparison order.
    # Both conv1 comparisons intentionally use the same C++ tensor.
    cpp_files = (
        ("ResNet Conv1 Output (Pre-BN)", "conv1_output"),
        ("ResNet Conv1", "conv1_output"),
        ("ResNet BN1", "bn1_output"),
        ("ResNet ReLU1", "relu1_output"),
        ("ResNet MaxPool", "maxpool_output"),
        ("ResNet Layer1", "layer1"),
        ("ResNet Layer2", "layer2"),
        ("ResNet Layer3", "layer3"),
        ("ResNet Layer4", "layer4"),
        ("ResNet Features", "features"),
        ("ResNet Layer1.0 Shortcut", "layer1_0_shortcut_output"),
        ("ResNet Layer1.0 Block Output", "layer1_0_block_output"),
    )

    print("Comparing ResNet outputs...")
    print("\n--- Types at START of compare_resnet_outputs: ---")
    for model_name in ('ResNet', 'Classifier', 'BBRegressor'):
        if model_name in self.models:
            print(f" self.models['{model_name}'] type: {type(self.models[model_name])}")

    py_input_dir = Path(self.root_dir) / 'test' / 'input_samples' / 'common'
    cpp_resnet_dir = Path(self.cpp_output_dir) / 'resnet'

    # Python-side tensors go to their own directory so they can never
    # overwrite the C++ files they are compared against.
    if not getattr(self, 'py_resnet_output_dir', None):
        self.py_resnet_output_dir = Path(self.python_output_dir) / 'resnet'
    py_out_dir = Path(self.py_resnet_output_dir)
    py_out_dir.mkdir(parents=True, exist_ok=True)

    if not py_input_dir.exists() or not cpp_resnet_dir.exists():
        print(f"ResNet input ({py_input_dir}) or C++ ResNet output dir ({cpp_resnet_dir}) not found. Skipping ResNet comparison.")
        # Record a placeholder for every expected comparison.
        for i in range(self.num_samples):
            current_errors = {}
            for name, _ in cpp_files:
                self._compare_tensor_data(None, None, name, i, current_errors)
            self.all_comparison_stats[f"ResNet_Sample_{i}"] = current_errors
        return

    # Debug aid: list the C++ output directory once (it is not modified by
    # the C++ side while this loop runs), then give the filesystem a moment
    # to settle before the first read.
    print(f"DEBUG: Listing contents of {cpp_resnet_dir} before loading tensors:")
    try:
        if cpp_resnet_dir.is_dir():
            for item_path in cpp_resnet_dir.iterdir():
                print(f" - {item_path.name}")
        else:
            print(f" Directory {cpp_resnet_dir} does not exist or is not a directory.")
    except Exception as e_list:
        print(f" ERROR listing directory: {e_list}")
    time.sleep(0.5)

    for i in tqdm(range(self.num_samples), desc="ResNet samples"):
        current_errors = {}
        py_image_input_path = py_input_dir / f'sample_{i}_image.pt'
        py_image_tensor = self.load_cpp_tensor(py_image_input_path, self.device)

        if py_image_tensor is not None:
            py_outputs = self._run_python_resnet(i, py_image_tensor, py_out_dir, cpp_resnet_dir)
        else:
            print(f"Warning: Skipping Python ResNet for sample {i}, image input not found at {py_image_input_path}")
            py_outputs = {}

        # Round-trip check: reload the conv1 output that was written to disk
        # and compare that copy as well.
        py_conv1_saved_path = py_out_dir / f'sample_{i}_conv1_output_py.pt'
        if py_conv1_saved_path.exists():
            try:
                py_outputs["ResNet Conv1 Output (Pre-BN)"] = torch.load(
                    str(py_conv1_saved_path), map_location=self.device)
            except Exception as e_load_py_conv1:
                print(f"Error loading Python conv1_output_py (pre-BN) for sample {i}: {e_load_py_conv1}")

        cpp_tensors = {}  # stem -> tensor, so a shared file is loaded only once
        for name, stem in cpp_files:
            if stem not in cpp_tensors:
                cpp_tensors[stem] = self.load_cpp_tensor(
                    cpp_resnet_dir / f'sample_{i}_{stem}.pt', self.device)
            self._compare_tensor_data(py_outputs.get(name), cpp_tensors[stem], name, i, current_errors)

        if current_errors:
            self.all_comparison_stats[f"ResNet_Sample_{i}"] = current_errors

def _run_python_resnet(self, i, py_image_tensor, py_out_dir, cpp_resnet_dir):
    """Run the Python ResNet on one raw image and collect its activations.

    Args:
        i: Sample index, used in file names and messages.
        py_image_tensor: Raw (not yet preprocessed) image tensor.
        py_out_dir: ``Path`` receiving Python-side activation files.
        cpp_resnet_dir: ``Path`` of the C++ ResNet output directory; the
            preprocessed image is written there because
            ``compare_preprocessed_inputs`` reads it from that location.

    Returns:
        Dict mapping comparison name to tensor. Entries are missing for
        activations that could not be computed.
    """
    outputs = {}
    if not self.python_wrapper:
        print("ERROR: self.python_wrapper not available to get preprocessed image for Python.")
        return outputs

    # Clone so preprocessing cannot modify the raw image in place; the raw
    # image is needed again for extract_backbone below.
    preprocessed = self.python_wrapper.preprocess_image(py_image_tensor.clone())
    preprocessed_path = cpp_resnet_dir / f'sample_{i}_image_preprocessed_python.pt'
    cpp_resnet_dir.mkdir(parents=True, exist_ok=True)
    torch.save(preprocessed.cpu(), str(preprocessed_path))
    print(f"Saved Python preprocessed image for sample {i} to {preprocessed_path}")

    model = self.models.get('ResNet')
    if model is None:
        print("ERROR: Python ResNet model not found in self.models")
        return outputs

    try:
        with torch.no_grad():
            conv1_out = model.conv1(preprocessed)
            outputs["ResNet Conv1"] = conv1_out
            torch.save(conv1_out.cpu(), str(py_out_dir / f'sample_{i}_conv1_output_py.pt'))

            bn1_out = model.bn1(conv1_out)
            outputs["ResNet BN1"] = bn1_out
            # The activation may operate in place (e.g. nn.ReLU(inplace=True));
            # feed it a copy so the BN1 tensor kept for comparison is intact.
            relu1_out = model.relu(bn1_out.clone())
            outputs["ResNet ReLU1"] = relu1_out
            maxpool_out = model.maxpool(relu1_out)
            outputs["ResNet MaxPool"] = maxpool_out

            layer1 = getattr(model, 'layer1', None)
            if layer1 is not None and len(layer1) > 0:
                first_block = layer1[0]
                try:
                    block_out = first_block(maxpool_out)
                    outputs["ResNet Layer1.0 Block Output"] = block_out
                    torch.save(block_out.cpu(),
                               str(py_out_dir / f'sample_{i}_layer1_0_block_output.pt'))
                except Exception as e_block:
                    print(f"ERROR: Failed to get/save Python layer1[0] block output for sample {i}: {e_block}")
                downsample = getattr(first_block, 'downsample', None)
                if downsample is not None:
                    outputs["ResNet Layer1.0 Shortcut"] = downsample(maxpool_out.clone())

            # The wrapper takes the raw image and preprocesses it internally.
            backbone = self.python_wrapper.extract_backbone(py_image_tensor)
            outputs["ResNet Layer1"] = backbone.get('layer1')
            outputs["ResNet Layer2"] = backbone.get('layer2')
            outputs["ResNet Layer3"] = backbone.get('layer3')
            outputs["ResNet Layer4"] = backbone.get('layer4')
            # layer4 doubles as the final feature map.
            outputs["ResNet Features"] = backbone.get('layer4')
    except Exception as e:
        print(f"ERROR: Python ResNet backbone/shortcut processing failed for sample {i}: {e}")
    return outputs
def generate_html_report(self):
    """Write ``report.html`` summarising ``self.all_comparison_stats``.

    ``self.all_comparison_stats`` maps a sample key (``Clf_<Train|Test>_Sample_<i>``,
    ``BBReg_Sample_<i>`` or ``ResNet_Sample_<i>``) to a dict of
    ``comparison name -> stats``, where ``stats`` is the 11-tuple
    ``(mae, max_err, diff_arr, mean_py_val, std_abs_err, l2_py, l2_cpp,
    l2_diff, cos_sim, pearson, mre)``.

    For each comparison an error histogram is generated with
    ``self._generate_single_plot``. The report, written to
    ``<comparison_dir>/report.html``, contains one summary table (means over
    the non-NaN per-sample values) and one per-sample table per comparison.
    Sample keys with an unknown prefix are reported on stdout and skipped.
    """
    print("\nGenerating HTML report...")
    report_path = os.path.join(self.comparison_dir, "report.html")

    # (metric key, format spec) in the column order of both tables.
    metric_columns = (
        ("mae", ".4e"),
        ("max_err", ".4e"),
        ("mean_py_val", ".4e"),
        ("std_abs_err", ".4e"),
        ("l2_py", ".4e"),
        ("l2_cpp", ".4e"),
        ("l2_diff", ".4e"),
        ("cos_sim", ".4f"),
        ("pearson", ".4f"),
        ("mre", ".4e"),
    )
    nan_cell = '<span class="nan">N/A</span>'

    def fmt(value, spec, missing):
        """Format a metric, substituting ``missing`` for NaN."""
        return missing if np.isnan(value) else format(value, spec)

    def mean_or_nan(values):
        """Mean of ``values``; NaN when there is nothing to average."""
        return np.mean(values) if values else float('nan')

    # Group per-sample results by "<model> - <comparison>".
    report_data = {}
    for sample_key, comparisons in self.all_comparison_stats.items():
        # sample_key examples: "Clf_Train_Sample_0", "Clf_Test_Sample_0", "BBReg_Sample_0"
        parts = sample_key.split("_")
        model_prefix = parts[0]
        if model_prefix == "Clf":
            model_name_key = f"Classifier {parts[1]}"  # Train or Test
        elif model_prefix == "BBReg":
            model_name_key = "BB Regressor"
        elif model_prefix == "ResNet":
            model_name_key = "ResNet"
        else:
            print(f"WARNING: Unknown sample key format in all_comparison_stats: {sample_key}")
            continue
        sample_idx = int(parts[-1])

        for comparison_name, stats in comparisons.items():
            (mae, max_err, diff_arr, mean_py_val, std_abs_err,
             l2_py, l2_cpp, l2_diff, cos_sim, pearson, mre) = stats
            metrics = {
                "mae": mae,
                "max_err": max_err,
                "mean_py_val": mean_py_val,
                "std_abs_err": std_abs_err,
                "l2_py": l2_py,
                "l2_cpp": l2_cpp,
                "l2_diff": l2_diff,
                "cos_sim": cos_sim,
                "pearson": pearson,
                "mre": mre,
            }

            full_comparison_key = f"{model_name_key} - {comparison_name}"
            if full_comparison_key not in report_data:
                report_data[full_comparison_key] = {
                    "samples": {},
                    "finite": {key: [] for key, _ in metric_columns},
                }
            group = report_data[full_comparison_key]

            plot_filename = f"{model_name_key.replace(' ', '_')}_{comparison_name.replace(' ', '_')}_{sample_idx}.png"
            plot_abs_path = os.path.join(self.comparison_dir, plot_filename)
            # Generate first, then look for the file: checking beforehand
            # would leave the image unlinked on a fresh run.
            self._generate_single_plot(diff_arr, comparison_name, plot_abs_path,
                                       mean_py_val, std_abs_err, mae, max_err)
            # The report sits in the same directory, so the bare file name
            # is the correct relative link.
            plot_path = plot_filename if os.path.exists(plot_abs_path) else None

            sample_entry = dict(metrics)
            sample_entry["plot_path"] = plot_path
            group["samples"][sample_idx] = sample_entry

            # NaN marks "not computed"; keep it out of the aggregates.
            for key, value in metrics.items():
                if not np.isnan(value):
                    group["finite"][key].append(value)

    # Aggregate over samples.
    for group in report_data.values():
        finite = group["finite"]
        group["overall"] = {key: mean_or_nan(values) for key, values in finite.items()}
        group["overall_mae_std"] = np.std(finite["mae"]) if finite["mae"] else float('nan')

    # The CSS and JavaScript contain literal braces, so those pieces stay
    # plain strings; only the sample-count line is interpolated.
    html_parts = ["""
    <html>
    <head>
    <title>Model Comparison Report</title>
    <style>
    body { font-family: sans-serif; margin: 20px; }
    h1, h2, h3 { color: #333; }
    table { border-collapse: collapse; width: 90%; margin-bottom: 20px; }
    th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
    th { background-color: #f2f2f2; }
    .plot-container { margin-bottom: 30px; page-break-inside: avoid; }
    img { max-width: 100%; height: auto; border: 1px solid #ccc; }
    .nan { color: #999; font-style: italic; }
    .collapsible {
    background-color: #f2f2f2;
    color: #444;
    cursor: pointer;
    padding: 10px;
    width: 100%;
    border: none;
    text-align: left;
    outline: none;
    font-size: 1.1em;
    margin-top: 10px;
    margin-bottom: 5px;
    }
    .active, .collapsible:hover {
    background-color: #ddd;
    }
    .content {
    padding: 0 18px;
    display: none;
    overflow: hidden;
    background-color: #f9f9f9;
    }
    .metric-explanation { margin-bottom: 20px; padding: 10px; border: 1px solid #eee; background-color: #f9f9f9; }
    .metric-explanation dt { font-weight: bold; }
    .metric-explanation dd { margin-left: 20px; margin-bottom: 5px; }
    </style>
    </head>
    <body>
    <h1>Model Comparison Report</h1>
    """]
    html_parts.append(f"<p>Number of samples per model component: {self.num_samples}</p>")
    html_parts.append("""
    <div class="metric-explanation">
    <h3>Understanding the Metrics:</h3>
    <dl>
    <dt>Mean MAE (Mean Absolute Error)</dt>
    <dd><b>Calculation:</b> Average of the absolute differences between corresponding elements of the Python and C++ tensors (<code>mean(abs(py - cpp))</code>). The "Mean MAE" in the summary table is the average of these MAEs over all samples for a given comparison.</dd>
    <dd><b>Range & Interpretation:</b> 0 to &infin;. Closer to 0 indicates better agreement. This metric shows the average magnitude of error.</dd>
    <dt>Std MAE (Standard Deviation of MAE)</dt>
    <dd><b>Calculation:</b> Standard deviation of the MAE values calculated for each sample within a comparison group.</dd>
    <dd><b>Range & Interpretation:</b> 0 to &infin;. A smaller value indicates that the MAE is consistent across samples. A larger value suggests variability in agreement from sample to sample.</dd>
    <dt>Mean Max Error</dt>
    <dd><b>Calculation:</b> Average of the maximum absolute differences found between Python and C++ tensors for each sample (<code>mean(max(abs(py - cpp)))</code> over samples).</dd>
    <dd><b>Range & Interpretation:</b> 0 to &infin;. Closer to 0 is better. Indicates the average of the worst-case discrepancies per sample.</dd>
    <dt>Mean Py Val (Mean Python Tensor Value)</dt>
    <dd><b>Calculation:</b> Average of the mean values of the Python reference tensors over all samples (<code>mean(mean(py_tensor_sample_N))</code>).</dd>
    <dd><b>Range & Interpretation:</b> Problem-dependent. Provides context about the typical magnitude of the Python model's output values.</dd>
    <dt>Mean Std Abs Err (Mean Standard Deviation of Absolute Errors)</dt>
    <dd><b>Calculation:</b> Average of the standard deviations of the absolute error arrays (<code>abs(py - cpp)</code>) for each sample. The "Err Std" in plot titles is this value for that specific sample.</dd>
    <dd><b>Range & Interpretation:</b> 0 to &infin;. A smaller value indicates that the errors are concentrated around their mean (MAE), implying less spread in error magnitudes within a sample.</dd>
    <dt>Mean L2 Py (Mean L2 Norm of Python Tensor)</dt>
    <dd><b>Calculation:</b> Average of the L2 norms (Euclidean norm) of the flattened Python tensors over all samples.</dd>
    <dd><b>Range & Interpretation:</b> 0 to &infin;. Represents the average magnitude or "length" of the Python output vectors.</dd>
    <dt>Mean L2 Cpp (Mean L2 Norm of C++ Tensor)</dt>
    <dd><b>Calculation:</b> Average of the L2 norms of the flattened C++ tensors over all samples.</dd>
    <dd><b>Range & Interpretation:</b> 0 to &infin;. Represents the average magnitude of the C++ output vectors. Should be comparable to Mean L2 Py if models agree in scale.</dd>
    <dt>Mean L2 Diff (Mean L2 Norm of Difference)</dt>
    <dd><b>Calculation:</b> Average of the L2 norms of the flattened difference tensors (<code>py - cpp</code>) over all samples.</dd>
    <dd><b>Range & Interpretation:</b> 0 to &infin;. Closer to 0 indicates better agreement. This is the magnitude of the average difference vector.</dd>
    <dt>Mean Cosine Sim (Mean Cosine Similarity)</dt>
    <dd><b>Calculation:</b> Average of the cosine similarities between the flattened Python and C++ tensors over all samples. Cosine similarity is <code>dot(py, cpp) / (norm(py) * norm(cpp))</code>.</dd>
    <dd><b>Range & Interpretation:</b> -1 to 1 (typically 0 to 1 for non-negative features). Closer to 1 indicates that the tensors point in the same direction (high similarity in terms of orientation, ignoring magnitude). Values near 0 suggest orthogonality, and near -1 suggest opposite directions.</dd>
    <dt>Mean Pearson Corr (Mean Pearson Correlation Coefficient)</dt>
    <dd><b>Calculation:</b> Average of the Pearson correlation coefficients between the flattened Python and C++ tensors over all samples. Measures linear correlation.</dd>
    <dd><b>Range & Interpretation:</b> -1 to 1. Closer to 1 indicates strong positive linear correlation. Closer to -1 indicates strong negative linear correlation. Closer to 0 indicates weak or no linear correlation.</dd>
    <dt>Mean MRE (Mean Relative Error)</dt>
    <dd><b>Calculation:</b> Average of the mean relative errors per sample, where relative error is <code>mean(abs(py - cpp) / (abs(py) + epsilon))</code>. Epsilon is a small value to prevent division by zero.</dd>
    <dd><b>Range & Interpretation:</b> 0 to &infin;. Closer to 0 is better. This metric normalizes the absolute error by the magnitude of the Python reference values, useful for understanding error relative to signal strength.</dd>
    </dl>
    </div>
    """)

    sorted_report_keys = sorted(report_data)

    # Summary table: one row per comparison.
    html_parts.append("<h2>Overall Comparison Statistics</h2><table><tr><th>Comparison Key</th><th>Mean MAE</th><th>Std MAE</th><th>Mean Max Error</th><th>Mean Py Val</th><th>Mean Std Abs Err</th><th>Mean L2 Py</th><th>Mean L2 Cpp</th><th>Mean L2 Diff</th><th>Mean Cosine Sim</th><th>Mean Pearson Corr</th><th>Mean MRE</th></tr>")
    for comp_key in sorted_report_keys:
        group = report_data[comp_key]
        overall = group["overall"]
        cells = [
            comp_key,
            fmt(overall["mae"], ".4e", "N/A"),
            fmt(group["overall_mae_std"], ".4e", "N/A"),
        ]
        cells.extend(fmt(overall[key], spec, "N/A") for key, spec in metric_columns[1:])
        html_parts.append("\n<tr>\n" + "\n".join(f"<td>{cell}</td>" for cell in cells) + "\n</tr>\n")
    html_parts.append("</table>")

    # Detail tables: one row per sample.
    for comp_key in sorted_report_keys:
        group = report_data[comp_key]
        overall_mae = fmt(group["overall"]["mae"], ".4e", "N/A")
        html_parts.append(f"<h2>Details for: {comp_key}</h2>")
        html_parts.append(f"<p>Overall Mean MAE: {overall_mae}</p>")
        html_parts.append("<table><tr><th>Sample Index</th><th>MAE</th><th>Max Error</th><th>Mean Py Val</th><th>Std Abs Err</th><th>L2 Py</th><th>L2 Cpp</th><th>L2 Diff</th><th>Cosine Sim</th><th>Pearson Corr</th><th>MRE</th><th>Error Distribution Plot</th></tr>")
        for sample_idx in sorted(group["samples"]):
            sample_data = group["samples"][sample_idx]
            plot_path = sample_data["plot_path"]
            if plot_path:
                img_tag = f'<img src="{plot_path}" alt="Error histogram for {comp_key} sample {sample_idx}" style="max-width:400px; height:auto;">'
            else:
                img_tag = "N/A"
            cells = [str(sample_idx)]
            cells.extend(fmt(sample_data[key], spec, nan_cell) for key, spec in metric_columns)
            cells.append(img_tag)
            html_parts.append("\n<tr>\n" + "\n".join(f"<td>{cell}</td>" for cell in cells) + "\n</tr>\n")
        html_parts.append("</table>")

    html_parts.append("""
    <script>
    var coll = document.getElementsByClassName("collapsible");
    var i;
    for (i = 0; i < coll.length; i++) {
    coll[i].addEventListener("click", function() {
    this.classList.toggle("active");
    var content = this.nextElementSibling;
    if (content.style.display === "block") {
    content.style.display = "none";
    } else {
    content.style.display = "block";
    }
    });
    }
    </script>
    </body></html>
    """)

    with open(report_path, 'w', encoding='utf-8') as f:
        f.write("".join(html_parts))
    print(f"HTML report generated at {report_path}")
def _generate_single_plot(self, error_array, title, plot_path, mean_val, std_abs_err, mae, max_err):
if error_array is None or len(error_array) == 0 or np.all(np.isnan(error_array)):
# print(f"Skipping plot for {title} as error_array is empty or all NaNs.")
return
plt.figure(figsize=(8, 6))
plt.hist(error_array, bins=50, color='skyblue', edgecolor='black')
stats_text = f"Ref Mean: {mean_val:.3e} | MAE: {mae:.3e} | MaxErr: {max_err:.3e} | Err Std: {std_abs_err:.3e}"
plt.title(f"{title}\n{stats_text}", fontsize=10)
plt.xlabel("Error Value")
plt.ylabel("Frequency")
plt.grid(True, linestyle='--', alpha=0.7)
try:
plt.tight_layout()
plt.savefig(plot_path)
except Exception as e:
print(f"ERROR: Failed to save plot {plot_path}: {e}")
plt.close()
def run_all_tests(self):
    """Run the full comparison pipeline and produce the HTML report.

    Clears ``self.all_comparison_stats``, loads the Python models, then runs
    every comparison stage in order before generating the report.
    """
    print("DEBUG: ComparisonRunner.run_all_tests() ENTERED") # DEBUG PRINT
    # Start from a clean slate so results of an earlier run cannot leak in.
    self.all_comparison_stats = {}

    # Stage order matters: the ResNet stage writes the Python preprocessed
    # images that the preprocessed-input stage reads back, and the report
    # must come last.
    stage_names = (
        "load_python_models",
        "compare_resnet_outputs",
        "compare_classifier",
        "compare_bb_regressor",
        "compare_preprocessed_inputs",
        "generate_html_report",
    )
    for stage_name in stage_names:
        getattr(self, stage_name)()

    print("All tests completed!")
def compare_preprocessed_inputs(self):
    """Compare the preprocessed input images saved by Python and by C++.

    For each sample index the files ``sample_<i>_image_preprocessed_python.pt``
    and ``sample_<i>_image_preprocessed_cpp.pt`` are read from
    ``<cpp_output_dir>/resnet``. Shape, dtype and values are checked and the
    per-sample result is printed, followed by an overall verdict. A missing
    or unreadable file counts as an issue. Nothing is returned and nothing
    is added to ``self.all_comparison_stats``.
    """
    print("\nComparing preprocessed input images (Python vs C++)...")
    # Both implementations write their preprocessed images to this directory.
    preprocessed_dir = Path(self.cpp_output_dir) / 'resnet'
    issues_found = False

    for i in tqdm(range(self.num_samples), desc="Preprocessed Input Samples"):
        py_input_path = preprocessed_dir / f'sample_{i}_image_preprocessed_python.pt'
        cpp_input_path = preprocessed_dir / f'sample_{i}_image_preprocessed_cpp.pt'
        py_tensor = None
        cpp_tensor = None

        if py_input_path.exists():
            try:
                py_tensor = torch.load(str(py_input_path), map_location=self.device)
            except Exception as e:
                print(f"Error loading Python preprocessed input for sample {i} from {py_input_path}: {e}")
        else:
            print(f"Python preprocessed input for sample {i} not found at {py_input_path}")

        if cpp_input_path.exists():
            try:
                cpp_tensor = self.load_cpp_tensor(str(cpp_input_path), self.device)
            except Exception as e:
                print(f"Error loading C++ preprocessed input for sample {i} from {cpp_input_path}: {e}")
        else:
            print(f"C++ preprocessed input for sample {i} not found at {cpp_input_path}")

        if py_tensor is None or cpp_tensor is None:
            print(f" Sample {i}: Skipping comparison due to missing tensor(s).")
            issues_found = True  # Count missing files as an issue
            continue

        if py_tensor.shape != cpp_tensor.shape:
            print(f" Sample {i}: SHAPE MISMATCH! Python: {py_tensor.shape}, C++: {cpp_tensor.shape}")
            issues_found = True
            continue

        if py_tensor.dtype != cpp_tensor.dtype:
            # torch.allclose refuses mixed dtypes; report the mismatch and
            # still compare the values in the Python tensor's dtype.
            print(f" Sample {i}: DTYPE MISMATCH! Python: {py_tensor.dtype}, C++: {cpp_tensor.dtype}")
            issues_found = True
            cpp_tensor = cpp_tensor.to(py_tensor.dtype)

        # Passes when |py - cpp| <= atol + rtol * |cpp|; rtol keeps its
        # torch default, only atol is tightened for this input check.
        are_close = torch.allclose(py_tensor, cpp_tensor, atol=1e-7)

        # Always report the measured deviation, also when it is within
        # tolerance, so small systematic offsets stay visible.
        abs_diff = torch.abs(py_tensor - cpp_tensor)
        if abs_diff.numel() > 0:
            max_abs_diff = abs_diff.max().item()
            # mean() is only defined for floating-point tensors.
            float_diff = abs_diff if abs_diff.is_floating_point() else abs_diff.float()
            mean_abs_diff = float_diff.mean().item()
        else:
            max_abs_diff = 0.0
            mean_abs_diff = 0.0

        print(f" Sample {i}: torch.allclose(): {are_close}, Max Abs Diff: {max_abs_diff:.4e}, Mean Abs Diff: {mean_abs_diff:.4e}")
        if not are_close:
            issues_found = True

    if not issues_found:
        print("Preprocessed input comparison: All samples matched or were close!")
    else:
        print("Preprocessed input comparison: ISSUES FOUND (details above).")
def load_cpp_tensor(self, file_path_str, device, is_image=False):
    """Load a tensor file produced by the C++ side as a float tensor on ``device``.

    The file is first opened with ``torch.jit.load``. When that yields a
    ScriptModule, one tensor is picked from it by trying, in order:

    1. a ``tensor`` attribute that is a ``torch.Tensor``;
    2. a ``weight`` attribute that is a ``torch.Tensor``;
    3. the module's only parameter (when it has exactly one);
    4. the only tensor-valued attribute reported by ``dir()``;
    5. an ``output`` attribute that is a ``torch.Tensor``;
    6. the tensor-valued attribute with the most elements.

    Each step is tried only if the previous ones produced nothing, so an
    attribute that exists but is not a tensor (for example a non-tensor
    ``tensor`` attribute) no longer prevents the later steps from running.

    If anything in the JIT path raises, the file is re-read with plain
    ``torch.load`` as a fallback.

    Args:
        file_path_str: Path of the file to load (string or path-like).
        device: Passed as ``map_location`` when loading and to ``Tensor.to``.
        is_image: Not used by this method; kept so existing callers that
            pass it keep working.

    Returns:
        The loaded tensor converted with ``.to(device).float()``, or ``None``
        when the file is missing, no tensor could be extracted, or both load
        attempts failed (a message is printed in each of those cases).
    """
    file_path_obj = Path(file_path_str)
    if not file_path_obj.exists():
        print(f"ERROR: C++ tensor file not found (Path.exists check): {file_path_obj}")
        return None
    try:
        loaded_obj = torch.jit.load(str(file_path_obj), map_location=device)
        if isinstance(loaded_obj, torch.jit.ScriptModule):
            actual_tensor = None

            # Steps 1-2: well-known attribute names, accepted only if they
            # really hold a tensor.
            for preferred_name in ('tensor', 'weight'):
                candidate = getattr(loaded_obj, preferred_name, None)
                if isinstance(candidate, torch.Tensor):
                    actual_tensor = candidate
                    break

            # Step 3: a module carrying exactly one parameter.
            if actual_tensor is None:
                params = list(loaded_obj.parameters())
                if len(params) == 1:
                    actual_tensor = params[0]

            # Steps 4-6: scan every attribute for tensors.
            if actual_tensor is None:
                tensor_attrs = []
                for attr in dir(loaded_obj):
                    value = getattr(loaded_obj, attr, None)
                    if isinstance(value, torch.Tensor):
                        tensor_attrs.append(value)
                if len(tensor_attrs) == 1:
                    actual_tensor = tensor_attrs[0]
                elif len(tensor_attrs) > 1:
                    output_attr = getattr(loaded_obj, 'output', None)
                    if isinstance(output_attr, torch.Tensor):
                        actual_tensor = output_attr
                    else:
                        # No clear primary tensor: fall back to the largest one.
                        actual_tensor = max(tensor_attrs, key=lambda t: t.numel())

            if actual_tensor is None:
                print(f"ERROR: C++ tensor from {file_path_obj} is a ScriptModule, but couldn't extract a single tensor. StateDict keys: {list(loaded_obj.state_dict().keys()) if hasattr(loaded_obj, 'state_dict') else 'N/A'}")
                return None
        elif isinstance(loaded_obj, torch.Tensor):
            actual_tensor = loaded_obj
        else:
            print(f"ERROR: C++ tensor loaded from {file_path_obj} with torch.jit.load is not a Tensor or ScriptModule. Type: {type(loaded_obj)}")
            return None
        # Ensure the result is on the requested device and is float.
        return actual_tensor.to(device).float()
    except Exception as e:
        # Deliberately broad: any failure of the JIT path (including a file
        # that is not a TorchScript archive) triggers the torch.load fallback.
        try:
            tensor = torch.load(str(file_path_obj), map_location=device)
            if not isinstance(tensor, torch.Tensor):
                print(f"ERROR: Fallback torch.load for {file_path_obj} did not return a tensor. Type: {type(tensor)}")
                return None
            return tensor.to(device).float()
        except Exception as e2:
            print(f"ERROR: Failed to load C++ tensor from {file_path_obj}. JIT load error: {e}. Torch load error: {e2}")
            import traceback
            traceback.print_exc()
            return None
def _compare_tensor_data(self, tensor1, tensor2, name, sample_idx, current_errors):
    """Compute error metrics between two tensors and store them in ``current_errors``.

    Nothing is returned; the result is written to ``current_errors[name]`` as
    an 11-tuple, in this order:

        (mae, max_err, abs_diff_flat, mean_py_val, std_abs_err,
         l2_norm_py, l2_norm_cpp, l2_norm_diff,
         cosine_sim, pearson_corr, mean_rel_err)

    where ``tensor1`` is treated as the Python tensor and ``tensor2`` as the
    C++ tensor. Entries that cannot be computed are ``float('nan')`` and the
    per-element difference entry is an empty list in those cases:

    * either tensor is ``None``: only ``mean_py_val`` and ``l2_norm_py`` are
      filled in (and only when ``tensor1`` is present);
    * shapes differ: only ``mean_py_val`` and the two L2 norms are filled in;
    * both tensors are empty: the two L2 norms are ``0.0``, everything else
      is NaN (previously this case raised ``ValueError`` from ``np.max``).

    Args:
        tensor1: Python-side ``torch.Tensor`` or ``None``.
        tensor2: C++-side ``torch.Tensor`` or ``None``.
        name: Key under which the metrics are stored.
        sample_idx: Sample index, used only in printed warnings.
        current_errors: Mutable mapping that receives the metrics tuple.
    """
    nan = float('nan')

    if tensor1 is None or tensor2 is None:
        py_mean = nan
        py_l2 = nan
        if tensor1 is not None:
            # Only the Python tensor exists: report what can be said about it.
            t1_only = tensor1.cpu().detach().numpy().astype(np.float32)
            py_mean = np.mean(t1_only)
            py_l2 = np.linalg.norm(t1_only.flatten())
        current_errors[name] = (
            nan, nan, [], py_mean, nan,
            py_l2, nan, nan, nan, nan, nan
        )
        print(f"Warning: Cannot compare '{name}' for sample {sample_idx}, one or both tensors are None.")
        return

    t1_cpu = tensor1.cpu().detach().numpy().astype(np.float32)
    t2_cpu = tensor2.cpu().detach().numpy().astype(np.float32)

    if t1_cpu.shape != t2_cpu.shape:
        print(f"Warning: Shape mismatch for '{name}' sample {sample_idx}. Py: {t1_cpu.shape}, Cpp: {t2_cpu.shape}. Skipping most comparisons.")
        current_errors[name] = (
            nan, nan, [], np.mean(t1_cpu), nan,
            np.linalg.norm(t1_cpu.flatten()), np.linalg.norm(t2_cpu.flatten()), nan,
            nan, nan, nan
        )
        return

    if t1_cpu.size == 0:
        # Reductions such as np.max are undefined on zero-size arrays.
        print(f"Warning: Empty tensors for '{name}' sample {sample_idx}. Skipping comparison.")
        current_errors[name] = (
            nan, nan, [], nan, nan,
            0.0, 0.0, nan,
            nan, nan, nan
        )
        return

    # From here on both tensors exist, are non-empty and have equal shapes.
    t1_flat = t1_cpu.flatten()
    t2_flat = t2_cpu.flatten()
    abs_diff_elements = np.abs(t1_cpu - t2_cpu)

    mae = np.mean(abs_diff_elements)
    max_err = np.max(abs_diff_elements)
    diff_arr_for_hist = abs_diff_elements.flatten()
    mean_py_val = np.mean(t1_cpu)
    std_abs_err = np.std(diff_arr_for_hist)

    l2_norm_py = np.linalg.norm(t1_flat)
    l2_norm_cpp = np.linalg.norm(t2_flat)
    l2_norm_diff = np.linalg.norm(t1_flat - t2_flat)

    # Cosine similarity is undefined when either vector has zero length.
    if l2_norm_py == 0 or l2_norm_cpp == 0:
        cosine_sim = nan
    else:
        cosine_sim = np.dot(t1_flat, t2_flat) / (l2_norm_py * l2_norm_cpp)

    # Pearson correlation needs at least two points and non-constant inputs.
    if len(t1_flat) < 2:
        pearson_corr = nan
    else:
        std_t1 = np.std(t1_flat)
        std_t2 = np.std(t2_flat)
        if std_t1 == 0 or std_t2 == 0:
            # Both constant and (nearly) identical is reported as perfect
            # correlation; any other constant case is left undefined.
            if std_t1 == 0 and std_t2 == 0 and np.allclose(t1_flat, t2_flat):
                pearson_corr = 1.0
            else:
                pearson_corr = nan
        else:
            try:
                # Two 1-D inputs give a 2x2 matrix; the off-diagonal entry
                # is the correlation between them.
                pearson_corr = np.corrcoef(t1_flat, t2_flat)[0, 1]
            except Exception:
                # Best effort: a failed correlation must not abort the run.
                pearson_corr = nan

    # Mean relative error against the Python values. The epsilon keeps the
    # denominator non-zero; where the Python value is 0 and the C++ value is
    # not, the per-element term becomes very large by construction.
    epsilon_rel_err = 1e-9
    mean_rel_err = np.mean(abs_diff_elements / (np.abs(t1_cpu) + epsilon_rel_err))

    current_errors[name] = (
        mae, max_err, diff_arr_for_hist, mean_py_val, std_abs_err,
        l2_norm_py, l2_norm_cpp, l2_norm_diff, cosine_sim, pearson_corr, mean_rel_err
    )
@staticmethod
def load_weights_for_custom_model(model, base_model_dir, model_name, device):
    """Load per-tensor weight files into ``model`` as described by a doc file.

    The directory ``<base_model_dir>/<model_name>`` must contain
    ``<model_name>_weights_doc.txt``. In that file every line starting with
    ``"## "`` introduces a dotted attribute path (for example
    ``layer1.0.conv1.weight``), and the first following line containing
    ``"File:"`` names the tensor file (relative to the same directory) that
    holds its value. The search for ``"File:"`` stops at the next ``"## "``
    header, so an entry without a file is reported and skipped instead of
    consuming the next entry's file.

    For each entry the tensor is loaded with ``torch.load`` and written to
    the addressed module as a parameter, a buffer, or a plain attribute,
    depending on where the name is already registered. Per-entry failures are
    printed and do not stop the remaining entries.

    Args:
        model: Module whose attributes are overwritten in place.
        base_model_dir: Directory containing one sub-directory per model.
        model_name: Sub-directory name and prefix of the doc file.
        device: Passed as ``map_location`` and to ``Tensor.to``.

    Returns:
        None. If the doc file is missing, a warning is printed and the model
        is left untouched; otherwise the model ends in ``eval()`` mode on
        ``device``.
    """
    print(f"Loading weights for custom model {model_name} from {base_model_dir}")
    # A Path (not a str) so that the "/" joins below are valid.
    tensor_dir = Path(base_model_dir) / model_name
    doc_file = tensor_dir / (model_name + '_weights_doc.txt')
    if not doc_file.exists():
        print(f"Warning: Documentation file not found: {doc_file} for {model_name}. Skipping weight loading for source model.")
        return

    with open(doc_file, 'r') as f:
        lines = f.readlines()

    i = 0
    while i < len(lines):
        line = lines[i]
        if not line.startswith('## '):
            i += 1
            continue

        key = line.strip()[3:]

        # Look for this entry's "File:" line, but never past the next header.
        j = i + 1
        while j < len(lines) and not lines[j].startswith('## ') and 'File:' not in lines[j]:
            j += 1
        if j >= len(lines) or lines[j].startswith('## '):
            print(f"Warning: No 'File:' entry found for {key} in {doc_file} for source {model_name}.")
            i = j  # resume at the next header (or stop at end of file)
            continue

        file_name = lines[j].split('File:')[1].strip()
        tensor_path = tensor_dir / file_name
        if tensor_path.exists():
            try:
                # NOTE(review): the ScriptModule branch below relies on
                # torch.load handing back a RecursiveScriptModule for
                # TorchScript archives; whether it does depends on the
                # installed PyTorch version - confirm for the target version.
                tensor_data = torch.load(str(tensor_path), map_location=device)
                if isinstance(tensor_data, torch.jit.RecursiveScriptModule):
                    if hasattr(tensor_data, 'weight'):
                        tensor = tensor_data.weight
                    elif hasattr(tensor_data, 'bias'):
                        tensor = tensor_data.bias
                    elif len(list(tensor_data.parameters())) > 0:
                        tensor = list(tensor_data.parameters())[0]
                    else:
                        tensor = tensor_data()  # last resort: call the module
                else:
                    tensor = tensor_data

                # Walk the dotted path down to the module owning the tensor.
                parts = key.split('.')
                module_to_set = model
                for part in parts[:-1]:
                    module_to_set = getattr(module_to_set, part)
                param_name = parts[-1]

                if hasattr(module_to_set, param_name):
                    if param_name in module_to_set._parameters:
                        module_to_set._parameters[param_name] = torch.nn.Parameter(tensor.to(device))
                    elif param_name in module_to_set._buffers:
                        module_to_set._buffers[param_name] = tensor.to(device)
                    else:
                        setattr(module_to_set, param_name, tensor.to(device))
                else:
                    print(f"Warning: Attribute {key} not found in source model {model_name}.")
            except Exception as e:
                # Best effort: report the entry and keep loading the rest.
                print(f"Error loading tensor for {key} from {tensor_path} for source {model_name}: {e}")
        else:
            print(f"Warning: Tensor file not found: {tensor_path} for source {model_name}")

        i = j + 1

    model.eval().to(device)
if __name__ == "__main__":
    # Script entry point: build a ComparisonRunner for the configured models,
    # run every comparison and print where the HTML report is expected.

    # ROOT_DIR is the parent of the directory that contains this script.
    SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
    ROOT_DIR = os.path.dirname(SCRIPT_DIR)

    parser = argparse.ArgumentParser(description="Compare Python and C++ model outputs.")
    parser.add_argument("--num_samples", type=int, default=3, help="Number of samples to compare (-1 for all).")
    args = parser.parse_args()

    # Model configurations receive ROOT_DIR so that they can build model paths.
    model_configs = get_model_configs(ROOT_DIR)

    # C++ outputs are read from <ROOT_DIR>/test/output and Python outputs
    # from <ROOT_DIR>/test/output_py.
    runner = ComparisonRunner(
        root_dir=ROOT_DIR,
        model_configs=model_configs,
        cpp_output_dir=os.path.join(ROOT_DIR, "test/output"),
        python_output_dir=os.path.join(ROOT_DIR, "test/output_py"),
        num_samples=args.num_samples
    )

    # NOTE(review): per the original inline note, run_all_tests() also
    # generates the HTML report - confirm against its definition.
    runner.run_all_tests()

    # os.path.join is used so this works whether comparison_dir is a str or a Path.
    print(f"HTML report generated at {os.path.join(runner.comparison_dir, 'report.html')}")