Source code for dire_rapids.metrics

# metrics.py

"""
Performance metrics for dimensionality reduction evaluation.

This module provides GPU-accelerated metrics using RAPIDS cuML for evaluating
the quality of dimensionality reduction embeddings, including:

- Distortion metrics (stress)
- Context preservation metrics (SVM, kNN classification)
- Topological metrics (persistent homology, Betti curves)

The module supports multiple backends for persistence computation:
- giotto-ph (fastest CPU, multi-threaded)
- ripser++ (GPU-accelerated)
- ripser (CPU fallback)
"""

import warnings
import numpy as np

# CuPy for GPU arrays
try:
    import cupy as cp
    HAS_CUPY = True
except ImportError:
    HAS_CUPY = False
    warnings.warn("CuPy not available. GPU acceleration disabled.", UserWarning)

# RAPIDS cuML imports
try:
    from cuml.neighbors import NearestNeighbors as cumlNearestNeighbors
    from cuml.model_selection import train_test_split as cuml_train_test_split
    from cuml.preprocessing import StandardScaler as cumlStandardScaler
    from cuml.svm import SVC as cumlSVC
    from cuml.neighbors import KNeighborsClassifier as cumlKNeighborsClassifier
    HAS_CUML = True
except ImportError:
    HAS_CUML = False
    warnings.warn(
        "cuML not available. Falling back to CPU-based scikit-learn. "
        "Install RAPIDS for GPU acceleration.",
        UserWarning
    )
    from sklearn.neighbors import NearestNeighbors
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import StandardScaler
    from sklearn.svm import LinearSVC
    from sklearn.neighbors import KNeighborsClassifier

# Persistence backends
_PERSISTENCE_BACKEND = None

try:
    from gph import ripser_parallel
    HAS_GIOTTO_PH = True
except ImportError:
    HAS_GIOTTO_PH = False

try:
    import ripserplusplus as rpp
    HAS_RIPSER_PP = True
except ImportError:
    HAS_RIPSER_PP = False

try:
    from ripser import ripser
    HAS_RIPSER = True
except ImportError:
    HAS_RIPSER = False

# Additional dependencies for persistence
try:
    from persim import wasserstein, bottleneck
    HAS_PERSIM = True
except ImportError:
    HAS_PERSIM = False
    warnings.warn("persim not available. Wasserstein and bottleneck distances will not be available.", UserWarning)

try:
    import ot
    HAS_OT = True
except ImportError:
    HAS_OT = False
    warnings.warn("POT (Python Optimal Transport) not available. EMD distance will not be available.", UserWarning)

try:
    from fastdtw import fastdtw
    HAS_DTW = True
except ImportError:
    HAS_DTW = False
    warnings.warn("fastdtw not available. DTW distance will not be available.", UserWarning)

try:
    from twed import twed
    HAS_TWED = True
except ImportError:
    HAS_TWED = False
    warnings.warn("twed not available. TWED distance will not be available.", UserWarning)
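
# Illustrative sketch (not part of the original module): the HAS_* flags set above drive
# every GPU/CPU dispatch in this file; this hypothetical helper just reports what was
# detected, which is handy when debugging why a metric silently fell back to CPU.
def _example_report_optional_dependencies():
    """Return the detected optional-dependency flags as a plain dict."""
    return {
        'cupy': HAS_CUPY, 'cuml': HAS_CUML,
        'giotto-ph': HAS_GIOTTO_PH, 'ripser++': HAS_RIPSER_PP, 'ripser': HAS_RIPSER,
        'persim': HAS_PERSIM, 'pot': HAS_OT, 'fastdtw': HAS_DTW, 'twed': HAS_TWED,
    }
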


#
# Persistence backend management
#


def get_available_persistence_backends():
    """
    Get list of available persistence computation backends.

    Returns
    -------
    dict :
        Dictionary mapping backend names to availability status
    """
    return {
        'fast': True,  # Always available, uses graph-based H0/H1 only
        'giotto-ph': HAS_GIOTTO_PH,
        'ripser++': HAS_RIPSER_PP,
        'ripser': HAS_RIPSER
    }

def set_persistence_backend(backend):
    """
    Set the persistence computation backend.

    Parameters
    ----------
    backend : str or None
        Backend to use: 'fast', 'giotto-ph', 'ripser++', 'ripser', or None for auto-selection

    Raises
    ------
    ValueError
        If the specified backend is not available
    """
    global _PERSISTENCE_BACKEND

    if backend is None:
        _PERSISTENCE_BACKEND = None
        return

    available = get_available_persistence_backends()
    if backend not in available:
        raise ValueError(
            f"Unknown backend '{backend}'. Available: {list(available.keys())}"
        )
    if not available[backend]:
        raise ValueError(
            f"Backend '{backend}' is not available. Install the required package."
        )

    _PERSISTENCE_BACKEND = backend

def get_persistence_backend():
    """
    Get the current persistence backend (with auto-selection if None).

    Returns
    -------
    str :
        Name of the selected backend

    Raises
    ------
    RuntimeError
        If no persistence backend is available
    """
    global _PERSISTENCE_BACKEND  # pylint: disable=global-variable-not-assigned

    if _PERSISTENCE_BACKEND is not None:
        return _PERSISTENCE_BACKEND

    # Auto-select: giotto-ph > ripser++ > ripser
    if HAS_GIOTTO_PH:
        return 'giotto-ph'
    if HAS_RIPSER_PP:
        return 'ripser++'
    if HAS_RIPSER:
        return 'ripser'

    raise RuntimeError(
        "No persistence backend available. "
        "Install giotto-ph (recommended), ripserplusplus, or ripser."
    )

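
# Illustrative usage sketch (not part of the original module): how the three backend
# helpers above are typically combined. The preference for giotto-ph is an assumption;
# get_persistence_backend() raises RuntimeError if no backend is installed.
def _example_select_backend():
    """Prefer giotto-ph when installed, otherwise fall back to auto-selection."""
    available = get_available_persistence_backends()
    if available.get('giotto-ph'):
        set_persistence_backend('giotto-ph')
    else:
        set_persistence_backend(None)  # auto-select on the next get_persistence_backend()
    return get_persistence_backend()
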
#
# Auxiliary functions
#
def welford_update_gpu(count, mean, M2, new_value, finite_threshold=1e12):
    """
    GPU-accelerated Welford's algorithm update step.

    Parameters
    ----------
    count : cupy.ndarray
        Running count of valid values
    mean : cupy.ndarray
        Running mean
    M2 : cupy.ndarray
        Running sum of squared differences
    new_value : cupy.ndarray
        New values to incorporate
    finite_threshold : float
        Maximum magnitude for inclusion

    Returns
    -------
    tuple :
        Updated (count, mean, M2)
    """
    is_finite = cp.isfinite(new_value) & (cp.abs(new_value) < finite_threshold)
    count = count + is_finite
    delta = new_value - mean
    mean = mean + cp.where(is_finite, delta / cp.maximum(count, 1), 0)
    delta2 = new_value - mean
    M2 = M2 + cp.where(is_finite, delta * delta2, 0)
    return count, mean, M2

def welford_finalize_gpu(count, mean, M2):
    """
    Finalize Welford's algorithm to compute mean and std.

    Parameters
    ----------
    count : cupy.ndarray
        Total count of valid values
    mean : cupy.ndarray
        Computed mean
    M2 : cupy.ndarray
        Sum of squared differences

    Returns
    -------
    tuple :
        (mean, std)
    """
    variance = cp.where(count > 1, M2 / (count - 1), 0.0)
    return mean, cp.sqrt(variance)

def welford_gpu(data):
    """
    GPU-accelerated computation of mean and std using Welford's algorithm.

    Parameters
    ----------
    data : cupy.ndarray
        Input data

    Returns
    -------
    tuple :
        (mean, std)
    """
    if not HAS_CUPY:
        # Fallback to NumPy
        data_flat = np.asarray(data).ravel()
        mean = np.nanmean(data_flat)
        std = np.nanstd(data_flat)
        return float(mean), float(std)

    if isinstance(data, np.ndarray):
        data = cp.asarray(data)

    data_flat = data.ravel()

    count = cp.zeros(1, dtype=cp.int32)
    mean = cp.zeros(1, dtype=cp.float32)
    M2 = cp.zeros(1, dtype=cp.float32)

    # Process in chunks to avoid memory issues
    chunk_size = 1024 * 1024
    for i in range(0, len(data_flat), chunk_size):
        chunk = data_flat[i:i + chunk_size]
        for val in chunk:
            count, mean, M2 = welford_update_gpu(count, mean, M2, val)

    mean_final, std_final = welford_finalize_gpu(count, mean, M2)
    return float(mean_final), float(std_final)

def threshold_subsample_gpu(data, layout, labels=None, threshold=0.5, random_state=42):
    """
    GPU-accelerated Bernoulli subsampling of data.

    Parameters
    ----------
    data : array-like
        High-dimensional data
    layout : array-like
        Low-dimensional embedding
    labels : array-like, optional
        Data labels
    threshold : float
        Probability of keeping each sample (must be between 0.0 and 1.0)
    random_state : int
        Random seed

    Returns
    -------
    tuple :
        Subsampled arrays

    Raises
    ------
    ValueError
        If threshold is not between 0.0 and 1.0
    """
    if not 0.0 <= threshold <= 1.0:
        raise ValueError(f"subsample_threshold must be between 0.0 and 1.0, got {threshold}")

    if HAS_CUPY:
        cp.random.seed(random_state)

        # Convert to CuPy if needed
        data_gpu = cp.asarray(data) if not isinstance(data, cp.ndarray) else data
        layout_gpu = cp.asarray(layout) if not isinstance(layout, cp.ndarray) else layout

        n_samples = data_gpu.shape[0]
        random_numbers = cp.random.uniform(0, 1, size=n_samples)
        selected_indices = random_numbers < threshold

        data_sub = data_gpu[selected_indices]
        layout_sub = layout_gpu[selected_indices]

        if labels is not None:
            labels_gpu = cp.asarray(labels) if not isinstance(labels, cp.ndarray) else labels
            labels_sub = labels_gpu[selected_indices]
            return data_sub, layout_sub, labels_sub

        return data_sub, layout_sub

    # CPU fallback
    np.random.seed(random_state)

    data_np = np.asarray(data)
    layout_np = np.asarray(layout)

    n_samples = data_np.shape[0]
    random_numbers = np.random.uniform(0, 1, size=n_samples)
    selected_indices = random_numbers < threshold

    data_sub = data_np[selected_indices]
    layout_sub = layout_np[selected_indices]

    if labels is not None:
        labels_np = np.asarray(labels)
        labels_sub = labels_np[selected_indices]
        return data_sub, layout_sub, labels_sub

    return data_sub, layout_sub

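
# Illustrative usage sketch (not part of the original module): Bernoulli subsampling keeps
# each row independently with probability `threshold`, so the returned arrays have a random
# (binomial) number of rows rather than a fixed size, with identical row selection for
# data and layout.
def _example_subsample():
    """Subsample a toy dataset and its 2-D embedding to roughly 30% of the rows."""
    rng = np.random.default_rng(0)
    X = rng.normal(size=(1000, 8)).astype(np.float32)
    Y = X[:, :2].copy()
    X_sub, Y_sub = threshold_subsample_gpu(X, Y, threshold=0.3, random_state=0)
    return X_sub.shape[0], Y_sub.shape[0]  # ~300 rows each
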
#
# kNN graph construction
#
def make_knn_graph_gpu(data, n_neighbors, batch_size=50000):
    """
    GPU-accelerated kNN graph construction using cuML.

    Parameters
    ----------
    data : array-like
        Data points (n_samples, n_features)
    n_neighbors : int
        Number of nearest neighbors
    batch_size : int, optional
        Batch size for querying neighbors (queries in batches against full dataset).
        Default 50000 balances GPU memory and performance.
        Set to None to query all at once.

    Returns
    -------
    tuple :
        (distances, indices) arrays of shape (n_samples, n_neighbors+1)
    """
    if not HAS_CUML:
        # Fallback to CPU
        return make_knn_graph_cpu(data, n_neighbors, batch_size)

    # Convert to CuPy if needed
    if HAS_CUPY:
        data_gpu = cp.asarray(data, dtype=cp.float32)
    else:
        data_gpu = np.asarray(data, dtype=np.float32)

    n_samples = data_gpu.shape[0]

    # Fit on entire dataset
    nn = cumlNearestNeighbors(n_neighbors=n_neighbors + 1, metric='euclidean')
    nn.fit(data_gpu)

    if batch_size is None or batch_size >= n_samples:
        # Query all at once
        distances, indices = nn.kneighbors(data_gpu)
    else:
        # Query in batches (each batch queries against full dataset for memory efficiency)
        if HAS_CUPY:
            distances = cp.zeros((n_samples, n_neighbors + 1), dtype=cp.float32)
            indices = cp.zeros((n_samples, n_neighbors + 1), dtype=cp.int32)
        else:
            distances = np.zeros((n_samples, n_neighbors + 1), dtype=np.float32)
            indices = np.zeros((n_samples, n_neighbors + 1), dtype=np.int32)

        for start_idx in range(0, n_samples, batch_size):
            end_idx = min(start_idx + batch_size, n_samples)
            # Query batch against FULL dataset
            batch_distances, batch_indices = nn.kneighbors(data_gpu[start_idx:end_idx])
            distances[start_idx:end_idx] = batch_distances
            indices[start_idx:end_idx] = batch_indices

    # Convert to CuPy arrays if not already
    if HAS_CUPY:
        distances = cp.asarray(distances)
        indices = cp.asarray(indices)

    return distances, indices

def make_knn_graph_cpu(data, n_neighbors, batch_size=10000):
    """
    CPU fallback for kNN graph construction.

    Parameters
    ----------
    data : array-like
        Data points
    n_neighbors : int
        Number of nearest neighbors
    batch_size : int, optional
        Batch size for querying neighbors (queries in batches against full dataset).
        Default 10000 provides a good balance between memory and performance.
        Set to None to query all at once.

    Returns
    -------
    tuple :
        (distances, indices) arrays
    """
    data_np = np.asarray(data, dtype=np.float32)
    n_samples = data_np.shape[0]

    # Fit on entire dataset
    nn = NearestNeighbors(n_neighbors=n_neighbors + 1, metric='euclidean')
    nn.fit(data_np)

    if batch_size is None or batch_size >= n_samples:
        # Query all at once
        distances, indices = nn.kneighbors(data_np)
    else:
        # Query in batches (each batch queries against full dataset for memory efficiency)
        distances = np.zeros((n_samples, n_neighbors + 1), dtype=np.float32)
        indices = np.zeros((n_samples, n_neighbors + 1), dtype=np.int32)

        for start_idx in range(0, n_samples, batch_size):
            end_idx = min(start_idx + batch_size, n_samples)
            # Query batch against FULL dataset
            batch_distances, batch_indices = nn.kneighbors(data_np[start_idx:end_idx])
            distances[start_idx:end_idx] = batch_distances
            indices[start_idx:end_idx] = batch_indices

    return distances, indices

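
# Illustrative usage sketch (not part of the original module): the GPU entry point falls
# back to the CPU implementation automatically when cuML is missing, and column 0 of the
# result is always the query point itself (distance 0), hence the slicing below.
def _example_knn_graph():
    """Build a 15-NN graph on random data and drop the self-neighbour column."""
    rng = np.random.default_rng(0)
    X = rng.normal(size=(500, 16)).astype(np.float32)
    distances, indices = make_knn_graph_gpu(X, n_neighbors=15, batch_size=None)
    return distances[:, 1:], indices[:, 1:]
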
#
# Distortion metrics (stress)
#
def compute_stress(data, layout, n_neighbors, eps=1e-6, use_gpu=True):
    """
    Compute normalized stress (distortion) of an embedding.

    This metric measures how well distances are preserved between the
    high-dimensional data and low-dimensional layout.

    Parameters
    ----------
    data : array-like
        High-dimensional data (n_samples, n_features)
    layout : array-like
        Low-dimensional embedding (n_samples, n_components)
    n_neighbors : int
        Number of nearest neighbors to consider
    eps : float
        Small constant to prevent division by zero
    use_gpu : bool
        Whether to use GPU acceleration

    Returns
    -------
    float :
        Normalized stress value
    """
    if use_gpu and HAS_CUML and HAS_CUPY:
        # GPU version
        data_gpu = cp.asarray(data, dtype=cp.float32)
        layout_gpu = cp.asarray(layout, dtype=cp.float32)

        # Compute kNN graph in high-dimensional space
        distances, indices = make_knn_graph_gpu(data_gpu, n_neighbors)

        # Distances are already L2 (Euclidean) from cuML
        distances = distances[:, 1:]  # Remove self
        indices = indices[:, 1:]

        # Compute distances in low-dimensional space
        n_samples = layout_gpu.shape[0]
        distances_emb = cp.zeros_like(distances)

        for i in range(n_samples):
            neighbor_coords = layout_gpu[indices[i]]
            point_coords = layout_gpu[i:i+1]
            distances_emb[i] = cp.linalg.norm(neighbor_coords - point_coords, axis=1)

        # Compute normalized stress
        ratios = cp.abs(distances / cp.maximum(distances_emb, eps) - 1.0)
        stress_mean = float(cp.mean(ratios))
        stress_std = float(cp.std(ratios))
        stress_normalized = 0.0 if stress_mean < eps else stress_std / stress_mean
    else:
        # CPU version
        data_np = np.asarray(data, dtype=np.float32)
        layout_np = np.asarray(layout, dtype=np.float32)

        distances, indices = make_knn_graph_cpu(data_np, n_neighbors)
        distances = distances[:, 1:]
        indices = indices[:, 1:]

        # Compute distances in embedding
        n_samples = layout_np.shape[0]
        distances_emb = np.zeros_like(distances)

        for i in range(n_samples):
            neighbor_coords = layout_np[indices[i]]
            point_coords = layout_np[i:i+1]
            distances_emb[i] = np.linalg.norm(neighbor_coords - point_coords, axis=1)

        ratios = np.abs(distances / np.maximum(distances_emb, eps) - 1.0)
        stress_mean = float(np.mean(ratios))
        stress_std = float(np.std(ratios))
        stress_normalized = 0.0 if stress_mean < eps else stress_std / stress_mean

    return stress_normalized

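
# Illustrative sketch (not part of the original module): for an embedding that preserves
# all neighbour distances exactly (here, an identical copy of 2-D data), every distance
# ratio is 1, so the reported value std(|ratio - 1|) / mean(|ratio - 1|) is 0.
# Assumes either a full RAPIDS install or a CPU-only install, so that the matching
# code path's imports above are present.
def _example_stress_identity():
    """Normalized stress of a perfect (identity) embedding is 0."""
    rng = np.random.default_rng(0)
    X = rng.normal(size=(300, 2)).astype(np.float32)
    return compute_stress(X, X.copy(), n_neighbors=10)
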
def compute_neighbor_score(data, layout, n_neighbors, use_gpu=True):
    """
    Compute neighborhood preservation score.

    Measures how well k-nearest neighbor relationships are preserved
    from high-dimensional to low-dimensional space.

    Parameters
    ----------
    data : array-like
        High-dimensional data
    layout : array-like
        Low-dimensional embedding
    n_neighbors : int
        Number of neighbors to consider
    use_gpu : bool
        Whether to use GPU acceleration

    Returns
    -------
    list :
        [mean_score, std_score]
    """
    if use_gpu and HAS_CUML and HAS_CUPY:
        data_gpu = cp.asarray(data, dtype=cp.float32)
        layout_gpu = cp.asarray(layout, dtype=cp.float32)

        _, indices_data = make_knn_graph_gpu(data_gpu, n_neighbors)
        _, indices_layout = make_knn_graph_gpu(layout_gpu, n_neighbors)

        indices_data = indices_data[:, 1:]  # Remove self
        indices_layout = indices_layout[:, 1:]

        # Sort indices
        indices_data = cp.sort(indices_data, axis=1)
        indices_layout = cp.sort(indices_layout, axis=1)

        # Compute preservation scores
        preservation_scores = cp.mean(indices_data == indices_layout, axis=1)

        neighbor_mean = float(cp.mean(preservation_scores))
        neighbor_std = float(cp.std(preservation_scores))
    else:
        data_np = np.asarray(data, dtype=np.float32)
        layout_np = np.asarray(layout, dtype=np.float32)

        _, indices_data = make_knn_graph_cpu(data_np, n_neighbors)
        _, indices_layout = make_knn_graph_cpu(layout_np, n_neighbors)

        indices_data = indices_data[:, 1:]
        indices_layout = indices_layout[:, 1:]

        indices_data = np.sort(indices_data, axis=1)
        indices_layout = np.sort(indices_layout, axis=1)

        preservation_scores = np.mean(indices_data == indices_layout, axis=1)

        neighbor_mean = float(np.mean(preservation_scores))
        neighbor_std = float(np.std(preservation_scores))

    return [neighbor_mean, neighbor_std]

def compute_local_metrics(data, layout, n_neighbors, subsample_threshold=1.0,
                          random_state=42, use_gpu=True):
    """
    Compute local quality metrics (stress and neighborhood preservation).

    Parameters
    ----------
    data : array-like
        High-dimensional data
    layout : array-like
        Low-dimensional embedding
    n_neighbors : int
        Number of neighbors for kNN graph
    subsample_threshold : float
        Subsampling probability (must be between 0.0 and 1.0, default 1.0 = no subsampling)
    random_state : int
        Random seed for subsampling
    use_gpu : bool
        Whether to use GPU acceleration

    Returns
    -------
    dict :
        Dictionary containing 'stress' and 'neighbor' metrics

    Raises
    ------
    ValueError
        If subsample_threshold is not between 0.0 and 1.0
    """
    if not 0.0 <= subsample_threshold <= 1.0:
        raise ValueError(f"subsample_threshold must be between 0.0 and 1.0, got {subsample_threshold}")

    # Apply subsampling if threshold < 1.0
    if subsample_threshold < 1.0:
        data_sub, layout_sub = threshold_subsample_gpu(
            data, layout,
            threshold=subsample_threshold,
            random_state=random_state
        )
        # Convert back to numpy if needed
        if HAS_CUPY and isinstance(data_sub, cp.ndarray):
            data_sub = cp.asnumpy(data_sub)
            layout_sub = cp.asnumpy(layout_sub)

        metrics = {
            'stress': compute_stress(data_sub, layout_sub, n_neighbors, use_gpu=use_gpu),
            'neighbor': compute_neighbor_score(data_sub, layout_sub, n_neighbors, use_gpu=use_gpu),
            'n_samples': len(data_sub)
        }
    else:
        metrics = {
            'stress': compute_stress(data, layout, n_neighbors, use_gpu=use_gpu),
            'neighbor': compute_neighbor_score(data, layout, n_neighbors, use_gpu=use_gpu),
            'n_samples': len(data)
        }

    return metrics

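
# Illustrative usage sketch (not part of the original module): local metrics on a naive
# linear projection used as the "embedding". The 'neighbor' entry is [mean, std] of the
# per-point agreement between sorted kNN index lists in the two spaces.
def _example_local_metrics():
    """Stress and neighbourhood preservation for a toy projection."""
    rng = np.random.default_rng(0)
    X = rng.normal(size=(800, 10)).astype(np.float32)
    Y = X[:, :2].copy()
    return compute_local_metrics(X, Y, n_neighbors=16,
                                 subsample_threshold=0.5, random_state=0)
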
#
# Context preservation metrics (SVM, kNN classification)
#
def compute_svm_accuracy(X, y, test_size=0.3, reg_param=1.0, max_iter=1000,
                         random_state=42, use_gpu=True):
    """
    Compute SVM classification accuracy.

    Parameters
    ----------
    X : array-like
        Features
    y : array-like
        Labels
    test_size : float
        Test set proportion
    reg_param : float
        Regularization parameter
    max_iter : int
        Maximum iterations
    random_state : int
        Random seed
    use_gpu : bool
        Whether to use cuML GPU acceleration

    Returns
    -------
    float :
        Classification accuracy
    """
    if use_gpu and HAS_CUML:
        if HAS_CUPY:
            X_gpu = cp.asarray(X, dtype=cp.float32)
            y_gpu = cp.asarray(y)
        else:
            X_gpu = np.asarray(X, dtype=np.float32)
            y_gpu = np.asarray(y)

        # Train/test split
        X_train, X_test, y_train, y_test = cuml_train_test_split(
            X_gpu, y_gpu, test_size=test_size, random_state=random_state
        )

        # Standardize
        scaler = cumlStandardScaler()
        X_train = scaler.fit_transform(X_train)
        X_test = scaler.transform(X_test)

        # Train SVM
        model = cumlSVC(C=reg_param, max_iter=max_iter, kernel='linear')
        model.fit(X_train, y_train)

        # Predict and compute accuracy
        y_pred = model.predict(X_test)
        if HAS_CUPY:
            accuracy = float(cp.mean(y_pred == y_test))
        else:
            accuracy = float(np.mean(y_pred == y_test))
    else:
        X_np = np.asarray(X, dtype=np.float32)
        y_np = np.asarray(y)

        X_train, X_test, y_train, y_test = train_test_split(
            X_np, y_np, test_size=test_size, random_state=random_state
        )

        scaler = StandardScaler()
        X_train = scaler.fit_transform(X_train)
        X_test = scaler.transform(X_test)

        model = LinearSVC(C=reg_param, max_iter=max_iter)
        model.fit(X_train, y_train)

        y_pred = model.predict(X_test)
        from sklearn.metrics import accuracy_score  # pylint: disable=import-outside-toplevel
        accuracy = accuracy_score(y_test, y_pred)

    return accuracy

def compute_knn_accuracy(X, y, n_neighbors=16, test_size=0.3, random_state=42, use_gpu=True):
    """
    Compute kNN classification accuracy.

    Parameters
    ----------
    X : array-like
        Features
    y : array-like
        Labels
    n_neighbors : int
        Number of neighbors
    test_size : float
        Test set proportion
    random_state : int
        Random seed
    use_gpu : bool
        Whether to use cuML GPU acceleration

    Returns
    -------
    float :
        Classification accuracy
    """
    if use_gpu and HAS_CUML:
        if HAS_CUPY:
            X_gpu = cp.asarray(X, dtype=cp.float32)
            y_gpu = cp.asarray(y)
        else:
            X_gpu = np.asarray(X, dtype=np.float32)
            y_gpu = np.asarray(y)

        X_train, X_test, y_train, y_test = cuml_train_test_split(
            X_gpu, y_gpu, test_size=test_size, random_state=random_state
        )

        scaler = cumlStandardScaler()
        X_train = scaler.fit_transform(X_train)
        X_test = scaler.transform(X_test)

        model = cumlKNeighborsClassifier(n_neighbors=n_neighbors)
        model.fit(X_train, y_train)

        y_pred = model.predict(X_test)
        if HAS_CUPY:
            accuracy = float(cp.mean(y_pred == y_test))
        else:
            accuracy = float(np.mean(y_pred == y_test))
    else:
        X_np = np.asarray(X, dtype=np.float32)
        y_np = np.asarray(y)

        X_train, X_test, y_train, y_test = train_test_split(
            X_np, y_np, test_size=test_size, random_state=random_state
        )

        scaler = StandardScaler()
        X_train = scaler.fit_transform(X_train)
        X_test = scaler.transform(X_test)

        model = KNeighborsClassifier(n_neighbors=n_neighbors)
        model.fit(X_train, y_train)

        y_pred = model.predict(X_test)
        from sklearn.metrics import accuracy_score  # pylint: disable=import-outside-toplevel
        accuracy = accuracy_score(y_test, y_pred)

    return accuracy

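
# Illustrative usage sketch (not part of the original module): the two accuracy helpers
# share the same split / scale / fit / score pattern; running both on the same labelled
# blobs makes their scores directly comparable.
def _example_classifier_accuracies():
    """SVM and kNN accuracy on two well-separated Gaussian blobs."""
    rng = np.random.default_rng(0)
    X = np.vstack([rng.normal(loc=0.0, size=(200, 5)),
                   rng.normal(loc=3.0, size=(200, 5))]).astype(np.float32)
    y = np.array([0] * 200 + [1] * 200)
    return (compute_svm_accuracy(X, y),
            compute_knn_accuracy(X, y, n_neighbors=8))
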
def compute_svm_score(data, layout, labels, subsample_threshold=0.5,
                      random_state=42, use_gpu=True, **kwargs):
    """
    Compute SVM context preservation score.

    Compares SVM classification accuracy on high-dimensional data
    vs low-dimensional embedding.

    Parameters
    ----------
    data : array-like
        High-dimensional data
    layout : array-like
        Low-dimensional embedding
    labels : array-like
        Class labels
    subsample_threshold : float
        Subsampling probability (must be between 0.0 and 1.0)
    random_state : int
        Random seed
    use_gpu : bool
        Whether to use GPU acceleration
    **kwargs : dict
        Additional parameters for SVM

    Returns
    -------
    ndarray :
        [acc_hd, acc_ld, log_ratio], where log_ratio is the log of the smaller
        accuracy ratio, min(acc_hd/acc_ld, acc_ld/acc_hd)

    Raises
    ------
    ValueError
        If subsample_threshold is not between 0.0 and 1.0
    """
    if not 0.0 <= subsample_threshold <= 1.0:
        raise ValueError(f"subsample_threshold must be between 0.0 and 1.0, got {subsample_threshold}")

    # Subsample
    X_hd, X_ld, y = threshold_subsample_gpu(  # pylint: disable=unbalanced-tuple-unpacking
        data, layout, labels,
        threshold=subsample_threshold,
        random_state=random_state
    )

    # Convert back to NumPy for sklearn/cuml
    if HAS_CUPY and isinstance(X_hd, cp.ndarray):
        X_hd = cp.asnumpy(X_hd)
        X_ld = cp.asnumpy(X_ld)
        y = cp.asnumpy(y)

    test_size = kwargs.pop('test_size', 0.3)
    reg_param = kwargs.pop('reg_param', 1.0)
    max_iter = kwargs.pop('max_iter', 1000)

    svm_acc_hd = compute_svm_accuracy(
        X_hd, y,
        test_size=test_size,
        reg_param=reg_param,
        max_iter=max_iter,
        random_state=random_state,
        use_gpu=use_gpu
    )
    svm_acc_ld = compute_svm_accuracy(
        X_ld, y,
        test_size=test_size,
        reg_param=reg_param,
        max_iter=max_iter,
        random_state=random_state,
        use_gpu=use_gpu
    )

    svm_score = np.log(np.minimum(svm_acc_hd / svm_acc_ld, svm_acc_ld / svm_acc_hd))

    return np.array([svm_acc_hd, svm_acc_ld, svm_score], dtype=np.float32)

def compute_knn_score(data, layout, labels, n_neighbors=16, subsample_threshold=0.5,
                      random_state=42, use_gpu=True, **kwargs):
    """
    Compute kNN context preservation score.

    Compares kNN classification accuracy on high-dimensional data
    vs low-dimensional embedding.

    Parameters
    ----------
    data : array-like
        High-dimensional data
    layout : array-like
        Low-dimensional embedding
    labels : array-like
        Class labels
    n_neighbors : int
        Number of neighbors for kNN
    subsample_threshold : float
        Subsampling probability (must be between 0.0 and 1.0)
    random_state : int
        Random seed
    use_gpu : bool
        Whether to use GPU acceleration
    **kwargs : dict
        Additional parameters

    Returns
    -------
    ndarray :
        [acc_hd, acc_ld, log_ratio]

    Raises
    ------
    ValueError
        If subsample_threshold is not between 0.0 and 1.0
    """
    if not 0.0 <= subsample_threshold <= 1.0:
        raise ValueError(f"subsample_threshold must be between 0.0 and 1.0, got {subsample_threshold}")

    # Subsample
    X_hd, X_ld, y = threshold_subsample_gpu(  # pylint: disable=unbalanced-tuple-unpacking
        data, layout, labels,
        threshold=subsample_threshold,
        random_state=random_state
    )

    # Convert back to NumPy for sklearn/cuml
    if HAS_CUPY and isinstance(X_hd, cp.ndarray):
        X_hd = cp.asnumpy(X_hd)
        X_ld = cp.asnumpy(X_ld)
        y = cp.asnumpy(y)

    test_size = kwargs.pop('test_size', 0.3)

    knn_acc_hd = compute_knn_accuracy(
        X_hd, y,
        n_neighbors=n_neighbors,
        test_size=test_size,
        random_state=random_state,
        use_gpu=use_gpu
    )
    knn_acc_ld = compute_knn_accuracy(
        X_ld, y,
        n_neighbors=n_neighbors,
        test_size=test_size,
        random_state=random_state,
        use_gpu=use_gpu
    )

    knn_score = np.log(knn_acc_ld / knn_acc_hd)

    return np.array([knn_acc_hd, knn_acc_ld, knn_score], dtype=np.float32)

def compute_context_measures(data, layout, labels, subsample_threshold=0.5, n_neighbors=16,
                             random_state=42, use_gpu=True, **kwargs):
    """
    Compute context preservation measures (SVM and kNN).

    Parameters
    ----------
    data : array-like
        High-dimensional data
    layout : array-like
        Low-dimensional embedding
    labels : array-like
        Class labels
    subsample_threshold : float
        Subsampling probability (must be between 0.0 and 1.0)
    n_neighbors : int
        Number of neighbors for kNN
    random_state : int
        Random seed
    use_gpu : bool
        Whether to use GPU acceleration
    **kwargs : dict
        Additional parameters

    Returns
    -------
    dict :
        Dictionary with 'svm' and 'knn' scores

    Raises
    ------
    ValueError
        If subsample_threshold is not between 0.0 and 1.0
    """
    if not 0.0 <= subsample_threshold <= 1.0:
        raise ValueError(f"subsample_threshold must be between 0.0 and 1.0, got {subsample_threshold}")

    measures = {
        'svm': compute_svm_score(
            data, layout, labels,
            subsample_threshold, random_state, use_gpu,
            **kwargs
        ),
        'knn': compute_knn_score(
            data, layout, labels,
            n_neighbors, subsample_threshold, random_state, use_gpu,
            **kwargs
        )
    }

    return measures

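
# Illustrative usage sketch (not part of the original module): context scores compare
# classifier accuracy in the original space against the embedding; each returned entry
# is [accuracy_hd, accuracy_ld, log-ratio score].
def _example_context_measures():
    """SVM and kNN context preservation for a toy projection of labelled blobs."""
    rng = np.random.default_rng(0)
    X = np.vstack([rng.normal(loc=0.0, size=(300, 10)),
                   rng.normal(loc=4.0, size=(300, 10))]).astype(np.float32)
    y = np.array([0] * 300 + [1] * 300)
    Y = X[:, :2].copy()
    return compute_context_measures(X, Y, y, subsample_threshold=0.8, n_neighbors=8)
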
#
# Persistent homology and Betti curves
#

#
# Fast H0 and H1 computation using kNN-based sparse Rips filtration
#
def compute_h0_h1_knn(data, k_neighbors=20, density_threshold=0.8, overlap_factor=1.5,
                      use_gpu=True, return_distances=False):
    """
    Compute H0/H1 using local kNN atlas approach.

    Build dense local triangulations around each point, then merge consistently.
    This avoids the "holes" problem of global sparse kNN graphs.

    Automatically selects between GPU and CPU implementation based on
    availability and the use_gpu parameter.

    Parameters
    ----------
    data : array-like
        Point cloud data (n_samples, n_features)
    k_neighbors : int
        Size of local neighborhood (default 20, recommended 15-20 for noisy data)
    density_threshold : float
        Percentile threshold for edge inclusion (0-1). Lower = denser triangulation.
        Default 0.8 means edges up to 80th percentile of local distances are included.
    overlap_factor : float
        Factor for expanding local neighborhoods to ensure overlap (default 1.5).
        Higher values create more dense, overlapping patches.
    use_gpu : bool
        Whether to use GPU acceleration (if available)
    return_distances : bool
        If True, also return edge-to-distance mapping for persistence diagrams

    Returns
    -------
    tuple :
        (h0_diagram, h1_diagram) or (h0_diagram, h1_diagram, edge_distances)
        Persistence diagrams with [birth, death] pairs
    """
    # Select backend: GPU if requested and available, otherwise CPU
    if use_gpu:
        try:
            from .atlas_gpu import compute_h0_h1_atlas_gpu  # pylint: disable=import-outside-toplevel
            return compute_h0_h1_atlas_gpu(
                data,
                k_neighbors=k_neighbors,
                density_threshold=density_threshold,
                overlap_factor=overlap_factor,
                return_distances=return_distances
            )
        except ImportError as e:
            # Fall back to CPU if GPU not available
            warnings.warn(f"GPU atlas not available ({e}), falling back to CPU", UserWarning)

    # Use CPU implementation
    from .atlas_cpu import compute_h0_h1_atlas_cpu  # pylint: disable=import-outside-toplevel
    return compute_h0_h1_atlas_cpu(
        data,
        k_neighbors=k_neighbors,
        density_threshold=density_threshold,
        overlap_factor=overlap_factor,
        return_distances=return_distances
    )

def compute_persistence_diagrams_fast(data, layout, k_neighbors=30, use_gpu=True):
    """
    Fast computation of H0/H1 persistence diagrams using kNN-based sparse Rips.

    Much faster than full Vietoris-Rips as it only uses the kNN graph
    (O(nk) vs O(n²) edges). Builds a Rips complex from kNN edges and computes
    persistence via Ripser.

    Note: Does NOT subsample internally - expects already-subsampled data.
    This avoids double subsampling when called from compute_global_metrics.

    Parameters
    ----------
    data : array-like
        High-dimensional data (already subsampled if needed)
    layout : array-like
        Low-dimensional embedding (already subsampled if needed)
    k_neighbors : int
        Number of neighbors for kNN graph (default 30, recommended >= 20)
    use_gpu : bool
        Whether to use GPU for kNN computation (default True)

    Returns
    -------
    dict :
        {'data': [h0_diag, h1_diag], 'layout': [h0_diag, h1_diag], 'backend': 'fast'}
    """
    # Convert to NumPy if needed
    if HAS_CUPY and isinstance(data, cp.ndarray):
        data_np = cp.asnumpy(data)
        layout_np = cp.asnumpy(layout)
    else:
        data_np = np.asarray(data, dtype=np.float32)
        layout_np = np.asarray(layout, dtype=np.float32)

    # Compute H0 and H1 using kNN-Rips method
    h0_hd, h1_hd = compute_h0_h1_knn(data_np, k_neighbors=k_neighbors, use_gpu=use_gpu)  # pylint: disable=unbalanced-tuple-unpacking
    h0_ld, h1_ld = compute_h0_h1_knn(layout_np, k_neighbors=k_neighbors, use_gpu=use_gpu)  # pylint: disable=unbalanced-tuple-unpacking

    return {
        'data': [h0_hd, h1_hd],
        'layout': [h0_ld, h1_ld],
        'backend': 'fast'
    }

def compute_persistence_diagrams(data, layout, max_dim=1, subsample_threshold=0.5,
                                 random_state=42, backend=None, backend_kwargs=None):
    """
    Compute persistence diagrams for data and layout.

    Parameters
    ----------
    data : array-like
        High-dimensional data
    layout : array-like
        Low-dimensional embedding
    max_dim : int
        Maximum homology dimension
    subsample_threshold : float
        Subsampling probability (must be between 0.0 and 1.0)
    random_state : int
        Random seed
    backend : str, optional
        Persistence backend: 'fast', 'giotto-ph', 'ripser++', 'ripser', or None for auto
    backend_kwargs : dict, optional
        Backend-specific parameters passed to the backend, using defaults unless specified:
        - 'fast': k_neighbors=30, use_gpu=True
        - 'giotto-ph': n_threads=-1, collapse_edges=True, return_generators=False
        - 'ripser++': Any valid parameters for ripserplusplus.run()
        - 'ripser': Any valid parameters for ripser.ripser()

    Returns
    -------
    dict :
        {'data': diagrams_hd, 'layout': diagrams_ld, 'backend': backend_used}

    Raises
    ------
    ValueError
        If subsample_threshold is not between 0.0 and 1.0
    """
    if not 0.0 <= subsample_threshold <= 1.0:
        raise ValueError(f"subsample_threshold must be between 0.0 and 1.0, got {subsample_threshold}")

    # Initialize backend_kwargs if not provided
    if backend_kwargs is None:
        backend_kwargs = {}

    # Select backend
    if backend is None:
        backend = get_persistence_backend()
    else:
        # Validate backend
        available = get_available_persistence_backends()
        if backend not in available or not available[backend]:
            raise ValueError(f"Backend '{backend}' not available. Use one of {list(available.keys())}")

    # Subsample ONCE
    data_sub, layout_sub = threshold_subsample_gpu(
        data, layout,
        threshold=subsample_threshold,
        random_state=random_state
    )

    # Convert to NumPy for persistence computation
    if HAS_CUPY and isinstance(data_sub, cp.ndarray):
        data_np = cp.asnumpy(data_sub)
        layout_np = cp.asnumpy(layout_sub)
    else:
        data_np = np.asarray(data_sub, dtype=np.float32)
        layout_np = np.asarray(layout_sub, dtype=np.float32)

    # Compute persistence diagrams based on backend
    if backend == 'fast':
        # Use fast kNN-based H0/H1 computation
        # Extract parameters with defaults
        k_neighbors = backend_kwargs.get('k_neighbors', 30)
        use_gpu = backend_kwargs.get('use_gpu', True)

        h0_hd, h1_hd = compute_h0_h1_knn(data_np, k_neighbors=k_neighbors, use_gpu=use_gpu)  # pylint: disable=unbalanced-tuple-unpacking
        h0_ld, h1_ld = compute_h0_h1_knn(layout_np, k_neighbors=k_neighbors, use_gpu=use_gpu)  # pylint: disable=unbalanced-tuple-unpacking

        diags_hd = [h0_hd, h1_hd]
        diags_ld = [h0_ld, h1_ld]

    elif backend == 'giotto-ph':
        # Use giotto-ph (fastest CPU option)
        # Extract parameters with defaults
        n_threads = backend_kwargs.get('n_threads', -1)
        collapse_edges = backend_kwargs.get('collapse_edges', True)
        return_generators = backend_kwargs.get('return_generators', False)

        result_hd = ripser_parallel(
            data_np,
            maxdim=max_dim,
            collapse_edges=collapse_edges,
            n_threads=n_threads,
            return_generators=return_generators
        )
        result_ld = ripser_parallel(
            layout_np,
            maxdim=max_dim,
            collapse_edges=collapse_edges,
            n_threads=n_threads,
            return_generators=return_generators
        )
        diags_hd = result_hd['dgms']
        diags_ld = result_ld['dgms']

    elif backend == 'ripser++':
        # Use GPU-accelerated ripser++
        # Pass through all kwargs to rpp.run()
        diags_hd = rpp.run(data_np, maxdim=max_dim, **backend_kwargs)['dgms']
        diags_ld = rpp.run(layout_np, maxdim=max_dim, **backend_kwargs)['dgms']

    elif backend == 'ripser':
        # Use standard CPU ripser
        # Pass through all kwargs to ripser()
        diags_hd = ripser(data_np, maxdim=max_dim, **backend_kwargs)['dgms']
        diags_ld = ripser(layout_np, maxdim=max_dim, **backend_kwargs)['dgms']

    else:
        raise ValueError(f"Unknown backend: {backend}")

    return {'data': diags_hd, 'layout': diags_ld, 'backend': backend}

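
# Illustrative usage sketch (not part of the original module): H0/H1 diagrams for a noisy
# circle and its 2-D projection. The explicit 'ripser' choice is an assumption; with
# backend=None the fastest installed backend is auto-selected, and a RuntimeError is
# raised if none is available.
def _example_persistence_diagrams():
    """Persistence diagrams of a noisy circle versus its planar projection."""
    rng = np.random.default_rng(0)
    theta = rng.uniform(0, 2 * np.pi, size=400)
    X = np.column_stack([np.cos(theta), np.sin(theta),
                         0.1 * rng.normal(size=400)]).astype(np.float32)
    Y = X[:, :2].copy()
    return compute_persistence_diagrams(X, Y, max_dim=1, subsample_threshold=1.0,
                                        backend='ripser' if HAS_RIPSER else None)
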
def betti_curve(diagram, n_steps=100):
    """
    Compute Betti curve from a persistence diagram.

    A Betti curve shows the number of topological features that persist
    at different filtration values.

    Parameters
    ----------
    diagram : array-like
        Persistence diagram as list of (birth, death) tuples
    n_steps : int
        Number of points in the curve

    Returns
    -------
    tuple :
        (filtration_values, betti_numbers)
    """
    if len(diagram) == 0:
        return np.linspace(0, 1, n_steps), np.zeros(n_steps)

    # Filter out infinite death times
    finite_diagram = [x for x in diagram if x[1] != np.inf]
    if len(finite_diagram) == 0:
        return np.linspace(0, 1, n_steps), np.zeros(n_steps)

    max_dist = np.max([x[1] for x in finite_diagram])

    axis_x = np.linspace(0, max_dist, n_steps)
    axis_y = np.zeros(n_steps)

    for i, x in enumerate(axis_x):
        for b, d in diagram:
            if b < x < d:
                axis_y[i] += 1

    return axis_x, axis_y

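
# Illustrative sketch (not part of the original module): the Betti curve counts, at each
# filtration value x, the features with birth < x < death. For the toy diagram below the
# curve equals 2 on (0.1, 0.5) and 1 on (0.5, 0.9).
def _example_betti_curve():
    """Betti curve of a two-feature toy diagram."""
    diagram = np.array([[0.1, 0.9], [0.1, 0.5]])
    return betti_curve(diagram, n_steps=50)
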
def compute_dtw(axis_x_hd, axis_y_hd, axis_x_ld, axis_y_ld, norm_factor=1.0):
    """
    Compute Dynamic Time Warping distance between Betti curves.

    Parameters
    ----------
    axis_x_hd, axis_y_hd : array-like
        High-dimensional Betti curve
    axis_x_ld, axis_y_ld : array-like
        Low-dimensional Betti curve
    norm_factor : float
        Normalization factor

    Returns
    -------
    float :
        DTW distance
    """
    if not HAS_DTW:
        warnings.warn("fastdtw not available, returning NaN")
        return np.nan

    seq0 = np.array(list(zip(axis_x_hd, axis_y_hd)))
    seq1 = np.array(list(zip(axis_x_ld, axis_y_ld)))

    dist_dtw, _ = fastdtw(seq0, seq1, dist=2)

    return dist_dtw * norm_factor

def compute_twed(axis_x_hd, axis_y_hd, axis_x_ld, axis_y_ld, norm_factor=1.0):
    """
    Compute Time Warp Edit Distance between Betti curves.

    Parameters
    ----------
    axis_x_hd, axis_y_hd : array-like
        High-dimensional Betti curve
    axis_x_ld, axis_y_ld : array-like
        Low-dimensional Betti curve
    norm_factor : float
        Normalization factor

    Returns
    -------
    float :
        TWED distance
    """
    if not HAS_TWED:
        warnings.warn("twed not available, returning NaN")
        return np.nan

    dist_twed = twed(
        axis_y_hd.reshape(-1, 1),
        axis_y_ld.reshape(-1, 1),
        axis_x_hd,
        axis_x_ld,
        p=2
    )

    return dist_twed * norm_factor

def compute_emd(axis_x_hd, axis_y_hd, axis_x_ld, axis_y_ld, adjust_mass=False, norm_factor=1.0):
    """
    Compute Earth Mover's Distance between Betti curves.

    Parameters
    ----------
    axis_x_hd, axis_y_hd : array-like
        High-dimensional Betti curve
    axis_x_ld, axis_y_ld : array-like
        Low-dimensional Betti curve
    adjust_mass : bool
        Whether to adjust for different total masses
    norm_factor : float
        Normalization factor

    Returns
    -------
    float :
        EMD distance
    """
    if not HAS_OT:
        warnings.warn("POT not available, returning NaN")
        return np.nan

    sum_hd = np.sum(axis_y_hd)
    sum_ld = np.sum(axis_y_ld)

    if sum_hd == 0 or sum_ld == 0:
        return 0.0

    axis_y_hd_norm = axis_y_hd / sum_hd
    axis_y_ld_norm = axis_y_ld / sum_ld

    dist_emd = ot.emd2_1d(axis_x_hd, axis_x_ld, axis_y_hd_norm, axis_y_ld_norm, metric='euclidean')

    if adjust_mass:
        dist_emd *= np.max([sum_hd / sum_ld, sum_ld / sum_hd])

    return dist_emd * norm_factor

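
# Illustrative sketch (not part of the original module): curve-level distances between two
# Betti curves produced by betti_curve() above. fastdtw and POT are optional; the helpers
# return NaN (with a warning) when the corresponding package is missing.
def _example_betti_curve_distances():
    """DTW and EMD between Betti curves of two toy diagrams."""
    x_hd, y_hd = betti_curve(np.array([[0.1, 0.9], [0.1, 0.5]]), n_steps=50)
    x_ld, y_ld = betti_curve(np.array([[0.1, 0.7]]), n_steps=50)
    return (compute_dtw(x_hd, y_hd, x_ld, y_ld),
            compute_emd(x_hd, y_hd, x_ld, y_ld, adjust_mass=True))
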
def compute_wasserstein(diag_hd, diag_ld, norm_factor=1.0):
    """
    Compute Wasserstein distance between persistence diagrams.

    Simple implementation matching dire-jax (no special handling for infinite features).

    Parameters
    ----------
    diag_hd, diag_ld : array-like
        Persistence diagrams (birth, death) pairs
    norm_factor : float
        Normalization factor

    Returns
    -------
    float :
        Wasserstein distance
    """
    if not HAS_PERSIM:
        warnings.warn("persim not available, returning NaN")
        return np.nan

    dist_wass = wasserstein(diag_hd, diag_ld)
    dist_wass *= norm_factor

    return dist_wass

def compute_bottleneck(diag_hd, diag_ld, norm_factor=1.0):
    """
    Compute bottleneck distance between persistence diagrams.

    Handles infinite death times by:
    1. Computing bottleneck on finite features
    2. Taking max with birth time difference for infinite features

    Parameters
    ----------
    diag_hd, diag_ld : array-like
        Persistence diagrams (birth, death) pairs
    norm_factor : float
        Normalization factor

    Returns
    -------
    float :
        Bottleneck distance
    """
    if not HAS_PERSIM:
        warnings.warn("persim not available, returning NaN")
        return np.nan

    diag_hd = np.asarray(diag_hd)
    diag_ld = np.asarray(diag_ld)

    # Separate finite and infinite features
    finite_mask_hd = np.isfinite(diag_hd[:, 1]) if diag_hd.size > 0 else np.array([], dtype=bool)
    finite_mask_ld = np.isfinite(diag_ld[:, 1]) if diag_ld.size > 0 else np.array([], dtype=bool)

    diag_hd_finite = diag_hd[finite_mask_hd] if diag_hd.size > 0 else np.array([[0, 0]])
    diag_ld_finite = diag_ld[finite_mask_ld] if diag_ld.size > 0 else np.array([[0, 0]])

    # Compute bottleneck on finite features
    dist_bott_finite = bottleneck(diag_hd_finite, diag_ld_finite)

    # Handle infinite features by comparing birth times
    diag_hd_inf = diag_hd[~finite_mask_hd] if diag_hd.size > 0 and np.any(~finite_mask_hd) else np.array([])
    diag_ld_inf = diag_ld[~finite_mask_ld] if diag_ld.size > 0 and np.any(~finite_mask_ld) else np.array([])

    dist_bott_inf = 0.0
    if diag_hd_inf.size > 0 or diag_ld_inf.size > 0:
        # Extract birth times of infinite features
        births_hd = diag_hd_inf[:, 0] if diag_hd_inf.size > 0 else np.array([])
        births_ld = diag_ld_inf[:, 0] if diag_ld_inf.size > 0 else np.array([])

        # Compute bottleneck distance on birth times
        if births_hd.size > 0:
            births_hd_pairs = np.column_stack([births_hd, births_hd])
        else:
            births_hd_pairs = np.array([[0, 0]])
        if births_ld.size > 0:
            births_ld_pairs = np.column_stack([births_ld, births_ld])
        else:
            births_ld_pairs = np.array([[0, 0]])

        dist_bott_inf = bottleneck(births_hd_pairs, births_ld_pairs)

    # Bottleneck distance is the maximum
    dist_bott = max(dist_bott_finite, dist_bott_inf)

    return dist_bott * norm_factor

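
# Illustrative sketch (not part of the original module): diagram-level distances on two toy
# H1 diagrams. Both helpers require persim and return NaN (with a warning) without it.
def _example_diagram_distances():
    """Wasserstein and bottleneck distances between two toy diagrams."""
    diag_a = np.array([[0.1, 0.9], [0.2, 0.4]])
    diag_b = np.array([[0.1, 0.8], [0.3, 0.4]])
    return (compute_wasserstein(diag_a, diag_b),
            compute_bottleneck(diag_a, diag_b))
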
def compute_global_metrics(data, layout, dimension=1, subsample_threshold=0.5, random_state=42,
                           n_steps=100, metrics_only=True, backend=None, backend_kwargs=None):
    """
    Compute global topological metrics based on persistent homology.

    Computes distances between persistence diagrams and Betti curves:
    - DTW, TWED, EMD for Betti curves
    - Wasserstein, Bottleneck for persistence diagrams

    Parameters
    ----------
    data : array-like
        High-dimensional data
    layout : array-like
        Low-dimensional embedding
    dimension : int
        Maximum homology dimension
    subsample_threshold : float
        Subsampling probability (must be between 0.0 and 1.0)
    random_state : int
        Random seed
    n_steps : int
        Number of points for Betti curves
    metrics_only : bool
        If True, return only metrics; otherwise include diagrams and curves
    backend : str, optional
        Persistence backend: 'fast', 'giotto-ph', 'ripser++', 'ripser', or None for auto
    backend_kwargs : dict, optional
        Backend-specific parameters. See compute_persistence_diagrams() for details.

    Returns
    -------
    dict :
        Dictionary containing metrics (and optionally diagrams and Betti curves)

    Raises
    ------
    ValueError
        If subsample_threshold is not between 0.0 and 1.0
    """
    if not 0.0 <= subsample_threshold <= 1.0:
        raise ValueError(f"subsample_threshold must be between 0.0 and 1.0, got {subsample_threshold}")

    metrics = {
        'dtw': [],
        'twed': [],
        'emd': [],
        'wass': [],
        'bott': []
    }
    betti_curves = {
        'data': [],
        'layout': []
    }

    # Compute persistence diagrams (subsamples once internally)
    result = compute_persistence_diagrams(
        data, layout,
        max_dim=dimension,
        subsample_threshold=subsample_threshold,
        random_state=random_state,
        backend=backend,
        backend_kwargs=backend_kwargs
    )

    # Get number of points after subsampling (from result data)
    n_points = len(result['data'][0])  # H0 diagram length

    diags = result
    backend_used = result.get('backend', 'unknown')

    # Compute metrics for each dimension
    for diag_hd, diag_ld in zip(diags['data'], diags['layout']):
        # Compute Betti curves
        axis_x_hd, axis_y_hd = betti_curve(diag_hd, n_steps=n_steps)
        axis_x_ld, axis_y_ld = betti_curve(diag_ld, n_steps=n_steps)

        betti_curves['data'].append((axis_x_hd, axis_y_hd))
        betti_curves['layout'].append((axis_x_ld, axis_y_ld))

        # Compute distances on Betti curves
        norm_factor = 1.0 / n_points
        dist_dtw = compute_dtw(axis_x_hd, axis_y_hd, axis_x_ld, axis_y_ld, norm_factor)
        dist_twed = compute_twed(axis_x_hd, axis_y_hd, axis_x_ld, axis_y_ld, norm_factor)
        dist_emd = compute_emd(axis_x_hd, axis_y_hd, axis_x_ld, axis_y_ld,
                               adjust_mass=True, norm_factor=norm_factor)

        # Compute distances on persistence diagrams
        dist_wass = compute_wasserstein(diag_hd, diag_ld, norm_factor)
        dist_bott = compute_bottleneck(diag_hd, diag_ld, 1.0)  # No normalization for bottleneck

        metrics['dtw'].append(dist_dtw)
        metrics['twed'].append(dist_twed)
        metrics['emd'].append(dist_emd)
        metrics['wass'].append(dist_wass)
        metrics['bott'].append(dist_bott)

    if metrics_only:
        return {'metrics': metrics, 'backend': backend_used}

    return {'metrics': metrics, 'diags': diags, 'bettis': betti_curves, 'backend': backend_used}

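
# Illustrative usage sketch (not part of the original module): a global topological
# comparison of data and embedding. Assumes at least one persistence backend is installed;
# pass metrics_only=False to also get the diagrams and Betti curves back.
def _example_global_metrics():
    """DTW/TWED/EMD/Wasserstein/bottleneck summary for a toy projection."""
    rng = np.random.default_rng(0)
    X = rng.normal(size=(600, 8)).astype(np.float32)
    Y = X[:, :2].copy()
    return compute_global_metrics(X, Y, dimension=1,
                                  subsample_threshold=0.5, random_state=0,
                                  metrics_only=True)
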
#
# Convenience function for all metrics
#
def evaluate_embedding(data, layout, labels=None, n_neighbors=16, subsample_threshold=0.5,
                       max_homology_dim=1, random_state=42, use_gpu=True,
                       persistence_backend=None, n_threads=-1,
                       compute_distortion=True, compute_context=True, compute_topology=True,
                       **kwargs):
    """
    Comprehensive evaluation of a dimensionality reduction embedding.

    Computes distortion, context preservation, and topological metrics.

    Parameters
    ----------
    data : array-like
        High-dimensional data (n_samples, n_features)
    layout : array-like
        Low-dimensional embedding (n_samples, n_components)
    labels : array-like, optional
        Class labels for context metrics
    n_neighbors : int
        Number of neighbors for kNN metrics
    subsample_threshold : float
        Subsampling probability for all metrics (must be between 0.0 and 1.0, default 0.5)
    max_homology_dim : int
        Maximum homology dimension for persistence
    random_state : int
        Random seed
    use_gpu : bool
        Whether to use GPU acceleration
    persistence_backend : str, optional
        Persistence backend: 'giotto-ph', 'ripser++', 'ripser', or None for auto
    n_threads : int
        Number of threads for giotto-ph (-1 for all cores)
    compute_distortion : bool
        Whether to compute distortion metrics (default True)
    compute_context : bool
        Whether to compute context metrics (default True)
    compute_topology : bool
        Whether to compute topological metrics (default True)
    **kwargs : dict
        Additional parameters for specific metrics

    Returns
    -------
    dict :
        Dictionary with all computed metrics

    Raises
    ------
    ValueError
        If subsample_threshold is not between 0.0 and 1.0
    """
    if not 0.0 <= subsample_threshold <= 1.0:
        raise ValueError(f"subsample_threshold must be between 0.0 and 1.0, got {subsample_threshold}")

    results = {}

    # Local metrics (distortion)
    if compute_distortion:
        print("Computing local metrics (stress, neighbor preservation)...")
        results['local'] = compute_local_metrics(
            data, layout, n_neighbors,
            subsample_threshold=subsample_threshold,
            random_state=random_state,
            use_gpu=use_gpu
        )

    # Context metrics (if labels provided)
    if compute_context and labels is not None:
        print("Computing context preservation metrics (SVM, kNN)...")
        results['context'] = compute_context_measures(
            data, layout, labels,
            subsample_threshold, n_neighbors,
            random_state, use_gpu,
            **kwargs
        )

    # Global topological metrics
    if compute_topology:
        if HAS_GIOTTO_PH or HAS_RIPSER_PP or HAS_RIPSER:
            backend = persistence_backend or get_persistence_backend()
            print(f"Computing topological metrics using backend: {backend}...")
            # Pass n_threads through backend_kwargs for giotto-ph
            backend_kw = {'n_threads': n_threads} if backend == 'giotto-ph' else {}
            results['topology'] = compute_global_metrics(
                data, layout,
                max_homology_dim, subsample_threshold, random_state,
                backend=backend,
                backend_kwargs=backend_kw
            )
        else:
            warnings.warn("Skipping topological metrics (no persistence backend available)")

    return results

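
# Illustrative usage sketch (not part of the original module): end-to-end evaluation of an
# embedding of labelled blobs. Topology is disabled explicitly so the example also runs
# when no persistence backend is installed.
def _example_evaluate_embedding():
    """Distortion and context metrics for a toy projection of labelled blobs."""
    rng = np.random.default_rng(0)
    X = np.vstack([rng.normal(loc=0.0, size=(400, 12)),
                   rng.normal(loc=5.0, size=(400, 12))]).astype(np.float32)
    y = np.array([0] * 400 + [1] * 400)
    Y = X[:, :2].copy()
    return evaluate_embedding(X, Y, labels=y, n_neighbors=16,
                              subsample_threshold=0.5, compute_topology=False)
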