#!/usr/bin/env python3
import os
import torch
import numpy as np
import glob
import matplotlib.pyplot as plt
from pathlib import Path
import sys
import json
from tqdm import tqdm
import inspect

# Add the project root to path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

# Import model wrappers
from pytracking.features.net_wrappers import DiMPTorchScriptWrapper
# For loading AtomIoUNet from source
from ltr.models.bbreg.atom_iou_net import AtomIoUNet


class ModelComparison:
    def __init__(self, model_dir='exported_weights', num_samples=1000):
        self.model_dir_name = model_dir  # Store the string name for paths
        self.num_samples = num_samples
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'

        # Base paths for models, inputs, and outputs
        self.project_root = Path(__file__).resolve().parent.parent
        self.model_base_dir = self.project_root / self.model_dir_name
        self.input_dir_base = self.project_root / 'test' / 'input_samples'
        self.cpp_output_dir_base = self.project_root / 'test' / 'output'

        # Initialize comparison results
        self.comparison_dir = self.project_root / 'test' / 'comparison'
        self.comparison_dir.mkdir(parents=True, exist_ok=True)
        self.plots_dir = self.comparison_dir / 'plots'
        self.plots_dir.mkdir(parents=True, exist_ok=True)
        self.all_errors_stats = {}

        # Initialize models (TorchScript and from-source)
        self._init_models()

    def _init_models(self):
        """Initialize Python models."""
        print("Loading Python models...")
        # Load TorchScript models using the *_sd arguments for directories of tensors
        self.models = DiMPTorchScriptWrapper(
            model_dir=str(self.model_base_dir),  # Expects a string path
            device=self.device,
            backbone_sd='backbone',          # Directory name for backbone weights
            classifier_sd='classifier',      # Directory name for classifier weights
            bbregressor_sd='bb_regressor'    # Directory name for bb_regressor weights
        )

        # Initialize BBRegressor from source for get_modulation fallback
        self.bb_regressor_from_source = AtomIoUNet(
            input_dim=(512, 1024),
            pred_input_dim=(256, 256),
            pred_inter_dim=(256, 256)
        )
        ModelComparison.load_weights_for_custom_model(
            self.bb_regressor_from_source,
            'bb_regressor',  # model_name for path and doc file
            self.model_base_dir,
            self.device
        )
        self.bb_regressor_from_source.eval().to(self.device)
        print("Python models loaded.")

    def compare_classifier(self):
        """Compare classifier model outputs between Python and C++."""
        print("\nComparing classifier outputs...")
        # Ensure paths are Path objects for consistency if not already
        input_dir_path = Path('test') / 'input_samples' / 'classifier'
        cpp_output_dir_path = Path('test') / 'output' / 'classifier'
        if not input_dir_path.exists() or not cpp_output_dir_path.exists():
            print(f"Classifier input or C++ output directory not found ({input_dir_path}, {cpp_output_dir_path}). Skipping.")
            return
        # Removed: train_errors = []
        # Removed: test_errors = []
        # self.all_errors_stats is initialized per test run.
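        # Expected on-disk layout, inferred from the paths used below (the actual files
        # are produced by the export and C++ test steps, so this is only a sketch):
        #   test/input_samples/classifier/sample_<i>/backbone_feat.pt   (shared input)
        #   test/input_samples/classifier/test_<i>/test_feat.pt
        #   test/output/classifier/sample_<i>/clf_features.pt           (C++ output)
        #   test/output/classifier/test_<i>/clf_feat_test.pt            (C++ output)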
        # Compare training samples
        print("\nClassifier - Comparing Training Samples...")
        for i in tqdm(range(self.num_samples), desc="Training samples"):
            current_errors = {}  # For this sample
            sample_dir = input_dir_path / f'sample_{i}'
            cpp_out_sample_dir = cpp_output_dir_path / f'sample_{i}'
            py_clf_feat = None
            cpp_clf_feat = None
            if not sample_dir.exists() or not cpp_out_sample_dir.exists():
                print(f"Warning: Skipping classifier train sample {i}, files not found at {sample_dir} or {cpp_out_sample_dir}.")
                # No explicit error assignment here; _compare_tensor_data will handle Nones
            else:
                feat_path = sample_dir / 'backbone_feat.pt'
                feat = self.load_cpp_tensor(feat_path, self.device)
                if feat is None:
                    print(f"Critical: Failed to load input tensor for {feat_path} for classifier train sample {i}.")
                    # feat is None, py_clf_feat will remain None
                else:
                    try:
                        with torch.no_grad():
                            py_clf_feat = self.models.classifier.extract_classification_feat(feat)
                    except Exception as e:
                        print(f"ERROR: Python model extract_classification_feat (train) failed for sample {i}: {e}")
                        # py_clf_feat remains None
                cpp_clf_feat_path = cpp_out_sample_dir / 'clf_features.pt'
                cpp_clf_feat = self.load_cpp_tensor(cpp_clf_feat_path, self.device)
                if cpp_clf_feat is None:
                    print(f"Warning: Failed to load C++ output tensor {cpp_clf_feat_path} for classifier train sample {i}.")
                    # cpp_clf_feat remains None
            self._compare_tensor_data(py_clf_feat, cpp_clf_feat, "Classifier Features Train", i, current_errors)
            if current_errors:
                self.all_errors_stats[f"Clf_Train_Sample_{i}"] = current_errors

        # Compare test samples
        print("\nClassifier - Comparing Test Samples...")
        for i in tqdm(range(self.num_samples), desc="Test samples"):
            current_errors = {}  # For this sample
            test_sample_input_dir = input_dir_path / f'test_{i}'
            cpp_test_out_sample_dir = cpp_output_dir_path / f'test_{i}'
            py_clf_feat_test = None
            cpp_clf_feat_test = None
            if not test_sample_input_dir.exists() or not cpp_test_out_sample_dir.exists():
                print(f"Warning: Skipping classifier test sample {i}, files not found at {test_sample_input_dir} or {cpp_test_out_sample_dir}.")
                # No explicit error assignment here
            else:
                test_feat_path = test_sample_input_dir / 'test_feat.pt'
                test_feat = self.load_cpp_tensor(test_feat_path, self.device)
                if test_feat is None:
                    print(f"Critical: Failed to load input tensor for {test_feat_path} for classifier test sample {i}.")
                    # test_feat is None, py_clf_feat_test remains None
                else:
                    try:
                        with torch.no_grad():
                            py_clf_feat_test = self.models.classifier.extract_classification_feat(test_feat)
                    except Exception as e:
                        print(f"ERROR: Python model extract_classification_feat (test) failed for sample {i}: {e}")
                        # py_clf_feat_test remains None
                cpp_clf_feat_test_path = cpp_test_out_sample_dir / 'clf_feat_test.pt'
                cpp_clf_feat_test = self.load_cpp_tensor(cpp_clf_feat_test_path, self.device)
                if cpp_clf_feat_test is None:
                    print(f"Warning: Failed to load C++ output tensor {cpp_clf_feat_test_path} for classifier test sample {i}.")
                    # cpp_clf_feat_test remains None
            self._compare_tensor_data(py_clf_feat_test, cpp_clf_feat_test, "Classifier Features Test", i, current_errors)
            if current_errors:
                self.all_errors_stats[f"Clf_Test_Sample_{i}"] = current_errors

        # Old stats and plotting code removed/commented below, now handled by the HTML report
        # print("\nClassifier Comparison Statistics:")
        # if train_errors:
        #     print(f"  Training Features MAE: Mean={np.mean(train_errors):.4e}, Std={np.std(train_errors):.4e}")
        # if test_errors:
        #     print(f"  Test Features MAE: Mean={np.mean(test_errors):.4e}, Std={np.std(test_errors):.4e}")
        # self._generate_stats_and_plots(train_errors, "Classifier Training Features Error", self.plots_dir / "clf_train_feat_error_hist.png")
        # self._generate_stats_and_plots(test_errors, "Classifier Test Features Error", self.plots_dir / "clf_test_feat_error_hist.png")

    def compare_bb_regressor(self):
        """Compare bb_regressor model outputs between Python and C++."""
        print("\nComparing bb_regressor outputs...")
        input_dir_path = self.input_dir_base / 'bb_regressor'
        cpp_output_dir_path = self.cpp_output_dir_base / 'bb_regressor'
        if not input_dir_path.exists() or not cpp_output_dir_path.exists():
            print(f"BB Regressor input or C++ output directory not found ({input_dir_path}, {cpp_output_dir_path}). Skipping BB Regressor comparison.")
            # Populate NaN for all expected BB Regressor comparisons if dirs are missing
            for i in range(self.num_samples):
                sample_key_base = f"BBReg_Sample_{i}"
                current_errors = {}
                self._compare_tensor_data(None, None, "BBReg PyIoUFeat0 vs CppIoUFeat0", i, current_errors)
                self._compare_tensor_data(None, None, "BBReg PyIoUFeat1 vs CppIoUFeat1", i, current_errors)
                self._compare_tensor_data(None, None, "BBReg PyMod0 vs CppMod0", i, current_errors)
                self._compare_tensor_data(None, None, "BBReg PyMod1 vs CppMod1", i, current_errors)
                self._compare_tensor_data(None, None, "BBReg IoUPred", i, current_errors)
                self.all_errors_stats[sample_key_base] = current_errors
            return

        for i in tqdm(range(self.num_samples), desc="BB Regressor samples"):
            sample_dir = input_dir_path / f'sample_{i}'
            cpp_out_sample_dir = cpp_output_dir_path / f'sample_{i}'
            py_feat_layer2, py_feat_layer3, py_init_bbox, py_proposals = None, None, None, None
            if not sample_dir.exists() or not cpp_out_sample_dir.exists():
                print(f"Warning: Skipping BB Regressor sample {i}, files not found at {sample_dir} or {cpp_out_sample_dir}.")
            else:
                py_feat_layer2 = self.load_cpp_tensor(sample_dir / 'feat_layer2.pt', self.device)
                py_feat_layer3 = self.load_cpp_tensor(sample_dir / 'feat_layer3.pt', self.device)
                py_init_bbox = self.load_cpp_tensor(sample_dir / 'init_bbox.pt', self.device)
                py_proposals = self.load_cpp_tensor(sample_dir / 'proposals.pt', self.device)

            # Get Python IoU features (these come directly from backbone, so should be reliable)
            py_iou_feat = None
            if py_feat_layer2 is not None and py_feat_layer3 is not None:
                # Use from-source get_iou_feat for consistent 256-channel features
                try:
                    py_iou_feat = self.bb_regressor_from_source.get_iou_feat([py_feat_layer2, py_feat_layer3])
                    if isinstance(py_iou_feat, tuple):
                        py_iou_feat = list(py_iou_feat)
                    print(f"Sample {i}: Successfully used from-source get_iou_feat.")
                    # py_iou_feat will be a list of two tensors, e.g., [B, 256, H1, W1], [B, 256, H2, W2]
                except Exception as e_iou_source:
                    print(f"Sample {i}: From-source get_iou_feat failed: {e_iou_source}")
                    py_iou_feat = None  # Ensure it's None if it failed

            # Get Python modulation vectors
            py_modulation = None
            if py_feat_layer2 is not None and py_feat_layer3 is not None and py_init_bbox is not None:
                py_features_list = [py_feat_layer2, py_feat_layer3]  # Pass as a list
                # Squeeze py_init_bbox from [B, 1, 4] to [B, 4] if needed
                squeezed_init_bbox = py_init_bbox
                if py_init_bbox.ndim == 3 and py_init_bbox.shape[0] > 0 and py_init_bbox.shape[1] == 1:
                    squeezed_init_bbox = py_init_bbox.squeeze(1)
                elif py_init_bbox.ndim == 2:
                    pass  # Already [B, 4] or similar, assume correct
                else:
                    print(f"Sample {i}: Warning - py_init_bbox has unexpected shape {py_init_bbox.shape}. 
Expecting 2D or 3D [B,1,4].") try: # Try TorchScript model first py_modulation = self.models.bb_regressor.get_modulation(py_features_list, squeezed_init_bbox) if isinstance(py_modulation, tuple): py_modulation = list(py_modulation) print(f"Sample {i}: Successfully used TorchScript get_modulation.") except Exception as e_ts: print(f"Sample {i}: TorchScript get_modulation failed: {e_ts}. Trying from-source model.") try: py_modulation = self.bb_regressor_from_source.get_modulation(py_features_list, squeezed_init_bbox) if isinstance(py_modulation, tuple): py_modulation = list(py_modulation) print(f"Sample {i}: Successfully used from-source get_modulation.") except Exception as e_source: print(f"Sample {i}: From-source get_modulation also failed: {e_source}") py_modulation = None if py_modulation: print(f"Sample {i}: py_iou_feat[0] shape: {py_iou_feat[0].shape if py_iou_feat and len(py_iou_feat) > 0 else 'N/A'}, py_modulation[0] shape: {py_modulation[0].shape if py_modulation and len(py_modulation) > 0 else 'N/A'}") print(f"Sample {i}: py_iou_feat[1] shape: {py_iou_feat[1].shape if py_iou_feat and len(py_iou_feat) > 1 else 'N/A'}, py_modulation[1] shape: {py_modulation[1].shape if py_modulation and len(py_modulation) > 1 else 'N/A'}") # Run Python bb_regressor's predict_iou (from TorchScript model) py_iou_pred = None if py_modulation is not None and py_iou_feat is not None and py_proposals is not None: # Ensure modulation vectors are 2D [B, C] for predict_iou py_mod_0_squeezed = py_modulation[0].squeeze(-1).squeeze(-1) if py_modulation[0].ndim == 4 else py_modulation[0] py_mod_1_squeezed = py_modulation[1].squeeze(-1).squeeze(-1) if py_modulation[1].ndim == 4 else py_modulation[1] py_modulation_squeezed_for_pred = [py_mod_0_squeezed, py_mod_1_squeezed] try: # Try TorchScript predict_iou first print(f"Sample {i}: Attempting TorchScript predict_iou with mod_0: {py_modulation[0].shape}, iou_feat_0: {py_iou_feat[0].shape}") # Ensure inputs for predict_iou are 2D for modulation and 4D for iou_feat mod_0_squeezed = py_modulation[0].squeeze(-1).squeeze(-1) if py_modulation[0].dim() == 4 else py_modulation[0] mod_1_squeezed = py_modulation[1].squeeze(-1).squeeze(-1) if py_modulation[1].dim() == 4 else py_modulation[1] # --- BEGIN PYTHON DETAILED LOGGING --- print(f" DEBUG PYTHON: py_iou_feat[0] (first 5 rows of first batch) sample values:\n{py_iou_feat[0][0, :, :5, :5] if py_iou_feat[0].numel() > 0 else 'N/A'}") # Print a slice of the feature map print(f" DEBUG PYTHON: py_iou_feat[1] (first 5 rows of first batch) sample values:\n{py_iou_feat[1][0, :, :5, :5] if py_iou_feat[1].numel() > 0 else 'N/A'}") print(f" DEBUG PYTHON: mod_0_squeezed sample values:\n{mod_0_squeezed[0, :5] if mod_0_squeezed.numel() > 0 else 'N/A'}") print(f" DEBUG PYTHON: mod_1_squeezed sample values:\n{mod_1_squeezed[0, :5] if mod_1_squeezed.numel() > 0 else 'N/A'}") # Construct ioufeat equivalent for Python to log before predict_iou # This mimics the C++: mod_feat1 = mod_target_0 * mod1_repeated_for_proposals; mod_feat2 = mod_target_1 * mod2_repeated_for_proposals; ioufeat = torch.cat({mod_feat1, mod_feat2}, 1); # We need py_pooled_feat1, py_pooled_feat2 from PrRoIPool like in C++ before fc3_rt and fc4_rt # This part is tricky as Python model does it internally in predict_iou. # For now, let's log the direct inputs to predict_iou and its weights. 
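                    # Shape assumptions for the predict_iou call below (a sketch, not verified
                    # against every sample): modulation vectors are [B, 256] after squeezing,
                    # IoU features from the from-source get_iou_feat are [B, 256, H, W],
                    # proposals are [B, N, 4], and predict_iou should return [B, N] IoU scores.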
                    # Log weights of the iou_predictor in the TorchScript model
                    ts_iou_predictor_weight = self.models.bb_regressor.iou_predictor.weight.data
                    ts_iou_predictor_bias = self.models.bb_regressor.iou_predictor.bias.data
                    print(f"  DEBUG PYTHON: TorchScript iou_predictor.weight:\n{ts_iou_predictor_weight}")
                    print(f"  DEBUG PYTHON: TorchScript iou_predictor.bias:\n{ts_iou_predictor_bias}")
                    # --- END PYTHON DETAILED LOGGING ---
                    py_iou_pred = self.models.bb_regressor.predict_iou(
                        py_modulation_squeezed_for_pred,
                        py_iou_feat,  # These are now from the source model (256 channels)
                        py_proposals
                    )
                    print(f"Sample {i}: Successfully used TorchScript predict_iou.")
                except Exception as e_ts_pred:
                    print(f"WARNING: Python model self.models.bb_regressor.predict_iou failed for sample {i}: {e_ts_pred}")
                    print(f"Sample {i}: Falling back to from-source predict_iou.")
                    try:
                        py_iou_pred = self.bb_regressor_from_source.predict_iou(
                            py_modulation_squeezed_for_pred,
                            py_iou_feat,
                            py_proposals
                        )
                        print(f"Sample {i}: Successfully used from-source predict_iou.")
                    except Exception as e_source_pred:
                        print(f"ERROR: Python model self.bb_regressor_from_source.predict_iou also failed for sample {i}: {e_source_pred}")
                        py_iou_pred = None  # Ensure it's None if the fallback also failed

            # Load C++ outputs
            cpp_iou_pred, cpp_mod_0, cpp_mod_1, cpp_iou_feat_0, cpp_iou_feat_1 = None, None, None, None, None
            if cpp_out_sample_dir.exists():
                cpp_iou_pred = self.load_cpp_tensor(cpp_output_dir_path / f"sample_{i}" / 'iou_pred.pt', self.device)
                cpp_mod_0 = self.load_cpp_tensor(cpp_output_dir_path / f"sample_{i}" / 'modulation_0.pt', self.device)
                cpp_mod_1 = self.load_cpp_tensor(cpp_output_dir_path / f"sample_{i}" / 'modulation_1.pt', self.device)
                cpp_iou_feat_0 = self.load_cpp_tensor(cpp_output_dir_path / f"sample_{i}" / 'iou_feat_0.pt', self.device)
                cpp_iou_feat_1 = self.load_cpp_tensor(cpp_output_dir_path / f"sample_{i}" / 'iou_feat_1.pt', self.device)

            current_errors = {}
            py_iou_f0 = py_iou_feat[0] if py_iou_feat and len(py_iou_feat) > 0 else None
            py_iou_f1 = py_iou_feat[1] if py_iou_feat and len(py_iou_feat) > 1 else None
            self._compare_tensor_data(py_iou_f0, cpp_iou_feat_0, "BBReg PyIoUFeat0 vs CppIoUFeat0", i, current_errors)
            self._compare_tensor_data(py_iou_f1, cpp_iou_feat_1, "BBReg PyIoUFeat1 vs CppIoUFeat1", i, current_errors)
            py_mod_0 = py_modulation[0] if py_modulation and len(py_modulation) > 0 else None
            py_mod_1 = py_modulation[1] if py_modulation and len(py_modulation) > 1 else None
            self._compare_tensor_data(py_mod_0, cpp_mod_0, "BBReg PyMod0 vs CppMod0", i, current_errors)
            self._compare_tensor_data(py_mod_1, cpp_mod_1, "BBReg PyMod1 vs CppMod1", i, current_errors)
            self._compare_tensor_data(py_iou_pred, cpp_iou_pred, "BBReg IoUPred", i, current_errors)
            self.all_errors_stats[f"BBReg_Sample_{i}"] = current_errors

    def generate_html_report(self):
        print("\nGenerating HTML report...")
        report_path = self.comparison_dir / "report.html"
        # plot_paths_dict = {}  # This variable was unused

        # Prepare data for the report: group by model and comparison type.
        # report_data layout:
        #   "Model_Type Component_Name": {
        #       "samples": {0: {"mae": X, "max_err": Y, "mean_py": Z, "std_err": S, "plot_path": "..."}, 1: {...}},
        #       "overall_mae_mean": A, "overall_mae_std": B, "overall_max_err_mean": C
        #   }
        report_data = {}
        for sample_key, comparisons in self.all_errors_stats.items():
            # sample_key examples: "Clf_Train_Sample_0", "Clf_Test_Sample_0", "BBReg_Sample_0"
            parts = sample_key.split("_")
            model_prefix = parts[0]  # Clf, BBReg
            sample_type_str = ""
            sample_idx = -1
            if model_prefix == "Clf":
                sample_type_str = parts[1]  # Train or Test
                sample_idx = int(parts[-1])
                model_name_key = f"Classifier {sample_type_str}"
            elif model_prefix == "BBReg":
                sample_idx = int(parts[-1])
                model_name_key = "BB Regressor"
            else:
                print(f"WARNING: Unknown sample key format in all_errors_stats: {sample_key}")
                continue

            for comparison_name, stats in comparisons.items():
                # comparison_name examples: "Classifier Features Train", "BBReg PyIoUFeat0 vs CppIoUFeat0"
                # Unpack all 11 metrics
                mae, max_err, diff_arr, mean_py_val, std_abs_err, \
                    l2_py, l2_cpp, l2_diff, cos_sim, pearson, mre = stats
                full_comparison_key = f"{model_name_key} - {comparison_name}"
                if full_comparison_key not in report_data:
                    report_data[full_comparison_key] = {
                        "samples": {},
                        "all_maes": [],
                        "all_max_errs": [],
                        "all_mean_py_vals": [],
                        "all_std_abs_errs": [],  # Renamed from all_std_errs
                        "all_l2_py_vals": [],
                        "all_l2_cpp_vals": [],
                        "all_l2_diff_vals": [],
                        "all_cos_sim_vals": [],
                        "all_pearson_vals": [],
                        "all_mre_vals": []
                    }
                plot_filename = None
                if diff_arr is not None and len(diff_arr) > 0 and not np.all(np.isnan(diff_arr)):
                    plot_filename = f"{model_prefix}_{sample_type_str}_sample{sample_idx}_{comparison_name.replace(' ', '_').replace('/', '_')}_hist.png"
                    plot_abs_path = self.plots_dir / plot_filename
                    # Pass std_abs_err to the plotting function
                    self._generate_single_plot(diff_arr, comparison_name, plot_abs_path, mean_py_val, std_abs_err, mae, max_err)
                report_data[full_comparison_key]["samples"][sample_idx] = {
                    "mae": mae,
                    "max_err": max_err,
                    "mean_py_val": mean_py_val,
                    "std_abs_err": std_abs_err,  # Renamed from std_err
                    "l2_py": l2_py,
                    "l2_cpp": l2_cpp,
                    "l2_diff": l2_diff,
                    "cos_sim": cos_sim,
                    "pearson": pearson,
                    "mre": mre,
                    "plot_path": plot_filename  # Store relative path for HTML
                }
                if not np.isnan(mae): report_data[full_comparison_key]["all_maes"].append(mae)
                if not np.isnan(max_err): report_data[full_comparison_key]["all_max_errs"].append(max_err)
                if not np.isnan(mean_py_val): report_data[full_comparison_key]["all_mean_py_vals"].append(mean_py_val)
                if not np.isnan(std_abs_err): report_data[full_comparison_key]["all_std_abs_errs"].append(std_abs_err)
                if not np.isnan(l2_py): report_data[full_comparison_key]["all_l2_py_vals"].append(l2_py)
                if not np.isnan(l2_cpp): report_data[full_comparison_key]["all_l2_cpp_vals"].append(l2_cpp)
                if not np.isnan(l2_diff): report_data[full_comparison_key]["all_l2_diff_vals"].append(l2_diff)
                if not np.isnan(cos_sim): report_data[full_comparison_key]["all_cos_sim_vals"].append(cos_sim)
                if not np.isnan(pearson): report_data[full_comparison_key]["all_pearson_vals"].append(pearson)
                if not np.isnan(mre): report_data[full_comparison_key]["all_mre_vals"].append(mre)

        # Calculate overall stats
        for comp_key, data in report_data.items():
            data["overall_mae_mean"] = np.mean(data["all_maes"]) if data["all_maes"] else float('nan')
            data["overall_mae_std"] = np.std(data["all_maes"]) if data["all_maes"] else float('nan')
            data["overall_max_err_mean"] = np.mean(data["all_max_errs"]) if data["all_max_errs"] else float('nan')
            data["overall_mean_py_val_mean"] = np.mean(data["all_mean_py_vals"]) if data["all_mean_py_vals"] else float('nan')
            data["overall_std_abs_err_mean"] = np.mean(data["all_std_abs_errs"]) if data["all_std_abs_errs"] else float('nan')  # Renamed
            data["overall_l2_py_mean"] = np.mean(data["all_l2_py_vals"]) if data["all_l2_py_vals"] else float('nan')
            data["overall_l2_cpp_mean"] = np.mean(data["all_l2_cpp_vals"]) if data["all_l2_cpp_vals"] else float('nan')
            data["overall_l2_diff_mean"] = np.mean(data["all_l2_diff_vals"]) if data["all_l2_diff_vals"] else float('nan')
            data["overall_cos_sim_mean"] = np.mean(data["all_cos_sim_vals"]) if data["all_cos_sim_vals"] else float('nan')
            data["overall_pearson_mean"] = np.mean(data["all_pearson_vals"]) if data["all_pearson_vals"] else float('nan')
            data["overall_mre_mean"] = np.mean(data["all_mre_vals"]) if data["all_mre_vals"] else float('nan')

        # HTML Generation
        html_content = f"""
        Model Comparison Report

Model Comparison Report

Number of samples per model component: {self.num_samples}

Understanding the Metrics:

Mean MAE (Mean Absolute Error)
Calculation: Average of the absolute differences between corresponding elements of the Python and C++ tensors (mean(abs(py - cpp))). The "Mean MAE" in the summary table is the average of these MAEs over all samples for a given comparison.
Range & Interpretation: 0 to ∞. Closer to 0 indicates better agreement. This metric shows the average magnitude of error.
Std MAE (Standard Deviation of MAE)
Calculation: Standard deviation of the MAE values calculated for each sample within a comparison group.
Range & Interpretation: 0 to ∞. A smaller value indicates that the MAE is consistent across samples. A larger value suggests variability in agreement from sample to sample.
Mean Max Error
Calculation: Average of the maximum absolute differences found between Python and C++ tensors for each sample (mean(max(abs(py - cpp))) over samples).
Range & Interpretation: 0 to ∞. Closer to 0 is better. Indicates the average of the worst-case discrepancies per sample.
Mean Py Val (Mean Python Tensor Value)
Calculation: Average of the mean values of the Python reference tensors over all samples (mean(mean(py_tensor_sample_N))).
Range & Interpretation: Problem-dependent. Provides context about the typical magnitude of the Python model's output values.
Mean Std Abs Err (Mean Standard Deviation of Absolute Errors)
Calculation: Average of the standard deviations of the absolute error arrays (abs(py - cpp)) for each sample. The "Err Std" in plot titles is this value for that specific sample.
Range & Interpretation: 0 to ∞. A smaller value indicates that the errors are concentrated around their mean (MAE), implying less spread in error magnitudes within a sample.
Mean L2 Py (Mean L2 Norm of Python Tensor)
Calculation: Average of the L2 norms (Euclidean norm) of the flattened Python tensors over all samples.
Range & Interpretation: 0 to ∞. Represents the average magnitude or "length" of the Python output vectors.
Mean L2 Cpp (Mean L2 Norm of C++ Tensor)
Calculation: Average of the L2 norms of the flattened C++ tensors over all samples.
Range & Interpretation: 0 to ∞. Represents the average magnitude of the C++ output vectors. Should be comparable to Mean L2 Py if models agree in scale.
Mean L2 Diff (Mean L2 Norm of Difference)
Calculation: Average of the L2 norms of the flattened difference tensors (py - cpp) over all samples.
Range & Interpretation: 0 to ∞. Closer to 0 indicates better agreement. This is the magnitude of the average difference vector.
Mean Cosine Sim (Mean Cosine Similarity)
Calculation: Average of the cosine similarities between the flattened Python and C++ tensors over all samples. Cosine similarity is dot(py, cpp) / (norm(py) * norm(cpp)).
Range & Interpretation: -1 to 1 (typically 0 to 1 for non-negative features). Closer to 1 indicates that the tensors point in the same direction (high similarity in terms of orientation, ignoring magnitude). Values near 0 suggest orthogonality, and near -1 suggest opposite directions.
Mean Pearson Corr (Mean Pearson Correlation Coefficient)
Calculation: Average of the Pearson correlation coefficients between the flattened Python and C++ tensors over all samples, where each coefficient is cov(py, cpp) / (std(py) * std(cpp)). Measures linear correlation between the two outputs.
Range & Interpretation: -1 to 1. Closer to 1 indicates strong positive linear correlation. Closer to -1 indicates strong negative linear correlation. Closer to 0 indicates weak or no linear correlation.
Mean MRE (Mean Relative Error)
Calculation: Average of the mean relative errors per sample, where relative error is mean(abs(py - cpp) / (abs(py) + epsilon)). Epsilon is a small constant (1e-9 in this script) that prevents division by zero.
Range & Interpretation: 0 to ∞. Closer to 0 is better. This metric normalizes the absolute error by the magnitude of the Python reference values, useful for understanding error relative to signal strength.
""" sorted_report_keys = sorted(report_data.keys()) html_content += "

Overall Comparison Statistics

" for comp_key in sorted_report_keys: data = report_data[comp_key] html_content += f""" """ html_content += "
Comparison KeyMean MAEStd MAEMean Max ErrorMean Py ValMean Std Abs ErrMean L2 PyMean L2 CppMean L2 DiffMean Cosine SimMean Pearson CorrMean MRE
{comp_key} {f"{data['overall_mae_mean']:.4e}" if not np.isnan(data['overall_mae_mean']) else 'N/A'} {f"{data['overall_mae_std']:.4e}" if not np.isnan(data['overall_mae_std']) else 'N/A'} {f"{data['overall_max_err_mean']:.4e}" if not np.isnan(data['overall_max_err_mean']) else 'N/A'} {f"{data['overall_mean_py_val_mean']:.4e}" if not np.isnan(data['overall_mean_py_val_mean']) else 'N/A'} {f"{data['overall_std_abs_err_mean']:.4e}" if not np.isnan(data['overall_std_abs_err_mean']) else 'N/A'} {f"{data['overall_l2_py_mean']:.4e}" if not np.isnan(data['overall_l2_py_mean']) else 'N/A'} {f"{data['overall_l2_cpp_mean']:.4e}" if not np.isnan(data['overall_l2_cpp_mean']) else 'N/A'} {f"{data['overall_l2_diff_mean']:.4e}" if not np.isnan(data['overall_l2_diff_mean']) else 'N/A'} {f"{data['overall_cos_sim_mean']:.4f}" if not np.isnan(data['overall_cos_sim_mean']) else 'N/A'} {f"{data['overall_pearson_mean']:.4f}" if not np.isnan(data['overall_pearson_mean']) else 'N/A'} {f"{data['overall_mre_mean']:.4e}" if not np.isnan(data['overall_mre_mean']) else 'N/A'}
" for comp_key in sorted_report_keys: data = report_data[comp_key] html_content += f"

Details for: {comp_key}

" html_content += f"""

Overall Mean MAE: {f'{data["overall_mae_mean"]:.4e}' if not np.isnan(data['overall_mae_mean']) else 'N/A'}

""" html_content += "" for sample_idx in sorted(data["samples"].keys()): sample_data = data["samples"][sample_idx] plot_path_html = f'./plots/{sample_data["plot_path"]}' if sample_data["plot_path"] else "N/A" img_tag = f'Error histogram' if sample_data["plot_path"] else "N/A" html_content += f""" """ html_content += "
Sample IndexMAEMax ErrorMean Py ValStd Abs ErrL2 PyL2 CppL2 DiffCosine SimPearson CorrMREError Distribution Plot
{sample_idx} {f"{sample_data['mae']:.4e}" if not np.isnan(sample_data['mae']) else 'N/A'} {f"{sample_data['max_err']:.4e}" if not np.isnan(sample_data['max_err']) else 'N/A'} {f"{sample_data['mean_py_val']:.4e}" if not np.isnan(sample_data['mean_py_val']) else 'N/A'} {f"{sample_data['std_abs_err']:.4e}" if not np.isnan(sample_data['std_abs_err']) else 'N/A'} {f"{sample_data['l2_py']:.4e}" if not np.isnan(sample_data['l2_py']) else 'N/A'} {f"{sample_data['l2_cpp']:.4e}" if not np.isnan(sample_data['l2_cpp']) else 'N/A'} {f"{sample_data['l2_diff']:.4e}" if not np.isnan(sample_data['l2_diff']) else 'N/A'} {f"{sample_data['cos_sim']:.4f}" if not np.isnan(sample_data['cos_sim']) else 'N/A'} {f"{sample_data['pearson']:.4f}" if not np.isnan(sample_data['pearson']) else 'N/A'} {f"{sample_data['mre']:.4e}" if not np.isnan(sample_data['mre']) else 'N/A'} {img_tag}
" html_content += """ """ with open(report_path, 'w') as f: f.write(html_content) print(f"HTML report generated at {report_path}") def _generate_single_plot(self, error_array, title, plot_path, mean_val, std_abs_err, mae, max_err): if error_array is None or len(error_array) == 0 or np.all(np.isnan(error_array)): # print(f"Skipping plot for {title} as error_array is empty or all NaNs.") return plt.figure(figsize=(8, 6)) plt.hist(error_array, bins=50, color='skyblue', edgecolor='black') stats_text = f"Ref Mean: {mean_val:.3e} | MAE: {mae:.3e} | MaxErr: {max_err:.3e} | Err Std: {std_abs_err:.3e}" plt.title(f"{title}\n{stats_text}", fontsize=10) plt.xlabel("Error Value") plt.ylabel("Frequency") plt.grid(True, linestyle='--', alpha=0.7) try: plt.tight_layout() plt.savefig(plot_path) except Exception as e: print(f"ERROR: Failed to save plot {plot_path}: {e}") plt.close() def run_all_tests(self): self.all_errors_stats = {} # Initialize/clear for the new run self.plots_dir.mkdir(parents=True, exist_ok=True) # Ensure plots_dir exists self.compare_classifier() self.compare_bb_regressor() self.generate_html_report() print("All tests completed!") def load_cpp_tensor(self, path, device): path_str = str(path) # Ensure path is a string try: # Attempt 1: Load as a plain tensor, assuming it's not a TorchScript module. # This is the most common and safest way to load tensors saved from PyTorch (Python or C++). tensor = torch.load(path_str, map_location=device, weights_only=True) # print(f"Successfully loaded tensor from {path_str} with weights_only=True") return tensor except RuntimeError as e_weights_only: # Handle cases where weights_only=True is not appropriate (e.g., TorchScript archives) if "TorchScript archive" in str(e_weights_only) or \ "PytorchStreamReader failed" in str(e_weights_only) or \ "weights_only" in str(e_weights_only): # Broader check for weights_only issues # print(f"weights_only=True failed for {path_str} ({e_weights_only}). Trying weights_only=False.") try: # Attempt 2: Load with weights_only=False. loaded_obj = torch.load(path_str, map_location=device, weights_only=False) if isinstance(loaded_obj, torch.Tensor): # print(f"Successfully loaded tensor from {path_str} with weights_only=False.") return loaded_obj # Check for _actual_script_module for deeply nested tensors elif hasattr(loaded_obj, '_actual_script_module') and hasattr(loaded_obj._actual_script_module, 'forward'): # print(f"Found _actual_script_module in {path_str}, trying its forward().") try: potential_tensor = loaded_obj._actual_script_module.forward() if isinstance(potential_tensor, torch.Tensor): # print(f"Extracted tensor using _actual_script_module.forward() from {path_str}") return potential_tensor except Exception as e_deep_forward: print(f"Warning: Calling _actual_script_module.forward() from {path_str} failed: {e_deep_forward}") # General ScriptModule handling (RecursiveScriptModule or any object with forward) elif isinstance(loaded_obj, torch.jit.RecursiveScriptModule) or hasattr(loaded_obj, 'forward'): # print(f"Loaded a ScriptModule/object with forward from {path_str}. 
Attempting extraction.") # Attempt 2a: Greedily find the first tensor attribute for attr_name in dir(loaded_obj): if attr_name.startswith('__'): continue try: attr_val = getattr(loaded_obj, attr_name) if isinstance(attr_val, torch.Tensor): # print(f"Extracted tensor from attribute '{attr_name}' of ScriptModule at {path_str}") return attr_val except Exception: pass # Ignore errors from getattr # Attempt 2b: Try calling forward() if it exists and no tensor attribute was found if hasattr(loaded_obj, 'forward') and callable(loaded_obj.forward): sig = inspect.signature(loaded_obj.forward) if not sig.parameters: # Only call if forward() takes no arguments try: potential_tensor = loaded_obj.forward() if isinstance(potential_tensor, torch.Tensor): # print(f"Extracted tensor using forward() from ScriptModule at {path_str}") return potential_tensor except Exception as e_forward: print(f"Warning: Calling forward() on ScriptModule from {path_str} failed: {e_forward}") # Attempt 2c: Check state_dict try: sd = loaded_obj.state_dict() # print(f"DEBUG: state_dict for {path_str}: {list(sd.keys())}") if len(sd) == 1: tensor_name = list(sd.keys())[0] potential_tensor = sd[tensor_name] if isinstance(potential_tensor, torch.Tensor): print(f"INFO: Extracted tensor '{tensor_name}' from single-entry state_dict of ScriptModule at {path_str}") return potential_tensor elif len(sd) > 1: # If multiple tensors, this is heuristic. Prefer known/simple names if possible. # For now, just take the first one if it's a tensor. for tensor_name, potential_tensor in sd.items(): if isinstance(potential_tensor, torch.Tensor): print(f"INFO: Extracted tensor '{tensor_name}' (from multiple) from state_dict of ScriptModule at {path_str}") return potential_tensor print(f"Warning: ScriptModule at {path_str} has multiple state_dict entries: {list(sd.keys())} but none were straightforwardly returned as the primary tensor.") # else: state_dict is empty, or no tensors found above except Exception as e_sd: print(f"Warning: Error accessing/processing state_dict for ScriptModule at {path_str}: {e_sd}") print(f"ERROR: Could not extract tensor from ScriptModule at {path_str} after trying attributes, forward(), and state_dict(). Dir: {dir(loaded_obj)}") return None else: print(f"ERROR: Loaded object from {path_str} (with weights_only=False) is not a Tensor or recognized ScriptModule. Type: {type(loaded_obj)}.") return None except Exception as e_load_false: print(f"ERROR: weights_only=False also failed for {path_str}. 
Last error: {e_load_false}") return None else: # Some other error with weights_only=True print(f"ERROR: Loading tensor from {path_str} with weights_only=True failed with an unexpected error: {e_weights_only}") return None except Exception as e_generic: print(f"ERROR: A generic error occurred while loading tensor from {path_str}: {e_generic}") return None def _compare_tensor_data(self, tensor1, tensor2, name, sample_idx, current_errors): """Compare two tensors and return error metrics.""" num_metrics = 11 # mae, max_err, diff_arr, mean_py_val, std_abs_err, l2_py, l2_cpp, l2_diff, cos_sim, pearson, mre nan_metrics_tuple = ( float('nan'), float('nan'), [], float('nan'), float('nan'), # Original 5 float('nan'), float('nan'), float('nan'), float('nan'), float('nan'), float('nan') # New 6 ) if tensor1 is None or tensor2 is None: py_mean = float('nan') py_l2 = float('nan') if tensor1 is not None: # Python tensor exists t1_cpu_temp = tensor1.cpu().detach().numpy().astype(np.float32) py_mean = np.mean(t1_cpu_temp) py_l2 = np.linalg.norm(t1_cpu_temp.flatten()) # If only tensor2 is None, we can't calculate C++ l2 or comparison metrics # If only tensor1 is None, py_mean and py_l2 remain NaN. current_errors[name] = ( float('nan'), float('nan'), [], py_mean, float('nan'), py_l2, float('nan'), float('nan'), float('nan'), float('nan'), float('nan') ) print(f"Warning: Cannot compare '{name}' for sample {sample_idx}, one or both tensors are None.") return t1_cpu = tensor1.cpu().detach().numpy().astype(np.float32) t2_cpu = tensor2.cpu().detach().numpy().astype(np.float32) if t1_cpu.shape != t2_cpu.shape: print(f"Warning: Shape mismatch for '{name}' sample {sample_idx}. Py: {t1_cpu.shape}, Cpp: {t2_cpu.shape}. Skipping most comparisons.") current_errors[name] = ( float('nan'), float('nan'), [], np.mean(t1_cpu), float('nan'), # MAE, MaxErr, diff_arr, MeanPy, StdAbsErr np.linalg.norm(t1_cpu.flatten()), np.linalg.norm(t2_cpu.flatten()), float('nan'), # L2Py, L2Cpp, L2Diff float('nan'), float('nan'), float('nan') # CosSim, Pearson, MRE ) return # All calculations from here assume shapes match and tensors are not None t1_flat = t1_cpu.flatten() t2_flat = t2_cpu.flatten() abs_diff_elements = np.abs(t1_cpu - t2_cpu) mae = np.mean(abs_diff_elements) max_err = np.max(abs_diff_elements) diff_arr_for_hist = abs_diff_elements.flatten() # For histogram mean_py_val = np.mean(t1_cpu) std_abs_err = np.std(diff_arr_for_hist) l2_norm_py = np.linalg.norm(t1_flat) l2_norm_cpp = np.linalg.norm(t2_flat) l2_norm_diff = np.linalg.norm(t1_flat - t2_flat) # Cosine Similarity dot_product = np.dot(t1_flat, t2_flat) if l2_norm_py == 0 or l2_norm_cpp == 0: cosine_sim = float('nan') else: cosine_sim = dot_product / (l2_norm_py * l2_norm_cpp) # Pearson Correlation Coefficient if len(t1_flat) < 2: pearson_corr = float('nan') else: std_t1 = np.std(t1_flat) std_t2 = np.std(t2_flat) if std_t1 == 0 or std_t2 == 0: # If either is constant if std_t1 == 0 and std_t2 == 0 and np.allclose(t1_flat, t2_flat): pearson_corr = 1.0 # Both constant and identical else: pearson_corr = float('nan') # Otherwise, undefined or not meaningfully 1 else: try: corr_matrix = np.corrcoef(t1_flat, t2_flat) if corr_matrix.ndim == 2: pearson_corr = corr_matrix[0, 1] else: # Should be a scalar if inputs were effectively constant, already handled by std checks pearson_corr = float(corr_matrix) if np.isscalar(corr_matrix) else float('nan') except Exception: pearson_corr = float('nan') # Mean Relative Error (MRE) epsilon_rel_err = 1e-9 # Small epsilon to avoid division by 
zero and extreme values # Calculate relative error where abs(t1_cpu) is not zero (or very small) # For elements where t1_cpu is zero (or very small): # - If t2_cpu is also zero (small), error is small. # - If t2_cpu is not zero, relative error is infinite/large. # Using (abs(t1_cpu) + epsilon) in denominator handles this. mean_rel_err = np.mean(abs_diff_elements / (np.abs(t1_cpu) + epsilon_rel_err)) current_errors[name] = ( mae, max_err, diff_arr_for_hist, mean_py_val, std_abs_err, l2_norm_py, l2_norm_cpp, l2_norm_diff, cosine_sim, pearson_corr, mean_rel_err ) # Optional: print detailed error for specific high-error cases # if mae > 1e-4: # print(f"High MAE for {name}, sample {sample_idx}: {mae:.6f}") # The function implicitly returns None as it modifies current_errors in place. # For direct use, if needed, it could return the tuple: # return (mae, max_err, diff_arr_for_hist, mean_py_val, std_abs_err, l2_norm_py, l2_norm_cpp, l2_norm_diff, cosine_sim, pearson_corr, mean_rel_err) @staticmethod def load_weights_for_custom_model(model, model_name, base_model_dir, device): """ Helper to load weights from individual .pt files into a model instance. model: the PyTorch nn.Module instance. model_name: e.g., 'classifier' or 'bb_regressor'. base_model_dir: Path object to the base directory like 'exported_weights'. device: torch device. """ tensor_dir = base_model_dir / model_name doc_file = tensor_dir / f"{model_name}_weights_doc.txt" if not doc_file.exists(): print(f"Warning: Documentation file not found: {doc_file} for {model_name}. Skipping weight loading for source model.") return with open(doc_file, 'r') as f: lines = f.readlines() i = 0 while i < len(lines): line = lines[i] if line.startswith('## '): key = line.strip()[3:] j = i + 1 while j < len(lines) and 'File:' not in lines[j]: j += 1 if j < len(lines) and 'File:' in lines[j]: file_name = lines[j].split('File:')[1].strip() tensor_path = tensor_dir / file_name if tensor_path.exists(): try: tensor_data = torch.load(str(tensor_path), map_location=device) # For .pt files that might be RecursiveScriptModule, try to extract tensor if isinstance(tensor_data, torch.jit.RecursiveScriptModule): if hasattr(tensor_data, 'weight'): tensor = tensor_data.weight elif hasattr(tensor_data, 'bias'): tensor = tensor_data.bias elif len(list(tensor_data.parameters())) > 0: tensor = list(tensor_data.parameters())[0] else: tensor = tensor_data() # Try calling it else: tensor = tensor_data parts = key.split('.') module_to_set = model for part in parts[:-1]: module_to_set = getattr(module_to_set, part) param_name = parts[-1] if hasattr(module_to_set, param_name): if param_name in module_to_set._parameters: module_to_set._parameters[param_name] = torch.nn.Parameter(tensor.to(device)) elif param_name in module_to_set._buffers: module_to_set._buffers[param_name] = tensor.to(device) else: # Direct attribute assignment setattr(module_to_set, param_name, tensor.to(device)) # print(f"Loaded {key} from {file_name} into source {model_name}") else: print(f"Warning: Attribute {key} not found in source model {model_name}.") except Exception as e: print(f"Error loading tensor for {key} from {tensor_path} for source {model_name}: {e}") else: print(f"Warning: Tensor file not found: {tensor_path} for source {model_name}") i = j i += 1 model.eval().to(device) if __name__ == "__main__": # Parse command line arguments import argparse parser = argparse.ArgumentParser(description="Compare Python and C++ model implementations") parser.add_argument("--num-samples", type=int, 
                        default=1000, help="Number of test samples (default: 1000)")
    args = parser.parse_args()

    # Run comparison
    comparison = ModelComparison(num_samples=args.num_samples)
    comparison.run_all_tests()
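    # Example invocation (a sketch; it assumes the C++ test harness has already written its
    # tensors under test/output/ and that this script is test/compare_models.py, so adjust
    # the path to wherever the file actually lives):
    #   python test/compare_models.py --num-samples 50
    # The HTML report is then written to test/comparison/report.html.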